From 4aff749cb1a8e162249cedea6b34a31386ea05a6 Mon Sep 17 00:00:00 2001 From: 4am Date: Sat, 8 Sep 2018 18:28:21 -0400 Subject: [PATCH] 1.0 release --- .gitignore | 2 + README.md | 80 ++++++++- a2rchery.py | 475 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 555 insertions(+), 2 deletions(-) create mode 100644 .gitignore create mode 100755 a2rchery.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e0e8c99 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__ +.DS_Store diff --git a/README.md b/README.md index 2e153c9..c4e6ef9 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,78 @@ -# a2rchery -A multi-purpose tool for manipulating .woz disk images +``` +$ ./a2rchery.py verify -h +usage: a2rchery verify [-h] file + +Verify file structure and metadata of a .a2r disk image (produces no output +unless a problem is found) + +positional arguments: + file .a2r disk image + +optional arguments: + -h, --help show this help message and exit + + + +$ ./a2rchery.py dump -h +usage: a2rchery dump [-h] file + +Print all available information and metadata in a .a2r disk image + +positional arguments: + file .a2r disk image + +optional arguments: + -h, --help show this help message and exit + + + +$ ./a2rchery.py edit -h +usage: a2rchery edit [-h] [-i INFO] [-m META] file + +Edit information and metadata in a .a2r disk image + +positional arguments: + file .a2r disk image (modified in place) + +optional arguments: + -h, --help show this help message and exit + -i INFO, --info INFO change information field. INFO format is "key:value". + Acceptable keys are disk_type, write_protected, + synchronized, creator, version. Other keys are + ignored. For boolean fields, use "1" or "true" or + "yes" for true, "0" or "false" or "no" for false. + -m META, --meta META change metadata field. META format is "key:value". 
+ Standard keys are title, subtitle, publisher, + developer, copyright, version, language, requires_ram, + requires_machine, notes, side, side_name, contributor, + image_date. Other keys are allowed. + +Tips: + + - Use repeated flags to edit multiple fields at once. + - Use "key:" with no value to delete a metadata field. + - Keys are case-sensitive. + - Some values have format restrictions; read the .a2r specification. + +$ ./a2rchery.py export -h +usage: a2rchery export [-h] file + +Export (as JSON) all information and metadata from a .a2r disk image + +positional arguments: + file .a2r disk image + +optional arguments: + -h, --help show this help message and exit + +$ ./a2rchery.py import -h +usage: a2rchery import [-h] file + +Import JSON file to update metadata in a .a2r disk image + +positional arguments: + file .a2r disk image + +optional arguments: + -h, --help show this help message and exit +``` diff --git a/a2rchery.py b/a2rchery.py new file mode 100755 index 0000000..d1fec12 --- /dev/null +++ b/a2rchery.py @@ -0,0 +1,475 @@ +#!/usr/bin/env python3 + +# (c) 2018 by 4am +# MIT-licensed + +import argparse +import collections +import json +import os +import sys + +__version__ = "1.0" +__date__ = "2018-09-08" +__progname__ = "a2rchery" +__displayname__ = __progname__ + " by 4am (" + __date__ + ")" + +# chunk IDs for .a2r files +kA2R2 = b"A2R2" +kINFO = b"INFO" +kSTRM = b"STRM" +kMETA = b"META" +# other things defined in the .a2r specification +kLanguages = ("English","Spanish","French","German","Chinese","Japanese","Italian","Dutch","Portuguese","Danish","Finnish","Norwegian","Swedish","Russian","Polish","Turkish","Arabic","Thai","Czech","Hungarian","Catalan","Croatian","Greek","Hebrew","Romanian","Slovak","Ukrainian","Indonesian","Malay","Vietnamese","Other") +kRequiresRAM = ("16K","24K","32K","48K","64K","128K","256K","512K","768K","1M","1.25M","1.5M+","Unknown") +kRequiresMachine = ("2","2+","2e","2c","2e+","2gs","2c+","3","3+") +kCaptureTiming = 1 
kCaptureBits = 2
kCaptureXTiming = 3

# strings and things, for print routines and error messages
sEOF = "Unexpected EOF"
sBadChunkSize = "Bad chunk size"
dNoYes = {False:"no",True:"yes"}
tQuarters = (".00",".25",".50",".75")
dTiming = {kCaptureTiming:"timing",kCaptureBits:"bits",kCaptureXTiming:"xtiming"}

# errors that may be raised
class A2RError(Exception): pass # base class
class A2REOFError(A2RError): pass
class A2RFormatError(A2RError): pass
class A2RHeaderError(A2RError): pass
class A2RHeaderError_NoA2R2(A2RHeaderError): pass
class A2RHeaderError_NoFF(A2RHeaderError): pass
class A2RHeaderError_NoLF(A2RHeaderError): pass
class A2RINFOFormatError(A2RFormatError): pass
class A2RINFOFormatError_BadVersion(A2RINFOFormatError): pass
class A2RINFOFormatError_BadDiskType(A2RINFOFormatError): pass
class A2RINFOFormatError_BadWriteProtected(A2RINFOFormatError): pass
class A2RINFOFormatError_BadSynchronized(A2RINFOFormatError): pass
class A2RINFOFormatError_BadCleaned(A2RINFOFormatError): pass
class A2RINFOFormatError_BadCreator(A2RINFOFormatError): pass
class A2RSTRMFormatError(A2RFormatError): pass
class A2RMETAFormatError(A2RFormatError): pass
# The specific META errors derive from A2RMETAFormatError (mirroring the INFO
# hierarchy) so that "except A2RMETAFormatError" catches all of them. They are
# still A2RFormatError subclasses, so existing handlers keep working.
class A2RMETAFormatError_DuplicateKey(A2RMETAFormatError): pass
class A2RMETAFormatError_BadValue(A2RMETAFormatError): pass
class A2RMETAFormatError_BadLanguage(A2RMETAFormatError): pass
class A2RMETAFormatError_BadRAM(A2RMETAFormatError): pass
class A2RMETAFormatError_BadMachine(A2RMETAFormatError): pass

class A2RParseError(A2RError):
    pass

def from_uint32(b):
    """Decode a little-endian unsigned integer from the bytes b."""
    return int.from_bytes(b, byteorder="little")
from_uint16=from_uint32

def to_uint32(b):
    """Encode the integer b as 4 little-endian bytes."""
    return b.to_bytes(4, byteorder="little")

def to_uint16(b):
    """Encode the integer b as 2 little-endian bytes."""
    return b.to_bytes(2, byteorder="little")

def to_uint8(b):
    """Encode the integer b as a single byte."""
    return b.to_bytes(1, byteorder="little")

def raise_if(cond, e, s=""):
    """Raise exception class e with message s if cond is true.

    The command line entry point rebinds this name to a variant that exits
    with a message instead of raising, so validation code should report
    problems through raise_if rather than a bare raise."""
    if cond: raise e(s)

class DiskImage: # base class
    def __init__(self, filename=None, stream=None):
        raise_if(not filename and not stream, A2RError, "no input")
        self.filename = filename
        self.tracks = []

    def seek(self, track_num):
        """returns Track object for the given track, or None if the track is not part of this disk image. track_num can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.)"""
        return None

class A2RValidator:
    """Validation and encoding helpers shared by A2RReader and A2RWriter.

    The validate_info_* methods take the raw single-byte value (a bytes object
    of length 1), not an int. Every method reports a problem via raise_if."""

    def validate_info_version(self, version):
        raise_if(version != b'\x01', A2RINFOFormatError_BadVersion, "Unknown version (expected 1, found %s)" % version)

    def validate_info_disk_type(self, disk_type):
        raise_if(disk_type not in (b'\x01',b'\x02'), A2RINFOFormatError_BadDiskType, "Unknown disk type (expected 1 or 2, found %s)" % disk_type)

    def validate_info_write_protected(self, write_protected):
        raise_if(write_protected not in (b'\x00',b'\x01'), A2RINFOFormatError_BadWriteProtected, "Unknown write protected flag (expected 0 or 1, found %s)" % write_protected)

    def validate_info_synchronized(self, synchronized):
        raise_if(synchronized not in (b'\x00',b'\x01'), A2RINFOFormatError_BadSynchronized, "Unknown synchronized flag (expected 0, or 1, found %s)" % synchronized)

    def validate_info_creator(self, creator_as_bytes):
        """Check that the creator field is at most 32 bytes of valid UTF-8."""
        raise_if(len(creator_as_bytes) > 32, A2RINFOFormatError_BadCreator, "Creator is longer than 32 bytes")
        try:
            creator_as_bytes.decode("UTF-8")
        except UnicodeDecodeError:
            raise_if(True, A2RINFOFormatError_BadCreator, "Creator is not valid UTF-8")

    def encode_info_creator(self, creator_as_string):
        """Return the creator string as UTF-8 bytes, space-padded to 32 bytes."""
        creator_as_bytes = creator_as_string.encode("UTF-8").ljust(32, b" ")
        self.validate_info_creator(creator_as_bytes)
        return creator_as_bytes

    def decode_info_creator(self, creator_as_bytes):
        """Return the creator field as a string with surrounding whitespace stripped."""
        self.validate_info_creator(creator_as_bytes)
        return creator_as_bytes.decode("UTF-8").strip()

    def validate_metadata(self, metadata_as_bytes):
        """Check that the raw META chunk is valid UTF-8."""
        try:
            metadata_as_bytes.decode("UTF-8")
        except UnicodeDecodeError:
            # reported through raise_if (not a bare raise) so the command line
            # override of raise_if applies here too
            raise_if(True, A2RMETAFormatError, "Metadata is not valid UTF-8")

    def decode_metadata(self, metadata_as_bytes):
        """Return the raw META chunk decoded to a string."""
        self.validate_metadata(metadata_as_bytes)
        return metadata_as_bytes.decode("UTF-8")

    def validate_metadata_value(self, value):
        """Reject values containing the characters the META chunk uses as separators."""
        raise_if("\t" in value, A2RMETAFormatError_BadValue, "Invalid metadata value (contains tab character)")
        raise_if("\n" in value, A2RMETAFormatError_BadValue, "Invalid metadata value (contains linefeed character)")
        raise_if("|" in value, A2RMETAFormatError_BadValue, "Invalid metadata value (contains pipe character)")

    def validate_metadata_language(self, language):
        raise_if(language and (language not in kLanguages), A2RMETAFormatError_BadLanguage, "Invalid metadata language")

    def validate_metadata_requires_ram(self, requires_ram):
        raise_if(requires_ram and (requires_ram not in kRequiresRAM), A2RMETAFormatError_BadRAM, "Invalid metadata requires_ram")

    def validate_metadata_requires_machine(self, requires_machine):
        raise_if(requires_machine and (requires_machine not in kRequiresMachine), A2RMETAFormatError_BadMachine, "Invalid metadata requires_machine")

class A2RReader(DiskImage, A2RValidator):
    """Parse a .a2r disk image from a file name or an open binary stream.

    After construction:
      info -- OrderedDict of INFO fields (version, disk_type, write_protected,
              synchronized, creator)
      meta -- OrderedDict of META key -> string, or tuple of strings for
              multi-valued keys
      flux -- OrderedDict of location byte -> list of capture dicts
    Chunks with any other ID are read and skipped."""

    def __init__(self, filename=None, stream=None):
        DiskImage.__init__(self, filename, stream)
        self.info = collections.OrderedDict()
        self.meta = collections.OrderedDict()
        self.flux = collections.OrderedDict()

        with stream or open(filename, "rb") as f:
            header_raw = f.read(8)
            raise_if(len(header_raw) != 8, A2REOFError, sEOF)
            self.__process_header(header_raw)
            while True:
                chunk_id = f.read(4)
                if not chunk_id: break # clean end of file
                raise_if(len(chunk_id) != 4, A2REOFError, sEOF)
                chunk_size_raw = f.read(4)
                raise_if(len(chunk_size_raw) != 4, A2REOFError, sEOF)
                chunk_size = from_uint32(chunk_size_raw)
                data = f.read(chunk_size)
                raise_if(len(data) != chunk_size, A2REOFError, sEOF)
                if chunk_id == kINFO:
                    raise_if(chunk_size != 36, A2RFormatError, sBadChunkSize)
                    self.__process_info(data)
                elif chunk_id == kSTRM:
                    self.__process_strm(data)
                elif chunk_id == kMETA:
                    self.__process_meta(data)

    def __process_header(self, data):
        """Check the 8 magic bytes at the start of the file."""
        raise_if(data[:4] != kA2R2, A2RHeaderError_NoA2R2, "Magic string 'A2R2' not present at offset 0")
        raise_if(data[4] != 0xFF, A2RHeaderError_NoFF, "Magic byte 0xFF not present at offset 4")
        raise_if(data[5:8] != b"\x0A\x0D\x0A", A2RHeaderError_NoLF, "Magic bytes 0x0A0D0A not present at offset 5")

    def __process_info(self, data):
        """Validate the 36-byte INFO chunk and store its fields in self.info."""
        version = data[0]
        self.validate_info_version(to_uint8(version))
        disk_type = data[33]
        self.validate_info_disk_type(to_uint8(disk_type))
        write_protected = data[34]
        self.validate_info_write_protected(to_uint8(write_protected))
        synchronized = data[35]
        self.validate_info_synchronized(to_uint8(synchronized))
        creator = self.decode_info_creator(data[1:33])
        self.info["version"] = version # int
        self.info["disk_type"] = disk_type # int
        self.info["write_protected"] = (write_protected == 1) # boolean
        self.info["synchronized"] = (synchronized == 1) # boolean
        self.info["creator"] = creator # string

    def __process_strm(self, data):
        """Split the STRM chunk into capture records, grouped by location in self.flux.

        Each record is a 10-byte header (location, capture type, 4-byte data
        length, 4-byte tick count) followed by data_length bytes of data. The
        chunk must end with a single 0xFF byte."""
        # "not data" guards the data[-1] lookup against an empty chunk
        raise_if(not data or data[-1] != 0xFF, A2RSTRMFormatError, "Missing phase reset at end of STRM chunk")
        end = len(data) - 1 # offset of the terminating 0xFF
        i = 0
        while i < end:
            raise_if(i + 10 > end, A2RSTRMFormatError, "Truncated capture header in STRM chunk")
            location = data[i]
            capture_type = data[i+1]
            data_length = from_uint32(data[i+2:i+6])
            tick_count = from_uint32(data[i+6:i+10])
            # a slice past the end would silently come back short, and the
            # writer would then emit a different file than the one that was read
            raise_if(i + 10 + data_length > end, A2RSTRMFormatError, "Capture data runs past end of STRM chunk")
            self.flux.setdefault(location, []).append(
                {"capture_type": capture_type,
                 "data_length": data_length,
                 "tick_count": tick_count,
                 "data": data[i+10:i+10+data_length]}
                )
            i = i + 10 + data_length

    def __process_meta(self, metadata_as_bytes):
        """Parse the META chunk (key<TAB>value lines, "|" between multiple values) into self.meta."""
        metadata = self.decode_metadata(metadata_as_bytes)
        validators = {"language": self.validate_metadata_language,
                      "requires_ram": self.validate_metadata_requires_ram,
                      "requires_machine": self.validate_metadata_requires_machine}
        for line in metadata.split("\n"):
            if not line: continue
            columns_raw = line.split("\t")
            raise_if(len(columns_raw) != 2, A2RMETAFormatError, "Malformed metadata")
            key, value_raw = columns_raw
            raise_if(key in self.meta, A2RMETAFormatError_DuplicateKey, "Duplicate metadata key %s" % key)
            values = value_raw.split("|")
            validate = validators.get(key)
            if validate:
                for value in values:
                    validate(value)
            # a conditional expression, not "x and y or z": with and/or, a key
            # whose single value is empty would be stored as ("",) instead of ""
            self.meta[key] = values[0] if len(values) == 1 else tuple(values)

    def to_json(self):
        """Return the info and meta dictionaries as a JSON string."""
        j = {"a2r": {"info":self.info, "meta":self.meta}}
        return json.dumps(j, indent=2)

class A2RWriter(A2RValidator):
    """Build a .a2r disk image from the info, meta and flux dictionaries.

    The dictionaries have the same layout as those of A2RReader. Only the
    creator is pre-filled; the other info fields must be set before
    build_info or write is called."""

    def __init__(self, creator):
        self.info = collections.OrderedDict()
        self.meta = collections.OrderedDict()
        self.flux = collections.OrderedDict()
        self.info["creator"] = creator

    def from_json(self, json_string):
        """Update self.meta from the "meta" object under the last top-level key of a JSON document."""
        j = json.loads(json_string)
        root = list(j)[-1]
        self.meta.update(j[root]["meta"])

    def build_head(self):
        """Return the 8-byte file header."""
        chunk = bytearray()
        chunk.extend(kA2R2) # magic bytes
        chunk.extend(b"\xFF\x0A\x0D\x0A") # more magic bytes
        return chunk

    def build_info(self):
        """Return the INFO chunk, validating every field first."""
        chunk = bytearray()
        chunk.extend(kINFO) # chunk ID
        chunk.extend(to_uint32(36)) # chunk size (constant)
        version_raw = to_uint8(self.info["version"])
        self.validate_info_version(version_raw)
        creator_raw = self.encode_info_creator(self.info["creator"])
        disk_type_raw = to_uint8(self.info["disk_type"])
        self.validate_info_disk_type(disk_type_raw)
        write_protected_raw = to_uint8(self.info["write_protected"])
        self.validate_info_write_protected(write_protected_raw)
        synchronized_raw = to_uint8(self.info["synchronized"])
        self.validate_info_synchronized(synchronized_raw)
        chunk.extend(version_raw) # version (int, probably 1)
        chunk.extend(creator_raw) # creator
        chunk.extend(disk_type_raw) # disk type (1=5.25 inch, 2=3.5 inch)
        chunk.extend(write_protected_raw) # write-protected (0=no, 1=yes)
        chunk.extend(synchronized_raw) # tracks synchronized (0=no, 1=yes)
        return chunk

    def build_strm(self):
        """Return the STRM chunk holding every capture in self.flux."""
        data_raw = bytearray()
        for location, captures in self.flux.items():
            for capture in captures:
                data_raw.extend(to_uint8(location)) # track where this capture happened
                data_raw.extend(to_uint8(capture["capture_type"])) # 1 = timing, 2 = bits, 3 = xtiming
                data_raw.extend(to_uint32(len(capture["data"]))) # data length in bytes
                data_raw.extend(to_uint32(capture["tick_count"])) # estimated loop point in ticks
                data_raw.extend(capture["data"])
        data_raw.extend(b"\xFF")
        chunk = bytearray()
        chunk.extend(kSTRM) # chunk ID
        chunk.extend(to_uint32(len(data_raw))) # chunk size
        chunk.extend(data_raw) # all stream data
        return chunk

    def build_meta(self):
        """Return the META chunk, or b"" when there is no metadata."""
        if not self.meta: return b""
        validators = {"language": self.validate_metadata_language,
                      "requires_ram": self.validate_metadata_requires_ram,
                      "requires_machine": self.validate_metadata_requires_machine}
        meta_tmp = {}
        for key, value_raw in self.meta.items():
            # a single value is stored as a bare string, several as a sequence
            values = [value_raw] if isinstance(value_raw, str) else value_raw
            meta_tmp[key] = values
            validate = validators.get(key)
            for value in values:
                self.validate_metadata_value(value)
                if validate:
                    validate(value)
        data = b"\x0A".join(
            k.encode("UTF-8") + b"\x09" + "|".join(v).encode("UTF-8")
            for k, v in meta_tmp.items()) + b"\x0A"
        chunk = bytearray()
        chunk.extend(kMETA) # chunk ID
        chunk.extend(to_uint32(len(data))) # chunk size
        chunk.extend(data)
        return chunk

    def write(self, stream):
        """Write header, INFO, STRM and META chunks to stream, in that order."""
        stream.write(self.build_head())
        stream.write(self.build_info())
        stream.write(self.build_strm())
        stream.write(self.build_meta())

#---------- command line interface ----------

class BaseCommand:
    """A subcommand that takes one .a2r file and parses it when called."""

    def __init__(self, name):
        self.name = name

    def setup(self, subparser, description=None, epilog=None, help=".a2r disk image", formatter_class=argparse.HelpFormatter):
        self.parser = subparser.add_parser(self.name, description=description, epilog=epilog, formatter_class=formatter_class)
        self.parser.add_argument("file", help=help)
        self.parser.set_defaults(action=self)

    def __call__(self, args):
        self.a2r_image = A2RReader(args.file)

class CommandVerify(BaseCommand):
    """verify: parsing the file is the whole check."""

    def __init__(self):
        BaseCommand.__init__(self, "verify")

    def setup(self, subparser):
        BaseCommand.setup(self, subparser,
                          description="Verify file structure and metadata of a .a2r disk image (produces no output unless a problem is found)")

class CommandDump(BaseCommand):
    """dump: print flux captures, then metadata, then info."""

    kWidth = 30 # width of the label column

    def __init__(self):
        BaseCommand.__init__(self, "dump")

    def setup(self, subparser):
        BaseCommand.setup(self, subparser,
                          description="Print all available information and metadata in a .a2r disk image")

    def __call__(self, args):
        BaseCommand.__call__(self, args)
        self.print_flux()
        self.print_meta()
        self.print_info()

    def print_info(self):
        info = self.a2r_image.info
        print("INFO:  Format version:".ljust(self.kWidth), "%d" % info["version"])
        print("INFO:  Disk type:".ljust(self.kWidth), ("5.25-inch", "3.5-inch")[info["disk_type"]-1])
        print("INFO:  Write protected:".ljust(self.kWidth), dNoYes[info["write_protected"]])
        print("INFO:  Track synchronized:".ljust(self.kWidth), dNoYes[info["synchronized"]])
        print("INFO:  Creator:".ljust(self.kWidth), info["creator"])

    def print_flux(self):
        for location, flux_records in self.a2r_image.flux.items():
            # whole track number plus a quarter-track suffix
            label = "STRM:  Track %d%s" % (location // 4, tQuarters[location % 4])
            for flux_record in flux_records:
                print(label.ljust(self.kWidth),
                      dTiming[flux_record["capture_type"]], "capture,",
                      flux_record["tick_count"], "ticks")

    def print_meta(self):
        if not self.a2r_image.meta: return
        for key, values in self.a2r_image.meta.items():
            if isinstance(values, str):
                values = [values]
            print(("META:  " + key + ":").ljust(self.kWidth), values[0])
            for value in values[1:]:
                print("META:  ".ljust(self.kWidth), value)

class CommandExport(BaseCommand):
    """export: print info and metadata as JSON."""

    def __init__(self):
        BaseCommand.__init__(self, "export")

    def setup(self, subparser):
        BaseCommand.setup(self, subparser,
                          description="Export (as JSON) all information and metadata from a .a2r disk image")

    def __call__(self, args):
        BaseCommand.__call__(self, args)
        print(self.a2r_image.to_json())

class WriterBaseCommand(BaseCommand):
    """Base for subcommands that rewrite the file in place.

    Subclasses implement update(), which modifies self.output (an A2RWriter
    pre-loaded with the contents of the parsed image)."""

    def __call__(self, args):
        BaseCommand.__call__(self, args)
        self.args = args
        # maintain creator if there is one, otherwise use default
        self.output = A2RWriter(self.a2r_image.info.get("creator", __displayname__))
        self.output.flux = self.a2r_image.flux.copy()
        # update rather than replace, so the default creator survives when the
        # image supplied none
        self.output.info.update(self.a2r_image.info)
        self.output.meta = self.a2r_image.meta.copy()
        self.update()
        tmpfile = args.file + ".chery"
        try:
            with open(tmpfile, "wb") as f:
                self.output.write(f)
        except BaseException:
            # validation can fail (or exit, on the command line) part-way
            # through writing; don't leave the partial temp file behind
            if os.path.exists(tmpfile):
                os.remove(tmpfile)
            raise
        # os.replace overwrites an existing target on every platform
        os.replace(tmpfile, args.file)

class CommandEdit(WriterBaseCommand):
    """edit: change info and metadata fields given on the command line."""

    def __init__(self):
        WriterBaseCommand.__init__(self, "edit")

    def setup(self, subparser):
        WriterBaseCommand.setup(self,
                                subparser,
                                description="Edit information and metadata in a .a2r disk image",
                                epilog="""Tips:

  - Use repeated flags to edit multiple fields at once.
  - Use "key:" with no value to delete a metadata field.
  - Keys are case-sensitive.
  - Some values have format restrictions; read the .a2r specification.""",
                                help=".a2r disk image (modified in place)",
                                formatter_class=argparse.RawDescriptionHelpFormatter)
        self.parser.add_argument("-i", "--info", type=str, action="append",
                                 help="""change information field.
INFO format is "key:value".
Acceptable keys are disk_type, write_protected, synchronized, creator, version.
Other keys are ignored.
For boolean fields, use "1" or "true" or "yes" for true, "0" or "false" or "no" for false.""")
        self.parser.add_argument("-m", "--meta", type=str, action="append",
                                 help="""change metadata field.
META format is "key:value".
Standard keys are title, subtitle, publisher, developer, copyright, version, language, requires_ram,
requires_machine, notes, side, side_name, contributor, image_date.
Other keys are allowed.""")

    def update(self):
        """Apply the -i and -m arguments to self.output."""
        int_key_errors = {"version": A2RINFOFormatError_BadVersion,
                          "disk_type": A2RINFOFormatError_BadDiskType}
        # add all new info fields
        for i in self.args.info or ():
            k, sep, v = i.partition(":")
            raise_if(not sep, A2RINFOFormatError, "Malformed info argument '%s' (expected key:value)" % i)
            if k in ("write_protected","synchronized"):
                v = v.lower() in ("1", "true", "yes")
            elif k in int_key_errors:
                # these are written with to_uint8, so they must be ints that
                # fit in one byte, not the raw command line string
                try:
                    v = int(v)
                    to_uint8(v)
                except (ValueError, OverflowError):
                    raise_if(True, int_key_errors[k], "Invalid %s '%s' (expected a number)" % (k, v))
            elif k != "creator":
                continue # the help text promises that other keys are ignored
            self.output.info[k] = v
        # add all new metadata fields, and delete empty ones
        for m in self.args.meta or ():
            k, sep, v = m.partition(":")
            raise_if(not sep, A2RMETAFormatError, "Malformed meta argument '%s' (expected key:value)" % m)
            v = v.split("|")
            if len(v) == 1:
                v = v[0]
            if v:
                self.output.meta[k] = v
            else:
                self.output.meta.pop(k, None)

class CommandImport(WriterBaseCommand):
    """import: update metadata from a JSON document read from standard input."""

    def __init__(self):
        WriterBaseCommand.__init__(self, "import")

    def setup(self, subparser):
        WriterBaseCommand.setup(self, subparser,
                                description="Import JSON file to update metadata in a .a2r disk image")

    def update(self):
        self.output.from_json(sys.stdin.read())

if __name__ == "__main__":
    import sys
    # on the command line, report a problem and exit instead of raising
    raise_if = lambda cond, e, s="": cond and sys.exit("%s: %s" % (e.__name__, s))
    cmds = [CommandDump(), CommandVerify(), CommandEdit(), CommandExport(), CommandImport()]
    parser = argparse.ArgumentParser(prog=__progname__,
                                     description="""A multi-purpose tool for manipulating .a2r disk images.

See '""" + __progname__ + """ <command> -h' for help on individual commands.""",
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("-v", "--version", action="version", version=__displayname__)
    sp = parser.add_subparsers(dest="command", help="command")
    for command in cmds:
        command.setup(sp)
    args = parser.parse_args()
    if not hasattr(args, "action"):
        # no subcommand given: "action" is only set by the subparsers
        parser.error("no command given")
    args.action(args)