Added segment parser to AtrDiskImage

* added -s flag to display segments
* added optional name for segments
This commit is contained in:
Rob McMullen 2015-10-16 10:15:00 -07:00
parent 12c01f3345
commit 846a744bd1

View File

@ -182,16 +182,20 @@ class InvalidBinaryFile(AtrError):
pass pass
class ObjSegment(object): class ObjSegment(object):
def __init__(self, metadata_start, data_start, start_addr, end_addr, data, error=None): def __init__(self, metadata_start, data_start, start_addr, end_addr, data, name="", error=None):
self.name = name
self.metadata_start = metadata_start self.metadata_start = metadata_start
self.data_start = data_start self.data_start = data_start
self.start_addr = start_addr self.start_addr = start_addr
self.end_addr = end_addr self.end_addr = end_addr
self.data = data self.data = data
self.error = error self.error = error
if name and not name.endswith(" "):
name += " "
self.name = name
def __str__(self): def __str__(self):
s = "%04x-%04x (%04x @ %04x)" % (self.start_addr, self.end_addr, len(self.data), self.data_start) s = "%s%04x-%04x (%04x @ %04x)" % (self.name, self.start_addr, self.end_addr, len(self.data), self.data_start)
if self.error: if self.error:
s += " " + self.error s += " " + self.error
return s return s
@ -242,125 +246,16 @@ class AtariDosFile(object):
self.segments.append(ObjSegment(pos, pos + 4, start, end, bytes[pos + 4:pos + 4 + count])) self.segments.append(ObjSegment(pos, pos + 4, start, end, bytes[pos + 4:pos + 4 + count]))
pos += 4 + count pos += 4 + count
class AtrDiskImageFromFile(object): class AtrFileSegment(ObjSegment):
def __init__(self, fh): def __init__(self, dirent, data, error=None):
self.fh = fh ObjSegment.__init__(self, 0, data, error)
self.header = None self.dirent = dirent
self.first_vtoc = 360
self.first_data_after_vtoc = 369
self.total_sectors = 0
self.unused_sectors = 0
self.files = []
self.all_sane = True
self.setup()
def __str__(self): def __str__(self):
if self.all_sane: s = str(self.dirent)
return "%s %d usable sectors (%d free), %d files" % (self.header, self.total_sectors, self.unused_sectors, len(self.files)) if self.error:
else: s += " " + self.error
return "%s bad directory entries; possible boot disk? Use -f option to try to extract anyway" % self.header return s
def dir(self):
lines = []
lines.append(str(self))
for dirent in self.files:
if dirent.in_use:
lines.append(str(dirent))
return "\n".join(lines)
def setup(self):
self.fh.seek(0, 2)
self.size = self.fh.tell()
self.read_atr_header()
self.check_size()
self.get_vtoc()
self.get_directory()
def read_atr_header(self):
self.fh.seek(0)
bytes = self.fh.read(16)
try:
self.header = AtrHeader(bytes)
except InvalidAtrHeader:
self.header = XfdHeader()
def check_size(self):
self.header.check_size(self.size)
def get_raw_bytes(self, sector):
pos, size = self.header.get_pos(sector)
self.fh.seek(pos)
raw = self.fh.read(size)
return raw
def get_sectors(self, start, end=None):
""" Get contiguous sectors
:param start: first sector number to read (note: numbering starts from 1)
:param end: last sector number to read
:returns: bytes
"""
output = StringIO()
pos, size = self.header.get_pos(start)
self.fh.seek(pos)
if end is None:
end = start
while start <= end:
bytes = self.fh.read(size)
output.write(bytes)
start += 1
pos, size = self.header.get_pos(start)
return output.getvalue()
def get_vtoc(self):
bytes = self.get_sectors(360)[0:5]
values = struct.unpack("<BHH", bytes)
code = values[0]
if code == 0 or code == 2:
num = 1
else:
num = (code * 2) - 3
self.first_vtoc = 360 - num + 1
self.total_sectors = values[1]
self.unused_sectors = values[2]
def get_directory(self):
dir_bytes = self.get_sectors(361, 368)
i = 0
num = 0
files = []
while i < len(dir_bytes):
dirent = AtrDirent(self, num, dir_bytes[i:i+16])
if dirent.mydos:
dirent = MydosDirent(self, num, dir_bytes[i:i+16])
if dirent.in_use:
files.append(dirent)
if not dirent.is_sane:
self.all_sane = False
elif dirent.flag == 0:
break
i += 16
num += 1
self.files = files
def get_file(self, dirent):
output = StringIO()
dirent.start_read()
while True:
bytes, last = dirent.read_sector(self)
output.write(bytes)
if last:
break
return output.getvalue()
def find_file(self, filename):
for dirent in self.files:
if filename == dirent.get_filename():
bytes = self.get_file(dirent)
return bytes
return ""
class AtrDiskImage(object): class AtrDiskImage(object):
def __init__(self, bytes): def __init__(self, bytes):
@ -371,6 +266,7 @@ class AtrDiskImage(object):
self.total_sectors = 0 self.total_sectors = 0
self.unused_sectors = 0 self.unused_sectors = 0
self.files = [] self.files = []
self.segments = []
self.all_sane = True self.all_sane = True
self.setup() self.setup()
@ -475,6 +371,41 @@ class AtrDiskImage(object):
return bytes return bytes
return "" return ""
def get_boot_segments(self):
bytes = self.get_sectors(1)[0:20]
values = struct.unpack("<BBHHBHBBBHBHBH", bytes)
flag = values[0]
segments = []
if flag == 0:
num = values[1]
addr = values[2]
bytes = self.get_sectors(1, num)
header = ObjSegment(0, 0, addr, addr + 20, bytes[0:20], name="Boot Header")
sectors = ObjSegment(0, 0, addr, addr + len(bytes), bytes, name="Boot Sectors")
code = ObjSegment(0, 0, addr + 20, addr + len(bytes), bytes[20:], name="Boot Code")
segments = [sectors, header, code]
return segments
def parse_segments(self):
if self.header.size_in_bytes > 0:
self.segments.append(ObjSegment(0, 0, 0, self.header.atr_header_offset, self.bytes[0:self.header.atr_header_offset], name="%s Header" % self.header.file_format))
self.segments.append(ObjSegment(0, 0, 0, self.header.size_in_bytes, self.bytes[self.header.atr_header_offset:], name="Raw disk sectors"))
self.segments.extend(self.get_boot_segments())
# for dirent in self.atr.files:
# try:
# bytes = self.get_file(dirent)
# error = None
# except atrcopy.FileNumberMismatchError164:
# bytes = None
# error = "Error 164"
# except atrcopy.ByteNotInFile166:
# bytes = None
# error = "Invalid sector"
# a = AtrFileSegment(dirent, bytes, error)
# self.segments.append(AtrSegment(dirent))
def process(dirent, options): def process(dirent, options):
skip = False skip = False
action = "copying to" action = "copying to"
@ -515,6 +446,7 @@ if __name__ == "__main__":
parser.add_argument("--xex", action="store_true", default=False, help="add .xex extension") parser.add_argument("--xex", action="store_true", default=False, help="add .xex extension")
parser.add_argument("-f", "--force", action="store_true", default=False, help="force operation on disk images that have bad directory entries or look like boot disks") parser.add_argument("-f", "--force", action="store_true", default=False, help="force operation on disk images that have bad directory entries or look like boot disks")
parser.add_argument("files", metavar="ATR", nargs="+", help="an ATR image file [or a list of them]") parser.add_argument("files", metavar="ATR", nargs="+", help="an ATR image file [or a list of them]")
parser.add_argument("-s", "--segments", action="store_true", default=False, help="display segments")
options, extra_args = parser.parse_known_args() options, extra_args = parser.parse_known_args()
for filename in options.files: for filename in options.files:
@ -531,7 +463,10 @@ if __name__ == "__main__":
except InvalidBinaryFile: except InvalidBinaryFile:
print "%s: Doesn't look like an XEX either" % filename print "%s: Doesn't look like an XEX either" % filename
continue continue
if atr.all_sane or options.force: if options.segments:
atr.parse_segments()
print "\n".join([str(a) for a in atr.segments])
elif atr.all_sane or options.force:
for dirent in atr.files: for dirent in atr.files:
try: try:
process(dirent, options) process(dirent, options)