From 304a9cc83874a4c83434be251f38faba311aaade Mon Sep 17 00:00:00 2001 From: 4am Date: Sat, 2 Jun 2018 10:23:11 -0400 Subject: [PATCH] add validation before write, temp file --- wozardry | 182 ++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 128 insertions(+), 54 deletions(-) diff --git a/wozardry b/wozardry index bbe3a9b..a8f5470 100755 --- a/wozardry +++ b/wozardry @@ -15,21 +15,21 @@ __progname__ = "wozardry" __displayname__ = __progname__ + " " + __version__ + " by 4am (" + __date__ + ")" # domain-specific constants defined in .woz specification -kWOZ1 = b'WOZ1' -kINFO = b'INFO' -kTMAP = b'TMAP' -kTRKS = b'TRKS' -kMETA = b'META' +kWOZ1 = b"WOZ1" +kINFO = b"INFO" +kTMAP = b"TMAP" +kTRKS = b"TRKS" +kMETA = b"META" kBitstreamLengthInBytes = 6646 -kLanguages = ('English','Spanish','French','German','Chinese','Japanese','Italian','Dutch','Portugese','Danish','Finnish','Norwegian','Swedish','Russian','Polish','Turkish','Arabic','Thai','Czech','Hungarian','Catalan','Croatian','Greek','Hebrew','Romanian','Slovak','Ukranian','Indonesian','Malay','Vietnamese','Other') -kRequiresRAM = ('16K','24K','32K','48K','64K','128K','256K','512K','768K','1M','1.25M','1.5M+','Unknown') -kRequiresMachine = ('2','2+','2e','2c','2e+','2gs','2c+','3','3+') +kLanguages = ("English","Spanish","French","German","Chinese","Japanese","Italian","Dutch","Portugese","Danish","Finnish","Norwegian","Swedish","Russian","Polish","Turkish","Arabic","Thai","Czech","Hungarian","Catalan","Croatian","Greek","Hebrew","Romanian","Slovak","Ukranian","Indonesian","Malay","Vietnamese","Other") +kRequiresRAM = ("16K","24K","32K","48K","64K","128K","256K","512K","768K","1M","1.25M","1.5M+","Unknown") +kRequiresMachine = ("2","2+","2e","2c","2e+","2gs","2c+","3","3+") # strings and things, for print routines and error messages sEOF = "Unexpected EOF" sBadChunkSize = "Bad chunk size" -dNoYes = {False:'no',True:'yes'} -tQuarters = ('.00','.25','.50','.75') +dNoYes = {False:"no",True:"yes"} +tQuarters = (".00",".25",".50",".75") # errors that may be raised class WozError(Exception): pass # base class @@ -46,11 +46,13 @@ class WozINFOFormatError_BadDiskType(WozINFOFormatError): pass class WozINFOFormatError_BadWriteProtected(WozINFOFormatError): pass class WozINFOFormatError_BadSynchronized(WozINFOFormatError): pass class WozINFOFormatError_BadCleaned(WozINFOFormatError): pass +class WozINFOFormatError_BadCreator(WozINFOFormatError): pass class WozTMAPFormatError(WozFormatError): pass class WozTMAPFormatError_BadTRKS(WozTMAPFormatError): pass class WozTRKSFormatError(WozFormatError): pass class WozMETAFormatError(WozFormatError): pass class WozMETAFormatError_DuplicateKey(WozFormatError): pass +class WozMETAFormatError_BadValue(WozFormatError): pass class WozMETAFormatError_BadLanguage(WozFormatError): pass class WozMETAFormatError_BadRAM(WozFormatError): pass class WozMETAFormatError_BadMachine(WozFormatError): pass @@ -130,14 +132,70 @@ class DiskImage: # base class """returns Track object for the given track, or None if the track is not part of this disk image. track_num can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.)""" return None -class WozReader(DiskImage): +class WozValidator: + def validate_info_version(self, version): + raise_if(version != b'\x01', WozINFOFormatError_BadVersion, "Unknown version (expected 1, found %s)" % version) + + def validate_info_disk_type(self, disk_type): + raise_if(disk_type not in (b'\x01',b'\x02'), WozINFOFormatError_BadDiskType, "Unknown disk type (expected 1 or 2, found %s)" % disk_type) + + def validate_info_write_protected(self, write_protected): + raise_if(write_protected not in (b'\x00',b'\x01'), WozINFOFormatError_BadWriteProtected, "Unknown write protected flag (expected 0 or 1, found %s)" % write_protected) + + def validate_info_synchronized(self, synchronized): + raise_if(synchronized not in (b'\x00',b'\x01'), WozINFOFormatError_BadSynchronized, "Unknown synchronized flag (expected 0, or 1, found %s)" % synchronized) + + def validate_info_cleaned(self, cleaned): + raise_if(cleaned not in (b'\x00',b'\x01'), WozINFOFormatError_BadCleaned, "Unknown cleaned flag (expected 0 or 1, found %s)" % cleaned) + + def validate_info_creator(self, creator_as_bytes): + raise_if(len(creator_as_bytes) > 32, WozINFOFormatError_BadCreator, "Creator is longer than 32 bytes") + try: + creator_as_bytes.decode("UTF-8") + except: + raise_if(True, WozINFOFormatError_BadCreator, "Creator is not valid UTF-8") + + def encode_info_creator(self, creator_as_string): + creator_as_bytes = creator_as_string.encode("UTF-8").ljust(32, b" ") + self.validate_info_creator(creator_as_bytes) + return creator_as_bytes + + def decode_info_creator(self, creator_as_bytes): + self.validate_info_creator(creator_as_bytes) + return creator_as_bytes.decode("UTF-8").strip() + + def validate_metadata(self, metadata_as_bytes): + try: + metadata = metadata_as_bytes.decode("UTF-8") + except: + raise WozMETAFormatError("Metadata is not valid UTF-8") + + def decode_metadata(self, metadata_as_bytes): + self.validate_metadata(metadata_as_bytes) + return metadata_as_bytes.decode("UTF-8") + + def validate_metadata_value(self, value): + raise_if("\t" in value, WozMETAFormatError_BadValue, "Invalid metadata value (contains tab character)") + raise_if("\n" in value, WozMETAFormatError_BadValue, "Invalid metadata value (contains linefeed character)") + raise_if("|" in value, WozMETAFormatError_BadValue, "Invalid metadata value (contains pipe character)") + + def validate_metadata_language(self, language): + raise_if(language and (language not in kLanguages), WozMETAFormatError_BadLanguage, "Invalid metadata language") + + def validate_metadata_requires_ram(self, requires_ram): + raise_if(requires_ram and (requires_ram not in kRequiresRAM), WozMETAFormatError_BadRAM, "Invalid metadata requires_ram") + + def validate_metadata_requires_machine(self, requires_machine): + raise_if(requires_machine and (requires_machine not in kRequiresMachine), WozMETAFormatError_BadMachine, "Invalid metadata requires_machine") + +class WozReader(DiskImage, WozValidator): def __init__(self, filename=None, stream=None): DiskImage.__init__(self, filename, stream) self.tmap = None self.info = collections.OrderedDict() self.meta = collections.OrderedDict() - with stream or open(filename, 'rb') as f: + with stream or open(filename, "rb") as f: header_raw = f.read(8) raise_if(len(header_raw) != 8, WozEOFError, sEOF) self.__process_header(header_raw) @@ -168,28 +226,25 @@ class WozReader(DiskImage): elif chunk_id == kMETA: self.__process_meta(data) if crc: - raise_if(crc != binascii.crc32(b''.join(all_data)) & 0xffffffff, WozCRCError, "Bad CRC") + raise_if(crc != binascii.crc32(b"".join(all_data)) & 0xffffffff, WozCRCError, "Bad CRC") def __process_header(self, data): raise_if(data[:4] != kWOZ1, WozHeaderError_NoWOZ1, "Magic string 'WOZ1' not present at offset 0") raise_if(data[4] != 0xFF, WozHeaderError_NoFF, "Magic byte 0xFF not present at offset 4") - raise_if(data[5:8] != b'\x0A\x0D\x0A', WozHeaderError_NoLF, "Magic bytes 0x0A0D0A not present at offset 5") + raise_if(data[5:8] != b"\x0A\x0D\x0A", WozHeaderError_NoLF, "Magic bytes 0x0A0D0A not present at offset 5") def __process_info(self, data): version = data[0] - raise_if(version != 1, WozINFOFormatError_BadVersion, "Unknown version (expected 1, found %d)" % version) + self.validate_info_version(to_uint8(version)) disk_type = data[1] - raise_if(disk_type not in (1,2), WozINFOFormatError_BadDiskType, "Unknown disk type (expected 1 or 2, found %d)" % disk_type) + self.validate_info_disk_type(to_uint8(disk_type)) write_protected = data[2] - raise_if(write_protected not in (0,1), WozINFOFormatError_BadWriteProtected, "Unknown write protected flag (expected 0 or 1, found %d)" % write_protected) + self.validate_info_write_protected(to_uint8(write_protected)) synchronized = data[3] - raise_if(synchronized not in (0,1), WozINFOFormatError_BadSynchronized, "Unknown synchronized flag (expected 0, or 1, found %d)" % synchronized) + self.validate_info_synchronized(to_uint8(synchronized)) cleaned = data[4] - raise_if(cleaned not in (0,1), WozINFOFormatError_BadCleaned, "Unknown cleaned flag (expected 0 or 1, found %d)" % cleaned) - try: - creator = data[5:37].decode('UTF-8') - except: - raise WOZINFOFormatError("Creator is not valid UTF-8") + self.validate_info_cleaned(to_uint8(cleaned)) + creator = self.decode_info_creator(data[5:37]) self.info["version"] = version # int self.info["disk_type"] = disk_type # int self.info["write_protected"] = (write_protected == 1) # boolean @@ -233,27 +288,21 @@ class WozReader(DiskImage): for trk, i in zip(self.tmap, itertools.count()): raise_if(trk != 0xFF and trk >= len(self.tracks), WozTMAPFormatError_BadTRKS, "Invalid TMAP entry: track %d%s points to non-existent TRKS chunk %d" % (i/4, tQuarters[i%4], trk)) - def __process_meta(self, data): - try: - metadata = data.decode('UTF-8') - except: - raise WozMETAFormatError("Metadata is not valid UTF-8") - for line in metadata.split('\n'): + def __process_meta(self, metadata_as_bytes): + metadata = self.decode_metadata(metadata_as_bytes) + for line in metadata.split("\n"): if not line: continue - columns_raw = line.split('\t') + columns_raw = line.split("\t") raise_if(len(columns_raw) != 2, WozMETAFormatError, "Malformed metadata") key, value_raw = columns_raw raise_if(key in self.meta, WozMETAFormatError_DuplicateKey, "Duplicate metadata key %s" % key) values = value_raw.split("|") if key == "language": - for value in values: - raise_if(value and (value not in kLanguages), WozMETAFormatError_BadLanguage, "Invalid metadata language") + list(map(self.validate_metadata_language, values)) elif key == "requires_ram": - for value in values: - raise_if(value and (value not in kRequiresRAM), WozMETAFormatError_BadRAM, "Invalid metadata requires_ram") + list(map(self.validate_metadata_requires_ram, values)) elif key == "requires_machine": - for value in values: - raise_if(value and (value not in kRequiresMachine), WozMETAFormatError_BadMachine, "Invalid metadata requires_machine") + list(map(self.validate_metadata_requires_machine, values)) self.meta[key] = len(values) == 1 and values[0] or tuple(values) def seek(self, track_num): @@ -268,7 +317,7 @@ class WozReader(DiskImage): if trk_id == 0xFF: return None return self.tracks[trk_id] -class WozWriter: +class WozWriter(WozValidator): def __init__(self, creator): self.info = collections.OrderedDict() self.info["version"] = 1 @@ -295,13 +344,24 @@ class WozWriter: chunk = bytearray() chunk.extend(kINFO) # chunk ID chunk.extend(to_uint32(60)) # chunk size (constant) - chunk.extend(to_uint8(self.info["version"])) # version (int, probably 1) - chunk.extend(to_uint8(self.info["disk_type"])) # disk type (1=5.25 inch, 2=3.5 inch) - chunk.extend(to_uint8(self.info["write_protected"])) # write-protected (0=no, 1=yes) - chunk.extend(to_uint8(self.info["synchronized"])) # tracks synchronized (0=no, 1=yes) - chunk.extend(to_uint8(self.info["cleaned"])) # weakbits cleaned (0=no, 1=yes) - chunk.extend(self.info["creator"].encode("UTF-8").ljust(32, b" ")) # creator - chunk.extend(b'\x00' * 23) # reserved + version_raw = to_uint8(self.info["version"]) + self.validate_info_version(version_raw) + disk_type_raw = to_uint8(self.info["disk_type"]) + self.validate_info_disk_type(disk_type_raw) + write_protected_raw = to_uint8(self.info["write_protected"]) + self.validate_info_write_protected(write_protected_raw) + synchronized_raw = to_uint8(self.info["synchronized"]) + self.validate_info_synchronized(synchronized_raw) + cleaned_raw = to_uint8(self.info["cleaned"]) + self.validate_info_cleaned(cleaned_raw) + creator_raw = self.encode_info_creator(self.info["creator"]) + chunk.extend(version_raw) # version (int, probably 1) + chunk.extend(disk_type_raw) # disk type (1=5.25 inch, 2=3.5 inch) + chunk.extend(write_protected_raw) # write-protected (0=no, 1=yes) + chunk.extend(synchronized_raw) # tracks synchronized (0=no, 1=yes) + chunk.extend(cleaned_raw) # weakbits cleaned (0=no, 1=yes) + chunk.extend(creator_raw) # creator + chunk.extend(b"\x00" * 23) # reserved return chunk def build_tmap(self): @@ -319,20 +379,30 @@ class WozWriter: for track in self.tracks: raw_bytes = track.bits.tobytes() chunk.extend(raw_bytes) # bitstream as raw bytes - chunk.extend(b'\x00' * (6646 - len(raw_bytes))) # padding to 6646 bytes + chunk.extend(b"\x00" * (6646 - len(raw_bytes))) # padding to 6646 bytes chunk.extend(to_uint16(len(raw_bytes))) # bytes used chunk.extend(to_uint16(track.bit_count)) # bit count - chunk.extend(b'\xFF\xFF') # splice point (none) - chunk.extend(b'\xFF') # splice nibble (none) - chunk.extend(b'\xFF') # splice bit count (none) - chunk.extend(b'\x00\x00') # reserved + chunk.extend(b"\xFF\xFF") # splice point (none) + chunk.extend(b"\xFF") # splice nibble (none) + chunk.extend(b"\xFF") # splice bit count (none) + chunk.extend(b"\x00\x00") # reserved return chunk def build_meta(self): - if not self.meta: return b'' - data = b'\x0A'.join( + if not self.meta: return b"" + for key, value_raw in self.meta.items(): + if type(value_raw) == str: + values = [value_raw] + list(map(self.validate_metadata_value, values)) + if key == "language": + list(map(self.validate_metadata_language, values)) + elif key == "requires_ram": + list(map(self.validate_metadata_requires_ram, values)) + elif key == "requires_machine": + list(map(self.validate_metadata_requires_machine, values)) + data = b"\x0A".join( [k.encode("UTF-8") + \ - b'\x09' + \ + b"\x09" + \ (type(v) in (list,tuple) and "|".join(v) or v).encode("UTF-8") \ for k, v in self.meta.items()]) chunk = bytearray() @@ -344,7 +414,7 @@ class WozWriter: def build_head(self, crc): chunk = bytearray() chunk.extend(kWOZ1) # magic bytes - chunk.extend(b'\xFF\x0A\x0D\x0A') # more magic bytes + chunk.extend(b"\xFF\x0A\x0D\x0A") # more magic bytes chunk.extend(to_uint32(crc)) # CRC32 of rest of file (calculated in caller) return chunk @@ -471,10 +541,14 @@ requires_machine, notes, side, side_name, contributor, image_date. Other keys ar output.meta[k] = v elif k in output.meta.keys(): del output.meta[k] - with open(args.file, 'wb') as f: + tmpfile = args.file + ".ardry" + with open(tmpfile, "wb") as f: output.write(f) + os.rename(tmpfile, args.file) if __name__ == "__main__": + import sys + raise_if = lambda cond, e, s="": cond and sys.exit("%s: %s" % (e.__name__, s)) cmds = [CommandDump(), CommandVerify(), CommandEdit()] parser = argparse.ArgumentParser(prog=__progname__, description="""A multi-purpose tool for manipulating .woz disk images.