mirror of
https://github.com/robmcmullen/atrcopy.git
synced 2025-01-01 10:30:58 +00:00
Added file deletion for Atari DOS
This commit is contained in:
parent
7f2b07b221
commit
ea92e91865
@ -1,7 +1,7 @@
|
||||
import numpy as np
|
||||
|
||||
from errors import *
|
||||
from diskimages import DiskImageBase, Directory, VTOC, WriteableSector
|
||||
from diskimages import DiskImageBase, Directory, VTOC, WriteableSector, BaseSectorList
|
||||
from segments import EmptySegment, ObjSegment, RawSectorsSegment, DefaultSegment, SegmentSaver
|
||||
from utils import to_numpy
|
||||
|
||||
@ -139,8 +139,6 @@ class AtariDosDirent(object):
|
||||
def encode_dirent(self):
|
||||
data = np.zeros([16], dtype=np.uint8)
|
||||
values = data.view(dtype=self.format)[0]
|
||||
self.dos_2 = True
|
||||
self.in_use = True
|
||||
flag = (1 * int(self.opened_output)) | (2 * int(self.dos_2)) | (4 * int(self.mydos)) | (0x10 * int(self.is_dir)) | (0x20 * int(self.locked)) | (0x40 * int(self.in_use)) | (0x80 * int(self.deleted))
|
||||
values[0] = flag
|
||||
values[1] = self.num_sectors
|
||||
@ -149,6 +147,10 @@ class AtariDosDirent(object):
|
||||
values[4] = self.ext
|
||||
return data
|
||||
|
||||
def mark_deleted(self):
|
||||
self.deleted = True
|
||||
self.in_use = False
|
||||
|
||||
def update_sector_info(self, sector_list):
|
||||
self.num_sectors = sector_list.num_sectors
|
||||
self.starting_sector = sector_list.first_sector
|
||||
@ -162,6 +164,17 @@ class AtariDosDirent(object):
|
||||
return False
|
||||
return True
|
||||
|
||||
def get_sector_list(self, image):
|
||||
sector_list = BaseSectorList(image.bytes_per_sector)
|
||||
self.start_read(image)
|
||||
while True:
|
||||
sector = WriteableSector(image.bytes_per_sector, None, self.current_sector)
|
||||
sector_list.append(sector)
|
||||
_, last, _, _ = self.read_sector(image)
|
||||
if last:
|
||||
break
|
||||
return sector_list
|
||||
|
||||
def start_read(self, image):
|
||||
if not self.is_sane:
|
||||
raise InvalidDirent("Invalid directory entry '%s'" % str(self))
|
||||
@ -198,6 +211,8 @@ class AtariDosDirent(object):
|
||||
self.filename = "%-8s" % filename[0:8]
|
||||
self.ext = ext
|
||||
self.file_num = index
|
||||
self.dos_2 = True
|
||||
self.in_use = True
|
||||
|
||||
|
||||
class MydosDirent(AtariDosDirent):
|
||||
@ -362,14 +377,15 @@ class AtariDosDiskImage(DiskImageBase):
|
||||
if dirent.mydos:
|
||||
dirent = MydosDirent(self, num, dir_bytes[i:i+16])
|
||||
|
||||
if directory is not None:
|
||||
directory.set(num, dirent)
|
||||
if dirent.in_use:
|
||||
files.append(dirent)
|
||||
if not dirent.is_sane:
|
||||
self.all_sane = False
|
||||
log.debug("dirent %d not sane: %s" % (num, dirent))
|
||||
elif dirent.flag == 0:
|
||||
break
|
||||
if directory is not None:
|
||||
directory.set(num, dirent)
|
||||
i += 16
|
||||
num += 1
|
||||
self.files = files
|
||||
|
@ -348,11 +348,15 @@ class DiskImageBase(object):
|
||||
def get_directory_segments(self):
|
||||
return []
|
||||
|
||||
def find_file(self, filename):
|
||||
def find_dirent(self, filename):
|
||||
for dirent in self.files:
|
||||
if filename == dirent.get_filename():
|
||||
return self.get_file(dirent)
|
||||
return ""
|
||||
return dirent
|
||||
raise FileNotFound("%s not found on disk" % filename)
|
||||
|
||||
def find_file(self, filename):
|
||||
dirent = self.find_dirent(filename)
|
||||
return self.get_file(dirent)
|
||||
|
||||
def get_file(self, dirent):
|
||||
segment = self.get_file_segment(dirent)
|
||||
@ -387,8 +391,7 @@ class DiskImageBase(object):
|
||||
sector_list = self.sector_list_class(self.bytes_per_sector, self.payload_bytes_per_sector, data, self.writeable_sector_class)
|
||||
vtoc_segments = self.get_vtoc_segments()
|
||||
vtoc = self.vtoc_class(self.bytes_per_sector, vtoc_segments)
|
||||
sector_list.calc_sector_map(dirent, vtoc)
|
||||
directory.save_dirent(dirent, sector_list)
|
||||
directory.save_dirent(dirent, vtoc, sector_list)
|
||||
self.write_sector_list(sector_list)
|
||||
self.write_sector_list(vtoc)
|
||||
self.write_sector_list(directory)
|
||||
@ -400,10 +403,23 @@ class DiskImageBase(object):
|
||||
log.debug("writing: %s" % sector)
|
||||
self.bytes[pos:pos + size] = sector.data
|
||||
|
||||
def delete_file(self, filename):
|
||||
directory = self.directory_class(self.bytes_per_sector)
|
||||
self.get_directory(directory)
|
||||
dirent = directory.find_dirent(filename)
|
||||
sector_list = dirent.get_sector_list(self)
|
||||
vtoc_segments = self.get_vtoc_segments()
|
||||
vtoc = self.vtoc_class(self.bytes_per_sector, vtoc_segments)
|
||||
directory.remove_dirent(dirent, vtoc, sector_list)
|
||||
self.write_sector_list(sector_list)
|
||||
self.write_sector_list(vtoc)
|
||||
self.write_sector_list(directory)
|
||||
self.get_metadata()
|
||||
|
||||
|
||||
class WriteableSector(object):
|
||||
def __init__(self, sector_size, data=None):
|
||||
self._sector_num = -1
|
||||
def __init__(self, sector_size, data=None, num=-1):
|
||||
self._sector_num = num
|
||||
self._next_sector = 0
|
||||
self.sector_size = sector_size
|
||||
self.file_num = 0
|
||||
@ -491,12 +507,17 @@ class Directory(BaseSectorList):
|
||||
|
||||
def get_free_dirent(self):
|
||||
used = set()
|
||||
for i, d in self.dirents.iteritems():
|
||||
if not d.in_use:
|
||||
d = self.dirents.items()
|
||||
d.sort()
|
||||
for i, dirent in d:
|
||||
if not dirent.in_use:
|
||||
return i
|
||||
used.add(i)
|
||||
if len(used) >= self.num_dirents:
|
||||
if self.num_dirents > 0 and (len(used) >= self.num_dirents):
|
||||
raise NoSpaceInDirectory()
|
||||
i += 1
|
||||
used.add(i)
|
||||
return i
|
||||
|
||||
def add_dirent(self, filename, filetype):
|
||||
index = self.get_free_dirent()
|
||||
@ -505,15 +526,45 @@ class Directory(BaseSectorList):
|
||||
self.set(index, dirent)
|
||||
return dirent
|
||||
|
||||
def save_dirent(self, dirent, sector_list):
|
||||
def find_dirent(self, filename):
|
||||
for dirent in self.dirents.values():
|
||||
if filename == dirent.get_filename():
|
||||
return dirent
|
||||
raise FileNotFound("%s not found on disk" % filename)
|
||||
|
||||
def save_dirent(self, dirent, vtoc, sector_list):
|
||||
self.place_sector_list(dirent, vtoc, sector_list)
|
||||
dirent.update_sector_info(sector_list)
|
||||
self.calc_sectors()
|
||||
|
||||
def set_location(self, sector):
|
||||
raise NotImplementedError
|
||||
def place_sector_list(self, dirent, vtoc, sector_list):
|
||||
""" Map out the sectors and link the sectors together
|
||||
|
||||
def set_size(self, size):
|
||||
raise NotImplementedError
|
||||
raises NotEnoughSpaceOnDisk if the whole file won't fit. It will not
|
||||
allow partial writes.
|
||||
"""
|
||||
sector_list.calc_extra_sectors()
|
||||
num = len(sector_list)
|
||||
order = vtoc.reserve_space(num)
|
||||
if len(order) != num:
|
||||
raise InvalidFile("VTOC reserved space for %d sectors. Sectors needed: %d" % (len(order), num))
|
||||
file_length = 0
|
||||
last_sector = None
|
||||
for sector, sector_num in zip(sector_list.sectors, order):
|
||||
sector.sector_num = sector_num
|
||||
sector.file_num = dirent.file_num
|
||||
file_length += sector.used
|
||||
if last_sector is not None:
|
||||
last_sector.next_sector_num = sector_num
|
||||
last_sector = sector
|
||||
if last_sector is not None:
|
||||
last_sector.next_sector_num = 0
|
||||
sector_list.file_length = file_length
|
||||
|
||||
def remove_dirent(self, dirent, vtoc, sector_list):
|
||||
vtoc.free_sector_list(sector_list)
|
||||
dirent.mark_deleted()
|
||||
self.calc_sectors()
|
||||
|
||||
@property
|
||||
def dirent_class(self):
|
||||
@ -596,6 +647,10 @@ class VTOC(BaseSectorList):
|
||||
def calc_bitmap(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def free_sector_list(self, sector_list):
|
||||
for sector in sector_list:
|
||||
self.sector_map[sector.sector_num] = 1
|
||||
|
||||
|
||||
class SectorList(BaseSectorList):
|
||||
def __init__(self, bytes_per_sector, usable, data, sector_class):
|
||||
@ -613,29 +668,6 @@ class SectorList(BaseSectorList):
|
||||
self.sectors.append(sector)
|
||||
index += count
|
||||
|
||||
def calc_sector_map(self, dirent, vtoc):
|
||||
""" Map out the sectors and link the sectors together
|
||||
|
||||
raises NotEnoughSpaceOnDisk if the whole file won't fit. It will not
|
||||
allow partial writes.
|
||||
"""
|
||||
self.calc_extra_sectors()
|
||||
num = len(self.sectors)
|
||||
order = vtoc.reserve_space(num)
|
||||
if len(order) != len(self.sectors):
|
||||
raise InvalidFile("VTOC reserved space for %d sectors. Sectors needed: %d" % (len(order), len(self.sectors)))
|
||||
self.file_length = 0
|
||||
last_sector = None
|
||||
for sector, sector_num in zip(self.sectors, order):
|
||||
sector.sector_num = sector_num
|
||||
sector.file_num = dirent.file_num
|
||||
self.file_length += sector.used
|
||||
if last_sector is not None:
|
||||
last_sector.next_sector_num = sector_num
|
||||
last_sector = sector
|
||||
if last_sector is not None:
|
||||
last_sector.next_sector_num = 0
|
||||
|
||||
|
||||
def calc_extra_sectors(self):
|
||||
""" Add extra sectors to the list.
|
||||
|
@ -36,3 +36,6 @@ class NoSpaceInDirectory(AtrError):
|
||||
|
||||
class NotEnoughSpaceOnDisk(AtrError):
|
||||
pass
|
||||
|
||||
class FileNotFound(AtrError):
|
||||
pass
|
||||
|
@ -12,11 +12,12 @@ class TestAtariDosSDImage(object):
|
||||
rawdata = SegmentData(data)
|
||||
self.image = AtariDosDiskImage(rawdata)
|
||||
|
||||
def check_entries(self, entries, save_image_name=None):
|
||||
def check_entries(self, entries, prefix="TEST", save=None):
|
||||
orig_num_files = len(self.image.files)
|
||||
filenames = []
|
||||
count = 1
|
||||
for data in entries:
|
||||
filename = "TEST%d.BIN" % count
|
||||
filename = "%s%d.BIN" % (prefix, count)
|
||||
self.image.write_file(filename, None, data)
|
||||
assert len(self.image.files) == orig_num_files + count
|
||||
data2 = self.image.find_file(filename)
|
||||
@ -26,13 +27,16 @@ class TestAtariDosSDImage(object):
|
||||
# loop over them again to make sure data wasn't overwritten
|
||||
count = 1
|
||||
for data in entries:
|
||||
filename = "TEST%d.BIN" % count
|
||||
filename = "%s%d.BIN" % (prefix, count)
|
||||
data2 = self.image.find_file(filename)
|
||||
assert data.tostring() == data2
|
||||
count += 1
|
||||
filenames.append(filename)
|
||||
|
||||
if save_image_name is not None:
|
||||
self.image.save(save_image_name)
|
||||
if save is not None:
|
||||
self.image.save(save)
|
||||
|
||||
return filenames
|
||||
|
||||
def test_small(self):
|
||||
assert len(self.image.files) == 5
|
||||
@ -68,7 +72,7 @@ class TestAtariDosSDImage(object):
|
||||
np.arange(9*1024, dtype=np.uint8),
|
||||
np.arange(10*1024, dtype=np.uint8),
|
||||
]
|
||||
self.check_entries(entries, "many_small.atr")
|
||||
self.check_entries(entries, save="many_small.atr")
|
||||
|
||||
def test_big_failure(self):
|
||||
assert len(self.image.files) == 5
|
||||
@ -80,6 +84,36 @@ class TestAtariDosSDImage(object):
|
||||
self.image.write_file("RAMP50K2.BIN", None, data)
|
||||
assert len(self.image.files) == 6
|
||||
|
||||
def test_delete(self):
|
||||
entries1 = [
|
||||
np.arange(3*1024, dtype=np.uint8),
|
||||
np.arange(3*1024, dtype=np.uint8),
|
||||
np.arange(3*1024, dtype=np.uint8),
|
||||
np.arange(3*1024, dtype=np.uint8),
|
||||
np.arange(3*1024, dtype=np.uint8),
|
||||
np.arange(3*1024, dtype=np.uint8),
|
||||
np.arange(3*1024, dtype=np.uint8),
|
||||
np.arange(3*1024, dtype=np.uint8),
|
||||
np.arange(3*1024, dtype=np.uint8),
|
||||
np.arange(10*1024, dtype=np.uint8),
|
||||
np.arange(10*1024, dtype=np.uint8),
|
||||
]
|
||||
entries2 = [
|
||||
np.arange(10*1024, dtype=np.uint8),
|
||||
np.arange(11*1024, dtype=np.uint8),
|
||||
]
|
||||
|
||||
filenames = self.check_entries(entries1, "FIRST")
|
||||
assert len(self.image.files) == 16
|
||||
self.image.delete_file(filenames[2])
|
||||
self.image.delete_file(filenames[5])
|
||||
self.image.delete_file(filenames[0])
|
||||
self.image.delete_file(filenames[8])
|
||||
assert len(self.image.files) == 12
|
||||
|
||||
filename = self.check_entries(entries2, "SECOND", save="test_delete.atr")
|
||||
assert len(self.image.files) == 14
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
t = TestAtariDosFile()
|
||||
|
Loading…
Reference in New Issue
Block a user