diff --git a/moofimage.py b/moofimage.py index 6a0f823..61395fc 100755 --- a/moofimage.py +++ b/moofimage.py @@ -10,12 +10,6 @@ import sys def myhex(b): return hex(b)[2:].rjust(2, "0").upper() -class MoofDiskImage(wozardry.WozDiskImage): - def __init__(self, iostream=None): - wozardry.WozDiskImage.__init__(self, iostream) - for i,t in zip(range(len(self.tracks)), self.tracks): - self.tracks[i] = MoofTrack(t.raw_bytes, t.raw_count) - class MoofTrack(wozardry.Track): def __init__(self, raw_bytes, raw_count): wozardry.Track.__init__(self, raw_bytes, raw_count) @@ -26,30 +20,31 @@ class MoofTrack(wozardry.Track): while len(self.bits) > raw_count: self.bits.pop() - def bit(self): - b = self.bits[self.bit_index] and 1 or 0 - self.bit_index += 1 - if self.bit_index >= self.raw_count: - self.bit_index = 0 + def rewind(self, bit_count=1): + self.bit_index -= bit_count + while self.bit_index < 0: + self.bit_index += self.raw_count + self.revolutions -= 1 + + def forward(self, bit_count=1): + self.bit_index += bit_count + while self.bit_index >= self.raw_count: + self.bit_index -= self.raw_count self.revolutions += 1 + + def bit(self): + b = self.bits[self.bit_index] + self.forward() yield b def nibble(self): - b = 0 - while b == 0: - b = next(self.bit()) - n = 0x80 + while not next(self.bit()): pass + n = 0b10000000 for bit_index in range(6, -1, -1): b = next(self.bit()) - n += b << bit_index + n |= b << bit_index yield n - def rewind(self, bit_count=1): - self.bit_index -= bit_count - if self.bit_index < 0: - self.bit_index = self.raw_count - 1 - self.revolutions -= 1 - def find(self, sequence): starting_revolutions = self.revolutions seen = [0] * len(sequence) @@ -76,21 +71,23 @@ class MoofTrack(wozardry.Track): return False class MoofAddressField: - def __init__(self, volume, track_id, sector_id, valid): + def __init__(self, valid, volume, track_id, sector_id): + self.valid = valid self.volume = volume self.track_id = track_id self.sector_id = sector_id + +class MoofDataField: + def __init__(self, valid, sector_id, tags, data): self.valid = valid + self.sector_id = sector_id + self.tags = tags + self.data = data class MoofBlock: - def __init__(self, address_field, decoded, start_bit_index=None, end_bit_index=None): + def __init__(self, address_field, data_field): self.address_field = address_field - self.decoded = decoded - self.start_bit_index = start_bit_index - self.end_bit_index = end_bit_index - - def __getitem__(self, i): - return self.decoded[i] + self.data_field = data_field class MoofRWTS: kDefaultAddressPrologue16 = (0xD5, 0xAA, 0x96) @@ -119,7 +116,7 @@ class MoofRWTS: self.data_prologue = data_prologue self.data_epilogue = data_epilogue self.nibble_translate_table = nibble_translate_table - self.expected_sectors_per_track = dict(zip(range(0xA0), (i for i in range(0x0C,0x07,-1) for j in range(0x20)))) + self.sectors_per_track = dict(zip(range(0xA0), (i for i in range(0x0C,0x07,-1) for j in range(0x20)))) def _(self, track): return self.nibble_translate_table[next(track.nibble())] @@ -133,8 +130,9 @@ class MoofRWTS: h2 = self._(track) volume = self._(track) checksum = self._(track) + valid = h0 ^ sector_id ^ h2 ^ volume == checksum track_id = (h0 << 1) | ((h2 & 0b00000001) << 7) | ((h2 & 0b00100000) >> 5) - return MoofAddressField(volume, track_id, sector_id, h0 ^ sector_id ^ h2 ^ volume == checksum) + return MoofAddressField(valid, volume, track_id, sector_id) def verify_nibbles_at_point(self, track, nibbles): found = [] @@ -192,8 +190,8 @@ class MoofRWTS: for n in nibble_groups)) # decode 524 bytes (12 tag bytes + 512 data bytes) - tag_bytes = [next(gcr_byte) for i in range(12)] - decoded_bytes = [next(gcr_byte) for i in range(512)] + tags = [next(gcr_byte) for i in range(12)] + data = [next(gcr_byte) for i in range(512)] # validate checksums against last data field nibble and three epilogue nibbles valid = nibble_groups[-1][-1] == ((c1 & 0b11000000) >> 6) | ((c2 & 0b11000000) >> 4) | ((c3 & 0b11000000) >> 2) @@ -201,88 +199,190 @@ class MoofRWTS: valid &= self._(track) == (c2 & 0b00111111) valid &= self._(track) == (c1 & 0b00111111) - return valid, sector_id, tag_bytes, decoded_bytes + return MoofDataField(valid, sector_id, tags, data) def verify_data_epilogue_at_point(self, track): return self.verify_nibbles_at_point(track, self.data_epilogue) -def get_pace_key_at_point(rwts, track, bit_index): - track.bit_index, bit_index = bit_index, track.bit_index - key = [] - prologue = (0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xAB, 0xCD, 0xEF, 0xEF) - if rwts.verify_nibbles_at_point(track, prologue): - for i in range(4): - next(track.nibble()) - for i in range(4): - x = (next(track.nibble()) << 8) + next(track.nibble()) - x = x & 0x5555 - x = (x | (x >> 1)) & 0x3333 - x = (x | (x >> 2)) & 0x0f0f - x = (x | (x >> 4)) & 0x00ff - x = (x | (x >> 8)) & 0xffff - key.append(x) - key.reverse() - track.bit_index, bit_index = bit_index, track.bit_index - return "".join(map(myhex, key)) +class DefaultLogger: + def warn(self, message, T=None, S=None, X=None, Y=None): + if T: T = myhex(T) + if S: S = myhex(S) + message = message.format(**locals()) + sys.stderr.write(message) + sys.stderr.write('\n') + info=warn + error=warn + +class MoofDiskImage(wozardry.WozDiskImage): + kE7Bytestream = (0x2B, 0x00, 0x2B, 0xFD, 0x83, 0x6F, 0x20, 0xE2, + 0x8D, 0x99, 0x49, 0x44, 0x47, 0x82, 0xD9, 0x26, + 0xFB, 0xC6, 0x3, 0xF8) + kPACEPrologue = (0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xAB, 0xCD, 0xEF, 0xEF) + + def __init__(self, iostream=None, rwtsclass=MoofRWTS, loggerclass=DefaultLogger): + wozardry.WozDiskImage.__init__(self, iostream) + self.logger = loggerclass() + self.rwts = rwtsclass() + for i,t in zip(range(len(self.tracks)), self.tracks): + self.tracks[i] = MoofTrack(t.raw_bytes, t.raw_count) + self.parse() + + def get_pace_key_at_point(self, track, bit_index): + # save bitstream position + track.bit_index, bit_index = bit_index, track.bit_index + key = [] + if self.rwts.verify_nibbles_at_point(track, self.kPACEPrologue): + for i in range(4): + next(track.nibble()) + for i in range(4): + x = (next(track.nibble()) << 8) + next(track.nibble()) + x = x & 0x5555 + x = (x | (x >> 1)) & 0x3333 + x = (x | (x >> 2)) & 0x0f0f + x = (x | (x >> 4)) & 0x00ff + x = (x | (x >> 8)) & 0xffff + key.append(x) + key.reverse() + # restore bitstream position + track.bit_index, bit_index = bit_index, track.bit_index + return "".join(map(myhex, key)) + + def parse(self): + self.blocks = [] + # only 400K and 800K disks supported at the moment + if not self.info["disk_type"] in (1,2): return + for track_index in self.tmap: + if track_index == 0xFF: continue + track = self.tracks[track_index] + seen_sectors = [] + track_id = -1 + while self.rwts.find_address_prologue(track): + address_field = self.rwts.address_field_at_point(track) + + # log if address field checksum doesn't match + if not address_field.valid: + self.logger.warn( + 'T{T},S{S} Address field checksum invalid', + T=address_field.track_id, + S=address_field.sector_id + ) + continue + + # log if track ID is ridiculous + if not (0x00 <= address_field.track_id <= 0x9F): + self.logger.warn( + 'Address field track ID {T} invalid', + T=address_field.track_id + ) + continue + + # log if sector ID is ridiculous + if not (0x00 <= address_field.sector_id < self.rwts.sectors_per_track[address_field.track_id]): + self.logger.warn( + 'Address field sector ID {S} invalid', + S=address_field.sector_id + ) + continue + + # log if address field epilogue isn't next + if not self.rwts.verify_address_epilogue_at_point(track): + self.logger.warn( + 'T{T},S{S} Address field epilogue invalid', + T=address_field.track_id, + S=address_field.sector_id + ) + continue + + # if we see duplicate sector IDs, assume we're done + if address_field.sector_id in seen_sectors: break + seen_sectors.append(address_field.sector_id) + + old_bit_index = track.bit_index + if not self.rwts.find_data_prologue(track): + # if we didn't find any data field prologue at all + # before the next address field prologue, then check + # if this is a specially formatted protection sector + # from which we can extract some useful information + decryption_key = self.get_pace_key_at_point(track, old_bit_index) + if decryption_key: + self.logger.info( + 'T{T},S{S} Found PACE protection, key={X}', + T=address_field.track_id, + S=address_field.sector_id, + X=decryption_key + ) + continue + + try: + data_field = self.rwts.data_field_at_point(track) + except KeyError: + # log if GCR decoding failed + self.logger.warn( + 'T{T},S{S} Data field contains invalid nibble', + T=address_field.track_id, + S=address_field.sector_id + ) + continue + + # log if checksums didn't match after GCR decoding + if not data_field.valid: + self.logger.warn( + 'T{T},S{S} Data field checksums invalid', + T=address_field.track_id, + S=address_field.sector_id + ) + continue + + # address and data fields are supposed to contain + # matching sector IDs, so log if they don't match + if address_field.sector_id != data_field.sector_id: + self.logger.warn( + 'T{T},S{S} Address and data field sector IDs do not match', + T=address_field.track_id, + S=address_field.sector_id + ) + continue + + # log if sector data contains E7 bitstream + if (sum(data_field.data[:0x18E]) == 0) and (tuple(data_field.data[0x18F:0x1A3]) == self.kE7Bytestream): + #print(data_field.data[0x18F:0x1A3]) + self.logger.warn( + 'T{T},S{S} Found E7 bitstream', + T=address_field.track_id, + S=address_field.sector_id + ) + + # log if data field epilogue isn't next + if not self.rwts.verify_data_epilogue_at_point(track): + self.logger.warn( + 'T{T},S{S} Data field epilogue invalid', + T=address_field.track_id, + S=address_field.sector_id + ) + continue + + track_id = address_field.track_id + self.blocks.append(MoofBlock(address_field, data_field)) + + # move on if we didn't find any valid sectors + if track_id == -1: continue + + # log if we didn't find enough sectors + sector_count = len(seen_sectors) + expected_count = self.rwts.sectors_per_track[track_id] + if sector_count < expected_count: + self.logger.warn( + 'T{T} Found {X} sectors (expected {Y})', + T=track_id, + X=sector_count, + Y=expected_count + ) def driver(filename): with open(filename, 'rb') as f: mdisk = MoofDiskImage(f) - if not mdisk.info["disk_type"] in (1,2): - print("/!\ MFM disks not supported yet") - return - rwts = MoofRWTS() - for track_index in mdisk.tmap: - if track_index == 0xFF: continue - track = mdisk.tracks[track_index] - seen_sectors = [] - track_id = -1 - while True: - if not rwts.find_address_prologue(track): break - address_field = rwts.address_field_at_point(track) - if not address_field.valid: continue - if (address_field.track_id < 0) or (address_field.track_id > 0x9F): - print(f'/!\ invalid track ID {myhex(address_field.track_id)}') - continue - if (address_field.sector_id < 0) or \ - (address_field.sector_id > rwts.expected_sectors_per_track[address_field.track_id]): - print(f'/!\ invalid sector ID {myhex(address_field.sector_id)}') - continue - if not rwts.verify_address_epilogue_at_point(track): - print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: address epilogue does not match') - continue - if address_field.sector_id in seen_sectors: break - seen_sectors.append(address_field.sector_id) - old_bit_index = track.bit_index - if not rwts.find_data_prologue(track): - key = get_pace_key_at_point(rwts, track, old_bit_index) - if key: - print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: found PACE decryption key {key}') - continue - try: - valid, sector_id, tags, decoded_bytes = rwts.data_field_at_point(track) - except KeyError: - print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: invalid nibble') - continue - if not valid: - print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: data checksums do not match') - continue - if valid: - valid = address_field.sector_id == sector_id - if not valid: - print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: address/data field sector IDs do not match') - continue - valid = rwts.verify_data_epilogue_at_point(track) - if not valid: - print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: data epilogue does not match') - continue - if (track_id == -1) and (address_field.valid): - track_id = address_field.track_id - if track_id == -1: continue - sector_count = len(seen_sectors) - expected_count = rwts.expected_sectors_per_track[track_id] - if sector_count < expected_count: - print(f'/!\ track {myhex(track_id)} only has {sector_count} sectors (expected {expected_count})') if __name__ == '__main__': driver(sys.argv[1])