- Support specializations of Sector, and promoting from Sector to a subclass by the Sector.fromSector() classmethod
- have the Sector() __init__ method register itself with the disk via disk.SetSectorOwner()
- Add a disk.RWTS sector type
- Add a disk.Taste classmethod that attempts to interpret a disk as a subclass, which may be rejected

dos33disk.py
- Read the VTOC, Catalog, File metadata and file content sectors and claim them in the disk

process.py
- moved the main() method here to iterate over disk images under a directory and attempt to taste them as DOS 3.3 disks
This commit is contained in:
kris 2017-04-16 00:00:46 +01:00
parent c66456a9e5
commit 6ae678890c
3 changed files with 313 additions and 38 deletions

View File

@ -1,7 +1,5 @@
import bitstring import bitstring
import hashlib import hashlib
import os
import sys
import zlib import zlib
SECTOR_SIZE = 256 SECTOR_SIZE = 256
@ -24,9 +22,24 @@ class Disk(object):
self.hash = hashlib.sha1(data).hexdigest() self.hash = hashlib.sha1(data).hexdigest()
self.sectors = {} self.sectors = {}
for (track, sector) in self.EnumerateSectors():
self._ReadSector(track, sector)
# Assign ownership of T0, S0 to RWTS
self.rwts = RWTS.fromSector(self.ReadSector(0, 0))
@classmethod
def Taste(cls, disk):
# TODO: return a defined exception here
return cls(disk.name, disk.data)
def SetSectorOwner(self, track, sector, owner):
self.sectors[(track, sector)] = owner
def EnumerateSectors(self):
for track in xrange(TRACKS_PER_DISK): for track in xrange(TRACKS_PER_DISK):
for sector in xrange(SECTORS_PER_TRACK): for sector in xrange(SECTORS_PER_TRACK):
self.sectors[(track, sector)] = self._ReadSector(track, sector) yield (track, sector)
def _ReadSector(self, track, sector): def _ReadSector(self, track, sector):
offset = track * TRACK_SIZE + sector * SECTOR_SIZE offset = track * TRACK_SIZE + sector * SECTOR_SIZE
@ -42,11 +55,11 @@ class Disk(object):
except KeyError: except KeyError:
raise IOError("Track $%02x sector $%02x out of bounds" % (track, sector)) raise IOError("Track $%02x sector $%02x out of bounds" % (track, sector))
def RWTS(self):
return self.sectors[(0,0)]
class Sector(object): class Sector(object):
# TODO: other types will include: VTOC, Catalog, File metadata, File content, Deleted file, Free space
TYPE = 'Unknown sector'
def __init__(self, disk, track, sector, data): def __init__(self, disk, track, sector, data):
# Reference back to parent disk # Reference back to parent disk
self.disk = disk self.disk = disk
@ -61,6 +74,16 @@ class Sector(object):
compressed_data = zlib.compress(data.tobytes()) compressed_data = zlib.compress(data.tobytes())
self.compress_ratio = len(compressed_data) * 100 / len(data.tobytes()) self.compress_ratio = len(compressed_data) * 100 / len(data.tobytes())
disk.SetSectorOwner(track, sector, self)
# TODO: if all callers are using disk.ReadSector(track, sector) to get the sector then do that here
@classmethod
def fromSector(cls, sector, *args, **kwargs):
"""Create and register a new Sector from an existing Sector object."""
# TODO: don't recompute hash and entropy
return cls(sector.disk, sector.track, sector.sector, sector.data, *args, **kwargs)
# TOOD: move RWTS ones into RWTS() class?
KNOWN_HASHES = { KNOWN_HASHES = {
'b376885ac8452b6cbf9ced81b1080bfd570d9b91': 'Zero sector', 'b376885ac8452b6cbf9ced81b1080bfd570d9b91': 'Zero sector',
'90e6b1a0689974743cb92ca0b833ff1e683f4a73': 'RWTS (DOS 3.3 August 1980)', '90e6b1a0689974743cb92ca0b833ff1e683f4a73': 'RWTS (DOS 3.3 August 1980)',
@ -81,38 +104,11 @@ class Sector(object):
return human_name return human_name
def __str__(self): def __str__(self):
return "Track $%02x Sector $%02x: %s" % (self.track, self.sector, self.HumanName()) return "Track $%02x Sector $%02x: %s (%s)" % (self.track, self.sector, self.TYPE, self.HumanName())
def main():
disks = {}
for root, dirs, files in os.walk(sys.argv[1]):
for f in files:
if not f.lower().endswith('.dsk') and not f.lower().endswith('.do'):
continue
print f class RWTS(Sector):
b = bytearray(open(os.path.join(root, f), 'r').read()) TYPE = "RWTS"
try:
disk = Disk(f, b)
disks[f] = disk
except IOError:
continue
except AssertionError:
continue
for ts, data in sorted(disk.sectors.iteritems()): def __init__(self, disk, track, sector, data):
print data super(RWTS, self).__init__(disk, track, sector, data)
# Group disks by hash of RWTS sector
rwts_hashes = {}
for f, d in disks.iteritems():
rwts_hash = d.RWTS().hash
rwts_hashes.setdefault(rwts_hash, []).append(f)
for h, disks in rwts_hashes.iteritems():
print h
for d in sorted(disks):
print " %s" % d
if __name__ == "__main__":
main()

230
src/apple2disk/dos33disk.py Normal file
View File

@ -0,0 +1,230 @@
import bitstring
import disk as disklib
import string
PRINTABLE = set(string.letters + string.digits + string.punctuation + ' ')
class File(object):
def __init__(self, short_type, long_type):
self.short_type = short_type
self.long_type = long_type
FILE_TYPES = {
0x00: File('T', 'TEXT'),
0x01: File('I', 'INTEGER BASIC'),
# TODO: add handler for parsing file content
0x02: File('A', 'APPLESOFT BASIC'),
0x04: File('B', 'BINARY'),
# TODO: others
}
class VTOCSector(disklib.Sector):
TYPE = 'DOS 3.3 VTOC'
def __init__(self, disk, track, sector, data):
super(VTOCSector, self).__init__(disk, track, sector, data)
(
catalog_track, catalog_sector, dos_release, volume, max_track_sector_pairs,
last_track_allocated, track_direction, tracks_per_disk, sectors_per_track,
bytes_per_sector, freemap
) = data.unpack(
'pad:8, uint:8, uint:8, uint:8, pad:16, uint:8, pad:256, uint:8, pad:64, uint:8, ' +
'int:8, pad:16, uint:8, uint:8, uintle:16, bits:1600'
)
# TODO: throw a better exception here to reject the identification as a DOS 3.3 disk
assert dos_release == 3
assert bytes_per_sector == disklib.SECTOR_SIZE
assert sectors_per_track == disklib.SECTORS_PER_TRACK
self.catalog_track = catalog_track
self.catalog_sector = catalog_sector
# TODO: why does DOS 3.3 sometimes display e.g. volume 254 when the VTOC says 178
self.volume = volume
class CatalogSector(disklib.Sector):
TYPE = 'DOS 3.3 Catalog'
def __init__(self, disk, track, sector, data):
super(CatalogSector, self).__init__(disk, track, sector, data)
(next_track, next_sector, file_entries) = data.unpack(
'pad:8, int:8, int:8, pad:64, bits:1960'
)
catalog_entries = []
offset = 0
while offset < len(file_entries):
file_entry = file_entries[offset:offset+(35*8)]
(file_track, file_sector, file_type, file_name, file_length) = file_entry.unpack(
'uint:8, uint:8, uint:8, bytes:30, uintle:16'
)
if file_track and file_sector:
entry = CatalogEntry(file_track, file_sector, file_type, file_name, file_length)
catalog_entries.append(entry)
offset += (35*8)
self.next_track = next_track
self.next_sector = next_sector
self.catalog_entries = catalog_entries
class FileMetadataSector(disklib.Sector):
def __init__(self, disk, track, sector, data, filename):
super(FileMetadataSector, self).__init__(disk, track, sector, data)
self.filename = filename
self.TYPE = 'DOS 3.3 File Metadata (%s)' % filename
(next_track, next_sector, sector_offset, data_sectors) = data.unpack(
'pad:8, uint:8, uint:8, pad:16, uintle:16, pad:40, bits:1952'
)
offset = 0
data_track_sectors = []
while offset < len(data_sectors):
ds = data_sectors[offset:offset + 16]
(t, s) = ds.unpack(
'uint:8, uint:8'
)
if t and s:
data_track_sectors.append((t, s))
offset += 16
self.next_track = next_track
self.next_sector = next_sector
self.sector_offset = sector_offset
self.data_track_sectors = data_track_sectors
class FileDataSector(disklib.Sector):
def __init__(self, disk, track, sector, data, filename):
super(FileDataSector, self).__init__(disk, track, sector, data)
self.filename = filename
self.TYPE = 'DOS 3.3 File Contents (%s)' % filename
class FreeSector(disklib.Sector):
TYPE = "DOS 3.3 Free Sector"
def __init__(self, disk, track, sector, data):
super(FreeSector, self).__init__(disk, track, sector, data)
class Dos33Disk(disklib.Disk):
def __init__(self, *args, **kwargs):
super(Dos33Disk, self).__init__(*args, **kwargs)
# TODO: read DOS tracks and compare to known images
self.vtoc = self._ReadVTOC()
self.catalog_track = self.vtoc.catalog_track
self.catalog_sector = self.vtoc.catalog_sector
# TODO: why does DOS 3.3 sometimes display e.g. volume 254 when the VTOC says 178
self.volume = self.vtoc.volume
self.ReadCatalog()
for catalog_entry in self.catalog.itervalues():
self.ReadCatalogEntry(catalog_entry)
def _ReadVTOC(self):
return VTOCSector.fromSector(self.ReadSector(0x11, 0x0))
def ReadCatalog(self):
next_track = self.catalog_track
next_sector = self.catalog_sector
catalog = {}
catalog_entries = []
while next_track and next_sector:
cs = CatalogSector.fromSector(self.ReadSector(next_track, next_sector))
(next_track, next_sector, new_entries) = (cs.next_track, cs.next_sector, cs.catalog_entries)
catalog_entries.extend(new_entries)
filenames = []
for entry in catalog_entries:
filename = entry.FileName().rstrip()
catalog[entry.FileName().rstrip()] = entry
filenames.append(filename)
self.filenames = filenames
self.catalog = catalog
def ReadCatalogEntry(self, entry):
next_track = entry.track
next_sector = entry.sector
sector_list = [None] * entry.length
# entry.length counts the number of data sectors as well as track/sector list sectors
track_sector_count = 0
while next_track and next_sector:
track_sector_count += 1
if next_track == 0xff:
# Deleted file
# TODO: add sector type for this. What to do about sectors claimed by this file that are in use by another file? May discover this before or after this entry
print "Found deleted file %s" % entry.FileName()
break
fs = FileMetadataSector.fromSector(self.ReadSector(next_track, next_sector), entry.FileName())
(next_track, next_sector) = (fs.next_track, fs.next_sector)
num_sectors = len(fs.data_track_sectors)
sector_list[fs.sector_offset:fs.sector_offset+num_sectors] = fs.data_track_sectors
# TODO: Assert we didn't have any holes. Or is this fine e.g. for a sparse text file?
#print track_sector_count
# We allocated space up-front for an unknown number of t/s list sectors, trim them from the end
sector_list = sector_list[:entry.length - track_sector_count]
#print sector_list
contents = bitstring.BitString()
for ts in sector_list:
if not ts:
#print "XXX found a sparse sector?"
continue
(t, s) = ts
fds = FileDataSector.fromSector(self.ReadSector(t, s), entry.FileName())
contents.append(fds.data)
return contents
def __str__(self):
catalog = ['DISK VOLUME %d\n' % self.volume]
for filename in self.catalog:
entry = self.files[filename]
try:
file_type = FILE_TYPES[entry.file_type][0]
except KeyError:
print "%s has unknown file type %02x" % (entry.FileName(), entry.file_type)
file_type = '?'
catalog.append(
'%s%s %03d %s' % (
'*' if entry.locked else ' ',
file_type, entry.length,
entry.FileName()
)
)
return '\n'.join(catalog)
class CatalogEntry(object):
def __init__(self, track, sector, file_type, file_name, length):
self.track = track
self.sector = sector
self.file_type = file_type & 0x7f
self.locked = file_type & 0x80
self.file_name = file_name
self.length = length
# TODO: handle deleted files (track = 0xff, original track in file_name[0x20])
def FileName(self):
return '%s' % ''.join([chr(ord(b) & 0x7f) for b in self.file_name])
def __str__(self):
return "Track $%02x Sector $%02x Type %s Name: %s Length: %d" % (self.track, self.sector, FILE_TYPES[self.file_type], self.FileName(), self.length)

49
src/apple2disk/process.py Normal file
View File

@ -0,0 +1,49 @@
import disk
import dos33disk
import os
import sys
def main():
disks = {}
for root, dirs, files in os.walk(sys.argv[1]):
for f in files:
if not f.lower().endswith('.dsk') and not f.lower().endswith('.do'):
continue
print f
b = bytearray(open(os.path.join(root, f), 'r').read())
try:
img = disk.Disk(f, b)
except IOError:
continue
except AssertionError:
continue
# See if this is a DOS 3.3 disk
try:
img = dos33disk.Dos33Disk.Taste(img)
print "%s is a DOS 3.3 disk, volume %d" % (f, img.volume)
except IOError:
pass
except AssertionError:
pass
disks[f] = img
for ts, data in sorted(img.sectors.iteritems()):
print data
# Group disks by hash of RWTS sector
rwts_hashes = {}
for f, d in disks.iteritems():
rwts_hash = d.rwts.hash
rwts_hashes.setdefault(rwts_hash, []).append(f)
for h, disks in rwts_hashes.iteritems():
print h
for d in sorted(disks):
print " %s" % d
if __name__ == "__main__":
main()