mirror of
https://github.com/dgelessus/python-rsrcfork.git
synced 2024-11-15 13:04:51 +00:00
145 lines
5.8 KiB
Python
145 lines
5.8 KiB
Python
import io
|
|
import typing
|
|
|
|
from . import common
|
|
|
|
# Lookup table for codes in range(0xd5, 0xfe).
|
|
# This table was obtained by decompressing a manually created compressed resource with the following contents:
|
|
# b'\xa8\x9fer\x00\x12\x08\x01\x00\x00\x00R\x80\x03\x00\x01\x00\x00' + bytes(range(0xd5, 0xfe)) + b'\xff'
|
|
TABLE_DATA = (
|
|
# First line corresponds to codes in range(0xd5, 0xd8).
|
|
b"\x00\x00\x00\x01\x00\x02"
|
|
# All following lines correspond to 8 codes each.
|
|
b"\x00\x03.\x01>\x01\x01\x01\x1e\x01\xff\xff\x0e\x011\x00"
|
|
b"\x11\x12\x01\x0732\x129\xed\x10\x01'#\"\x017"
|
|
b"\x07\x06\x01\x17\x01#\x00\xff\x00/\x07\x0e\xfd<\x015"
|
|
b"\x01\x15\x01\x02\x00\x07\x00>\x05\xd5\x02\x01\x06\x07\x07\x08"
|
|
# Last line corresponds to codes in range(0xf8, 0xfe).
|
|
b"0\x01\x013\x00\x10\x17\x167>67"
|
|
)
|
|
# Note: index 0 in this table corresponds to code 0xd5, index 1 to 0xd6, etc.
|
|
TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
|
|
assert len(TABLE) == len(range(0xd5, 0xfe))
|
|
|
|
|
|
def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
|
|
"""Internal helper function, implements the main decompression algorithm. Only called from decompress_stream, which performs some extra checks and debug logging."""
|
|
|
|
if not isinstance(header_info, common.CompressedType8HeaderInfo):
|
|
raise common.DecompressError(f"Incorrect header type: {type(header_info).__qualname__}")
|
|
|
|
prev_literals: typing.List[bytes] = []
|
|
|
|
while True: # Loop is terminated when the EOF marker (0xff) is encountered
|
|
(byte,) = common.read_exact(stream, 1)
|
|
if debug:
|
|
print(f"Tag byte 0x{byte:>02x}")
|
|
|
|
if byte in range(0x00, 0x20):
|
|
# Literal byte sequence, 1-byte header.
|
|
# The length of the literal data is stored in the low nibble of the tag byte.
|
|
count = (byte >> 0 & 0xf) + 1
|
|
# Controls whether or not the literal is stored so that it can be referenced again later.
|
|
do_store = byte >= 0x10
|
|
literal = common.read_exact(stream, count)
|
|
if debug:
|
|
print(f"Literal (1-byte header, storing: {do_store})")
|
|
if do_store:
|
|
if debug:
|
|
print(f"\t-> storing as literal number 0x{len(prev_literals):x}")
|
|
prev_literals.append(literal)
|
|
yield literal
|
|
elif byte in range(0x20, 0xd0):
|
|
# Backreference to a previous literal, 1-byte form.
|
|
# This can reference literals with indices in range(0xb0).
|
|
table_index = byte - 0x20
|
|
if debug:
|
|
print(f"Backreference (1-byte form) to 0x{table_index:>02x}")
|
|
yield prev_literals[table_index]
|
|
elif byte in (0xd0, 0xd1):
|
|
# Literal byte sequence, 2-byte header.
|
|
# The length of the literal data is stored in the following byte.
|
|
(count,) = common.read_exact(stream, 1)
|
|
# Controls whether or not the literal is stored so that it can be referenced again later.
|
|
do_store = byte == 0xd1
|
|
literal = common.read_exact(stream, count)
|
|
if debug:
|
|
print(f"Literal (2-byte header, storing: {do_store})")
|
|
if do_store:
|
|
if debug:
|
|
print(f"\t-> storing as literal number 0x{len(prev_literals):x}")
|
|
prev_literals.append(literal)
|
|
yield literal
|
|
elif byte == 0xd2:
|
|
# Backreference to a previous literal, 2-byte form.
|
|
# This can reference literals with indices in range(0xb0, 0x1b0).
|
|
(next_byte,) = common.read_exact(stream, 1)
|
|
table_index = next_byte + 0xb0
|
|
if debug:
|
|
print(f"Backreference (2-byte form) to 0x{table_index:>02x}")
|
|
yield prev_literals[table_index]
|
|
elif byte in range(0xd5, 0xfe):
|
|
# Reference into a fixed table of two-byte literals.
|
|
# All compressed resources use the same table.
|
|
table_index = byte - 0xd5
|
|
if debug:
|
|
print(f"Fixed table reference to 0x{table_index:>02x}")
|
|
yield TABLE[table_index]
|
|
elif byte == 0xfe:
|
|
# Extended code, whose meaning is controlled by the following byte.
|
|
|
|
(kind,) = common.read_exact(stream, 1)
|
|
if debug:
|
|
print(f"Extended code: 0x{kind:>02x}")
|
|
|
|
if kind == 0x02:
|
|
# Repeat 1 byte a certain number of times.
|
|
|
|
byte_count = 1 # Unlike with 'dcmp' (0) compression, there doesn't appear to be a 2-byte repeat (or if there is, it's never used in practice).
|
|
|
|
if debug:
|
|
print(f"Repeat {byte_count}-byte value")
|
|
|
|
# The byte(s) to repeat, stored as a variable-length integer. The value is treated as unsigned, i. e. the integer is never negative.
|
|
to_repeat_int = common.read_variable_length_integer(stream)
|
|
try:
|
|
to_repeat = to_repeat_int.to_bytes(byte_count, "big", signed=False)
|
|
except OverflowError:
|
|
raise common.DecompressError(f"Value to repeat out of range for {byte_count}-byte repeat: {to_repeat_int:#x}")
|
|
|
|
count = common.read_variable_length_integer(stream) + 1
|
|
if count <= 0:
|
|
raise common.DecompressError(f"Repeat count must be positive: {count}")
|
|
|
|
if debug:
|
|
print(f"\t-> {to_repeat!r} * {count}")
|
|
yield to_repeat * count
|
|
else:
|
|
raise common.DecompressError(f"Unknown extended code: 0x{kind:>02x}")
|
|
elif byte == 0xff:
|
|
# End of data marker, always occurs exactly once as the last byte of the compressed data.
|
|
if debug:
|
|
print("End marker")
|
|
|
|
# Check that there really is no more data left.
|
|
extra = stream.read(1)
|
|
if extra:
|
|
raise common.DecompressError(f"Extra data encountered after end of data marker (first extra byte: {extra!r})")
|
|
break
|
|
else:
|
|
raise common.DecompressError(f"Unknown tag byte: 0x{byte:>02x}")
|
|
|
|
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
|
|
"""Decompress compressed data in the format used by 'dcmp' (1)."""
|
|
|
|
decompressed_length = 0
|
|
for chunk in decompress_stream_inner(header_info, stream, debug=debug):
|
|
if debug:
|
|
print(f"\t-> {chunk!r}")
|
|
|
|
decompressed_length += len(chunk)
|
|
yield chunk
|
|
|
|
if debug:
|
|
print(f"Decompressed {decompressed_length:#x} bytes so far")
|