mirror of https://github.com/robmcmullen/atrcopy.git
synced 2025-04-11 22:36:57 +00:00
Added simple compressed container
This commit is contained in:
parent b35361b125
commit 5d6e847541
@@ -10,238 +10,20 @@ from .utils import to_numpy, to_numpy_list, uuid
 import logging
 log = logging.getLogger(__name__)
 log.setLevel(logging.INFO)


-class DiskImageContainer:
+class Container:
     """Disk image data storage and unpacker for disk image compression.

     Segments point to this container and refer to the container's data rather
     than store copies.

     Disk images may be stored as raw data or can be compressed by any number of
-    techniques. Subclasses of DiskImageContainer implement the `unpack_bytes`
+    techniques. Subclasses of Container implement the `unpack_bytes`
     method which examines the byte_data argument for the supported compression
     type, and if valid returns the unpacked bytes to be used in the disk image
     parsing.
     """
-    can_resize_default = False
-
-    base_serializable_attributes = ['origin', 'error', 'name', 'verbose_name', 'uuid', 'can_resize']
-    extra_serializable_attributes = []
-
-    def __init__(self, data, style=None, origin=0, name="All", error=None, verbose_name=None, memory_map=None):
-        self._data = None
-        self._style = None
-        self.set_data(data, style)
-
-        self.origin = int(origin)  # force python int to decouple from possibly being a numpy datatype
-        self.error = error
-        self.name = name
-        self.verbose_name = verbose_name
-        self.uuid = uuid()
-        if memory_map is None:
-            memory_map = {}
-        self.memory_map = memory_map
-        self.comments = dict()
-        self.user_data = dict()
-        for i in range(1, style_bits.user_bit_mask):
-            self.user_data[i] = dict()
-
-        # Some segments may be resized to contain additional segments not
-        # present when the segment was created.
-        self.can_resize = self.__class__.can_resize_default
-
-    #### initialization
-
-    def set_data(self, data, style):
-        self.data = data
-        self.style = style
-
-    #### properties
-
-    @property
-    def data(self):
-        return self._data
-
-    @data.setter
-    def data(self, value):
-        if self._data is not None:
-            raise errors.ReadOnlyContainer("Container already populated with data")
-        raw = value.tobytes()
-        try:
-            unpacked = self.unpack_bytes(raw)
-        except EOFError as e:
-            raise errors.InvalidContainer(e)
-        self._data = to_numpy(unpacked)
-
-    @property
-    def style(self):
-        return self._style
-
-    @style.setter
-    def style(self, value):
-        if value is None:
-            value = np.zeros(len(self._data), dtype=np.uint8)
-        self._style = to_numpy(value)
-
-    @property
-    def sha1(self):
-        return hashlib.sha1(self.data).digest()
-
-    #### dunder methods
-
-    def __len__(self):
-        return np.alen(self._data)
-
-    def __and__(self, other):
-        return self._data & other
-
-    def __iand__(self, other):
-        self._data &= other
-        return self
-
-    def __getitem__(self, index):
-        return self._data[index]
-
-    def __setitem__(self, index, value):
-        self._data[index] = value
-
-    #### unpacking
-
-    def unpack_bytes(self, byte_data):
-        """Attempt to unpack `byte_data` using this unpacking algorithm.
-
-        `byte_data` is a byte string, and should return a byte string if
-        successfully unpacked. Conversion to a numpy array will take place
-        automatically, outside of this method.
-
-        If the data is not recognized by this subclass, raise an
-        InvalidContainer exception. This signals to the caller that a different
-        container type should be tried.
-
-        If the data is recognized by this subclass but the unpacking algorithm
-        is not implemented, raise an UnsupportedContainer exception. This is
-        different than the InvalidContainer exception because it indicates that
-        the data was indeed recognized by this subclass (despite not being
-        unpacked) and checking further containers is not necessary.
-        """
-        return byte_data
-
-    #### packing
-
-    def pack_data(self, np_data):
-        """Pack `np_data` using this packing algorithm
-
-        `np_data` is numpy data, as this function is expected to be called from
-        the data held in a SourceSegment
-        """
-        return np_data
-
-    #### serialization
-
-    def __getstate__(self):
-        """Custom jsonpickle state save routine
-
-        This routine culls down the list of attributes that should be
-        serialized, and in some cases changes their format slightly so they
-        have a better mapping to json objects. For instance, json can't handle
-        dicts with integer keys, so dicts are turned into lists of lists.
-        Tuples are also turned into lists because tuples don't have a direct
-        representation in json, while lists have a compact representation in
-        json.
-        """
-        state = dict()
-        for key in self.base_serializable_attributes:
-            state[key] = getattr(self, key)
-        for key in self.extra_serializable_attributes:
-            state[key] = getattr(self, key)
-        r = self.rawdata
-        state['memory_map'] = sorted([list(i) for i in self.memory_map.items()])
-        state['comment ranges'] = [list(a) for a in self.get_style_ranges(comment=True)]
-        state['data ranges'] = [list(a) for a in self.get_style_ranges(data=True)]
-        for i in range(1, style_bits.user_bit_mask):
-            r = [list(a) for a in self.get_style_ranges(user=i)]
-            if r:
-                slot = "user style %d" % i
-                state[slot] = r
-
-        # json serialization doesn't allow int keys, so convert to list of
-        # pairs
-        state['comments'] = self.get_sorted_comments()
-        return state
-
-    def __setstate__(self, state):
-        """Custom jsonpickle state restore routine
-
-        The use of jsonpickle to recreate objects doesn't go through __init__,
-        so there will be missing attributes when restoring old versions of the
-        json. Once a version gets out in the wild and additional attributes are
-        added to a segment, a default value should be applied here.
-        """
-        self.memory_map = dict(state.pop('memory_map', []))
-        self.uuid = state.pop('uuid', uuid())
-        self.can_resize = state.pop('can_resize', self.__class__.can_resize_default)
-        comments = state.pop('comments', {})
-        for k, v in e['comments']:
-            self.comments[k] = v
-        ranges = state.pop('comment ranges')
-        if 'comment ranges' in e:
-            self.set_style_ranges(e['comment ranges'], comment=True)
-        if 'data ranges' in e:
-            self.set_style_ranges(e['data ranges'], user=data_style)
-        if 'display list ranges' in e:
-            # DEPRECATED, but supported on read. Converts display list to
-            # disassembly type 0 for user index 1
-            self.set_style_ranges(e['display list ranges'], data=True, user=1)
-            self.set_user_data(e['display list ranges'], 1, 0)
-        if 'user ranges 1' in e:
-            # DEPRECATED, but supported on read. Converts user extra data 0
-            # (antic dl), 1 (jumpman level), and 2 (jumpman harvest) to user
-            # styles 2, 3, and 4. Data is now user style 1.
-            for r, val in e['user ranges 1']:
-                self.set_style_ranges([r], user=val + 2)
-        for i in range(1, style_bits.user_bit_mask):
-            slot = "user style %d" % i
-            if slot in e:
-                self.set_style_ranges(e[slot], user=i)
-        self.restore_missing_serializable_defaults()
-        self.__dict__.update(state)
-        self.restore_renamed_serializable_attributes()
-
-    #### style
-
-    def set_style_at_indexes(self, indexes, **kwargs):
-        style_bits = get_style_bits(**kwargs)
-        self._style[indexes] |= style_bits
-
-    def clear_style_at_indexes(self, indexes, **kwargs):
-        style_mask = get_style_mask(**kwargs)
-        self.style[indexes] &= style_mask
-
-    def get_style_at_indexes(self, **kwargs):
-        """Return a list of start, end pairs that match the specified style
-        """
-        style_bits = self.get_style_bits(**kwargs)
-        matches = (self._style & style_bits) == style_bits
-        return self.bool_to_ranges(matches)
-
-    def fixup_comments(self):
-        """Remove any style bytes that are marked as commented but have no
-        comment, and add any style bytes where there's a comment but it isn't
-        marked in the style data.
-
-        This happens on the base data, so only need to do this on one segment
-        that uses this base data.
-        """
-        style_base = self.rawdata.style_base
-        comment_text_indexes = np.asarray(list(self.rawdata.extra.comments.keys()), dtype=np.uint32)
-        comment_mask = self.get_style_mask(comment=True)
-        has_comments = np.where(style_base & style_bits.comment_bit_mask > 0)[0]
-        both = np.intersect1d(comment_text_indexes, has_comments)
-        log.info("fixup comments: %d correctly marked, %d without style, %d empty text" % (np.alen(both), np.alen(comment_text_indexes) - np.alen(both), np.alen(has_comments) - np.alen(both)))
-        style_base &= comment_mask
-        comment_style = self.get_style_bits(comment=True)
-        style_base[comment_text_indexes] |= comment_style
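The InvalidContainer/UnsupportedContainer split described in the docstring above is the whole container protocol: unrecognized data means "try the next container type", recognized-but-undecodable data means "stop looking". A minimal sketch of an unpacker following that contract, written in the module style this commit introduces (zlib is a hypothetical format used only for illustration, not one added here):

    import zlib

    from .. import errors

    name = "zlib"  # hypothetical container module, for illustration only

    def unpack_bytes(byte_data):
        header = bytes(byte_data[:1])  # tolerate bytes or numpy uint8 input
        if header != b"\x78":  # most common zlib CMF byte; a heuristic, not a spec check
            # Not recognized: guess_container should move on to other containers.
            raise errors.InvalidContainer("not zlib data")
        try:
            return zlib.decompress(bytes(byte_data))
        except zlib.error as e:
            # Recognized header but corrupt stream: still invalid to the caller.
            raise errors.InvalidContainer(e)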
@@ -249,24 +31,22 @@ def find_containers():
     for entry_point in pkg_resources.iter_entry_points('atrcopy.containers'):
         mod = entry_point.load()
         log.debug(f"find_container: Found module {entry_point.name}={mod.__name__}")
-        for name, obj in inspect.getmembers(mod):
-            if inspect.isclass(obj) and DiskImageContainer in obj.__mro__[1:]:
-                log.debug(f"find_containers: found container class {name}")
-                containers.append(obj)
+        containers.append(mod)
     return containers


-def guess_container(r, verbose=False):
+def guess_container(raw_data):
+    uncompressed = raw_data
     for c in find_containers():
-        if verbose:
-            log.info(f"trying container {c}")
+        log.info(f"trying container {c}")
         try:
-            found = c(r)
+            uncompressed = c.unpack_bytes(raw_data)
         except errors.InvalidContainer as e:
             continue
         else:
-            if verbose:
-                log.info(f"found container {c}")
-            return found
-    log.info(f"image does not appear to be compressed.")
-    return DiskImageContainer(r)
+            log.info(f"found container {c}")
+            break
+    else:
+        c = None
+        log.info(f"image does not appear to be compressed.")
+    return c, uncompressed
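With the new signature, callers receive both the matching container module (or None for uncompressed data) and the unpacked bytes in a single call. A usage sketch mirroring how the tests below drive it (the pathname is illustrative):

    import numpy as np

    sample_data = np.fromfile("container_dos_sd_test1.atr.gz", dtype=np.uint8)
    container, uncompressed = guess_container(sample_data)
    if container is None:
        print("not compressed; parsing raw image data")
    else:
        print(f"unpacked by {container.name}: {len(uncompressed)} bytes")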
@@ -1,18 +1,24 @@
 import bz2
 import io

-import numpy as np
-
-from . import errors
-from .utils import to_numpy
+from .. import errors


-class BZipContainer(DiskImageContainer):
-    def unpack_bytes(self, byte_data):
-        try:
-            buf = io.BytesIO(byte_data)
-            with bz2.BZ2File(buf, mode='rb') as f:
-                unpacked = f.read()
-        except OSError as e:
-            raise errors.InvalidContainer(e)
-        return unpacked
+name = "bzip"
+
+
+def unpack_bytes(byte_data):
+    try:
+        buf = io.BytesIO(byte_data)
+        with bz2.BZ2File(buf, mode='rb') as f:
+            unpacked = f.read()
+    except OSError as e:
+        raise errors.InvalidContainer(e)
+    return unpacked
+
+
+def pack_bytes(media_container):
+    """Pack the container using this packing algorithm
+
+    Return a byte string suitable to be written to disk
+    """
+    raise NotImplementedError
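Every compression module now exposes the same flat interface: a `name` string, a module-level `unpack_bytes`, and a `pack_bytes` stub. A sketch of what a working `pack_bytes` for this module might look like; this commit deliberately leaves it raising NotImplementedError, and the `media_container.data` attribute here is an assumption modeled on the Container data property:

    import bz2

    def pack_bytes(media_container):
        # Sketch only: compress the container's unpacked bytes for writing
        # back to disk. Assumes the container exposes its data as a
        # bytes-convertible array (as Container does).
        return bz2.compress(bytes(media_container.data))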
@@ -1,47 +1,57 @@
 import numpy as np

 from .. import errors
-from ..container import DiskImageContainer
-
-
-class DCMContainer(DiskImageContainer):
-    valid_densities = {
-        0: (720, 128),
-        1: (720, 256),
-        2: (1040, 128),
-    }
-
-    def get_next(self):
+
+name = "dcm"
+
+valid_densities = {
+    0: (720, 128),
+    1: (720, 256),
+    2: (1040, 128),
+}
+
+
+def unpack_bytes(data):
+    index = 0
+    count = len(data)
+    raw = data
+
+    def get_next():
+        nonlocal index, raw
         try:
-            data = self.raw[self.index]
+            data = raw[index]
         except IndexError:
             raise errors.InvalidContainer("Incomplete DCM file")
         else:
-            self.index += 1
+            index += 1
         return data

-    def unpack_bytes(self, data):
-        self.index = 0
-        self.count = len(data)
-        self.raw = data
-        archive_type = self.get_next()
-        if archive_type == 0xf9 or archive_type == 0xfa:
-            archive_flags = self.get_next()
-            if archive_flags & 0x1f != 1:
-                if archive_type == 0xf9:
-                    raise errors.InvalidContainer("DCM multi-file archive combined in the wrong order")
-                else:
-                    raise errors.InvalidContainer("Expected pass one of DCM archive first")
-            density_flag = (archive_flags >> 5) & 3
-            if density_flag not in self.valid_densities:
-                raise errors.InvalidContainer(f"Unsupported density flag {density_flag} in DCM")
-        else:
-            raise errors.InvalidContainer("Not a DCM file")
-
-        # DCM decoding goes here. Currently, instead of decoding it raises the
-        # UnsupportedContainer exception, which signals to the caller that the
-        # container has been successfully identified but can't be parsed.
-        #
-        # When decoding is supported, return the decoded byte array instead of
-        # this exception.
-        raise errors.UnsupportedContainer("DCM archives are not yet supported")
+    archive_type = get_next()
+    if archive_type == 0xf9 or archive_type == 0xfa:
+        archive_flags = get_next()
+        if archive_flags & 0x1f != 1:
+            if archive_type == 0xf9:
+                raise errors.InvalidContainer("DCM multi-file archive combined in the wrong order")
+            else:
+                raise errors.InvalidContainer("Expected pass one of DCM archive first")
+        density_flag = (archive_flags >> 5) & 3
+        if density_flag not in valid_densities:
+            raise errors.InvalidContainer(f"Unsupported density flag {density_flag} in DCM")
+    else:
+        raise errors.InvalidContainer("Not a DCM file")
+
+    # DCM decoding goes here. Currently, instead of decoding it raises the
+    # UnsupportedContainer exception, which signals to the caller that the
+    # container has been successfully identified but can't be parsed.
+    #
+    # When decoding is supported, return the decoded byte array instead of
+    # this exception.
+    raise errors.UnsupportedContainer("DCM archives are not yet supported")
+
+
+def pack_bytes(media_container):
+    """Pack the container using this packing algorithm
+
+    Return a byte string suitable to be written to disk
+    """
+    raise NotImplementedError
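The DCM header check above packs three fields into the first two bytes: an archive-type byte (0xf9 for a multi-pass archive, 0xfa otherwise, judging by the error messages), a 5-bit pass number that must be 1, and a 2-bit density index into valid_densities. A worked example with illustrative values:

    archive_type = 0xfa     # single-pass DCM archive
    archive_flags = 0x21    # binary 0010 0001

    pass_number = archive_flags & 0x1f        # low 5 bits -> 1, pass one: accepted
    density_flag = (archive_flags >> 5) & 3   # next 2 bits -> 1
    sectors, sector_size = {0: (720, 128), 1: (720, 256), 2: (1040, 128)}[density_flag]
    print(sectors, sector_size)               # 720 256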
@@ -1,18 +1,24 @@
 import gzip
 import io

-import numpy as np
-
-from . import errors
-from .utils import to_numpy
+from .. import errors


-class GZipContainer(DiskImageContainer):
-    def unpack_bytes(self, byte_data):
-        try:
-            buf = io.BytesIO(byte_data)
-            with gzip.GzipFile(mode='rb', fileobj=buf) as f:
-                unpacked = f.read()
-        except OSError as e:
-            raise errors.InvalidContainer(e)
-        return unpacked
+name = "gzip"
+
+
+def unpack_bytes(byte_data):
+    try:
+        buf = io.BytesIO(byte_data)
+        with gzip.GzipFile(mode='rb', fileobj=buf) as f:
+            unpacked = f.read()
+    except OSError as e:
+        raise errors.InvalidContainer(e)
+    return unpacked
+
+
+def pack_bytes(media_container):
+    """Pack the container using this packing algorithm
+
+    Return a byte string suitable to be written to disk
+    """
+    raise NotImplementedError
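Because unpack_bytes only sees a byte stream, compressed fixtures for the tests below can be generated with the standard library alone; a sketch using the test image named later in this diff:

    import gzip

    with open("../test_data/container_dos_sd_test1.atr", "rb") as f:
        raw = f.read()
    with gzip.open("../test_data/container_dos_sd_test1.atr.gz", "wb") as f:
        f.write(raw)
    # guess_container on the .gz contents now returns this gzip module
    # and the original raw bytes.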
@@ -1,18 +1,24 @@
 import lzma
 import io

-import numpy as np
-
-from . import errors
-from .utils import to_numpy
+from .. import errors


-class LZMAContainer(DiskImageContainer):
-    def unpack_bytes(self, byte_data):
-        try:
-            buf = io.BytesIO(byte_data)
-            with lzma.LZMAFile(buf, mode='rb') as f:
-                unpacked = f.read()
-        except lzma.LZMAError as e:
-            raise errors.InvalidContainer(e)
-        return unpacked
+name = "lzma"
+
+
+def unpack_bytes(byte_data):
+    try:
+        buf = io.BytesIO(byte_data)
+        with lzma.LZMAFile(buf, mode='rb') as f:
+            unpacked = f.read()
+    except lzma.LZMAError as e:
+        raise errors.InvalidContainer(e)
+    return unpacked
+
+
+def pack_bytes(media_container):
+    """Pack the container using this packing algorithm
+
+    Return a byte string suitable to be written to disk
+    """
+    raise NotImplementedError
@@ -5,8 +5,6 @@ import numpy as np
 from mock import *

 from atrcopy.container import guess_container
-from atrcopy.parser import iter_parsers
-from atrcopy import get_xex, interleave_segments, user_bit_mask, diff_bit_mask
 from atrcopy import errors
@@ -14,18 +12,22 @@ class BaseContainerTest:
     base_path = None
     expected_mime = ""

-    @pytest.mark.parametrize("ext", ['.gz', '.bz2', '.xz', '.dcm'])
-    def test_container(self, ext):
+    @pytest.mark.parametrize(("ext", "mod_name"), [
+        ('.gz', 'gzip'),
+        ('.bz2', 'bzip'),
+        ('.xz', 'lzma'),
+        ('.dcm', 'dcm'),
+    ])
+    def test_container(self, ext, mod_name):
         pathname = self.base_path + ext
         try:
             sample_data = np.fromfile(pathname, dtype=np.uint8)
         except OSError:
             pass
         else:
-            container = guess_container(sample_data)
-            mime, parser = iter_parsers(container)
-            assert mime == self.expected_mime
-            assert len(parser.image.files) == self.num_files_in_sample
+            container, uncompressed_data = guess_container(sample_data)
+            print(container.name)
+            assert container.name == mod_name

 class TestContainerAtariDosSDImage(BaseContainerTest):
     base_path = "../test_data/container_dos_sd_test1.atr"
@@ -41,3 +43,9 @@ class TestContainerAtariDosDDImage(BaseContainerTest):
     base_path = "../test_data/container_dos_dd_test1.atr"
     expected_mime = "application/vnd.atari8bit.atr"
     num_files_in_sample = 5
+
+if __name__ == "__main__":
+    import logging
+    logging.basicConfig(level=logging.DEBUG)
+    c = TestContainerAtariDosSDImage()
+    c.test_container(".gz", "gzip")