diff --git a/atrcopy/ataridos.py b/atrcopy/ataridos.py index d215d55..b8e3dab 100644 --- a/atrcopy/ataridos.py +++ b/atrcopy/ataridos.py @@ -1,7 +1,7 @@ import numpy as np from errors import * -from diskimages import DiskImageBase +from diskimages import DiskImageBase, Directory, VTOC, WriteableSector from segments import EmptySegment, ObjSegment, RawSectorsSegment, DefaultSegment, SegmentSaver from utils import to_numpy @@ -9,6 +9,59 @@ import logging log = logging.getLogger(__name__) +class AtariDosWriteableSector(WriteableSector): + @property + def next_sector_num(self): + return self._next_sector_num + + @next_sector_num.setter + def next_sector_num(self, value): + self._next_sector_num = value + index = self.sector_size - 3 + hi, lo = divmod(value, 256) + self.data[index] = self.used + self.data[index + 1] = lo + self.data[index + 2] = hi + log.debug("sector metadata for %d: %s" % (self._sector_num, self.data[index:index + 3])) + # file number will be added later when known. + + +class AtariDosVTOC(VTOC): + def parse_segments(self, segments): + self.vtoc1 = segments[0].data + bits = np.unpackbits(self.vtoc1[0x0a:0x64]) + log.debug("vtoc before: %s" % bits) + self.sector_map[0:720] = bits + + def calc_bitmap(self): + log.debug("vtoc after: %s" % self.sector_map[0:720]) + packed = np.packbits(self.sector_map[0:720]) + self.vtoc1[0x0a:0x64] = packed + s = WriteableSector(self.bytes_per_sector, self.vtoc1) + s.sector_num = 360 + self.sectors.append(s) + + +class AtariDosDirectory(Directory): + @property + def dirent_class(self): + return AtariDosDirent + + def encode_empty(self): + return np.zeros([16], dtype=np.uint8) + + def encode_dirent(self, dirent): + data = dirent.encode_dirent() + log.debug("encoded dirent: %s" % data) + return data + + def set_sector_numbers(self): + num = 361 + for sector in self.sectors: + sector.sector_num = num + num += 1 + + class AtariDosDirent(object): # ATR Dirent structure described at http://atari.kensclassics.org/dos.htm format = np.dtype([ @@ -82,6 +135,23 @@ class AtariDosDirent(object): self.filename = str(values[3]).rstrip() self.ext = str(values[4]).rstrip() self.is_sane = self.sanity_check(image) + + def encode_dirent(self): + data = np.zeros([16], dtype=np.uint8) + values = data.view(dtype=self.format)[0] + self.dos_2 = True + self.in_use = True + flag = (1 * int(self.opened_output)) | (2 * int(self.dos_2)) | (4 * int(self.mydos)) | (0x10 * int(self.is_dir)) | (0x20 * int(self.locked)) | (0x40 * int(self.in_use)) | (0x80 * int(self.deleted)) + values[0] = flag + values[1] = self.num_sectors + values[2] = self.starting_sector + values[3] = self.filename + values[4] = self.ext + return data + + def update_sector_info(self, sector_list): + self.num_sectors = sector_list.num_sectors + self.starting_sector = sector_list.first_sector def sanity_check(self, image): if not self.in_use: @@ -120,6 +190,15 @@ class AtariDosDirent(object): ext = ("." + self.ext) if self.ext else "" return self.filename + ext + def set_values(self, filename, filetype, index): + if "." in filename: + filename, ext = filename.split(".", 1) + else: + ext = " " + self.filename = "%-8s" % filename[0:8] + self.ext = ext + self.file_num = index + class MydosDirent(AtariDosDirent): def process_raw_sector(self, image, raw): @@ -204,6 +283,26 @@ class AtariDosDiskImage(DiskImageBase): self.first_data_after_vtoc = 369 DiskImageBase.__init__(self, *args, **kwargs) + @property + def bytes_per_sector(self): + return self.header.sector_size + + @property + def payload_bytes_per_sector(self): + return self.header.sector_size - 3 + + @property + def writeable_sector_class(self): + return AtariDosWriteableSector + + @property + def vtoc_class(self): + return AtariDosVTOC + + @property + def directory_class(self): + return AtariDosDirectory + def __str__(self): return "%s Atari DOS Format: %d usable sectors (%d free), %d files" % (self.header, self.total_sectors, self.unused_sectors, len(self.files)) @@ -253,7 +352,7 @@ class AtariDosDiskImage(DiskImageBase): extra_free = data[122:124].view(dtype=' self.sector_size: + count = self.space_remaining + self.data[self.ptr:self.ptr + count] = data[0:count] + self.ptr += count + self.used += count + return data[count:] + + +class BaseSectorList(object): + def __init__(self, bytes_per_sector): + self.bytes_per_sector = bytes_per_sector + self.sectors = [] + + def __len__(self): + return len(self.sectors) + + def __getitem__(self, index): + if index < 0 or index >= len(self): + raise IndexError + return self.sectors[index] + + @property + def num_sectors(self): + return len(self.sectors) + + @property + def first_sector(self): + if self.sectors: + return self.sectors[0].sector_num + return -1 + + def append(self, sector): + self.sectors.append(sector) + + +class Directory(BaseSectorList): + def __init__(self, bytes_per_sector, num_dirents=-1, sector_class=WriteableSector): + BaseSectorList.__init__(self, bytes_per_sector) + self.sector_class = sector_class + self.num_dirents = num_dirents + # number of dirents may be unlimited, so use a dict instead of a list + self.dirents = {} + + def set(self, index, dirent): + self.dirents[index] = dirent + log.debug("set dirent #%d: %s" % (index, dirent)) + + def get_free_dirent(self): + used = set() + for i, d in self.dirents.iteritems(): + if not d.in_use: + return i + used.add(i) + if len(used) >= self.num_dirents: + raise NoSpaceInDirectory() + + def add_dirent(self, filename, filetype): + index = self.get_free_dirent() + dirent = self.dirent_class(None) + dirent.set_values(filename, filetype, index) + self.set(index, dirent) + return dirent + + def save_dirent(self, dirent, sector_list): + dirent.update_sector_info(sector_list) + self.calc_sectors() + + def set_location(self, sector): + raise NotImplementedError + + def set_size(self, size): + raise NotImplementedError + + @property + def dirent_class(self): + raise NotImplementedError + + def calc_sectors(self): + self.sectors = [] + self.current_sector = self.sector_class(self.bytes_per_sector) + self.encode_index = 0 + + d = self.dirents.items() + d.sort() + # there may be gaps, so fill in missing entries with blanks + current = 0 + for index, dirent in d: + for missing in range(current, index): + log.debug("Encoding empty dirent at %d" % missing) + data = self.encode_empty() + self.store_encoded(data) + log.debug("Encoding dirent: %s" % dirent) + data = self.encode_dirent(dirent) + self.store_encoded(data) + current = index + 1 + self.finish_encoding() + + def encode_empty(self): + raise NotImplementedError + + def encode_dirent(self, dirent): + raise NotImplementedError + + def store_encoded(self, data): + while True: + log.debug("store_encoded: %d bytes in %s" % (len(data), self.current_sector)) + data = self.current_sector.add_data(data) + if len(data) > 0: + self.sectors.append(self.current_sector) + self.current_sector = self.sector_class(self.bytes_per_sector) + else: + break + + def finish_encoding(self): + if not self.current_sector.is_empty: + self.sectors.append(self.current_sector) + self.set_sector_numbers() + + def set_sector_numbers(self): + raise NotImplementedError + + +class VTOC(BaseSectorList): + def __init__(self, bytes_per_sector, segments=None): + BaseSectorList.__init__(self, bytes_per_sector) + + # sector map: 1 is free, 0 is allocated + self.sector_map = np.zeros([1280], dtype=np.uint8) + if segments is not None: + self.parse_segments(segments) + + def parse_segments(self, segments): + raise NotImplementedError + + def reserve_space(self, num): + order = [] + for i in range(num): + order.append(self.get_next_free_sector()) + log.debug("Sectors reserved: %s" % order) + self.calc_bitmap() + return order + + def get_next_free_sector(self): + free = np.nonzero(self.sector_map)[0] + if len(free) > 0: + num = free[0] + log.debug("Found sector %d free" % num) + self.sector_map[num] = 0 + return num + raise NotEnoughSpaceOnDisk("No space left in VTOC") + + def calc_bitmap(self): + raise NotImplementedError + + +class SectorList(BaseSectorList): + def __init__(self, bytes_per_sector, usable, data, sector_class): + BaseSectorList.__init__(self, bytes_per_sector) + self.data = to_numpy(data) + self.usable_bytes = usable + self.split_into_sectors(sector_class) + self.file_length = -1 + + def split_into_sectors(self, sector_class): + index = 0 + while index < len(self.data): + count = min(self.usable_bytes, len(self.data) - index) + sector = sector_class(self.bytes_per_sector, self.data[index:index + count]) + self.sectors.append(sector) + index += count + + def calc_sector_map(self, vtoc): + """ Map out the sectors and link the sectors together + + raises NotEnoughSpaceOnDisk if the whole file won't fit. It will not + allow partial writes. + """ + self.calc_extra_sectors() + num = len(self.sectors) + order = vtoc.reserve_space(num) + if len(order) != len(self.sectors): + raise InvalidFile("VTOC reserved space for %d sectors. Sectors needed: %d" % (len(order), len(self.sectors))) + self.file_length = 0 + last_sector = None + for sector, sector_num in zip(self.sectors, order): + sector.sector_num = sector_num + self.file_length += sector.used + if last_sector is not None: + last_sector.next_sector_num = sector_num + last_sector = sector + if last_sector is not None: + last_sector.next_sector_num = 0 + + + def calc_extra_sectors(self): + """ Add extra sectors to the list. + + For example, DOS 3.3 uses a track/sector list at the beginning of the + file + """ + pass + + class BootDiskImage(DiskImageBase): def __str__(self): diff --git a/atrcopy/dos33.py b/atrcopy/dos33.py index a8dd0ed..e1634d8 100644 --- a/atrcopy/dos33.py +++ b/atrcopy/dos33.py @@ -186,6 +186,14 @@ class Dos33DiskImage(DiskImageBase): def read_header(self): self.header = Dos33Header() + + @property + def bytes_per_sector(self): + return 256 + + @property + def payload_bytes_per_sector(self): + return 256 def get_boot_sector_info(self): # based on logic from a2server diff --git a/atrcopy/errors.py b/atrcopy/errors.py index 90726b0..9a41bc2 100644 --- a/atrcopy/errors.py +++ b/atrcopy/errors.py @@ -30,3 +30,9 @@ class InvalidBinaryFile(InvalidFile): class InvalidSegmentParser(AtrError): pass + +class NoSpaceInDirectory(AtrError): + pass + +class NotEnoughSpaceOnDisk(AtrError): + pass diff --git a/test/test_add_file.py b/test/test_add_file.py new file mode 100644 index 0000000..d7bbc45 --- /dev/null +++ b/test/test_add_file.py @@ -0,0 +1,25 @@ +import numpy as np + +from mock import * + +from atrcopy import SegmentData, AtariDosDiskImage, InvalidBinaryFile + + +class TestAtariDosSDImage(object): + def setup(self): + data = np.fromfile("../test_data/dos_sd_test1.atr", dtype=np.uint8) + rawdata = SegmentData(data) + self.image = AtariDosDiskImage(rawdata) + + def test_small(self): + assert len(self.image.files) == 5 + + data = [0xff, 0xff, 0x00, 0x60, 0x01, 0x60, 1, 2] + self.image.write_file("TEST.XEX", None, data) + assert len(self.image.files) == 6 + + +if __name__ == "__main__": + t = TestAtariDosFile() + t.setup() + t.test_segment()