From 92fb9986b66e09a23de3538908cc30783f29570f Mon Sep 17 00:00:00 2001 From: Rob McMullen Date: Mon, 25 Mar 2019 15:31:07 -0700 Subject: [PATCH] Added Atari DOS filesystem parser --- atrcopy/filesystem.py | 382 ++++++++++++++++++++++++++++++ atrcopy/filesystems/__init__.py | 0 atrcopy/filesystems/atari_dos2.py | 323 +++++++++++++++++++++++++ atrcopy/media_type.py | 49 ++-- setup.py | 4 + test/test_filesystems.py | 72 ++++++ 6 files changed, 808 insertions(+), 22 deletions(-) create mode 100644 atrcopy/filesystem.py create mode 100644 atrcopy/filesystems/__init__.py create mode 100644 atrcopy/filesystems/atari_dos2.py create mode 100644 test/test_filesystems.py diff --git a/atrcopy/filesystem.py b/atrcopy/filesystem.py new file mode 100644 index 0000000..7722be3 --- /dev/null +++ b/atrcopy/filesystem.py @@ -0,0 +1,382 @@ +import hashlib +import inspect +import pkg_resources + +import numpy as np + +from . import errors +from . import style_bits +from .segment import Segment +from .utils import to_numpy, to_numpy_list, uuid + +import logging +log = logging.getLogger(__name__) + +try: # Expensive debugging + _xd = _expensive_debugging +except NameError: + _xd = False + + +class Filesystem: + """Base class for a "filesystem", which takes a source segment and + subdivides it into a set of segments where each represents a file. Some + auxiliary segments include a `VTOC` and a list of 'dirent's that point to + files. + """ + pretty_name = "Filesystem" + + extra_serializable_attributes = [] + + def __init__(self, media): + self.check_media(media) + self.media = media + self.boot = self.calc_boot_segment() + self.vtoc = self.calc_vtoc_segment() + self.directory = self.calc_directory_segment() + + #### initialization + + def check_media(self): + """Subclasses should override this method to verify the media type is + supported by the filesystem. + + Subclasses should raise IncompatibleMediaError if the filesystem is not + possible on this media, for instance attempting to use a disk + filesystem on a cassette media image. + """ + pass + + def calc_boot_segment(self): + """Subclasses should override this method to create a boot segment if + the filesystem supports one and it is present. + + If it is present, return a single `Segment` instance comprising the + entire set of data, and use sub-segments if more detail is present. + + If this feature is not present, return None. + + Subclasses should raise the appropriate FilesystemError if the data is + incompatible with this filesystem. + """ + pass + + def calc_vtoc_segment(self): + """Subclasses should override this method to create a VTOC segment if + the filesystem supports one and it is present. + + If it is present, return a single `Segment` instance comprising the + entire set of data, and use sub-segments if more detail is present. + + If this feature is not present, return None. + + Subclasses should raise the appropriate FilesystemError if the data is + incompatible with this filesystem. + """ + pass + + def calc_directory_segment(self): + """Subclasses should override this method to create a directory segment + if the filesystem supports one and it is present. + + If it is present, return a single `Segment` instance comprising the + entire set of data, and use sub-segments if more detail is present. + + If this feature is not present, return None. + + Subclasses should raise the appropriate FilesystemError if the data is + incompatible with this filesystem. + """ + pass + + #### + + def iter_segments(self): + if self.boot is not None: + yield self.boot + if self.vtoc is not None: + yield self.vtoc + if self.directory is not None: + yield self.directory + + +class Dirent(Segment): + """Abstract base class for a directory entry + + """ + + def __init__(self, filesystem, parent, file_num, start, length): + self.filesystem = filesystem + self.file_num = file_num + Segment.__init__(self, parent, start, name=f"Dirent {file_num}", length=length) + + def __eq__(self, other): + raise errors.NotImplementedError + + @property + def in_use(self): + raise errors.NotImplementedError + + def extra_metadata(self, image): + raise errors.NotImplementedError + + def mark_deleted(self): + raise errors.NotImplementedError + + def parse_raw_dirent(self, image, bytes): + raise errors.NotImplementedError + + def encode_dirent(self): + raise errors.NotImplementedError + + def get_sectors_in_vtoc(self, image): + raise errors.NotImplementedError + + def start_read(self, image): + raise errors.NotImplementedError + + def read_sector(self, image): + raise errors.NotImplementedError + + +class Directory(Segment): + def __init__(self, filesystem): + self.filesystem = filesystem + offset, length = self.find_segment_location() + Segment.__init__(self, filesystem.media, offset, name="Directory", length=length) + + # Each segment is a dirent + self.segments = self.calc_dirents() + + @property + def media(self): + return self.filesystem.media + + def find_segment_location(self): + raise NotImplementedError("Subclasses must define this to declare where the directory segment is located in the media image") + + def calc_dirents(self): + raise NotImplementedError("Subclasses must define this to generate a list of Dirent segments") + + def set(self, index, dirent): + self.segments[index] = dirent + if _xd: log.debug("set dirent #%d: %s" % (index, dirent)) + + def get_free_dirent(self): + used = set() + d = list(self.segments.items()) + if d: + d.sort() + for i, dirent in d: + if not dirent.in_use: + return i + used.add(i) + if self.num_dirents > 0 and (len(used) >= self.num_dirents): + raise errors.NoSpaceInDirectory() + i += 1 + else: + i = 0 + used.add(i) + return i + + def add_dirent(self, filename, filetype): + index = self.get_free_dirent() + dirent = self.dirent_class(None) + dirent.set_values(filename, filetype, index) + self.set(index, dirent) + return dirent + + def find_dirent(self, filename): + if hasattr(filename, "filename"): + # we've been passed a dirent instead of a filename + for dirent in list(self.segments.values()): + if dirent == filename: + return dirent + else: + for dirent in list(self.segments.values()): + if filename == dirent.filename: + return dirent + raise errors.FileNotFound("%s not found on disk" % filename) + + def save_dirent(self, image, dirent, vtoc, sector_list): + vtoc.assign_sector_numbers(dirent, sector_list) + dirent.add_metadata_sectors(vtoc, sector_list, image.header) + dirent.update_sector_info(sector_list) + self.calc_sectors(image) + + def remove_dirent(self, image, dirent, vtoc, sector_list): + vtoc.free_sector_list(sector_list) + dirent.mark_deleted() + self.calc_sectors(image) + + @property + def dirent_class(self): + raise errors.NotImplementedError + + def calc_sectors(self, image): + self.sectors = [] + self.current_sector = self.get_dirent_sector() + self.encode_index = 0 + + d = list(self.segments.items()) + d.sort() + # there may be gaps, so fill in missing entries with blanks + current = 0 + for index, dirent in d: + for missing in range(current, index): + if _xd: log.debug("Encoding empty dirent at %d" % missing) + data = self.encode_empty() + self.store_encoded(data) + if _xd: log.debug("Encoding dirent: %s" % dirent) + data = self.encode_dirent(dirent) + self.store_encoded(data) + current = index + 1 + self.finish_encoding(image) + + def get_dirent_sector(self): + return self.sector_class(self.sector_size) + + def encode_empty(self): + raise errors.NotImplementedError + + def encode_dirent(self, dirent): + raise errors.NotImplementedError + + def store_encoded(self, data): + while True: + if _xd: log.debug("store_encoded: %d bytes in %s" % (len(data), self.current_sector)) + data = self.current_sector.add_data(data) + if len(data) > 0: + self.sectors.append(self.current_sector) + self.current_sector = self.get_dirent_sector() + else: + break + + def finish_encoding(self, image): + if not self.current_sector.is_empty: + self.sectors.append(self.current_sector) + self.set_sector_numbers(image) + + def set_sector_numbers(self, image): + raise errors.NotImplementedError + + +class VTOC(Segment): + def __init__(self, filesystem): + self.filesystem = filesystem + offset, length = self.find_segment_location() + Segment.__init__(self, filesystem.media, offset, name="VTOC", length=length) + + # sector map: 1 is free, 0 is allocated + self.sector_map = np.zeros([filesystem.media.num_sectors], dtype=np.uint8) + self.unpack_vtoc() + + @property + def media(self): + return self.filesystem.media + + def find_segment_location(self): + """Calculate the location on the media for the VTOC. Return either + sector number and count, or offset list + """ + raise NotImplementedError("Subclasses must define this to declare where the directory segment is located in the media image") + + # def __str__(self): + # return "%s\n (%d free)" % ("\n".join(["track %02d: %s" % (i, line) for i, line in enumerate(str(self.sector_map[self.header.starting_sector_label:(self.header.tracks_per_disk*self.header.sectors_per_track) + self.header.starting_sector_label].reshape([self.header.tracks_per_disk,self.header.sectors_per_track])).splitlines())]), self.num_free_sectors) + + def unpack_vtoc(self): + """Using the bit-encoded data, unpack it into the sector_map array + """ + raise NotImplementedError() + + def pack_vtoc(self): + """Pack the sector_map array into the segment + """ + raise NotImplementedError() + + @property + def num_free_sectors(self): + free = np.where(self.sector_map == 1)[0] + return len(free) + + def iter_free_sectors(self): + for i, pos, size in self.filesystem.media.iter_sectors(): + if self.sector_map[i] == 1: + yield i, pos, size + + def assign_sector_numbers(self, dirent, sector_list): + """ Map out the sectors and link the sectors together + + raises NotEnoughSpaceOnDisk if the whole file won't fit. It will not + allow partial writes. + """ + num = len(sector_list) + order = self.reserve_space(num) + if len(order) != num: + raise errors.InvalidFile("VTOC reserved space for %d sectors. Sectors needed: %d" % (len(order), num)) + file_length = 0 + last_sector = None + for sector, sector_num in zip(sector_list.sectors, order): + sector.sector_num = sector_num + sector.file_num = dirent.file_num + file_length += sector.used + if last_sector is not None: + last_sector.next_sector_num = sector_num + last_sector = sector + if last_sector is not None: + last_sector.next_sector_num = 0 + sector_list.file_length = file_length + + def reserve_space(self, num): + order = [] + for i in range(num): + order.append(self.get_next_free_sector()) + if _xd: log.debug("Sectors reserved: %s" % order) + self.calc_bitmap() + return order + + def get_next_free_sector(self): + free = np.nonzero(self.sector_map)[0] + if len(free) > 0: + num = free[0] + if _xd: log.debug("Found sector %d free" % num) + self.sector_map[num] = 0 + return num + raise errors.NotEnoughSpaceOnDisk("No space left in VTOC") + + def calc_bitmap(self): + raise errors.NotImplementedError + + def free_sector_list(self, sector_list): + for sector in sector_list: + self.sector_map[sector.sector_num] = 1 + self.calc_bitmap() + + +def find_filesystems(): + filesystems = [] + for entry_point in pkg_resources.iter_entry_points('atrcopy.filesystems'): + mod = entry_point.load() + log.debug(f"find_filesystems: Found module {entry_point.name}={mod.__name__}") + for name, obj in inspect.getmembers(mod): + if inspect.isclass(obj) and Filesystem in obj.__mro__[1:]: + log.debug(f"find_filesystems: found media_type class {name}") + filesystems.append(obj) + return filesystems + + +def guess_filesystem(segment, verbose=False): + for f in find_filesystems(): + if verbose: + log.info(f"trying filesystem {f}") + try: + found = f(segment) + except errors.FilesystemError as e: + log.debug(f"found error: {e}") + continue + else: + if verbose: + log.info(f"found filesystem {f}") + return found + log.info(f"No recognized filesystem.") + return None diff --git a/atrcopy/filesystems/__init__.py b/atrcopy/filesystems/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/atrcopy/filesystems/atari_dos2.py b/atrcopy/filesystems/atari_dos2.py new file mode 100644 index 0000000..390ae92 --- /dev/null +++ b/atrcopy/filesystems/atari_dos2.py @@ -0,0 +1,323 @@ +import numpy as np + +from .. import errors +from ..segment import Segment +from ..filesystem import VTOC, Dirent, Directory, Filesystem + +try: # Expensive debugging + _xd = _expensive_debugging +except NameError: + _xd = False + + +class AtariDosBootSegment(Segment): + boot_record_type = np.dtype([ + ('BFLAG', 'u1'), + ('BRCNT', 'u1'), + ('BLDADR', ' self.calc_vtoc_code(): + raise errors.InvalidDiskImage(f"Invalid number of VTOC sectors: {num}") + self.total_sectors = values[1] + self.unused_sectors = values[2] + return media.get_contiguous_sectors_offsets(self.first_vtoc, self.num_vtoc) + + def unpack_vtoc(self): + bits = np.unpackbits(self[0x0a:0x64]) + self.sector_map[0:720] = bits + if _xd: log.debug("vtoc before:\n%s" % str(self)) + + def pack_vtoc(self): + if _xd: log.debug("vtoc after:\n%s" % str(self)) + packed = np.packbits(self.sector_map[0:720]) + self[0x0a:0x64] = packed + + def calc_vtoc_code(self): + # From AA post: http://atariage.com/forums/topic/179868-mydos-vtoc-size/ + media = self.filesystem.media + num = 1 + (media.num_sectors + 80) // (media.sector_size * 8) + if media.sector_size == 128: + if num == 1: + code = 2 + else: + if num & 1: + num += 1 + code = ((num + 1) // 2) + 2 + else: + if media.num_sectors < 1024: + code = 2 + else: + code = 2 + num + return code + + +class AtariDosDirent(Dirent): + # ATR Dirent structure described at http://atari.kensclassics.org/dos.htm + format = np.dtype([ + ('FLAG', 'u1'), + ('COUNT', ' 0 + self.dos_2 = (flag&0x02) > 0 + self.mydos = (flag&0x04) > 0 + self.is_dir = (flag&0x10) > 0 + self.locked = (flag&0x20) > 0 + self._in_use = (flag&0x40) > 0 + self.deleted = (flag&0x80) > 0 + self.num_sectors = int(values[1]) + self.starting_sector = int(values[2]) + self.basename = bytes(values[3]).rstrip() + self.ext = bytes(values[4]).rstrip() + self.is_sane = self.sanity_check() + + def encode_dirent(self): + data = np.zeros([self.format.itemsize], dtype=np.uint8) + values = data.view(dtype=self.format)[0] + flag = (1 * int(self.opened_output)) | (2 * int(self.dos_2)) | (4 * int(self.mydos)) | (0x10 * int(self.is_dir)) | (0x20 * int(self.locked)) | (0x40 * int(self._in_use)) | (0x80 * int(self.deleted)) + values[0] = flag + values[1] = self.num_sectors + values[2] = self.starting_sector + values[3] = self.basename + values[4] = self.ext + return data + + def mark_deleted(self): + self.deleted = True + self._in_use = False + + def update_sector_info(self, sector_list): + self.num_sectors = sector_list.num_sectors + self.starting_sector = sector_list.first_sector + + def add_metadata_sectors(self, vtoc, sector_list, header): + # no extra sectors are needed for an Atari DOS file; the links to the + # next sector is contained in the sector. + pass + + def sanity_check(self): + media = self.filesystem.media + if not self._in_use: + return True + if not media.is_sector_valid(self.starting_sector): + return False + if self.num_sectors < 0 or self.num_sectors > media.num_sectors: + return False + return True + + def get_sectors_in_vtoc(self, image): + sector_list = BaseSectorList(image.header) + self.start_read(image) + while True: + sector = WriteableSector(image.header.sector_size, None, self.current_sector) + sector_list.append(sector) + _, last, _, _ = self.read_sector(image) + if last: + break + return sector_list + + def start_read(self, image): + if not self.is_sane: + raise errors.InvalidDirent("Invalid directory entry '%s'" % str(self)) + self.current_sector = self.starting_sector + self.current_read = self.num_sectors + self.sectors_seen = set() + + def read_sector(self, image): + raw, pos, size = image.get_raw_bytes(self.current_sector) + bytes, num_data_bytes = self.process_raw_sector(image, raw) + return bytes, self.current_sector == 0, pos, num_data_bytes + + def process_raw_sector(self, image, raw): + file_num = raw[-3] >> 2 + if file_num != self.file_num: + raise errors.FileNumberMismatchError164("Expecting file %d, found %d" % (self.file_num, file_num)) + self.sectors_seen.add(self.current_sector) + next_sector = ((raw[-3] & 0x3) << 8) + raw[-2] + if next_sector in self.sectors_seen: + raise errors.InvalidFile("Bad sector pointer data: attempting to reread sector %d" % next_sector) + self.current_sector = next_sector + num_bytes = raw[-1] + return raw[0:num_bytes], num_bytes + + def set_values(self, filename, filetype, index): + if type(filename) is not bytes: + filename = filename.encode("latin1") + if b'.' in filename: + filename, ext = filename.split(b'.', 1) + else: + ext = b' ' + self.basename = b'%-8s' % filename[0:8] + self.ext = ext + self.file_num = index + self.dos_2 = True + self._in_use = True + if _xd: log.debug("set_values: %s" % self) + + +class AtariDos2Directory(Directory): + def __init__(self, filesystem): + self.filesystem = filesystem + offset, length = self.find_segment_location() + Segment.__init__(self, filesystem.media, offset, name="Directory", length=length) + + # Each segment is a dirent + self.segments = self.calc_dirents() + + def find_segment_location(self): + media = self.media + if media.is_sector_valid(361): + return media.get_contiguous_sectors_offsets(361, 8) + else: + raise errors.FilesystemError("Disk image too small to contain a directory") + + def calc_dirents(self): + segments = [] + index = 0 + for filenum in range(64): + dirent = AtariDosDirent(self.filesystem, self, filenum, index) + if not dirent.in_use: + continue + dirent.set_comment_at(0x00, "FILE #%d: Flag" % filenum) + dirent.set_comment_at(0x01, "FILE #%d: Number of sectors in file" % filenum) + dirent.set_comment_at(0x03, "FILE #%d: Starting sector number" % filenum) + dirent.set_comment_at(0x05, "FILE #%d: Filename" % filenum) + dirent.set_comment_at(0x0d, "FILE #%d: Extension" % filenum) + index += 16 + segments.append(dirent) + return segments + + +class AtariDos2(Filesystem): + default_executable_extension = "XEX" + + def check_media(self, media): + try: + media.get_contiguous_sectors + except AttributeError: + raise errors.IncompatibleMediaError("Atari DOS needs sector access") + + def calc_boot_segment(self): + return AtariDosBootSegment(self) + + def calc_vtoc_segment(self): + return AtariDos2VTOC(self) + + def calc_directory_segment(self): + return AtariDos2Directory(self) diff --git a/atrcopy/media_type.py b/atrcopy/media_type.py index fee688b..9aad14e 100644 --- a/atrcopy/media_type.py +++ b/atrcopy/media_type.py @@ -8,6 +8,7 @@ from . import errors from . import style_bits from .segment import Segment from .utils import to_numpy, to_numpy_list, uuid +from . import filesystem import logging log = logging.getLogger(__name__) @@ -76,9 +77,10 @@ class MediaType(Segment): """ pass - def find_filesystem(self): + def guess_filesystem(self): fs = filesystem.guess_filesystem(self) if fs: + self.filesystem = fs self.segments = list(fs.iter_segments()) @@ -92,20 +94,8 @@ class DiskImage(MediaType): self.num_sectors = 0 MediaType.__init__(self, container) - def __str__(self): - return f"{self.pretty_name}, size={len(self)} ({self.num_sectors}x{self.sector_size}B)" - - @property - def verbose_info(self): - name = self.verbose_name or self.name - if self.num_sectors > 1: - s = "%s (sectors %d-%d)" % (name, self.first_sector, self.first_sector + self.num_sectors - 1) - else: - s = "%s (sector %d)" % (name, self.first_sector) - s += " $%x bytes" % (len(self), ) - if self.error: - s += " error='%s'" % self.error - return s + # def __str__(self): + # return f"{self.pretty_name}, size={len(self)} ({self.num_sectors}x{self.sector_size}B)" #### verification @@ -129,36 +119,51 @@ class DiskImage(MediaType): return "s%03d:%02x" % (sector + self.first_sector, byte) return "s%03d:%02X" % (sector + self.first_sector, byte) - def sector_is_valid(self, sector): + def is_sector_valid(self, sector): return (self.num_sectors < 0) or (sector >= self.starting_sector_label and sector < (self.num_sectors + self.starting_sector_label)) def get_index_of_sector(self, sector): - if not self.sector_is_valid(sector): + if not self.is_sector_valid(sector): raise errors.ByteNotInFile166("Sector %d out of range" % sector) pos = (sector - self.starting_sector_label) * self.sector_size return pos, self.sector_size - def get_contiguous_sectors(self, start, count): + def get_contiguous_sectors_offsets(self, start, count=1): index, _ = self.get_index_of_sector(start) last, size = self.get_index_of_sector(start + count - 1) - return Segment(self, index, length=(last + size - index)) + return index, last + size - index - def get_sector_list(self, sector_numbers): + def get_contiguous_sectors(self, start, count=1): + start, size = self.get_contiguous_sectors_offsets(start, count) + return Segment(self, start, length=size) + + def get_sector_list_offsets(self, sector_numbers): offsets = np.empty(len(sector_numbers) * self.sector_size, dtype=np.uint32) i = 0 for num in sector_numbers: index, size = self.get_index_of_sector(num) offsets[i:i+size] = np.arange(index, index + size) i += size + return offsets + + def get_sector_list(self, sector_numbers): + offsets = self.get_sector_list_offsets(sector_numbers) return Segment(self, offsets) + def iter_sectors(self): + i = self.starting_sector_label + while self.is_sector_valid(i): + pos, size = self.get_index_of_sector(i) + yield i, pos, size + i += 1 + class CartImage(MediaType): pretty_name = "Cart Image" expected_size = 0 - def __str__(self): - return f"{len(self) // 1024}K {self.pretty_name}" + # def __str__(self): + # return f"{len(self) // 1024}K {self.pretty_name}" def check_media_size(self): size = len(self) diff --git a/setup.py b/setup.py index c6edfd1..e07dd2e 100644 --- a/setup.py +++ b/setup.py @@ -41,6 +41,10 @@ setup(name="atrcopy", 'atari_carts = atrcopy.media_types.atari_carts', 'apple_disks = atrcopy.media_types.apple_disks', ], + + "atrcopy.filesystems": [ + 'atari_dos = atrcopy.filesystems.atari_dos2', + ], }, description="Utility to manage file systems on Atari 8-bit (DOS 2) and Apple ][ (DOS 3.3) disk images.", long_description=long_description, diff --git a/test/test_filesystems.py b/test/test_filesystems.py new file mode 100644 index 0000000..2b495d8 --- /dev/null +++ b/test/test_filesystems.py @@ -0,0 +1,72 @@ +import glob + +import numpy as np + +from mock import * + +from atrcopy.container import guess_container +from atrcopy.media_type import MediaType, guess_media_type +from atrcopy import errors + +from atrcopy.media_types.atari_disks import * +from atrcopy.media_types.apple_disks import * + +ext_to_valid_types = { + '.atr': set([ + AtariDoubleDensity, + AtariDoubleDensityHardDriveImage, + AtariDoubleDensityShortBootSectors, + AtariEnhancedDensity, + AtariSingleDensity, + AtariSingleDensityShortImage, + ]), + '.dsk': set([ + Apple16SectorDiskImage, + ]), +} + +class TestAtariDos2: + base_path = None + expected_mime = "" + + @pytest.mark.parametrize("pathname", sorted(glob.glob(os.path.join(os.path.dirname(__file__), "../test_data/", "*")))) + def test_test_data_dir(self, pathname): + wrapped, ext = os.path.splitext(pathname) + print(f"checking {pathname}") + sample_data = np.fromfile(pathname, dtype=np.uint8) + container = guess_container(sample_data) + if container.compression_algorithm != "no compression": + _, ext = os.path.splitext(wrapped) + container.guess_media_type() + print(ext, ext_to_valid_types) + if ext in ext_to_valid_types: + assert container.media.__class__ in ext_to_valid_types[ext] + else: + assert container.media.__class__ == MediaType + + +if __name__ == "__main__": + import logging + logging.basicConfig(level=logging.WARNING) + log = logging.getLogger("atrcopy.media_type") + log.setLevel(logging.DEBUG) + + def check(pathname): + print(f"checking {pathname}") + sample_data = np.fromfile(pathname, dtype=np.uint8) + container = guess_container(sample_data) + container.guess_media_type() + print(container.verbose_info) + media = container.media + media.guess_filesystem() + print(media.filesystem) + print(container.verbose_info) + + import sys + import glob + if len(sys.argv) > 1: + images = sys.argv[1:] + else: + images = sorted(glob.glob(os.path.join(os.path.dirname(__file__), "../test_data/", "*"))) + for pathname in images: + check(pathname)