Fix a bunch of flake8 violations

dgelessus 2019-12-30 03:00:12 +01:00
parent f690caac24
commit ba284d1800
9 changed files with 64 additions and 23 deletions

View File

@@ -31,8 +31,8 @@ __all__ = [
"open",
]
from . import api, compress
from .api import Resource, ResourceAttrs, ResourceFile, ResourceFileAttrs
from . import compress
# noinspection PyShadowingBuiltins
open = ResourceFile.open
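As a quick illustration of the module-level open alias kept by this hunk, here is a minimal usage sketch; the file name is hypothetical and the fork parameter defaults to "auto" as shown later in this commit.

import rsrcfork

# rsrcfork.open is an alias for ResourceFile.open, so it opens a resource file by path.
with rsrcfork.open("Example.rsrc") as rf:  # "Example.rsrc" is a made-up path
    # A ResourceFile behaves like a mapping of resource type -> {ID: Resource}.
    for restype, resources in rf.items():
        print(restype, len(resources))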

View File

@@ -1,5 +1,4 @@
import argparse
import collections
import enum
import itertools
import pathlib
@@ -27,11 +26,14 @@ _REZ_ATTR_NAMES = {
}
F = typing.TypeVar("F", bound=enum.Flag)
def decompose_flags(value: F) -> typing.Sequence[F]:
"""Decompose an enum.Flags instance into separate enum constants."""
return [bit for bit in type(value) if bit in value]
def is_printable(char: str) -> bool:
"""Determine whether a character is printable for our purposes.
@@ -40,6 +42,7 @@ def is_printable(char: str) -> bool:
return char.isprintable() or char == "\uf8ff"
def bytes_unescape(string: str) -> bytes:
"""Convert a string containing text (in _TEXT_ENCODING) and hex escapes to a bytestring.
@@ -66,7 +69,8 @@ def bytes_unescape(string: str) -> bytes:
return bytes(out)
def bytes_escape(bs: bytes, *, quote: typing.Optional[str]=None) -> str:
def bytes_escape(bs: bytes, *, quote: typing.Optional[str] = None) -> str:
"""Convert a bytestring to a string (using _TEXT_ENCODING), with non-printable characters hex-escaped.
(We implement our own escaping mechanism here to not depend on Python's str or bytes repr.)
@@ -83,9 +87,11 @@ def bytes_escape(bs: bytes, *, quote: typing.Optional[str]=None) -> str:
return "".join(out)
MIN_RESOURCE_ID = -0x8000
MAX_RESOURCE_ID = 0x7fff
class ResourceFilter(object):
type: bytes
min_id: int
@@ -154,6 +160,7 @@ class ResourceFilter(object):
def matches(self, res: api.Resource) -> bool:
return res.type == self.type and self.min_id <= res.id <= self.max_id and (self.name is None or res.name == self.name)
def filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> typing.Iterable[api.Resource]:
if not filters:
# Special case: an empty list of filters matches all resources rather than none
@@ -167,6 +174,7 @@ def filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> typ
if any(filter_obj.matches(res) for filter_obj in filter_objs):
yield res
def hexdump(data: bytes) -> None:
last_line = None
asterisk_shown = False
@@ -189,13 +197,16 @@ def hexdump(data: bytes) -> None:
if data:
print(f"{len(data):08x}")
def raw_hexdump(data: bytes) -> None:
for i in range(0, len(data), 16):
print(" ".join(f"{byte:02x}" for byte in data[i:i + 16]))
def translate_text(data: bytes) -> str:
return data.decode(_TEXT_ENCODING).replace("\r", "\n")
def describe_resource(res: api.Resource, *, include_type: bool, decompress: bool) -> str:
id_desc_parts = [f"{res.id}"]
@@ -231,6 +242,7 @@ def describe_resource(res: api.Resource, *, include_type: bool, decompress: bool
desc = f"'{restype}' {desc}"
return desc
def show_filtered_resources(resources: typing.Sequence[api.Resource], format: str, decompress: bool) -> None:
if not resources:
if format in ("dump", "dump-text"):
@@ -319,6 +331,7 @@ def show_filtered_resources(resources: typing.Sequence[api.Resource], format: st
else:
raise ValueError(f"Unhandled output format: {format}")
def list_resources(resources: typing.List[api.Resource], *, sort: bool, group: str, decompress: bool) -> None:
if len(resources) == 0:
print("No resources matched the filter")
@@ -357,6 +370,7 @@ def list_resources(resources: typing.List[api.Resource], *, sort: bool, group: s
else:
raise AssertionError(f"Unhandled group mode: {group!r}")
def format_compressed_header_info(header_info: compress.CompressedHeaderInfo) -> typing.Iterable[str]:
yield f"Header length: {header_info.header_length} bytes"
yield f"Compression type: 0x{header_info.compression_type:>04x}"
@@ -390,6 +404,7 @@ def make_argument_parser(*, description: str, **kwargs: typing.Any) -> argparse.
return ap
def add_resource_file_args(ap: argparse.ArgumentParser) -> None:
"""Define common options/arguments for specifying an input resource file.
@@ -399,6 +414,7 @@ def add_resource_file_args(ap: argparse.ArgumentParser) -> None:
ap.add_argument("--fork", choices=["auto", "data", "rsrc"], default="auto", help="The fork from which to read the resource file data, or auto to guess. Default: %(default)s")
ap.add_argument("file", help="The file from which to read resources, or - for stdin.")
RESOURCE_FILTER_HELP = """
The resource filters use syntax similar to Rez (resource definition) files.
Each filter can have one of the following forms:
@@ -415,11 +431,13 @@ resource filter (using double quotes) to ensure that it is not interpreted
or rewritten by the shell.
"""
def add_resource_filter_args(ap: argparse.ArgumentParser) -> None:
"""Define common options/arguments for specifying resource filters."""
ap.add_argument("filter", nargs="*", help="One or more filters to select resources. If no filters are specified, all resources are selected.")
def open_resource_file(file: str, *, fork: str) -> api.ResourceFile:
"""Open a resource file at the given path, using the specified fork."""
@@ -500,6 +518,7 @@ on Mac OS X normally have both parts of the header data set to all zero bytes.
sys.exit(0)
def do_info(prog: str, args: typing.List[str]) -> typing.NoReturn:
"""Display technical information about the resource file."""
@@ -534,6 +553,7 @@ Display technical information and stats about the resource file.
sys.exit(0)
def do_list(prog: str, args: typing.List[str]) -> typing.NoReturn:
"""List the resources in a file."""
@@ -568,6 +588,7 @@ decompress the resource data.
sys.exit(0)
def do_resource_info(prog: str, args: typing.List[str]) -> typing.NoReturn:
"""Display technical information about resources."""
@@ -633,6 +654,7 @@ Display technical information about one or more resources.
sys.exit(0)
def do_read(prog: str, args: typing.List[str]) -> typing.NoReturn:
"""Read data from resources."""
@@ -663,6 +685,7 @@ Read the data of one or more resources.
sys.exit(0)
def do_raw_compress_info(prog: str, args: typing.List[str]) -> typing.NoReturn:
"""Display technical information about raw compressed resource data."""
@@ -694,6 +717,7 @@ in a standalone file and not as a resource in a resource file.
sys.exit(0)
def do_raw_decompress(prog: str, args: typing.List[str]) -> typing.NoReturn:
"""Decompress raw compressed resource data."""
@@ -856,5 +880,6 @@ rsrcfork library, which this tool is a part of.
# Subcommand is valid, call the looked up subcommand and pass on further arguments.
subcommand_func(f"{prog} {ns.subcommand}", ns.args)
if __name__ == "__main__":
sys.exit(main())
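Several helpers touched in this file appear in full above; decompose_flags, for example, splits a combined enum.Flag value into its single-bit members. A small self-contained sketch of that behaviour, reusing the one-liner from the hunk and a stand-in flag enum whose two members are borrowed from the ResourceAttrs hunk further down in this commit:

import enum
import typing

F = typing.TypeVar("F", bound=enum.Flag)

def decompose_flags(value: F) -> typing.Sequence[F]:
    # Same comprehension as in the hunk above: keep each single-bit member contained in value.
    return [bit for bit in type(value) if bit in value]

class Attrs(enum.Flag):  # hypothetical stand-in, not the real ResourceAttrs
    resChanged = 1 << 1
    resCompressed = 1 << 0

print(decompose_flags(Attrs.resChanged | Attrs.resCompressed))
# prints both single-bit members: [<Attrs.resChanged: 2>, <Attrs.resCompressed: 1>]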

View File

@@ -59,9 +59,11 @@ STRUCT_RESOURCE_REFERENCE = struct.Struct(">hHI4x")
# 1 byte: Length of following resource name.
STRUCT_RESOURCE_NAME_HEADER = struct.Struct(">B")
class InvalidResourceFileError(Exception):
pass
class ResourceFileAttrs(enum.Flag):
"""Resource file attribute flags. The descriptions for these flags are taken from comments on the map*Bit and map* enum constants in <CarbonCore/Resources.h>."""
@@ -82,6 +84,7 @@ class ResourceFileAttrs(enum.Flag):
_BIT_1 = 1 << 1
_BIT_0 = 1 << 0
class ResourceAttrs(enum.Flag):
"""Resource attribute flags. The descriptions for these flags are taken from comments on the res*Bit and res* enum constants in <CarbonCore/Resources.h>."""
@@ -94,6 +97,7 @@ class ResourceAttrs(enum.Flag):
resChanged = 1 << 1 # "Existing resource changed since last update", "Resource changed?"
resCompressed = 1 << 0 # "indicates that the resource data is compressed" (only documented in https://github.com/kreativekorp/ksfl/wiki/Macintosh-Resource-File-Format)
class Resource(object):
"""A single resource from a resource file."""
@@ -229,6 +233,7 @@ class Resource(object):
else:
return self.data_raw
class _LazyResourceMap(typing.Mapping[int, Resource]):
"""Internal class: Read-only wrapper for a mapping of resource IDs to resource objects.
@@ -274,6 +279,7 @@ class _LazyResourceMap(typing.Mapping[int, Resource]):
return f"<Resource map for type {self.type!r}, containing {contents}>"
class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.ContextManager["ResourceFile"]):
"""A resource file reader operating on a byte stream."""
@@ -295,7 +301,7 @@ class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.
_references: typing.MutableMapping[bytes, typing.MutableMapping[int, Resource]]
@classmethod
def open(cls, filename: typing.Union[str, os.PathLike], *, fork: str="auto", **kwargs: typing.Any) -> "ResourceFile":
def open(cls, filename: typing.Union[str, os.PathLike], *, fork: str = "auto", **kwargs: typing.Any) -> "ResourceFile":
"""Open the file at the given path as a ResourceFile.
The fork parameter controls which fork of the file the resource data will be read from. It accepts the following values:
@@ -354,7 +360,7 @@ class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.
else:
raise ValueError(f"Unsupported value for the fork parameter: {fork!r}")
def __init__(self, stream: typing.BinaryIO, *, close: bool=False) -> None:
def __init__(self, stream: typing.BinaryIO, *, close: bool = False) -> None:
"""Create a ResourceFile wrapping the given byte stream.
To read resource file data from a bytes object, wrap it in an io.BytesIO.
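A brief usage sketch for the two entry points whose signatures are reformatted above; the path and the resource type/ID are hypothetical, and the fork values "auto", "data" and "rsrc" are the ones listed for the --fork option earlier in this commit.

from rsrcfork.api import ResourceFile

# Open a resource file by path, forcing the data fork ("Example.rsrc" and 'TEXT' 128 are made up).
with ResourceFile.open("Example.rsrc", fork="data") as rf:
    res = rf[b"TEXT"][128]  # mapping of type bytes -> {resource ID: Resource}
    print(res.name, res.compressed_info, len(res.data))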

View File

@@ -28,7 +28,7 @@ DECOMPRESSORS = {
}
def decompress_stream_parsed(header_info: CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
def decompress_stream_parsed(header_info: CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
"""Decompress compressed resource data from a stream, whose header has already been read and parsed into a CompressedHeaderInfo object."""
try:
@@ -44,12 +44,14 @@ def decompress_stream_parsed(header_info: CompressedHeaderInfo, stream: typing.B
if decompressed_length != header_info.decompressed_length:
raise DecompressError(f"Actual length of decompressed data ({decompressed_length}) does not match length stored in resource ({header_info.decompressed_length})")
def decompress_parsed(header_info: CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
def decompress_parsed(header_info: CompressedHeaderInfo, data: bytes, *, debug: bool = False) -> bytes:
"""Decompress the given compressed resource data, whose header has already been removed and parsed into a CompressedHeaderInfo object."""
return b"".join(decompress_stream_parsed(header_info, io.BytesIO(data), debug=debug))
def decompress_stream(stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
def decompress_stream(stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
"""Decompress compressed resource data from a stream."""
header_info = CompressedHeaderInfo.parse_stream(stream)
@@ -59,7 +61,8 @@ def decompress_stream(stream: typing.BinaryIO, *, debug: bool=False) -> typing.I
yield from decompress_stream_parsed(header_info, stream, debug=debug)
def decompress(data: bytes, *, debug: bool=False) -> bytes:
def decompress(data: bytes, *, debug: bool = False) -> bytes:
"""Decompress the given compressed resource data."""
return b"".join(decompress_stream(io.BytesIO(data), debug=debug))

View File

@@ -112,9 +112,14 @@ if typing.TYPE_CHECKING:
The peek method is supported by various standard Python binary IO streams, such as io.BufferedReader. If a stream does not natively support the peek method, it may be wrapped using the custom helper function make_peekable.
"""
def readable(self) -> bool: ...
def read(self, size: typing.Optional[int] = ...) -> bytes: ...
def peek(self, size: int = ...) -> bytes: ...
def readable(self) -> bool:
...
def read(self, size: typing.Optional[int] = ...) -> bytes:
...
def peek(self, size: int = ...) -> bytes:
...
class _PeekableIOWrapper(object):
@@ -182,6 +187,7 @@ def read_exact(stream: typing.BinaryIO, byte_count: int) -> bytes:
raise DecompressError(f"Attempted to read {byte_count} bytes of data, but only got {len(data)} bytes")
return data
def read_variable_length_integer(stream: typing.BinaryIO) -> int:
"""Read a variable-length integer from the stream.

View File

@@ -1,4 +1,3 @@
import io
import typing
from . import common
@@ -39,7 +38,7 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
assert len(TABLE) == len(range(0x4b, 0xfe))
def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
"""Internal helper function, implements the main decompression algorithm. Only called from decompress_stream, which performs some extra checks and debug logging."""
if not isinstance(header_info, common.CompressedType8HeaderInfo):
@@ -248,9 +247,10 @@ def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: ty
else:
raise common.DecompressError(f"Unknown tag byte: 0x{byte:>02x}")
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
"""Decompress compressed data in the format used by 'dcmp' (0)."""
decompressed_length = 0
for chunk in decompress_stream_inner(header_info, stream, debug=debug):
if debug:
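The TABLE constant in the hunk above is built by slicing TABLE_DATA into two-byte entries, one per tag byte in the 0x4b-0xfd range. A tiny self-contained sketch of that slicing pattern with a made-up TABLE_DATA:

TABLE_DATA = b"\x00\x01\x02\x03\x04\x05"  # placeholder; the real table data is much longer
TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
assert TABLE == [b"\x00\x01", b"\x02\x03", b"\x04\x05"]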

View File

@ -1,4 +1,3 @@
import io
import typing
from . import common
@@ -22,7 +21,7 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
assert len(TABLE) == len(range(0xd5, 0xfe))
def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
"""Internal helper function, implements the main decompression algorithm. Only called from decompress_stream, which performs some extra checks and debug logging."""
if not isinstance(header_info, common.CompressedType8HeaderInfo):
@@ -129,7 +128,8 @@ def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: ty
else:
raise common.DecompressError(f"Unknown tag byte: 0x{byte:>02x}")
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
"""Decompress compressed data in the format used by 'dcmp' (1)."""
decompressed_length = 0

View File

@@ -1,5 +1,4 @@
import enum
import io
import struct
import typing
@@ -74,7 +73,7 @@ def _split_bits(i: int) -> typing.Tuple[bool, bool, bool, bool, bool, bool, bool
)
def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> typing.Iterator[bytes]:
def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]:
while True: # Loop is terminated when EOF is reached.
table_index_data = stream.read(1)
if not table_index_data:
@@ -93,7 +92,8 @@ def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int,
print(f"Reference: {table_index} -> {table[table_index]!r}")
yield table[table_index]
def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> typing.Iterator[bytes]:
def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]:
while True: # Loop is terminated when EOF is reached.
tag_data = stream.read(1)
if not tag_data:
@@ -133,7 +133,7 @@ def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, ta
yield literal
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
"""Decompress compressed data in the format used by 'dcmp' (2)."""
if not isinstance(header_info, common.CompressedType9HeaderInfo):
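The _split_bits signature in the hunk above returns eight booleans for a single byte. A minimal sketch of that idea; the most-significant-bit-first order is an assumption here, not something this diff shows.

import typing

def split_bits_sketch(i: int) -> typing.Tuple[bool, ...]:
    # Hypothetical stand-in for _split_bits: one boolean per bit, highest bit first (assumed order).
    assert 0 <= i <= 0xff
    return tuple(bool(i & (1 << bit)) for bit in range(7, -1, -1))

assert split_bits_sketch(0b10100001) == (True, False, True, False, False, False, False, True)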

View File

@@ -62,5 +62,6 @@ class ResourceFileReadTests(unittest.TestCase):
self.assertEqual(actual_res.data, expected_data)
self.assertEqual(actual_res.compressed_info, None)
if __name__ == "__main__":
unittest.main()