From 0f6018e4bff90799ab22e1dd4ae2728384d64d29 Mon Sep 17 00:00:00 2001 From: dgelessus Date: Sun, 19 Jul 2020 23:13:01 +0200 Subject: [PATCH] Move .compress.common.make_peekable and related code into ._io_utils --- rsrcfork/_io_utils.py | 75 +++++++++++++++++++++++++++++++++++++ rsrcfork/compress/common.py | 74 ------------------------------------ rsrcfork/compress/dcmp2.py | 7 ++-- 3 files changed, 79 insertions(+), 77 deletions(-) diff --git a/rsrcfork/_io_utils.py b/rsrcfork/_io_utils.py index ec2d87b..d44c43d 100644 --- a/rsrcfork/_io_utils.py +++ b/rsrcfork/_io_utils.py @@ -1,5 +1,6 @@ """A collection of utility functions and classes related to IO streams. For internal use only.""" +import io import typing @@ -16,3 +17,77 @@ def read_exact(stream: typing.BinaryIO, byte_count: int) -> bytes: if len(data) != byte_count: raise EOFError(f"Attempted to read {byte_count} bytes of data, but only got {len(data)} bytes") return data + + +if typing.TYPE_CHECKING: + class PeekableIO(typing.Protocol): + """Minimal protocol for binary IO streams that support the peek method. + + The peek method is supported by various standard Python binary IO streams, such as io.BufferedReader. If a stream does not natively support the peek method, it may be wrapped using the custom helper function make_peekable. + """ + + def readable(self) -> bool: + ... + + def read(self, size: typing.Optional[int] = ...) -> bytes: + ... + + def peek(self, size: int = ...) -> bytes: + ... + + +class _PeekableIOWrapper(object): + """Wrapper class to add peek support to an existing stream. Do not instantiate this class directly, use the make_peekable function instead. + + Python provides a standard io.BufferedReader class, which supports the peek method. However, according to its documentation, it only supports wrapping io.RawIOBase subclasses, and not streams which are already otherwise buffered. + + Warning: this class does not perform any buffering of its own, outside of what is required to make peek work. It is strongly recommended to only wrap streams that are already buffered or otherwise fast to read from. In particular, raw streams (io.RawIOBase subclasses) should be wrapped using io.BufferedReader instead. + """ + + _wrapped: typing.BinaryIO + _readahead: bytes + + def __init__(self, wrapped: typing.BinaryIO) -> None: + super().__init__() + + self._wrapped = wrapped + self._readahead = b"" + + def readable(self) -> bool: + return self._wrapped.readable() + + def read(self, size: typing.Optional[int] = None) -> bytes: + if size is None or size < 0: + ret = self._readahead + self._wrapped.read() + self._readahead = b"" + elif size <= len(self._readahead): + ret = self._readahead[:size] + self._readahead = self._readahead[size:] + else: + ret = self._readahead + self._wrapped.read(size - len(self._readahead)) + self._readahead = b"" + + return ret + + def peek(self, size: int = -1) -> bytes: + if not self._readahead: + self._readahead = self._wrapped.read(io.DEFAULT_BUFFER_SIZE if size < 0 else size) + return self._readahead + + +def make_peekable(stream: typing.BinaryIO) -> "PeekableIO": + """Wrap an arbitrary binary IO stream so that it supports the peek method. + + The stream is wrapped as efficiently as possible (or not at all if it already supports the peek method). However, in the worst case a custom wrapper class needs to be used, which may not be particularly efficient and only supports a very minimal interface. The only methods that are guaranteed to exist on the returned stream are readable, read, and peek. + """ + + if hasattr(stream, "peek"): + # Stream is already peekable, nothing to be done. + return typing.cast("PeekableIO", stream) + elif not typing.TYPE_CHECKING and isinstance(stream, io.RawIOBase): + # This branch is skipped when type checking - mypy incorrectly warns about this code being unreachable, because it thinks that a typing.BinaryIO cannot be an instance of io.RawIOBase. + # Raw IO streams can be wrapped efficiently using BufferedReader. + return io.BufferedReader(stream) + else: + # Other streams need to be wrapped using our custom wrapper class. + return _PeekableIOWrapper(stream) diff --git a/rsrcfork/compress/common.py b/rsrcfork/compress/common.py index f8141cb..87c6c7c 100644 --- a/rsrcfork/compress/common.py +++ b/rsrcfork/compress/common.py @@ -107,80 +107,6 @@ class CompressedType9HeaderInfo(CompressedHeaderInfo): return f"{type(self).__qualname__}(header_length={self.header_length}, compression_type=0x{self.compression_type:>04x}, decompressed_length={self.decompressed_length}, dcmp_id={self.dcmp_id}, parameters={self.parameters!r})" -if typing.TYPE_CHECKING: - class PeekableIO(typing.Protocol): - """Minimal protocol for binary IO streams that support the peek method. - - The peek method is supported by various standard Python binary IO streams, such as io.BufferedReader. If a stream does not natively support the peek method, it may be wrapped using the custom helper function make_peekable. - """ - - def readable(self) -> bool: - ... - - def read(self, size: typing.Optional[int] = ...) -> bytes: - ... - - def peek(self, size: int = ...) -> bytes: - ... - - -class _PeekableIOWrapper(object): - """Wrapper class to add peek support to an existing stream. Do not instantiate this class directly, use the make_peekable function instead. - - Python provides a standard io.BufferedReader class, which supports the peek method. However, according to its documentation, it only supports wrapping io.RawIOBase subclasses, and not streams which are already otherwise buffered. - - Warning: this class does not perform any buffering of its own, outside of what is required to make peek work. It is strongly recommended to only wrap streams that are already buffered or otherwise fast to read from. In particular, raw streams (io.RawIOBase subclasses) should be wrapped using io.BufferedReader instead. - """ - - _wrapped: typing.BinaryIO - _readahead: bytes - - def __init__(self, wrapped: typing.BinaryIO) -> None: - super().__init__() - - self._wrapped = wrapped - self._readahead = b"" - - def readable(self) -> bool: - return self._wrapped.readable() - - def read(self, size: typing.Optional[int] = None) -> bytes: - if size is None or size < 0: - ret = self._readahead + self._wrapped.read() - self._readahead = b"" - elif size <= len(self._readahead): - ret = self._readahead[:size] - self._readahead = self._readahead[size:] - else: - ret = self._readahead + self._wrapped.read(size - len(self._readahead)) - self._readahead = b"" - - return ret - - def peek(self, size: int = -1) -> bytes: - if not self._readahead: - self._readahead = self._wrapped.read(io.DEFAULT_BUFFER_SIZE if size < 0 else size) - return self._readahead - - -def make_peekable(stream: typing.BinaryIO) -> "PeekableIO": - """Wrap an arbitrary binary IO stream so that it supports the peek method. - - The stream is wrapped as efficiently as possible (or not at all if it already supports the peek method). However, in the worst case a custom wrapper class needs to be used, which may not be particularly efficient and only supports a very minimal interface. The only methods that are guaranteed to exist on the returned stream are readable, read, and peek. - """ - - if hasattr(stream, "peek"): - # Stream is already peekable, nothing to be done. - return typing.cast("PeekableIO", stream) - elif not typing.TYPE_CHECKING and isinstance(stream, io.RawIOBase): - # This branch is skipped when type checking - mypy incorrectly warns about this code being unreachable, because it thinks that a typing.BinaryIO cannot be an instance of io.RawIOBase. - # Raw IO streams can be wrapped efficiently using BufferedReader. - return io.BufferedReader(stream) - else: - # Other streams need to be wrapped using our custom wrapper class. - return _PeekableIOWrapper(stream) - - def read_exact(stream: typing.BinaryIO, byte_count: int) -> bytes: """Read byte_count bytes from the stream and raise an exception if too few bytes are read (i. e. if EOF was hit prematurely).""" diff --git a/rsrcfork/compress/dcmp2.py b/rsrcfork/compress/dcmp2.py index c57b06d..5876d3f 100644 --- a/rsrcfork/compress/dcmp2.py +++ b/rsrcfork/compress/dcmp2.py @@ -2,6 +2,7 @@ import enum import struct import typing +from .. import _io_utils from . import common @@ -73,7 +74,7 @@ def _split_bits(i: int) -> typing.Tuple[bool, bool, bool, bool, bool, bool, bool ) -def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]: +def _decompress_untagged(stream: "_io_utils.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]: while True: # Loop is terminated when EOF is reached. table_index_data = stream.read(1) if not table_index_data: @@ -93,7 +94,7 @@ def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int, yield table[table_index] -def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]: +def _decompress_tagged(stream: "_io_utils.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]: while True: # Loop is terminated when EOF is reached. tag_data = stream.read(1) if not tag_data: @@ -174,4 +175,4 @@ def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.B else: decompress_func = _decompress_untagged - yield from decompress_func(common.make_peekable(stream), header_info.decompressed_length, table, debug=debug) + yield from decompress_func(_io_utils.make_peekable(stream), header_info.decompressed_length, table, debug=debug)