diff --git a/rsrcfork/__init__.py b/rsrcfork/__init__.py
index 94f251c..b2a2b8e 100644
--- a/rsrcfork/__init__.py
+++ b/rsrcfork/__init__.py
@@ -31,8 +31,8 @@ __all__ = [
 	"open",
 ]
 
-from . import api, compress
 from .api import Resource, ResourceAttrs, ResourceFile, ResourceFileAttrs
+from . import compress
 
 # noinspection PyShadowingBuiltins
 open = ResourceFile.open
diff --git a/rsrcfork/__main__.py b/rsrcfork/__main__.py
index 4c04346..8634a76 100644
--- a/rsrcfork/__main__.py
+++ b/rsrcfork/__main__.py
@@ -1,5 +1,4 @@
 import argparse
-import collections
 import enum
 import itertools
 import pathlib
@@ -27,11 +26,14 @@ _REZ_ATTR_NAMES = {
 }
 
 F = typing.TypeVar("F", bound=enum.Flag)
+
+
 def decompose_flags(value: F) -> typing.Sequence[F]:
 	"""Decompose an enum.Flags instance into separate enum constants."""
 	
 	return [bit for bit in type(value) if bit in value]
 
+
 def is_printable(char: str) -> bool:
 	"""Determine whether a character is printable for our purposes.
 	
@@ -40,6 +42,7 @@ def is_printable(char: str) -> bool:
 	
 	return char.isprintable() or char == "\uf8ff"
 
+
 def bytes_unescape(string: str) -> bytes:
 	"""Convert a string containing text (in _TEXT_ENCODING) and hex escapes to a bytestring.
 	
@@ -66,7 +69,8 @@ def bytes_unescape(string: str) -> bytes:
 	
 	return bytes(out)
 
-def bytes_escape(bs: bytes, *, quote: typing.Optional[str]=None) -> str:
+
+def bytes_escape(bs: bytes, *, quote: typing.Optional[str] = None) -> str:
 	"""Convert a bytestring to a string (using _TEXT_ENCODING), with non-printable characters hex-escaped.
 	
 	(We implement our own escaping mechanism here to not depend on Python's str or bytes repr.)
@@ -83,9 +87,11 @@ def bytes_escape(bs: bytes, *, quote: typing.Optional[str]=None) -> str:
 	
 	return "".join(out)
 
+
 MIN_RESOURCE_ID = -0x8000
 MAX_RESOURCE_ID = 0x7fff
 
+
 class ResourceFilter(object):
 	type: bytes
 	min_id: int
@@ -154,6 +160,7 @@ class ResourceFilter(object):
 	def matches(self, res: api.Resource) -> bool:
 		return res.type == self.type and self.min_id <= res.id <= self.max_id and (self.name is None or res.name == self.name)
 
+
 def filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> typing.Iterable[api.Resource]:
 	if not filters:
 		# Special case: an empty list of filters matches all resources rather than none
@@ -167,6 +174,7 @@ def filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> typ
 			if any(filter_obj.matches(res) for filter_obj in filter_objs):
 				yield res
 
+
 def hexdump(data: bytes) -> None:
 	last_line = None
 	asterisk_shown = False
@@ -189,13 +197,16 @@ def hexdump(data: bytes) -> None:
 	if data:
 		print(f"{len(data):08x}")
 
+
 def raw_hexdump(data: bytes) -> None:
 	for i in range(0, len(data), 16):
 		print(" ".join(f"{byte:02x}" for byte in data[i:i + 16]))
 
+
 def translate_text(data: bytes) -> str:
 	return data.decode(_TEXT_ENCODING).replace("\r", "\n")
 
+
 def describe_resource(res: api.Resource, *, include_type: bool, decompress: bool) -> str:
 	id_desc_parts = [f"{res.id}"]
 
@@ -231,6 +242,7 @@ def describe_resource(res: api.Resource, *, include_type: bool, decompress: bool
 		desc = f"'{restype}' {desc}"
 	return desc
 
+
 def show_filtered_resources(resources: typing.Sequence[api.Resource], format: str, decompress: bool) -> None:
 	if not resources:
 		if format in ("dump", "dump-text"):
@@ -319,6 +331,7 @@ def show_filtered_resources(resources: typing.Sequence[api.Resource], format: st
 	else:
 		raise ValueError(f"Unhandled output format: {format}")
 
+
 def list_resources(resources: typing.List[api.Resource], *, sort: bool, group: str, decompress: bool) -> None:
 	if len(resources) == 0:
 		print("No resources matched the filter")
@@ -357,6 +370,7 @@ def list_resources(resources: typing.List[api.Resource], *, sort: bool, group: s
 	else:
 		raise AssertionError(f"Unhandled group mode: {group!r}")
 
+
 def format_compressed_header_info(header_info: compress.CompressedHeaderInfo) -> typing.Iterable[str]:
 	yield f"Header length: {header_info.header_length} bytes"
 	yield f"Compression type: 0x{header_info.compression_type:>04x}"
@@ -390,6 +404,7 @@ def make_argument_parser(*, description: str, **kwargs: typing.Any) -> argparse.
 	
 	return ap
 
+
 def add_resource_file_args(ap: argparse.ArgumentParser) -> None:
 	"""Define common options/arguments for specifying an input resource file.
 	
@@ -399,6 +414,7 @@ def add_resource_file_args(ap: argparse.ArgumentParser) -> None:
 	ap.add_argument("--fork", choices=["auto", "data", "rsrc"], default="auto", help="The fork from which to read the resource file data, or auto to guess. Default: %(default)s")
 	ap.add_argument("file", help="The file from which to read resources, or - for stdin.")
 
+
 RESOURCE_FILTER_HELP = """
 The resource filters use syntax similar to Rez (resource definition) files.
 Each filter can have one of the following forms:
@@ -415,11 +431,13 @@ resource filter (using double quotes) to ensure that it is not interpreted or
 rewritten by the shell.
 """
 
+
 def add_resource_filter_args(ap: argparse.ArgumentParser) -> None:
 	"""Define common options/arguments for specifying resource filters."""
 	
 	ap.add_argument("filter", nargs="*", help="One or more filters to select resources. If no filters are specified, all resources are selected.")
 
+
 def open_resource_file(file: str, *, fork: str) -> api.ResourceFile:
 	"""Open a resource file at the given path, using the specified fork."""
 	
@@ -500,6 +518,7 @@ on Mac OS X normally have both parts of the header data set to all zero bytes.
 	
 	sys.exit(0)
 
+
 def do_info(prog: str, args: typing.List[str]) -> typing.NoReturn:
 	"""Display technical information about the resource file."""
 	
@@ -534,6 +553,7 @@ Display technical information and stats about the resource file.
 	
 	sys.exit(0)
 
+
 def do_list(prog: str, args: typing.List[str]) -> typing.NoReturn:
 	"""List the resources in a file."""
 	
@@ -568,6 +588,7 @@ decompress the resource data.
 	
 	sys.exit(0)
 
+
 def do_resource_info(prog: str, args: typing.List[str]) -> typing.NoReturn:
 	"""Display technical information about resources."""
 	
@@ -633,6 +654,7 @@ Display technical information about one or more resources.
 	
 	sys.exit(0)
 
+
 def do_read(prog: str, args: typing.List[str]) -> typing.NoReturn:
 	"""Read data from resources."""
 	
@@ -663,6 +685,7 @@ Read the data of one or more resources.
 	
 	sys.exit(0)
 
+
 def do_raw_compress_info(prog: str, args: typing.List[str]) -> typing.NoReturn:
 	"""Display technical information about raw compressed resource data."""
 	
@@ -694,6 +717,7 @@ in a standalone file and not as a resource in a resource file.
 	
 	sys.exit(0)
 
+
 def do_raw_decompress(prog: str, args: typing.List[str]) -> typing.NoReturn:
 	"""Decompress raw compressed resource data."""
 	
@@ -856,5 +880,6 @@ rsrcfork library, which this tool is a part of.
 		# Subcommand is valid, call the looked up subcommand and pass on further arguments.
 		subcommand_func(f"{prog} {ns.subcommand}", ns.args)
 
+
 if __name__ == "__main__":
 	sys.exit(main())
diff --git a/rsrcfork/api.py b/rsrcfork/api.py
index 8981d0a..d6dc98a 100644
--- a/rsrcfork/api.py
+++ b/rsrcfork/api.py
@@ -59,9 +59,11 @@ STRUCT_RESOURCE_REFERENCE = struct.Struct(">hHI4x")
 # 1 byte: Length of following resource name.
 STRUCT_RESOURCE_NAME_HEADER = struct.Struct(">B")
 
+
 class InvalidResourceFileError(Exception):
 	pass
 
+
 class ResourceFileAttrs(enum.Flag):
 	"""Resource file attribute flags. The descriptions for these flags are taken from comments on the map*Bit and map* enum constants in <CarbonCore/Resources.h>."""
 	
@@ -82,6 +84,7 @@ class ResourceFileAttrs(enum.Flag):
 	_BIT_1 = 1 << 1
 	_BIT_0 = 1 << 0
 
+
 class ResourceAttrs(enum.Flag):
 	"""Resource attribute flags. The descriptions for these flags are taken from comments on the res*Bit and res* enum constants in <CarbonCore/Resources.h>."""
 	
@@ -94,6 +97,7 @@ class ResourceAttrs(enum.Flag):
 	resChanged = 1 << 1 # "Existing resource changed since last update", "Resource changed?"
 	resCompressed = 1 << 0 # "indicates that the resource data is compressed" (only documented in https://github.com/kreativekorp/ksfl/wiki/Macintosh-Resource-File-Format)
 
+
 class Resource(object):
 	"""A single resource from a resource file."""
 	
@@ -229,6 +233,7 @@ class Resource(object):
 		else:
 			return self.data_raw
 
+
 class _LazyResourceMap(typing.Mapping[int, Resource]):
 	"""Internal class: Read-only wrapper for a mapping of resource IDs to resource objects.
 	
@@ -274,6 +279,7 @@ class _LazyResourceMap(typing.Mapping[int, Resource]):
 		
 		return f""
 
+
 class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.ContextManager["ResourceFile"]):
 	"""A resource file reader operating on a byte stream."""
 	
@@ -295,7 +301,7 @@ class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.
 	_references: typing.MutableMapping[bytes, typing.MutableMapping[int, Resource]]
 	
 	@classmethod
-	def open(cls, filename: typing.Union[str, os.PathLike], *, fork: str="auto", **kwargs: typing.Any) -> "ResourceFile":
+	def open(cls, filename: typing.Union[str, os.PathLike], *, fork: str = "auto", **kwargs: typing.Any) -> "ResourceFile":
 		"""Open the file at the given path as a ResourceFile.
 		
 		The fork parameter controls which fork of the file the resource data will be read from. It accepts the following values:
@@ -354,7 +360,7 @@ class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.
 		else:
 			raise ValueError(f"Unsupported value for the fork parameter: {fork!r}")
 	
-	def __init__(self, stream: typing.BinaryIO, *, close: bool=False) -> None:
+	def __init__(self, stream: typing.BinaryIO, *, close: bool = False) -> None:
 		"""Create a ResourceFile wrapping the given byte stream.
 		
 		To read resource file data from a bytes object, wrap it in an io.BytesIO.
diff --git a/rsrcfork/compress/__init__.py b/rsrcfork/compress/__init__.py
index a178e8e..b6a23eb 100644
--- a/rsrcfork/compress/__init__.py
+++ b/rsrcfork/compress/__init__.py
@@ -28,7 +28,7 @@ DECOMPRESSORS = {
 }
 
 
-def decompress_stream_parsed(header_info: CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+def decompress_stream_parsed(header_info: CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
 	"""Decompress compressed resource data from a stream, whose header has already been read and parsed into a CompressedHeaderInfo object."""
 	
 	try:
@@ -44,12 +44,14 @@ def decompress_stream_parsed(header_info: CompressedHeaderInfo, stream: typing.B
 	if decompressed_length != header_info.decompressed_length:
 		raise DecompressError(f"Actual length of decompressed data ({decompressed_length}) does not match length stored in resource ({header_info.decompressed_length})")
 
-def decompress_parsed(header_info: CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
+
+def decompress_parsed(header_info: CompressedHeaderInfo, data: bytes, *, debug: bool = False) -> bytes:
 	"""Decompress the given compressed resource data, whose header has already been removed and parsed into a CompressedHeaderInfo object."""
 	
 	return b"".join(decompress_stream_parsed(header_info, io.BytesIO(data), debug=debug))
 
-def decompress_stream(stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+
+def decompress_stream(stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
 	"""Decompress compressed resource data from a stream."""
 	
 	header_info = CompressedHeaderInfo.parse_stream(stream)
@@ -59,7 +61,8 @@ def decompress_stream(stream: typing.BinaryIO, *, debug: bool=False) -> typing.I
 	
 	yield from decompress_stream_parsed(header_info, stream, debug=debug)
 
-def decompress(data: bytes, *, debug: bool=False) -> bytes:
+
+def decompress(data: bytes, *, debug: bool = False) -> bytes:
 	"""Decompress the given compressed resource data."""
 	
 	return b"".join(decompress_stream(io.BytesIO(data), debug=debug))
diff --git a/rsrcfork/compress/common.py b/rsrcfork/compress/common.py
index fa664d7..43de94f 100644
--- a/rsrcfork/compress/common.py
+++ b/rsrcfork/compress/common.py
@@ -112,9 +112,14 @@ if typing.TYPE_CHECKING:
 		The peek method is supported by various standard Python binary IO streams, such as io.BufferedReader. If a stream does not natively support the peek method, it may be wrapped using the custom helper function make_peekable.
 		"""
 		
-		def readable(self) -> bool: ...
-		def read(self, size: typing.Optional[int] = ...) -> bytes: ...
-		def peek(self, size: int = ...) -> bytes: ...
+		def readable(self) -> bool:
+			...
+		
+		def read(self, size: typing.Optional[int] = ...) -> bytes:
+			...
+		
+		def peek(self, size: int = ...) -> bytes:
+			...
 
 
 class _PeekableIOWrapper(object):
@@ -182,6 +187,7 @@ def read_exact(stream: typing.BinaryIO, byte_count: int) -> bytes:
 		raise DecompressError(f"Attempted to read {byte_count} bytes of data, but only got {len(data)} bytes")
 	return data
 
+
 def read_variable_length_integer(stream: typing.BinaryIO) -> int:
 	"""Read a variable-length integer from the stream.
 	
diff --git a/rsrcfork/compress/dcmp0.py b/rsrcfork/compress/dcmp0.py
index d0196ff..c4c12a6 100644
--- a/rsrcfork/compress/dcmp0.py
+++ b/rsrcfork/compress/dcmp0.py
@@ -1,4 +1,3 @@
-import io
 import typing
 
 from . import common
@@ -39,7 +38,7 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
 assert len(TABLE) == len(range(0x4b, 0xfe))
 
 
-def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
 	"""Internal helper function, implements the main decompression algorithm. Only called from decompress_stream, which performs some extra checks and debug logging."""
 	
 	if not isinstance(header_info, common.CompressedType8HeaderInfo):
@@ -248,9 +247,10 @@ def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: ty
 		else:
 			raise common.DecompressError(f"Unknown tag byte: 0x{byte:>02x}")
 
-def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+
+def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
 	"""Decompress compressed data in the format used by 'dcmp' (0)."""
-	
+
 	decompressed_length = 0
 	for chunk in decompress_stream_inner(header_info, stream, debug=debug):
 		if debug:
diff --git a/rsrcfork/compress/dcmp1.py b/rsrcfork/compress/dcmp1.py
index 505ccee..17ac097 100644
--- a/rsrcfork/compress/dcmp1.py
+++ b/rsrcfork/compress/dcmp1.py
@@ -1,4 +1,3 @@
-import io
 import typing
 
 from . import common
@@ -22,7 +21,7 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
 assert len(TABLE) == len(range(0xd5, 0xfe))
 
 
-def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
 	"""Internal helper function, implements the main decompression algorithm. Only called from decompress_stream, which performs some extra checks and debug logging."""
 	
 	if not isinstance(header_info, common.CompressedType8HeaderInfo):
@@ -129,7 +128,8 @@ def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: ty
 		else:
 			raise common.DecompressError(f"Unknown tag byte: 0x{byte:>02x}")
 
-def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+
+def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
 	"""Decompress compressed data in the format used by 'dcmp' (1)."""
 	
 	decompressed_length = 0
diff --git a/rsrcfork/compress/dcmp2.py b/rsrcfork/compress/dcmp2.py
index 9fd5ef3..c57b06d 100644
--- a/rsrcfork/compress/dcmp2.py
+++ b/rsrcfork/compress/dcmp2.py
@@ -1,5 +1,4 @@
 import enum
-import io
 import struct
 import typing
 
@@ -74,7 +73,7 @@ def _split_bits(i: int) -> typing.Tuple[bool, bool, bool, bool, bool, bool, bool
 )
 
 
-def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> typing.Iterator[bytes]:
+def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]:
 	while True: # Loop is terminated when EOF is reached.
 		table_index_data = stream.read(1)
 		if not table_index_data:
@@ -93,7 +92,8 @@ def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int,
 			print(f"Reference: {table_index} -> {table[table_index]!r}")
 		yield table[table_index]
 
-def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> typing.Iterator[bytes]:
+
+def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]:
 	while True: # Loop is terminated when EOF is reached.
 		tag_data = stream.read(1)
 		if not tag_data:
@@ -133,7 +133,7 @@ def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, ta
 				yield literal
 
 
-def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
 	"""Decompress compressed data in the format used by 'dcmp' (2)."""
 	
 	if not isinstance(header_info, common.CompressedType9HeaderInfo):
diff --git a/tests/test_rsrcfork.py b/tests/test_rsrcfork.py
index bed1990..24a6c6a 100644
--- a/tests/test_rsrcfork.py
+++ b/tests/test_rsrcfork.py
@@ -62,5 +62,6 @@ class ResourceFileReadTests(unittest.TestCase):
 				self.assertEqual(actual_res.data, expected_data)
 				self.assertEqual(actual_res.compressed_info, None)
 
+
 if __name__ == "__main__":
 	unittest.main()
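
For reference, the hunks above are formatting-only, but they pass through most of the package's public surface: rsrcfork.open / ResourceFile.open with its fork parameter, the mappings from resource type to ID to Resource, Resource.data versus Resource.data_raw, and the compress helpers. Below is a minimal usage sketch of that API as it appears in this diff; the path "Example.rsrc" is hypothetical.

import rsrcfork

# Open a resource file. fork="auto" (the default, as shown in ResourceFile.open
# above) chooses between the resource fork and the data fork automatically.
with rsrcfork.open("Example.rsrc", fork="auto") as rf:
	# A ResourceFile maps resource types (bytes) to mappings of resource IDs
	# to Resource objects.
	for restype, resources in rf.items():
		for resid, res in resources.items():
			# res.data transparently decompresses compressed resources;
			# res.data_raw returns the stored bytes unchanged (see
			# Resource.data_raw in rsrcfork/api.py above).
			print(restype, resid, res.name, len(res.data))

# Raw compressed resource data (with its header still attached) can also be
# decompressed directly via rsrcfork.compress.decompress(data), whose 'dcmp'
# (0)/(1)/(2) handlers are the dcmp0/dcmp1/dcmp2 modules touched above.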