From 74e8e994c9000c14837c2809221503f62ea21774 Mon Sep 17 00:00:00 2001 From: 4am Date: Thu, 29 Sep 2022 18:16:18 -0400 Subject: [PATCH] [WIP] start factoring out format-specific bit banging --- moofimage.py | 282 +++++++++++++++++++++++++++++++++++++++++++++++++++ wozardry.py | 91 +---------------- 2 files changed, 285 insertions(+), 88 deletions(-) create mode 100755 moofimage.py diff --git a/moofimage.py b/moofimage.py new file mode 100755 index 0000000..f4e684c --- /dev/null +++ b/moofimage.py @@ -0,0 +1,282 @@ +#!/usr/bin/env python3 + +#(c) 2022 by 4am +#license:MIT + +import wozardry # https://github.com/a2-4am/wozardry +import bitarray # https://pypi.org/project/bitarray/ +import sys + +def myhex(b): + return hex(b)[2:].rjust(2, "0").upper() + +class MoofDiskImage(wozardry.WozDiskImage): + def __init__(self, iostream=None): + wozardry.WozDiskImage.__init__(self, iostream) + for i,t in zip(range(len(self.tracks)), self.tracks): + self.tracks[i] = MoofTrack(t.raw_bytes, t.raw_count) + +class MoofTrack(wozardry.Track): + def __init__(self, raw_bytes, raw_count): + wozardry.Track.__init__(self, raw_bytes, raw_count) + self.bit_index = 0 + self.revolutions = 0 + self.bits = bitarray.bitarray(endian="big") + self.bits.frombytes(self.raw_bytes) + while len(self.bits) > raw_count: + self.bits.pop() + + def bit(self): + b = self.bits[self.bit_index] and 1 or 0 + self.bit_index += 1 + if self.bit_index >= self.raw_count: + self.bit_index = 0 + self.revolutions += 1 + yield b + + def nibble(self): + b = 0 + while b == 0: + b = next(self.bit()) + n = 0x80 + for bit_index in range(6, -1, -1): + b = next(self.bit()) + n += b << bit_index + yield n + + def rewind(self, bit_count=1): + self.bit_index -= bit_count + if self.bit_index < 0: + self.bit_index = self.raw_count - 1 + self.revolutions -= 1 + + def find(self, sequence): + starting_revolutions = self.revolutions + seen = [0] * len(sequence) + while (self.revolutions < starting_revolutions + 2): + del seen[0] + seen.append(next(self.nibble())) + if tuple(seen) == tuple(sequence): return True + return False + + def find_this_not_that(self, good, bad): + starting_revolutions = self.revolutions + good = tuple(good) + bad = tuple(bad) + seen_good = [0] * len(good) + seen_bad = [0] * len(bad) + while (self.revolutions < starting_revolutions + 2): + del seen_good[0] + del seen_bad[0] + n = next(self.nibble()) + seen_good.append(n) + seen_bad.append(n) + if tuple(seen_bad) == bad: return False + if tuple(seen_good) == good: return True + return False + +class MoofAddressField: + def __init__(self, volume, track_id, sector_id, valid): + self.volume = volume + self.track_id = track_id + self.sector_id = sector_id + self.valid = valid + +class MoofBlock: + def __init__(self, address_field, decoded, start_bit_index=None, end_bit_index=None): + self.address_field = address_field + self.decoded = decoded + self.start_bit_index = start_bit_index + self.end_bit_index = end_bit_index + + def __getitem__(self, i): + return self.decoded[i] + +class MoofRWTS: + kDefaultAddressPrologue16 = (0xD5, 0xAA, 0x96) + kDefaultAddressEpilogue16 = (0xDE, 0xAA) + kDefaultDataPrologue16 = (0xD5, 0xAA, 0xAD) + kDefaultDataEpilogue16 = (0xDE, 0xAA) + kDefaultNibbleTranslationTable16 = { + 0x96: 0x00, 0x97: 0x01, 0x9A: 0x02, 0x9B: 0x03, 0x9D: 0x04, 0x9E: 0x05, 0x9F: 0x06, 0xA6: 0x07, + 0xA7: 0x08, 0xAB: 0x09, 0xAC: 0x0A, 0xAD: 0x0B, 0xAE: 0x0C, 0xAF: 0x0D, 0xB2: 0x0E, 0xB3: 0x0F, + 0xB4: 0x10, 0xB5: 0x11, 0xB6: 0x12, 0xB7: 0x13, 0xB9: 0x14, 0xBA: 0x15, 0xBB: 0x16, 0xBC: 0x17, 
+ 0xBD: 0x18, 0xBE: 0x19, 0xBF: 0x1A, 0xCB: 0x1B, 0xCD: 0x1C, 0xCE: 0x1D, 0xCF: 0x1E, 0xD3: 0x1F, + 0xD6: 0x20, 0xD7: 0x21, 0xD9: 0x22, 0xDA: 0x23, 0xDB: 0x24, 0xDC: 0x25, 0xDD: 0x26, 0xDE: 0x27, + 0xDF: 0x28, 0xE5: 0x29, 0xE6: 0x2A, 0xE7: 0x2B, 0xE9: 0x2C, 0xEA: 0x2D, 0xEB: 0x2E, 0xEC: 0x2F, + 0xED: 0x30, 0xEE: 0x31, 0xEF: 0x32, 0xF2: 0x33, 0xF3: 0x34, 0xF4: 0x35, 0xF5: 0x36, 0xF6: 0x37, + 0xF7: 0x38, 0xF9: 0x39, 0xFA: 0x3A, 0xFB: 0x3B, 0xFC: 0x3C, 0xFD: 0x3D, 0xFE: 0x3E, 0xFF: 0x3F, + } + + def __init__(self, + address_prologue = kDefaultAddressPrologue16, + address_epilogue = kDefaultAddressEpilogue16, + data_prologue = kDefaultDataPrologue16, + data_epilogue = kDefaultDataEpilogue16, + nibble_translate_table = kDefaultNibbleTranslationTable16): + self.address_prologue = address_prologue + self.address_epilogue = address_epilogue + self.data_prologue = data_prologue + self.data_epilogue = data_epilogue + self.nibble_translate_table = nibble_translate_table + self.expected_sectors_per_track = dict(zip(range(0xA0), (i for i in range(0x0C,0x07,-1) for j in range(0x20)))) + + def _(self, track): + return self.nibble_translate_table[next(track.nibble())] + + def find_address_prologue(self, track): + return track.find(self.address_prologue) + + def address_field_at_point(self, track): + h0 = self._(track) + sector_id = self._(track) + h2 = self._(track) + volume = self._(track) + checksum = self._(track) + track_id = (h0 << 1) | ((h2 & 0b00000001) << 7) | ((h2 & 0b00100000) >> 5) + return MoofAddressField(volume, track_id, sector_id, h0 ^ sector_id ^ h2 ^ volume == checksum) + + def verify_nibbles_at_point(self, track, nibbles): + found = [] + for i in nibbles: + found.append(next(track.nibble())) + return tuple(found) == tuple(nibbles) + + def verify_address_epilogue_at_point(self, track): + return self.verify_nibbles_at_point(track, self.address_epilogue) + + def find_data_prologue(self, track): + return track.find_this_not_that(self.data_prologue, self.address_prologue) + + def data_field_at_point(self, track): + # three checksums + c1 = c2 = c3 = 0 + + # generator to decode grouped bytes while juggling three checksums + def gcr_generator(byte_groups): + nonlocal c1, c2, c3 + for d0, d1, d2 in byte_groups: + c1 = (c1 & 0b11111111) << 1 + if c1 > 0xFF: + c1 -= 0xFF + c3 += 1 + b = d0 ^ c1 + c3 += b + yield b + + if c3 > 0xFF: + c3 &= 0b11111111 + c2 += 1 + b = d1 ^ c3 + c2 += b + yield b + + if c2 > 0xFF: + c2 &= 0b11111111 + c1 += 1 + b = d2 ^ c2 + c1 += b + yield b + + # first nibble is sector number + sector_id = self._(track) + + # read 700 nibbles, decode each against nibble translate table, store in 175 groups of 4 + nibble_groups = [(self._(track), self._(track), self._(track), self._(track)) + for i in range(175)] + + # convert each group of 4 nibbles into a group of 3 bytes to pass into the decoder + gcr_byte = gcr_generator((((n[1] & 0b00111111) | ((n[0] << 2) & 0b11000000), + (n[2] & 0b00111111) | ((n[0] << 4) & 0b11000000), + (n[3] & 0b00111111) | ((n[0] << 6) & 0b11000000)) + for n in nibble_groups)) + + # decode 524 bytes (12 tag bytes + 512 data bytes) + tag_bytes = [next(gcr_byte) for i in range(12)] + decoded_bytes = [next(gcr_byte) for i in range(512)] + + # validate checksums against last data field nibble and three epilogue nibbles + valid = nibble_groups[-1][-1] == ((c1 & 0b11000000) >> 6) | ((c2 & 0b11000000) >> 4) | ((c3 & 0b11000000) >> 2) + valid &= self._(track) == (c3 & 0b00111111) + valid &= self._(track) == (c2 & 0b00111111) + valid &= self._(track) == (c1 & 
0b00111111) + + return valid, sector_id, tag_bytes, decoded_bytes + + def verify_data_epilogue_at_point(self, track): + return self.verify_nibbles_at_point(track, self.data_epilogue) + +def driver(filename): + with open(filename, 'rb') as f: + mdisk = MoofDiskImage(f) + rwts = MoofRWTS() + for track_index in mdisk.tmap: + if track_index == 0xFF: continue + track = mdisk.tracks[track_index] + seen_sectors = [] + track_id = -1 + while True: + if not rwts.find_address_prologue(track): break + address_field = rwts.address_field_at_point(track) + if not address_field.valid: continue + if (address_field.track_id < 0) or (address_field.track_id > 0x9F): + print(f'/!\ invalid track ID {myhex(address_field.track_id)}') + continue + if (address_field.sector_id < 0) or \ + (address_field.sector_id > rwts.expected_sectors_per_track[address_field.track_id]): + print(f'/!\ invalid sector ID {myhex(address_field.sector_id)}') + continue + if not rwts.verify_address_epilogue_at_point(track): + print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: address epilogue does not match') + continue + if address_field.sector_id in seen_sectors: break + seen_sectors.append(address_field.sector_id) + bit_index = track.bit_index + if not rwts.find_data_prologue(track): + track.bit_index = bit_index + if rwts.verify_nibbles_at_point(track, (0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xAB, 0xCD, 0xEF, 0xEF)): + print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: found PACE decryption key ', end='') + # extract PACE decryption key + for i in range(4): + next(track.nibble()) + key = [] + for i in range(4): + x = (next(track.nibble()) << 8) + next(track.nibble()) + x = x & 0x5555 + x = (x | (x >> 1)) & 0x3333 + x = (x | (x >> 2)) & 0x0f0f + x = (x | (x >> 4)) & 0x00ff + x = (x | (x >> 8)) & 0xffff + key.append(x) + key.reverse() + for x in key: + print(myhex(x), end='') + print() + continue + try: + valid, sector_id, tags, decoded_bytes = rwts.data_field_at_point(track) + except KeyError: + print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: invalid nibble') + continue + if not valid: + print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: data checksums do not match') + continue + if valid: + valid = address_field.sector_id == sector_id + if not valid: + print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: address/data field sector IDs do not match') + continue + valid = rwts.verify_data_epilogue_at_point(track) + if not valid: + print(f'/!\ track {myhex(address_field.track_id)}, sector {myhex(address_field.sector_id)}: data epilogue does not match') + continue + if (track_id == -1) and (address_field.valid): + track_id = address_field.track_id + if track_id == -1: continue + sector_count = len(seen_sectors) + expected_count = rwts.expected_sectors_per_track[track_id] + if sector_count < expected_count: + print(f'/!\ track {myhex(track_id)} only has {sector_count} sectors (expected {expected_count})') + +if __name__ == '__main__': + driver(sys.argv[1]) diff --git a/wozardry.py b/wozardry.py index 829a193..98dc6bc 100755 --- a/wozardry.py +++ b/wozardry.py @@ -3,39 +3,6 @@ #(c) 2018-2022 by 4am #license:MIT -""" -// INFO chunk begins at byte 12 - data.append(Data("INFO".utf8)) // 12: beginning of INFO chunk - data.write32UL(60) // 16: INFO chunk size - data.write8U(0x01) // 20: INFO chunk version - // 
21: disk type ( 0 = ?, 1 = 3.5 SSDD, 2 = 3.5 DSDD, 3 = 3.5 DSHD) - let isHD = disk.diskInfo.contains(.highDensity) - if disk.diskInfo.contains(.doubleSided) { - data.write8U(isHD ? 3 : 2) - } else { - data.write8U(1) - } - data.write8U(disk.writeProtected ? 0x01 : 0x00) // 22: write protected - data.write8U(disk.synchronized ? 0x01 : 0x00) // 23: tracks synchronized - data.write8U(isHD ? 8 : 16) // 24: optimal bit timing - // 25: creator - if let bundle_info = Bundle.main.infoDictionary { - let vers = bundle_info["CFBundleShortVersionString"] as? String ?? "x.x" - let creator: String - if disk.fastImaged { - creator = String(format: "Applesauce v%@ Fast Imager ", vers) - } else { - creator = String(format: "Applesauce v%@ ", vers) - } - data.append(Data(creator.utf8.prefix(32))) - } - data.write8U(0x00) // 57: pad (always zero) - data.write16UL(0x0000) // 58: largest track (will be updated to correct value later) - data.write16UL(0x0000) // 60: starting block of FLUX chunk - data.write16UL(0x0000) // 62: largest flux track (will be updated to correct value later) - // ... pad bytes out to 90 (68) ... - data.writeData(Data(repeating: 0x00, count: 68 - data.count)) -""" import argparse import binascii import collections @@ -189,62 +156,10 @@ def raise_if(cond, e, s=""): if cond: raise e(s) class Track: - def __init__(self, data, count, is_legacy=True): - if is_legacy: - # parameters are bits and bit count - self.oldinit(data, count) - else: - # parameters are bytes and byte count - self.newinit(data, count) - - def newinit(self, raw_bytes, raw_count): + def __init__(self, raw_bytes, raw_count): self.raw_bytes = raw_bytes self.raw_count = raw_count - def oldinit(self, bits, bit_count): - import bitarray # https://pypi.org/project/bitarray/ - self.bits = bitarray.bitarray(endian="big") - bits.frombytes(self.raw_bytes) - while len(self.bits) > bit_count: - self.bits.pop() - self.bit_count = bit_count - self.bit_index = 0 - self.revolutions = 0 - self.newinit(self.bits.tobytes(), (self.bit_count + 7) // 8) - - def bit(self): - b = self.bits[self.bit_index] and 1 or 0 - self.bit_index += 1 - if self.bit_index >= self.bit_count: - self.bit_index = 0 - self.revolutions += 1 - yield b - - def nibble(self): - b = 0 - while b == 0: - b = next(self.bit()) - n = 0x80 - for bit_index in range(6, -1, -1): - b = next(self.bit()) - n += b << bit_index - yield n - - def rewind(self, bit_count=1): - self.bit_index -= bit_count - if self.bit_index < 0: - self.bit_index = self.bit_count - 1 - self.revolutions -= 1 - - def find(self, sequence): - starting_revolutions = self.revolutions - seen = [0] * len(sequence) - while (self.revolutions < starting_revolutions + 2): - del seen[0] - seen.append(next(self.nibble())) - if tuple(seen) == tuple(sequence): return True - return False - class WozDiskImage: def __init__(self, iostream=None): if iostream: @@ -390,7 +305,7 @@ class WozDiskImage: if splice_point != 0xFFFF: raise_if(splice_bit_count not in (8,9,10), WozTRKSFormatError, "TRKS chunk %d splice_bit_count is out of range" % len(self.tracks)) i += 3 - self.tracks.append(Track(raw_bytes, count, False)) + self.tracks.append(Track(raw_bytes, count)) def _load_trks_v2(self, data): for trk in range(160): @@ -407,7 +322,7 @@ class WozDiskImage: raise_if(len(data) <= bits_index_into_data, WozTRKSFormatError_BadStartingBlock, sEOF) raw_bytes = data[bits_index_into_data : bits_index_into_data + block_count*512] raise_if(len(raw_bytes) != block_count*512, WozTRKSFormatError_BadBlockCount, sEOF) - 
self.tracks.append(Track(raw_bytes, count, False)) + self.tracks.append(Track(raw_bytes, count)) def _load_flux(self, data): self.flux = list(data)
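Notes (not part of the patch). A minimal usage sketch of the new MoofDiskImage/MoofTrack classes; the filename is a placeholder, and it assumes the image parses and has at least one mapped track:

import moofimage

with open("game.moof", "rb") as f:
    disk = moofimage.MoofDiskImage(f)

# Find the first mapped track and read a few nibbles from its bitstream.
track = disk.tracks[next(t for t in disk.tmap if t != 0xFF)]
nibbles = [next(track.nibble()) for _ in range(8)]
print(" ".join(moofimage.myhex(n) for n in nibbles))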
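The direction of the refactoring, as a small sketch (the three-byte bitstream is illustrative): wozardry.Track becomes a plain container for the raw TRKS bitstream, and MoofTrack (or a future subclass for another format) layers the bit/nibble machinery on top.

import wozardry    # base Track now only stores raw_bytes and raw_count
import moofimage   # MoofTrack adds bit/nibble reading

base = wozardry.Track(b"\xD5\xAA\x96", 24)                 # bytes + bit count
moof = moofimage.MoofTrack(base.raw_bytes, base.raw_count)
assert next(moof.nibble()) == 0xD5                          # first nibble decoded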
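A worked example of the track/side packing decoded by address_field_at_point (the nibble values are illustrative): the first address nibble carries the low six bits of the cylinder, bit 0 of the third nibble is cylinder bit 6, and bit 5 of the third nibble is the side.

h0, h2 = 0x25, 0b00100000    # decoded nibbles: cylinder 37, side 1, cylinder bit 6 clear
track_id = (h0 << 1) | ((h2 & 0b00000001) << 7) | ((h2 & 0b00100000) >> 5)
assert track_id == 0x4B      # track_id = cylinder * 2 + side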
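data_field_at_point reads 175 groups of four 6-bit nibbles and repacks each group into three 8-bit bytes before un-XORing them against the rolling checksums. The repacking on its own, as a standalone sketch with illustrative values:

def regroup(n0, n1, n2, n3):
    # n0 contributes the top two bits of each of the other three values
    return ((n1 & 0b00111111) | ((n0 << 2) & 0b11000000),
            (n2 & 0b00111111) | ((n0 << 4) & 0b11000000),
            (n3 & 0b00111111) | ((n0 << 6) & 0b11000000))

assert regroup(0b011011, 0b000001, 0b000010, 0b000011) == (0x41, 0x82, 0xC3)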
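The PACE key extraction in driver() packs each pair of raw nibbles into a 16-bit word and then keeps only its even-numbered bits. That compaction step as a standalone sketch (the helper name is mine, not part of the patch):

def compact_even_bits(x):
    # keep bits 0, 2, 4, ..., 14 of a 16-bit word and pack them into one byte
    x &= 0x5555
    x = (x | (x >> 1)) & 0x3333
    x = (x | (x >> 2)) & 0x0F0F
    x = (x | (x >> 4)) & 0x00FF
    return x

assert compact_even_bits(0x5555) == 0xFF    # all even bits set
assert compact_even_bits(0x1111) == 0x55    # every fourth bit set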