update to latest wozardry

This commit is contained in:
4am 2019-01-29 19:31:10 -05:00
parent e615883ef7
commit 046902f1e3
6 changed files with 669 additions and 31 deletions

View File

@ -3,7 +3,7 @@
# (c) 2018-9 by 4am # (c) 2018-9 by 4am
# MIT-licensed # MIT-licensed
from passport import eddimage, wozimage, a2rimage from passport import eddimage, wozardry, a2rimage
from passport import DefaultLogger, DebugLogger from passport import DefaultLogger, DebugLogger
from passport import Crack, Verify, Convert from passport import Crack, Verify, Convert
from passport.strings import STRINGS from passport.strings import STRINGS
@ -32,7 +32,7 @@ class BaseCommand:
base, ext = os.path.splitext(args.file) base, ext = os.path.splitext(args.file)
ext = ext.lower() ext = ext.lower()
if ext == ".woz": if ext == ".woz":
self.reader = wozimage.WozReader self.reader = wozardry.WozReader
elif ext == ".edd": elif ext == ".edd":
self.reader = eddimage.EDDReader self.reader = eddimage.EDDReader
elif ext == ".a2r": elif ext == ".a2r":

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from passport import wozimage from passport import wozardry
from passport.patchers import * from passport.patchers import *
from passport.strings import * from passport.strings import *
from passport.util import * from passport.util import *
@ -1342,13 +1342,13 @@ class Convert(BasePassportProcessor):
b = track.bits[:51021] b = track.bits[:51021]
# output_tracks is indexed on physical track number here because the # output_tracks is indexed on physical track number here because the
# point of .woz is to capture the physical layout of the original disk # point of .woz is to capture the physical layout of the original disk
self.output_tracks[physical_track_num] = wozimage.Track(b, len(b)) self.output_tracks[physical_track_num] = wozardry.Track(b, len(b))
def postprocess(self): def postprocess(self):
source_base, source_ext = os.path.splitext(self.g.disk_image.filename) source_base, source_ext = os.path.splitext(self.g.disk_image.filename)
output_filename = source_base + '.woz' output_filename = source_base + '.woz'
self.g.logger.PrintByID("writing", {"filename":output_filename}) self.g.logger.PrintByID("writing", {"filename":output_filename})
woz_image = wozimage.WozWriter(STRINGS["header"].strip()) woz_image = wozardry.WozWriter(STRINGS["header"].strip())
woz_image.info["cleaned"] = self.g.found_and_cleaned_weakbits woz_image.info["cleaned"] = self.g.found_and_cleaned_weakbits
woz_image.info["write_protected"] = self.g.protection_enforces_write_protected woz_image.info["write_protected"] = self.g.protection_enforces_write_protected
woz_image.meta["image_date"] = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()) woz_image.meta["image_date"] = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime())
@ -1359,7 +1359,7 @@ class Convert(BasePassportProcessor):
with open(output_filename, 'wb') as f: with open(output_filename, 'wb') as f:
woz_image.write(f) woz_image.write(f)
try: try:
wozimage.WozReader(output_filename) wozardry.WozReader(output_filename)
except Exception as e: except Exception as e:
os.remove(output_filename) os.remove(output_filename)
raise Exception from e raise Exception from e

View File

@ -1,8 +1,10 @@
from passport.wozimage import DiskImage, Track, WozError, raise_if from passport.wozardry import Track, raise_if
from passport import a2rchery from passport import a2rchery
import bitarray import bitarray
import collections import collections
class A2RSeekError(a2rchery.A2RError): pass
class A2RImage: class A2RImage:
def __init__(self, filename=None, stream=None): def __init__(self, filename=None, stream=None):
self.filename = filename self.filename = filename
@ -33,35 +35,18 @@ class A2RImage:
flux_total = 0 flux_total = 0
return bits, estimated_track_length, speed return bits, estimated_track_length, speed
def find_track_length(self, bits, estimated_track_length):
twice_bits = bits + bits
for matchlen in (8192, 4096, 2048, 1024):
if estimated_track_length < 32768 or len(bits) < 32768: continue
for offset in range(0, estimated_track_length, matchlen):
for length_delta in (0, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 7, -7, 8, -8, 9, -9, 10, -10, 11, -11, 12, -12):
real_length = estimated_track_length + length_delta
if real_length > 53168: continue
if twice_bits[8+offset:offset+matchlen] == twice_bits[real_length+8+offset:real_length+matchlen+offset]:
return real_length
return 0
def normalize(self, flux_records):
bits_and_lengths = [self.to_bits(flux_record) for flux_record in flux_records]
all_bits = [bits[8:self.find_track_length(bits, estimated_track_length)+8] for bits, estimated_track_length, speed in bits_and_lengths]
return all_bits
def seek(self, track_num): def seek(self, track_num):
if type(track_num) != float: if type(track_num) != float:
track_num = float(track_num) track_num = float(track_num)
if track_num < 0.0 or \ if track_num < 0.0 or \
track_num > 35.0 or \ track_num > 35.0 or \
track_num.as_integer_ratio()[1] not in (1,2,4): track_num.as_integer_ratio()[1] not in (1,2,4):
raise WozError("Invalid track %s" % track_num) raise A2RSeekError("Invalid track %s" % track_num)
location = int(track_num * 4) location = int(track_num * 4)
if not self.tracks.get(location): if not self.tracks.get(location):
all_bits = bitarray.bitarray() all_bits = bitarray.bitarray()
for flux_record in self.a2r_image.flux.get(location, [{}]): for flux_record in self.a2r_image.flux.get(location, [{}]):
bits, track_length, speed = self.to_bits(flux_record) bits, track_length, speed = self.to_bits(flux_record)
all_bits.extend(bits) all_bits.extend(bits)
self.tracks[location] = Track(all_bits, len(all_bits), speed=speed) self.tracks[location] = Track(all_bits, len(all_bits))
return self.tracks[location] return self.tracks[location]

View File

@ -1,13 +1,17 @@
from passport.wozimage import DiskImage, Track, WozError, raise_if from passport.wozardry import Track, raise_if
import bitarray import bitarray
class EDDReader(DiskImage): class EDDError(Exception): pass # base class
class EDDLengthError(EDDError): pass
class EDDSeekError(EDDError): pass
class EDDReader:
def __init__(self, filename=None, stream=None): def __init__(self, filename=None, stream=None):
DiskImage.__init__(self, filename, stream) DiskImage.__init__(self, filename, stream)
with stream or open(filename, 'rb') as f: with stream or open(filename, 'rb') as f:
for i in range(137): for i in range(137):
raw_bytes = f.read(16384) raw_bytes = f.read(16384)
raise_if(len(raw_bytes) != 16384, WozError, "Bad EDD file (did you image by quarter tracks?)") raise_if(len(raw_bytes) != 16384, EDDLengthError, "Bad EDD file (did you image by quarter tracks?)")
bits = bitarray.bitarray(endian="big") bits = bitarray.bitarray(endian="big")
bits.frombytes(raw_bytes) bits.frombytes(raw_bytes)
self.tracks.append(Track(bits, 131072)) self.tracks.append(Track(bits, 131072))
@ -18,7 +22,6 @@ class EDDReader(DiskImage):
if track_num < 0.0 or \ if track_num < 0.0 or \
track_num > 35.0 or \ track_num > 35.0 or \
track_num.as_integer_ratio()[1] not in (1,2,4): track_num.as_integer_ratio()[1] not in (1,2,4):
raise WozError("Invalid track %s" % track_num) raise EDDSeekError("Invalid track %s" % track_num)
trk_id = int(track_num * 4) trk_id = int(track_num * 4)
return self.tracks[trk_id] return self.tracks[trk_id]

650
passport/wozardry.py Executable file
View File

@ -0,0 +1,650 @@
#!/usr/bin/env python3
# (c) 2018 by 4am
# MIT-licensed
import argparse
import binascii
import bitarray # https://pypi.org/project/bitarray/
import collections
import json
import itertools
import os
__version__ = "1.1pre"
__date__ = "2018-10-24"
__progname__ = "wozardry"
__displayname__ = __progname__ + " " + __version__ + " by 4am (" + __date__ + ")"
# domain-specific constants defined in .woz specification
kWOZ1 = b"WOZ1"
kINFO = b"INFO"
kTMAP = b"TMAP"
kTRKS = b"TRKS"
kMETA = b"META"
kBitstreamLengthInBytes = 6646
kLanguages = ("English","Spanish","French","German","Chinese","Japanese","Italian","Dutch","Portuguese","Danish","Finnish","Norwegian","Swedish","Russian","Polish","Turkish","Arabic","Thai","Czech","Hungarian","Catalan","Croatian","Greek","Hebrew","Romanian","Slovak","Ukrainian","Indonesian","Malay","Vietnamese","Other")
kRequiresRAM = ("16K","24K","32K","48K","64K","128K","256K","512K","768K","1M","1.25M","1.5M+","Unknown")
kRequiresMachine = ("2","2+","2e","2c","2e+","2gs","2c+","3","3+")
# strings and things, for print routines and error messages
sEOF = "Unexpected EOF"
sBadChunkSize = "Bad chunk size"
dNoYes = {False:"no",True:"yes"}
tQuarters = (".00",".25",".50",".75")
# errors that may be raised
class WozError(Exception): pass # base class
class WozCRCError(WozError): pass
class WozFormatError(WozError): pass
class WozEOFError(WozFormatError): pass
class WozHeaderError(WozFormatError): pass
class WozHeaderError_NoWOZ1(WozHeaderError): pass
class WozHeaderError_NoFF(WozHeaderError): pass
class WozHeaderError_NoLF(WozHeaderError): pass
class WozINFOFormatError(WozFormatError): pass
class WozINFOFormatError_BadVersion(WozINFOFormatError): pass
class WozINFOFormatError_BadDiskType(WozINFOFormatError): pass
class WozINFOFormatError_BadWriteProtected(WozINFOFormatError): pass
class WozINFOFormatError_BadSynchronized(WozINFOFormatError): pass
class WozINFOFormatError_BadCleaned(WozINFOFormatError): pass
class WozINFOFormatError_BadCreator(WozINFOFormatError): pass
class WozTMAPFormatError(WozFormatError): pass
class WozTMAPFormatError_BadTRKS(WozTMAPFormatError): pass
class WozTRKSFormatError(WozFormatError): pass
class WozMETAFormatError(WozFormatError): pass
class WozMETAFormatError_DuplicateKey(WozFormatError): pass
class WozMETAFormatError_BadValue(WozFormatError): pass
class WozMETAFormatError_BadLanguage(WozFormatError): pass
class WozMETAFormatError_BadRAM(WozFormatError): pass
class WozMETAFormatError_BadMachine(WozFormatError): pass
def from_uint32(b):
return int.from_bytes(b, byteorder="little")
from_uint16=from_uint32
def to_uint32(b):
return b.to_bytes(4, byteorder="little")
def to_uint16(b):
return b.to_bytes(2, byteorder="little")
def to_uint8(b):
return b.to_bytes(1, byteorder="little")
def raise_if(cond, e, s=""):
if cond: raise e(s)
class Track:
def __init__(self, bits, bit_count):
self.bits = bits
while len(self.bits) > bit_count:
self.bits.pop()
self.bit_count = bit_count
self.bit_index = 0
self.revolutions = 0
def bit(self):
b = self.bits[self.bit_index] and 1 or 0
self.bit_index += 1
if self.bit_index >= self.bit_count:
self.bit_index = 0
self.revolutions += 1
yield b
def nibble(self):
b = 0
while b == 0:
b = next(self.bit())
n = 0x80
for bit_index in range(6, -1, -1):
b = next(self.bit())
n += b << bit_index
yield n
def rewind(self, bit_count):
self.bit_index -= 1
if self.bit_index < 0:
self.bit_index = self.bit_count - 1
self.revolutions -= 1
def find(self, sequence):
starting_revolutions = self.revolutions
seen = [0] * len(sequence)
while (self.revolutions < starting_revolutions + 2):
del seen[0]
seen.append(next(self.nibble()))
if tuple(seen) == tuple(sequence): return True
return False
class WozTrack(Track):
def __init__(self, bits, bit_count, splice_point = 0xFFFF, splice_nibble = 0, splice_bit_count = 0):
Track.__init__(self, bits, bit_count)
self.splice_point = splice_point
self.splice_nibble = splice_nibble
self.splice_bit_count = splice_bit_count
class WozDiskImage:
def __init__(self):
self.tmap = [0xFF]*160
self.tracks = []
self.info = collections.OrderedDict()
self.meta = collections.OrderedDict()
def track_num_to_half_phase(self, track_num):
if type(track_num) != float:
track_num = float(track_num)
if track_num < 0.0 or \
track_num > 40.0 or \
track_num.as_integer_ratio()[1] not in (1,2,4):
raise WozError("Invalid track %s" % track_num)
return int(track_num * 4)
def seek(self, track_num):
"""returns Track object for the given track, or None if the track is not part of this disk image. track_num can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.)"""
half_phase = self.track_num_to_half_phase(track_num)
trk_id = self.tmap[half_phase]
if trk_id == 0xFF: return None
return self.tracks[trk_id]
def clean(self):
"""removes tracks from self.tracks that are not referenced from self.tmap, and adjusts remaining self.tmap indices"""
i = 0
while i < len(self.tracks):
if i not in self.tmap:
del self.tracks[i]
for adjust in range(len(self.tmap)):
if (self.tmap[adjust] >= i) and (self.tmap[adjust] != 0xFF):
self.tmap[adjust] -= 1
else:
i += 1
def add(self, half_phase, track):
trk_id = len(self.tracks)
self.tracks.append(track)
self.tmap[half_phase] = trk_id
if half_phase:
self.tmap[half_phase - 1] = trk_id
if half_phase < 159:
self.tmap[half_phase + 1] = trk_id
def add_track(self, track_num, track):
self.add(self.track_num_to_half_phase(track_num), track)
def remove(self, half_phase):
if self.tmap[half_phase] == 0xFF: return False
self.tmap[half_phase] = 0xFF
self.clean()
return True
def remove_track(self, track_num):
"""removes given track, returns True if anything was actually removed, or False if track wasn't found. track_num can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.)"""
return self.remove(self.track_num_to_half_phase(track_num))
def from_json(self, json_string):
j = json.loads(json_string)
root = [x for x in j.keys()].pop()
self.meta.update(j[root]["meta"])
def to_json(self):
j = {"woz": {"info":self.info, "meta":self.meta}}
return json.dumps(j, indent=2)
class WozValidator:
def validate_info_version(self, version):
raise_if(version != b'\x01', WozINFOFormatError_BadVersion, "Unknown version (expected 1, found %s)" % version)
def validate_info_disk_type(self, disk_type):
raise_if(disk_type not in (b'\x01',b'\x02'), WozINFOFormatError_BadDiskType, "Unknown disk type (expected 1 or 2, found %s)" % disk_type)
def validate_info_write_protected(self, write_protected):
raise_if(write_protected not in (b'\x00',b'\x01'), WozINFOFormatError_BadWriteProtected, "Unknown write protected flag (expected 0 or 1, found %s)" % write_protected)
def validate_info_synchronized(self, synchronized):
raise_if(synchronized not in (b'\x00',b'\x01'), WozINFOFormatError_BadSynchronized, "Unknown synchronized flag (expected 0, or 1, found %s)" % synchronized)
def validate_info_cleaned(self, cleaned):
raise_if(cleaned not in (b'\x00',b'\x01'), WozINFOFormatError_BadCleaned, "Unknown cleaned flag (expected 0 or 1, found %s)" % cleaned)
def validate_info_creator(self, creator_as_bytes):
raise_if(len(creator_as_bytes) > 32, WozINFOFormatError_BadCreator, "Creator is longer than 32 bytes")
try:
creator_as_bytes.decode("UTF-8")
except:
raise_if(True, WozINFOFormatError_BadCreator, "Creator is not valid UTF-8")
def encode_info_creator(self, creator_as_string):
creator_as_bytes = creator_as_string.encode("UTF-8").ljust(32, b" ")
self.validate_info_creator(creator_as_bytes)
return creator_as_bytes
def decode_info_creator(self, creator_as_bytes):
self.validate_info_creator(creator_as_bytes)
return creator_as_bytes.decode("UTF-8").strip()
def validate_metadata(self, metadata_as_bytes):
try:
metadata = metadata_as_bytes.decode("UTF-8")
except:
raise WozMETAFormatError("Metadata is not valid UTF-8")
def decode_metadata(self, metadata_as_bytes):
self.validate_metadata(metadata_as_bytes)
return metadata_as_bytes.decode("UTF-8")
def validate_metadata_value(self, value):
raise_if("\t" in value, WozMETAFormatError_BadValue, "Invalid metadata value (contains tab character)")
raise_if("\n" in value, WozMETAFormatError_BadValue, "Invalid metadata value (contains linefeed character)")
raise_if("|" in value, WozMETAFormatError_BadValue, "Invalid metadata value (contains pipe character)")
def validate_metadata_language(self, language):
raise_if(language and (language not in kLanguages), WozMETAFormatError_BadLanguage, "Invalid metadata language")
def validate_metadata_requires_ram(self, requires_ram):
raise_if(requires_ram and (requires_ram not in kRequiresRAM), WozMETAFormatError_BadRAM, "Invalid metadata requires_ram")
def validate_metadata_requires_machine(self, requires_machine):
raise_if(requires_machine and (requires_machine not in kRequiresMachine), WozMETAFormatError_BadMachine, "Invalid metadata requires_machine")
class WozReader(WozDiskImage, WozValidator):
def __init__(self, filename=None, stream=None):
WozDiskImage.__init__(self)
self.filename = filename
with stream or open(filename, "rb") as f:
header_raw = f.read(8)
raise_if(len(header_raw) != 8, WozEOFError, sEOF)
self.__process_header(header_raw)
crc_raw = f.read(4)
raise_if(len(crc_raw) != 4, WozEOFError, sEOF)
crc = from_uint32(crc_raw)
all_data = []
while True:
chunk_id = f.read(4)
if not chunk_id: break
raise_if(len(chunk_id) != 4, WozEOFError, sEOF)
all_data.append(chunk_id)
chunk_size_raw = f.read(4)
raise_if(len(chunk_size_raw) != 4, WozEOFError, sEOF)
all_data.append(chunk_size_raw)
chunk_size = from_uint32(chunk_size_raw)
data = f.read(chunk_size)
raise_if(len(data) != chunk_size, WozEOFError, sEOF)
all_data.append(data)
if chunk_id == kINFO:
raise_if(chunk_size != 60, WozINFOFormatError, sBadChunkSize)
self.__process_info(data)
elif chunk_id == kTMAP:
raise_if(chunk_size != 160, WozTMAPFormatError, sBadChunkSize)
self.__process_tmap(data)
elif chunk_id == kTRKS:
self.__process_trks(data)
elif chunk_id == kMETA:
self.__process_meta(data)
if crc:
raise_if(crc != binascii.crc32(b"".join(all_data)) & 0xffffffff, WozCRCError, "Bad CRC")
def __process_header(self, data):
raise_if(data[:4] != kWOZ1, WozHeaderError_NoWOZ1, "Magic string 'WOZ1' not present at offset 0")
raise_if(data[4] != 0xFF, WozHeaderError_NoFF, "Magic byte 0xFF not present at offset 4")
raise_if(data[5:8] != b"\x0A\x0D\x0A", WozHeaderError_NoLF, "Magic bytes 0x0A0D0A not present at offset 5")
def __process_info(self, data):
version = data[0]
self.validate_info_version(to_uint8(version))
disk_type = data[1]
self.validate_info_disk_type(to_uint8(disk_type))
write_protected = data[2]
self.validate_info_write_protected(to_uint8(write_protected))
synchronized = data[3]
self.validate_info_synchronized(to_uint8(synchronized))
cleaned = data[4]
self.validate_info_cleaned(to_uint8(cleaned))
creator = self.decode_info_creator(data[5:37])
self.info["version"] = version # int
self.info["disk_type"] = disk_type # int
self.info["write_protected"] = (write_protected == 1) # boolean
self.info["synchronized"] = (synchronized == 1) # boolean
self.info["cleaned"] = (cleaned == 1) # boolean
self.info["creator"] = creator # string
def __process_tmap(self, data):
self.tmap = list(data)
def __process_trks(self, data):
i = 0
while i < len(data):
raw_bytes = data[i:i+kBitstreamLengthInBytes]
raise_if(len(raw_bytes) != kBitstreamLengthInBytes, WozEOFError, sEOF)
i += kBitstreamLengthInBytes
bytes_used_raw = data[i:i+2]
raise_if(len(bytes_used_raw) != 2, WozEOFError, sEOF)
bytes_used = from_uint16(bytes_used_raw)
raise_if(bytes_used > kBitstreamLengthInBytes, WozTRKSFormatError, "TRKS chunk %d bytes_used is out of range" % len(self.tracks))
i += 2
bit_count_raw = data[i:i+2]
raise_if(len(bit_count_raw) != 2, WozEOFError, sEOF)
bit_count = from_uint16(bit_count_raw)
i += 2
splice_point_raw = data[i:i+2]
raise_if(len(splice_point_raw) != 2, WozEOFError, sEOF)
splice_point = from_uint16(splice_point_raw)
if splice_point != 0xFFFF:
raise_if(splice_point > bit_count, WozTRKSFormatError, "TRKS chunk %d splice_point is out of range" % len(self.tracks))
i += 2
splice_nibble = data[i]
i += 1
splice_bit_count = data[i]
if splice_point != 0xFFFF:
raise_if(splice_bit_count not in (8,9,10), WozTRKSFormatError, "TRKS chunk %d splice_bit_count is out of range" % len(self.tracks))
i += 3
bits = bitarray.bitarray(endian="big")
bits.frombytes(raw_bytes)
self.tracks.append(WozTrack(bits, bit_count, splice_point, splice_nibble, splice_bit_count))
for trk, i in zip(self.tmap, itertools.count()):
raise_if(trk != 0xFF and trk >= len(self.tracks), WozTMAPFormatError_BadTRKS, "Invalid TMAP entry: track %d%s points to non-existent TRKS chunk %d" % (i/4, tQuarters[i%4], trk))
def __process_meta(self, metadata_as_bytes):
metadata = self.decode_metadata(metadata_as_bytes)
for line in metadata.split("\n"):
if not line: continue
columns_raw = line.split("\t")
raise_if(len(columns_raw) != 2, WozMETAFormatError, "Malformed metadata")
key, value_raw = columns_raw
raise_if(key in self.meta, WozMETAFormatError_DuplicateKey, "Duplicate metadata key %s" % key)
values = value_raw.split("|")
if key == "language":
list(map(self.validate_metadata_language, values))
elif key == "requires_ram":
list(map(self.validate_metadata_requires_ram, values))
elif key == "requires_machine":
list(map(self.validate_metadata_requires_machine, values))
self.meta[key] = len(values) == 1 and values[0] or tuple(values)
class WozWriter(WozDiskImage, WozValidator):
def __init__(self, creator):
WozDiskImage.__init__(self)
self.info["version"] = 1
self.info["disk_type"] = 1
self.info["write_protected"] = False
self.info["synchronized"] = False
self.info["cleaned"] = False
self.info["creator"] = creator
def build_info(self):
chunk = bytearray()
chunk.extend(kINFO) # chunk ID
chunk.extend(to_uint32(60)) # chunk size (constant)
version_raw = to_uint8(self.info["version"])
self.validate_info_version(version_raw)
disk_type_raw = to_uint8(self.info["disk_type"])
self.validate_info_disk_type(disk_type_raw)
write_protected_raw = to_uint8(self.info["write_protected"])
self.validate_info_write_protected(write_protected_raw)
synchronized_raw = to_uint8(self.info["synchronized"])
self.validate_info_synchronized(synchronized_raw)
cleaned_raw = to_uint8(self.info["cleaned"])
self.validate_info_cleaned(cleaned_raw)
creator_raw = self.encode_info_creator(self.info["creator"])
chunk.extend(version_raw) # version (int, probably 1)
chunk.extend(disk_type_raw) # disk type (1=5.25 inch, 2=3.5 inch)
chunk.extend(write_protected_raw) # write-protected (0=no, 1=yes)
chunk.extend(synchronized_raw) # tracks synchronized (0=no, 1=yes)
chunk.extend(cleaned_raw) # weakbits cleaned (0=no, 1=yes)
chunk.extend(creator_raw) # creator
chunk.extend(b"\x00" * 23) # reserved
return chunk
def build_tmap(self):
chunk = bytearray()
chunk.extend(kTMAP) # chunk ID
chunk.extend(to_uint32(160)) # chunk size
chunk.extend(bytes(self.tmap))
return chunk
def build_trks(self):
chunk = bytearray()
chunk.extend(kTRKS) # chunk ID
chunk_size = len(self.tracks)*6656
chunk.extend(to_uint32(chunk_size)) # chunk size
for track in self.tracks:
raw_bytes = track.bits.tobytes()
chunk.extend(raw_bytes) # bitstream as raw bytes
chunk.extend(b"\x00" * (6646 - len(raw_bytes))) # padding to 6646 bytes
chunk.extend(to_uint16(len(raw_bytes))) # bytes used
chunk.extend(to_uint16(track.bit_count)) # bit count
chunk.extend(b"\xFF\xFF") # splice point (none)
chunk.extend(b"\xFF") # splice nibble (none)
chunk.extend(b"\xFF") # splice bit count (none)
chunk.extend(b"\x00\x00") # reserved
return chunk
def build_meta(self):
if not self.meta: return b""
meta_tmp = {}
for key, value_raw in self.meta.items():
if type(value_raw) == str:
values = [value_raw]
else:
values = value_raw
meta_tmp[key] = values
list(map(self.validate_metadata_value, values))
if key == "language":
list(map(self.validate_metadata_language, values))
elif key == "requires_ram":
list(map(self.validate_metadata_requires_ram, values))
elif key == "requires_machine":
list(map(self.validate_metadata_requires_machine, values))
data = b"\x0A".join(
[k.encode("UTF-8") + \
b"\x09" + \
"|".join(v).encode("UTF-8") \
for k, v in meta_tmp.items()])
chunk = bytearray()
chunk.extend(kMETA) # chunk ID
chunk.extend(to_uint32(len(data))) # chunk size
chunk.extend(data)
return chunk
def build_head(self, crc):
chunk = bytearray()
chunk.extend(kWOZ1) # magic bytes
chunk.extend(b"\xFF\x0A\x0D\x0A") # more magic bytes
chunk.extend(to_uint32(crc)) # CRC32 of rest of file (calculated in caller)
return chunk
def write(self, stream):
info = self.build_info()
tmap = self.build_tmap()
trks = self.build_trks()
meta = self.build_meta()
crc = binascii.crc32(info + tmap + trks + meta)
head = self.build_head(crc)
stream.write(head)
stream.write(info)
stream.write(tmap)
stream.write(trks)
stream.write(meta)
#---------- command line interface ----------
class BaseCommand:
def __init__(self, name):
self.name = name
def setup(self, subparser, description=None, epilog=None, help=".woz disk image", formatter_class=argparse.HelpFormatter):
self.parser = subparser.add_parser(self.name, description=description, epilog=epilog, formatter_class=formatter_class)
self.parser.add_argument("file", help=help)
self.parser.set_defaults(action=self)
def __call__(self, args):
self.woz_image = WozReader(args.file)
class CommandVerify(BaseCommand):
def __init__(self):
BaseCommand.__init__(self, "verify")
def setup(self, subparser):
BaseCommand.setup(self, subparser,
description="Verify file structure and metadata of a .woz disk image (produces no output unless a problem is found)")
class CommandDump(BaseCommand):
kWidth = 30
def __init__(self):
BaseCommand.__init__(self, "dump")
def setup(self, subparser):
BaseCommand.setup(self, subparser,
description="Print all available information and metadata in a .woz disk image")
def __call__(self, args):
BaseCommand.__call__(self, args)
self.print_tmap()
self.print_meta()
self.print_info()
def print_info(self):
print("INFO: File format version:".ljust(self.kWidth), "%d" % self.woz_image.info["version"])
print("INFO: Disk type:".ljust(self.kWidth), ("5.25-inch", "3.5-inch")[self.woz_image.info["disk_type"]-1])
print("INFO: Write protected:".ljust(self.kWidth), dNoYes[self.woz_image.info["write_protected"]])
print("INFO: Track synchronized:".ljust(self.kWidth), dNoYes[self.woz_image.info["synchronized"]])
print("INFO: Weakbits cleaned:".ljust(self.kWidth), dNoYes[self.woz_image.info["cleaned"]])
print("INFO: Creator:".ljust(self.kWidth), self.woz_image.info["creator"])
def print_tmap(self):
i = 0
for trk, i in zip(self.woz_image.tmap, itertools.count()):
if trk != 0xFF:
print(("TMAP: Track %d%s" % (i/4, tQuarters[i%4])).ljust(self.kWidth), "TRKS %d" % (trk))
def print_meta(self):
if not self.woz_image.meta: return
for key, values in self.woz_image.meta.items():
if type(values) == str:
values = [values]
print(("META: " + key + ":").ljust(self.kWidth), values[0])
for value in values[1:]:
print("META: ".ljust(self.kWidth), value)
class CommandExport(BaseCommand):
def __init__(self):
BaseCommand.__init__(self, "export")
def setup(self, subparser):
BaseCommand.setup(self, subparser,
description="Export (as JSON) all information and metadata from a .woz disk image")
def __call__(self, args):
BaseCommand.__call__(self, args)
print(self.woz_image.to_json())
class WriterBaseCommand(BaseCommand):
def __call__(self, args):
BaseCommand.__call__(self, args)
self.args = args
# maintain creator if there is one, otherwise use default
self.output = WozWriter(self.woz_image.info.get("creator", __displayname__))
self.output.tmap = self.woz_image.tmap
self.output.tracks = self.woz_image.tracks
self.output.info = self.woz_image.info.copy()
self.output.meta = self.woz_image.meta.copy()
self.update()
tmpfile = args.file + ".ardry"
with open(tmpfile, "wb") as f:
self.output.write(f)
os.rename(tmpfile, args.file)
class CommandEdit(WriterBaseCommand):
def __init__(self):
WriterBaseCommand.__init__(self, "edit")
def setup(self, subparser):
WriterBaseCommand.setup(self,
subparser,
description="Edit information and metadata in a .woz disk image",
epilog="""Tips:
- Use repeated flags to edit multiple fields at once.
- Use "key:" with no value to delete a metadata field.
- Keys are case-sensitive.
- Some values have format restrictions; read the .woz specification.""",
help=".woz disk image (modified in place)",
formatter_class=argparse.RawDescriptionHelpFormatter)
self.parser.add_argument("-i", "--info", type=str, action="append",
help="""change information field.
INFO format is "key:value".
Acceptable keys are disk_type, write_protected, synchronized, cleaned, creator, version.
Other keys are ignored.
For boolean fields, use "1" or "true" or "yes" for true, "0" or "false" or "no" for false.""")
self.parser.add_argument("-m", "--meta", type=str, action="append",
help="""change metadata field.
META format is "key:value".
Standard keys are title, subtitle, publisher, developer, copyright, version, language, requires_ram,
requires_machine, notes, side, side_name, contributor, image_date. Other keys are allowed.""")
def update(self):
# add all new info fields
for i in self.args.info or ():
k, v = i.split(":", 1)
if k in ("write_protected","synchronized","cleaned"):
v = v.lower() in ("1", "true", "yes")
self.output.info[k] = v
# add all new metadata fields, and delete empty ones
for m in self.args.meta or ():
k, v = m.split(":", 1)
v = v.split("|")
if len(v) == 1:
v = v[0]
if v:
self.output.meta[k] = v
elif k in self.output.meta.keys():
del self.output.meta[k]
class CommandRemove(WriterBaseCommand):
def __init__(self):
WriterBaseCommand.__init__(self, "remove")
def setup(self, subparser):
WriterBaseCommand.setup(self,
subparser,
description="Remove tracks from a .woz disk image",
epilog="""Tips:
- Tracks can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.)
- Use repeated flags to remove multiple tracks at once.
- It is harmless to try to remove a track that doesn't exist.""",
help=".woz disk image (modified in place)",
formatter_class=argparse.RawDescriptionHelpFormatter)
self.parser.add_argument("-t", "--track", type=str, action="append",
help="""track to remove""")
def update(self):
for i in self.args.track or ():
self.output.remove_track(float(i))
class CommandImport(WriterBaseCommand):
def __init__(self):
WriterBaseCommand.__init__(self, "import")
def setup(self, subparser):
WriterBaseCommand.setup(self, subparser,
description="Import JSON file to update metadata in a .woz disk image")
def update(self):
self.output.from_json(sys.stdin.read())
if __name__ == "__main__":
import sys
raise_if = lambda cond, e, s="": cond and sys.exit("%s: %s" % (e.__name__, s))
cmds = [CommandDump(), CommandVerify(), CommandEdit(), CommandRemove(), CommandExport(), CommandImport()]
parser = argparse.ArgumentParser(prog=__progname__,
description="""A multi-purpose tool for manipulating .woz disk images.
See '""" + __progname__ + """ <command> -h' for help on individual commands.""",
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("-v", "--version", action="version", version=__displayname__)
sp = parser.add_subparsers(dest="command", help="command")
for command in cmds:
command.setup(sp)
args = parser.parse_args()
args.action(args)