"""A collection of utility functions and classes related to IO streams. For internal use only.""" import io import typing def read_exact(stream: typing.BinaryIO, byte_count: int) -> bytes: """Read byte_count bytes from the stream and raise an exception if too few bytes are read (i. e. if EOF was hit prematurely). :param stream: The stream to read from. :param byte_count: The number of bytes to read. :return: The read data, which is exactly ``byte_count`` bytes long. :raise EOFError: If not enough data could be read from the stream. """ data = stream.read(byte_count) if len(data) != byte_count: raise EOFError(f"Attempted to read {byte_count} bytes of data, but only got {len(data)} bytes") return data class _SubStream(io.RawIOBase): """A read-only stream that provides a view over a range of data from another stream.""" _outer_stream: typing.BinaryIO _start_offset: int _length: int _seek_position: int def __init__(self, stream: typing.BinaryIO, start_offset: int, length: int) -> None: super().__init__() self._outer_stream = stream self._start_offset = start_offset self._length = length self._seek_position = 0 outer_stream_length = self._outer_stream.seek(0, io.SEEK_END) if self._start_offset + self._length > outer_stream_length: raise ValueError(f"start_offset ({self._start_offset}) or length ({self._length}) too high: outer stream must be at least {self._start_offset + self._length} bytes long, but is only {outer_stream_length} bytes") def seekable(self) -> bool: return True def seek(self, offset: int, whence: int = io.SEEK_SET) -> int: if whence == io.SEEK_SET: if offset < 0: raise ValueError(f"Negative seek offset not allowed with SEEK_SET: {offset}") self._seek_position = offset elif whence == io.SEEK_CUR: self._seek_position += offset elif whence == io.SEEK_END: self._seek_position = self._length - offset else: raise ValueError(f"Invalid whence value: {whence}") self._seek_position = max(0, min(self._length, self._seek_position)) return self._seek_position def tell(self) -> int: return self._seek_position def readable(self) -> bool: return True def readall(self) -> bytes: self._outer_stream.seek(self._start_offset + self._seek_position) return self._outer_stream.read(self._length - self._seek_position) def readinto(self, buffer: bytearray) -> typing.Optional[int]: size = min(len(buffer), self._length - self._seek_position) self._outer_stream.seek(self._start_offset + self._seek_position) data = self._outer_stream.read(size) self._seek_position += len(data) buffer[:len(data)] = data return len(data) def make_substream(stream: typing.BinaryIO, start_offset: int, length: int) -> typing.BinaryIO: """Create a read-only stream that exposes the specified range of data from ``stream``. :param stream: The underlying binary stream from which to read the data. The stream must be readable and seekable and contain at least ``start_offset + length`` bytes of data. :param start_offset: The absolute offset in the parent stream at which the data to expose starts. This offset will correspond to offset 0 in the returned stream. :param length: The length of the data to expose. This is the highest valid offset in the returned stream. """ # For some reason, mypy thinks that io.BufferedReader is not a typing.BinaryIO. return typing.cast(typing.BinaryIO, io.BufferedReader(_SubStream(stream, start_offset, length))) if typing.TYPE_CHECKING: class PeekableIO(typing.Protocol): """Minimal protocol for binary IO streams that support the peek method. The peek method is supported by various standard Python binary IO streams, such as io.BufferedReader. If a stream does not natively support the peek method, it may be wrapped using the custom helper function make_peekable. """ def readable(self) -> bool: ... def read(self, size: typing.Optional[int] = ...) -> bytes: ... def peek(self, size: int = ...) -> bytes: ... class _PeekableIOWrapper(object): """Wrapper class to add peek support to an existing stream. Do not instantiate this class directly, use the make_peekable function instead. Python provides a standard io.BufferedReader class, which supports the peek method. However, according to its documentation, it only supports wrapping io.RawIOBase subclasses, and not streams which are already otherwise buffered. Warning: this class does not perform any buffering of its own, outside of what is required to make peek work. It is strongly recommended to only wrap streams that are already buffered or otherwise fast to read from. In particular, raw streams (io.RawIOBase subclasses) should be wrapped using io.BufferedReader instead. """ _wrapped: typing.BinaryIO _readahead: bytes def __init__(self, wrapped: typing.BinaryIO) -> None: super().__init__() self._wrapped = wrapped self._readahead = b"" def readable(self) -> bool: return self._wrapped.readable() def read(self, size: typing.Optional[int] = None) -> bytes: if size is None or size < 0: ret = self._readahead + self._wrapped.read() self._readahead = b"" elif size <= len(self._readahead): ret = self._readahead[:size] self._readahead = self._readahead[size:] else: ret = self._readahead + self._wrapped.read(size - len(self._readahead)) self._readahead = b"" return ret def peek(self, size: int = -1) -> bytes: if not self._readahead: self._readahead = self._wrapped.read(io.DEFAULT_BUFFER_SIZE if size < 0 else size) return self._readahead def make_peekable(stream: typing.BinaryIO) -> "PeekableIO": """Wrap an arbitrary binary IO stream so that it supports the peek method. The stream is wrapped as efficiently as possible (or not at all if it already supports the peek method). However, in the worst case a custom wrapper class needs to be used, which may not be particularly efficient and only supports a very minimal interface. The only methods that are guaranteed to exist on the returned stream are readable, read, and peek. """ if hasattr(stream, "peek"): # Stream is already peekable, nothing to be done. return typing.cast("PeekableIO", stream) elif not typing.TYPE_CHECKING and isinstance(stream, io.RawIOBase): # This branch is skipped when type checking - mypy incorrectly warns about this code being unreachable, because it thinks that a typing.BinaryIO cannot be an instance of io.RawIOBase. # Raw IO streams can be wrapped efficiently using BufferedReader. return io.BufferedReader(stream) else: # Other streams need to be wrapped using our custom wrapper class. return _PeekableIOWrapper(stream)