Segment constructor now also takes segment and will generate offsets into container relative to the segment

* changed media_type to media in Container
This commit is contained in:
Rob McMullen 2019-03-25 10:12:23 -07:00
parent 1817483b3c
commit 697432c8f8
4 changed files with 57 additions and 56 deletions

View File

@ -43,7 +43,7 @@ class Container:
self.segments = [] self.segments = []
self.header = None self.header = None
self.filesystem = None self.filesystem = None
self._media_type = None self._media = None
self._data = None self._data = None
self._style = None self._style = None
@ -103,12 +103,12 @@ class Container:
return len(self.header) if self.header is not None else 0 return len(self.header) if self.header is not None else 0
@property @property
def media_type(self): def media(self):
return self._media_type return self._media
@media_type.setter @media.setter
def media_type(self, value): def media(self, value):
self._media_type = value self._media = value
self.segments = [] self.segments = []
if value.header: if value.header:
self.header = value.header self.header = value.header
@ -177,10 +177,10 @@ class Container:
def guess_media_type(self): def guess_media_type(self):
media = media_type.guess_media_type(self) media = media_type.guess_media_type(self)
self.media_type = media self.media = media
def guess_filesystem(self): def guess_filesystem(self):
fs = filesystem.guess_filesystem(self.media_type) fs = filesystem.guess_filesystem(self.media)
if fs is not None: if fs is not None:
self.filesystem = fs self.filesystem = fs
self.segments.append(fs) self.segments.append(fs)

View File

@ -51,14 +51,19 @@ class Segment:
base_serializable_attributes = ['origin', 'error', 'name', 'verbose_name', 'uuid', 'can_resize'] base_serializable_attributes = ['origin', 'error', 'name', 'verbose_name', 'uuid', 'can_resize']
extra_serializable_attributes = [] extra_serializable_attributes = []
def __init__(self, container, offset_or_offset_list, origin=0, name="All", error=None, verbose_name=None, length=None): def __init__(self, container_or_segment, offset_or_offset_list, origin=0, name="All", error=None, verbose_name=None, length=None):
self.container = container
try: # the container may be specified as the actual container or a segment
start_offset = int(offset_or_offset_list) # of the container. If a segment is specified, the offset list is
except TypeError: # calculated relative to the segment to get the real offset into the
self.set_offset_from_list(offset_or_offset_list) # container.
else: offset_list = self.calc_offset_list(offset_or_offset_list, length)
self.set_offset_from_ints(start_offset, length) if hasattr(container_or_segment, 'container_offset'):
offset_list = container_or_segment.container_offset[offset_list]
container_or_segment = container_or_segment.container
self.container = container_or_segment
self.container_offset = offset_list
self.verify_offsets() self.verify_offsets()
self.origin = int(origin) # force python int to decouple from possibly being a numpy datatype self.origin = int(origin) # force python int to decouple from possibly being a numpy datatype
@ -114,13 +119,14 @@ class Segment:
#### offsets #### offsets
def set_offset_from_list(self, offsets): def calc_offset_list(self, offset_or_offset_list, length):
self.container_offset = to_numpy_list(offsets) try:
start_offset = int(offset_or_offset_list)
def set_offset_from_ints(self, start, length): except TypeError:
if length is None: offset_list = to_numpy_list(offset_or_offset_list)
raise errors.InvalidSegmentLength else:
self.container_offset = np.arange(start, start + length, dtype=np.uint32) offset_list = np.arange(offset_or_offset_list, offset_or_offset_list + length, dtype=np.uint32)
return offset_list
def verify_offsets(self): def verify_offsets(self):
self.enforce_offset_bounds() self.enforce_offset_bounds()
@ -138,13 +144,6 @@ class Segment:
raise errors.InvalidSegmentOrder raise errors.InvalidSegmentOrder
return r return r
#### subset
def create_subset(self, new_order, *args, **kwargs):
new_order_of_source = self.container_offset[new_order]
segment = Segment(self.container, new_order_of_source, *args, **kwargs)
return segment
#### serialization #### serialization
def __getstate__(self): def __getstate__(self):

View File

@ -7,16 +7,16 @@ import os
import numpy as np import numpy as np
import pytest import pytest
from atrcopy.container import DiskImageContainer from atrcopy.container import Container
from atrcopy.segments import DefaultSegment from atrcopy.segment import Segment
from atrcopy import get_xex, interleave_segments, user_bit_mask, diff_bit_mask # from atrcopy import get_xex, interleave_segments, user_bit_mask, diff_bit_mask
from atrcopy import errors from atrcopy import errors
from functools import reduce from functools import reduce
def get_indexed(segment, num, scale): def get_indexed(segment, num, scale):
indexes = np.arange(num) * scale indexes = np.arange(num) * scale
s = segment.create_subset(indexes) s = Segment(segment, indexes)
return s, indexes return s, indexes
# class TestSegment1: # class TestSegment1:
@ -25,7 +25,7 @@ def get_indexed(segment, num, scale):
# for i in range(8): # for i in range(8):
# data = np.arange(1024, dtype=np.uint8) * i # data = np.arange(1024, dtype=np.uint8) * i
# r = SegmentData(data) # r = SegmentData(data)
# self.segments.append(DefaultSegment(r, i * 1024)) # self.segments.append(Segment(r, i * 1024))
# def test_xex(self): # def test_xex(self):
# items = [ # items = [
@ -73,8 +73,8 @@ class TestIndexed:
def setup(self): def setup(self):
data = np.arange(4096, dtype=np.uint8) data = np.arange(4096, dtype=np.uint8)
data[1::2] = np.repeat(np.arange(16, dtype=np.uint8), 128) data[1::2] = np.repeat(np.arange(16, dtype=np.uint8), 128)
self.container = DiskImageContainer(data) self.container = Container(data)
self.segment = DefaultSegment(self.container, 0, length=len(self.container)) self.segment = Segment(self.container, 0, length=len(self.container))
def test_offsets(self): def test_offsets(self):
assert np.array_equal(self.segment.container_offset, np.arange(len(self.container))) assert np.array_equal(self.segment.container_offset, np.arange(len(self.container)))
@ -106,7 +106,7 @@ class TestIndexed:
# base = self.segment # base = self.segment
# assert not base.rawdata.is_indexed # assert not base.rawdata.is_indexed
# raw = base.rawdata[512:1536] # 1024 byte segment # raw = base.rawdata[512:1536] # 1024 byte segment
# sub = DefaultSegment(raw, 512) # sub = Segment(raw, 512)
# assert not sub.rawdata.is_indexed # assert not sub.rawdata.is_indexed
# for i in range(len(sub)): # for i in range(len(sub)):
@ -148,9 +148,9 @@ class TestIndexed:
# def test_interleave(self): # def test_interleave(self):
# base = self.segment # base = self.segment
# r1 = base.rawdata[512:1024] # 512 byte segment # r1 = base.rawdata[512:1024] # 512 byte segment
# s1 = DefaultSegment(r1, 512) # s1 = Segment(r1, 512)
# r2 = base.rawdata[1024:1536] # 512 byte segment # r2 = base.rawdata[1024:1536] # 512 byte segment
# s2 = DefaultSegment(r2, 1024) # s2 = Segment(r2, 1024)
# indexes1 = r1.get_indexes_from_base() # indexes1 = r1.get_indexes_from_base()
# verify1 = np.arange(512, 1024, dtype=np.uint32) # verify1 = np.arange(512, 1024, dtype=np.uint32)
@ -186,9 +186,9 @@ class TestIndexed:
# def test_interleave_not_multiple(self): # def test_interleave_not_multiple(self):
# base = self.segment # base = self.segment
# r1 = base.rawdata[512:1024] # 512 byte segment # r1 = base.rawdata[512:1024] # 512 byte segment
# s1 = DefaultSegment(r1, 512) # s1 = Segment(r1, 512)
# r2 = base.rawdata[1024:1536] # 512 byte segment # r2 = base.rawdata[1024:1536] # 512 byte segment
# s2 = DefaultSegment(r2, 1024) # s2 = Segment(r2, 1024)
# indexes1 = r1.get_indexes_from_base() # indexes1 = r1.get_indexes_from_base()
# verify1 = np.arange(512, 1024, dtype=np.uint32) # verify1 = np.arange(512, 1024, dtype=np.uint32)
@ -216,9 +216,9 @@ class TestIndexed:
# def test_interleave_different_sizes(self): # def test_interleave_different_sizes(self):
# base = self.segment # base = self.segment
# r1 = base.rawdata[512:768] # 256 byte segment # r1 = base.rawdata[512:768] # 256 byte segment
# s1 = DefaultSegment(r1, 512) # s1 = Segment(r1, 512)
# r2 = base.rawdata[1024:1536] # 512 byte segment # r2 = base.rawdata[1024:1536] # 512 byte segment
# s2 = DefaultSegment(r2, 1024) # s2 = Segment(r2, 1024)
# indexes1 = r1.get_indexes_from_base() # indexes1 = r1.get_indexes_from_base()
# verify1 = np.arange(512, 768, dtype=np.uint32) # verify1 = np.arange(512, 768, dtype=np.uint32)
@ -260,8 +260,8 @@ class TestIndexed:
# def setup(self): # def setup(self):
# data = np.ones([4000], dtype=np.uint8) # data = np.ones([4000], dtype=np.uint8)
# r = SegmentData(data) # r = SegmentData(data)
# self.segment = DefaultSegment(r, 0) # self.segment = Segment(r, 0)
# self.sub_segment = DefaultSegment(r[2:202], 2) # self.sub_segment = Segment(r[2:202], 2)
# def test_locations(self): # def test_locations(self):
# s = self.segment # s = self.segment
@ -411,7 +411,7 @@ class TestIndexed:
# data = np.arange(4096, dtype=np.uint8) # data = np.arange(4096, dtype=np.uint8)
# data[1::2] = np.repeat(np.arange(16, dtype=np.uint8), 128) # data[1::2] = np.repeat(np.arange(16, dtype=np.uint8), 128)
# r = SegmentData(data) # r = SegmentData(data)
# self.container = DefaultSegment(r, 0) # self.container = Segment(r, 0)
# self.container.can_resize = True # self.container.can_resize = True
# def test_subset(self): # def test_subset(self):
@ -420,7 +420,7 @@ class TestIndexed:
# c = self.container # c = self.container
# assert not c.rawdata.is_indexed # assert not c.rawdata.is_indexed
# offset = 1000 # offset = 1000
# s = DefaultSegment(c.rawdata[offset:offset + offset], 0) # s = Segment(c.rawdata[offset:offset + offset], 0)
# assert not s.rawdata.is_indexed # assert not s.rawdata.is_indexed
# # Check that the small view has the same data as its parent # # Check that the small view has the same data as its parent
@ -484,7 +484,7 @@ class TestIndexed:
if __name__ == "__main__": if __name__ == "__main__":
t = TestIndexed() t = TestIndexed()
t.setup() t.setup()
t.test_indexed() t.test_subset()
# t.test_indexed_sub() # t.test_indexed_sub()
# t.test_interleave() # t.test_interleave()
# t = TestSegment1() # t = TestSegment1()

View File

@ -32,18 +32,20 @@ class TestMediaTypesInTestDataDir:
def test_test_data_dir(self): def test_test_data_dir(self):
for pathname in sorted(glob.glob(os.path.join(os.path.dirname(__file__), "../test_data/", "*"))): for pathname in sorted(glob.glob(os.path.join(os.path.dirname(__file__), "../test_data/", "*"))):
wrapped, ext = os.path.splitext(pathname) wrapped, ext = os.path.splitext(pathname)
if ext not in ext_to_valid_types:
# skip for now until recognize bare files
continue
print(f"checking {pathname}") print(f"checking {pathname}")
sample_data = np.fromfile(pathname, dtype=np.uint8) sample_data = np.fromfile(pathname, dtype=np.uint8)
container, uncompressed_data = guess_container(sample_data) container = guess_container(sample_data)
if container: if container.compression_algorithm != "no compression":
_, ext = os.path.splitext(wrapped) _, ext = os.path.splitext(wrapped)
print(len(uncompressed_data)) container.guess_media_type()
media = guess_media_type(uncompressed_data) print(ext, ext_to_valid_types)
print(f"{pathname}: {media}")
if ext in ext_to_valid_types: if ext in ext_to_valid_types:
assert media.__class__ in ext_to_valid_types[ext] assert container.media.__class__ in ext_to_valid_types[ext]
else: else:
assert media.__class__ == MediaType assert container.media.__class__ == MediaType
if __name__ == "__main__": if __name__ == "__main__":