From 697432c8f8c6e6d416ca50668d690444de1804e9 Mon Sep 17 00:00:00 2001 From: Rob McMullen Date: Mon, 25 Mar 2019 10:12:23 -0700 Subject: [PATCH] Segment constructor now also takes segment and will generate offsets into container relative to the segment * changed media_type to media in Container --- atrcopy/container.py | 16 +++++++------- atrcopy/segment.py | 43 ++++++++++++++++++------------------ test/test_indexed_segment.py | 38 +++++++++++++++---------------- test/test_media_types.py | 16 ++++++++------ 4 files changed, 57 insertions(+), 56 deletions(-) diff --git a/atrcopy/container.py b/atrcopy/container.py index 9314523..46bf798 100644 --- a/atrcopy/container.py +++ b/atrcopy/container.py @@ -43,7 +43,7 @@ class Container: self.segments = [] self.header = None self.filesystem = None - self._media_type = None + self._media = None self._data = None self._style = None @@ -103,12 +103,12 @@ class Container: return len(self.header) if self.header is not None else 0 @property - def media_type(self): - return self._media_type + def media(self): + return self._media - @media_type.setter - def media_type(self, value): - self._media_type = value + @media.setter + def media(self, value): + self._media = value self.segments = [] if value.header: self.header = value.header @@ -177,10 +177,10 @@ class Container: def guess_media_type(self): media = media_type.guess_media_type(self) - self.media_type = media + self.media = media def guess_filesystem(self): - fs = filesystem.guess_filesystem(self.media_type) + fs = filesystem.guess_filesystem(self.media) if fs is not None: self.filesystem = fs self.segments.append(fs) diff --git a/atrcopy/segment.py b/atrcopy/segment.py index 786abf7..300d097 100644 --- a/atrcopy/segment.py +++ b/atrcopy/segment.py @@ -51,14 +51,19 @@ class Segment: base_serializable_attributes = ['origin', 'error', 'name', 'verbose_name', 'uuid', 'can_resize'] extra_serializable_attributes = [] - def __init__(self, container, offset_or_offset_list, origin=0, name="All", error=None, verbose_name=None, length=None): - self.container = container - try: - start_offset = int(offset_or_offset_list) - except TypeError: - self.set_offset_from_list(offset_or_offset_list) - else: - self.set_offset_from_ints(start_offset, length) + def __init__(self, container_or_segment, offset_or_offset_list, origin=0, name="All", error=None, verbose_name=None, length=None): + + # the container may be specified as the actual container or a segment + # of the container. If a segment is specified, the offset list is + # calculated relative to the segment to get the real offset into the + # container. + offset_list = self.calc_offset_list(offset_or_offset_list, length) + if hasattr(container_or_segment, 'container_offset'): + offset_list = container_or_segment.container_offset[offset_list] + container_or_segment = container_or_segment.container + + self.container = container_or_segment + self.container_offset = offset_list self.verify_offsets() self.origin = int(origin) # force python int to decouple from possibly being a numpy datatype @@ -114,13 +119,14 @@ class Segment: #### offsets - def set_offset_from_list(self, offsets): - self.container_offset = to_numpy_list(offsets) - - def set_offset_from_ints(self, start, length): - if length is None: - raise errors.InvalidSegmentLength - self.container_offset = np.arange(start, start + length, dtype=np.uint32) + def calc_offset_list(self, offset_or_offset_list, length): + try: + start_offset = int(offset_or_offset_list) + except TypeError: + offset_list = to_numpy_list(offset_or_offset_list) + else: + offset_list = np.arange(offset_or_offset_list, offset_or_offset_list + length, dtype=np.uint32) + return offset_list def verify_offsets(self): self.enforce_offset_bounds() @@ -138,13 +144,6 @@ class Segment: raise errors.InvalidSegmentOrder return r - #### subset - - def create_subset(self, new_order, *args, **kwargs): - new_order_of_source = self.container_offset[new_order] - segment = Segment(self.container, new_order_of_source, *args, **kwargs) - return segment - #### serialization def __getstate__(self): diff --git a/test/test_indexed_segment.py b/test/test_indexed_segment.py index 6c2e4a5..f05264a 100644 --- a/test/test_indexed_segment.py +++ b/test/test_indexed_segment.py @@ -7,16 +7,16 @@ import os import numpy as np import pytest -from atrcopy.container import DiskImageContainer -from atrcopy.segments import DefaultSegment -from atrcopy import get_xex, interleave_segments, user_bit_mask, diff_bit_mask +from atrcopy.container import Container +from atrcopy.segment import Segment +# from atrcopy import get_xex, interleave_segments, user_bit_mask, diff_bit_mask from atrcopy import errors from functools import reduce def get_indexed(segment, num, scale): indexes = np.arange(num) * scale - s = segment.create_subset(indexes) + s = Segment(segment, indexes) return s, indexes # class TestSegment1: @@ -25,7 +25,7 @@ def get_indexed(segment, num, scale): # for i in range(8): # data = np.arange(1024, dtype=np.uint8) * i # r = SegmentData(data) -# self.segments.append(DefaultSegment(r, i * 1024)) +# self.segments.append(Segment(r, i * 1024)) # def test_xex(self): # items = [ @@ -73,8 +73,8 @@ class TestIndexed: def setup(self): data = np.arange(4096, dtype=np.uint8) data[1::2] = np.repeat(np.arange(16, dtype=np.uint8), 128) - self.container = DiskImageContainer(data) - self.segment = DefaultSegment(self.container, 0, length=len(self.container)) + self.container = Container(data) + self.segment = Segment(self.container, 0, length=len(self.container)) def test_offsets(self): assert np.array_equal(self.segment.container_offset, np.arange(len(self.container))) @@ -106,7 +106,7 @@ class TestIndexed: # base = self.segment # assert not base.rawdata.is_indexed # raw = base.rawdata[512:1536] # 1024 byte segment - # sub = DefaultSegment(raw, 512) + # sub = Segment(raw, 512) # assert not sub.rawdata.is_indexed # for i in range(len(sub)): @@ -148,9 +148,9 @@ class TestIndexed: # def test_interleave(self): # base = self.segment # r1 = base.rawdata[512:1024] # 512 byte segment - # s1 = DefaultSegment(r1, 512) + # s1 = Segment(r1, 512) # r2 = base.rawdata[1024:1536] # 512 byte segment - # s2 = DefaultSegment(r2, 1024) + # s2 = Segment(r2, 1024) # indexes1 = r1.get_indexes_from_base() # verify1 = np.arange(512, 1024, dtype=np.uint32) @@ -186,9 +186,9 @@ class TestIndexed: # def test_interleave_not_multiple(self): # base = self.segment # r1 = base.rawdata[512:1024] # 512 byte segment - # s1 = DefaultSegment(r1, 512) + # s1 = Segment(r1, 512) # r2 = base.rawdata[1024:1536] # 512 byte segment - # s2 = DefaultSegment(r2, 1024) + # s2 = Segment(r2, 1024) # indexes1 = r1.get_indexes_from_base() # verify1 = np.arange(512, 1024, dtype=np.uint32) @@ -216,9 +216,9 @@ class TestIndexed: # def test_interleave_different_sizes(self): # base = self.segment # r1 = base.rawdata[512:768] # 256 byte segment - # s1 = DefaultSegment(r1, 512) + # s1 = Segment(r1, 512) # r2 = base.rawdata[1024:1536] # 512 byte segment - # s2 = DefaultSegment(r2, 1024) + # s2 = Segment(r2, 1024) # indexes1 = r1.get_indexes_from_base() # verify1 = np.arange(512, 768, dtype=np.uint32) @@ -260,8 +260,8 @@ class TestIndexed: # def setup(self): # data = np.ones([4000], dtype=np.uint8) # r = SegmentData(data) -# self.segment = DefaultSegment(r, 0) -# self.sub_segment = DefaultSegment(r[2:202], 2) +# self.segment = Segment(r, 0) +# self.sub_segment = Segment(r[2:202], 2) # def test_locations(self): # s = self.segment @@ -411,7 +411,7 @@ class TestIndexed: # data = np.arange(4096, dtype=np.uint8) # data[1::2] = np.repeat(np.arange(16, dtype=np.uint8), 128) # r = SegmentData(data) -# self.container = DefaultSegment(r, 0) +# self.container = Segment(r, 0) # self.container.can_resize = True # def test_subset(self): @@ -420,7 +420,7 @@ class TestIndexed: # c = self.container # assert not c.rawdata.is_indexed # offset = 1000 -# s = DefaultSegment(c.rawdata[offset:offset + offset], 0) +# s = Segment(c.rawdata[offset:offset + offset], 0) # assert not s.rawdata.is_indexed # # Check that the small view has the same data as its parent @@ -484,7 +484,7 @@ class TestIndexed: if __name__ == "__main__": t = TestIndexed() t.setup() - t.test_indexed() + t.test_subset() # t.test_indexed_sub() # t.test_interleave() # t = TestSegment1() diff --git a/test/test_media_types.py b/test/test_media_types.py index 6e10794..6f9f286 100644 --- a/test/test_media_types.py +++ b/test/test_media_types.py @@ -32,18 +32,20 @@ class TestMediaTypesInTestDataDir: def test_test_data_dir(self): for pathname in sorted(glob.glob(os.path.join(os.path.dirname(__file__), "../test_data/", "*"))): wrapped, ext = os.path.splitext(pathname) + if ext not in ext_to_valid_types: + # skip for now until recognize bare files + continue print(f"checking {pathname}") sample_data = np.fromfile(pathname, dtype=np.uint8) - container, uncompressed_data = guess_container(sample_data) - if container: + container = guess_container(sample_data) + if container.compression_algorithm != "no compression": _, ext = os.path.splitext(wrapped) - print(len(uncompressed_data)) - media = guess_media_type(uncompressed_data) - print(f"{pathname}: {media}") + container.guess_media_type() + print(ext, ext_to_valid_types) if ext in ext_to_valid_types: - assert media.__class__ in ext_to_valid_types[ext] + assert container.media.__class__ in ext_to_valid_types[ext] else: - assert media.__class__ == MediaType + assert container.media.__class__ == MediaType if __name__ == "__main__":