add support for Infocom, Optimum Resource, Heredity Dog, JMPBECA, and update to latest wozardry

This commit is contained in:
4am 2018-06-07 10:28:25 -04:00
parent 3cf201f555
commit 64bf4e237a
3 changed files with 369 additions and 127 deletions

View File

@ -82,6 +82,8 @@ class PassportGlobals:
self.force_disk_vol = False self.force_disk_vol = False
self.captured_disk_volume_number = False self.captured_disk_volume_number = False
self.disk_volume_number = None self.disk_volume_number = None
self.found_and_cleaned_weakbits = False
self.protection_enforces_write_protected = False
# things about the conversion process # things about the conversion process
self.tried_univ = False self.tried_univ = False
self.track = 0 self.track = 0
@ -124,14 +126,14 @@ class RWTS:
} }
def __init__(self, def __init__(self,
g,
sectors_per_track = 16, sectors_per_track = 16,
address_prologue = kDefaultAddressPrologue16, address_prologue = kDefaultAddressPrologue16,
address_epilogue = kDefaultAddressEpilogue16, address_epilogue = kDefaultAddressEpilogue16,
data_prologue = kDefaultDataPrologue16, data_prologue = kDefaultDataPrologue16,
data_epilogue = kDefaultDataEpilogue16, data_epilogue = kDefaultDataEpilogue16,
sector_order = kDefaultSectorOrder16, sector_order = kDefaultSectorOrder16,
nibble_translate_table = kDefaultNibbleTranslationTable16, nibble_translate_table = kDefaultNibbleTranslationTable16):
logger = None):
self.sectors_per_track = sectors_per_track self.sectors_per_track = sectors_per_track
self.address_prologue = address_prologue self.address_prologue = address_prologue
self.address_epilogue = address_epilogue self.address_epilogue = address_epilogue
@ -139,7 +141,7 @@ class RWTS:
self.data_epilogue = data_epilogue self.data_epilogue = data_epilogue
self.sector_order = sector_order self.sector_order = sector_order
self.nibble_translate_table = nibble_translate_table self.nibble_translate_table = nibble_translate_table
self.logger = logger or SilentLogger self.g = g
self.track_num = 0 self.track_num = 0
def seek(self, track_num): def seek(self, track_num):
@ -167,13 +169,13 @@ class RWTS:
found.append(next(track.nibble())) found.append(next(track.nibble()))
return tuple(found) == tuple(nibbles) return tuple(found) == tuple(nibbles)
def verify_address_epilogue_at_point(self, track): def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num):
return self.verify_nibbles_at_point(track, self.address_epilogue) return self.verify_nibbles_at_point(track, self.address_epilogue)
def find_data_prologue(self, track): def find_data_prologue(self, track, track_num, physical_sector_num):
return track.find(self.data_prologue) return track.find(self.data_prologue)
def data_field_at_point(self, track): def data_field_at_point(self, track, track_num, physical_sector_num):
disk_nibbles = [] disk_nibbles = []
for i in range(343): for i in range(343):
disk_nibbles.append(next(track.nibble())) disk_nibbles.append(next(track.nibble()))
@ -207,10 +209,10 @@ class RWTS:
decoded[i + 172] += (((low2 & 0b010000) >> 3) + ((low2 & 0b100000) >> 5)) decoded[i + 172] += (((low2 & 0b010000) >> 3) + ((low2 & 0b100000) >> 5))
return bytearray(decoded) return bytearray(decoded)
def verify_data_epilogue_at_point(self, track): def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num):
return self.verify_nibbles_at_point(track, self.data_epilogue) return self.verify_nibbles_at_point(track, self.data_epilogue)
def decode_track(self, track, burn=0): def decode_track(self, track, track_num, burn=0):
sectors = collections.OrderedDict() sectors = collections.OrderedDict()
if not track: return sectors if not track: return sectors
starting_revolutions = track.revolutions starting_revolutions = track.revolutions
@ -221,7 +223,7 @@ class RWTS:
start_bit_index = track.bit_index start_bit_index = track.bit_index
if not self.find_address_prologue(track): if not self.find_address_prologue(track):
# if we can't even find a single address prologue, just give up # if we can't even find a single address prologue, just give up
self.logger.debug("can't find a single address prologue so LGTM or whatever") self.g.logger.debug("can't find a single address prologue so LGTM or whatever")
break break
# for edd->woz conversion, only save some of the bits preceding # for edd->woz conversion, only save some of the bits preceding
# the address prologue # the address prologue
@ -229,43 +231,43 @@ class RWTS:
start_bit_index = track.bit_index - 256 start_bit_index = track.bit_index - 256
# decode address field # decode address field
address_field = self.address_field_at_point(track) address_field = self.address_field_at_point(track)
self.logger.debug("found sector %s" % hex(address_field.sector_num)[2:].upper()) self.g.logger.debug("found sector %s" % hex(address_field.sector_num)[2:].upper())
if address_field.sector_num in verified_sectors: if address_field.sector_num in verified_sectors:
# the sector we just found is a sector we've already decoded # the sector we just found is a sector we've already decoded
# properly, so skip it # properly, so skip it
self.logger.debug("duplicate sector %d, continuing" % address_field.sector_num) self.g.logger.debug("duplicate sector %d, continuing" % address_field.sector_num)
continue continue
if address_field.sector_num > self.sectors_per_track: if address_field.sector_num > self.sectors_per_track:
# found a weird sector whose ID is out of range # found a weird sector whose ID is out of range
# TODO: will eventually need to tweak this logic to handle Ultima V and others # TODO: will eventually need to tweak this logic to handle Ultima V and others
self.logger.debug("sector ID out of range %d" % address_field.sector_num) self.g.logger.debug("sector ID out of range %d" % address_field.sector_num)
continue continue
# put a placeholder for this sector in this position in the ordered dict # put a placeholder for this sector in this position in the ordered dict
# so even if this copy doesn't pan out but a later copy does, sectors # so even if this copy doesn't pan out but a later copy does, sectors
# will still be in the original order # will still be in the original order
sectors[address_field.sector_num] = None sectors[address_field.sector_num] = None
if not self.verify_address_epilogue_at_point(track): if not self.verify_address_epilogue_at_point(track, track_num, address_field.sector_num):
# verifying the address field epilogue failed, but this is # verifying the address field epilogue failed, but this is
# not necessarily fatal because there might be another copy # not necessarily fatal because there might be another copy
# of this sector later # of this sector later
self.logger.debug("verify_address_epilogue_at_point failed, continuing") #self.g.logger.debug("verify_address_epilogue_at_point failed, continuing")
continue continue
if not self.find_data_prologue(track): if not self.find_data_prologue(track, track_num, address_field.sector_num):
# if we can't find a data field prologue, just give up # if we can't find a data field prologue, just give up
self.logger.debug("find_data_prologue failed, giving up") self.g.logger.debug("find_data_prologue failed, giving up")
break break
# read and decode the data field, and verify the data checksum # read and decode the data field, and verify the data checksum
decoded = self.data_field_at_point(track) decoded = self.data_field_at_point(track, track_num, address_field.sector_num)
if not decoded: if not decoded:
self.logger.debug("data_field_at_point failed, continuing") self.g.logger.debug("data_field_at_point failed, continuing")
# decoding data field failed, but this is not necessarily fatal # decoding data field failed, but this is not necessarily fatal
# because there might be another copy of this sector later # because there might be another copy of this sector later
continue continue
if not self.verify_data_epilogue_at_point(track): if not self.verify_data_epilogue_at_point(track, track_num, address_field.sector_num):
# verifying the data field epilogue failed, but this is # verifying the data field epilogue failed, but this is
# not necessarily fatal because there might be another copy # not necessarily fatal because there might be another copy
# of this sector later # of this sector later
self.logger.debug("verify_data_epilogue_at_point failed") self.g.logger.debug("verify_data_epilogue_at_point failed")
continue continue
# store end index within track (used for .edd -> .woz conversion) # store end index within track (used for .edd -> .woz conversion)
end_bit_index = track.bit_index end_bit_index = track.bit_index
@ -277,7 +279,7 @@ class RWTS:
# all good, and we want to save this sector, so do it # all good, and we want to save this sector, so do it
sectors[address_field.sector_num] = Sector(address_field, decoded, start_bit_index, end_bit_index) sectors[address_field.sector_num] = Sector(address_field, decoded, start_bit_index, end_bit_index)
verified_sectors.append(address_field.sector_num) verified_sectors.append(address_field.sector_num)
self.logger.debug("saved sector %s" % hex(address_field.sector_num)) self.g.logger.debug("saved sector %s" % hex(address_field.sector_num))
# remove placeholders of sectors that we found but couldn't decode properly # remove placeholders of sectors that we found but couldn't decode properly
# (made slightly more difficult by the fact that we're trying to remove # (made slightly more difficult by the fact that we're trying to remove
# elements from an OrderedDict while iterating through the OrderedDict, # elements from an OrderedDict while iterating through the OrderedDict,
@ -292,8 +294,8 @@ class RWTS:
class UniversalRWTS(RWTS): class UniversalRWTS(RWTS):
acceptable_address_prologues = ((0xD4,0xAA,0x96), (0xD5,0xAA,0x96)) acceptable_address_prologues = ((0xD4,0xAA,0x96), (0xD5,0xAA,0x96))
def __init__(self, logger): def __init__(self, g):
RWTS.__init__(self, address_epilogue=[], data_epilogue=[], logger=logger) RWTS.__init__(self, g, address_epilogue=[], data_epilogue=[])
def find_address_prologue(self, track): def find_address_prologue(self, track):
starting_revolutions = track.revolutions starting_revolutions = track.revolutions
@ -304,45 +306,64 @@ class UniversalRWTS(RWTS):
if tuple(seen) in self.acceptable_address_prologues: return True if tuple(seen) in self.acceptable_address_prologues: return True
return False return False
def verify_address_epilogue_at_point(self, track): def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num):
# return True # return True
if not self.address_epilogue: if not self.address_epilogue:
self.address_epilogue = [next(track.nibble())] self.address_epilogue = [next(track.nibble())]
result = True result = True
else: else:
result = RWTS.verify_address_epilogue_at_point(self, track) result = RWTS.verify_address_epilogue_at_point(self, track, track_num, physical_sector_num)
next(track.nibble()) next(track.nibble())
next(track.nibble()) next(track.nibble())
return result return result
def verify_data_epilogue_at_point(self, track): def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num):
if not self.data_epilogue: if not self.data_epilogue:
self.data_epilogue = [next(track.nibble())] self.data_epilogue = [next(track.nibble())]
result = True result = True
else: else:
result = RWTS.verify_data_epilogue_at_point(self, track) result = RWTS.verify_data_epilogue_at_point(self, track, track_num, physical_sector_num)
next(track.nibble()) next(track.nibble())
next(track.nibble()) next(track.nibble())
return result return result
class UniversalRWTSIgnoreEpilogues(UniversalRWTS): class UniversalRWTSIgnoreEpilogues(UniversalRWTS):
def verify_address_epilogue_at_point(self, track): def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num):
return True return True
def verify_data_epilogue_at_point(self, track): def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num):
return True return True
class Track00RWTS(UniversalRWTSIgnoreEpilogues):
def data_field_at_point(self, track, track_num, physical_sector_num):
start_index = track.bit_index
start_revolutions = track.revolutions
decoded = UniversalRWTS.data_field_at_point(self, track, track_num, physical_sector_num)
if not decoded:
# If the sector didn't decode properly, rewind to the
# beginning of the data field before returning to the
# caller. This is for disks with a fake T00,S0A that
# is full of consecutive 0s, where if we consume the bitstream
# as nibbles, we'll end up consuming the next address field
# and it will seem like that sector doesn't exist. And that
# is generally logical sector 2, which is important not to
# miss at this stage because its absence triggers a different
# code path and everything falls apart.
track.bit_index = start_index
track.revolutions = start_revolutions
return decoded
class DOS33RWTS(RWTS): class DOS33RWTS(RWTS):
def __init__(self, logical_sectors, logger): def __init__(self, logical_sectors, g):
self.reset(logical_sectors) self.reset(logical_sectors)
RWTS.__init__(self, RWTS.__init__(self,
g,
sectors_per_track=16, sectors_per_track=16,
address_prologue=self.address_prologue, address_prologue=self.address_prologue,
address_epilogue=self.address_epilogue, address_epilogue=self.address_epilogue,
data_prologue=self.data_prologue, data_prologue=self.data_prologue,
data_epilogue=self.data_epilogue, data_epilogue=self.data_epilogue,
nibble_translate_table=self.nibble_translate_table, nibble_translate_table=self.nibble_translate_table)
logger=logger)
def reset(self, logical_sectors): def reset(self, logical_sectors):
self.address_prologue = (logical_sectors[3][0x55], self.address_prologue = (logical_sectors[3][0x55],
@ -392,14 +413,93 @@ class D5TimingBitRWTS(DOS33RWTS):
track.rewind(1) track.rewind(1)
return False return False
def verify_address_epilogue_at_point(self, track): def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num):
return True return True
class InfocomRWTS(DOS33RWTS):
def reset(self, logical_sectors):
DOS33RWTS.reset(self, logical_sectors)
self.data_prologue = self.data_prologue[:2]
def find_data_prologue(self, track, track_num, physical_sector_num):
if not DOS33RWTS.find_data_prologue(self, track, track_num, physical_sector_num):
return False
return next(track.nibble()) >= 0xAD
class OptimumResourceRWTS(DOS33RWTS):
def data_field_at_point(self, track, track_num, physical_sector_num):
if (track_num, physical_sector_num) == (0x01, 0x0F):
# TODO actually decode these
disk_nibbles = []
for i in range(343):
disk_nibbles.append(next(track.nibble()))
return bytearray(256) # all zeroes for now
return DOS33RWTS.data_field_at_point(self, track, track_num, physical_sector_num)
def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num):
if (track_num, physical_sector_num) == (0x01, 0x0F):
return True
return DOS33RWTS.verify_data_epilogue_at_point(self, track, track_num, physical_sector_num)
class HeredityDogRWTS(DOS33RWTS):
def data_field_at_point(self, track, track_num, physical_sector_num):
if (track_num, physical_sector_num) == (0x00, 0x0A):
# This sector is fake, full of too many consecutive 0s,
# designed to read differently every time. We go through
# and clean the stray bits, and be careful not to go past
# the end so we don't include the next address prologue.
start_index = track.bit_index
while (track.bit_index < start_index + (343*8)):
if self.nibble_translate_table.get(next(track.nibble()), 0xFF) == 0xFF:
track.bits[track.bit_index-8:track.bit_index] = 0
self.g.found_and_cleaned_weakbits = True
return bytearray(256)
return DOS33RWTS.data_field_at_point(self, track, track_num, physical_sector_num)
def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num):
if (track_num, physical_sector_num) == (0x00, 0x0A):
return True
return DOS33RWTS.verify_data_epilogue_at_point(self, track, track_num, physical_sector_num)
class BECARWTS(DOS33RWTS):
def is_protected_sector(self, track_num, physical_sector_num):
if track_num > 0: return True
return physical_sector_num not in (0x00, 0x0D, 0x0B, 0x09, 0x07, 0x05, 0x03, 0x01, 0x0E, 0x0C)
def reset(self, logical_sectors):
DOS33RWTS.reset(self, logical_sectors)
self.data_prologue = self.data_prologue[:2]
def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num):
if self.is_protected_sector(track_num, physical_sector_num):
return DOS33RWTS.verify_address_epilogue_at_point(self, track, track_num, physical_sector_num)
return True
def find_data_prologue(self, track, track_num, physical_sector_num):
if not DOS33RWTS.find_data_prologue(self, track, track_num, physical_sector_num):
return False
next(track.nibble())
if self.is_protected_sector(track_num, physical_sector_num):
next(track.bit())
next(track.nibble())
next(track.bit())
next(track.bit())
return True
def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num):
if self.is_protected_sector(track_num, physical_sector_num):
next(track.nibble())
if track_num == 0:
next(track.nibble())
next(track.nibble())
return True
return DOS33RWTS.verify_data_epilogue_at_point(self, track, track_num, physical_sector_num)
class BasePassportProcessor: # base class class BasePassportProcessor: # base class
def __init__(self, disk_image, logger_class=DefaultLogger): def __init__(self, disk_image, logger_class=DefaultLogger):
self.g = PassportGlobals() self.g = PassportGlobals()
self.g.disk_image = disk_image self.g.disk_image = disk_image
self.logger = logger_class(self.g) self.g.logger = logger_class(self.g)
self.rwts = None self.rwts = None
self.output_tracks = {} self.output_tracks = {}
self.patchers = [] self.patchers = []
@ -501,7 +601,7 @@ class BasePassportProcessor: # base class
repeated_nibble_count = 0 repeated_nibble_count = 0
last_nibble = n last_nibble = n
if repeated_nibble_count == 512: if repeated_nibble_count == 512:
self.logger.PrintByID("sync") self.g.logger.PrintByID("sync")
return True return True
# TODO IsUnformatted and other tests # TODO IsUnformatted and other tests
return False return False
@ -677,24 +777,24 @@ class BasePassportProcessor: # base class
def IDBootloader(self, t00): def IDBootloader(self, t00):
"""returns RWTS object that can (hopefully) read the rest of the disk""" """returns RWTS object that can (hopefully) read the rest of the disk"""
temporary_rwts_for_t00 = UniversalRWTSIgnoreEpilogues(self.logger) temporary_rwts_for_t00 = Track00RWTS(self.g)
physical_sectors = temporary_rwts_for_t00.decode_track(t00) physical_sectors = temporary_rwts_for_t00.decode_track(t00, 0)
if 0 not in physical_sectors: if 0 not in physical_sectors:
self.logger.PrintByID("fatal0000") self.g.logger.PrintByID("fatal0000")
return None return None
t00s00 = physical_sectors[0].decoded t00s00 = physical_sectors[0].decoded
if self.IDDOS33(t00s00): if self.IDDOS33(t00s00):
self.g.is_boot0 = True self.g.is_boot0 = True
if self.IDDiversi(t00s00): if self.IDDiversi(t00s00):
self.logger.PrintByID("diversidos") self.g.logger.PrintByID("diversidos")
elif self.IDPronto(t00s00): elif self.IDPronto(t00s00):
self.logger.PrintByID("prontodos") self.g.logger.PrintByID("prontodos")
else: else:
self.logger.PrintByID("dos33boot0") self.g.logger.PrintByID("dos33boot0")
logical_sectors = temporary_rwts_for_t00.reorder_to_logical_sectors(physical_sectors) logical_sectors = temporary_rwts_for_t00.reorder_to_logical_sectors(physical_sectors)
if border.BorderPatcher(self.g).run(logical_sectors, 0): if border.BorderPatcher(self.g).run(logical_sectors, 0):
return BorderRWTS(logical_sectors, self.logger) return BorderRWTS(logical_sectors, self.g)
return self.TraceDOS33(logical_sectors) return self.TraceDOS33(logical_sectors)
# TODO JSR08B3 # TODO JSR08B3
# TODO MECC fastloader # TODO MECC fastloader
@ -750,32 +850,95 @@ class BasePassportProcessor: # base class
if not use_builtin: if not use_builtin:
# check for D5+timingbit RWTS # check for D5+timingbit RWTS
if find.at(0x59, logical_sectors[3], b'\xBD\x8C\xC0\xC9\xD5'): if find.at(0x59, logical_sectors[3], b'\xBD\x8C\xC0\xC9\xD5'):
self.logger.PrintByID("diskrwts") self.g.logger.PrintByID("diskrwts")
return D5TimingBitRWTS(logical_sectors, self.logger) return D5TimingBitRWTS(logical_sectors, self.g)
# TODO handle Milliken here # TODO handle Milliken here
# TODO handle Adventure International here # TODO handle Adventure International here
# TODO handle Infocom here
if not use_builtin and (logical_sectors[0][0xFE] == 0x22):
return InfocomRWTS(logical_sectors, self.g)
if not use_builtin and (find.at(0xF4, logical_sectors[2],
b'\x4C\xCA') or
find.at(0xFE, logical_sectors[2],
b'\x4C\xCA')):
self.g.logger.PrintByID("jmpbeca")
return BECARWTS(logical_sectors, self.g)
if not use_builtin and (find.wild_at(0x5D, logical_sectors[0],
b'\x68'
b'\x85' + find.WILDCARD + \
b'\x68' + \
b'\x85' + find.WILDCARD + \
b'\xA0\x01' + \
b'\xB1' + find.WILDCARD + \
b'\x85\x54')):
self.g.logger.PrintByID("optimum")
return OptimumResourceRWTS(logical_sectors, self.g)
if not use_builtin and (find.wild_at(0x16, logical_sectors[5],
b'\xF0\x05'
b'\xA2\xB2'
b'\x4C\xF0\xBB'
b'\xBD\x8C\xC0'
b'\xA9' + find.WILDCARD + \
b'\x8D\x00\x02'
b'\xBD\x8C\xC0'
b'\x10\xFB'
b'\xC9\xEB'
b'\xD0\xF7'
b'\xBD\x8C\xC0'
b'\x10\xFB'
b'\xC9\xD5'
b'\xD0\xEE'
b'\xBD\x8C\xC0'
b'\x10\xFB'
b'\xC9\xAA'
b'\xD0\xE5'
b'\xA9\x4C'
b'\xA0\x00'
b'\x99\x00\x95'
b'\x88'
b'\xD0\xFA'
b'\xCE\x46\xBB'
b'\xAD\x46\xBB'
b'\xC9\x07'
b'\xD0\xEC'
b'\xA9\x18'
b'\x8D\x42\xB9'
b'\xA9\x0A'
b'\x8D\xED\xB7'
b'\xD0\x05')):
self.g.logger.PrintByID("bb00")
if find.at(0x04, logical_sectors[5],
b'\xBD\x8D\xC0'
b'\xBD\x8E\xC0'
b'\x30\x05'
b'\xA2\xB1'
b'\x4C\xF0\xBB'):
self.g.protection_enforces_write_protected = True
return HeredityDogRWTS(logical_sectors, self.g)
if use_builtin: if use_builtin:
return self.StartWithUniv() return self.StartWithUniv()
self.logger.PrintByID("diskrwts") self.g.logger.PrintByID("diskrwts")
return DOS33RWTS(logical_sectors, self.logger) return DOS33RWTS(logical_sectors, self.g)
def StartWithUniv(self): def StartWithUniv(self):
"""return Universal RWTS object, log that we're using it, and set global flags appropriately""" """return Universal RWTS object, log that we're using it, and set global flags appropriately"""
self.logger.PrintByID("builtin") self.g.logger.PrintByID("builtin")
self.g.tried_univ = True self.g.tried_univ = True
self.g.is_protdos = False self.g.is_protdos = False
return UniversalRWTS(self.logger) return UniversalRWTS(self.g)
def preprocess(self): def preprocess(self):
return True return True
def run(self): def run(self):
self.logger.PrintByID("header") self.g.logger.PrintByID("header")
self.logger.PrintByID("reading", {"filename":self.g.disk_image.filename}) self.g.logger.PrintByID("reading", {"filename":self.g.disk_image.filename})
# get all raw track data from the source disk # get all raw track data from the source disk
self.tracks = {} self.tracks = {}
@ -794,15 +957,15 @@ class BasePassportProcessor: # base class
for track_num in range(0x22, -1, -1): for track_num in range(0x22, -1, -1):
self.g.track = track_num self.g.track = track_num
self.rwts.seek(track_num) self.rwts.seek(track_num)
self.logger.debug("Seeking to track %s" % hex(self.g.track)) self.g.logger.debug("Seeking to track %s" % hex(self.g.track))
try_again = True try_again = True
while try_again: while try_again:
try_again = False try_again = False
physical_sectors = self.rwts.decode_track(self.tracks[track_num], self.burn) physical_sectors = self.rwts.decode_track(self.tracks[track_num], track_num, self.burn)
if len(physical_sectors) == self.rwts.sectors_per_track: if len(physical_sectors) == self.rwts.sectors_per_track:
continue continue
else: else:
self.logger.debug("found %d sectors" % len(physical_sectors)) self.g.logger.debug("found %d sectors" % len(physical_sectors))
if (0x0F not in physical_sectors) and self.SkipTrack(track_num, self.tracks[track_num]): if (0x0F not in physical_sectors) and self.SkipTrack(track_num, self.tracks[track_num]):
physical_sectors = None physical_sectors = None
continue continue
@ -810,16 +973,16 @@ class BasePassportProcessor: # base class
# Need to save the sectors that worked with the original RWTS # Need to save the sectors that worked with the original RWTS
# then append the ones that worked with the universal RWTS # then append the ones that worked with the universal RWTS
if not self.g.tried_univ: if not self.g.tried_univ:
self.logger.PrintByID("switch", {"sector":0x0F}) # TODO find exact sector self.g.logger.PrintByID("switch", {"sector":0x0F}) # TODO find exact sector
self.rwts = UniversalRWTS(self.logger) self.rwts = UniversalRWTS(self.g)
self.g.tried_univ = True self.g.tried_univ = True
try_again = True try_again = True
continue continue
if track_num == 0 and type(self.rwts) != UniversalRWTSIgnoreEpilogues: if track_num == 0 and type(self.rwts) != UniversalRWTSIgnoreEpilogues:
self.rwts = UniversalRWTSIgnoreEpilogues(self.logger) self.rwts = UniversalRWTSIgnoreEpilogues(self.g)
try_again = True try_again = True
continue continue
self.logger.PrintByID("fail") self.g.logger.PrintByID("fail")
return False return False
self.save_track(track_num, physical_sectors) self.save_track(track_num, physical_sectors)
return True return True
@ -877,10 +1040,10 @@ class Verify(BasePassportProcessor):
def apply_patches(self, logical_sectors, patches): def apply_patches(self, logical_sectors, patches):
for patch in patches: for patch in patches:
if patch.id: if patch.id:
self.logger.PrintByID(patch.id, patch.params) self.g.logger.PrintByID(patch.id, patch.params)
def postprocess(self): def postprocess(self):
self.logger.PrintByID("passver") self.g.logger.PrintByID("passver")
class Crack(Verify): class Crack(Verify):
def save_track(self, track_num, physical_sectors): def save_track(self, track_num, physical_sectors):
@ -889,12 +1052,12 @@ class Crack(Verify):
def apply_patches(self, logical_sectors, patches): def apply_patches(self, logical_sectors, patches):
for patch in patches: for patch in patches:
if patch.id: if patch.id:
self.logger.PrintByID(patch.id, patch.params) self.g.logger.PrintByID(patch.id, patch.params)
if len(patch.new_value) > 0: if len(patch.new_value) > 0:
b = logical_sectors[patch.sector_num].decoded b = logical_sectors[patch.sector_num].decoded
patch.params["old_value"] = b[patch.byte_offset:patch.byte_offset+len(patch.new_value)] patch.params["old_value"] = b[patch.byte_offset:patch.byte_offset+len(patch.new_value)]
patch.params["new_value"] = patch.new_value patch.params["new_value"] = patch.new_value
self.logger.PrintByID("modify", patch.params) self.g.logger.PrintByID("modify", patch.params)
for i in range(len(patch.new_value)): for i in range(len(patch.new_value)):
b[patch.byte_offset + i] = patch.new_value[i] b[patch.byte_offset + i] = patch.new_value[i]
logical_sectors[patch.sector_num].decoded = b logical_sectors[patch.sector_num].decoded = b
@ -902,7 +1065,7 @@ class Crack(Verify):
def postprocess(self): def postprocess(self):
source_base, source_ext = os.path.splitext(self.g.disk_image.filename) source_base, source_ext = os.path.splitext(self.g.disk_image.filename)
output_filename = source_base + '.dsk' output_filename = source_base + '.dsk'
self.logger.PrintByID("writing", {"filename":output_filename}) self.g.logger.PrintByID("writing", {"filename":output_filename})
with open(output_filename, "wb") as f: with open(output_filename, "wb") as f:
for track_num in range(0x23): for track_num in range(0x23):
if track_num in self.output_tracks: if track_num in self.output_tracks:
@ -910,9 +1073,9 @@ class Crack(Verify):
else: else:
f.write(bytes(256*16)) f.write(bytes(256*16))
if self.patches_found: if self.patches_found:
self.logger.PrintByID("passcrack") self.g.logger.PrintByID("passcrack")
else: else:
self.logger.PrintByID("passcrack0") self.g.logger.PrintByID("passcrack0")
class EDDToWoz(BasePassportProcessor): class EDDToWoz(BasePassportProcessor):
def preprocess(self): def preprocess(self):
@ -934,8 +1097,10 @@ class EDDToWoz(BasePassportProcessor):
def postprocess(self): def postprocess(self):
source_base, source_ext = os.path.splitext(self.g.disk_image.filename) source_base, source_ext = os.path.splitext(self.g.disk_image.filename)
output_filename = source_base + '.woz' output_filename = source_base + '.woz'
self.logger.PrintByID("writing", {"filename":output_filename}) self.g.logger.PrintByID("writing", {"filename":output_filename})
woz_image = wozimage.WozWriter(STRINGS["header"].strip()) woz_image = wozimage.WozWriter(STRINGS["header"].strip())
woz_image.info["cleaned"] = self.g.found_and_cleaned_weakbits
woz_image.info["write_protected"] = self.g.protection_enforces_write_protected
woz_image.meta["image_date"] = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()) woz_image.meta["image_date"] = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime())
for q in range(1 + (0x23 * 4)): for q in range(1 + (0x23 * 4)):
track_num = q / 4 track_num = q / 4

View File

@ -1,5 +1,5 @@
STRINGS = { STRINGS = {
"header": "Passport.py by 4am (2018-05-29)\n", # max 32 characters "header": "Passport.py by 4am (2018-06-06)\n", # max 32 characters
"reading": "Reading from {filename}\n", "reading": "Reading from {filename}\n",
"diskrwts": "Using disk's own RWTS\n", "diskrwts": "Using disk's own RWTS\n",
"bb00": "T00,S05 Found $BB00 protection check\n" "bb00": "T00,S05 Found $BB00 protection check\n"

View File

@ -2,35 +2,35 @@
# (c) 2018 by 4am # (c) 2018 by 4am
# MIT-licensed # MIT-licensed
# portions from MIT-licensed defedd.py (c) 2014 by Paul Hagstrom
import argparse import argparse
import binascii import binascii
import bitarray # https://pypi.org/project/bitarray/ import bitarray # https://pypi.org/project/bitarray/
import collections import collections
import itertools import itertools
import os
__version__ = "0.1" __version__ = "0.2"
__date__ = "2018-05-31" __date__ = "2018-06-05"
__progname__ = "wozardry" __progname__ = "wozardry"
__displayname__ = __progname__ + " " + __version__ + " by 4am (" + __date__ + ")" __displayname__ = __progname__ + " " + __version__ + " by 4am (" + __date__ + ")"
# domain-specific constants defined in .woz specification # domain-specific constants defined in .woz specification
kWOZ1 = b'WOZ1' kWOZ1 = b"WOZ1"
kINFO = b'INFO' kINFO = b"INFO"
kTMAP = b'TMAP' kTMAP = b"TMAP"
kTRKS = b'TRKS' kTRKS = b"TRKS"
kMETA = b'META' kMETA = b"META"
kBitstreamLengthInBytes = 6646 kBitstreamLengthInBytes = 6646
kLanguages = ('English','Spanish','French','German','Chinese','Japanese','Italian','Dutch','Portugese','Danish','Finnish','Norwegian','Swedish','Russian','Polish','Turkish','Arabic','Thai','Czech','Hungarian','Catalan','Croatian','Greek','Hebrew','Romanian','Slovak','Ukranian','Indonesian','Malay','Vietnamese','Other') kLanguages = ("English","Spanish","French","German","Chinese","Japanese","Italian","Dutch","Portugese","Danish","Finnish","Norwegian","Swedish","Russian","Polish","Turkish","Arabic","Thai","Czech","Hungarian","Catalan","Croatian","Greek","Hebrew","Romanian","Slovak","Ukranian","Indonesian","Malay","Vietnamese","Other")
kRequiresRAM = ('16K','24K','32K','48K','64K','128K','256K','512K','768K','1M','1.25M','1.5M+','Unknown') kRequiresRAM = ("16K","24K","32K","48K","64K","128K","256K","512K","768K","1M","1.25M","1.5M+","Unknown")
kRequiresMachine = ('2','2+','2e','2c','2e+','2gs','2c+','3','3+') kRequiresMachine = ("2","2+","2e","2c","2e+","2gs","2c+","3","3+")
# strings and things, for print routines and error messages # strings and things, for print routines and error messages
sEOF = "Unexpected EOF" sEOF = "Unexpected EOF"
sBadChunkSize = "Bad chunk size" sBadChunkSize = "Bad chunk size"
dNoYes = {False:'no',True:'yes'} dNoYes = {False:"no",True:"yes"}
tQuarters = ('.00','.25','.50','.75') tQuarters = (".00",".25",".50",".75")
# errors that may be raised # errors that may be raised
class WozError(Exception): pass # base class class WozError(Exception): pass # base class
@ -47,11 +47,13 @@ class WozINFOFormatError_BadDiskType(WozINFOFormatError): pass
class WozINFOFormatError_BadWriteProtected(WozINFOFormatError): pass class WozINFOFormatError_BadWriteProtected(WozINFOFormatError): pass
class WozINFOFormatError_BadSynchronized(WozINFOFormatError): pass class WozINFOFormatError_BadSynchronized(WozINFOFormatError): pass
class WozINFOFormatError_BadCleaned(WozINFOFormatError): pass class WozINFOFormatError_BadCleaned(WozINFOFormatError): pass
class WozINFOFormatError_BadCreator(WozINFOFormatError): pass
class WozTMAPFormatError(WozFormatError): pass class WozTMAPFormatError(WozFormatError): pass
class WozTMAPFormatError_BadTRKS(WozTMAPFormatError): pass class WozTMAPFormatError_BadTRKS(WozTMAPFormatError): pass
class WozTRKSFormatError(WozFormatError): pass class WozTRKSFormatError(WozFormatError): pass
class WozMETAFormatError(WozFormatError): pass class WozMETAFormatError(WozFormatError): pass
class WozMETAFormatError_DuplicateKey(WozFormatError): pass class WozMETAFormatError_DuplicateKey(WozFormatError): pass
class WozMETAFormatError_BadValue(WozFormatError): pass
class WozMETAFormatError_BadLanguage(WozFormatError): pass class WozMETAFormatError_BadLanguage(WozFormatError): pass
class WozMETAFormatError_BadRAM(WozFormatError): pass class WozMETAFormatError_BadRAM(WozFormatError): pass
class WozMETAFormatError_BadMachine(WozFormatError): pass class WozMETAFormatError_BadMachine(WozFormatError): pass
@ -131,14 +133,70 @@ class DiskImage: # base class
"""returns Track object for the given track, or None if the track is not part of this disk image. track_num can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.)""" """returns Track object for the given track, or None if the track is not part of this disk image. track_num can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.)"""
return None return None
class WozReader(DiskImage): class WozValidator:
def validate_info_version(self, version):
raise_if(version != b'\x01', WozINFOFormatError_BadVersion, "Unknown version (expected 1, found %s)" % version)
def validate_info_disk_type(self, disk_type):
raise_if(disk_type not in (b'\x01',b'\x02'), WozINFOFormatError_BadDiskType, "Unknown disk type (expected 1 or 2, found %s)" % disk_type)
def validate_info_write_protected(self, write_protected):
raise_if(write_protected not in (b'\x00',b'\x01'), WozINFOFormatError_BadWriteProtected, "Unknown write protected flag (expected 0 or 1, found %s)" % write_protected)
def validate_info_synchronized(self, synchronized):
raise_if(synchronized not in (b'\x00',b'\x01'), WozINFOFormatError_BadSynchronized, "Unknown synchronized flag (expected 0, or 1, found %s)" % synchronized)
def validate_info_cleaned(self, cleaned):
raise_if(cleaned not in (b'\x00',b'\x01'), WozINFOFormatError_BadCleaned, "Unknown cleaned flag (expected 0 or 1, found %s)" % cleaned)
def validate_info_creator(self, creator_as_bytes):
raise_if(len(creator_as_bytes) > 32, WozINFOFormatError_BadCreator, "Creator is longer than 32 bytes")
try:
creator_as_bytes.decode("UTF-8")
except:
raise_if(True, WozINFOFormatError_BadCreator, "Creator is not valid UTF-8")
def encode_info_creator(self, creator_as_string):
creator_as_bytes = creator_as_string.encode("UTF-8").ljust(32, b" ")
self.validate_info_creator(creator_as_bytes)
return creator_as_bytes
def decode_info_creator(self, creator_as_bytes):
self.validate_info_creator(creator_as_bytes)
return creator_as_bytes.decode("UTF-8").strip()
def validate_metadata(self, metadata_as_bytes):
try:
metadata = metadata_as_bytes.decode("UTF-8")
except:
raise WozMETAFormatError("Metadata is not valid UTF-8")
def decode_metadata(self, metadata_as_bytes):
self.validate_metadata(metadata_as_bytes)
return metadata_as_bytes.decode("UTF-8")
def validate_metadata_value(self, value):
raise_if("\t" in value, WozMETAFormatError_BadValue, "Invalid metadata value (contains tab character)")
raise_if("\n" in value, WozMETAFormatError_BadValue, "Invalid metadata value (contains linefeed character)")
raise_if("|" in value, WozMETAFormatError_BadValue, "Invalid metadata value (contains pipe character)")
def validate_metadata_language(self, language):
raise_if(language and (language not in kLanguages), WozMETAFormatError_BadLanguage, "Invalid metadata language")
def validate_metadata_requires_ram(self, requires_ram):
raise_if(requires_ram and (requires_ram not in kRequiresRAM), WozMETAFormatError_BadRAM, "Invalid metadata requires_ram")
def validate_metadata_requires_machine(self, requires_machine):
raise_if(requires_machine and (requires_machine not in kRequiresMachine), WozMETAFormatError_BadMachine, "Invalid metadata requires_machine")
class WozReader(DiskImage, WozValidator):
def __init__(self, filename=None, stream=None): def __init__(self, filename=None, stream=None):
DiskImage.__init__(self, filename, stream) DiskImage.__init__(self, filename, stream)
self.tmap = None self.tmap = None
self.info = collections.OrderedDict() self.info = collections.OrderedDict()
self.meta = collections.OrderedDict() self.meta = collections.OrderedDict()
with stream or open(filename, 'rb') as f: with stream or open(filename, "rb") as f:
header_raw = f.read(8) header_raw = f.read(8)
raise_if(len(header_raw) != 8, WozEOFError, sEOF) raise_if(len(header_raw) != 8, WozEOFError, sEOF)
self.__process_header(header_raw) self.__process_header(header_raw)
@ -169,28 +227,25 @@ class WozReader(DiskImage):
elif chunk_id == kMETA: elif chunk_id == kMETA:
self.__process_meta(data) self.__process_meta(data)
if crc: if crc:
raise_if(crc != binascii.crc32(b''.join(all_data)) & 0xffffffff, WozCRCError, "Bad CRC") raise_if(crc != binascii.crc32(b"".join(all_data)) & 0xffffffff, WozCRCError, "Bad CRC")
def __process_header(self, data): def __process_header(self, data):
raise_if(data[:4] != kWOZ1, WozHeaderError_NoWOZ1, "Magic string 'WOZ1' not present at offset 0") raise_if(data[:4] != kWOZ1, WozHeaderError_NoWOZ1, "Magic string 'WOZ1' not present at offset 0")
raise_if(data[4] != 0xFF, WozHeaderError_NoFF, "Magic byte 0xFF not present at offset 4") raise_if(data[4] != 0xFF, WozHeaderError_NoFF, "Magic byte 0xFF not present at offset 4")
raise_if(data[5:8] != b'\x0A\x0D\x0A', WozHeaderError_NoLF, "Magic bytes 0x0A0D0A not present at offset 5") raise_if(data[5:8] != b"\x0A\x0D\x0A", WozHeaderError_NoLF, "Magic bytes 0x0A0D0A not present at offset 5")
def __process_info(self, data): def __process_info(self, data):
version = data[0] version = data[0]
raise_if(version != 1, WozINFOFormatError_BadVersion, "Unknown version (expected 1, found %d)" % version) self.validate_info_version(to_uint8(version))
disk_type = data[1] disk_type = data[1]
raise_if(disk_type not in (1,2), WozINFOFormatError_BadDiskType, "Unknown disk type (expected 1 or 2, found %d)" % disk_type) self.validate_info_disk_type(to_uint8(disk_type))
write_protected = data[2] write_protected = data[2]
raise_if(write_protected not in (0,1), WozINFOFormatError_BadWriteProtected, "Unknown write protected flag (expected 0 or 1, found %d)" % write_protected) self.validate_info_write_protected(to_uint8(write_protected))
synchronized = data[3] synchronized = data[3]
raise_if(synchronized not in (0,1), WozINFOFormatError_BadSynchronized, "Unknown synchronized flag (expected 0, or 1, found %d)" % synchronized) self.validate_info_synchronized(to_uint8(synchronized))
cleaned = data[4] cleaned = data[4]
raise_if(cleaned not in (0,1), WozINFOFormatError_BadCleaned, "Unknown cleaned flag (expected 0 or 1, found %d)" % cleaned) self.validate_info_cleaned(to_uint8(cleaned))
try: creator = self.decode_info_creator(data[5:37])
creator = data[5:37].decode('UTF-8')
except:
raise WOZINFOFormatError("Creator is not valid UTF-8")
self.info["version"] = version # int self.info["version"] = version # int
self.info["disk_type"] = disk_type # int self.info["disk_type"] = disk_type # int
self.info["write_protected"] = (write_protected == 1) # boolean self.info["write_protected"] = (write_protected == 1) # boolean
@ -234,27 +289,21 @@ class WozReader(DiskImage):
for trk, i in zip(self.tmap, itertools.count()): for trk, i in zip(self.tmap, itertools.count()):
raise_if(trk != 0xFF and trk >= len(self.tracks), WozTMAPFormatError_BadTRKS, "Invalid TMAP entry: track %d%s points to non-existent TRKS chunk %d" % (i/4, tQuarters[i%4], trk)) raise_if(trk != 0xFF and trk >= len(self.tracks), WozTMAPFormatError_BadTRKS, "Invalid TMAP entry: track %d%s points to non-existent TRKS chunk %d" % (i/4, tQuarters[i%4], trk))
def __process_meta(self, data): def __process_meta(self, metadata_as_bytes):
try: metadata = self.decode_metadata(metadata_as_bytes)
metadata = data.decode('UTF-8') for line in metadata.split("\n"):
except:
raise WozMETAFormatError("Metadata is not valid UTF-8")
for line in metadata.split('\n'):
if not line: continue if not line: continue
columns_raw = line.split('\t') columns_raw = line.split("\t")
raise_if(len(columns_raw) != 2, WozMETAFormatError, "Malformed metadata") raise_if(len(columns_raw) != 2, WozMETAFormatError, "Malformed metadata")
key, value_raw = columns_raw key, value_raw = columns_raw
raise_if(key in self.meta, WozMETAFormatError_DuplicateKey, "Duplicate metadata key %s" % key) raise_if(key in self.meta, WozMETAFormatError_DuplicateKey, "Duplicate metadata key %s" % key)
values = value_raw.split("|") values = value_raw.split("|")
if key == "language": if key == "language":
for value in values: list(map(self.validate_metadata_language, values))
raise_if(value and (value not in kLanguages), WozMETAFormatError_BadLanguage, "Invalid metadata language")
elif key == "requires_ram": elif key == "requires_ram":
for value in values: list(map(self.validate_metadata_requires_ram, values))
raise_if(value and (value not in kRequiresRAM), WozMETAFormatError_BadRAM, "Invalid metadata requires_ram")
elif key == "requires_machine": elif key == "requires_machine":
for value in values: list(map(self.validate_metadata_requires_machine, values))
raise_if(value and (value not in kRequiresMachine), WozMETAFormatError_BadMachine, "Invalid metadata requires_machine")
self.meta[key] = len(values) == 1 and values[0] or tuple(values) self.meta[key] = len(values) == 1 and values[0] or tuple(values)
def seek(self, track_num): def seek(self, track_num):
@ -269,7 +318,7 @@ class WozReader(DiskImage):
if trk_id == 0xFF: return None if trk_id == 0xFF: return None
return self.tracks[trk_id] return self.tracks[trk_id]
class WozWriter: class WozWriter(WozValidator):
def __init__(self, creator): def __init__(self, creator):
self.info = collections.OrderedDict() self.info = collections.OrderedDict()
self.info["version"] = 1 self.info["version"] = 1
@ -296,13 +345,24 @@ class WozWriter:
chunk = bytearray() chunk = bytearray()
chunk.extend(kINFO) # chunk ID chunk.extend(kINFO) # chunk ID
chunk.extend(to_uint32(60)) # chunk size (constant) chunk.extend(to_uint32(60)) # chunk size (constant)
chunk.extend(to_uint8(self.info["version"])) # version (int, probably 1) version_raw = to_uint8(self.info["version"])
chunk.extend(to_uint8(self.info["disk_type"])) # disk type (1=5.25 inch, 2=3.5 inch) self.validate_info_version(version_raw)
chunk.extend(to_uint8(self.info["write_protected"])) # write-protected (0=no, 1=yes) disk_type_raw = to_uint8(self.info["disk_type"])
chunk.extend(to_uint8(self.info["synchronized"])) # tracks synchronized (0=no, 1=yes) self.validate_info_disk_type(disk_type_raw)
chunk.extend(to_uint8(self.info["cleaned"])) # weakbits cleaned (0=no, 1=yes) write_protected_raw = to_uint8(self.info["write_protected"])
chunk.extend(self.info["creator"].encode("UTF-8").ljust(32, b" ")) # creator self.validate_info_write_protected(write_protected_raw)
chunk.extend(b'\x00' * 23) # reserved synchronized_raw = to_uint8(self.info["synchronized"])
self.validate_info_synchronized(synchronized_raw)
cleaned_raw = to_uint8(self.info["cleaned"])
self.validate_info_cleaned(cleaned_raw)
creator_raw = self.encode_info_creator(self.info["creator"])
chunk.extend(version_raw) # version (int, probably 1)
chunk.extend(disk_type_raw) # disk type (1=5.25 inch, 2=3.5 inch)
chunk.extend(write_protected_raw) # write-protected (0=no, 1=yes)
chunk.extend(synchronized_raw) # tracks synchronized (0=no, 1=yes)
chunk.extend(cleaned_raw) # weakbits cleaned (0=no, 1=yes)
chunk.extend(creator_raw) # creator
chunk.extend(b"\x00" * 23) # reserved
return chunk return chunk
def build_tmap(self): def build_tmap(self):
@ -320,20 +380,30 @@ class WozWriter:
for track in self.tracks: for track in self.tracks:
raw_bytes = track.bits.tobytes() raw_bytes = track.bits.tobytes()
chunk.extend(raw_bytes) # bitstream as raw bytes chunk.extend(raw_bytes) # bitstream as raw bytes
chunk.extend(b'\x00' * (6646 - len(raw_bytes))) # padding to 6646 bytes chunk.extend(b"\x00" * (6646 - len(raw_bytes))) # padding to 6646 bytes
chunk.extend(to_uint16(len(raw_bytes))) # bytes used chunk.extend(to_uint16(len(raw_bytes))) # bytes used
chunk.extend(to_uint16(track.bit_count)) # bit count chunk.extend(to_uint16(track.bit_count)) # bit count
chunk.extend(b'\xFF\xFF') # splice point (none) chunk.extend(b"\xFF\xFF") # splice point (none)
chunk.extend(b'\xFF') # splice nibble (none) chunk.extend(b"\xFF") # splice nibble (none)
chunk.extend(b'\xFF') # splice bit count (none) chunk.extend(b"\xFF") # splice bit count (none)
chunk.extend(b'\x00\x00') # reserved chunk.extend(b"\x00\x00") # reserved
return chunk return chunk
def build_meta(self): def build_meta(self):
if not self.meta: return b'' if not self.meta: return b""
data = b'\x0A'.join( for key, value_raw in self.meta.items():
if type(value_raw) == str:
values = [value_raw]
list(map(self.validate_metadata_value, values))
if key == "language":
list(map(self.validate_metadata_language, values))
elif key == "requires_ram":
list(map(self.validate_metadata_requires_ram, values))
elif key == "requires_machine":
list(map(self.validate_metadata_requires_machine, values))
data = b"\x0A".join(
[k.encode("UTF-8") + \ [k.encode("UTF-8") + \
b'\x09' + \ b"\x09" + \
(type(v) in (list,tuple) and "|".join(v) or v).encode("UTF-8") \ (type(v) in (list,tuple) and "|".join(v) or v).encode("UTF-8") \
for k, v in self.meta.items()]) for k, v in self.meta.items()])
chunk = bytearray() chunk = bytearray()
@ -345,7 +415,7 @@ class WozWriter:
def build_head(self, crc): def build_head(self, crc):
chunk = bytearray() chunk = bytearray()
chunk.extend(kWOZ1) # magic bytes chunk.extend(kWOZ1) # magic bytes
chunk.extend(b'\xFF\x0A\x0D\x0A') # more magic bytes chunk.extend(b"\xFF\x0A\x0D\x0A") # more magic bytes
chunk.extend(to_uint32(crc)) # CRC32 of rest of file (calculated in caller) chunk.extend(to_uint32(crc)) # CRC32 of rest of file (calculated in caller)
return chunk return chunk
@ -443,7 +513,8 @@ class CommandEdit(BaseCommand):
help="""change information field. help="""change information field.
INFO format is "key:value". INFO format is "key:value".
Acceptable keys are disk_type, write_protected, synchronized, cleaned, creator, version. Acceptable keys are disk_type, write_protected, synchronized, cleaned, creator, version.
Other keys are ignored.""") Other keys are ignored.
For boolean fields, use "1" or "true" or "yes" for true, "0" or "false" or "no" for false.""")
self.parser.add_argument("-m", "--meta", type=str, action="append", self.parser.add_argument("-m", "--meta", type=str, action="append",
help="""change metadata field. help="""change metadata field.
META format is "key:value". META format is "key:value".
@ -461,6 +532,8 @@ requires_machine, notes, side, side_name, contributor, image_date. Other keys ar
# add all new info fields # add all new info fields
for i in args.info or (): for i in args.info or ():
k, v = i.split(":", 1) k, v = i.split(":", 1)
if k in ("write_protected","synchronized","cleaned"):
v = v.lower() in ("1", "true", "yes")
output.info[k] = v output.info[k] = v
# add all new metadata fields # add all new metadata fields
for m in args.meta or (): for m in args.meta or ():
@ -472,15 +545,19 @@ requires_machine, notes, side, side_name, contributor, image_date. Other keys ar
output.meta[k] = v output.meta[k] = v
elif k in output.meta.keys(): elif k in output.meta.keys():
del output.meta[k] del output.meta[k]
with open(args.file, 'wb') as f: tmpfile = args.file + ".ardry"
with open(tmpfile, "wb") as f:
output.write(f) output.write(f)
os.rename(tmpfile, args.file)
if __name__ == "__main__": if __name__ == "__main__":
import sys
raise_if = lambda cond, e, s="": cond and sys.exit("%s: %s" % (e.__name__, s))
cmds = [CommandDump(), CommandVerify(), CommandEdit()] cmds = [CommandDump(), CommandVerify(), CommandEdit()]
parser = argparse.ArgumentParser(prog=__progname__, parser = argparse.ArgumentParser(prog=__progname__,
description="""A multi-purpose tool for manipulating .woz disk images. description="""A multi-purpose tool for manipulating .woz disk images.
See '" + __progname__ + " <command> -h' for help on individual commands.""", See '""" + __progname__ + """ <command> -h' for help on individual commands.""",
formatter_class=argparse.RawDescriptionHelpFormatter) formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("-v", "--version", action="version", version=__displayname__) parser.add_argument("-v", "--version", action="version", version=__displayname__)
sp = parser.add_subparsers(dest="command", help="command") sp = parser.add_subparsers(dest="command", help="command")