mirror of https://github.com/dgelessus/python-rsrcfork.git
synced 2025-07-02 08:24:09 +00:00
Compare commits
11 Commits
Author | SHA1 | Date
---|---|---
 | d342614f55 |
 | a5fb30e194 |
 | f3b3de496e |
 | a71274d554 |
 | 6d69d0097d |
 | 8db1b22bdc |
 | 6559cbc337 |
 | 1e79dc3c50 |
 | db48212ade |
 | 3a72bd3406 |
 | cb868b8005 |
README.rst

```diff
@@ -174,6 +174,16 @@ If these links are no longer functional, some are archived in the `Internet Arch
 Changelog
 ---------
 
+Version 1.5.0
+^^^^^^^^^^^^^
+
+* Added stream-based decompression methods to the ``rsrcfork.compress`` module.
+
+  * The internal decompressor implementations have been refactored to use streams.
+  * This allows for incremental decompression of compressed resource data. In practice this has no noticeable effect yet, because the main ``rsrcfork`` API doesn't support incremental reading of resource data.
+
+* Fixed the command line tool always displaying an incorrect error "Cannot specify an explicit fork when reading from stdin" when using ``-`` (stdin) as the input file.
+
 Version 1.4.0
 ^^^^^^^^^^^^^
 
```
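A minimal usage sketch of the stream-based API described in the changelog entry above (the input file name is a placeholder for raw compressed resource data, including its header; the function signatures are the ones shown in the diffs below):

```python
import io

from rsrcfork import compress

# New in 1.5.0: decompress incrementally, chunk by chunk.
with open("compressed.bin", "rb") as f:
	for chunk in compress.decompress_stream(f):
		print(len(chunk))

# The existing bytes-based API is unchanged and now wraps the stream API.
with open("compressed.bin", "rb") as f:
	decompressed = compress.decompress(f.read())
```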
rsrcfork/__init__.py

```diff
@@ -20,7 +20,7 @@
 # * Add a new empty section for the next version to the README.rst changelog.
 # * Commit and push the changes to master.
 
-__version__ = "1.4.0"
+__version__ = "1.5.0"
 
 __all__ = [
 	"Resource",
```
rsrcfork/__main__.py

```diff
@@ -26,12 +26,12 @@ _REZ_ATTR_NAMES = {
 }
 
 F = typing.TypeVar("F", bound=enum.Flag)
-def _decompose_flags(value: F) -> typing.Sequence[F]:
+def decompose_flags(value: F) -> typing.Sequence[F]:
 	"""Decompose an enum.Flags instance into separate enum constants."""
 	
 	return [bit for bit in type(value) if bit in value]
 
-def _is_printable(char: str) -> bool:
+def is_printable(char: str) -> bool:
 	"""Determine whether a character is printable for our purposes.
 	
 	We mainly use Python's definition of printable (i. e. everything that Unicode does not consider a separator or "other" character). However, we also treat U+F8FF as printable, which is the private use codepoint used for the Apple logo character.
```
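For illustration, `decompose_flags` splits a combined `enum.Flag` value into its individual set bits. A standalone sketch with a stand-in flag enum (the real code uses `api.ResourceAttrs`):

```python
import enum

class Attrs(enum.Flag):  # stand-in for api.ResourceAttrs
	resProtected = 8
	resLocked = 16

# Same logic as decompose_flags above.
value = Attrs.resProtected | Attrs.resLocked
bits = [bit for bit in type(value) if bit in value]
assert bits == [Attrs.resProtected, Attrs.resLocked]
```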
```diff
@@ -39,7 +39,7 @@ def _is_printable(char: str) -> bool:
 	
 	return char.isprintable() or char == "\uf8ff"
 
-def _bytes_unescape(string: str) -> bytes:
+def bytes_unescape(string: str) -> bytes:
 	"""Convert a string containing text (in _TEXT_ENCODING) and hex escapes to a bytestring.
 	
 	(We implement our own unescaping mechanism here to not depend on any of Python's string/bytes escape syntax.)
@@ -65,7 +65,7 @@ def _bytes_unescape(string: str) -> bytes:
 	
 	return bytes(out)
 
-def _bytes_escape(bs: bytes, *, quote: typing.Optional[str]=None) -> str:
+def bytes_escape(bs: bytes, *, quote: typing.Optional[str]=None) -> str:
 	"""Convert a bytestring to a string (using _TEXT_ENCODING), with non-printable characters hex-escaped.
 	
 	(We implement our own escaping mechanism here to not depend on Python's str or bytes repr.)
```
```diff
@@ -75,14 +75,14 @@ def _bytes_escape(bs: bytes, *, quote: typing.Optional[str]=None) -> str:
 	for byte, char in zip(bs, bs.decode(_TEXT_ENCODING)):
 		if char in {quote, "\\"}:
 			out.append(f"\\{char}")
-		elif _is_printable(char):
+		elif is_printable(char):
 			out.append(char)
 		else:
 			out.append(f"\\x{byte:02x}")
 	
 	return "".join(out)
 
-def _filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> typing.List[api.Resource]:
+def filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> typing.List[api.Resource]:
 	matching: typing.MutableMapping[typing.Tuple[bytes, int], api.Resource] = collections.OrderedDict()
 	
 	for filter in filters:
```
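A quick round-trip through the two escaping helpers renamed above (importing them from `rsrcfork.__main__` is an assumption for illustration; printability follows `is_printable` and the module's `_TEXT_ENCODING`):

```python
from rsrcfork.__main__ import bytes_escape, bytes_unescape  # assumed import path

# Printable characters pass through; the non-printable 0x00 byte is hex-escaped.
escaped = bytes_escape(b"ICN#\x00", quote="'")
assert escaped == "ICN#\\x00"
assert bytes_unescape(escaped) == b"ICN#\x00"
```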
```diff
@@ -96,7 +96,7 @@ def _filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> ty
 				matching[res.type, res.id] = res
 		elif filter[0] == filter[-1] == "'":
 			try:
-				resources = rf[_bytes_unescape(filter[1:-1])]
+				resources = rf[bytes_unescape(filter[1:-1])]
 			except KeyError:
 				continue
 			
@@ -114,7 +114,7 @@ def _filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> ty
 			if not restype_str[0] == restype_str[-1] == "'":
 				raise ValueError(
 					f"Invalid filter {filter!r}: Resource type is not a single-quoted type identifier: {restype_str!r}")
-			restype = _bytes_unescape(restype_str[1:-1])
+			restype = bytes_unescape(restype_str[1:-1])
 			
 			if len(restype) != 4:
 				raise ValueError(
@@ -130,7 +130,7 @@ def _filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> ty
 				continue
 			
 			if resid_str[0] == resid_str[-1] == '"':
-				name = _bytes_unescape(resid_str[1:-1])
+				name = bytes_unescape(resid_str[1:-1])
 				
 				for res in resources.values():
 					if res.name == name:
@@ -155,7 +155,7 @@ def _filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> ty
 	
 	return list(matching.values())
 
-def _hexdump(data: bytes) -> None:
+def hexdump(data: bytes) -> None:
 	last_line = None
 	asterisk_shown = False
 	for i in range(0, len(data), 16):
```
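Putting the parsing branches above together, `filter_resources` accepts filter strings along these lines (illustrative values only; the numeric-ID form is not shown in the hunks above and is an assumption):

```python
import rsrcfork
from rsrcfork.__main__ import filter_resources  # assumed import path

rf = rsrcfork.open("Example.rsrc")  # hypothetical file
filters = [
	"ICN#",               # bare four-character type
	"'TEXT'",             # single-quoted type (may contain \xNN escapes)
	"'TEXT' (128)",       # type plus resource ID (form assumed, not shown above)
	'\'TEXT\' ("name")',  # type plus double-quoted resource name
]
resources = filter_resources(rf, filters)
```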
```diff
@@ -177,18 +177,18 @@ def _hexdump(data: bytes) -> None:
 	if data:
 		print(f"{len(data):08x}")
 
-def _raw_hexdump(data: bytes) -> None:
+def raw_hexdump(data: bytes) -> None:
 	for i in range(0, len(data), 16):
 		print(" ".join(f"{byte:02x}" for byte in data[i:i + 16]))
 
-def _translate_text(data: bytes) -> str:
+def translate_text(data: bytes) -> str:
 	return data.decode(_TEXT_ENCODING).replace("\r", "\n")
 
-def _describe_resource(res: api.Resource, *, include_type: bool, decompress: bool) -> str:
+def describe_resource(res: api.Resource, *, include_type: bool, decompress: bool) -> str:
 	id_desc_parts = [f"{res.id}"]
 	
 	if res.name is not None:
-		name = _bytes_escape(res.name, quote='"')
+		name = bytes_escape(res.name, quote='"')
 		id_desc_parts.append(f'"{name}"')
 	
 	id_desc = ", ".join(id_desc_parts)
@@ -208,7 +208,7 @@ def _describe_resource(res: api.Resource, *, include_type: bool, decompress: boo
 		length_desc = f"{res.length_raw} bytes"
 	content_desc_parts.append(length_desc)
 	
-	attrs = _decompose_flags(res.attributes)
+	attrs = decompose_flags(res.attributes)
 	if attrs:
 		content_desc_parts.append(" | ".join(attr.name for attr in attrs))
 	
@@ -216,11 +216,11 @@ def _describe_resource(res: api.Resource, *, include_type: bool, decompress: boo
 	
 	desc = f"({id_desc}): {content_desc}"
 	if include_type:
-		restype = _bytes_escape(res.type, quote="'")
+		restype = bytes_escape(res.type, quote="'")
 		desc = f"'{restype}' {desc}"
 	return desc
 
-def _parse_args() -> argparse.Namespace:
+def parse_args() -> argparse.Namespace:
 	ap = argparse.ArgumentParser(
 		add_help=False,
 		fromfile_prefix_chars="@",
```
```diff
@@ -259,13 +259,13 @@ def _parse_args() -> argparse.Namespace:
 	ns = ap.parse_args()
 	return ns
 
-def _show_header_data(data: bytes, *, format: str) -> None:
+def show_header_data(data: bytes, *, format: str) -> None:
 	if format == "dump":
-		_hexdump(data)
+		hexdump(data)
 	elif format == "dump-text":
-		print(_translate_text(data))
+		print(translate_text(data))
 	elif format == "hex":
-		_raw_hexdump(data)
+		raw_hexdump(data)
 	elif format == "raw":
 		sys.stdout.buffer.write(data)
 	elif format == "derez":
@@ -274,7 +274,7 @@ def _show_header_data(data: bytes, *, format: str) -> None:
 	else:
 		raise ValueError(f"Unhandled output format: {format}")
 
-def _show_filtered_resources(resources: typing.Sequence[api.Resource], format: str, decompress: bool) -> None:
+def show_filtered_resources(resources: typing.Sequence[api.Resource], format: str, decompress: bool) -> None:
 	if not resources:
 		if format in ("dump", "dump-text"):
 			print("No resources matched the filter")
```
```diff
@@ -297,19 +297,19 @@ def _show_filtered_resources(resources: typing.Sequence[api.Resource], format: s
 		
 		if format in ("dump", "dump-text"):
 			# Human-readable info and hex or text dump
-			desc = _describe_resource(res, include_type=True, decompress=decompress)
+			desc = describe_resource(res, include_type=True, decompress=decompress)
 			print(f"Resource {desc}:")
 			if format == "dump":
-				_hexdump(data)
+				hexdump(data)
 			elif format == "dump-text":
-				print(_translate_text(data))
+				print(translate_text(data))
 			else:
 				raise AssertionError(f"Unhandled format: {format!r}")
 			print()
 		elif format == "hex":
 			# Data only as hex
 			
-			_raw_hexdump(data)
+			raw_hexdump(data)
 		elif format == "raw":
 			# Data only as raw bytes
 			
@@ -317,7 +317,7 @@ def _show_filtered_resources(resources: typing.Sequence[api.Resource], format: s
 		elif format == "derez":
 			# Like DeRez with no resource definitions
 			
-			attrs = list(_decompose_flags(res.attributes))
+			attrs = list(decompose_flags(res.attributes))
 			
 			if decompress and api.ResourceAttrs.resCompressed in attrs:
 				attrs.remove(api.ResourceAttrs.resCompressed)
@@ -334,12 +334,12 @@ def _show_filtered_resources(resources: typing.Sequence[api.Resource], format: s
 			parts = [str(res.id)]
 			
 			if res.name is not None:
-				name = _bytes_escape(res.name, quote='"')
+				name = bytes_escape(res.name, quote='"')
 				parts.append(f'"{name}"')
 			
 			parts += attr_descs
 			
-			restype = _bytes_escape(res.type, quote="'")
+			restype = bytes_escape(res.type, quote="'")
 			print(f"data '{restype}' ({', '.join(parts)}{attrs_comment}) {{")
 			
 			for i in range(0, len(data), 16):
```
```diff
@@ -362,16 +362,16 @@ def _show_filtered_resources(resources: typing.Sequence[api.Resource], format: s
 	else:
 		raise ValueError(f"Unhandled output format: {format}")
 
-def _list_resource_file(rf: api.ResourceFile, *, sort: bool, group: str, decompress: bool) -> None:
+def list_resource_file(rf: api.ResourceFile, *, sort: bool, group: str, decompress: bool) -> None:
 	if rf.header_system_data != bytes(len(rf.header_system_data)):
 		print("Header system data:")
-		_hexdump(rf.header_system_data)
+		hexdump(rf.header_system_data)
 	
 	if rf.header_application_data != bytes(len(rf.header_application_data)):
 		print("Header application data:")
-		_hexdump(rf.header_application_data)
+		hexdump(rf.header_application_data)
 	
-	attrs = _decompose_flags(rf.file_attributes)
+	attrs = decompose_flags(rf.file_attributes)
 	if attrs:
 		print("File attributes: " + " | ".join(attr.name for attr in attrs))
 	
@@ -387,20 +387,20 @@ def _list_resource_file(rf: api.ResourceFile, *, sort: bool, group: str, decompr
 		all_resources.sort(key=lambda res: (res.type, res.id))
 		print(f"{len(all_resources)} resources:")
 		for res in all_resources:
-			print(_describe_resource(res, include_type=True, decompress=decompress))
+			print(describe_resource(res, include_type=True, decompress=decompress))
 	elif group == "type":
 		print(f"{len(rf)} resource types:")
 		restype_items: typing.Collection[typing.Tuple[bytes, typing.Mapping[int, api.Resource]]] = rf.items()
 		if sort:
 			restype_items = sorted(restype_items, key=lambda item: item[0])
 		for typecode, resources_map in restype_items:
-			restype = _bytes_escape(typecode, quote="'")
+			restype = bytes_escape(typecode, quote="'")
 			print(f"'{restype}': {len(resources_map)} resources:")
 			resources_items: typing.Collection[typing.Tuple[int, api.Resource]] = resources_map.items()
 			if sort:
 				resources_items = sorted(resources_items, key=lambda item: item[0])
 			for resid, res in resources_items:
-				print(_describe_resource(res, include_type=False, decompress=decompress))
+				print(describe_resource(res, include_type=False, decompress=decompress))
 			print()
 	elif group == "id":
 		all_resources = []
@@ -414,16 +414,16 @@ def _list_resource_file(rf: api.ResourceFile, *, sort: bool, group: str, decompr
 		if sort:
 			resources.sort(key=lambda res: res.type)
 		for res in resources:
-			print(_describe_resource(res, include_type=True, decompress=decompress))
+			print(describe_resource(res, include_type=True, decompress=decompress))
 		print()
 	else:
 		raise AssertionError(f"Unhandled group mode: {group!r}")
 
 def main() -> typing.NoReturn:
-	ns = _parse_args()
+	ns = parse_args()
 	
 	if ns.file == "-":
-		if ns.fork is not None:
+		if ns.fork != "auto":
 			print("Cannot specify an explicit fork when reading from stdin", file=sys.stderr)
 			sys.exit(1)
 		
```
```diff
@@ -438,10 +438,10 @@ def main() -> typing.NoReturn:
 		else:
 			data = rf.header_application_data
 		
-		_show_header_data(data, format=ns.format)
+		show_header_data(data, format=ns.format)
 	elif ns.filter or ns.all:
 		if ns.filter:
-			resources = _filter_resources(rf, ns.filter)
+			resources = filter_resources(rf, ns.filter)
 		else:
 			resources = []
 			for reses in rf.values():
@@ -450,9 +450,9 @@ def main() -> typing.NoReturn:
 		if ns.sort:
 			resources.sort(key=lambda res: (res.type, res.id))
 		
-		_show_filtered_resources(resources, format=ns.format, decompress=ns.decompress)
+		show_filtered_resources(resources, format=ns.format, decompress=ns.decompress)
 	else:
-		_list_resource_file(rf, sort=ns.sort, group=ns.group, decompress=ns.decompress)
+		list_resource_file(rf, sort=ns.sort, group=ns.group, decompress=ns.decompress)
 	
 	sys.exit(0)
```
rsrcfork/compress/__init__.py

```diff
@@ -1,44 +1,63 @@
+import io
 import typing
 
 from . import dcmp0
 from . import dcmp1
 from . import dcmp2
 
-from .common import DecompressError, CompressedApplicationHeaderInfo, CompressedHeaderInfo, CompressedSystemHeaderInfo
+from .common import DecompressError, CompressedHeaderInfo
 
 __all__ = [
 	"CompressedHeaderInfo",
 	"DecompressError",
 	"decompress",
+	"decompress_parsed",
+	"decompress_stream",
+	"decompress_stream_parsed",
 ]
 
 
 # Maps 'dcmp' IDs to their corresponding Python implementations.
-# Each decompressor has the signature (header_info: CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes.
+# Each decompressor has the signature (header_info: CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes].
 DECOMPRESSORS = {
-	0: dcmp0.decompress,
-	1: dcmp1.decompress,
-	2: dcmp2.decompress,
+	0: dcmp0.decompress_stream,
+	1: dcmp1.decompress_stream,
+	2: dcmp2.decompress_stream,
 }
 
 
-def decompress_parsed(header_info: CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
-	"""Decompress the given compressed resource data, whose header has already been removed and parsed into a CompressedHeaderInfo object."""
+def decompress_stream_parsed(header_info: CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+	"""Decompress compressed resource data from a stream, whose header has already been read and parsed into a CompressedHeaderInfo object."""
 	
 	try:
 		decompress_func = DECOMPRESSORS[header_info.dcmp_id]
 	except KeyError:
 		raise DecompressError(f"Unsupported 'dcmp' ID: {header_info.dcmp_id}")
 	
-	decompressed = decompress_func(header_info, data, debug=debug)
-	if len(decompressed) != header_info.decompressed_length:
-		raise DecompressError(f"Actual length of decompressed data ({len(decompressed)}) does not match length stored in resource ({header_info.decompressed_length})")
-	return decompressed
+	decompressed_length = 0
+	for chunk in decompress_func(header_info, stream, debug=debug):
+		decompressed_length += len(chunk)
+		yield chunk
+	
+	if decompressed_length != header_info.decompressed_length:
+		raise DecompressError(f"Actual length of decompressed data ({decompressed_length}) does not match length stored in resource ({header_info.decompressed_length})")
 
+def decompress_parsed(header_info: CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
+	"""Decompress the given compressed resource data, whose header has already been removed and parsed into a CompressedHeaderInfo object."""
+	
+	return b"".join(decompress_stream_parsed(header_info, io.BytesIO(data), debug=debug))
 
-def decompress(data: bytes, *, debug: bool=False) -> bytes:
-	"""Decompress the given compressed resource data."""
+def decompress_stream(stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+	"""Decompress compressed resource data from a stream."""
 	
-	header_info = CompressedHeaderInfo.parse(data)
+	header_info = CompressedHeaderInfo.parse_stream(stream)
 	
 	if debug:
 		print(f"Compressed resource data header: {header_info}")
 	
-	return decompress_parsed(header_info, data[header_info.header_length:], debug=debug)
+	yield from decompress_stream_parsed(header_info, stream, debug=debug)
+
+def decompress(data: bytes, *, debug: bool=False) -> bytes:
+	"""Decompress the given compressed resource data."""
+	
+	return b"".join(decompress_stream(io.BytesIO(data), debug=debug))
```
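A sketch of how the two layers above fit together: the header can be parsed once, and the rest of the stream handed to `decompress_stream_parsed`, which also enforces the stored decompressed length (the input file name is hypothetical):

```python
from rsrcfork.compress import CompressedHeaderInfo, decompress_stream_parsed

with open("compressed.bin", "rb") as f:  # hypothetical input file
	header_info = CompressedHeaderInfo.parse_stream(f)
	print(f"Expecting {header_info.decompressed_length} bytes, 'dcmp' ({header_info.dcmp_id})")
	
	out = bytearray()
	for chunk in decompress_stream_parsed(header_info, f):
		out.extend(chunk)
	assert len(out) == header_info.decompressed_length
```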
rsrcfork/compress/common.py

```diff
@@ -1,3 +1,4 @@
+import io
 import struct
 import typing
 
@@ -8,37 +9,37 @@ class DecompressError(Exception):
 
 # The signature of all compressed resource data, 0xa89f6572 in hex, or "®üer" in MacRoman.
 COMPRESSED_SIGNATURE = b"\xa8\x9fer"
-# The compression type commonly used for application resources.
-COMPRESSED_TYPE_APPLICATION = 0x0801
-# The compression type commonly used for System file resources.
-COMPRESSED_TYPE_SYSTEM = 0x0901
+# The number of the "type 8" compression type. This type is used in the Finder, ResEdit, and some other system files.
+COMPRESSED_TYPE_8 = 0x0801
+# The number of the "type 9" compression type. This type is used in the System file and System 7.5's Installer.
+COMPRESSED_TYPE_9 = 0x0901
 
 # Common header for compressed resources of all types.
 # 4 bytes: Signature (see above).
 # 2 bytes: Length of the complete header (this common part and the type-specific part that follows it). (This meaning is just a guess - the field's value is always 0x0012, so there's no way to know for certain what it means.)
-# 2 bytes: Compression type. Known so far: 0x0901 is used in the System file's resources. 0x0801 is used in other files' resources.
+# 2 bytes: Compression type. Known so far: 0x0801 ("type 8") and 0x0901 ("type 9").
 # 4 bytes: Length of the data after decompression.
 # 6 bytes: Remainder of the header. The exact format varies depending on the compression type.
 STRUCT_COMPRESSED_HEADER = struct.Struct(">4sHHI6s")
 
-# Remainder of header for an "application" compressed resource.
+# Remainder of header for a "type 8" compressed resource.
 # 1 byte: "Working buffer fractional size" - the ratio of the compressed data size to the uncompressed data size, times 256.
 # 1 byte: "Expansion buffer size" - the maximum number of bytes that the data might grow during decompression.
 # 2 bytes: The ID of the 'dcmp' resource that can decompress this resource. Currently only ID 0 is supported.
 # 2 bytes: Reserved (always zero).
-STRUCT_COMPRESSED_APPLICATION_HEADER = struct.Struct(">BBhH")
+STRUCT_COMPRESSED_TYPE_8_HEADER = struct.Struct(">BBhH")
 
-# Remainder of header for a "system" compressed resource.
+# Remainder of header for a "type 9" compressed resource.
 # 2 bytes: The ID of the 'dcmp' resource that can decompress this resource. Currently only ID 2 is supported.
 # 4 bytes: Decompressor-specific parameters.
-STRUCT_COMPRESSED_SYSTEM_HEADER = struct.Struct(">h4s")
+STRUCT_COMPRESSED_TYPE_9_HEADER = struct.Struct(">h4s")
 
 
 class CompressedHeaderInfo(object):
 	@classmethod
-	def parse(cls, data: bytes) -> "CompressedHeaderInfo":
+	def parse_stream(cls, stream: typing.BinaryIO) -> "CompressedHeaderInfo":
 		try:
-			signature, header_length, compression_type, decompressed_length, remainder = STRUCT_COMPRESSED_HEADER.unpack_from(data)
+			signature, header_length, compression_type, decompressed_length, remainder = STRUCT_COMPRESSED_HEADER.unpack(stream.read(STRUCT_COMPRESSED_HEADER.size))
 		except struct.error:
 			raise DecompressError(f"Invalid header")
 		if signature != COMPRESSED_SIGNATURE:
```
```diff
@@ -46,20 +47,24 @@ class CompressedHeaderInfo(object):
 		if header_length != 0x12:
 			raise DecompressError(f"Unsupported header length: 0x{header_length:>04x}, expected 0x12")
 		
-		if compression_type == COMPRESSED_TYPE_APPLICATION:
-			working_buffer_fractional_size, expansion_buffer_size, dcmp_id, reserved = STRUCT_COMPRESSED_APPLICATION_HEADER.unpack(remainder)
+		if compression_type == COMPRESSED_TYPE_8:
+			working_buffer_fractional_size, expansion_buffer_size, dcmp_id, reserved = STRUCT_COMPRESSED_TYPE_8_HEADER.unpack(remainder)
 			
 			if reserved != 0:
 				raise DecompressError(f"Reserved field should be 0, not 0x{reserved:>04x}")
 			
-			return CompressedApplicationHeaderInfo(header_length, compression_type, decompressed_length, dcmp_id, working_buffer_fractional_size, expansion_buffer_size)
-		elif compression_type == COMPRESSED_TYPE_SYSTEM:
-			dcmp_id, parameters = STRUCT_COMPRESSED_SYSTEM_HEADER.unpack(remainder)
+			return CompressedType8HeaderInfo(header_length, compression_type, decompressed_length, dcmp_id, working_buffer_fractional_size, expansion_buffer_size)
+		elif compression_type == COMPRESSED_TYPE_9:
+			dcmp_id, parameters = STRUCT_COMPRESSED_TYPE_9_HEADER.unpack(remainder)
 			
-			return CompressedSystemHeaderInfo(header_length, compression_type, decompressed_length, dcmp_id, parameters)
+			return CompressedType9HeaderInfo(header_length, compression_type, decompressed_length, dcmp_id, parameters)
 		else:
 			raise DecompressError(f"Unsupported compression type: 0x{compression_type:>04x}")
 	
+	@classmethod
+	def parse(cls, data: bytes) -> "CompressedHeaderInfo":
+		return cls.parse_stream(io.BytesIO(data))
+	
 	header_length: int
 	compression_type: int
 	decompressed_length: int
```
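A tiny worked example against the structs and `parse` classmethod above, building an 18-byte "type 8" header from hypothetical field values:

```python
import struct

from rsrcfork.compress import common

# Hypothetical header: signature, header length 0x12, compression type 0x0801,
# decompressed length 8, then the 6-byte type-8 remainder (working buffer
# fractional size 0, expansion buffer size 0, 'dcmp' ID 0, reserved 0).
remainder = struct.pack(">BBhH", 0, 0, 0, 0)
header = struct.pack(">4sHHI6s", b"\xa8\x9fer", 0x12, 0x0801, 8, remainder)

info = common.CompressedHeaderInfo.parse(header)
assert isinstance(info, common.CompressedType8HeaderInfo)
assert info.dcmp_id == 0 and info.decompressed_length == 8
```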
```diff
@@ -74,7 +79,7 @@ class CompressedHeaderInfo(object):
 		self.dcmp_id = dcmp_id
 
 
-class CompressedApplicationHeaderInfo(CompressedHeaderInfo):
+class CompressedType8HeaderInfo(CompressedHeaderInfo):
 	working_buffer_fractional_size: int
 	expansion_buffer_size: int
 	
@@ -88,7 +93,7 @@ class CompressedApplicationHeaderInfo(CompressedHeaderInfo):
 		return f"{type(self).__qualname__}(header_length={self.header_length}, compression_type=0x{self.compression_type:>04x}, decompressed_length={self.decompressed_length}, dcmp_id={self.dcmp_id}, working_buffer_fractional_size={self.working_buffer_fractional_size}, expansion_buffer_size={self.expansion_buffer_size})"
 
 
-class CompressedSystemHeaderInfo(CompressedHeaderInfo):
+class CompressedType9HeaderInfo(CompressedHeaderInfo):
 	parameters: bytes
 	
 	def __init__(self, header_length: int, compression_type: int, decompressed_length: int, dcmp_id: int, parameters: bytes) -> None:
```
```diff
@@ -100,19 +105,94 @@ class CompressedSystemHeaderInfo(CompressedHeaderInfo):
 		return f"{type(self).__qualname__}(header_length={self.header_length}, compression_type=0x{self.compression_type:>04x}, decompressed_length={self.decompressed_length}, dcmp_id={self.dcmp_id}, parameters={self.parameters!r})"
 
 
-def _read_variable_length_integer(data: bytes, position: int) -> typing.Tuple[int, int]:
-	"""Read a variable-length integer starting at the given position in the data, and return the integer as well as the number of bytes consumed.
+if typing.TYPE_CHECKING:
+	class PeekableIO(typing.Protocol):
+		"""Minimal protocol for binary IO streams that support the peek method.
+		
+		The peek method is supported by various standard Python binary IO streams, such as io.BufferedReader. If a stream does not natively support the peek method, it may be wrapped using the custom helper function make_peekable.
+		"""
+		
+		def readable(self) -> bool: ...
+		def read(self, size: typing.Optional[int] = ...) -> bytes: ...
+		def peek(self, size: int = ...) -> bytes: ...
+
+
+class _PeekableIOWrapper(object):
+	"""Wrapper class to add peek support to an existing stream. Do not instantiate this class directly, use the make_peekable function instead.
+	
+	Python provides a standard io.BufferedReader class, which supports the peek method. However, according to its documentation, it only supports wrapping io.RawIOBase subclasses, and not streams which are already otherwise buffered.
+	
+	Warning: this class does not perform any buffering of its own, outside of what is required to make peek work. It is strongly recommended to only wrap streams that are already buffered or otherwise fast to read from. In particular, raw streams (io.RawIOBase subclasses) should be wrapped using io.BufferedReader instead.
+	"""
+	
+	_wrapped: typing.BinaryIO
+	_readahead: bytes
+	
+	def __init__(self, wrapped: typing.BinaryIO) -> None:
+		super().__init__()
+		
+		self._wrapped = wrapped
+		self._readahead = b""
+	
+	def readable(self) -> bool:
+		return self._wrapped.readable()
+	
+	def read(self, size: typing.Optional[int] = None) -> bytes:
+		if size is None or size < 0:
+			ret = self._readahead + self._wrapped.read()
+			self._readahead = b""
+		elif size <= len(self._readahead):
+			ret = self._readahead[:size]
+			self._readahead = self._readahead[size:]
+		else:
+			ret = self._readahead + self._wrapped.read(size - len(self._readahead))
+			self._readahead = b""
+		
+		return ret
+	
+	def peek(self, size: int = -1) -> bytes:
+		if not self._readahead:
+			self._readahead = self._wrapped.read(io.DEFAULT_BUFFER_SIZE if size < 0 else size)
+		return self._readahead
+
+
+def make_peekable(stream: typing.BinaryIO) -> "PeekableIO":
+	"""Wrap an arbitrary binary IO stream so that it supports the peek method.
+	
+	The stream is wrapped as efficiently as possible (or not at all if it already supports the peek method). However, in the worst case a custom wrapper class needs to be used, which may not be particularly efficient and only supports a very minimal interface. The only methods that are guaranteed to exist on the returned stream are readable, read, and peek.
+	"""
+	
+	if hasattr(stream, "peek"):
+		# Stream is already peekable, nothing to be done.
+		return typing.cast("PeekableIO", stream)
+	elif isinstance(stream, io.RawIOBase):
+		# Raw IO streams can be wrapped efficiently using BufferedReader.
+		return io.BufferedReader(stream)
+	else:
+		# Other streams need to be wrapped using our custom wrapper class.
+		return _PeekableIOWrapper(stream)
```
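A usage sketch for `make_peekable`: an `io.BytesIO` has no native peek method and is not a raw stream, so it takes the custom-wrapper path, and peeking does not consume data.

```python
import io

from rsrcfork.compress import common

stream = common.make_peekable(io.BytesIO(b"\xa8\x9fer"))
assert stream.peek(4)[:4] == b"\xa8\x9fer"  # look ahead without consuming
assert stream.read(4) == b"\xa8\x9fer"      # the same bytes are still readable
```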
```diff
+def read_exact(stream: typing.BinaryIO, byte_count: int) -> bytes:
+	"""Read byte_count bytes from the stream and raise an exception if too few bytes are read (i. e. if EOF was hit prematurely)."""
+	
+	data = stream.read(byte_count)
+	if len(data) != byte_count:
+		raise DecompressError(f"Attempted to read {byte_count} bytes of data, but only got {len(data)} bytes")
+	return data
+
+
+def read_variable_length_integer(stream: typing.BinaryIO) -> int:
+	"""Read a variable-length integer from the stream.
 	
 	This variable-length integer format is used by the 0xfe codes in the compression formats used by 'dcmp' (0) and 'dcmp' (1).
 	"""
 	
-	assert len(data) > position
-	if data[position] == 0xff:
-		assert len(data) > position + 4
-		return int.from_bytes(data[position+1:position+5], "big", signed=True), 5
-	elif data[position] >= 0x80:
-		assert len(data) > position + 1
-		data_modified = bytes([(data[position] - 0xc0) & 0xff, data[position+1]])
-		return int.from_bytes(data_modified, "big", signed=True), 2
+	head = read_exact(stream, 1)
+	
+	if head[0] == 0xff:
+		return int.from_bytes(read_exact(stream, 4), "big", signed=True)
+	elif head[0] >= 0x80:
+		data_modified = bytes([(head[0] - 0xc0) & 0xff]) + read_exact(stream, 1)
+		return int.from_bytes(data_modified, "big", signed=True)
 	else:
-		return int.from_bytes(data[position:position+1], "big", signed=True), 1
+		return int.from_bytes(head, "big", signed=True)
```
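The three encodings of this variable-length integer format, worked through against the function above:

```python
import io

from rsrcfork.compress.common import read_variable_length_integer

# 1 byte, 0x00-0x7f: the value itself (always non-negative).
assert read_variable_length_integer(io.BytesIO(b"\x05")) == 5
# 2 bytes, first byte 0x80-0xfe: (first - 0xc0) & 0xff becomes the high byte,
# so 0xc1 0x00 decodes to 0x0100 and 0x80 0x00 to the negative value -0x4000.
assert read_variable_length_integer(io.BytesIO(b"\xc1\x00")) == 0x0100
assert read_variable_length_integer(io.BytesIO(b"\x80\x00")) == -0x4000
# 5 bytes, first byte 0xff: a signed 32-bit big-endian value follows.
assert read_variable_length_integer(io.BytesIO(b"\xff\xff\xff\xff\xfa")) == -6
```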
rsrcfork/compress/dcmp0.py

```diff
@@ -1,3 +1,4 @@
+import io
 import typing
 
 from . import common
@@ -38,97 +39,73 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
 assert len(TABLE) == len(range(0x4b, 0xfe))
 
 
-def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
-	"""Decompress compressed data in the format used by 'dcmp' (0)."""
+def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+	"""Internal helper function, implements the main decompression algorithm. Only called from decompress_stream, which performs some extra checks and debug logging."""
 	
-	if not isinstance(header_info, common.CompressedApplicationHeaderInfo):
+	if not isinstance(header_info, common.CompressedType8HeaderInfo):
 		raise common.DecompressError(f"Incorrect header type: {type(header_info).__qualname__}")
 	
 	prev_literals: typing.List[bytes] = []
-	decompressed = b""
 	
-	i = 0
-	
-	while i < len(data):
-		byte = data[i]
+	while True: # Loop is terminated when the EOF marker (0xff) is encountered
+		(byte,) = common.read_exact(stream, 1)
 		if debug:
-			print(f"Tag byte 0x{byte:>02x}, at 0x{i:x}, decompressing to 0x{len(decompressed):x}")
+			print(f"Tag byte 0x{byte:>02x}")
 		
 		if byte in range(0x00, 0x20):
 			# Literal byte sequence.
 			if byte in (0x00, 0x10):
 				# The length of the literal data is stored in the next byte.
-				count_div2 = data[i+1]
-				begin = i + 2
+				(count_div2,) = common.read_exact(stream, 1)
 			else:
 				# The length of the literal data is stored in the low nibble of the tag byte.
 				count_div2 = byte >> 0 & 0xf
-				begin = i + 1
-			end = begin + 2*count_div2
+			count = 2 * count_div2
 			# Controls whether or not the literal is stored so that it can be referenced again later.
 			do_store = byte >= 0x10
-			literal = data[begin:end]
+			literal = common.read_exact(stream, count)
 			if debug:
 				print(f"Literal (storing: {do_store})")
-				print(f"\t-> {literal}")
-			decompressed += literal
 			if do_store:
 				if debug:
-					print(f"\t-> stored as literal number 0x{len(prev_literals):x}")
+					print(f"\t-> storing as literal number 0x{len(prev_literals):x}")
 				prev_literals.append(literal)
-			i = end
+			yield literal
 		elif byte in (0x20, 0x21):
 			# Backreference to a previous literal, 2-byte form.
 			# This can reference literals with index in range(0x28, 0x228).
-			table_index = 0x28 + ((byte - 0x20) << 8 | data[i+1])
-			i += 2
+			(next_byte,) = common.read_exact(stream, 1)
+			table_index = 0x28 + ((byte - 0x20) << 8 | next_byte)
 			if debug:
 				print(f"Backreference (2-byte form) to 0x{table_index:>02x}")
-			literal = prev_literals[table_index]
-			if debug:
-				print(f"\t-> {literal}")
-			decompressed += literal
+			yield prev_literals[table_index]
 		elif byte == 0x22:
 			# Backreference to a previous literal, 3-byte form.
 			# This can reference any literal with index 0x28 and higher, but is only necessary for literals with index 0x228 and higher.
-			table_index = 0x28 + int.from_bytes(data[i+1:i+3], "big", signed=False)
-			i += 3
+			table_index = 0x28 + int.from_bytes(common.read_exact(stream, 2), "big", signed=False)
 			if debug:
 				print(f"Backreference (3-byte form) to 0x{table_index:>02x}")
-			literal = prev_literals[table_index]
-			if debug:
-				print(f"\t-> {literal}")
-			decompressed += literal
+			yield prev_literals[table_index]
 		elif byte in range(0x23, 0x4b):
 			# Backreference to a previous literal, 1-byte form.
 			# This can reference literals with indices in range(0x28).
 			table_index = byte - 0x23
-			i += 1
 			if debug:
 				print(f"Backreference (1-byte form) to 0x{table_index:>02x}")
-			literal = prev_literals[table_index]
-			if debug:
-				print(f"\t-> {literal}")
-			decompressed += literal
+			yield prev_literals[table_index]
 		elif byte in range(0x4b, 0xfe):
 			# Reference into a fixed table of two-byte literals.
 			# All compressed resources use the same table.
 			table_index = byte - 0x4b
-			i += 1
 			if debug:
 				print(f"Fixed table reference to 0x{table_index:>02x}")
-			entry = TABLE[table_index]
-			if debug:
-				print(f"\t-> {entry}")
-			decompressed += entry
+			yield TABLE[table_index]
 		elif byte == 0xfe:
 			# Extended code, whose meaning is controlled by the following byte.
 			
-			i += 1
-			kind = data[i]
+			(kind,) = common.read_exact(stream, 1)
 			if debug:
 				print(f"Extended code: 0x{kind:>02x}")
-			i += 1
 			
 			if kind == 0x00:
 				# Compact representation of (part of) a segment loader jump table, as used in 'CODE' (0) resources.
```
```diff
@@ -137,37 +114,28 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
 					print(f"Segment loader jump table entries")
 				
 				# All generated jump table entries have the same segment number.
-				segment_number_int, length = common._read_variable_length_integer(data, i)
-				i += length
+				segment_number_int = common.read_variable_length_integer(stream)
 				if debug:
 					print(f"\t-> segment number: {segment_number_int:#x}")
 				
 				# The tail part of all jump table entries (i. e. everything except for the address).
 				entry_tail = b"?<" + segment_number_int.to_bytes(2, "big", signed=True) + b"\xa9\xf0"
 				if debug:
 					print(f"\t-> tail of first entry: {entry_tail}")
 				# The tail is output once *without* an address in front, i. e. the first entry's address must be generated manually by a previous code.
-				decompressed += entry_tail
+				yield entry_tail
 				
-				count, length = common._read_variable_length_integer(data, i)
-				i += length
+				count = common.read_variable_length_integer(stream)
 				if count <= 0:
 					raise common.DecompressError(f"Jump table entry count must be greater than 0, not {count}")
 				
 				# The second entry's address is stored explicitly.
-				current_int, length = common._read_variable_length_integer(data, i)
-				i += length
+				current_int = common.read_variable_length_integer(stream)
 				if debug:
-					print(f"-> address of second entry: {current_int:#x}")
-				entry = current_int.to_bytes(2, "big", signed=False) + entry_tail
-				if debug:
-					print(f"-> second entry: {entry}")
-				decompressed += entry
+					print(f"\t-> address of second entry: {current_int:#x}")
+				yield current_int.to_bytes(2, "big", signed=False) + entry_tail
 				
 				for _ in range(1, count):
 					# All further entries' addresses are stored as differences relative to the previous entry's address.
-					diff, length = common._read_variable_length_integer(data, i)
-					i += length
+					diff = common.read_variable_length_integer(stream)
 					# For some reason, each difference is 6 higher than it should be.
 					diff -= 6
 					
@@ -175,10 +143,7 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
 					current_int = (current_int + diff) & 0xffff
 					if debug:
 						print(f"\t-> difference {diff:#x}: {current_int:#x}")
-					entry = current_int.to_bytes(2, "big", signed=False) + entry_tail
-					if debug:
-						print(f"\t-> {entry}")
-					decompressed += entry
+					yield current_int.to_bytes(2, "big", signed=False) + entry_tail
 			elif kind in (0x02, 0x03):
 				# Repeat 1 or 2 bytes a certain number of times.
 				
```
```diff
@@ -193,23 +158,19 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
 					print(f"Repeat {byte_count}-byte value")
 				
 				# The byte(s) to repeat, stored as a variable-length integer. The value is treated as unsigned, i. e. the integer is never negative.
-				to_repeat_int, length = common._read_variable_length_integer(data, i)
-				i += length
+				to_repeat_int = common.read_variable_length_integer(stream)
 				try:
 					to_repeat = to_repeat_int.to_bytes(byte_count, "big", signed=False)
 				except OverflowError:
 					raise common.DecompressError(f"Value to repeat out of range for {byte_count}-byte repeat: {to_repeat_int:#x}")
 				
-				count_m1, length = common._read_variable_length_integer(data, i)
-				i += length
-				count = count_m1 + 1
+				count = common.read_variable_length_integer(stream) + 1
 				if count <= 0:
 					raise common.DecompressError(f"Repeat count must be positive: {count}")
 				
-				repeated = to_repeat * count
 				if debug:
-					print(f"\t-> {to_repeat} * {count}: {repeated}")
-				decompressed += repeated
+					print(f"\t-> {to_repeat} * {count}")
+				yield to_repeat * count
 			elif kind == 0x04:
 				# A sequence of 16-bit signed integers, with each integer encoded as a difference relative to the previous integer. The first integer is stored explicitly.
 				
@@ -217,18 +178,16 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
 					print(f"Difference-encoded 16-bit integers")
 				
 				# The first integer is stored explicitly, as a signed value.
-				initial_int, length = common._read_variable_length_integer(data, i)
-				i += length
+				initial_int = common.read_variable_length_integer(stream)
 				try:
 					initial = initial_int.to_bytes(2, "big", signed=True)
 				except OverflowError:
 					raise common.DecompressError(f"Initial value out of range for 16-bit integer difference encoding: {initial_int:#x}")
 				if debug:
-					print(f"\t-> initial: {initial}")
-				decompressed += initial
+					print(f"\t-> initial: 0x{initial_int:>04x}")
+				yield initial
 				
-				count, length = common._read_variable_length_integer(data, i)
-				i += length
+				count = common.read_variable_length_integer(stream)
 				if count < 0:
 					raise common.DecompressError(f"Count cannot be negative: {count}")
 				
```
```diff
@@ -237,64 +196,74 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
 				for _ in range(count):
 					# The difference to the previous integer is stored as an 8-bit signed integer.
 					# The usual variable-length integer format is *not* used here.
-					diff = int.from_bytes(data[i:i+1], "big", signed=True)
-					i += 1
+					diff = int.from_bytes(common.read_exact(stream, 1), "big", signed=True)
 					
 					# Simulate 16-bit integer wraparound.
 					current_int = (current_int + diff) & 0xffff
-					current = current_int.to_bytes(2, "big", signed=False)
 					if debug:
-						print(f"\t-> difference {diff:#x}: {current}")
-					decompressed += current
+						print(f"\t-> difference {diff:#x}: 0x{current_int:>04x}")
+					yield current_int.to_bytes(2, "big", signed=False)
 			elif kind == 0x06:
 				# A sequence of 32-bit signed integers, with each integer encoded as a difference relative to the previous integer. The first integer is stored explicitly.
 				
 				if debug:
-					print(f"Difference-encoded 16-bit integers")
+					print(f"Difference-encoded 32-bit integers")
 				
 				# The first integer is stored explicitly, as a signed value.
-				initial_int, length = common._read_variable_length_integer(data, i)
-				i += length
+				initial_int = common.read_variable_length_integer(stream)
 				try:
 					initial = initial_int.to_bytes(4, "big", signed=True)
 				except OverflowError:
 					raise common.DecompressError(f"Initial value out of range for 32-bit integer difference encoding: {initial_int:#x}")
 				if debug:
-					print(f"\t-> initial: {initial}")
-				decompressed += initial
+					print(f"\t-> initial: 0x{initial_int:>08x}")
+				yield initial
 				
-				count, length = common._read_variable_length_integer(data, i)
-				i += length
+				count = common.read_variable_length_integer(stream)
 				assert count >= 0
 				
 				# To make the following calculations simpler, the signed initial_int value is converted to unsigned.
 				current_int = initial_int & 0xffffffff
 				for _ in range(count):
 					# The difference to the previous integer is stored as a variable-length integer, whose value may be negative.
-					diff, length = common._read_variable_length_integer(data, i)
-					i += length
+					diff = common.read_variable_length_integer(stream)
 					
 					# Simulate 32-bit integer wraparound.
 					current_int = (current_int + diff) & 0xffffffff
-					current = current_int.to_bytes(4, "big", signed=False)
 					if debug:
-						print(f"\t-> difference {diff:#x}: {current}")
-					decompressed += current
+						print(f"\t-> difference {diff:#x}: 0x{current_int:>08x}")
+					yield current_int.to_bytes(4, "big", signed=False)
 			else:
 				raise common.DecompressError(f"Unknown extended code: 0x{kind:>02x}")
 		elif byte == 0xff:
 			# End of data marker, always occurs exactly once as the last byte of the compressed data.
 			if debug:
 				print("End marker")
-			if i != len(data) - 1:
-				raise common.DecompressError(f"End marker reached at {i}, before the expected end of data at {len(data) - 1}")
-			i += 1
+			
+			# Check that there really is no more data left.
+			extra = stream.read(1)
+			if extra:
+				raise common.DecompressError(f"Extra data encountered after end of data marker (first extra byte: {extra})")
+			break
 		else:
-			raise common.DecompressError(f"Unknown tag byte: 0x{data[i]:>02x}")
-	
-	if header_info.decompressed_length % 2 != 0 and len(decompressed) == header_info.decompressed_length + 1:
-		# Special case: if the decompressed data length stored in the header is odd and one less than the length of the actual decompressed data, drop the last byte.
-		# This is necessary because nearly all codes generate data in groups of 2 or 4 bytes, so it is basically impossible to represent data with an odd length using this compression format.
-		decompressed = decompressed[:-1]
-	
-	return decompressed
+			raise common.DecompressError(f"Unknown tag byte: 0x{byte:>02x}")
+
+
+def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+	"""Decompress compressed data in the format used by 'dcmp' (0)."""
+	
+	decompressed_length = 0
+	for chunk in decompress_stream_inner(header_info, stream, debug=debug):
+		if debug:
+			print(f"\t-> {chunk}")
+		
+		if header_info.decompressed_length % 2 != 0 and decompressed_length + len(chunk) == header_info.decompressed_length + 1:
+			# Special case: if the decompressed data length stored in the header is odd and one less than the length of the actual decompressed data, drop the last byte.
+			# This is necessary because nearly all codes generate data in groups of 2 or 4 bytes, so it is basically impossible to represent data with an odd length using this compression format.
+			decompressed_length += len(chunk) - 1
+			yield chunk[:-1]
+		else:
+			decompressed_length += len(chunk)
+			yield chunk
+		
+		if debug:
+			print(f"Decompressed {decompressed_length:#x} bytes so far")
```
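A worked end-to-end example for the 'dcmp' (0) tag scheme above. The header field values other than the decompressed length and 'dcmp' ID are hypothetical placeholders:

```python
import io

from rsrcfork.compress import common, dcmp0

# Tag 0x12: low nibble 2, doubled = 4-byte literal "ABCD", stored for
# reuse because 0x12 >= 0x10.
# Tag 0x23: 1-byte backreference to stored literal number 0.
# Tag 0xff: end-of-data marker.
header = common.CompressedType8HeaderInfo(0x12, 0x0801, 8, 0, 0, 0)
stream = io.BytesIO(b"\x12ABCD\x23\xff")
assert b"".join(dcmp0.decompress_stream(header, stream)) == b"ABCDABCD"
```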
rsrcfork/compress/dcmp1.py

```diff
@@ -1,3 +1,4 @@
+import io
 import typing
 
 from . import common
@@ -21,99 +22,75 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
 assert len(TABLE) == len(range(0xd5, 0xfe))
 
 
-def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
-	"""Decompress compressed data in the format used by 'dcmp' (1)."""
+def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+	"""Internal helper function, implements the main decompression algorithm. Only called from decompress_stream, which performs some extra checks and debug logging."""
 	
-	if not isinstance(header_info, common.CompressedApplicationHeaderInfo):
+	if not isinstance(header_info, common.CompressedType8HeaderInfo):
 		raise common.DecompressError(f"Incorrect header type: {type(header_info).__qualname__}")
 	
 	prev_literals: typing.List[bytes] = []
-	decompressed = b""
 	
-	i = 0
-	
-	while i < len(data):
-		byte = data[i]
+	while True: # Loop is terminated when the EOF marker (0xff) is encountered
+		(byte,) = common.read_exact(stream, 1)
 		if debug:
-			print(f"Tag byte 0x{byte:>02x}, at 0x{i:x}, decompressing to 0x{len(decompressed):x}")
+			print(f"Tag byte 0x{byte:>02x}")
 		
 		if byte in range(0x00, 0x20):
 			# Literal byte sequence, 1-byte header.
 			# The length of the literal data is stored in the low nibble of the tag byte.
 			count = (byte >> 0 & 0xf) + 1
-			begin = i + 1
-			end = begin + count
 			# Controls whether or not the literal is stored so that it can be referenced again later.
 			do_store = byte >= 0x10
-			literal = data[begin:end]
+			literal = common.read_exact(stream, count)
 			if debug:
 				print(f"Literal (1-byte header, storing: {do_store})")
-				print(f"\t-> {literal}")
-			decompressed += literal
 			if do_store:
 				if debug:
-					print(f"\t-> stored as literal number 0x{len(prev_literals):x}")
+					print(f"\t-> storing as literal number 0x{len(prev_literals):x}")
 				prev_literals.append(literal)
-			i = end
+			yield literal
 		elif byte in range(0x20, 0xd0):
 			# Backreference to a previous literal, 1-byte form.
 			# This can reference literals with indices in range(0xb0).
 			table_index = byte - 0x20
-			i += 1
 			if debug:
 				print(f"Backreference (1-byte form) to 0x{table_index:>02x}")
-			literal = prev_literals[table_index]
-			if debug:
-				print(f"\t-> {literal}")
-			decompressed += literal
+			yield prev_literals[table_index]
 		elif byte in (0xd0, 0xd1):
 			# Literal byte sequence, 2-byte header.
 			# The length of the literal data is stored in the following byte.
-			count = data[i+1]
-			begin = i + 2
-			end = begin + count
+			(count,) = common.read_exact(stream, 1)
 			# Controls whether or not the literal is stored so that it can be referenced again later.
 			do_store = byte == 0xd1
-			literal = data[begin:end]
+			literal = common.read_exact(stream, count)
 			if debug:
 				print(f"Literal (2-byte header, storing: {do_store})")
-				print(f"\t-> {literal}")
-			decompressed += literal
 			if do_store:
 				if debug:
-					print(f"\t-> stored as literal number 0x{len(prev_literals):x}")
+					print(f"\t-> storing as literal number 0x{len(prev_literals):x}")
 				prev_literals.append(literal)
-			i = end
+			yield literal
 		elif byte == 0xd2:
 			# Backreference to a previous literal, 2-byte form.
 			# This can reference literals with indices in range(0xb0, 0x1b0).
-			table_index = data[i+1] + 0xb0
-			i += 2
+			(next_byte,) = common.read_exact(stream, 1)
+			table_index = next_byte + 0xb0
 			if debug:
 				print(f"Backreference (2-byte form) to 0x{table_index:>02x}")
-			literal = prev_literals[table_index]
-			if debug:
-				print(f"\t-> {literal}")
-			decompressed += literal
+			yield prev_literals[table_index]
 		elif byte in range(0xd5, 0xfe):
 			# Reference into a fixed table of two-byte literals.
 			# All compressed resources use the same table.
 			table_index = byte - 0xd5
-			i += 1
 			if debug:
 				print(f"Fixed table reference to 0x{table_index:>02x}")
-			entry = TABLE[table_index]
-			if debug:
-				print(f"\t-> {entry}")
-			decompressed += entry
+			yield TABLE[table_index]
 		elif byte == 0xfe:
 			# Extended code, whose meaning is controlled by the following byte.
 			
-			i += 1
-			kind = data[i]
+			(kind,) = common.read_exact(stream, 1)
 			if debug:
 				print(f"Extended code: 0x{kind:>02x}")
-			i += 1
 			
 			if kind == 0x02:
 				# Repeat 1 byte a certain number of times.
```
```diff
@@ -124,33 +101,44 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
 					print(f"Repeat {byte_count}-byte value")
 				
 				# The byte(s) to repeat, stored as a variable-length integer. The value is treated as unsigned, i. e. the integer is never negative.
-				to_repeat_int, length = common._read_variable_length_integer(data, i)
-				i += length
+				to_repeat_int = common.read_variable_length_integer(stream)
 				try:
 					to_repeat = to_repeat_int.to_bytes(byte_count, "big", signed=False)
 				except OverflowError:
 					raise common.DecompressError(f"Value to repeat out of range for {byte_count}-byte repeat: {to_repeat_int:#x}")
 				
-				count_m1, length = common._read_variable_length_integer(data, i)
-				i += length
-				count = count_m1 + 1
+				count = common.read_variable_length_integer(stream) + 1
 				if count <= 0:
 					raise common.DecompressError(f"Repeat count must be positive: {count}")
 				
-				repeated = to_repeat * count
 				if debug:
-					print(f"\t-> {to_repeat} * {count}: {repeated}")
-				decompressed += repeated
+					print(f"\t-> {to_repeat} * {count}")
+				yield to_repeat * count
 			else:
 				raise common.DecompressError(f"Unknown extended code: 0x{kind:>02x}")
 		elif byte == 0xff:
 			# End of data marker, always occurs exactly once as the last byte of the compressed data.
 			if debug:
 				print("End marker")
-			if i != len(data) - 1:
-				raise common.DecompressError(f"End marker reached at {i}, before the expected end of data at {len(data) - 1}")
-			i += 1
+			
+			# Check that there really is no more data left.
+			extra = stream.read(1)
+			if extra:
+				raise common.DecompressError(f"Extra data encountered after end of data marker (first extra byte: {extra})")
+			break
 		else:
-			raise common.DecompressError(f"Unknown tag byte: 0x{data[i]:>02x}")
-	
-	return decompressed
+			raise common.DecompressError(f"Unknown tag byte: 0x{byte:>02x}")
+
+
+def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
+	"""Decompress compressed data in the format used by 'dcmp' (1)."""
+	
+	decompressed_length = 0
+	for chunk in decompress_stream_inner(header_info, stream, debug=debug):
+		if debug:
+			print(f"\t-> {chunk}")
+		
+		decompressed_length += len(chunk)
+		yield chunk
+		
+		if debug:
+			print(f"Decompressed {decompressed_length:#x} bytes so far")
```
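The same kind of worked example for 'dcmp' (1), where a low-nibble count means count + 1 literal bytes (not doubled) and 1-byte backreferences start at tag 0x20 (header field values again hypothetical apart from length and 'dcmp' ID):

```python
import io

from rsrcfork.compress import common, dcmp1

# Tag 0x11: (1 + 1) = 2-byte literal "AB", stored because 0x11 >= 0x10.
# Tag 0x20: backreference to stored literal 0. Tag 0xff: end marker.
header = common.CompressedType8HeaderInfo(0x12, 0x0801, 4, 1, 0, 0)
stream = io.BytesIO(b"\x11AB\x20\xff")
assert b"".join(dcmp1.decompress_stream(header, stream)) == b"ABAB"
```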
rsrcfork/compress/dcmp2.py

```diff
@@ -1,4 +1,5 @@
 import enum
+import io
 import struct
 import typing
 
@@ -73,68 +74,69 @@ def _split_bits(i: int) -> typing.Tuple[bool, bool, bool, bool, bool, bool, bool
 	)
 
 
-def _decompress_system_untagged(data: bytes, decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> bytes:
-	parts = []
-	i = 0
-	while i < len(data):
-		if i == len(data) - 1 and decompressed_length % 2 != 0:
+def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> typing.Iterator[bytes]:
+	while True: # Loop is terminated when EOF is reached.
+		table_index_data = stream.read(1)
+		if not table_index_data:
+			# End of compressed data.
+			break
+		elif not stream.peek(1) and decompressed_length % 2 != 0:
 			# Special case: if we are at the last byte of the compressed data, and the decompressed data has an odd length, the last byte is a single literal byte, and not a table reference.
 			if debug:
-				print(f"Last byte: {data[-1:]}")
-			parts.append(data[-1:])
+				print(f"Last byte: {table_index_data}")
+			yield table_index_data
 			break
 		
 		# Compressed data is untagged, every byte is a table reference.
+		(table_index,) = table_index_data
 		if debug:
-			print(f"Reference: {data[i]} -> {table[data[i]]}")
-		parts.append(table[data[i]])
-		i += 1
-	
-	return b"".join(parts)
+			print(f"Reference: {table_index} -> {table[table_index]}")
+		yield table[table_index]
 
-def _decompress_system_tagged(data: bytes, decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> bytes:
-	parts = []
-	i = 0
-	while i < len(data):
-		if i == len(data) - 1 and decompressed_length % 2 != 0:
+def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> typing.Iterator[bytes]:
+	while True: # Loop is terminated when EOF is reached.
+		tag_data = stream.read(1)
+		if not tag_data:
+			# End of compressed data.
+			break
+		elif not stream.peek(1) and decompressed_length % 2 != 0:
 			# Special case: if we are at the last byte of the compressed data, and the decompressed data has an odd length, the last byte is a single literal byte, and not a tag or a table reference.
 			if debug:
-				print(f"Last byte: {data[-1:]}")
-			parts.append(data[-1:])
+				print(f"Last byte: {tag_data}")
+			yield tag_data
 			break
 		
 		# Compressed data is tagged, each tag byte is followed by 8 table references and/or literals.
-		tag = data[i]
+		(tag,) = tag_data
 		if debug:
 			print(f"Tag: 0b{tag:>08b}")
-		i += 1
 		for is_ref in _split_bits(tag):
 			if is_ref:
 				# This is a table reference (a single byte that is an index into the table).
+				table_index_data = stream.read(1)
+				if not table_index_data:
+					# End of compressed data.
+					break
+				(table_index,) = table_index_data
 				if debug:
-					print(f"Reference: {data[i]} -> {table[data[i]]}")
-				parts.append(table[data[i]])
-				i += 1
+					print(f"Reference: {table_index} -> {table[table_index]}")
+				yield table[table_index]
 			else:
 				# This is a literal (two uncompressed bytes that are literally copied into the output).
-				# Note: if i == len(data)-1, the literal is actually only a single byte long.
-				# This case is handled automatically - the slice extends one byte past the end of the data, and only one byte is returned.
+				literal = stream.read(2)
+				if not literal:
+					# End of compressed data.
+					break
+				# Note: the literal may be only a single byte long if it is located exactly at EOF. This is intended and expected - the 1-byte literal is yielded normally, and on the next iteration, decompression is terminated as EOF is detected.
 				if debug:
-					print(f"Literal: {data[i:i+2]}")
-				parts.append(data[i:i + 2])
-				i += 2
-		
-		# If the end of the compressed data is reached in the middle of a chunk, all further tag bits are ignored (they should be zero) and decompression ends.
-		if i >= len(data):
-			break
-	
-	return b"".join(parts)
+					print(f"Literal: {literal}")
+				yield literal
 
 
-def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
+def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
 	"""Decompress compressed data in the format used by 'dcmp' (2)."""
 	
-	if not isinstance(header_info, common.CompressedSystemHeaderInfo):
+	if not isinstance(header_info, common.CompressedType9HeaderInfo):
 		raise common.DecompressError(f"Incorrect header type: {type(header_info).__qualname__}")
 	
 	unknown, table_count_m1, flags_raw = STRUCT_PARAMETERS.unpack(header_info.parameters)
```
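For reference, `_split_bits` (its body is elided above) expands a tag byte into eight booleans, most significant bit first, so that bit 7 decides the first chunk following the tag. A standalone sketch of that assumed behavior:

```python
from typing import Tuple

def split_bits(i: int) -> Tuple[bool, ...]:
	# Most significant bit first: bit 7 decides the first of the
	# eight chunks that follow the tag byte.
	assert 0 <= i <= 0xff
	return tuple(bool(i & (1 << bit)) for bit in range(7, -1, -1))

# Tag 0b10100000: chunks 1 and 3 are table references, the rest literals.
assert split_bits(0b10100000)[:4] == (True, False, True, False)
```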
```diff
@@ -155,24 +157,21 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
 		print(f"Flags: {flags}")
 	
 	if ParameterFlags.CUSTOM_TABLE in flags:
-		table_start = 0
-		data_start = table_start + table_count * 2
 		table = []
-		for i in range(table_start, data_start, 2):
-			table.append(data[i:i + 2])
+		for _ in range(table_count):
+			table.append(common.read_exact(stream, 2))
 		if debug:
 			print(f"Using custom table: {table}")
 	else:
 		if table_count_m1 != 0:
 			raise common.DecompressError(f"table_count_m1 field is {table_count_m1}, but must be zero when the default table is used")
 		table = DEFAULT_TABLE
-		data_start = 0
 		if debug:
 			print("Using default table")
 	
 	if ParameterFlags.TAGGED in flags:
-		decompress_func = _decompress_system_tagged
+		decompress_func = _decompress_tagged
 	else:
-		decompress_func = _decompress_system_untagged
+		decompress_func = _decompress_untagged
 	
-	return decompress_func(data[data_start:], header_info.decompressed_length, table, debug=debug)
+	yield from decompress_func(common.make_peekable(stream), header_info.decompressed_length, table, debug=debug)
```