Refactor .dcmp0 and .dcmp1 to be stream-based

The decompression code is more readable this way, because the
compressed data needs to be processed sequentially. It also allows
moving the length check and some debug logging into an outer generator.

This also allows incremental decompression, but this doesn't have any
practical advantage, because the compressed resource data is all read
at once (there is no API for opening resources as streams), and
resources are not very large anyway.
This commit is contained in:
dgelessus 2019-10-01 10:09:00 +02:00
parent db48212ade
commit 1e79dc3c50
3 changed files with 139 additions and 169 deletions

View File

@ -100,19 +100,26 @@ class CompressedSystemHeaderInfo(CompressedHeaderInfo):
return f"{type(self).__qualname__}(header_length={self.header_length}, compression_type=0x{self.compression_type:>04x}, decompressed_length={self.decompressed_length}, dcmp_id={self.dcmp_id}, parameters={self.parameters!r})"
def read_variable_length_integer(data: bytes, position: int) -> typing.Tuple[int, int]:
"""Read a variable-length integer starting at the given position in the data, and return the integer as well as the number of bytes consumed.
def read_exact(stream: typing.BinaryIO, byte_count: int) -> bytes:
    """Read exactly byte_count bytes from the stream and raise an exception if too few bytes are available (i. e. if EOF was hit prematurely).

    A single read call on an unbuffered (raw) stream is allowed to return fewer bytes than requested even though more data will follow. For this reason the stream is read repeatedly until either the requested amount has been collected or a read returns no data at all.

    stream: the binary stream to read from.
    byte_count: the number of bytes that must be read.
    Returns the bytes that were read, which are always exactly byte_count bytes long.
    Raises DecompressError if the stream ends before byte_count bytes could be read.
    """

    parts = []
    remaining = byte_count
    while remaining > 0:
        part = stream.read(remaining)
        if not part:
            # No more data: stop here and let the length check below report the short read.
            break
        parts.append(part)
        remaining -= len(part)

    data = b"".join(parts)
    if len(data) != byte_count:
        raise DecompressError(f"Attempted to read {byte_count} bytes of data, but only got {len(data)} bytes")
    return data
def read_variable_length_integer(stream: typing.BinaryIO) -> int:
"""Read a variable-length integer from the stream.
This variable-length integer format is used by the 0xfe codes in the compression formats used by 'dcmp' (0) and 'dcmp' (1).
"""
assert len(data) > position
if data[position] == 0xff:
assert len(data) > position + 4
return int.from_bytes(data[position+1:position+5], "big", signed=True), 5
elif data[position] >= 0x80:
assert len(data) > position + 1
data_modified = bytes([(data[position] - 0xc0) & 0xff, data[position+1]])
return int.from_bytes(data_modified, "big", signed=True), 2
head = read_exact(stream, 1)
if head[0] == 0xff:
return int.from_bytes(read_exact(stream, 4), "big", signed=True)
elif head[0] >= 0x80:
data_modified = bytes([(head[0] - 0xc0) & 0xff]) + read_exact(stream, 1)
return int.from_bytes(data_modified, "big", signed=True)
else:
return int.from_bytes(data[position:position+1], "big", signed=True), 1
return int.from_bytes(head, "big", signed=True)

View File

@ -1,3 +1,4 @@
import io
import typing
from . import common
@ -38,97 +39,73 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
assert len(TABLE) == len(range(0x4b, 0xfe))
def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
"""Decompress compressed data in the format used by 'dcmp' (0)."""
def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
"""Internal helper function, implements the main decompression algorithm. Only called from decompress_stream, which performs some extra checks and debug logging."""
if not isinstance(header_info, common.CompressedApplicationHeaderInfo):
raise common.DecompressError(f"Incorrect header type: {type(header_info).__qualname__}")
prev_literals: typing.List[bytes] = []
decompressed = b""
i = 0
while i < len(data):
byte = data[i]
while True: # Loop is terminated when the EOF marker (0xff) is encountered
(byte,) = common.read_exact(stream, 1)
if debug:
print(f"Tag byte 0x{byte:>02x}, at 0x{i:x}, decompressing to 0x{len(decompressed):x}")
print(f"Tag byte 0x{byte:>02x}")
if byte in range(0x00, 0x20):
# Literal byte sequence.
if byte in (0x00, 0x10):
# The length of the literal data is stored in the next byte.
count_div2 = data[i+1]
begin = i + 2
(count_div2,) = common.read_exact(stream, 1)
else:
# The length of the literal data is stored in the low nibble of the tag byte.
count_div2 = byte >> 0 & 0xf
begin = i + 1
end = begin + 2*count_div2
count = 2 * count_div2
# Controls whether or not the literal is stored so that it can be referenced again later.
do_store = byte >= 0x10
literal = data[begin:end]
literal = common.read_exact(stream, count)
if debug:
print(f"Literal (storing: {do_store})")
print(f"\t-> {literal}")
decompressed += literal
if do_store:
if debug:
print(f"\t-> stored as literal number 0x{len(prev_literals):x}")
print(f"\t-> storing as literal number 0x{len(prev_literals):x}")
prev_literals.append(literal)
i = end
yield literal
elif byte in (0x20, 0x21):
# Backreference to a previous literal, 2-byte form.
# This can reference literals with index in range(0x28, 0x228).
table_index = 0x28 + ((byte - 0x20) << 8 | data[i+1])
i += 2
(next_byte,) = common.read_exact(stream, 1)
table_index = 0x28 + ((byte - 0x20) << 8 | next_byte)
if debug:
print(f"Backreference (2-byte form) to 0x{table_index:>02x}")
literal = prev_literals[table_index]
if debug:
print(f"\t-> {literal}")
decompressed += literal
yield prev_literals[table_index]
elif byte == 0x22:
# Backreference to a previous literal, 3-byte form.
# This can reference any literal with index 0x28 and higher, but is only necessary for literals with index 0x228 and higher.
table_index = 0x28 + int.from_bytes(data[i+1:i+3], "big", signed=False)
i += 3
table_index = 0x28 + int.from_bytes(common.read_exact(stream, 2), "big", signed=False)
if debug:
print(f"Backreference (3-byte form) to 0x{table_index:>02x}")
literal = prev_literals[table_index]
if debug:
print(f"\t-> {literal}")
decompressed += literal
yield prev_literals[table_index]
elif byte in range(0x23, 0x4b):
# Backreference to a previous literal, 1-byte form.
# This can reference literals with indices in range(0x28).
table_index = byte - 0x23
i += 1
if debug:
print(f"Backreference (1-byte form) to 0x{table_index:>02x}")
literal = prev_literals[table_index]
if debug:
print(f"\t-> {literal}")
decompressed += literal
yield prev_literals[table_index]
elif byte in range(0x4b, 0xfe):
# Reference into a fixed table of two-byte literals.
# All compressed resources use the same table.
table_index = byte - 0x4b
i += 1
if debug:
print(f"Fixed table reference to 0x{table_index:>02x}")
entry = TABLE[table_index]
if debug:
print(f"\t-> {entry}")
decompressed += entry
yield TABLE[table_index]
elif byte == 0xfe:
# Extended code, whose meaning is controlled by the following byte.
i += 1
kind = data[i]
(kind,) = common.read_exact(stream, 1)
if debug:
print(f"Extended code: 0x{kind:>02x}")
i += 1
if kind == 0x00:
# Compact representation of (part of) a segment loader jump table, as used in 'CODE' (0) resources.
@ -137,37 +114,28 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
print(f"Segment loader jump table entries")
# All generated jump table entries have the same segment number.
segment_number_int, length = common.read_variable_length_integer(data, i)
i += length
segment_number_int = common.read_variable_length_integer(stream)
if debug:
print(f"\t-> segment number: {segment_number_int:#x}")
# The tail part of all jump table entries (i. e. everything except for the address).
entry_tail = b"?<" + segment_number_int.to_bytes(2, "big", signed=True) + b"\xa9\xf0"
if debug:
print(f"\t-> tail of first entry: {entry_tail}")
# The tail is output once *without* an address in front, i. e. the first entry's address must be generated manually by a previous code.
decompressed += entry_tail
yield entry_tail
count, length = common.read_variable_length_integer(data, i)
i += length
count = common.read_variable_length_integer(stream)
if count <= 0:
raise common.DecompressError(f"Jump table entry count must be greater than 0, not {count}")
# The second entry's address is stored explicitly.
current_int, length = common.read_variable_length_integer(data, i)
i += length
current_int = common.read_variable_length_integer(stream)
if debug:
print(f"-> address of second entry: {current_int:#x}")
entry = current_int.to_bytes(2, "big", signed=False) + entry_tail
if debug:
print(f"-> second entry: {entry}")
decompressed += entry
print(f"\t-> address of second entry: {current_int:#x}")
yield current_int.to_bytes(2, "big", signed=False) + entry_tail
for _ in range(1, count):
# All further entries' addresses are stored as differences relative to the previous entry's address.
diff, length = common.read_variable_length_integer(data, i)
i += length
diff = common.read_variable_length_integer(stream)
# For some reason, each difference is 6 higher than it should be.
diff -= 6
@ -175,10 +143,7 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
current_int = (current_int + diff) & 0xffff
if debug:
print(f"\t-> difference {diff:#x}: {current_int:#x}")
entry = current_int.to_bytes(2, "big", signed=False) + entry_tail
if debug:
print(f"\t-> {entry}")
decompressed += entry
yield current_int.to_bytes(2, "big", signed=False) + entry_tail
elif kind in (0x02, 0x03):
# Repeat 1 or 2 bytes a certain number of times.
@ -193,23 +158,19 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
print(f"Repeat {byte_count}-byte value")
# The byte(s) to repeat, stored as a variable-length integer. The value is treated as unsigned, i. e. the integer is never negative.
to_repeat_int, length = common.read_variable_length_integer(data, i)
i += length
to_repeat_int = common.read_variable_length_integer(stream)
try:
to_repeat = to_repeat_int.to_bytes(byte_count, "big", signed=False)
except OverflowError:
raise common.DecompressError(f"Value to repeat out of range for {byte_count}-byte repeat: {to_repeat_int:#x}")
count_m1, length = common.read_variable_length_integer(data, i)
i += length
count = count_m1 + 1
count = common.read_variable_length_integer(stream) + 1
if count <= 0:
raise common.DecompressError(f"Repeat count must be positive: {count}")
repeated = to_repeat * count
if debug:
print(f"\t-> {to_repeat} * {count}: {repeated}")
decompressed += repeated
print(f"\t-> {to_repeat} * {count}")
yield to_repeat * count
elif kind == 0x04:
# A sequence of 16-bit signed integers, with each integer encoded as a difference relative to the previous integer. The first integer is stored explicitly.
@ -217,18 +178,16 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
print(f"Difference-encoded 16-bit integers")
# The first integer is stored explicitly, as a signed value.
initial_int, length = common.read_variable_length_integer(data, i)
i += length
initial_int = common.read_variable_length_integer(stream)
try:
initial = initial_int.to_bytes(2, "big", signed=True)
except OverflowError:
raise common.DecompressError(f"Initial value out of range for 16-bit integer difference encoding: {initial_int:#x}")
if debug:
print(f"\t-> initial: {initial}")
decompressed += initial
print(f"\t-> initial: 0x{initial_int:>04x}")
yield initial
count, length = common.read_variable_length_integer(data, i)
i += length
count = common.read_variable_length_integer(stream)
if count < 0:
raise common.DecompressError(f"Count cannot be negative: {count}")
@ -237,15 +196,13 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
for _ in range(count):
# The difference to the previous integer is stored as an 8-bit signed integer.
# The usual variable-length integer format is *not* used here.
diff = int.from_bytes(data[i:i+1], "big", signed=True)
i += 1
diff = int.from_bytes(common.read_exact(stream, 1), "big", signed=True)
# Simulate 16-bit integer wraparound.
current_int = (current_int + diff) & 0xffff
current = current_int.to_bytes(2, "big", signed=False)
if debug:
print(f"\t-> difference {diff:#x}: {current}")
decompressed += current
print(f"\t-> difference {diff:#x}: 0x{current_int:>04x}")
yield current_int.to_bytes(2, "big", signed=False)
elif kind == 0x06:
# A sequence of 32-bit signed integers, with each integer encoded as a difference relative to the previous integer. The first integer is stored explicitly.
@ -253,48 +210,63 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
print(f"Difference-encoded 32-bit integers")
# The first integer is stored explicitly, as a signed value.
initial_int, length = common.read_variable_length_integer(data, i)
i += length
initial_int = common.read_variable_length_integer(stream)
try:
initial = initial_int.to_bytes(4, "big", signed=True)
except OverflowError:
raise common.DecompressError(f"Initial value out of range for 32-bit integer difference encoding: {initial_int:#x}")
if debug:
print(f"\t-> initial: {initial}")
decompressed += initial
print(f"\t-> initial: 0x{initial_int:>08x}")
yield initial
count, length = common.read_variable_length_integer(data, i)
i += length
count = common.read_variable_length_integer(stream)
assert count >= 0
# To make the following calculations simpler, the signed initial_int value is converted to unsigned.
current_int = initial_int & 0xffffffff
for _ in range(count):
# The difference to the previous integer is stored as a variable-length integer, whose value may be negative.
diff, length = common.read_variable_length_integer(data, i)
i += length
diff = common.read_variable_length_integer(stream)
# Simulate 32-bit integer wraparound.
current_int = (current_int + diff) & 0xffffffff
current = current_int.to_bytes(4, "big", signed=False)
if debug:
print(f"\t-> difference {diff:#x}: {current}")
decompressed += current
print(f"\t-> difference {diff:#x}: 0x{current_int:>08x}")
yield current_int.to_bytes(4, "big", signed=False)
else:
raise common.DecompressError(f"Unknown extended code: 0x{kind:>02x}")
elif byte == 0xff:
# End of data marker, always occurs exactly once as the last byte of the compressed data.
if debug:
print("End marker")
if i != len(data) - 1:
raise common.DecompressError(f"End marker reached at {i}, before the expected end of data at {len(data) - 1}")
i += 1
# Check that there really is no more data left.
extra = stream.read(1)
if extra:
raise common.DecompressError(f"Extra data encountered after end of data marker (first extra byte: {extra})")
break
else:
raise common.DecompressError(f"Unknown tag byte: 0x{data[i]:>02x}")
if header_info.decompressed_length % 2 != 0 and len(decompressed) == header_info.decompressed_length + 1:
# Special case: if the decompressed data length stored in the header is odd and one less than the length of the actual decompressed data, drop the last byte.
# This is necessary because nearly all codes generate data in groups of 2 or 4 bytes, so it is basically impossible to represent data with an odd length using this compression format.
decompressed = decompressed[:-1]
return decompressed
raise common.DecompressError(f"Unknown tag byte: 0x{byte:>02x}")
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
    """Decompress compressed data in the format used by 'dcmp' (0).

    This is a generator that wraps decompress_stream_inner: it passes on every chunk produced by the inner generator, keeps a running total of the number of bytes yielded, and (if debug is true) prints each chunk and the running total.
    """

    emitted = 0
    for piece in decompress_stream_inner(header_info, stream, debug=debug):
        if debug:
            print(f"\t-> {piece}")

        grown = emitted + len(piece)
        if header_info.decompressed_length % 2 != 0 and grown == header_info.decompressed_length + 1:
            # Special case: the header stores an odd decompressed length and this chunk would make the output exactly one byte longer than that, so the chunk's last byte is dropped.
            # Nearly all codes generate data in groups of 2 or 4 bytes, so data with an odd length can basically not be represented in this compression format without such a padding byte.
            emitted = grown - 1
            yield piece[:-1]
        else:
            emitted = grown
            yield piece

        if debug:
            print(f"Decompressed {emitted:#x} bytes so far")
def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
    """Decompress compressed data in the format used by 'dcmp' (0), given as a bytes object, and return all of the decompressed data as a single bytes object.

    This is a convenience wrapper that presents the data as an in-memory stream to decompress_stream and joins all chunks that it yields.
    """

    source = io.BytesIO(data)
    chunks = decompress_stream(header_info, source, debug=debug)
    return b"".join(chunks)

View File

@ -1,3 +1,4 @@
import io
import typing
from . import common
@ -21,99 +22,75 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
assert len(TABLE) == len(range(0xd5, 0xfe))
def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
"""Decompress compressed data in the format used by 'dcmp' (1)."""
def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
"""Internal helper function, implements the main decompression algorithm. Only called from decompress_stream, which performs some extra checks and debug logging."""
if not isinstance(header_info, common.CompressedApplicationHeaderInfo):
raise common.DecompressError(f"Incorrect header type: {type(header_info).__qualname__}")
prev_literals: typing.List[bytes] = []
decompressed = b""
i = 0
while i < len(data):
byte = data[i]
while True: # Loop is terminated when the EOF marker (0xff) is encountered
(byte,) = common.read_exact(stream, 1)
if debug:
print(f"Tag byte 0x{byte:>02x}, at 0x{i:x}, decompressing to 0x{len(decompressed):x}")
print(f"Tag byte 0x{byte:>02x}")
if byte in range(0x00, 0x20):
# Literal byte sequence, 1-byte header.
# The length of the literal data is stored in the low nibble of the tag byte.
count = (byte >> 0 & 0xf) + 1
begin = i + 1
end = begin + count
# Controls whether or not the literal is stored so that it can be referenced again later.
do_store = byte >= 0x10
literal = data[begin:end]
literal = common.read_exact(stream, count)
if debug:
print(f"Literal (1-byte header, storing: {do_store})")
print(f"\t-> {literal}")
decompressed += literal
if do_store:
if debug:
print(f"\t-> stored as literal number 0x{len(prev_literals):x}")
print(f"\t-> storing as literal number 0x{len(prev_literals):x}")
prev_literals.append(literal)
i = end
yield literal
elif byte in range(0x20, 0xd0):
# Backreference to a previous literal, 1-byte form.
# This can reference literals with indices in range(0xb0).
table_index = byte - 0x20
i += 1
if debug:
print(f"Backreference (1-byte form) to 0x{table_index:>02x}")
literal = prev_literals[table_index]
if debug:
print(f"\t-> {literal}")
decompressed += literal
yield prev_literals[table_index]
elif byte in (0xd0, 0xd1):
# Literal byte sequence, 2-byte header.
# The length of the literal data is stored in the following byte.
count = data[i+1]
begin = i + 2
end = begin + count
(count,) = common.read_exact(stream, 1)
# Controls whether or not the literal is stored so that it can be referenced again later.
do_store = byte == 0xd1
literal = data[begin:end]
literal = common.read_exact(stream, count)
if debug:
print(f"Literal (2-byte header, storing: {do_store})")
print(f"\t-> {literal}")
decompressed += literal
if do_store:
if debug:
print(f"\t-> stored as literal number 0x{len(prev_literals):x}")
print(f"\t-> storing as literal number 0x{len(prev_literals):x}")
prev_literals.append(literal)
i = end
yield literal
elif byte == 0xd2:
# Backreference to a previous literal, 2-byte form.
# This can reference literals with indices in range(0xb0, 0x1b0).
table_index = data[i+1] + 0xb0
i += 2
(next_byte,) = common.read_exact(stream, 1)
table_index = next_byte + 0xb0
if debug:
print(f"Backreference (2-byte form) to 0x{table_index:>02x}")
literal = prev_literals[table_index]
if debug:
print(f"\t-> {literal}")
decompressed += literal
yield prev_literals[table_index]
elif byte in range(0xd5, 0xfe):
# Reference into a fixed table of two-byte literals.
# All compressed resources use the same table.
table_index = byte - 0xd5
i += 1
if debug:
print(f"Fixed table reference to 0x{table_index:>02x}")
entry = TABLE[table_index]
if debug:
print(f"\t-> {entry}")
decompressed += entry
yield TABLE[table_index]
elif byte == 0xfe:
# Extended code, whose meaning is controlled by the following byte.
i += 1
kind = data[i]
(kind,) = common.read_exact(stream, 1)
if debug:
print(f"Extended code: 0x{kind:>02x}")
i += 1
if kind == 0x02:
# Repeat 1 byte a certain number of times.
@ -124,33 +101,47 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
print(f"Repeat {byte_count}-byte value")
# The byte(s) to repeat, stored as a variable-length integer. The value is treated as unsigned, i. e. the integer is never negative.
to_repeat_int, length = common.read_variable_length_integer(data, i)
i += length
to_repeat_int = common.read_variable_length_integer(stream)
try:
to_repeat = to_repeat_int.to_bytes(byte_count, "big", signed=False)
except OverflowError:
raise common.DecompressError(f"Value to repeat out of range for {byte_count}-byte repeat: {to_repeat_int:#x}")
count_m1, length = common.read_variable_length_integer(data, i)
i += length
count = count_m1 + 1
count = common.read_variable_length_integer(stream) + 1
if count <= 0:
raise common.DecompressError(f"Repeat count must be positive: {count}")
repeated = to_repeat * count
if debug:
print(f"\t-> {to_repeat} * {count}: {repeated}")
decompressed += repeated
print(f"\t-> {to_repeat} * {count}")
yield to_repeat * count
else:
raise common.DecompressError(f"Unknown extended code: 0x{kind:>02x}")
elif byte == 0xff:
# End of data marker, always occurs exactly once as the last byte of the compressed data.
if debug:
print("End marker")
if i != len(data) - 1:
raise common.DecompressError(f"End marker reached at {i}, before the expected end of data at {len(data) - 1}")
i += 1
# Check that there really is no more data left.
extra = stream.read(1)
if extra:
raise common.DecompressError(f"Extra data encountered after end of data marker (first extra byte: {extra})")
break
else:
raise common.DecompressError(f"Unknown tag byte: 0x{data[i]:>02x}")
raise common.DecompressError(f"Unknown tag byte: 0x{byte:>02x}")
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
"""Decompress compressed data in the format used by 'dcmp' (1)."""
return decompressed
decompressed_length = 0
for chunk in decompress_stream_inner(header_info, stream, debug=debug):
if debug:
print(f"\t-> {chunk}")
decompressed_length += len(chunk)
yield chunk
if debug:
print(f"Decompressed {decompressed_length:#x} bytes so far")
def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
    """Decompress compressed data in the format used by 'dcmp' (1), given as a bytes object, and return all of the decompressed data as a single bytes object.

    This is a convenience wrapper that presents the data as an in-memory stream to decompress_stream and joins all chunks that it yields.
    """

    source = io.BytesIO(data)
    chunks = decompress_stream(header_info, source, debug=debug)
    return b"".join(chunks)