#!/usr/bin/env python3 from passport import wozimage from passport.patchers import * from passport.strings import * from passport.util import * import bitarray import collections import os.path import sys class BaseLogger: # base class def __init__(self, g): self.g = g def PrintByID(self, id, params = {}): """prints a predefined string, parameterized with some passed parameters and some globals""" pass def debug(self, s): pass def to_hex_string(self, n): if type(n) == int: return hex(n)[2:].rjust(2, "0").upper() if type(n) in (bytes, bytearray): return "".join([self.to_hex_string(x) for x in n]) SilentLogger = BaseLogger class DefaultLogger(BaseLogger): # logger that writes to sys.stdout def PrintByID(self, id, params = {}): p = params.copy() if "track" not in p: p["track"] = self.g.track if "sector" not in params: p["sector"] = self.g.sector for k in ("track", "sector", "offset", "old_value", "new_value"): p[k] = self.to_hex_string(p.get(k, 0)) sys.stdout.write(STRINGS[id].format(**p)) class DebugLogger(DefaultLogger): # logger that writes to sys.stdout, and writes debug information to sys.stderr def debug(self, s): sys.stderr.write(s) sys.stderr.write("\n") class PassportGlobals: def __init__(self): # things about the disk self.is_boot0 = False self.is_boot1 = False self.is_master = False self.is_rwts = False self.is_dos32 = False self.is_prodos = False self.is_dinkeydos = False self.is_pascal = False self.is_protdos = False self.is_daviddos = False self.is_ea = False self.possible_gamco = False self.is_optimum = False self.is_mecc_fastloader = False self.is_mecc1 = False self.is_mecc2 = False self.is_mecc3 = False self.is_mecc4 = False self.possible_d5d5f7 = False self.is_8b3 = False self.is_milliken1 = False self.is_adventure_international = False self.is_laureate = False self.is_datasoft = False self.is_sierra = False self.is_sierra13 = False self.is_f7f6 = False self.is_trillium = False self.polarware_tamper_check = False self.force_disk_vol = False self.captured_disk_volume_number = False self.disk_volume_number = None # things about the conversion process self.tried_univ = False self.track = 0 self.sector = 0 self.last_track = 0 class AddressField: def __init__(self, volume, track_num, sector_num, checksum): self.volume = volume self.track_num = track_num self.sector_num = sector_num self.checksum = checksum self.valid = (volume ^ track_num ^ sector_num ^ checksum) == 0 class Sector: def __init__(self, address_field, decoded, start_bit_index=None, end_bit_index=None): self.address_field = address_field self.decoded = decoded self.start_bit_index = start_bit_index self.end_bit_index = end_bit_index def __getitem__(self, i): return self.decoded[i] class RWTS: kDefaultSectorOrder16 = (0x00, 0x07, 0x0E, 0x06, 0x0D, 0x05, 0x0C, 0x04, 0x0B, 0x03, 0x0A, 0x02, 0x09, 0x01, 0x08, 0x0F) kDefaultAddressPrologue16 = (0xD5, 0xAA, 0x96) kDefaultAddressEpilogue16 = (0xDE, 0xAA) kDefaultDataPrologue16 = (0xD5, 0xAA, 0xAD) kDefaultDataEpilogue16 = (0xDE, 0xAA) kDefaultNibbleTranslationTable16 = { 0x96: 0x00, 0x97: 0x01, 0x9a: 0x02, 0x9b: 0x03, 0x9d: 0x04, 0x9e: 0x05, 0x9f: 0x06, 0xa6: 0x07, 0xa7: 0x08, 0xab: 0x09, 0xac: 0x0a, 0xad: 0x0b, 0xae: 0x0c, 0xaf: 0x0d, 0xb2: 0x0e, 0xb3: 0x0f, 0xb4: 0x10, 0xb5: 0x11, 0xb6: 0x12, 0xb7: 0x13, 0xb9: 0x14, 0xba: 0x15, 0xbb: 0x16, 0xbc: 0x17, 0xbd: 0x18, 0xbe: 0x19, 0xbf: 0x1a, 0xcb: 0x1b, 0xcd: 0x1c, 0xce: 0x1d, 0xcf: 0x1e, 0xd3: 0x1f, 0xd6: 0x20, 0xd7: 0x21, 0xd9: 0x22, 0xda: 0x23, 0xdb: 0x24, 0xdc: 0x25, 0xdd: 0x26, 0xde: 0x27, 0xdf: 0x28, 0xe5: 0x29, 0xe6: 0x2a, 0xe7: 0x2b, 0xe9: 0x2c, 0xea: 0x2d, 0xeb: 0x2e, 0xec: 0x2f, 0xed: 0x30, 0xee: 0x31, 0xef: 0x32, 0xf2: 0x33, 0xf3: 0x34, 0xf4: 0x35, 0xf5: 0x36, 0xf6: 0x37, 0xf7: 0x38, 0xf9: 0x39, 0xfa: 0x3a, 0xfb: 0x3b, 0xfc: 0x3c, 0xfd: 0x3d, 0xfe: 0x3e, 0xff: 0x3f, } def __init__(self, sectors_per_track = 16, address_prologue = kDefaultAddressPrologue16, address_epilogue = kDefaultAddressEpilogue16, data_prologue = kDefaultDataPrologue16, data_epilogue = kDefaultDataEpilogue16, sector_order = kDefaultSectorOrder16, nibble_translate_table = kDefaultNibbleTranslationTable16, logger = None): self.sectors_per_track = sectors_per_track self.address_prologue = address_prologue self.address_epilogue = address_epilogue self.data_prologue = data_prologue self.data_epilogue = data_epilogue self.sector_order = sector_order self.nibble_translate_table = nibble_translate_table self.logger = logger or SilentLogger def reorder_to_logical_sectors(self, sectors): logical = {} for k, v in sectors.items(): logical[self.sector_order[k]] = v return logical def find_address_prologue(self, track): return track.find(self.address_prologue) def address_field_at_point(self, track): volume = decode44(next(track.nibble()), next(track.nibble())) track_num = decode44(next(track.nibble()), next(track.nibble())) sector_num = decode44(next(track.nibble()), next(track.nibble())) checksum = decode44(next(track.nibble()), next(track.nibble())) return AddressField(volume, track_num, sector_num, checksum) def verify_nibbles_at_point(self, track, nibbles): found = [] for i in nibbles: found.append(next(track.nibble())) return tuple(found) == tuple(nibbles) def verify_address_epilogue_at_point(self, track): return self.verify_nibbles_at_point(track, self.address_epilogue) def find_data_prologue(self, track): return track.find(self.data_prologue) def data_field_at_point(self, track): disk_nibbles = [] for i in range(343): disk_nibbles.append(next(track.nibble())) checksum = 0 secondary = [] decoded = [] for i in range(86): n = disk_nibbles[i] if n not in self.nibble_translate_table: return None b = self.nibble_translate_table[n] if b >= 0x80: return None checksum ^= b secondary.insert(0, checksum) for i in range(86, 342): n = disk_nibbles[i] if n not in self.nibble_translate_table: return None b = self.nibble_translate_table[n] if b >= 0x80: return None checksum ^= b decoded.append(checksum << 2) n = disk_nibbles[i] if n not in self.nibble_translate_table: return None b = self.nibble_translate_table[n] if b >= 0x80: return None checksum ^= b for i in range(86): low2 = secondary[85 - i] decoded[i] += (((low2 & 0b000001) << 1) + ((low2 & 0b000010) >> 1)) decoded[i + 86] += (((low2 & 0b000100) >> 1) + ((low2 & 0b001000) >> 3)) if i < 84: decoded[i + 172] += (((low2 & 0b010000) >> 3) + ((low2 & 0b100000) >> 5)) return bytearray(decoded) def verify_data_epilogue_at_point(self, track): return self.verify_nibbles_at_point(track, self.data_epilogue) def decode_track(self, track, burn=0): sectors = collections.OrderedDict() if not track: return sectors starting_revolutions = track.revolutions verified_sectors = [] while (len(verified_sectors) < self.sectors_per_track) and \ (track.revolutions < starting_revolutions + 2): # store start index within track (used for .edd -> .woz conversion) start_bit_index = track.bit_index if not self.find_address_prologue(track): # if we can't even find a single address prologue, just give up self.logger.debug("can't find a single address prologue so LGTM or whatever") break # decode address field address_field = self.address_field_at_point(track) if address_field.sector_num in verified_sectors: # the sector we just found is a sector we've already decoded # properly, so skip past it self.logger.debug("duplicate sector %d" % address_field.sector_num) if self.find_data_prologue(track): if self.data_field_at_point(track): self.verify_data_epilogue_at_point(track) continue if address_field.sector_num > self.sectors_per_track: # found a weird sector whose ID is out of range # TODO: will eventually need to tweak this logic to handle Ultima V and others self.logger.debug("sector ID out of range %d" % address_field.sector_num) continue # put a placeholder for this sector in this position in the ordered dict # so even if this copy doesn't pan out but a later copy does, sectors # will still be in the original order sectors[address_field.sector_num] = None if not self.verify_address_epilogue_at_point(track): # verifying the address field epilogue failed, but this is # not necessarily fatal because there might be another copy # of this sector later continue if not self.find_data_prologue(track): # if we can't find a data field prologue, just give up self.logger.debug(repr(self.data_prologue)) break # read and decode the data field, and verify the data checksum decoded = self.data_field_at_point(track) if not decoded: self.logger.debug("data_field_at_point failed") # if DEBUG and address_field.sector_num == 0x0A: # DEBUG_CACHE.append(track.bits[start_bit_index:track.bit_index]) # if len(DEBUG_CACHE) == 2: # import code # cache = DEBUG_CACHE # code.interact(local=locals()) # decoding data field failed, but this is not necessarily fatal # because there might be another copy of this sector later continue if not self.verify_data_epilogue_at_point(track): # verifying the data field epilogue failed, but this is # not necessarily fatal because there might be another copy # of this sector later self.logger.debug("verify_data_epilogue_at_point failed") continue # store end index within track (used for .edd -> .woz conversion) end_bit_index = track.bit_index # if the caller told us to burn a certain number of sectors before # saving the good ones, do it now (used for .edd -> .woz conversion) if burn: burn -= 1 continue # all good, and we want to save this sector, so do it sectors[address_field.sector_num] = Sector(address_field, decoded, start_bit_index, end_bit_index) verified_sectors.append(address_field.sector_num) # remove placeholders of sectors that we found but couldn't decode properly # (made slightly more difficult by the fact that we're trying to remove # elements from an OrderedDict while iterating through the OrderedDict, # which Python really doesn't want to do) while None in sectors.values(): for k in sectors: if not sectors[k]: del sectors[k] break return sectors class UniversalRWTS(RWTS): acceptable_address_prologues = ((0xD4,0xAA,0x96), (0xD5,0xAA,0x96)) def __init__(self, logger): RWTS.__init__(self, address_epilogue=[], data_epilogue=[], logger=logger) def find_address_prologue(self, track): starting_revolutions = track.revolutions seen = [0,0,0] while (track.revolutions < starting_revolutions + 2): del seen[0] seen.append(next(track.nibble())) if tuple(seen) in self.acceptable_address_prologues: return True return False def verify_address_epilogue_at_point(self, track): # return True if not self.address_epilogue: self.address_epilogue = [next(track.nibble())] result = True else: result = RWTS.verify_address_epilogue_at_point(self, track) next(track.nibble()) next(track.nibble()) return result def verify_data_epilogue_at_point(self, track): if not self.data_epilogue: self.data_epilogue = [next(track.nibble())] result = True else: result = RWTS.verify_data_epilogue_at_point(self, track) next(track.nibble()) next(track.nibble()) return result class UniversalRWTSIgnoreEpilogues(UniversalRWTS): def verify_address_epilogue_at_point(self, track): return True def verify_data_epilogue_at_point(self, track): return True class DOS33RWTS(RWTS): def __init__(self, logical_sectors, logger): address_prologue = (logical_sectors[3][0x55], logical_sectors[3][0x5F], logical_sectors[3][0x6A]) address_epilogue = (logical_sectors[3][0x91], logical_sectors[3][0x9B]) data_prologue = (logical_sectors[2][0xE7], logical_sectors[2][0xF1], logical_sectors[2][0xFC]) data_epilogue = (logical_sectors[3][0x35], logical_sectors[3][0x3F]) nibble_translate_table = {} for nibble in range(0x96, 0x100): nibble_translate_table[nibble] = logical_sectors[4][nibble] RWTS.__init__(self, sectors_per_track=16, address_prologue=address_prologue, address_epilogue=address_epilogue, data_prologue=data_prologue, data_epilogue=data_epilogue, nibble_translate_table=nibble_translate_table, logger=logger) class D5TimingBitRWTS(RWTS): def __init__(self, logical_sectors, logger): data_prologue = (logical_sectors[2][0xE7], 0xAA, logical_sectors[2][0xFC]) data_epilogue = (logical_sectors[3][0x35], 0xAA) nibble_translate_table = {} for nibble in range(0x96, 0x100): nibble_translate_table[nibble] = logical_sectors[4][nibble] RWTS.__init__(self, sectors_per_track=16, data_prologue=data_prologue, data_epilogue=data_epilogue, nibble_translate_table=nibble_translate_table, logger=logger) def find_address_prologue(self, track): starting_revolutions = track.revolutions while (track.revolutions < starting_revolutions + 2): if next(track.nibble()) == 0xD5: bit = next(track.bit()) if bit == 0: return True: track.rewind(1) return False def verify_address_epilogue_at_point(self, track): return True class BasePassportProcessor: # base class def __init__(self, disk_image, logger_class=DefaultLogger): self.g = PassportGlobals() self.g.disk_image = disk_image self.logger = logger_class(self.g) self.rwts = None self.output_tracks = {} self.patchers = [] self.patches_found = [] self.patch_count = 0 # number of patches found across all tracks self.patcher_classes = [ #SunburstPatcher, #JMPBCF0Patcher, #JMPBEB1Patcher, #JMPBECAPatcher, #JMPB660Patcher, #JMPB720Patcher, bademu.BadEmuPatcher, #BadEmu2Patcher, rwts.RWTSPatcher, #RWTSLogPatcher, #MECC1Patcher, #MECC2Patcher, #MECC3Patcher, #MECC4Patcher, #ROL1EPatcher, #JSRBB03Patcher, #DavidBB03Patcher, #RWTSSwapPatcher, #RWTSSwap2Patcher, #BorderPatcher, #JMPAE8EPatcher, #JMPBBFEPatcher, #DatasoftPatcher, #NibTablePatcher, #DiskVolPatcher, #C9FFPatcher, #MillikenPatcher, #MethodsPatcher, #JSR8B3Patcher, #LaureatePatcher, #PascalRWTSPatcher, #MicrogramsPatcher, #DOS32Patcher, #DOS32DLMPatcher, microfun.MicrofunPatcher, #T11DiskVolPatcher, #T02VolumeNamePatcher, universale7.UniversalE7Patcher, a6bc95.A6BC95Patcher, a5count.A5CountPatcher, d5d5f7.D5D5F7Patcher, #ProDOSRWTSPatcher, #ProDOS6APatcher, #ProDOSMECCPatcher, #BBF9Patcher, #MemoryConfigPatcher, #OriginPatcher, #RWTSSwapMECCPatcher, #ProtectedDOSPatcher, #FBFFPatcher, #FBFFEncryptedPatcher, #PolarwarePatcher, #SierraPatcher, #CorrupterPatcher, #EAPatcher, #GamcoPatcher, #OptimumPatcher, #BootCounterPatcher, #JMPB412Patcher, #JMPB400Patcher, advint.AdventureInternationalPatcher, #JSR8635Patcher, #JMPB4BBPatcher, #DOS32MUSEPatcher, #SRAPatcher, #Sierra13Patcher, #SSPROTPatcher, #F7F6Patcher, #TrilliumPatcher, ] self.burn = 0 if self.preprocess(): if self.run(): self.postprocess() def SkipTrack(self, track_num, track): # don't look for whole-track protections on track 0, that's silly if track_num == 0: return False # Electronic Arts protection track? if track_num == 6: if self.rwts.find_address_prologue(track): address_field = self.rwts.address_field_at_point(track) if address_field and address_field.track_num == 5: return True # Nibble count track? repeated_nibble_count = 0 start_revolutions = track.revolutions last_nibble = 0x00 while (repeated_nibble_count < 512 and track.revolutions < start_revolutions + 2): n = next(track.nibble()) if n == last_nibble: repeated_nibble_count += 1 else: repeated_nibble_count = 0 last_nibble = n if repeated_nibble_count == 512: self.logger.PrintByID("sync") return True # TODO IsUnformatted and other tests return False def IDDiversi(self, t00s00): """returns True if T00S00 is Diversi-DOS bootloader, or False otherwise""" return find.at(0xF1, t00s00, b'\xB3\xA3\xA0\xD2\xCF\xD2\xD2\xC5' b'\x8D\x87\x8D') def IDProDOS(self, t00s00): """returns True if T00S00 is ProDOS bootloader, or False otherwise""" return find.at(0x00, t00s00, b'\x01' b'\x38' b'\xB0\x03' b'\x4C') def IDPascal(self, t00s00): """returns True if T00S00 is Pascal bootloader, or False otherwise""" if find.wild_at(0x00, t00s00, b'\x01' b'\xE0\x60' b'\xF0\x03' b'\x4C' + find.WILDCARD + b'\x08'): return True return find.at(0x00, t00s00, b'\x01' b'\xE0\x70' b'\xB0\x04' b'\xE0\x40' b'\xB0') def IDDavidDOS(self, t00s00): """returns True if T00S00 is David-DOS II bootloader, or False otherwise""" if not find.at(0x01, t00s00, b'\xA5\x27' b'\xC9\x09' b'\xD0\x17'): return False return find.wild_at(0x4A, t00s00, b'\xA2' + find.WILDCARD + \ b'\xBD' + find.WILDCARD + b'\x08' + \ b'\x9D' + find.WILDCARD + b'\x04' + \ b'\xCA' b'\x10\xF7') def IDDatasoft(self, t00s00): """returns True if T00S00 is encrypted Datasoft bootloader, or False otherwise""" return find.at(0x00, t00s00, b'\x01\x4C\x7E\x08\x04\x8A\x0C\xB8' b'\x00\x56\x10\x7A\x00\x00\x1A\x16' b'\x12\x0E\x0A\x06\x53\x18\x9A\x02' b'\x10\x1B\x02\x10\x4D\x56\x15\x0B' b'\xBF\x14\x14\x54\x54\x54\x92\x81' b'\x1B\x10\x10\x41\x06\x73\x0A\x10' b'\x33\x4E\x00\x73\x12\x10\x33\x7C' b'\x00\x11\x20\xE3\x49\x50\x73\x1A' b'\x10\x41\x00\x23\x80\x5B\x0A\x10' b'\x0B\x4E\x9D\x0A\x10\x9D\x0C\x10' b'\x60\x1E\x53\x10\x90\x53\xBC\x90' b'\x53\x00\x90\xD8\x52\x00\xD8\x7C' b'\x00\x53\x80\x0B\x06\x41\x00\x09' b'\x04\x45\x0C\x63\x04\x90\x94\xD0' b'\xD4\x23\x04\x91\xA1\xEB\xCD\x06' b'\x95\xA1\xE1\x98\x97\x86') def IDMicrograms(self, t00s00): """returns True if T00S00 is Micrograms bootloader, or False otherwise""" if not find.at(0x01, t00s00, b'\xA5\x27' b'\xC9\x09' b'\xD0\x12' b'\xA9\xC6' b'\x85\x3F'): return False return find.at(0x42, t00s00, b'\x4C\x00') def IDQuickDOS(self, t00s00): """returns True if T00S00 is Quick-DOS bootloader, or False otherwise""" return find.at(0x01, t00s00, b'\xA5\x27' b'\xC9\x09' b'\xD0\x27' b'\x78' b'\xAD\x83\xC0') def IDRDOS(self, t00s00): """returns True if T00S00 is Quick-DOS bootloader, or False otherwise""" return find.at(0x00, t00s00, b'\x01' b'\xA9\x60' b'\x8D\x01\x08' b'\xA2\x00' b'\xA0\x1F' b'\xB9\x00\x08' b'\x49') def IDDOS33(self, t00s00): """returns True if T00S00 is DOS bootloader or some variation that can be safely boot traced, or False otherwise""" # Code at $0801 must be standard (with one exception) if not find.wild_at(0x00, t00s00, b'\x01' b'\xA5\x27' b'\xC9\x09' b'\xD0\x18' b'\xA5\x2B' b'\x4A' b'\x4A' b'\x4A' b'\x4A' b'\x09\xC0' b'\x85\x3F' b'\xA9\x5C' b'\x85\x3E' b'\x18' b'\xAD\xFE\x08' b'\x6D\xFF\x08' + \ find.WILDCARD + find.WILDCARD + find.WILDCARD + \ b'\xAE\xFF\x08' b'\x30\x15' b'\xBD\x4D\x08' b'\x85\x3D' b'\xCE\xFF\x08' b'\xAD\xFE\x08' b'\x85\x27' b'\xCE\xFE\x08' b'\xA6\x2B' b'\x6C\x3E\x00' b'\xEE\xFE\x08' b'\xEE\xFE\x08'): return False # DOS 3.3 has JSR $FE89 / JSR $FE93 / JSR $FB2F # some Sierra have STA $C050 / STA $C057 / STA $C055 instead # with the unpleasant side-effect of showing text-mode garbage # if mixed-mode was enabled at the time if not find.at(0x3F, t00s00, b'\x20\x89\xFE' b'\x20\x93\xFE' b'\x20\x2F\xFB' b'\xA6\x2B'): if not find.at(0x3F, t00s00, b'\x8D\x50\xC0' b'\x8D\x57\xC0' b'\x8D\x55\xC0' b'\xA6\x2B'): return False # Sector order map must be standard (no exceptions) if not find.at(0x4D, t00s00, b'\x00\x0D\x0B\x09\x07\x05\x03\x01' b'\x0E\x0C\x0A\x08\x06\x04\x02\x0F'): return False # standard code at $081C -> success & done if find.at(0x1C, t00s00, b'\x8D\xFE\x08'): return True # Minor variant (e.g. Terrapin Logo 3.0) jumps to $08F0 and back # but is still safe to trace. Check for this jump and match # the code at $08F0 exactly. # unknown code at $081C -> failure if not find.at(0x1C, t00s00, b'\x4C\xF0\x08'): return False # unknown code at $08F0 -> failure, otherwise success & done return find.at(0xF0, t00s00, b'\x8D\xFE\x08' b'\xEE\xF3\x03' b'\x4C\x1F\x08') def IDPronto(self, t00s00): """returns True if T00S00 is Pronto-DOS bootloader, or False otherwise""" return find.at(0x5E, t00s00, b'\xB0\x50' b'\xAD\xCB\xB5' b'\x85\x42') def IDBootloader(self, t00): """returns RWTS object that can (hopefully) read the rest of the disk""" temporary_rwts_for_t00 = UniversalRWTSIgnoreEpilogues(self.logger) physical_sectors = temporary_rwts_for_t00.decode_track(t00) if 0 not in physical_sectors: self.logger.PrintByID("fatal0000") return None t00s00 = physical_sectors[0].decoded if self.IDDOS33(t00s00): self.g.is_boot0 = True if self.IDDiversi(t00s00): self.logger.PrintByID("diversidos") elif self.IDPronto(t00s00): self.logger.PrintByID("prontodos") else: self.logger.PrintByID("dos33boot0") logical_sectors = temporary_rwts_for_t00.reorder_to_logical_sectors(physical_sectors) return self.TraceDOS33(logical_sectors) # TODO JSR08B3 # TODO MECC fastloader # TODO DOS 3.3P # TODO Laureate # TODO Electronic Arts # TODO DOS 3.2 # TODO IDEncoded44 # TODO IDEncoded53 self.g.is_prodos = self.IDProDOS(t00s00) if self.g.is_prodos: # TODO IDVolumeName # TODO IDDinkeyDOS pass self.g.is_pascal = self.IDPascal(t00s00) self.g.is_daviddos = self.IDDavidDOS(t00s00) self.g.is_datasoft = self.IDDatasoft(t00s00) self.g.is_micrograms = self.IDMicrograms(t00s00) self.g.is_quickdos = self.IDQuickDOS(t00s00) self.g.is_rdos = self.IDRDOS(t00s00) return self.StartWithUniv() def TraceDOS33(self, logical_sectors): """returns RWTS object""" use_builtin = False # check that all the sectors of the RWTS were actually readable for i in range(1, 10): if i not in logical_sectors: use_builtin = True break # TODO handle Protected.DOS here if not use_builtin: # check for "STY $48;STA $49" at RWTS entry point ($BD00) use_builtin = not find.at(0x00, logical_sectors[7], b'\x84\x48\x85\x49') if not use_builtin: # check for "SEC;RTS" at $B942 use_builtin = not find.at(0x42, logical_sectors[3], b'\x38\x60') if not use_builtin: # check for "LDA $C08C,X" at $B94F use_builtin = not find.at(0x4F, logical_sectors[3], b'\xBD\x8C\xC0') if not use_builtin: # check for "JSR $xx00" at $BDB9 use_builtin = not find.at(0xB9, logical_sectors[7], b'\x20\x00') if not use_builtin: # check for RWTS variant that has extra code before # JSR $B800 e.g. Verb Viper (DLM), Advanced Analogies (Hartley) use_builtin = find.at(0xC5, logical_sectors[7], b'\x20\x00') if not use_builtin: # check for RWTS variant that uses non-standard address for slot # LDX $1FE8 e.g. Pinball Construction Set (1983) use_builtin = find.at(0x43, logical_sectors[8], b'\xAE\xE8\x1F') if not use_builtin: # check for D5+timing+bit RWTS if find.at(0x58, logical_sectors[3], b'\xEA\xBD\x8C\xC0\xC9\xD5'): self.logger.PrintByID("diskrwts") self.g.is_rwts = True return D5TimingBitRWTS(logical_sectors, self.logger) # TODO handle Milliken here # TODO handle Adventure International here # TODO handle Infocom here if use_builtin: return self.StartWithUniv() self.logger.PrintByID("diskrwts") self.g.is_rwts = True return DOS33RWTS(logical_sectors, self.logger) def StartWithUniv(self): """return Universal RWTS object, log that we're using it, and set global flags appropriately""" self.logger.PrintByID("builtin") self.g.tried_univ = True self.g.is_protdos = False return UniversalRWTS(self.logger) def preprocess(self): return True def run(self): self.logger.PrintByID("header") self.logger.PrintByID("reading", {"filename":self.g.disk_image.filename}) # get all raw track data from the source disk self.tracks = {} for track_num in range(0x23): self.tracks[float(track_num)] = self.g.disk_image.seek(float(track_num)) # analyze track $00 to create an RWTS self.rwts = self.IDBootloader(self.tracks[0]) if not self.rwts: return False # initialize all patchers for P in self.patcher_classes: self.patchers.append(P(self.g)) # main loop - loop through disk from track $22 down to track $00 for track_num in range(0x22, -1, -1): if track_num == 0 and self.g.tried_univ: self.rwts = UniversalRWTSIgnoreEpilogues(self.logger) should_run_patchers = False self.g.track = track_num physical_sectors = self.rwts.decode_track(self.tracks[track_num], self.burn) if 0x0F not in physical_sectors: if self.SkipTrack(track_num, self.tracks[track_num]): self.save_track(track_num, None) continue if len(physical_sectors) < self.rwts.sectors_per_track: # TODO wrong in case where we switch mid-track. # Need to save the sectors that worked with the original RWTS # then append the ones that worked with the universal RWTS if self.g.tried_univ: self.logger.PrintByID("fail") return False self.logger.PrintByID("switch", {"sector":0x0F}) # TODO find exact sector self.rwts = UniversalRWTS(self.logger) self.g.tried_univ = True physical_sectors = self.rwts.decode_track(self.tracks[track_num], self.burn) if len(physical_sectors) < self.rwts.sectors_per_track: self.logger.PrintByID("fail") # TODO find exact sector return False self.save_track(track_num, physical_sectors) return True def save_track(self, track_num, physical_sectors): pass def apply_patches(self, logical_sectors, patches): pass class Verify(BasePassportProcessor): def save_track(self, track_num, physical_sectors): if not physical_sectors: return {} logical_sectors = self.rwts.reorder_to_logical_sectors(physical_sectors) should_run_patchers = (len(physical_sectors) == 16) # TODO if should_run_patchers: for patcher in self.patchers: if patcher.should_run(track_num): patches = patcher.run(logical_sectors, track_num) if patches: self.apply_patches(logical_sectors, patches) self.patches_found.extend(patches) return logical_sectors def apply_patches(self, logical_sectors, patches): for patch in patches: if patch.id: self.logger.PrintByID(patch.id, patch.params) def postprocess(self): self.logger.PrintByID("passver") class Crack(Verify): def save_track(self, track_num, physical_sectors): self.output_tracks[float(track_num)] = Verify.save_track(self, track_num, physical_sectors) def apply_patches(self, logical_sectors, patches): for patch in patches: if patch.id: self.logger.PrintByID(patch.id, patch.params) if len(patch.new_value) > 0: b = logical_sectors[patch.sector_num].decoded patch.params["old_value"] = b[patch.byte_offset:patch.byte_offset+len(patch.new_value)] patch.params["new_value"] = patch.new_value self.logger.PrintByID("modify", patch.params) for i in range(len(patch.new_value)): b[patch.byte_offset + i] = patch.new_value[i] logical_sectors[patch.sector_num].decoded = b def postprocess(self): source_base, source_ext = os.path.splitext(self.g.disk_image.filename) output_filename = source_base + '.dsk' self.logger.PrintByID("writing", {"filename":output_filename}) with open(output_filename, "wb") as f: for track_num in range(0x23): if track_num in self.output_tracks: f.write(concat_track(self.output_tracks[track_num])) else: f.write(bytes(256*16)) if self.patches_found: self.logger.PrintByID("passcrack") else: self.logger.PrintByID("passcrack0") class EDDToWoz(BasePassportProcessor): def preprocess(self): self.burn = 2 return True def save_track(self, track_num, physical_sectors): track_num = float(track_num) track = self.tracks[track_num] if physical_sectors: b = bitarray.bitarray(endian="big") for s in physical_sectors.values(): b.extend(track.bits[s.start_bit_index:s.end_bit_index]) else: # TODO this only works about half the time b = track.bits[:51021] self.output_tracks[track_num] = wozimage.Track(b, len(b)) def postprocess(self): source_base, source_ext = os.path.splitext(self.g.disk_image.filename) output_filename = source_base + '.woz' self.logger.PrintByID("writing", {"filename":output_filename}) woz_image = wozimage.WozWriter(STRINGS["header"].strip()) for q in range(1 + (0x23 * 4)): track_num = q / 4 if track_num in self.output_tracks: woz_image.add_track(track_num, self.output_tracks[track_num]) with open(output_filename, 'wb') as f: woz_image.write(f) try: wozimage.WozReader(output_filename) except Exception as e: os.remove(output_filename) raise Exception from e