Added media type recognition

This commit is contained in:
Rob McMullen 2019-03-21 22:10:23 -07:00
parent 5d6e847541
commit 7a51cb16f9
9 changed files with 720 additions and 17 deletions

View File

@ -38,15 +38,15 @@ def find_containers():
def guess_container(raw_data): def guess_container(raw_data):
uncompressed = raw_data uncompressed = raw_data
for c in find_containers(): for c in find_containers():"trying container {c}") log.debug(f"trying container {}")
try: try:
uncompressed = c.unpack_bytes(raw_data) uncompressed = c.unpack_bytes(raw_data)
except errors.InvalidContainer as e: except errors.InvalidContainer as e:
continue continue
else: else:"found container {c}") log.debug(f"found container {}")
break break
else: else:
c = None c = None"image does not appear to be compressed.") log.debug(f"image does not appear to be compressed.")
return c, uncompressed return c, uncompressed

View File

@ -10,14 +10,6 @@ class InvalidSegmentOrder(AtrError):
pass pass
class InvalidAtrHeader(AtrError):
class InvalidCartHeader(AtrError):
class InvalidDiskImage(AtrError): class InvalidDiskImage(AtrError):
""" Disk image is not recognized by a parser. """ Disk image is not recognized by a parser.
@ -35,15 +27,19 @@ class UnsupportedDiskImage(AtrError):
pass pass
class InvalidDirent(AtrError): class FilesystemError(AtrError):
pass pass
class LastDirent(AtrError): class InvalidDirent(FilesystemError):
pass pass
class InvalidFile(AtrError): class LastDirent(FilesystemError):
class InvalidFile(FilesystemError):
pass pass
@ -63,15 +59,15 @@ class InvalidSegmentParser(AtrError):
pass pass
class NoSpaceInDirectory(AtrError): class NoSpaceInDirectory(FilesystemError):
pass pass
class NotEnoughSpaceOnDisk(AtrError): class NotEnoughSpaceOnDisk(FilesystemError):
pass pass
class FileNotFound(AtrError): class FileNotFound(FilesystemError):
pass pass
@ -85,3 +81,20 @@ class ReadOnlyContainer(AtrError):
class InvalidContainer(AtrError): class InvalidContainer(AtrError):
pass pass
# Errors when trying to determine media type

class MediaError(AtrError):
    """Base class for failures raised while identifying a media type."""
    pass


class InvalidMediaSize(MediaError):
    """The data size does not match what the media type requires."""
    pass


class InvalidAtrHeader(MediaError):
    """The ATR header is missing or inconsistent with the media."""
    pass


class InvalidCartHeader(MediaError):
    """The cartridge header is missing or inconsistent with the media."""
    pass

atrcopy/ Normal file
View File

@ -0,0 +1,308 @@
import hashlib
import inspect
import pkg_resources
import numpy as np
from . import errors
from . import style_bits
from .utils import to_numpy, to_numpy_list, uuid
import logging
log = logging.getLogger(__name__)
class MediaType:
    """Media storage container

    Instances of this class hold a contiguous block of data that represents
    the disk, cassette or cartridge image. Views of this data are in the form
    of `Segment`s which only refer to this data via a mapping of indexes into
    this container. Segments do not hold copies of the data. All operations
    on segments actually affect the container's data, and because all
    segments point to the container's data, a change to one segment can
    affect many other segments.
    """
    pretty_name = "Raw Data"
    can_resize_default = False

    base_serializable_attributes = ['origin', 'error', 'name', 'verbose_name', 'uuid', 'can_resize']
    extra_serializable_attributes = []

    def __init__(self, data, style=None, origin=0, name="All", error=None, verbose_name=None, memory_map=None):
        """Store `data` (and optional per-byte `style`) and validate it.

        Raises whatever `verify_header` / `verify_data` raise when the data
        does not fit this media type.
        """
        self._data = None
        self._style = None
        self.set_data(data, style)

        # NOTE(review): these two calls are not visible in the extracted
        # source, but without them nothing sets `header_length` and no
        # subclass could ever reject data inside its constructor, which is
        # what guess_media_type() relies on. Confirm against upstream.
        self.verify_header()
        self.verify_data()

        self.origin = int(origin)  # force python int to decouple from possibly being a numpy datatype
        self.error = error
        self.name = name
        self.verbose_name = verbose_name
        self.uuid = uuid()
        if memory_map is None:
            memory_map = {}
        self.memory_map = memory_map
        self.comments = dict()
        self.user_data = dict()
        for i in range(1, style_bits.user_bit_mask):
            self.user_data[i] = dict()

        # Some segments may be resized to contain additional segments not
        # present when the segment was created.
        self.can_resize = self.__class__.can_resize_default

    #### initialization

    def set_data(self, data, style):
        """Populate the data and style arrays through their property setters."""
        self.data = data
        self.style = style

    def verify_header(self):
        """Subclasses should override this method to verify the integrity of
        any header information, if any.
        """
        self.header_length = 0

    def verify_data(self):
        """Subclasses should override this method to verify that the passed-in
        data can be stored in this media.
        """
        pass

    #### properties

    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, value):
        # The data may only be set once per instance.
        if self._data is not None:
            raise errors.ReadOnlyContainer("media_type already populated with data")
        self._data = to_numpy(value)

    @property
    def style(self):
        return self._style

    @style.setter
    def style(self, value):
        # Default to one zeroed style byte per data byte.
        if value is None:
            value = np.zeros(len(self._data), dtype=np.uint8)
        self._style = to_numpy(value)

    @property
    def sha1(self):
        # NOTE(review): the argument and trailing call were lost in the
        # extracted source; a raw digest of the data is assumed - confirm.
        return hashlib.sha1(self.data).digest()

    #### dunder methods

    def __str__(self):
        return f"{self.pretty_name}, size={len(self)}"

    def __len__(self):
        # np.alen() was deprecated and later removed from NumPy; len() is
        # the documented replacement.
        return len(self._data)

    def __and__(self, other):
        return self._data & other

    def __iand__(self, other):
        self._data &= other
        return self

    def __getitem__(self, index):
        return self._data[index]

    def __setitem__(self, index, value):
        self._data[index] = value

    #### serialization

    def __getstate__(self):
        """Custom jsonpickle state save routine

        This routine culls down the list of attributes that should be
        serialized, and in some cases changes their format slightly so they
        have a better mapping to json objects. For instance, json can't handle
        dicts with integer keys, so dicts are turned into lists of lists.
        Tuples are also turned into lists because tuples don't have a direct
        representation in json, while lists have a compact representation in
        json.
        """
        # NOTE(review): get_style_ranges() and get_sorted_comments() are not
        # defined in the visible part of this class - confirm they exist.
        state = dict()
        for key in self.base_serializable_attributes:
            state[key] = getattr(self, key)
        for key in self.extra_serializable_attributes:
            state[key] = getattr(self, key)
        state['memory_map'] = sorted([list(i) for i in self.memory_map.items()])
        state['comment ranges'] = [list(a) for a in self.get_style_ranges(comment=True)]
        state['data ranges'] = [list(a) for a in self.get_style_ranges(data=True)]
        for i in range(1, style_bits.user_bit_mask):
            r = [list(a) for a in self.get_style_ranges(user=i)]
            if r:
                slot = "user style %d" % i
                state[slot] = r

        # json serialization doesn't allow int keys, so convert to list of
        # pairs
        state['comments'] = self.get_sorted_comments()
        return state

    def __setstate__(self, state):
        """Custom jsonpickle state restore routine

        The use of jsonpickle to recreate objects doesn't go through __init__,
        so there will be missing attributes when restoring old versions of the
        json. Once a version gets out in the wild and additional attributes are
        added to a segment, a default value should be applied here.
        """
        # NOTE(review): the style ranges below are applied to self._style,
        # which is not restored here because __init__ is bypassed - confirm
        # how data/style get re-attached before this runs.
        self.memory_map = dict(state.pop('memory_map', []))
        self.uuid = state.pop('uuid', uuid())
        self.can_resize = state.pop('can_resize', self.__class__.can_resize_default)

        # Restore the plain attributes that __getstate__ saved.
        for key in self.base_serializable_attributes + self.extra_serializable_attributes:
            if key in state:
                setattr(self, key, state.pop(key))

        # __init__ did not run, so the comments dict must be created here.
        self.comments = dict()
        for k, v in state.pop('comments', []):
            self.comments[k] = v

        if 'comment ranges' in state:
            self.set_style_ranges(state.pop('comment ranges'), comment=True)
        if 'data ranges' in state:
            # Mirrors get_style_ranges(data=True) used when saving.
            self.set_style_ranges(state.pop('data ranges'), data=True)
        if 'display list ranges' in state:
            # DEPRECATED, but supported on read. Converts display list to
            # disassembly type 0 for user index 1
            ranges = state.pop('display list ranges')
            self.set_style_ranges(ranges, data=True, user=1)
            self.set_user_data(ranges, 1, 0)
        if 'user ranges 1' in state:
            # DEPRECATED, but supported on read. Converts user extra data 0
            # (antic dl), 1 (jumpman level), and 2 (jumpman harvest) to user
            # styles 2, 3, and 4. Data is now user style 1.
            for r, val in state.pop('user ranges 1'):
                self.set_style_ranges([r], user=val + 2)
        for i in range(1, style_bits.user_bit_mask):
            slot = "user style %d" % i
            if slot in state:
                self.set_style_ranges(state.pop(slot), user=i)

    #### style

    # NOTE(review): the style helpers below assume get_style_bits() and
    # get_style_mask() live in the imported style_bits module; the original
    # called them as undefined bare names / methods - confirm.

    def set_style_at_indexes(self, indexes, **kwargs):
        """OR the requested style bits into the style bytes at `indexes`."""
        bits = style_bits.get_style_bits(**kwargs)
        self._style[indexes] |= bits

    def clear_style_at_indexes(self, indexes, **kwargs):
        """AND the style bytes at `indexes` with the mask for the given style."""
        mask = style_bits.get_style_mask(**kwargs)
        self._style[indexes] &= mask

    def get_style_at_indexes(self, **kwargs):
        """Return a list of start, end pairs that match the specified style
        """
        bits = style_bits.get_style_bits(**kwargs)
        matches = (self._style & bits) == bits
        # NOTE(review): bool_to_ranges() is not defined in the visible class.
        return self.bool_to_ranges(matches)

    def fixup_comments(self):
        """Remove any style bytes that are marked as commented but have no
        comment, and add any style bytes where there's a comment but it isn't
        marked in the style data.

        This happens on the base data, so only need to do this on one segment
        that uses this base data.
        """
        # This class owns the base style array and the comments dict, so use
        # them directly (the original reached through a nonexistent
        # `self.rawdata`).
        style_base = self._style
        comment_text_indexes = np.asarray(list(self.comments.keys()), dtype=np.uint32)
        comment_mask = style_bits.get_style_mask(comment=True)
        has_comments = np.where(style_base & style_bits.comment_bit_mask > 0)[0]
        both = np.intersect1d(comment_text_indexes, has_comments)
        log.info("fixup comments: %d correctly marked, %d without style, %d empty text" % (len(both), len(comment_text_indexes) - len(both), len(has_comments) - len(both)))
        style_base &= comment_mask
        comment_style = style_bits.get_style_bits(comment=True)
        style_base[comment_text_indexes] |= comment_style
class DiskImage(MediaType):
    """Base class for sector-addressed disk images.

    Subclasses set `sector_size` and `expected_size` (both in bytes, header
    excluded) to describe the format they accept.
    """
    pretty_name = "Disk Image"
    sector_size = 128
    expected_size = 0
    starting_sector_label = 1

    def __str__(self):
        return f"{self.pretty_name}, size={len(self)} ({self.num_sectors}x{self.sector_size}B)"

    def verify_data(self):
        """Validate the payload size (total length minus header).

        Raises errors.InvalidMediaSize through the two checks below.
        """
        size = len(self) - self.header_length
        # NOTE(review): these calls are absent from the extracted source, but
        # without them the validators are dead code and `num_sectors` is
        # never set - confirm against upstream.
        self.check_media_size(size)
        self.check_sector_size(size)

    def check_media_size(self, size):
        """Raise errors.InvalidMediaSize unless `size` equals `expected_size`."""
        if size != self.expected_size:
            raise errors.InvalidMediaSize(f"{self.pretty_name} expects size {self.expected_size}; found {size}")

    def check_sector_size(self, size):
        """Require a whole number of sectors and record it as `num_sectors`."""
        if size % self.sector_size != 0:
            # The original message lacked the f prefix and printed the
            # placeholder text literally.
            raise errors.InvalidMediaSize(f"{self.pretty_name} requires integer number of sectors")
        self.num_sectors = size // self.sector_size

    def sector_is_valid(self, sector):
        """Return True if `sector` is an addressable sector label.

        A negative `num_sectors` disables the range check.
        """
        if self.num_sectors < 0:
            return True
        first = self.starting_sector_label
        return first <= sector < self.num_sectors + first

    def get_index_of_sector(self, sector):
        """Return (byte offset into the image, sector size) for `sector`.

        Raises errors.ByteNotInFile166 when the sector is out of range.
        """
        if not self.sector_is_valid(sector):
            raise errors.ByteNotInFile166("Sector %d out of range" % sector)
        pos = (sector - self.starting_sector_label) * self.sector_size
        return pos + self.header_length, self.sector_size
class CartImage(MediaType):
    """Base class for cartridge ROM images whose size is a multiple of 1K."""
    pretty_name = "Cart Image"
    expected_size = 0

    def __str__(self):
        return f"{len(self) // 1024}K {self.pretty_name}"

    def verify_data(self):
        """Validate the payload size (total length minus header)."""
        size = len(self) - self.header_length
        # NOTE(review): this call is absent from the extracted source, but
        # check_media_size() is otherwise never invoked - confirm upstream.
        self.check_media_size(size)

    def check_media_size(self, size):
        """Raise errors.InvalidMediaSize unless `size` is a multiple of 1024
        bytes and equal to `expected_size`.
        """
        if size % 1024 != 0:
            raise errors.InvalidMediaSize("Cart not multiple of 1K")
        if size != self.expected_size:
            raise errors.InvalidMediaSize(f"{self.pretty_name} expects size {self.expected_size}; found {size}")
# Intermediate base classes that must never be offered as a match themselves.
ignore_base_class_media_types = set([DiskImage, CartImage])


def find_media_types():
    """Return every MediaType subclass exposed by the 'atrcopy.media_types'
    entry points, excluding the intermediate base classes listed in
    `ignore_base_class_media_types`.
    """
    media_types = []
    for entry_point in pkg_resources.iter_entry_points('atrcopy.media_types'):
        mod = entry_point.load()
        log.debug(f"find_media_type: Found module {entry_point.name}={mod.__name__}")
        for name, obj in inspect.getmembers(mod):
            # __mro__[1:] skips the class itself, so MediaType is not matched.
            if inspect.isclass(obj) and MediaType in obj.__mro__[1:] and obj not in ignore_base_class_media_types:
                log.debug(f"find_media_types: found media_type class {name}")
                media_types.append(obj)
    return media_types
def guess_media_type(data, verbose=False):
    """Return an instance of the first media type that accepts `data`.

    Each class from find_media_types() is tried in turn; a class rejects the
    data by raising errors.MediaError from its constructor. If none accept
    it, a generic MediaType wrapping the data is returned.
    """
    for m in find_media_types():
        if verbose:
            log.info(f"trying media_type {m}")
        try:
            found = m(data)
        except errors.MediaError as e:
            log.debug(f"found error: {e}")
            continue
        if verbose:
            log.info(f"found media_type {m}")
        return found
    log.info("No recognized media type.")
    return MediaType(data)

View File

View File

@ -0,0 +1,13 @@
import numpy as np
from .. import errors
from ..media_type import DiskImage
import logging
log = logging.getLogger(__name__)
class Apple16SectorDiskImage(DiskImage):
    """Apple ][ floppy image: 256-byte sectors, 143360 bytes in total."""
    pretty_name = "Apple ][ Floppy Disk Image (16 sector tracks)"
    sector_size = 256
    expected_size = 143360

View File

@ -0,0 +1,164 @@
import numpy as np
from .. import errors
from ..media_type import CartImage
import logging
log = logging.getLogger(__name__)
# From atari800 source
known_cart_types = [
    # (note: all size units in KB)
    # Each entry has 3, 6 or 9 fields:
    #   atari800 index number
    #   name
    #   total size
    #   static size
    #   static offset
    #   static address
    #   banked size
    #   banked offset (for bank zero)
    #   banked address
    (0, "", 0,),
    (57, "Standard 2 KB", 2, 2, 0, 0xb800),
    (58, "Standard 4 KB", 4, 4, 0, 0xb000),
    # The original entry carried a stray extra 0, which made the static
    # address field read as 0 instead of 0x9000.
    (59, "Right slot 4 KB", 4, 4, 0, 0x9000),
    (1, "Standard 8 KB", 8, 8, 0, 0xa000),
    (21, "Right slot 8 KB", 8,),
    (2, "Standard 16 KB", 16, 16, 0, 0x8000),
    (44, "OSS 8 KB", 8,),
    (15, "OSS one chip 16 KB", 16,),
    (3, "OSS two chip (034M) 16 KB", 16, 4, 12, 0xb000, 4, 0, 0xa000),
    (45, "OSS two chip (043M) 16 KB", 16, 4, 12, 0xb000, 4, 0, 0xa000),
    (12, "XEGS 32 KB", 32, 8, 24, 0xa000, 8, 0, 0x8000),
    (13, "XEGS (banks 0-7) 64 KB", 64, 8, 56, 0xa000, 8, 0, 0x8000),
    (67, "XEGS (banks 8-15) 64 KB", 64, 8, 56, 0xa000, 8, 0, 0x8000),
    (14, "XEGS 128 KB", 128, 8, 120, 0xa000, 8, 0, 0x8000),
    (23, "XEGS 256 KB", 256, 8, 248, 0xa000, 8, 0, 0x8000),
    (24, "XEGS 512 KB", 512, 8, 504, 0xa000, 8, 0, 0x8000),
    (25, "XEGS 1 MB", 1024, 8, 1016, 0xa000, 8, 0, 0x8000),
    (33, "Switchable XEGS 32 KB", 32, 8, 24, 0xa000, 8, 0, 0x8000),
    (34, "Switchable XEGS 64 KB", 64, 8, 56, 0xa000, 8, 0, 0x8000),
    (35, "Switchable XEGS 128 KB", 128, 8, 120, 0xa000, 8, 0, 0x8000),
    (36, "Switchable XEGS 256 KB", 256, 8, 248, 0xa000, 8, 0, 0x8000),
    (37, "Switchable XEGS 512 KB", 512, 8, 504, 0xa000, 8, 0, 0x8000),
    (38, "Switchable XEGS 1 MB", 1024, 8, 1016, 0xa000, 8, 0, 0x8000),
    (22, "Williams 32 KB", 32,),
    (8, "Williams 64 KB", 64,),
    (9, "Express 64 KB", 64,),
    (10, "Diamond 64 KB", 64,),
    (11, "SpartaDOS X 64 KB", 64,),
    (43, "SpartaDOS X 128 KB", 128,),
    (17, "Atrax 128 KB", 128,),
    (18, "Bounty Bob 40 KB", 40,),
    (26, "MegaCart 16 KB", 16,),
    (27, "MegaCart 32 KB", 32,),
    (28, "MegaCart 64 KB", 64,),
    (29, "MegaCart 128 KB", 128,),
    (30, "MegaCart 256 KB", 256,),
    (31, "MegaCart 512 KB", 512,),
    (32, "MegaCart 1 MB", 1024,),
    (39, "Phoenix 8 KB", 8,),
    (46, "Blizzard 4 KB", 4,),
    (40, "Blizzard 16 KB", 16, 16, 0, 0x8000),
    (60, "Blizzard 32 KB", 32,),
    (41, "Atarimax 128 KB Flash", 128,),
    (42, "Atarimax 1 MB Flash", 1024,),
    (47, "AST 32 KB", 32,),
    (48, "Atrax SDX 64 KB", 64,),
    (49, "Atrax SDX 128 KB", 128,),
    (50, "Turbosoft 64 KB", 64,),
    (51, "Turbosoft 128 KB", 128,),
    (52, "Ultracart 32 KB", 32,),
    (53, "Low bank 8 KB", 8, 8, 0, 0x8000),
    (5, "DB 32 KB", 32,),
    (54, "SIC! 128 KB", 128,),
    (55, "SIC! 256 KB", 256,),
    (56, "SIC! 512 KB", 512,),
    (61, "MegaMax 2 MB", 2048,),
    (62, "The!Cart 128 MB", 128*1024,),
    (63, "Flash MegaCart 4 MB", 4096,),
    (64, "MegaCart 2 MB", 2048,),
    (65, "The!Cart 32 MB", 32*1024,),
    (66, "The!Cart 64 MB", 64*1024,),
    (20, "Standard 4 KB 5200", 4, 4, 0, 0x8000),
    (19, "Standard 8 KB 5200", 8, 8, 0, 0x8000),
    (4, "Standard 32 KB 5200", 32, 32, 0, 0x4000),
    (16, "One chip 16 KB 5200", 16,),
    (6, "Two chip 16 KB 5200", 16,),
    (7, "Bounty Bob 40 KB 5200", 40,),
]

# Maps an atari800 index number to its position in known_cart_types.
known_cart_type_map = {c[0]:i for i, c in enumerate(known_cart_types)}
def get_known_carts():
    """Return the known cart types grouped by total size in KB.

    The result maps size -> list of cart tuples; the placeholder entry at
    index 0 of known_cart_types is skipped.
    """
    # Imported locally because the module's visible import block does not
    # provide defaultdict.
    from collections import defaultdict

    grouped = defaultdict(list)
    for c in known_cart_types[1:]:
        size = c[2]
        grouped[size].append(c)
    return grouped
def get_cart(cart_type):
    """Return the known_cart_types tuple for the atari800 index `cart_type`.

    Raises errors.InvalidCartHeader if the index is not in the table.
    """
    try:
        return known_cart_types[known_cart_type_map[cart_type]]
    except KeyError:
        raise errors.InvalidCartHeader("Unsupported cart type %d" % cart_type)
class A8CartHeader:
    """Parsed 16-byte header of an Atari 8-bit cartridge image."""

    # Atari CART header layout. NOTE: Big endian!
    # NOTE(review): the fourth field was lost in the extracted source; it is
    # restored as a 4-byte filler so the dtype spans the 16 bytes that
    # __init__ requires - confirm the field name against upstream.
    format = np.dtype([
        ('magic', '|S4'),
        ('format', '>u4'),
        ('checksum', '>u4'),
        ('unused', '>u4'),
        ])

    def __init__(self, data):
        """Parse `data`, a 16-byte numpy uint8 array.

        Raises errors.InvalidCartHeader if the length is not 16, the magic
        is not b'CART', or the cart type is unknown.
        """
        if len(data) == 16:
            header = data.view(dtype=self.format)[0]
            if header[0] != b'CART':
                raise errors.InvalidCartHeader
            self.cart_type = int(header[1])
            self.crc = int(header[2])
            # NOTE(review): this call is absent from the extracted source,
            # but cart_name/cart_size are otherwise never set - confirm.
            self.set_type(self.cart_type)
        else:
            raise errors.InvalidCartHeader

    def __str__(self):
        return "%s Cartridge (atari800 type=%d size=%d, %d banks, crc=%d)" % (self.cart_name, self.cart_type, self.cart_size, len(self.banks), self.crc)

    def set_type(self, cart_type):
        """Load name, sizes and bank layout for `cart_type` from the table.

        Sizes and offsets are kept in KB, as in known_cart_types.
        """
        self.cart_type = cart_type
        c = get_cart(cart_type)
        self.cart_name = c[1]
        self.cart_size = c[2]
        self.main_size = self.cart_size
        # Defaults so that carts without a bank layout still have every
        # attribute that __str__ reads.
        self.banks = []
        self.bank_size = 0
        if len(c) >= 6:
            self.main_size, self.main_offset, self.main_origin = c[3:6]
        if len(c) >= 9:
            self.bank_size, offset, self.bank_origin = c[6:9]
            s = self.cart_size - self.main_size
            while s > 0:
                self.banks.append(offset)
                offset += self.bank_size
                s -= self.bank_size

    def check_media(self, media):
        """Raise errors.InvalidCartHeader if `media` (header included) does
        not hold exactly the number of bytes this cart type requires.
        """
        media_size = len(media) - 16
        # cart_size is in KB (see known_cart_types); media_size is in bytes.
        if self.cart_size * 1024 != media_size:
            raise errors.InvalidCartHeader(f"Invalid cart size: {media_size}, expected {self.cart_size * 1024} for {self.cart_name}")
class Atari8bitCart(CartImage):
    """Atari 8-bit cartridge image prefixed by a 16-byte CART header."""
    pretty_name = "Atari 8bit Cart"

    def verify_header(self):
        """Parse the leading 16 bytes as an A8CartHeader.

        Raises errors.InvalidCartHeader when the data is shorter than a
        header or the header itself is rejected.
        """
        header_data = self.data[0:16]
        if len(header_data) == 16:
            self.header = A8CartHeader(header_data)
            self.header_length = 16
        else:
            # Without this branch header_length would stay unset for short
            # input and verify_data would fail with AttributeError instead
            # of a MediaError that guess_media_type can handle.
            raise errors.InvalidCartHeader(f"file size {len(self)} too small to be {self.pretty_name}")

View File

@ -0,0 +1,136 @@
import numpy as np
from .. import errors
from ..media_type import DiskImage
import logging
log = logging.getLogger(__name__)
class AtrHeader:
    """Parsed 16-byte header of an ATR disk image."""

    # ATR header layout (little endian).
    # NOTE(review): the last three fields were lost in the extracted source;
    # they are restored to match the seven values read below and the
    # 16-byte length check - confirm field names against upstream.
    format = np.dtype([
        ('wMagic', '<u2'),
        ('wPars', '<u2'),
        ('wSecSize', '<u2'),
        ('btParsHigh', 'u1'),
        ('dwCRC', '<u4'),
        ('dwUNUSED', '<u4'),
        ('btFlags', 'u1'),
        ])
    file_format = "ATR"

    def __init__(self, data):
        """Parse the first 16 bytes of `data` (a numpy uint8 array).

        Raises errors.InvalidAtrHeader if fewer than 16 bytes are available
        or the magic value is not 0x296.
        """
        header = data[0:16]
        if len(header) == 16:
            values = header.view(dtype=self.format)[0]
            if values[0] != 0x296:
                raise errors.InvalidAtrHeader("no ATR header magic value")
            # The size is stored in 16-byte paragraphs, split into a 16-bit
            # low part and an 8-bit high part.
            self.image_size = (int(values[3]) * 256 * 256 + int(values[1])) * 16
            self.sector_size = int(values[2])
            self.crc = int(values[4])
            self.unused = int(values[5])
            self.flags = int(values[6])
        else:
            # The original formatted len(bytes) - the builtin type - which
            # raises TypeError instead of reporting the size.
            raise errors.InvalidAtrHeader("incorrect AHC header size of %d" % len(header))

    def encode(self, raw):
        """Write this header into `raw` (a 16-byte numpy uint8 array) in
        place and return `raw`.
        """
        values = raw.view(dtype=self.format)[0]
        values[0] = 0x296
        paragraphs = self.image_size // 16
        parshigh, pars = divmod(paragraphs, 256*256)
        values[1] = pars
        values[2] = self.sector_size
        values[3] = parshigh
        values[4] = self.crc
        values[5] = self.unused
        values[6] = self.flags
        return raw

    def check_media(self, media):
        """Raise errors.InvalidAtrHeader if the header disagrees with
        `media` about sector size or image size (header excluded).
        """
        if self.sector_size != media.sector_size:
            raise errors.InvalidAtrHeader(f"Mismatch between sector sizes: header claims {self.sector_size}, expected {media.sector_size} for {media.pretty_name}")
        media_size = len(media) - 16
        if self.image_size != media_size:
            raise errors.InvalidAtrHeader(f"Invalid media size: header claims {self.image_size}, expected {media_size} for {media.pretty_name}")
class AtariSingleDensity(DiskImage):
    """Atari single density floppy image, with or without an ATR header."""
    pretty_name = "Atari SD (90K) Floppy Disk Image"
    sector_size = 128
    expected_size = 92160

    def verify_header(self):
        """Detect an optional 16-byte ATR header.

        Sets `header`/`header_length` to the parsed header and 16, or to
        None and 0 when the leading bytes are not a valid ATR header.
        Raises errors.InvalidAtrHeader when the data is shorter than 16
        bytes.
        """
        header_data = self.data[0:16]
        if len(header_data) == 16:
            try:
                self.header = AtrHeader(header_data)
            except errors.InvalidAtrHeader:
                # No ATR header: treat the whole file as raw sector data.
                self.header = None
                self.header_length = 0
            else:
                self.header_length = 16
        else:
            # The original formatted an undefined name `data` here.
            raise errors.InvalidAtrHeader(f"file size {len(self)} too small to be {self.pretty_name}")

    def verify_data(self):
        """Run the base size checks, then compare the header's sector size
        (when a header is present) with this class's sector size.
        """
        # NOTE(review): the base-class call is absent from the extracted
        # source, but without it the size checks never run and every
        # subclass would accept any image - confirm against upstream.
        super().verify_data()
        if self.header is not None:
            if self.header.sector_size != self.sector_size:
                raise errors.InvalidMediaSize(f"Sector size {self.header.sector_size} invalid for {self.pretty_name}")
class AtariSingleDensityShortImage(AtariSingleDensity):
    """Single density image holding fewer bytes than a full disk."""
    pretty_name = "Atari SD Non-Standard Image"

    def check_media_size(self, size):
        """Accept only sizes strictly below `expected_size`."""
        if size < self.expected_size:
            return
        raise errors.InvalidMediaSize(f"{self.pretty_name} must be less than size {self.expected_size}")
class AtariEnhancedDensity(AtariSingleDensity):
    """Atari enhanced density image: 128-byte sectors, 133120 bytes."""
    pretty_name = "Atari ED (130K) Floppy Disk Image"
    sector_size = 128
    expected_size = 133120
class AtariDoubleDensity(AtariSingleDensity):
    """Atari double density image: 256-byte sectors, 184320 bytes."""
    pretty_name = "Atari DD (180K) Floppy Disk Image"
    sector_size = 256
    expected_size = 184320
class AtariDoubleDensityShortBootSectors(AtariDoubleDensity):
    """Double density image whose first three sectors are stored as 128
    bytes each instead of the regular 256.
    """
    pretty_name = "Atari DD (180K) Floppy Disk Image (Short Boot Sectors)"
    expected_size = 183936
    initial_sector_size = 128
    num_initial_sectors = 3

    def check_sector_size(self, size):
        """Require a whole number of regular sectors after the short boot
        sectors and record the total sector count as `num_sectors`.
        """
        initial_size = self.initial_sector_size * self.num_initial_sectors
        remaining_size = size - initial_size
        if remaining_size % self.sector_size != 0:
            raise errors.InvalidMediaSize("ATR image not an integer number of sectors")
        self.num_sectors = (remaining_size // self.sector_size) + self.num_initial_sectors

    def get_index_of_sector(self, sector):
        """Return (byte offset into the image, sector size) for `sector`.

        Raises errors.ByteNotInFile166 when the sector is out of range.
        """
        if not self.sector_is_valid(sector):
            raise errors.ByteNotInFile166("Sector %d out of range" % sector)
        if sector <= self.num_initial_sectors:
            # Boot sectors are packed back to back at their short size. The
            # original multiplied by the sector *count* here, which disagrees
            # with the offset the else branch assumes for the first regular
            # sector.
            pos = self.initial_sector_size * (sector - 1)
            size = self.initial_sector_size
        else:
            pos = self.num_initial_sectors * self.initial_sector_size + (sector - 1 - self.num_initial_sectors) * self.sector_size
            size = self.sector_size
        pos += self.header_length
        return pos, size
class AtariDoubleDensityHardDriveImage(AtariDoubleDensity):
    """Double density image holding more bytes than a floppy disk."""
    pretty_name = "Atari DD Hard Drive Image"

    def check_media_size(self, size):
        """Accept only sizes strictly above `expected_size`."""
        if size > self.expected_size:
            return
        raise errors.InvalidMediaSize(f"{self.pretty_name} must be greater than size {self.expected_size}")

View File

@ -35,6 +35,12 @@ setup(name="atrcopy",
'lzma = atrcopy.containers.lzma', 'lzma = atrcopy.containers.lzma',
'dcm = atrcopy.containers.dcm', 'dcm = atrcopy.containers.dcm',
], ],
"atrcopy.media_types": [
'atari_disks = atrcopy.media_types.atari_disks',
'atari_carts = atrcopy.media_types.atari_carts',
'apple_disks = atrcopy.media_types.apple_disks',
}, },
description="Utility to manage file systems on Atari 8-bit (DOS 2) and Apple ][ (DOS 3.3) disk images.", description="Utility to manage file systems on Atari 8-bit (DOS 2) and Apple ][ (DOS 3.3) disk images.",
long_description=long_description, long_description=long_description,

test/ Normal file
View File

@ -0,0 +1,63 @@
import glob
import os
import numpy as np
from mock import *
from atrcopy.container import guess_container
from atrcopy.media_type import MediaType, guess_media_type
from atrcopy import errors
from atrcopy.media_types.atari_disks import *
from atrcopy.media_types.apple_disks import *
ext_to_valid_types = {
'.atr': set([
'.dsk': set([
class TestMediaTypesInTestDataDir:
    """Check media type detection against every file in ../test_data."""
    base_path = None
    expected_mime = ""

    def test_test_data_dir(self):
        """Each sample must be recognized as one of the media types listed
        for its extension in ext_to_valid_types, or fall back to the generic
        MediaType when the extension is not listed.
        """
        for pathname in sorted(glob.glob(os.path.join(os.path.dirname(__file__), "../test_data/", "*"))):
            wrapped, ext = os.path.splitext(pathname)
            print(f"checking {pathname}")
            sample_data = np.fromfile(pathname, dtype=np.uint8)
            container, uncompressed_data = guess_container(sample_data)
            if container:
                # Compressed file: the media extension is the one underneath
                # the container's own extension.
                _, ext = os.path.splitext(wrapped)
            media = guess_media_type(uncompressed_data)
            print(f"{pathname}: {media}")
            if ext in ext_to_valid_types:
                assert media.__class__ in ext_to_valid_types[ext]
            else:
                assert media.__class__ == MediaType
if __name__ == "__main__":
    # Manual run: print the detected media type of every sample file.
    import logging

    # NOTE(review): this logger is fetched but never configured in the
    # visible source; a setLevel() call may have been lost - confirm.
    log = logging.getLogger("atrcopy.media_type")

    for pathname in sorted(glob.glob(os.path.join(os.path.dirname(__file__), "../test_data/", "*"))):
        print(f"checking {pathname}")
        sample_data = np.fromfile(pathname, dtype=np.uint8)
        container, uncompressed_data = guess_container(sample_data)
        # if container: print(
        media = guess_media_type(uncompressed_data)
        print(f"{pathname}: {media}")