Added segment parsing of DOS files within ATR images

* removed IndexedByteSegment; moved segment order info into SegmentData and OrderWrapper
This commit is contained in:
Rob McMullen 2016-04-12 17:04:21 -07:00
parent 1ab03e2612
commit a33e5aa6b2
5 changed files with 145 additions and 123 deletions

View File

@ -9,7 +9,7 @@ from errors import *
from ataridos import AtariDosDiskImage, AtariDosFile
from diskimages import AtrHeader, BootDiskImage
from kboot import KBootImage
from segments import SegmentData, SegmentSaver, DefaultSegment, EmptySegment, ObjSegment, RawSectorsSegment, IndexedByteSegment
from segments import SegmentData, SegmentSaver, DefaultSegment, EmptySegment, ObjSegment, RawSectorsSegment
from spartados import SpartaDosDiskImage
from utils import to_numpy

View File

@ -2,7 +2,7 @@ import numpy as np
from errors import *
from diskimages import DiskImageBase
from segments import EmptySegment, ObjSegment, RawSectorsSegment, IndexedByteSegment, SegmentSaver
from segments import EmptySegment, ObjSegment, RawSectorsSegment, DefaultSegment, SegmentSaver
from utils import to_numpy
@ -144,8 +144,6 @@ class AtariDosFile(object):
"""
def __init__(self, rawdata):
self.rawdata = rawdata
self.bytes = rawdata.get_data()
self.style = rawdata.get_style()
self.size = len(rawdata)
self.segments = []
self.parse_segments()
@ -155,8 +153,7 @@ class AtariDosFile(object):
def parse_segments(self):
r = self.rawdata
b = self.bytes
s = self.style
b = r.get_data()
pos = 0
first = True
while pos < self.size:
@ -327,8 +324,20 @@ class AtariDosDiskImage(DiskImageBase):
if len(byte_order) > 0:
name = "%s %ds@%d" % (dirent.get_filename(), dirent.num_sectors, dirent.starting_sector)
verbose_name = "%s (%d sectors, first@%d) %s" % (dirent.get_filename(), dirent.num_sectors, dirent.starting_sector, dirent.verbose_info)
print verbose_name
segment = IndexedByteSegment(self.rawdata, byte_order, name=name, verbose_name=verbose_name)
raw = self.rawdata.get_indexed(byte_order)
segment = DefaultSegment(raw, name=name, verbose_name=verbose_name)
else:
segment = EmptySegment(self.rawdata, name=dirent.get_filename())
return segment
def get_file_segments(self):
segments_in = DiskImageBase.get_file_segments(self)
segments_out = []
for segment in segments_in:
segments_out.append(segment)
try:
binary = AtariDosFile(segment.rawdata)
segments_out.extend(binary.segments)
except InvalidBinaryFile:
pass
return segments_out

View File

@ -22,22 +22,67 @@ class SegmentSaver(object):
return "|".join(wildcards)
class OrderWrapper(object):
"""Wrapper for numpy data so that manipulations can use normal numpy syntax
and still affect the data according to the byte ordering.
Numpy's fancy indexing can't be used for setting set values, so this
intermediate layer is needed that defines the __setitem__ method that
explicitly references the byte ordering in the data array.
"""
def __init__(self, data, byte_order):
self.np_data = data
self.base = data.base # base array for numpy bounds determination
self.order = byte_order
def __len__(self):
return np.alen(self.order)
def __and__(self, other):
return self.np_data[self.order] & other
def __iand__(self, other):
self.np_data[self.order] &= other
return self
def __getitem__(self, index):
return self.np_data[self.order[index]]
def __setitem__(self, index, value):
self.np_data[self.order[index]] = value
def sub_index(self, index):
"""Return index of index so it can be used directly in a new
SegmentData object, rather than propagating multiple index lookups by
contructing a new OrderWrapper that calls parent OrderWrapper objects.
"""
return self.order[index]
class SegmentData(object):
def __init__(self, data, style=None, comments=None, debug=False):
self.data = to_numpy(data)
def __init__(self, data, style=None, comments=None, debug=False, order=None):
self.order = order
self.is_indexed = order is not None
if self.is_indexed:
self.data = OrderWrapper(data, order)
else:
self.data = to_numpy(data)
if style is None:
if debug:
self.style = np.arange(len(self), dtype=np.uint8)
else:
self.style = np.zeros(len(self), dtype=np.uint8)
else:
self.style = style
if self.is_indexed:
self.style = OrderWrapper(style, order)
else:
self.style = style
if comments is None:
comments = dict()
self.comments = comments
def __len__(self):
return np.alen(self.data)
return len(self.data)
def get_data(self):
return self.data
@ -48,12 +93,53 @@ class SegmentData(object):
def get_comments(self):
return self.comments
def __getitem__(self, index):
d = self.data[index]
s = self.style[index]
c = self.comments
return SegmentData(d, s, c)
def byte_bounds_offset(self):
"""Return start and end offsets of this segment's data into the
base array's data.
This ignores the byte order index. Arrays using the byte order index
will have the entire base array's raw data.
"""
if self.data.base is None:
if self.is_indexed:
basearray = self.data.np_data
else:
basearray = self.data
return 0, len(basearray)
data_start, data_end = np.byte_bounds(self.data)
base_start, base_end = np.byte_bounds(self.data.base)
return int(data_start - base_start), int(data_end - base_start)
def get_raw_index(self, i):
"""Get index into base array's raw data, given the index into this
segment
"""
if self.is_indexed:
i = self.order[i]
if self.data.base is None:
return i
data_start, data_end = np.byte_bounds(self.data)
base_start, base_end = np.byte_bounds(self.data.base)
return int(data_start - base_start + i)
def __getitem__(self, index):
if self.is_indexed:
order = self.data.sub_index(index)
d = self.data.np_data
s = self.style.np_data
else:
order = None
d = self.data[index]
s = self.style[index]
c = self.comments
return SegmentData(d, s, c, order=order)
def get_indexed(self, index):
index = to_numpy_list(index)
if self.is_indexed:
return self[index]
return SegmentData(self.data, self.style, self.comments, order=index)
class DefaultSegment(object):
savers = [SegmentSaver]
@ -73,17 +159,29 @@ class DefaultSegment(object):
self.data = rawdata.get_data()
self.style = rawdata.get_style()
def get_raw(self):
return self.rawdata
def __getstate__(self):
state = dict()
for key in ['start_addr', 'error', 'name', 'verbose_name', 'page_size', 'map_width']:
state[key] = getattr(self, key)
state['_rawdata_bounds'] = list(self.byte_bounds_offset())
r = self.rawdata
state['_rawdata_bounds'] = list(r.byte_bounds_offset())
if r.is_indexed:
state['_order_list'] = r.order.tolist() # more compact serialization in python list
else:
state['_order_list'] = None
return state
def reconstruct_raw(self, rawdata):
start, end = self._rawdata_bounds
r = rawdata[start:end]
delattr(self, '_rawdata_bounds')
if self._order_list:
order = to_numpy_list(self._order_list)
r = r.get_indexed(order)
delattr(self, '_order_list')
self.set_raw(r)
def __str__(self):
@ -95,7 +193,10 @@ class DefaultSegment(object):
@property
def verbose_info(self):
name = self.verbose_name or self.name
s = "%s ($%x bytes)" % (name, len(self))
if self.rawdata.is_indexed:
s = "%s ($%04x bytes) non-contiguous file; file index of first byte: $%04x" % (name, len(self), self.rawdata.order[0])
else:
s = "%s ($%04x bytes)" % (name, len(self))
if self.error:
s += " error='%s'" % self.error
return s
@ -111,21 +212,16 @@ class DefaultSegment(object):
self._search_copy = None
def byte_bounds_offset(self):
if self.data.base is None:
return 0, len(self.rawdata)
data_start, data_end = np.byte_bounds(self.data)
base_start, base_end = np.byte_bounds(self.data.base)
return int(data_start - base_start), int(data_end - base_start)
"""Return start and end offsets of this segment's data into the
base array's data
"""
return self.rawdata.byte_bounds_offset()
def get_raw_index(self, i):
"""Get index into base array's raw data, given the index into this
segment
"""
if self.data.base is None:
return i
data_start, data_end = np.byte_bounds(self.data)
base_start, base_end = np.byte_bounds(self.data.base)
return int(data_start - base_start + i)
return self.rawdata.get_raw_index(i)
def tostring(self):
return self.data.tostring()
@ -382,88 +478,3 @@ class RawSectorsSegment(DefaultSegment):
if lower_case:
return "s%03d:%02x" % (sector + self.first_sector, byte)
return "s%03d:%02X" % (sector + self.first_sector, byte)
class IndexedStyleWrapper(object):
"""Wrapper for style data so that style manipulations can use normal
numpy syntax and still affect the style according to the byte ordering
"""
def __init__(self, style, byte_order):
self.style = style
self.order = byte_order
def __len__(self):
return np.alen(self.order)
def __and__(self, other):
return self.style[self.order] & other
def __iand__(self, other):
self.style[self.order] &= other
return self
def __getitem__(self, index):
return self.style[self.order[index]]
def __setitem__(self, index, value):
self.style[self.order[index]] = value
class IndexedByteSegment(DefaultSegment):
def __init__(self, rawdata, byte_order, **kwargs):
# Convert to numpy list so fancy indexing works as argument to __getitem__
self.order = to_numpy_list(byte_order)
DefaultSegment.__init__(self, rawdata, **kwargs)
self.style = IndexedStyleWrapper(self.style, byte_order)
def __getstate__(self):
state = super(IndexedByteSegment, self).__getstate__()
# local byte_bounds_offset refers to first index in order; want offset
# into entire raw data to reconstruct properly
state['_rawdata_bounds'] = list(DefaultSegment.byte_bounds_offset(self))
state['_order_list'] = self.order.tolist() # more compact serialization in python list
return state
def reconstruct_raw(self, rawdata):
DefaultSegment.reconstruct_raw(self, rawdata)
self.order = to_numpy_list(self._order_list)
delattr(self, '_order_list')
def __str__(self):
s = "%s ($%x @ $%x)" % (self.name, len(self), self.order[0])
if self.error:
s += " " + self.error
return s
@property
def verbose_info(self):
name = self.verbose_name or self.name
s = "%s ($%04x bytes) non-contiguous file; file index of first byte: $%04x" % (name, len(self), self.order[0])
if self.error:
s += " error='%s'" % self.error
return s
def __len__(self):
return np.alen(self.order)
def __getitem__(self, index):
return self.data[self.order[index]]
def __setitem__(self, index, value):
self.data[self.order[index]] = value
self._search_copy = None
def byte_bounds_offset(self):
b = DefaultSegment.byte_bounds_offset(self)
return (b[0] + self.order[0], b[0] + self.order[-1])
def get_raw_index(self, i):
if self.data.base is None:
return self.order[i]
data_start, data_end = np.byte_bounds(self.data)
base_start, base_end = np.byte_bounds(self.data.base)
return int(data_start - base_start + self.order[i])
def tostring(self):
return self.data[self.order[:]].tostring()

View File

@ -3,7 +3,7 @@ import numpy as np
from errors import *
from ataridos import AtariDosDirent, XexSegment
from diskimages import DiskImageBase
from segments import EmptySegment, ObjSegment, RawSectorsSegment, IndexedByteSegment, SegmentSaver
from segments import DefaultSegment, EmptySegment, ObjSegment, RawSectorsSegment, SegmentSaver
class SpartaDosDirent(AtariDosDirent):
@ -236,7 +236,8 @@ class SpartaDosDiskImage(DiskImageBase):
if len(byte_order) > 0:
name = "%s %d@%d %s" % (dirent.get_filename(), dirent.length, dirent.starting_sector, dirent.str_timestamp)
verbose_name = "%s (%d bytes, sector map@%d) %s %s" % (dirent.get_filename(), dirent.length, dirent.starting_sector, dirent.verbose_info, dirent.str_timestamp)
segment = IndexedByteSegment(self.rawdata, byte_order, name=name, verbose_name=verbose_name)
raw = self.rawdata.get_indexed(byte_order)
segment = DefaultSegment(raw, name=name, verbose_name=verbose_name)
else:
segment = EmptySegment(self.rawdata, name=dirent.get_filename(), error=dirent.str_timestamp)
return segment

View File

@ -4,7 +4,7 @@ import jsonpickle
import numpy as np
from atrcopy import DefaultSegment, SegmentData, IndexedByteSegment
from atrcopy import DefaultSegment, SegmentData
class TestJsonPickle(object):
@ -16,13 +16,14 @@ class TestJsonPickle(object):
print self.segment.byte_bounds_offset(), len(self.segment)
r2 = self.segment.rawdata[100:400]
s2 = DefaultSegment(r2)
print s2.byte_bounds_offset(), len(s2)
print s2.byte_bounds_offset(), len(s2), s2.__getstate__()
r3 = s2.rawdata[100:200]
s3 = DefaultSegment(r3)
print s3.byte_bounds_offset(), len(s3)
print s3.byte_bounds_offset(), len(s3), s3.__getstate__()
order = list(reversed(range(700, 800)))
s4 = IndexedByteSegment(self.segment.rawdata, order)
print s4.byte_bounds_offset(), len(s4)
r4 = self.segment.rawdata.get_indexed(order)
s4 = DefaultSegment(r4)
print s4.byte_bounds_offset(), len(s4), s4.__getstate__()
slist = [s2, s3, s4]
for s in slist:
@ -37,8 +38,8 @@ class TestJsonPickle(object):
print s
for orig, rebuilt in zip(slist, slist2):
print "orig", orig.data
print "rebuilt", rebuilt.data
print "orig", orig.data[:]
print "rebuilt", rebuilt.data[:]
assert np.array_equal(orig[:], rebuilt[:])
if __name__ == "__main__":