From 86ca60775e7fa0adea3298c395de62b44863ad8a Mon Sep 17 00:00:00 2001 From: 4am Date: Thu, 31 May 2018 10:55:29 -0400 Subject: [PATCH] command line work --- passport/__init__.py | 2 + passport/eddimage.py | 24 +++ passport/strings.py | 2 +- passport/wozimage.py | 356 ++++++++++++++++++++++++++----------------- 4 files changed, 242 insertions(+), 142 deletions(-) create mode 100644 passport/eddimage.py diff --git a/passport/__init__.py b/passport/__init__.py index 3dc4e3b..d568894 100755 --- a/passport/__init__.py +++ b/passport/__init__.py @@ -8,6 +8,7 @@ import bitarray import collections import os.path import sys +import time class BaseLogger: # base class def __init__(self, g): @@ -935,6 +936,7 @@ class EDDToWoz(BasePassportProcessor): output_filename = source_base + '.woz' self.logger.PrintByID("writing", {"filename":output_filename}) woz_image = wozimage.WozWriter(STRINGS["header"].strip()) + woz_image.meta["image_date"] = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()) for q in range(1 + (0x23 * 4)): track_num = q / 4 if track_num in self.output_tracks: diff --git a/passport/eddimage.py b/passport/eddimage.py new file mode 100644 index 0000000..bb63b54 --- /dev/null +++ b/passport/eddimage.py @@ -0,0 +1,24 @@ +from passport.wozimage import DiskImage, Track, WozError, raise_if +import bitarray + +class EDDReader(DiskImage): + def __init__(self, filename=None, stream=None): + DiskImage.__init__(self, filename, stream) + with stream or open(filename, 'rb') as f: + for i in range(137): + raw_bytes = f.read(16384) + raise_if(len(raw_bytes) != 16384, WozError, "Bad EDD file (did you image by quarter tracks?)") + bits = bitarray.bitarray(endian="big") + bits.frombytes(raw_bytes) + self.tracks.append(Track(bits, 131072)) + + def seek(self, track_num): + if type(track_num) != float: + track_num = float(track_num) + if track_num < 0.0 or \ + track_num > 35.0 or \ + track_num.as_integer_ratio()[1] not in (1,2,4): + raise WozError("Invalid track %s" % track_num) + trk_id = int(track_num * 4) + return self.tracks[trk_id] + diff --git a/passport/strings.py b/passport/strings.py index 71ba8de..5dc2d6f 100644 --- a/passport/strings.py +++ b/passport/strings.py @@ -1,5 +1,5 @@ STRINGS = { - "header": "Passport.py by 4am (2018-05-24)\n", # max 32 characters + "header": "Passport.py by 4am (2018-05-29)\n", # max 32 characters "reading": "Reading from {filename}\n", "diskrwts": "Using disk's own RWTS\n", "bb00": "T00,S05 Found $BB00 protection check\n" diff --git a/passport/wozimage.py b/passport/wozimage.py index 6d12e41..aa48c49 100755 --- a/passport/wozimage.py +++ b/passport/wozimage.py @@ -4,11 +4,16 @@ # MIT-licensed # portions from MIT-licensed defedd.py (c) 2014 by Paul Hagstrom +import argparse import binascii import bitarray # https://pypi.org/project/bitarray/ import collections import itertools -import sys + +__version__ = "0.1" +__date__ = "2018-05-31" +__programname__ = "wozardry" +__displayname__ = __programname__ + " " + __version__ + " by 4am (" + __date__ + ")" # domain-specific constants defined in .woz specification kWOZ1 = b'WOZ1' @@ -61,6 +66,9 @@ def to_uint32(b): def to_uint16(b): return b.to_bytes(2, byteorder="little") +def to_uint8(b): + return b.to_bytes(1, byteorder="little") + def raise_if(cond, e, s=""): if cond: raise e(s) @@ -123,110 +131,12 @@ class DiskImage: # base class """returns Track object for the given track, or None if the track is not part of this disk image. track_num can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.)""" return None -class EDDReader(DiskImage): - def __init__(self, filename=None, stream=None): - DiskImage.__init__(self, filename, stream) - with stream or open(filename, 'rb') as f: - for i in range(137): - raw_bytes = f.read(16384) - raise_if(len(raw_bytes) != 16384, WozError, "Bad EDD file (did you image by quarter tracks?)") - bits = bitarray.bitarray(endian="big") - bits.frombytes(raw_bytes) - self.tracks.append(Track(bits, 131072)) - - def seek(self, track_num): - if type(track_num) != float: - track_num = float(track_num) - if track_num < 0.0 or \ - track_num > 35.0 or \ - track_num.as_integer_ratio()[1] not in (1,2,4): - raise WozError("Invalid track %s" % track_num) - trk_id = int(track_num * 4) - return self.tracks[trk_id] - -class WozWriter: - def __init__(self, creator): - self.tracks = [] - self.tmap = [0xFF]*160 - self.creator = creator - #self.meta = collections.OrderedDict() - - def add_track(self, track_num, track): - tmap_id = int(track_num * 4) - trk_id = len(self.tracks) - self.tracks.append(track) - self.tmap[tmap_id] = trk_id - if tmap_id: - self.tmap[tmap_id - 1] = trk_id - if tmap_id < 159: - self.tmap[tmap_id + 1] = trk_id - - def build_info(self): - chunk = bytearray() - chunk.extend(kINFO) # chunk ID - chunk.extend(to_uint32(60)) # chunk size - chunk.extend(b'\x01') # version = 1 - chunk.extend(b'\x01') # disk type = 1 (5.25-inch) - chunk.extend(b'\x00') # write-protected = 0 - chunk.extend(b'\x00') # synchronized = 0 - chunk.extend(b'\x00') # cleaned = 0 - chunk.extend(self.creator.encode("UTF-8").ljust(32, b" ")) # creator - chunk.extend(b'\x00' * 23) # reserved - return chunk - - def build_tmap(self): - chunk = bytearray() - chunk.extend(kTMAP) # chunk ID - chunk.extend(to_uint32(160)) # chunk size - chunk.extend(bytes(self.tmap)) - return chunk - - def build_trks(self): - chunk = bytearray() - chunk.extend(kTRKS) # chunk ID - chunk_size = len(self.tracks)*6656 - chunk.extend(to_uint32(chunk_size)) # chunk size - for track in self.tracks: - raw_bytes = track.bits.tobytes() - chunk.extend(raw_bytes) # bitstream as raw bytes - chunk.extend(b'\x00' * (6646 - len(raw_bytes))) # padding to 6646 bytes - chunk.extend(to_uint16(len(raw_bytes))) # bytes used - chunk.extend(to_uint16(track.bit_count)) # bit count - chunk.extend(b'\xFF\xFF') # splice point (none) - chunk.extend(b'\xFF') # splice nibble (none) - chunk.extend(b'\xFF') # splice bit count (none) - chunk.extend(b'\x00\x00') # reserved - return chunk - - def build_meta(self): - return b'' - - def build_head(self, crc): - chunk = bytearray() - chunk.extend(kWOZ1) # magic bytes - chunk.extend(b'\xFF\x0A\x0D\x0A') # more magic bytes - chunk.extend(to_uint32(crc)) # CRC32 of rest of file (calculated in caller) - return chunk - - def write(self, stream): - info = self.build_info() - tmap = self.build_tmap() - trks = self.build_trks() - meta = self.build_meta() - crc = binascii.crc32(info + tmap + trks + meta) - head = self.build_head(crc) - stream.write(head) - stream.write(info) - stream.write(tmap) - stream.write(trks) - stream.write(meta) - class WozReader(DiskImage): def __init__(self, filename=None, stream=None): DiskImage.__init__(self, filename, stream) self.tmap = None - self.info = None - self.meta = None + self.info = collections.OrderedDict() + self.meta = collections.OrderedDict() with stream or open(filename, 'rb') as f: header_raw = f.read(8) @@ -281,12 +191,12 @@ class WozReader(DiskImage): creator = data[5:37].decode('UTF-8') except: raise WOZINFOFormatError("Creator is not valid UTF-8") - self.info = {"version": version, - "disk_type": disk_type, - "write_protected": (write_protected == 1), - "synchronized": (synchronized == 1), - "cleaned": (cleaned == 1), - "creator": creator} + self.info["version"] = version # int + self.info["disk_type"] = disk_type # int + self.info["write_protected"] = (write_protected == 1) # boolean + self.info["synchronized"] = (synchronized == 1) # boolean + self.info["cleaned"] = (cleaned == 1) # boolean + self.info["creator"] = creator # string def __process_tmap(self, data): self.tmap = list(data) @@ -329,7 +239,6 @@ class WozReader(DiskImage): metadata = data.decode('UTF-8') except: raise WozMETAFormatError("Metadata is not valid UTF-8") - self.meta = collections.OrderedDict() for line in metadata.split('\n'): if not line: continue columns_raw = line.split('\t') @@ -346,7 +255,7 @@ class WozReader(DiskImage): elif key == "requires_machine": for value in values: raise_if(value and (value not in kRequiresMachine), WozMETAFormatError_BadMachine, "Invalid metadata requires_machine") - self.meta[key] = values + self.meta[key] = len(values) == 1 and values[0] or tuple(values) def seek(self, track_num): """returns Track object for the given track, or None if the track is not part of this disk image. track_num can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.)""" @@ -360,40 +269,205 @@ class WozReader(DiskImage): if trk_id == 0xFF: return None return self.tracks[trk_id] -# ----- quick info dump routines ----- -kWidth = 20 # width of first column for printing info and metadata +class WozWriter: + def __init__(self, creator): + self.info = collections.OrderedDict() + self.info["version"] = 1 + self.info["disk_type"] = 1 + self.info["write_protected"] = False + self.info["synchronized"] = False + self.info["cleaned"] = False + self.info["creator"] = creator + self.tracks = [] + self.tmap = [0xFF]*160 + self.meta = collections.OrderedDict() -def print_info(wozimage): - print() - print("INFO") - print("File format version:".ljust(kWidth), "%d" % wozimage.info["version"]) - print("Disk type:".ljust(kWidth), ("5.25-inch", "3.5-inch")[wozimage.info["disk_type"]-1]) - print("Write protected:".ljust(kWidth), dNoYes[wozimage.info["write_protected"]]) - print("Track synchronized:".ljust(kWidth), dNoYes[wozimage.info["synchronized"]]) - print("Weakbits cleaned:".ljust(kWidth), dNoYes[wozimage.info["cleaned"]]) - print("Creator:".ljust(kWidth), wozimage.info["creator"]) + def add_track(self, track_num, track): + tmap_id = int(track_num * 4) + trk_id = len(self.tracks) + self.tracks.append(track) + self.tmap[tmap_id] = trk_id + if tmap_id: + self.tmap[tmap_id - 1] = trk_id + if tmap_id < 159: + self.tmap[tmap_id + 1] = trk_id -def print_tmap(wozimage): - print() - print("TMAP") - i = 0 - for tindex in wozimage.tmap: - if tindex != 0xFF: - print("Track %d%s -> TRKS %d" % (i/4, tQuarters[i%4], tindex)) - i += 1 + def build_info(self): + chunk = bytearray() + chunk.extend(kINFO) # chunk ID + chunk.extend(to_uint32(60)) # chunk size (constant) + chunk.extend(to_uint8(self.info["version"])) # version (int, probably 1) + chunk.extend(to_uint8(self.info["disk_type"])) # disk type (1=5.25 inch, 2=3.5 inch) + chunk.extend(to_uint8(self.info["write_protected"])) # write-protected (0=no, 1=yes) + chunk.extend(to_uint8(self.info["synchronized"])) # tracks synchronized (0=no, 1=yes) + chunk.extend(to_uint8(self.info["cleaned"])) # weakbits cleaned (0=no, 1=yes) + chunk.extend(self.info["creator"].encode("UTF-8").ljust(32, b" ")) # creator + chunk.extend(b'\x00' * 23) # reserved + return chunk + + def build_tmap(self): + chunk = bytearray() + chunk.extend(kTMAP) # chunk ID + chunk.extend(to_uint32(160)) # chunk size + chunk.extend(bytes(self.tmap)) + return chunk + + def build_trks(self): + chunk = bytearray() + chunk.extend(kTRKS) # chunk ID + chunk_size = len(self.tracks)*6656 + chunk.extend(to_uint32(chunk_size)) # chunk size + for track in self.tracks: + raw_bytes = track.bits.tobytes() + chunk.extend(raw_bytes) # bitstream as raw bytes + chunk.extend(b'\x00' * (6646 - len(raw_bytes))) # padding to 6646 bytes + chunk.extend(to_uint16(len(raw_bytes))) # bytes used + chunk.extend(to_uint16(track.bit_count)) # bit count + chunk.extend(b'\xFF\xFF') # splice point (none) + chunk.extend(b'\xFF') # splice nibble (none) + chunk.extend(b'\xFF') # splice bit count (none) + chunk.extend(b'\x00\x00') # reserved + return chunk -def print_meta(wozimage): - if not wozimage.meta: return - print() - print("META") - for key, values in wozimage.meta.items(): - print((key + ":").ljust(kWidth), values[0]) - for value in values[1:]: - print("".ljust(kWidth), value) + def build_meta(self): + if not self.meta: return b'' + data = b'\x0A'.join( + [k.encode("UTF-8") + \ + b'\x09' + \ + (type(v) in (list,tuple) and "|".join(v) or v).encode("UTF-8") \ + for k, v in self.meta.items()]) + chunk = bytearray() + chunk.extend(kMETA) # chunk ID + chunk.extend(to_uint32(len(data))) # chunk size + chunk.extend(data) + return chunk + + def build_head(self, crc): + chunk = bytearray() + chunk.extend(kWOZ1) # magic bytes + chunk.extend(b'\xFF\x0A\x0D\x0A') # more magic bytes + chunk.extend(to_uint32(crc)) # CRC32 of rest of file (calculated in caller) + return chunk + def write(self, stream): + info = self.build_info() + tmap = self.build_tmap() + trks = self.build_trks() + meta = self.build_meta() + crc = binascii.crc32(info + tmap + trks + meta) + head = self.build_head(crc) + stream.write(head) + stream.write(info) + stream.write(tmap) + stream.write(trks) + stream.write(meta) + +#---------- command line interface ---------- + +class BaseCommand: + def __init__(self, name): + self.name = name + + def setup(self, subparser, description=None, epilog=None, help=".woz disk image", formatter_class=argparse.HelpFormatter): + self.parser = subparser.add_parser(self.name, description=description, epilog=epilog, formatter_class=formatter_class) + self.parser.add_argument("file", help=help) + self.parser.set_defaults(action=self) + + def __call__(self, args): + self.woz_image = WozReader(args.file) + +class CommandVerify(BaseCommand): + def __init__(self): + BaseCommand.__init__(self, "verify") + + def setup(self, subparser): + BaseCommand.setup(self, subparser, description="Verify file structure and metadata of a .woz disk image (produces no output unless a problem is found)") + +class CommandDump(BaseCommand): + kWidth = 30 + + def __init__(self): + BaseCommand.__init__(self, "dump") + + def setup(self, subparser): + BaseCommand.setup(self, subparser, description="Print all available information and metadata in a .woz disk image") + + def __call__(self, args): + BaseCommand.__call__(self, args) + self.print_tmap() + self.print_meta() + self.print_info() + + def print_info(self): + print("INFO: File format version:".ljust(self.kWidth), "%d" % self.woz_image.info["version"]) + print("INFO: Disk type:".ljust(self.kWidth), ("5.25-inch", "3.5-inch")[self.woz_image.info["disk_type"]-1]) + print("INFO: Write protected:".ljust(self.kWidth), dNoYes[self.woz_image.info["write_protected"]]) + print("INFO: Track synchronized:".ljust(self.kWidth), dNoYes[self.woz_image.info["synchronized"]]) + print("INFO: Weakbits cleaned:".ljust(self.kWidth), dNoYes[self.woz_image.info["cleaned"]]) + print("INFO: Creator:".ljust(self.kWidth), self.woz_image.info["creator"]) + + def print_tmap(self): + i = 0 + for trk, i in zip(self.woz_image.tmap, itertools.count()): + if trk != 0xFF: + print(("TMAP: Track %d%s" % (i/4, tQuarters[i%4])).ljust(self.kWidth), "TRKS %d" % (trk)) + + def print_meta(self): + if not self.woz_image.meta: return + for key, values in self.woz_image.meta.items(): + if type(values) == str: + values = [values] + print(("META: " + key + ":").ljust(self.kWidth), values[0]) + for value in values[1:]: + print("META: ".ljust(self.kWidth), value) + +class CommandEdit(BaseCommand): + def __init__(self): + BaseCommand.__init__(self, "edit") + + def setup(self, subparser): + BaseCommand.setup(self, + subparser, + description="Edit information and metadata in a .woz disk image", + epilog="""Tips:\n - Use repeated flags to edit multiple fields at once.\n - Use "key:" with no value to delete a metadata field.\n - Keys are case-sensitive.\n - Some values have format restrictions; read the .woz specification.""", + help=".woz disk image (modified in place)", + formatter_class=argparse.RawDescriptionHelpFormatter) + self.parser.add_argument("-i", "--info", type=str, action="append", help="""change information field. INFO format is "key:value". Acceptable keys are disk_type, write_protected, synchronized, cleaned, creator, version. Other keys are ignored.""") + self.parser.add_argument("-m", "--meta", type=str, action="append", help="""change metadata field. META format is "key:value". Standard keys are title, subtitle, publisher, developer, copyright, version, language, requires_ram, requires_machine, notes, side, side_name, contributor, image_date. Other keys are allowed.""") + + def __call__(self, args): + BaseCommand.__call__(self, args) + # maintain creator if there is one, otherwise use default + output = WozWriter(self.woz_image.info.get("creator", __displayname__)) + output.tmap = self.woz_image.tmap + output.tracks = self.woz_image.tracks + output.info = self.woz_image.info + output.meta = self.woz_image.meta + # add all new info fields + for i in args.info or (): + k, v = i.split(":", 1) + k = k.lower() + output.info[k] = v + # add all new metadata fields + for m in args.meta or (): + k, v = m.split(":", 1) + k = k.lower() + v = v.split("|") + if len(v) == 1: + v = v[0] + if v: + output.meta[k] = v + elif k in output.meta.keys(): + del output.meta[k] + with open(args.file, 'wb') as f: + output.write(f) + if __name__ == "__main__": - for wozfile in sys.argv[1:]: - w = WozReader(wozfile) - print_tmap(w) - print_meta(w) - print_info(w) + cmds = [CommandDump(), CommandVerify(), CommandEdit()] + parser = argparse.ArgumentParser(prog=__programname__, description="A multi-purpose tool for manipulating .woz disk images.\n\nSee '" + __programname__ + " -h' for help on individual commands.", formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("-v", "--version", action="version", version=__displayname__) + sp = parser.add_subparsers(dest="command", help="command") + for command in cmds: + command.setup(sp) + args = parser.parse_args() + args.action(args)