mirror of
https://github.com/a2-4am/wozardry.git
synced 2024-12-27 06:29:20 +00:00
add E7 detection, refactor, add some comments
This commit is contained in:
parent
cfadf05a96
commit
82352e50f9
286
moofimage.py
286
moofimage.py
@ -10,12 +10,6 @@ import sys
|
||||
def myhex(b):
|
||||
return hex(b)[2:].rjust(2, "0").upper()
|
||||
|
||||
class MoofDiskImage(wozardry.WozDiskImage):
|
||||
def __init__(self, iostream=None):
|
||||
wozardry.WozDiskImage.__init__(self, iostream)
|
||||
for i,t in zip(range(len(self.tracks)), self.tracks):
|
||||
self.tracks[i] = MoofTrack(t.raw_bytes, t.raw_count)
|
||||
|
||||
class MoofTrack(wozardry.Track):
|
||||
def __init__(self, raw_bytes, raw_count):
|
||||
wozardry.Track.__init__(self, raw_bytes, raw_count)
|
||||
@ -26,30 +20,31 @@ class MoofTrack(wozardry.Track):
|
||||
while len(self.bits) > raw_count:
|
||||
self.bits.pop()
|
||||
|
||||
def bit(self):
|
||||
b = self.bits[self.bit_index] and 1 or 0
|
||||
self.bit_index += 1
|
||||
if self.bit_index >= self.raw_count:
|
||||
self.bit_index = 0
|
||||
def rewind(self, bit_count=1):
|
||||
self.bit_index -= bit_count
|
||||
while self.bit_index < 0:
|
||||
self.bit_index += self.raw_count
|
||||
self.revolutions -= 1
|
||||
|
||||
def forward(self, bit_count=1):
|
||||
self.bit_index += bit_count
|
||||
while self.bit_index >= self.raw_count:
|
||||
self.bit_index -= self.raw_count
|
||||
self.revolutions += 1
|
||||
|
||||
def bit(self):
|
||||
b = self.bits[self.bit_index]
|
||||
self.forward()
|
||||
yield b
|
||||
|
||||
def nibble(self):
|
||||
b = 0
|
||||
while b == 0:
|
||||
b = next(self.bit())
|
||||
n = 0x80
|
||||
while not next(self.bit()): pass
|
||||
n = 0b10000000
|
||||
for bit_index in range(6, -1, -1):
|
||||
b = next(self.bit())
|
||||
n += b << bit_index
|
||||
n |= b << bit_index
|
||||
yield n
|
||||
|
||||
def rewind(self, bit_count=1):
|
||||
self.bit_index -= bit_count
|
||||
if self.bit_index < 0:
|
||||
self.bit_index = self.raw_count - 1
|
||||
self.revolutions -= 1
|
||||
|
||||
def find(self, sequence):
|
||||
starting_revolutions = self.revolutions
|
||||
seen = [0] * len(sequence)
|
||||
@ -76,21 +71,23 @@ class MoofTrack(wozardry.Track):
|
||||
return False
|
||||
|
||||
class MoofAddressField:
|
||||
def __init__(self, volume, track_id, sector_id, valid):
|
||||
def __init__(self, valid, volume, track_id, sector_id):
|
||||
self.valid = valid
|
||||
self.volume = volume
|
||||
self.track_id = track_id
|
||||
self.sector_id = sector_id
|
||||
|
||||
class MoofDataField:
|
||||
def __init__(self, valid, sector_id, tags, data):
|
||||
self.valid = valid
|
||||
self.sector_id = sector_id
|
||||
self.tags = tags
|
||||
self.data = data
|
||||
|
||||
class MoofBlock:
|
||||
def __init__(self, address_field, decoded, start_bit_index=None, end_bit_index=None):
|
||||
def __init__(self, address_field, data_field):
|
||||
self.address_field = address_field
|
||||
self.decoded = decoded
|
||||
self.start_bit_index = start_bit_index
|
||||
self.end_bit_index = end_bit_index
|
||||
|
||||
def __getitem__(self, i):
|
||||
return self.decoded[i]
|
||||
self.data_field = data_field
|
||||
|
||||
class MoofRWTS:
|
||||
kDefaultAddressPrologue16 = (0xD5, 0xAA, 0x96)
|
||||
@ -119,7 +116,7 @@ class MoofRWTS:
|
||||
self.data_prologue = data_prologue
|
||||
self.data_epilogue = data_epilogue
|
||||
self.nibble_translate_table = nibble_translate_table
|
||||
self.expected_sectors_per_track = dict(zip(range(0xA0), (i for i in range(0x0C,0x07,-1) for j in range(0x20))))
|
||||
self.sectors_per_track = dict(zip(range(0xA0), (i for i in range(0x0C,0x07,-1) for j in range(0x20))))
|
||||
|
||||
def _(self, track):
|
||||
return self.nibble_translate_table[next(track.nibble())]
|
||||
@ -133,8 +130,9 @@ class MoofRWTS:
|
||||
h2 = self._(track)
|
||||
volume = self._(track)
|
||||
checksum = self._(track)
|
||||
valid = h0 ^ sector_id ^ h2 ^ volume == checksum
|
||||
track_id = (h0 << 1) | ((h2 & 0b00000001) << 7) | ((h2 & 0b00100000) >> 5)
|
||||
return MoofAddressField(volume, track_id, sector_id, h0 ^ sector_id ^ h2 ^ volume == checksum)
|
||||
return MoofAddressField(valid, volume, track_id, sector_id)
|
||||
|
||||
def verify_nibbles_at_point(self, track, nibbles):
|
||||
found = []
|
||||
@ -192,8 +190,8 @@ class MoofRWTS:
|
||||
for n in nibble_groups))
|
||||
|
||||
# decode 524 bytes (12 tag bytes + 512 data bytes)
|
||||
tag_bytes = [next(gcr_byte) for i in range(12)]
|
||||
decoded_bytes = [next(gcr_byte) for i in range(512)]
|
||||
tags = [next(gcr_byte) for i in range(12)]
|
||||
data = [next(gcr_byte) for i in range(512)]
|
||||
|
||||
# validate checksums against last data field nibble and three epilogue nibbles
|
||||
valid = nibble_groups[-1][-1] == ((c1 & 0b11000000) >> 6) | ((c2 & 0b11000000) >> 4) | ((c3 & 0b11000000) >> 2)
|
||||
@ -201,16 +199,41 @@ class MoofRWTS:
|
||||
valid &= self._(track) == (c2 & 0b00111111)
|
||||
valid &= self._(track) == (c1 & 0b00111111)
|
||||
|
||||
return valid, sector_id, tag_bytes, decoded_bytes
|
||||
return MoofDataField(valid, sector_id, tags, data)
|
||||
|
||||
def verify_data_epilogue_at_point(self, track):
|
||||
return self.verify_nibbles_at_point(track, self.data_epilogue)
|
||||
|
||||
def get_pace_key_at_point(rwts, track, bit_index):
|
||||
class DefaultLogger:
|
||||
def warn(self, message, T=None, S=None, X=None, Y=None):
|
||||
if T: T = myhex(T)
|
||||
if S: S = myhex(S)
|
||||
message = message.format(**locals())
|
||||
sys.stderr.write(message)
|
||||
sys.stderr.write('\n')
|
||||
info=warn
|
||||
error=warn
|
||||
|
||||
class MoofDiskImage(wozardry.WozDiskImage):
|
||||
kE7Bytestream = (0x2B, 0x00, 0x2B, 0xFD, 0x83, 0x6F, 0x20, 0xE2,
|
||||
0x8D, 0x99, 0x49, 0x44, 0x47, 0x82, 0xD9, 0x26,
|
||||
0xFB, 0xC6, 0x3, 0xF8)
|
||||
kPACEPrologue = (0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xAB, 0xCD, 0xEF, 0xEF)
|
||||
|
||||
def __init__(self, iostream=None, rwtsclass=MoofRWTS, loggerclass=DefaultLogger):
|
||||
wozardry.WozDiskImage.__init__(self, iostream)
|
||||
self.logger = loggerclass()
|
||||
self.rwts = rwtsclass()
|
||||
for i,t in zip(range(len(self.tracks)), self.tracks):
|
||||
self.tracks[i] = MoofTrack(t.raw_bytes, t.raw_count)
|
||||
self.parse()
|
||||
|
||||
def get_pace_key_at_point(self, track, bit_index):
|
||||
# save bitstream position
|
||||
track.bit_index, bit_index = bit_index, track.bit_index
|
||||
key = []
|
||||
prologue = (0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xAB, 0xCD, 0xEF, 0xEF)
|
||||
if rwts.verify_nibbles_at_point(track, prologue):
|
||||
if self.rwts.verify_nibbles_at_point(track, self.kPACEPrologue):
|
||||
for i in range(4):
|
||||
next(track.nibble())
|
||||
for i in range(4):
|
||||
@ -222,67 +245,144 @@ def get_pace_key_at_point(rwts, track, bit_index):
|
||||
x = (x | (x >> 8)) & 0xffff
|
||||
key.append(x)
|
||||
key.reverse()
|
||||
# restore bitstream position
|
||||
track.bit_index, bit_index = bit_index, track.bit_index
|
||||
return "".join(map(myhex, key))
|
||||
|
||||
def parse(self):
|
||||
self.blocks = []
|
||||
# only 400K and 800K disks supported at the moment
|
||||
if not self.info["disk_type"] in (1,2): return
|
||||
for track_index in self.tmap:
|
||||
if track_index == 0xFF: continue
|
||||
track = self.tracks[track_index]
|
||||
seen_sectors = []
|
||||
track_id = -1
|
||||
while self.rwts.find_address_prologue(track):
|
||||
address_field = self.rwts.address_field_at_point(track)
|
||||
|
||||
# log if address field checksum doesn't match
|
||||
if not address_field.valid:
|
||||
self.logger.warn(
|
||||
'T{T},S{S} Address field checksum invalid',
|
||||
T=address_field.track_id,
|
||||
S=address_field.sector_id
|
||||
)
|
||||
continue
|
||||
|
||||
# log if track ID is ridiculous
|
||||
if not (0x00 <= address_field.track_id <= 0x9F):
|
||||
self.logger.warn(
|
||||
'Address field track ID {T} invalid',
|
||||
T=address_field.track_id
|
||||
)
|
||||
continue
|
||||
|
||||
# log if sector ID is ridiculous
|
||||
if not (0x00 <= address_field.sector_id < self.rwts.sectors_per_track[address_field.track_id]):
|
||||
self.logger.warn(
|
||||
'Address field sector ID {S} invalid',
|
||||
S=address_field.sector_id
|
||||
)
|
||||
continue
|
||||
|
||||
# log if address field epilogue isn't next
|
||||
if not self.rwts.verify_address_epilogue_at_point(track):
|
||||
self.logger.warn(
|
||||
'T{T},S{S} Address field epilogue invalid',
|
||||
T=address_field.track_id,
|
||||
S=address_field.sector_id
|
||||
)
|
||||
continue
|
||||
|
||||
# if we see duplicate sector IDs, assume we're done
|
||||
if address_field.sector_id in seen_sectors: break
|
||||
seen_sectors.append(address_field.sector_id)
|
||||
|
||||
old_bit_index = track.bit_index
|
||||
if not self.rwts.find_data_prologue(track):
|
||||
# if we didn't find any data field prologue at all
|
||||
# before the next address field prologue, then check
|
||||
# if this is a specially formatted protection sector
|
||||
# from which we can extract some useful information
|
||||
decryption_key = self.get_pace_key_at_point(track, old_bit_index)
|
||||
if decryption_key:
|
||||
self.logger.info(
|
||||
'T{T},S{S} Found PACE protection, key={X}',
|
||||
T=address_field.track_id,
|
||||
S=address_field.sector_id,
|
||||
X=decryption_key
|
||||
)
|
||||
continue
|
||||
|
||||
try:
|
||||
data_field = self.rwts.data_field_at_point(track)
|
||||
except KeyError:
|
||||
# log if GCR decoding failed
|
||||
self.logger.warn(
|
||||
'T{T},S{S} Data field contains invalid nibble',
|
||||
T=address_field.track_id,
|
||||
S=address_field.sector_id
|
||||
)
|
||||
continue
|
||||
|
||||
# log if checksums didn't match after GCR decoding
|
||||
if not data_field.valid:
|
||||
self.logger.warn(
|
||||
'T{T},S{S} Data field checksums invalid',
|
||||
T=address_field.track_id,
|
||||
S=address_field.sector_id
|
||||
)
|
||||
continue
|
||||
|
||||
# address and data fields are supposed to contain
|
||||
# matching sector IDs, so log if they don't match
|
||||
if address_field.sector_id != data_field.sector_id:
|
||||
self.logger.warn(
|
||||
'T{T},S{S} Address and data field sector IDs do not match',
|
||||
T=address_field.track_id,
|
||||
S=address_field.sector_id
|
||||
)
|
||||
continue
|
||||
|
||||
# log if sector data contains E7 bitstream
|
||||
if (sum(data_field.data[:0x18E]) == 0) and (tuple(data_field.data[0x18F:0x1A3]) == self.kE7Bytestream):
|
||||
#print(data_field.data[0x18F:0x1A3])
|
||||
self.logger.warn(
|
||||
'T{T},S{S} Found E7 bitstream',
|
||||
T=address_field.track_id,
|
||||
S=address_field.sector_id
|
||||
)
|
||||
|
||||
# log if data field epilogue isn't next
|
||||
if not self.rwts.verify_data_epilogue_at_point(track):
|
||||
self.logger.warn(
|
||||
'T{T},S{S} Data field epilogue invalid',
|
||||
T=address_field.track_id,
|
||||
S=address_field.sector_id
|
||||
)
|
||||
continue
|
||||
|
||||
track_id = address_field.track_id
|
||||
self.blocks.append(MoofBlock(address_field, data_field))
|
||||
|
||||
# move on if we didn't find any valid sectors
|
||||
if track_id == -1: continue
|
||||
|
||||
# log if we didn't find enough sectors
|
||||
sector_count = len(seen_sectors)
|
||||
expected_count = self.rwts.sectors_per_track[track_id]
|
||||
if sector_count < expected_count:
|
||||
self.logger.warn(
|
||||
'T{T} Found {X} sectors (expected {Y})',
|
||||
T=track_id,
|
||||
X=sector_count,
|
||||
Y=expected_count
|
||||
)
|
||||
|
||||
def driver(filename):
|
||||
with open(filename, 'rb') as f:
|
||||
mdisk = MoofDiskImage(f)
|
||||
if not mdisk.info["disk_type"] in (1,2):
|
||||
print("/!\ MFM disks not supported yet")
|
||||
return
|
||||
rwts = MoofRWTS()
|
||||
for track_index in mdisk.tmap:
|
||||
if track_index == 0xFF: continue
|
||||
track = mdisk.tracks[track_index]
|
||||
seen_sectors = []
|
||||
track_id = -1
|
||||
while True:
|
||||
if not rwts.find_address_prologue(track): break
|
||||
address_field = rwts.address_field_at_point(track)
|
||||
if not address_field.valid: continue
|
||||
if (address_field.track_id < 0) or (address_field.track_id > 0x9F):
|
||||
print(f'/!\ invalid track ID {myhex(address_field.track_id)}')
|
||||
continue
|
||||
if (address_field.sector_id < 0) or \
|
||||
(address_field.sector_id > rwts.expected_sectors_per_track[address_field.track_id]):
|
||||
print(f'/!\ invalid sector ID {myhex(address_field.sector_id)}')
|
||||
continue
|
||||
if not rwts.verify_address_epilogue_at_point(track):
|
||||
print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: address epilogue does not match')
|
||||
continue
|
||||
if address_field.sector_id in seen_sectors: break
|
||||
seen_sectors.append(address_field.sector_id)
|
||||
old_bit_index = track.bit_index
|
||||
if not rwts.find_data_prologue(track):
|
||||
key = get_pace_key_at_point(rwts, track, old_bit_index)
|
||||
if key:
|
||||
print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: found PACE decryption key {key}')
|
||||
continue
|
||||
try:
|
||||
valid, sector_id, tags, decoded_bytes = rwts.data_field_at_point(track)
|
||||
except KeyError:
|
||||
print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: invalid nibble')
|
||||
continue
|
||||
if not valid:
|
||||
print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: data checksums do not match')
|
||||
continue
|
||||
if valid:
|
||||
valid = address_field.sector_id == sector_id
|
||||
if not valid:
|
||||
print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: address/data field sector IDs do not match')
|
||||
continue
|
||||
valid = rwts.verify_data_epilogue_at_point(track)
|
||||
if not valid:
|
||||
print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: data epilogue does not match')
|
||||
continue
|
||||
if (track_id == -1) and (address_field.valid):
|
||||
track_id = address_field.track_id
|
||||
if track_id == -1: continue
|
||||
sector_count = len(seen_sectors)
|
||||
expected_count = rwts.expected_sectors_per_track[track_id]
|
||||
if sector_count < expected_count:
|
||||
print(f'/!\ track {myhex(track_id)} only has {sector_count} sectors (expected {expected_count})')
|
||||
|
||||
if __name__ == '__main__':
|
||||
driver(sys.argv[1])
|
||||
|
Loading…
Reference in New Issue
Block a user