47 Commits
v1.2.0 ... 1.5

Author SHA1 Message Date
d342614f55 Release version 1.5.0 2019-10-22 10:17:07 +02:00
a5fb30e194 Fix broken handling of - (stdin) file name on command line 2019-10-16 23:29:20 +02:00
f3b3de496e Change naming of compression types
The old names ("system" and "application" compression) were not really
accurate in all cases, so the compression types are now referred to by
their number.
2019-10-07 10:08:32 +02:00
a71274d554 Document stream-based decompression in changelog 2019-10-02 16:36:54 +02:00
6d69d0097d Update rsrcfork.compress.__all__ 2019-10-02 16:29:32 +02:00
8db1b22bdc Make the generic decompression API stream-based
The non-stream-based APIs still exist as before and are not deprecated,
they just act as thin wrappers around the stream-based API.

The main rsrcfork module doesn't use the stream-based APIs yet, because
it reads each resource's data all at once and not incrementally.
2019-10-02 16:28:40 +02:00
6559cbc337 Refactor .dcmp2 to be stream-based
This is a little more complex than with the other decompressors,
because .dcmp2 has to behave differently when at the byte before EOF.
Checking whether this is the case requires lookahead, which is not easy
to do with a plain IO stream.

Some buffered IO streams provide a peek method for lookahead, but
others don't (such as io.BytesIO). There is no standard way to wrap an
already buffered IO stream to add a peek method, so we need a custom
wrapper class and helper function for this purpose.
2019-10-02 10:26:03 +02:00
1e79dc3c50 Refactor .dcmp0 and .dcmp1 to be stream-based
The decompression code is more readable this way, because the
compressed data needs to be processed sequentially. It also allows
moving the length check and some debug logging into an outer generator.

This also allows incremental decompression, but this doesn't have any
practical advantage, because the compressed resource data is all read
at once (there is no API for opening resources as streams), and
resources are not very large anyway.
2019-10-01 21:26:41 +02:00
db48212ade Fix a typo in a .compress.dcmp0 debug message 2019-10-01 21:26:41 +02:00
3a72bd3406 Remove leading underscores where they don't make much sense
The leading underscore is meant to distinguish private (for internal
use only) APIs from public (for external use) APIs. One can argue about
where the line between public and private should be, but if something
is used from other modules (as with read_variable_length_integer) it's
not really private IMHO.

In scripts (like __main__) it also doesn't make much sense to use
leading underscores, because the entire file is never meant to be used
by external code.
2019-10-01 21:26:41 +02:00
cb868b8005 Bump version to 1.4.1.dev 2019-09-29 19:27:43 +02:00
2f2472cfe9 Release version 1.4.0 2019-09-29 19:20:37 +02:00
e0f73d3220 Fix more issues reported by mypy 2019-09-29 16:28:07 +02:00
b77c85c295 Add mypy configuration section to setup.cfg 2019-09-29 16:27:37 +02:00
e5875ffe67 Fix various issues reported by mypy 2019-09-29 16:14:55 +02:00
449bf4dd71 Use parameterized typing.Mapping in ResourceFile definition
Previously the un-parameterized collections.abc.Mapping was used, which
makes type checking less accurate, as the exact key/value types are not
known.
2019-09-29 15:42:19 +02:00
0ac6e8a3c4 Fix misplaced parens in dcmp modules 2019-09-29 15:33:14 +02:00
29ddd21740 Add missing type annotations on some methods 2019-09-29 15:32:18 +02:00
add22b704a Fix ResourceFile.__enter__ not returning anything 2019-09-29 15:09:41 +02:00
fdd04c944b Remove __slots__ declaration from Resource class
It doesn't seem to have any noticeable performance benefit.
2019-09-29 15:00:45 +02:00
97c459bca7 Change attribute type annotations to standard format
Previously, the types of instance attributes were annotated with the
first assignment of each attribute. The standard way to annotate
instance attributes is to do so at class level without assigning any
value.
2019-09-29 14:58:18 +02:00
9ef084de58 Remove uses of the typing.io pseudo-module
According to https://bugs.python.org/issue35089, typing.io should not
be used anymore, and the types that it contains should be accessed
through the main typing module instead.
2019-09-28 01:40:34 +02:00
6d03954784 Document setup.cfg options.packages fixes in changelog 2019-09-25 02:32:32 +02:00
343259049c Fix setup.cfg options.packages not including subpackages
This caused normal installs (i. e. without --editable) of this library
to not include the rsrcfork.compress subpackage, and made everything
unusable as a result. Oops.
2019-09-25 01:51:23 +02:00
e75e88018e Add lots of additional Inside Macintosh-related links/info to README 2019-09-25 00:32:18 +02:00
0f72e8eb1f Document decompression improvements in changelog 2019-09-24 00:46:35 +02:00
84f09d0b83 Display 'dcmp' IDs in command line listings of compressed resources 2019-09-24 00:27:54 +02:00
c108af60ca Add length and length_raw attributes to Resource (closes #3)
For compressed resources, the value of the length attribute can be
accessed much more quickly than the data itself (because it only
requires parsing the header, rather than decompressing the entire
data). This is used to speed up listing of compressed resources on the
command line.

The length_raw attribute is added for symmetry, although it is not
specifically optimized in any case yet.
2019-09-24 00:13:23 +02:00
0c942e26ec Fix hex number formatting in compressed header info reprs 2019-09-23 23:52:06 +02:00
868a322b8e Add Resource.compressed_info attribute
This allows accessing a compressed resource's header data, without
having to decompress it or parse the compressed data manually.
2019-09-23 23:50:29 +02:00
a23cd0fcb2 Simplify decompressor lookup
All decompressors now have exactly the same signature (as a result,
each decompressor now has to check itself that the header type is
correct). This allows the decompressors to be stored in a simple
dictionary, which makes the lookup process much simpler.
2019-09-23 23:32:38 +02:00
53e73be980 Pass complete header info to individual decompressors 2019-09-23 23:19:20 +02:00
9dbdf5b827 Move compressed header info constants/classes to .compress.common
This allows the constants/classes to be accessed from the individual
decompressor submodules.
2019-09-23 23:14:06 +02:00
87d4ae43d4 Refactor parsing of compressed resource headers
In preparation for #3, the compressed resource data headers are parsed
and stored as proper objects. For now these objects are only used
internally by the decompression code, but in the future they can be
exposed.
2019-09-23 23:10:55 +02:00
716ac30a53 Add release instructions in a comment in __init__.py 2019-09-16 17:09:47 +02:00
20991154d3 Bump version to 1.3.1.dev 2019-09-16 16:46:17 +02:00
7207b1d32b Release version 1.3.0 2019-09-16 16:34:40 +02:00
1de940d597 Enable --sort by default and add --no-sort to disable sorting
In most cases the file order is not important and the unsorted output
hurts readability. The performance impact of sorting is relatively
small and barely noticeable even with large resource files.
2019-09-16 15:25:41 +02:00
d7255bc977 Adjust --group=id output format slightly 2019-09-16 14:58:21 +02:00
c6337bdfbd Rename resource_type and resource_id attributes to type and id
The old names were chosen to avoid conflicts with Python's type and id
builtins, but for attribute names this is not necessary.
2019-09-15 15:56:03 +02:00
f4c2717720 Add command-line --group option 2019-09-15 15:38:01 +02:00
8ad0234633 Add command-line --sort option 2019-09-13 15:00:56 +02:00
7612322c43 Add dump-text output format on command line 2019-09-13 14:51:16 +02:00
51ae7c6a09 Refactor __main__.main into smaller functions 2019-09-13 14:17:21 +02:00
194c886472 Change hex dump output format to match hexdump -C 2019-09-13 10:51:27 +02:00
b2fa5f8b0f Collapse multiple subsequent identical lines in hex dumps 2019-09-13 10:40:03 +02:00
752ec9e828 Bump version to 1.2.1.dev 2019-09-13 10:22:43 +02:00
11 changed files with 902 additions and 506 deletions

3
.gitignore vendored
View File

@ -6,3 +6,6 @@ __pycache__/
*.egg-info/
build/
dist/
# mypy
.mypy_cache/

View File

@ -56,7 +56,7 @@ Simple example
>>> rf
<rsrcfork.ResourceFile at 0x1046e6048, attributes ResourceFileAttrs.0, containing 4 resource types: [b'utxt', b'utf8', b'TEXT', b'drag']>
>>> rf[b"TEXT"]
<rsrcfork.ResourceFile._LazyResourceMap at 0x10470ed30 containing one resource: rsrcfork.Resource(resource_type=b'TEXT', resource_id=256, name=None, attributes=ResourceAttrs.0, data=b'Here is some text')>
<rsrcfork.ResourceFile._LazyResourceMap at 0x10470ed30 containing one resource: rsrcfork.Resource(type=b'TEXT', id=256, name=None, attributes=ResourceAttrs.0, data=b'Here is some text')>
Automatic selection of data/resource fork
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@ -110,10 +110,57 @@ Further info on resource files
Sources of information about the resource fork data format, and the structure of common resource types:
* Inside Macintosh, Volume I, Chapter 5 "The Resource Manager". This book can probably be obtained in physical form somewhere, but the relevant chapter/book is also available in a few places online:
* The Inside Macintosh book series, specifically the chapter "Resource Manager". These books are Apple's official reference material for the classic Macintosh platform. Over time, they have gone through many revisions and updates, and their structure has been changed multiple times. This is a (likely incomplete) list of the major revisions of Inside Macintosh and where they can be obtained online.
* `Apple's legacy documentation <https://developer.apple.com/legacy/library/documentation/mac/pdf/MoreMacintoshToolbox.pdf>`_
* pagetable.com, a site that happened to have a copy of the book: `info blog post <http://www.pagetable.com/?p=50>`_, `direct download <http://www.weihenstephan.org/~michaste/pagetable/mac/Inside_Macintosh.pdf>`_
* The earliest revisions consisted of two volumes, each a three-ring binder containing photocopied pages. The chapters were referred to as individual "manuals" and were essentially standalone - each one had its own title page, TOC, glossary, and page numbers. Various parts were still missing or not yet finalized, and updated pages were distributed regularly as part of the `Macintosh Software Supplement <https://macgui.com/news/article.php?t=447>`_.
* bitsavers.org has scanned and OCRed PDFs of a late (November 1984) revision: `Volume I <http://bitsavers.org/pdf/apple/mac/Inside_Macintosh_Vol_1_1984.pdf>`_, `Volume II <http://bitsavers.org/pdf/apple/mac/Inside_Macintosh_Vol_2_1984.pdf>`_.
* The Promotional Edition, released in early 1985, consisted of a single book (it was nicknamed the "phonebook" edition because of its paper quality). Although it was physically a single book, the contents were still structured into standalone "manuals" like in the ring binder version, and some parts were still missing or not finalized.
* bitsavers.org has `a scanned and OCRed PDF <http://bitsavers.org/pdf/apple/mac/Inside_Macintosh_Promotional_Edition_1985.pdf>`_.
* The published 1985 revision consisted of three volumes, available as three paperback books or a single hardcover book. They contained the finalized contents of the previous revisions, which documented the Macintosh 128k, Macintosh 512k, and Macintosh XL. Unlike the previous revisions, each volume had continuous page numbers and a full TOC and index, and volume III contained a complete glossary.
* pagetable.com has a `blog post <http://www.pagetable.com/?p=50>`_ with `a scanned and OCRed PDF of the three paperback volumes <http://www.weihenstephan.org/~michaste/pagetable/mac/Inside_Macintosh.pdf>`_.
* Additional volumes were published later to document newer Macintosh models. These served as incremental additions and did not fully supersede or replace any of the previous volumes.
* Volume IV was published in 1986 and documented the Macintosh Plus and Macintosh 512k Enhanced.
* Volume V was published in 1986 and documented the Macintosh II and Macintosh SE.
* Volume VI was published in 1991 and documented System 7.0.
* VintageApple.org has `scanned and OCRed PDFs of Volumes I through VI <https://vintageapple.org/inside_o/>`_.
* After 1991, Inside Macintosh was restructured into over 20 volumes organized by topic, rather than chronologically by Macintosh model. These were published as books starting in 1992, and later also on CDs and online.
* VintageApple.org has `rendered (not scanned) PDFs of 26 volumes and 7 X-Ref volumes <https://vintageapple.org/inside_r/>`_.
* The Communications Toolbox and QuickDraw GX Programmers' Overview volumes appear to be missing.
* Many volumes are still available in Apple's legacy developer documentation archive, in HTML and rendered (not scanned) PDF formats:
* Two volumes appear on the website under Inside Macintosh, even though other sources don't consider them part of the Inside Macintosh series:
* `Advanced Color Imaging on the Mac OS (HTML) <https://developer.apple.com/library/archive/documentation/mac/ACI/ACI-2.html>`_ (November 1996)
* `Advanced Color Imaging Reference (HTML) <https://developer.apple.com/library/archive/documentation/mac/ACIReference/ACIReference-2.html>`_ (November 1996)
* `Devices (HTML) <https://developer.apple.com/library/archive/documentation/mac/Devices/Devices-2.html>`_ (July 1996), `Devices (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/Devices/pdf.html>`_ (1994)
* `Files (HTML) <https://developer.apple.com/library/archive/documentation/mac/Files/Files-2.html>`_ (July 1996), `Files (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/Files/pdf.html>`_ (1992)
* `Imaging with QuickDraw (HTML) <https://developer.apple.com/library/archive/documentation/mac/QuickDraw/QuickDraw-2.html>`_ (July 1996), `Imaging with QuickDraw (single PDF) <https://developer.apple.com/library/archive/documentation/mac/pdf/ImagingWithQuickDraw.pdf>`_ (1994)
* `Interapplication Communication (HTML) <https://developer.apple.com/library/archive/documentation/mac/IAC/IAC-2.html>`_ (July 1996), `Interapplication Communication (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/Interapplication_Communication/pdf.html>`_ (1993)
* `Macintosh Toolbox Essentials (HTML) <https://developer.apple.com/library/archive/documentation/mac/Toolbox/Toolbox-2.html>`_ (July 1996), `Macintosh Toolbox Essentials (single PDF) <https://developer.apple.com/library/archive/documentation/mac/pdf/MacintoshToolboxEssentials.pdf>`_ (1992)
* `Memory (HTML) <https://developer.apple.com/library/archive/documentation/mac/Memory/Memory-2.html>`_ (July 1996), `Memory (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/Memory/pdf.html>`_ (1992)
* `More Macintosh Toolbox (HTML) <https://developer.apple.com/library/archive/documentation/mac/MoreToolbox/MoreToolbox-2.html>`_ (July 1996), `More Macintosh Toolbox (single PDF) <https://developer.apple.com/library/archive/documentation/mac/pdf/MoreMacintoshToolbox.pdf>`_ (1993)
* `Networking (HTML) <https://developer.apple.com/library/archive/documentation/mac/Networking/Networking-2.html>`_ (July 1996), `Networking (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/Networking/pdf.html>`_ (1994)
* `Operating System Utilities (HTML) <https://developer.apple.com/library/archive/documentation/mac/OSUtilities/OSUtilities-2.html>`_ (July 1996), `Operating System Utilities (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/Operating_System_Utilities/pdf.html>`_ (1994)
* `PowerPC Numerics (HTML) <https://developer.apple.com/library/archive/documentation/mac/PPCNumerics/PPCNumerics-2.html>`_ (July 1996), `PowerPC Numerics (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/PPC_Numerics.sit.hqx>`_ (1994)
* `PowerPC System Software (HTML) <https://developer.apple.com/library/archive/documentation/mac/PPCSoftware/PPCSoftware-2.html>`_ (July 1996), `PowerPC System Software (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/PPC_System_Software.sit.hqx>`_ (1994)
* `Processes (HTML) <https://developer.apple.com/library/archive/documentation/mac/Processes/Processes-2.html>`_ (June 1996), `Processes (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/Processes/pdf.html>`_ (1992)
* `Sound (HTML) <https://developer.apple.com/library/archive/documentation/mac/Sound/Sound-2.html>`_ (July 1996), `Sound (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/Sound/pdf.html>`_ (1994)
* `Text (HTML) <https://developer.apple.com/library/archive/documentation/mac/Text/Text-2.html>`_ (July 1996), `Text (single PDF) <https://developer.apple.com/library/archive/documentation/mac/pdf/Text.pdf>`_ (1993)
* The two AOCE volumes, Communications Toolbox, Human Interface Guidelines, Overview, seven QuickDraw GX volumes, two QuickTime volumes, and X-Ref are missing.
* The Gryphel project (best known for the Mini vMac emulator) has `a list of physical book releases <https://www.gryphel.com/c/books/appledev.html>`_ of Inside Macintosh (and other Apple developer documentation), including ISBNs, publishers, dates, and Amazon links.
* `Wikipedia <https://en.wikipedia.org/wiki/Resource_fork>`_, of course
* The `Resource Fork <http://fileformats.archiveteam.org/wiki/Resource_Fork>`_ article on "Just Solve the File Format Problem" (despite the title, this is a decent site and not clickbait)
@ -127,6 +174,53 @@ If these links are no longer functional, some are archived in the `Internet Arch
Changelog
---------
Version 1.5.0
^^^^^^^^^^^^^
* Added stream-based decompression methods to the ``rsrcfork.compress`` module.
* The internal decompressor implementations have been refactored to use streams.
* This allows for incremental decompression of compressed resource data. In practice this has no noticeable effect yet, because the main ``rsrcfork`` API doesn't support incremental reading of resource data.
* Fixed the command line tool always displaying an incorrect error "Cannot specify an explicit fork when reading from stdin" when using ``-`` (stdin) as the input file.
Version 1.4.0
^^^^^^^^^^^^^
* Added ``length`` and ``length_raw`` attributes to ``Resource``. These attributes are equivalent to the ``len`` of ``data`` and ``data_raw`` respectively, but may be faster to access.
* Currently, the only optimized case is ``length`` for compressed resources, but more optimizations may be added in the future.
* Added a ``compressed_info`` attribute to ``Resource`` that provides access to the header information of compressed resources.
* Improved handling of compressed resources when listing resource files with the command line tool.
* Metadata of compressed resources is now displayed even if no decompressor implementation is available (as long as the compressed data header can be parsed).
* Performance has been improved - the data no longer needs to be fully decompressed to get its length, this information is now read from the header.
* The ``'dcmp'`` ID used to decompress each resource is displayed.
* Fixed an incorrect ``options.packages`` in ``setup.cfg``, which made the library unusable except when installing from source using ``--editable``.
* Fixed ``ResourceFile.__enter__`` returning ``None``, which made it impossible to use ``ResourceFile`` properly in a ``with`` statement.
* Fixed various minor errors reported by type checking with ``mypy``.
Version 1.3.0.post1
^^^^^^^^^^^^^^^^^^^
* Fixed an incorrect ``options.packages`` in ``setup.cfg``, which made the library unusable except when installing from source using ``--editable``.
Version 1.2.0.post1
^^^^^^^^^^^^^^^^^^^
* Fixed an incorrect ``options.packages`` in ``setup.cfg``, which made the library unusable except when installing from source using ``--editable``.
Version 1.3.0
^^^^^^^^^^^^^
* Added a ``--group`` command line option to group resources in list format by type (the default), ID, or with no grouping.
* Added a ``dump-text`` output format to the command line tool. This format is identical to ``dump``, but instead of a hex dump, it outputs the resource data as text. The data is decoded as MacRoman and classic Mac newlines (``\r``) are translated. This is useful for examining resources that contain mostly plain text.
* Changed the command line tool to sort resources by type and ID, and added a ``--no-sort`` option to disable sorting and output resources in file order (which was the previous behavior).
* Renamed the ``rsrcfork.Resource`` attributes ``resource_type`` and ``resource_id`` to ``type`` and ``id``, respectively. The old names have been deprecated and will be removed in the future, but are still supported for now.
* Changed ``--format=dump`` output to match ``hexdump -C``'s format - spacing has been adjusted, and multiple subsequent identical lines are collapsed into a single ``*``.
Version 1.2.0
^^^^^^^^^^^^^

View File

@ -1,6 +1,26 @@
"""A pure Python, cross-platform library/tool for reading Macintosh resource data, as stored in resource forks and ``.rsrc`` files."""
__version__ = "1.2.0"
# To release a new version:
# * Remove the .dev suffix from the version number in this file.
# * Update the changelog in the README.rst (rename the "next version" section to the correct version number).
# * Remove the ``dist`` directory (if it exists) to clean up any old release files.
# * Run ``python3 setup.py sdist bdist_wheel`` to build the release files.
# * Run ``python3 -m twine check dist/*`` to check the release files.
# * Fix any errors reported by the build and/or check steps.
# * Commit the changes to master.
# * Tag the release commit with the version number, prefixed with a "v" (e. g. version 1.2.3 is tagged as v1.2.3).
# * Fast-forward the release branch to the new release commit.
# * Push the master and release branches.
# * Upload the release files to PyPI using ``python3 -m twine upload dist/*``.
# * On the GitHub repo's Releases page, edit the new release tag and add the relevant changelog section from the README.rst. (Note: The README is in reStructuredText format, but GitHub's release notes use Markdown, so it may be necessary to adjust the markup syntax.)
# After releasing:
# * (optional) Remove the build and dist directories from the previous release as they are no longer needed.
# * Bump the version number in this file to the next version and add a .dev suffix.
# * Add a new empty section for the next version to the README.rst changelog.
# * Commit and push the changes to master.
__version__ = "1.5.0"
__all__ = [
"Resource",

View File

@ -1,6 +1,7 @@
import argparse
import collections
import enum
import itertools
import sys
import textwrap
import typing
@ -24,13 +25,13 @@ _REZ_ATTR_NAMES = {
api.ResourceAttrs.resCompressed: None, # "Extended Header resource attribute"
}
F = typing.TypeVar("F", bound=enum.Flag, covariant=True)
def _decompose_flags(value: F) -> typing.Sequence[F]:
F = typing.TypeVar("F", bound=enum.Flag)
def decompose_flags(value: F) -> typing.Sequence[F]:
"""Decompose an enum.Flags instance into separate enum constants."""
return [bit for bit in type(value) if bit in value]
def _is_printable(char: str) -> bool:
def is_printable(char: str) -> bool:
"""Determine whether a character is printable for our purposes.
We mainly use Python's definition of printable (i. e. everything that Unicode does not consider a separator or "other" character). However, we also treat U+F8FF as printable, which is the private use codepoint used for the Apple logo character.
@ -38,20 +39,20 @@ def _is_printable(char: str) -> bool:
return char.isprintable() or char == "\uf8ff"
def _bytes_unescape(string: str) -> bytes:
def bytes_unescape(string: str) -> bytes:
"""Convert a string containing text (in _TEXT_ENCODING) and hex escapes to a bytestring.
(We implement our own unescaping mechanism here to not depend on any of Python's string/bytes escape syntax.)
"""
out = []
out: typing.List[int] = []
it = iter(string)
for char in it:
if char == "\\":
try:
esc = next(it)
if esc in "\\\'\"":
out.append(esc)
out.extend(esc.encode(_TEXT_ENCODING))
elif esc == "x":
x1, x2 = next(it), next(it)
out.append(int(x1+x2, 16))
@ -64,7 +65,7 @@ def _bytes_unescape(string: str) -> bytes:
return bytes(out)
def _bytes_escape(bs: bytes, *, quote: str=None) -> str:
def bytes_escape(bs: bytes, *, quote: typing.Optional[str]=None) -> str:
"""Convert a bytestring to a string (using _TEXT_ENCODING), with non-printable characters hex-escaped.
(We implement our own escaping mechanism here to not depend on Python's str or bytes repr.)
@ -74,15 +75,15 @@ def _bytes_escape(bs: bytes, *, quote: str=None) -> str:
for byte, char in zip(bs, bs.decode(_TEXT_ENCODING)):
if char in {quote, "\\"}:
out.append(f"\\{char}")
elif _is_printable(char):
elif is_printable(char):
out.append(char)
else:
out.append(f"\\x{byte:02x}")
return "".join(out)
def _filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> typing.Sequence[api.Resource]:
matching = collections.OrderedDict()
def filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> typing.List[api.Resource]:
matching: typing.MutableMapping[typing.Tuple[bytes, int], api.Resource] = collections.OrderedDict()
for filter in filters:
if len(filter) == 4:
@ -92,15 +93,15 @@ def _filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> ty
continue
for res in resources.values():
matching[res.resource_type, res.resource_id] = res
matching[res.type, res.id] = res
elif filter[0] == filter[-1] == "'":
try:
resources = rf[_bytes_unescape(filter[1:-1])]
resources = rf[bytes_unescape(filter[1:-1])]
except KeyError:
continue
for res in resources.values():
matching[res.resource_type, res.resource_id] = res
matching[res.type, res.id] = res
else:
pos = filter.find("'", 1)
if pos == -1:
@ -108,71 +109,86 @@ def _filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> ty
elif filter[pos + 1] != " ":
raise ValueError(f"Invalid filter {filter!r}: Resource type and ID must be separated by a space")
restype, resid = filter[:pos + 1], filter[pos + 2:]
restype_str, resid_str = filter[:pos + 1], filter[pos + 2:]
if not restype[0] == restype[-1] == "'":
if not restype_str[0] == restype_str[-1] == "'":
raise ValueError(
f"Invalid filter {filter!r}: Resource type is not a single-quoted type identifier: {restype!r}")
restype = _bytes_unescape(restype[1:-1])
f"Invalid filter {filter!r}: Resource type is not a single-quoted type identifier: {restype_str!r}")
restype = bytes_unescape(restype_str[1:-1])
if len(restype) != 4:
raise ValueError(
f"Invalid filter {filter!r}: Type identifier must be 4 bytes after replacing escapes, got {len(restype)} bytes: {restype!r}")
if resid[0] != "(" or resid[-1] != ")":
if resid_str[0] != "(" or resid_str[-1] != ")":
raise ValueError(f"Invalid filter {filter!r}: Resource ID must be parenthesized")
resid = resid[1:-1]
resid_str = resid_str[1:-1]
try:
resources = rf[restype]
except KeyError:
continue
if resid[0] == resid[-1] == '"':
name = _bytes_unescape(resid[1:-1])
if resid_str[0] == resid_str[-1] == '"':
name = bytes_unescape(resid_str[1:-1])
for res in resources.values():
if res.name == name:
matching[res.resource_type, res.resource_id] = res
matching[res.type, res.id] = res
break
elif ":" in resid:
if resid.count(":") > 1:
raise ValueError(f"Invalid filter {filter!r}: Too many colons in ID range expression: {resid!r}")
start, end = resid.split(":")
start, end = int(start), int(end)
elif ":" in resid_str:
if resid_str.count(":") > 1:
raise ValueError(f"Invalid filter {filter!r}: Too many colons in ID range expression: {resid_str!r}")
start_str, end_str = resid_str.split(":")
start, end = int(start_str), int(end_str)
for res in resources.values():
if start <= res.resource_id <= end:
matching[res.resource_type, res.resource_id] = res
if start <= res.id <= end:
matching[res.type, res.id] = res
else:
resid = int(resid)
resid = int(resid_str)
try:
res = resources[resid]
except KeyError:
continue
matching[res.resource_type, res.resource_id] = res
matching[res.type, res.id] = res
return list(matching.values())
def _hexdump(data: bytes):
def hexdump(data: bytes) -> None:
last_line = None
asterisk_shown = False
for i in range(0, len(data), 16):
line = data[i:i + 16]
line_hex = " ".join(f"{byte:02x}" for byte in line)
line_char = line.decode(_TEXT_ENCODING).translate(_TRANSLATE_NONPRINTABLES)
print(f"{i:08x} {line_hex:<{16*2+15}} |{line_char}|")
# If the same 16-byte lines appear multiple times, print only the first one, and replace all further lines with a single line with an asterisk.
# This is unambiguous - to find out how many lines were collapsed this way, the user can compare the addresses of the lines before and after the asterisk.
if line == last_line:
if not asterisk_shown:
print("*")
asterisk_shown = True
else:
line_hex_left = " ".join(f"{byte:02x}" for byte in line[:8])
line_hex_right = " ".join(f"{byte:02x}" for byte in line[8:])
line_char = line.decode(_TEXT_ENCODING).translate(_TRANSLATE_NONPRINTABLES)
print(f"{i:08x} {line_hex_left:<{8*2+7}} {line_hex_right:<{8*2+7}} |{line_char}|")
asterisk_shown = False
last_line = line
if data:
print(f"{len(data):08x}")
def _raw_hexdump(data: bytes):
def raw_hexdump(data: bytes) -> None:
for i in range(0, len(data), 16):
print(" ".join(f"{byte:02x}" for byte in data[i:i + 16]))
def _describe_resource(res: api.Resource, *, include_type: bool, decompress: bool) -> str:
id_desc_parts = [f"{res.resource_id}"]
def translate_text(data: bytes) -> str:
return data.decode(_TEXT_ENCODING).replace("\r", "\n")
def describe_resource(res: api.Resource, *, include_type: bool, decompress: bool) -> str:
id_desc_parts = [f"{res.id}"]
if res.name is not None:
name = _bytes_escape(res.name, quote='"')
name = bytes_escape(res.name, quote='"')
id_desc_parts.append(f'"{name}"')
id_desc = ", ".join(id_desc_parts)
@ -181,16 +197,18 @@ def _describe_resource(res: api.Resource, *, include_type: bool, decompress: boo
if decompress and api.ResourceAttrs.resCompressed in res.attributes:
try:
res.data
res.compressed_info
except compress.DecompressError:
length_desc = f"decompression failed ({len(res.data_raw)} bytes compressed)"
length_desc = f"unparseable compressed data header ({res.length_raw} bytes compressed)"
else:
length_desc = f"{len(res.data)} bytes ({len(res.data_raw)} bytes compressed)"
assert res.compressed_info is not None
length_desc = f"{res.length} bytes ({res.length_raw} bytes compressed, 'dcmp' ({res.compressed_info.dcmp_id}) format)"
else:
length_desc = f"{len(res.data_raw)} bytes"
assert res.compressed_info is None
length_desc = f"{res.length_raw} bytes"
content_desc_parts.append(length_desc)
attrs = _decompose_flags(res.attributes)
attrs = decompose_flags(res.attributes)
if attrs:
content_desc_parts.append(" | ".join(attr.name for attr in attrs))
@ -198,11 +216,11 @@ def _describe_resource(res: api.Resource, *, include_type: bool, decompress: boo
desc = f"({id_desc}): {content_desc}"
if include_type:
restype = _bytes_escape(res.resource_type, quote="'")
restype = bytes_escape(res.type, quote="'")
desc = f"'{restype}' {desc}"
return desc
def main():
def parse_args() -> argparse.Namespace:
ap = argparse.ArgumentParser(
add_help=False,
fromfile_prefix_chars="@",
@ -229,7 +247,9 @@ def main():
ap.add_argument("-a", "--all", action="store_true", help="When no filters are given, show all resources in full, instead of an overview")
ap.add_argument("-f", "--fork", choices=["auto", "data", "rsrc"], default="auto", help="The fork from which to read the resource data, or auto to guess (default: %(default)s)")
ap.add_argument("--no-decompress", action="store_false", dest="decompress", help="Do not decompress compressed resources, output compressed resource data as-is")
ap.add_argument("--format", choices=["dump", "hex", "raw", "derez"], default="dump", help="How to output the resources - human-readable info with hex dump (dump), data only as hex (hex), data only as raw bytes (raw), or like DeRez with no resource definitions (derez)")
ap.add_argument("--format", choices=["dump", "dump-text", "hex", "raw", "derez"], default="dump", help="How to output the resources - human-readable info with hex dump (dump) (default), human-readable info with newline-translated data (dump-text), data only as hex (hex), data only as raw bytes (raw), or like DeRez with no resource definitions (derez)")
ap.add_argument("--group", action="store", choices=["none", "type", "id"], default="type", help="Group resources in list view by type or ID, or disable grouping (default: type)")
ap.add_argument("--no-sort", action="store_false", dest="sort", help="Output resources in the order in which they are stored in the file, instead of sorting them by type and ID")
ap.add_argument("--header-system", action="store_true", help="Output system-reserved header data and nothing else")
ap.add_argument("--header-application", action="store_true", help="Output application-specific header data and nothing else")
@ -237,9 +257,173 @@ def main():
ap.add_argument("filter", nargs="*", help="One or more filters to select which resources to display, or omit to show an overview of all resources")
ns = ap.parse_args()
return ns
def show_header_data(data: bytes, *, format: str) -> None:
if format == "dump":
hexdump(data)
elif format == "dump-text":
print(translate_text(data))
elif format == "hex":
raw_hexdump(data)
elif format == "raw":
sys.stdout.buffer.write(data)
elif format == "derez":
print("Cannot output file header data in derez format", file=sys.stderr)
sys.exit(1)
else:
raise ValueError(f"Unhandled output format: {format}")
def show_filtered_resources(resources: typing.Sequence[api.Resource], format: str, decompress: bool) -> None:
if not resources:
if format in ("dump", "dump-text"):
print("No resources matched the filter")
elif format in ("hex", "raw"):
print("No resources matched the filter", file=sys.stderr)
sys.exit(1)
elif format == "derez":
print("/* No resources matched the filter */")
else:
raise AssertionError(f"Unhandled output format: {format}")
elif format in ("hex", "raw") and len(resources) != 1:
print(f"Format {format} can only output a single resource, but the filter matched {len(resources)} resources", file=sys.stderr)
sys.exit(1)
for res in resources:
if decompress:
data = res.data
else:
data = res.data_raw
if format in ("dump", "dump-text"):
# Human-readable info and hex or text dump
desc = describe_resource(res, include_type=True, decompress=decompress)
print(f"Resource {desc}:")
if format == "dump":
hexdump(data)
elif format == "dump-text":
print(translate_text(data))
else:
raise AssertionError(f"Unhandled format: {format!r}")
print()
elif format == "hex":
# Data only as hex
raw_hexdump(data)
elif format == "raw":
# Data only as raw bytes
sys.stdout.buffer.write(data)
elif format == "derez":
# Like DeRez with no resource definitions
attrs = list(decompose_flags(res.attributes))
if decompress and api.ResourceAttrs.resCompressed in attrs:
attrs.remove(api.ResourceAttrs.resCompressed)
attrs_comment = " /* was compressed */"
else:
attrs_comment = ""
attr_descs_with_none = [_REZ_ATTR_NAMES[attr] for attr in attrs]
if None in attr_descs_with_none:
attr_descs = [f"${res.attributes.value:02X}"]
else:
attr_descs = typing.cast(typing.List[str], attr_descs_with_none)
parts = [str(res.id)]
if res.name is not None:
name = bytes_escape(res.name, quote='"')
parts.append(f'"{name}"')
parts += attr_descs
restype = bytes_escape(res.type, quote="'")
print(f"data '{restype}' ({', '.join(parts)}{attrs_comment}) {{")
for i in range(0, len(data), 16):
# Two-byte grouping is really annoying to implement.
groups = []
for j in range(0, 16, 2):
if i+j >= len(data):
break
elif i+j+1 >= len(data):
groups.append(f"{data[i+j]:02X}")
else:
groups.append(f"{data[i+j]:02X}{data[i+j+1]:02X}")
s = f'$"{" ".join(groups)}"'
comment = "/* " + data[i:i + 16].decode(_TEXT_ENCODING).translate(_TRANSLATE_NONPRINTABLES) + " */"
print(f"\t{s:<54s}{comment}")
print("};")
print()
else:
raise ValueError(f"Unhandled output format: {format}")
def list_resource_file(rf: api.ResourceFile, *, sort: bool, group: str, decompress: bool) -> None:
if rf.header_system_data != bytes(len(rf.header_system_data)):
print("Header system data:")
hexdump(rf.header_system_data)
if rf.header_application_data != bytes(len(rf.header_application_data)):
print("Header application data:")
hexdump(rf.header_application_data)
attrs = decompose_flags(rf.file_attributes)
if attrs:
print("File attributes: " + " | ".join(attr.name for attr in attrs))
if len(rf) == 0:
print("No resources (empty resource file)")
return
if group == "none":
all_resources: typing.List[api.Resource] = []
for reses in rf.values():
all_resources.extend(reses.values())
if sort:
all_resources.sort(key=lambda res: (res.type, res.id))
print(f"{len(all_resources)} resources:")
for res in all_resources:
print(describe_resource(res, include_type=True, decompress=decompress))
elif group == "type":
print(f"{len(rf)} resource types:")
restype_items: typing.Collection[typing.Tuple[bytes, typing.Mapping[int, api.Resource]]] = rf.items()
if sort:
restype_items = sorted(restype_items, key=lambda item: item[0])
for typecode, resources_map in restype_items:
restype = bytes_escape(typecode, quote="'")
print(f"'{restype}': {len(resources_map)} resources:")
resources_items: typing.Collection[typing.Tuple[int, api.Resource]] = resources_map.items()
if sort:
resources_items = sorted(resources_items, key=lambda item: item[0])
for resid, res in resources_items:
print(describe_resource(res, include_type=False, decompress=decompress))
print()
elif group == "id":
all_resources = []
for reses in rf.values():
all_resources.extend(reses.values())
all_resources.sort(key=lambda res: res.id)
resources_by_id = {resid: list(reses) for resid, reses in itertools.groupby(all_resources, key=lambda res: res.id)}
print(f"{len(resources_by_id)} resource IDs:")
for resid, resources in resources_by_id.items():
print(f"({resid}): {len(resources)} resources:")
if sort:
resources.sort(key=lambda res: res.type)
for res in resources:
print(describe_resource(res, include_type=True, decompress=decompress))
print()
else:
raise AssertionError(f"Unhandled group mode: {group!r}")
def main() -> typing.NoReturn:
ns = parse_args()
if ns.file == "-":
if ns.fork is not None:
if ns.fork != "auto":
print("Cannot specify an explicit fork when reading from stdin", file=sys.stderr)
sys.exit(1)
@ -254,127 +438,21 @@ def main():
else:
data = rf.header_application_data
if ns.format == "dump":
_hexdump(data)
elif ns.format == "hex":
_raw_hexdump(data)
elif ns.format == "raw":
sys.stdout.buffer.write(data)
elif ns.format == "derez":
print("Cannot output file header data in derez format", file=sys.stderr)
sys.exit(1)
else:
raise ValueError(f"Unhandled output format: {ns.format}")
show_header_data(data, format=ns.format)
elif ns.filter or ns.all:
if ns.filter:
resources = _filter_resources(rf, ns.filter)
resources = filter_resources(rf, ns.filter)
else:
resources = []
for reses in rf.values():
resources.extend(reses.values())
if not resources:
if ns.format == "dump":
print("No resources matched the filter")
elif ns.format in ("hex", "raw"):
print("No resources matched the filter", file=sys.stderr)
sys.exit(1)
elif ns.format == "derez":
print("/* No resources matched the filter */")
else:
raise AssertionError(f"Unhandled output format: {ns.format}")
elif ns.format in ("hex", "raw") and len(resources) != 1:
print(f"Format {ns.format} can only output a single resource, but the filter matched {len(resources)} resources", file=sys.stderr)
sys.exit(1)
if ns.sort:
resources.sort(key=lambda res: (res.type, res.id))
for res in resources:
if ns.decompress:
data = res.data
else:
data = res.data_raw
if ns.format == "dump":
# Human-readable info and hex dump
desc = _describe_resource(res, include_type=True, decompress=ns.decompress)
print(f"Resource {desc}:")
_hexdump(data)
print()
elif ns.format == "hex":
# Data only as hex
_raw_hexdump(data)
elif ns.format == "raw":
# Data only as raw bytes
sys.stdout.buffer.write(data)
elif ns.format == "derez":
# Like DeRez with no resource definitions
attrs = list(_decompose_flags(res.attributes))
if ns.decompress and api.ResourceAttrs.resCompressed in attrs:
attrs.remove(api.ResourceAttrs.resCompressed)
attrs_comment = " /* was compressed */"
else:
attrs_comment = ""
attr_descs = [_REZ_ATTR_NAMES[attr] for attr in attrs]
if None in attr_descs:
attr_descs[:] = [f"${res.attributes.value:02X}"]
parts = [str(res.resource_id)]
if res.name is not None:
name = _bytes_escape(res.name, quote='"')
parts.append(f'"{name}"')
parts += attr_descs
restype = _bytes_escape(res.resource_type, quote="'")
print(f"data '{restype}' ({', '.join(parts)}{attrs_comment}) {{")
for i in range(0, len(data), 16):
# Two-byte grouping is really annoying to implement.
groups = []
for j in range(0, 16, 2):
if i+j >= len(data):
break
elif i+j+1 >= len(data):
groups.append(f"{data[i+j]:02X}")
else:
groups.append(f"{data[i+j]:02X}{data[i+j+1]:02X}")
s = f'$"{" ".join(groups)}"'
comment = "/* " + data[i:i + 16].decode(_TEXT_ENCODING).translate(_TRANSLATE_NONPRINTABLES) + " */"
print(f"\t{s:<54s}{comment}")
print("};")
print()
else:
raise ValueError(f"Unhandled output format: {ns.format}")
show_filtered_resources(resources, format=ns.format, decompress=ns.decompress)
else:
if rf.header_system_data != bytes(len(rf.header_system_data)):
print("Header system data:")
_hexdump(rf.header_system_data)
if rf.header_application_data != bytes(len(rf.header_application_data)):
print("Header application data:")
_hexdump(rf.header_application_data)
attrs = _decompose_flags(rf.file_attributes)
if attrs:
print("File attributes: " + " | ".join(attr.name for attr in attrs))
if len(rf) > 0:
print(f"{len(rf)} resource types:")
for typecode, resources in rf.items():
restype = _bytes_escape(typecode, quote="'")
print(f"'{restype}': {len(resources)} resources:")
for resid, res in rf[typecode].items():
print(_describe_resource(res, include_type=False, decompress=ns.decompress))
print()
else:
print("No resource types (empty resource file)")
list_resource_file(rf, sort=ns.sort, group=ns.group, decompress=ns.decompress)
sys.exit(0)

View File

@ -4,6 +4,7 @@ import enum
import io
import os
import struct
import types
import typing
import warnings
@ -96,20 +97,26 @@ class ResourceAttrs(enum.Flag):
class Resource(object):
"""A single resource from a resource file."""
__slots__ = ("resource_type", "resource_id", "name", "attributes", "data_raw", "_data_decompressed")
type: bytes
id: int
name: typing.Optional[bytes]
attributes: ResourceAttrs
data_raw: bytes
_compressed_info: compress.common.CompressedHeaderInfo
_data_decompressed: bytes
def __init__(self, resource_type: bytes, resource_id: int, name: typing.Optional[bytes], attributes: ResourceAttrs, data_raw: bytes):
def __init__(self, resource_type: bytes, resource_id: int, name: typing.Optional[bytes], attributes: ResourceAttrs, data_raw: bytes) -> None:
"""Create a new resource with the given type code, ID, name, attributes, and data."""
super().__init__()
self.resource_type: bytes = resource_type
self.resource_id: int = resource_id
self.name: typing.Optional[bytes] = name
self.attributes: ResourceAttrs = attributes
self.data_raw: bytes = data_raw
self.type = resource_type
self.id = resource_id
self.name = name
self.attributes = attributes
self.data_raw = data_raw
def __repr__(self):
def __repr__(self) -> str:
try:
data = self.data
except compress.DecompressError:
@ -126,7 +133,54 @@ class Resource(object):
if not decompress_ok:
data_repr = f"<decompression failed - compressed data: {data_repr}>"
return f"{type(self).__module__}.{type(self).__qualname__}(resource_type={self.resource_type}, resource_id={self.resource_id}, name={self.name}, attributes={self.attributes}, data={data_repr})"
return f"{type(self).__module__}.{type(self).__qualname__}(type={self.type}, id={self.id}, name={self.name}, attributes={self.attributes}, data={data_repr})"
@property
def resource_type(self) -> bytes:
warnings.warn(DeprecationWarning("The resource_type attribute has been deprecated and will be removed in a future version. Please use the type attribute instead."))
return self.type
@property
def resource_id(self) -> int:
warnings.warn(DeprecationWarning("The resource_id attribute has been deprecated and will be removed in a future version. Please use the id attribute instead."))
return self.id
@property
def compressed_info(self) -> typing.Optional[compress.common.CompressedHeaderInfo]:
"""The compressed resource header information, or None if this resource is not compressed.
Accessing this attribute may raise a DecompressError if the resource data is compressed and the header could not be parsed. To access the unparsed header data, use the data_raw attribute.
"""
if ResourceAttrs.resCompressed in self.attributes:
try:
return self._compressed_info
except AttributeError:
self._compressed_info = compress.common.CompressedHeaderInfo.parse(self.data_raw)
return self._compressed_info
else:
return None
@property
def length_raw(self) -> int:
"""The length of the raw resource data, which may be compressed.
Accessing this attribute may be faster than computing len(self.data_raw) manually.
"""
return len(self.data_raw)
@property
def length(self) -> int:
"""The length of the resource data. If the resource data is compressed, this is the length of the data after decompression.
Accessing this attribute may be faster than computing len(self.data) manually.
"""
if self.compressed_info is not None:
return self.compressed_info.decompressed_length
else:
return self.length_raw
@property
def data(self) -> bytes:
@ -135,42 +189,46 @@ class Resource(object):
Accessing this attribute may raise a DecompressError if the resource data is compressed and could not be decompressed. To access the compressed resource data, use the data_raw attribute.
"""
if ResourceAttrs.resCompressed in self.attributes:
if self.compressed_info is not None:
try:
return self._data_decompressed
except AttributeError:
self._data_decompressed = compress.decompress(self.data_raw)
self._data_decompressed = compress.decompress_parsed(self.compressed_info, self.data_raw[self.compressed_info.header_length:])
return self._data_decompressed
else:
return self.data_raw
class ResourceFile(collections.abc.Mapping):
class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.ContextManager["ResourceFile"]):
"""A resource file reader operating on a byte stream."""
# noinspection PyProtectedMember
class _LazyResourceMap(collections.abc.Mapping):
class _LazyResourceMap(typing.Mapping[int, Resource]):
"""Internal class: Lazy mapping of resource IDs to resource objects, returned when subscripting a ResourceFile."""
def __init__(self, resfile: "ResourceFile", restype: bytes):
_resfile: "ResourceFile"
_restype: bytes
_submap: typing.Mapping[int, typing.Tuple[int, ResourceAttrs, int]]
def __init__(self, resfile: "ResourceFile", restype: bytes) -> None:
"""Create a new _LazyResourceMap "containing" all resources in resfile that have the type code restype."""
super().__init__()
self._resfile: "ResourceFile" = resfile
self._restype: bytes = restype
self._submap: typing.Mapping[int, typing.Tuple[int, ResourceAttrs, int]] = self._resfile._references[self._restype]
self._resfile = resfile
self._restype = restype
self._submap = self._resfile._references[self._restype]
def __len__(self):
def __len__(self) -> int:
"""Get the number of resources with this type code."""
return len(self._submap)
def __iter__(self):
def __iter__(self) -> typing.Iterator[int]:
"""Iterate over the IDs of all resources with this type code."""
return iter(self._submap)
def __contains__(self, key: int):
def __contains__(self, key: object) -> bool:
"""Check if a resource with the given ID exists for this type code."""
return key in self._submap
@ -193,14 +251,31 @@ class ResourceFile(collections.abc.Mapping):
return Resource(self._restype, key, name, attributes, data)
def __repr__(self):
def __repr__(self) -> str:
if len(self) == 1:
return f"<{type(self).__module__}.{type(self).__qualname__} at {id(self):#x} containing one resource: {next(iter(self.values()))}>"
else:
return f"<{type(self).__module__}.{type(self).__qualname__} at {id(self):#x} containing {len(self)} resources with IDs: {list(self)}>"
_close_stream: bool
_stream: typing.BinaryIO
data_offset: int
map_offset: int
data_length: int
map_length: int
header_system_data: bytes
header_application_data: bytes
map_type_list_offset: int
map_name_list_offset: int
file_attributes: ResourceFileAttrs
_reference_counts: typing.MutableMapping[bytes, int]
_references: typing.MutableMapping[bytes, typing.MutableMapping[int, typing.Tuple[int, ResourceAttrs, int]]]
@classmethod
def open(cls, filename: typing.Union[str, bytes, os.PathLike], *, fork: str="auto", **kwargs) -> "ResourceFile":
def open(cls, filename: typing.Union[str, os.PathLike], *, fork: str="auto", **kwargs: typing.Any) -> "ResourceFile":
"""Open the file at the given path as a ResourceFile.
The fork parameter controls which fork of the file the resource data will be read from. It accepts the following values:
@ -259,7 +334,7 @@ class ResourceFile(collections.abc.Mapping):
else:
raise ValueError(f"Unsupported value for the fork parameter: {fork!r}")
def __init__(self, stream: typing.io.BinaryIO, *, close: bool=False):
def __init__(self, stream: typing.BinaryIO, *, close: bool=False) -> None:
"""Create a ResourceFile wrapping the given byte stream.
To read resource file data from a bytes object, wrap it in an io.BytesIO.
@ -273,8 +348,7 @@ class ResourceFile(collections.abc.Mapping):
super().__init__()
self._close_stream: bool = close
self._stream: typing.io.BinaryIO
self._close_stream = close
if stream.seekable():
self._stream = stream
else:
@ -298,7 +372,7 @@ class ResourceFile(collections.abc.Mapping):
raise InvalidResourceFileError(f"Attempted to read {byte_count} bytes of data, but only got {len(data)} bytes")
return data
def _stream_unpack(self, st: struct.Struct) -> typing.Tuple:
def _stream_unpack(self, st: struct.Struct) -> tuple:
"""Unpack data from the stream according to the struct st. The number of bytes to read is determined using st.size, so variable-sized structs cannot be used with this method."""
try:
@ -306,17 +380,11 @@ class ResourceFile(collections.abc.Mapping):
except struct.error as e:
raise InvalidResourceFileError(str(e))
def _read_header(self):
def _read_header(self) -> None:
"""Read the resource file header, starting at the current stream position."""
assert self._stream.tell() == 0
self.data_offset: int
self.map_offset: int
self.data_length: int
self.map_length: int
self.header_system_data: bytes
self.header_application_data: bytes
(
self.data_offset,
self.map_offset,
@ -329,25 +397,23 @@ class ResourceFile(collections.abc.Mapping):
if self._stream.tell() != self.data_offset:
raise InvalidResourceFileError(f"The data offset ({self.data_offset}) should point exactly to the end of the file header ({self._stream.tell()})")
def _read_map_header(self):
def _read_map_header(self) -> None:
"""Read the map header, starting at the current stream position."""
assert self._stream.tell() == self.map_offset
self.map_type_list_offset: int
self.map_name_list_offset: int
(
_file_attributes,
self.map_type_list_offset,
self.map_name_list_offset,
) = self._stream_unpack(STRUCT_RESOURCE_MAP_HEADER)
self.file_attributes: ResourceFileAttrs = ResourceFileAttrs(_file_attributes)
self.file_attributes = ResourceFileAttrs(_file_attributes)
def _read_all_resource_types(self):
def _read_all_resource_types(self) -> None:
"""Read all resource types, starting at the current stream position."""
self._reference_counts: typing.MutableMapping[bytes, int] = collections.OrderedDict()
self._reference_counts = collections.OrderedDict()
(type_list_length_m1,) = self._stream_unpack(STRUCT_RESOURCE_TYPE_LIST_HEADER)
type_list_length = (type_list_length_m1 + 1) % 0x10000
@ -361,10 +427,10 @@ class ResourceFile(collections.abc.Mapping):
count = (count_m1 + 1) % 0x10000
self._reference_counts[resource_type] = count
def _read_all_references(self):
def _read_all_references(self) -> None:
"""Read all resource references, starting at the current stream position."""
self._references: typing.MutableMapping[bytes, typing.MutableMapping[int, typing.Tuple[int, ResourceAttrs, int]]] = collections.OrderedDict()
self._references = collections.OrderedDict()
for resource_type, count in self._reference_counts.items():
resmap: typing.MutableMapping[int, typing.Tuple[int, ResourceAttrs, int]] = collections.OrderedDict()
@ -381,7 +447,7 @@ class ResourceFile(collections.abc.Mapping):
resmap[resource_id] = (name_offset, ResourceAttrs(attributes), data_offset)
def close(self):
def close(self) -> None:
"""Close this ResourceFile.
If close=True was passed when this ResourceFile was created, the underlying stream's close method is called as well.
@ -390,23 +456,29 @@ class ResourceFile(collections.abc.Mapping):
if self._close_stream:
self._stream.close()
def __enter__(self):
pass
def __enter__(self) -> "ResourceFile":
return self
def __exit__(self, exc_type, exc_val, exc_tb):
def __exit__(
self,
exc_type: typing.Optional[typing.Type[BaseException]],
exc_val: typing.Optional[BaseException],
exc_tb: typing.Optional[types.TracebackType]
) -> typing.Optional[bool]:
self.close()
return None
def __len__(self):
def __len__(self) -> int:
"""Get the number of resource types in this ResourceFile."""
return len(self._references)
def __iter__(self):
def __iter__(self) -> typing.Iterator[bytes]:
"""Iterate over all resource types in this ResourceFile."""
return iter(self._references)
def __contains__(self, key: bytes):
def __contains__(self, key: object) -> bool:
"""Check whether this ResourceFile contains any resources of the given type."""
return key in self._references
@ -416,5 +488,5 @@ class ResourceFile(collections.abc.Mapping):
return ResourceFile._LazyResourceMap(self, key)
def __repr__(self):
def __repr__(self) -> str:
return f"<{type(self).__module__}.{type(self).__qualname__} at {id(self):#x}, attributes {self.file_attributes}, containing {len(self)} resource types: {list(self)}>"

View File

@ -1,97 +1,63 @@
import struct
import io
import typing
from . import dcmp0
from . import dcmp1
from . import dcmp2
from .common import DecompressError
from .common import DecompressError, CompressedHeaderInfo
__all__ = [
"CompressedHeaderInfo",
"DecompressError",
"decompress",
"decompress_parsed",
"decompress_stream",
"decompress_stream_parsed",
]
# The signature of all compressed resource data, 0xa89f6572 in hex, or "®üer" in MacRoman.
COMPRESSED_SIGNATURE = b"\xa8\x9fer"
# The compression type commonly used for application resources.
COMPRESSED_TYPE_APPLICATION = 0x0801
# The compression type commonly used for System file resources.
COMPRESSED_TYPE_SYSTEM = 0x0901
# Common header for compressed resources of all types.
# 4 bytes: Signature (see above).
# 2 bytes: Length of the complete header (this common part and the type-specific part that follows it). (This meaning is just a guess - the field's value is always 0x0012, so there's no way to know for certain what it means.)
# 2 bytes: Compression type. Known so far: 0x0901 is used in the System file's resources. 0x0801 is used in other files' resources.
# 4 bytes: Length of the data after decompression.
STRUCT_COMPRESSED_HEADER = struct.Struct(">4sHHI")
# Header continuation part for an "application" compressed resource.
# 1 byte: "Working buffer fractional size" - the ratio of the compressed data size to the uncompressed data size, times 256.
# 1 byte: "Expansion buffer size" - the maximum number of bytes that the data might grow during decompression.
# 2 bytes: The ID of the 'dcmp' resource that can decompress this resource. Currently only ID 0 is supported.
# 2 bytes: Reserved (always zero).
STRUCT_COMPRESSED_APPLICATION_HEADER = struct.Struct(">BBhH")
# Header continuation part for a "system" compressed resource.
# 2 bytes: The ID of the 'dcmp' resource that can decompress this resource. Currently only ID 2 is supported.
# 4 bytes: Decompressor-specific parameters.
STRUCT_COMPRESSED_SYSTEM_HEADER = struct.Struct(">h4s")
# Maps 'dcmp' IDs to their corresponding Python implementations.
# Each decompressor has the signature (header_info: CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes].
DECOMPRESSORS = {
0: dcmp0.decompress_stream,
1: dcmp1.decompress_stream,
2: dcmp2.decompress_stream,
}
def _decompress_application(data: bytes, decompressed_length: int, *, debug: bool=False) -> bytes:
working_buffer_fractional_size, expansion_buffer_size, dcmp_id, reserved = STRUCT_COMPRESSED_APPLICATION_HEADER.unpack_from(data)
def decompress_stream_parsed(header_info: CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
"""Decompress compressed resource data from a stream, whose header has already been read and parsed into a CompressedHeaderInfo object."""
try:
decompress_func = DECOMPRESSORS[header_info.dcmp_id]
except KeyError:
raise DecompressError(f"Unsupported 'dcmp' ID: {header_info.dcmp_id}")
decompressed_length = 0
for chunk in decompress_func(header_info, stream, debug=debug):
decompressed_length += len(chunk)
yield chunk
if decompressed_length != header_info.decompressed_length:
raise DecompressError(f"Actual length of decompressed data ({decompressed_length}) does not match length stored in resource ({header_info.decompressed_length})")
def decompress_parsed(header_info: CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
"""Decompress the given compressed resource data, whose header has already been removed and parsed into a CompressedHeaderInfo object."""
return b"".join(decompress_stream_parsed(header_info, io.BytesIO(data), debug=debug))
def decompress_stream(stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
"""Decompress compressed resource data from a stream."""
header_info = CompressedHeaderInfo.parse_stream(stream)
if debug:
print(f"Working buffer fractional size: {working_buffer_fractional_size} (=> {len(data) * 256 / working_buffer_fractional_size})")
print(f"Expansion buffer size: {expansion_buffer_size}")
print(f"Compressed resource data header: {header_info}")
if dcmp_id == 0:
decompress_func = dcmp0.decompress
elif dcmp_id == 1:
decompress_func = dcmp1.decompress
else:
raise DecompressError(f"Unsupported 'dcmp' ID: {dcmp_id}, expected 0 or 1")
if reserved != 0:
raise DecompressError(f"Reserved field should be 0, not 0x{reserved:>04x}")
return decompress_func(data[STRUCT_COMPRESSED_APPLICATION_HEADER.size:], decompressed_length, debug=debug)
def _decompress_system(data: bytes, decompressed_length: int, *, debug: bool=False) -> bytes:
dcmp_id, params = STRUCT_COMPRESSED_SYSTEM_HEADER.unpack_from(data)
if dcmp_id == 2:
decompress_func = dcmp2.decompress
else:
raise DecompressError(f"Unsupported 'dcmp' ID: {dcmp_id}, expected 2")
return decompress_func(data[STRUCT_COMPRESSED_SYSTEM_HEADER.size:], decompressed_length, params, debug=debug)
yield from decompress_stream_parsed(header_info, stream, debug=debug)
def decompress(data: bytes, *, debug: bool=False) -> bytes:
"""Decompress the given compressed resource data."""
try:
signature, header_length, compression_type, decompressed_length = STRUCT_COMPRESSED_HEADER.unpack_from(data)
except struct.error:
raise DecompressError(f"Invalid header")
if signature != COMPRESSED_SIGNATURE:
raise DecompressError(f"Invalid signature: {signature!r}, expected {COMPRESSED_SIGNATURE}")
if header_length != 0x12:
raise DecompressError(f"Unsupported header length: 0x{header_length:>04x}, expected 0x12")
if compression_type == COMPRESSED_TYPE_APPLICATION:
decompress_func = _decompress_application
elif compression_type == COMPRESSED_TYPE_SYSTEM:
decompress_func = _decompress_system
else:
raise DecompressError(f"Unsupported compression type: 0x{compression_type:>04x}")
if debug:
print(f"Decompressed length: {decompressed_length}")
decompressed = decompress_func(data[STRUCT_COMPRESSED_HEADER.size:], decompressed_length, debug=debug)
if len(decompressed) != decompressed_length:
raise DecompressError(f"Actual length of decompressed data ({len(decompressed)}) does not match length stored in resource ({decompressed_length})")
return decompressed
return b"".join(decompress_stream(io.BytesIO(data), debug=debug))

View File

@ -1,3 +1,5 @@
import io
import struct
import typing
@ -5,19 +7,192 @@ class DecompressError(Exception):
"""Raised when resource data decompression fails, because the data is invalid or the compression type is not supported."""
def _read_variable_length_integer(data: bytes, position: int) -> typing.Tuple[int, int]:
"""Read a variable-length integer starting at the given position in the data, and return the integer as well as the number of bytes consumed.
# The signature of all compressed resource data, 0xa89f6572 in hex, or "®üer" in MacRoman.
COMPRESSED_SIGNATURE = b"\xa8\x9fer"
# The number of the "type 8" compression type. This type is used in the Finder, ResEdit, and some other system files.
COMPRESSED_TYPE_8 = 0x0801
# The number of the "type 9" compression type. This type is used in the System file and System 7.5's Installer.
COMPRESSED_TYPE_9 = 0x0901
# Common header for compressed resources of all types.
# 4 bytes: Signature (see above).
# 2 bytes: Length of the complete header (this common part and the type-specific part that follows it). (This meaning is just a guess - the field's value is always 0x0012, so there's no way to know for certain what it means.)
# 2 bytes: Compression type. Known so far: 0x0801 ("type 8") and 0x0901 ("type 9").
# 4 bytes: Length of the data after decompression.
# 6 bytes: Remainder of the header. The exact format varies depending on the compression type.
STRUCT_COMPRESSED_HEADER = struct.Struct(">4sHHI6s")
# Remainder of header for a "type 8" compressed resource.
# 1 byte: "Working buffer fractional size" - the ratio of the compressed data size to the uncompressed data size, times 256.
# 1 byte: "Expansion buffer size" - the maximum number of bytes that the data might grow during decompression.
# 2 bytes: The ID of the 'dcmp' resource that can decompress this resource. Currently only ID 0 is supported.
# 2 bytes: Reserved (always zero).
STRUCT_COMPRESSED_TYPE_8_HEADER = struct.Struct(">BBhH")
# Remainder of header for a "type 9" compressed resource.
# 2 bytes: The ID of the 'dcmp' resource that can decompress this resource. Currently only ID 2 is supported.
# 4 bytes: Decompressor-specific parameters.
STRUCT_COMPRESSED_TYPE_9_HEADER = struct.Struct(">h4s")
class CompressedHeaderInfo(object):
@classmethod
def parse_stream(cls, stream: typing.BinaryIO) -> "CompressedHeaderInfo":
try:
signature, header_length, compression_type, decompressed_length, remainder = STRUCT_COMPRESSED_HEADER.unpack(stream.read(STRUCT_COMPRESSED_HEADER.size))
except struct.error:
raise DecompressError(f"Invalid header")
if signature != COMPRESSED_SIGNATURE:
raise DecompressError(f"Invalid signature: {signature!r}, expected {COMPRESSED_SIGNATURE}")
if header_length != 0x12:
raise DecompressError(f"Unsupported header length: 0x{header_length:>04x}, expected 0x12")
if compression_type == COMPRESSED_TYPE_8:
working_buffer_fractional_size, expansion_buffer_size, dcmp_id, reserved = STRUCT_COMPRESSED_TYPE_8_HEADER.unpack(remainder)
if reserved != 0:
raise DecompressError(f"Reserved field should be 0, not 0x{reserved:>04x}")
return CompressedType8HeaderInfo(header_length, compression_type, decompressed_length, dcmp_id, working_buffer_fractional_size, expansion_buffer_size)
elif compression_type == COMPRESSED_TYPE_9:
dcmp_id, parameters = STRUCT_COMPRESSED_TYPE_9_HEADER.unpack(remainder)
return CompressedType9HeaderInfo(header_length, compression_type, decompressed_length, dcmp_id, parameters)
else:
raise DecompressError(f"Unsupported compression type: 0x{compression_type:>04x}")
@classmethod
def parse(cls, data: bytes) -> "CompressedHeaderInfo":
return cls.parse_stream(io.BytesIO(data))
header_length: int
compression_type: int
decompressed_length: int
dcmp_id: int
def __init__(self, header_length: int, compression_type: int, decompressed_length: int, dcmp_id: int) -> None:
super().__init__()
self.header_length = header_length
self.compression_type = compression_type
self.decompressed_length = decompressed_length
self.dcmp_id = dcmp_id
class CompressedType8HeaderInfo(CompressedHeaderInfo):
working_buffer_fractional_size: int
expansion_buffer_size: int
def __init__(self, header_length: int, compression_type: int, decompressed_length: int, dcmp_id: int, working_buffer_fractional_size: int, expansion_buffer_size: int) -> None:
super().__init__(header_length, compression_type, decompressed_length, dcmp_id)
self.working_buffer_fractional_size = working_buffer_fractional_size
self.expansion_buffer_size = expansion_buffer_size
def __repr__(self) -> str:
return f"{type(self).__qualname__}(header_length={self.header_length}, compression_type=0x{self.compression_type:>04x}, decompressed_length={self.decompressed_length}, dcmp_id={self.dcmp_id}, working_buffer_fractional_size={self.working_buffer_fractional_size}, expansion_buffer_size={self.expansion_buffer_size})"
class CompressedType9HeaderInfo(CompressedHeaderInfo):
parameters: bytes
def __init__(self, header_length: int, compression_type: int, decompressed_length: int, dcmp_id: int, parameters: bytes) -> None:
super().__init__(header_length, compression_type, decompressed_length, dcmp_id)
self.parameters = parameters
def __repr__(self) -> str:
return f"{type(self).__qualname__}(header_length={self.header_length}, compression_type=0x{self.compression_type:>04x}, decompressed_length={self.decompressed_length}, dcmp_id={self.dcmp_id}, parameters={self.parameters!r})"
if typing.TYPE_CHECKING:
class PeekableIO(typing.Protocol):
"""Minimal protocol for binary IO streams that support the peek method.
The peek method is supported by various standard Python binary IO streams, such as io.BufferedReader. If a stream does not natively support the peek method, it may be wrapped using the custom helper function make_peekable.
"""
def readable(self) -> bool: ...
def read(self, size: typing.Optional[int] = ...) -> bytes: ...
def peek(self, size: int = ...) -> bytes: ...
class _PeekableIOWrapper(object):
"""Wrapper class to add peek support to an existing stream. Do not instantiate this class directly, use the make_peekable function instead.
Python provides a standard io.BufferedReader class, which supports the peek method. However, according to its documentation, it only supports wrapping io.RawIOBase subclasses, and not streams which are already otherwise buffered.
Warning: this class does not perform any buffering of its own, outside of what is required to make peek work. It is strongly recommended to only wrap streams that are already buffered or otherwise fast to read from. In particular, raw streams (io.RawIOBase subclasses) should be wrapped using io.BufferedReader instead.
"""
_wrapped: typing.BinaryIO
_readahead: bytes
def __init__(self, wrapped: typing.BinaryIO) -> None:
super().__init__()
self._wrapped = wrapped
self._readahead = b""
def readable(self) -> bool:
return self._wrapped.readable()
def read(self, size: typing.Optional[int] = None) -> bytes:
if size is None or size < 0:
ret = self._readahead + self._wrapped.read()
self._readahead = b""
elif size <= len(self._readahead):
ret = self._readahead[:size]
self._readahead = self._readahead[size:]
else:
ret = self._readahead + self._wrapped.read(size - len(self._readahead))
self._readahead = b""
return ret
def peek(self, size: int = -1) -> bytes:
if not self._readahead:
self._readahead = self._wrapped.read(io.DEFAULT_BUFFER_SIZE if size < 0 else size)
return self._readahead
def make_peekable(stream: typing.BinaryIO) -> "PeekableIO":
"""Wrap an arbitrary binary IO stream so that it supports the peek method.
The stream is wrapped as efficiently as possible (or not at all if it already supports the peek method). However, in the worst case a custom wrapper class needs to be used, which may not be particularly efficient and only supports a very minimal interface. The only methods that are guaranteed to exist on the returned stream are readable, read, and peek.
"""
if hasattr(stream, "peek"):
# Stream is already peekable, nothing to be done.
return typing.cast("PeekableIO", stream)
elif isinstance(stream, io.RawIOBase):
# Raw IO streams can be wrapped efficiently using BufferedReader.
return io.BufferedReader(stream)
else:
# Other streams need to be wrapped using our custom wrapper class.
return _PeekableIOWrapper(stream)
def read_exact(stream: typing.BinaryIO, byte_count: int) -> bytes:
"""Read byte_count bytes from the stream and raise an exception if too few bytes are read (i. e. if EOF was hit prematurely)."""
data = stream.read(byte_count)
if len(data) != byte_count:
raise DecompressError(f"Attempted to read {byte_count} bytes of data, but only got {len(data)} bytes")
return data
def read_variable_length_integer(stream: typing.BinaryIO) -> int:
"""Read a variable-length integer from the stream.
This variable-length integer format is used by the 0xfe codes in the compression formats used by 'dcmp' (0) and 'dcmp' (1).
"""
assert len(data) > position
if data[position] == 0xff:
assert len(data) > position + 4
return int.from_bytes(data[position+1:position+5], "big", signed=True), 5
elif data[position] >= 0x80:
assert len(data) > position + 1
data_modified = bytes([(data[position] - 0xc0) & 0xff, data[position+1]])
return int.from_bytes(data_modified, "big", signed=True), 2
head = read_exact(stream, 1)
if head[0] == 0xff:
return int.from_bytes(read_exact(stream, 4), "big", signed=True)
elif head[0] >= 0x80:
data_modified = bytes([(head[0] - 0xc0) & 0xff]) + read_exact(stream, 1)
return int.from_bytes(data_modified, "big", signed=True)
else:
return int.from_bytes(data[position:position+1], "big", signed=True), 1
return int.from_bytes(head, "big", signed=True)

View File

@ -1,3 +1,6 @@
import io
import typing
from . import common
# Lookup table for codes in range(0x4b, 0xfe).
@ -36,94 +39,73 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
assert len(TABLE) == len(range(0x4b, 0xfe))
def decompress(data: bytes, decompressed_length: int, *, debug: bool=False) -> bytes:
"""Decompress compressed data in the format used by 'dcmp' (0)."""
def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
"""Internal helper function, implements the main decompression algorithm. Only called from decompress_stream, which performs some extra checks and debug logging."""
prev_literals = []
decompressed = b""
if not isinstance(header_info, common.CompressedType8HeaderInfo):
raise common.DecompressError(f"Incorrect header type: {type(header_info).__qualname__}")
i = 0
prev_literals: typing.List[bytes] = []
while i < len(data):
byte = data[i]
while True: # Loop is terminated when the EOF marker (0xff) is encountered
(byte,) = common.read_exact(stream, 1)
if debug:
print(f"Tag byte 0x{byte:>02x}, at 0x{i:x}, decompressing to 0x{len(decompressed):x}")
print(f"Tag byte 0x{byte:>02x}")
if byte in range(0x00, 0x20):
# Literal byte sequence.
if byte in (0x00, 0x10):
# The length of the literal data is stored in the next byte.
count_div2 = data[i+1]
begin = i + 2
(count_div2,) = common.read_exact(stream, 1)
else:
# The length of the literal data is stored in the low nibble of the tag byte.
count_div2 = byte >> 0 & 0xf
begin = i + 1
end = begin + 2*count_div2
count = 2 * count_div2
# Controls whether or not the literal is stored so that it can be referenced again later.
do_store = byte >= 0x10
literal = data[begin:end]
literal = common.read_exact(stream, count)
if debug:
print(f"Literal (storing: {do_store})")
print(f"\t-> {literal}")
decompressed += literal
if do_store:
if debug:
print(f"\t-> stored as literal number 0x{len(prev_literals):x}")
print(f"\t-> storing as literal number 0x{len(prev_literals):x}")
prev_literals.append(literal)
i = end
yield literal
elif byte in (0x20, 0x21):
# Backreference to a previous literal, 2-byte form.
# This can reference literals with index in range(0x28, 0x228).
table_index = 0x28 + ((byte - 0x20) << 8 | data[i+1])
i += 2
(next_byte,) = common.read_exact(stream, 1)
table_index = 0x28 + ((byte - 0x20) << 8 | next_byte)
if debug:
print(f"Backreference (2-byte form) to 0x{table_index:>02x}")
literal = prev_literals[table_index]
if debug:
print(f"\t-> {literal}")
decompressed += literal
yield prev_literals[table_index]
elif byte == 0x22:
# Backreference to a previous literal, 3-byte form.
# This can reference any literal with index 0x28 and higher, but is only necessary for literals with index 0x228 and higher.
table_index = 0x28 + int.from_bytes(data[i+1:i+3], "big", signed=False)
i += 3
table_index = 0x28 + int.from_bytes(common.read_exact(stream, 2), "big", signed=False)
if debug:
print(f"Backreference (3-byte form) to 0x{table_index:>02x}")
literal = prev_literals[table_index]
if debug:
print(f"\t-> {literal}")
decompressed += literal
yield prev_literals[table_index]
elif byte in range(0x23, 0x4b):
# Backreference to a previous literal, 1-byte form.
# This can reference literals with indices in range(0x28).
table_index = byte - 0x23
i += 1
if debug:
print(f"Backreference (1-byte form) to 0x{table_index:>02x}")
literal = prev_literals[table_index]
if debug:
print(f"\t-> {literal}")
decompressed += literal
yield prev_literals[table_index]
elif byte in range(0x4b, 0xfe):
# Reference into a fixed table of two-byte literals.
# All compressed resources use the same table.
table_index = byte - 0x4b
i += 1
if debug:
print(f"Fixed table reference to 0x{table_index:>02x}")
entry = TABLE[table_index]
if debug:
print(f"\t-> {entry}")
decompressed += entry
yield TABLE[table_index]
elif byte == 0xfe:
# Extended code, whose meaning is controlled by the following byte.
i += 1
kind = data[i]
(kind,) = common.read_exact(stream, 1)
if debug:
print(f"Extended code: 0x{kind:>02x}")
i += 1
if kind == 0x00:
# Compact representation of (part of) a segment loader jump table, as used in 'CODE' (0) resources.
@ -132,37 +114,28 @@ def decompress(data: bytes, decompressed_length: int, *, debug: bool=False) -> b
print(f"Segment loader jump table entries")
# All generated jump table entries have the same segment number.
segment_number_int, length = common._read_variable_length_integer(data, i)
i += length
segment_number_int = common.read_variable_length_integer(stream)
if debug:
print(f"\t-> segment number: {segment_number_int:#x}")
# The tail part of all jump table entries (i. e. everything except for the address).
entry_tail = b"?<" + segment_number_int.to_bytes(2, "big", signed=True) + b"\xa9\xf0"
if debug:
print(f"\t-> tail of first entry: {entry_tail}")
# The tail is output once *without* an address in front, i. e. the first entry's address must be generated manually by a previous code.
decompressed += entry_tail
yield entry_tail
count, length = common._read_variable_length_integer(data, i)
i += length
count = common.read_variable_length_integer(stream)
if count <= 0:
raise common.DecompressError(f"Jump table entry count must be greater than 0, not {count}")
# The second entry's address is stored explicitly.
current_int, length = common._read_variable_length_integer(data, i)
i += length
current_int = common.read_variable_length_integer(stream)
if debug:
print(f"-> address of second entry: {current_int:#x}")
entry = current_int.to_bytes(2, "big", signed=False) + entry_tail
if debug:
print(f"-> second entry: {entry}")
decompressed += entry
print(f"\t-> address of second entry: {current_int:#x}")
yield current_int.to_bytes(2, "big", signed=False) + entry_tail
for _ in range(1, count):
# All further entries' addresses are stored as differences relative to the previous entry's address.
diff, length = common._read_variable_length_integer(data, i)
i += length
diff = common.read_variable_length_integer(stream)
# For some reason, each difference is 6 higher than it should be.
diff -= 6
@ -170,10 +143,7 @@ def decompress(data: bytes, decompressed_length: int, *, debug: bool=False) -> b
current_int = (current_int + diff) & 0xffff
if debug:
print(f"\t-> difference {diff:#x}: {current_int:#x}")
entry = current_int.to_bytes(2, "big", signed=False) + entry_tail
if debug:
print(f"\t-> {entry}")
decompressed += entry
yield current_int.to_bytes(2, "big", signed=False) + entry_tail
elif kind in (0x02, 0x03):
# Repeat 1 or 2 bytes a certain number of times.
@ -188,23 +158,19 @@ def decompress(data: bytes, decompressed_length: int, *, debug: bool=False) -> b
print(f"Repeat {byte_count}-byte value")
# The byte(s) to repeat, stored as a variable-length integer. The value is treated as unsigned, i. e. the integer is never negative.
to_repeat_int, length = common._read_variable_length_integer(data, i)
i += length
to_repeat_int = common.read_variable_length_integer(stream)
try:
to_repeat = to_repeat_int.to_bytes(byte_count, "big", signed=False)
except OverflowError:
raise common.DecompressError(f"Value to repeat out of range for {byte_count}-byte repeat: {to_repeat_int:#x}")
count_m1, length = common._read_variable_length_integer(data, i)
i += length
count = count_m1 + 1
count = common.read_variable_length_integer(stream) + 1
if count <= 0:
raise common.DecompressError(f"Repeat count must be positive: {count}")
repeated = to_repeat * count
if debug:
print(f"\t-> {to_repeat} * {count}: {repeated}")
decompressed += repeated
print(f"\t-> {to_repeat} * {count}")
yield to_repeat * count
elif kind == 0x04:
# A sequence of 16-bit signed integers, with each integer encoded as a difference relative to the previous integer. The first integer is stored explicitly.
@ -212,18 +178,16 @@ def decompress(data: bytes, decompressed_length: int, *, debug: bool=False) -> b
print(f"Difference-encoded 16-bit integers")
# The first integer is stored explicitly, as a signed value.
initial_int, length = common._read_variable_length_integer(data, i)
i += length
initial_int = common.read_variable_length_integer(stream)
try:
initial = initial_int.to_bytes(2, "big", signed=True)
except OverflowError:
raise common.DecompressError(f"Initial value out of range for 16-bit integer difference encoding: {initial_int:#x}")
if debug:
print(f"\t-> initial: {initial}")
decompressed += initial
print(f"\t-> initial: 0x{initial_int:>04x}")
yield initial
count, length = common._read_variable_length_integer(data, i)
i += length
count = common.read_variable_length_integer(stream)
if count < 0:
raise common.DecompressError(f"Count cannot be negative: {count}")
@ -232,64 +196,74 @@ def decompress(data: bytes, decompressed_length: int, *, debug: bool=False) -> b
for _ in range(count):
# The difference to the previous integer is stored as an 8-bit signed integer.
# The usual variable-length integer format is *not* used here.
diff = int.from_bytes(data[i:i+1], "big", signed=True)
i += 1
diff = int.from_bytes(common.read_exact(stream, 1), "big", signed=True)
# Simulate 16-bit integer wraparound.
current_int = (current_int + diff) & 0xffff
current = current_int.to_bytes(2, "big", signed=False)
if debug:
print(f"\t-> difference {diff:#x}: {current}")
decompressed += current
print(f"\t-> difference {diff:#x}: 0x{current_int:>04x}")
yield current_int.to_bytes(2, "big", signed=False)
elif kind == 0x06:
# A sequence of 32-bit signed integers, with each integer encoded as a difference relative to the previous integer. The first integer is stored explicitly.
if debug:
print(f"Difference-encoded 16-bit integers")
print(f"Difference-encoded 32-bit integers")
# The first integer is stored explicitly, as a signed value.
initial_int, length = common._read_variable_length_integer(data, i)
i += length
initial_int = common.read_variable_length_integer(stream)
try:
initial = initial_int.to_bytes(4, "big", signed=True)
except OverflowError:
raise common.DecompressError(f"Initial value out of range for 32-bit integer difference encoding: {initial_int:#x}")
if debug:
print(f"\t-> initial: {initial}")
decompressed += initial
print(f"\t-> initial: 0x{initial_int:>08x}")
yield initial
count, length = common._read_variable_length_integer(data, i)
i += length
count = common.read_variable_length_integer(stream)
assert count >= 0
# To make the following calculations simpler, the signed initial_int value is converted to unsigned.
current_int = initial_int & 0xffffffff
for _ in range(count):
# The difference to the previous integer is stored as a variable-length integer, whose value may be negative.
diff, length = common._read_variable_length_integer(data, i)
i += length
diff = common.read_variable_length_integer(stream)
# Simulate 32-bit integer wraparound.
current_int = (current_int + diff) & 0xffffffff
current = current_int.to_bytes(4, "big", signed=False)
if debug:
print(f"\t-> difference {diff:#x}: {current}")
decompressed += current
print(f"\t-> difference {diff:#x}: 0x{current_int:>08x}")
yield current_int.to_bytes(4, "big", signed=False)
else:
raise common.DecompressError(f"Unknown extended code: 0x{kind:>02x}")
elif byte == 0xff:
# End of data marker, always occurs exactly once as the last byte of the compressed data.
if debug:
print("End marker")
if i != len(data) - 1:
raise common.DecompressError(f"End marker reached at {i}, before the expected end of data at {len(data) - 1}")
i += 1
# Check that there really is no more data left.
extra = stream.read(1)
if extra:
raise common.DecompressError(f"Extra data encountered after end of data marker (first extra byte: {extra})")
break
else:
raise common.DecompressError(f"Unknown tag byte: 0x{data[i]:>02x}")
if decompressed_length % 2 != 0 and len(decompressed) == decompressed_length + 1:
# Special case: if the decompressed data length stored in the header is odd and one less than the length of the actual decompressed data, drop the last byte.
# This is necessary because nearly all codes generate data in groups of 2 or 4 bytes, so it is basically impossible to represent data with an odd length using this compression format.
decompressed = decompressed[:-1]
return decompressed
raise common.DecompressError(f"Unknown tag byte: 0x{byte:>02x}")
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
"""Decompress compressed data in the format used by 'dcmp' (0)."""
decompressed_length = 0
for chunk in decompress_stream_inner(header_info, stream, debug=debug):
if debug:
print(f"\t-> {chunk}")
if header_info.decompressed_length % 2 != 0 and decompressed_length + len(chunk) == header_info.decompressed_length + 1:
# Special case: if the decompressed data length stored in the header is odd and one less than the length of the actual decompressed data, drop the last byte.
# This is necessary because nearly all codes generate data in groups of 2 or 4 bytes, so it is basically impossible to represent data with an odd length using this compression format.
decompressed_length += len(chunk) - 1
yield chunk[:-1]
else:
decompressed_length += len(chunk)
yield chunk
if debug:
print(f"Decompressed {decompressed_length:#x} bytes so far")

View File

@ -1,3 +1,6 @@
import io
import typing
from . import common
# Lookup table for codes in range(0xd5, 0xfe).
@ -19,96 +22,75 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
assert len(TABLE) == len(range(0xd5, 0xfe))
def decompress(data: bytes, decompressed_length: int, *, debug: bool=False) -> bytes:
"""Decompress compressed data in the format used by 'dcmp' (1)."""
def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
"""Internal helper function, implements the main decompression algorithm. Only called from decompress_stream, which performs some extra checks and debug logging."""
prev_literals = []
decompressed = b""
if not isinstance(header_info, common.CompressedType8HeaderInfo):
raise common.DecompressError(f"Incorrect header type: {type(header_info).__qualname__}")
i = 0
prev_literals: typing.List[bytes] = []
while i < len(data):
byte = data[i]
while True: # Loop is terminated when the EOF marker (0xff) is encountered
(byte,) = common.read_exact(stream, 1)
if debug:
print(f"Tag byte 0x{byte:>02x}, at 0x{i:x}, decompressing to 0x{len(decompressed):x}")
print(f"Tag byte 0x{byte:>02x}")
if byte in range(0x00, 0x20):
# Literal byte sequence, 1-byte header.
# The length of the literal data is stored in the low nibble of the tag byte.
count = (byte >> 0 & 0xf) + 1
begin = i + 1
end = begin + count
# Controls whether or not the literal is stored so that it can be referenced again later.
do_store = byte >= 0x10
literal = data[begin:end]
literal = common.read_exact(stream, count)
if debug:
print(f"Literal (1-byte header, storing: {do_store})")
print(f"\t-> {literal}")
decompressed += literal
if do_store:
if debug:
print(f"\t-> stored as literal number 0x{len(prev_literals):x}")
print(f"\t-> storing as literal number 0x{len(prev_literals):x}")
prev_literals.append(literal)
i = end
yield literal
elif byte in range(0x20, 0xd0):
# Backreference to a previous literal, 1-byte form.
# This can reference literals with indices in range(0xb0).
table_index = byte - 0x20
i += 1
if debug:
print(f"Backreference (1-byte form) to 0x{table_index:>02x}")
literal = prev_literals[table_index]
if debug:
print(f"\t-> {literal}")
decompressed += literal
yield prev_literals[table_index]
elif byte in (0xd0, 0xd1):
# Literal byte sequence, 2-byte header.
# The length of the literal data is stored in the following byte.
count = data[i+1]
begin = i + 2
end = begin + count
(count,) = common.read_exact(stream, 1)
# Controls whether or not the literal is stored so that it can be referenced again later.
do_store = byte == 0xd1
literal = data[begin:end]
literal = common.read_exact(stream, count)
if debug:
print(f"Literal (2-byte header, storing: {do_store})")
print(f"\t-> {literal}")
decompressed += literal
if do_store:
if debug:
print(f"\t-> stored as literal number 0x{len(prev_literals):x}")
print(f"\t-> storing as literal number 0x{len(prev_literals):x}")
prev_literals.append(literal)
i = end
yield literal
elif byte == 0xd2:
# Backreference to a previous literal, 2-byte form.
# This can reference literals with indices in range(0xb0, 0x1b0).
table_index = data[i+1] + 0xb0
i += 2
(next_byte,) = common.read_exact(stream, 1)
table_index = next_byte + 0xb0
if debug:
print(f"Backreference (2-byte form) to 0x{table_index:>02x}")
literal = prev_literals[table_index]
if debug:
print(f"\t-> {literal}")
decompressed += literal
yield prev_literals[table_index]
elif byte in range(0xd5, 0xfe):
# Reference into a fixed table of two-byte literals.
# All compressed resources use the same table.
table_index = byte - 0xd5
i += 1
if debug:
print(f"Fixed table reference to 0x{table_index:>02x}")
entry = TABLE[table_index]
if debug:
print(f"\t-> {entry}")
decompressed += entry
yield TABLE[table_index]
elif byte == 0xfe:
# Extended code, whose meaning is controlled by the following byte.
i += 1
kind = data[i]
(kind,) = common.read_exact(stream, 1)
if debug:
print(f"Extended code: 0x{kind:>02x}")
i += 1
if kind == 0x02:
# Repeat 1 byte a certain number of times.
@ -119,33 +101,44 @@ def decompress(data: bytes, decompressed_length: int, *, debug: bool=False) -> b
print(f"Repeat {byte_count}-byte value")
# The byte(s) to repeat, stored as a variable-length integer. The value is treated as unsigned, i. e. the integer is never negative.
to_repeat_int, length = common._read_variable_length_integer(data, i)
i += length
to_repeat_int = common.read_variable_length_integer(stream)
try:
to_repeat = to_repeat_int.to_bytes(byte_count, "big", signed=False)
except OverflowError:
raise common.DecompressError(f"Value to repeat out of range for {byte_count}-byte repeat: {to_repeat_int:#x}")
count_m1, length = common._read_variable_length_integer(data, i)
i += length
count = count_m1 + 1
count = common.read_variable_length_integer(stream) + 1
if count <= 0:
raise common.DecompressError(f"Repeat count must be positive: {count}")
repeated = to_repeat * count
if debug:
print(f"\t-> {to_repeat} * {count}: {repeated}")
decompressed += repeated
print(f"\t-> {to_repeat} * {count}")
yield to_repeat * count
else:
raise common.DecompressError(f"Unknown extended code: 0x{kind:>02x}")
elif byte == 0xff:
# End of data marker, always occurs exactly once as the last byte of the compressed data.
if debug:
print("End marker")
if i != len(data) - 1:
raise common.DecompressError(f"End marker reached at {i}, before the expected end of data at {len(data) - 1}")
i += 1
# Check that there really is no more data left.
extra = stream.read(1)
if extra:
raise common.DecompressError(f"Extra data encountered after end of data marker (first extra byte: {extra})")
break
else:
raise common.DecompressError(f"Unknown tag byte: 0x{data[i]:>02x}")
raise common.DecompressError(f"Unknown tag byte: 0x{byte:>02x}")
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
"""Decompress compressed data in the format used by 'dcmp' (1)."""
return decompressed
decompressed_length = 0
for chunk in decompress_stream_inner(header_info, stream, debug=debug):
if debug:
print(f"\t-> {chunk}")
decompressed_length += len(chunk)
yield chunk
if debug:
print(f"Decompressed {decompressed_length:#x} bytes so far")

View File

@ -1,4 +1,5 @@
import enum
import io
import struct
import typing
@ -73,68 +74,72 @@ def _split_bits(i: int) -> typing.Tuple[bool, bool, bool, bool, bool, bool, bool
)
def _decompress_system_untagged(data: bytes, decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> bytes:
parts = []
i = 0
while i < len(data):
if i == len(data) - 1 and decompressed_length % 2 != 0:
def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> typing.Iterator[bytes]:
while True: # Loop is terminated when EOF is reached.
table_index_data = stream.read(1)
if not table_index_data:
# End of compressed data.
break
elif not stream.peek(1) and decompressed_length % 2 != 0:
# Special case: if we are at the last byte of the compressed data, and the decompressed data has an odd length, the last byte is a single literal byte, and not a table reference.
if debug:
print(f"Last byte: {data[-1:]}")
parts.append(data[-1:])
print(f"Last byte: {table_index_data}")
yield table_index_data
break
# Compressed data is untagged, every byte is a table reference.
(table_index,) = table_index_data
if debug:
print(f"Reference: {data[i]} -> {table[data[i]]}")
parts.append(table[data[i]])
i += 1
return b"".join(parts)
print(f"Reference: {table_index} -> {table[table_index]}")
yield table[table_index]
def _decompress_system_tagged(data: bytes, decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> bytes:
parts = []
i = 0
while i < len(data):
if i == len(data) - 1 and decompressed_length % 2 != 0:
def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> typing.Iterator[bytes]:
while True: # Loop is terminated when EOF is reached.
tag_data = stream.read(1)
if not tag_data:
# End of compressed data.
break
elif not stream.peek(1) and decompressed_length % 2 != 0:
# Special case: if we are at the last byte of the compressed data, and the decompressed data has an odd length, the last byte is a single literal byte, and not a tag or a table reference.
if debug:
print(f"Last byte: {data[-1:]}")
parts.append(data[-1:])
print(f"Last byte: {tag_data}")
yield tag_data
break
# Compressed data is tagged, each tag byte is followed by 8 table references and/or literals.
tag = data[i]
(tag,) = tag_data
if debug:
print(f"Tag: 0b{tag:>08b}")
i += 1
for is_ref in _split_bits(tag):
if is_ref:
# This is a table reference (a single byte that is an index into the table).
table_index_data = stream.read(1)
if not table_index_data:
# End of compressed data.
break
(table_index,) = table_index_data
if debug:
print(f"Reference: {data[i]} -> {table[data[i]]}")
parts.append(table[data[i]])
i += 1
print(f"Reference: {table_index} -> {table[table_index]}")
yield table[table_index]
else:
# This is a literal (two uncompressed bytes that are literally copied into the output).
# Note: if i == len(data)-1, the literal is actually only a single byte long.
# This case is handled automatically - the slice extends one byte past the end of the data, and only one byte is returned.
literal = stream.read(2)
if not literal:
# End of compressed data.
break
# Note: the literal may be only a single byte long if it is located exactly at EOF. This is intended and expected - the 1-byte literal is yielded normally, and on the next iteration, decompression is terminated as EOF is detected.
if debug:
print(f"Literal: {data[i:i+2]}")
parts.append(data[i:i + 2])
i += 2
# If the end of the compressed data is reached in the middle of a chunk, all further tag bits are ignored (they should be zero) and decompression ends.
if i >= len(data):
break
return b"".join(parts)
print(f"Literal: {literal}")
yield literal
def decompress(data: bytes, decompressed_length: int, parameters: bytes, *, debug: bool=False) -> bytes:
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes]:
"""Decompress compressed data in the format used by 'dcmp' (2)."""
unknown, table_count_m1, flags_raw = STRUCT_PARAMETERS.unpack(parameters)
if not isinstance(header_info, common.CompressedType9HeaderInfo):
raise common.DecompressError(f"Incorrect header type: {type(header_info).__qualname__}")
unknown, table_count_m1, flags_raw = STRUCT_PARAMETERS.unpack(header_info.parameters)
if debug:
print(f"Value of unknown parameter field: 0x{unknown:>04x}")
@ -152,24 +157,21 @@ def decompress(data: bytes, decompressed_length: int, parameters: bytes, *, debu
print(f"Flags: {flags}")
if ParameterFlags.CUSTOM_TABLE in flags:
table_start = 0
data_start = table_start + table_count * 2
table = []
for i in range(table_start, data_start, 2):
table.append(data[i:i + 2])
for _ in range(table_count):
table.append(common.read_exact(stream, 2))
if debug:
print(f"Using custom table: {table}")
else:
if table_count_m1 != 0:
raise common.DecompressError(f"table_count_m1 field is {table_count_m1}, but must be zero when the default table is used")
table = DEFAULT_TABLE
data_start = 0
if debug:
print("Using default table")
if ParameterFlags.TAGGED in flags:
decompress_func = _decompress_system_tagged
decompress_func = _decompress_tagged
else:
decompress_func = _decompress_system_untagged
decompress_func = _decompress_untagged
return decompress_func(data[data_start:], decompressed_length, table, debug=debug)
yield from decompress_func(common.make_peekable(stream), header_info.decompressed_length, table, debug=debug)

View File

@ -36,9 +36,28 @@ keywords =
setup_requires =
setuptools>=39.2.0
python_requires = >=3.6
packages =
packages = find:
[options.packages.find]
include =
rsrcfork
rsrcfork.*
[options.entry_points]
console_scripts =
rsrcfork = rsrcfork.__main__:main
[mypy]
files=rsrcfork/**/*.py
python_version = 3.6
disallow_untyped_calls = True
disallow_untyped_defs = True
disallow_untyped_decorators = True
no_implicit_optional = True
warn_unused_ignores = True
warn_unreachable = True
warn_redundant_casts = True