mirror of
https://github.com/dgelessus/python-rsrcfork.git
synced 2025-01-23 16:30:46 +00:00
Refactor rsrcfork.compress to prepare for 'dcmp' (1) support
This commit is contained in:
parent
3d444bda10
commit
acd056973e
@ -163,23 +163,11 @@ def _read_variable_length_integer(data: bytes, position: int) -> typing.Tuple[in
|
|||||||
return int.from_bytes(data[position:position+1], "big", signed=True), 1
|
return int.from_bytes(data[position:position+1], "big", signed=True), 1
|
||||||
|
|
||||||
|
|
||||||
def _decompress_application(data: bytes, decompressed_length: int, *, debug: bool=False) -> bytes:
|
def _decompress_application_0(data: bytes, decompressed_length: int, *, debug: bool=False) -> bytes:
|
||||||
working_buffer_fractional_size, expansion_buffer_size, dcmp_id, reserved = STRUCT_COMPRESSED_APPLICATION_HEADER.unpack_from(data)
|
|
||||||
|
|
||||||
if debug:
|
|
||||||
print(f"Working buffer fractional size: {working_buffer_fractional_size} (=> {len(data) * 256 / working_buffer_fractional_size})")
|
|
||||||
print(f"Expansion buffer size: {expansion_buffer_size}")
|
|
||||||
|
|
||||||
if dcmp_id != 0:
|
|
||||||
raise DecompressError(f"Unsupported 'dcmp' ID: {dcmp_id}, expected 0")
|
|
||||||
|
|
||||||
if reserved != 0:
|
|
||||||
raise DecompressError(f"Reserved field should be 0, not 0x{reserved:>04x}")
|
|
||||||
|
|
||||||
prev_literals = []
|
prev_literals = []
|
||||||
decompressed = b""
|
decompressed = b""
|
||||||
|
|
||||||
i = STRUCT_COMPRESSED_APPLICATION_HEADER.size
|
i = 0
|
||||||
|
|
||||||
while i < len(data):
|
while i < len(data):
|
||||||
byte = data[i]
|
byte = data[i]
|
||||||
@ -432,6 +420,30 @@ def _decompress_application(data: bytes, decompressed_length: int, *, debug: boo
|
|||||||
return decompressed
|
return decompressed
|
||||||
|
|
||||||
|
|
||||||
|
def _decompress_application_1(data: bytes, decompressed_length: int, *, debug: bool=False) -> bytes:
|
||||||
|
raise NotImplementedError("'dcmp' (1) decompression not supported yet")
|
||||||
|
|
||||||
|
|
||||||
|
def _decompress_application(data: bytes, decompressed_length: int, *, debug: bool=False) -> bytes:
|
||||||
|
working_buffer_fractional_size, expansion_buffer_size, dcmp_id, reserved = STRUCT_COMPRESSED_APPLICATION_HEADER.unpack_from(data)
|
||||||
|
|
||||||
|
if debug:
|
||||||
|
print(f"Working buffer fractional size: {working_buffer_fractional_size} (=> {len(data) * 256 / working_buffer_fractional_size})")
|
||||||
|
print(f"Expansion buffer size: {expansion_buffer_size}")
|
||||||
|
|
||||||
|
if dcmp_id == 0:
|
||||||
|
decompress_func = _decompress_application_0
|
||||||
|
elif dcmp_id == 1:
|
||||||
|
decompress_func = _decompress_application_1
|
||||||
|
else:
|
||||||
|
raise DecompressError(f"Unsupported 'dcmp' ID: {dcmp_id}, expected 0 or 1")
|
||||||
|
|
||||||
|
if reserved != 0:
|
||||||
|
raise DecompressError(f"Reserved field should be 0, not 0x{reserved:>04x}")
|
||||||
|
|
||||||
|
return decompress_func(data[STRUCT_COMPRESSED_APPLICATION_HEADER.size:], decompressed_length, debug=debug)
|
||||||
|
|
||||||
|
|
||||||
def _decompress_system_untagged(data: bytes, decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> bytes:
|
def _decompress_system_untagged(data: bytes, decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> bytes:
|
||||||
parts = []
|
parts = []
|
||||||
i = 0
|
i = 0
|
||||||
|
Loading…
x
Reference in New Issue
Block a user