47 Commits

Author SHA1 Message Date
82b5926b4f Release version 1.8.0 2020-07-18 17:31:25 +02:00
5456013bf4 Use flake8 extend-exclude setting instead of exclude
This setting was added in flake8 3.8.0 and allows adding entries to the
exclude list without also removing the default entries.
2020-07-18 13:40:15 +02:00
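For reference, a minimal [flake8] block using the new setting looks like this (the entries match the setup.cfg hunk later in this diff):

[flake8]
# extend-exclude (flake8 >= 3.8.0) adds to the default exclude list
# instead of replacing it, unlike the older exclude setting.
extend-exclude =
    .mypy_cache/,
    build/,
    dist/,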
b595456a05 Switch back to using attr directive in setup.cfg version
As of setuptools 46.4.0, this extracts the attribute value statically
using the ast module, if possible. This allows it to work properly even
if the attribute is stored in a file that cannot be imported at setup
time (e.g. because of dependencies that might not be installed yet).
2020-07-18 13:13:19 +02:00
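For context, the declarative form in setup.cfg looks like this; on setuptools >= 46.4.0 the attribute is resolved statically via the ast module, without importing the package:

[metadata]
name = rsrcfork
version = attr: rsrcfork.__version__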
d367a9238a Add bytes_quote helper function
bytes_quote does the same as bytes_escape, but automatically adds the
quote character around the escaped string.
2020-07-07 01:03:38 +02:00
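A hypothetical sketch of the relationship between the two helpers (the real signatures are not shown in this diff, and the stand-in bytes_escape below only escapes backslashes and quote characters):

def bytes_escape(data: bytes, *, quote: bytes) -> bytes:
    # Stand-in for the real helper: escape backslashes and the quote character.
    return data.replace(b"\\", b"\\\\").replace(quote, b"\\" + quote)

def bytes_quote(data: bytes, quote: bytes) -> bytes:
    # bytes_quote escapes the data, then adds the quote character around it.
    return quote + bytes_escape(data, quote=quote) + quote

print(bytes_quote(b'say "hi"', b'"'))  # b'"say \\"hi\\""'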
33c4016124 Fix flake8 problems 2020-07-07 00:04:54 +02:00
b01cfc77cf Don't pass required=True to add_subparsers
The required kwarg of add_subparsers was only added in Python 3.7, and
we currently still support Python 3.6.
2020-07-07 00:01:57 +02:00
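One common 3.6-compatible pattern is to omit required=True and validate manually; a sketch (not necessarily the exact check used in __main__.py):

import argparse

parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="subcommand")
subparsers.add_parser("list")
ns = parser.parse_args()
if ns.subcommand is None:
    # required=True would do this automatically, but only on Python 3.7+.
    parser.error("a subcommand is required")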
a9f54b678c Add py.typed marker file for PEP 561
This allows type checkers like mypy to use the type hints in our code
when type-checking another project that imports this library.
2020-07-06 23:57:25 +02:00
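The packaging side of this is visible in the setup.cfg hunk further down: the marker file is shipped via package_data, and zip_safe is disabled so that type checkers can find the installed package's hints:

[options]
zip_safe = False

[options.package_data]
rsrcfork =
    py.typed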
b46018e666 Use is_printable in definition of _TRANSLATE_NONPRINTABLES 2020-07-06 18:01:34 +02:00
b0eefe3889 Replace custom CLI subcommand system with standard argparse subparsers
This is a purely internal change and should have no visible effect on
the command-line interface.
2020-07-05 19:43:01 +02:00
3e0bbcee04 Add Python 3.8 classifier to metadata 2020-04-19 16:21:25 +02:00
13654c2560 Add pyproject.toml with PEP 517/PEP 518 metadata 2020-04-19 16:21:04 +02:00
d5199bd503 Replace setup.cfg metadata license_file with license_files
license_file has been deprecated in wheel 0.32.0 in favor of
license_files.
2020-04-19 16:20:07 +02:00
c5c3f24a10 Add tox environment for building and checking distributions 2020-04-19 16:16:21 +02:00
7c77c4ef20 Prepare setup.py/.cfg for additional import-time dependencies
Reading the version number using attr: rsrcfork.__version__ will no
longer work properly if rsrcfork has non-stdlib dependencies at import
time, because setuptools needs to be able to import rsrcfork and read
the version number before the dependencies are installed.

As a workaround, our setup.py now manually parses the version number
from rsrcfork/__init__.py using the ast module.
2020-04-03 22:32:10 +02:00
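A sketch of the described approach, assuming __version__ is assigned as a plain string literal (the actual setup.py code is not part of this diff):

import ast

def read_version(path: str) -> str:
    # Parse the source without importing it, so that rsrcfork's
    # import-time dependencies are not needed at setup time.
    with open(path, "r", encoding="utf-8") as f:
        tree = ast.parse(f.read(), filename=path)
    for node in tree.body:
        if isinstance(node, ast.Assign):
            for target in node.targets:
                if isinstance(target, ast.Name) and target.id == "__version__":
                    return ast.literal_eval(node.value)
    raise RuntimeError("__version__ not found in " + path)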
f7b6080c0e Remove random execute bits from test data files 2020-04-01 00:01:50 +02:00
007d15eb3d Fix tox configuration breaking on spaces in the project path
The {envpython} substitution is not quoted, so spaces in the path are
treated as argument separators and cause the test runs to fail.
To work around this, we now always use an unqualified python command
instead of the {envpython} substitution. This is safe because the tox
commands are always run in a virtual environment, so the python command
is guaranteed to point to the environment's Python and not the system
default.
2020-03-30 01:46:10 +02:00
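In tox.ini terms (the safe form appears in the tox.ini added at the end of this diff):

[testenv]
# Broken on paths with spaces, because the substitution is not quoted:
# commands = {envpython} -m unittest discover --start-directory ./tests
# Safe, because tox always runs commands inside the virtual environment:
commands = python -m unittest discover --start-directory ./tests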
246b69e375 Remove accidental empty comment from test_rsrcfork.py 2020-01-21 22:32:44 +01:00
d67ff64851 Add tests for reading from resource forks and fork auto-selection
These tests are only run on Mac, because they require native support
for resource forks.
2020-01-21 22:29:18 +01:00
5391d66a78 Add tests for reading resource files from streams instead of path 2020-01-21 15:20:46 +01:00
5b2700bf17 Add some missing asserts to test_compress_compare 2020-01-19 23:24:52 +01:00
c41b25fea1 Add test case for compressed resource handling and decompression 2020-01-19 23:19:19 +01:00
a45dbd8eca Remove upgrade of pip from CI workflow
The GitHub Actions environment clearly has a working pip pre-installed,
and it's unlikely that this project relies on any extremely new pip
features.
2020-01-19 19:59:42 +01:00
3401ce65dd Update actions/checkout to v2 2020-01-19 19:38:29 +01:00
890dd24f76 Also run CI workflow on pull requests 2020-01-19 19:36:53 +01:00
67c2b4acf0 Add test case for additional resource file and resource metadata 2020-01-19 19:22:59 +01:00
238c78a73e Simplify attribute asserts in tests 2020-01-19 19:05:05 +01:00
fbd861edf4 Fix test_textclipping not checking resource ID lists properly
Because Python's zip terminates once *any* of the input iterables
terminates, the previous code would not detect if the file was missing
resources or contained extra ones.
2020-01-19 02:30:10 +01:00
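An illustrative example of the pitfall (the IDs are made up):

expected_ids = [128, 129, 130]
actual_ids = [128, 129]  # one resource missing
# zip stops at the shorter input, so the missing ID goes unnoticed:
print(list(zip(actual_ids, expected_ids)))  # [(128, 128), (129, 129)]
# Comparing the complete lists first catches missing or extra resources:
print(actual_ids == expected_ids)  # False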
a7a407a1dd Add extra assertion to test_textclipping 2019-12-30 03:04:48 +01:00
ecee2616cf Add flake8-bugbear plugin 2019-12-30 03:04:27 +01:00
ba284d1800 Fix a bunch of flake8 violations 2019-12-30 03:00:12 +01:00
f690caac24 Add flake8 configuration 2019-12-30 02:57:31 +01:00
3a805c3e56 Add GitHub Actions workflow for CI 2019-12-30 01:59:05 +01:00
6adf8eb88d Fix mypy errors about byte strings as format string parameters 2019-12-30 01:48:33 +01:00
e132a91dea Fix missing sys.exit calls in CLI subcommand functions 2019-12-30 01:48:33 +01:00
4e1cd05412 Fix miscellaneous mypy errors 2019-12-30 01:48:33 +01:00
1a416defed Add tox configuration 2019-12-30 01:48:33 +01:00
1089a19c01 Add basic unit tests 2019-12-29 00:39:40 +01:00
8fc24040ea Add resource-info subcommand 2019-12-26 01:58:23 +01:00
d492d9a6a8 Remove an incorrect assertion from describe_resource
red.compressed_info can be None here if decompress is False.
2019-12-26 01:50:34 +01:00
d0e1eaf262 Add raw-compress-info subcommand (#6) 2019-12-26 00:34:27 +01:00
1e55569442 Add support for passing filters to the list subcommand 2019-12-25 01:47:03 +01:00
2abf6e2a06 Add class for resource filters in place of lambdas
This is easier to debug (printing out a lambda doesn't show what values
it checks against) and makes it easier to check that the filter values
are valid.
2019-12-25 00:15:35 +01:00
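A hypothetical sketch of the idea (the class and attribute names are illustrative, not taken from the actual code):

class TypeFilter:
    """Illustrative filter: matches resources of a single type."""

    def __init__(self, restype: bytes) -> None:
        self.restype = restype

    def __call__(self, res) -> bool:
        return res.type == self.restype

    def __repr__(self) -> str:
        # Unlike a lambda, the repr shows exactly what the filter checks.
        return f"TypeFilter(restype={self.restype!r})"

print(TypeFilter(b"STR "))  # TypeFilter(restype=b'STR ')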
2b0bbb19ed Refactor filter_resources in __main__
With the new implementation, each filter is converted to a function,
and every resource is then checked to see whether it matches any of
the filter functions. This is simpler than the old implementation,
where the resource lookup code was slightly different for some filter
forms.
2019-12-25 00:15:35 +01:00
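A sketch of the described structure, with illustrative names (handling of an empty filter list is the subject of the commit listed below and is left out here):

import typing

def filter_resources(resources: typing.Iterable, filter_funcs: typing.Sequence) -> typing.Iterator:
    # A resource is included if any of the filter functions matches it.
    for res in resources:
        if any(func(res) for func in filter_funcs):
            yield res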
c009e8f80f Support passing an empty filter list to filter_resources 2019-12-25 00:15:35 +01:00
d67641d537 Remove compatibility code for old CLI syntax 2019-12-25 00:15:30 +01:00
d6dbfdb149 Fix version number in changelog 2019-12-17 12:17:31 +01:00
b2502c48a2 Bump version to 1.7.1.dev 2019-12-17 12:16:39 +01:00
29 changed files with 856 additions and 465 deletions


@@ -8,3 +8,7 @@ insert_final_newline = true
[*.rst]
indent_style = space
indent_size = 4
[*.yml]
indent_style = space
indent_size = 2

20
.github/workflows/ci.yml vendored Normal file

@@ -0,0 +1,20 @@
on: [pull_request, push]
jobs:
  test:
    strategy:
      matrix:
        platform: [macos-latest, ubuntu-latest, windows-latest]
    runs-on: ${{ matrix.platform }}
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v1
        with:
          python-version: "3.6"
      - uses: actions/setup-python@v1
        with:
          python-version: "3.7"
      - uses: actions/setup-python@v1
        with:
          python-version: "3.8"
      - run: python -m pip install --upgrade tox
      - run: tox

3
.gitignore vendored

@@ -2,6 +2,9 @@
*.py[co]
__pycache__/
# tox
.tox/
# setuptools
*.egg-info/
build/

6
MANIFEST.in Normal file

@@ -0,0 +1,6 @@
# Note: See the PyPA documentation for a list of file names that are included/excluded by default:
# https://packaging.python.org/guides/using-manifest-in/#how-files-are-included-in-an-sdist
# Please only add entries here for files that are *not* already handled by default.
recursive-include tests *.py
recursive-include tests/data *.rsrc


@@ -113,8 +113,18 @@ For technical info and documentation about resource files and resources, see the
Changelog
---------
Version 1.7
^^^^^^^^^^^
Version 1.8.0
^^^^^^^^^^^^^
* Removed the old (non-subcommand-based) CLI syntax.
* Added filtering support to the ``list`` subcommand.
* Added a ``resource-info`` subcommand to display technical information about resources (more detailed than what is displayed by ``list`` and ``read``).
* Added a ``raw-compress-info`` subcommand to display technical header information about standalone compressed resource data.
* Made the library PEP 561-compliant by adding a py.typed file.
* Fixed an incorrect ``AssertionError`` when using the ``--no-decompress`` command-line option.
Version 1.7.0
^^^^^^^^^^^^^
* Added a ``raw-decompress`` subcommand to decompress compressed resource data stored in a standalone file rather than as a resource.
* Optimized lazy loading of ``Resource`` objects. Previously, resource data would be read from disk whenever a ``Resource`` object was looked up, even if the data itself is never used. Now the resource data is only loaded once the ``data`` (or ``data_raw``) attribute is accessed.

6
pyproject.toml Normal file

@@ -0,0 +1,6 @@
[build-system]
requires = [
    "setuptools >= 46.4.0",
    "wheel >= 0.32.0",
]
build-backend = "setuptools.build_meta"


@@ -20,7 +20,7 @@
# * Add a new empty section for the next version to the README.rst changelog.
# * Commit and push the changes to master.
__version__ = "1.7.0"
__version__ = "1.8.0"
__all__ = [
"Resource",
@@ -31,8 +31,8 @@ __all__ = [
"open",
]
from . import api, compress
from .api import Resource, ResourceAttrs, ResourceFile, ResourceFileAttrs
from . import compress
# noinspection PyShadowingBuiltins
open = ResourceFile.open

File diff suppressed because it is too large


@@ -59,9 +59,11 @@ STRUCT_RESOURCE_REFERENCE = struct.Struct(">hHI4x")
# 1 byte: Length of following resource name.
STRUCT_RESOURCE_NAME_HEADER = struct.Struct(">B")
class InvalidResourceFileError(Exception):
pass
class ResourceFileAttrs(enum.Flag):
"""Resource file attribute flags. The descriptions for these flags are taken from comments on the map*Bit and map* enum constants in <CarbonCore/Resources.h>."""
@@ -82,6 +84,7 @@ class ResourceFileAttrs(enum.Flag):
_BIT_1 = 1 << 1
_BIT_0 = 1 << 0
class ResourceAttrs(enum.Flag):
"""Resource attribute flags. The descriptions for these flags are taken from comments on the res*Bit and res* enum constants in <CarbonCore/Resources.h>."""
@@ -94,6 +97,7 @@ class ResourceAttrs(enum.Flag):
resChanged = 1 << 1 # "Existing resource changed since last update", "Resource changed?"
resCompressed = 1 << 0 # "indicates that the resource data is compressed" (only documented in https://github.com/kreativekorp/ksfl/wiki/Macintosh-Resource-File-Format)
class Resource(object):
"""A single resource from a resource file."""
@@ -133,14 +137,14 @@ class Resource(object):
decompress_ok = True
if len(data) > 32:
data_repr = f"<{len(data)} bytes: {data[:32]}...>"
data_repr = f"<{len(data)} bytes: {data[:32]!r}...>"
else:
data_repr = repr(data)
if not decompress_ok:
data_repr = f"<decompression failed - compressed data: {data_repr}>"
return f"<{type(self).__qualname__} type {self.type}, id {self.id}, name {self.name}, attributes {self.attributes}, data {data_repr}>"
return f"<{type(self).__qualname__} type {self.type!r}, id {self.id}, name {self.name!r}, attributes {self.attributes}, data {data_repr}>"
@property
def resource_type(self) -> bytes:
@@ -229,6 +233,7 @@ class Resource(object):
else:
return self.data_raw
class _LazyResourceMap(typing.Mapping[int, Resource]):
"""Internal class: Read-only wrapper for a mapping of resource IDs to resource objects.
@@ -272,7 +277,8 @@ class _LazyResourceMap(typing.Mapping[int, Resource]):
else:
contents = f"{len(self)} resources with IDs {list(self)}"
return f"<Resource map for type {self.type}, containing {contents}>"
return f"<Resource map for type {self.type!r}, containing {contents}>"
class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.ContextManager["ResourceFile"]):
"""A resource file reader operating on a byte stream."""
@@ -295,7 +301,7 @@ class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.
_references: typing.MutableMapping[bytes, typing.MutableMapping[int, Resource]]
@classmethod
def open(cls, filename: typing.Union[str, os.PathLike], *, fork: str="auto", **kwargs: typing.Any) -> "ResourceFile":
def open(cls, filename: typing.Union[str, os.PathLike], *, fork: str = "auto", **kwargs: typing.Any) -> "ResourceFile":
"""Open the file at the given path as a ResourceFile.
The fork parameter controls which fork of the file the resource data will be read from. It accepts the following values:
@@ -354,7 +360,7 @@ class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.
else:
raise ValueError(f"Unsupported value for the fork parameter: {fork!r}")
def __init__(self, stream: typing.BinaryIO, *, close: bool=False) -> None:
def __init__(self, stream: typing.BinaryIO, *, close: bool = False) -> None:
"""Create a ResourceFile wrapping the given byte stream.
To read resource file data from a bytes object, wrap it in an io.BytesIO.


@@ -5,10 +5,12 @@ from . import dcmp0
from . import dcmp1
from . import dcmp2
from .common import DecompressError, CompressedHeaderInfo
from .common import DecompressError, CompressedHeaderInfo, CompressedType8HeaderInfo, CompressedType9HeaderInfo
__all__ = [
"CompressedHeaderInfo",
"CompressedType8HeaderInfo",
"CompressedType9HeaderInfo",
"DecompressError",
"decompress",
"decompress_parsed",
@@ -26,7 +28,7 @@ DECOMPRESSORS = {
}
def decompress_stream_parsed(header_info: CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
def decompress_stream_parsed(header_info: CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
"""Decompress compressed resource data from a stream, whose header has already been read and parsed into a CompressedHeaderInfo object."""
try:
@@ -42,12 +44,14 @@ def decompress_stream_parsed(header_info: CompressedHeaderInfo, stream: typing.B
if decompressed_length != header_info.decompressed_length:
raise DecompressError(f"Actual length of decompressed data ({decompressed_length}) does not match length stored in resource ({header_info.decompressed_length})")
def decompress_parsed(header_info: CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
def decompress_parsed(header_info: CompressedHeaderInfo, data: bytes, *, debug: bool = False) -> bytes:
"""Decompress the given compressed resource data, whose header has already been removed and parsed into a CompressedHeaderInfo object."""
return b"".join(decompress_stream_parsed(header_info, io.BytesIO(data), debug=debug))
def decompress_stream(stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
def decompress_stream(stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
"""Decompress compressed resource data from a stream."""
header_info = CompressedHeaderInfo.parse_stream(stream)
@@ -57,7 +61,8 @@ def decompress_stream(stream: typing.BinaryIO, *, debug: bool=False) -> typing.I
yield from decompress_stream_parsed(header_info, stream, debug=debug)
def decompress(data: bytes, *, debug: bool=False) -> bytes:
def decompress(data: bytes, *, debug: bool = False) -> bytes:
"""Decompress the given compressed resource data."""
return b"".join(decompress_stream(io.BytesIO(data), debug=debug))


@@ -41,9 +41,9 @@ class CompressedHeaderInfo(object):
try:
signature, header_length, compression_type, decompressed_length, remainder = STRUCT_COMPRESSED_HEADER.unpack(stream.read(STRUCT_COMPRESSED_HEADER.size))
except struct.error:
raise DecompressError(f"Invalid header")
raise DecompressError("Invalid header")
if signature != COMPRESSED_SIGNATURE:
raise DecompressError(f"Invalid signature: {signature!r}, expected {COMPRESSED_SIGNATURE}")
raise DecompressError(f"Invalid signature: {signature!r}, expected {COMPRESSED_SIGNATURE!r}")
if header_length != 0x12:
raise DecompressError(f"Unsupported header length: 0x{header_length:>04x}, expected 0x12")
@@ -112,9 +112,14 @@ if typing.TYPE_CHECKING:
The peek method is supported by various standard Python binary IO streams, such as io.BufferedReader. If a stream does not natively support the peek method, it may be wrapped using the custom helper function make_peekable.
"""
def readable(self) -> bool: ...
def read(self, size: typing.Optional[int] = ...) -> bytes: ...
def peek(self, size: int = ...) -> bytes: ...
def readable(self) -> bool:
...
def read(self, size: typing.Optional[int] = ...) -> bytes:
...
def peek(self, size: int = ...) -> bytes:
...
class _PeekableIOWrapper(object):
@@ -165,7 +170,8 @@ def make_peekable(stream: typing.BinaryIO) -> "PeekableIO":
if hasattr(stream, "peek"):
# Stream is already peekable, nothing to be done.
return typing.cast("PeekableIO", stream)
elif isinstance(stream, io.RawIOBase):
elif not typing.TYPE_CHECKING and isinstance(stream, io.RawIOBase):
# This branch is skipped when type checking - mypy incorrectly warns about this code being unreachable, because it thinks that a typing.BinaryIO cannot be an instance of io.RawIOBase.
# Raw IO streams can be wrapped efficiently using BufferedReader.
return io.BufferedReader(stream)
else:
@@ -181,6 +187,7 @@ def read_exact(stream: typing.BinaryIO, byte_count: int) -> bytes:
raise DecompressError(f"Attempted to read {byte_count} bytes of data, but only got {len(data)} bytes")
return data
def read_variable_length_integer(stream: typing.BinaryIO) -> int:
"""Read a variable-length integer from the stream.


@@ -1,4 +1,3 @@
import io
import typing
from . import common
@@ -39,7 +38,7 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
assert len(TABLE) == len(range(0x4b, 0xfe))
def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
"""Internal helper function, implements the main decompression algorithm. Only called from decompress_stream, which performs some extra checks and debug logging."""
if not isinstance(header_info, common.CompressedType8HeaderInfo):
@@ -111,7 +110,7 @@ def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: ty
# Compact representation of (part of) a segment loader jump table, as used in 'CODE' (0) resources.
if debug:
print(f"Segment loader jump table entries")
print("Segment loader jump table entries")
# All generated jump table entries have the same segment number.
segment_number_int = common.read_variable_length_integer(stream)
@@ -169,13 +168,13 @@ def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: ty
raise common.DecompressError(f"Repeat count must be positive: {count}")
if debug:
print(f"\t-> {to_repeat} * {count}")
print(f"\t-> {to_repeat!r} * {count}")
yield to_repeat * count
elif kind == 0x04:
# A sequence of 16-bit signed integers, with each integer encoded as a difference relative to the previous integer. The first integer is stored explicitly.
if debug:
print(f"Difference-encoded 16-bit integers")
print("Difference-encoded 16-bit integers")
# The first integer is stored explicitly, as a signed value.
initial_int = common.read_variable_length_integer(stream)
@@ -207,7 +206,7 @@ def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: ty
# A sequence of 32-bit signed integers, with each integer encoded as a difference relative to the previous integer. The first integer is stored explicitly.
if debug:
print(f"Difference-encoded 32-bit integers")
print("Difference-encoded 32-bit integers")
# The first integer is stored explicitly, as a signed value.
initial_int = common.read_variable_length_integer(stream)
@@ -243,18 +242,19 @@ def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: ty
# Check that there really is no more data left.
extra = stream.read(1)
if extra:
raise common.DecompressError(f"Extra data encountered after end of data marker (first extra byte: {extra})")
raise common.DecompressError(f"Extra data encountered after end of data marker (first extra byte: {extra!r})")
break
else:
raise common.DecompressError(f"Unknown tag byte: 0x{byte:>02x}")
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
"""Decompress compressed data in the format used by 'dcmp' (0)."""
decompressed_length = 0
for chunk in decompress_stream_inner(header_info, stream, debug=debug):
if debug:
print(f"\t-> {chunk}")
print(f"\t-> {chunk!r}")
if header_info.decompressed_length % 2 != 0 and decompressed_length + len(chunk) == header_info.decompressed_length + 1:
# Special case: if the decompressed data length stored in the header is odd and one less than the length of the actual decompressed data, drop the last byte.


@@ -1,4 +1,3 @@
import io
import typing
from . import common
@@ -22,7 +21,7 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
assert len(TABLE) == len(range(0xd5, 0xfe))
def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
"""Internal helper function, implements the main decompression algorithm. Only called from decompress_stream, which performs some extra checks and debug logging."""
if not isinstance(header_info, common.CompressedType8HeaderInfo):
@@ -112,7 +111,7 @@ def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: ty
raise common.DecompressError(f"Repeat count must be positive: {count}")
if debug:
print(f"\t-> {to_repeat} * {count}")
print(f"\t-> {to_repeat!r} * {count}")
yield to_repeat * count
else:
raise common.DecompressError(f"Unknown extended code: 0x{kind:>02x}")
@@ -124,18 +123,19 @@ def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: ty
# Check that there really is no more data left.
extra = stream.read(1)
if extra:
raise common.DecompressError(f"Extra data encountered after end of data marker (first extra byte: {extra})")
raise common.DecompressError(f"Extra data encountered after end of data marker (first extra byte: {extra!r})")
break
else:
raise common.DecompressError(f"Unknown tag byte: 0x{byte:>02x}")
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
"""Decompress compressed data in the format used by 'dcmp' (1)."""
decompressed_length = 0
for chunk in decompress_stream_inner(header_info, stream, debug=debug):
if debug:
print(f"\t-> {chunk}")
print(f"\t-> {chunk!r}")
decompressed_length += len(chunk)
yield chunk


@@ -1,5 +1,4 @@
import enum
import io
import struct
import typing
@@ -74,7 +73,7 @@ def _split_bits(i: int) -> typing.Tuple[bool, bool, bool, bool, bool, bool, bool
)
def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> typing.Iterator[bytes]:
def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]:
while True: # Loop is terminated when EOF is reached.
table_index_data = stream.read(1)
if not table_index_data:
@@ -83,17 +82,18 @@ def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int,
elif not stream.peek(1) and decompressed_length % 2 != 0:
# Special case: if we are at the last byte of the compressed data, and the decompressed data has an odd length, the last byte is a single literal byte, and not a table reference.
if debug:
print(f"Last byte: {table_index_data}")
print(f"Last byte: {table_index_data!r}")
yield table_index_data
break
# Compressed data is untagged, every byte is a table reference.
(table_index,) = table_index_data
if debug:
print(f"Reference: {table_index} -> {table[table_index]}")
print(f"Reference: {table_index} -> {table[table_index]!r}")
yield table[table_index]
def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> typing.Iterator[bytes]:
def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]:
while True: # Loop is terminated when EOF is reached.
tag_data = stream.read(1)
if not tag_data:
@@ -102,7 +102,7 @@ def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, ta
elif not stream.peek(1) and decompressed_length % 2 != 0:
# Special case: if we are at the last byte of the compressed data, and the decompressed data has an odd length, the last byte is a single literal byte, and not a tag or a table reference.
if debug:
print(f"Last byte: {tag_data}")
print(f"Last byte: {tag_data!r}")
yield tag_data
break
@@ -119,7 +119,7 @@ def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, ta
break
(table_index,) = table_index_data
if debug:
print(f"Reference: {table_index} -> {table[table_index]}")
print(f"Reference: {table_index} -> {table[table_index]!r}")
yield table[table_index]
else:
# This is a literal (two uncompressed bytes that are literally copied into the output).
@@ -129,11 +129,11 @@ def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, ta
break
# Note: the literal may be only a single byte long if it is located exactly at EOF. This is intended and expected - the 1-byte literal is yielded normally, and on the next iteration, decompression is terminated as EOF is detected.
if debug:
print(f"Literal: {literal}")
print(f"Literal: {literal!r}")
yield literal
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
"""Decompress compressed data in the format used by 'dcmp' (2)."""
if not isinstance(header_info, common.CompressedType9HeaderInfo):

0
rsrcfork/py.typed Normal file


@@ -18,8 +18,10 @@ classifiers =
Programming Language :: Python :: 3 :: Only
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
license = MIT
license_file = LICENSE
license_files =
LICENSE
description = A pure Python, cross-platform library/tool for reading Macintosh resource data, as stored in resource forks and ``.rsrc`` files
long_description = file: README.rst
long_description_content_type = text/x-rst
@@ -33,11 +35,16 @@ keywords =
macos
[options]
setup_requires =
setuptools>=39.2.0
# mypy can only find type hints in the package if zip_safe is set to False,
# see https://mypy.readthedocs.io/en/latest/installed_packages.html#making-pep-561-compatible-packages
zip_safe = False
python_requires = >=3.6
packages = find:
[options.package_data]
rsrcfork =
py.typed
[options.packages.find]
include =
rsrcfork
@@ -47,6 +54,25 @@ include =
console_scripts =
rsrcfork = rsrcfork.__main__:main
[flake8]
extend-exclude =
.mypy_cache/,
build/,
dist/,
# The following issues are ignored because they do not match our code style:
ignore =
E226, # missing whitespace around arithmetic operator
E261, # at least two spaces before inline comment
E501, # line too long
W293, # blank line contains whitespace
W503, # line break before binary operator
# flake8-tabs configuration
use-flake8-tabs = true
blank-lines-indent = always
indent-tabs-def = 1
[mypy]
files=rsrcfork/**/*.py
python_version = 3.6

BIN: binary file not shown (35 KiB)
BIN: binary file not shown (355 KiB)
BIN: binary file not shown (127 KiB)
BIN: binary file not shown (884 KiB)
BIN: binary file not shown (51 KiB)
BIN: binary file not shown (478 KiB)
BIN: binary file not shown (159 KiB)
BIN: binary file not shown (1.1 MiB)
BIN: tests/data/empty.rsrc, new file, binary file not shown (286 B)
BIN: tests/data/testfile.rsrc, new file, binary file not shown (558 B)
BIN: binary file not shown (602 B)

290
tests/test_rsrcfork.py Normal file

@@ -0,0 +1,290 @@
import collections
import io
import pathlib
import shutil
import sys
import tempfile
import typing
import unittest
import rsrcfork
RESOURCE_FORKS_SUPPORTED = sys.platform.startswith("darwin")
RESOURCE_FORKS_NOT_SUPPORTED_MESSAGE = "Resource forks are only supported on Mac"
DATA_DIR = pathlib.Path(__file__).parent / "data"
EMPTY_RSRC_FILE = DATA_DIR / "empty.rsrc"
TEXTCLIPPING_RSRC_FILE = DATA_DIR / "unicode.textClipping.rsrc"
TESTFILE_RSRC_FILE = DATA_DIR / "testfile.rsrc"
COMPRESS_DATA_DIR = DATA_DIR / "compress"
COMPRESSED_DIR = COMPRESS_DATA_DIR / "compressed"
UNCOMPRESSED_DIR = COMPRESS_DATA_DIR / "uncompressed"
COMPRESS_RSRC_FILE_NAMES = [
"Finder.rsrc",
"Finder Help.rsrc",
# "Install.rsrc", # Commented out for performance - this file contains a lot of small resources.
"System.rsrc",
]
def make_pascal_string(s):
return bytes([len(s)]) + s
UNICODE_TEXT = "Here is some text, including Üñïçø∂é!"
DRAG_DATA = (
b"\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03"
b"utxt\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"utf8\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"TEXT\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)
TEXTCLIPPING_RESOURCES = collections.OrderedDict([
(b"utxt", collections.OrderedDict([
(256, UNICODE_TEXT.encode("utf-16-be")),
])),
(b"utf8", collections.OrderedDict([
(256, UNICODE_TEXT.encode("utf-8")),
])),
(b"TEXT", collections.OrderedDict([
(256, UNICODE_TEXT.encode("macroman")),
])),
(b"drag", collections.OrderedDict([
(128, DRAG_DATA),
]))
])
TESTFILE_HEADER_SYSTEM_DATA = (
b"\xa7F$\x08 <\x00\x00\xab\x03\xa7F <\x00\x00"
b"\x01\x00\xb4\x88f\x06`\np\x00`\x06 <\x00\x00"
b"\x08testfile\x00\x02\x00\x02\x00rs"
b"rcRSED\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x02\x00rsrcRSED\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\xdaIp~\x00\x00\x00\x00\x00\x00\x02.\xfe\x84"
)
TESTFILE_HEADER_APPLICATION_DATA = b"This is the application-specific header data section. Apparently I can write whatever nonsense I want here. A few more bytes...."
TESTFILE_RESOURCES = collections.OrderedDict([
(b"STR ", collections.OrderedDict([
(128, (
None, rsrcfork.ResourceAttrs(0),
make_pascal_string(b"The String, without name or attributes"),
)),
(129, (
b"The Name", rsrcfork.ResourceAttrs(0),
make_pascal_string(b"The String, with name and no attributes"),
)),
(130, (
None, rsrcfork.ResourceAttrs.resProtected | rsrcfork.ResourceAttrs.resPreload,
make_pascal_string(b"The String, without name but with attributes"),
)),
(131, (
b"The Name with Attributes", rsrcfork.ResourceAttrs.resSysHeap,
make_pascal_string(b"The String, with both name and attributes"),
)),
])),
])
class UnseekableStreamWrapper(io.BufferedIOBase):
_wrapped: typing.BinaryIO
def __init__(self, wrapped: typing.BinaryIO) -> None:
super().__init__()
self._wrapped = wrapped
def read(self, size: typing.Optional[int] = -1) -> bytes:
return self._wrapped.read(size)
def open_resource_fork(path: pathlib.Path, mode: str) -> typing.BinaryIO:
return (path / "..namedfork" / "rsrc").open(mode)
class ResourceFileReadTests(unittest.TestCase):
def test_empty(self) -> None:
with rsrcfork.open(EMPTY_RSRC_FILE, fork="data") as rf:
self.assertEqual(rf.header_system_data, bytes(112))
self.assertEqual(rf.header_application_data, bytes(128))
self.assertEqual(rf.file_attributes, rsrcfork.ResourceFileAttrs(0))
self.assertEqual(list(rf), [])
def internal_test_textclipping(self, rf: rsrcfork.ResourceFile) -> None:
self.assertEqual(rf.header_system_data, bytes(112))
self.assertEqual(rf.header_application_data, bytes(128))
self.assertEqual(rf.file_attributes, rsrcfork.ResourceFileAttrs(0))
self.assertEqual(list(rf), list(TEXTCLIPPING_RESOURCES))
for (actual_type, actual_reses), (expected_type, expected_reses) in zip(rf.items(), TEXTCLIPPING_RESOURCES.items()):
with self.subTest(type=expected_type):
self.assertEqual(actual_type, expected_type)
self.assertEqual(list(actual_reses), list(expected_reses))
for (actual_id, actual_res), (expected_id, expected_data) in zip(actual_reses.items(), expected_reses.items()):
with self.subTest(id=expected_id):
self.assertEqual(actual_res.type, expected_type)
self.assertEqual(actual_id, expected_id)
self.assertEqual(actual_res.id, expected_id)
self.assertEqual(actual_res.name, None)
self.assertEqual(actual_res.attributes, rsrcfork.ResourceAttrs(0))
self.assertEqual(actual_res.data, expected_data)
self.assertEqual(actual_res.compressed_info, None)
def test_textclipping_seekable_stream(self) -> None:
with TEXTCLIPPING_RSRC_FILE.open("rb") as f:
with rsrcfork.ResourceFile(f) as rf:
self.internal_test_textclipping(rf)
def test_textclipping_unseekable_stream(self) -> None:
with TEXTCLIPPING_RSRC_FILE.open("rb") as f:
with UnseekableStreamWrapper(f) as usf:
with rsrcfork.ResourceFile(usf) as rf:
self.internal_test_textclipping(rf)
def test_textclipping_path_data_fork(self) -> None:
with rsrcfork.open(TEXTCLIPPING_RSRC_FILE, fork="data") as rf:
self.internal_test_textclipping(rf)
@unittest.skipUnless(RESOURCE_FORKS_SUPPORTED, RESOURCE_FORKS_NOT_SUPPORTED_MESSAGE)
def test_textclipping_path_resource_fork(self) -> None:
with tempfile.NamedTemporaryFile() as tempf:
with TEXTCLIPPING_RSRC_FILE.open("rb") as dataf:
with open_resource_fork(pathlib.Path(tempf.name), "wb") as rsrcf:
shutil.copyfileobj(dataf, rsrcf)
with rsrcfork.open(tempf.name, fork="rsrc") as rf:
self.internal_test_textclipping(rf)
@unittest.skipUnless(RESOURCE_FORKS_SUPPORTED, RESOURCE_FORKS_NOT_SUPPORTED_MESSAGE)
def test_textclipping_path_auto_resource_fork(self) -> None:
with tempfile.NamedTemporaryFile() as temp_data_fork:
with TEXTCLIPPING_RSRC_FILE.open("rb") as source_file:
with open_resource_fork(pathlib.Path(temp_data_fork.name), "wb") as temp_rsrc_fork:
shutil.copyfileobj(source_file, temp_rsrc_fork)
with self.subTest(data_fork="empty"):
# Resource fork is selected when data fork is empty.
with rsrcfork.open(temp_data_fork.name) as rf:
self.internal_test_textclipping(rf)
with self.subTest(data_fork="non-resource data"):
# Resource fork is selected when data fork contains non-resource data.
temp_data_fork.write(b"This is the file's data fork. It should not be read, as the file has a resource fork.")
with rsrcfork.open(temp_data_fork.name) as rf:
self.internal_test_textclipping(rf)
with self.subTest(data_fork="valid resource data"):
# Resource fork is selected even when data fork contains valid resource data.
with EMPTY_RSRC_FILE.open("rb") as source_file:
shutil.copyfileobj(source_file, temp_data_fork)
with rsrcfork.open(temp_data_fork.name) as rf:
self.internal_test_textclipping(rf)
@unittest.skipUnless(RESOURCE_FORKS_SUPPORTED, RESOURCE_FORKS_NOT_SUPPORTED_MESSAGE)
def test_textclipping_path_auto_data_fork(self) -> None:
with tempfile.NamedTemporaryFile() as temp_data_fork:
with TEXTCLIPPING_RSRC_FILE.open("rb") as source_file:
shutil.copyfileobj(source_file, temp_data_fork)
# Have to flush the temporary file manually so that the data is visible to the other reads below.
# Normally this happens automatically as part of the close method, but that would also delete the temporary file, which we don't want.
temp_data_fork.flush()
with self.subTest(rsrc_fork="nonexistent"):
# Data fork is selected when resource fork does not exist.
with rsrcfork.open(temp_data_fork.name) as rf:
self.internal_test_textclipping(rf)
with self.subTest(rsrc_fork="empty"):
# Data fork is selected when resource fork exists, but is empty.
with open_resource_fork(pathlib.Path(temp_data_fork.name), "wb") as temp_rsrc_fork:
temp_rsrc_fork.write(b"")
with rsrcfork.open(temp_data_fork.name) as rf:
self.internal_test_textclipping(rf)
with self.subTest(rsrc_fork="non-resource data"):
# Data fork is selected when resource fork contains non-resource data.
with open_resource_fork(pathlib.Path(temp_data_fork.name), "wb") as temp_rsrc_fork:
temp_rsrc_fork.write(b"This is the file's resource fork. It contains junk, so it should be ignored in favor of the data fork.")
with rsrcfork.open(temp_data_fork.name) as rf:
self.internal_test_textclipping(rf)
def test_testfile(self) -> None:
with rsrcfork.open(TESTFILE_RSRC_FILE, fork="data") as rf:
self.assertEqual(rf.header_system_data, TESTFILE_HEADER_SYSTEM_DATA)
self.assertEqual(rf.header_application_data, TESTFILE_HEADER_APPLICATION_DATA)
self.assertEqual(rf.file_attributes, rsrcfork.ResourceFileAttrs.mapPrinterDriverMultiFinderCompatible | rsrcfork.ResourceFileAttrs.mapReadOnly)
self.assertEqual(list(rf), list(TESTFILE_RESOURCES))
for (actual_type, actual_reses), (expected_type, expected_reses) in zip(rf.items(), TESTFILE_RESOURCES.items()):
with self.subTest(type=expected_type):
self.assertEqual(actual_type, expected_type)
self.assertEqual(list(actual_reses), list(expected_reses))
for (actual_id, actual_res), (expected_id, (expected_name, expected_attrs, expected_data)) in zip(actual_reses.items(), expected_reses.items()):
with self.subTest(id=expected_id):
self.assertEqual(actual_res.type, expected_type)
self.assertEqual(actual_id, expected_id)
self.assertEqual(actual_res.id, expected_id)
self.assertEqual(actual_res.name, expected_name)
self.assertEqual(actual_res.attributes, expected_attrs)
self.assertEqual(actual_res.data, expected_data)
self.assertEqual(actual_res.compressed_info, None)
def test_compress_compare(self) -> None:
# This test goes through pairs of resource files: one original file with both compressed and uncompressed resources, and one modified file where all compressed resources have been decompressed (using ResEdit on System 7.5.5).
# It checks that the rsrcfork library performs automatic decompression on the compressed resources, so that the compressed resource file appears to the user like the uncompressed resource file (ignoring resource order, which was lost during decompression using ResEdit).
for name in COMPRESS_RSRC_FILE_NAMES:
with self.subTest(name=name):
with rsrcfork.open(COMPRESSED_DIR / name, fork="data") as compressed_rf, rsrcfork.open(UNCOMPRESSED_DIR / name, fork="data") as uncompressed_rf:
self.assertEqual(sorted(compressed_rf), sorted(uncompressed_rf))
for (compressed_type, compressed_reses), (uncompressed_type, uncompressed_reses) in zip(sorted(compressed_rf.items()), sorted(uncompressed_rf.items())):
with self.subTest(type=compressed_type):
self.assertEqual(compressed_type, uncompressed_type)
self.assertEqual(sorted(compressed_reses), sorted(uncompressed_reses))
for (compressed_id, compressed_res), (uncompressed_id, uncompressed_res) in zip(sorted(compressed_reses.items()), sorted(uncompressed_reses.items())):
with self.subTest(id=compressed_id):
# The metadata of the compressed and uncompressed resources must match.
self.assertEqual(compressed_res.type, uncompressed_res.type)
self.assertEqual(compressed_id, uncompressed_id)
self.assertEqual(compressed_res.id, compressed_id)
self.assertEqual(compressed_res.id, uncompressed_res.id)
self.assertEqual(compressed_res.name, uncompressed_res.name)
self.assertEqual(compressed_res.attributes & ~rsrcfork.ResourceAttrs.resCompressed, uncompressed_res.attributes)
# The uncompressed resource really has to be not compressed.
self.assertNotIn(rsrcfork.ResourceAttrs.resCompressed, uncompressed_res.attributes)
self.assertEqual(uncompressed_res.compressed_info, None)
self.assertEqual(uncompressed_res.data, uncompressed_res.data_raw)
self.assertEqual(uncompressed_res.length, uncompressed_res.length_raw)
# The compressed resource's (automatically decompressed) data must match the uncompressed data.
self.assertEqual(compressed_res.data, uncompressed_res.data)
self.assertEqual(compressed_res.length, uncompressed_res.length)
if rsrcfork.ResourceAttrs.resCompressed in compressed_res.attributes:
# Resources with the compressed attribute must expose correct compression metadata.
self.assertNotEqual(compressed_res.compressed_info, None)
self.assertEqual(compressed_res.compressed_info.decompressed_length, compressed_res.length)
else:
# Some resources in the "compressed" files are not actually compressed, in which case there is no compression metadata.
self.assertEqual(compressed_res.compressed_info, None)
self.assertEqual(compressed_res.data, compressed_res.data_raw)
self.assertEqual(compressed_res.length, compressed_res.length_raw)
if __name__ == "__main__":
unittest.main()

27
tox.ini Normal file

@@ -0,0 +1,27 @@
[tox]
# When adding a new Python version here, please also update the list of Python versions called by the GitHub Actions workflow (.github/workflows/ci.yml).
envlist = py{36,37,38},flake8,mypy,package

[testenv]
commands = python -m unittest discover --start-directory ./tests

[testenv:flake8]
deps =
    flake8 >= 3.8.0
    flake8-bugbear
    flake8-tabs
commands = flake8

[testenv:mypy]
deps =
    mypy
commands = mypy

[testenv:package]
deps =
    twine
    wheel >= 0.32.0
commands =
    python setup.py sdist bdist_wheel
    twine check dist/*