First code to add files to an Atari DOS image

This commit is contained in:
Rob McMullen 2017-02-21 19:25:47 -08:00
parent 0ba5c8546c
commit 767e76671b
5 changed files with 451 additions and 5 deletions

View File

@ -1,7 +1,7 @@
import numpy as np import numpy as np
from errors import * from errors import *
from diskimages import DiskImageBase from diskimages import DiskImageBase, Directory, VTOC, WriteableSector
from segments import EmptySegment, ObjSegment, RawSectorsSegment, DefaultSegment, SegmentSaver from segments import EmptySegment, ObjSegment, RawSectorsSegment, DefaultSegment, SegmentSaver
from utils import to_numpy from utils import to_numpy
@ -9,6 +9,59 @@ import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class AtariDosWriteableSector(WriteableSector):
@property
def next_sector_num(self):
return self._next_sector_num
@next_sector_num.setter
def next_sector_num(self, value):
self._next_sector_num = value
index = self.sector_size - 3
hi, lo = divmod(value, 256)
self.data[index] = self.used
self.data[index + 1] = lo
self.data[index + 2] = hi
log.debug("sector metadata for %d: %s" % (self._sector_num, self.data[index:index + 3]))
# file number will be added later when known.
class AtariDosVTOC(VTOC):
def parse_segments(self, segments):
self.vtoc1 = segments[0].data
bits = np.unpackbits(self.vtoc1[0x0a:0x64])
log.debug("vtoc before: %s" % bits)
self.sector_map[0:720] = bits
def calc_bitmap(self):
log.debug("vtoc after: %s" % self.sector_map[0:720])
packed = np.packbits(self.sector_map[0:720])
self.vtoc1[0x0a:0x64] = packed
s = WriteableSector(self.bytes_per_sector, self.vtoc1)
s.sector_num = 360
self.sectors.append(s)
class AtariDosDirectory(Directory):
@property
def dirent_class(self):
return AtariDosDirent
def encode_empty(self):
return np.zeros([16], dtype=np.uint8)
def encode_dirent(self, dirent):
data = dirent.encode_dirent()
log.debug("encoded dirent: %s" % data)
return data
def set_sector_numbers(self):
num = 361
for sector in self.sectors:
sector.sector_num = num
num += 1
class AtariDosDirent(object): class AtariDosDirent(object):
# ATR Dirent structure described at http://atari.kensclassics.org/dos.htm # ATR Dirent structure described at http://atari.kensclassics.org/dos.htm
format = np.dtype([ format = np.dtype([
@ -82,6 +135,23 @@ class AtariDosDirent(object):
self.filename = str(values[3]).rstrip() self.filename = str(values[3]).rstrip()
self.ext = str(values[4]).rstrip() self.ext = str(values[4]).rstrip()
self.is_sane = self.sanity_check(image) self.is_sane = self.sanity_check(image)
def encode_dirent(self):
data = np.zeros([16], dtype=np.uint8)
values = data.view(dtype=self.format)[0]
self.dos_2 = True
self.in_use = True
flag = (1 * int(self.opened_output)) | (2 * int(self.dos_2)) | (4 * int(self.mydos)) | (0x10 * int(self.is_dir)) | (0x20 * int(self.locked)) | (0x40 * int(self.in_use)) | (0x80 * int(self.deleted))
values[0] = flag
values[1] = self.num_sectors
values[2] = self.starting_sector
values[3] = self.filename
values[4] = self.ext
return data
def update_sector_info(self, sector_list):
self.num_sectors = sector_list.num_sectors
self.starting_sector = sector_list.first_sector
def sanity_check(self, image): def sanity_check(self, image):
if not self.in_use: if not self.in_use:
@ -120,6 +190,15 @@ class AtariDosDirent(object):
ext = ("." + self.ext) if self.ext else "" ext = ("." + self.ext) if self.ext else ""
return self.filename + ext return self.filename + ext
def set_values(self, filename, filetype, index):
if "." in filename:
filename, ext = filename.split(".", 1)
else:
ext = " "
self.filename = "%-8s" % filename[0:8]
self.ext = ext
self.file_num = index
class MydosDirent(AtariDosDirent): class MydosDirent(AtariDosDirent):
def process_raw_sector(self, image, raw): def process_raw_sector(self, image, raw):
@ -204,6 +283,26 @@ class AtariDosDiskImage(DiskImageBase):
self.first_data_after_vtoc = 369 self.first_data_after_vtoc = 369
DiskImageBase.__init__(self, *args, **kwargs) DiskImageBase.__init__(self, *args, **kwargs)
@property
def bytes_per_sector(self):
return self.header.sector_size
@property
def payload_bytes_per_sector(self):
return self.header.sector_size - 3
@property
def writeable_sector_class(self):
return AtariDosWriteableSector
@property
def vtoc_class(self):
return AtariDosVTOC
@property
def directory_class(self):
return AtariDosDirectory
def __str__(self): def __str__(self):
return "%s Atari DOS Format: %d usable sectors (%d free), %d files" % (self.header, self.total_sectors, self.unused_sectors, len(self.files)) return "%s Atari DOS Format: %d usable sectors (%d free), %d files" % (self.header, self.total_sectors, self.unused_sectors, len(self.files))
@ -253,7 +352,7 @@ class AtariDosDiskImage(DiskImageBase):
extra_free = data[122:124].view(dtype='<u2')[0] extra_free = data[122:124].view(dtype='<u2')[0]
self.unused_sectors += extra_free self.unused_sectors += extra_free
def get_directory(self): def get_directory(self, directory=None):
dir_bytes, style = self.get_sectors(361, 368) dir_bytes, style = self.get_sectors(361, 368)
i = 0 i = 0
num = 0 num = 0
@ -263,6 +362,8 @@ class AtariDosDiskImage(DiskImageBase):
if dirent.mydos: if dirent.mydos:
dirent = MydosDirent(self, num, dir_bytes[i:i+16]) dirent = MydosDirent(self, num, dir_bytes[i:i+16])
if directory is not None:
directory.set(num, dirent)
if dirent.in_use: if dirent.in_use:
files.append(dirent) files.append(dirent)
if not dirent.is_sane: if not dirent.is_sane:
@ -357,6 +458,7 @@ class AtariDosDiskImage(DiskImageBase):
log.debug("%s not a binary file; skipping segment generation" % str(segment)) log.debug("%s not a binary file; skipping segment generation" % str(segment))
return segments_out return segments_out
def get_xex(segments, runaddr): def get_xex(segments, runaddr):
total = 2 total = 2
for s in segments: for s in segments:

View File

@ -4,6 +4,10 @@ from errors import *
from segments import SegmentData, EmptySegment, ObjSegment, RawSectorsSegment from segments import SegmentData, EmptySegment, ObjSegment, RawSectorsSegment
from utils import to_numpy from utils import to_numpy
import logging
log = logging.getLogger(__name__)
class AtrHeader(object): class AtrHeader(object):
# ATR Format described in http://www.atarimax.com/jindroush.atari.org/afmtatr.html # ATR Format described in http://www.atarimax.com/jindroush.atari.org/afmtatr.html
format = np.dtype([ format = np.dtype([
@ -137,13 +141,37 @@ class DiskImageBase(object):
self.header = None self.header = None
self.total_sectors = 0 self.total_sectors = 0
self.unused_sectors = 0 self.unused_sectors = 0
self.files = [] self.files = [] # all dirents that show up in a normal dir listing
self.segments = [] self.segments = []
self.all_sane = True self.all_sane = True
self.setup() self.setup()
def __len__(self): def __len__(self):
return len(self.rawdata) return len(self.rawdata)
@property
def bytes_per_sector(self):
raise NotImplementedError
@property
def payload_bytes_per_sector(self):
raise NotImplementedError
@property
def writeable_sector_class(self):
return WriteableSector
@property
def vtoc_class(self):
return VTOC
@property
def directory_class(self):
return Directory
@property
def sector_list_class(self):
return SectorList
def set_filename(self, filename): def set_filename(self, filename):
if "." in filename: if "." in filename:
@ -164,6 +192,9 @@ class DiskImageBase(object):
self.read_header() self.read_header()
self.header.check_size(self.size - len(self.header)) self.header.check_size(self.size - len(self.header))
self.check_size() self.check_size()
self.get_metadata()
def get_metadata(self):
self.get_boot_sector_info() self.get_boot_sector_info()
self.get_vtoc() self.get_vtoc()
self.get_directory() self.get_directory()
@ -231,9 +262,10 @@ class DiskImageBase(object):
pass pass
def get_vtoc(self): def get_vtoc(self):
"""Get information from VTOC and populate the VTOC object"""
pass pass
def get_directory(self): def get_directory(self, directory=None):
pass pass
def get_raw_bytes(self, sector): def get_raw_bytes(self, sector):
@ -312,7 +344,7 @@ class DiskImageBase(object):
def get_vtoc_segments(self): def get_vtoc_segments(self):
return [] return []
def get_directory_segments(self): def get_directory_segments(self):
return [] return []
@ -339,6 +371,279 @@ class DiskImageBase(object):
segments.append(segment) segments.append(segment)
return segments return segments
# file writing methods
def write_file(self, filename, filetype, data):
"""Write data to a file on disk
This throws various exceptions on failures, for instance if there is
not enough space on disk or a free entry is not available in the
catalog.
"""
directory = self.directory_class(self.bytes_per_sector)
self.get_directory(directory)
dirent = directory.add_dirent(filename, filetype)
data = to_numpy(data)
sector_list = self.sector_list_class(self.bytes_per_sector, self.payload_bytes_per_sector, data, self.writeable_sector_class)
vtoc_segments = self.get_vtoc_segments()
vtoc = self.vtoc_class(self.bytes_per_sector, vtoc_segments)
sector_list.calc_sector_map(vtoc)
directory.save_dirent(dirent, sector_list)
self.write_sector_list(sector_list)
self.write_sector_list(vtoc)
self.write_sector_list(directory)
self.get_metadata()
def write_sector_list(self, sector_list):
for sector in sector_list:
pos, size = self.header.get_pos(sector.sector_num)
log.debug("writing: %s" % sector)
self.bytes[pos:pos + size] = sector.data
class WriteableSector(object):
def __init__(self, sector_size, data=None):
self._sector_num = -1
self._next_sector = 0
self.sector_size = sector_size
self.data = np.zeros([sector_size], dtype=np.uint8)
self.used = 0
self.ptr = self.used
if data is not None:
self.add_data(data)
def __str__(self):
return "sector=%d next=%d size=%d used=%d" % (self._sector_num, self._next_sector, self.sector_size, self.used)
@property
def sector_num(self):
return self._sector_num
@sector_num.setter
def sector_num(self, value):
self._sector_num = value
@property
def next_sector_num(self):
return self._next_sector_num
@sector_num.setter
def next_sector_num(self, value):
self._next_sector_num = value
@property
def space_remaining(self):
return self.sector_size - self.ptr
@property
def is_empty(self):
return self.ptr == 0
def add_data(self, data):
count = len(data)
if self.ptr + count > self.sector_size:
count = self.space_remaining
self.data[self.ptr:self.ptr + count] = data[0:count]
self.ptr += count
self.used += count
return data[count:]
class BaseSectorList(object):
def __init__(self, bytes_per_sector):
self.bytes_per_sector = bytes_per_sector
self.sectors = []
def __len__(self):
return len(self.sectors)
def __getitem__(self, index):
if index < 0 or index >= len(self):
raise IndexError
return self.sectors[index]
@property
def num_sectors(self):
return len(self.sectors)
@property
def first_sector(self):
if self.sectors:
return self.sectors[0].sector_num
return -1
def append(self, sector):
self.sectors.append(sector)
class Directory(BaseSectorList):
def __init__(self, bytes_per_sector, num_dirents=-1, sector_class=WriteableSector):
BaseSectorList.__init__(self, bytes_per_sector)
self.sector_class = sector_class
self.num_dirents = num_dirents
# number of dirents may be unlimited, so use a dict instead of a list
self.dirents = {}
def set(self, index, dirent):
self.dirents[index] = dirent
log.debug("set dirent #%d: %s" % (index, dirent))
def get_free_dirent(self):
used = set()
for i, d in self.dirents.iteritems():
if not d.in_use:
return i
used.add(i)
if len(used) >= self.num_dirents:
raise NoSpaceInDirectory()
def add_dirent(self, filename, filetype):
index = self.get_free_dirent()
dirent = self.dirent_class(None)
dirent.set_values(filename, filetype, index)
self.set(index, dirent)
return dirent
def save_dirent(self, dirent, sector_list):
dirent.update_sector_info(sector_list)
self.calc_sectors()
def set_location(self, sector):
raise NotImplementedError
def set_size(self, size):
raise NotImplementedError
@property
def dirent_class(self):
raise NotImplementedError
def calc_sectors(self):
self.sectors = []
self.current_sector = self.sector_class(self.bytes_per_sector)
self.encode_index = 0
d = self.dirents.items()
d.sort()
# there may be gaps, so fill in missing entries with blanks
current = 0
for index, dirent in d:
for missing in range(current, index):
log.debug("Encoding empty dirent at %d" % missing)
data = self.encode_empty()
self.store_encoded(data)
log.debug("Encoding dirent: %s" % dirent)
data = self.encode_dirent(dirent)
self.store_encoded(data)
current = index + 1
self.finish_encoding()
def encode_empty(self):
raise NotImplementedError
def encode_dirent(self, dirent):
raise NotImplementedError
def store_encoded(self, data):
while True:
log.debug("store_encoded: %d bytes in %s" % (len(data), self.current_sector))
data = self.current_sector.add_data(data)
if len(data) > 0:
self.sectors.append(self.current_sector)
self.current_sector = self.sector_class(self.bytes_per_sector)
else:
break
def finish_encoding(self):
if not self.current_sector.is_empty:
self.sectors.append(self.current_sector)
self.set_sector_numbers()
def set_sector_numbers(self):
raise NotImplementedError
class VTOC(BaseSectorList):
def __init__(self, bytes_per_sector, segments=None):
BaseSectorList.__init__(self, bytes_per_sector)
# sector map: 1 is free, 0 is allocated
self.sector_map = np.zeros([1280], dtype=np.uint8)
if segments is not None:
self.parse_segments(segments)
def parse_segments(self, segments):
raise NotImplementedError
def reserve_space(self, num):
order = []
for i in range(num):
order.append(self.get_next_free_sector())
log.debug("Sectors reserved: %s" % order)
self.calc_bitmap()
return order
def get_next_free_sector(self):
free = np.nonzero(self.sector_map)[0]
if len(free) > 0:
num = free[0]
log.debug("Found sector %d free" % num)
self.sector_map[num] = 0
return num
raise NotEnoughSpaceOnDisk("No space left in VTOC")
def calc_bitmap(self):
raise NotImplementedError
class SectorList(BaseSectorList):
def __init__(self, bytes_per_sector, usable, data, sector_class):
BaseSectorList.__init__(self, bytes_per_sector)
self.data = to_numpy(data)
self.usable_bytes = usable
self.split_into_sectors(sector_class)
self.file_length = -1
def split_into_sectors(self, sector_class):
index = 0
while index < len(self.data):
count = min(self.usable_bytes, len(self.data) - index)
sector = sector_class(self.bytes_per_sector, self.data[index:index + count])
self.sectors.append(sector)
index += count
def calc_sector_map(self, vtoc):
""" Map out the sectors and link the sectors together
raises NotEnoughSpaceOnDisk if the whole file won't fit. It will not
allow partial writes.
"""
self.calc_extra_sectors()
num = len(self.sectors)
order = vtoc.reserve_space(num)
if len(order) != len(self.sectors):
raise InvalidFile("VTOC reserved space for %d sectors. Sectors needed: %d" % (len(order), len(self.sectors)))
self.file_length = 0
last_sector = None
for sector, sector_num in zip(self.sectors, order):
sector.sector_num = sector_num
self.file_length += sector.used
if last_sector is not None:
last_sector.next_sector_num = sector_num
last_sector = sector
if last_sector is not None:
last_sector.next_sector_num = 0
def calc_extra_sectors(self):
""" Add extra sectors to the list.
For example, DOS 3.3 uses a track/sector list at the beginning of the
file
"""
pass
class BootDiskImage(DiskImageBase): class BootDiskImage(DiskImageBase):
def __str__(self): def __str__(self):

View File

@ -186,6 +186,14 @@ class Dos33DiskImage(DiskImageBase):
def read_header(self): def read_header(self):
self.header = Dos33Header() self.header = Dos33Header()
@property
def bytes_per_sector(self):
return 256
@property
def payload_bytes_per_sector(self):
return 256
def get_boot_sector_info(self): def get_boot_sector_info(self):
# based on logic from a2server # based on logic from a2server

View File

@ -30,3 +30,9 @@ class InvalidBinaryFile(InvalidFile):
class InvalidSegmentParser(AtrError): class InvalidSegmentParser(AtrError):
pass pass
class NoSpaceInDirectory(AtrError):
pass
class NotEnoughSpaceOnDisk(AtrError):
pass

25
test/test_add_file.py Normal file
View File

@ -0,0 +1,25 @@
import numpy as np
from mock import *
from atrcopy import SegmentData, AtariDosDiskImage, InvalidBinaryFile
class TestAtariDosSDImage(object):
def setup(self):
data = np.fromfile("../test_data/dos_sd_test1.atr", dtype=np.uint8)
rawdata = SegmentData(data)
self.image = AtariDosDiskImage(rawdata)
def test_small(self):
assert len(self.image.files) == 5
data = [0xff, 0xff, 0x00, 0x60, 0x01, 0x60, 1, 2]
self.image.write_file("TEST.XEX", None, data)
assert len(self.image.files) == 6
if __name__ == "__main__":
t = TestAtariDosFile()
t.setup()
t.test_segment()