mirror of
https://github.com/dgelessus/python-rsrcfork.git
synced 2025-01-08 21:35:20 +00:00
Refactor rsrcfork.compress to prepare for new compression type support
This commit is contained in:
parent
67a16d34a6
commit
360833f940
@ -9,21 +9,26 @@ __all__ = [
|
||||
|
||||
# The signature of all compressed resource data, 0xa89f6572 in hex, or "®üer" in MacRoman.
|
||||
COMPRESSED_SIGNATURE = b"\xa8\x9fer"
|
||||
# The compression type commonly used for application resources.
|
||||
COMPRESSED_TYPE_APPLICATION = 0x0801
|
||||
# The compression type commonly used for System file resources.
|
||||
COMPRESSED_TYPE_SYSTEM = 0x0901
|
||||
|
||||
# Header for a compressed resource.
|
||||
# Common header for compressed resources of all types.
|
||||
# 4 bytes: Signature (see above).
|
||||
# 2 bytes: Length of the header. (This meaning is just a guess - the field's value is always 0x0012, so there's no way to know for certain what it means.)
|
||||
# 2 bytes: Compression type. Known so far: 0x0901 is used in the System file's resources. 0x0801 is used in other files' resources. Currently only the first type is supported.
|
||||
# 2 bytes: Length of the complete header (this common part and the type-specific part that follows it). (This meaning is just a guess - the field's value is always 0x0012, so there's no way to know for certain what it means.)
|
||||
# 2 bytes: Compression type. Known so far: 0x0901 is used in the System file's resources. 0x0801 is used in other files' resources.
|
||||
STRUCT_COMPRESSED_HEADER = struct.Struct(">4sHH")
|
||||
|
||||
# Header continuation part for a "system" compressed resource.
|
||||
# 4 bytes: Length of the data after decompression.
|
||||
# 2 bytes: The ID of the 'dcmp' resource that can decompress this resource. Currently only ID 2 is supported.
|
||||
# 2 bytes: Unknown meaning, doesn't appear to have any effect on the decompression algorithm. Usually zero, sometimes set to a small integer (< 10). On 'lpch' resources, the value is always nonzero, and sometimes larger than usual.
|
||||
# 1 byte: Number of entries in the custom lookup table minus one. Set to zero if the default lookup table is used.
|
||||
# 1 byte: Flags. See the CompressedFlags enum below for details.
|
||||
STRUCT_COMPRESSED_HEADER = struct.Struct(">4sHHIhHBB")
|
||||
# 1 byte: Flags. See the CompressedSystemFlags enum below for details.
|
||||
STRUCT_COMPRESSED_SYSTEM_HEADER = struct.Struct(">IhHBB")
|
||||
|
||||
# Default lookup table for compressed resources.
|
||||
# Default lookup table for "system" compressed resources.
|
||||
# If the custom table flag is set, a custom table (usually with fewer than 256 entries) is used instead of this one.
|
||||
# This table was obtained by decompressing a manually created compressed resource that refers to every possible table entry. Detailed steps:
|
||||
# 1. Create a file with a resource fork
|
||||
@ -69,7 +74,7 @@ COMPRESSED_DEFAULT_TABLE_DATA = (
|
||||
COMPRESSED_DEFAULT_TABLE = [COMPRESSED_DEFAULT_TABLE_DATA[i:i + 2] for i in range(0, len(COMPRESSED_DEFAULT_TABLE_DATA), 2)]
|
||||
|
||||
|
||||
class CompressedFlags(enum.Flag):
|
||||
class CompressedSystemFlags(enum.Flag):
|
||||
TAGGED = 1 << 1 # The compressed data is tagged, meaning that it consists of "blocks" of a tag byte followed by 8 table references and/or literals. See comments in the decompress function for details.
|
||||
CUSTOM_TABLE = 1 << 0 # A custom lookup table is included before the compressed data, which is used instead of the default table.
|
||||
|
||||
@ -94,7 +99,11 @@ def _split_bits(i: int) -> typing.Tuple[bool, bool, bool, bool, bool, bool, bool
|
||||
)
|
||||
|
||||
|
||||
def _decompress_untagged(data: bytes, decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> bytes:
|
||||
def _decompress_application(data: bytes, *, debug: bool=False) -> bytes:
|
||||
raise DecompressError('"Application" compression type not supported yet')
|
||||
|
||||
|
||||
def _decompress_system_untagged(data: bytes, decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> bytes:
|
||||
parts = []
|
||||
i = 0
|
||||
while i < len(data):
|
||||
@ -113,7 +122,7 @@ def _decompress_untagged(data: bytes, decompressed_length: int, table: typing.Se
|
||||
|
||||
return b"".join(parts)
|
||||
|
||||
def _decompress_tagged(data: bytes, decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> bytes:
|
||||
def _decompress_system_tagged(data: bytes, decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> bytes:
|
||||
parts = []
|
||||
i = 0
|
||||
while i < len(data):
|
||||
@ -151,20 +160,8 @@ def _decompress_tagged(data: bytes, decompressed_length: int, table: typing.Sequ
|
||||
|
||||
return b"".join(parts)
|
||||
|
||||
|
||||
def decompress(data: bytes, *, debug: bool=False) -> bytes:
|
||||
"""Decompress the given compressed resource data."""
|
||||
|
||||
try:
|
||||
signature, header_length, compression_type, decompressed_length, dcmp_id, unknown, table_count_m1, flags_raw = STRUCT_COMPRESSED_HEADER.unpack_from(data, offset=0)
|
||||
except struct.error:
|
||||
raise DecompressError(f"Invalid header")
|
||||
if signature != COMPRESSED_SIGNATURE:
|
||||
raise DecompressError(f"Invalid signature: {signature!r}, expected {COMPRESSED_SIGNATURE}")
|
||||
if header_length != STRUCT_COMPRESSED_HEADER.size:
|
||||
raise DecompressError(f"Unsupported header length: 0x{header_length:>04x}, expected 0x{STRUCT_COMPRESSED_HEADER.size:>04x}")
|
||||
if compression_type != COMPRESSED_TYPE_SYSTEM:
|
||||
raise DecompressError(f"Unsupported compression type: 0x{compression_type:>04x}, expected 0x{COMPRESSED_TYPE_SYSTEM:>04x}")
|
||||
def _decompress_system(data: bytes, *, debug: bool=False) -> bytes:
|
||||
decompressed_length, dcmp_id, unknown, table_count_m1, flags_raw = STRUCT_COMPRESSED_SYSTEM_HEADER.unpack_from(data)
|
||||
if dcmp_id != 2:
|
||||
raise DecompressError(f"Unsupported 'dcmp' ID: {dcmp_id}, expected 2")
|
||||
if debug:
|
||||
@ -175,15 +172,15 @@ def decompress(data: bytes, *, debug: bool=False) -> bytes:
|
||||
print(f"Table has {table_count} entries")
|
||||
|
||||
try:
|
||||
flags = CompressedFlags(flags_raw)
|
||||
flags = CompressedSystemFlags(flags_raw)
|
||||
except ValueError:
|
||||
raise DecompressError(f"Unsupported flags set: 0b{flags_raw:>08b}, currently only bits 0 and 1 are supported")
|
||||
|
||||
if debug:
|
||||
print(f"Flags: {flags}")
|
||||
|
||||
if CompressedFlags.CUSTOM_TABLE in flags:
|
||||
table_start = STRUCT_COMPRESSED_HEADER.size
|
||||
if CompressedSystemFlags.CUSTOM_TABLE in flags:
|
||||
table_start = STRUCT_COMPRESSED_SYSTEM_HEADER.size
|
||||
data_start = table_start + table_count * 2
|
||||
table = []
|
||||
for i in range(table_start, data_start, 2):
|
||||
@ -194,16 +191,38 @@ def decompress(data: bytes, *, debug: bool=False) -> bytes:
|
||||
if table_count_m1 != 0:
|
||||
raise DecompressError(f"table_count_m1 field is {table_count_m1}, but must be zero when the default table is used")
|
||||
table = COMPRESSED_DEFAULT_TABLE
|
||||
data_start = STRUCT_COMPRESSED_HEADER.size
|
||||
data_start = STRUCT_COMPRESSED_SYSTEM_HEADER.size
|
||||
if debug:
|
||||
print("Using default table")
|
||||
|
||||
if CompressedFlags.TAGGED in flags:
|
||||
decompress_func = _decompress_tagged
|
||||
if CompressedSystemFlags.TAGGED in flags:
|
||||
decompress_func = _decompress_system_tagged
|
||||
else:
|
||||
decompress_func = _decompress_untagged
|
||||
decompress_func = _decompress_system_untagged
|
||||
|
||||
decompressed = decompress_func(data[data_start:], decompressed_length, table, debug=debug)
|
||||
if len(decompressed) != decompressed_length:
|
||||
raise DecompressError(f"Actual length of decompressed data ({len(decompressed)}) does not match length stored in resource ({decompressed_length})")
|
||||
return decompressed
|
||||
|
||||
|
||||
def decompress(data: bytes, *, debug: bool=False) -> bytes:
|
||||
"""Decompress the given compressed resource data."""
|
||||
|
||||
try:
|
||||
signature, header_length, compression_type = STRUCT_COMPRESSED_HEADER.unpack_from(data)
|
||||
except struct.error:
|
||||
raise DecompressError(f"Invalid header")
|
||||
if signature != COMPRESSED_SIGNATURE:
|
||||
raise DecompressError(f"Invalid signature: {signature!r}, expected {COMPRESSED_SIGNATURE}")
|
||||
if header_length != 0x12:
|
||||
raise DecompressError(f"Unsupported header length: 0x{header_length:>04x}, expected 0x12")
|
||||
|
||||
if compression_type == COMPRESSED_TYPE_APPLICATION:
|
||||
decompress_func = _decompress_application
|
||||
elif compression_type == COMPRESSED_TYPE_SYSTEM:
|
||||
decompress_func = _decompress_system
|
||||
else:
|
||||
raise DecompressError(f"Unsupported compression type: 0x{compression_type:>04x}")
|
||||
|
||||
return decompress_func(data[STRUCT_COMPRESSED_HEADER.size:], debug=debug)
|
||||
|
Loading…
Reference in New Issue
Block a user