Add SunburstRWTS

This commit is contained in:
4am 2019-01-29 18:30:21 -05:00
parent 802526fe6f
commit e615883ef7
6 changed files with 258 additions and 138 deletions

View File

@ -1,48 +1,86 @@
#!/usr/bin/env python3
# (c) 2018-9 by 4am
# MIT-licensed
from passport import eddimage, wozimage, a2rimage
from passport import DefaultLogger, DebugLogger
from passport import Crack, Verify, EDDToWoz
from passport import Crack, Verify, Convert
from passport.strings import STRINGS
import argparse
import os.path
import sys
def usage(error_code):
exe = sys.argv[0]
print(STRINGS["header"])
print("""usage: {exe} crack image.woz
{exe} verify image.woz
{exe} convert image.edd""".format(**locals()))
sys.exit(error_code)
__version__ = "0.2" # https://semver.org/
__date__ = "2019-01-29"
__progname__ = "passport"
args = len(sys.argv)
class BaseCommand:
def __init__(self, name):
self.name = name
self.logger = None
self.reader = None
self.processor = None
if args < 3:
usage(0)
def setup(self, subparser, description=None, epilog=None, help="disk image (.a2r, .woz, .edd)", formatter_class=argparse.HelpFormatter):
self.parser = subparser.add_parser(self.name, description=description, epilog=epilog, formatter_class=formatter_class)
self.parser.add_argument("file", help=help)
self.parser.set_defaults(action=self)
cmd, inputfile = sys.argv[1:3]
if cmd == "crack":
processor = Crack
elif cmd == "verify":
processor = Verify
elif cmd == "convert":
processor = EDDToWoz
else:
print("unrecognized command")
usage(1)
def __call__(self, args):
if not self.processor: return
if not self.reader:
base, ext = os.path.splitext(args.file)
ext = ext.lower()
if ext == ".woz":
self.reader = wozimage.WozReader
elif ext == ".edd":
self.reader = eddimage.EDDReader
elif ext == ".a2r":
self.reader = a2rimage.A2RImage
else:
print("unrecognized file type")
if not self.logger:
self.logger = args.debug and DebugLogger or DefaultLogger
self.processor(self.reader(args.file), self.logger)
base, ext = os.path.splitext(inputfile)
ext = ext.lower()
if ext == ".woz":
reader = wozimage.WozReader
elif ext == ".edd":
reader = eddimage.EDDReader
elif ext == ".a2r":
reader = a2rimage.A2RImage
else:
print("unrecognized file type")
usage(1)
class CommandVerify(BaseCommand):
def __init__(self):
BaseCommand.__init__(self, "verify")
self.processor = Verify
logger = DefaultLogger # TODO add flag to change this
def setup(self, subparser):
BaseCommand.setup(self, subparser,
description="Verify track structure and sector data in a disk image")
processor(reader(inputfile), logger)
class CommandConvert(BaseCommand):
def __init__(self):
BaseCommand.__init__(self, "convert")
self.processor = Convert
def setup(self, subparser):
BaseCommand.setup(self, subparser,
description="Convert a disk image to .woz format")
class CommandCrack(BaseCommand):
def __init__(self):
BaseCommand.__init__(self, "crack")
self.processor = Crack
def setup(self, subparser):
BaseCommand.setup(self, subparser,
description="Convert a disk image to .dsk format")
if __name__ == "__main__":
cmds = [CommandVerify(), CommandConvert(), CommandCrack()]
parser = argparse.ArgumentParser(prog=__progname__,
description="""A multi-purpose tool for working with copy-protected Apple II disk images.
See '""" + __progname__ + """ <command> -h' for help on individual commands.""",
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("-v", "--version", action="version", version=STRINGS["header"])
parser.add_argument("-d", "--debug", action="store_true", help="print debugging information while processing")
sp = parser.add_subparsers(dest="command", help="command")
for command in cmds:
command.setup(sp)
args = parser.parse_args()
args.action(args)

View File

@ -139,14 +139,15 @@ class RWTS:
self.sector_order = sector_order
self.nibble_translate_table = nibble_translate_table
self.g = g
self.track_num = 0
self.logical_track_num = 0
def seek(self, track_num):
self.track_num = track_num
def seek(self, logical_track_num):
self.logical_track_num = logical_track_num
return float(logical_track_num)
def reorder_to_logical_sectors(self, sectors):
def reorder_to_logical_sectors(self, physical_sectors):
logical = {}
for k, v in sectors.items():
for k, v in physical_sectors.items():
logical[self.sector_order[k]] = v
return logical
@ -166,13 +167,13 @@ class RWTS:
found.append(next(track.nibble()))
return tuple(found) == tuple(nibbles)
def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num):
def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num):
return self.verify_nibbles_at_point(track, self.address_epilogue)
def find_data_prologue(self, track, track_num, physical_sector_num):
def find_data_prologue(self, track, logical_track_num, physical_sector_num):
return track.find(self.data_prologue)
def data_field_at_point(self, track, track_num, physical_sector_num):
def data_field_at_point(self, track, logical_track_num, physical_sector_num):
disk_nibbles = []
for i in range(343):
disk_nibbles.append(next(track.nibble()))
@ -206,10 +207,10 @@ class RWTS:
decoded[i + 172] += (((low2 & 0b010000) >> 3) + ((low2 & 0b100000) >> 5))
return bytearray(decoded)
def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num):
def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num):
return self.verify_nibbles_at_point(track, self.data_epilogue)
def decode_track(self, track, track_num, burn=0):
def decode_track(self, track, logical_track_num, burn=0):
sectors = collections.OrderedDict()
if not track: return sectors
starting_revolutions = track.revolutions
@ -243,24 +244,24 @@ class RWTS:
# so even if this copy doesn't pan out but a later copy does, sectors
# will still be in the original order
sectors[address_field.sector_num] = None
if not self.verify_address_epilogue_at_point(track, track_num, address_field.sector_num):
if not self.verify_address_epilogue_at_point(track, logical_track_num, address_field.sector_num):
# verifying the address field epilogue failed, but this is
# not necessarily fatal because there might be another copy
# of this sector later
self.g.logger.debug("verify_address_epilogue_at_point failed, continuing")
continue
if not self.find_data_prologue(track, track_num, address_field.sector_num):
if not self.find_data_prologue(track, logical_track_num, address_field.sector_num):
# if we can't find a data field prologue, just give up
self.g.logger.debug("find_data_prologue failed, giving up")
break
# read and decode the data field, and verify the data checksum
decoded = self.data_field_at_point(track, track_num, address_field.sector_num)
decoded = self.data_field_at_point(track, logical_track_num, address_field.sector_num)
if not decoded:
# decoding data field failed, but this is not necessarily fatal
# because there might be another copy of this sector later
self.g.logger.debug("data_field_at_point failed, continuing")
continue
if not self.verify_data_epilogue_at_point(track, track_num, address_field.sector_num):
if not self.verify_data_epilogue_at_point(track, logical_track_num, address_field.sector_num):
# verifying the data field epilogue failed, but this is
# not necessarily fatal because there might be another copy
# of this sector later
@ -303,39 +304,39 @@ class UniversalRWTS(RWTS):
if tuple(seen) in self.acceptable_address_prologues: return True
return False
def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num):
def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num):
# return True
if not self.address_epilogue:
self.address_epilogue = [next(track.nibble())]
result = True
else:
result = RWTS.verify_address_epilogue_at_point(self, track, track_num, physical_sector_num)
result = RWTS.verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num)
next(track.nibble())
next(track.nibble())
return result
def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num):
def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num):
if not self.data_epilogue:
self.data_epilogue = [next(track.nibble())]
result = True
else:
result = RWTS.verify_data_epilogue_at_point(self, track, track_num, physical_sector_num)
result = RWTS.verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num)
next(track.nibble())
next(track.nibble())
return result
class UniversalRWTSIgnoreEpilogues(UniversalRWTS):
def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num):
def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num):
return True
def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num):
def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num):
return True
class Track00RWTS(UniversalRWTSIgnoreEpilogues):
def data_field_at_point(self, track, track_num, physical_sector_num):
def data_field_at_point(self, track, logical_track_num, physical_sector_num):
start_index = track.bit_index
start_revolutions = track.revolutions
decoded = UniversalRWTS.data_field_at_point(self, track, track_num, physical_sector_num)
decoded = UniversalRWTS.data_field_at_point(self, track, logical_track_num, physical_sector_num)
if not decoded:
# If the sector didn't decode properly, rewind to the
# beginning of the data field before returning to the
@ -378,6 +379,27 @@ class DOS33RWTS(RWTS):
for nibble in range(0x96, 0x100):
self.nibble_translate_table[nibble] = logical_sectors[4][nibble]
class SunburstRWTS(DOS33RWTS):
def reset(self, logical_sectors):
DOS33RWTS.reset(self, logical_sectors)
self.address_epilogue = (logical_sectors[3][0x91],)
self.data_epilogue = (logical_sectors[3][0x35],)
self.address_prologue_third_nibble_by_track = logical_sectors[4][0x29:]
self.data_prologue_third_nibble_by_track = logical_sectors[4][0x34:]
def seek(self, logical_track_num):
self.address_prologue = (self.address_prologue[0],
self.address_prologue[1],
self.address_prologue_third_nibble_by_track[logical_track_num])
self.data_prologue = (self.data_prologue[0],
self.data_prologue[1],
self.data_prologue_third_nibble_by_track[logical_track_num])
DOS33RWTS.seek(self, logical_track_num)
if logical_track_num >= 0x11:
return logical_track_num + 0.5
else:
return float(logical_track_num)
class BorderRWTS(DOS33RWTS):
# TODO doesn't work yet, not sure why
def reset(self, logical_sectors):
@ -411,7 +433,7 @@ class D5TimingBitRWTS(DOS33RWTS):
track.rewind(1)
return False
def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num):
def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num):
return True
class InfocomRWTS(DOS33RWTS):
@ -419,29 +441,29 @@ class InfocomRWTS(DOS33RWTS):
DOS33RWTS.reset(self, logical_sectors)
self.data_prologue = self.data_prologue[:2]
def find_data_prologue(self, track, track_num, physical_sector_num):
if not DOS33RWTS.find_data_prologue(self, track, track_num, physical_sector_num):
def find_data_prologue(self, track, logical_track_num, physical_sector_num):
if not DOS33RWTS.find_data_prologue(self, track, logical_track_num, physical_sector_num):
return False
return next(track.nibble()) >= 0xAD
class OptimumResourceRWTS(DOS33RWTS):
def data_field_at_point(self, track, track_num, physical_sector_num):
if (track_num, physical_sector_num) == (0x01, 0x0F):
def data_field_at_point(self, track, logical_track_num, physical_sector_num):
if (logical_track_num, physical_sector_num) == (0x01, 0x0F):
# TODO actually decode these
disk_nibbles = []
for i in range(343):
disk_nibbles.append(next(track.nibble()))
return bytearray(256) # all zeroes for now
return DOS33RWTS.data_field_at_point(self, track, track_num, physical_sector_num)
return DOS33RWTS.data_field_at_point(self, track, logical_track_num, physical_sector_num)
def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num):
if (track_num, physical_sector_num) == (0x01, 0x0F):
def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num):
if (logical_track_num, physical_sector_num) == (0x01, 0x0F):
return True
return DOS33RWTS.verify_data_epilogue_at_point(self, track, track_num, physical_sector_num)
return DOS33RWTS.verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num)
class HeredityDogRWTS(DOS33RWTS):
def data_field_at_point(self, track, track_num, physical_sector_num):
if (track_num, physical_sector_num) == (0x00, 0x0A):
def data_field_at_point(self, track, logical_track_num, physical_sector_num):
if (logical_track_num, physical_sector_num) == (0x00, 0x0A):
# This sector is fake, full of too many consecutive 0s,
# designed to read differently every time. We go through
# and clean the stray bits, and be careful not to go past
@ -452,46 +474,46 @@ class HeredityDogRWTS(DOS33RWTS):
track.bits[track.bit_index-8:track.bit_index] = 0
self.g.found_and_cleaned_weakbits = True
return bytearray(256)
return DOS33RWTS.data_field_at_point(self, track, track_num, physical_sector_num)
return DOS33RWTS.data_field_at_point(self, track, logical_track_num, physical_sector_num)
def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num):
if (track_num, physical_sector_num) == (0x00, 0x0A):
def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num):
if (logical_track_num, physical_sector_num) == (0x00, 0x0A):
return True
return DOS33RWTS.verify_data_epilogue_at_point(self, track, track_num, physical_sector_num)
return DOS33RWTS.verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num)
class BECARWTS(DOS33RWTS):
def is_protected_sector(self, track_num, physical_sector_num):
if track_num > 0: return True
def is_protected_sector(self, logical_track_num, physical_sector_num):
if logical_track_num > 0: return True
return physical_sector_num not in (0x00, 0x0D, 0x0B, 0x09, 0x07, 0x05, 0x03, 0x01, 0x0E, 0x0C)
def reset(self, logical_sectors):
DOS33RWTS.reset(self, logical_sectors)
self.data_prologue = self.data_prologue[:2]
def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num):
if self.is_protected_sector(track_num, physical_sector_num):
return DOS33RWTS.verify_address_epilogue_at_point(self, track, track_num, physical_sector_num)
def verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num):
if self.is_protected_sector(logical_track_num, physical_sector_num):
return DOS33RWTS.verify_address_epilogue_at_point(self, track, logical_track_num, physical_sector_num)
return True
def find_data_prologue(self, track, track_num, physical_sector_num):
if not DOS33RWTS.find_data_prologue(self, track, track_num, physical_sector_num):
def find_data_prologue(self, track, logical_track_num, physical_sector_num):
if not DOS33RWTS.find_data_prologue(self, track, logical_track_num, physical_sector_num):
return False
next(track.nibble())
if self.is_protected_sector(track_num, physical_sector_num):
if self.is_protected_sector(logical_track_num, physical_sector_num):
next(track.bit())
next(track.nibble())
next(track.bit())
next(track.bit())
return True
def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num):
if self.is_protected_sector(track_num, physical_sector_num):
def verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num):
if self.is_protected_sector(logical_track_num, physical_sector_num):
next(track.nibble())
if track_num == 0:
if logical_track_num == 0:
next(track.nibble())
next(track.nibble())
return True
return DOS33RWTS.verify_data_epilogue_at_point(self, track, track_num, physical_sector_num)
return DOS33RWTS.verify_data_epilogue_at_point(self, track, logical_track_num, physical_sector_num)
class LaureateRWTS(DOS33RWTS):
# nibble table is in T00,S06
@ -639,11 +661,11 @@ class BasePassportProcessor: # base class
if self.run():
self.postprocess()
def SkipTrack(self, track_num, track):
def SkipTrack(self, logical_track_num, track):
# don't look for whole-track protections on track 0, that's silly
if track_num == 0: return False
if logical_track_num == 0: return False
# Electronic Arts protection track?
if track_num == 6:
if logical_track_num == 6:
if self.rwts.find_address_prologue(track):
address_field = self.rwts.address_field_at_point(track)
if address_field and address_field.track_num == 5: return True
@ -954,6 +976,31 @@ class BasePassportProcessor: # base class
return 2
return 0 # unknown variant
def IDSunburst(self, logical_sectors):
"""returns True if |logical_sectors| contains track 0 of a Sunburst disk, False otherwise"""
return find.wild_at(0x69, logical_sectors[0x04],
bytes.fromhex("48"
"A5 2A"
"4A"
"A8"
"B9 29 BA"
"8D 6A B9"
"8D 84 BC"
"B9 34 BA"
"8D FC B8"
"8D 5D B8"
"C0 11"
"D0 03"
"A9 02"
"AC"
"A9 0E"
"8D C0 BF"
"68"
"69 00"
"48"
"AD 78 04"
"90 2B"))
def IDBootloader(self, t00):
"""returns RWTS object that can (hopefully) read the rest of the disk"""
temporary_rwts_for_t00 = Track00RWTS(self.g)
@ -974,6 +1021,9 @@ class BasePassportProcessor: # base class
self.g.logger.PrintByID("dos33boot0")
if border.BorderPatcher(self.g).run(logical_sectors, 0):
return BorderRWTS(logical_sectors, self.g)
if self.IDSunburst(logical_sectors):
self.g.logger.PrintByID("sunburst")
return SunburstRWTS(logical_sectors, self.g)
return self.TraceDOS33(logical_sectors)
# TODO JSR08B3
if self.IDMECC(t00s00):
@ -1128,11 +1178,9 @@ class BasePassportProcessor: # base class
self.g.logger.PrintByID("header")
self.g.logger.PrintByID("reading", {"filename":self.g.disk_image.filename})
# get all raw track data from the source disk
# get raw track $00 data from the source disk
self.tracks = {}
for track_num in range(0x23):
self.tracks[float(track_num)] = self.g.disk_image.seek(float(track_num))
self.tracks[0] = self.g.disk_image.seek(0)
# analyze track $00 to create an RWTS
self.rwts = self.IDBootloader(self.tracks[0])
if not self.rwts: return False
@ -1142,19 +1190,29 @@ class BasePassportProcessor: # base class
self.patchers.append(P(self.g))
# main loop - loop through disk from track $22 down to track $00
for track_num in range(0x22, -1, -1):
self.g.track = track_num
self.rwts.seek(track_num)
for logical_track_num in range(0x22, -1, -1):
self.g.track = logical_track_num # for display purposes only
# distinguish between logical and physical track numbers to deal with
# disks like Sunburst that store logical track 0x11+ on physical track 0x11.5+
physical_track_num = self.rwts.seek(logical_track_num)
# self.tracks must be indexed by physical track number so we can write out
# .woz files correctly
self.tracks[physical_track_num] = self.g.disk_image.seek(physical_track_num)
self.g.logger.debug("Seeking to track %s" % hex(self.g.track))
try_again = True
while try_again:
try_again = False
physical_sectors = self.rwts.decode_track(self.tracks[track_num], track_num, self.burn)
physical_sectors = self.rwts.decode_track(self.tracks[physical_track_num], logical_track_num, self.burn)
if len(physical_sectors) == self.rwts.sectors_per_track:
# TODO this is bad, we should just ask the RWTS object if we decoded enough sectors,
# so that SunburstRWTS can override the logic on track 0x11
continue
else:
self.g.logger.debug("found %d sectors" % len(physical_sectors))
if (0x0F not in physical_sectors) and self.SkipTrack(track_num, self.tracks[track_num]):
if self.rwts.__class__ is SunburstRWTS and logical_track_num == 0x11:
# TODO this is bad, see above
continue
if (0x0F not in physical_sectors) and self.SkipTrack(logical_track_num, self.tracks[physical_track_num]):
physical_sectors = None
continue
# TODO wrong in case where we switch mid-track.
@ -1166,16 +1224,16 @@ class BasePassportProcessor: # base class
self.g.tried_univ = True
try_again = True
continue
if track_num == 0 and type(self.rwts) != UniversalRWTSIgnoreEpilogues:
if logical_track_num == 0 and type(self.rwts) != UniversalRWTSIgnoreEpilogues:
self.rwts = UniversalRWTSIgnoreEpilogues(self.g)
try_again = True
continue
self.g.logger.PrintByID("fail")
return False
self.save_track(track_num, physical_sectors)
self.save_track(physical_track_num, logical_track_num, physical_sectors)
return True
def save_track(self, track_num, physical_sectors):
def save_track(self, physical_track_num, logical_track_num, physical_sectors):
pass
def apply_patches(self, logical_sectors, patches):
@ -1208,18 +1266,19 @@ class Verify(BasePassportProcessor):
b'\x8C\xC0\xDD\x8C\xC0\xD0\xF6\x88'
b'\xD0\xF8\x68\xAA\xBD\x8E\xC0\xBD'
b'\x8C\xC0\xA0\x08\xBD\x8C\xC0\x48')
def save_track(self, track_num, physical_sectors):
def save_track(self, physical_track_num, logical_track_num, physical_sectors):
if not physical_sectors: return {}
logical_sectors = self.rwts.reorder_to_logical_sectors(physical_sectors)
should_run_patchers = (len(physical_sectors) == 16) # TODO
if should_run_patchers:
if track_num == 0:
# patchers operate on logical tracks
if logical_track_num == 0:
# set additional globals for patchers to use
self.AnalyzeT00(logical_sectors)
for patcher in self.patchers:
if patcher.should_run(track_num):
patches = patcher.run(logical_sectors, track_num)
if patcher.should_run(logical_track_num):
patches = patcher.run(logical_sectors, logical_track_num)
if patches:
self.apply_patches(logical_sectors, patches)
self.patches_found.extend(patches)
@ -1234,8 +1293,10 @@ class Verify(BasePassportProcessor):
self.g.logger.PrintByID("passver")
class Crack(Verify):
def save_track(self, track_num, physical_sectors):
self.output_tracks[float(track_num)] = Verify.save_track(self, track_num, physical_sectors)
def save_track(self, physical_track_num, logical_track_num, physical_sectors):
# output_tracks is indexed on logical track number here because the
# point of cracking is normalizing to logical tracks and sectors
self.output_tracks[logical_track_num] = Verify.save_track(self, physical_track_num, logical_track_num, physical_sectors)
def apply_patches(self, logical_sectors, patches):
for patch in patches:
@ -1255,9 +1316,9 @@ class Crack(Verify):
output_filename = source_base + '.dsk'
self.g.logger.PrintByID("writing", {"filename":output_filename})
with open(output_filename, "wb") as f:
for track_num in range(0x23):
if track_num in self.output_tracks:
f.write(concat_track(self.output_tracks[track_num]))
for logical_track_num in range(0x23):
if logical_track_num in self.output_tracks:
f.write(concat_track(self.output_tracks[logical_track_num]))
else:
f.write(bytes(256*16))
if self.patches_found:
@ -1265,22 +1326,23 @@ class Crack(Verify):
else:
self.g.logger.PrintByID("passcrack0")
class EDDToWoz(BasePassportProcessor):
class Convert(BasePassportProcessor):
def preprocess(self):
self.burn = 2
return True
def save_track(self, track_num, physical_sectors):
track_num = float(track_num)
track = self.tracks[track_num]
def save_track(self, physical_track_num, logical_track_num, physical_sectors):
track = self.tracks[physical_track_num]
if physical_sectors:
b = bitarray.bitarray(endian="big")
for s in physical_sectors.values():
b.extend(track.bits[s.start_bit_index:s.end_bit_index])
else:
# TODO this only works about half the time
# TODO call wozify here instead
b = track.bits[:51021]
self.output_tracks[track_num] = wozimage.Track(b, len(b))
# output_tracks is indexed on physical track number here because the
# point of .woz is to capture the physical layout of the original disk
self.output_tracks[physical_track_num] = wozimage.Track(b, len(b))
def postprocess(self):
source_base, source_ext = os.path.splitext(self.g.disk_image.filename)
@ -1291,9 +1353,9 @@ class EDDToWoz(BasePassportProcessor):
woz_image.info["write_protected"] = self.g.protection_enforces_write_protected
woz_image.meta["image_date"] = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime())
for q in range(1 + (0x23 * 4)):
track_num = q / 4
if track_num in self.output_tracks:
woz_image.add_track(track_num, self.output_tracks[track_num])
physical_track_num = q / 4
if physical_track_num in self.output_tracks:
woz_image.add_track(physical_track_num, self.output_tracks[physical_track_num])
with open(output_filename, 'wb') as f:
woz_image.write(f)
try:

View File

@ -11,27 +11,44 @@ class A2RImage:
def to_bits(self, flux_record):
"""|flux_record| is a dictionary of 'capture_type', 'data_length', 'tick_count', and 'data'"""
if not flux_record or flux_record["capture_type"] != a2rchery.kCaptureTiming:
return [], 0
bits = bitarray.bitarray()
track_length = 0
estimated_track_length = 0
if not flux_record or flux_record["capture_type"] != a2rchery.kCaptureTiming:
return bits, estimated_track_length, 0
ticks = 0
flux_total = 0
fluxxen = flux_record["data"]
speeds = [(len([1 for i in fluxxen if i%t==0]), t) for t in range(0x1e,0x23)]
speeds = [(len([1 for i in fluxxen if i%t==0]), t) for t in range(0x1c,0x25)]
speeds.sort()
speed = speeds[-1][1]
for flux_value in fluxxen:
for flux_value in fluxxen[1:]:
ticks += flux_value
if not track_length and ticks > flux_record["tick_count"]:
track_length = len(bits)
if not estimated_track_length and ticks > flux_record["tick_count"]:
estimated_track_length = len(bits)
flux_total += flux_value
if flux_value == 0xFF:
continue
bits.extend([0] * ((flux_total - speed//2) // speed))
bits.append(1)
flux_total = 0
return bits, track_length
return bits, estimated_track_length, speed
def find_track_length(self, bits, estimated_track_length):
twice_bits = bits + bits
for matchlen in (8192, 4096, 2048, 1024):
if estimated_track_length < 32768 or len(bits) < 32768: continue
for offset in range(0, estimated_track_length, matchlen):
for length_delta in (0, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 7, -7, 8, -8, 9, -9, 10, -10, 11, -11, 12, -12):
real_length = estimated_track_length + length_delta
if real_length > 53168: continue
if twice_bits[8+offset:offset+matchlen] == twice_bits[real_length+8+offset:real_length+matchlen+offset]:
return real_length
return 0
def normalize(self, flux_records):
bits_and_lengths = [self.to_bits(flux_record) for flux_record in flux_records]
all_bits = [bits[8:self.find_track_length(bits, estimated_track_length)+8] for bits, estimated_track_length, speed in bits_and_lengths]
return all_bits
def seek(self, track_num):
if type(track_num) != float:
@ -44,7 +61,7 @@ class A2RImage:
if not self.tracks.get(location):
all_bits = bitarray.bitarray()
for flux_record in self.a2r_image.flux.get(location, [{}]):
bits, track_length = self.to_bits(flux_record)
bits, track_length, speed = self.to_bits(flux_record)
all_bits.extend(bits)
self.tracks[location] = Track(all_bits, len(all_bits))
self.tracks[location] = Track(all_bits, len(all_bits), speed=speed)
return self.tracks[location]

View File

@ -1,5 +1,5 @@
STRINGS = {
"header": "Passport.py by 4am (2018-09-10)\n", # max 32 characters
"header": "Passport.py by 4am (2019-01-28)\n", # max 32 characters
"reading": "Reading from {filename}\n",
"diskrwts": "Using disk's own RWTS\n",
"bb00": "T00,S05 Found $BB00 protection check\n"

View File

@ -4,7 +4,7 @@ WILDCARD = b'\x97'
def wild(source_bytes, search_bytes):
"""Search source_bytes (bytes object) for the first instance of search_bytes (bytes_object). search_bytes may contain WILDCARD, which matches any single byte (like "." in a regular expression). Returns index of first match, or -1 if no matches."""
search_bytes = re.escape(search_bytes).replace(b'\\'+WILDCARD, b'.')
search_bytes = re.escape(search_bytes).replace(WILDCARD, b'.')
match = re.search(search_bytes, source_bytes)
if match:
return match.start()

View File

@ -75,11 +75,12 @@ def raise_if(cond, e, s=""):
if cond: raise e(s)
class Track:
def __init__(self, bits, bit_count):
def __init__(self, bits, bit_count, speed=None):
self.bits = bits
while len(self.bits) > bit_count:
self.bits.pop()
self.bit_count = bit_count
self.speed = speed
self.bit_index = 0
self.revolutions = 0
@ -331,15 +332,17 @@ class WozWriter(WozValidator):
self.tmap = [0xFF]*160
self.meta = collections.OrderedDict()
def add_track(self, track_num, track):
tmap_id = int(track_num * 4)
def add(self, half_phase, track):
trk_id = len(self.tracks)
self.tracks.append(track)
self.tmap[tmap_id] = trk_id
if tmap_id:
self.tmap[tmap_id - 1] = trk_id
if tmap_id < 159:
self.tmap[tmap_id + 1] = trk_id
self.tmap[half_phase] = trk_id
# if half_phase:
# self.tmap[half_phase - 1] = trk_id
# if half_phase < 159:
# self.tmap[half_phase + 1] = trk_id
def add_track(self, track_num, track):
self.add(int(track_num * 4), track)
def build_info(self):
chunk = bytearray()