From a4726f1c5ac9103c78e4a4332e5774989df86ef3 Mon Sep 17 00:00:00 2001 From: Rob McMullen Date: Wed, 20 Jul 2016 17:37:38 -0700 Subject: [PATCH] Initial support for DOS 3.3 VTOC --- atrcopy/dos33.py | 224 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 219 insertions(+), 5 deletions(-) diff --git a/atrcopy/dos33.py b/atrcopy/dos33.py index 947be93..262b1a3 100644 --- a/atrcopy/dos33.py +++ b/atrcopy/dos33.py @@ -8,6 +8,129 @@ import logging log = logging.getLogger(__name__) +class Dos33Dirent(object): + format = np.dtype([ + ('track', 'u1'), + ('sector', 'u1'), + ('flags', 'u1'), + ('name','S30'), + ('num_sectors',' image.header.max_sectors: + return False + return True + + def get_track_sector_list(self, image): + sector = image.header.sector_from_track(self.track, self.sector) + sector_list = [] + while sector > 0: + image.assert_valid_sector(sector) + print "reading track/sector list", sector + values, style = image.get_sectors(sector) + sector = image.header.sector_from_track(values[1], values[2]) + i = 0x0c + while i < 256: + t = values[i] + s = values[i + 1] + i += 2 + if t == 0: + sector = 0 + break + sector_list.append(image.header.sector_from_track(t, s)) + self.sector_map = sector_list + + def start_read(self, image): + if not self.is_sane: + raise InvalidDirent("Invalid directory entry '%s'" % str(self)) + self.get_track_sector_list(image) + self.current_sector_index = 0 + self.current_read = self.num_sectors + + def read_sector(self, image): + sector = self.sector_map[self.current_sector_index] + last = (self.current_sector_index == len(self.sector_map) - 1) + raw, pos, size = image.get_raw_bytes(sector) + bytes, num_data_bytes = self.process_raw_sector(image, raw) + return bytes, last, pos, num_data_bytes + + def process_raw_sector(self, image, raw): + self.current_sector_index += 1 + num_bytes = len(raw) + return raw[0:num_bytes], num_bytes + + def get_filename(self): + return self.filename + + class Dos33Header(AtrHeader): file_format = "DOS 3.3" @@ -15,6 +138,7 @@ class Dos33Header(AtrHeader): AtrHeader.__init__(self, None, 256, 0) self.header_offset = 0 self.sector_order = range(16) + self.vtoc_sector = 17 * 16 def __str__(self): return "%s Disk Image (size=%d (%dx%db)" % (self.file_format, self.image_size, self.max_sectors, self.sector_size) @@ -42,9 +166,13 @@ class Dos33Header(AtrHeader): size = self.sector_size return pos, size + def sector_from_track(self, track, sector): + return track * 16 + sector + class Dos33DiskImage(DiskImageBase): def __init__(self, rawdata, filename=""): + self.first_catalog = 0 DiskImageBase.__init__(self, rawdata, filename) def __str__(self): @@ -60,19 +188,108 @@ class Dos33DiskImage(DiskImageBase): if (magic == [1, 56, 176, 3]).all(): raise InvalidDiskImage("ProDOS format found; not DOS 3.3 image") swap_order = False - data, style = self.get_sectors(17 * 16) + data, style = self.get_sectors(self.header.vtoc_sector) if data[3] == 3: if data[1] < 35 and data[2] < 16: - data, style = self.get_sectors(17 * 16 + 14) + data, style = self.get_sectors(self.header.vtoc_sector + 14) if data[2] != 13: swap_order = True else: raise InvalidDiskImage("Invalid VTOC location for DOS 3.3") print "swap", swap_order + + vtoc_type = np.dtype([ + ('unused1', 'S1'), + ('cat_track','u1'), + ('cat_sector','u1'), + ('dos_release', 'u1'), + ('unused2', 'S2'), + ('vol_num', 'u1'), + ('unused3', 'S32'), + ('max_pairs', 'u1'), + ('unused4', 'S8'), + ('last_track', 'u1'), + ('track_dir', 'i1'), + ('unused5', 'S2'), + ('num_tracks', 'u1'), + ('sectors_per_track', 'u1'), + ('bytes_per_sector', 'u2'), + ]) + + def get_vtoc(self): + data, style = self.get_sectors(self.header.vtoc_sector) + values = data[0:56].view(dtype=self.vtoc_type)[0] + self.first_catalog = self.header.sector_from_track(values[1], values[2]) + self.assert_valid_sector(self.first_catalog) + self.total_sectors = int(values['num_tracks']) * int(values['sectors_per_track']) + self.dos_release = values['dos_release'] + + def get_directory(self): + sector = self.first_catalog + num = 0 + files = [] + while sector > 0: + self.assert_valid_sector(sector) + print "reading catalog sector", sector + values, style = self.get_sectors(sector) + sector = self.header.sector_from_track(values[1], values[2]) + i = 0xb + while i < 256: + dirent = Dos33Dirent(self, num, values[i:i+0x23]) + if not dirent.is_sane: + sector = 0 + break + files.append(dirent) + print dirent + i += 0x23 + num += 1 + self.files = files def get_boot_segments(self): return [] + + def get_vtoc_segments(self): + r = self.rawdata + segments = [] + addr = 0 + start, count = self.get_contiguous_sectors(self.header.vtoc_sector, 1) + segment = RawSectorsSegment(r[start:start+count], self.header.vtoc_sector, 1, count, 0, 0, self.header.sector_size, name="VTOC") + segments.append(segment) + return segments + + def get_directory_segments(self): + byte_order = [] + r = self.rawdata + segments = [] + sector = self.first_catalog + while sector > 0: + self.assert_valid_sector(sector) + print "reading catalog sector", sector + raw, pos, size = self.get_raw_bytes(sector) + byte_order.extend(range(pos, pos + size)) + sector = self.header.sector_from_track(raw[1], raw[2]) + raw = self.rawdata.get_indexed(byte_order) + segment = DefaultSegment(raw, name="Catalog") + segments.append(segment) + return segments + + def get_file_segment(self, dirent): + byte_order = [] + dirent.start_read(self) + while True: + bytes, last, pos, size = dirent.read_sector(self) + byte_order.extend(range(pos, pos + size)) + if last: + break + if len(byte_order) > 0: + name = "%s %ds@%d" % (dirent.get_filename(), dirent.num_sectors, dirent.sector_map[0]) + verbose_name = "%s (%d sectors, first@%d) %s" % (dirent.get_filename(), dirent.num_sectors, dirent.sector_map[0], dirent.verbose_info) + raw = self.rawdata.get_indexed(byte_order) + segment = DefaultSegment(raw, name=name, verbose_name=verbose_name) + else: + segment = EmptySegment(self.rawdata, name=dirent.get_filename()) + return segment class ProdosHeader(Dos33Header): @@ -82,7 +299,6 @@ class ProdosHeader(Dos33Header): class ProdosDiskImage(Dos33DiskImage): def read_header(self): self.header = ProdosHeader() - print "HI" def get_boot_sector_info(self): # based on logic from a2server @@ -101,5 +317,3 @@ class ProdosDiskImage(Dos33DiskImage): swap_order = True else: raise InvalidDiskImage("No ProDOS header info found") - - print "swap", swap_order