mirror of
https://github.com/robmcmullen/atrcopy.git
synced 2025-04-11 06:37:10 +00:00
Added Atari DOS filesystem parser
This commit is contained in:
parent
3897da6dc4
commit
92fb9986b6
382
atrcopy/filesystem.py
Normal file
382
atrcopy/filesystem.py
Normal file
@ -0,0 +1,382 @@
|
||||
import hashlib
|
||||
import inspect
|
||||
import pkg_resources
|
||||
|
||||
import numpy as np
|
||||
|
||||
from . import errors
|
||||
from . import style_bits
|
||||
from .segment import Segment
|
||||
from .utils import to_numpy, to_numpy_list, uuid
|
||||
|
||||
import logging
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
try: # Expensive debugging
|
||||
_xd = _expensive_debugging
|
||||
except NameError:
|
||||
_xd = False
|
||||
|
||||
|
||||
class Filesystem:
|
||||
"""Base class for a "filesystem", which takes a source segment and
|
||||
subdivides it into a set of segments where each represents a file. Some
|
||||
auxiliary segments include a `VTOC` and a list of 'dirent's that point to
|
||||
files.
|
||||
"""
|
||||
pretty_name = "Filesystem"
|
||||
|
||||
extra_serializable_attributes = []
|
||||
|
||||
def __init__(self, media):
|
||||
self.check_media(media)
|
||||
self.media = media
|
||||
self.boot = self.calc_boot_segment()
|
||||
self.vtoc = self.calc_vtoc_segment()
|
||||
self.directory = self.calc_directory_segment()
|
||||
|
||||
#### initialization
|
||||
|
||||
def check_media(self):
|
||||
"""Subclasses should override this method to verify the media type is
|
||||
supported by the filesystem.
|
||||
|
||||
Subclasses should raise IncompatibleMediaError if the filesystem is not
|
||||
possible on this media, for instance attempting to use a disk
|
||||
filesystem on a cassette media image.
|
||||
"""
|
||||
pass
|
||||
|
||||
def calc_boot_segment(self):
|
||||
"""Subclasses should override this method to create a boot segment if
|
||||
the filesystem supports one and it is present.
|
||||
|
||||
If it is present, return a single `Segment` instance comprising the
|
||||
entire set of data, and use sub-segments if more detail is present.
|
||||
|
||||
If this feature is not present, return None.
|
||||
|
||||
Subclasses should raise the appropriate FilesystemError if the data is
|
||||
incompatible with this filesystem.
|
||||
"""
|
||||
pass
|
||||
|
||||
def calc_vtoc_segment(self):
|
||||
"""Subclasses should override this method to create a VTOC segment if
|
||||
the filesystem supports one and it is present.
|
||||
|
||||
If it is present, return a single `Segment` instance comprising the
|
||||
entire set of data, and use sub-segments if more detail is present.
|
||||
|
||||
If this feature is not present, return None.
|
||||
|
||||
Subclasses should raise the appropriate FilesystemError if the data is
|
||||
incompatible with this filesystem.
|
||||
"""
|
||||
pass
|
||||
|
||||
def calc_directory_segment(self):
|
||||
"""Subclasses should override this method to create a directory segment
|
||||
if the filesystem supports one and it is present.
|
||||
|
||||
If it is present, return a single `Segment` instance comprising the
|
||||
entire set of data, and use sub-segments if more detail is present.
|
||||
|
||||
If this feature is not present, return None.
|
||||
|
||||
Subclasses should raise the appropriate FilesystemError if the data is
|
||||
incompatible with this filesystem.
|
||||
"""
|
||||
pass
|
||||
|
||||
####
|
||||
|
||||
def iter_segments(self):
|
||||
if self.boot is not None:
|
||||
yield self.boot
|
||||
if self.vtoc is not None:
|
||||
yield self.vtoc
|
||||
if self.directory is not None:
|
||||
yield self.directory
|
||||
|
||||
|
||||
class Dirent(Segment):
|
||||
"""Abstract base class for a directory entry
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, filesystem, parent, file_num, start, length):
|
||||
self.filesystem = filesystem
|
||||
self.file_num = file_num
|
||||
Segment.__init__(self, parent, start, name=f"Dirent {file_num}", length=length)
|
||||
|
||||
def __eq__(self, other):
|
||||
raise errors.NotImplementedError
|
||||
|
||||
@property
|
||||
def in_use(self):
|
||||
raise errors.NotImplementedError
|
||||
|
||||
def extra_metadata(self, image):
|
||||
raise errors.NotImplementedError
|
||||
|
||||
def mark_deleted(self):
|
||||
raise errors.NotImplementedError
|
||||
|
||||
def parse_raw_dirent(self, image, bytes):
|
||||
raise errors.NotImplementedError
|
||||
|
||||
def encode_dirent(self):
|
||||
raise errors.NotImplementedError
|
||||
|
||||
def get_sectors_in_vtoc(self, image):
|
||||
raise errors.NotImplementedError
|
||||
|
||||
def start_read(self, image):
|
||||
raise errors.NotImplementedError
|
||||
|
||||
def read_sector(self, image):
|
||||
raise errors.NotImplementedError
|
||||
|
||||
|
||||
class Directory(Segment):
|
||||
def __init__(self, filesystem):
|
||||
self.filesystem = filesystem
|
||||
offset, length = self.find_segment_location()
|
||||
Segment.__init__(self, filesystem.media, offset, name="Directory", length=length)
|
||||
|
||||
# Each segment is a dirent
|
||||
self.segments = self.calc_dirents()
|
||||
|
||||
@property
|
||||
def media(self):
|
||||
return self.filesystem.media
|
||||
|
||||
def find_segment_location(self):
|
||||
raise NotImplementedError("Subclasses must define this to declare where the directory segment is located in the media image")
|
||||
|
||||
def calc_dirents(self):
|
||||
raise NotImplementedError("Subclasses must define this to generate a list of Dirent segments")
|
||||
|
||||
def set(self, index, dirent):
|
||||
self.segments[index] = dirent
|
||||
if _xd: log.debug("set dirent #%d: %s" % (index, dirent))
|
||||
|
||||
def get_free_dirent(self):
|
||||
used = set()
|
||||
d = list(self.segments.items())
|
||||
if d:
|
||||
d.sort()
|
||||
for i, dirent in d:
|
||||
if not dirent.in_use:
|
||||
return i
|
||||
used.add(i)
|
||||
if self.num_dirents > 0 and (len(used) >= self.num_dirents):
|
||||
raise errors.NoSpaceInDirectory()
|
||||
i += 1
|
||||
else:
|
||||
i = 0
|
||||
used.add(i)
|
||||
return i
|
||||
|
||||
def add_dirent(self, filename, filetype):
|
||||
index = self.get_free_dirent()
|
||||
dirent = self.dirent_class(None)
|
||||
dirent.set_values(filename, filetype, index)
|
||||
self.set(index, dirent)
|
||||
return dirent
|
||||
|
||||
def find_dirent(self, filename):
|
||||
if hasattr(filename, "filename"):
|
||||
# we've been passed a dirent instead of a filename
|
||||
for dirent in list(self.segments.values()):
|
||||
if dirent == filename:
|
||||
return dirent
|
||||
else:
|
||||
for dirent in list(self.segments.values()):
|
||||
if filename == dirent.filename:
|
||||
return dirent
|
||||
raise errors.FileNotFound("%s not found on disk" % filename)
|
||||
|
||||
def save_dirent(self, image, dirent, vtoc, sector_list):
|
||||
vtoc.assign_sector_numbers(dirent, sector_list)
|
||||
dirent.add_metadata_sectors(vtoc, sector_list, image.header)
|
||||
dirent.update_sector_info(sector_list)
|
||||
self.calc_sectors(image)
|
||||
|
||||
def remove_dirent(self, image, dirent, vtoc, sector_list):
|
||||
vtoc.free_sector_list(sector_list)
|
||||
dirent.mark_deleted()
|
||||
self.calc_sectors(image)
|
||||
|
||||
@property
|
||||
def dirent_class(self):
|
||||
raise errors.NotImplementedError
|
||||
|
||||
def calc_sectors(self, image):
|
||||
self.sectors = []
|
||||
self.current_sector = self.get_dirent_sector()
|
||||
self.encode_index = 0
|
||||
|
||||
d = list(self.segments.items())
|
||||
d.sort()
|
||||
# there may be gaps, so fill in missing entries with blanks
|
||||
current = 0
|
||||
for index, dirent in d:
|
||||
for missing in range(current, index):
|
||||
if _xd: log.debug("Encoding empty dirent at %d" % missing)
|
||||
data = self.encode_empty()
|
||||
self.store_encoded(data)
|
||||
if _xd: log.debug("Encoding dirent: %s" % dirent)
|
||||
data = self.encode_dirent(dirent)
|
||||
self.store_encoded(data)
|
||||
current = index + 1
|
||||
self.finish_encoding(image)
|
||||
|
||||
def get_dirent_sector(self):
|
||||
return self.sector_class(self.sector_size)
|
||||
|
||||
def encode_empty(self):
|
||||
raise errors.NotImplementedError
|
||||
|
||||
def encode_dirent(self, dirent):
|
||||
raise errors.NotImplementedError
|
||||
|
||||
def store_encoded(self, data):
|
||||
while True:
|
||||
if _xd: log.debug("store_encoded: %d bytes in %s" % (len(data), self.current_sector))
|
||||
data = self.current_sector.add_data(data)
|
||||
if len(data) > 0:
|
||||
self.sectors.append(self.current_sector)
|
||||
self.current_sector = self.get_dirent_sector()
|
||||
else:
|
||||
break
|
||||
|
||||
def finish_encoding(self, image):
|
||||
if not self.current_sector.is_empty:
|
||||
self.sectors.append(self.current_sector)
|
||||
self.set_sector_numbers(image)
|
||||
|
||||
def set_sector_numbers(self, image):
|
||||
raise errors.NotImplementedError
|
||||
|
||||
|
||||
class VTOC(Segment):
|
||||
def __init__(self, filesystem):
|
||||
self.filesystem = filesystem
|
||||
offset, length = self.find_segment_location()
|
||||
Segment.__init__(self, filesystem.media, offset, name="VTOC", length=length)
|
||||
|
||||
# sector map: 1 is free, 0 is allocated
|
||||
self.sector_map = np.zeros([filesystem.media.num_sectors], dtype=np.uint8)
|
||||
self.unpack_vtoc()
|
||||
|
||||
@property
|
||||
def media(self):
|
||||
return self.filesystem.media
|
||||
|
||||
def find_segment_location(self):
|
||||
"""Calculate the location on the media for the VTOC. Return either
|
||||
sector number and count, or offset list
|
||||
"""
|
||||
raise NotImplementedError("Subclasses must define this to declare where the directory segment is located in the media image")
|
||||
|
||||
# def __str__(self):
|
||||
# return "%s\n (%d free)" % ("\n".join(["track %02d: %s" % (i, line) for i, line in enumerate(str(self.sector_map[self.header.starting_sector_label:(self.header.tracks_per_disk*self.header.sectors_per_track) + self.header.starting_sector_label].reshape([self.header.tracks_per_disk,self.header.sectors_per_track])).splitlines())]), self.num_free_sectors)
|
||||
|
||||
def unpack_vtoc(self):
|
||||
"""Using the bit-encoded data, unpack it into the sector_map array
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def pack_vtoc(self):
|
||||
"""Pack the sector_map array into the segment
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def num_free_sectors(self):
|
||||
free = np.where(self.sector_map == 1)[0]
|
||||
return len(free)
|
||||
|
||||
def iter_free_sectors(self):
|
||||
for i, pos, size in self.filesystem.media.iter_sectors():
|
||||
if self.sector_map[i] == 1:
|
||||
yield i, pos, size
|
||||
|
||||
def assign_sector_numbers(self, dirent, sector_list):
|
||||
""" Map out the sectors and link the sectors together
|
||||
|
||||
raises NotEnoughSpaceOnDisk if the whole file won't fit. It will not
|
||||
allow partial writes.
|
||||
"""
|
||||
num = len(sector_list)
|
||||
order = self.reserve_space(num)
|
||||
if len(order) != num:
|
||||
raise errors.InvalidFile("VTOC reserved space for %d sectors. Sectors needed: %d" % (len(order), num))
|
||||
file_length = 0
|
||||
last_sector = None
|
||||
for sector, sector_num in zip(sector_list.sectors, order):
|
||||
sector.sector_num = sector_num
|
||||
sector.file_num = dirent.file_num
|
||||
file_length += sector.used
|
||||
if last_sector is not None:
|
||||
last_sector.next_sector_num = sector_num
|
||||
last_sector = sector
|
||||
if last_sector is not None:
|
||||
last_sector.next_sector_num = 0
|
||||
sector_list.file_length = file_length
|
||||
|
||||
def reserve_space(self, num):
|
||||
order = []
|
||||
for i in range(num):
|
||||
order.append(self.get_next_free_sector())
|
||||
if _xd: log.debug("Sectors reserved: %s" % order)
|
||||
self.calc_bitmap()
|
||||
return order
|
||||
|
||||
def get_next_free_sector(self):
|
||||
free = np.nonzero(self.sector_map)[0]
|
||||
if len(free) > 0:
|
||||
num = free[0]
|
||||
if _xd: log.debug("Found sector %d free" % num)
|
||||
self.sector_map[num] = 0
|
||||
return num
|
||||
raise errors.NotEnoughSpaceOnDisk("No space left in VTOC")
|
||||
|
||||
def calc_bitmap(self):
|
||||
raise errors.NotImplementedError
|
||||
|
||||
def free_sector_list(self, sector_list):
|
||||
for sector in sector_list:
|
||||
self.sector_map[sector.sector_num] = 1
|
||||
self.calc_bitmap()
|
||||
|
||||
|
||||
def find_filesystems():
|
||||
filesystems = []
|
||||
for entry_point in pkg_resources.iter_entry_points('atrcopy.filesystems'):
|
||||
mod = entry_point.load()
|
||||
log.debug(f"find_filesystems: Found module {entry_point.name}={mod.__name__}")
|
||||
for name, obj in inspect.getmembers(mod):
|
||||
if inspect.isclass(obj) and Filesystem in obj.__mro__[1:]:
|
||||
log.debug(f"find_filesystems: found media_type class {name}")
|
||||
filesystems.append(obj)
|
||||
return filesystems
|
||||
|
||||
|
||||
def guess_filesystem(segment, verbose=False):
|
||||
for f in find_filesystems():
|
||||
if verbose:
|
||||
log.info(f"trying filesystem {f}")
|
||||
try:
|
||||
found = f(segment)
|
||||
except errors.FilesystemError as e:
|
||||
log.debug(f"found error: {e}")
|
||||
continue
|
||||
else:
|
||||
if verbose:
|
||||
log.info(f"found filesystem {f}")
|
||||
return found
|
||||
log.info(f"No recognized filesystem.")
|
||||
return None
|
0
atrcopy/filesystems/__init__.py
Normal file
0
atrcopy/filesystems/__init__.py
Normal file
323
atrcopy/filesystems/atari_dos2.py
Normal file
323
atrcopy/filesystems/atari_dos2.py
Normal file
@ -0,0 +1,323 @@
|
||||
import numpy as np
|
||||
|
||||
from .. import errors
|
||||
from ..segment import Segment
|
||||
from ..filesystem import VTOC, Dirent, Directory, Filesystem
|
||||
|
||||
try: # Expensive debugging
|
||||
_xd = _expensive_debugging
|
||||
except NameError:
|
||||
_xd = False
|
||||
|
||||
|
||||
class AtariDosBootSegment(Segment):
|
||||
boot_record_type = np.dtype([
|
||||
('BFLAG', 'u1'),
|
||||
('BRCNT', 'u1'),
|
||||
('BLDADR', '<u2'),
|
||||
('BWTARR', '<u2'),
|
||||
])
|
||||
|
||||
def __init__(self, filesystem):
|
||||
media = filesystem.media
|
||||
size = self.find_segment_size(media)
|
||||
Segment.__init__(self, media, 0, self.bldadr, name="Boot Sectors", length=size)
|
||||
self.segments = self.calc_boot_segments()
|
||||
|
||||
def find_segment_size(self, media):
|
||||
self.first_sector = media.get_contiguous_sectors(1)
|
||||
self.values = media[0:6].view(dtype=self.boot_record_type)[0]
|
||||
self.bflag = self.values['BFLAG']
|
||||
if self.bflag == 0:
|
||||
# possible boot sector
|
||||
self.brcnt = self.values['BRCNT']
|
||||
if self.brcnt == 0:
|
||||
self.brcnt = 3
|
||||
else:
|
||||
self.brcnt = 3
|
||||
self.bldadr = self.values['BLDADR']
|
||||
index, _ = media.get_index_of_sector(1 + self.brcnt)
|
||||
return index
|
||||
|
||||
def calc_boot_segments(self):
|
||||
header = Segment(self, 0, self.bldadr, "Boot Header", length=6)
|
||||
code = Segment(self, 6, self.bldadr + 6, name="Boot Code", length=len(self) - 6)
|
||||
return [header, code]
|
||||
|
||||
|
||||
class AtariDos2VTOC(VTOC):
|
||||
vtoc_type = np.dtype([
|
||||
('code', 'u1'),
|
||||
('total','<u2'),
|
||||
('unused','<u2'),
|
||||
])
|
||||
|
||||
def find_segment_location(self):
|
||||
media = self.media
|
||||
values = media[0:5].view(dtype=self.vtoc_type)[0]
|
||||
code = values[0]
|
||||
if code == 0 or code == 2:
|
||||
num = 1
|
||||
else:
|
||||
num = (code * 2) - 3
|
||||
self.first_vtoc = 360 - num + 1
|
||||
if not media.is_sector_valid(self.first_vtoc):
|
||||
raise errors.FilesystemError(f"Invalid first VTOC sector {self.first_vtoc}")
|
||||
self.num_vtoc = num
|
||||
if num < 0 or num > self.calc_vtoc_code():
|
||||
raise errors.InvalidDiskImage(f"Invalid number of VTOC sectors: {num}")
|
||||
self.total_sectors = values[1]
|
||||
self.unused_sectors = values[2]
|
||||
return media.get_contiguous_sectors_offsets(self.first_vtoc, self.num_vtoc)
|
||||
|
||||
def unpack_vtoc(self):
|
||||
bits = np.unpackbits(self[0x0a:0x64])
|
||||
self.sector_map[0:720] = bits
|
||||
if _xd: log.debug("vtoc before:\n%s" % str(self))
|
||||
|
||||
def pack_vtoc(self):
|
||||
if _xd: log.debug("vtoc after:\n%s" % str(self))
|
||||
packed = np.packbits(self.sector_map[0:720])
|
||||
self[0x0a:0x64] = packed
|
||||
|
||||
def calc_vtoc_code(self):
|
||||
# From AA post: http://atariage.com/forums/topic/179868-mydos-vtoc-size/
|
||||
media = self.filesystem.media
|
||||
num = 1 + (media.num_sectors + 80) // (media.sector_size * 8)
|
||||
if media.sector_size == 128:
|
||||
if num == 1:
|
||||
code = 2
|
||||
else:
|
||||
if num & 1:
|
||||
num += 1
|
||||
code = ((num + 1) // 2) + 2
|
||||
else:
|
||||
if media.num_sectors < 1024:
|
||||
code = 2
|
||||
else:
|
||||
code = 2 + num
|
||||
return code
|
||||
|
||||
|
||||
class AtariDosDirent(Dirent):
|
||||
# ATR Dirent structure described at http://atari.kensclassics.org/dos.htm
|
||||
format = np.dtype([
|
||||
('FLAG', 'u1'),
|
||||
('COUNT', '<u2'),
|
||||
('START', '<u2'),
|
||||
('NAME','S8'),
|
||||
('EXT','S3'),
|
||||
])
|
||||
|
||||
def __init__(self, filesystem, parent, file_num, start):
|
||||
Dirent.__init__(self, filesystem, parent, file_num, start, 16)
|
||||
self.flag = 0
|
||||
self.opened_output = False
|
||||
self.dos_2 = False
|
||||
self.mydos = False
|
||||
self.is_dir = False
|
||||
self.locked = False
|
||||
self._in_use = False
|
||||
self.deleted = False
|
||||
self.num_sectors = 0
|
||||
self.starting_sector = 0
|
||||
self.basename = b''
|
||||
self.ext = b''
|
||||
self.is_sane = True
|
||||
self.current_sector = 0
|
||||
self.current_read = 0
|
||||
self.sectors_seen = None
|
||||
self.parse_raw_dirent()
|
||||
|
||||
def __str__(self):
|
||||
return "File #%-2d (%s) %03d %-8s%-3s %03d" % (self.file_num, self.summary, self.starting_sector, self.basename.decode("latin1"), self.ext.decode("latin1"), self.num_sectors)
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.__class__ == other.__class__ and self.filename == other.filename and self.starting_sector == other.starting_sector and self.num_sectors == other.num_sectors
|
||||
|
||||
@property
|
||||
def in_use(self):
|
||||
return self._in_use
|
||||
|
||||
@property
|
||||
def filename(self):
|
||||
ext = (b'.' + self.ext) if self.ext else b''
|
||||
return (self.basename + ext).decode('latin1')
|
||||
|
||||
@property
|
||||
def summary(self):
|
||||
output = "o" if self.opened_output else "."
|
||||
dos2 = "2" if self.dos_2 else "."
|
||||
mydos = "m" if self.mydos else "."
|
||||
in_use = "u" if self._in_use else "."
|
||||
deleted = "d" if self.deleted else "."
|
||||
locked = "*" if self.locked else " "
|
||||
flags = "%s%s%s%s%s%s" % (output, dos2, mydos, in_use, deleted, locked)
|
||||
return flags
|
||||
|
||||
@property
|
||||
def verbose_info(self):
|
||||
flags = []
|
||||
if self.opened_output: flags.append("OUT")
|
||||
if self.dos_2: flags.append("DOS2")
|
||||
if self.mydos: flags.append("MYDOS")
|
||||
if self._in_use: flags.append("IN_USE")
|
||||
if self.deleted: flags.append("DEL")
|
||||
if self.locked: flags.append("LOCK")
|
||||
return "flags=[%s]" % ", ".join(flags)
|
||||
|
||||
def extra_metadata(self, image):
|
||||
return self.verbose_info
|
||||
|
||||
def parse_raw_dirent(self):
|
||||
data = self.data[0:16]
|
||||
values = data.view(dtype=self.format)[0]
|
||||
flag = values[0]
|
||||
self.flag = flag
|
||||
self.opened_output = (flag&0x01) > 0
|
||||
self.dos_2 = (flag&0x02) > 0
|
||||
self.mydos = (flag&0x04) > 0
|
||||
self.is_dir = (flag&0x10) > 0
|
||||
self.locked = (flag&0x20) > 0
|
||||
self._in_use = (flag&0x40) > 0
|
||||
self.deleted = (flag&0x80) > 0
|
||||
self.num_sectors = int(values[1])
|
||||
self.starting_sector = int(values[2])
|
||||
self.basename = bytes(values[3]).rstrip()
|
||||
self.ext = bytes(values[4]).rstrip()
|
||||
self.is_sane = self.sanity_check()
|
||||
|
||||
def encode_dirent(self):
|
||||
data = np.zeros([self.format.itemsize], dtype=np.uint8)
|
||||
values = data.view(dtype=self.format)[0]
|
||||
flag = (1 * int(self.opened_output)) | (2 * int(self.dos_2)) | (4 * int(self.mydos)) | (0x10 * int(self.is_dir)) | (0x20 * int(self.locked)) | (0x40 * int(self._in_use)) | (0x80 * int(self.deleted))
|
||||
values[0] = flag
|
||||
values[1] = self.num_sectors
|
||||
values[2] = self.starting_sector
|
||||
values[3] = self.basename
|
||||
values[4] = self.ext
|
||||
return data
|
||||
|
||||
def mark_deleted(self):
|
||||
self.deleted = True
|
||||
self._in_use = False
|
||||
|
||||
def update_sector_info(self, sector_list):
|
||||
self.num_sectors = sector_list.num_sectors
|
||||
self.starting_sector = sector_list.first_sector
|
||||
|
||||
def add_metadata_sectors(self, vtoc, sector_list, header):
|
||||
# no extra sectors are needed for an Atari DOS file; the links to the
|
||||
# next sector is contained in the sector.
|
||||
pass
|
||||
|
||||
def sanity_check(self):
|
||||
media = self.filesystem.media
|
||||
if not self._in_use:
|
||||
return True
|
||||
if not media.is_sector_valid(self.starting_sector):
|
||||
return False
|
||||
if self.num_sectors < 0 or self.num_sectors > media.num_sectors:
|
||||
return False
|
||||
return True
|
||||
|
||||
def get_sectors_in_vtoc(self, image):
|
||||
sector_list = BaseSectorList(image.header)
|
||||
self.start_read(image)
|
||||
while True:
|
||||
sector = WriteableSector(image.header.sector_size, None, self.current_sector)
|
||||
sector_list.append(sector)
|
||||
_, last, _, _ = self.read_sector(image)
|
||||
if last:
|
||||
break
|
||||
return sector_list
|
||||
|
||||
def start_read(self, image):
|
||||
if not self.is_sane:
|
||||
raise errors.InvalidDirent("Invalid directory entry '%s'" % str(self))
|
||||
self.current_sector = self.starting_sector
|
||||
self.current_read = self.num_sectors
|
||||
self.sectors_seen = set()
|
||||
|
||||
def read_sector(self, image):
|
||||
raw, pos, size = image.get_raw_bytes(self.current_sector)
|
||||
bytes, num_data_bytes = self.process_raw_sector(image, raw)
|
||||
return bytes, self.current_sector == 0, pos, num_data_bytes
|
||||
|
||||
def process_raw_sector(self, image, raw):
|
||||
file_num = raw[-3] >> 2
|
||||
if file_num != self.file_num:
|
||||
raise errors.FileNumberMismatchError164("Expecting file %d, found %d" % (self.file_num, file_num))
|
||||
self.sectors_seen.add(self.current_sector)
|
||||
next_sector = ((raw[-3] & 0x3) << 8) + raw[-2]
|
||||
if next_sector in self.sectors_seen:
|
||||
raise errors.InvalidFile("Bad sector pointer data: attempting to reread sector %d" % next_sector)
|
||||
self.current_sector = next_sector
|
||||
num_bytes = raw[-1]
|
||||
return raw[0:num_bytes], num_bytes
|
||||
|
||||
def set_values(self, filename, filetype, index):
|
||||
if type(filename) is not bytes:
|
||||
filename = filename.encode("latin1")
|
||||
if b'.' in filename:
|
||||
filename, ext = filename.split(b'.', 1)
|
||||
else:
|
||||
ext = b' '
|
||||
self.basename = b'%-8s' % filename[0:8]
|
||||
self.ext = ext
|
||||
self.file_num = index
|
||||
self.dos_2 = True
|
||||
self._in_use = True
|
||||
if _xd: log.debug("set_values: %s" % self)
|
||||
|
||||
|
||||
class AtariDos2Directory(Directory):
|
||||
def __init__(self, filesystem):
|
||||
self.filesystem = filesystem
|
||||
offset, length = self.find_segment_location()
|
||||
Segment.__init__(self, filesystem.media, offset, name="Directory", length=length)
|
||||
|
||||
# Each segment is a dirent
|
||||
self.segments = self.calc_dirents()
|
||||
|
||||
def find_segment_location(self):
|
||||
media = self.media
|
||||
if media.is_sector_valid(361):
|
||||
return media.get_contiguous_sectors_offsets(361, 8)
|
||||
else:
|
||||
raise errors.FilesystemError("Disk image too small to contain a directory")
|
||||
|
||||
def calc_dirents(self):
|
||||
segments = []
|
||||
index = 0
|
||||
for filenum in range(64):
|
||||
dirent = AtariDosDirent(self.filesystem, self, filenum, index)
|
||||
if not dirent.in_use:
|
||||
continue
|
||||
dirent.set_comment_at(0x00, "FILE #%d: Flag" % filenum)
|
||||
dirent.set_comment_at(0x01, "FILE #%d: Number of sectors in file" % filenum)
|
||||
dirent.set_comment_at(0x03, "FILE #%d: Starting sector number" % filenum)
|
||||
dirent.set_comment_at(0x05, "FILE #%d: Filename" % filenum)
|
||||
dirent.set_comment_at(0x0d, "FILE #%d: Extension" % filenum)
|
||||
index += 16
|
||||
segments.append(dirent)
|
||||
return segments
|
||||
|
||||
|
||||
class AtariDos2(Filesystem):
|
||||
default_executable_extension = "XEX"
|
||||
|
||||
def check_media(self, media):
|
||||
try:
|
||||
media.get_contiguous_sectors
|
||||
except AttributeError:
|
||||
raise errors.IncompatibleMediaError("Atari DOS needs sector access")
|
||||
|
||||
def calc_boot_segment(self):
|
||||
return AtariDosBootSegment(self)
|
||||
|
||||
def calc_vtoc_segment(self):
|
||||
return AtariDos2VTOC(self)
|
||||
|
||||
def calc_directory_segment(self):
|
||||
return AtariDos2Directory(self)
|
@ -8,6 +8,7 @@ from . import errors
|
||||
from . import style_bits
|
||||
from .segment import Segment
|
||||
from .utils import to_numpy, to_numpy_list, uuid
|
||||
from . import filesystem
|
||||
|
||||
import logging
|
||||
log = logging.getLogger(__name__)
|
||||
@ -76,9 +77,10 @@ class MediaType(Segment):
|
||||
"""
|
||||
pass
|
||||
|
||||
def find_filesystem(self):
|
||||
def guess_filesystem(self):
|
||||
fs = filesystem.guess_filesystem(self)
|
||||
if fs:
|
||||
self.filesystem = fs
|
||||
self.segments = list(fs.iter_segments())
|
||||
|
||||
|
||||
@ -92,20 +94,8 @@ class DiskImage(MediaType):
|
||||
self.num_sectors = 0
|
||||
MediaType.__init__(self, container)
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.pretty_name}, size={len(self)} ({self.num_sectors}x{self.sector_size}B)"
|
||||
|
||||
@property
|
||||
def verbose_info(self):
|
||||
name = self.verbose_name or self.name
|
||||
if self.num_sectors > 1:
|
||||
s = "%s (sectors %d-%d)" % (name, self.first_sector, self.first_sector + self.num_sectors - 1)
|
||||
else:
|
||||
s = "%s (sector %d)" % (name, self.first_sector)
|
||||
s += " $%x bytes" % (len(self), )
|
||||
if self.error:
|
||||
s += " error='%s'" % self.error
|
||||
return s
|
||||
# def __str__(self):
|
||||
# return f"{self.pretty_name}, size={len(self)} ({self.num_sectors}x{self.sector_size}B)"
|
||||
|
||||
#### verification
|
||||
|
||||
@ -129,36 +119,51 @@ class DiskImage(MediaType):
|
||||
return "s%03d:%02x" % (sector + self.first_sector, byte)
|
||||
return "s%03d:%02X" % (sector + self.first_sector, byte)
|
||||
|
||||
def sector_is_valid(self, sector):
|
||||
def is_sector_valid(self, sector):
|
||||
return (self.num_sectors < 0) or (sector >= self.starting_sector_label and sector < (self.num_sectors + self.starting_sector_label))
|
||||
|
||||
def get_index_of_sector(self, sector):
|
||||
if not self.sector_is_valid(sector):
|
||||
if not self.is_sector_valid(sector):
|
||||
raise errors.ByteNotInFile166("Sector %d out of range" % sector)
|
||||
pos = (sector - self.starting_sector_label) * self.sector_size
|
||||
return pos, self.sector_size
|
||||
|
||||
def get_contiguous_sectors(self, start, count):
|
||||
def get_contiguous_sectors_offsets(self, start, count=1):
|
||||
index, _ = self.get_index_of_sector(start)
|
||||
last, size = self.get_index_of_sector(start + count - 1)
|
||||
return Segment(self, index, length=(last + size - index))
|
||||
return index, last + size - index
|
||||
|
||||
def get_sector_list(self, sector_numbers):
|
||||
def get_contiguous_sectors(self, start, count=1):
|
||||
start, size = self.get_contiguous_sectors_offsets(start, count)
|
||||
return Segment(self, start, length=size)
|
||||
|
||||
def get_sector_list_offsets(self, sector_numbers):
|
||||
offsets = np.empty(len(sector_numbers) * self.sector_size, dtype=np.uint32)
|
||||
i = 0
|
||||
for num in sector_numbers:
|
||||
index, size = self.get_index_of_sector(num)
|
||||
offsets[i:i+size] = np.arange(index, index + size)
|
||||
i += size
|
||||
return offsets
|
||||
|
||||
def get_sector_list(self, sector_numbers):
|
||||
offsets = self.get_sector_list_offsets(sector_numbers)
|
||||
return Segment(self, offsets)
|
||||
|
||||
def iter_sectors(self):
|
||||
i = self.starting_sector_label
|
||||
while self.is_sector_valid(i):
|
||||
pos, size = self.get_index_of_sector(i)
|
||||
yield i, pos, size
|
||||
i += 1
|
||||
|
||||
|
||||
class CartImage(MediaType):
|
||||
pretty_name = "Cart Image"
|
||||
expected_size = 0
|
||||
|
||||
def __str__(self):
|
||||
return f"{len(self) // 1024}K {self.pretty_name}"
|
||||
# def __str__(self):
|
||||
# return f"{len(self) // 1024}K {self.pretty_name}"
|
||||
|
||||
def check_media_size(self):
|
||||
size = len(self)
|
||||
|
4
setup.py
4
setup.py
@ -41,6 +41,10 @@ setup(name="atrcopy",
|
||||
'atari_carts = atrcopy.media_types.atari_carts',
|
||||
'apple_disks = atrcopy.media_types.apple_disks',
|
||||
],
|
||||
|
||||
"atrcopy.filesystems": [
|
||||
'atari_dos = atrcopy.filesystems.atari_dos2',
|
||||
],
|
||||
},
|
||||
description="Utility to manage file systems on Atari 8-bit (DOS 2) and Apple ][ (DOS 3.3) disk images.",
|
||||
long_description=long_description,
|
||||
|
72
test/test_filesystems.py
Normal file
72
test/test_filesystems.py
Normal file
@ -0,0 +1,72 @@
|
||||
import glob
|
||||
|
||||
import numpy as np
|
||||
|
||||
from mock import *
|
||||
|
||||
from atrcopy.container import guess_container
|
||||
from atrcopy.media_type import MediaType, guess_media_type
|
||||
from atrcopy import errors
|
||||
|
||||
from atrcopy.media_types.atari_disks import *
|
||||
from atrcopy.media_types.apple_disks import *
|
||||
|
||||
ext_to_valid_types = {
|
||||
'.atr': set([
|
||||
AtariDoubleDensity,
|
||||
AtariDoubleDensityHardDriveImage,
|
||||
AtariDoubleDensityShortBootSectors,
|
||||
AtariEnhancedDensity,
|
||||
AtariSingleDensity,
|
||||
AtariSingleDensityShortImage,
|
||||
]),
|
||||
'.dsk': set([
|
||||
Apple16SectorDiskImage,
|
||||
]),
|
||||
}
|
||||
|
||||
class TestAtariDos2:
|
||||
base_path = None
|
||||
expected_mime = ""
|
||||
|
||||
@pytest.mark.parametrize("pathname", sorted(glob.glob(os.path.join(os.path.dirname(__file__), "../test_data/", "*"))))
|
||||
def test_test_data_dir(self, pathname):
|
||||
wrapped, ext = os.path.splitext(pathname)
|
||||
print(f"checking {pathname}")
|
||||
sample_data = np.fromfile(pathname, dtype=np.uint8)
|
||||
container = guess_container(sample_data)
|
||||
if container.compression_algorithm != "no compression":
|
||||
_, ext = os.path.splitext(wrapped)
|
||||
container.guess_media_type()
|
||||
print(ext, ext_to_valid_types)
|
||||
if ext in ext_to_valid_types:
|
||||
assert container.media.__class__ in ext_to_valid_types[ext]
|
||||
else:
|
||||
assert container.media.__class__ == MediaType
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import logging
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
log = logging.getLogger("atrcopy.media_type")
|
||||
log.setLevel(logging.DEBUG)
|
||||
|
||||
def check(pathname):
|
||||
print(f"checking {pathname}")
|
||||
sample_data = np.fromfile(pathname, dtype=np.uint8)
|
||||
container = guess_container(sample_data)
|
||||
container.guess_media_type()
|
||||
print(container.verbose_info)
|
||||
media = container.media
|
||||
media.guess_filesystem()
|
||||
print(media.filesystem)
|
||||
print(container.verbose_info)
|
||||
|
||||
import sys
|
||||
import glob
|
||||
if len(sys.argv) > 1:
|
||||
images = sys.argv[1:]
|
||||
else:
|
||||
images = sorted(glob.glob(os.path.join(os.path.dirname(__file__), "../test_data/", "*")))
|
||||
for pathname in images:
|
||||
check(pathname)
|
Loading…
x
Reference in New Issue
Block a user