2019-10-02 16:28:40 +02:00
import io
import typing
2019-08-22 21:19:10 +02:00
from . import dcmp0
from . import dcmp1
from . import dcmp2
2019-10-02 16:29:32 +02:00
from . common import DecompressError , CompressedHeaderInfo
2019-08-22 21:19:10 +02:00
# Names exported by a star-import of this module.
__all__ = [
    "CompressedHeaderInfo",
    "DecompressError",
    "decompress",
    "decompress_parsed",
    "decompress_stream",
    "decompress_stream_parsed",
]
2019-09-23 23:10:55 +02:00
2019-09-23 23:32:38 +02:00
# Lookup table from 'dcmp' ID to the Python function that implements that decompressor.
# Every entry has the signature
# (header_info: CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes].
DECOMPRESSORS: typing.Dict[int, typing.Callable[..., typing.Iterator[bytes]]] = {
    0: dcmp0.decompress_stream,
    1: dcmp1.decompress_stream,
    2: dcmp2.decompress_stream,
}
2019-08-22 21:19:10 +02:00
2019-10-02 16:28:40 +02:00
def decompress_stream_parsed(header_info: CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
    """Decompress compressed resource data from a stream, whose header has already been read and parsed into a CompressedHeaderInfo object.

    The decompressed data is yielded in the chunks produced by the selected decompressor.
    This is a generator function, so nothing is looked up, read or checked until iteration starts.

    :param header_info: The parsed header. Its ``dcmp_id`` selects the entry of ``DECOMPRESSORS`` to use,
        and its ``decompressed_length`` is compared against the total length of the yielded data.
    :param stream: The stream handed to the decompressor to read the compressed data from.
    :param debug: Passed through unchanged to the decompressor.
    :return: An iterator over chunks of decompressed data.
    :raises DecompressError: If ``DECOMPRESSORS`` has no entry for ``header_info.dcmp_id``,
        or if the total length of the decompressed data does not match ``header_info.decompressed_length``.
    """

    try:
        decompress_func = DECOMPRESSORS[header_info.dcmp_id]
    except KeyError:
        # The KeyError is just an artifact of the table lookup.
        # Suppress it so that the traceback shows only the DecompressError.
        raise DecompressError(f"Unsupported 'dcmp' ID: {header_info.dcmp_id}") from None

    decompressed_length = 0
    for chunk in decompress_func(header_info, stream, debug=debug):
        decompressed_length += len(chunk)
        yield chunk

    # The total length is only known once the decompressor is exhausted,
    # so a mismatch is reported after all chunks have already been yielded.
    if decompressed_length != header_info.decompressed_length:
        raise DecompressError(f"Actual length of decompressed data ({decompressed_length}) does not match length stored in resource ({header_info.decompressed_length})")
2019-09-23 23:50:29 +02:00
2019-10-02 16:28:40 +02:00
def decompress_parsed(header_info: CompressedHeaderInfo, data: bytes, *, debug: bool = False) -> bytes:
    """Decompress in-memory compressed resource data, whose header has already been removed and parsed into a CompressedHeaderInfo object.

    Wraps ``data`` in an ``io.BytesIO``, runs it through decompress_stream_parsed and returns all resulting chunks joined into one bytes object.
    """

    compressed_stream = io.BytesIO(data)
    chunks = decompress_stream_parsed(header_info, compressed_stream, debug=debug)
    return b"".join(chunks)
2019-09-23 23:50:29 +02:00
2019-10-02 16:28:40 +02:00
def decompress_stream(stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
    """Decompress compressed resource data from a stream, yielding the decompressed data in chunks.

    The header is parsed from the stream with ``CompressedHeaderInfo.parse_stream``.
    The same stream is then handed to decompress_stream_parsed together with the parsed header.
    If ``debug`` is true, the parsed header is printed before decompression starts.
    """

    parsed_header = CompressedHeaderInfo.parse_stream(stream)

    if debug:
        print(f"Compressed resource data header: {parsed_header}")

    body_chunks = decompress_stream_parsed(parsed_header, stream, debug=debug)
    yield from body_chunks
def decompress(data: bytes, *, debug: bool = False) -> bytes:
    """Decompress the given in-memory compressed resource data and return the result as a single bytes object.

    Wraps ``data`` in an ``io.BytesIO``, runs it through decompress_stream and joins all resulting chunks.
    """

    compressed_stream = io.BytesIO(data)
    chunks = decompress_stream(compressed_stream, debug=debug)
    return b"".join(chunks)