diff --git a/atrcopy/container.py b/atrcopy/container.py index 7668963..df52a37 100644 --- a/atrcopy/container.py +++ b/atrcopy/container.py @@ -10,238 +10,20 @@ from .utils import to_numpy, to_numpy_list, uuid import logging log = logging.getLogger(__name__) +log.setLevel(logging.INFO) - -class DiskImageContainer: +class Container: """Disk image data storage and unpacker for disk image compression. Segments point to this container and refer to the container's data rather than store copies. Disk images may be stored as raw data or can be compressed by any number of - techniques. Subclasses of DiskImageContainer implement the `unpack_bytes` + techniques. Subclasses of Container implement the `unpack_bytes` method which examines the byte_data argument for the supported compression type, and if valid returns the unpacked bytes to be used in the disk image parsing. """ - can_resize_default = False - - base_serializable_attributes = ['origin', 'error', 'name', 'verbose_name', 'uuid', 'can_resize'] - extra_serializable_attributes = [] - - def __init__(self, data, style=None, origin=0, name="All", error=None, verbose_name=None, memory_map=None): - self._data = None - self._style = None - self.set_data(data, style) - - self.origin = int(origin) # force python int to decouple from possibly being a numpy datatype - self.error = error - self.name = name - self.verbose_name = verbose_name - self.uuid = uuid() - if memory_map is None: - memory_map = {} - self.memory_map = memory_map - self.comments = dict() - self.user_data = dict() - for i in range(1, style_bits.user_bit_mask): - self.user_data[i] = dict() - - # Some segments may be resized to contain additional segments not - # present when the segment was created. - self.can_resize = self.__class__.can_resize_default - - #### initialization - - def set_data(self, data, style): - self.data = data - self.style = style - - #### properties - - @property - def data(self): - return self._data - - @data.setter - def data(self, value): - if self._data is not None: - raise errors.ReadOnlyContainer("Container already populated with data") - raw = value.tobytes() - try: - unpacked = self.unpack_bytes(raw) - except EOFError as e: - raise errors.InvalidContainer(e) - self._data = to_numpy(unpacked) - - @property - def style(self): - return self._style - - @style.setter - def style(self, value): - if value is None: - value = np.zeros(len(self._data), dtype=np.uint8) - self._style = to_numpy(value) - - @property - def sha1(self): - return hashlib.sha1(self.data).digest() - - #### dunder methods - - def __len__(self): - return np.alen(self._data) - - def __and__(self, other): - return self._data & other - - def __iand__(self, other): - self._data &= other - return self - - def __getitem__(self, index): - return self._data[index] - - def __setitem__(self, index, value): - self._data[index] = value - - #### unpacking - - def unpack_bytes(self, byte_data): - """Attempt to unpack `byte_data` using this unpacking algorithm. - - `byte_data` is a byte string, and should return a byte string if - successfully unpacked. Conversion to a numpy array will take place - automatically, outside of this method. - - If the data is not recognized by this subclass, raise an - InvalidContainer exception. This signals to the caller that a different - container type should be tried. - - If the data is recognized by this subclass but the unpacking algorithm - is not implemented, raise an UnsupportedContainer exception. This is - different than the InvalidContainer exception because it indicates that - the data was indeed recognized by this subclass (despite not being - unpacked) and checking further containers is not necessary. - """ - return byte_data - - #### packing - - def pack_data(self, np_data): - """Pack `np_data` using this packing algorithm - - `np_data` is numpy data, as this function is xpected to be called from - the data held in a SourceSegment - """ - return np_data - - #### serialization - - def __getstate__(self): - """Custom jsonpickle state save routine - - This routine culls down the list of attributes that should be - serialized, and in some cases changes their format slightly so they - have a better mapping to json objects. For instance, json can't handle - dicts with integer keys, so dicts are turned into lists of lists. - Tuples are also turned into lists because tuples don't have a direct - representation in json, while lists have a compact representation in - json. - """ - state = dict() - for key in self.base_serializable_attributes: - state[key] = getattr(self, key) - for key in self.extra_serializable_attributes: - state[key] = getattr(self, key) - r = self.rawdata - state['memory_map'] = sorted([list(i) for i in self.memory_map.items()]) - state['comment ranges'] = [list(a) for a in self.get_style_ranges(comment=True)] - state['data ranges'] = [list(a) for a in self.get_style_ranges(data=True)] - for i in range(1, style_bits.user_bit_mask): - r = [list(a) for a in self.get_style_ranges(user=i)] - if r: - slot = "user style %d" % i - state[slot] = r - - # json serialization doesn't allow int keys, so convert to list of - # pairs - state['comments'] = self.get_sorted_comments() - return state - - def __setstate__(self, state): - """Custom jsonpickle state restore routine - - The use of jsonpickle to recreate objects doesn't go through __init__, - so there will be missing attributes when restoring old versions of the - json. Once a version gets out in the wild and additional attributes are - added to a segment, a default value should be applied here. - """ - self.memory_map = dict(state.pop('memory_map', [])) - self.uuid = state.pop('uuid', uuid()) - self.can_resize = state.pop('can_resize', self.__class__.can_resize_default) - comments = state.pop('comments', {}) - for k, v in e['comments']: - self.comments[k] = v - ranges = state.pop('comment ranges') - if 'comment ranges' in e: - self.set_style_ranges(e['comment ranges'], comment=True) - if 'data ranges' in e: - self.set_style_ranges(e['data ranges'], user=data_style) - if 'display list ranges' in e: - # DEPRECATED, but supported on read. Converts display list to - # disassembly type 0 for user index 1 - self.set_style_ranges(e['display list ranges'], data=True, user=1) - self.set_user_data(e['display list ranges'], 1, 0) - if 'user ranges 1' in e: - # DEPRECATED, but supported on read. Converts user extra data 0 - # (antic dl), 1 (jumpman level), and 2 (jumpman harvest) to user - # styles 2, 3, and 4. Data is now user style 1. - for r, val in e['user ranges 1']: - self.set_style_ranges([r], user=val + 2) - for i in range(1, style_bits.user_bit_mask): - slot = "user style %d" % i - if slot in e: - self.set_style_ranges(e[slot], user=i) - self.restore_missing_serializable_defaults() - self.__dict__.update(state) - self.restore_renamed_serializable_attributes() - - #### style - - def set_style_at_indexes(self, indexes, **kwargs): - style_bits = get_style_bits(**kwargs) - self._style[indexes] |= style_bits - - def clear_style_at_indexes(self, indexes, **kwargs): - style_mask = get_style_mask(**kwargs) - self.style[indexes] &= style_mask - - def get_style_at_indexes(self, **kwargs): - """Return a list of start, end pairs that match the specified style - """ - style_bits = self.get_style_bits(**kwargs) - matches = (self._style & style_bits) == style_bits - return self.bool_to_ranges(matches) - - def fixup_comments(self): - """Remove any style bytes that are marked as commented but have no - comment, and add any style bytes where there's a comment but it isn't - marked in the style data. - - This happens on the base data, so only need to do this on one segment - that uses this base data. - """ - style_base = self.rawdata.style_base - comment_text_indexes = np.asarray(list(self.rawdata.extra.comments.keys()), dtype=np.uint32) - comment_mask = self.get_style_mask(comment=True) - has_comments = np.where(style_base & style_bits.comment_bit_mask > 0)[0] - both = np.intersect1d(comment_text_indexes, has_comments) - log.info("fixup comments: %d correctly marked, %d without style, %d empty text" % (np.alen(both), np.alen(comment_text_indexes) - np.alen(both), np.alen(has_comments) - np.alen(both))) - style_base &= comment_mask - comment_style = self.get_style_bits(comment=True) - style_base[comment_text_indexes] |= comment_style def find_containers(): @@ -249,24 +31,22 @@ def find_containers(): for entry_point in pkg_resources.iter_entry_points('atrcopy.containers'): mod = entry_point.load() log.debug(f"find_container: Found module {entry_point.name}={mod.__name__}") - for name, obj in inspect.getmembers(mod): - if inspect.isclass(obj) and DiskImageContainer in obj.__mro__[1:]: - log.debug(f"find_containers: found container class {name}") - containers.append(obj) + containers.append(mod) return containers -def guess_container(r, verbose=False): +def guess_container(raw_data): + uncompressed = raw_data for c in find_containers(): - if verbose: - log.info(f"trying container {c}") + log.info(f"trying container {c}") try: - found = c(r) + uncompressed = c.unpack_bytes(raw_data) except errors.InvalidContainer as e: continue else: - if verbose: - log.info(f"found container {c}") - return found - log.info(f"image does not appear to be compressed.") - return DiskImageContainer(r) + log.info(f"found container {c}") + break + else: + c = None + log.info(f"image does not appear to be compressed.") + return c, uncompressed diff --git a/atrcopy/containers/bzip.py b/atrcopy/containers/bzip.py index 7325804..d039fa1 100644 --- a/atrcopy/containers/bzip.py +++ b/atrcopy/containers/bzip.py @@ -1,18 +1,24 @@ import bz2 import io -import numpy as np - -from . import errors -from .utils import to_numpy +from .. import errors -class BZipContainer(DiskImageContainer): - def unpack_bytes(self, byte_data): - try: - buf = io.BytesIO(byte_data) - with bz2.BZ2File(buf, mode='rb') as f: - unpacked = f.read() - except OSError as e: - raise errors.InvalidContainer(e) - return unpacked +name = "bzip" + +def unpack_bytes(byte_data): + try: + buf = io.BytesIO(byte_data) + with bz2.BZ2File(buf, mode='rb') as f: + unpacked = f.read() + except OSError as e: + raise errors.InvalidContainer(e) + return unpacked + + +def pack_bytes(media_container): + """Pack the container using this packing algorithm + + Return a byte string suitable to be written to disk + """ + raise NotImplementedError diff --git a/atrcopy/containers/dcm.py b/atrcopy/containers/dcm.py index 8cbbf3e..41db8e7 100644 --- a/atrcopy/containers/dcm.py +++ b/atrcopy/containers/dcm.py @@ -1,47 +1,57 @@ -import numpy as np - from .. import errors -from ..container import DiskImageContainer -class DCMContainer(DiskImageContainer): - valid_densities = { - 0: (720, 128), - 1: (720, 256), - 2: (1040, 128), - } +name = "dcm" + +valid_densities = { + 0: (720, 128), + 1: (720, 256), + 2: (1040, 128), +} + + +def unpack_bytes(data): + index = 0 + count = len(data) + raw = data + + def get_next(): + nonlocal index, raw - def get_next(self): try: - data = self.raw[self.index] + data = raw[index] except IndexError: raise errors.InvalidContainer("Incomplete DCM file") else: - self.index += 1 + index += 1 return data - def unpack_bytes(self, data): - self.index = 0 - self.count = len(data) - self.raw = data - archive_type = self.get_next() - if archive_type == 0xf9 or archive_type == 0xfa: - archive_flags = self.get_next() - if archive_flags & 0x1f != 1: - if archive_type == 0xf9: - raise errors.InvalidContainer("DCM multi-file archive combined in the wrong order") - else: - raise errors.InvalidContainer("Expected pass one of DCM archive first") - density_flag = (archive_flags >> 5) & 3 - if density_flag not in self.valid_densities: - raise errors.InvalidContainer(f"Unsupported density flag {density_flag} in DCM") - else: - raise errors.InvalidContainer("Not a DCM file") + archive_type = get_next() + if archive_type == 0xf9 or archive_type == 0xfa: + archive_flags = get_next() + if archive_flags & 0x1f != 1: + if archive_type == 0xf9: + raise errors.InvalidContainer("DCM multi-file archive combined in the wrong order") + else: + raise errors.InvalidContainer("Expected pass one of DCM archive first") + density_flag = (archive_flags >> 5) & 3 + if density_flag not in valid_densities: + raise errors.InvalidContainer(f"Unsupported density flag {density_flag} in DCM") + else: + raise errors.InvalidContainer("Not a DCM file") - # DCM decoding goes here. Currently, instead of decoding it raises the - # UnsupportedContainer exception, which signals to the caller that the - # container has been successfully identified but can't be parsed. - # - # When decoding is supported, return the decoded byte array instead of - # this exception. - raise errors.UnsupportedContainer("DCM archives are not yet supported") + # DCM decoding goes here. Currently, instead of decoding it raises the + # UnsupportedContainer exception, which signals to the caller that the + # container has been successfully identified but can't be parsed. + # + # When decoding is supported, return the decoded byte array instead of + # this exception. + raise errors.UnsupportedContainer("DCM archives are not yet supported") + + +def pack_bytes(media_container): + """Pack the container using this packing algorithm + + Return a byte string suitable to be written to disk + """ + raise NotImplementedError diff --git a/atrcopy/containers/gzip.py b/atrcopy/containers/gzip.py index c988ee5..1277637 100644 --- a/atrcopy/containers/gzip.py +++ b/atrcopy/containers/gzip.py @@ -1,18 +1,24 @@ import gzip import io -import numpy as np - -from . import errors -from .utils import to_numpy +from .. import errors -class GZipContainer(DiskImageContainer): - def unpack_bytes(self, byte_data): - try: - buf = io.BytesIO(byte_data) - with gzip.GzipFile(mode='rb', fileobj=buf) as f: - unpacked = f.read() - except OSError as e: - raise errors.InvalidContainer(e) - return unpacked +name = "gzip" + +def unpack_bytes(byte_data): + try: + buf = io.BytesIO(byte_data) + with gzip.GzipFile(mode='rb', fileobj=buf) as f: + unpacked = f.read() + except OSError as e: + raise errors.InvalidContainer(e) + return unpacked + + +def pack_bytes(media_container): + """Pack the container using this packing algorithm + + Return a byte string suitable to be written to disk + """ + raise NotImplementedError diff --git a/atrcopy/containers/lzma.py b/atrcopy/containers/lzma.py index e5b9fa0..3712b42 100644 --- a/atrcopy/containers/lzma.py +++ b/atrcopy/containers/lzma.py @@ -1,18 +1,24 @@ import lzma import io -import numpy as np - -from . import errors -from .utils import to_numpy +from .. import errors -class LZMAContainer(DiskImageContainer): - def unpack_bytes(self, byte_data): - try: - buf = io.BytesIO(byte_data) - with lzma.LZMAFile(buf, mode='rb') as f: - unpacked = f.read() - except lzma.LZMAError as e: - raise errors.InvalidContainer(e) - return unpacked +name = "lzma" + +def unpack_bytes(byte_data): + try: + buf = io.BytesIO(byte_data) + with lzma.LZMAFile(buf, mode='rb') as f: + unpacked = f.read() + except lzma.LZMAError as e: + raise errors.InvalidContainer(e) + return unpacked + + +def pack_bytes(media_container): + """Pack the container using this packing algorithm + + Return a byte string suitable to be written to disk + """ + raise NotImplementedError diff --git a/test/test_container.py b/test/test_container.py index c227ade..655540e 100644 --- a/test/test_container.py +++ b/test/test_container.py @@ -5,8 +5,6 @@ import numpy as np from mock import * from atrcopy.container import guess_container -from atrcopy.parser import iter_parsers -from atrcopy import get_xex, interleave_segments, user_bit_mask, diff_bit_mask from atrcopy import errors @@ -14,18 +12,22 @@ class BaseContainerTest: base_path = None expected_mime = "" - @pytest.mark.parametrize("ext", ['.gz', '.bz2', '.xz', '.dcm']) - def test_container(self, ext): + @pytest.mark.parametrize(("ext", "mod_name"), [ + ('.gz', 'gzip'), + ('.bz2', 'bzip'), + ('.xz', 'lzma'), + ('.dcm', 'dcm'), + ]) + def test_container(self, ext, mod_name): pathname = self.base_path + ext try: sample_data = np.fromfile(pathname, dtype=np.uint8) except OSError: pass else: - container = guess_container(sample_data) - mime, parser = iter_parsers(container) - assert mime == self.expected_mime - assert len(parser.image.files) == self.num_files_in_sample + container, uncompressed_data = guess_container(sample_data) + print(container.name) + assert container.name == mod_name class TestContainerAtariDosSDImage(BaseContainerTest): base_path = "../test_data/container_dos_sd_test1.atr" @@ -41,3 +43,9 @@ class TestContainerAtariDosDDImage(BaseContainerTest): base_path = "../test_data/container_dos_dd_test1.atr" expected_mime = "application/vnd.atari8bit.atr" num_files_in_sample = 5 + +if __name__ == "__main__": + import logging + logging.basicConfig(level=logging.DEBUG) + c = TestContainerAtariDosSDImage() + c.test_container(".gz", "gzip")