From 046902f1e3a2bd59c0a678e55bfba3259311d8f1 Mon Sep 17 00:00:00 2001 From: 4am Date: Tue, 29 Jan 2019 19:31:10 -0500 Subject: [PATCH] update to latest wozardry --- passport.py | 4 +- passport/__init__.py | 8 +- passport/a2rimage.py | 25 +- passport/eddimage.py | 13 +- passport/{ => old}/wozimage.py | 0 passport/wozardry.py | 650 +++++++++++++++++++++++++++++++++ 6 files changed, 669 insertions(+), 31 deletions(-) rename passport/{ => old}/wozimage.py (100%) create mode 100755 passport/wozardry.py diff --git a/passport.py b/passport.py index c907efa..90b16b9 100755 --- a/passport.py +++ b/passport.py @@ -3,7 +3,7 @@ # (c) 2018-9 by 4am # MIT-licensed -from passport import eddimage, wozimage, a2rimage +from passport import eddimage, wozardry, a2rimage from passport import DefaultLogger, DebugLogger from passport import Crack, Verify, Convert from passport.strings import STRINGS @@ -32,7 +32,7 @@ class BaseCommand: base, ext = os.path.splitext(args.file) ext = ext.lower() if ext == ".woz": - self.reader = wozimage.WozReader + self.reader = wozardry.WozReader elif ext == ".edd": self.reader = eddimage.EDDReader elif ext == ".a2r": diff --git a/passport/__init__.py b/passport/__init__.py index f507e2a..311a35a 100755 --- a/passport/__init__.py +++ b/passport/__init__.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -from passport import wozimage +from passport import wozardry from passport.patchers import * from passport.strings import * from passport.util import * @@ -1342,13 +1342,13 @@ class Convert(BasePassportProcessor): b = track.bits[:51021] # output_tracks is indexed on physical track number here because the # point of .woz is to capture the physical layout of the original disk - self.output_tracks[physical_track_num] = wozimage.Track(b, len(b)) + self.output_tracks[physical_track_num] = wozardry.Track(b, len(b)) def postprocess(self): source_base, source_ext = os.path.splitext(self.g.disk_image.filename) output_filename = source_base + '.woz' self.g.logger.PrintByID("writing", {"filename":output_filename}) - woz_image = wozimage.WozWriter(STRINGS["header"].strip()) + woz_image = wozardry.WozWriter(STRINGS["header"].strip()) woz_image.info["cleaned"] = self.g.found_and_cleaned_weakbits woz_image.info["write_protected"] = self.g.protection_enforces_write_protected woz_image.meta["image_date"] = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()) @@ -1359,7 +1359,7 @@ class Convert(BasePassportProcessor): with open(output_filename, 'wb') as f: woz_image.write(f) try: - wozimage.WozReader(output_filename) + wozardry.WozReader(output_filename) except Exception as e: os.remove(output_filename) raise Exception from e diff --git a/passport/a2rimage.py b/passport/a2rimage.py index 160e933..dc8203a 100755 --- a/passport/a2rimage.py +++ b/passport/a2rimage.py @@ -1,8 +1,10 @@ -from passport.wozimage import DiskImage, Track, WozError, raise_if +from passport.wozardry import Track, raise_if from passport import a2rchery import bitarray import collections +class A2RSeekError(a2rchery.A2RError): pass + class A2RImage: def __init__(self, filename=None, stream=None): self.filename = filename @@ -33,35 +35,18 @@ class A2RImage: flux_total = 0 return bits, estimated_track_length, speed - def find_track_length(self, bits, estimated_track_length): - twice_bits = bits + bits - for matchlen in (8192, 4096, 2048, 1024): - if estimated_track_length < 32768 or len(bits) < 32768: continue - for offset in range(0, estimated_track_length, matchlen): - for length_delta in (0, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 7, -7, 8, -8, 9, -9, 10, -10, 11, -11, 12, -12): - real_length = estimated_track_length + length_delta - if real_length > 53168: continue - if twice_bits[8+offset:offset+matchlen] == twice_bits[real_length+8+offset:real_length+matchlen+offset]: - return real_length - return 0 - - def normalize(self, flux_records): - bits_and_lengths = [self.to_bits(flux_record) for flux_record in flux_records] - all_bits = [bits[8:self.find_track_length(bits, estimated_track_length)+8] for bits, estimated_track_length, speed in bits_and_lengths] - return all_bits - def seek(self, track_num): if type(track_num) != float: track_num = float(track_num) if track_num < 0.0 or \ track_num > 35.0 or \ track_num.as_integer_ratio()[1] not in (1,2,4): - raise WozError("Invalid track %s" % track_num) + raise A2RSeekError("Invalid track %s" % track_num) location = int(track_num * 4) if not self.tracks.get(location): all_bits = bitarray.bitarray() for flux_record in self.a2r_image.flux.get(location, [{}]): bits, track_length, speed = self.to_bits(flux_record) all_bits.extend(bits) - self.tracks[location] = Track(all_bits, len(all_bits), speed=speed) + self.tracks[location] = Track(all_bits, len(all_bits)) return self.tracks[location] diff --git a/passport/eddimage.py b/passport/eddimage.py index bb63b54..846a20c 100644 --- a/passport/eddimage.py +++ b/passport/eddimage.py @@ -1,13 +1,17 @@ -from passport.wozimage import DiskImage, Track, WozError, raise_if +from passport.wozardry import Track, raise_if import bitarray -class EDDReader(DiskImage): +class EDDError(Exception): pass # base class +class EDDLengthError(EDDError): pass +class EDDSeekError(EDDError): pass + +class EDDReader: def __init__(self, filename=None, stream=None): DiskImage.__init__(self, filename, stream) with stream or open(filename, 'rb') as f: for i in range(137): raw_bytes = f.read(16384) - raise_if(len(raw_bytes) != 16384, WozError, "Bad EDD file (did you image by quarter tracks?)") + raise_if(len(raw_bytes) != 16384, EDDLengthError, "Bad EDD file (did you image by quarter tracks?)") bits = bitarray.bitarray(endian="big") bits.frombytes(raw_bytes) self.tracks.append(Track(bits, 131072)) @@ -18,7 +22,6 @@ class EDDReader(DiskImage): if track_num < 0.0 or \ track_num > 35.0 or \ track_num.as_integer_ratio()[1] not in (1,2,4): - raise WozError("Invalid track %s" % track_num) + raise EDDSeekError("Invalid track %s" % track_num) trk_id = int(track_num * 4) return self.tracks[trk_id] - diff --git a/passport/wozimage.py b/passport/old/wozimage.py similarity index 100% rename from passport/wozimage.py rename to passport/old/wozimage.py diff --git a/passport/wozardry.py b/passport/wozardry.py new file mode 100755 index 0000000..5b27a5f --- /dev/null +++ b/passport/wozardry.py @@ -0,0 +1,650 @@ +#!/usr/bin/env python3 + +# (c) 2018 by 4am +# MIT-licensed + +import argparse +import binascii +import bitarray # https://pypi.org/project/bitarray/ +import collections +import json +import itertools +import os + +__version__ = "1.1pre" +__date__ = "2018-10-24" +__progname__ = "wozardry" +__displayname__ = __progname__ + " " + __version__ + " by 4am (" + __date__ + ")" + +# domain-specific constants defined in .woz specification +kWOZ1 = b"WOZ1" +kINFO = b"INFO" +kTMAP = b"TMAP" +kTRKS = b"TRKS" +kMETA = b"META" +kBitstreamLengthInBytes = 6646 +kLanguages = ("English","Spanish","French","German","Chinese","Japanese","Italian","Dutch","Portuguese","Danish","Finnish","Norwegian","Swedish","Russian","Polish","Turkish","Arabic","Thai","Czech","Hungarian","Catalan","Croatian","Greek","Hebrew","Romanian","Slovak","Ukrainian","Indonesian","Malay","Vietnamese","Other") +kRequiresRAM = ("16K","24K","32K","48K","64K","128K","256K","512K","768K","1M","1.25M","1.5M+","Unknown") +kRequiresMachine = ("2","2+","2e","2c","2e+","2gs","2c+","3","3+") + +# strings and things, for print routines and error messages +sEOF = "Unexpected EOF" +sBadChunkSize = "Bad chunk size" +dNoYes = {False:"no",True:"yes"} +tQuarters = (".00",".25",".50",".75") + +# errors that may be raised +class WozError(Exception): pass # base class +class WozCRCError(WozError): pass +class WozFormatError(WozError): pass +class WozEOFError(WozFormatError): pass +class WozHeaderError(WozFormatError): pass +class WozHeaderError_NoWOZ1(WozHeaderError): pass +class WozHeaderError_NoFF(WozHeaderError): pass +class WozHeaderError_NoLF(WozHeaderError): pass +class WozINFOFormatError(WozFormatError): pass +class WozINFOFormatError_BadVersion(WozINFOFormatError): pass +class WozINFOFormatError_BadDiskType(WozINFOFormatError): pass +class WozINFOFormatError_BadWriteProtected(WozINFOFormatError): pass +class WozINFOFormatError_BadSynchronized(WozINFOFormatError): pass +class WozINFOFormatError_BadCleaned(WozINFOFormatError): pass +class WozINFOFormatError_BadCreator(WozINFOFormatError): pass +class WozTMAPFormatError(WozFormatError): pass +class WozTMAPFormatError_BadTRKS(WozTMAPFormatError): pass +class WozTRKSFormatError(WozFormatError): pass +class WozMETAFormatError(WozFormatError): pass +class WozMETAFormatError_DuplicateKey(WozFormatError): pass +class WozMETAFormatError_BadValue(WozFormatError): pass +class WozMETAFormatError_BadLanguage(WozFormatError): pass +class WozMETAFormatError_BadRAM(WozFormatError): pass +class WozMETAFormatError_BadMachine(WozFormatError): pass + +def from_uint32(b): + return int.from_bytes(b, byteorder="little") +from_uint16=from_uint32 + +def to_uint32(b): + return b.to_bytes(4, byteorder="little") + +def to_uint16(b): + return b.to_bytes(2, byteorder="little") + +def to_uint8(b): + return b.to_bytes(1, byteorder="little") + +def raise_if(cond, e, s=""): + if cond: raise e(s) + +class Track: + def __init__(self, bits, bit_count): + self.bits = bits + while len(self.bits) > bit_count: + self.bits.pop() + self.bit_count = bit_count + self.bit_index = 0 + self.revolutions = 0 + + def bit(self): + b = self.bits[self.bit_index] and 1 or 0 + self.bit_index += 1 + if self.bit_index >= self.bit_count: + self.bit_index = 0 + self.revolutions += 1 + yield b + + def nibble(self): + b = 0 + while b == 0: + b = next(self.bit()) + n = 0x80 + for bit_index in range(6, -1, -1): + b = next(self.bit()) + n += b << bit_index + yield n + + def rewind(self, bit_count): + self.bit_index -= 1 + if self.bit_index < 0: + self.bit_index = self.bit_count - 1 + self.revolutions -= 1 + + def find(self, sequence): + starting_revolutions = self.revolutions + seen = [0] * len(sequence) + while (self.revolutions < starting_revolutions + 2): + del seen[0] + seen.append(next(self.nibble())) + if tuple(seen) == tuple(sequence): return True + return False + +class WozTrack(Track): + def __init__(self, bits, bit_count, splice_point = 0xFFFF, splice_nibble = 0, splice_bit_count = 0): + Track.__init__(self, bits, bit_count) + self.splice_point = splice_point + self.splice_nibble = splice_nibble + self.splice_bit_count = splice_bit_count + +class WozDiskImage: + def __init__(self): + self.tmap = [0xFF]*160 + self.tracks = [] + self.info = collections.OrderedDict() + self.meta = collections.OrderedDict() + + def track_num_to_half_phase(self, track_num): + if type(track_num) != float: + track_num = float(track_num) + if track_num < 0.0 or \ + track_num > 40.0 or \ + track_num.as_integer_ratio()[1] not in (1,2,4): + raise WozError("Invalid track %s" % track_num) + return int(track_num * 4) + + def seek(self, track_num): + """returns Track object for the given track, or None if the track is not part of this disk image. track_num can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.)""" + half_phase = self.track_num_to_half_phase(track_num) + trk_id = self.tmap[half_phase] + if trk_id == 0xFF: return None + return self.tracks[trk_id] + + def clean(self): + """removes tracks from self.tracks that are not referenced from self.tmap, and adjusts remaining self.tmap indices""" + i = 0 + while i < len(self.tracks): + if i not in self.tmap: + del self.tracks[i] + for adjust in range(len(self.tmap)): + if (self.tmap[adjust] >= i) and (self.tmap[adjust] != 0xFF): + self.tmap[adjust] -= 1 + else: + i += 1 + + def add(self, half_phase, track): + trk_id = len(self.tracks) + self.tracks.append(track) + self.tmap[half_phase] = trk_id + if half_phase: + self.tmap[half_phase - 1] = trk_id + if half_phase < 159: + self.tmap[half_phase + 1] = trk_id + + def add_track(self, track_num, track): + self.add(self.track_num_to_half_phase(track_num), track) + + def remove(self, half_phase): + if self.tmap[half_phase] == 0xFF: return False + self.tmap[half_phase] = 0xFF + self.clean() + return True + + def remove_track(self, track_num): + """removes given track, returns True if anything was actually removed, or False if track wasn't found. track_num can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.)""" + return self.remove(self.track_num_to_half_phase(track_num)) + + def from_json(self, json_string): + j = json.loads(json_string) + root = [x for x in j.keys()].pop() + self.meta.update(j[root]["meta"]) + + def to_json(self): + j = {"woz": {"info":self.info, "meta":self.meta}} + return json.dumps(j, indent=2) + +class WozValidator: + def validate_info_version(self, version): + raise_if(version != b'\x01', WozINFOFormatError_BadVersion, "Unknown version (expected 1, found %s)" % version) + + def validate_info_disk_type(self, disk_type): + raise_if(disk_type not in (b'\x01',b'\x02'), WozINFOFormatError_BadDiskType, "Unknown disk type (expected 1 or 2, found %s)" % disk_type) + + def validate_info_write_protected(self, write_protected): + raise_if(write_protected not in (b'\x00',b'\x01'), WozINFOFormatError_BadWriteProtected, "Unknown write protected flag (expected 0 or 1, found %s)" % write_protected) + + def validate_info_synchronized(self, synchronized): + raise_if(synchronized not in (b'\x00',b'\x01'), WozINFOFormatError_BadSynchronized, "Unknown synchronized flag (expected 0, or 1, found %s)" % synchronized) + + def validate_info_cleaned(self, cleaned): + raise_if(cleaned not in (b'\x00',b'\x01'), WozINFOFormatError_BadCleaned, "Unknown cleaned flag (expected 0 or 1, found %s)" % cleaned) + + def validate_info_creator(self, creator_as_bytes): + raise_if(len(creator_as_bytes) > 32, WozINFOFormatError_BadCreator, "Creator is longer than 32 bytes") + try: + creator_as_bytes.decode("UTF-8") + except: + raise_if(True, WozINFOFormatError_BadCreator, "Creator is not valid UTF-8") + + def encode_info_creator(self, creator_as_string): + creator_as_bytes = creator_as_string.encode("UTF-8").ljust(32, b" ") + self.validate_info_creator(creator_as_bytes) + return creator_as_bytes + + def decode_info_creator(self, creator_as_bytes): + self.validate_info_creator(creator_as_bytes) + return creator_as_bytes.decode("UTF-8").strip() + + def validate_metadata(self, metadata_as_bytes): + try: + metadata = metadata_as_bytes.decode("UTF-8") + except: + raise WozMETAFormatError("Metadata is not valid UTF-8") + + def decode_metadata(self, metadata_as_bytes): + self.validate_metadata(metadata_as_bytes) + return metadata_as_bytes.decode("UTF-8") + + def validate_metadata_value(self, value): + raise_if("\t" in value, WozMETAFormatError_BadValue, "Invalid metadata value (contains tab character)") + raise_if("\n" in value, WozMETAFormatError_BadValue, "Invalid metadata value (contains linefeed character)") + raise_if("|" in value, WozMETAFormatError_BadValue, "Invalid metadata value (contains pipe character)") + + def validate_metadata_language(self, language): + raise_if(language and (language not in kLanguages), WozMETAFormatError_BadLanguage, "Invalid metadata language") + + def validate_metadata_requires_ram(self, requires_ram): + raise_if(requires_ram and (requires_ram not in kRequiresRAM), WozMETAFormatError_BadRAM, "Invalid metadata requires_ram") + + def validate_metadata_requires_machine(self, requires_machine): + raise_if(requires_machine and (requires_machine not in kRequiresMachine), WozMETAFormatError_BadMachine, "Invalid metadata requires_machine") + +class WozReader(WozDiskImage, WozValidator): + def __init__(self, filename=None, stream=None): + WozDiskImage.__init__(self) + self.filename = filename + with stream or open(filename, "rb") as f: + header_raw = f.read(8) + raise_if(len(header_raw) != 8, WozEOFError, sEOF) + self.__process_header(header_raw) + crc_raw = f.read(4) + raise_if(len(crc_raw) != 4, WozEOFError, sEOF) + crc = from_uint32(crc_raw) + all_data = [] + while True: + chunk_id = f.read(4) + if not chunk_id: break + raise_if(len(chunk_id) != 4, WozEOFError, sEOF) + all_data.append(chunk_id) + chunk_size_raw = f.read(4) + raise_if(len(chunk_size_raw) != 4, WozEOFError, sEOF) + all_data.append(chunk_size_raw) + chunk_size = from_uint32(chunk_size_raw) + data = f.read(chunk_size) + raise_if(len(data) != chunk_size, WozEOFError, sEOF) + all_data.append(data) + if chunk_id == kINFO: + raise_if(chunk_size != 60, WozINFOFormatError, sBadChunkSize) + self.__process_info(data) + elif chunk_id == kTMAP: + raise_if(chunk_size != 160, WozTMAPFormatError, sBadChunkSize) + self.__process_tmap(data) + elif chunk_id == kTRKS: + self.__process_trks(data) + elif chunk_id == kMETA: + self.__process_meta(data) + if crc: + raise_if(crc != binascii.crc32(b"".join(all_data)) & 0xffffffff, WozCRCError, "Bad CRC") + + def __process_header(self, data): + raise_if(data[:4] != kWOZ1, WozHeaderError_NoWOZ1, "Magic string 'WOZ1' not present at offset 0") + raise_if(data[4] != 0xFF, WozHeaderError_NoFF, "Magic byte 0xFF not present at offset 4") + raise_if(data[5:8] != b"\x0A\x0D\x0A", WozHeaderError_NoLF, "Magic bytes 0x0A0D0A not present at offset 5") + + def __process_info(self, data): + version = data[0] + self.validate_info_version(to_uint8(version)) + disk_type = data[1] + self.validate_info_disk_type(to_uint8(disk_type)) + write_protected = data[2] + self.validate_info_write_protected(to_uint8(write_protected)) + synchronized = data[3] + self.validate_info_synchronized(to_uint8(synchronized)) + cleaned = data[4] + self.validate_info_cleaned(to_uint8(cleaned)) + creator = self.decode_info_creator(data[5:37]) + self.info["version"] = version # int + self.info["disk_type"] = disk_type # int + self.info["write_protected"] = (write_protected == 1) # boolean + self.info["synchronized"] = (synchronized == 1) # boolean + self.info["cleaned"] = (cleaned == 1) # boolean + self.info["creator"] = creator # string + + def __process_tmap(self, data): + self.tmap = list(data) + + def __process_trks(self, data): + i = 0 + while i < len(data): + raw_bytes = data[i:i+kBitstreamLengthInBytes] + raise_if(len(raw_bytes) != kBitstreamLengthInBytes, WozEOFError, sEOF) + i += kBitstreamLengthInBytes + bytes_used_raw = data[i:i+2] + raise_if(len(bytes_used_raw) != 2, WozEOFError, sEOF) + bytes_used = from_uint16(bytes_used_raw) + raise_if(bytes_used > kBitstreamLengthInBytes, WozTRKSFormatError, "TRKS chunk %d bytes_used is out of range" % len(self.tracks)) + i += 2 + bit_count_raw = data[i:i+2] + raise_if(len(bit_count_raw) != 2, WozEOFError, sEOF) + bit_count = from_uint16(bit_count_raw) + i += 2 + splice_point_raw = data[i:i+2] + raise_if(len(splice_point_raw) != 2, WozEOFError, sEOF) + splice_point = from_uint16(splice_point_raw) + if splice_point != 0xFFFF: + raise_if(splice_point > bit_count, WozTRKSFormatError, "TRKS chunk %d splice_point is out of range" % len(self.tracks)) + i += 2 + splice_nibble = data[i] + i += 1 + splice_bit_count = data[i] + if splice_point != 0xFFFF: + raise_if(splice_bit_count not in (8,9,10), WozTRKSFormatError, "TRKS chunk %d splice_bit_count is out of range" % len(self.tracks)) + i += 3 + bits = bitarray.bitarray(endian="big") + bits.frombytes(raw_bytes) + self.tracks.append(WozTrack(bits, bit_count, splice_point, splice_nibble, splice_bit_count)) + for trk, i in zip(self.tmap, itertools.count()): + raise_if(trk != 0xFF and trk >= len(self.tracks), WozTMAPFormatError_BadTRKS, "Invalid TMAP entry: track %d%s points to non-existent TRKS chunk %d" % (i/4, tQuarters[i%4], trk)) + + def __process_meta(self, metadata_as_bytes): + metadata = self.decode_metadata(metadata_as_bytes) + for line in metadata.split("\n"): + if not line: continue + columns_raw = line.split("\t") + raise_if(len(columns_raw) != 2, WozMETAFormatError, "Malformed metadata") + key, value_raw = columns_raw + raise_if(key in self.meta, WozMETAFormatError_DuplicateKey, "Duplicate metadata key %s" % key) + values = value_raw.split("|") + if key == "language": + list(map(self.validate_metadata_language, values)) + elif key == "requires_ram": + list(map(self.validate_metadata_requires_ram, values)) + elif key == "requires_machine": + list(map(self.validate_metadata_requires_machine, values)) + self.meta[key] = len(values) == 1 and values[0] or tuple(values) + +class WozWriter(WozDiskImage, WozValidator): + def __init__(self, creator): + WozDiskImage.__init__(self) + self.info["version"] = 1 + self.info["disk_type"] = 1 + self.info["write_protected"] = False + self.info["synchronized"] = False + self.info["cleaned"] = False + self.info["creator"] = creator + + def build_info(self): + chunk = bytearray() + chunk.extend(kINFO) # chunk ID + chunk.extend(to_uint32(60)) # chunk size (constant) + version_raw = to_uint8(self.info["version"]) + self.validate_info_version(version_raw) + disk_type_raw = to_uint8(self.info["disk_type"]) + self.validate_info_disk_type(disk_type_raw) + write_protected_raw = to_uint8(self.info["write_protected"]) + self.validate_info_write_protected(write_protected_raw) + synchronized_raw = to_uint8(self.info["synchronized"]) + self.validate_info_synchronized(synchronized_raw) + cleaned_raw = to_uint8(self.info["cleaned"]) + self.validate_info_cleaned(cleaned_raw) + creator_raw = self.encode_info_creator(self.info["creator"]) + chunk.extend(version_raw) # version (int, probably 1) + chunk.extend(disk_type_raw) # disk type (1=5.25 inch, 2=3.5 inch) + chunk.extend(write_protected_raw) # write-protected (0=no, 1=yes) + chunk.extend(synchronized_raw) # tracks synchronized (0=no, 1=yes) + chunk.extend(cleaned_raw) # weakbits cleaned (0=no, 1=yes) + chunk.extend(creator_raw) # creator + chunk.extend(b"\x00" * 23) # reserved + return chunk + + def build_tmap(self): + chunk = bytearray() + chunk.extend(kTMAP) # chunk ID + chunk.extend(to_uint32(160)) # chunk size + chunk.extend(bytes(self.tmap)) + return chunk + + def build_trks(self): + chunk = bytearray() + chunk.extend(kTRKS) # chunk ID + chunk_size = len(self.tracks)*6656 + chunk.extend(to_uint32(chunk_size)) # chunk size + for track in self.tracks: + raw_bytes = track.bits.tobytes() + chunk.extend(raw_bytes) # bitstream as raw bytes + chunk.extend(b"\x00" * (6646 - len(raw_bytes))) # padding to 6646 bytes + chunk.extend(to_uint16(len(raw_bytes))) # bytes used + chunk.extend(to_uint16(track.bit_count)) # bit count + chunk.extend(b"\xFF\xFF") # splice point (none) + chunk.extend(b"\xFF") # splice nibble (none) + chunk.extend(b"\xFF") # splice bit count (none) + chunk.extend(b"\x00\x00") # reserved + return chunk + + def build_meta(self): + if not self.meta: return b"" + meta_tmp = {} + for key, value_raw in self.meta.items(): + if type(value_raw) == str: + values = [value_raw] + else: + values = value_raw + meta_tmp[key] = values + list(map(self.validate_metadata_value, values)) + if key == "language": + list(map(self.validate_metadata_language, values)) + elif key == "requires_ram": + list(map(self.validate_metadata_requires_ram, values)) + elif key == "requires_machine": + list(map(self.validate_metadata_requires_machine, values)) + data = b"\x0A".join( + [k.encode("UTF-8") + \ + b"\x09" + \ + "|".join(v).encode("UTF-8") \ + for k, v in meta_tmp.items()]) + chunk = bytearray() + chunk.extend(kMETA) # chunk ID + chunk.extend(to_uint32(len(data))) # chunk size + chunk.extend(data) + return chunk + + def build_head(self, crc): + chunk = bytearray() + chunk.extend(kWOZ1) # magic bytes + chunk.extend(b"\xFF\x0A\x0D\x0A") # more magic bytes + chunk.extend(to_uint32(crc)) # CRC32 of rest of file (calculated in caller) + return chunk + + def write(self, stream): + info = self.build_info() + tmap = self.build_tmap() + trks = self.build_trks() + meta = self.build_meta() + crc = binascii.crc32(info + tmap + trks + meta) + head = self.build_head(crc) + stream.write(head) + stream.write(info) + stream.write(tmap) + stream.write(trks) + stream.write(meta) + +#---------- command line interface ---------- + +class BaseCommand: + def __init__(self, name): + self.name = name + + def setup(self, subparser, description=None, epilog=None, help=".woz disk image", formatter_class=argparse.HelpFormatter): + self.parser = subparser.add_parser(self.name, description=description, epilog=epilog, formatter_class=formatter_class) + self.parser.add_argument("file", help=help) + self.parser.set_defaults(action=self) + + def __call__(self, args): + self.woz_image = WozReader(args.file) + +class CommandVerify(BaseCommand): + def __init__(self): + BaseCommand.__init__(self, "verify") + + def setup(self, subparser): + BaseCommand.setup(self, subparser, + description="Verify file structure and metadata of a .woz disk image (produces no output unless a problem is found)") + +class CommandDump(BaseCommand): + kWidth = 30 + + def __init__(self): + BaseCommand.__init__(self, "dump") + + def setup(self, subparser): + BaseCommand.setup(self, subparser, + description="Print all available information and metadata in a .woz disk image") + + def __call__(self, args): + BaseCommand.__call__(self, args) + self.print_tmap() + self.print_meta() + self.print_info() + + def print_info(self): + print("INFO: File format version:".ljust(self.kWidth), "%d" % self.woz_image.info["version"]) + print("INFO: Disk type:".ljust(self.kWidth), ("5.25-inch", "3.5-inch")[self.woz_image.info["disk_type"]-1]) + print("INFO: Write protected:".ljust(self.kWidth), dNoYes[self.woz_image.info["write_protected"]]) + print("INFO: Track synchronized:".ljust(self.kWidth), dNoYes[self.woz_image.info["synchronized"]]) + print("INFO: Weakbits cleaned:".ljust(self.kWidth), dNoYes[self.woz_image.info["cleaned"]]) + print("INFO: Creator:".ljust(self.kWidth), self.woz_image.info["creator"]) + + def print_tmap(self): + i = 0 + for trk, i in zip(self.woz_image.tmap, itertools.count()): + if trk != 0xFF: + print(("TMAP: Track %d%s" % (i/4, tQuarters[i%4])).ljust(self.kWidth), "TRKS %d" % (trk)) + + def print_meta(self): + if not self.woz_image.meta: return + for key, values in self.woz_image.meta.items(): + if type(values) == str: + values = [values] + print(("META: " + key + ":").ljust(self.kWidth), values[0]) + for value in values[1:]: + print("META: ".ljust(self.kWidth), value) + +class CommandExport(BaseCommand): + def __init__(self): + BaseCommand.__init__(self, "export") + + def setup(self, subparser): + BaseCommand.setup(self, subparser, + description="Export (as JSON) all information and metadata from a .woz disk image") + + def __call__(self, args): + BaseCommand.__call__(self, args) + print(self.woz_image.to_json()) + +class WriterBaseCommand(BaseCommand): + def __call__(self, args): + BaseCommand.__call__(self, args) + self.args = args + # maintain creator if there is one, otherwise use default + self.output = WozWriter(self.woz_image.info.get("creator", __displayname__)) + self.output.tmap = self.woz_image.tmap + self.output.tracks = self.woz_image.tracks + self.output.info = self.woz_image.info.copy() + self.output.meta = self.woz_image.meta.copy() + self.update() + tmpfile = args.file + ".ardry" + with open(tmpfile, "wb") as f: + self.output.write(f) + os.rename(tmpfile, args.file) + +class CommandEdit(WriterBaseCommand): + def __init__(self): + WriterBaseCommand.__init__(self, "edit") + + def setup(self, subparser): + WriterBaseCommand.setup(self, + subparser, + description="Edit information and metadata in a .woz disk image", + epilog="""Tips: + + - Use repeated flags to edit multiple fields at once. + - Use "key:" with no value to delete a metadata field. + - Keys are case-sensitive. + - Some values have format restrictions; read the .woz specification.""", + help=".woz disk image (modified in place)", + formatter_class=argparse.RawDescriptionHelpFormatter) + self.parser.add_argument("-i", "--info", type=str, action="append", + help="""change information field. +INFO format is "key:value". +Acceptable keys are disk_type, write_protected, synchronized, cleaned, creator, version. +Other keys are ignored. +For boolean fields, use "1" or "true" or "yes" for true, "0" or "false" or "no" for false.""") + self.parser.add_argument("-m", "--meta", type=str, action="append", + help="""change metadata field. +META format is "key:value". +Standard keys are title, subtitle, publisher, developer, copyright, version, language, requires_ram, +requires_machine, notes, side, side_name, contributor, image_date. Other keys are allowed.""") + + def update(self): + # add all new info fields + for i in self.args.info or (): + k, v = i.split(":", 1) + if k in ("write_protected","synchronized","cleaned"): + v = v.lower() in ("1", "true", "yes") + self.output.info[k] = v + # add all new metadata fields, and delete empty ones + for m in self.args.meta or (): + k, v = m.split(":", 1) + v = v.split("|") + if len(v) == 1: + v = v[0] + if v: + self.output.meta[k] = v + elif k in self.output.meta.keys(): + del self.output.meta[k] + +class CommandRemove(WriterBaseCommand): + def __init__(self): + WriterBaseCommand.__init__(self, "remove") + + def setup(self, subparser): + WriterBaseCommand.setup(self, + subparser, + description="Remove tracks from a .woz disk image", + epilog="""Tips: + + - Tracks can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.) + - Use repeated flags to remove multiple tracks at once. + - It is harmless to try to remove a track that doesn't exist.""", + help=".woz disk image (modified in place)", + formatter_class=argparse.RawDescriptionHelpFormatter) + self.parser.add_argument("-t", "--track", type=str, action="append", + help="""track to remove""") + + def update(self): + for i in self.args.track or (): + self.output.remove_track(float(i)) + +class CommandImport(WriterBaseCommand): + def __init__(self): + WriterBaseCommand.__init__(self, "import") + + def setup(self, subparser): + WriterBaseCommand.setup(self, subparser, + description="Import JSON file to update metadata in a .woz disk image") + + def update(self): + self.output.from_json(sys.stdin.read()) + +if __name__ == "__main__": + import sys + raise_if = lambda cond, e, s="": cond and sys.exit("%s: %s" % (e.__name__, s)) + cmds = [CommandDump(), CommandVerify(), CommandEdit(), CommandRemove(), CommandExport(), CommandImport()] + parser = argparse.ArgumentParser(prog=__progname__, + description="""A multi-purpose tool for manipulating .woz disk images. + +See '""" + __progname__ + """ -h' for help on individual commands.""", + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("-v", "--version", action="version", version=__displayname__) + sp = parser.add_subparsers(dest="command", help="command") + for command in cmds: + command.setup(sp) + args = parser.parse_args() + args.action(args)