From e3b9a097e26c00a44499434f023fcc9abd8bcfd3 Mon Sep 17 00:00:00 2001 From: 4am Date: Mon, 21 May 2018 20:33:43 -0400 Subject: [PATCH] initial commit --- .gitignore | 1 + passport.py | 15 + passport/.gitignore | 1 + passport/__init__.py | 754 +++++++++++++++++++++++++++++++ passport/patchers/__init__.py | 32 ++ passport/patchers/d5d5f7.py | 29 ++ passport/patchers/microfun.py | 12 + passport/patchers/rwts.py | 68 +++ passport/patchers/universale7.py | 15 + passport/strings.py | 140 ++++++ passport/util/__init__.py | 14 + passport/util/find.py | 22 + passport/wozimage.py | 393 ++++++++++++++++ 13 files changed, 1496 insertions(+) create mode 100644 .gitignore create mode 100755 passport.py create mode 100644 passport/.gitignore create mode 100755 passport/__init__.py create mode 100644 passport/patchers/__init__.py create mode 100644 passport/patchers/d5d5f7.py create mode 100644 passport/patchers/microfun.py create mode 100644 passport/patchers/rwts.py create mode 100644 passport/patchers/universale7.py create mode 100644 passport/strings.py create mode 100644 passport/util/__init__.py create mode 100644 passport/util/find.py create mode 100755 passport/wozimage.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bee8a64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__ diff --git a/passport.py b/passport.py new file mode 100755 index 0000000..316c959 --- /dev/null +++ b/passport.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 + +from passport import * +import sys + +def opener(filename): + base, ext = os.path.splitext(filename) + ext = ext.lower() + if ext == '.woz': + return wozimage.WozReader(filename) + if ext == '.edd': + return wozimage.EDDReader(filename) + raise RuntimeError("unrecognized file type") + +EDDToWoz(opener(sys.argv[1]), DefaultLogger) diff --git a/passport/.gitignore b/passport/.gitignore new file mode 100644 index 0000000..bee8a64 --- /dev/null +++ b/passport/.gitignore @@ -0,0 +1 @@ +__pycache__ diff --git a/passport/__init__.py b/passport/__init__.py new file mode 100755 index 0000000..45c0cdc --- /dev/null +++ b/passport/__init__.py @@ -0,0 +1,754 @@ +#!/usr/bin/env python3 + +from passport import wozimage +from passport.patchers import * +from passport.strings import * +from passport.util import * +import bitarray +import collections +import os.path +import sys + +class BaseLogger: # base class + def __init__(self, g): + self.g = g + + def PrintByID(self, id, params = {}): + """prints a predefined string, parameterized with some passed parameters and some globals""" + pass + + def debug(self, s): + pass + + def to_hex_string(self, n): + if type(n) == int: + return hex(n)[2:].rjust(2, "0").upper() + if type(n) in (bytes, bytearray): + return "".join([self.to_hex_string(x) for x in n]) + +SilentLogger = BaseLogger + +class DefaultLogger(BaseLogger): + # logger that writes to sys.stdout + def PrintByID(self, id, params = {}): + p = params.copy() + if "track" not in p: + p["track"] = self.g.track + if "sector" not in params: + p["sector"] = self.g.sector + for k in ("track", "sector", "offset", "old_value", "new_value"): + p[k] = self.to_hex_string(p.get(k, 0)) + sys.stdout.write(STRINGS[id].format(**p)) + +class DebugLogger(DefaultLogger): + # logger that writes to sys.stdout, and writes debug information to sys.stderr + def debug(self, s): + sys.stderr.write(s) + sys.stderr.write("\n") + +class PassportGlobals: + def __init__(self): + # things about the disk + self.is_boot0 = False + self.is_boot1 = False + self.is_master = False + self.is_rwts = False + self.is_dos32 = False + self.is_prodos = False + self.is_dinkeydos = False + self.is_pascal = False + self.is_protdos = False + self.is_daviddos = False + self.is_ea = False + self.possible_gamco = False + self.is_optimum = False + self.is_mecc_fastloader = False + self.is_mecc1 = False + self.is_mecc2 = False + self.is_mecc3 = False + self.is_mecc4 = False + self.possible_d5d5f7 = False + self.is_8b3 = False + self.is_milliken1 = False + self.is_adventure_international = False + self.is_laureate = False + self.is_datasoft = False + self.is_sierra = False + self.is_sierra13 = False + self.is_f7f6 = False + self.is_trillium = False + self.polarware_tamper_check = False + self.force_disk_vol = False + self.captured_disk_volume_number = False + self.disk_volume_number = None + # things about the conversion process + self.tried_univ = False + self.track = 0 + self.sector = 0 + self.last_track = 0 + +class AddressField: + def __init__(self, volume, track_num, sector_num, checksum): + self.volume = volume + self.track_num = track_num + self.sector_num = sector_num + self.checksum = checksum + self.valid = (volume ^ track_num ^ sector_num ^ checksum) == 0 + +class Sector: + def __init__(self, address_field, decoded, start_bit_index=None, end_bit_index=None): + self.address_field = address_field + self.decoded = decoded + self.start_bit_index = start_bit_index + self.end_bit_index = end_bit_index + + def __getitem__(self, i): + return self.decoded[i] + +class RWTS: + kDefaultSectorOrder16 = (0x00, 0x07, 0x0E, 0x06, 0x0D, 0x05, 0x0C, 0x04, 0x0B, 0x03, 0x0A, 0x02, 0x09, 0x01, 0x08, 0x0F) + kDefaultAddressPrologue16 = (0xD5, 0xAA, 0x96) + kDefaultAddressEpilogue16 = (0xDE, 0xAA) + kDefaultDataPrologue16 = (0xD5, 0xAA, 0xAD) + kDefaultDataEpilogue16 = (0xDE, 0xAA) + kDefaultNibbleTranslationTable16 = { + 0x96: 0x00, 0x97: 0x01, 0x9a: 0x02, 0x9b: 0x03, 0x9d: 0x04, 0x9e: 0x05, 0x9f: 0x06, 0xa6: 0x07, + 0xa7: 0x08, 0xab: 0x09, 0xac: 0x0a, 0xad: 0x0b, 0xae: 0x0c, 0xaf: 0x0d, 0xb2: 0x0e, 0xb3: 0x0f, + 0xb4: 0x10, 0xb5: 0x11, 0xb6: 0x12, 0xb7: 0x13, 0xb9: 0x14, 0xba: 0x15, 0xbb: 0x16, 0xbc: 0x17, + 0xbd: 0x18, 0xbe: 0x19, 0xbf: 0x1a, 0xcb: 0x1b, 0xcd: 0x1c, 0xce: 0x1d, 0xcf: 0x1e, 0xd3: 0x1f, + 0xd6: 0x20, 0xd7: 0x21, 0xd9: 0x22, 0xda: 0x23, 0xdb: 0x24, 0xdc: 0x25, 0xdd: 0x26, 0xde: 0x27, + 0xdf: 0x28, 0xe5: 0x29, 0xe6: 0x2a, 0xe7: 0x2b, 0xe9: 0x2c, 0xea: 0x2d, 0xeb: 0x2e, 0xec: 0x2f, + 0xed: 0x30, 0xee: 0x31, 0xef: 0x32, 0xf2: 0x33, 0xf3: 0x34, 0xf4: 0x35, 0xf5: 0x36, 0xf6: 0x37, + 0xf7: 0x38, 0xf9: 0x39, 0xfa: 0x3a, 0xfb: 0x3b, 0xfc: 0x3c, 0xfd: 0x3d, 0xfe: 0x3e, 0xff: 0x3f, + } + + def __init__(self, + sectors_per_track = 16, + address_prologue = kDefaultAddressPrologue16, + address_epilogue = kDefaultAddressEpilogue16, + data_prologue = kDefaultDataPrologue16, + data_epilogue = kDefaultDataEpilogue16, + sector_order = kDefaultSectorOrder16, + nibble_translate_table = kDefaultNibbleTranslationTable16, + logger = None): + self.sectors_per_track = sectors_per_track + self.address_prologue = address_prologue + self.address_epilogue = address_epilogue + self.data_prologue = data_prologue + self.data_epilogue = data_epilogue + self.sector_order = sector_order + self.nibble_translate_table = nibble_translate_table + self.logger = logger or SilentLogger + + def reorder_to_logical_sectors(self, sectors): + logical = {} + for k, v in sectors.items(): + logical[self.sector_order[k]] = v + return logical + + def find_address_prologue(self, track): + return track.find(self.address_prologue) + + def address_field_at_point(self, track): + volume = decode44(next(track.nibble()), next(track.nibble())) + track_num = decode44(next(track.nibble()), next(track.nibble())) + sector_num = decode44(next(track.nibble()), next(track.nibble())) + checksum = decode44(next(track.nibble()), next(track.nibble())) + return AddressField(volume, track_num, sector_num, checksum) + + def verify_nibbles_at_point(self, track, nibbles): + found = [] + for i in nibbles: + found.append(next(track.nibble())) + return tuple(found) == tuple(nibbles) + + def verify_address_epilogue_at_point(self, track): + return self.verify_nibbles_at_point(track, self.address_epilogue) + + def find_data_prologue(self, track): + return track.find(self.data_prologue) + + def data_field_at_point(self, track): + disk_nibbles = [] + for i in range(343): + disk_nibbles.append(next(track.nibble())) + checksum = 0 + secondary = [] + decoded = [] + for i in range(86): + n = disk_nibbles[i] + if n not in self.nibble_translate_table: return None + b = self.nibble_translate_table[n] + if b >= 0x80: return None + checksum ^= b + secondary.insert(0, checksum) + for i in range(86, 342): + n = disk_nibbles[i] + if n not in self.nibble_translate_table: return None + b = self.nibble_translate_table[n] + if b >= 0x80: return None + checksum ^= b + decoded.append(checksum << 2) + n = disk_nibbles[i] + if n not in self.nibble_translate_table: return None + b = self.nibble_translate_table[n] + if b >= 0x80: return None + checksum ^= b + for i in range(86): + low2 = secondary[85 - i] + decoded[i] += (((low2 & 0b000001) << 1) + ((low2 & 0b000010) >> 1)) + decoded[i + 86] += (((low2 & 0b000100) >> 1) + ((low2 & 0b001000) >> 3)) + if i < 84: + decoded[i + 172] += (((low2 & 0b010000) >> 3) + ((low2 & 0b100000) >> 5)) + return bytearray(decoded) + + def verify_data_epilogue_at_point(self, track): + return self.verify_nibbles_at_point(track, self.data_epilogue) + + def decode_track(self, track, burn=0): + sectors = collections.OrderedDict() + if not track: return sectors + starting_revolutions = track.revolutions + verified_sectors = [] + while (len(verified_sectors) < self.sectors_per_track) and \ + (track.revolutions < starting_revolutions + 2): + # store start index within track (used for .edd -> .woz conversion) + start_bit_index = track.bit_index + if not self.find_address_prologue(track): + # if we can't even find a single address prologue, just give up + break + # decode address field + address_field = self.address_field_at_point(track) + if address_field.sector_num in verified_sectors: + # the sector we just found is a sector we've already decoded + # properly, so skip past it + self.logger.debug("duplicate sector %d" % address_field.sector_num) + if self.find_data_prologue(track): + if self.data_field_at_point(track): + self.verify_data_epilogue_at_point(track) + continue + if address_field.sector_num > self.sectors_per_track: + # found a weird sector whose ID is out of range + # TODO: will eventually need to tweak this logic to handle Ultima V and others + self.logger.debug("sector ID out of range %d" % address_field.sector_num) + continue + # put a placeholder for this sector in this position in the ordered dict + # so even if this copy doesn't pan out but a later copy does, sectors + # will still be in the original order + sectors[address_field.sector_num] = None + if not self.verify_address_epilogue_at_point(track): + # verifying the address field epilogue failed, but this is + # not necessarily fatal because there might be another copy + # of this sector later + continue + if not self.find_data_prologue(track): + # if we can't find a data field prologue, just give up + break + # read and decode the data field, and verify the data checksum + decoded = self.data_field_at_point(track) + if not decoded: + self.logger.debug("data_field_at_point failed") +# if DEBUG and address_field.sector_num == 0x0A: +# DEBUG_CACHE.append(track.bits[start_bit_index:track.bit_index]) +# if len(DEBUG_CACHE) == 2: +# import code +# cache = DEBUG_CACHE +# code.interact(local=locals()) + # decoding data field failed, but this is not necessarily fatal + # because there might be another copy of this sector later + continue + if not self.verify_data_epilogue_at_point(track): + # verifying the data field epilogue failed, but this is + # not necessarily fatal because there might be another copy + # of this sector later + self.logger.debug("verify_data_epilogue_at_point failed") + continue + # store end index within track (used for .edd -> .woz conversion) + end_bit_index = track.bit_index + # if the caller told us to burn a certain number of sectors before + # saving the good ones, do it now (used for .edd -> .woz conversion) + if burn: + burn -= 1 + continue + # all good, and we want to save this sector, so do it + sectors[address_field.sector_num] = Sector(address_field, decoded, start_bit_index, end_bit_index) + verified_sectors.append(address_field.sector_num) + # remove placeholders of sectors that we found but couldn't decode properly + # (made slightly more difficult by the fact that we're trying to remove + # elements from an OrderedDict while iterating through the OrderedDict, + # which Python really doesn't want to do) + while None in sectors.values(): + for k in sectors: + if not sectors[k]: + del sectors[k] + break + return sectors + +class UniversalRWTS(RWTS): + acceptable_address_prologues = ((0xD4,0xAA,0x96), (0xD5,0xAA,0x96)) + + def __init__(self, logger): + RWTS.__init__(self, address_epilogue=[], data_epilogue=[], logger=logger) + + def find_address_prologue(self, track): + starting_revolutions = track.revolutions + seen = [0,0,0] + while (track.revolutions < starting_revolutions + 2): + del seen[0] + seen.append(next(track.nibble())) + if tuple(seen) in self.acceptable_address_prologues: return True + return False + + def verify_address_epilogue_at_point(self, track): + return True + if not self.address_epilogue: + self.address_epilogue = [next(track.nibble())] + result = True + else: + result = RWTS.verify_address_epilogue_at_point(self, track) + next(track.nibble()) + next(track.nibble()) + return result + + def verify_data_epilogue_at_point(self, track): + if not self.data_epilogue: + self.data_epilogue = [next(track.nibble())] + result = True + else: + result = RWTS.verify_data_epilogue_at_point(self, track) + next(track.nibble()) + next(track.nibble()) + return result + +class UniversalRWTSIgnoreEpilogues(UniversalRWTS): + def verify_address_epilogue_at_point(self, track): + return True + + def verify_data_epilogue_at_point(self, track): + return True + +class DOS33RWTS(RWTS): + def __init__(self, logical_sectors, logger): + address_prologue = (logical_sectors[3][0x55], + logical_sectors[3][0x5F], + logical_sectors[3][0x6A]) + address_epilogue = (logical_sectors[3][0x91], + logical_sectors[3][0x9B]) + data_prologue = (logical_sectors[2][0xE7], + logical_sectors[2][0xF1], + logical_sectors[2][0xFC]) + data_epilogue = (logical_sectors[3][0x35], + logical_sectors[3][0x3F]) + nibble_translate_table = {} + for nibble in range(0x96, 0x100): + nibble_translate_table[nibble] = logical_sectors[4][nibble] + RWTS.__init__(self, + sectors_per_track=16, + address_prologue=address_prologue, + address_epilogue=address_epilogue, + data_prologue=data_prologue, + data_epilogue=data_epilogue, + nibble_translate_table=nibble_translate_table, + logger=logger) + +class BasePassportProcessor: # base class + def __init__(self, disk_image, logger_class=DefaultLogger): + self.g = PassportGlobals() + self.g.disk_image = disk_image + self.logger = logger_class(self.g) + self.output_tracks = {} + self.patchers = [] + self.patches_found = [] + self.patch_count = 0 # number of patches found across all tracks + self.patcher_classes = [ + #SunburstPatcher, + #JMPBCF0Patcher, + #JMPBEB1Patcher, + #JMPBECAPatcher, + #JMPB660Patcher, + #JMPB720Patcher, + #BadEmuPatcher, + #BadEmu2Patcher, + rwts.RWTSPatcher, + #RWTSLogPatcher, + #MECC1Patcher, + #MECC2Patcher, + #MECC3Patcher, + #MECC4Patcher, + #ROL1EPatcher, + #JSRBB03Patcher, + #DavidBB03Patcher, + #RWTSSwapPatcher, + #RWTSSwap2Patcher, + #BorderPatcher, + #JMPAE8EPatcher, + #JMPBBFEPatcher, + #DatasoftPatcher, + #NibTablePatcher, + #DiskVolPatcher, + #C9FFPatcher, + #MillikenPatcher, + #MethodsPatcher, + #JSR8B3Patcher, + #LaureatePatcher, + #PascalRWTSPatcher, + #MicrogramsPatcher, + #DOS32Patcher, + #DOS32DLMPatcher, + microfun.MicrofunPatcher, + #T11DiskVolPatcher, + #T02VolumeNamePatcher, + universale7.UniversalE7Patcher, + #A6BC95Patcher, + #A5CountPatcher, + d5d5f7.D5D5F7Patcher, + #ProDOSRWTSPatcher, + #ProDOS6APatcher, + #ProDOSMECCPatcher, + #BBF9Patcher, + #MemoryConfigPatcher, + #OriginPatcher, + #RWTSSwapMECCPatcher, + #ProtectedDOSPatcher, + #FBFFPatcher, + #FBFFEncryptedPatcher, + #PolarwarePatcher, + #SierraPatcher, + #CorrupterPatcher, + #EAPatcher, + #GamcoPatcher, + #OptimumPatcher, + #BootCounterPatcher, + #JMPB412Patcher, + #JMPB400Patcher, + #AdvIntPatcher, + #JSR8635Patcher, + #JMPB4BBPatcher, + #DOS32MUSEPatcher, + #SRAPatcher, + #Sierra13Patcher, + #SSPROTPatcher, + #F7F6Patcher, + #TrilliumPatcher, + ] + self.burn = 0 + if self.preprocess(): + if self.run(): + self.postprocess() + + def SkipTrack(self, rwts, track_num, track): + # don't look for whole-track protections on track 0, that's silly + if track_num == 0: return False + # Electronic Arts protection track? + if track_num == 6: + if rwts.find_address_prologue(track): + address_field = rwts.address_field_at_point(track) + if address_field and address_field.track_num == 5: return True + # Nibble count track? + repeated_nibble_count = 0 + start_revolutions = track.revolutions + last_nibble = 0x00 + while (repeated_nibble_count < 512 and track.revolutions < start_revolutions + 2): + n = next(track.nibble()) + if n == last_nibble: + repeated_nibble_count += 1 + else: + repeated_nibble_count = 0 + last_nibble = n + if repeated_nibble_count == 512: + self.logger.PrintByID("sync") + return True + # TODO IsUnformatted and other tests + return False + + def IDDiversi(self, t00s00): + """returns True if T00S00 is Diversi-DOS bootloader, or False otherwise""" + return find.at(0xF1, t00s00, + b'\xB3\xA3\xA0\xD2\xCF\xD2\xD2\xC5' + b'\x8D\x87\x8D') + + def IDDOS33(self, t00s00): + """returns True if T00S00 is DOS bootloader or some variation + that can be safely boot traced, or False otherwise""" + # Code at $0801 must be standard (with one exception) + if not find.wild_at(0x00, t00s00, + b'\x01' + b'\xA5\x27' + b'\xC9\x09' + b'\xD0\x18' + b'\xA5\x2B' + b'\x4A' + b'\x4A' + b'\x4A' + b'\x4A' + b'\x09\xC0' + b'\x85\x3F' + b'\xA9\x5C' + b'\x85\x3E' + b'\x18' + b'\xAD\xFE\x08' + b'\x6D\xFF\x08' + \ + find.WILDCARD + find.WILDCARD + find.WILDCARD + \ + b'\xAE\xFF\x08' + b'\x30\x15' + b'\xBD\x4D\x08' + b'\x85\x3D' + b'\xCE\xFF\x08' + b'\xAD\xFE\x08' + b'\x85\x27' + b'\xCE\xFE\x08' + b'\xA6\x2B' + b'\x6C\x3E\x00' + b'\xEE\xFE\x08' + b'\xEE\xFE\x08'): return False + # DOS 3.3 has JSR $FE89 / JSR $FE93 / JSR $FB2F + # some Sierra have STA $C050 / STA $C057 / STA $C055 instead + # with the unpleasant side-effect of showing text-mode garbage + # if mixed-mode was enabled at the time + if not find.at(0x3F, t00s00, + b'\x20\x89\xFE' + b'\x20\x93\xFE' + b'\x20\x2F\xFB' + b'\xA6\x2B'): + if not find.at(0x3F, t00s00, + b'\x8D\x50\xC0' + b'\x8D\x57\xC0' + b'\x8D\x55\xC0' + b'\xA6\x2B'): return False + # Sector order map must be standard (no exceptions) + if not find.at(0x4D, t00s00, + b'\x00\x0D\x0B\x09\x07\x05\x03\x01' + b'\x0E\x0C\x0A\x08\x06\x04\x02\x0F'): return False + # standard code at $081C -> success & done + if find.at(0x1C, t00s00, + b'\x8D\xFE\x08'): return True + + # Minor variant (e.g. Terrapin Logo 3.0) jumps to $08F0 and back + # but is still safe to trace. Check for this jump and match + # the code at $08F0 exactly. + # unknown code at $081C -> failure + if not find.at(0x1C, t00s00, + b'\x4C\xF0\x08'): return False + # unknown code at $08F0 -> failure, otherwise success & done + return find.at(0xF0, t00s00, + b'\x8D\xFE\x08' + b'\xEE\xF3\x03' + b'\x4C\x1F\x08') + + def IDPronto(self, t00s00): + """returns True if T00S00 is Pronto-DOS bootloader, or False otherwise""" + return find.at(0x5E, t00s00, + b'\xB0\x50' + b'\xAD\xCB\xB5' + b'\x85\x42') + + def IDBootloader(self, t00): + """returns RWTS object that can (hopefully) read the rest of the disk""" + rwts = UniversalRWTSIgnoreEpilogues(self.logger) + physical_sectors = rwts.decode_track(t00) + if 0 not in physical_sectors: + self.logger.PrintByID("fatal0000") + return None + t00s00 = physical_sectors[0] + + if self.IDDOS33(t00s00): + self.g.is_boot0 = True + if self.IDDiversi(t00s00): + self.logger.PrintByID("diversidos") + elif self.IDPronto(t00s00): + self.logger.PrintByID("prontodos") + else: + self.logger.PrintByID("dos33boot0") + # TODO handle JSR08B3 here + rwts = self.TraceDOS33(rwts.reorder_to_logical_sectors(physical_sectors), rwts) + else: + self.logger.PrintByID("builtin") + self.g.tried_univ = True + rwts = UniversalRWTS(self.logger) + return rwts + + def TraceDOS33(self, logical_sectors, rwts): + """returns RWTS object""" + + use_builtin = False + # check that all the sectors of the RWTS were actually readable + for i in range(1, 10): + if i not in logical_sectors: + use_builtin = True + break + + # TODO handle Protected.DOS here + + if not use_builtin: + # check for "STY $48;STA $49" at RWTS entry point ($BD00) + use_builtin = not find.at(0x00, logical_sectors[7], b'\x84\x48\x85\x49') + if not use_builtin: + # check for "SEC;RTS" at $B942 + use_builtin = not find.at(0x42, logical_sectors[3], b'\x38\x60') + if not use_builtin: + # check for "LDA $C08C,X" at $B94F + use_builtin = not find.at(0x4F, logical_sectors[3], b'\xBD\x8C\xC0') + if not use_builtin: + # check for "JSR $xx00" at $BDB9 + use_builtin = not find.at(0xB9, logical_sectors[7], b'\x20\x00') + if not use_builtin: + # check for RWTS variant that has extra code before + # JSR $B800 e.g. Verb Viper (DLM), Advanced Analogies (Hartley) + use_builtin = find.at(0xC5, logical_sectors[7], b'\x20\x00') + if not use_builtin: + # check for RWTS variant that uses non-standard address for slot + # LDX $1FE8 e.g. Pinball Construction Set (1983) + use_builtin = find.at(0x43, logical_sectors[8], b'\xAE\xE8\x1F') + + # TODO handle Milliken here + # TODO handle Adventure International here + # TODO handle Infocom here + + if use_builtin: + self.logger.PrintByID("builtin") + return rwts + + self.logger.PrintByID("diskrwts") + self.g.is_rwts = True + return DOS33RWTS(logical_sectors, self.logger) + + def preprocess(self): + return True + + def run(self): + self.logger.PrintByID("header") + self.logger.PrintByID("reading", {"filename":self.g.disk_image.filename}) + + # get all raw track data from the source disk + self.tracks = {} + for track_num in range(0x23): + self.tracks[float(track_num)] = self.g.disk_image.seek(float(track_num)) + + # analyze track $00 to create an RWTS + rwts = self.IDBootloader(self.tracks[0]) + if not rwts: return False + + # initialize all patchers + for P in self.patcher_classes: + self.patchers.append(P(self.g)) + + # main loop - loop through disk from track $22 down to track $00 + for track_num in range(0x22, -1, -1): + if track_num == 0 and self.g.tried_univ: + rwts = UniversalRWTSIgnoreEpilogues(self.logger) + should_run_patchers = False + self.g.track = track_num + physical_sectors = rwts.decode_track(self.tracks[track_num], self.burn) + if 0x0F not in physical_sectors: + if self.SkipTrack(rwts, track_num, self.tracks[track_num]): + self.save_track(rwts, track_num, None) + continue + if len(physical_sectors) < rwts.sectors_per_track: + # TODO wrong in case where we switch mid-track. + # Need to save the sectors that worked with the original RWTS + # then append the ones that worked with the universal RWTS + if self.g.tried_univ: + self.logger.PrintByID("fail") + return False + self.logger.PrintByID("switch", {"sector":0x0F}) # TODO find exact sector + rwts = UniversalRWTS(self.logger) + self.g.tried_univ = True + physical_sectors = rwts.decode_track(self.tracks[track_num], self.burn) + if len(physical_sectors) < rwts.sectors_per_track: + self.logger.PrintByID("fail") # TODO find exact sector + return False + self.save_track(rwts, track_num, physical_sectors) + return True + + def save_track(self, rwts, track_num, physical_sectors): + pass + + def apply_patches(self, logical_sectors, patches): + pass + +class Verify(BasePassportProcessor): + def save_track(self, rwts, track_num, physical_sectors): + if not physical_sectors: return {} + logical_sectors = rwts.reorder_to_logical_sectors(physical_sectors) + should_run_patchers = (len(physical_sectors) == 16) # TODO + if should_run_patchers: + for patcher in self.patchers: + if patcher.should_run(track_num): + patches = patcher.run(logical_sectors, track_num) + if patches: + self.apply_patches(logical_sectors, patches) + self.patches_found.extend(patches) + return logical_sectors + + def apply_patches(self, logical_sectors, patches): + for patch in patches: + if patch.id: + self.logger.PrintByID(patch.id, patch.params) + + def postprocess(self): + self.logger.PrintByID("passver") + +class Crack(Verify): + def save_track(self, rwts, track_num, physical_sectors): + self.output_tracks[float(track_num)] = Verify.save_track(self, rwts, track_num, physical_sectors) + + def apply_patches(self, logical_sectors, patches): + for patch in patches: + if patch.id: + self.logger.PrintByID(patch.id, patch.params) + if len(patch.new_value) > 0: + b = logical_sectors[patch.sector_num].decoded + patch.params["old_value"] = b[patch.byte_offset:patch.byte_offset+len(patch.new_value)] + patch.params["new_value"] = patch.new_value + self.logger.PrintByID("modify", patch.params) + for i in range(len(patch.new_value)): + b[patch.byte_offset + i] = patch.new_value[i] + logical_sectors[patch.sector_num].decoded = b + + def postprocess(self): + source_base, source_ext = os.path.splitext(self.g.disk_image.filename) + output_filename = source_base + '.dsk' + self.logger.PrintByID("writing", {"filename":output_filename}) + with open(output_filename, "wb") as f: + for track_num in range(0x23): + if track_num in self.output_tracks: + f.write(concat_track(self.output_tracks[track_num])) + else: + f.write(bytes(256*16)) + if self.patches_found: + self.logger.PrintByID("passcrack") + else: + self.logger.PrintByID("passcrack0") + +class EDDToWoz(BasePassportProcessor): + def preprocess(self): + self.burn = 2 + return True + + def save_track(self, rwts, track_num, physical_sectors): + track_num = float(track_num) + track = self.tracks[track_num] + if physical_sectors: + b = bitarray.bitarray(endian="big") + for s in physical_sectors.values(): + b.extend(track.bits[s.start_bit_index:s.end_bit_index]) + else: + b = track.bits[:51021] + self.output_tracks[track_num] = wozimage.Track(b, len(b)) + + def postprocess(self): + source_base, source_ext = os.path.splitext(self.g.disk_image.filename) + output_filename = source_base + '.woz' + self.logger.PrintByID("writing", {"filename":output_filename}) + woz_image = wozimage.WozWriter(STRINGS["header"].strip()) + for q in range(1 + (0x23 * 4)): + track_num = q / 4 + if track_num in self.output_tracks: + woz_image.add_track(track_num, self.output_tracks[track_num]) + with open(output_filename, 'wb') as f: + woz_image.write(f) + try: + wozimage.WozReader(output_filename) + except Exception as e: + os.remove(output_filename) + raise Exception from e diff --git a/passport/patchers/__init__.py b/passport/patchers/__init__.py new file mode 100644 index 0000000..93908e1 --- /dev/null +++ b/passport/patchers/__init__.py @@ -0,0 +1,32 @@ +__all__ = [ + "d5d5f7", + "microfun", + "rwts", + "universale7", + ] + +class Patch: + # represents a single patch that could be applied to a disk image + def __init__(self, track_num, sector_num, byte_offset, new_value, id=None, params={}): + self.track_num = track_num + self.sector_num = sector_num + self.byte_offset = byte_offset + self.new_value = new_value # (can be 0-length bytearray if this "patch" is really just an informational message with no changes) + self.id = id # for logger.PrintByID (can be None) + self.params = params.copy() + self.params["track"] = track_num + self.params["sector"] = sector_num + self.params["offset"] = byte_offset + +class Patcher: # base class + def __init__(self, g): + self.g = g + + def should_run(self, track_num): + """returns True if this patcher applies to the given track in the current process (possibly affected by state in self.g), or False otherwise""" + return False + + def run(self, logical_sectors, track_num): + """returns list of Patch objects representing patches that could be applied to logical_sectors""" + return [] + diff --git a/passport/patchers/d5d5f7.py b/passport/patchers/d5d5f7.py new file mode 100644 index 0000000..4f01d1a --- /dev/null +++ b/passport/patchers/d5d5f7.py @@ -0,0 +1,29 @@ +from passport.patchers import Patch, Patcher +from passport.util import * + +class D5D5F7Patcher(Patcher): + def should_run(self, track_num): + # TODO + return True + + def run(self, logical_sectors, track_num): + offset = find.wild(concat_track(logical_sectors), + b'\xBD\x8C\xC0' + b'\x10\xFB' + b'\x48' + b'\x68' + b'\xC9\xD5' + b'\xD0\xF5' + b'\xA0\x00' + \ + b'\x8C' + find.WILDCARD + find.WILDCARD + \ + b'\xBD\x8C\xC0' + b'\x10\xFB' + b'\xC9\xD5' + b'\xF0\x0F' + b'\xC9\xF7' + b'\xD0\x01' + b'\xC8' + b'\x18' + b'\x6D') + if offset == -1: return [] + return [Patch(track_num, offset // 256, offset % 256, b'\x60', "d5d5f7")] diff --git a/passport/patchers/microfun.py b/passport/patchers/microfun.py new file mode 100644 index 0000000..9b92bb8 --- /dev/null +++ b/passport/patchers/microfun.py @@ -0,0 +1,12 @@ +from passport.patchers import Patch, Patcher +from passport.util import * + +class MicrofunPatcher(Patcher): + def should_run(self, track_num): + return self.g.is_rwts and (track_num == 0) + + def run(self, logical_sectors, track_num): + offset = find.wild(concat_track(logical_sectors), + b'\xA0\x00\x84\x26\x84\x27\xBD\x8C\xC0') + if offset == -1: return [] + return [Patch(track_num, offset // 256, offset % 256, b'\x18\x60', "microfun")] diff --git a/passport/patchers/rwts.py b/passport/patchers/rwts.py new file mode 100644 index 0000000..57a3eff --- /dev/null +++ b/passport/patchers/rwts.py @@ -0,0 +1,68 @@ +from passport.patchers import Patch, Patcher +from passport.util import * + +class RWTSPatcher(Patcher): + def should_run(self, track_num): + return self.g.is_rwts and (track_num == 0) + + def run(self, logical_sectors, track_num): + patches = [] + lda_bpl = b'\xBD\x8C\xC0\x10\xFB' + lda_bpl_cmp = lda_bpl + b'\xC9' + find.WILDCARD + lda_bpl_eor = lda_bpl + b'\x49' + find.WILDCARD + lda_jsr = b'\xA9' + find.WILDCARD + b'\x20' + lda_jsr_d5 = lda_jsr + b'\xD5' + lda_jsr_b8 = lda_jsr + b'\xB8' + for a, b, c, d, e in ( + # address prologue byte 1 (read) + (0x55, 3, b'\xD5', 0x4F, lda_bpl_cmp + b'\xD0\xF0\xEA'), + # address prologue byte 2 (read) + (0x5F, 3, b'\xAA', 0x59, lda_bpl_cmp + b'\xD0\xF2\xA0\x03'), + # address prologue byte 3 (read) + (0x6A, 3, b'\x96', 0x64, lda_bpl_cmp + b'\xD0\xE7'), + # address epilogue byte 1 (read) + (0x91, 3, b'\xDE', 0x8B, lda_bpl_cmp + b'\xD0\xAE'), + # address epilogue byte 2 (read) + (0x9B, 3, b'\xAA', 0x95, lda_bpl_cmp + b'\xD0\xA4\x18'), + # data prologue byte 1 (read) + (0xE7, 2, b'\xD5', 0xE1, lda_bpl_eor + b'\xD0\xF4\xEA'), + # data prologue byte 2 (read) + (0xF1, 2, b'\xAA', 0xEB, lda_bpl_cmp + b'\xD0\xF2\xA0\x56'), + # data prologue byte 3 (read) + (0xFC, 2, b'\xAD', 0xF6, lda_bpl_cmp + b'\xD0\xE7'), + # data epilogue byte 1 (read) + (0x35, 3, b'\xDE', 0x2F, lda_bpl_cmp + b'\xD0\x0A\xEA'), + # data epilogue byte 2 (read) + (0x3F, 3, b'\xAA', 0x39, lda_bpl_cmp + b'\xF0\x5C\x38'), + # address prologue byte 1 (write) + (0x7A, 6, b'\xD5', 0x79, lda_jsr_d5), + # address prologue byte 2 (write) + (0x7F, 6, b'\xAA', 0x7E, lda_jsr_d5), + # address prologue byte 3 (write) + (0x84, 6, b'\x96', 0x83, lda_jsr_d5), + # address epilogue byte 1 (write) + (0xAE, 6, b'\xDE', 0xAD, lda_jsr_d5), + # address epilogue byte 2 (write) + (0xB3, 6, b'\xAA', 0xB2, lda_jsr_d5), + # address epilogue byte 3 (write) + (0xB8, 6, b'\xEB', 0xB7, lda_jsr_d5), + # data prologue byte 1 (write) + (0x53, 2, b'\xD5', 0x52, lda_jsr_b8), + # data prologue byte 2 (write) + (0x58, 2, b'\xAA', 0x57, lda_jsr_b8), + # data prologue byte 3 (write) + (0x5D, 2, b'\xAD', 0x5C, lda_jsr_b8), + # data epilogue byte 1 (write) + (0x9E, 2, b'\xDE', 0x9D, lda_jsr_b8), + # data epilogue byte 2 (write) + (0xA3, 2, b'\xAA', 0xA2, lda_jsr_b8), + # data epilogue byte 3 (write) + (0xA8, 2, b'\xEB', 0xA7, lda_jsr_b8), + # data epilogue byte 4 (write) + # needed by some Sunburst disks + (0xAD, 2, b'\xFF', 0xAC, lda_jsr_b8), + ): + if not find.at(a, logical_sectors[b], c) and \ + find.wild_at(d, logical_sectors[b], e): + patches.append(Patch(0, b, a, c)) + return patches diff --git a/passport/patchers/universale7.py b/passport/patchers/universale7.py new file mode 100644 index 0000000..c3d4ddb --- /dev/null +++ b/passport/patchers/universale7.py @@ -0,0 +1,15 @@ +from passport.patchers import Patch, Patcher +from passport.util import * + +class UniversalE7Patcher(Patcher): + e7sector = b'\x00'*0xA0 + b'\xAC\x00'*0x30 + + def should_run(self, track_num): + return True + + def run(self, logical_sectors, track_num): + patches = [] + for sector_num in logical_sectors: + if find.at(0x00, logical_sectors[sector_num], self.e7sector): + patches.append(Patch(track_num, sector_num, 0xA3, b'\x64\xB4\x44\x80\x2C\xDC\x18\xB4\x44\x80\x44\xB4', "e7")) + return patches diff --git a/passport/strings.py b/passport/strings.py new file mode 100644 index 0000000..92c55fc --- /dev/null +++ b/passport/strings.py @@ -0,0 +1,140 @@ +STRINGS = { + "header": "Passport.py by 4am (2018-05-21)\n", # max 32 characters + "reading": "Reading from {filename}\n", + "diskrwts": "Using disk's own RWTS\n", + "bb00": "T00,S05 Found $BB00 protection check\n" + "T00,S0A might be unreadable\n", + "sunburst": "T00,S04 Found Sunburst disk\n" + "T11,S0F might be unreadable\n", + "optimum": "T00,S00 Found Optimum Resource disk\n" + "T01,S0F might be unreadable\n", + "builtin": "Using built-in RWTS\n", + "switch": "T{track},S{sector} Switching to built-in RWTS\n", + "writing": "Writing to {filename}\n", + "unformat": "T{track} is unformatted\n", + "f7": "T{track} Found $F7F6EFEEAB protection track\n", + "sync": "T{track} Found nibble count protection track\n", + "optbad": "T{track},S{sector} is unreadable (ignoring)\n", + "passver": "Verification complete. The disk is good.\n", + "passdemuf": "Demuffin complete.\n", + "passcrack": "Crack complete.\n", + "passcrack0": "\n" + "The disk was copied successfully, but\n" + "Passport did not apply any patches.\n\n" + "Possible reasons:\n" + "- The source disk is not copy protected.\n" + "- The target disk works without patches.\n" + "- The disk uses an unknown protection,\n" + " and Passport can not help any further.\n", + "fail": "\n" + "T{track},S{sector} Fatal read error\n\n", + "fatal0000": "\n" + "Possible reasons:\n" + "- The source file does not exist.\n" + "- This is not an Apple ][ disk.\n" + "- The disk is 13-sector only.\n" + "- The disk is unformatted.\n\n", + "fatal220f": "\n" + "Passport does not work on this disk.\n\n" + "Possible reasons:\n" + "- This is not a 13- or 16-sector disk.\n" + "- The disk modifies its RWTS in ways\n" + " that Passport is not able to detect.\n\n", + "modify": "T{track},S{sector},${offset}: {old_value} -> {new_value}\n", + "dos33boot0": "T00,S00 Found DOS 3.3 bootloader\n", + "dos32boot0": "T00,S00 Found DOS 3.2 bootloader\n", + "prodosboot0": "T00,S00 Found ProDOS bootloader\n", + "pascalboot0": "T00,S00 Found Pascal bootloader\n", + "mecc": "T00,S00 Found MECC bootloader\n", + "sierra": "T{track},S{sector} Found Sierra protection check\n", + "a6bc95": "T{track},S{sector} Found A6BC95 protection check\n", + "jmpbcf0": "T00,S03 RWTS requires a timing bit after\n" + "the first data epilogue by jumping to\n" + "$BCF0.\n", + "rol1e": "T00,S03 RWTS accumulates timing bits in\n" + "$1E and checks its value later.\n", + "runhello": "T{track},S{sector} Startup program executes a\n" + "protection check before running the real\n" + "startup program.\n", + "e7": "T{track},S{sector} Found E7 bitstream\n", + "jmpb4bb": "T{track},S{sector} Disk calls a protection check at\n" + "$B4BB before initializing DOS.\n", + "jmpb400": "T{track},S{sector} Disk calls a protection check at\n" + "$B400 before initializing DOS.\n", + "jmpbeca": "T00,S02 RWTS requires extra nibbles and\n" + "timing bits after the data prologue by\n" + "jumping to $BECA.\n", + "jsrbb03": "T00,S05 Found a self-decrypting\n" + "protection check at $BB03.\n", + "thunder": "T00,S03 RWTS counts timing bits and\n" + "checks them later.\n", + "jmpae8e": "T00,S0D Disk calls a protection check at\n" + "$AE8E after initializing DOS.\n", + "diskvol": "T00,S08 RWTS requires a non-standard\n" + "disk volume number.\n", + "d5d5f7": "T{track},S{sector} Found D5D5F7 protection check\n", + "construct": "T01,S0F Reconstructing missing data\n", + "datasoftb0": "T00,S00 Found Datasoft bootloader\n", + "datasoft": "T{track},S{sector} Found Datasoft protection check\n", + "lsr6a": "T{track},S{sector} RWTS accepts $D4 or $D5 for the\n" + "first address prologue nibble.\n", + "bcs08": "T{track},S{sector} RWTS accepts $DE or a timing bit\n" + "for the first address epilogue nibble.\n", + "jmpb660": "T00,S02 RWTS requires timing bits after\n" + "the data prologue by jumping to $B660.\n", + "protdos": "T00,S01 Found encrypted RWTS, key=${key}\n", + "protdosw": "T00 Decrypting RWTS before writing\n", + "protserial": "T{track},S{sector} Erasing serial number {serial}\n", + "fbff": "T{track},S{sector} Found FBFF protection check\n", + "encoded44": "\n" + "T00,S00 Fatal error\n\n" + "Passport does not work on this disk,\n" + "because it uses a 4-and-4 encoding.\n", + "encoded53": "\n" + "T00,S00 Fatal error\n\n" + "Passport does not work on this disk,\n" + "because it uses a 5-and-3 encoding.\n", + "specdel": "T00,S00 Found DOS 3.3P bootloader\n", + "bytrack": "T{track},S{sector} RWTS changes based on track\n", + "a5count": "T{track},S{sector} Found A5 nibble count\n", + "restart": "Restarting scan\n", + "corrupter": "T13,S0E Protection check intentionally\n" + "destroys unauthorized copies\n", + "eaboot0": "T00 Found Electronic Arts bootloader\n", + "eatrk6": "T06 Found EA protection track\n", + "poke": "T{track},S{sector} BASIC program POKEs protection\n" + "check into memory and CALLs it.\n", + "bootcounter": "T{track},S{sector} Original disk destroys itself\n" + "after a limited number of boots.\n", + "milliken": "T00,S0A Found Milliken protection check\n" + "T02,S05 might be unreadable\n", + "jsr8b3": "T00,S00 Found JSR $08B3 bootloader\n", + "daviddos": "T00,S00 Found David-DOS bootloader\n", + "quickdos": "T00,S00 Found Quick-DOS bootloader\n", + "diversidos": "T00,S00 Found Diversi-DOS bootloader\n", + "prontodos": "T00,S00 Found Pronto-DOS bootloader\n", + "jmpb412": "T02,S00 Disk calls a protection check\n" + "at $B412 before initializing DOS.\n", + "laureate": "T00,S00 Found Laureate bootloader\n", + "bbf9": "T{track},S{sector} Found BBF9 protection check\n", + "micrograms": "T00,S00 Found Micrograms bootloader\n", + "cmpbne0": "T{track},S{sector} RWTS accepts any value for the\n" + "first address epilogue nibble.\n", + "d5timing": "T{track},S{sector} RWTS accepts $D5 plus a timing\n" + "bit as the entire address prologue.\n", + "advint": "T{track},S{sector} Found Adventure International\n" + "protection check\n", + "bootwrite": "T00,S00 Writing Standard Delivery\n" + "bootloader\n", + "rwtswrite": "T00,S02 Writing built-in RWTS\n", + "rdos": "T00,S00 Found RDOS bootloader\n", + "sra": "T{track},S{sector} Found SRA protection check\n", + "muse": "T00,S08 RWTS doubles every sector ID\n", + "origin": "T{track},S{sector} RWTS alters the sector ID if the\n" + "address epilogue contains a timing bit.\n", + "volumename": "T{track},S{sector} Volume name is ", # no \n + "dinkeydos": "T00,S0B Found Dinkey-DOS\n", + "trillium": "T{track},S{sector} Found Trillium protection check\n", + "tamper": "T{track},S{sector} Found anti-tamper check\n", + "microfun": "T{track},S{sector} Found Micro Fun protection check\n", +} diff --git a/passport/util/__init__.py b/passport/util/__init__.py new file mode 100644 index 0000000..1ff9829 --- /dev/null +++ b/passport/util/__init__.py @@ -0,0 +1,14 @@ +__all__ = ["find", "decode44", "concat_track"] + +def decode44(n1, n2): + return ((n1 << 1) + 1) & n2 + +def concat_track(logical_sectors): + """returns a single bytes object containing all data from logical_sectors dict, in order""" + data = [] + for i in range(16): + if i in logical_sectors: + data.append(logical_sectors[i].decoded) + else: + data.append(bytearray(256)) + return b''.join(data) diff --git a/passport/util/find.py b/passport/util/find.py new file mode 100644 index 0000000..3b2404a --- /dev/null +++ b/passport/util/find.py @@ -0,0 +1,22 @@ +WILDCARD = b'\x97' + +def wild(source_bytes, search_bytes): + """Search source_bytes (bytes object) for the first instance of search_bytes (bytes_object). search_bytes may contain a wildcard that matches any byte, like '.' in a regular expression. Returns index of first match or -1, like string find() method.""" + ranges = search_bytes.split(WILDCARD) + first_index = last_index = source_bytes.find(ranges[0]) + if first_index == -1: return -1 + last_index += len(ranges[0]) + for search_range in ranges[1:]: + last_index += 1 + if not search_range: continue + if source_bytes[last_index:last_index + len(search_range)] != search_range: return -1 + last_index += len(search_range) + return first_index + +def wild_at(offset, source_bytes, search_bytes): + """returns True if the search_bytes was found in source_bytes at offset (search_bytes may include wildcards), otherwise False""" + return wild(source_bytes[offset:], search_bytes) == 0 + +def at(offset, source_bytes, search_bytes): + """returns True if the exact bytes search_bytes was found in source_bytes at offset (no wildcards), otherwise False""" + return source_bytes[offset:offset+len(search_bytes)] == search_bytes diff --git a/passport/wozimage.py b/passport/wozimage.py new file mode 100755 index 0000000..7ae261d --- /dev/null +++ b/passport/wozimage.py @@ -0,0 +1,393 @@ +#!/usr/bin/env python3 + +# (c) 2018 by 4am +# MIT-licensed +# portions from MIT-licensed defedd.py (c) 2014 by Paul Hagstrom + +import binascii +import bitarray # https://pypi.org/project/bitarray/ +import collections +import itertools +import sys + +# domain-specific constants defined in .woz specification +kWOZ1 = b'WOZ1' +kINFO = b'INFO' +kTMAP = b'TMAP' +kTRKS = b'TRKS' +kMETA = b'META' +kBitstreamLengthInBytes = 6646 +kLanguages = ('English','Spanish','French','German','Chinese','Japanese','Italian','Dutch','Portugese','Danish','Finnish','Norwegian','Swedish','Russian','Polish','Turkish','Arabic','Thai','Czech','Hungarian','Catalan','Croatian','Greek','Hebrew','Romanian','Slovak','Ukranian','Indonesian','Malay','Vietnamese','Other') +kRequiresRAM = ('16K','24K','32K','48K','64K','128K','256K','512K','768K','1M','1.25M','1.5M+','Unknown') +kRequiresMachine = ('2','2+','2e','2c','2e+','2gs','2c+','3','3+') + +# strings and things, for print routines and error messages +sEOF = "Unexpected EOF" +sBadChunkSize = "Bad chunk size" +dNoYes = {False:'no',True:'yes'} +tQuarters = ('.00','.25','.50','.75') + +# errors that may be raised +class WozError(Exception): pass # base class +class WozCRCError(WozError): pass +class WozFormatError(WozError): pass +class WozEOFError(WozFormatError): pass +class WozHeaderError(WozFormatError): pass +class WozHeaderError_NoWOZ1(WozHeaderError): pass +class WozHeaderError_NoFF(WozHeaderError): pass +class WozHeaderError_NoLF(WozHeaderError): pass +class WozINFOFormatError(WozFormatError): pass +class WozINFOFormatError_BadVersion(WozINFOFormatError): pass +class WozINFOFormatError_BadDiskType(WozINFOFormatError): pass +class WozINFOFormatError_BadWriteProtected(WozINFOFormatError): pass +class WozINFOFormatError_BadSynchronized(WozINFOFormatError): pass +class WozINFOFormatError_BadCleaned(WozINFOFormatError): pass +class WozTMAPFormatError(WozFormatError): pass +class WozTMAPFormatError_BadTRKS(WozTMAPFormatError): pass +class WozTRKSFormatError(WozFormatError): pass +class WozMETAFormatError(WozFormatError): pass +class WozMETAFormatError_DuplicateKey(WozFormatError): pass +class WozMETAFormatError_BadLanguage(WozFormatError): pass +class WozMETAFormatError_BadRAM(WozFormatError): pass +class WozMETAFormatError_BadMachine(WozFormatError): pass + +def from_uint32(b): + return int.from_bytes(b, byteorder="little") +from_uint16=from_uint32 + +def to_uint32(b): + return b.to_bytes(4, byteorder="little") + +def to_uint16(b): + return b.to_bytes(2, byteorder="little") + +def raise_if(cond, e, s=""): + if cond: raise e(s) + +class Track: + def __init__(self, bits, bit_count): + self.bits = bits + while len(self.bits) > bit_count: + self.bits.pop() + self.bit_count = bit_count + self.bit_index = 0 + self.revolutions = 0 + + def bit(self): + b = self.bits[self.bit_index] and 1 or 0 + self.bit_index += 1 + if self.bit_index >= self.bit_count: + self.bit_index = 0 + self.revolutions += 1 + yield b + + def nibble(self): + b = 0 + while b == 0: + b = next(self.bit()) + n = 0x80 + for bit_index in range(6, -1, -1): + b = next(self.bit()) + n += b << bit_index + yield n + + def find(self, sequence): + starting_revolutions = self.revolutions + seen = [0] * len(sequence) + while (self.revolutions < starting_revolutions + 2): + del seen[0] + seen.append(next(self.nibble())) + if tuple(seen) == tuple(sequence): return True + return False + +class WozTrack(Track): + def __init__(self, bits, bit_count, splice_point = 0xFFFF, splice_nibble = 0, splice_bit_count = 0): + Track.__init__(self, bits, bit_count) + self.splice_point = splice_point + self.splice_nibble = splice_nibble + self.splice_bit_count = splice_bit_count + +class DiskImage: # base class + def __init__(self, filename=None, stream=None): + raise_if(not filename and not stream, WozError, "no input") + self.filename = filename + self.tracks = [] + + def seek(self, track_num): + """returns Track object for the given track, or None if the track is not part of this disk image. track_num can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.)""" + return None + +class EDDReader(DiskImage): + def __init__(self, filename=None, stream=None): + DiskImage.__init__(self, filename, stream) + with stream or open(filename, 'rb') as f: + for i in range(137): + raw_bytes = f.read(16384) + raise_if(len(raw_bytes) != 16384, WozError, "Bad EDD file (did you image by quarter tracks?)") + bits = bitarray.bitarray(endian="big") + bits.frombytes(raw_bytes) + self.tracks.append(Track(bits, 131072)) + + def seek(self, track_num): + if type(track_num) != float: + track_num = float(track_num) + if track_num < 0.0 or \ + track_num > 35.0 or \ + track_num.as_integer_ratio()[1] not in (1,2,4): + raise WozError("Invalid track %s" % track_num) + trk_id = int(track_num * 4) + return self.tracks[trk_id] + +class WozWriter: + def __init__(self, creator): + self.tracks = [] + self.tmap = [0xFF]*160 + self.creator = creator + #self.meta = collections.OrderedDict() + + def add_track(self, track_num, track): + tmap_id = int(track_num * 4) + trk_id = len(self.tracks) + self.tracks.append(track) + self.tmap[tmap_id] = trk_id + if tmap_id: + self.tmap[tmap_id - 1] = trk_id + if tmap_id < 159: + self.tmap[tmap_id + 1] = trk_id + + def build_info(self): + chunk = bytearray() + chunk.extend(kINFO) # chunk ID + chunk.extend(to_uint32(60)) # chunk size + chunk.extend(b'\x01') # version = 1 + chunk.extend(b'\x01') # disk type = 1 (5.25-inch) + chunk.extend(b'\x00') # write-protected = 0 + chunk.extend(b'\x00') # synchronized = 0 + chunk.extend(b'\x00') # cleaned = 0 + chunk.extend(self.creator.encode("UTF-8").ljust(32, b" ")) # creator + chunk.extend(b'\x00' * 23) # reserved + return chunk + + def build_tmap(self): + chunk = bytearray() + chunk.extend(kTMAP) # chunk ID + chunk.extend(to_uint32(160)) # chunk size + chunk.extend(bytes(self.tmap)) + return chunk + + def build_trks(self): + chunk = bytearray() + chunk.extend(kTRKS) # chunk ID + chunk_size = len(self.tracks)*6656 + chunk.extend(to_uint32(chunk_size)) # chunk size + for track in self.tracks: + raw_bytes = track.bits.tobytes() + chunk.extend(raw_bytes) # bitstream as raw bytes + chunk.extend(b'\x00' * (6646 - len(raw_bytes))) # padding to 6646 bytes + chunk.extend(to_uint16(len(raw_bytes))) # bytes used + chunk.extend(to_uint16(track.bit_count)) # bit count + chunk.extend(b'\xFF\xFF') # splice point (none) + chunk.extend(b'\xFF') # splice nibble (none) + chunk.extend(b'\xFF') # splice bit count (none) + chunk.extend(b'\x00\x00') # reserved + return chunk + + def build_meta(self): + return b'' + + def build_head(self, crc): + chunk = bytearray() + chunk.extend(kWOZ1) # magic bytes + chunk.extend(b'\xFF\x0A\x0D\x0A') # more magic bytes + chunk.extend(to_uint32(crc)) # CRC32 of rest of file (calculated in caller) + return chunk + + def write(self, stream): + info = self.build_info() + tmap = self.build_tmap() + trks = self.build_trks() + meta = self.build_meta() + crc = binascii.crc32(info + tmap + trks + meta) + head = self.build_head(crc) + stream.write(head) + stream.write(info) + stream.write(tmap) + stream.write(trks) + stream.write(meta) + +class WozReader(DiskImage): + def __init__(self, filename=None, stream=None): + DiskImage.__init__(self, filename, stream) + self.tmap = None + self.info = None + self.meta = None + + with stream or open(filename, 'rb') as f: + header_raw = f.read(8) + raise_if(len(header_raw) != 8, WozEOFError, sEOF) + self.__process_header(header_raw) + crc_raw = f.read(4) + raise_if(len(crc_raw) != 4, WozEOFError, sEOF) + crc = from_uint32(crc_raw) + all_data = [] + while True: + chunk_id = f.read(4) + if not chunk_id: break + raise_if(len(chunk_id) != 4, WozEOFError, sEOF) + all_data.append(chunk_id) + chunk_size_raw = f.read(4) + raise_if(len(chunk_size_raw) != 4, WozEOFError, sEOF) + all_data.append(chunk_size_raw) + chunk_size = from_uint32(chunk_size_raw) + data = f.read(chunk_size) + raise_if(len(data) != chunk_size, WozEOFError, sEOF) + all_data.append(data) + if chunk_id == kINFO: + raise_if(chunk_size != 60, WozINFOFormatError, sBadChunkSize) + self.__process_info(data) + elif chunk_id == kTMAP: + raise_if(chunk_size != 160, WozTMAPFormatError, sBadChunkSize) + self.__process_tmap(data) + elif chunk_id == kTRKS: + self.__process_trks(data) + elif chunk_id == kMETA: + self.__process_meta(data) + if crc: + raise_if(crc != binascii.crc32(b''.join(all_data)) & 0xffffffff, WozCRCError, "Bad CRC") + + def __process_header(self, data): + raise_if(data[:4] != kWOZ1, WozHeaderError_NoWOZ1, "Magic string 'WOZ1' not present at offset 0") + raise_if(data[4] != 0xFF, WozHeaderError_NoFF, "Magic byte 0xFF not present at offset 4") + raise_if(data[5:8] != b'\x0A\x0D\x0A', WozHeaderError_NoLF, "Magic bytes 0x0A0D0A not present at offset 5") + + def __process_info(self, data): + version = data[0] + raise_if(version != 1, WozINFOFormatError_BadVersion, "Unknown version (expected 1, found %d)" % version) + disk_type = data[1] + raise_if(disk_type not in (1,2), WozINFOFormatError_BadDiskType, "Unknown disk type (expected 1 or 2, found %d)" % disk_type) + write_protected = data[2] + raise_if(write_protected not in (0,1), WozINFOFormatError_BadWriteProtected, "Unknown write protected flag (expected 0 or 1, found %d)" % write_protected) + synchronized = data[3] + raise_if(synchronized not in (0,1), WozINFOFormatError_BadSynchronized, "Unknown synchronized flag (expected 0, or 1, found %d)" % synchronized) + cleaned = data[4] + raise_if(cleaned not in (0,1), WozINFOFormatError_BadCleaned, "Unknown cleaned flag (expected 0 or 1, found %d)" % cleaned) + try: + creator = data[5:37].decode('UTF-8') + except: + raise WOZINFOFormatError("Creator is not valid UTF-8") + self.info = {"version": version, + "disk_type": disk_type, + "write_protected": (write_protected == 1), + "synchronized": (synchronized == 1), + "cleaned": (cleaned == 1), + "creator": creator} + + def __process_tmap(self, data): + self.tmap = list(data) + + def __process_trks(self, data): + i = 0 + while i < len(data): + raw_bytes = data[i:i+kBitstreamLengthInBytes] + raise_if(len(raw_bytes) != kBitstreamLengthInBytes, WozEOFError, sEOF) + i += kBitstreamLengthInBytes + bytes_used_raw = data[i:i+2] + raise_if(len(bytes_used_raw) != 2, WozEOFError, sEOF) + bytes_used = from_uint16(bytes_used_raw) + raise_if(bytes_used > kBitstreamLengthInBytes, WozTRKSFormatError, "TRKS chunk %d bytes_used is out of range" % len(self.tracks)) + i += 2 + bit_count_raw = data[i:i+2] + raise_if(len(bit_count_raw) != 2, WozEOFError, sEOF) + bit_count = from_uint16(bit_count_raw) + i += 2 + splice_point_raw = data[i:i+2] + raise_if(len(splice_point_raw) != 2, WozEOFError, sEOF) + splice_point = from_uint16(splice_point_raw) + if splice_point != 0xFFFF: + raise_if(splice_point > bit_count, WozTRKSFormatError, "TRKS chunk %d splice_point is out of range" % len(self.tracks)) + i += 2 + splice_nibble = data[i] + i += 1 + splice_bit_count = data[i] + if splice_point != 0xFFFF: + raise_if(splice_bit_count not in (8,9,10), WozTRKSFormatError, "TRKS chunk %d splice_bit_count is out of range" % len(self.tracks)) + i += 3 + bits = bitarray.bitarray(endian="big") + bits.frombytes(raw_bytes) + self.tracks.append(WozTrack(bits, bit_count, splice_point, splice_nibble, splice_bit_count)) + for trk, i in zip(self.tmap, itertools.count()): + raise_if(trk != 0xFF and trk >= len(self.tracks), WozTMAPFormatError_BadTRKS, "Invalid TMAP entry: track %d%s points to non-existent TRKS chunk %d" % (i/4, tQuarters[i%4], trk)) + + def __process_meta(self, data): + try: + metadata = data.decode('UTF-8') + except: + raise WozMETAFormatError("Metadata is not valid UTF-8") + self.meta = collections.OrderedDict() + for line in metadata.split('\n'): + if not line: continue + columns_raw = line.split('\t') + raise_if(len(columns_raw) != 2, WozMETAFormatError, "Malformed metadata") + key, value_raw = columns_raw + raise_if(key in self.meta, WozMETAFormatError_DuplicateKey, "Duplicate metadata key %s" % key) + values = value_raw.split("|") + if key == "language": + for value in values: + raise_if(value and (value not in kLanguages), WozMETAFormatError_BadLanguage, "Invalid metadata language") + elif key == "requires_ram": + for value in values: + raise_if(value and (value not in kRequiresRAM), WozMETAFormatError_BadRAM, "Invalid metadata requires_ram") + elif key == "requires_machine": + for value in values: + raise_if(value and (value not in kRequiresMachine), WozMETAFormatError_BadMachine, "Invalid metadata requires_machine") + self.meta[key] = values + + def seek(self, track_num): + """returns Track object for the given track, or None if the track is not part of this disk image. track_num can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.)""" + if type(track_num) != float: + track_num = float(track_num) + if track_num < 0.0 or \ + track_num > 40.0 or \ + track_num.as_integer_ratio()[1] not in (1,2,4): + raise WozError("Invalid track %s" % track_num) + trk_id = self.tmap[int(track_num * 4)] + if trk_id == 0xFF: return None + return self.tracks[trk_id] + +# ----- quick info dump routines ----- +kWidth = 20 # width of first column for printing info and metadata + +def print_info(wozimage): + print() + print("INFO") + print("File format version:".ljust(kWidth), "%d" % wozimage.info["version"]) + print("Disk type:".ljust(kWidth), ("5.25-inch", "3.5-inch")[wozimage.info["disk_type"]-1]) + print("Write protected:".ljust(kWidth), dNoYes[wozimage.info["write_protected"]]) + print("Track synchronized:".ljust(kWidth), dNoYes[wozimage.info["synchronized"]]) + print("Weakbits cleaned:".ljust(kWidth), dNoYes[wozimage.info["cleaned"]]) + print("Creator:".ljust(kWidth), wozimage.info["creator"]) + +def print_tmap(wozimage): + print() + print("TMAP") + i = 0 + for tindex in wozimage.tmap: + if tindex != 0xFF: + print("Track %d%s -> TRKS %d" % (i/4, tQuarters[i%4], tindex)) + i += 1 + +def print_meta(wozimage): + if not wozimage.meta: return + print() + print("META") + for key, values in wozimage.meta.items(): + print((key + ":").ljust(kWidth), values[0]) + for value in values[1:]: + print("".ljust(kWidth), value) + +if __name__ == "__main__": + for wozfile in sys.argv[1:]: + w = WozReader(wozfile) + print_tmap(w) + print_meta(w) + print_info(w)