From 47b9b27a4c05dad377aea43d667bfc40796b64cf Mon Sep 17 00:00:00 2001 From: 4am Date: Fri, 1 Feb 2019 09:22:45 -0500 Subject: [PATCH] split seek into seek and reseek for speed parsing A2R files, also refactor loggers and rwts into submodules --- passport.py | 2 +- passport/__init__.py | 614 +++------------------------------- passport/a2rimage.py | 19 +- passport/loggers/__init__.py | 20 ++ passport/loggers/debug.py | 8 + passport/loggers/default.py | 15 + passport/loggers/silent.py | 4 + passport/patchers/__init__.py | 38 +-- passport/patchers/sunburst.py | 3 +- passport/rwts/__init__.py | 217 ++++++++++++ passport/rwts/beca.py | 35 ++ passport/rwts/border.py | 16 + passport/rwts/d5timing.py | 22 ++ passport/rwts/dos33.py | 29 ++ passport/rwts/hereditydog.py | 21 ++ passport/rwts/infocom.py | 11 + passport/rwts/laureate.py | 22 ++ passport/rwts/mecc.py | 40 +++ passport/rwts/optimum.py | 16 + passport/rwts/sunburst.py | 22 ++ passport/rwts/universal.py | 63 ++++ passport/strings.py | 2 +- 22 files changed, 652 insertions(+), 587 deletions(-) create mode 100644 passport/loggers/__init__.py create mode 100644 passport/loggers/debug.py create mode 100644 passport/loggers/default.py create mode 100644 passport/loggers/silent.py create mode 100644 passport/rwts/__init__.py create mode 100644 passport/rwts/beca.py create mode 100644 passport/rwts/border.py create mode 100644 passport/rwts/d5timing.py create mode 100644 passport/rwts/dos33.py create mode 100644 passport/rwts/hereditydog.py create mode 100644 passport/rwts/infocom.py create mode 100644 passport/rwts/laureate.py create mode 100644 passport/rwts/mecc.py create mode 100644 passport/rwts/optimum.py create mode 100644 passport/rwts/sunburst.py create mode 100644 passport/rwts/universal.py diff --git a/passport.py b/passport.py index b184d08..a11bfc8 100755 --- a/passport.py +++ b/passport.py @@ -4,7 +4,7 @@ # MIT-licensed from passport import eddimage, wozardry, a2rimage -from passport import DefaultLogger, DebugLogger +from passport.loggers import DefaultLogger, DebugLogger from passport import Crack, Verify, Convert from passport.strings import __date__, STRINGS import argparse diff --git a/passport/__init__.py b/passport/__init__.py index 2336419..32c8209 100755 --- a/passport/__init__.py +++ b/passport/__init__.py @@ -1,52 +1,13 @@ -#!/usr/bin/env python3 - -from passport import wozardry +from passport.loggers import * +from passport.rwts import * from passport.patchers import * from passport.strings import * from passport.util import * +from passport import wozardry import bitarray -import collections import os.path -import sys import time -class BaseLogger: # base class - def __init__(self, g): - self.g = g - - def PrintByID(self, id, params = {}): - """prints a predefined string, parameterized with some passed parameters and some globals""" - pass - - def debug(self, s): - pass - - def to_hex_string(self, n): - if type(n) == int: - return hex(n)[2:].rjust(2, "0").upper() - if type(n) in (bytes, bytearray): - return "".join([self.to_hex_string(x) for x in n]) - -SilentLogger = BaseLogger - -class DefaultLogger(BaseLogger): - # logger that writes to sys.stdout - def PrintByID(self, id, params = {}): - p = params.copy() - if "track" not in p: - p["track"] = self.g.track - if "sector" not in params: - p["sector"] = self.g.sector - for k in ("track", "sector", "offset", "old_value", "new_value"): - p[k] = self.to_hex_string(p.get(k, 0)) - sys.stdout.write(STRINGS[id].format(**p)) - -class DebugLogger(DefaultLogger): - # logger that writes to sys.stdout, and writes debug information to sys.stderr - def debug(self, s): - sys.stderr.write(s) - sys.stderr.write("\n") - class PassportGlobals: def __init__(self): # things about the disk @@ -83,498 +44,10 @@ class PassportGlobals: self.protection_enforces_write_protected = False # things about the conversion process self.tried_univ = False - self.track = 0 - self.sector = 0 + self.track = 0 # display purposes only + self.sector = 0 # display purposes only self.last_track = 0 -class AddressField: - def __init__(self, volume, track_num, sector_num, checksum): - self.volume = volume - self.track_num = track_num - self.sector_num = sector_num - self.checksum = checksum - self.valid = (volume ^ track_num ^ sector_num ^ checksum) == 0 - -class Sector: - def __init__(self, address_field, decoded, start_bit_index=None, end_bit_index=None): - self.address_field = address_field - self.decoded = decoded - self.start_bit_index = start_bit_index - self.end_bit_index = end_bit_index - - def __getitem__(self, i): - return self.decoded[i] - -class RWTS: - kDefaultSectorOrder16 = (0x00, 0x07, 0x0E, 0x06, 0x0D, 0x05, 0x0C, 0x04, 0x0B, 0x03, 0x0A, 0x02, 0x09, 0x01, 0x08, 0x0F) - kDefaultAddressPrologue16 = (0xD5, 0xAA, 0x96) - kDefaultAddressEpilogue16 = (0xDE, 0xAA) - kDefaultDataPrologue16 = (0xD5, 0xAA, 0xAD) - kDefaultDataEpilogue16 = (0xDE, 0xAA) - kDefaultNibbleTranslationTable16 = { - 0x96: 0x00, 0x97: 0x01, 0x9a: 0x02, 0x9b: 0x03, 0x9d: 0x04, 0x9e: 0x05, 0x9f: 0x06, 0xa6: 0x07, - 0xa7: 0x08, 0xab: 0x09, 0xac: 0x0a, 0xad: 0x0b, 0xae: 0x0c, 0xaf: 0x0d, 0xb2: 0x0e, 0xb3: 0x0f, - 0xb4: 0x10, 0xb5: 0x11, 0xb6: 0x12, 0xb7: 0x13, 0xb9: 0x14, 0xba: 0x15, 0xbb: 0x16, 0xbc: 0x17, - 0xbd: 0x18, 0xbe: 0x19, 0xbf: 0x1a, 0xcb: 0x1b, 0xcd: 0x1c, 0xce: 0x1d, 0xcf: 0x1e, 0xd3: 0x1f, - 0xd6: 0x20, 0xd7: 0x21, 0xd9: 0x22, 0xda: 0x23, 0xdb: 0x24, 0xdc: 0x25, 0xdd: 0x26, 0xde: 0x27, - 0xdf: 0x28, 0xe5: 0x29, 0xe6: 0x2a, 0xe7: 0x2b, 0xe9: 0x2c, 0xea: 0x2d, 0xeb: 0x2e, 0xec: 0x2f, - 0xed: 0x30, 0xee: 0x31, 0xef: 0x32, 0xf2: 0x33, 0xf3: 0x34, 0xf4: 0x35, 0xf5: 0x36, 0xf6: 0x37, - 0xf7: 0x38, 0xf9: 0x39, 0xfa: 0x3a, 0xfb: 0x3b, 0xfc: 0x3c, 0xfd: 0x3d, 0xfe: 0x3e, 0xff: 0x3f, - } - - def __init__(self, - g, - sectors_per_track = 16, - address_prologue = kDefaultAddressPrologue16, - address_epilogue = kDefaultAddressEpilogue16, - data_prologue = kDefaultDataPrologue16, - data_epilogue = kDefaultDataEpilogue16, - sector_order = kDefaultSectorOrder16, - nibble_translate_table = kDefaultNibbleTranslationTable16): - self.sectors_per_track = sectors_per_track - self.address_prologue = address_prologue - self.address_epilogue = address_epilogue - self.data_prologue = data_prologue - self.data_epilogue = data_epilogue - self.sector_order = sector_order - self.nibble_translate_table = nibble_translate_table - self.g = g - self.logical_track_num = 0 - - def seek(self, logical_track_num): - self.logical_track_num = logical_track_num - return float(logical_track_num) - - def reorder_to_logical_sectors(self, physical_sectors): - logical = {} - for k, v in physical_sectors.items(): - logical[self.sector_order[k]] = v - return logical - - def find_address_prologue(self, track): - return track.find(self.address_prologue) - - def address_field_at_point(self, track): - volume = decode44(next(track.nibble()), next(track.nibble())) - track_num = decode44(next(track.nibble()), next(track.nibble())) - sector_num = decode44(next(track.nibble()), next(track.nibble())) - checksum = decode44(next(track.nibble()), next(track.nibble())) - return AddressField(volume, track_num, sector_num, checksum) - - def verify_nibbles_at_point(self, track, nibbles): - found = [] - for i in nibbles: - found.append(next(track.nibble())) - return tuple(found) == tuple(nibbles) - - def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num): - return self.verify_nibbles_at_point(track, self.address_epilogue) - - def find_data_prologue(self, track, logical_track_num, physical_sector_num): - return track.find(self.data_prologue) - - def data_field_at_point(self, track, logical_track_num, physical_sector_num): - disk_nibbles = [] - for i in range(343): - disk_nibbles.append(next(track.nibble())) - checksum = 0 - secondary = [] - decoded = [] - for i in range(86): - n = disk_nibbles[i] - if n not in self.nibble_translate_table: return None - b = self.nibble_translate_table[n] - if b >= 0x80: return None - checksum ^= b - secondary.insert(0, checksum) - for i in range(86, 342): - n = disk_nibbles[i] - if n not in self.nibble_translate_table: return None - b = self.nibble_translate_table[n] - if b >= 0x80: return None - checksum ^= b - decoded.append(checksum << 2) - n = disk_nibbles[i] - if n not in self.nibble_translate_table: return None - b = self.nibble_translate_table[n] - if b >= 0x80: return None - checksum ^= b - for i in range(86): - low2 = secondary[85 - i] - decoded[i] += (((low2 & 0b000001) << 1) + ((low2 & 0b000010) >> 1)) - decoded[i + 86] += (((low2 & 0b000100) >> 1) + ((low2 & 0b001000) >> 3)) - if i < 84: - decoded[i + 172] += (((low2 & 0b010000) >> 3) + ((low2 & 0b100000) >> 5)) - return bytearray(decoded) - - def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): - return self.verify_nibbles_at_point(track, self.data_epilogue) - - def decode_track(self, track, logical_track_num, burn=0): - sectors = collections.OrderedDict() - if not track: return sectors - starting_revolutions = track.revolutions - verified_sectors = [] - while (len(verified_sectors) < self.sectors_per_track) and \ - (track.revolutions < starting_revolutions + 2): - # store start index within track (used for .edd -> .woz conversion) - start_bit_index = track.bit_index - if not self.find_address_prologue(track): - # if we can't even find a single address prologue, just give up - self.g.logger.debug("can't find a single address prologue so LGTM or whatever") - break - # for edd->woz conversion, only save some of the bits preceding - # the address prologue - if track.bit_index - start_bit_index > 256: - start_bit_index = track.bit_index - 256 - # decode address field - address_field = self.address_field_at_point(track) - self.g.logger.debug("found sector %s" % hex(address_field.sector_num)[2:].upper()) - if address_field.sector_num in verified_sectors: - # the sector we just found is a sector we've already decoded - # properly, so skip it - self.g.logger.debug("duplicate sector %d, continuing" % address_field.sector_num) - continue - if address_field.sector_num > self.sectors_per_track: - # found a weird sector whose ID is out of range - # TODO: will eventually need to tweak this logic to handle Ultima V and others - self.g.logger.debug("sector ID out of range %d" % address_field.sector_num) - continue - # put a placeholder for this sector in this position in the ordered dict - # so even if this copy doesn't pan out but a later copy does, sectors - # will still be in the original order - sectors[address_field.sector_num] = None - if not self.verify_address_epilogue_at_point(track, logical_track_num, address_field.sector_num): - # verifying the address field epilogue failed, but this is - # not necessarily fatal because there might be another copy - # of this sector later - self.g.logger.debug("verify_address_epilogue_at_point failed, continuing") - continue - if not self.find_data_prologue(track, logical_track_num, address_field.sector_num): - # if we can't find a data field prologue, just give up - self.g.logger.debug("find_data_prologue failed, giving up") - break - # read and decode the data field, and verify the data checksum - decoded = self.data_field_at_point(track, logical_track_num, address_field.sector_num) - if not decoded: - # decoding data field failed, but this is not necessarily fatal - # because there might be another copy of this sector later - self.g.logger.debug("data_field_at_point failed, continuing") - continue - if not self.verify_data_epilogue_at_point(track, logical_track_num, address_field.sector_num): - # verifying the data field epilogue failed, but this is - # not necessarily fatal because there might be another copy - # of this sector later - self.g.logger.debug("verify_data_epilogue_at_point failed") - continue - # store end index within track (used for .edd -> .woz conversion) - end_bit_index = track.bit_index - # if the caller told us to burn a certain number of sectors before - # saving the good ones, do it now (used for .edd -> .woz conversion) - if burn: - burn -= 1 - continue - # all good, and we want to save this sector, so do it - sectors[address_field.sector_num] = Sector(address_field, decoded, start_bit_index, end_bit_index) - verified_sectors.append(address_field.sector_num) - self.g.logger.debug("saved sector %s" % hex(address_field.sector_num)) - # remove placeholders of sectors that we found but couldn't decode properly - # (made slightly more difficult by the fact that we're trying to remove - # elements from an OrderedDict while iterating through the OrderedDict, - # which Python really doesn't want to do) - while None in sectors.values(): - for k in sectors: - if not sectors[k]: - del sectors[k] - break - return sectors - -class UniversalRWTS(RWTS): - acceptable_address_prologues = ((0xD4,0xAA,0x96), (0xD5,0xAA,0x96)) - - def __init__(self, g): - RWTS.__init__(self, g, address_epilogue=[], data_epilogue=[]) - - def find_address_prologue(self, track): - starting_revolutions = track.revolutions - seen = [0,0,0] - while (track.revolutions < starting_revolutions + 2): - del seen[0] - seen.append(next(track.nibble())) - if tuple(seen) in self.acceptable_address_prologues: return True - return False - - def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num): -# return True - if not self.address_epilogue: - self.address_epilogue = [next(track.nibble())] - result = True - else: - result = RWTS.verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num) - next(track.nibble()) - next(track.nibble()) - return result - - def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): - if not self.data_epilogue: - self.data_epilogue = [next(track.nibble())] - result = True - else: - result = RWTS.verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num) - next(track.nibble()) - next(track.nibble()) - return result - -class UniversalRWTSIgnoreEpilogues(UniversalRWTS): - def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num): - return True - - def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): - return True - -class Track00RWTS(UniversalRWTSIgnoreEpilogues): - def data_field_at_point(self, track, logical_track_num, physical_sector_num): - start_index = track.bit_index - start_revolutions = track.revolutions - decoded = UniversalRWTS.data_field_at_point(self, track, logical_track_num, physical_sector_num) - if not decoded: - # If the sector didn't decode properly, rewind to the - # beginning of the data field before returning to the - # caller. This is for disks with a fake T00,S0A that - # is full of consecutive 0s, where if we consume the bitstream - # as nibbles, we'll end up consuming the next address field - # and it will seem like that sector doesn't exist. And that - # is generally logical sector 2, which is important not to - # miss at this stage because its absence triggers a different - # code path and everything falls apart. - track.bit_index = start_index - track.revolutions = start_revolutions - return decoded - -class DOS33RWTS(RWTS): - def __init__(self, logical_sectors, g): - self.g = g - self.reset(logical_sectors) - RWTS.__init__(self, - g, - sectors_per_track=16, - address_prologue=self.address_prologue, - address_epilogue=self.address_epilogue, - data_prologue=self.data_prologue, - data_epilogue=self.data_epilogue, - nibble_translate_table=self.nibble_translate_table) - - def reset(self, logical_sectors): - self.address_prologue = (logical_sectors[3][0x55], - logical_sectors[3][0x5F], - logical_sectors[3][0x6A]) - self.address_epilogue = (logical_sectors[3][0x91], - logical_sectors[3][0x9B]) - self.data_prologue = (logical_sectors[2][0xE7], - logical_sectors[2][0xF1], - logical_sectors[2][0xFC]) - self.data_epilogue = (logical_sectors[3][0x35], - logical_sectors[3][0x3F]) - self.nibble_translate_table = {} - for nibble in range(0x96, 0x100): - self.nibble_translate_table[nibble] = logical_sectors[4][nibble] - -class SunburstRWTS(DOS33RWTS): - def reset(self, logical_sectors): - DOS33RWTS.reset(self, logical_sectors) - self.address_epilogue = (logical_sectors[3][0x91],) - self.data_epilogue = (logical_sectors[3][0x35],) - self.address_prologue_third_nibble_by_track = logical_sectors[4][0x29:] - self.data_prologue_third_nibble_by_track = logical_sectors[4][0x34:] - - def seek(self, logical_track_num): - self.address_prologue = (self.address_prologue[0], - self.address_prologue[1], - self.address_prologue_third_nibble_by_track[logical_track_num]) - self.data_prologue = (self.data_prologue[0], - self.data_prologue[1], - self.data_prologue_third_nibble_by_track[logical_track_num]) - DOS33RWTS.seek(self, logical_track_num) - if logical_track_num >= 0x11: - return logical_track_num + 0.5 - else: - return float(logical_track_num) - -class BorderRWTS(DOS33RWTS): - # TODO doesn't work yet, not sure why - def reset(self, logical_sectors): - DOS33RWTS.reset(self, logical_sectors) - self.address_prologue = (logical_sectors[9][0x16], - logical_sectors[9][0x1B], - logical_sectors[9][0x20]) - self.address_epilogue = (logical_sectors[9][0x25], - logical_sectors[9][0x2A]) - self.data_prologue = (logical_sectors[8][0xFD], - logical_sectors[9][0x02], - logical_sectors[9][0x02]) - self.data_epilogue = (logical_sectors[9][0x0C], - logical_sectors[9][0x11]) - -class D5TimingBitRWTS(DOS33RWTS): - def reset(self, logical_sectors): - DOS33RWTS.reset(self, logical_sectors) - self.data_prologue = (logical_sectors[2][0xE7], - 0xAA, - logical_sectors[2][0xFC]) - self.data_epilogue = (logical_sectors[3][0x35], - 0xAA) - - def find_address_prologue(self, track): - starting_revolutions = track.revolutions - while (track.revolutions < starting_revolutions + 2): - if next(track.nibble()) == 0xD5: - bit = next(track.bit()) - if bit == 0: return True - track.rewind(1) - return False - - def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num): - return True - -class InfocomRWTS(DOS33RWTS): - def reset(self, logical_sectors): - DOS33RWTS.reset(self, logical_sectors) - self.data_prologue = self.data_prologue[:2] - - def find_data_prologue(self, track, logical_track_num, physical_sector_num): - if not DOS33RWTS.find_data_prologue(self, track, logical_track_num, physical_sector_num): - return False - return next(track.nibble()) >= 0xAD - -class OptimumResourceRWTS(DOS33RWTS): - def data_field_at_point(self, track, logical_track_num, physical_sector_num): - if (logical_track_num, physical_sector_num) == (0x01, 0x0F): - # TODO actually decode these - disk_nibbles = [] - for i in range(343): - disk_nibbles.append(next(track.nibble())) - return bytearray(256) # all zeroes for now - return DOS33RWTS.data_field_at_point(self, track, logical_track_num, physical_sector_num) - - def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): - if (logical_track_num, physical_sector_num) == (0x01, 0x0F): - return True - return DOS33RWTS.verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num) - -class HeredityDogRWTS(DOS33RWTS): - def data_field_at_point(self, track, logical_track_num, physical_sector_num): - if (logical_track_num, physical_sector_num) == (0x00, 0x0A): - # This sector is fake, full of too many consecutive 0s, - # designed to read differently every time. We go through - # and clean the stray bits, and be careful not to go past - # the end so we don't include the next address prologue. - start_index = track.bit_index - while (track.bit_index < start_index + (343*8)): - if self.nibble_translate_table.get(next(track.nibble()), 0xFF) == 0xFF: - track.bits[track.bit_index-8:track.bit_index] = 0 - self.g.found_and_cleaned_weakbits = True - return bytearray(256) - return DOS33RWTS.data_field_at_point(self, track, logical_track_num, physical_sector_num) - - def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): - if (logical_track_num, physical_sector_num) == (0x00, 0x0A): - return True - return DOS33RWTS.verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num) - -class BECARWTS(DOS33RWTS): - def is_protected_sector(self, logical_track_num, physical_sector_num): - if logical_track_num > 0: return True - return physical_sector_num not in (0x00, 0x0D, 0x0B, 0x09, 0x07, 0x05, 0x03, 0x01, 0x0E, 0x0C) - - def reset(self, logical_sectors): - DOS33RWTS.reset(self, logical_sectors) - self.data_prologue = self.data_prologue[:2] - - def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num): - if self.is_protected_sector(logical_track_num, physical_sector_num): - return DOS33RWTS.verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num) - return True - - def find_data_prologue(self, track, logical_track_num, physical_sector_num): - if not DOS33RWTS.find_data_prologue(self, track, logical_track_num, physical_sector_num): - return False - next(track.nibble()) - if self.is_protected_sector(logical_track_num, physical_sector_num): - next(track.bit()) - next(track.nibble()) - next(track.bit()) - next(track.bit()) - return True - - def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): - if self.is_protected_sector(logical_track_num, physical_sector_num): - next(track.nibble()) - if logical_track_num == 0: - next(track.nibble()) - next(track.nibble()) - return True - return DOS33RWTS.verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num) - -class LaureateRWTS(DOS33RWTS): - # nibble table is in T00,S06 - # address prologue is T00,S05 A$55,A$5F,A$6A - # address epilogue is T00,S05 A$91,A$9B - # data prologue is T00,S04 A$E7,A$F1,A$FC - # data epilogue is T00,S05 A$35,A$3F - def reset(self, logical_sectors): - self.address_prologue = (logical_sectors[5][0x55], - logical_sectors[5][0x5F], - logical_sectors[5][0x6A]) - self.address_epilogue = (logical_sectors[5][0x91], - logical_sectors[5][0x9B]) - self.data_prologue = (logical_sectors[4][0xE7], - logical_sectors[4][0xF1], - logical_sectors[4][0xFC]) - self.data_epilogue = (logical_sectors[5][0x35], - logical_sectors[5][0x3F]) - self.nibble_translate_table = {} - for nibble in range(0x96, 0x100): - self.nibble_translate_table[nibble] = logical_sectors[6][nibble] - -class MECCRWTS(DOS33RWTS): - # MECC fastloaders - def __init__(self, mecc_variant, logical_sectors, g): - g.mecc_variant = mecc_variant - DOS33RWTS.__init__(self, logical_sectors, g) - - def reset(self, logical_sectors): - self.nibble_translate_table = self.kDefaultNibbleTranslationTable16 - self.address_epilogue = (0xDE, 0xAA) - self.data_epilogue = (0xDE, 0xAA) - if self.g.mecc_variant == 1: - self.address_prologue = (logical_sectors[0x0B][0x08], - logical_sectors[0x0B][0x12], - logical_sectors[0x0B][0x1D]) - self.data_prologue = (logical_sectors[0x0B][0x8F], - logical_sectors[0x0B][0x99], - logical_sectors[0x0B][0xA3]) - elif self.g.mecc_variant == 2: - self.address_prologue = (logical_sectors[7][0x83], - logical_sectors[7][0x8D], - logical_sectors[7][0x98]) - self.data_prologue = (logical_sectors[7][0x15], - logical_sectors[7][0x1F], - logical_sectors[7][0x2A]) - elif self.g.mecc_variant == 3: - self.address_prologue = (logical_sectors[0x0A][0xE8], - logical_sectors[0x0A][0xF2], - logical_sectors[0x0A][0xFD]) - self.data_prologue = (logical_sectors[0x0B][0x6F], - logical_sectors[0x0B][0x79], - logical_sectors[0x0B][0x83]) - elif self.g.mecc_variant == 4: - self.address_prologue = (logical_sectors[8][0x83], - logical_sectors[8][0x8D], - logical_sectors[8][0x98]) - self.data_prologue = (logical_sectors[8][0x15], - logical_sectors[8][0x1F], - logical_sectors[8][0x2A]) - class BasePassportProcessor: # base class def __init__(self, disk_image, logger_class=DefaultLogger): self.g = PassportGlobals() @@ -586,26 +59,26 @@ class BasePassportProcessor: # base class self.patches_found = [] self.patch_count = 0 # number of patches found across all tracks self.patcher_classes = [ - sunburst.SunburstPatcher, + SunburstPatcher, #JMPBCF0Patcher, #JMPBEB1Patcher, #JMPBECAPatcher, #JMPB660Patcher, #JMPB720Patcher, - bademu.BadEmuPatcher, - bademu2.BadEmu2Patcher, - rwts.RWTSPatcher, + BadEmuPatcher, + BadEmu2Patcher, + RWTSPatcher, #RWTSLogPatcher, - mecc1.MECC1Patcher, - mecc2.MECC2Patcher, - mecc3.MECC3Patcher, - mecc4.MECC4Patcher, + MECC1Patcher, + MECC2Patcher, + MECC3Patcher, + MECC4Patcher, #ROL1EPatcher, #JSRBB03Patcher, #DavidBB03Patcher, #RWTSSwapPatcher, #RWTSSwap2Patcher, - border.BorderPatcher, + BorderPatcher, #JMPAE8EPatcher, #JMPBBFEPatcher, #DatasoftPatcher, @@ -620,17 +93,17 @@ class BasePassportProcessor: # base class #MicrogramsPatcher, #DOS32Patcher, #DOS32DLMPatcher, - microfun.MicrofunPatcher, + MicrofunPatcher, #T11DiskVolPatcher, #T02VolumeNamePatcher, - universale7.UniversalE7Patcher, - a6bc95.A6BC95Patcher, - a5count.A5CountPatcher, - d5d5f7.D5D5F7Patcher, + UniversalE7Patcher, + A6BC95Patcher, + A5CountPatcher, + D5D5F7Patcher, #ProDOSRWTSPatcher, #ProDOS6APatcher, #ProDOSMECCPatcher, - bbf9.BBF9Patcher, + BBF9Patcher, #MemoryConfigPatcher, #OriginPatcher, #RWTSSwapMECCPatcher, @@ -643,10 +116,10 @@ class BasePassportProcessor: # base class #EAPatcher, #GamcoPatcher, #OptimumPatcher, - bootcounter.BootCounterPatcher, + BootCounterPatcher, #JMPB412Patcher, #JMPB400Patcher, - advint.AdventureInternationalPatcher, + AdventureInternationalPatcher, #JSR8635Patcher, #JMPB4BBPatcher, #DOS32MUSEPatcher, @@ -664,6 +137,10 @@ class BasePassportProcessor: # base class def SkipTrack(self, logical_track_num, track): # don't look for whole-track protections on track 0, that's silly if logical_track_num == 0: return False + # Missing track? + if not track.bits: + self.g.logger.PrintByID("unformat") + return True # Electronic Arts protection track? if logical_track_num == 6: if self.rwts.find_address_prologue(track): @@ -683,7 +160,7 @@ class BasePassportProcessor: # base class if repeated_nibble_count == 512: self.g.logger.PrintByID("sync") return True - # TODO IsUnformatted and other tests + # TODO IsUnformatted nibble test and other tests return False def IDDiversi(self, t00s00): @@ -978,6 +455,8 @@ class BasePassportProcessor: # base class def IDSunburst(self, logical_sectors): """returns True if |logical_sectors| contains track 0 of a Sunburst disk, False otherwise""" + if 4 not in logical_sectors: + return False return find.wild_at(0x69, logical_sectors[0x04], bytes.fromhex("48" "A5 2A" @@ -1001,12 +480,14 @@ class BasePassportProcessor: # base class "AD 78 04" "90 2B")) - def IDBootloader(self, t00): + def IDBootloader(self, t00, suppress_errors=False): """returns RWTS object that can (hopefully) read the rest of the disk""" temporary_rwts_for_t00 = Track00RWTS(self.g) physical_sectors = temporary_rwts_for_t00.decode_track(t00, 0) if 0 not in physical_sectors: - self.g.logger.PrintByID("fatal0000") + if not suppress_errors: + self.g.logger.PrintByID("fail") + self.g.logger.PrintByID("fatal0000") return None t00s00 = physical_sectors[0].decoded logical_sectors = temporary_rwts_for_t00.reorder_to_logical_sectors(physical_sectors) @@ -1033,7 +514,6 @@ class BasePassportProcessor: # base class self.g.logger.debug("mecc_variant = %d" % mecc_variant) if mecc_variant: return MECCRWTS(mecc_variant, logical_sectors, self.g) - # TODO MECC fastloader # TODO DOS 3.3P if self.IDLaureate(t00s00): self.g.logger.PrintByID("laureate") @@ -1177,12 +657,16 @@ class BasePassportProcessor: # base class def run(self): self.g.logger.PrintByID("header") self.g.logger.PrintByID("reading", {"filename":self.g.disk_image.filename}) + supports_reseek = ("reseek" in dir(self.g.disk_image)) # get raw track $00 data from the source disk self.tracks = {} self.tracks[0] = self.g.disk_image.seek(0) # analyze track $00 to create an RWTS - self.rwts = self.IDBootloader(self.tracks[0]) + self.rwts = self.IDBootloader(self.tracks[0], supports_reseek) + if not self.rwts and supports_reseek: + self.tracks[0] = self.g.disk_image.reseek(0) + self.rwts = self.IDBootloader(self.tracks[0]) if not self.rwts: return False # initialize all patchers @@ -1200,6 +684,7 @@ class BasePassportProcessor: # base class self.tracks[physical_track_num] = self.g.disk_image.seek(physical_track_num) self.g.logger.debug("Seeking to track %s" % hex(self.g.track)) try_again = True + tried_reseek = False while try_again: try_again = False physical_sectors = self.rwts.decode_track(self.tracks[physical_track_num], logical_track_num, self.burn) @@ -1207,18 +692,27 @@ class BasePassportProcessor: # base class # TODO this is bad, we should just ask the RWTS object if we decoded enough sectors, # so that SunburstRWTS can override the logic on track 0x11 continue - else: - self.g.logger.debug("found %d sectors" % len(physical_sectors)) + if supports_reseek and not tried_reseek: + self.tracks[physical_track_num] = self.g.disk_image.reseek(physical_track_num) + self.g.logger.debug("Reseeking to track %s" % hex(self.g.track)) + tried_reseek = True + try_again = True + continue + self.g.logger.debug("found %d sectors" % len(physical_sectors)) if self.rwts.__class__ is SunburstRWTS and logical_track_num == 0x11: # TODO this is bad, see above continue if (0x0F not in physical_sectors) and self.SkipTrack(logical_track_num, self.tracks[physical_track_num]): physical_sectors = None continue - # TODO wrong in case where we switch mid-track. - # Need to save the sectors that worked with the original RWTS - # then append the ones that worked with the universal RWTS - if not self.g.tried_univ: + if self.g.tried_univ: + if logical_track_num == 0x22 and (0x0F not in physical_sectors): + self.g.logger.PrintByID("fatal220f") + return False + else: + # TODO wrong in case where we switch mid-track. + # Need to save the sectors that worked with the original RWTS + # then append the ones that worked with the universal RWTS self.g.logger.PrintByID("switch", {"sector":0x0F}) # TODO find exact sector self.rwts = UniversalRWTS(self.g) self.g.tried_univ = True diff --git a/passport/a2rimage.py b/passport/a2rimage.py index dc8203a..b1e8f3a 100755 --- a/passport/a2rimage.py +++ b/passport/a2rimage.py @@ -44,9 +44,20 @@ class A2RImage: raise A2RSeekError("Invalid track %s" % track_num) location = int(track_num * 4) if not self.tracks.get(location): - all_bits = bitarray.bitarray() - for flux_record in self.a2r_image.flux.get(location, [{}]): + # just return the bits from the first flux read + # (if the caller determines that they're not good, it will call reseek() + # which is smarter but takes longer) + bits = bitarray.bitarray() + for flux_record in self.a2r_image.flux.get(location, [{}])[:1]: bits, track_length, speed = self.to_bits(flux_record) - all_bits.extend(bits) - self.tracks[location] = Track(all_bits, len(all_bits)) + self.tracks[location] = Track(bits, len(bits)) + return self.tracks[location] + + def reseek(self, track_num): + location = int(track_num * 4) + all_bits = bitarray.bitarray() + for flux_record in self.a2r_image.flux.get(location, [{}]): + bits, track_length, speed = self.to_bits(flux_record) + all_bits.extend(bits) + self.tracks[location] = Track(all_bits, len(all_bits)) return self.tracks[location] diff --git a/passport/loggers/__init__.py b/passport/loggers/__init__.py new file mode 100644 index 0000000..7da9580 --- /dev/null +++ b/passport/loggers/__init__.py @@ -0,0 +1,20 @@ +class BaseLogger: # base class + def __init__(self, g): + self.g = g + + def PrintByID(self, id, params = {}): + """prints a predefined string, parameterized with some passed parameters and some globals""" + pass + + def debug(self, s): + pass + + def to_hex_string(self, n): + if type(n) == int: + return hex(n)[2:].rjust(2, "0").upper() + if type(n) in (bytes, bytearray): + return "".join([self.to_hex_string(x) for x in n]) + +from .silent import SilentLogger +from .default import DefaultLogger +from .debug import DebugLogger diff --git a/passport/loggers/debug.py b/passport/loggers/debug.py new file mode 100644 index 0000000..343121f --- /dev/null +++ b/passport/loggers/debug.py @@ -0,0 +1,8 @@ +from passport.loggers.default import DefaultLogger +import sys + +class DebugLogger(DefaultLogger): + """print usual log to stdout, print extra debugging information to stderr""" + def debug(self, s): + sys.stderr.write(s) + sys.stderr.write("\n") diff --git a/passport/loggers/default.py b/passport/loggers/default.py new file mode 100644 index 0000000..c7b1efb --- /dev/null +++ b/passport/loggers/default.py @@ -0,0 +1,15 @@ +from passport.loggers import BaseLogger +from passport.strings import STRINGS +import sys + +class DefaultLogger(BaseLogger): + """print to stdout in a form and verbosity that more or less mimics Passport/6502""" + def PrintByID(self, id, params = {}): + p = params.copy() + if "track" not in p: + p["track"] = self.g.track + if "sector" not in params: + p["sector"] = self.g.sector + for k in ("track", "sector", "offset", "old_value", "new_value"): + p[k] = self.to_hex_string(p.get(k, 0)) + sys.stdout.write(STRINGS[id].format(**p)) diff --git a/passport/loggers/silent.py b/passport/loggers/silent.py new file mode 100644 index 0000000..eecd3d4 --- /dev/null +++ b/passport/loggers/silent.py @@ -0,0 +1,4 @@ +from passport.loggers import BaseLogger + +class SilentLogger(BaseLogger): + """print nothing""" diff --git a/passport/patchers/__init__.py b/passport/patchers/__init__.py index bbd219f..f62215b 100644 --- a/passport/patchers/__init__.py +++ b/passport/patchers/__init__.py @@ -1,23 +1,3 @@ -__all__ = [ - "a5count", - "a6bc95", - "advint", - "bademu", - "bademu2", - "bbf9", - "bootcounter", - "border", - "d5d5f7", - "mecc1", - "mecc2", - "mecc3", - "mecc4", - "microfun", - "rwts", - "sunburst", - "universale7", - ] - class Patch: # represents a single patch that could be applied to a disk image def __init__(self, track_num, sector_num, byte_offset, new_value, id=None, params={}): @@ -42,3 +22,21 @@ class Patcher: # base class def run(self, logical_sectors, track_num): """returns list of Patch objects representing patches that could be applied to logical_sectors""" return [] + +from .a5count import * +from .a6bc95 import * +from .advint import * +from .bademu import * +from .bademu2 import * +from .bbf9 import * +from .bootcounter import * +from .border import * +from .d5d5f7 import * +from .mecc1 import * +from .mecc2 import * +from .mecc3 import * +from .mecc4 import * +from .microfun import * +from .rwts import * +from .sunburst import * +from .universale7 import * diff --git a/passport/patchers/sunburst.py b/passport/patchers/sunburst.py index e417dd4..61bd944 100644 --- a/passport/patchers/sunburst.py +++ b/passport/patchers/sunburst.py @@ -50,4 +50,5 @@ tested on return [Patch(0, 3, 0x40, bytes.fromhex("F0")), Patch(0, 3, 0x9C, bytes.fromhex("D0")), Patch(0, 6, 0x69, bytes.fromhex("20 C3 BC 20 C3 BC")), - Patch(0, 8, 0x8C, bytes.fromhex("A0 B9"))] + Patch(0, 8, 0x8C, bytes.fromhex("A0 B9")), + Patch(0, 4, 0xC0, bytes.fromhex("C0 C1 C2 C3 C4 C5 C6 C7 C8 C9 CA"))] diff --git a/passport/rwts/__init__.py b/passport/rwts/__init__.py new file mode 100644 index 0000000..3ec0e9d --- /dev/null +++ b/passport/rwts/__init__.py @@ -0,0 +1,217 @@ +from collections import OrderedDict +from passport.util import * + +class AddressField: + def __init__(self, volume, track_num, sector_num, checksum): + self.volume = volume + self.track_num = track_num + self.sector_num = sector_num + self.checksum = checksum + self.valid = (volume ^ track_num ^ sector_num ^ checksum) == 0 + +class Sector: + def __init__(self, address_field, decoded, start_bit_index=None, end_bit_index=None): + self.address_field = address_field + self.decoded = decoded + self.start_bit_index = start_bit_index + self.end_bit_index = end_bit_index + + def __getitem__(self, i): + return self.decoded[i] + +class RWTS: + kDefaultSectorOrder16 = (0x00, 0x07, 0x0E, 0x06, 0x0D, 0x05, 0x0C, 0x04, 0x0B, 0x03, 0x0A, 0x02, 0x09, 0x01, 0x08, 0x0F) + kDefaultAddressPrologue16 = (0xD5, 0xAA, 0x96) + kDefaultAddressEpilogue16 = (0xDE, 0xAA) + kDefaultDataPrologue16 = (0xD5, 0xAA, 0xAD) + kDefaultDataEpilogue16 = (0xDE, 0xAA) + kDefaultNibbleTranslationTable16 = { + 0x96: 0x00, 0x97: 0x01, 0x9a: 0x02, 0x9b: 0x03, 0x9d: 0x04, 0x9e: 0x05, 0x9f: 0x06, 0xa6: 0x07, + 0xa7: 0x08, 0xab: 0x09, 0xac: 0x0a, 0xad: 0x0b, 0xae: 0x0c, 0xaf: 0x0d, 0xb2: 0x0e, 0xb3: 0x0f, + 0xb4: 0x10, 0xb5: 0x11, 0xb6: 0x12, 0xb7: 0x13, 0xb9: 0x14, 0xba: 0x15, 0xbb: 0x16, 0xbc: 0x17, + 0xbd: 0x18, 0xbe: 0x19, 0xbf: 0x1a, 0xcb: 0x1b, 0xcd: 0x1c, 0xce: 0x1d, 0xcf: 0x1e, 0xd3: 0x1f, + 0xd6: 0x20, 0xd7: 0x21, 0xd9: 0x22, 0xda: 0x23, 0xdb: 0x24, 0xdc: 0x25, 0xdd: 0x26, 0xde: 0x27, + 0xdf: 0x28, 0xe5: 0x29, 0xe6: 0x2a, 0xe7: 0x2b, 0xe9: 0x2c, 0xea: 0x2d, 0xeb: 0x2e, 0xec: 0x2f, + 0xed: 0x30, 0xee: 0x31, 0xef: 0x32, 0xf2: 0x33, 0xf3: 0x34, 0xf4: 0x35, 0xf5: 0x36, 0xf6: 0x37, + 0xf7: 0x38, 0xf9: 0x39, 0xfa: 0x3a, 0xfb: 0x3b, 0xfc: 0x3c, 0xfd: 0x3d, 0xfe: 0x3e, 0xff: 0x3f, + } + + def __init__(self, + g, + sectors_per_track = 16, + address_prologue = kDefaultAddressPrologue16, + address_epilogue = kDefaultAddressEpilogue16, + data_prologue = kDefaultDataPrologue16, + data_epilogue = kDefaultDataEpilogue16, + sector_order = kDefaultSectorOrder16, + nibble_translate_table = kDefaultNibbleTranslationTable16): + self.sectors_per_track = sectors_per_track + self.address_prologue = address_prologue + self.address_epilogue = address_epilogue + self.data_prologue = data_prologue + self.data_epilogue = data_epilogue + self.sector_order = sector_order + self.nibble_translate_table = nibble_translate_table + self.g = g + self.logical_track_num = 0 + + def seek(self, logical_track_num): + self.logical_track_num = logical_track_num + return float(logical_track_num) + + def reorder_to_logical_sectors(self, physical_sectors): + logical = {} + for k, v in physical_sectors.items(): + logical[self.sector_order[k]] = v + return logical + + def find_address_prologue(self, track): + return track.find(self.address_prologue) + + def address_field_at_point(self, track): + volume = decode44(next(track.nibble()), next(track.nibble())) + track_num = decode44(next(track.nibble()), next(track.nibble())) + sector_num = decode44(next(track.nibble()), next(track.nibble())) + checksum = decode44(next(track.nibble()), next(track.nibble())) + return AddressField(volume, track_num, sector_num, checksum) + + def verify_nibbles_at_point(self, track, nibbles): + found = [] + for i in nibbles: + found.append(next(track.nibble())) + return tuple(found) == tuple(nibbles) + + def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num): + return self.verify_nibbles_at_point(track, self.address_epilogue) + + def find_data_prologue(self, track, logical_track_num, physical_sector_num): + return track.find(self.data_prologue) + + def data_field_at_point(self, track, logical_track_num, physical_sector_num): + disk_nibbles = [] + for i in range(343): + disk_nibbles.append(next(track.nibble())) + checksum = 0 + secondary = [] + decoded = [] + for i in range(86): + n = disk_nibbles[i] + if n not in self.nibble_translate_table: return None + b = self.nibble_translate_table[n] + if b >= 0x80: return None + checksum ^= b + secondary.insert(0, checksum) + for i in range(86, 342): + n = disk_nibbles[i] + if n not in self.nibble_translate_table: return None + b = self.nibble_translate_table[n] + if b >= 0x80: return None + checksum ^= b + decoded.append(checksum << 2) + n = disk_nibbles[i] + if n not in self.nibble_translate_table: return None + b = self.nibble_translate_table[n] + if b >= 0x80: return None + checksum ^= b + for i in range(86): + low2 = secondary[85 - i] + decoded[i] += (((low2 & 0b000001) << 1) + ((low2 & 0b000010) >> 1)) + decoded[i + 86] += (((low2 & 0b000100) >> 1) + ((low2 & 0b001000) >> 3)) + if i < 84: + decoded[i + 172] += (((low2 & 0b010000) >> 3) + ((low2 & 0b100000) >> 5)) + return bytearray(decoded) + + def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): + return self.verify_nibbles_at_point(track, self.data_epilogue) + + def decode_track(self, track, logical_track_num, burn=0): + sectors = OrderedDict() + if not track: return sectors + if not track.bits: return sectors + starting_revolutions = track.revolutions + verified_sectors = [] + while (len(verified_sectors) < self.sectors_per_track) and \ + (track.revolutions < starting_revolutions + 2): + # store start index within track (used for .woz conversion) + start_bit_index = track.bit_index + if not self.find_address_prologue(track): + # if we can't even find a single address prologue, just give up + self.g.logger.debug("can't find a single address prologue so LGTM or whatever") + break + # for .woz conversion, only save some of the bits preceding + # the address prologue + if track.bit_index - start_bit_index > 256: + start_bit_index = track.bit_index - 256 + # decode address field + address_field = self.address_field_at_point(track) + self.g.logger.debug("found sector %s" % hex(address_field.sector_num)[2:].upper()) + if address_field.sector_num in verified_sectors: + # the sector we just found is a sector we've already decoded + # properly, so skip it + self.g.logger.debug("duplicate sector %d, continuing" % address_field.sector_num) + continue + if address_field.sector_num > self.sectors_per_track: + # found a weird sector whose ID is out of range + # TODO: will eventually need to tweak this logic to handle Ultima V and others + self.g.logger.debug("sector ID out of range %d" % address_field.sector_num) + continue + # put a placeholder for this sector in this position in the ordered dict + # so even if this copy doesn't pan out but a later copy does, sectors + # will still be in the original order + sectors[address_field.sector_num] = None + if not self.verify_address_epilogue_at_point(track, logical_track_num, address_field.sector_num): + # verifying the address field epilogue failed, but this is + # not necessarily fatal because there might be another copy + # of this sector later + self.g.logger.debug("verify_address_epilogue_at_point failed, continuing") + continue + if not self.find_data_prologue(track, logical_track_num, address_field.sector_num): + # if we can't find a data field prologue, just give up + self.g.logger.debug("find_data_prologue failed, giving up") + break + # read and decode the data field, and verify the data checksum + decoded = self.data_field_at_point(track, logical_track_num, address_field.sector_num) + if not decoded: + # decoding data field failed, but this is not necessarily fatal + # because there might be another copy of this sector later + self.g.logger.debug("data_field_at_point failed, continuing") + continue + if not self.verify_data_epilogue_at_point(track, logical_track_num, address_field.sector_num): + # verifying the data field epilogue failed, but this is + # not necessarily fatal because there might be another copy + # of this sector later + self.g.logger.debug("verify_data_epilogue_at_point failed") + continue + # store end index within track (used for .woz conversion) + end_bit_index = track.bit_index + # if the caller told us to burn a certain number of sectors before + # saving the good ones, do it now (used for .woz conversion) + if burn: + burn -= 1 + continue + # all good, and we want to save this sector, so do it + sectors[address_field.sector_num] = Sector(address_field, decoded, start_bit_index, end_bit_index) + verified_sectors.append(address_field.sector_num) + self.g.logger.debug("saved sector %s" % hex(address_field.sector_num)) + # remove placeholders of sectors that we found but couldn't decode properly + # (made slightly more difficult by the fact that we're trying to remove + # elements from an OrderedDict while iterating through the OrderedDict, + # which Python really doesn't want to do) + while None in sectors.values(): + for k in sectors: + if not sectors[k]: + del sectors[k] + break + return sectors + +from .universal import * +from .dos33 import * +from .sunburst import * +from .border import * +from .d5timing import * +from .infocom import * +from .optimum import * +from .hereditydog import * +from .beca import * +from .laureate import * +from .mecc import * diff --git a/passport/rwts/beca.py b/passport/rwts/beca.py new file mode 100644 index 0000000..943f672 --- /dev/null +++ b/passport/rwts/beca.py @@ -0,0 +1,35 @@ +from passport.rwts.dos33 import DOS33RWTS + +class BECARWTS(DOS33RWTS): + def is_protected_sector(self, logical_track_num, physical_sector_num): + if logical_track_num > 0: return True + return physical_sector_num not in (0x00, 0x0D, 0x0B, 0x09, 0x07, 0x05, 0x03, 0x01, 0x0E, 0x0C) + + def reset(self, logical_sectors): + DOS33RWTS.reset(self, logical_sectors) + self.data_prologue = self.data_prologue[:2] + + def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num): + if self.is_protected_sector(logical_track_num, physical_sector_num): + return DOS33RWTS.verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num) + return True + + def find_data_prologue(self, track, logical_track_num, physical_sector_num): + if not DOS33RWTS.find_data_prologue(self, track, logical_track_num, physical_sector_num): + return False + next(track.nibble()) + if self.is_protected_sector(logical_track_num, physical_sector_num): + next(track.bit()) + next(track.nibble()) + next(track.bit()) + next(track.bit()) + return True + + def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): + if self.is_protected_sector(logical_track_num, physical_sector_num): + next(track.nibble()) + if logical_track_num == 0: + next(track.nibble()) + next(track.nibble()) + return True + return DOS33RWTS.verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num) diff --git a/passport/rwts/border.py b/passport/rwts/border.py new file mode 100644 index 0000000..f5acd0f --- /dev/null +++ b/passport/rwts/border.py @@ -0,0 +1,16 @@ +from passport.rwts.dos33 import DOS33RWTS + +class BorderRWTS(DOS33RWTS): + # TODO doesn't work yet, not sure why + def reset(self, logical_sectors): + DOS33RWTS.reset(self, logical_sectors) + self.address_prologue = (logical_sectors[9][0x16], + logical_sectors[9][0x1B], + logical_sectors[9][0x20]) + self.address_epilogue = (logical_sectors[9][0x25], + logical_sectors[9][0x2A]) + self.data_prologue = (logical_sectors[8][0xFD], + logical_sectors[9][0x02], + logical_sectors[9][0x02]) + self.data_epilogue = (logical_sectors[9][0x0C], + logical_sectors[9][0x11]) diff --git a/passport/rwts/d5timing.py b/passport/rwts/d5timing.py new file mode 100644 index 0000000..56849d5 --- /dev/null +++ b/passport/rwts/d5timing.py @@ -0,0 +1,22 @@ +from passport.rwts.dos33 import DOS33RWTS + +class D5TimingBitRWTS(DOS33RWTS): + def reset(self, logical_sectors): + DOS33RWTS.reset(self, logical_sectors) + self.data_prologue = (logical_sectors[2][0xE7], + 0xAA, + logical_sectors[2][0xFC]) + self.data_epilogue = (logical_sectors[3][0x35], + 0xAA) + + def find_address_prologue(self, track): + starting_revolutions = track.revolutions + while (track.revolutions < starting_revolutions + 2): + if next(track.nibble()) == 0xD5: + bit = next(track.bit()) + if bit == 0: return True + track.rewind(1) + return False + + def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num): + return True diff --git a/passport/rwts/dos33.py b/passport/rwts/dos33.py new file mode 100644 index 0000000..c986a31 --- /dev/null +++ b/passport/rwts/dos33.py @@ -0,0 +1,29 @@ +from passport.rwts import RWTS + +class DOS33RWTS(RWTS): + def __init__(self, logical_sectors, g): + self.g = g + self.reset(logical_sectors) + RWTS.__init__(self, + g, + sectors_per_track=16, + address_prologue=self.address_prologue, + address_epilogue=self.address_epilogue, + data_prologue=self.data_prologue, + data_epilogue=self.data_epilogue, + nibble_translate_table=self.nibble_translate_table) + + def reset(self, logical_sectors): + self.address_prologue = (logical_sectors[3][0x55], + logical_sectors[3][0x5F], + logical_sectors[3][0x6A]) + self.address_epilogue = (logical_sectors[3][0x91], + logical_sectors[3][0x9B]) + self.data_prologue = (logical_sectors[2][0xE7], + logical_sectors[2][0xF1], + logical_sectors[2][0xFC]) + self.data_epilogue = (logical_sectors[3][0x35], + logical_sectors[3][0x3F]) + self.nibble_translate_table = {} + for nibble in range(0x96, 0x100): + self.nibble_translate_table[nibble] = logical_sectors[4][nibble] diff --git a/passport/rwts/hereditydog.py b/passport/rwts/hereditydog.py new file mode 100644 index 0000000..c142b3c --- /dev/null +++ b/passport/rwts/hereditydog.py @@ -0,0 +1,21 @@ +from passport.rwts.dos33 import DOS33RWTS + +class HeredityDogRWTS(DOS33RWTS): + def data_field_at_point(self, track, logical_track_num, physical_sector_num): + if (logical_track_num, physical_sector_num) == (0x00, 0x0A): + # This sector is fake, full of too many consecutive 0s, + # designed to read differently every time. We go through + # and clean the stray bits, and be careful not to go past + # the end so we don't include the next address prologue. + start_index = track.bit_index + while (track.bit_index < start_index + (343*8)): + if self.nibble_translate_table.get(next(track.nibble()), 0xFF) == 0xFF: + track.bits[track.bit_index-8:track.bit_index] = 0 + self.g.found_and_cleaned_weakbits = True + return bytearray(256) + return DOS33RWTS.data_field_at_point(self, track, logical_track_num, physical_sector_num) + + def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): + if (logical_track_num, physical_sector_num) == (0x00, 0x0A): + return True + return DOS33RWTS.verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num) diff --git a/passport/rwts/infocom.py b/passport/rwts/infocom.py new file mode 100644 index 0000000..5c03302 --- /dev/null +++ b/passport/rwts/infocom.py @@ -0,0 +1,11 @@ +from passport.rwts.dos33 import DOS33RWTS + +class InfocomRWTS(DOS33RWTS): + def reset(self, logical_sectors): + DOS33RWTS.reset(self, logical_sectors) + self.data_prologue = self.data_prologue[:2] + + def find_data_prologue(self, track, logical_track_num, physical_sector_num): + if not DOS33RWTS.find_data_prologue(self, track, logical_track_num, physical_sector_num): + return False + return next(track.nibble()) >= 0xAD diff --git a/passport/rwts/laureate.py b/passport/rwts/laureate.py new file mode 100644 index 0000000..bba25ba --- /dev/null +++ b/passport/rwts/laureate.py @@ -0,0 +1,22 @@ +from passport.rwts.dos33 import DOS33RWTS + +class LaureateRWTS(DOS33RWTS): + # nibble table is in T00,S06 + # address prologue is T00,S05 A$55,A$5F,A$6A + # address epilogue is T00,S05 A$91,A$9B + # data prologue is T00,S04 A$E7,A$F1,A$FC + # data epilogue is T00,S05 A$35,A$3F + def reset(self, logical_sectors): + self.address_prologue = (logical_sectors[5][0x55], + logical_sectors[5][0x5F], + logical_sectors[5][0x6A]) + self.address_epilogue = (logical_sectors[5][0x91], + logical_sectors[5][0x9B]) + self.data_prologue = (logical_sectors[4][0xE7], + logical_sectors[4][0xF1], + logical_sectors[4][0xFC]) + self.data_epilogue = (logical_sectors[5][0x35], + logical_sectors[5][0x3F]) + self.nibble_translate_table = {} + for nibble in range(0x96, 0x100): + self.nibble_translate_table[nibble] = logical_sectors[6][nibble] diff --git a/passport/rwts/mecc.py b/passport/rwts/mecc.py new file mode 100644 index 0000000..5804f21 --- /dev/null +++ b/passport/rwts/mecc.py @@ -0,0 +1,40 @@ +from passport.rwts.dos33 import DOS33RWTS + +class MECCRWTS(DOS33RWTS): + # MECC fastloaders + def __init__(self, mecc_variant, logical_sectors, g): + g.mecc_variant = mecc_variant + DOS33RWTS.__init__(self, logical_sectors, g) + + def reset(self, logical_sectors): + self.nibble_translate_table = self.kDefaultNibbleTranslationTable16 + self.address_epilogue = (0xDE, 0xAA) + self.data_epilogue = (0xDE, 0xAA) + if self.g.mecc_variant == 1: + self.address_prologue = (logical_sectors[0x0B][0x08], + logical_sectors[0x0B][0x12], + logical_sectors[0x0B][0x1D]) + self.data_prologue = (logical_sectors[0x0B][0x8F], + logical_sectors[0x0B][0x99], + logical_sectors[0x0B][0xA3]) + elif self.g.mecc_variant == 2: + self.address_prologue = (logical_sectors[7][0x83], + logical_sectors[7][0x8D], + logical_sectors[7][0x98]) + self.data_prologue = (logical_sectors[7][0x15], + logical_sectors[7][0x1F], + logical_sectors[7][0x2A]) + elif self.g.mecc_variant == 3: + self.address_prologue = (logical_sectors[0x0A][0xE8], + logical_sectors[0x0A][0xF2], + logical_sectors[0x0A][0xFD]) + self.data_prologue = (logical_sectors[0x0B][0x6F], + logical_sectors[0x0B][0x79], + logical_sectors[0x0B][0x83]) + elif self.g.mecc_variant == 4: + self.address_prologue = (logical_sectors[8][0x83], + logical_sectors[8][0x8D], + logical_sectors[8][0x98]) + self.data_prologue = (logical_sectors[8][0x15], + logical_sectors[8][0x1F], + logical_sectors[8][0x2A]) diff --git a/passport/rwts/optimum.py b/passport/rwts/optimum.py new file mode 100644 index 0000000..804bf24 --- /dev/null +++ b/passport/rwts/optimum.py @@ -0,0 +1,16 @@ +from passport.rwts.dos33 import DOS33RWTS + +class OptimumResourceRWTS(DOS33RWTS): + def data_field_at_point(self, track, logical_track_num, physical_sector_num): + if (logical_track_num, physical_sector_num) == (0x01, 0x0F): + # TODO actually decode these + disk_nibbles = [] + for i in range(343): + disk_nibbles.append(next(track.nibble())) + return bytearray(256) # all zeroes for now + return DOS33RWTS.data_field_at_point(self, track, logical_track_num, physical_sector_num) + + def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): + if (logical_track_num, physical_sector_num) == (0x01, 0x0F): + return True + return DOS33RWTS.verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num) diff --git a/passport/rwts/sunburst.py b/passport/rwts/sunburst.py new file mode 100644 index 0000000..0014851 --- /dev/null +++ b/passport/rwts/sunburst.py @@ -0,0 +1,22 @@ +from passport.rwts.dos33 import DOS33RWTS + +class SunburstRWTS(DOS33RWTS): + def reset(self, logical_sectors): + DOS33RWTS.reset(self, logical_sectors) + self.address_epilogue = (logical_sectors[3][0x91],) + self.data_epilogue = (logical_sectors[3][0x35],) + self.address_prologue_third_nibble_by_track = logical_sectors[4][0x29:] + self.data_prologue_third_nibble_by_track = logical_sectors[4][0x34:] + + def seek(self, logical_track_num): + self.address_prologue = (self.address_prologue[0], + self.address_prologue[1], + self.address_prologue_third_nibble_by_track[logical_track_num]) + self.data_prologue = (self.data_prologue[0], + self.data_prologue[1], + self.data_prologue_third_nibble_by_track[logical_track_num]) + DOS33RWTS.seek(self, logical_track_num) + if logical_track_num >= 0x11: + return logical_track_num + 0.5 + else: + return float(logical_track_num) diff --git a/passport/rwts/universal.py b/passport/rwts/universal.py new file mode 100644 index 0000000..444060b --- /dev/null +++ b/passport/rwts/universal.py @@ -0,0 +1,63 @@ +from passport.rwts import RWTS + +class UniversalRWTS(RWTS): + acceptable_address_prologues = ((0xD4,0xAA,0x96), (0xD5,0xAA,0x96)) + + def __init__(self, g): + RWTS.__init__(self, g, address_epilogue=[], data_epilogue=[]) + + def find_address_prologue(self, track): + starting_revolutions = track.revolutions + seen = [0,0,0] + while (track.revolutions < starting_revolutions + 2): + del seen[0] + seen.append(next(track.nibble())) + if tuple(seen) in self.acceptable_address_prologues: return True + return False + + def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num): +# return True + if not self.address_epilogue: + self.address_epilogue = [next(track.nibble())] + result = True + else: + result = RWTS.verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num) + next(track.nibble()) + next(track.nibble()) + return result + + def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): + if not self.data_epilogue: + self.data_epilogue = [next(track.nibble())] + result = True + else: + result = RWTS.verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num) + next(track.nibble()) + next(track.nibble()) + return result + +class UniversalRWTSIgnoreEpilogues(UniversalRWTS): + def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num): + return True + + def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): + return True + +class Track00RWTS(UniversalRWTSIgnoreEpilogues): + def data_field_at_point(self, track, logical_track_num, physical_sector_num): + start_index = track.bit_index + start_revolutions = track.revolutions + decoded = UniversalRWTS.data_field_at_point(self, track, logical_track_num, physical_sector_num) + if not decoded: + # If the sector didn't decode properly, rewind to the + # beginning of the data field before returning to the + # caller. This is for disks with a fake T00,S0A that + # is full of consecutive 0s, where if we consume the bitstream + # as nibbles, we'll end up consuming the next address field + # and it will seem like that sector doesn't exist. And that + # is generally logical sector 2, which is important not to + # miss at this stage because its absence triggers a different + # code path and everything falls apart. + track.bit_index = start_index + track.revolutions = start_revolutions + return decoded diff --git a/passport/strings.py b/passport/strings.py index 73a70ea..5d5d379 100644 --- a/passport/strings.py +++ b/passport/strings.py @@ -1,4 +1,4 @@ -__date__ = "2019-01-29" +__date__ = "2019-02-01" STRINGS = { "header": "Passport.py by 4am (" + __date__ + ")\n", # max 32 characters