add a2r support

This commit is contained in:
4am 2018-09-10 10:19:46 -04:00
parent dc8d8c5e6e
commit 8b9fb866bd
5 changed files with 545 additions and 21 deletions

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from passport import eddimage, wozimage from passport import eddimage, wozimage, a2rimage
from passport import DefaultLogger, DebugLogger from passport import DefaultLogger, DebugLogger
from passport import Crack, Verify, EDDToWoz from passport import Crack, Verify, EDDToWoz
from passport.strings import STRINGS from passport.strings import STRINGS
@ -37,6 +37,8 @@ if ext == ".woz":
reader = wozimage.WozReader reader = wozimage.WozReader
elif ext == ".edd": elif ext == ".edd":
reader = eddimage.EDDReader reader = eddimage.EDDReader
elif ext == ".a2r":
reader = a2rimage.A2RImage
else: else:
print("unrecognized file type") print("unrecognized file type")
usage(1) usage(1)

View File

@ -104,7 +104,7 @@ class Sector:
def __getitem__(self, i): def __getitem__(self, i):
return self.decoded[i] return self.decoded[i]
class RWTS: class RWTS:
kDefaultSectorOrder16 = (0x00, 0x07, 0x0E, 0x06, 0x0D, 0x05, 0x0C, 0x04, 0x0B, 0x03, 0x0A, 0x02, 0x09, 0x01, 0x08, 0x0F) kDefaultSectorOrder16 = (0x00, 0x07, 0x0E, 0x06, 0x0D, 0x05, 0x0C, 0x04, 0x0B, 0x03, 0x0A, 0x02, 0x09, 0x01, 0x08, 0x0F)
kDefaultAddressPrologue16 = (0xD5, 0xAA, 0x96) kDefaultAddressPrologue16 = (0xD5, 0xAA, 0x96)
@ -143,7 +143,7 @@ class RWTS:
def seek(self, track_num): def seek(self, track_num):
self.track_num = track_num self.track_num = track_num
def reorder_to_logical_sectors(self, sectors): def reorder_to_logical_sectors(self, sectors):
logical = {} logical = {}
for k, v in sectors.items(): for k, v in sectors.items():
@ -165,7 +165,7 @@ class RWTS:
for i in nibbles: for i in nibbles:
found.append(next(track.nibble())) found.append(next(track.nibble()))
return tuple(found) == tuple(nibbles) return tuple(found) == tuple(nibbles)
def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num): def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num):
return self.verify_nibbles_at_point(track, self.address_epilogue) return self.verify_nibbles_at_point(track, self.address_epilogue)
@ -247,7 +247,7 @@ class RWTS:
# verifying the address field epilogue failed, but this is # verifying the address field epilogue failed, but this is
# not necessarily fatal because there might be another copy # not necessarily fatal because there might be another copy
# of this sector later # of this sector later
#self.g.logger.debug("verify_address_epilogue_at_point failed, continuing") self.g.logger.debug("verify_address_epilogue_at_point failed, continuing")
continue continue
if not self.find_data_prologue(track, track_num, address_field.sector_num): if not self.find_data_prologue(track, track_num, address_field.sector_num):
# if we can't find a data field prologue, just give up # if we can't find a data field prologue, just give up
@ -256,9 +256,9 @@ class RWTS:
# read and decode the data field, and verify the data checksum # read and decode the data field, and verify the data checksum
decoded = self.data_field_at_point(track, track_num, address_field.sector_num) decoded = self.data_field_at_point(track, track_num, address_field.sector_num)
if not decoded: if not decoded:
self.g.logger.debug("data_field_at_point failed, continuing")
# decoding data field failed, but this is not necessarily fatal # decoding data field failed, but this is not necessarily fatal
# because there might be another copy of this sector later # because there might be another copy of this sector later
self.g.logger.debug("data_field_at_point failed, continuing")
continue continue
if not self.verify_data_epilogue_at_point(track, track_num, address_field.sector_num): if not self.verify_data_epilogue_at_point(track, track_num, address_field.sector_num):
# verifying the data field epilogue failed, but this is # verifying the data field epilogue failed, but this is
@ -327,7 +327,7 @@ class UniversalRWTS(RWTS):
class UniversalRWTSIgnoreEpilogues(UniversalRWTS): class UniversalRWTSIgnoreEpilogues(UniversalRWTS):
def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num): def verify_address_epilogue_at_point(self, track, track_num, physical_sector_num):
return True return True
def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num): def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num):
return True return True
@ -392,7 +392,7 @@ class BorderRWTS(DOS33RWTS):
logical_sectors[9][0x02]) logical_sectors[9][0x02])
self.data_epilogue = (logical_sectors[9][0x0C], self.data_epilogue = (logical_sectors[9][0x0C],
logical_sectors[9][0x11]) logical_sectors[9][0x11])
class D5TimingBitRWTS(DOS33RWTS): class D5TimingBitRWTS(DOS33RWTS):
def reset(self, logical_sectors): def reset(self, logical_sectors):
DOS33RWTS.reset(self, logical_sectors) DOS33RWTS.reset(self, logical_sectors)
@ -418,7 +418,7 @@ class InfocomRWTS(DOS33RWTS):
def reset(self, logical_sectors): def reset(self, logical_sectors):
DOS33RWTS.reset(self, logical_sectors) DOS33RWTS.reset(self, logical_sectors)
self.data_prologue = self.data_prologue[:2] self.data_prologue = self.data_prologue[:2]
def find_data_prologue(self, track, track_num, physical_sector_num): def find_data_prologue(self, track, track_num, physical_sector_num):
if not DOS33RWTS.find_data_prologue(self, track, track_num, physical_sector_num): if not DOS33RWTS.find_data_prologue(self, track, track_num, physical_sector_num):
return False return False
@ -453,7 +453,7 @@ class HeredityDogRWTS(DOS33RWTS):
self.g.found_and_cleaned_weakbits = True self.g.found_and_cleaned_weakbits = True
return bytearray(256) return bytearray(256)
return DOS33RWTS.data_field_at_point(self, track, track_num, physical_sector_num) return DOS33RWTS.data_field_at_point(self, track, track_num, physical_sector_num)
def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num): def verify_data_epilogue_at_point(self, track, track_num, physical_sector_num):
if (track_num, physical_sector_num) == (0x00, 0x0A): if (track_num, physical_sector_num) == (0x00, 0x0A):
return True return True
@ -463,7 +463,7 @@ class BECARWTS(DOS33RWTS):
def is_protected_sector(self, track_num, physical_sector_num): def is_protected_sector(self, track_num, physical_sector_num):
if track_num > 0: return True if track_num > 0: return True
return physical_sector_num not in (0x00, 0x0D, 0x0B, 0x09, 0x07, 0x05, 0x03, 0x01, 0x0E, 0x0C) return physical_sector_num not in (0x00, 0x0D, 0x0B, 0x09, 0x07, 0x05, 0x03, 0x01, 0x0E, 0x0C)
def reset(self, logical_sectors): def reset(self, logical_sectors):
DOS33RWTS.reset(self, logical_sectors) DOS33RWTS.reset(self, logical_sectors)
self.data_prologue = self.data_prologue[:2] self.data_prologue = self.data_prologue[:2]
@ -663,7 +663,7 @@ class BasePassportProcessor: # base class
return True return True
# TODO IsUnformatted and other tests # TODO IsUnformatted and other tests
return False return False
def IDDiversi(self, t00s00): def IDDiversi(self, t00s00):
"""returns True if T00S00 is Diversi-DOS bootloader, or False otherwise""" """returns True if T00S00 is Diversi-DOS bootloader, or False otherwise"""
return find.at(0xF1, t00s00, return find.at(0xF1, t00s00,
@ -757,7 +757,7 @@ class BasePassportProcessor: # base class
b'\xA0\x1F' b'\xA0\x1F'
b'\xB9\x00\x08' b'\xB9\x00\x08'
b'\x49') b'\x49')
def IDDOS33(self, t00s00): def IDDOS33(self, t00s00):
"""returns True if T00S00 is DOS bootloader or some variation """returns True if T00S00 is DOS bootloader or some variation
that can be safely boot traced, or False otherwise""" that can be safely boot traced, or False otherwise"""
@ -1040,7 +1040,7 @@ class BasePassportProcessor: # base class
if find.at(0x59, logical_sectors[3], b'\xBD\x8C\xC0\xC9\xD5'): if find.at(0x59, logical_sectors[3], b'\xBD\x8C\xC0\xC9\xD5'):
self.g.logger.PrintByID("diskrwts") self.g.logger.PrintByID("diskrwts")
return D5TimingBitRWTS(logical_sectors, self.g) return D5TimingBitRWTS(logical_sectors, self.g)
# TODO handle Milliken here # TODO handle Milliken here
# TODO handle Adventure International here # TODO handle Adventure International here
@ -1107,7 +1107,7 @@ class BasePassportProcessor: # base class
b'\x4C\xF0\xBB'): b'\x4C\xF0\xBB'):
self.g.protection_enforces_write_protected = True self.g.protection_enforces_write_protected = True
return HeredityDogRWTS(logical_sectors, self.g) return HeredityDogRWTS(logical_sectors, self.g)
if use_builtin: if use_builtin:
return self.StartWithUniv() return self.StartWithUniv()
@ -1120,10 +1120,10 @@ class BasePassportProcessor: # base class
self.g.tried_univ = True self.g.tried_univ = True
self.g.is_protdos = False self.g.is_protdos = False
return UniversalRWTS(self.g) return UniversalRWTS(self.g)
def preprocess(self): def preprocess(self):
return True return True
def run(self): def run(self):
self.g.logger.PrintByID("header") self.g.logger.PrintByID("header")
self.g.logger.PrintByID("reading", {"filename":self.g.disk_image.filename}) self.g.logger.PrintByID("reading", {"filename":self.g.disk_image.filename})
@ -1236,7 +1236,7 @@ class Verify(BasePassportProcessor):
class Crack(Verify): class Crack(Verify):
def save_track(self, track_num, physical_sectors): def save_track(self, track_num, physical_sectors):
self.output_tracks[float(track_num)] = Verify.save_track(self, track_num, physical_sectors) self.output_tracks[float(track_num)] = Verify.save_track(self, track_num, physical_sectors)
def apply_patches(self, logical_sectors, patches): def apply_patches(self, logical_sectors, patches):
for patch in patches: for patch in patches:
if patch.id: if patch.id:
@ -1249,7 +1249,7 @@ class Crack(Verify):
for i in range(len(patch.new_value)): for i in range(len(patch.new_value)):
b[patch.byte_offset + i] = patch.new_value[i] b[patch.byte_offset + i] = patch.new_value[i]
logical_sectors[patch.sector_num].decoded = b logical_sectors[patch.sector_num].decoded = b
def postprocess(self): def postprocess(self):
source_base, source_ext = os.path.splitext(self.g.disk_image.filename) source_base, source_ext = os.path.splitext(self.g.disk_image.filename)
output_filename = source_base + '.dsk' output_filename = source_base + '.dsk'
@ -1281,7 +1281,7 @@ class EDDToWoz(BasePassportProcessor):
# TODO this only works about half the time # TODO this only works about half the time
b = track.bits[:51021] b = track.bits[:51021]
self.output_tracks[track_num] = wozimage.Track(b, len(b)) self.output_tracks[track_num] = wozimage.Track(b, len(b))
def postprocess(self): def postprocess(self):
source_base, source_ext = os.path.splitext(self.g.disk_image.filename) source_base, source_ext = os.path.splitext(self.g.disk_image.filename)
output_filename = source_base + '.woz' output_filename = source_base + '.woz'

475
passport/a2rchery.py Executable file
View File

@ -0,0 +1,475 @@
#!/usr/bin/env python3
# (c) 2018 by 4am
# MIT-licensed
import argparse
import collections
import json
import os
import sys
__version__ = "1.0"
__date__ = "2018-09-08"
__progname__ = "a2rchery"
__displayname__ = __progname__ + " by 4am (" + __date__ + ")"
# chunk IDs for .a2r files
kA2R2 = b"A2R2"
kINFO = b"INFO"
kSTRM = b"STRM"
kMETA = b"META"
# other things defined in the .a2r specification
kLanguages = ("English","Spanish","French","German","Chinese","Japanese","Italian","Dutch","Portuguese","Danish","Finnish","Norwegian","Swedish","Russian","Polish","Turkish","Arabic","Thai","Czech","Hungarian","Catalan","Croatian","Greek","Hebrew","Romanian","Slovak","Ukrainian","Indonesian","Malay","Vietnamese","Other")
kRequiresRAM = ("16K","24K","32K","48K","64K","128K","256K","512K","768K","1M","1.25M","1.5M+","Unknown")
kRequiresMachine = ("2","2+","2e","2c","2e+","2gs","2c+","3","3+")
kCaptureTiming = 1
kCaptureBits = 2
kCaptureXTiming = 3
# strings and things, for print routines and error messages
sEOF = "Unexpected EOF"
sBadChunkSize = "Bad chunk size"
dNoYes = {False:"no",True:"yes"}
tQuarters = (".00",".25",".50",".75")
dTiming = {kCaptureTiming:"timing",kCaptureBits:"bits",kCaptureXTiming:"xtiming"}
# errors that may be raised
class A2RError(Exception): pass # base class
class A2REOFError(A2RError): pass
class A2RFormatError(A2RError): pass
class A2RHeaderError(A2RError): pass
class A2RHeaderError_NoA2R2(A2RHeaderError): pass
class A2RHeaderError_NoFF(A2RHeaderError): pass
class A2RHeaderError_NoLF(A2RHeaderError): pass
class A2RINFOFormatError(A2RFormatError): pass
class A2RINFOFormatError_BadVersion(A2RINFOFormatError): pass
class A2RINFOFormatError_BadDiskType(A2RINFOFormatError): pass
class A2RINFOFormatError_BadWriteProtected(A2RINFOFormatError): pass
class A2RINFOFormatError_BadSynchronized(A2RINFOFormatError): pass
class A2RINFOFormatError_BadCleaned(A2RINFOFormatError): pass
class A2RINFOFormatError_BadCreator(A2RINFOFormatError): pass
class A2RSTRMFormatError(A2RFormatError): pass
class A2RMETAFormatError(A2RFormatError): pass
class A2RMETAFormatError_DuplicateKey(A2RFormatError): pass
class A2RMETAFormatError_BadValue(A2RFormatError): pass
class A2RMETAFormatError_BadLanguage(A2RFormatError): pass
class A2RMETAFormatError_BadRAM(A2RFormatError): pass
class A2RMETAFormatError_BadMachine(A2RFormatError): pass
class A2RParseError(A2RError):
pass
def from_uint32(b):
return int.from_bytes(b, byteorder="little")
from_uint16=from_uint32
def to_uint32(b):
return b.to_bytes(4, byteorder="little")
def to_uint16(b):
return b.to_bytes(2, byteorder="little")
def to_uint8(b):
return b.to_bytes(1, byteorder="little")
def raise_if(cond, e, s=""):
if cond: raise e(s)
class DiskImage: # base class
def __init__(self, filename=None, stream=None):
raise_if(not filename and not stream, A2RError, "no input")
self.filename = filename
self.tracks = []
def seek(self, track_num):
"""returns Track object for the given track, or None if the track is not part of this disk image. track_num can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.)"""
return None
class A2RValidator:
def validate_info_version(self, version):
raise_if(version != b'\x01', A2RINFOFormatError_BadVersion, "Unknown version (expected 1, found %s)" % version)
def validate_info_disk_type(self, disk_type):
raise_if(disk_type not in (b'\x01',b'\x02'), A2RINFOFormatError_BadDiskType, "Unknown disk type (expected 1 or 2, found %s)" % disk_type)
def validate_info_write_protected(self, write_protected):
raise_if(write_protected not in (b'\x00',b'\x01'), A2RINFOFormatError_BadWriteProtected, "Unknown write protected flag (expected 0 or 1, found %s)" % write_protected)
def validate_info_synchronized(self, synchronized):
raise_if(synchronized not in (b'\x00',b'\x01'), A2RINFOFormatError_BadSynchronized, "Unknown synchronized flag (expected 0, or 1, found %s)" % synchronized)
def validate_info_creator(self, creator_as_bytes):
raise_if(len(creator_as_bytes) > 32, A2RINFOFormatError_BadCreator, "Creator is longer than 32 bytes")
try:
creator_as_bytes.decode("UTF-8")
except:
raise_if(True, A2RINFOFormatError_BadCreator, "Creator is not valid UTF-8")
def encode_info_creator(self, creator_as_string):
creator_as_bytes = creator_as_string.encode("UTF-8").ljust(32, b" ")
self.validate_info_creator(creator_as_bytes)
return creator_as_bytes
def decode_info_creator(self, creator_as_bytes):
self.validate_info_creator(creator_as_bytes)
return creator_as_bytes.decode("UTF-8").strip()
def validate_metadata(self, metadata_as_bytes):
try:
metadata = metadata_as_bytes.decode("UTF-8")
except:
raise A2RMETAFormatError("Metadata is not valid UTF-8")
def decode_metadata(self, metadata_as_bytes):
self.validate_metadata(metadata_as_bytes)
return metadata_as_bytes.decode("UTF-8")
def validate_metadata_value(self, value):
raise_if("\t" in value, A2RMETAFormatError_BadValue, "Invalid metadata value (contains tab character)")
raise_if("\n" in value, A2RMETAFormatError_BadValue, "Invalid metadata value (contains linefeed character)")
raise_if("|" in value, A2RMETAFormatError_BadValue, "Invalid metadata value (contains pipe character)")
def validate_metadata_language(self, language):
raise_if(language and (language not in kLanguages), A2RMETAFormatError_BadLanguage, "Invalid metadata language")
def validate_metadata_requires_ram(self, requires_ram):
raise_if(requires_ram and (requires_ram not in kRequiresRAM), A2RMETAFormatError_BadRAM, "Invalid metadata requires_ram")
def validate_metadata_requires_machine(self, requires_machine):
raise_if(requires_machine and (requires_machine not in kRequiresMachine), A2RMETAFormatError_BadMachine, "Invalid metadata requires_machine")
class A2RReader(DiskImage, A2RValidator):
def __init__(self, filename=None, stream=None):
DiskImage.__init__(self, filename, stream)
self.info = collections.OrderedDict()
self.meta = collections.OrderedDict()
self.flux = collections.OrderedDict()
with stream or open(filename, "rb") as f:
header_raw = f.read(8)
raise_if(len(header_raw) != 8, A2REOFError, sEOF)
self.__process_header(header_raw)
while True:
chunk_id = f.read(4)
if not chunk_id: break
raise_if(len(chunk_id) != 4, A2REOFError, sEOF)
chunk_size_raw = f.read(4)
raise_if(len(chunk_size_raw) != 4, A2REOFError, sEOF)
chunk_size = from_uint32(chunk_size_raw)
data = f.read(chunk_size)
raise_if(len(data) != chunk_size, A2REOFError, sEOF)
if chunk_id == kINFO:
raise_if(chunk_size != 36, A2RFormatError, sBadChunkSize)
self.__process_info(data)
elif chunk_id == kSTRM:
self.__process_strm(data)
elif chunk_id == kMETA:
self.__process_meta(data)
def __process_header(self, data):
raise_if(data[:4] != kA2R2, A2RHeaderError_NoA2R2, "Magic string 'A2R2' not present at offset 0")
raise_if(data[4] != 0xFF, A2RHeaderError_NoFF, "Magic byte 0xFF not present at offset 4")
raise_if(data[5:8] != b"\x0A\x0D\x0A", A2RHeaderError_NoLF, "Magic bytes 0x0A0D0A not present at offset 5")
def __process_info(self, data):
version = data[0]
self.validate_info_version(to_uint8(version))
disk_type = data[33]
self.validate_info_disk_type(to_uint8(disk_type))
write_protected = data[34]
self.validate_info_write_protected(to_uint8(write_protected))
synchronized = data[35]
self.validate_info_synchronized(to_uint8(synchronized))
creator = self.decode_info_creator(data[1:33])
self.info["version"] = version # int
self.info["disk_type"] = disk_type # int
self.info["write_protected"] = (write_protected == 1) # boolean
self.info["synchronized"] = (synchronized == 1) # boolean
self.info["creator"] = creator # string
def __process_strm(self, data):
raise_if(data[-1] != 0xFF, A2RSTRMFormatError, "Missing phase reset at end of STRM chunk")
i = 0
while i < len(data) - 1:
location = data[i]
capture_type = data[i+1]
data_length = from_uint32(data[i+2:i+6])
tick_count = from_uint32(data[i+6:i+10])
if location not in self.flux:
self.flux[location] = []
self.flux[location].append(
{"capture_type": capture_type,
"data_length": data_length,
"tick_count": tick_count,
"data": data[i+10:i+10+data_length]}
)
i = i + 10 + data_length
def __process_meta(self, metadata_as_bytes):
metadata = self.decode_metadata(metadata_as_bytes)
for line in metadata.split("\n"):
if not line: continue
columns_raw = line.split("\t")
raise_if(len(columns_raw) != 2, A2RMETAFormatError, "Malformed metadata")
key, value_raw = columns_raw
raise_if(key in self.meta, A2RMETAFormatError_DuplicateKey, "Duplicate metadata key %s" % key)
values = value_raw.split("|")
if key == "language":
list(map(self.validate_metadata_language, values))
elif key == "requires_ram":
list(map(self.validate_metadata_requires_ram, values))
elif key == "requires_machine":
list(map(self.validate_metadata_requires_machine, values))
self.meta[key] = len(values) == 1 and values[0] or tuple(values)
def to_json(self):
j = {"a2r": {"info":self.info, "meta":self.meta}}
return json.dumps(j, indent=2)
class A2RWriter(A2RValidator):
def __init__(self, creator):
self.info = collections.OrderedDict()
self.meta = collections.OrderedDict()
self.flux = collections.OrderedDict()
def from_json(self, json_string):
j = json.loads(json_string)
root = [x for x in j.keys()].pop()
self.meta.update(j[root]["meta"])
def build_head(self):
chunk = bytearray()
chunk.extend(kA2R2) # magic bytes
chunk.extend(b"\xFF\x0A\x0D\x0A") # more magic bytes
return chunk
def build_info(self):
chunk = bytearray()
chunk.extend(kINFO) # chunk ID
chunk.extend(to_uint32(36)) # chunk size (constant)
version_raw = to_uint8(self.info["version"])
self.validate_info_version(version_raw)
creator_raw = self.encode_info_creator(self.info["creator"])
disk_type_raw = to_uint8(self.info["disk_type"])
self.validate_info_disk_type(disk_type_raw)
write_protected_raw = to_uint8(self.info["write_protected"])
self.validate_info_write_protected(write_protected_raw)
synchronized_raw = to_uint8(self.info["synchronized"])
self.validate_info_synchronized(synchronized_raw)
chunk.extend(version_raw) # version (int, probably 1)
chunk.extend(creator_raw) # creator
chunk.extend(disk_type_raw) # disk type (1=5.25 inch, 2=3.5 inch)
chunk.extend(write_protected_raw) # write-protected (0=no, 1=yes)
chunk.extend(synchronized_raw) # tracks synchronized (0=no, 1=yes)
return chunk
def build_strm(self):
data_raw = bytearray()
for location in self.flux.keys():
for capture in self.flux[location]:
data_raw.extend(to_uint8(location)) # track where this capture happened
data_raw.extend(to_uint8(capture["capture_type"])) # 1 = timing, 2 = bits, 3 = xtiming
data_raw.extend(to_uint32(len(capture["data"]))) # data length in bytes
data_raw.extend(to_uint32(capture["tick_count"])) # estimated loop point in ticks
data_raw.extend(capture["data"])
data_raw.extend(b"\xFF")
chunk = bytearray()
chunk.extend(kSTRM) # chunk ID
chunk.extend(to_uint32(len(data_raw))) # chunk size
chunk.extend(data_raw) # all stream data
return chunk
def build_meta(self):
if not self.meta: return b""
meta_tmp = {}
for key, value_raw in self.meta.items():
if type(value_raw) == str:
values = [value_raw]
else:
values = value_raw
meta_tmp[key] = values
list(map(self.validate_metadata_value, values))
if key == "language":
list(map(self.validate_metadata_language, values))
elif key == "requires_ram":
list(map(self.validate_metadata_requires_ram, values))
elif key == "requires_machine":
list(map(self.validate_metadata_requires_machine, values))
data = b"\x0A".join(
[k.encode("UTF-8") + \
b"\x09" + \
"|".join(v).encode("UTF-8") \
for k, v in meta_tmp.items()]) + b"\x0A"
chunk = bytearray()
chunk.extend(kMETA) # chunk ID
chunk.extend(to_uint32(len(data))) # chunk size
chunk.extend(data)
return chunk
def write(self, stream):
stream.write(self.build_head())
stream.write(self.build_info())
stream.write(self.build_strm())
stream.write(self.build_meta())
#---------- command line interface ----------
class BaseCommand:
def __init__(self, name):
self.name = name
def setup(self, subparser, description=None, epilog=None, help=".a2r disk image", formatter_class=argparse.HelpFormatter):
self.parser = subparser.add_parser(self.name, description=description, epilog=epilog, formatter_class=formatter_class)
self.parser.add_argument("file", help=help)
self.parser.set_defaults(action=self)
def __call__(self, args):
self.a2r_image = A2RReader(args.file)
class CommandVerify(BaseCommand):
def __init__(self):
BaseCommand.__init__(self, "verify")
def setup(self, subparser):
BaseCommand.setup(self, subparser,
description="Verify file structure and metadata of a .a2r disk image (produces no output unless a problem is found)")
class CommandDump(BaseCommand):
kWidth = 30
def __init__(self):
BaseCommand.__init__(self, "dump")
def setup(self, subparser):
BaseCommand.setup(self, subparser,
description="Print all available information and metadata in a .a2r disk image")
def __call__(self, args):
BaseCommand.__call__(self, args)
self.print_flux()
self.print_meta()
self.print_info()
def print_info(self):
print("INFO: Format version:".ljust(self.kWidth), "%d" % self.a2r_image.info["version"])
print("INFO: Disk type:".ljust(self.kWidth), ("5.25-inch", "3.5-inch")[self.a2r_image.info["disk_type"]-1])
print("INFO: Write protected:".ljust(self.kWidth), dNoYes[self.a2r_image.info["write_protected"]])
print("INFO: Track synchronized:".ljust(self.kWidth), dNoYes[self.a2r_image.info["synchronized"]])
print("INFO: Creator:".ljust(self.kWidth), self.a2r_image.info["creator"])
def print_flux(self):
for location in self.a2r_image.flux:
for flux_record in self.a2r_image.flux[location]:
print(("STRM: Track %d%s" % (location/4, tQuarters[location%4])).ljust(self.kWidth),
dTiming[flux_record["capture_type"]], "capture,",
flux_record["tick_count"], "ticks")
def print_meta(self):
if not self.a2r_image.meta: return
for key, values in self.a2r_image.meta.items():
if type(values) == str:
values = [values]
print(("META: " + key + ":").ljust(self.kWidth), values[0])
for value in values[1:]:
print("META: ".ljust(self.kWidth), value)
class CommandExport(BaseCommand):
def __init__(self):
BaseCommand.__init__(self, "export")
def setup(self, subparser):
BaseCommand.setup(self, subparser,
description="Export (as JSON) all information and metadata from a .a2r disk image")
def __call__(self, args):
BaseCommand.__call__(self, args)
print(self.a2r_image.to_json())
class WriterBaseCommand(BaseCommand):
def __call__(self, args):
BaseCommand.__call__(self, args)
self.args = args
# maintain creator if there is one, otherwise use default
self.output = A2RWriter(self.a2r_image.info.get("creator", __displayname__))
self.output.flux = self.a2r_image.flux.copy()
self.output.info = self.a2r_image.info.copy()
self.output.meta = self.a2r_image.meta.copy()
self.update()
tmpfile = args.file + ".chery"
with open(tmpfile, "wb") as f:
self.output.write(f)
os.rename(tmpfile, args.file)
class CommandEdit(WriterBaseCommand):
def __init__(self):
WriterBaseCommand.__init__(self, "edit")
def setup(self, subparser):
WriterBaseCommand.setup(self,
subparser,
description="Edit information and metadata in a .a2r disk image",
epilog="""Tips:
- Use repeated flags to edit multiple fields at once.
- Use "key:" with no value to delete a metadata field.
- Keys are case-sensitive.
- Some values have format restrictions; read the .a2r specification.""",
help=".a2r disk image (modified in place)",
formatter_class=argparse.RawDescriptionHelpFormatter)
self.parser.add_argument("-i", "--info", type=str, action="append",
help="""change information field.
INFO format is "key:value".
Acceptable keys are disk_type, write_protected, synchronized, creator, version.
Other keys are ignored.
For boolean fields, use "1" or "true" or "yes" for true, "0" or "false" or "no" for false.""")
self.parser.add_argument("-m", "--meta", type=str, action="append",
help="""change metadata field.
META format is "key:value".
Standard keys are title, subtitle, publisher, developer, copyright, version, language, requires_ram,
requires_machine, notes, side, side_name, contributor, image_date. Other keys are allowed.""")
def update(self):
# add all new info fields
for i in self.args.info or ():
k, v = i.split(":", 1)
if k in ("write_protected","synchronized"):
v = v.lower() in ("1", "true", "yes")
self.output.info[k] = v
# add all new metadata fields, and delete empty ones
for m in self.args.meta or ():
k, v = m.split(":", 1)
v = v.split("|")
if len(v) == 1:
v = v[0]
if v:
self.output.meta[k] = v
elif k in self.output.meta.keys():
del self.output.meta[k]
class CommandImport(WriterBaseCommand):
def __init__(self):
WriterBaseCommand.__init__(self, "import")
def setup(self, subparser):
WriterBaseCommand.setup(self, subparser,
description="Import JSON file to update metadata in a .a2r disk image")
def update(self):
self.output.from_json(sys.stdin.read())
if __name__ == "__main__":
import sys
raise_if = lambda cond, e, s="": cond and sys.exit("%s: %s" % (e.__name__, s))
cmds = [CommandDump(), CommandVerify(), CommandEdit(), CommandExport(), CommandImport()]
parser = argparse.ArgumentParser(prog=__progname__,
description="""A multi-purpose tool for manipulating .a2r disk images.
See '""" + __progname__ + """ <command> -h' for help on individual commands.""",
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("-v", "--version", action="version", version=__displayname__)
sp = parser.add_subparsers(dest="command", help="command")
for command in cmds:
command.setup(sp)
args = parser.parse_args()
args.action(args)

47
passport/a2rimage.py Normal file
View File

@ -0,0 +1,47 @@
from passport.wozimage import DiskImage, Track, WozError, raise_if
from passport import a2rchery
import bitarray
import collections
class A2RImage:
def __init__(self, filename=None, stream=None):
self.filename = filename
self.tracks = collections.OrderedDict()
self.a2r_image = a2rchery.A2RReader(filename, stream)
def to_bits(self, flux_record):
"""|flux_record| is a dictionary of 'capture_type', 'data_length', 'tick_count', and 'data'"""
if not flux_record or flux_record["capture_type"] != a2rchery.kCaptureTiming:
return [], 0
bits = bitarray.bitarray()
track_length = 0
ticks = 0
flux_total = 0
fluxxen = flux_record["data"]
speeds = [(len([1 for i in fluxxen if i%t==0]), t) for t in range(0x1e,0x23)]
speeds.sort()
speed = speeds[-1][1]
for flux_value in fluxxen:
ticks += flux_value
if not track_length and ticks > flux_record["tick_count"]:
track_length = len(bits)
flux_total += flux_value
if flux_value == 0xFF:
continue
bits.extend([0] * ((flux_total - speed//2) // speed))
bits.append(1)
flux_total = 0
return bits, track_length
def seek(self, track_num):
if type(track_num) != float:
track_num = float(track_num)
if track_num < 0.0 or \
track_num > 35.0 or \
track_num.as_integer_ratio()[1] not in (1,2,4):
raise WozError("Invalid track %s" % track_num)
location = int(track_num * 4)
if not self.tracks.get(location):
bits, track_length = self.to_bits(self.a2r_image.flux.get(location, [{}])[0])
self.tracks[location] = Track(bits, len(bits))
return self.tracks[location]

View File

@ -1,5 +1,5 @@
STRINGS = { STRINGS = {
"header": "Passport.py by 4am (2018-07-03)\n", # max 32 characters "header": "Passport.py by 4am (2018-09-10)\n", # max 32 characters
"reading": "Reading from {filename}\n", "reading": "Reading from {filename}\n",
"diskrwts": "Using disk's own RWTS\n", "diskrwts": "Using disk's own RWTS\n",
"bb00": "T00,S05 Found $BB00 protection check\n" "bb00": "T00,S05 Found $BB00 protection check\n"