mirror of
https://github.com/dgelessus/python-rsrcfork.git
synced 2024-05-28 10:41:30 +00:00
Simplify decompressor lookup
All decompressors now have exactly the same signature (as a result, each decompressor now has to check itself that the header type is correct). This allows the decompressors to be stored in a simple dictionary, which makes the lookup process much simpler.
This commit is contained in:
parent
53e73be980
commit
a23cd0fcb2
|
@ -10,24 +10,13 @@ __all__ = [
|
|||
]
|
||||
|
||||
|
||||
def _decompress_application(data: bytes, header_info: CompressedApplicationHeaderInfo, *, debug: bool=False) -> bytes:
|
||||
if header_info.dcmp_id == 0:
|
||||
decompress_func = dcmp0.decompress
|
||||
elif header_info.dcmp_id == 1:
|
||||
decompress_func = dcmp1.decompress
|
||||
else:
|
||||
raise DecompressError(f"Unsupported 'dcmp' ID: {header_info.dcmp_id}, expected 0 or 1")
|
||||
|
||||
return decompress_func(header_info, data, debug=debug)
|
||||
|
||||
|
||||
def _decompress_system(data: bytes, header_info: CompressedSystemHeaderInfo, *, debug: bool=False) -> bytes:
|
||||
if header_info.dcmp_id == 2:
|
||||
decompress_func = dcmp2.decompress
|
||||
else:
|
||||
raise DecompressError(f"Unsupported 'dcmp' ID: {header_info.dcmp_id}, expected 2")
|
||||
|
||||
return decompress_func(header_info, data, debug=debug)
|
||||
# Maps 'dcmp' IDs to their corresponding Python implementations.
|
||||
# Each decompressor has the signature (header_info: CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes.
|
||||
DECOMPRESSORS = {
|
||||
0: dcmp0.decompress,
|
||||
1: dcmp1.decompress,
|
||||
2: dcmp2.decompress,
|
||||
}
|
||||
|
||||
|
||||
def decompress(data: bytes, *, debug: bool=False) -> bytes:
|
||||
|
@ -38,14 +27,12 @@ def decompress(data: bytes, *, debug: bool=False) -> bytes:
|
|||
if debug:
|
||||
print(f"Compressed resource data header: {header_info}")
|
||||
|
||||
if isinstance(header_info, CompressedApplicationHeaderInfo):
|
||||
decompress_func = _decompress_application
|
||||
elif isinstance(header_info, CompressedSystemHeaderInfo):
|
||||
decompress_func = _decompress_system
|
||||
else:
|
||||
raise DecompressError(f"Unsupported compression type: 0x{header_info.compression_type:>04x}")
|
||||
try:
|
||||
decompress_func = DECOMPRESSORS[header_info.dcmp_id]
|
||||
except KeyError:
|
||||
raise DecompressError(f"Unsupported 'dcmp' ID: {header_info.dcmp_id}")
|
||||
|
||||
decompressed = decompress_func(data[header_info.header_length:], header_info, debug=debug)
|
||||
decompressed = decompress_func(header_info, data[header_info.header_length:], debug=debug)
|
||||
if len(decompressed) != header_info.decompressed_length:
|
||||
raise DecompressError(f"Actual length of decompressed data ({len(decompressed)}) does not match length stored in resource ({header_info.decompressed_length})")
|
||||
return decompressed
|
||||
|
|
|
@ -36,9 +36,12 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
|
|||
assert len(TABLE) == len(range(0x4b, 0xfe))
|
||||
|
||||
|
||||
def decompress(header_info: common.CompressedApplicationHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
|
||||
def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
|
||||
"""Decompress compressed data in the format used by 'dcmp' (0)."""
|
||||
|
||||
if not isinstance(header_info, common.CompressedApplicationHeaderInfo):
|
||||
raise common.DecompressError(f"Incorrect header type: {type(header_info.__qualname__)}")
|
||||
|
||||
prev_literals = []
|
||||
decompressed = b""
|
||||
|
||||
|
|
|
@ -19,9 +19,12 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
|
|||
assert len(TABLE) == len(range(0xd5, 0xfe))
|
||||
|
||||
|
||||
def decompress(header_info: common.CompressedApplicationHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
|
||||
def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
|
||||
"""Decompress compressed data in the format used by 'dcmp' (1)."""
|
||||
|
||||
if not isinstance(header_info, common.CompressedApplicationHeaderInfo):
|
||||
raise common.DecompressError(f"Incorrect header type: {type(header_info.__qualname__)}")
|
||||
|
||||
prev_literals = []
|
||||
decompressed = b""
|
||||
|
||||
|
|
|
@ -131,9 +131,12 @@ def _decompress_system_tagged(data: bytes, decompressed_length: int, table: typi
|
|||
return b"".join(parts)
|
||||
|
||||
|
||||
def decompress(header_info: common.CompressedSystemHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
|
||||
def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
|
||||
"""Decompress compressed data in the format used by 'dcmp' (2)."""
|
||||
|
||||
if not isinstance(header_info, common.CompressedSystemHeaderInfo):
|
||||
raise common.DecompressError(f"Incorrect header type: {type(header_info.__qualname__)}")
|
||||
|
||||
unknown, table_count_m1, flags_raw = STRUCT_PARAMETERS.unpack(header_info.parameters)
|
||||
|
||||
if debug:
|
||||
|
|
Loading…
Reference in New Issue
Block a user