From 2907d9f9e84750d178c1aac49df28d967782151b Mon Sep 17 00:00:00 2001 From: dgelessus Date: Tue, 21 Jul 2020 14:45:48 +0200 Subject: [PATCH] Rewrite __main__ code to use stream-based resource reading --- rsrcfork/__main__.py | 168 ++++++++++++++++++++++++------------------- 1 file changed, 93 insertions(+), 75 deletions(-) diff --git a/rsrcfork/__main__.py b/rsrcfork/__main__.py index ea8a80f..c59109b 100644 --- a/rsrcfork/__main__.py +++ b/rsrcfork/__main__.py @@ -1,6 +1,8 @@ import argparse import enum +import io import itertools +import shutil import sys import typing @@ -183,11 +185,12 @@ def filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> typ yield res -def hexdump(data: bytes) -> typing.Iterable[str]: +def hexdump_stream(stream: typing.BinaryIO) -> typing.Iterable[str]: last_line = None asterisk_shown = False - for i in range(0, len(data), 16): - line = data[i:i + 16] + line = stream.read(16) + i = 0 + while line: # If the same 16-byte lines appear multiple times, print only the first one, and replace all further lines with a single line with an asterisk. # This is unambiguous - to find out how many lines were collapsed this way, the user can compare the addresses of the lines before and after the asterisk. if line == last_line: @@ -201,14 +204,26 @@ def hexdump(data: bytes) -> typing.Iterable[str]: yield f"{i:08x} {line_hex_left:<{8*2+7}} {line_hex_right:<{8*2+7}} |{line_char}|" asterisk_shown = False last_line = line + i += len(line) + line = stream.read(16) - if data: - yield f"{len(data):08x}" + if i: + yield f"{i:08x}" + + +def hexdump(data: bytes) -> typing.Iterable[str]: + yield from hexdump_stream(io.BytesIO(data)) + + +def raw_hexdump_stream(stream: typing.BinaryIO) -> typing.Iterable[str]: + line = stream.read(16) + while line: + yield " ".join(f"{byte:02x}" for byte in line) + line = stream.read(16) def raw_hexdump(data: bytes) -> typing.Iterable[str]: - for i in range(0, len(data), 16): - yield " ".join(f"{byte:02x}" for byte in data[i:i + 16]) + yield from raw_hexdump_stream(io.BytesIO(data)) def translate_text(data: bytes) -> str: @@ -267,77 +282,80 @@ def show_filtered_resources(resources: typing.Sequence[api.Resource], format: st for res in resources: if decompress: - data = res.data + open_func = res.open else: - data = res.data_raw + open_func = res.open_raw - if format in ("dump", "dump-text"): - # Human-readable info and hex or text dump - desc = describe_resource(res, include_type=True, decompress=decompress) - print(f"Resource {desc}:") - if format == "dump": - for line in hexdump(data): - print(line) - elif format == "dump-text": - print(translate_text(data)) - else: - raise AssertionError(f"Unhandled format: {format!r}") - print() - elif format == "hex": - # Data only as hex - - for line in raw_hexdump(data): - print(line) - elif format == "raw": - # Data only as raw bytes - - sys.stdout.buffer.write(data) - elif format == "derez": - # Like DeRez with no resource definitions - - attrs = list(decompose_flags(res.attributes)) - - if decompress and api.ResourceAttrs.resCompressed in attrs: - attrs.remove(api.ResourceAttrs.resCompressed) - attrs_comment = " /* was compressed */" - else: - attrs_comment = "" - - attr_descs_with_none = [_REZ_ATTR_NAMES[attr] for attr in attrs] - if None in attr_descs_with_none: - attr_descs = [f"${res.attributes.value:02X}"] - else: - attr_descs = typing.cast(typing.List[str], attr_descs_with_none) - - parts = [str(res.id)] - - if res.name is not None: - parts.append(bytes_quote(res.name, '"')) - - parts += attr_descs - - quoted_restype = bytes_quote(res.type, "'") - print(f"data {quoted_restype} ({', '.join(parts)}{attrs_comment}) {{") - - for i in range(0, len(data), 16): - # Two-byte grouping is really annoying to implement. - groups = [] - for j in range(0, 16, 2): - if i+j >= len(data): - break - elif i+j+1 >= len(data): - groups.append(f"{data[i+j]:02X}") - else: - groups.append(f"{data[i+j]:02X}{data[i+j+1]:02X}") + with open_func() as f: + if format in ("dump", "dump-text"): + # Human-readable info and hex or text dump + desc = describe_resource(res, include_type=True, decompress=decompress) + print(f"Resource {desc}:") + if format == "dump": + for line in hexdump_stream(f): + print(line) + elif format == "dump-text": + print(translate_text(f.read())) + else: + raise AssertionError(f"Unhandled format: {format!r}") + print() + elif format == "hex": + # Data only as hex - s = f'$"{" ".join(groups)}"' - comment = "/* " + data[i:i + 16].decode(_TEXT_ENCODING).translate(_TRANSLATE_NONPRINTABLES) + " */" - print(f"\t{s:<54s}{comment}") - - print("};") - print() - else: - raise ValueError(f"Unhandled output format: {format}") + for line in raw_hexdump_stream(f): + print(line) + elif format == "raw": + # Data only as raw bytes + + shutil.copyfileobj(f, sys.stdout.buffer) + elif format == "derez": + # Like DeRez with no resource definitions + + attrs = list(decompose_flags(res.attributes)) + + if decompress and api.ResourceAttrs.resCompressed in attrs: + attrs.remove(api.ResourceAttrs.resCompressed) + attrs_comment = " /* was compressed */" + else: + attrs_comment = "" + + attr_descs_with_none = [_REZ_ATTR_NAMES[attr] for attr in attrs] + if None in attr_descs_with_none: + attr_descs = [f"${res.attributes.value:02X}"] + else: + attr_descs = typing.cast(typing.List[str], attr_descs_with_none) + + parts = [str(res.id)] + + if res.name is not None: + parts.append(bytes_quote(res.name, '"')) + + parts += attr_descs + + quoted_restype = bytes_quote(res.type, "'") + print(f"data {quoted_restype} ({', '.join(parts)}{attrs_comment}) {{") + + line = f.read(16) + while line: + # Two-byte grouping is really annoying to implement. + groups = [] + for j in range(0, 16, 2): + if j >= len(line): + break + elif j+1 >= len(line): + groups.append(f"{line[j]:02X}") + else: + groups.append(f"{line[j]:02X}{line[j+1]:02X}") + + s = f'$"{" ".join(groups)}"' + comment = "/* " + line.decode(_TEXT_ENCODING).translate(_TRANSLATE_NONPRINTABLES) + " */" + print(f"\t{s:<54s}{comment}") + line = f.read(16) + + print("};") + print() + else: + raise ValueError(f"Unhandled output format: {format}") def list_resources(resources: typing.List[api.Resource], *, sort: bool, group: str, decompress: bool) -> None: