From 64bf4e237a6ddfc3cfbe7329439fa48e81a38053 Mon Sep 17 00:00:00 2001 From: 4am Date: Thu, 7 Jun 2018 10:28:25 -0400 Subject: [PATCH] add support for Infocom, Optimum Resource, Heredity Dog, JMPBECA, and update to latest wozardry --- passport/__init__.py | 299 +++++++++++++++++++++++++++++++++---------- passport/strings.py | 2 +- passport/wozimage.py | 195 +++++++++++++++++++--------- 3 files changed, 369 insertions(+), 127 deletions(-) diff --git a/passport/__init__.py b/passport/__init__.py index d568894..ab849bc 100755 --- a/passport/__init__.py +++ b/passport/__init__.py @@ -82,6 +82,8 @@ class PassportGlobals: self.force_disk_vol = False self.captured_disk_volume_number = False self.disk_volume_number = None + self.found_and_cleaned_weakbits = False + self.protection_enforces_write_protected = False # things about the conversion process self.tried_univ = False self.track = 0 @@ -124,14 +126,14 @@ class RWTS: } def __init__(self, + g, sectors_per_track = 16, address_prologue = kDefaultAddressPrologue16, address_epilogue = kDefaultAddressEpilogue16, data_prologue = kDefaultDataPrologue16, data_epilogue = kDefaultDataEpilogue16, sector_order = kDefaultSectorOrder16, - nibble_translate_table = kDefaultNibbleTranslationTable16, - logger = None): + nibble_translate_table = kDefaultNibbleTranslationTable16): self.sectors_per_track = sectors_per_track self.address_prologue = address_prologue self.address_epilogue = address_epilogue @@ -139,7 +141,7 @@ class RWTS: self.data_epilogue = data_epilogue self.sector_order = sector_order self.nibble_translate_table = nibble_translate_table - self.logger = logger or SilentLogger + self.g = g self.track_num = 0 def seek(self, track_num): @@ -167,13 +169,13 @@ class RWTS: found.append(next(track.nibble())) return tuple(found) == tuple(nibbles) - def verify_address_epilogue_at_point(self, track): + def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num): return self.verify_nibbles_at_point(track, self.address_epilogue) - def find_data_prologue(self, track): + def find_data_prologue(self, track, track_num, physical_sector_num): return track.find(self.data_prologue) - def data_field_at_point(self, track): + def data_field_at_point(self, track, track_num, physical_sector_num): disk_nibbles = [] for i in range(343): disk_nibbles.append(next(track.nibble())) @@ -207,10 +209,10 @@ class RWTS: decoded[i + 172] += (((low2 & 0b010000) >> 3) + ((low2 & 0b100000) >> 5)) return bytearray(decoded) - def verify_data_epilogue_at_point(self, track): + def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num): return self.verify_nibbles_at_point(track, self.data_epilogue) - def decode_track(self, track, burn=0): + def decode_track(self, track, track_num, burn=0): sectors = collections.OrderedDict() if not track: return sectors starting_revolutions = track.revolutions @@ -221,7 +223,7 @@ class RWTS: start_bit_index = track.bit_index if not self.find_address_prologue(track): # if we can't even find a single address prologue, just give up - self.logger.debug("can't find a single address prologue so LGTM or whatever") + self.g.logger.debug("can't find a single address prologue so LGTM or whatever") break # for edd->woz conversion, only save some of the bits preceding # the address prologue @@ -229,43 +231,43 @@ class RWTS: start_bit_index = track.bit_index - 256 # decode address field address_field = self.address_field_at_point(track) - self.logger.debug("found sector %s" % hex(address_field.sector_num)[2:].upper()) + self.g.logger.debug("found sector %s" % hex(address_field.sector_num)[2:].upper()) if address_field.sector_num in verified_sectors: # the sector we just found is a sector we've already decoded # properly, so skip it - self.logger.debug("duplicate sector %d, continuing" % address_field.sector_num) + self.g.logger.debug("duplicate sector %d, continuing" % address_field.sector_num) continue if address_field.sector_num > self.sectors_per_track: # found a weird sector whose ID is out of range # TODO: will eventually need to tweak this logic to handle Ultima V and others - self.logger.debug("sector ID out of range %d" % address_field.sector_num) + self.g.logger.debug("sector ID out of range %d" % address_field.sector_num) continue # put a placeholder for this sector in this position in the ordered dict # so even if this copy doesn't pan out but a later copy does, sectors # will still be in the original order sectors[address_field.sector_num] = None - if not self.verify_address_epilogue_at_point(track): + if not self.verify_address_epilogue_at_point(track, track_num, address_field.sector_num): # verifying the address field epilogue failed, but this is # not necessarily fatal because there might be another copy # of this sector later - self.logger.debug("verify_address_epilogue_at_point failed, continuing") + #self.g.logger.debug("verify_address_epilogue_at_point failed, continuing") continue - if not self.find_data_prologue(track): + if not self.find_data_prologue(track, track_num, address_field.sector_num): # if we can't find a data field prologue, just give up - self.logger.debug("find_data_prologue failed, giving up") + self.g.logger.debug("find_data_prologue failed, giving up") break # read and decode the data field, and verify the data checksum - decoded = self.data_field_at_point(track) + decoded = self.data_field_at_point(track, track_num, address_field.sector_num) if not decoded: - self.logger.debug("data_field_at_point failed, continuing") + self.g.logger.debug("data_field_at_point failed, continuing") # decoding data field failed, but this is not necessarily fatal # because there might be another copy of this sector later continue - if not self.verify_data_epilogue_at_point(track): + if not self.verify_data_epilogue_at_point(track, track_num, address_field.sector_num): # verifying the data field epilogue failed, but this is # not necessarily fatal because there might be another copy # of this sector later - self.logger.debug("verify_data_epilogue_at_point failed") + self.g.logger.debug("verify_data_epilogue_at_point failed") continue # store end index within track (used for .edd -> .woz conversion) end_bit_index = track.bit_index @@ -277,7 +279,7 @@ class RWTS: # all good, and we want to save this sector, so do it sectors[address_field.sector_num] = Sector(address_field, decoded, start_bit_index, end_bit_index) verified_sectors.append(address_field.sector_num) - self.logger.debug("saved sector %s" % hex(address_field.sector_num)) + self.g.logger.debug("saved sector %s" % hex(address_field.sector_num)) # remove placeholders of sectors that we found but couldn't decode properly # (made slightly more difficult by the fact that we're trying to remove # elements from an OrderedDict while iterating through the OrderedDict, @@ -292,8 +294,8 @@ class RWTS: class UniversalRWTS(RWTS): acceptable_address_prologues = ((0xD4,0xAA,0x96), (0xD5,0xAA,0x96)) - def __init__(self, logger): - RWTS.__init__(self, address_epilogue=[], data_epilogue=[], logger=logger) + def __init__(self, g): + RWTS.__init__(self, g, address_epilogue=[], data_epilogue=[]) def find_address_prologue(self, track): starting_revolutions = track.revolutions @@ -304,45 +306,64 @@ class UniversalRWTS(RWTS): if tuple(seen) in self.acceptable_address_prologues: return True return False - def verify_address_epilogue_at_point(self, track): + def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num): # return True if not self.address_epilogue: self.address_epilogue = [next(track.nibble())] result = True else: - result = RWTS.verify_address_epilogue_at_point(self, track) + result = RWTS.verify_address_epilogue_at_point(self, track, track_num, physical_sector_num) next(track.nibble()) next(track.nibble()) return result - def verify_data_epilogue_at_point(self, track): + def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num): if not self.data_epilogue: self.data_epilogue = [next(track.nibble())] result = True else: - result = RWTS.verify_data_epilogue_at_point(self, track) + result = RWTS.verify_data_epilogue_at_point(self, track, track_num, physical_sector_num) next(track.nibble()) next(track.nibble()) return result class UniversalRWTSIgnoreEpilogues(UniversalRWTS): - def verify_address_epilogue_at_point(self, track): + def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num): return True - def verify_data_epilogue_at_point(self, track): + def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num): return True +class Track00RWTS(UniversalRWTSIgnoreEpilogues): + def data_field_at_point(self, track, track_num, physical_sector_num): + start_index = track.bit_index + start_revolutions = track.revolutions + decoded = UniversalRWTS.data_field_at_point(self, track, track_num, physical_sector_num) + if not decoded: + # If the sector didn't decode properly, rewind to the + # beginning of the data field before returning to the + # caller. This is for disks with a fake T00,S0A that + # is full of consecutive 0s, where if we consume the bitstream + # as nibbles, we'll end up consuming the next address field + # and it will seem like that sector doesn't exist. And that + # is generally logical sector 2, which is important not to + # miss at this stage because its absence triggers a different + # code path and everything falls apart. + track.bit_index = start_index + track.revolutions = start_revolutions + return decoded + class DOS33RWTS(RWTS): - def __init__(self, logical_sectors, logger): + def __init__(self, logical_sectors, g): self.reset(logical_sectors) RWTS.__init__(self, + g, sectors_per_track=16, address_prologue=self.address_prologue, address_epilogue=self.address_epilogue, data_prologue=self.data_prologue, data_epilogue=self.data_epilogue, - nibble_translate_table=self.nibble_translate_table, - logger=logger) + nibble_translate_table=self.nibble_translate_table) def reset(self, logical_sectors): self.address_prologue = (logical_sectors[3][0x55], @@ -392,14 +413,93 @@ class D5TimingBitRWTS(DOS33RWTS): track.rewind(1) return False - def verify_address_epilogue_at_point(self, track): + def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num): return True - + +class InfocomRWTS(DOS33RWTS): + def reset(self, logical_sectors): + DOS33RWTS.reset(self, logical_sectors) + self.data_prologue = self.data_prologue[:2] + + def find_data_prologue(self, track, track_num, physical_sector_num): + if not DOS33RWTS.find_data_prologue(self, track, track_num, physical_sector_num): + return False + return next(track.nibble()) >= 0xAD + +class OptimumResourceRWTS(DOS33RWTS): + def data_field_at_point(self, track, track_num, physical_sector_num): + if (track_num, physical_sector_num) == (0x01, 0x0F): + # TODO actually decode these + disk_nibbles = [] + for i in range(343): + disk_nibbles.append(next(track.nibble())) + return bytearray(256) # all zeroes for now + return DOS33RWTS.data_field_at_point(self, track, track_num, physical_sector_num) + + def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num): + if (track_num, physical_sector_num) == (0x01, 0x0F): + return True + return DOS33RWTS.verify_data_epilogue_at_point(self, track, track_num, physical_sector_num) + +class HeredityDogRWTS(DOS33RWTS): + def data_field_at_point(self, track, track_num, physical_sector_num): + if (track_num, physical_sector_num) == (0x00, 0x0A): + # This sector is fake, full of too many consecutive 0s, + # designed to read differently every time. We go through + # and clean the stray bits, and be careful not to go past + # the end so we don't include the next address prologue. + start_index = track.bit_index + while (track.bit_index < start_index + (343*8)): + if self.nibble_translate_table.get(next(track.nibble()), 0xFF) == 0xFF: + track.bits[track.bit_index-8:track.bit_index] = 0 + self.g.found_and_cleaned_weakbits = True + return bytearray(256) + return DOS33RWTS.data_field_at_point(self, track, track_num, physical_sector_num) + + def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num): + if (track_num, physical_sector_num) == (0x00, 0x0A): + return True + return DOS33RWTS.verify_data_epilogue_at_point(self, track, track_num, physical_sector_num) + +class BECARWTS(DOS33RWTS): + def is_protected_sector(self, track_num, physical_sector_num): + if track_num > 0: return True + return physical_sector_num not in (0x00, 0x0D, 0x0B, 0x09, 0x07, 0x05, 0x03, 0x01, 0x0E, 0x0C) + + def reset(self, logical_sectors): + DOS33RWTS.reset(self, logical_sectors) + self.data_prologue = self.data_prologue[:2] + + def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num): + if self.is_protected_sector(track_num, physical_sector_num): + return DOS33RWTS.verify_address_epilogue_at_point(self, track, track_num, physical_sector_num) + return True + + def find_data_prologue(self, track, track_num, physical_sector_num): + if not DOS33RWTS.find_data_prologue(self, track, track_num, physical_sector_num): + return False + next(track.nibble()) + if self.is_protected_sector(track_num, physical_sector_num): + next(track.bit()) + next(track.nibble()) + next(track.bit()) + next(track.bit()) + return True + + def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num): + if self.is_protected_sector(track_num, physical_sector_num): + next(track.nibble()) + if track_num == 0: + next(track.nibble()) + next(track.nibble()) + return True + return DOS33RWTS.verify_data_epilogue_at_point(self, track, track_num, physical_sector_num) + class BasePassportProcessor: # base class def __init__(self, disk_image, logger_class=DefaultLogger): self.g = PassportGlobals() self.g.disk_image = disk_image - self.logger = logger_class(self.g) + self.g.logger = logger_class(self.g) self.rwts = None self.output_tracks = {} self.patchers = [] @@ -501,7 +601,7 @@ class BasePassportProcessor: # base class repeated_nibble_count = 0 last_nibble = n if repeated_nibble_count == 512: - self.logger.PrintByID("sync") + self.g.logger.PrintByID("sync") return True # TODO IsUnformatted and other tests return False @@ -677,24 +777,24 @@ class BasePassportProcessor: # base class def IDBootloader(self, t00): """returns RWTS object that can (hopefully) read the rest of the disk""" - temporary_rwts_for_t00 = UniversalRWTSIgnoreEpilogues(self.logger) - physical_sectors = temporary_rwts_for_t00.decode_track(t00) + temporary_rwts_for_t00 = Track00RWTS(self.g) + physical_sectors = temporary_rwts_for_t00.decode_track(t00, 0) if 0 not in physical_sectors: - self.logger.PrintByID("fatal0000") + self.g.logger.PrintByID("fatal0000") return None t00s00 = physical_sectors[0].decoded if self.IDDOS33(t00s00): self.g.is_boot0 = True if self.IDDiversi(t00s00): - self.logger.PrintByID("diversidos") + self.g.logger.PrintByID("diversidos") elif self.IDPronto(t00s00): - self.logger.PrintByID("prontodos") + self.g.logger.PrintByID("prontodos") else: - self.logger.PrintByID("dos33boot0") + self.g.logger.PrintByID("dos33boot0") logical_sectors = temporary_rwts_for_t00.reorder_to_logical_sectors(physical_sectors) if border.BorderPatcher(self.g).run(logical_sectors, 0): - return BorderRWTS(logical_sectors, self.logger) + return BorderRWTS(logical_sectors, self.g) return self.TraceDOS33(logical_sectors) # TODO JSR08B3 # TODO MECC fastloader @@ -750,32 +850,95 @@ class BasePassportProcessor: # base class if not use_builtin: # check for D5+timingbit RWTS if find.at(0x59, logical_sectors[3], b'\xBD\x8C\xC0\xC9\xD5'): - self.logger.PrintByID("diskrwts") - return D5TimingBitRWTS(logical_sectors, self.logger) + self.g.logger.PrintByID("diskrwts") + return D5TimingBitRWTS(logical_sectors, self.g) # TODO handle Milliken here # TODO handle Adventure International here - # TODO handle Infocom here + + if not use_builtin and (logical_sectors[0][0xFE] == 0x22): + return InfocomRWTS(logical_sectors, self.g) + + if not use_builtin and (find.at(0xF4, logical_sectors[2], + b'\x4C\xCA') or + find.at(0xFE, logical_sectors[2], + b'\x4C\xCA')): + self.g.logger.PrintByID("jmpbeca") + return BECARWTS(logical_sectors, self.g) + + if not use_builtin and (find.wild_at(0x5D, logical_sectors[0], + b'\x68' + b'\x85' + find.WILDCARD + \ + b'\x68' + \ + b'\x85' + find.WILDCARD + \ + b'\xA0\x01' + \ + b'\xB1' + find.WILDCARD + \ + b'\x85\x54')): + self.g.logger.PrintByID("optimum") + return OptimumResourceRWTS(logical_sectors, self.g) + + if not use_builtin and (find.wild_at(0x16, logical_sectors[5], + b'\xF0\x05' + b'\xA2\xB2' + b'\x4C\xF0\xBB' + b'\xBD\x8C\xC0' + b'\xA9' + find.WILDCARD + \ + b'\x8D\x00\x02' + b'\xBD\x8C\xC0' + b'\x10\xFB' + b'\xC9\xEB' + b'\xD0\xF7' + b'\xBD\x8C\xC0' + b'\x10\xFB' + b'\xC9\xD5' + b'\xD0\xEE' + b'\xBD\x8C\xC0' + b'\x10\xFB' + b'\xC9\xAA' + b'\xD0\xE5' + b'\xA9\x4C' + b'\xA0\x00' + b'\x99\x00\x95' + b'\x88' + b'\xD0\xFA' + b'\xCE\x46\xBB' + b'\xAD\x46\xBB' + b'\xC9\x07' + b'\xD0\xEC' + b'\xA9\x18' + b'\x8D\x42\xB9' + b'\xA9\x0A' + b'\x8D\xED\xB7' + b'\xD0\x05')): + self.g.logger.PrintByID("bb00") + if find.at(0x04, logical_sectors[5], + b'\xBD\x8D\xC0' + b'\xBD\x8E\xC0' + b'\x30\x05' + b'\xA2\xB1' + b'\x4C\xF0\xBB'): + self.g.protection_enforces_write_protected = True + return HeredityDogRWTS(logical_sectors, self.g) if use_builtin: return self.StartWithUniv() - self.logger.PrintByID("diskrwts") - return DOS33RWTS(logical_sectors, self.logger) + self.g.logger.PrintByID("diskrwts") + return DOS33RWTS(logical_sectors, self.g) def StartWithUniv(self): """return Universal RWTS object, log that we're using it, and set global flags appropriately""" - self.logger.PrintByID("builtin") + self.g.logger.PrintByID("builtin") self.g.tried_univ = True self.g.is_protdos = False - return UniversalRWTS(self.logger) + return UniversalRWTS(self.g) def preprocess(self): return True def run(self): - self.logger.PrintByID("header") - self.logger.PrintByID("reading", {"filename":self.g.disk_image.filename}) + self.g.logger.PrintByID("header") + self.g.logger.PrintByID("reading", {"filename":self.g.disk_image.filename}) # get all raw track data from the source disk self.tracks = {} @@ -794,15 +957,15 @@ class BasePassportProcessor: # base class for track_num in range(0x22, -1, -1): self.g.track = track_num self.rwts.seek(track_num) - self.logger.debug("Seeking to track %s" % hex(self.g.track)) + self.g.logger.debug("Seeking to track %s" % hex(self.g.track)) try_again = True while try_again: try_again = False - physical_sectors = self.rwts.decode_track(self.tracks[track_num], self.burn) + physical_sectors = self.rwts.decode_track(self.tracks[track_num], track_num, self.burn) if len(physical_sectors) == self.rwts.sectors_per_track: continue else: - self.logger.debug("found %d sectors" % len(physical_sectors)) + self.g.logger.debug("found %d sectors" % len(physical_sectors)) if (0x0F not in physical_sectors) and self.SkipTrack(track_num, self.tracks[track_num]): physical_sectors = None continue @@ -810,16 +973,16 @@ class BasePassportProcessor: # base class # Need to save the sectors that worked with the original RWTS # then append the ones that worked with the universal RWTS if not self.g.tried_univ: - self.logger.PrintByID("switch", {"sector":0x0F}) # TODO find exact sector - self.rwts = UniversalRWTS(self.logger) + self.g.logger.PrintByID("switch", {"sector":0x0F}) # TODO find exact sector + self.rwts = UniversalRWTS(self.g) self.g.tried_univ = True try_again = True continue if track_num == 0 and type(self.rwts) != UniversalRWTSIgnoreEpilogues: - self.rwts = UniversalRWTSIgnoreEpilogues(self.logger) + self.rwts = UniversalRWTSIgnoreEpilogues(self.g) try_again = True continue - self.logger.PrintByID("fail") + self.g.logger.PrintByID("fail") return False self.save_track(track_num, physical_sectors) return True @@ -877,10 +1040,10 @@ class Verify(BasePassportProcessor): def apply_patches(self, logical_sectors, patches): for patch in patches: if patch.id: - self.logger.PrintByID(patch.id, patch.params) + self.g.logger.PrintByID(patch.id, patch.params) def postprocess(self): - self.logger.PrintByID("passver") + self.g.logger.PrintByID("passver") class Crack(Verify): def save_track(self, track_num, physical_sectors): @@ -889,12 +1052,12 @@ class Crack(Verify): def apply_patches(self, logical_sectors, patches): for patch in patches: if patch.id: - self.logger.PrintByID(patch.id, patch.params) + self.g.logger.PrintByID(patch.id, patch.params) if len(patch.new_value) > 0: b = logical_sectors[patch.sector_num].decoded patch.params["old_value"] = b[patch.byte_offset:patch.byte_offset+len(patch.new_value)] patch.params["new_value"] = patch.new_value - self.logger.PrintByID("modify", patch.params) + self.g.logger.PrintByID("modify", patch.params) for i in range(len(patch.new_value)): b[patch.byte_offset + i] = patch.new_value[i] logical_sectors[patch.sector_num].decoded = b @@ -902,7 +1065,7 @@ class Crack(Verify): def postprocess(self): source_base, source_ext = os.path.splitext(self.g.disk_image.filename) output_filename = source_base + '.dsk' - self.logger.PrintByID("writing", {"filename":output_filename}) + self.g.logger.PrintByID("writing", {"filename":output_filename}) with open(output_filename, "wb") as f: for track_num in range(0x23): if track_num in self.output_tracks: @@ -910,9 +1073,9 @@ class Crack(Verify): else: f.write(bytes(256*16)) if self.patches_found: - self.logger.PrintByID("passcrack") + self.g.logger.PrintByID("passcrack") else: - self.logger.PrintByID("passcrack0") + self.g.logger.PrintByID("passcrack0") class EDDToWoz(BasePassportProcessor): def preprocess(self): @@ -934,8 +1097,10 @@ class EDDToWoz(BasePassportProcessor): def postprocess(self): source_base, source_ext = os.path.splitext(self.g.disk_image.filename) output_filename = source_base + '.woz' - self.logger.PrintByID("writing", {"filename":output_filename}) + self.g.logger.PrintByID("writing", {"filename":output_filename}) woz_image = wozimage.WozWriter(STRINGS["header"].strip()) + woz_image.info["cleaned"] = self.g.found_and_cleaned_weakbits + woz_image.info["write_protected"] = self.g.protection_enforces_write_protected woz_image.meta["image_date"] = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()) for q in range(1 + (0x23 * 4)): track_num = q / 4 diff --git a/passport/strings.py b/passport/strings.py index 5dc2d6f..df5773d 100644 --- a/passport/strings.py +++ b/passport/strings.py @@ -1,5 +1,5 @@ STRINGS = { - "header": "Passport.py by 4am (2018-05-29)\n", # max 32 characters + "header": "Passport.py by 4am (2018-06-06)\n", # max 32 characters "reading": "Reading from {filename}\n", "diskrwts": "Using disk's own RWTS\n", "bb00": "T00,S05 Found $BB00 protection check\n" diff --git a/passport/wozimage.py b/passport/wozimage.py index f65d4ac..c5cb6a2 100755 --- a/passport/wozimage.py +++ b/passport/wozimage.py @@ -2,35 +2,35 @@ # (c) 2018 by 4am # MIT-licensed -# portions from MIT-licensed defedd.py (c) 2014 by Paul Hagstrom import argparse import binascii import bitarray # https://pypi.org/project/bitarray/ import collections import itertools +import os -__version__ = "0.1" -__date__ = "2018-05-31" +__version__ = "0.2" +__date__ = "2018-06-05" __progname__ = "wozardry" __displayname__ = __progname__ + " " + __version__ + " by 4am (" + __date__ + ")" # domain-specific constants defined in .woz specification -kWOZ1 = b'WOZ1' -kINFO = b'INFO' -kTMAP = b'TMAP' -kTRKS = b'TRKS' -kMETA = b'META' +kWOZ1 = b"WOZ1" +kINFO = b"INFO" +kTMAP = b"TMAP" +kTRKS = b"TRKS" +kMETA = b"META" kBitstreamLengthInBytes = 6646 -kLanguages = ('English','Spanish','French','German','Chinese','Japanese','Italian','Dutch','Portugese','Danish','Finnish','Norwegian','Swedish','Russian','Polish','Turkish','Arabic','Thai','Czech','Hungarian','Catalan','Croatian','Greek','Hebrew','Romanian','Slovak','Ukranian','Indonesian','Malay','Vietnamese','Other') -kRequiresRAM = ('16K','24K','32K','48K','64K','128K','256K','512K','768K','1M','1.25M','1.5M+','Unknown') -kRequiresMachine = ('2','2+','2e','2c','2e+','2gs','2c+','3','3+') +kLanguages = ("English","Spanish","French","German","Chinese","Japanese","Italian","Dutch","Portugese","Danish","Finnish","Norwegian","Swedish","Russian","Polish","Turkish","Arabic","Thai","Czech","Hungarian","Catalan","Croatian","Greek","Hebrew","Romanian","Slovak","Ukranian","Indonesian","Malay","Vietnamese","Other") +kRequiresRAM = ("16K","24K","32K","48K","64K","128K","256K","512K","768K","1M","1.25M","1.5M+","Unknown") +kRequiresMachine = ("2","2+","2e","2c","2e+","2gs","2c+","3","3+") # strings and things, for print routines and error messages sEOF = "Unexpected EOF" sBadChunkSize = "Bad chunk size" -dNoYes = {False:'no',True:'yes'} -tQuarters = ('.00','.25','.50','.75') +dNoYes = {False:"no",True:"yes"} +tQuarters = (".00",".25",".50",".75") # errors that may be raised class WozError(Exception): pass # base class @@ -47,11 +47,13 @@ class WozINFOFormatError_BadDiskType(WozINFOFormatError): pass class WozINFOFormatError_BadWriteProtected(WozINFOFormatError): pass class WozINFOFormatError_BadSynchronized(WozINFOFormatError): pass class WozINFOFormatError_BadCleaned(WozINFOFormatError): pass +class WozINFOFormatError_BadCreator(WozINFOFormatError): pass class WozTMAPFormatError(WozFormatError): pass class WozTMAPFormatError_BadTRKS(WozTMAPFormatError): pass class WozTRKSFormatError(WozFormatError): pass class WozMETAFormatError(WozFormatError): pass class WozMETAFormatError_DuplicateKey(WozFormatError): pass +class WozMETAFormatError_BadValue(WozFormatError): pass class WozMETAFormatError_BadLanguage(WozFormatError): pass class WozMETAFormatError_BadRAM(WozFormatError): pass class WozMETAFormatError_BadMachine(WozFormatError): pass @@ -131,14 +133,70 @@ class DiskImage: # base class """returns Track object for the given track, or None if the track is not part of this disk image. track_num can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.)""" return None -class WozReader(DiskImage): +class WozValidator: + def validate_info_version(self, version): + raise_if(version != b'\x01', WozINFOFormatError_BadVersion, "Unknown version (expected 1, found %s)" % version) + + def validate_info_disk_type(self, disk_type): + raise_if(disk_type not in (b'\x01',b'\x02'), WozINFOFormatError_BadDiskType, "Unknown disk type (expected 1 or 2, found %s)" % disk_type) + + def validate_info_write_protected(self, write_protected): + raise_if(write_protected not in (b'\x00',b'\x01'), WozINFOFormatError_BadWriteProtected, "Unknown write protected flag (expected 0 or 1, found %s)" % write_protected) + + def validate_info_synchronized(self, synchronized): + raise_if(synchronized not in (b'\x00',b'\x01'), WozINFOFormatError_BadSynchronized, "Unknown synchronized flag (expected 0, or 1, found %s)" % synchronized) + + def validate_info_cleaned(self, cleaned): + raise_if(cleaned not in (b'\x00',b'\x01'), WozINFOFormatError_BadCleaned, "Unknown cleaned flag (expected 0 or 1, found %s)" % cleaned) + + def validate_info_creator(self, creator_as_bytes): + raise_if(len(creator_as_bytes) > 32, WozINFOFormatError_BadCreator, "Creator is longer than 32 bytes") + try: + creator_as_bytes.decode("UTF-8") + except: + raise_if(True, WozINFOFormatError_BadCreator, "Creator is not valid UTF-8") + + def encode_info_creator(self, creator_as_string): + creator_as_bytes = creator_as_string.encode("UTF-8").ljust(32, b" ") + self.validate_info_creator(creator_as_bytes) + return creator_as_bytes + + def decode_info_creator(self, creator_as_bytes): + self.validate_info_creator(creator_as_bytes) + return creator_as_bytes.decode("UTF-8").strip() + + def validate_metadata(self, metadata_as_bytes): + try: + metadata = metadata_as_bytes.decode("UTF-8") + except: + raise WozMETAFormatError("Metadata is not valid UTF-8") + + def decode_metadata(self, metadata_as_bytes): + self.validate_metadata(metadata_as_bytes) + return metadata_as_bytes.decode("UTF-8") + + def validate_metadata_value(self, value): + raise_if("\t" in value, WozMETAFormatError_BadValue, "Invalid metadata value (contains tab character)") + raise_if("\n" in value, WozMETAFormatError_BadValue, "Invalid metadata value (contains linefeed character)") + raise_if("|" in value, WozMETAFormatError_BadValue, "Invalid metadata value (contains pipe character)") + + def validate_metadata_language(self, language): + raise_if(language and (language not in kLanguages), WozMETAFormatError_BadLanguage, "Invalid metadata language") + + def validate_metadata_requires_ram(self, requires_ram): + raise_if(requires_ram and (requires_ram not in kRequiresRAM), WozMETAFormatError_BadRAM, "Invalid metadata requires_ram") + + def validate_metadata_requires_machine(self, requires_machine): + raise_if(requires_machine and (requires_machine not in kRequiresMachine), WozMETAFormatError_BadMachine, "Invalid metadata requires_machine") + +class WozReader(DiskImage, WozValidator): def __init__(self, filename=None, stream=None): DiskImage.__init__(self, filename, stream) self.tmap = None self.info = collections.OrderedDict() self.meta = collections.OrderedDict() - with stream or open(filename, 'rb') as f: + with stream or open(filename, "rb") as f: header_raw = f.read(8) raise_if(len(header_raw) != 8, WozEOFError, sEOF) self.__process_header(header_raw) @@ -169,28 +227,25 @@ class WozReader(DiskImage): elif chunk_id == kMETA: self.__process_meta(data) if crc: - raise_if(crc != binascii.crc32(b''.join(all_data)) & 0xffffffff, WozCRCError, "Bad CRC") + raise_if(crc != binascii.crc32(b"".join(all_data)) & 0xffffffff, WozCRCError, "Bad CRC") def __process_header(self, data): raise_if(data[:4] != kWOZ1, WozHeaderError_NoWOZ1, "Magic string 'WOZ1' not present at offset 0") raise_if(data[4] != 0xFF, WozHeaderError_NoFF, "Magic byte 0xFF not present at offset 4") - raise_if(data[5:8] != b'\x0A\x0D\x0A', WozHeaderError_NoLF, "Magic bytes 0x0A0D0A not present at offset 5") + raise_if(data[5:8] != b"\x0A\x0D\x0A", WozHeaderError_NoLF, "Magic bytes 0x0A0D0A not present at offset 5") def __process_info(self, data): version = data[0] - raise_if(version != 1, WozINFOFormatError_BadVersion, "Unknown version (expected 1, found %d)" % version) + self.validate_info_version(to_uint8(version)) disk_type = data[1] - raise_if(disk_type not in (1,2), WozINFOFormatError_BadDiskType, "Unknown disk type (expected 1 or 2, found %d)" % disk_type) + self.validate_info_disk_type(to_uint8(disk_type)) write_protected = data[2] - raise_if(write_protected not in (0,1), WozINFOFormatError_BadWriteProtected, "Unknown write protected flag (expected 0 or 1, found %d)" % write_protected) + self.validate_info_write_protected(to_uint8(write_protected)) synchronized = data[3] - raise_if(synchronized not in (0,1), WozINFOFormatError_BadSynchronized, "Unknown synchronized flag (expected 0, or 1, found %d)" % synchronized) + self.validate_info_synchronized(to_uint8(synchronized)) cleaned = data[4] - raise_if(cleaned not in (0,1), WozINFOFormatError_BadCleaned, "Unknown cleaned flag (expected 0 or 1, found %d)" % cleaned) - try: - creator = data[5:37].decode('UTF-8') - except: - raise WOZINFOFormatError("Creator is not valid UTF-8") + self.validate_info_cleaned(to_uint8(cleaned)) + creator = self.decode_info_creator(data[5:37]) self.info["version"] = version # int self.info["disk_type"] = disk_type # int self.info["write_protected"] = (write_protected == 1) # boolean @@ -234,27 +289,21 @@ class WozReader(DiskImage): for trk, i in zip(self.tmap, itertools.count()): raise_if(trk != 0xFF and trk >= len(self.tracks), WozTMAPFormatError_BadTRKS, "Invalid TMAP entry: track %d%s points to non-existent TRKS chunk %d" % (i/4, tQuarters[i%4], trk)) - def __process_meta(self, data): - try: - metadata = data.decode('UTF-8') - except: - raise WozMETAFormatError("Metadata is not valid UTF-8") - for line in metadata.split('\n'): + def __process_meta(self, metadata_as_bytes): + metadata = self.decode_metadata(metadata_as_bytes) + for line in metadata.split("\n"): if not line: continue - columns_raw = line.split('\t') + columns_raw = line.split("\t") raise_if(len(columns_raw) != 2, WozMETAFormatError, "Malformed metadata") key, value_raw = columns_raw raise_if(key in self.meta, WozMETAFormatError_DuplicateKey, "Duplicate metadata key %s" % key) values = value_raw.split("|") if key == "language": - for value in values: - raise_if(value and (value not in kLanguages), WozMETAFormatError_BadLanguage, "Invalid metadata language") + list(map(self.validate_metadata_language, values)) elif key == "requires_ram": - for value in values: - raise_if(value and (value not in kRequiresRAM), WozMETAFormatError_BadRAM, "Invalid metadata requires_ram") + list(map(self.validate_metadata_requires_ram, values)) elif key == "requires_machine": - for value in values: - raise_if(value and (value not in kRequiresMachine), WozMETAFormatError_BadMachine, "Invalid metadata requires_machine") + list(map(self.validate_metadata_requires_machine, values)) self.meta[key] = len(values) == 1 and values[0] or tuple(values) def seek(self, track_num): @@ -269,7 +318,7 @@ class WozReader(DiskImage): if trk_id == 0xFF: return None return self.tracks[trk_id] -class WozWriter: +class WozWriter(WozValidator): def __init__(self, creator): self.info = collections.OrderedDict() self.info["version"] = 1 @@ -296,13 +345,24 @@ class WozWriter: chunk = bytearray() chunk.extend(kINFO) # chunk ID chunk.extend(to_uint32(60)) # chunk size (constant) - chunk.extend(to_uint8(self.info["version"])) # version (int, probably 1) - chunk.extend(to_uint8(self.info["disk_type"])) # disk type (1=5.25 inch, 2=3.5 inch) - chunk.extend(to_uint8(self.info["write_protected"])) # write-protected (0=no, 1=yes) - chunk.extend(to_uint8(self.info["synchronized"])) # tracks synchronized (0=no, 1=yes) - chunk.extend(to_uint8(self.info["cleaned"])) # weakbits cleaned (0=no, 1=yes) - chunk.extend(self.info["creator"].encode("UTF-8").ljust(32, b" ")) # creator - chunk.extend(b'\x00' * 23) # reserved + version_raw = to_uint8(self.info["version"]) + self.validate_info_version(version_raw) + disk_type_raw = to_uint8(self.info["disk_type"]) + self.validate_info_disk_type(disk_type_raw) + write_protected_raw = to_uint8(self.info["write_protected"]) + self.validate_info_write_protected(write_protected_raw) + synchronized_raw = to_uint8(self.info["synchronized"]) + self.validate_info_synchronized(synchronized_raw) + cleaned_raw = to_uint8(self.info["cleaned"]) + self.validate_info_cleaned(cleaned_raw) + creator_raw = self.encode_info_creator(self.info["creator"]) + chunk.extend(version_raw) # version (int, probably 1) + chunk.extend(disk_type_raw) # disk type (1=5.25 inch, 2=3.5 inch) + chunk.extend(write_protected_raw) # write-protected (0=no, 1=yes) + chunk.extend(synchronized_raw) # tracks synchronized (0=no, 1=yes) + chunk.extend(cleaned_raw) # weakbits cleaned (0=no, 1=yes) + chunk.extend(creator_raw) # creator + chunk.extend(b"\x00" * 23) # reserved return chunk def build_tmap(self): @@ -320,20 +380,30 @@ class WozWriter: for track in self.tracks: raw_bytes = track.bits.tobytes() chunk.extend(raw_bytes) # bitstream as raw bytes - chunk.extend(b'\x00' * (6646 - len(raw_bytes))) # padding to 6646 bytes + chunk.extend(b"\x00" * (6646 - len(raw_bytes))) # padding to 6646 bytes chunk.extend(to_uint16(len(raw_bytes))) # bytes used chunk.extend(to_uint16(track.bit_count)) # bit count - chunk.extend(b'\xFF\xFF') # splice point (none) - chunk.extend(b'\xFF') # splice nibble (none) - chunk.extend(b'\xFF') # splice bit count (none) - chunk.extend(b'\x00\x00') # reserved + chunk.extend(b"\xFF\xFF") # splice point (none) + chunk.extend(b"\xFF") # splice nibble (none) + chunk.extend(b"\xFF") # splice bit count (none) + chunk.extend(b"\x00\x00") # reserved return chunk def build_meta(self): - if not self.meta: return b'' - data = b'\x0A'.join( + if not self.meta: return b"" + for key, value_raw in self.meta.items(): + if type(value_raw) == str: + values = [value_raw] + list(map(self.validate_metadata_value, values)) + if key == "language": + list(map(self.validate_metadata_language, values)) + elif key == "requires_ram": + list(map(self.validate_metadata_requires_ram, values)) + elif key == "requires_machine": + list(map(self.validate_metadata_requires_machine, values)) + data = b"\x0A".join( [k.encode("UTF-8") + \ - b'\x09' + \ + b"\x09" + \ (type(v) in (list,tuple) and "|".join(v) or v).encode("UTF-8") \ for k, v in self.meta.items()]) chunk = bytearray() @@ -345,7 +415,7 @@ class WozWriter: def build_head(self, crc): chunk = bytearray() chunk.extend(kWOZ1) # magic bytes - chunk.extend(b'\xFF\x0A\x0D\x0A') # more magic bytes + chunk.extend(b"\xFF\x0A\x0D\x0A") # more magic bytes chunk.extend(to_uint32(crc)) # CRC32 of rest of file (calculated in caller) return chunk @@ -443,7 +513,8 @@ class CommandEdit(BaseCommand): help="""change information field. INFO format is "key:value". Acceptable keys are disk_type, write_protected, synchronized, cleaned, creator, version. -Other keys are ignored.""") +Other keys are ignored. +For boolean fields, use "1" or "true" or "yes" for true, "0" or "false" or "no" for false.""") self.parser.add_argument("-m", "--meta", type=str, action="append", help="""change metadata field. META format is "key:value". @@ -461,6 +532,8 @@ requires_machine, notes, side, side_name, contributor, image_date. Other keys ar # add all new info fields for i in args.info or (): k, v = i.split(":", 1) + if k in ("write_protected","synchronized","cleaned"): + v = v.lower() in ("1", "true", "yes") output.info[k] = v # add all new metadata fields for m in args.meta or (): @@ -472,15 +545,19 @@ requires_machine, notes, side, side_name, contributor, image_date. Other keys ar output.meta[k] = v elif k in output.meta.keys(): del output.meta[k] - with open(args.file, 'wb') as f: + tmpfile = args.file + ".ardry" + with open(tmpfile, "wb") as f: output.write(f) + os.rename(tmpfile, args.file) if __name__ == "__main__": + import sys + raise_if = lambda cond, e, s="": cond and sys.exit("%s: %s" % (e.__name__, s)) cmds = [CommandDump(), CommandVerify(), CommandEdit()] parser = argparse.ArgumentParser(prog=__progname__, description="""A multi-purpose tool for manipulating .woz disk images. -See '" + __progname__ + " -h' for help on individual commands.""", +See '""" + __progname__ + """ -h' for help on individual commands.""", formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument("-v", "--version", action="version", version=__displayname__) sp = parser.add_subparsers(dest="command", help="command")