mirror of
https://github.com/robmcmullen/atrcopy.git
synced 2024-11-29 11:51:14 +00:00
Moved file creation classes to utils.py
* prefer to pass around header than individual attributes so instances can get what they want
This commit is contained in:
parent
35c13bb9d5
commit
f84cea7170
@ -2,7 +2,7 @@ import numpy as np
|
||||
|
||||
from errors import *
|
||||
from segments import SegmentData, EmptySegment, ObjSegment, RawSectorsSegment
|
||||
from utils import to_numpy
|
||||
from utils import *
|
||||
|
||||
import logging
|
||||
log = logging.getLogger(__name__)
|
||||
@ -10,6 +10,7 @@ log = logging.getLogger(__name__)
|
||||
|
||||
class BaseHeader(object):
|
||||
file_format = "generic" # text descriptor of file format
|
||||
sector_class = WriteableSector
|
||||
|
||||
def __init__(self, sector_size=256, initial_sectors=0, vtoc_sector=0, starting_sector_label=0):
|
||||
self.image_size = 0
|
||||
@ -72,6 +73,8 @@ class BaseHeader(object):
|
||||
def strict_check(self, image):
|
||||
pass
|
||||
|
||||
def create_sector(self, data):
|
||||
return self.sector_class(self.sector_size, data)
|
||||
|
||||
|
||||
|
||||
@ -228,8 +231,8 @@ class DiskImageBase(object):
|
||||
return Directory
|
||||
|
||||
@property
|
||||
def sector_list_class(self):
|
||||
return SectorList
|
||||
def sector_builder_class(self):
|
||||
return SectorBuilder
|
||||
|
||||
def set_filename(self, filename):
|
||||
if "." in filename:
|
||||
@ -452,11 +455,11 @@ class DiskImageBase(object):
|
||||
"""
|
||||
state = self.begin_transaction()
|
||||
try:
|
||||
directory = self.directory_class(self.bytes_per_sector)
|
||||
directory = self.directory_class(self.header)
|
||||
self.get_directory(directory)
|
||||
dirent = directory.add_dirent(filename, filetype)
|
||||
data = to_numpy(data)
|
||||
sector_list = self.sector_list_class(self.bytes_per_sector, self.payload_bytes_per_sector, data, self.writeable_sector_class)
|
||||
sector_list = self.sector_builder_class(self.header, self.payload_bytes_per_sector, data, self.writeable_sector_class)
|
||||
vtoc_segments = self.get_vtoc_segments()
|
||||
vtoc = self.vtoc_class(self.bytes_per_sector, vtoc_segments)
|
||||
directory.save_dirent(self, dirent, vtoc, sector_list)
|
||||
@ -479,12 +482,12 @@ class DiskImageBase(object):
|
||||
def delete_file(self, filename):
|
||||
state = self.begin_transaction()
|
||||
try:
|
||||
directory = self.directory_class(self.bytes_per_sector)
|
||||
directory = self.directory_class(self.header)
|
||||
self.get_directory(directory)
|
||||
dirent = directory.find_dirent(filename)
|
||||
sector_list = dirent.get_sector_list(self)
|
||||
vtoc_segments = self.get_vtoc_segments()
|
||||
vtoc = self.vtoc_class(self.bytes_per_sector, vtoc_segments)
|
||||
vtoc = self.vtoc_class(self.header, vtoc_segments)
|
||||
directory.remove_dirent(self, dirent, vtoc, sector_list)
|
||||
self.write_sector_list(sector_list)
|
||||
self.write_sector_list(vtoc)
|
||||
@ -497,270 +500,6 @@ class DiskImageBase(object):
|
||||
self.get_metadata()
|
||||
|
||||
|
||||
class WriteableSector(object):
|
||||
def __init__(self, sector_size, data=None, num=-1):
|
||||
self._sector_num = num
|
||||
self._next_sector = 0
|
||||
self.sector_size = sector_size
|
||||
self.file_num = 0
|
||||
self.data = np.zeros([sector_size], dtype=np.uint8)
|
||||
self.used = 0
|
||||
self.ptr = self.used
|
||||
if data is not None:
|
||||
self.add_data(data)
|
||||
|
||||
def __str__(self):
|
||||
return "sector=%d next=%d size=%d used=%d" % (self._sector_num, self._next_sector, self.sector_size, self.used)
|
||||
|
||||
@property
|
||||
def sector_num(self):
|
||||
return self._sector_num
|
||||
|
||||
@sector_num.setter
|
||||
def sector_num(self, value):
|
||||
self._sector_num = value
|
||||
|
||||
@property
|
||||
def next_sector_num(self):
|
||||
return self._next_sector_num
|
||||
|
||||
@sector_num.setter
|
||||
def next_sector_num(self, value):
|
||||
self._next_sector_num = value
|
||||
|
||||
@property
|
||||
def space_remaining(self):
|
||||
return self.sector_size - self.ptr
|
||||
|
||||
@property
|
||||
def is_empty(self):
|
||||
return self.ptr == 0
|
||||
|
||||
def add_data(self, data):
|
||||
count = len(data)
|
||||
if self.ptr + count > self.sector_size:
|
||||
count = self.space_remaining
|
||||
self.data[self.ptr:self.ptr + count] = data[0:count]
|
||||
self.ptr += count
|
||||
self.used += count
|
||||
return data[count:]
|
||||
|
||||
|
||||
class BaseSectorList(object):
|
||||
def __init__(self, bytes_per_sector):
|
||||
self.bytes_per_sector = bytes_per_sector
|
||||
self.sectors = []
|
||||
|
||||
def __len__(self):
|
||||
return len(self.sectors)
|
||||
|
||||
def __getitem__(self, index):
|
||||
if index < 0 or index >= len(self):
|
||||
raise IndexError
|
||||
return self.sectors[index]
|
||||
|
||||
@property
|
||||
def num_sectors(self):
|
||||
return len(self.sectors)
|
||||
|
||||
@property
|
||||
def first_sector(self):
|
||||
if self.sectors:
|
||||
return self.sectors[0].sector_num
|
||||
return -1
|
||||
|
||||
def append(self, sector):
|
||||
self.sectors.append(sector)
|
||||
|
||||
|
||||
class Directory(BaseSectorList):
|
||||
def __init__(self, bytes_per_sector, num_dirents=-1, sector_class=WriteableSector):
|
||||
BaseSectorList.__init__(self, bytes_per_sector)
|
||||
self.sector_class = sector_class
|
||||
self.num_dirents = num_dirents
|
||||
# number of dirents may be unlimited, so use a dict instead of a list
|
||||
self.dirents = {}
|
||||
|
||||
def set(self, index, dirent):
|
||||
self.dirents[index] = dirent
|
||||
log.debug("set dirent #%d: %s" % (index, dirent))
|
||||
|
||||
def get_free_dirent(self):
|
||||
used = set()
|
||||
d = self.dirents.items()
|
||||
d.sort()
|
||||
for i, dirent in d:
|
||||
if not dirent.in_use:
|
||||
return i
|
||||
used.add(i)
|
||||
if self.num_dirents > 0 and (len(used) >= self.num_dirents):
|
||||
raise NoSpaceInDirectory()
|
||||
i += 1
|
||||
used.add(i)
|
||||
return i
|
||||
|
||||
def add_dirent(self, filename, filetype):
|
||||
index = self.get_free_dirent()
|
||||
dirent = self.dirent_class(None)
|
||||
dirent.set_values(filename, filetype, index)
|
||||
self.set(index, dirent)
|
||||
return dirent
|
||||
|
||||
def find_dirent(self, filename):
|
||||
for dirent in self.dirents.values():
|
||||
if filename == dirent.get_filename():
|
||||
return dirent
|
||||
raise FileNotFound("%s not found on disk" % filename)
|
||||
|
||||
def save_dirent(self, image, dirent, vtoc, sector_list):
|
||||
self.place_sector_list(dirent, vtoc, sector_list)
|
||||
dirent.update_sector_info(sector_list)
|
||||
self.calc_sectors(image)
|
||||
|
||||
def place_sector_list(self, dirent, vtoc, sector_list):
|
||||
""" Map out the sectors and link the sectors together
|
||||
|
||||
raises NotEnoughSpaceOnDisk if the whole file won't fit. It will not
|
||||
allow partial writes.
|
||||
"""
|
||||
sector_list.calc_extra_sectors()
|
||||
num = len(sector_list)
|
||||
order = vtoc.reserve_space(num)
|
||||
if len(order) != num:
|
||||
raise InvalidFile("VTOC reserved space for %d sectors. Sectors needed: %d" % (len(order), num))
|
||||
file_length = 0
|
||||
last_sector = None
|
||||
for sector, sector_num in zip(sector_list.sectors, order):
|
||||
sector.sector_num = sector_num
|
||||
sector.file_num = dirent.file_num
|
||||
file_length += sector.used
|
||||
if last_sector is not None:
|
||||
last_sector.next_sector_num = sector_num
|
||||
last_sector = sector
|
||||
if last_sector is not None:
|
||||
last_sector.next_sector_num = 0
|
||||
sector_list.file_length = file_length
|
||||
|
||||
def remove_dirent(self, image, dirent, vtoc, sector_list):
|
||||
vtoc.free_sector_list(sector_list)
|
||||
dirent.mark_deleted()
|
||||
self.calc_sectors(image)
|
||||
|
||||
@property
|
||||
def dirent_class(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def calc_sectors(self, image):
|
||||
self.sectors = []
|
||||
self.current_sector = self.get_dirent_sector()
|
||||
self.encode_index = 0
|
||||
|
||||
d = self.dirents.items()
|
||||
d.sort()
|
||||
# there may be gaps, so fill in missing entries with blanks
|
||||
current = 0
|
||||
for index, dirent in d:
|
||||
for missing in range(current, index):
|
||||
log.debug("Encoding empty dirent at %d" % missing)
|
||||
data = self.encode_empty()
|
||||
self.store_encoded(data)
|
||||
log.debug("Encoding dirent: %s" % dirent)
|
||||
data = self.encode_dirent(dirent)
|
||||
self.store_encoded(data)
|
||||
current = index + 1
|
||||
self.finish_encoding(image)
|
||||
|
||||
def get_dirent_sector(self):
|
||||
return self.sector_class(self.bytes_per_sector)
|
||||
|
||||
def encode_empty(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def encode_dirent(self, dirent):
|
||||
raise NotImplementedError
|
||||
|
||||
def store_encoded(self, data):
|
||||
while True:
|
||||
log.debug("store_encoded: %d bytes in %s" % (len(data), self.current_sector))
|
||||
data = self.current_sector.add_data(data)
|
||||
if len(data) > 0:
|
||||
self.sectors.append(self.current_sector)
|
||||
self.current_sector = self.get_dirent_sector()
|
||||
else:
|
||||
break
|
||||
|
||||
def finish_encoding(self, image):
|
||||
if not self.current_sector.is_empty:
|
||||
self.sectors.append(self.current_sector)
|
||||
self.set_sector_numbers(image)
|
||||
|
||||
def set_sector_numbers(self, image):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class VTOC(BaseSectorList):
|
||||
def __init__(self, bytes_per_sector, segments=None):
|
||||
BaseSectorList.__init__(self, bytes_per_sector)
|
||||
|
||||
# sector map: 1 is free, 0 is allocated
|
||||
self.sector_map = np.zeros([1280], dtype=np.uint8)
|
||||
if segments is not None:
|
||||
self.parse_segments(segments)
|
||||
|
||||
def parse_segments(self, segments):
|
||||
raise NotImplementedError
|
||||
|
||||
def reserve_space(self, num):
|
||||
order = []
|
||||
for i in range(num):
|
||||
order.append(self.get_next_free_sector())
|
||||
log.debug("Sectors reserved: %s" % order)
|
||||
self.calc_bitmap()
|
||||
return order
|
||||
|
||||
def get_next_free_sector(self):
|
||||
free = np.nonzero(self.sector_map)[0]
|
||||
if len(free) > 0:
|
||||
num = free[0]
|
||||
log.debug("Found sector %d free" % num)
|
||||
self.sector_map[num] = 0
|
||||
return num
|
||||
raise NotEnoughSpaceOnDisk("No space left in VTOC")
|
||||
|
||||
def calc_bitmap(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def free_sector_list(self, sector_list):
|
||||
for sector in sector_list:
|
||||
self.sector_map[sector.sector_num] = 1
|
||||
|
||||
|
||||
class SectorList(BaseSectorList):
|
||||
def __init__(self, bytes_per_sector, usable, data, sector_class):
|
||||
BaseSectorList.__init__(self, bytes_per_sector)
|
||||
self.data = to_numpy(data)
|
||||
self.usable_bytes = usable
|
||||
self.split_into_sectors(sector_class)
|
||||
self.file_length = -1
|
||||
|
||||
def split_into_sectors(self, sector_class):
|
||||
index = 0
|
||||
while index < len(self.data):
|
||||
count = min(self.usable_bytes, len(self.data) - index)
|
||||
sector = sector_class(self.bytes_per_sector, self.data[index:index + count])
|
||||
self.sectors.append(sector)
|
||||
index += count
|
||||
|
||||
|
||||
def calc_extra_sectors(self):
|
||||
""" Add extra sectors to the list.
|
||||
|
||||
For example, DOS 3.3 uses a track/sector list at the beginning of the
|
||||
file
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
|
||||
class BootDiskImage(DiskImageBase):
|
||||
def __str__(self):
|
||||
|
@ -1,7 +1,7 @@
|
||||
import numpy as np
|
||||
|
||||
from errors import *
|
||||
from diskimages import BaseHeader, DiskImageBase, Directory, VTOC, WriteableSector, BaseSectorList
|
||||
from diskimages import BaseHeader, DiskImageBase, Directory, VTOC, WriteableSector, BaseSectorList, SectorBuilder
|
||||
from segments import DefaultSegment, EmptySegment, ObjSegment, RawTrackSectorSegment, SegmentSaver
|
||||
|
||||
import logging
|
||||
@ -232,16 +232,16 @@ class Dos33Dirent(object):
|
||||
self.sector_map = sector_list
|
||||
|
||||
def get_sector_list(self, image):
|
||||
sector_list = BaseSectorList(image.bytes_per_sector)
|
||||
sector_list = BaseSectorList(image.header.bytes_per_sector)
|
||||
self.start_read(image)
|
||||
sector_num = image.header.sector_from_track(self.track, self.sector)
|
||||
while sector_num > 0:
|
||||
sector = WriteableSector(image.bytes_per_sector, None, sector_num)
|
||||
sector = WriteableSector(image.header.bytes_per_sector, None, sector_num)
|
||||
sector_list.append(sector)
|
||||
values, style = image.get_sectors(sector_num)
|
||||
sector = image.header.sector_from_track(values[1], values[2])
|
||||
for sector_num in sector_list:
|
||||
sector = WriteableSector(image.bytes_per_sector, None, sector_num)
|
||||
sector = WriteableSector(image.header.bytes_per_sector, None, sector_num)
|
||||
sector_list.append(sector)
|
||||
return sector_list
|
||||
|
||||
@ -279,6 +279,14 @@ class Dos33Dirent(object):
|
||||
self.deleted = False
|
||||
|
||||
|
||||
class Dos33SectorBuilder(SectorBuilder):
|
||||
def calc_extra_sectors(self):
|
||||
"""Add track/sector list
|
||||
"""
|
||||
for ts_group_start in range(0, len(self), self.header.ts_pairs):
|
||||
print ts_group_start
|
||||
|
||||
|
||||
class Dos33Header(BaseHeader):
|
||||
file_format = "DOS 3.3"
|
||||
|
||||
@ -331,6 +339,10 @@ class Dos33DiskImage(DiskImageBase):
|
||||
def raw_sector_class(self):
|
||||
return RawTrackSectorSegment
|
||||
|
||||
@property
|
||||
def sector_builder_class(self):
|
||||
return Dos33SectorBuilder
|
||||
|
||||
def get_boot_sector_info(self):
|
||||
# based on logic from a2server
|
||||
data, style = self.get_sectors(0)
|
||||
@ -370,12 +382,13 @@ class Dos33DiskImage(DiskImageBase):
|
||||
def get_vtoc(self):
|
||||
data, style = self.get_sectors(self.header.first_vtoc)
|
||||
values = data[0:self.vtoc_type.itemsize].view(dtype=self.vtoc_type)[0]
|
||||
self.first_directory = self.header.sector_from_track(values['cat_track'], values['cat_sector'])
|
||||
self.sector_size = int(values['bytes_per_sector'])
|
||||
self.max_sectors = int(values['num_tracks']) * int(values['sectors_per_track'])
|
||||
self.dos_release = values['dos_release']
|
||||
self.last_track_num = values['last_track']
|
||||
self.track_alloc_dir = values['track_dir']
|
||||
self.header.first_directory = self.header.sector_from_track(values['cat_track'], values['cat_sector'])
|
||||
self.header.sector_size = int(values['bytes_per_sector'])
|
||||
self.header.max_sectors = int(values['num_tracks']) * int(values['sectors_per_track'])
|
||||
self.header.ts_pairs = int(values['max_pairs'])
|
||||
self.header.dos_release = values['dos_release']
|
||||
self.header.last_track_num = values['last_track']
|
||||
self.header.track_alloc_dir = values['track_dir']
|
||||
self.assert_valid_sector(self.first_directory)
|
||||
|
||||
def get_directory(self, directory=None):
|
||||
|
267
atrcopy/utils.py
267
atrcopy/utils.py
@ -17,3 +17,270 @@ def to_numpy_list(value):
|
||||
if type(value) is np.ndarray:
|
||||
return value
|
||||
return np.asarray(value, dtype=np.uint32)
|
||||
|
||||
|
||||
class WriteableSector(object):
|
||||
def __init__(self, sector_size, data=None, num=-1):
|
||||
self._sector_num = num
|
||||
self._next_sector = 0
|
||||
self.sector_size = sector_size
|
||||
self.file_num = 0
|
||||
self.data = np.zeros([sector_size], dtype=np.uint8)
|
||||
self.used = 0
|
||||
self.ptr = self.used
|
||||
if data is not None:
|
||||
self.add_data(data)
|
||||
|
||||
def __str__(self):
|
||||
return "sector=%d next=%d size=%d used=%d" % (self._sector_num, self._next_sector, self.sector_size, self.used)
|
||||
|
||||
@property
|
||||
def sector_num(self):
|
||||
return self._sector_num
|
||||
|
||||
@sector_num.setter
|
||||
def sector_num(self, value):
|
||||
self._sector_num = value
|
||||
|
||||
@property
|
||||
def next_sector_num(self):
|
||||
return self._next_sector_num
|
||||
|
||||
@sector_num.setter
|
||||
def next_sector_num(self, value):
|
||||
self._next_sector_num = value
|
||||
|
||||
@property
|
||||
def space_remaining(self):
|
||||
return self.sector_size - self.ptr
|
||||
|
||||
@property
|
||||
def is_empty(self):
|
||||
return self.ptr == 0
|
||||
|
||||
def add_data(self, data):
|
||||
count = len(data)
|
||||
if self.ptr + count > self.sector_size:
|
||||
count = self.space_remaining
|
||||
self.data[self.ptr:self.ptr + count] = data[0:count]
|
||||
self.ptr += count
|
||||
self.used += count
|
||||
return data[count:]
|
||||
|
||||
|
||||
class BaseSectorList(object):
|
||||
def __init__(self, bytes_per_sector):
|
||||
self.bytes_per_sector = bytes_per_sector
|
||||
self.sectors = []
|
||||
|
||||
def __len__(self):
|
||||
return len(self.sectors)
|
||||
|
||||
def __getitem__(self, index):
|
||||
if index < 0 or index >= len(self):
|
||||
raise IndexError
|
||||
return self.sectors[index]
|
||||
|
||||
@property
|
||||
def num_sectors(self):
|
||||
return len(self.sectors)
|
||||
|
||||
@property
|
||||
def first_sector(self):
|
||||
if self.sectors:
|
||||
return self.sectors[0].sector_num
|
||||
return -1
|
||||
|
||||
def append(self, sector):
|
||||
self.sectors.append(sector)
|
||||
|
||||
|
||||
class Directory(BaseSectorList):
|
||||
def __init__(self, header, num_dirents=-1, sector_class=WriteableSector):
|
||||
BaseSectorList.__init__(self, header.bytes_per_sector)
|
||||
self.sector_class = sector_class
|
||||
self.num_dirents = num_dirents
|
||||
# number of dirents may be unlimited, so use a dict instead of a list
|
||||
self.dirents = {}
|
||||
|
||||
def set(self, index, dirent):
|
||||
self.dirents[index] = dirent
|
||||
log.debug("set dirent #%d: %s" % (index, dirent))
|
||||
|
||||
def get_free_dirent(self):
|
||||
used = set()
|
||||
d = self.dirents.items()
|
||||
d.sort()
|
||||
for i, dirent in d:
|
||||
if not dirent.in_use:
|
||||
return i
|
||||
used.add(i)
|
||||
if self.num_dirents > 0 and (len(used) >= self.num_dirents):
|
||||
raise NoSpaceInDirectory()
|
||||
i += 1
|
||||
used.add(i)
|
||||
return i
|
||||
|
||||
def add_dirent(self, filename, filetype):
|
||||
index = self.get_free_dirent()
|
||||
dirent = self.dirent_class(None)
|
||||
dirent.set_values(filename, filetype, index)
|
||||
self.set(index, dirent)
|
||||
return dirent
|
||||
|
||||
def find_dirent(self, filename):
|
||||
for dirent in self.dirents.values():
|
||||
if filename == dirent.get_filename():
|
||||
return dirent
|
||||
raise FileNotFound("%s not found on disk" % filename)
|
||||
|
||||
def save_dirent(self, image, dirent, vtoc, sector_list):
|
||||
self.place_sector_list(dirent, vtoc, sector_list)
|
||||
dirent.update_sector_info(sector_list)
|
||||
self.calc_sectors(image)
|
||||
|
||||
def place_sector_list(self, dirent, vtoc, sector_list):
|
||||
""" Map out the sectors and link the sectors together
|
||||
|
||||
raises NotEnoughSpaceOnDisk if the whole file won't fit. It will not
|
||||
allow partial writes.
|
||||
"""
|
||||
sector_list.calc_extra_sectors()
|
||||
num = len(sector_list)
|
||||
order = vtoc.reserve_space(num)
|
||||
if len(order) != num:
|
||||
raise InvalidFile("VTOC reserved space for %d sectors. Sectors needed: %d" % (len(order), num))
|
||||
file_length = 0
|
||||
last_sector = None
|
||||
for sector, sector_num in zip(sector_list.sectors, order):
|
||||
sector.sector_num = sector_num
|
||||
sector.file_num = dirent.file_num
|
||||
file_length += sector.used
|
||||
if last_sector is not None:
|
||||
last_sector.next_sector_num = sector_num
|
||||
last_sector = sector
|
||||
if last_sector is not None:
|
||||
last_sector.next_sector_num = 0
|
||||
sector_list.file_length = file_length
|
||||
|
||||
def remove_dirent(self, image, dirent, vtoc, sector_list):
|
||||
vtoc.free_sector_list(sector_list)
|
||||
dirent.mark_deleted()
|
||||
self.calc_sectors(image)
|
||||
|
||||
@property
|
||||
def dirent_class(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def calc_sectors(self, image):
|
||||
self.sectors = []
|
||||
self.current_sector = self.get_dirent_sector()
|
||||
self.encode_index = 0
|
||||
|
||||
d = self.dirents.items()
|
||||
d.sort()
|
||||
# there may be gaps, so fill in missing entries with blanks
|
||||
current = 0
|
||||
for index, dirent in d:
|
||||
for missing in range(current, index):
|
||||
log.debug("Encoding empty dirent at %d" % missing)
|
||||
data = self.encode_empty()
|
||||
self.store_encoded(data)
|
||||
log.debug("Encoding dirent: %s" % dirent)
|
||||
data = self.encode_dirent(dirent)
|
||||
self.store_encoded(data)
|
||||
current = index + 1
|
||||
self.finish_encoding(image)
|
||||
|
||||
def get_dirent_sector(self):
|
||||
return self.sector_class(self.bytes_per_sector)
|
||||
|
||||
def encode_empty(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def encode_dirent(self, dirent):
|
||||
raise NotImplementedError
|
||||
|
||||
def store_encoded(self, data):
|
||||
while True:
|
||||
log.debug("store_encoded: %d bytes in %s" % (len(data), self.current_sector))
|
||||
data = self.current_sector.add_data(data)
|
||||
if len(data) > 0:
|
||||
self.sectors.append(self.current_sector)
|
||||
self.current_sector = self.get_dirent_sector()
|
||||
else:
|
||||
break
|
||||
|
||||
def finish_encoding(self, image):
|
||||
if not self.current_sector.is_empty:
|
||||
self.sectors.append(self.current_sector)
|
||||
self.set_sector_numbers(image)
|
||||
|
||||
def set_sector_numbers(self, image):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class VTOC(BaseSectorList):
|
||||
def __init__(self, header, segments=None):
|
||||
BaseSectorList.__init__(self, header.bytes_per_sector)
|
||||
|
||||
# sector map: 1 is free, 0 is allocated
|
||||
self.sector_map = np.zeros([1280], dtype=np.uint8)
|
||||
if segments is not None:
|
||||
self.parse_segments(segments)
|
||||
|
||||
def parse_segments(self, segments):
|
||||
raise NotImplementedError
|
||||
|
||||
def reserve_space(self, num):
|
||||
order = []
|
||||
for i in range(num):
|
||||
order.append(self.get_next_free_sector())
|
||||
log.debug("Sectors reserved: %s" % order)
|
||||
self.calc_bitmap()
|
||||
return order
|
||||
|
||||
def get_next_free_sector(self):
|
||||
free = np.nonzero(self.sector_map)[0]
|
||||
if len(free) > 0:
|
||||
num = free[0]
|
||||
log.debug("Found sector %d free" % num)
|
||||
self.sector_map[num] = 0
|
||||
return num
|
||||
raise NotEnoughSpaceOnDisk("No space left in VTOC")
|
||||
|
||||
def calc_bitmap(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def free_sector_list(self, sector_list):
|
||||
for sector in sector_list:
|
||||
self.sector_map[sector.sector_num] = 1
|
||||
|
||||
|
||||
class SectorBuilder(BaseSectorList):
|
||||
def __init__(self, header, usable, data, sector_class):
|
||||
BaseSectorList.__init__(self, header.bytes_per_sector)
|
||||
self.data = to_numpy(data)
|
||||
self.usable_bytes = usable
|
||||
self.split_into_sectors(header)
|
||||
self.file_length = -1
|
||||
|
||||
def split_into_sectors(self, header):
|
||||
index = 0
|
||||
while index < len(self.data):
|
||||
count = min(self.usable_bytes, len(self.data) - index)
|
||||
sector = header.create_sector(self.data[index:index + count])
|
||||
self.sectors.append(sector)
|
||||
index += count
|
||||
|
||||
|
||||
def calc_extra_sectors(self):
|
||||
""" Add extra sectors to the list.
|
||||
|
||||
For example, DOS 3.3 uses a track/sector list at the beginning
|
||||
of the file
|
||||
|
||||
Sectors will have their sector assignments when this function is
|
||||
called.
|
||||
"""
|
||||
pass
|
||||
|
Loading…
Reference in New Issue
Block a user