diff --git a/passport.py b/passport.py index 76e2562..c907efa 100755 --- a/passport.py +++ b/passport.py @@ -1,48 +1,86 @@ #!/usr/bin/env python3 +# (c) 2018-9 by 4am +# MIT-licensed + from passport import eddimage, wozimage, a2rimage from passport import DefaultLogger, DebugLogger -from passport import Crack, Verify, EDDToWoz +from passport import Crack, Verify, Convert from passport.strings import STRINGS +import argparse import os.path -import sys -def usage(error_code): - exe = sys.argv[0] - print(STRINGS["header"]) - print("""usage: {exe} crack image.woz - {exe} verify image.woz - {exe} convert image.edd""".format(**locals())) - sys.exit(error_code) +__version__ = "0.2" # https://semver.org/ +__date__ = "2019-01-29" +__progname__ = "passport" -args = len(sys.argv) +class BaseCommand: + def __init__(self, name): + self.name = name + self.logger = None + self.reader = None + self.processor = None -if args < 3: - usage(0) + def setup(self, subparser, description=None, epilog=None, help="disk image (.a2r, .woz, .edd)", formatter_class=argparse.HelpFormatter): + self.parser = subparser.add_parser(self.name, description=description, epilog=epilog, formatter_class=formatter_class) + self.parser.add_argument("file", help=help) + self.parser.set_defaults(action=self) -cmd, inputfile = sys.argv[1:3] -if cmd == "crack": - processor = Crack -elif cmd == "verify": - processor = Verify -elif cmd == "convert": - processor = EDDToWoz -else: - print("unrecognized command") - usage(1) + def __call__(self, args): + if not self.processor: return + if not self.reader: + base, ext = os.path.splitext(args.file) + ext = ext.lower() + if ext == ".woz": + self.reader = wozimage.WozReader + elif ext == ".edd": + self.reader = eddimage.EDDReader + elif ext == ".a2r": + self.reader = a2rimage.A2RImage + else: + print("unrecognized file type") + if not self.logger: + self.logger = args.debug and DebugLogger or DefaultLogger + self.processor(self.reader(args.file), self.logger) -base, ext = os.path.splitext(inputfile) -ext = ext.lower() -if ext == ".woz": - reader = wozimage.WozReader -elif ext == ".edd": - reader = eddimage.EDDReader -elif ext == ".a2r": - reader = a2rimage.A2RImage -else: - print("unrecognized file type") - usage(1) +class CommandVerify(BaseCommand): + def __init__(self): + BaseCommand.__init__(self, "verify") + self.processor = Verify -logger = DefaultLogger # TODO add flag to change this + def setup(self, subparser): + BaseCommand.setup(self, subparser, + description="Verify track structure and sector data in a disk image") -processor(reader(inputfile), logger) +class CommandConvert(BaseCommand): + def __init__(self): + BaseCommand.__init__(self, "convert") + self.processor = Convert + + def setup(self, subparser): + BaseCommand.setup(self, subparser, + description="Convert a disk image to .woz format") + +class CommandCrack(BaseCommand): + def __init__(self): + BaseCommand.__init__(self, "crack") + self.processor = Crack + + def setup(self, subparser): + BaseCommand.setup(self, subparser, + description="Convert a disk image to .dsk format") + +if __name__ == "__main__": + cmds = [CommandVerify(), CommandConvert(), CommandCrack()] + parser = argparse.ArgumentParser(prog=__progname__, + description="""A multi-purpose tool for working with copy-protected Apple II disk images. + +See '""" + __progname__ + """ -h' for help on individual commands.""", + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("-v", "--version", action="version", version=STRINGS["header"]) + parser.add_argument("-d", "--debug", action="store_true", help="print debugging information while processing") + sp = parser.add_subparsers(dest="command", help="command") + for command in cmds: + command.setup(sp) + args = parser.parse_args() + args.action(args) diff --git a/passport/__init__.py b/passport/__init__.py index c778e45..f507e2a 100755 --- a/passport/__init__.py +++ b/passport/__init__.py @@ -139,14 +139,15 @@ class RWTS: self.sector_order = sector_order self.nibble_translate_table = nibble_translate_table self.g = g - self.track_num = 0 + self.logical_track_num = 0 - def seek(self, track_num): - self.track_num = track_num + def seek(self, logical_track_num): + self.logical_track_num = logical_track_num + return float(logical_track_num) - def reorder_to_logical_sectors(self, sectors): + def reorder_to_logical_sectors(self, physical_sectors): logical = {} - for k, v in sectors.items(): + for k, v in physical_sectors.items(): logical[self.sector_order[k]] = v return logical @@ -166,13 +167,13 @@ class RWTS: found.append(next(track.nibble())) return tuple(found) == tuple(nibbles) - def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num): + def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num): return self.verify_nibbles_at_point(track, self.address_epilogue) - def find_data_prologue(self, track, track_num, physical_sector_num): + def find_data_prologue(self, track, logical_track_num, physical_sector_num): return track.find(self.data_prologue) - def data_field_at_point(self, track, track_num, physical_sector_num): + def data_field_at_point(self, track, logical_track_num, physical_sector_num): disk_nibbles = [] for i in range(343): disk_nibbles.append(next(track.nibble())) @@ -206,10 +207,10 @@ class RWTS: decoded[i + 172] += (((low2 & 0b010000) >> 3) + ((low2 & 0b100000) >> 5)) return bytearray(decoded) - def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num): + def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): return self.verify_nibbles_at_point(track, self.data_epilogue) - def decode_track(self, track, track_num, burn=0): + def decode_track(self, track, logical_track_num, burn=0): sectors = collections.OrderedDict() if not track: return sectors starting_revolutions = track.revolutions @@ -243,24 +244,24 @@ class RWTS: # so even if this copy doesn't pan out but a later copy does, sectors # will still be in the original order sectors[address_field.sector_num] = None - if not self.verify_address_epilogue_at_point(track, track_num, address_field.sector_num): + if not self.verify_address_epilogue_at_point(track, logical_track_num, address_field.sector_num): # verifying the address field epilogue failed, but this is # not necessarily fatal because there might be another copy # of this sector later self.g.logger.debug("verify_address_epilogue_at_point failed, continuing") continue - if not self.find_data_prologue(track, track_num, address_field.sector_num): + if not self.find_data_prologue(track, logical_track_num, address_field.sector_num): # if we can't find a data field prologue, just give up self.g.logger.debug("find_data_prologue failed, giving up") break # read and decode the data field, and verify the data checksum - decoded = self.data_field_at_point(track, track_num, address_field.sector_num) + decoded = self.data_field_at_point(track, logical_track_num, address_field.sector_num) if not decoded: # decoding data field failed, but this is not necessarily fatal # because there might be another copy of this sector later self.g.logger.debug("data_field_at_point failed, continuing") continue - if not self.verify_data_epilogue_at_point(track, track_num, address_field.sector_num): + if not self.verify_data_epilogue_at_point(track, logical_track_num, address_field.sector_num): # verifying the data field epilogue failed, but this is # not necessarily fatal because there might be another copy # of this sector later @@ -303,39 +304,39 @@ class UniversalRWTS(RWTS): if tuple(seen) in self.acceptable_address_prologues: return True return False - def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num): + def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num): # return True if not self.address_epilogue: self.address_epilogue = [next(track.nibble())] result = True else: - result = RWTS.verify_address_epilogue_at_point(self, track, track_num, physical_sector_num) + result = RWTS.verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num) next(track.nibble()) next(track.nibble()) return result - def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num): + def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): if not self.data_epilogue: self.data_epilogue = [next(track.nibble())] result = True else: - result = RWTS.verify_data_epilogue_at_point(self, track, track_num, physical_sector_num) + result = RWTS.verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num) next(track.nibble()) next(track.nibble()) return result class UniversalRWTSIgnoreEpilogues(UniversalRWTS): - def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num): + def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num): return True - def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num): + def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): return True class Track00RWTS(UniversalRWTSIgnoreEpilogues): - def data_field_at_point(self, track, track_num, physical_sector_num): + def data_field_at_point(self, track, logical_track_num, physical_sector_num): start_index = track.bit_index start_revolutions = track.revolutions - decoded = UniversalRWTS.data_field_at_point(self, track, track_num, physical_sector_num) + decoded = UniversalRWTS.data_field_at_point(self, track, logical_track_num, physical_sector_num) if not decoded: # If the sector didn't decode properly, rewind to the # beginning of the data field before returning to the @@ -378,6 +379,27 @@ class DOS33RWTS(RWTS): for nibble in range(0x96, 0x100): self.nibble_translate_table[nibble] = logical_sectors[4][nibble] +class SunburstRWTS(DOS33RWTS): + def reset(self, logical_sectors): + DOS33RWTS.reset(self, logical_sectors) + self.address_epilogue = (logical_sectors[3][0x91],) + self.data_epilogue = (logical_sectors[3][0x35],) + self.address_prologue_third_nibble_by_track = logical_sectors[4][0x29:] + self.data_prologue_third_nibble_by_track = logical_sectors[4][0x34:] + + def seek(self, logical_track_num): + self.address_prologue = (self.address_prologue[0], + self.address_prologue[1], + self.address_prologue_third_nibble_by_track[logical_track_num]) + self.data_prologue = (self.data_prologue[0], + self.data_prologue[1], + self.data_prologue_third_nibble_by_track[logical_track_num]) + DOS33RWTS.seek(self, logical_track_num) + if logical_track_num >= 0x11: + return logical_track_num + 0.5 + else: + return float(logical_track_num) + class BorderRWTS(DOS33RWTS): # TODO doesn't work yet, not sure why def reset(self, logical_sectors): @@ -411,7 +433,7 @@ class D5TimingBitRWTS(DOS33RWTS): track.rewind(1) return False - def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num): + def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num): return True class InfocomRWTS(DOS33RWTS): @@ -419,29 +441,29 @@ class InfocomRWTS(DOS33RWTS): DOS33RWTS.reset(self, logical_sectors) self.data_prologue = self.data_prologue[:2] - def find_data_prologue(self, track, track_num, physical_sector_num): - if not DOS33RWTS.find_data_prologue(self, track, track_num, physical_sector_num): + def find_data_prologue(self, track, logical_track_num, physical_sector_num): + if not DOS33RWTS.find_data_prologue(self, track, logical_track_num, physical_sector_num): return False return next(track.nibble()) >= 0xAD class OptimumResourceRWTS(DOS33RWTS): - def data_field_at_point(self, track, track_num, physical_sector_num): - if (track_num, physical_sector_num) == (0x01, 0x0F): + def data_field_at_point(self, track, logical_track_num, physical_sector_num): + if (logical_track_num, physical_sector_num) == (0x01, 0x0F): # TODO actually decode these disk_nibbles = [] for i in range(343): disk_nibbles.append(next(track.nibble())) return bytearray(256) # all zeroes for now - return DOS33RWTS.data_field_at_point(self, track, track_num, physical_sector_num) + return DOS33RWTS.data_field_at_point(self, track, logical_track_num, physical_sector_num) - def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num): - if (track_num, physical_sector_num) == (0x01, 0x0F): + def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): + if (logical_track_num, physical_sector_num) == (0x01, 0x0F): return True - return DOS33RWTS.verify_data_epilogue_at_point(self, track, track_num, physical_sector_num) + return DOS33RWTS.verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num) class HeredityDogRWTS(DOS33RWTS): - def data_field_at_point(self, track, track_num, physical_sector_num): - if (track_num, physical_sector_num) == (0x00, 0x0A): + def data_field_at_point(self, track, logical_track_num, physical_sector_num): + if (logical_track_num, physical_sector_num) == (0x00, 0x0A): # This sector is fake, full of too many consecutive 0s, # designed to read differently every time. We go through # and clean the stray bits, and be careful not to go past @@ -452,46 +474,46 @@ class HeredityDogRWTS(DOS33RWTS): track.bits[track.bit_index-8:track.bit_index] = 0 self.g.found_and_cleaned_weakbits = True return bytearray(256) - return DOS33RWTS.data_field_at_point(self, track, track_num, physical_sector_num) + return DOS33RWTS.data_field_at_point(self, track, logical_track_num, physical_sector_num) - def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num): - if (track_num, physical_sector_num) == (0x00, 0x0A): + def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): + if (logical_track_num, physical_sector_num) == (0x00, 0x0A): return True - return DOS33RWTS.verify_data_epilogue_at_point(self, track, track_num, physical_sector_num) + return DOS33RWTS.verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num) class BECARWTS(DOS33RWTS): - def is_protected_sector(self, track_num, physical_sector_num): - if track_num > 0: return True + def is_protected_sector(self, logical_track_num, physical_sector_num): + if logical_track_num > 0: return True return physical_sector_num not in (0x00, 0x0D, 0x0B, 0x09, 0x07, 0x05, 0x03, 0x01, 0x0E, 0x0C) def reset(self, logical_sectors): DOS33RWTS.reset(self, logical_sectors) self.data_prologue = self.data_prologue[:2] - def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num): - if self.is_protected_sector(track_num, physical_sector_num): - return DOS33RWTS.verify_address_epilogue_at_point(self, track, track_num, physical_sector_num) + def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num): + if self.is_protected_sector(logical_track_num, physical_sector_num): + return DOS33RWTS.verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num) return True - def find_data_prologue(self, track, track_num, physical_sector_num): - if not DOS33RWTS.find_data_prologue(self, track, track_num, physical_sector_num): + def find_data_prologue(self, track, logical_track_num, physical_sector_num): + if not DOS33RWTS.find_data_prologue(self, track, logical_track_num, physical_sector_num): return False next(track.nibble()) - if self.is_protected_sector(track_num, physical_sector_num): + if self.is_protected_sector(logical_track_num, physical_sector_num): next(track.bit()) next(track.nibble()) next(track.bit()) next(track.bit()) return True - def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num): - if self.is_protected_sector(track_num, physical_sector_num): + def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num): + if self.is_protected_sector(logical_track_num, physical_sector_num): next(track.nibble()) - if track_num == 0: + if logical_track_num == 0: next(track.nibble()) next(track.nibble()) return True - return DOS33RWTS.verify_data_epilogue_at_point(self, track, track_num, physical_sector_num) + return DOS33RWTS.verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num) class LaureateRWTS(DOS33RWTS): # nibble table is in T00,S06 @@ -639,11 +661,11 @@ class BasePassportProcessor: # base class if self.run(): self.postprocess() - def SkipTrack(self, track_num, track): + def SkipTrack(self, logical_track_num, track): # don't look for whole-track protections on track 0, that's silly - if track_num == 0: return False + if logical_track_num == 0: return False # Electronic Arts protection track? - if track_num == 6: + if logical_track_num == 6: if self.rwts.find_address_prologue(track): address_field = self.rwts.address_field_at_point(track) if address_field and address_field.track_num == 5: return True @@ -954,6 +976,31 @@ class BasePassportProcessor: # base class return 2 return 0 # unknown variant + def IDSunburst(self, logical_sectors): + """returns True if |logical_sectors| contains track 0 of a Sunburst disk, False otherwise""" + return find.wild_at(0x69, logical_sectors[0x04], + bytes.fromhex("48" + "A5 2A" + "4A" + "A8" + "B9 29 BA" + "8D 6A B9" + "8D 84 BC" + "B9 34 BA" + "8D FC B8" + "8D 5D B8" + "C0 11" + "D0 03" + "A9 02" + "AC" + "A9 0E" + "8D C0 BF" + "68" + "69 00" + "48" + "AD 78 04" + "90 2B")) + def IDBootloader(self, t00): """returns RWTS object that can (hopefully) read the rest of the disk""" temporary_rwts_for_t00 = Track00RWTS(self.g) @@ -974,6 +1021,9 @@ class BasePassportProcessor: # base class self.g.logger.PrintByID("dos33boot0") if border.BorderPatcher(self.g).run(logical_sectors, 0): return BorderRWTS(logical_sectors, self.g) + if self.IDSunburst(logical_sectors): + self.g.logger.PrintByID("sunburst") + return SunburstRWTS(logical_sectors, self.g) return self.TraceDOS33(logical_sectors) # TODO JSR08B3 if self.IDMECC(t00s00): @@ -1128,11 +1178,9 @@ class BasePassportProcessor: # base class self.g.logger.PrintByID("header") self.g.logger.PrintByID("reading", {"filename":self.g.disk_image.filename}) - # get all raw track data from the source disk + # get raw track $00 data from the source disk self.tracks = {} - for track_num in range(0x23): - self.tracks[float(track_num)] = self.g.disk_image.seek(float(track_num)) - + self.tracks[0] = self.g.disk_image.seek(0) # analyze track $00 to create an RWTS self.rwts = self.IDBootloader(self.tracks[0]) if not self.rwts: return False @@ -1142,19 +1190,29 @@ class BasePassportProcessor: # base class self.patchers.append(P(self.g)) # main loop - loop through disk from track $22 down to track $00 - for track_num in range(0x22, -1, -1): - self.g.track = track_num - self.rwts.seek(track_num) + for logical_track_num in range(0x22, -1, -1): + self.g.track = logical_track_num # for display purposes only + # distinguish between logical and physical track numbers to deal with + # disks like Sunburst that store logical track 0x11+ on physical track 0x11.5+ + physical_track_num = self.rwts.seek(logical_track_num) + # self.tracks must be indexed by physical track number so we can write out + # .woz files correctly + self.tracks[physical_track_num] = self.g.disk_image.seek(physical_track_num) self.g.logger.debug("Seeking to track %s" % hex(self.g.track)) try_again = True while try_again: try_again = False - physical_sectors = self.rwts.decode_track(self.tracks[track_num], track_num, self.burn) + physical_sectors = self.rwts.decode_track(self.tracks[physical_track_num], logical_track_num, self.burn) if len(physical_sectors) == self.rwts.sectors_per_track: + # TODO this is bad, we should just ask the RWTS object if we decoded enough sectors, + # so that SunburstRWTS can override the logic on track 0x11 continue else: self.g.logger.debug("found %d sectors" % len(physical_sectors)) - if (0x0F not in physical_sectors) and self.SkipTrack(track_num, self.tracks[track_num]): + if self.rwts.__class__ is SunburstRWTS and logical_track_num == 0x11: + # TODO this is bad, see above + continue + if (0x0F not in physical_sectors) and self.SkipTrack(logical_track_num, self.tracks[physical_track_num]): physical_sectors = None continue # TODO wrong in case where we switch mid-track. @@ -1166,16 +1224,16 @@ class BasePassportProcessor: # base class self.g.tried_univ = True try_again = True continue - if track_num == 0 and type(self.rwts) != UniversalRWTSIgnoreEpilogues: + if logical_track_num == 0 and type(self.rwts) != UniversalRWTSIgnoreEpilogues: self.rwts = UniversalRWTSIgnoreEpilogues(self.g) try_again = True continue self.g.logger.PrintByID("fail") return False - self.save_track(track_num, physical_sectors) + self.save_track(physical_track_num, logical_track_num, physical_sectors) return True - def save_track(self, track_num, physical_sectors): + def save_track(self, physical_track_num, logical_track_num, physical_sectors): pass def apply_patches(self, logical_sectors, patches): @@ -1208,18 +1266,19 @@ class Verify(BasePassportProcessor): b'\x8C\xC0\xDD\x8C\xC0\xD0\xF6\x88' b'\xD0\xF8\x68\xAA\xBD\x8E\xC0\xBD' b'\x8C\xC0\xA0\x08\xBD\x8C\xC0\x48') - - def save_track(self, track_num, physical_sectors): + + def save_track(self, physical_track_num, logical_track_num, physical_sectors): if not physical_sectors: return {} logical_sectors = self.rwts.reorder_to_logical_sectors(physical_sectors) should_run_patchers = (len(physical_sectors) == 16) # TODO if should_run_patchers: - if track_num == 0: + # patchers operate on logical tracks + if logical_track_num == 0: # set additional globals for patchers to use self.AnalyzeT00(logical_sectors) for patcher in self.patchers: - if patcher.should_run(track_num): - patches = patcher.run(logical_sectors, track_num) + if patcher.should_run(logical_track_num): + patches = patcher.run(logical_sectors, logical_track_num) if patches: self.apply_patches(logical_sectors, patches) self.patches_found.extend(patches) @@ -1234,8 +1293,10 @@ class Verify(BasePassportProcessor): self.g.logger.PrintByID("passver") class Crack(Verify): - def save_track(self, track_num, physical_sectors): - self.output_tracks[float(track_num)] = Verify.save_track(self, track_num, physical_sectors) + def save_track(self, physical_track_num, logical_track_num, physical_sectors): + # output_tracks is indexed on logical track number here because the + # point of cracking is normalizing to logical tracks and sectors + self.output_tracks[logical_track_num] = Verify.save_track(self, physical_track_num, logical_track_num, physical_sectors) def apply_patches(self, logical_sectors, patches): for patch in patches: @@ -1255,9 +1316,9 @@ class Crack(Verify): output_filename = source_base + '.dsk' self.g.logger.PrintByID("writing", {"filename":output_filename}) with open(output_filename, "wb") as f: - for track_num in range(0x23): - if track_num in self.output_tracks: - f.write(concat_track(self.output_tracks[track_num])) + for logical_track_num in range(0x23): + if logical_track_num in self.output_tracks: + f.write(concat_track(self.output_tracks[logical_track_num])) else: f.write(bytes(256*16)) if self.patches_found: @@ -1265,22 +1326,23 @@ class Crack(Verify): else: self.g.logger.PrintByID("passcrack0") -class EDDToWoz(BasePassportProcessor): +class Convert(BasePassportProcessor): def preprocess(self): self.burn = 2 return True - def save_track(self, track_num, physical_sectors): - track_num = float(track_num) - track = self.tracks[track_num] + def save_track(self, physical_track_num, logical_track_num, physical_sectors): + track = self.tracks[physical_track_num] if physical_sectors: b = bitarray.bitarray(endian="big") for s in physical_sectors.values(): b.extend(track.bits[s.start_bit_index:s.end_bit_index]) else: - # TODO this only works about half the time + # TODO call wozify here instead b = track.bits[:51021] - self.output_tracks[track_num] = wozimage.Track(b, len(b)) + # output_tracks is indexed on physical track number here because the + # point of .woz is to capture the physical layout of the original disk + self.output_tracks[physical_track_num] = wozimage.Track(b, len(b)) def postprocess(self): source_base, source_ext = os.path.splitext(self.g.disk_image.filename) @@ -1291,9 +1353,9 @@ class EDDToWoz(BasePassportProcessor): woz_image.info["write_protected"] = self.g.protection_enforces_write_protected woz_image.meta["image_date"] = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()) for q in range(1 + (0x23 * 4)): - track_num = q / 4 - if track_num in self.output_tracks: - woz_image.add_track(track_num, self.output_tracks[track_num]) + physical_track_num = q / 4 + if physical_track_num in self.output_tracks: + woz_image.add_track(physical_track_num, self.output_tracks[physical_track_num]) with open(output_filename, 'wb') as f: woz_image.write(f) try: diff --git a/passport/a2rimage.py b/passport/a2rimage.py index 8f27b8a..160e933 100755 --- a/passport/a2rimage.py +++ b/passport/a2rimage.py @@ -11,27 +11,44 @@ class A2RImage: def to_bits(self, flux_record): """|flux_record| is a dictionary of 'capture_type', 'data_length', 'tick_count', and 'data'""" - if not flux_record or flux_record["capture_type"] != a2rchery.kCaptureTiming: - return [], 0 bits = bitarray.bitarray() - track_length = 0 + estimated_track_length = 0 + if not flux_record or flux_record["capture_type"] != a2rchery.kCaptureTiming: + return bits, estimated_track_length, 0 ticks = 0 flux_total = 0 fluxxen = flux_record["data"] - speeds = [(len([1 for i in fluxxen if i%t==0]), t) for t in range(0x1e,0x23)] + speeds = [(len([1 for i in fluxxen if i%t==0]), t) for t in range(0x1c,0x25)] speeds.sort() speed = speeds[-1][1] - for flux_value in fluxxen: + for flux_value in fluxxen[1:]: ticks += flux_value - if not track_length and ticks > flux_record["tick_count"]: - track_length = len(bits) + if not estimated_track_length and ticks > flux_record["tick_count"]: + estimated_track_length = len(bits) flux_total += flux_value if flux_value == 0xFF: continue bits.extend([0] * ((flux_total - speed//2) // speed)) bits.append(1) flux_total = 0 - return bits, track_length + return bits, estimated_track_length, speed + + def find_track_length(self, bits, estimated_track_length): + twice_bits = bits + bits + for matchlen in (8192, 4096, 2048, 1024): + if estimated_track_length < 32768 or len(bits) < 32768: continue + for offset in range(0, estimated_track_length, matchlen): + for length_delta in (0, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 7, -7, 8, -8, 9, -9, 10, -10, 11, -11, 12, -12): + real_length = estimated_track_length + length_delta + if real_length > 53168: continue + if twice_bits[8+offset:offset+matchlen] == twice_bits[real_length+8+offset:real_length+matchlen+offset]: + return real_length + return 0 + + def normalize(self, flux_records): + bits_and_lengths = [self.to_bits(flux_record) for flux_record in flux_records] + all_bits = [bits[8:self.find_track_length(bits, estimated_track_length)+8] for bits, estimated_track_length, speed in bits_and_lengths] + return all_bits def seek(self, track_num): if type(track_num) != float: @@ -44,7 +61,7 @@ class A2RImage: if not self.tracks.get(location): all_bits = bitarray.bitarray() for flux_record in self.a2r_image.flux.get(location, [{}]): - bits, track_length = self.to_bits(flux_record) + bits, track_length, speed = self.to_bits(flux_record) all_bits.extend(bits) - self.tracks[location] = Track(all_bits, len(all_bits)) + self.tracks[location] = Track(all_bits, len(all_bits), speed=speed) return self.tracks[location] diff --git a/passport/strings.py b/passport/strings.py index c3c6965..29f29c5 100644 --- a/passport/strings.py +++ b/passport/strings.py @@ -1,5 +1,5 @@ STRINGS = { - "header": "Passport.py by 4am (2018-09-10)\n", # max 32 characters + "header": "Passport.py by 4am (2019-01-28)\n", # max 32 characters "reading": "Reading from {filename}\n", "diskrwts": "Using disk's own RWTS\n", "bb00": "T00,S05 Found $BB00 protection check\n" diff --git a/passport/util/find.py b/passport/util/find.py index 56e9676..1f8ba8c 100644 --- a/passport/util/find.py +++ b/passport/util/find.py @@ -4,7 +4,7 @@ WILDCARD = b'\x97' def wild(source_bytes, search_bytes): """Search source_bytes (bytes object) for the first instance of search_bytes (bytes_object). search_bytes may contain WILDCARD, which matches any single byte (like "." in a regular expression). Returns index of first match, or -1 if no matches.""" - search_bytes = re.escape(search_bytes).replace(b'\\'+WILDCARD, b'.') + search_bytes = re.escape(search_bytes).replace(WILDCARD, b'.') match = re.search(search_bytes, source_bytes) if match: return match.start() diff --git a/passport/wozimage.py b/passport/wozimage.py index 70a4c05..242be74 100755 --- a/passport/wozimage.py +++ b/passport/wozimage.py @@ -75,11 +75,12 @@ def raise_if(cond, e, s=""): if cond: raise e(s) class Track: - def __init__(self, bits, bit_count): + def __init__(self, bits, bit_count, speed=None): self.bits = bits while len(self.bits) > bit_count: self.bits.pop() self.bit_count = bit_count + self.speed = speed self.bit_index = 0 self.revolutions = 0 @@ -331,15 +332,17 @@ class WozWriter(WozValidator): self.tmap = [0xFF]*160 self.meta = collections.OrderedDict() - def add_track(self, track_num, track): - tmap_id = int(track_num * 4) + def add(self, half_phase, track): trk_id = len(self.tracks) self.tracks.append(track) - self.tmap[tmap_id] = trk_id - if tmap_id: - self.tmap[tmap_id - 1] = trk_id - if tmap_id < 159: - self.tmap[tmap_id + 1] = trk_id + self.tmap[half_phase] = trk_id +# if half_phase: +# self.tmap[half_phase - 1] = trk_id +# if half_phase < 159: +# self.tmap[half_phase + 1] = trk_id + + def add_track(self, track_num, track): + self.add(int(track_num * 4), track) def build_info(self): chunk = bytearray()