Move .compress.common.make_peekable and related code into ._io_utils

This commit is contained in:
dgelessus 2020-07-19 23:13:01 +02:00
parent 476a68916b
commit 0f6018e4bf
3 changed files with 79 additions and 77 deletions

View File

@ -1,5 +1,6 @@
"""A collection of utility functions and classes related to IO streams. For internal use only."""
import io
import typing
@ -16,3 +17,77 @@ def read_exact(stream: typing.BinaryIO, byte_count: int) -> bytes:
if len(data) != byte_count:
raise EOFError(f"Attempted to read {byte_count} bytes of data, but only got {len(data)} bytes")
return data
if typing.TYPE_CHECKING:
class PeekableIO(typing.Protocol):
"""Minimal protocol for binary IO streams that support the peek method.
The peek method is supported by various standard Python binary IO streams, such as io.BufferedReader. If a stream does not natively support the peek method, it may be wrapped using the custom helper function make_peekable.
"""
def readable(self) -> bool:
...
def read(self, size: typing.Optional[int] = ...) -> bytes:
...
def peek(self, size: int = ...) -> bytes:
...
class _PeekableIOWrapper(object):
"""Wrapper class to add peek support to an existing stream. Do not instantiate this class directly, use the make_peekable function instead.
Python provides a standard io.BufferedReader class, which supports the peek method. However, according to its documentation, it only supports wrapping io.RawIOBase subclasses, and not streams which are already otherwise buffered.
Warning: this class does not perform any buffering of its own, outside of what is required to make peek work. It is strongly recommended to only wrap streams that are already buffered or otherwise fast to read from. In particular, raw streams (io.RawIOBase subclasses) should be wrapped using io.BufferedReader instead.
"""
_wrapped: typing.BinaryIO
_readahead: bytes
def __init__(self, wrapped: typing.BinaryIO) -> None:
super().__init__()
self._wrapped = wrapped
self._readahead = b""
def readable(self) -> bool:
return self._wrapped.readable()
def read(self, size: typing.Optional[int] = None) -> bytes:
if size is None or size < 0:
ret = self._readahead + self._wrapped.read()
self._readahead = b""
elif size <= len(self._readahead):
ret = self._readahead[:size]
self._readahead = self._readahead[size:]
else:
ret = self._readahead + self._wrapped.read(size - len(self._readahead))
self._readahead = b""
return ret
def peek(self, size: int = -1) -> bytes:
if not self._readahead:
self._readahead = self._wrapped.read(io.DEFAULT_BUFFER_SIZE if size < 0 else size)
return self._readahead
def make_peekable(stream: typing.BinaryIO) -> "PeekableIO":
"""Wrap an arbitrary binary IO stream so that it supports the peek method.
The stream is wrapped as efficiently as possible (or not at all if it already supports the peek method). However, in the worst case a custom wrapper class needs to be used, which may not be particularly efficient and only supports a very minimal interface. The only methods that are guaranteed to exist on the returned stream are readable, read, and peek.
"""
if hasattr(stream, "peek"):
# Stream is already peekable, nothing to be done.
return typing.cast("PeekableIO", stream)
elif not typing.TYPE_CHECKING and isinstance(stream, io.RawIOBase):
# This branch is skipped when type checking - mypy incorrectly warns about this code being unreachable, because it thinks that a typing.BinaryIO cannot be an instance of io.RawIOBase.
# Raw IO streams can be wrapped efficiently using BufferedReader.
return io.BufferedReader(stream)
else:
# Other streams need to be wrapped using our custom wrapper class.
return _PeekableIOWrapper(stream)

View File

@ -107,80 +107,6 @@ class CompressedType9HeaderInfo(CompressedHeaderInfo):
return f"{type(self).__qualname__}(header_length={self.header_length}, compression_type=0x{self.compression_type:>04x}, decompressed_length={self.decompressed_length}, dcmp_id={self.dcmp_id}, parameters={self.parameters!r})"
if typing.TYPE_CHECKING:
class PeekableIO(typing.Protocol):
"""Minimal protocol for binary IO streams that support the peek method.
The peek method is supported by various standard Python binary IO streams, such as io.BufferedReader. If a stream does not natively support the peek method, it may be wrapped using the custom helper function make_peekable.
"""
def readable(self) -> bool:
...
def read(self, size: typing.Optional[int] = ...) -> bytes:
...
def peek(self, size: int = ...) -> bytes:
...
class _PeekableIOWrapper(object):
"""Wrapper class to add peek support to an existing stream. Do not instantiate this class directly, use the make_peekable function instead.
Python provides a standard io.BufferedReader class, which supports the peek method. However, according to its documentation, it only supports wrapping io.RawIOBase subclasses, and not streams which are already otherwise buffered.
Warning: this class does not perform any buffering of its own, outside of what is required to make peek work. It is strongly recommended to only wrap streams that are already buffered or otherwise fast to read from. In particular, raw streams (io.RawIOBase subclasses) should be wrapped using io.BufferedReader instead.
"""
_wrapped: typing.BinaryIO
_readahead: bytes
def __init__(self, wrapped: typing.BinaryIO) -> None:
super().__init__()
self._wrapped = wrapped
self._readahead = b""
def readable(self) -> bool:
return self._wrapped.readable()
def read(self, size: typing.Optional[int] = None) -> bytes:
if size is None or size < 0:
ret = self._readahead + self._wrapped.read()
self._readahead = b""
elif size <= len(self._readahead):
ret = self._readahead[:size]
self._readahead = self._readahead[size:]
else:
ret = self._readahead + self._wrapped.read(size - len(self._readahead))
self._readahead = b""
return ret
def peek(self, size: int = -1) -> bytes:
if not self._readahead:
self._readahead = self._wrapped.read(io.DEFAULT_BUFFER_SIZE if size < 0 else size)
return self._readahead
def make_peekable(stream: typing.BinaryIO) -> "PeekableIO":
"""Wrap an arbitrary binary IO stream so that it supports the peek method.
The stream is wrapped as efficiently as possible (or not at all if it already supports the peek method). However, in the worst case a custom wrapper class needs to be used, which may not be particularly efficient and only supports a very minimal interface. The only methods that are guaranteed to exist on the returned stream are readable, read, and peek.
"""
if hasattr(stream, "peek"):
# Stream is already peekable, nothing to be done.
return typing.cast("PeekableIO", stream)
elif not typing.TYPE_CHECKING and isinstance(stream, io.RawIOBase):
# This branch is skipped when type checking - mypy incorrectly warns about this code being unreachable, because it thinks that a typing.BinaryIO cannot be an instance of io.RawIOBase.
# Raw IO streams can be wrapped efficiently using BufferedReader.
return io.BufferedReader(stream)
else:
# Other streams need to be wrapped using our custom wrapper class.
return _PeekableIOWrapper(stream)
def read_exact(stream: typing.BinaryIO, byte_count: int) -> bytes:
"""Read byte_count bytes from the stream and raise an exception if too few bytes are read (i. e. if EOF was hit prematurely)."""

View File

@ -2,6 +2,7 @@ import enum
import struct
import typing
from .. import _io_utils
from . import common
@ -73,7 +74,7 @@ def _split_bits(i: int) -> typing.Tuple[bool, bool, bool, bool, bool, bool, bool
)
def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]:
def _decompress_untagged(stream: "_io_utils.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]:
while True: # Loop is terminated when EOF is reached.
table_index_data = stream.read(1)
if not table_index_data:
@ -93,7 +94,7 @@ def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int,
yield table[table_index]
def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]:
def _decompress_tagged(stream: "_io_utils.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]:
while True: # Loop is terminated when EOF is reached.
tag_data = stream.read(1)
if not tag_data:
@ -174,4 +175,4 @@ def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.B
else:
decompress_func = _decompress_untagged
yield from decompress_func(common.make_peekable(stream), header_info.decompressed_length, table, debug=debug)
yield from decompress_func(_io_utils.make_peekable(stream), header_info.decompressed_length, table, debug=debug)