Compare commits
58 Commits
Author | SHA1 | Date
---|---|---
 | 82b5926b4f |
 | 5456013bf4 |
 | b595456a05 |
 | d367a9238a |
 | 33c4016124 |
 | b01cfc77cf |
 | a9f54b678c |
 | b46018e666 |
 | b0eefe3889 |
 | 3e0bbcee04 |
 | 13654c2560 |
 | d5199bd503 |
 | c5c3f24a10 |
 | 7c77c4ef20 |
 | f7b6080c0e |
 | 007d15eb3d |
 | 246b69e375 |
 | d67ff64851 |
 | 5391d66a78 |
 | 5b2700bf17 |
 | c41b25fea1 |
 | a45dbd8eca |
 | 3401ce65dd |
 | 890dd24f76 |
 | 67c2b4acf0 |
 | 238c78a73e |
 | fbd861edf4 |
 | a7a407a1dd |
 | ecee2616cf |
 | ba284d1800 |
 | f690caac24 |
 | 3a805c3e56 |
 | 6adf8eb88d |
 | e132a91dea |
 | 4e1cd05412 |
 | 1a416defed |
 | 1089a19c01 |
 | 8fc24040ea |
 | d492d9a6a8 |
 | d0e1eaf262 |
 | 1e55569442 |
 | 2abf6e2a06 |
 | 2b0bbb19ed |
 | c009e8f80f |
 | d67641d537 |
 | d6dbfdb149 |
 | b2502c48a2 |
 | 158ca4884b |
 | 8568f355c4 |
 | 97d2dbe1b3 |
 | a4b6328782 |
 | 393160b5da |
 | 476eaecd17 |
 | 546edbc31a |
 | cf6ce3c2a6 |
 | af2ac70676 |
 | 5af455992b |
 | 2193c81518 |
.editorconfig

@@ -8,3 +8,7 @@ insert_final_newline = true
 [*.rst]
 indent_style = space
 indent_size = 4
+
+[*.yml]
+indent_style = space
+indent_size = 2

20 .github/workflows/ci.yml vendored Normal file

@@ -0,0 +1,20 @@
on: [pull_request, push]
jobs:
  test:
    strategy:
      matrix:
        platform: [macos-latest, ubuntu-latest, windows-latest]
    runs-on: ${{ matrix.platform }}
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v1
        with:
          python-version: "3.6"
      - uses: actions/setup-python@v1
        with:
          python-version: "3.7"
      - uses: actions/setup-python@v1
        with:
          python-version: "3.8"
      - run: python -m pip install --upgrade tox
      - run: tox
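
A note on this workflow (my inference, not stated anywhere in the diff): each actions/setup-python step prepends another interpreter to the runner's PATH, so after the three steps Python 3.6, 3.7 and 3.8 are all available side by side, and the final tox run (configured in tox.ini at the bottom of this comparison) can locate a matching interpreter for each of its py36/py37/py38 environments.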

3 .gitignore vendored

@@ -2,6 +2,9 @@
 *.py[co]
 __pycache__/
 
+# tox
+.tox/
+
 # setuptools
 *.egg-info/
 build/

6 MANIFEST.in Normal file

@@ -0,0 +1,6 @@
# Note: See the PyPA documentation for a list of file names that are included/excluded by default:
# https://packaging.python.org/guides/using-manifest-in/#how-files-are-included-in-an-sdist
# Please only add entries here for files that are *not* already handled by default.

recursive-include tests *.py
recursive-include tests/data *.rsrc

20 README.rst

@@ -113,6 +113,26 @@ For technical info and documentation about resource files and resources, see the
 Changelog
 ---------
 
+Version 1.8.0
+^^^^^^^^^^^^^
+
+* Removed the old (non-subcommand-based) CLI syntax.
+* Added filtering support to the ``list`` subcommand.
+* Added a ``resource-info`` subcommand to display technical information about resources (more detailed than what is displayed by ``list`` and ``read``).
+* Added a ``raw-compress-info`` subcommand to display technical header information about standalone compressed resource data.
+* Made the library PEP 561-compliant by adding a py.typed file.
+* Fixed an incorrect ``AssertionError`` when using the ``--no-decompress`` command-line options.
+
+Version 1.7.0
+^^^^^^^^^^^^^
+
+* Added a ``raw-decompress`` subcommand to decompress compressed resource data stored in a standalone file rather than as a resource.
+* Optimized lazy loading of ``Resource`` objects. Previously, resource data would be read from disk whenever a ``Resource`` object was looked up, even if the data itself is never used. Now the resource data is only loaded once the ``data`` (or ``data_raw``) attribute is accessed.
+
+	* The same optimization applies to the ``name`` attribute, although this is unlikely to make a difference in practice.
+	* As a result, it is no longer possible to construct ``Resource`` objects without a resource file. This was previously possible, but had no practical use.
+* Fixed a small error in the ``'dcmp' (0)`` decompression implementation.
+
 Version 1.6.0
 ^^^^^^^^^^^^^
 
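
The lazy loading described in the 1.7.0 entry above is visible in the public API. A minimal sketch (the file name is hypothetical; everything else is from the library's interface as shown in this diff):

import rsrcfork

# Opening the file parses only the header and the resource map.
with rsrcfork.open("Example.rsrc", fork="data") as rf:
	res = rf[b"STR "][128]  # Looks up the resource; no resource data is read yet.
	print(res.type, res.id)  # Metadata comes from the already-parsed resource map.
	print(res.data)  # The first access to data (or name) triggers the actual disk read.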

6 pyproject.toml Normal file

@@ -0,0 +1,6 @@
[build-system]
requires = [
	"setuptools >= 46.4.0",
	"wheel >= 0.32.0",
]
build-backend = "setuptools.build_meta"
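
For context (my explanation, not part of the diff): declaring the build requirements in pyproject.toml lets PEP 517/518 front ends such as pip build the package in an isolated environment with setuptools >= 46.4.0 and wheel available, instead of relying on whatever happens to be installed globally; the package metadata itself stays in setup.cfg.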

rsrcfork/__init__.py

@@ -20,7 +20,7 @@
 # * Add a new empty section for the next version to the README.rst changelog.
 # * Commit and push the changes to master.
 
-__version__ = "1.6.0"
+__version__ = "1.8.0"
 
 __all__ = [
 	"Resource",
@@ -31,8 +31,8 @@ __all__ = [
 	"open",
 ]
 
-from . import api, compress
 from .api import Resource, ResourceAttrs, ResourceFile, ResourceFileAttrs
+from . import compress
 
 # noinspection PyShadowingBuiltins
 open = ResourceFile.open

168 rsrcfork/api.py

@@ -59,9 +59,11 @@ STRUCT_RESOURCE_REFERENCE = struct.Struct(">hHI4x")
 # 1 byte: Length of following resource name.
 STRUCT_RESOURCE_NAME_HEADER = struct.Struct(">B")
 
 
+class InvalidResourceFileError(Exception):
+	pass
 
 
 class ResourceFileAttrs(enum.Flag):
 	"""Resource file attribute flags. The descriptions for these flags are taken from comments on the map*Bit and map* enum constants in <CarbonCore/Resources.h>."""
 
@@ -82,6 +84,7 @@ class ResourceFileAttrs(enum.Flag):
 	_BIT_1 = 1 << 1
 	_BIT_0 = 1 << 0
 
+
 class ResourceAttrs(enum.Flag):
 	"""Resource attribute flags. The descriptions for these flags are taken from comments on the res*Bit and res* enum constants in <CarbonCore/Resources.h>."""
 
@@ -94,27 +97,35 @@ class ResourceAttrs(enum.Flag):
 	resChanged = 1 << 1 # "Existing resource changed since last update", "Resource changed?"
 	resCompressed = 1 << 0 # "indicates that the resource data is compressed" (only documented in https://github.com/kreativekorp/ksfl/wiki/Macintosh-Resource-File-Format)
 
 
 class Resource(object):
 	"""A single resource from a resource file."""
 	
+	_resfile: "ResourceFile"
 	type: bytes
 	id: int
-	name: typing.Optional[bytes]
+	name_offset: int
+	_name: typing.Optional[bytes]
 	attributes: ResourceAttrs
-	data_raw: bytes
+	data_raw_offset: int
+	_data_raw: bytes
 	_compressed_info: compress.common.CompressedHeaderInfo
 	_data_decompressed: bytes
 	
-	def __init__(self, resource_type: bytes, resource_id: int, name: typing.Optional[bytes], attributes: ResourceAttrs, data_raw: bytes) -> None:
-		"""Create a new resource with the given type code, ID, name, attributes, and data."""
+	def __init__(self, resfile: "ResourceFile", resource_type: bytes, resource_id: int, name_offset: int, attributes: ResourceAttrs, data_raw_offset: int) -> None:
+		"""Create a resource object representing a resource stored in a resource file.
+		
+		External code should not call this constructor manually. Resources should be looked up through a ResourceFile object instead.
+		"""
 		
 		super().__init__()
 		
+		self._resfile = resfile
 		self.type = resource_type
 		self.id = resource_id
-		self.name = name
+		self.name_offset = name_offset
 		self.attributes = attributes
-		self.data_raw = data_raw
+		self.data_raw_offset = data_raw_offset
 	
 	def __repr__(self) -> str:
 		try:
@@ -126,14 +137,14 @@ class Resource(object):
 			decompress_ok = True
 		
 		if len(data) > 32:
-			data_repr = f"<{len(data)} bytes: {data[:32]}...>"
+			data_repr = f"<{len(data)} bytes: {data[:32]!r}...>"
 		else:
 			data_repr = repr(data)
 		
 		if not decompress_ok:
 			data_repr = f"<decompression failed - compressed data: {data_repr}>"
 		
-		return f"{type(self).__module__}.{type(self).__qualname__}(type={self.type}, id={self.id}, name={self.name}, attributes={self.attributes}, data={data_repr})"
+		return f"<{type(self).__qualname__} type {self.type!r}, id {self.id}, name {self.name!r}, attributes {self.attributes}, data {data_repr}>"
 	
 	@property
 	def resource_type(self) -> bytes:
@@ -145,6 +156,30 @@ class Resource(object):
 		warnings.warn(DeprecationWarning("The resource_id attribute has been deprecated and will be removed in a future version. Please use the id attribute instead."))
 		return self.id
 	
+	@property
+	def name(self) -> typing.Optional[bytes]:
+		try:
+			return self._name
+		except AttributeError:
+			if self.name_offset == 0xffff:
+				self._name = None
+			else:
+				self._resfile._stream.seek(self._resfile.map_offset + self._resfile.map_name_list_offset + self.name_offset)
+				(name_length,) = self._resfile._stream_unpack(STRUCT_RESOURCE_NAME_HEADER)
+				self._name = self._resfile._read_exact(name_length)
+			
+			return self._name
+	
+	@property
+	def data_raw(self) -> bytes:
+		try:
+			return self._data_raw
+		except AttributeError:
+			self._resfile._stream.seek(self._resfile.data_offset + self.data_raw_offset)
+			(data_raw_length,) = self._resfile._stream_unpack(STRUCT_RESOURCE_DATA_HEADER)
+			self._data_raw = self._resfile._read_exact(data_raw_length)
+			return self._data_raw
+	
 	@property
 	def compressed_info(self) -> typing.Optional[compress.common.CompressedHeaderInfo]:
 		"""The compressed resource header information, or None if this resource is not compressed.
@@ -198,65 +233,56 @@ class Resource(object):
 		else:
 			return self.data_raw
 
 
+class _LazyResourceMap(typing.Mapping[int, Resource]):
+	"""Internal class: Read-only wrapper for a mapping of resource IDs to resource objects.
+	
+	This class behaves like a normal read-only mapping. The main difference to a plain dict (or similar mapping) is that this mapping has a specialized repr to avoid excessive output when working in the REPL.
+	"""
+	
+	type: bytes
+	_submap: typing.Mapping[int, Resource]
+	
+	def __init__(self, resource_type: bytes, submap: typing.Mapping[int, Resource]) -> None:
+		"""Create a new _LazyResourceMap that wraps the given mapping."""
+		
+		super().__init__()
+		
+		self.type = resource_type
+		self._submap = submap
+	
+	def __len__(self) -> int:
+		"""Get the number of resources with this type code."""
+		
+		return len(self._submap)
+	
+	def __iter__(self) -> typing.Iterator[int]:
+		"""Iterate over the IDs of all resources with this type code."""
+		
+		return iter(self._submap)
+	
+	def __contains__(self, key: object) -> bool:
+		"""Check if a resource with the given ID exists for this type code."""
+		
+		return key in self._submap
+	
+	def __getitem__(self, key: int) -> Resource:
+		"""Get a resource with the given ID for this type code."""
+		
+		return self._submap[key]
+	
+	def __repr__(self) -> str:
+		if len(self) == 1:
+			contents = f"one resource: {next(iter(self.values()))}"
+		else:
+			contents = f"{len(self)} resources with IDs {list(self)}"
+		
+		return f"<Resource map for type {self.type!r}, containing {contents}>"
 
 
 class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.ContextManager["ResourceFile"]):
 	"""A resource file reader operating on a byte stream."""
 	
-	# noinspection PyProtectedMember
-	class _LazyResourceMap(typing.Mapping[int, Resource]):
-		"""Internal class: Lazy mapping of resource IDs to resource objects, returned when subscripting a ResourceFile."""
-		
-		_resfile: "ResourceFile"
-		_restype: bytes
-		_submap: typing.Mapping[int, typing.Tuple[int, ResourceAttrs, int]]
-		
-		def __init__(self, resfile: "ResourceFile", restype: bytes) -> None:
-			"""Create a new _LazyResourceMap "containing" all resources in resfile that have the type code restype."""
-			
-			super().__init__()
-			
-			self._resfile = resfile
-			self._restype = restype
-			self._submap = self._resfile._references[self._restype]
-		
-		def __len__(self) -> int:
-			"""Get the number of resources with this type code."""
-			
-			return len(self._submap)
-		
-		def __iter__(self) -> typing.Iterator[int]:
-			"""Iterate over the IDs of all resources with this type code."""
-			
-			return iter(self._submap)
-		
-		def __contains__(self, key: object) -> bool:
-			"""Check if a resource with the given ID exists for this type code."""
-			
-			return key in self._submap
-		
-		def __getitem__(self, key: int) -> Resource:
-			"""Get a resource with the given ID for this type code."""
-			
-			name_offset, attributes, data_offset = self._submap[key]
-			
-			if name_offset == 0xffff:
-				name = None
-			else:
-				self._resfile._stream.seek(self._resfile.map_offset + self._resfile.map_name_list_offset + name_offset)
-				(name_length,) = self._resfile._stream_unpack(STRUCT_RESOURCE_NAME_HEADER)
-				name = self._resfile._read_exact(name_length)
-			
-			self._resfile._stream.seek(self._resfile.data_offset + data_offset)
-			(data_length,) = self._resfile._stream_unpack(STRUCT_RESOURCE_DATA_HEADER)
-			data = self._resfile._read_exact(data_length)
-			
-			return Resource(self._restype, key, name, attributes, data)
-		
-		def __repr__(self) -> str:
-			if len(self) == 1:
-				return f"<{type(self).__module__}.{type(self).__qualname__} at {id(self):#x} containing one resource: {next(iter(self.values()))}>"
-			else:
-				return f"<{type(self).__module__}.{type(self).__qualname__} at {id(self):#x} containing {len(self)} resources with IDs: {list(self)}>"
-	
 	_close_stream: bool
 	_stream: typing.BinaryIO
 	
@@ -272,10 +298,10 @@ class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.
 	file_attributes: ResourceFileAttrs
 	
 	_reference_counts: typing.MutableMapping[bytes, int]
-	_references: typing.MutableMapping[bytes, typing.MutableMapping[int, typing.Tuple[int, ResourceAttrs, int]]]
+	_references: typing.MutableMapping[bytes, typing.MutableMapping[int, Resource]]
 	
 	@classmethod
-	def open(cls, filename: typing.Union[str, os.PathLike], *, fork: str="auto", **kwargs: typing.Any) -> "ResourceFile":
+	def open(cls, filename: typing.Union[str, os.PathLike], *, fork: str = "auto", **kwargs: typing.Any) -> "ResourceFile":
 		"""Open the file at the given path as a ResourceFile.
 		
 		The fork parameter controls which fork of the file the resource data will be read from. It accepts the following values:
@@ -334,7 +360,7 @@ class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.
 		else:
 			raise ValueError(f"Unsupported value for the fork parameter: {fork!r}")
 	
-	def __init__(self, stream: typing.BinaryIO, *, close: bool=False) -> None:
+	def __init__(self, stream: typing.BinaryIO, *, close: bool = False) -> None:
 		"""Create a ResourceFile wrapping the given byte stream.
 		
 		To read resource file data from a bytes object, wrap it in an io.BytesIO.
@@ -433,7 +459,7 @@ class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.
 		self._references = collections.OrderedDict()
 		
 		for resource_type, count in self._reference_counts.items():
-			resmap: typing.MutableMapping[int, typing.Tuple[int, ResourceAttrs, int]] = collections.OrderedDict()
+			resmap: typing.MutableMapping[int, Resource] = collections.OrderedDict()
 			self._references[resource_type] = resmap
 			for _ in range(count):
 				(
@@ -445,7 +471,7 @@ class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.
 				attributes = attributes_and_data_offset >> 24
 				data_offset = attributes_and_data_offset & ((1 << 24) - 1)
 				
-				resmap[resource_id] = (name_offset, ResourceAttrs(attributes), data_offset)
+				resmap[resource_id] = Resource(self, resource_type, resource_id, name_offset, ResourceAttrs(attributes), data_offset)
 	
 	def close(self) -> None:
 		"""Close this ResourceFile.
@@ -483,10 +509,10 @@ class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.
 		return key in self._references
 	
-	def __getitem__(self, key: bytes) -> "ResourceFile._LazyResourceMap":
+	def __getitem__(self, key: bytes) -> "_LazyResourceMap":
 		"""Get a lazy mapping of all resources with the given type in this ResourceFile."""
 		
-		return ResourceFile._LazyResourceMap(self, key)
+		return _LazyResourceMap(key, self._references[key])
 	
 	def __repr__(self) -> str:
 		return f"<{type(self).__module__}.{type(self).__qualname__} at {id(self):#x}, attributes {self.file_attributes}, containing {len(self)} resource types: {list(self)}>"
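
The new name and data_raw properties above rely on an attribute-caching idiom: the backing attribute is annotated in the class body but never assigned in __init__, so the first property access fails with AttributeError internally, performs the expensive read, and caches the result. A standalone sketch of the pattern (illustrative only, not code from the library):

class LazyAttributeExample:
	_value: bytes  # Annotated only - the attribute does not exist until first access.
	
	@property
	def value(self) -> bytes:
		try:
			return self._value
		except AttributeError:
			# First access: do the expensive work once and cache the result.
			self._value = self._expensive_read()
			return self._value
	
	def _expensive_read(self) -> bytes:
		print("reading...")
		return b"data"

example = LazyAttributeExample()
print(example.value)  # prints "reading..." and then b'data'
print(example.value)  # prints only b'data' - the cached value is reused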

rsrcfork/compress/__init__.py

@@ -5,10 +5,12 @@ from . import dcmp0
 from . import dcmp1
 from . import dcmp2
 
-from .common import DecompressError, CompressedHeaderInfo
+from .common import DecompressError, CompressedHeaderInfo, CompressedType8HeaderInfo, CompressedType9HeaderInfo
 
 __all__ = [
 	"CompressedHeaderInfo",
+	"CompressedType8HeaderInfo",
+	"CompressedType9HeaderInfo",
 	"DecompressError",
 	"decompress",
 	"decompress_parsed",
@@ -26,7 +28,7 @@ DECOMPRESSORS = {
 }
 
 
-def decompress_stream_parsed(header_info: CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+def decompress_stream_parsed(header_info: CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
 	"""Decompress compressed resource data from a stream, whose header has already been read and parsed into a CompressedHeaderInfo object."""
 	
 	try:
@@ -42,12 +44,14 @@ def decompress_stream_parsed(header_info: CompressedHeaderInfo, stream: typing.B
 	if decompressed_length != header_info.decompressed_length:
 		raise DecompressError(f"Actual length of decompressed data ({decompressed_length}) does not match length stored in resource ({header_info.decompressed_length})")
 
-def decompress_parsed(header_info: CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
+
+def decompress_parsed(header_info: CompressedHeaderInfo, data: bytes, *, debug: bool = False) -> bytes:
 	"""Decompress the given compressed resource data, whose header has already been removed and parsed into a CompressedHeaderInfo object."""
 	
 	return b"".join(decompress_stream_parsed(header_info, io.BytesIO(data), debug=debug))
 
-def decompress_stream(stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+
+def decompress_stream(stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
 	"""Decompress compressed resource data from a stream."""
 	
 	header_info = CompressedHeaderInfo.parse_stream(stream)
@@ -57,7 +61,8 @@ def decompress_stream(stream: typing.BinaryIO, *, debug: bool=False) -> typing.I
 	
 	yield from decompress_stream_parsed(header_info, stream, debug=debug)
 
-def decompress(data: bytes, *, debug: bool=False) -> bytes:
+
+def decompress(data: bytes, *, debug: bool = False) -> bytes:
 	"""Decompress the given compressed resource data."""
 	
 	return b"".join(decompress_stream(io.BytesIO(data), debug=debug))
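
How the functions above fit together, as a sketch (it assumes that CompressedHeaderInfo instances expose the parsed header_length field; the wrapper function name is mine):

import io

from rsrcfork import compress


def decompress_two_ways(raw: bytes) -> bytes:
	"""Decompress a complete compressed resource (header included) twice over."""
	
	# The convenience wrapper parses the header and decompresses in one step...
	data = compress.decompress(raw)
	
	# ...which should be equivalent to parsing the header separately and handing
	# the remaining payload to decompress_parsed.
	header_info = compress.CompressedHeaderInfo.parse_stream(io.BytesIO(raw))
	assert data == compress.decompress_parsed(header_info, raw[header_info.header_length:])
	
	return data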

rsrcfork/compress/common.py

@@ -41,9 +41,9 @@ class CompressedHeaderInfo(object):
 		try:
 			signature, header_length, compression_type, decompressed_length, remainder = STRUCT_COMPRESSED_HEADER.unpack(stream.read(STRUCT_COMPRESSED_HEADER.size))
 		except struct.error:
-			raise DecompressError(f"Invalid header")
+			raise DecompressError("Invalid header")
 		if signature != COMPRESSED_SIGNATURE:
-			raise DecompressError(f"Invalid signature: {signature!r}, expected {COMPRESSED_SIGNATURE}")
+			raise DecompressError(f"Invalid signature: {signature!r}, expected {COMPRESSED_SIGNATURE!r}")
 		if header_length != 0x12:
 			raise DecompressError(f"Unsupported header length: 0x{header_length:>04x}, expected 0x12")
 
@@ -112,9 +112,14 @@ if typing.TYPE_CHECKING:
 	The peek method is supported by various standard Python binary IO streams, such as io.BufferedReader. If a stream does not natively support the peek method, it may be wrapped using the custom helper function make_peekable.
 	"""
 	
-	def readable(self) -> bool: ...
-	def read(self, size: typing.Optional[int] = ...) -> bytes: ...
-	def peek(self, size: int = ...) -> bytes: ...
+	def readable(self) -> bool:
+		...
+	
+	def read(self, size: typing.Optional[int] = ...) -> bytes:
+		...
+	
+	def peek(self, size: int = ...) -> bytes:
+		...
 
 
 class _PeekableIOWrapper(object):
@@ -165,7 +170,8 @@ def make_peekable(stream: typing.BinaryIO) -> "PeekableIO":
 	if hasattr(stream, "peek"):
 		# Stream is already peekable, nothing to be done.
 		return typing.cast("PeekableIO", stream)
-	elif isinstance(stream, io.RawIOBase):
+	elif not typing.TYPE_CHECKING and isinstance(stream, io.RawIOBase):
+		# This branch is skipped when type checking - mypy incorrectly warns about this code being unreachable, because it thinks that a typing.BinaryIO cannot be an instance of io.RawIOBase.
 		# Raw IO streams can be wrapped efficiently using BufferedReader.
 		return io.BufferedReader(stream)
 	else:
@@ -181,6 +187,7 @@ def read_exact(stream: typing.BinaryIO, byte_count: int) -> bytes:
 		raise DecompressError(f"Attempted to read {byte_count} bytes of data, but only got {len(data)} bytes")
 	return data
 
+
 def read_variable_length_integer(stream: typing.BinaryIO) -> int:
 	"""Read a variable-length integer from the stream.
 	
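
Not from the diff - a standalone illustration of why the io.RawIOBase branch of make_peekable works (the stream class here is mine, purely for demonstration):

import io

class OneShotRawStream(io.RawIOBase):
	"""A minimal raw stream yielding a fixed byte string."""
	
	def __init__(self, data: bytes) -> None:
		self._buffer = io.BytesIO(data)
	
	def readable(self) -> bool:
		return True
	
	def readinto(self, b: bytearray) -> int:
		chunk = self._buffer.read(len(b))
		b[:len(chunk)] = chunk
		return len(chunk)

raw = OneShotRawStream(b"\x12\x34 rest of the data")
print(hasattr(raw, "peek"))  # False - a raw stream offers no peek()

# Wrapping it in io.BufferedReader adds buffered peeking, which is exactly
# what the io.RawIOBase branch above does.
peekable = io.BufferedReader(raw)
print(peekable.peek(2)[:2])  # b'\x124' - peek may return more bytes than requested
print(peekable.read(2))      # the same two bytes; peeking did not advance the position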

rsrcfork/compress/dcmp0.py

@@ -1,4 +1,3 @@
-import io
 import typing
 
 from . import common
@@ -39,7 +38,7 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
 assert len(TABLE) == len(range(0x4b, 0xfe))
 
 
-def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
 	"""Internal helper function, implements the main decompression algorithm. Only called from decompress_stream, which performs some extra checks and debug logging."""
 	
 	if not isinstance(header_info, common.CompressedType8HeaderInfo):
@@ -111,7 +110,7 @@ def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: ty
 			# Compact representation of (part of) a segment loader jump table, as used in 'CODE' (0) resources.
 			
 			if debug:
-				print(f"Segment loader jump table entries")
+				print("Segment loader jump table entries")
 			
 			# All generated jump table entries have the same segment number.
 			segment_number_int = common.read_variable_length_integer(stream)
@@ -119,7 +118,7 @@ def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: ty
 				print(f"\t-> segment number: {segment_number_int:#x}")
 			
 			# The tail part of all jump table entries (i. e. everything except for the address).
-			entry_tail = b"?<" + segment_number_int.to_bytes(2, "big", signed=True) + b"\xa9\xf0"
+			entry_tail = b"?<" + segment_number_int.to_bytes(2, "big", signed=False) + b"\xa9\xf0"
 			# The tail is output once *without* an address in front, i. e. the first entry's address must be generated manually by a previous code.
 			yield entry_tail
 			
@@ -169,13 +168,13 @@ def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: ty
 					raise common.DecompressError(f"Repeat count must be positive: {count}")
 				
 				if debug:
-					print(f"\t-> {to_repeat} * {count}")
+					print(f"\t-> {to_repeat!r} * {count}")
 				yield to_repeat * count
 			elif kind == 0x04:
 				# A sequence of 16-bit signed integers, with each integer encoded as a difference relative to the previous integer. The first integer is stored explicitly.
 				
 				if debug:
-					print(f"Difference-encoded 16-bit integers")
+					print("Difference-encoded 16-bit integers")
 				
 				# The first integer is stored explicitly, as a signed value.
 				initial_int = common.read_variable_length_integer(stream)
@@ -207,7 +206,7 @@ def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: ty
 				# A sequence of 32-bit signed integers, with each integer encoded as a difference relative to the previous integer. The first integer is stored explicitly.
 				
 				if debug:
-					print(f"Difference-encoded 32-bit integers")
+					print("Difference-encoded 32-bit integers")
 				
 				# The first integer is stored explicitly, as a signed value.
 				initial_int = common.read_variable_length_integer(stream)
@@ -243,18 +242,19 @@ def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: ty
 			# Check that there really is no more data left.
 			extra = stream.read(1)
 			if extra:
-				raise common.DecompressError(f"Extra data encountered after end of data marker (first extra byte: {extra})")
+				raise common.DecompressError(f"Extra data encountered after end of data marker (first extra byte: {extra!r})")
 			break
 		else:
 			raise common.DecompressError(f"Unknown tag byte: 0x{byte:>02x}")
 
-def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+
+def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
 	"""Decompress compressed data in the format used by 'dcmp' (0)."""
 	
 	decompressed_length = 0
 	for chunk in decompress_stream_inner(header_info, stream, debug=debug):
 		if debug:
-			print(f"\t-> {chunk}")
+			print(f"\t-> {chunk!r}")
 		
 		if header_info.decompressed_length % 2 != 0 and decompressed_length + len(chunk) == header_info.decompressed_length + 1:
 			# Special case: if the decompressed data length stored in the header is odd and one less than the length of the actual decompressed data, drop the last byte.
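
One detail of the signed=True to signed=False fix above, plus an interpretation of the tail bytes that is mine rather than the diff's: 0x3F3C is the 68k instruction MOVE.W #imm,-(SP) and 0xA9F0 is the _LoadSeg trap, so every generated jump table entry pushes its segment number and calls the segment loader. The fix matters because segment numbers are unsigned; a sketch with a hypothetical value:

segment_number = 0x8001  # hypothetical segment number with the high bit set

try:
	segment_number.to_bytes(2, "big", signed=True)
except OverflowError as e:
	print(e)  # "int too big to convert" - what the old signed=True code would hit

# signed=False emits the two bytes unchanged, as the fixed code intends:
entry_tail = b"?<" + segment_number.to_bytes(2, "big", signed=False) + b"\xa9\xf0"
print(entry_tail.hex())  # 3f3c8001a9f0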

rsrcfork/compress/dcmp1.py

@@ -1,4 +1,3 @@
-import io
 import typing
 
 from . import common
@@ -22,7 +21,7 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
 assert len(TABLE) == len(range(0xd5, 0xfe))
 
 
-def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
 	"""Internal helper function, implements the main decompression algorithm. Only called from decompress_stream, which performs some extra checks and debug logging."""
 	
 	if not isinstance(header_info, common.CompressedType8HeaderInfo):
@@ -112,7 +111,7 @@ def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: ty
 					raise common.DecompressError(f"Repeat count must be positive: {count}")
 				
 				if debug:
-					print(f"\t-> {to_repeat} * {count}")
+					print(f"\t-> {to_repeat!r} * {count}")
 				yield to_repeat * count
 			else:
 				raise common.DecompressError(f"Unknown extended code: 0x{kind:>02x}")
@@ -124,18 +123,19 @@ def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: ty
 			# Check that there really is no more data left.
 			extra = stream.read(1)
 			if extra:
-				raise common.DecompressError(f"Extra data encountered after end of data marker (first extra byte: {extra})")
+				raise common.DecompressError(f"Extra data encountered after end of data marker (first extra byte: {extra!r})")
 			break
 		else:
 			raise common.DecompressError(f"Unknown tag byte: 0x{byte:>02x}")
 
-def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+
+def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
 	"""Decompress compressed data in the format used by 'dcmp' (1)."""
 	
 	decompressed_length = 0
 	for chunk in decompress_stream_inner(header_info, stream, debug=debug):
 		if debug:
-			print(f"\t-> {chunk}")
+			print(f"\t-> {chunk!r}")
 		
 		decompressed_length += len(chunk)
 		yield chunk

rsrcfork/compress/dcmp2.py

@@ -1,5 +1,4 @@
 import enum
-import io
 import struct
 import typing
 
@@ -74,7 +73,7 @@ def _split_bits(i: int) -> typing.Tuple[bool, bool, bool, bool, bool, bool, bool
 	)
 
 
-def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> typing.Iterator[bytes]:
+def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]:
 	while True: # Loop is terminated when EOF is reached.
 		table_index_data = stream.read(1)
 		if not table_index_data:
@@ -83,17 +82,18 @@ def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int,
 		elif not stream.peek(1) and decompressed_length % 2 != 0:
 			# Special case: if we are at the last byte of the compressed data, and the decompressed data has an odd length, the last byte is a single literal byte, and not a table reference.
 			if debug:
-				print(f"Last byte: {table_index_data}")
+				print(f"Last byte: {table_index_data!r}")
 			yield table_index_data
 			break
 		
 		# Compressed data is untagged, every byte is a table reference.
 		(table_index,) = table_index_data
 		if debug:
-			print(f"Reference: {table_index} -> {table[table_index]}")
+			print(f"Reference: {table_index} -> {table[table_index]!r}")
 		yield table[table_index]
 
-def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> typing.Iterator[bytes]:
+
+def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]:
 	while True: # Loop is terminated when EOF is reached.
 		tag_data = stream.read(1)
 		if not tag_data:
@@ -102,7 +102,7 @@ def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, ta
 		elif not stream.peek(1) and decompressed_length % 2 != 0:
 			# Special case: if we are at the last byte of the compressed data, and the decompressed data has an odd length, the last byte is a single literal byte, and not a tag or a table reference.
 			if debug:
-				print(f"Last byte: {tag_data}")
+				print(f"Last byte: {tag_data!r}")
 			yield tag_data
 			break
 		
@@ -119,7 +119,7 @@ def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, ta
 					break
 				(table_index,) = table_index_data
 				if debug:
-					print(f"Reference: {table_index} -> {table[table_index]}")
+					print(f"Reference: {table_index} -> {table[table_index]!r}")
 				yield table[table_index]
 			else:
 				# This is a literal (two uncompressed bytes that are literally copied into the output).
@@ -129,11 +129,11 @@ def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, ta
 					break
 				# Note: the literal may be only a single byte long if it is located exactly at EOF. This is intended and expected - the 1-byte literal is yielded normally, and on the next iteration, decompression is terminated as EOF is detected.
 				if debug:
-					print(f"Literal: {literal}")
+					print(f"Literal: {literal!r}")
 				yield literal
 
 
-def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
 	"""Decompress compressed data in the format used by 'dcmp' (2)."""
 	
 	if not isinstance(header_info, common.CompressedType9HeaderInfo):

0 rsrcfork/py.typed Normal file

32 setup.cfg

@@ -18,8 +18,10 @@ classifiers =
 	Programming Language :: Python :: 3 :: Only
 	Programming Language :: Python :: 3.6
 	Programming Language :: Python :: 3.7
+	Programming Language :: Python :: 3.8
 license = MIT
-license_file = LICENSE
+license_files =
+	LICENSE
 description = A pure Python, cross-platform library/tool for reading Macintosh resource data, as stored in resource forks and ``.rsrc`` files
 long_description = file: README.rst
 long_description_content_type = text/x-rst
@@ -33,11 +35,16 @@ keywords =
 	macos
 
 [options]
-setup_requires =
-	setuptools>=39.2.0
+# mypy can only find type hints in the package if zip_safe is set to False,
+# see https://mypy.readthedocs.io/en/latest/installed_packages.html#making-pep-561-compatible-packages
+zip_safe = False
 python_requires = >=3.6
 packages = find:
 
+[options.package_data]
+rsrcfork =
+	py.typed
+
 [options.packages.find]
 include =
 	rsrcfork
@@ -47,6 +54,25 @@ include =
 console_scripts =
 	rsrcfork = rsrcfork.__main__:main
 
+[flake8]
+extend-exclude =
+	.mypy_cache/,
+	build/,
+	dist/,
+
+# The following issues are ignored because they do not match our code style:
+ignore =
+	E226, # missing whitespace around arithmetic operator
+	E261, # at least two spaces before inline comment
+	E501, # line too long
+	W293, # blank line contains whitespace
+	W503, # line break before binary operator
+
+# flake8-tabs configuration
+use-flake8-tabs = true
+blank-lines-indent = always
+indent-tabs-def = 1
+
 [mypy]
 files=rsrcfork/**/*.py
 python_version = 3.6

BIN tests/data/compress/compressed/Finder Help.rsrc Normal file (35 KiB)
BIN tests/data/compress/compressed/Finder.rsrc Normal file (355 KiB)
BIN tests/data/compress/compressed/Install.rsrc Normal file (127 KiB)
BIN tests/data/compress/compressed/System.rsrc Normal file (884 KiB)
BIN tests/data/compress/uncompressed/Finder Help.rsrc Normal file (51 KiB)
BIN tests/data/compress/uncompressed/Finder.rsrc Normal file (478 KiB)
BIN tests/data/compress/uncompressed/Install.rsrc Normal file (159 KiB)
BIN tests/data/compress/uncompressed/System.rsrc Normal file (1.1 MiB)
BIN tests/data/empty.rsrc Normal file (286 B)
BIN tests/data/testfile.rsrc Normal file (558 B)
BIN tests/data/unicode.textClipping.rsrc Normal file (602 B)

290 tests/test_rsrcfork.py Normal file

@@ -0,0 +1,290 @@
import collections
import io
import pathlib
import shutil
import sys
import tempfile
import typing
import unittest

import rsrcfork

RESOURCE_FORKS_SUPPORTED = sys.platform.startswith("darwin")
RESOURCE_FORKS_NOT_SUPPORTED_MESSAGE = "Resource forks are only supported on Mac"

DATA_DIR = pathlib.Path(__file__).parent / "data"
EMPTY_RSRC_FILE = DATA_DIR / "empty.rsrc"
TEXTCLIPPING_RSRC_FILE = DATA_DIR / "unicode.textClipping.rsrc"
TESTFILE_RSRC_FILE = DATA_DIR / "testfile.rsrc"

COMPRESS_DATA_DIR = DATA_DIR / "compress"
COMPRESSED_DIR = COMPRESS_DATA_DIR / "compressed"
UNCOMPRESSED_DIR = COMPRESS_DATA_DIR / "uncompressed"
COMPRESS_RSRC_FILE_NAMES = [
	"Finder.rsrc",
	"Finder Help.rsrc",
	# "Install.rsrc", # Commented out for performance - this file contains a lot of small resources.
	"System.rsrc",
]


def make_pascal_string(s):
	return bytes([len(s)]) + s


UNICODE_TEXT = "Here is some text, including Üñïçø∂é!"
DRAG_DATA = (
	b"\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03"
	b"utxt\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00"
	b"utf8\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00"
	b"TEXT\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)
TEXTCLIPPING_RESOURCES = collections.OrderedDict([
	(b"utxt", collections.OrderedDict([
		(256, UNICODE_TEXT.encode("utf-16-be")),
	])),
	(b"utf8", collections.OrderedDict([
		(256, UNICODE_TEXT.encode("utf-8")),
	])),
	(b"TEXT", collections.OrderedDict([
		(256, UNICODE_TEXT.encode("macroman")),
	])),
	(b"drag", collections.OrderedDict([
		(128, DRAG_DATA),
	]))
])

TESTFILE_HEADER_SYSTEM_DATA = (
	b"\xa7F$\x08 <\x00\x00\xab\x03\xa7F <\x00\x00"
	b"\x01\x00\xb4\x88f\x06`\np\x00`\x06 <\x00\x00"
	b"\x08testfile\x00\x02\x00\x02\x00rs"
	b"rcRSED\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
	b"\x02\x00rsrcRSED\x00\x00\x00\x00\x00\x00"
	b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
	b"\x00\x00\xdaIp~\x00\x00\x00\x00\x00\x00\x02.\xfe\x84"
)
TESTFILE_HEADER_APPLICATION_DATA = b"This is the application-specific header data section. Apparently I can write whatever nonsense I want here. A few more bytes...."
TESTFILE_RESOURCES = collections.OrderedDict([
	(b"STR ", collections.OrderedDict([
		(128, (
			None, rsrcfork.ResourceAttrs(0),
			make_pascal_string(b"The String, without name or attributes"),
		)),
		(129, (
			b"The Name", rsrcfork.ResourceAttrs(0),
			make_pascal_string(b"The String, with name and no attributes"),
		)),
		(130, (
			None, rsrcfork.ResourceAttrs.resProtected | rsrcfork.ResourceAttrs.resPreload,
			make_pascal_string(b"The String, without name but with attributes"),
		)),
		(131, (
			b"The Name with Attributes", rsrcfork.ResourceAttrs.resSysHeap,
			make_pascal_string(b"The String, with both name and attributes"),
		)),
	])),
])


class UnseekableStreamWrapper(io.BufferedIOBase):
	_wrapped: typing.BinaryIO
	
	def __init__(self, wrapped: typing.BinaryIO) -> None:
		super().__init__()
		
		self._wrapped = wrapped
	
	def read(self, size: typing.Optional[int] = -1) -> bytes:
		return self._wrapped.read(size)


def open_resource_fork(path: pathlib.Path, mode: str) -> typing.BinaryIO:
	return (path / "..namedfork" / "rsrc").open(mode)


class ResourceFileReadTests(unittest.TestCase):
	def test_empty(self) -> None:
		with rsrcfork.open(EMPTY_RSRC_FILE, fork="data") as rf:
			self.assertEqual(rf.header_system_data, bytes(112))
			self.assertEqual(rf.header_application_data, bytes(128))
			self.assertEqual(rf.file_attributes, rsrcfork.ResourceFileAttrs(0))
			self.assertEqual(list(rf), [])
	
	def internal_test_textclipping(self, rf: rsrcfork.ResourceFile) -> None:
		self.assertEqual(rf.header_system_data, bytes(112))
		self.assertEqual(rf.header_application_data, bytes(128))
		self.assertEqual(rf.file_attributes, rsrcfork.ResourceFileAttrs(0))
		self.assertEqual(list(rf), list(TEXTCLIPPING_RESOURCES))
		
		for (actual_type, actual_reses), (expected_type, expected_reses) in zip(rf.items(), TEXTCLIPPING_RESOURCES.items()):
			with self.subTest(type=expected_type):
				self.assertEqual(actual_type, expected_type)
				self.assertEqual(list(actual_reses), list(expected_reses))
				
				for (actual_id, actual_res), (expected_id, expected_data) in zip(actual_reses.items(), expected_reses.items()):
					with self.subTest(id=expected_id):
						self.assertEqual(actual_res.type, expected_type)
						self.assertEqual(actual_id, expected_id)
						self.assertEqual(actual_res.id, expected_id)
						self.assertEqual(actual_res.name, None)
						self.assertEqual(actual_res.attributes, rsrcfork.ResourceAttrs(0))
						self.assertEqual(actual_res.data, expected_data)
						self.assertEqual(actual_res.compressed_info, None)
	
	def test_textclipping_seekable_stream(self) -> None:
		with TEXTCLIPPING_RSRC_FILE.open("rb") as f:
			with rsrcfork.ResourceFile(f) as rf:
				self.internal_test_textclipping(rf)
	
	def test_textclipping_unseekable_stream(self) -> None:
		with TEXTCLIPPING_RSRC_FILE.open("rb") as f:
			with UnseekableStreamWrapper(f) as usf:
				with rsrcfork.ResourceFile(usf) as rf:
					self.internal_test_textclipping(rf)
	
	def test_textclipping_path_data_fork(self) -> None:
		with rsrcfork.open(TEXTCLIPPING_RSRC_FILE, fork="data") as rf:
			self.internal_test_textclipping(rf)
	
	@unittest.skipUnless(RESOURCE_FORKS_SUPPORTED, RESOURCE_FORKS_NOT_SUPPORTED_MESSAGE)
	def test_textclipping_path_resource_fork(self) -> None:
		with tempfile.NamedTemporaryFile() as tempf:
			with TEXTCLIPPING_RSRC_FILE.open("rb") as dataf:
				with open_resource_fork(pathlib.Path(tempf.name), "wb") as rsrcf:
					shutil.copyfileobj(dataf, rsrcf)
			
			with rsrcfork.open(tempf.name, fork="rsrc") as rf:
				self.internal_test_textclipping(rf)
	
	@unittest.skipUnless(RESOURCE_FORKS_SUPPORTED, RESOURCE_FORKS_NOT_SUPPORTED_MESSAGE)
	def test_textclipping_path_auto_resource_fork(self) -> None:
		with tempfile.NamedTemporaryFile() as temp_data_fork:
			with TEXTCLIPPING_RSRC_FILE.open("rb") as source_file:
				with open_resource_fork(pathlib.Path(temp_data_fork.name), "wb") as temp_rsrc_fork:
					shutil.copyfileobj(source_file, temp_rsrc_fork)
			
			with self.subTest(data_fork="empty"):
				# Resource fork is selected when data fork is empty.
				
				with rsrcfork.open(temp_data_fork.name) as rf:
					self.internal_test_textclipping(rf)
			
			with self.subTest(data_fork="non-resource data"):
				# Resource fork is selected when data fork contains non-resource data.
				
				temp_data_fork.write(b"This is the file's data fork. It should not be read, as the file has a resource fork.")
				
				with rsrcfork.open(temp_data_fork.name) as rf:
					self.internal_test_textclipping(rf)
			
			with self.subTest(data_fork="valid resource data"):
				# Resource fork is selected even when data fork contains valid resource data.
				
				with EMPTY_RSRC_FILE.open("rb") as source_file:
					shutil.copyfileobj(source_file, temp_data_fork)
				
				with rsrcfork.open(temp_data_fork.name) as rf:
					self.internal_test_textclipping(rf)
	
	@unittest.skipUnless(RESOURCE_FORKS_SUPPORTED, RESOURCE_FORKS_NOT_SUPPORTED_MESSAGE)
	def test_textclipping_path_auto_data_fork(self) -> None:
		with tempfile.NamedTemporaryFile() as temp_data_fork:
			with TEXTCLIPPING_RSRC_FILE.open("rb") as source_file:
				shutil.copyfileobj(source_file, temp_data_fork)
				# Have to flush the temporary file manually so that the data is visible to the other reads below.
				# Normally this happens automatically as part of the close method, but that would also delete the temporary file, which we don't want.
				temp_data_fork.flush()
			
			with self.subTest(rsrc_fork="nonexistant"):
				# Data fork is selected when resource fork does not exist.
				
				with rsrcfork.open(temp_data_fork.name) as rf:
					self.internal_test_textclipping(rf)
			
			with self.subTest(rsrc_fork="empty"):
				# Data fork is selected when resource fork exists, but is empty.
				
				with open_resource_fork(pathlib.Path(temp_data_fork.name), "wb") as temp_rsrc_fork:
					temp_rsrc_fork.write(b"")
				
				with rsrcfork.open(temp_data_fork.name) as rf:
					self.internal_test_textclipping(rf)
			
			with self.subTest(rsrc_fork="non-resource data"):
				# Data fork is selected when resource fork contains non-resource data.
				
				with open_resource_fork(pathlib.Path(temp_data_fork.name), "wb") as temp_rsrc_fork:
					temp_rsrc_fork.write(b"This is the file's resource fork. It contains junk, so it should be ignored in favor of the data fork.")
				
				with rsrcfork.open(temp_data_fork.name) as rf:
					self.internal_test_textclipping(rf)
	
	def test_testfile(self) -> None:
		with rsrcfork.open(TESTFILE_RSRC_FILE, fork="data") as rf:
			self.assertEqual(rf.header_system_data, TESTFILE_HEADER_SYSTEM_DATA)
			self.assertEqual(rf.header_application_data, TESTFILE_HEADER_APPLICATION_DATA)
			self.assertEqual(rf.file_attributes, rsrcfork.ResourceFileAttrs.mapPrinterDriverMultiFinderCompatible | rsrcfork.ResourceFileAttrs.mapReadOnly)
			self.assertEqual(list(rf), list(TESTFILE_RESOURCES))
			
			for (actual_type, actual_reses), (expected_type, expected_reses) in zip(rf.items(), TESTFILE_RESOURCES.items()):
				with self.subTest(type=expected_type):
					self.assertEqual(actual_type, expected_type)
					self.assertEqual(list(actual_reses), list(expected_reses))
					
					for (actual_id, actual_res), (expected_id, (expected_name, expected_attrs, expected_data)) in zip(actual_reses.items(), expected_reses.items()):
						with self.subTest(id=expected_id):
							self.assertEqual(actual_res.type, expected_type)
							self.assertEqual(actual_id, expected_id)
							self.assertEqual(actual_res.id, expected_id)
							self.assertEqual(actual_res.name, expected_name)
							self.assertEqual(actual_res.attributes, expected_attrs)
							self.assertEqual(actual_res.data, expected_data)
							self.assertEqual(actual_res.compressed_info, None)
	
	def test_compress_compare(self) -> None:
		# This test goes through pairs of resource files: one original file with both compressed and uncompressed resources, and one modified file where all compressed resources have been decompressed (using ResEdit on System 7.5.5).
		# It checks that the rsrcfork library performs automatic decompression on the compressed resources, so that the compressed resource file appears to the user like the uncompressed resource file (ignoring resource order, which was lost during decompression using ResEdit).
		
		for name in COMPRESS_RSRC_FILE_NAMES:
			with self.subTest(name=name):
				with rsrcfork.open(COMPRESSED_DIR / name, fork="data") as compressed_rf, rsrcfork.open(UNCOMPRESSED_DIR / name, fork="data") as uncompressed_rf:
					self.assertEqual(sorted(compressed_rf), sorted(uncompressed_rf))
					
					for (compressed_type, compressed_reses), (uncompressed_type, uncompressed_reses) in zip(sorted(compressed_rf.items()), sorted(uncompressed_rf.items())):
						with self.subTest(type=compressed_type):
							self.assertEqual(compressed_type, uncompressed_type)
							self.assertEqual(sorted(compressed_reses), sorted(uncompressed_reses))
							
							for (compressed_id, compressed_res), (uncompressed_id, uncompressed_res) in zip(sorted(compressed_reses.items()), sorted(uncompressed_reses.items())):
								with self.subTest(id=compressed_id):
									# The metadata of the compressed and uncompressed resources must match.
									self.assertEqual(compressed_res.type, uncompressed_res.type)
									self.assertEqual(compressed_id, uncompressed_id)
									self.assertEqual(compressed_res.id, compressed_id)
									self.assertEqual(compressed_res.id, uncompressed_res.id)
									self.assertEqual(compressed_res.name, uncompressed_res.name)
									self.assertEqual(compressed_res.attributes & ~rsrcfork.ResourceAttrs.resCompressed, uncompressed_res.attributes)
									
									# The uncompressed resource really has to be not compressed.
									self.assertNotIn(rsrcfork.ResourceAttrs.resCompressed, uncompressed_res.attributes)
									self.assertEqual(uncompressed_res.compressed_info, None)
									self.assertEqual(uncompressed_res.data, uncompressed_res.data_raw)
									self.assertEqual(uncompressed_res.length, uncompressed_res.length_raw)
									
									# The compressed resource's (automatically decompressed) data must match the uncompressed data.
									self.assertEqual(compressed_res.data, uncompressed_res.data)
									self.assertEqual(compressed_res.length, uncompressed_res.length)
									
									if rsrcfork.ResourceAttrs.resCompressed in compressed_res.attributes:
										# Resources with the compressed attribute must expose correct compression metadata.
										self.assertNotEqual(compressed_res.compressed_info, None)
										self.assertEqual(compressed_res.compressed_info.decompressed_length, compressed_res.length)
									else:
										# Some resources in the "compressed" files are not actually compressed, in which case there is no compression metadata.
										self.assertEqual(compressed_res.compressed_info, None)
										self.assertEqual(compressed_res.data, compressed_res.data_raw)
										self.assertEqual(compressed_res.length, compressed_res.length_raw)


if __name__ == "__main__":
	unittest.main()

27 tox.ini Normal file

@@ -0,0 +1,27 @@
[tox]
# When adding a new Python version here, please also update the list of Python versions called by the GitHub Actions workflow (.github/workflows/ci.yml).
envlist = py{36,37,38},flake8,mypy,package

[testenv]
commands = python -m unittest discover --start-directory ./tests

[testenv:flake8]
deps =
	flake8 >= 3.8.0
	flake8-bugbear
	flake8-tabs
commands = flake8

[testenv:mypy]
deps =
	mypy
commands = mypy

[testenv:package]
deps =
	twine
	wheel >= 0.32.0

commands =
	python setup.py sdist bdist_wheel
	twine check dist/*