From 360833f9404278251ab34222d77ae03c8846c4c5 Mon Sep 17 00:00:00 2001 From: dgelessus Date: Sun, 14 Jul 2019 21:41:14 +0200 Subject: [PATCH] Refactor rsrcfork.compress to prepare for new compression type support --- rsrcfork/compress.py | 79 +++++++++++++++++++++++++++----------------- 1 file changed, 49 insertions(+), 30 deletions(-) diff --git a/rsrcfork/compress.py b/rsrcfork/compress.py index eff07e2..aad278a 100644 --- a/rsrcfork/compress.py +++ b/rsrcfork/compress.py @@ -9,21 +9,26 @@ __all__ = [ # The signature of all compressed resource data, 0xa89f6572 in hex, or "®üer" in MacRoman. COMPRESSED_SIGNATURE = b"\xa8\x9fer" +# The compression type commonly used for application resources. +COMPRESSED_TYPE_APPLICATION = 0x0801 # The compression type commonly used for System file resources. COMPRESSED_TYPE_SYSTEM = 0x0901 -# Header for a compressed resource. +# Common header for compressed resources of all types. # 4 bytes: Signature (see above). -# 2 bytes: Length of the header. (This meaning is just a guess - the field's value is always 0x0012, so there's no way to know for certain what it means.) -# 2 bytes: Compression type. Known so far: 0x0901 is used in the System file's resources. 0x0801 is used in other files' resources. Currently only the first type is supported. +# 2 bytes: Length of the complete header (this common part and the type-specific part that follows it). (This meaning is just a guess - the field's value is always 0x0012, so there's no way to know for certain what it means.) +# 2 bytes: Compression type. Known so far: 0x0901 is used in the System file's resources. 0x0801 is used in other files' resources. +STRUCT_COMPRESSED_HEADER = struct.Struct(">4sHH") + +# Header continuation part for a "system" compressed resource. # 4 bytes: Length of the data after decompression. # 2 bytes: The ID of the 'dcmp' resource that can decompress this resource. Currently only ID 2 is supported. # 2 bytes: Unknown meaning, doesn't appear to have any effect on the decompression algorithm. Usually zero, sometimes set to a small integer (< 10). On 'lpch' resources, the value is always nonzero, and sometimes larger than usual. # 1 byte: Number of entries in the custom lookup table minus one. Set to zero if the default lookup table is used. -# 1 byte: Flags. See the CompressedFlags enum below for details. -STRUCT_COMPRESSED_HEADER = struct.Struct(">4sHHIhHBB") +# 1 byte: Flags. See the CompressedSystemFlags enum below for details. +STRUCT_COMPRESSED_SYSTEM_HEADER = struct.Struct(">IhHBB") -# Default lookup table for compressed resources. +# Default lookup table for "system" compressed resources. # If the custom table flag is set, a custom table (usually with fewer than 256 entries) is used instead of this one. # This table was obtained by decompressing a manually created compressed resource that refers to every possible table entry. Detailed steps: # 1. Create a file with a resource fork @@ -69,7 +74,7 @@ COMPRESSED_DEFAULT_TABLE_DATA = ( COMPRESSED_DEFAULT_TABLE = [COMPRESSED_DEFAULT_TABLE_DATA[i:i + 2] for i in range(0, len(COMPRESSED_DEFAULT_TABLE_DATA), 2)] -class CompressedFlags(enum.Flag): +class CompressedSystemFlags(enum.Flag): TAGGED = 1 << 1 # The compressed data is tagged, meaning that it consists of "blocks" of a tag byte followed by 8 table references and/or literals. See comments in the decompress function for details. CUSTOM_TABLE = 1 << 0 # A custom lookup table is included before the compressed data, which is used instead of the default table. @@ -94,7 +99,11 @@ def _split_bits(i: int) -> typing.Tuple[bool, bool, bool, bool, bool, bool, bool ) -def _decompress_untagged(data: bytes, decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> bytes: +def _decompress_application(data: bytes, *, debug: bool=False) -> bytes: + raise DecompressError('"Application" compression type not supported yet') + + +def _decompress_system_untagged(data: bytes, decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> bytes: parts = [] i = 0 while i < len(data): @@ -113,7 +122,7 @@ def _decompress_untagged(data: bytes, decompressed_length: int, table: typing.Se return b"".join(parts) -def _decompress_tagged(data: bytes, decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> bytes: +def _decompress_system_tagged(data: bytes, decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> bytes: parts = [] i = 0 while i < len(data): @@ -151,20 +160,8 @@ def _decompress_tagged(data: bytes, decompressed_length: int, table: typing.Sequ return b"".join(parts) - -def decompress(data: bytes, *, debug: bool=False) -> bytes: - """Decompress the given compressed resource data.""" - - try: - signature, header_length, compression_type, decompressed_length, dcmp_id, unknown, table_count_m1, flags_raw = STRUCT_COMPRESSED_HEADER.unpack_from(data, offset=0) - except struct.error: - raise DecompressError(f"Invalid header") - if signature != COMPRESSED_SIGNATURE: - raise DecompressError(f"Invalid signature: {signature!r}, expected {COMPRESSED_SIGNATURE}") - if header_length != STRUCT_COMPRESSED_HEADER.size: - raise DecompressError(f"Unsupported header length: 0x{header_length:>04x}, expected 0x{STRUCT_COMPRESSED_HEADER.size:>04x}") - if compression_type != COMPRESSED_TYPE_SYSTEM: - raise DecompressError(f"Unsupported compression type: 0x{compression_type:>04x}, expected 0x{COMPRESSED_TYPE_SYSTEM:>04x}") +def _decompress_system(data: bytes, *, debug: bool=False) -> bytes: + decompressed_length, dcmp_id, unknown, table_count_m1, flags_raw = STRUCT_COMPRESSED_SYSTEM_HEADER.unpack_from(data) if dcmp_id != 2: raise DecompressError(f"Unsupported 'dcmp' ID: {dcmp_id}, expected 2") if debug: @@ -175,15 +172,15 @@ def decompress(data: bytes, *, debug: bool=False) -> bytes: print(f"Table has {table_count} entries") try: - flags = CompressedFlags(flags_raw) + flags = CompressedSystemFlags(flags_raw) except ValueError: raise DecompressError(f"Unsupported flags set: 0b{flags_raw:>08b}, currently only bits 0 and 1 are supported") if debug: print(f"Flags: {flags}") - if CompressedFlags.CUSTOM_TABLE in flags: - table_start = STRUCT_COMPRESSED_HEADER.size + if CompressedSystemFlags.CUSTOM_TABLE in flags: + table_start = STRUCT_COMPRESSED_SYSTEM_HEADER.size data_start = table_start + table_count * 2 table = [] for i in range(table_start, data_start, 2): @@ -194,16 +191,38 @@ def decompress(data: bytes, *, debug: bool=False) -> bytes: if table_count_m1 != 0: raise DecompressError(f"table_count_m1 field is {table_count_m1}, but must be zero when the default table is used") table = COMPRESSED_DEFAULT_TABLE - data_start = STRUCT_COMPRESSED_HEADER.size + data_start = STRUCT_COMPRESSED_SYSTEM_HEADER.size if debug: print("Using default table") - if CompressedFlags.TAGGED in flags: - decompress_func = _decompress_tagged + if CompressedSystemFlags.TAGGED in flags: + decompress_func = _decompress_system_tagged else: - decompress_func = _decompress_untagged + decompress_func = _decompress_system_untagged decompressed = decompress_func(data[data_start:], decompressed_length, table, debug=debug) if len(decompressed) != decompressed_length: raise DecompressError(f"Actual length of decompressed data ({len(decompressed)}) does not match length stored in resource ({decompressed_length})") return decompressed + + +def decompress(data: bytes, *, debug: bool=False) -> bytes: + """Decompress the given compressed resource data.""" + + try: + signature, header_length, compression_type = STRUCT_COMPRESSED_HEADER.unpack_from(data) + except struct.error: + raise DecompressError(f"Invalid header") + if signature != COMPRESSED_SIGNATURE: + raise DecompressError(f"Invalid signature: {signature!r}, expected {COMPRESSED_SIGNATURE}") + if header_length != 0x12: + raise DecompressError(f"Unsupported header length: 0x{header_length:>04x}, expected 0x12") + + if compression_type == COMPRESSED_TYPE_APPLICATION: + decompress_func = _decompress_application + elif compression_type == COMPRESSED_TYPE_SYSTEM: + decompress_func = _decompress_system + else: + raise DecompressError(f"Unsupported compression type: 0x{compression_type:>04x}") + + return decompress_func(data[STRUCT_COMPRESSED_HEADER.size:], debug=debug)