Compare commits
76 Commits
Author | SHA1 | Date | |
---|---|---|---|
82b5926b4f | |||
5456013bf4 | |||
b595456a05 | |||
d367a9238a | |||
33c4016124 | |||
b01cfc77cf | |||
a9f54b678c | |||
b46018e666 | |||
b0eefe3889 | |||
3e0bbcee04 | |||
13654c2560 | |||
d5199bd503 | |||
c5c3f24a10 | |||
7c77c4ef20 | |||
f7b6080c0e | |||
007d15eb3d | |||
246b69e375 | |||
d67ff64851 | |||
5391d66a78 | |||
5b2700bf17 | |||
c41b25fea1 | |||
a45dbd8eca | |||
3401ce65dd | |||
890dd24f76 | |||
67c2b4acf0 | |||
238c78a73e | |||
fbd861edf4 | |||
a7a407a1dd | |||
ecee2616cf | |||
ba284d1800 | |||
f690caac24 | |||
3a805c3e56 | |||
6adf8eb88d | |||
e132a91dea | |||
4e1cd05412 | |||
1a416defed | |||
1089a19c01 | |||
8fc24040ea | |||
d492d9a6a8 | |||
d0e1eaf262 | |||
1e55569442 | |||
2abf6e2a06 | |||
2b0bbb19ed | |||
c009e8f80f | |||
d67641d537 | |||
d6dbfdb149 | |||
b2502c48a2 | |||
158ca4884b | |||
8568f355c4 | |||
97d2dbe1b3 | |||
a4b6328782 | |||
393160b5da | |||
476eaecd17 | |||
546edbc31a | |||
cf6ce3c2a6 | |||
af2ac70676 | |||
5af455992b | |||
2193c81518 | |||
7dc0d980a3 | |||
2ce1d6b63a | |||
ec5eb3bcc1 | |||
25bec2f93a | |||
6fbb919285 | |||
3be4d9c969 | |||
f537fb3d37 | |||
d342614f55 | |||
a5fb30e194 | |||
f3b3de496e | |||
a71274d554 | |||
6d69d0097d | |||
8db1b22bdc | |||
6559cbc337 | |||
1e79dc3c50 | |||
db48212ade | |||
3a72bd3406 | |||
cb868b8005 |
@ -8,3 +8,7 @@ insert_final_newline = true
|
||||
[*.rst]
|
||||
indent_style = space
|
||||
indent_size = 4
|
||||
|
||||
[*.yml]
|
||||
indent_style = space
|
||||
indent_size = 2
|
||||
|
20
.github/workflows/ci.yml
vendored
Normal file
@ -0,0 +1,20 @@
|
||||
on: [pull_request, push]
|
||||
jobs:
|
||||
test:
|
||||
strategy:
|
||||
matrix:
|
||||
platform: [macos-latest, ubuntu-latest, windows-latest]
|
||||
runs-on: ${{ matrix.platform }}
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/setup-python@v1
|
||||
with:
|
||||
python-version: "3.6"
|
||||
- uses: actions/setup-python@v1
|
||||
with:
|
||||
python-version: "3.7"
|
||||
- uses: actions/setup-python@v1
|
||||
with:
|
||||
python-version: "3.8"
|
||||
- run: python -m pip install --upgrade tox
|
||||
- run: tox
|
3
.gitignore
vendored
@ -2,6 +2,9 @@
|
||||
*.py[co]
|
||||
__pycache__/
|
||||
|
||||
# tox
|
||||
.tox/
|
||||
|
||||
# setuptools
|
||||
*.egg-info/
|
||||
build/
|
||||
|
6
MANIFEST.in
Normal file
@ -0,0 +1,6 @@
|
||||
# Note: See the PyPA documentation for a list of file names that are included/excluded by default:
|
||||
# https://packaging.python.org/guides/using-manifest-in/#how-files-are-included-in-an-sdist
|
||||
# Please only add entries here for files that are *not* already handled by default.
|
||||
|
||||
recursive-include tests *.py
|
||||
recursive-include tests/data *.rsrc
|
103
README.rst
@ -108,72 +108,51 @@ Writing resource data is not supported at all.
|
||||
Further info on resource files
|
||||
------------------------------
|
||||
|
||||
Sources of information about the resource fork data format, and the structure of common resource types:
|
||||
|
||||
* The Inside Macintosh book series, specifically the chapter "Resource Manager". These books are Apple's official reference material for the classic Macintosh platform. Over time, they have gone through many revisions and updates, and their structure has been changed multiple times. This is a (likely incomplete) list of the major revisions of Inside Macintosh and where they can be obtained online.
|
||||
|
||||
* The earliest revisions consisted of two volumes, each a three-ring binder containing photocopied pages. The chapters were referred to as individual "manuals" and were essentially standalone - each one had its own title page, TOC, glossary, and page numbers. Various parts were still missing or not yet finalized, and updated pages were distributed regularly as part of the `Macintosh Software Supplement <https://macgui.com/news/article.php?t=447>`_.
|
||||
|
||||
* bitsavers.org has scanned and OCRed PDFs of a late (November 1984) revision: `Volume I <http://bitsavers.org/pdf/apple/mac/Inside_Macintosh_Vol_1_1984.pdf>`_, `Volume II <http://bitsavers.org/pdf/apple/mac/Inside_Macintosh_Vol_2_1984.pdf>`_.
|
||||
|
||||
* The Promotional Edition, released in early 1985, consisted of a single book (it was nicknamed the "phonebook" edition because of its paper quality). Although it was physically a single book, the contents were still structured into standalone "manuals" like in the ring binder version, and some parts were still missing or not finalized.
|
||||
|
||||
* bitsavers.org has `a scanned and OCRed PDF <http://bitsavers.org/pdf/apple/mac/Inside_Macintosh_Promotional_Edition_1985.pdf>`_.
|
||||
|
||||
* The published 1985 revision consisted of three volumes, available as three paperback books or a single hardcover book. They contained the finalized contents of the previous revisions, which documented the Macintosh 128k, Macintosh 512k, and Macintosh XL. Unlike the previous revisions, each volume had continuous page numbers and a full TOC and index, and volume III contained a complete glossary.
|
||||
|
||||
* pagetable.com has a `blog post <http://www.pagetable.com/?p=50>`_ with `a scanned and OCRed PDF of the three paperback volumes <http://www.weihenstephan.org/~michaste/pagetable/mac/Inside_Macintosh.pdf>`_.
|
||||
|
||||
* Additional volumes were published later to document newer Macintosh models. These served as incremental additions and did not fully supersede or replace any of the previous volumes.
|
||||
|
||||
* Volume IV was published in 1986 and documented the Macintosh Plus and Macintosh 512k Enhanced.
|
||||
* Volume V was published in 1986 and documented the Macintosh II and Macintosh SE.
|
||||
* Volume VI was published in 1991 and documented System 7.0.
|
||||
* VintageApple.org has `scanned and OCRed PDFs of Volumes I through VI <https://vintageapple.org/inside_o/>`_.
|
||||
|
||||
* After 1991, Inside Macintosh was restructured into over 20 volumes organized by topic, rather than chronologically by Macintosh model. These were published as books starting in 1992, and later also on CDs and online.
|
||||
|
||||
* VintageApple.org has `rendered (not scanned) PDFs of 26 volumes and 7 X-Ref volumes <https://vintageapple.org/inside_r/>`_.
|
||||
|
||||
* The Communications Toolbox and QuickDraw GX Programmers' Overview volumes appear to be missing.
|
||||
|
||||
* Many volumes are still available in Apple's legacy developer documentation archive, in HTML and rendered (not scanned) PDF formats:
|
||||
|
||||
* Two volumes appear on the website under Inside Macintosh, even though other sources don't consider them part of the Inside Macintosh series:
|
||||
|
||||
* `Advanced Color Imaging on the Mac OS (HTML) <https://developer.apple.com/library/archive/documentation/mac/ACI/ACI-2.html>`_ (November 1996)
|
||||
* `Advanced Color Imaging Reference (HTML) <https://developer.apple.com/library/archive/documentation/mac/ACIReference/ACIReference-2.html>`_ (November 1996)
|
||||
|
||||
* `Devices (HTML) <https://developer.apple.com/library/archive/documentation/mac/Devices/Devices-2.html>`_ (July 1996), `Devices (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/Devices/pdf.html>`_ (1994)
|
||||
* `Files (HTML) <https://developer.apple.com/library/archive/documentation/mac/Files/Files-2.html>`_ (July 1996), `Files (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/Files/pdf.html>`_ (1992)
|
||||
* `Imaging with QuickDraw (HTML) <https://developer.apple.com/library/archive/documentation/mac/QuickDraw/QuickDraw-2.html>`_ (July 1996), `Imaging with QuickDraw (single PDF) <https://developer.apple.com/library/archive/documentation/mac/pdf/ImagingWithQuickDraw.pdf>`_ (1994)
|
||||
* `Interapplication Communication (HTML) <https://developer.apple.com/library/archive/documentation/mac/IAC/IAC-2.html>`_ (July 1996), `Interapplication Communication (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/Interapplication_Communication/pdf.html>`_ (1993)
|
||||
* `Macintosh Toolbox Essentials (HTML) <https://developer.apple.com/library/archive/documentation/mac/Toolbox/Toolbox-2.html>`_ (July 1996), `Macintosh Toolbox Essentials (single PDF) <https://developer.apple.com/library/archive/documentation/mac/pdf/MacintoshToolboxEssentials.pdf>`_ (1992)
|
||||
* `Memory (HTML) <https://developer.apple.com/library/archive/documentation/mac/Memory/Memory-2.html>`_ (July 1996), `Memory (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/Memory/pdf.html>`_ (1992)
|
||||
* `More Macintosh Toolbox (HTML) <https://developer.apple.com/library/archive/documentation/mac/MoreToolbox/MoreToolbox-2.html>`_ (July 1996), `More Macintosh Toolbox (single PDF) <https://developer.apple.com/library/archive/documentation/mac/pdf/MoreMacintoshToolbox.pdf>`_ (1993)
|
||||
* `Networking (HTML) <https://developer.apple.com/library/archive/documentation/mac/Networking/Networking-2.html>`_ (July 1996), `Networking (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/Networking/pdf.html>`_ (1994)
|
||||
* `Operating System Utilities (HTML) <https://developer.apple.com/library/archive/documentation/mac/OSUtilities/OSUtilities-2.html>`_ (July 1996), `Operating System Utilities (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/Operating_System_Utilities/pdf.html>`_ (1994)
|
||||
* `PowerPC Numerics (HTML) <https://developer.apple.com/library/archive/documentation/mac/PPCNumerics/PPCNumerics-2.html>`_ (July 1996), `PowerPC Numerics (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/PPC_Numerics.sit.hqx>`_ (1994)
|
||||
* `PowerPC System Software (HTML) <https://developer.apple.com/library/archive/documentation/mac/PPCSoftware/PPCSoftware-2.html>`_ (July 1996), `PowerPC System Software (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/PPC_System_Software.sit.hqx>`_ (1994)
|
||||
* `Processes (HTML) <https://developer.apple.com/library/archive/documentation/mac/Processes/Processes-2.html>`_ (June 1996), `Processes (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/Processes/pdf.html>`_ (1992)
|
||||
* `Sound (HTML) <https://developer.apple.com/library/archive/documentation/mac/Sound/Sound-2.html>`_ (July 1996), `Sound (chapter PDFs) <https://developer.apple.com/library/archive/documentation/mac/pdf/Sound/pdf.html>`_ (1994)
|
||||
* `Text (HTML) <https://developer.apple.com/library/archive/documentation/mac/Text/Text-2.html>`_ (July 1996), `Text (single PDF) <https://developer.apple.com/library/archive/documentation/mac/pdf/Text.pdf>`_ (1993)
|
||||
* The two AOCE volumes, Communications Toolbox, Human Interface Guidelines, Overview, seven QuickDraw GX volumes, two QuickTime volumes, and X-Ref are missing.
|
||||
|
||||
* The Gryphel project (best known for the Mini vMac emulator) has `a list of physical book releases <https://www.gryphel.com/c/books/appledev.html>`_ of Inside Macintosh (and other Apple developer documentation), including ISBNs, publishers, dates, and Amazon links.
|
||||
|
||||
* `Wikipedia <https://en.wikipedia.org/wiki/Resource_fork>`_, of course
|
||||
* The `Resource Fork <http://fileformats.archiveteam.org/wiki/Resource_Fork>`_ article on "Just Solve the File Format Problem" (despite the title, this is a decent site and not clickbait)
|
||||
* The `KSFL <https://github.com/kreativekorp/ksfl>`_ library (and `its wiki <https://github.com/kreativekorp/ksfl/wiki/Macintosh-Resource-File-Format>`_), written in Java, which supports reading and writing resource files
|
||||
* Alysis Software Corporation's article on resource compression (found on `the company's website <http://www.alysis.us/arctechnology.htm>`_ and in `MacTech Magazine's online archive <http://preserve.mactech.com/articles/mactech/Vol.09/09.01/ResCompression/index.html>`_) has some information on the structure of certain kinds of compressed resources.
|
||||
* Apple's macOS SDK, which is distributed with Xcode. The latest version of Xcode is available for free from the Mac App Store. Current and previous versions can be downloaded from `the Apple Developer download page <https://developer.apple.com/download/more/>`_. Accessing these downloads requires an Apple ID with (at least) a free developer program membership.
|
||||
* Apple's MPW (Macintosh Programmer's Workshop) and related developer tools. These were previously available from Apple's FTP server at ftp://ftp.apple.com/, which is no longer functional. Because of this, these downloads are only available on mirror sites, such as http://staticky.com/mirrors/ftp.apple.com/.
|
||||
|
||||
If these links are no longer functional, some are archived in the `Internet Archive Wayback Machine <https://archive.org/web/>`_ or `archive.is <http://archive.is/>`_ aka `archive.fo <https://archive.fo/>`_.
|
||||
For technical info and documentation about resource files and resources, see the `"resource forks" section of the mac_file_format_docs repo <https://github.com/dgelessus/mac_file_format_docs/blob/master/README.md#resource-forks>`_.
|
||||
|
||||
Changelog
|
||||
---------
|
||||
|
||||
Version 1.8.0
|
||||
^^^^^^^^^^^^^
|
||||
|
||||
* Removed the old (non-subcommand-based) CLI syntax.
|
||||
* Added filtering support to the ``list`` subcommand.
|
||||
* Added a ``resource-info`` subcommand to display technical information about resources (more detailed than what is displayed by ``list`` and ``read``).
|
||||
* Added a ``raw-compress-info`` subcommand to display technical header information about standalone compressed resource data.
|
||||
* Made the library PEP 561-compliant by adding a py.typed file.
|
||||
* Fixed an incorrect ``AssertionError`` when using the ``--no-decompress`` command-line options.
|
||||
|
||||
Version 1.7.0
|
||||
^^^^^^^^^^^^^
|
||||
|
||||
* Added a ``raw-decompress`` subcommand to decompress compressed resource data stored in a standalone file rather than as a resource.
|
||||
* Optimized lazy loading of ``Resource`` objects. Previously, resource data would be read from disk whenever a ``Resource`` object was looked up, even if the data itself is never used. Now the resource data is only loaded once the ``data`` (or ``data_raw``) attribute is accessed.
|
||||
|
||||
* The same optimization applies to the ``name`` attribute, although this is unlikely to make a difference in practice.
|
||||
* As a result, it is no longer possible to construct ``Resource`` objects without a resource file. This was previously possible, but had no practical use.
|
||||
* Fixed a small error in the ``'dcmp' (0)`` decompression implementation.
|
||||
|
||||
Version 1.6.0
|
||||
^^^^^^^^^^^^^
|
||||
|
||||
* Added a new subcommand-based command-line syntax to the ``rsrcfork`` tool, similar to other CLI tools such as ``git`` or ``diskutil``.
|
||||
|
||||
* This subcommand-based syntax is meant to replace the old CLI options, as the subcommand structure is easier to understand and more extensible in the future.
|
||||
* Currently there are three subcommands: ``list`` to list resources in a file, ``read`` to read/display resource data, and ``read-header`` to read a resource file's header data. These subcommands can be used to perform all operations that were also available with the old CLI syntax.
|
||||
* The old CLI syntax is still supported for now, but it will be removed soon.
|
||||
* The new syntax no longer supports reading CLI arguments from a file (using ``@args_file.txt``), abbreviating long options (e. g. ``--no-d`` instead of ``--no-decompress``), or the short option ``-f`` instead of ``--fork``. If you have a need for any of these features, please open an issue.
|
||||
|
||||
Version 1.5.0
|
||||
^^^^^^^^^^^^^
|
||||
|
||||
* Added stream-based decompression methods to the ``rsrcfork.compress`` module.
|
||||
|
||||
* The internal decompressor implementations have been refactored to use streams.
|
||||
* This allows for incremental decompression of compressed resource data. In practice this has no noticeable effect yet, because the main ``rsrcfork`` API doesn't support incremental reading of resource data.
|
||||
|
||||
* Fixed the command line tool always displaying an incorrect error "Cannot specify an explicit fork when reading from stdin" when using ``-`` (stdin) as the input file.
|
||||
|
||||
Version 1.4.0
|
||||
^^^^^^^^^^^^^
|
||||
|
||||
|
6
pyproject.toml
Normal file
@ -0,0 +1,6 @@
|
||||
[build-system]
|
||||
requires = [
|
||||
"setuptools >= 46.4.0",
|
||||
"wheel >= 0.32.0",
|
||||
]
|
||||
build-backend = "setuptools.build_meta"
|
@ -20,7 +20,7 @@
|
||||
# * Add a new empty section for the next version to the README.rst changelog.
|
||||
# * Commit and push the changes to master.
|
||||
|
||||
__version__ = "1.4.0"
|
||||
__version__ = "1.8.0"
|
||||
|
||||
__all__ = [
|
||||
"Resource",
|
||||
@ -31,8 +31,8 @@ __all__ = [
|
||||
"open",
|
||||
]
|
||||
|
||||
from . import api, compress
|
||||
from .api import Resource, ResourceAttrs, ResourceFile, ResourceFileAttrs
|
||||
from . import compress
|
||||
|
||||
# noinspection PyShadowingBuiltins
|
||||
open = ResourceFile.open
|
||||
|
@ -1,9 +1,7 @@
|
||||
import argparse
|
||||
import collections
|
||||
import enum
|
||||
import itertools
|
||||
import sys
|
||||
import textwrap
|
||||
import typing
|
||||
|
||||
from . import __version__, api, compress
|
||||
@ -11,9 +9,6 @@ from . import __version__, api, compress
|
||||
# The encoding to use when rendering bytes as text (in four-char codes, strings, hex dumps, etc.) or reading a quoted byte string (from the command line).
|
||||
_TEXT_ENCODING = "MacRoman"
|
||||
|
||||
# Translation table to replace ASCII non-printable characters with periods.
|
||||
_TRANSLATE_NONPRINTABLES = {k: "." for k in [*range(0x20), 0x7f]}
|
||||
|
||||
_REZ_ATTR_NAMES = {
|
||||
api.ResourceAttrs.resSysRef: None, # "Illegal or reserved attribute"
|
||||
api.ResourceAttrs.resSysHeap: "sysheap",
|
||||
@ -26,12 +21,15 @@ _REZ_ATTR_NAMES = {
|
||||
}
|
||||
|
||||
F = typing.TypeVar("F", bound=enum.Flag)
|
||||
def _decompose_flags(value: F) -> typing.Sequence[F]:
|
||||
|
||||
|
||||
def decompose_flags(value: F) -> typing.Sequence[F]:
|
||||
"""Decompose an enum.Flags instance into separate enum constants."""
|
||||
|
||||
return [bit for bit in type(value) if bit in value]
|
||||
|
||||
def _is_printable(char: str) -> bool:
|
||||
|
||||
def is_printable(char: str) -> bool:
|
||||
"""Determine whether a character is printable for our purposes.
|
||||
|
||||
We mainly use Python's definition of printable (i. e. everything that Unicode does not consider a separator or "other" character). However, we also treat U+F8FF as printable, which is the private use codepoint used for the Apple logo character.
|
||||
@ -39,7 +37,12 @@ def _is_printable(char: str) -> bool:
|
||||
|
||||
return char.isprintable() or char == "\uf8ff"
|
||||
|
||||
def _bytes_unescape(string: str) -> bytes:
|
||||
|
||||
# Translation table to replace non-printable characters with periods.
|
||||
_TRANSLATE_NONPRINTABLES = {ord(c): "." for c in bytes(range(256)).decode(_TEXT_ENCODING) if not is_printable(c)}
|
||||
|
||||
|
||||
def bytes_unescape(string: str) -> bytes:
|
||||
"""Convert a string containing text (in _TEXT_ENCODING) and hex escapes to a bytestring.
|
||||
|
||||
(We implement our own unescaping mechanism here to not depend on any of Python's string/bytes escape syntax.)
|
||||
@ -65,7 +68,8 @@ def _bytes_unescape(string: str) -> bytes:
|
||||
|
||||
return bytes(out)
|
||||
|
||||
def _bytes_escape(bs: bytes, *, quote: typing.Optional[str]=None) -> str:
|
||||
|
||||
def bytes_escape(bs: bytes, *, quote: typing.Optional[str] = None) -> str:
|
||||
"""Convert a bytestring to a string (using _TEXT_ENCODING), with non-printable characters hex-escaped.
|
||||
|
||||
(We implement our own escaping mechanism here to not depend on Python's str or bytes repr.)
|
||||
@ -75,33 +79,41 @@ def _bytes_escape(bs: bytes, *, quote: typing.Optional[str]=None) -> str:
|
||||
for byte, char in zip(bs, bs.decode(_TEXT_ENCODING)):
|
||||
if char in {quote, "\\"}:
|
||||
out.append(f"\\{char}")
|
||||
elif _is_printable(char):
|
||||
elif is_printable(char):
|
||||
out.append(char)
|
||||
else:
|
||||
out.append(f"\\x{byte:02x}")
|
||||
|
||||
return "".join(out)
|
||||
|
||||
def _filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> typing.List[api.Resource]:
|
||||
matching: typing.MutableMapping[typing.Tuple[bytes, int], api.Resource] = collections.OrderedDict()
|
||||
|
||||
def bytes_quote(bs: bytes, quote: str) -> str:
|
||||
"""Convert a bytestring to a quoted string (using _TEXT_ENCODING), with non-printable characters hex-escaped.
|
||||
|
||||
for filter in filters:
|
||||
(We implement our own escaping mechanism here to not depend on Python's str or bytes repr.)
|
||||
"""
|
||||
|
||||
return quote + bytes_escape(bs, quote=quote) + quote
|
||||
|
||||
|
||||
MIN_RESOURCE_ID = -0x8000
|
||||
MAX_RESOURCE_ID = 0x7fff
|
||||
|
||||
|
||||
class ResourceFilter(object):
|
||||
type: bytes
|
||||
min_id: int
|
||||
max_id: int
|
||||
name: typing.Optional[bytes]
|
||||
|
||||
@classmethod
|
||||
def from_string(cls, filter: str) -> "ResourceFilter":
|
||||
if len(filter) == 4:
|
||||
try:
|
||||
resources = rf[filter.encode("ascii")]
|
||||
except KeyError:
|
||||
continue
|
||||
|
||||
for res in resources.values():
|
||||
matching[res.type, res.id] = res
|
||||
restype = filter.encode("ascii")
|
||||
return cls(restype, MIN_RESOURCE_ID, MAX_RESOURCE_ID, None)
|
||||
elif filter[0] == filter[-1] == "'":
|
||||
try:
|
||||
resources = rf[_bytes_unescape(filter[1:-1])]
|
||||
except KeyError:
|
||||
continue
|
||||
|
||||
for res in resources.values():
|
||||
matching[res.type, res.id] = res
|
||||
restype = bytes_unescape(filter[1:-1])
|
||||
return cls(restype, MIN_RESOURCE_ID, MAX_RESOURCE_ID, None)
|
||||
else:
|
||||
pos = filter.find("'", 1)
|
||||
if pos == -1:
|
||||
@ -114,48 +126,64 @@ def _filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> ty
|
||||
if not restype_str[0] == restype_str[-1] == "'":
|
||||
raise ValueError(
|
||||
f"Invalid filter {filter!r}: Resource type is not a single-quoted type identifier: {restype_str!r}")
|
||||
restype = _bytes_unescape(restype_str[1:-1])
|
||||
|
||||
if len(restype) != 4:
|
||||
raise ValueError(
|
||||
f"Invalid filter {filter!r}: Type identifier must be 4 bytes after replacing escapes, got {len(restype)} bytes: {restype!r}")
|
||||
restype = bytes_unescape(restype_str[1:-1])
|
||||
|
||||
if resid_str[0] != "(" or resid_str[-1] != ")":
|
||||
raise ValueError(f"Invalid filter {filter!r}: Resource ID must be parenthesized")
|
||||
resid_str = resid_str[1:-1]
|
||||
|
||||
try:
|
||||
resources = rf[restype]
|
||||
except KeyError:
|
||||
continue
|
||||
|
||||
if resid_str[0] == resid_str[-1] == '"':
|
||||
name = _bytes_unescape(resid_str[1:-1])
|
||||
|
||||
for res in resources.values():
|
||||
if res.name == name:
|
||||
matching[res.type, res.id] = res
|
||||
break
|
||||
name = bytes_unescape(resid_str[1:-1])
|
||||
return cls(restype, MIN_RESOURCE_ID, MAX_RESOURCE_ID, name)
|
||||
elif ":" in resid_str:
|
||||
if resid_str.count(":") > 1:
|
||||
raise ValueError(f"Invalid filter {filter!r}: Too many colons in ID range expression: {resid_str!r}")
|
||||
start_str, end_str = resid_str.split(":")
|
||||
start, end = int(start_str), int(end_str)
|
||||
|
||||
for res in resources.values():
|
||||
if start <= res.id <= end:
|
||||
matching[res.type, res.id] = res
|
||||
return cls(restype, start, end, None)
|
||||
else:
|
||||
resid = int(resid_str)
|
||||
try:
|
||||
res = resources[resid]
|
||||
except KeyError:
|
||||
continue
|
||||
matching[res.type, res.id] = res
|
||||
return cls(restype, resid, resid, None)
|
||||
|
||||
return list(matching.values())
|
||||
def __init__(self, restype: bytes, min_id: int, max_id: int, name: typing.Optional[bytes]) -> None:
|
||||
super().__init__()
|
||||
|
||||
if len(restype) != 4:
|
||||
raise ValueError(f"Invalid filter: Type code must be exactly 4 bytes long, not {len(restype)} bytes: {restype!r}")
|
||||
elif min_id < MIN_RESOURCE_ID:
|
||||
raise ValueError(f"Invalid filter: Resource ID lower bound ({min_id}) cannot be lower than {MIN_RESOURCE_ID}")
|
||||
elif max_id > MAX_RESOURCE_ID:
|
||||
raise ValueError(f"Invalid filter: Resource ID upper bound ({max_id}) cannot be greater than {MAX_RESOURCE_ID}")
|
||||
elif min_id > max_id:
|
||||
raise ValueError(f"Invalid filter: Resource ID lower bound ({min_id}) cannot be greater than upper bound ({max_id})")
|
||||
|
||||
self.type = restype
|
||||
self.min_id = min_id
|
||||
self.max_id = max_id
|
||||
self.name = name
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"{type(self).__name__}({self.type!r}, {self.min_id!r}, {self.max_id!r}, {self.name!r})"
|
||||
|
||||
def matches(self, res: api.Resource) -> bool:
|
||||
return res.type == self.type and self.min_id <= res.id <= self.max_id and (self.name is None or res.name == self.name)
|
||||
|
||||
def _hexdump(data: bytes) -> None:
|
||||
|
||||
def filter_resources(rf: api.ResourceFile, filters: typing.Sequence[str]) -> typing.Iterable[api.Resource]:
|
||||
if not filters:
|
||||
# Special case: an empty list of filters matches all resources rather than none
|
||||
for reses in rf.values():
|
||||
yield from reses.values()
|
||||
else:
|
||||
filter_objs = [ResourceFilter.from_string(filter) for filter in filters]
|
||||
|
||||
for reses in rf.values():
|
||||
for res in reses.values():
|
||||
if any(filter_obj.matches(res) for filter_obj in filter_objs):
|
||||
yield res
|
||||
|
||||
|
||||
def hexdump(data: bytes) -> None:
|
||||
last_line = None
|
||||
asterisk_shown = False
|
||||
for i in range(0, len(data), 16):
|
||||
@ -177,19 +205,21 @@ def _hexdump(data: bytes) -> None:
|
||||
if data:
|
||||
print(f"{len(data):08x}")
|
||||
|
||||
def _raw_hexdump(data: bytes) -> None:
|
||||
|
||||
def raw_hexdump(data: bytes) -> None:
|
||||
for i in range(0, len(data), 16):
|
||||
print(" ".join(f"{byte:02x}" for byte in data[i:i + 16]))
|
||||
|
||||
def _translate_text(data: bytes) -> str:
|
||||
|
||||
def translate_text(data: bytes) -> str:
|
||||
return data.decode(_TEXT_ENCODING).replace("\r", "\n")
|
||||
|
||||
def _describe_resource(res: api.Resource, *, include_type: bool, decompress: bool) -> str:
|
||||
|
||||
def describe_resource(res: api.Resource, *, include_type: bool, decompress: bool) -> str:
|
||||
id_desc_parts = [f"{res.id}"]
|
||||
|
||||
if res.name is not None:
|
||||
name = _bytes_escape(res.name, quote='"')
|
||||
id_desc_parts.append(f'"{name}"')
|
||||
id_desc_parts.append(bytes_quote(res.name, '"'))
|
||||
|
||||
id_desc = ", ".join(id_desc_parts)
|
||||
|
||||
@ -202,13 +232,12 @@ def _describe_resource(res: api.Resource, *, include_type: bool, decompress: boo
|
||||
length_desc = f"unparseable compressed data header ({res.length_raw} bytes compressed)"
|
||||
else:
|
||||
assert res.compressed_info is not None
|
||||
length_desc = f"{res.length} bytes ({res.length_raw} bytes compressed, 'dcmp' ({res.compressed_info.dcmp_id}) format)"
|
||||
length_desc = f"{res.length} bytes ({res.length_raw} bytes compressed)"
|
||||
else:
|
||||
assert res.compressed_info is None
|
||||
length_desc = f"{res.length_raw} bytes"
|
||||
content_desc_parts.append(length_desc)
|
||||
|
||||
attrs = _decompose_flags(res.attributes)
|
||||
attrs = decompose_flags(res.attributes)
|
||||
if attrs:
|
||||
content_desc_parts.append(" | ".join(attr.name for attr in attrs))
|
||||
|
||||
@ -216,65 +245,12 @@ def _describe_resource(res: api.Resource, *, include_type: bool, decompress: boo
|
||||
|
||||
desc = f"({id_desc}): {content_desc}"
|
||||
if include_type:
|
||||
restype = _bytes_escape(res.type, quote="'")
|
||||
desc = f"'{restype}' {desc}"
|
||||
quoted_restype = bytes_quote(res.type, "'")
|
||||
desc = f"{quoted_restype} {desc}"
|
||||
return desc
|
||||
|
||||
def _parse_args() -> argparse.Namespace:
|
||||
ap = argparse.ArgumentParser(
|
||||
add_help=False,
|
||||
fromfile_prefix_chars="@",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
description=textwrap.dedent("""
|
||||
Read and display resources from a file's resource or data fork.
|
||||
|
||||
When specifying resource filters, each one may be of one of the
|
||||
following forms:
|
||||
|
||||
An unquoted type name (without escapes): TYPE
|
||||
A quoted type name: 'TYPE'
|
||||
A quoted type name and an ID: 'TYPE' (42)
|
||||
A quoted type name and an ID range: 'TYPE' (24:42)
|
||||
A quoted type name and a resource name: 'TYPE' ("foobar")
|
||||
|
||||
When multiple filters are specified, all resources matching any of them
|
||||
are displayed.
|
||||
"""),
|
||||
)
|
||||
|
||||
ap.add_argument("--help", action="help", help="Display this help message and exit")
|
||||
ap.add_argument("--version", action="version", version=__version__, help="Display version information and exit")
|
||||
ap.add_argument("-a", "--all", action="store_true", help="When no filters are given, show all resources in full, instead of an overview")
|
||||
ap.add_argument("-f", "--fork", choices=["auto", "data", "rsrc"], default="auto", help="The fork from which to read the resource data, or auto to guess (default: %(default)s)")
|
||||
ap.add_argument("--no-decompress", action="store_false", dest="decompress", help="Do not decompress compressed resources, output compressed resource data as-is")
|
||||
ap.add_argument("--format", choices=["dump", "dump-text", "hex", "raw", "derez"], default="dump", help="How to output the resources - human-readable info with hex dump (dump) (default), human-readable info with newline-translated data (dump-text), data only as hex (hex), data only as raw bytes (raw), or like DeRez with no resource definitions (derez)")
|
||||
ap.add_argument("--group", action="store", choices=["none", "type", "id"], default="type", help="Group resources in list view by type or ID, or disable grouping (default: type)")
|
||||
ap.add_argument("--no-sort", action="store_false", dest="sort", help="Output resources in the order in which they are stored in the file, instead of sorting them by type and ID")
|
||||
ap.add_argument("--header-system", action="store_true", help="Output system-reserved header data and nothing else")
|
||||
ap.add_argument("--header-application", action="store_true", help="Output application-specific header data and nothing else")
|
||||
|
||||
ap.add_argument("file", help="The file to read, or - for stdin")
|
||||
ap.add_argument("filter", nargs="*", help="One or more filters to select which resources to display, or omit to show an overview of all resources")
|
||||
|
||||
ns = ap.parse_args()
|
||||
return ns
|
||||
|
||||
def _show_header_data(data: bytes, *, format: str) -> None:
|
||||
if format == "dump":
|
||||
_hexdump(data)
|
||||
elif format == "dump-text":
|
||||
print(_translate_text(data))
|
||||
elif format == "hex":
|
||||
_raw_hexdump(data)
|
||||
elif format == "raw":
|
||||
sys.stdout.buffer.write(data)
|
||||
elif format == "derez":
|
||||
print("Cannot output file header data in derez format", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
else:
|
||||
raise ValueError(f"Unhandled output format: {format}")
|
||||
|
||||
def _show_filtered_resources(resources: typing.Sequence[api.Resource], format: str, decompress: bool) -> None:
|
||||
def show_filtered_resources(resources: typing.Sequence[api.Resource], format: str, decompress: bool) -> None:
|
||||
if not resources:
|
||||
if format in ("dump", "dump-text"):
|
||||
print("No resources matched the filter")
|
||||
@ -297,19 +273,19 @@ def _show_filtered_resources(resources: typing.Sequence[api.Resource], format: s
|
||||
|
||||
if format in ("dump", "dump-text"):
|
||||
# Human-readable info and hex or text dump
|
||||
desc = _describe_resource(res, include_type=True, decompress=decompress)
|
||||
desc = describe_resource(res, include_type=True, decompress=decompress)
|
||||
print(f"Resource {desc}:")
|
||||
if format == "dump":
|
||||
_hexdump(data)
|
||||
hexdump(data)
|
||||
elif format == "dump-text":
|
||||
print(_translate_text(data))
|
||||
print(translate_text(data))
|
||||
else:
|
||||
raise AssertionError(f"Unhandled format: {format!r}")
|
||||
print()
|
||||
elif format == "hex":
|
||||
# Data only as hex
|
||||
|
||||
_raw_hexdump(data)
|
||||
raw_hexdump(data)
|
||||
elif format == "raw":
|
||||
# Data only as raw bytes
|
||||
|
||||
@ -317,7 +293,7 @@ def _show_filtered_resources(resources: typing.Sequence[api.Resource], format: s
|
||||
elif format == "derez":
|
||||
# Like DeRez with no resource definitions
|
||||
|
||||
attrs = list(_decompose_flags(res.attributes))
|
||||
attrs = list(decompose_flags(res.attributes))
|
||||
|
||||
if decompress and api.ResourceAttrs.resCompressed in attrs:
|
||||
attrs.remove(api.ResourceAttrs.resCompressed)
|
||||
@ -334,13 +310,12 @@ def _show_filtered_resources(resources: typing.Sequence[api.Resource], format: s
|
||||
parts = [str(res.id)]
|
||||
|
||||
if res.name is not None:
|
||||
name = _bytes_escape(res.name, quote='"')
|
||||
parts.append(f'"{name}"')
|
||||
parts.append(bytes_quote(res.name, '"'))
|
||||
|
||||
parts += attr_descs
|
||||
|
||||
restype = _bytes_escape(res.type, quote="'")
|
||||
print(f"data '{restype}' ({', '.join(parts)}{attrs_comment}) {{")
|
||||
quoted_restype = bytes_quote(res.type, "'")
|
||||
print(f"data {quoted_restype} ({', '.join(parts)}{attrs_comment}) {{")
|
||||
|
||||
for i in range(0, len(data), 16):
|
||||
# Two-byte grouping is really annoying to implement.
|
||||
@ -362,99 +337,499 @@ def _show_filtered_resources(resources: typing.Sequence[api.Resource], format: s
|
||||
else:
|
||||
raise ValueError(f"Unhandled output format: {format}")
|
||||
|
||||
def _list_resource_file(rf: api.ResourceFile, *, sort: bool, group: str, decompress: bool) -> None:
|
||||
if rf.header_system_data != bytes(len(rf.header_system_data)):
|
||||
print("Header system data:")
|
||||
_hexdump(rf.header_system_data)
|
||||
|
||||
if rf.header_application_data != bytes(len(rf.header_application_data)):
|
||||
print("Header application data:")
|
||||
_hexdump(rf.header_application_data)
|
||||
|
||||
attrs = _decompose_flags(rf.file_attributes)
|
||||
if attrs:
|
||||
print("File attributes: " + " | ".join(attr.name for attr in attrs))
|
||||
|
||||
if len(rf) == 0:
|
||||
print("No resources (empty resource file)")
|
||||
|
||||
def list_resources(resources: typing.List[api.Resource], *, sort: bool, group: str, decompress: bool) -> None:
|
||||
if len(resources) == 0:
|
||||
print("No resources matched the filter")
|
||||
return
|
||||
|
||||
if group == "none":
|
||||
all_resources: typing.List[api.Resource] = []
|
||||
for reses in rf.values():
|
||||
all_resources.extend(reses.values())
|
||||
if sort:
|
||||
all_resources.sort(key=lambda res: (res.type, res.id))
|
||||
print(f"{len(all_resources)} resources:")
|
||||
for res in all_resources:
|
||||
print(_describe_resource(res, include_type=True, decompress=decompress))
|
||||
resources.sort(key=lambda res: (res.type, res.id))
|
||||
print(f"{len(resources)} resources:")
|
||||
for res in resources:
|
||||
print(describe_resource(res, include_type=True, decompress=decompress))
|
||||
elif group == "type":
|
||||
print(f"{len(rf)} resource types:")
|
||||
restype_items: typing.Collection[typing.Tuple[bytes, typing.Mapping[int, api.Resource]]] = rf.items()
|
||||
if sort:
|
||||
restype_items = sorted(restype_items, key=lambda item: item[0])
|
||||
for typecode, resources_map in restype_items:
|
||||
restype = _bytes_escape(typecode, quote="'")
|
||||
print(f"'{restype}': {len(resources_map)} resources:")
|
||||
resources_items: typing.Collection[typing.Tuple[int, api.Resource]] = resources_map.items()
|
||||
resources.sort(key=lambda res: res.type)
|
||||
resources_by_type = {restype: list(reses) for restype, reses in itertools.groupby(resources, key=lambda res: res.type)}
|
||||
print(f"{len(resources_by_type)} resource types:")
|
||||
for restype, restype_resources in resources_by_type.items():
|
||||
quoted_restype = bytes_quote(restype, "'")
|
||||
print(f"{quoted_restype}: {len(restype_resources)} resources:")
|
||||
if sort:
|
||||
resources_items = sorted(resources_items, key=lambda item: item[0])
|
||||
for resid, res in resources_items:
|
||||
print(_describe_resource(res, include_type=False, decompress=decompress))
|
||||
restype_resources.sort(key=lambda res: res.id)
|
||||
for res in restype_resources:
|
||||
print(describe_resource(res, include_type=False, decompress=decompress))
|
||||
print()
|
||||
elif group == "id":
|
||||
all_resources = []
|
||||
for reses in rf.values():
|
||||
all_resources.extend(reses.values())
|
||||
all_resources.sort(key=lambda res: res.id)
|
||||
resources_by_id = {resid: list(reses) for resid, reses in itertools.groupby(all_resources, key=lambda res: res.id)}
|
||||
resources.sort(key=lambda res: res.id)
|
||||
resources_by_id = {resid: list(reses) for resid, reses in itertools.groupby(resources, key=lambda res: res.id)}
|
||||
print(f"{len(resources_by_id)} resource IDs:")
|
||||
for resid, resources in resources_by_id.items():
|
||||
print(f"({resid}): {len(resources)} resources:")
|
||||
for resid, resid_resources in resources_by_id.items():
|
||||
print(f"({resid}): {len(resid_resources)} resources:")
|
||||
if sort:
|
||||
resources.sort(key=lambda res: res.type)
|
||||
for res in resources:
|
||||
print(_describe_resource(res, include_type=True, decompress=decompress))
|
||||
resid_resources.sort(key=lambda res: res.type)
|
||||
for res in resid_resources:
|
||||
print(describe_resource(res, include_type=True, decompress=decompress))
|
||||
print()
|
||||
else:
|
||||
raise AssertionError(f"Unhandled group mode: {group!r}")
|
||||
|
||||
def main() -> typing.NoReturn:
|
||||
ns = _parse_args()
|
||||
|
||||
def format_compressed_header_info(header_info: compress.CompressedHeaderInfo) -> typing.Iterable[str]:
|
||||
yield f"Header length: {header_info.header_length} bytes"
|
||||
yield f"Compression type: 0x{header_info.compression_type:>04x}"
|
||||
yield f"Decompressed data length: {header_info.decompressed_length} bytes"
|
||||
yield f"'dcmp' resource ID: {header_info.dcmp_id}"
|
||||
|
||||
if ns.file == "-":
|
||||
if ns.fork is not None:
|
||||
if isinstance(header_info, compress.CompressedType8HeaderInfo):
|
||||
yield f"Working buffer fractional size: {header_info.working_buffer_fractional_size} 256ths of compressed data length"
|
||||
yield f"Expansion buffer size: {header_info.expansion_buffer_size} bytes"
|
||||
elif isinstance(header_info, compress.CompressedType9HeaderInfo):
|
||||
yield f"Decompressor-specific parameters: {header_info.parameters!r}"
|
||||
else:
|
||||
raise AssertionError(f"Unhandled compressed header info type: {type(header_info)}")
|
||||
|
||||
|
||||
def make_subcommand_parser(subs: typing.Any, name: str, *, help: str, description: str, **kwargs: typing.Any) -> argparse.ArgumentParser:
|
||||
"""Add a subcommand parser with some slightly modified defaults to a subcommand set.
|
||||
|
||||
This function is used to ensure that all subcommands use the same base configuration for their ArgumentParser.
|
||||
"""
|
||||
|
||||
ap = subs.add_parser(
|
||||
name,
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
help=help,
|
||||
description=description,
|
||||
allow_abbrev=False,
|
||||
add_help=False,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
ap.add_argument("--help", action="help", help="Display this help message and exit.")
|
||||
|
||||
return ap
|
||||
|
||||
|
||||
def add_resource_file_args(ap: argparse.ArgumentParser) -> None:
|
||||
"""Define common options/arguments for specifying an input resource file.
|
||||
|
||||
This includes a positional argument for the resource file's path, and the ``--fork`` option to select which fork of the file to use.
|
||||
"""
|
||||
|
||||
ap.add_argument("--fork", choices=["auto", "data", "rsrc"], default="auto", help="The fork from which to read the resource file data, or auto to guess. Default: %(default)s")
|
||||
ap.add_argument("file", help="The file from which to read resources, or - for stdin.")
|
||||
|
||||
|
||||
RESOURCE_FILTER_HELP = """
|
||||
The resource filters use syntax similar to Rez (resource definition) files.
|
||||
Each filter can have one of the following forms:
|
||||
|
||||
An unquoted type name (without escapes): TYPE
|
||||
A quoted type name: 'TYPE'
|
||||
A quoted type name and an ID: 'TYPE' (42)
|
||||
A quoted type name and an ID range: 'TYPE' (24:42)
|
||||
A quoted type name and a resource name: 'TYPE' ("foobar")
|
||||
|
||||
Note that the resource filter syntax uses quotes, parentheses and spaces,
|
||||
which have special meanings in most shells. It is recommended to quote each
|
||||
resource filter (using double quotes) to ensure that it is not interpreted
|
||||
or rewritten by the shell.
|
||||
"""
|
||||
|
||||
|
||||
def add_resource_filter_args(ap: argparse.ArgumentParser) -> None:
|
||||
"""Define common options/arguments for specifying resource filters."""
|
||||
|
||||
ap.add_argument("filter", nargs="*", help="One or more filters to select resources. If no filters are specified, all resources are selected.")
|
||||
|
||||
|
||||
def open_resource_file(file: str, *, fork: str) -> api.ResourceFile:
|
||||
"""Open a resource file at the given path, using the specified fork."""
|
||||
|
||||
if file == "-":
|
||||
if fork != "auto":
|
||||
print("Cannot specify an explicit fork when reading from stdin", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
rf = api.ResourceFile(sys.stdin.buffer)
|
||||
return api.ResourceFile(sys.stdin.buffer)
|
||||
else:
|
||||
rf = api.ResourceFile.open(ns.file, fork=ns.fork)
|
||||
|
||||
with rf:
|
||||
if ns.header_system or ns.header_application:
|
||||
if ns.header_system:
|
||||
return api.ResourceFile.open(file, fork=fork)
|
||||
|
||||
|
||||
def do_read_header(ns: argparse.Namespace) -> typing.NoReturn:
|
||||
with open_resource_file(ns.file, fork=ns.fork) as rf:
|
||||
if ns.format in {"dump", "dump-text"}:
|
||||
if ns.format == "dump":
|
||||
dump_func = hexdump
|
||||
elif ns.format == "dump-text":
|
||||
def dump_func(data: bytes) -> None:
|
||||
print(translate_text(data))
|
||||
else:
|
||||
raise AssertionError(f"Unhandled --format: {ns.format!r}")
|
||||
|
||||
if ns.part in {"system", "all"}:
|
||||
print("System-reserved header data:")
|
||||
dump_func(rf.header_system_data)
|
||||
|
||||
if ns.part in {"application", "all"}:
|
||||
print("Application-specific header data:")
|
||||
dump_func(rf.header_application_data)
|
||||
elif ns.format in {"hex", "raw"}:
|
||||
if ns.part == "system":
|
||||
data = rf.header_system_data
|
||||
else:
|
||||
elif ns.part == "application":
|
||||
data = rf.header_application_data
|
||||
|
||||
_show_header_data(data, format=ns.format)
|
||||
elif ns.filter or ns.all:
|
||||
if ns.filter:
|
||||
resources = _filter_resources(rf, ns.filter)
|
||||
elif ns.part == "all":
|
||||
data = rf.header_system_data + rf.header_application_data
|
||||
else:
|
||||
resources = []
|
||||
for reses in rf.values():
|
||||
resources.extend(reses.values())
|
||||
raise AssertionError(f"Unhandled --part: {ns.part!r}")
|
||||
|
||||
if ns.sort:
|
||||
resources.sort(key=lambda res: (res.type, res.id))
|
||||
|
||||
_show_filtered_resources(resources, format=ns.format, decompress=ns.decompress)
|
||||
if ns.format == "hex":
|
||||
raw_hexdump(data)
|
||||
elif ns.format == "raw":
|
||||
sys.stdout.buffer.write(data)
|
||||
else:
|
||||
raise AssertionError(f"Unhandled --format: {ns.format!r}")
|
||||
else:
|
||||
_list_resource_file(rf, sort=ns.sort, group=ns.group, decompress=ns.decompress)
|
||||
raise AssertionError(f"Unhandled --format: {ns.format!r}")
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def do_info(ns: argparse.Namespace) -> typing.NoReturn:
|
||||
with open_resource_file(ns.file, fork=ns.fork) as rf:
|
||||
print("System-reserved header data:")
|
||||
hexdump(rf.header_system_data)
|
||||
print()
|
||||
print("Application-specific header data:")
|
||||
hexdump(rf.header_application_data)
|
||||
print()
|
||||
|
||||
print(f"Resource data starts at {rf.data_offset:#x} and is {rf.data_length:#x} bytes long")
|
||||
print(f"Resource map starts at {rf.map_offset:#x} and is {rf.map_length:#x} bytes long")
|
||||
attrs = decompose_flags(rf.file_attributes)
|
||||
if attrs:
|
||||
attrs_desc = " | ".join(attr.name for attr in attrs)
|
||||
else:
|
||||
attrs_desc = "(none)"
|
||||
print(f"Resource map attributes: {attrs_desc}")
|
||||
print(f"Resource map type list starts at {rf.map_type_list_offset:#x} (relative to map start) and contains {len(rf)} types")
|
||||
print(f"Resource map name list starts at {rf.map_name_list_offset:#x} (relative to map start)")
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def do_list(ns: argparse.Namespace) -> typing.NoReturn:
|
||||
with open_resource_file(ns.file, fork=ns.fork) as rf:
|
||||
if not rf:
|
||||
print("No resources (empty resource file)")
|
||||
else:
|
||||
resources = list(filter_resources(rf, ns.filter))
|
||||
list_resources(resources, sort=ns.sort, group=ns.group, decompress=ns.decompress)
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def do_resource_info(ns: argparse.Namespace) -> typing.NoReturn:
|
||||
with open_resource_file(ns.file, fork=ns.fork) as rf:
|
||||
resources = list(filter_resources(rf, ns.filter))
|
||||
|
||||
if ns.sort:
|
||||
resources.sort(key=lambda res: (res.type, res.id))
|
||||
|
||||
if not resources:
|
||||
print("No resources matched the filter")
|
||||
sys.exit(0)
|
||||
|
||||
for res in resources:
|
||||
quoted_restype = bytes_quote(res.type, "'")
|
||||
print(f"Resource {quoted_restype} ({res.id}):")
|
||||
|
||||
if res.name is None:
|
||||
print("\tName: none (unnamed)")
|
||||
else:
|
||||
assert res.name_offset is not None
|
||||
quoted_name = bytes_quote(res.name, '"')
|
||||
print(f'\tName: {quoted_name} (at offset {res.name_offset} in name list)')
|
||||
|
||||
attrs = decompose_flags(res.attributes)
|
||||
if attrs:
|
||||
attrs_desc = " | ".join(attr.name for attr in attrs)
|
||||
else:
|
||||
attrs_desc = "(none)"
|
||||
print(f"\tAttributes: {attrs_desc}")
|
||||
|
||||
print(f"\tData: {res.length_raw} bytes stored at offset {res.data_raw_offset} in resource file data")
|
||||
|
||||
if api.ResourceAttrs.resCompressed in res.attributes and ns.decompress:
|
||||
print()
|
||||
print("\tCompressed resource header info:")
|
||||
try:
|
||||
res.compressed_info
|
||||
except compress.DecompressError:
|
||||
print("\t\t(failed to parse compressed resource header)")
|
||||
else:
|
||||
assert res.compressed_info is not None
|
||||
for line in format_compressed_header_info(res.compressed_info):
|
||||
print(f"\t\t{line}")
|
||||
|
||||
print()
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def do_read(ns: argparse.Namespace) -> typing.NoReturn:
|
||||
with open_resource_file(ns.file, fork=ns.fork) as rf:
|
||||
resources = list(filter_resources(rf, ns.filter))
|
||||
|
||||
if ns.sort:
|
||||
resources.sort(key=lambda res: (res.type, res.id))
|
||||
|
||||
show_filtered_resources(resources, format=ns.format, decompress=ns.decompress)
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def do_raw_compress_info(ns: argparse.Namespace) -> typing.NoReturn:
|
||||
if ns.input_file == "-":
|
||||
in_stream = sys.stdin.buffer
|
||||
close_in_stream = False
|
||||
else:
|
||||
in_stream = open(ns.input_file, "rb")
|
||||
close_in_stream = True
|
||||
|
||||
try:
|
||||
for line in format_compressed_header_info(compress.CompressedHeaderInfo.parse_stream(in_stream)):
|
||||
print(line)
|
||||
finally:
|
||||
if close_in_stream:
|
||||
in_stream.close()
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def do_raw_decompress(ns: argparse.Namespace) -> typing.NoReturn:
|
||||
if ns.input_file == "-":
|
||||
in_stream = sys.stdin.buffer
|
||||
close_in_stream = False
|
||||
else:
|
||||
in_stream = open(ns.input_file, "rb")
|
||||
close_in_stream = True
|
||||
|
||||
try:
|
||||
header_info = compress.CompressedHeaderInfo.parse_stream(in_stream)
|
||||
|
||||
# Open the output file only after parsing the header, so that the file is only created (or its existing contents deleted) if the input file is valid.
|
||||
if ns.output_file == "-":
|
||||
if ns.debug:
|
||||
print("Cannot use --debug if the decompression output file is - (stdout).", file=sys.stderr)
|
||||
print("The debug output goes to stdout and would conflict with the decompressed data.", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
|
||||
out_stream = sys.stdout.buffer
|
||||
close_out_stream = False
|
||||
else:
|
||||
out_stream = open(ns.output_file, "wb")
|
||||
close_out_stream = True
|
||||
|
||||
try:
|
||||
for chunk in compress.decompress_stream_parsed(header_info, in_stream, debug=ns.debug):
|
||||
out_stream.write(chunk)
|
||||
finally:
|
||||
if close_out_stream:
|
||||
out_stream.close()
|
||||
finally:
|
||||
if close_in_stream:
|
||||
in_stream.close()
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def main() -> typing.NoReturn:
|
||||
"""Main function of the CLI.
|
||||
|
||||
This function is a valid setuptools entry point. Arguments are passed in sys.argv, and every execution path ends with a sys.exit call. (setuptools entry points are also permitted to return an integer, which will be treated as an exit code. We do not use this feature and instead always call sys.exit ourselves.)
|
||||
"""
|
||||
|
||||
ap = argparse.ArgumentParser(
|
||||
description="""
|
||||
%(prog)s is a tool for working with Classic Mac OS resource files.
|
||||
Currently this tool can only read resource files; modifying/writing resource
|
||||
files is not supported yet.
|
||||
|
||||
Note: This tool is intended for human users. The output format is not
|
||||
machine-readable and may change at any time. The command-line syntax usually
|
||||
does not change much across versions, but this should not be relied on.
|
||||
Automated scripts and programs should use the Python API provided by the
|
||||
rsrcfork library, which this tool is a part of.
|
||||
""",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
allow_abbrev=False,
|
||||
add_help=False,
|
||||
)
|
||||
|
||||
ap.add_argument("--help", action="help", help="Display this help message and exit.")
|
||||
ap.add_argument("--version", action="version", version=__version__, help="Display version information and exit.")
|
||||
|
||||
subs = ap.add_subparsers(
|
||||
dest="subcommand",
|
||||
# TODO Add required=True (added in Python 3.7) once we drop Python 3.6 compatibility.
|
||||
metavar="SUBCOMMAND",
|
||||
)
|
||||
|
||||
ap_read_header = make_subcommand_parser(
|
||||
subs,
|
||||
"read-header",
|
||||
help="Read the header data from a resource file.",
|
||||
description="""
|
||||
Read and output a resource file's header data.
|
||||
|
||||
The header data consists of two parts:
|
||||
|
||||
The system-reserved data is 112 bytes long and used by the Classic Mac OS
|
||||
Finder as temporary storage space. It usually contains parts of the
|
||||
file metadata (name, type/creator code, etc.).
|
||||
|
||||
The application-specific data is 128 bytes long and is available for use by
|
||||
applications. In practice it usually contains junk data that happened to be in
|
||||
memory when the resource file was written.
|
||||
|
||||
Mac OS X does not use the header data fields anymore. Resource files written
|
||||
on Mac OS X normally have both parts of the header data set to all zero bytes.
|
||||
""",
|
||||
)
|
||||
|
||||
ap_read_header.add_argument("--format", choices=["dump", "dump-text", "hex", "raw"], default="dump", help="How to output the header data: human-readable info with hex dump (dump) (default), human-readable info with newline-translated data (dump-text), data only as hex (hex), or data only as raw bytes (raw). Default: %(default)s")
|
||||
ap_read_header.add_argument("--part", choices=["system", "application", "all"], default="all", help="Which part of the header to read. Default: %(default)s")
|
||||
add_resource_file_args(ap_read_header)
|
||||
|
||||
ap_info = make_subcommand_parser(
|
||||
subs,
|
||||
"info",
|
||||
help="Display technical information about the resource file.",
|
||||
description="""
|
||||
Display technical information and stats about the resource file.
|
||||
""",
|
||||
)
|
||||
add_resource_file_args(ap_info)
|
||||
|
||||
ap_list = make_subcommand_parser(
|
||||
subs,
|
||||
"list",
|
||||
help="List the resources in a file.",
|
||||
description=f"""
|
||||
List the resources stored in a resource file.
|
||||
|
||||
Each resource's type, ID, name (if any), attributes (if any), and data length
|
||||
are displayed. For compressed resources, the compressed and decompressed data
|
||||
length are displayed, as well as the ID of the 'dcmp' resource used to
|
||||
decompress the resource data.
|
||||
|
||||
{RESOURCE_FILTER_HELP}
|
||||
""",
|
||||
)
|
||||
|
||||
ap_list.add_argument("--no-decompress", action="store_false", dest="decompress", help="Do not parse the data header of compressed resources and only output their compressed length.")
|
||||
ap_list.add_argument("--group", action="store", choices=["none", "type", "id"], default="type", help="Group resources by type or ID, or disable grouping. Default: %(default)s")
|
||||
ap_list.add_argument("--no-sort", action="store_false", dest="sort", help="Output resources in the order in which they are stored in the file, instead of sorting them by type and ID.")
|
||||
add_resource_file_args(ap_list)
|
||||
add_resource_filter_args(ap_list)
|
||||
|
||||
ap_resource_info = make_subcommand_parser(
|
||||
subs,
|
||||
"resource-info",
|
||||
help="Display technical information about resources.",
|
||||
description=f"""
|
||||
Display technical information about one or more resources.
|
||||
|
||||
{RESOURCE_FILTER_HELP}
|
||||
""",
|
||||
)
|
||||
|
||||
ap_resource_info.add_argument("--no-decompress", action="store_false", dest="decompress", help="Do not parse the contents of compressed resources, only output regular resource information.")
|
||||
ap_resource_info.add_argument("--no-sort", action="store_false", dest="sort", help="Output resources in the order in which they are stored in the file, instead of sorting them by type and ID.")
|
||||
add_resource_file_args(ap_resource_info)
|
||||
add_resource_filter_args(ap_resource_info)
|
||||
|
||||
ap_read = make_subcommand_parser(
|
||||
subs,
|
||||
"read",
|
||||
help="Read data from resources.",
|
||||
description=f"""
|
||||
Read the data of one or more resources.
|
||||
|
||||
{RESOURCE_FILTER_HELP}
|
||||
""",
|
||||
)
|
||||
|
||||
ap_read.add_argument("--no-decompress", action="store_false", dest="decompress", help="Do not decompress compressed resources, output the raw compressed resource data.")
|
||||
ap_read.add_argument("--format", choices=["dump", "dump-text", "hex", "raw", "derez"], default="dump", help="How to output the resources: human-readable info with hex dump (dump), human-readable info with newline-translated data (dump-text), data only as hex (hex), data only as raw bytes (raw), or like DeRez with no resource definitions (derez). Default: %(default)s")
|
||||
ap_read.add_argument("--no-sort", action="store_false", dest="sort", help="Output resources in the order in which they are stored in the file, instead of sorting them by type and ID.")
|
||||
add_resource_file_args(ap_read)
|
||||
add_resource_filter_args(ap_read)
|
||||
|
||||
ap_raw_compress_info = make_subcommand_parser(
|
||||
subs,
|
||||
"raw-compress-info",
|
||||
help="Display technical information about raw compressed resource data.",
|
||||
description="""
|
||||
Display technical information about raw compressed resource data that is stored
|
||||
in a standalone file and not as a resource in a resource file.
|
||||
""",
|
||||
)
|
||||
|
||||
ap_raw_compress_info.add_argument("input_file", help="The file from which to read the compressed resource data, or - for stdin.")
|
||||
|
||||
ap_raw_decompress = make_subcommand_parser(
|
||||
subs,
|
||||
"raw-decompress",
|
||||
help="Decompress raw compressed resource data.",
|
||||
description="""
|
||||
Decompress raw compressed resource data that is stored in a standalone file
|
||||
and not as a resource in a resource file.
|
||||
|
||||
This subcommand can be used in a shell pipeline by passing - as the input and
|
||||
output file name, i. e. "%(prog)s - -".
|
||||
|
||||
Note: All other rsrcfork subcommands natively support compressed resources and
|
||||
will automatically decompress them as needed. This subcommand is only needed
|
||||
to decompress resource data that has been read from a resource file in
|
||||
compressed form (e. g. using --no-decompress or another tool that does not
|
||||
handle resource compression).
|
||||
""",
|
||||
)
|
||||
|
||||
ap_raw_decompress.add_argument("--debug", action="store_true", help="Display debugging output from the decompressor on stdout. Cannot be used if the output file is - (stdout).")
|
||||
|
||||
ap_raw_decompress.add_argument("input_file", help="The file from which to read the compressed resource data, or - for stdin.")
|
||||
ap_raw_decompress.add_argument("output_file", help="The file to which to write the decompressed resource data, or - for stdout.")
|
||||
|
||||
ns = ap.parse_args()
|
||||
|
||||
if ns.subcommand is None:
|
||||
# TODO Remove this branch once we drop Python 3.6 compatibility, because this case will be handled by passing required=True to add_subparsers (see above).
|
||||
print("Missing subcommand", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
elif ns.subcommand == "read-header":
|
||||
do_read_header(ns)
|
||||
elif ns.subcommand == "info":
|
||||
do_info(ns)
|
||||
elif ns.subcommand == "list":
|
||||
do_list(ns)
|
||||
elif ns.subcommand == "resource-info":
|
||||
do_resource_info(ns)
|
||||
elif ns.subcommand == "read":
|
||||
do_read(ns)
|
||||
elif ns.subcommand == "raw-compress-info":
|
||||
do_raw_compress_info(ns)
|
||||
elif ns.subcommand == "raw-decompress":
|
||||
do_raw_decompress(ns)
|
||||
else:
|
||||
raise AssertionError(f"Subcommand not handled: {ns.subcommand!r}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
|
168
rsrcfork/api.py
@ -59,9 +59,11 @@ STRUCT_RESOURCE_REFERENCE = struct.Struct(">hHI4x")
|
||||
# 1 byte: Length of following resource name.
|
||||
STRUCT_RESOURCE_NAME_HEADER = struct.Struct(">B")
|
||||
|
||||
|
||||
class InvalidResourceFileError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ResourceFileAttrs(enum.Flag):
|
||||
"""Resource file attribute flags. The descriptions for these flags are taken from comments on the map*Bit and map* enum constants in <CarbonCore/Resources.h>."""
|
||||
|
||||
@ -82,6 +84,7 @@ class ResourceFileAttrs(enum.Flag):
|
||||
_BIT_1 = 1 << 1
|
||||
_BIT_0 = 1 << 0
|
||||
|
||||
|
||||
class ResourceAttrs(enum.Flag):
|
||||
"""Resource attribute flags. The descriptions for these flags are taken from comments on the res*Bit and res* enum constants in <CarbonCore/Resources.h>."""
|
||||
|
||||
@ -94,27 +97,35 @@ class ResourceAttrs(enum.Flag):
|
||||
resChanged = 1 << 1 # "Existing resource changed since last update", "Resource changed?"
|
||||
resCompressed = 1 << 0 # "indicates that the resource data is compressed" (only documented in https://github.com/kreativekorp/ksfl/wiki/Macintosh-Resource-File-Format)
|
||||
|
||||
|
||||
class Resource(object):
|
||||
"""A single resource from a resource file."""
|
||||
|
||||
_resfile: "ResourceFile"
|
||||
type: bytes
|
||||
id: int
|
||||
name: typing.Optional[bytes]
|
||||
name_offset: int
|
||||
_name: typing.Optional[bytes]
|
||||
attributes: ResourceAttrs
|
||||
data_raw: bytes
|
||||
data_raw_offset: int
|
||||
_data_raw: bytes
|
||||
_compressed_info: compress.common.CompressedHeaderInfo
|
||||
_data_decompressed: bytes
|
||||
|
||||
def __init__(self, resource_type: bytes, resource_id: int, name: typing.Optional[bytes], attributes: ResourceAttrs, data_raw: bytes) -> None:
|
||||
"""Create a new resource with the given type code, ID, name, attributes, and data."""
|
||||
def __init__(self, resfile: "ResourceFile", resource_type: bytes, resource_id: int, name_offset: int, attributes: ResourceAttrs, data_raw_offset: int) -> None:
|
||||
"""Create a resource object representing a resource stored in a resource file.
|
||||
|
||||
External code should not call this constructor manually. Resources should be looked up through a ResourceFile object instead.
|
||||
"""
|
||||
|
||||
super().__init__()
|
||||
|
||||
self._resfile = resfile
|
||||
self.type = resource_type
|
||||
self.id = resource_id
|
||||
self.name = name
|
||||
self.name_offset = name_offset
|
||||
self.attributes = attributes
|
||||
self.data_raw = data_raw
|
||||
self.data_raw_offset = data_raw_offset
|
||||
|
||||
def __repr__(self) -> str:
|
||||
try:
|
||||
@ -126,14 +137,14 @@ class Resource(object):
|
||||
decompress_ok = True
|
||||
|
||||
if len(data) > 32:
|
||||
data_repr = f"<{len(data)} bytes: {data[:32]}...>"
|
||||
data_repr = f"<{len(data)} bytes: {data[:32]!r}...>"
|
||||
else:
|
||||
data_repr = repr(data)
|
||||
|
||||
if not decompress_ok:
|
||||
data_repr = f"<decompression failed - compressed data: {data_repr}>"
|
||||
|
||||
return f"{type(self).__module__}.{type(self).__qualname__}(type={self.type}, id={self.id}, name={self.name}, attributes={self.attributes}, data={data_repr})"
|
||||
return f"<{type(self).__qualname__} type {self.type!r}, id {self.id}, name {self.name!r}, attributes {self.attributes}, data {data_repr}>"
|
||||
|
||||
@property
|
||||
def resource_type(self) -> bytes:
|
||||
@ -145,6 +156,30 @@ class Resource(object):
|
||||
warnings.warn(DeprecationWarning("The resource_id attribute has been deprecated and will be removed in a future version. Please use the id attribute instead."))
|
||||
return self.id
|
||||
|
||||
@property
|
||||
def name(self) -> typing.Optional[bytes]:
|
||||
try:
|
||||
return self._name
|
||||
except AttributeError:
|
||||
if self.name_offset == 0xffff:
|
||||
self._name = None
|
||||
else:
|
||||
self._resfile._stream.seek(self._resfile.map_offset + self._resfile.map_name_list_offset + self.name_offset)
|
||||
(name_length,) = self._resfile._stream_unpack(STRUCT_RESOURCE_NAME_HEADER)
|
||||
self._name = self._resfile._read_exact(name_length)
|
||||
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def data_raw(self) -> bytes:
|
||||
try:
|
||||
return self._data_raw
|
||||
except AttributeError:
|
||||
self._resfile._stream.seek(self._resfile.data_offset + self.data_raw_offset)
|
||||
(data_raw_length,) = self._resfile._stream_unpack(STRUCT_RESOURCE_DATA_HEADER)
|
||||
self._data_raw = self._resfile._read_exact(data_raw_length)
|
||||
return self._data_raw
|
||||
|
||||
@property
|
||||
def compressed_info(self) -> typing.Optional[compress.common.CompressedHeaderInfo]:
|
||||
"""The compressed resource header information, or None if this resource is not compressed.
|
||||
@ -198,65 +233,56 @@ class Resource(object):
|
||||
else:
|
||||
return self.data_raw
|
||||
|
||||
|
||||
class _LazyResourceMap(typing.Mapping[int, Resource]):
|
||||
"""Internal class: Read-only wrapper for a mapping of resource IDs to resource objects.
|
||||
|
||||
This class behaves like a normal read-only mapping. The main difference to a plain dict (or similar mapping) is that this mapping has a specialized repr to avoid excessive output when working in the REPL.
|
||||
"""
|
||||
|
||||
type: bytes
|
||||
_submap: typing.Mapping[int, Resource]
|
||||
|
||||
def __init__(self, resource_type: bytes, submap: typing.Mapping[int, Resource]) -> None:
|
||||
"""Create a new _LazyResourceMap that wraps the given mapping."""
|
||||
|
||||
super().__init__()
|
||||
|
||||
self.type = resource_type
|
||||
self._submap = submap
|
||||
|
||||
def __len__(self) -> int:
|
||||
"""Get the number of resources with this type code."""
|
||||
|
||||
return len(self._submap)
|
||||
|
||||
def __iter__(self) -> typing.Iterator[int]:
|
||||
"""Iterate over the IDs of all resources with this type code."""
|
||||
|
||||
return iter(self._submap)
|
||||
|
||||
def __contains__(self, key: object) -> bool:
|
||||
"""Check if a resource with the given ID exists for this type code."""
|
||||
|
||||
return key in self._submap
|
||||
|
||||
def __getitem__(self, key: int) -> Resource:
|
||||
"""Get a resource with the given ID for this type code."""
|
||||
|
||||
return self._submap[key]
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if len(self) == 1:
|
||||
contents = f"one resource: {next(iter(self.values()))}"
|
||||
else:
|
||||
contents = f"{len(self)} resources with IDs {list(self)}"
|
||||
|
||||
return f"<Resource map for type {self.type!r}, containing {contents}>"
|
||||
|
||||
|
||||
class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.ContextManager["ResourceFile"]):
|
||||
"""A resource file reader operating on a byte stream."""
|
||||
|
||||
# noinspection PyProtectedMember
|
||||
class _LazyResourceMap(typing.Mapping[int, Resource]):
|
||||
"""Internal class: Lazy mapping of resource IDs to resource objects, returned when subscripting a ResourceFile."""
|
||||
|
||||
_resfile: "ResourceFile"
|
||||
_restype: bytes
|
||||
_submap: typing.Mapping[int, typing.Tuple[int, ResourceAttrs, int]]
|
||||
|
||||
def __init__(self, resfile: "ResourceFile", restype: bytes) -> None:
|
||||
"""Create a new _LazyResourceMap "containing" all resources in resfile that have the type code restype."""
|
||||
|
||||
super().__init__()
|
||||
|
||||
self._resfile = resfile
|
||||
self._restype = restype
|
||||
self._submap = self._resfile._references[self._restype]
|
||||
|
||||
def __len__(self) -> int:
|
||||
"""Get the number of resources with this type code."""
|
||||
|
||||
return len(self._submap)
|
||||
|
||||
def __iter__(self) -> typing.Iterator[int]:
|
||||
"""Iterate over the IDs of all resources with this type code."""
|
||||
|
||||
return iter(self._submap)
|
||||
|
||||
def __contains__(self, key: object) -> bool:
|
||||
"""Check if a resource with the given ID exists for this type code."""
|
||||
|
||||
return key in self._submap
|
||||
|
||||
def __getitem__(self, key: int) -> Resource:
|
||||
"""Get a resource with the given ID for this type code."""
|
||||
|
||||
name_offset, attributes, data_offset = self._submap[key]
|
||||
|
||||
if name_offset == 0xffff:
|
||||
name = None
|
||||
else:
|
||||
self._resfile._stream.seek(self._resfile.map_offset + self._resfile.map_name_list_offset + name_offset)
|
||||
(name_length,) = self._resfile._stream_unpack(STRUCT_RESOURCE_NAME_HEADER)
|
||||
name = self._resfile._read_exact(name_length)
|
||||
|
||||
self._resfile._stream.seek(self._resfile.data_offset + data_offset)
|
||||
(data_length,) = self._resfile._stream_unpack(STRUCT_RESOURCE_DATA_HEADER)
|
||||
data = self._resfile._read_exact(data_length)
|
||||
|
||||
return Resource(self._restype, key, name, attributes, data)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if len(self) == 1:
|
||||
return f"<{type(self).__module__}.{type(self).__qualname__} at {id(self):#x} containing one resource: {next(iter(self.values()))}>"
|
||||
else:
|
||||
return f"<{type(self).__module__}.{type(self).__qualname__} at {id(self):#x} containing {len(self)} resources with IDs: {list(self)}>"
|
||||
|
||||
_close_stream: bool
|
||||
_stream: typing.BinaryIO
|
||||
|
||||
@ -272,10 +298,10 @@ class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.
|
||||
file_attributes: ResourceFileAttrs
|
||||
|
||||
_reference_counts: typing.MutableMapping[bytes, int]
|
||||
_references: typing.MutableMapping[bytes, typing.MutableMapping[int, typing.Tuple[int, ResourceAttrs, int]]]
|
||||
_references: typing.MutableMapping[bytes, typing.MutableMapping[int, Resource]]
|
||||
|
||||
@classmethod
|
||||
def open(cls, filename: typing.Union[str, os.PathLike], *, fork: str="auto", **kwargs: typing.Any) -> "ResourceFile":
|
||||
def open(cls, filename: typing.Union[str, os.PathLike], *, fork: str = "auto", **kwargs: typing.Any) -> "ResourceFile":
|
||||
"""Open the file at the given path as a ResourceFile.
|
||||
|
||||
The fork parameter controls which fork of the file the resource data will be read from. It accepts the following values:
|
||||
@ -334,7 +360,7 @@ class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.
|
||||
else:
|
||||
raise ValueError(f"Unsupported value for the fork parameter: {fork!r}")
|
||||
|
||||
def __init__(self, stream: typing.BinaryIO, *, close: bool=False) -> None:
|
||||
def __init__(self, stream: typing.BinaryIO, *, close: bool = False) -> None:
|
||||
"""Create a ResourceFile wrapping the given byte stream.
|
||||
|
||||
To read resource file data from a bytes object, wrap it in an io.BytesIO.
|
||||
@ -433,7 +459,7 @@ class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.
|
||||
self._references = collections.OrderedDict()
|
||||
|
||||
for resource_type, count in self._reference_counts.items():
|
||||
resmap: typing.MutableMapping[int, typing.Tuple[int, ResourceAttrs, int]] = collections.OrderedDict()
|
||||
resmap: typing.MutableMapping[int, Resource] = collections.OrderedDict()
|
||||
self._references[resource_type] = resmap
|
||||
for _ in range(count):
|
||||
(
|
||||
@ -445,7 +471,7 @@ class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.
|
||||
attributes = attributes_and_data_offset >> 24
|
||||
data_offset = attributes_and_data_offset & ((1 << 24) - 1)
|
||||
|
||||
resmap[resource_id] = (name_offset, ResourceAttrs(attributes), data_offset)
|
||||
resmap[resource_id] = Resource(self, resource_type, resource_id, name_offset, ResourceAttrs(attributes), data_offset)
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close this ResourceFile.
|
||||
@ -483,10 +509,10 @@ class ResourceFile(typing.Mapping[bytes, typing.Mapping[int, Resource]], typing.
|
||||
|
||||
return key in self._references
|
||||
|
||||
def __getitem__(self, key: bytes) -> "ResourceFile._LazyResourceMap":
|
||||
def __getitem__(self, key: bytes) -> "_LazyResourceMap":
|
||||
"""Get a lazy mapping of all resources with the given type in this ResourceFile."""
|
||||
|
||||
return ResourceFile._LazyResourceMap(self, key)
|
||||
return _LazyResourceMap(key, self._references[key])
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"<{type(self).__module__}.{type(self).__qualname__} at {id(self):#x}, attributes {self.file_attributes}, containing {len(self)} resource types: {list(self)}>"
|
||||
|
@ -1,44 +1,68 @@
|
||||
import io
|
||||
import typing
|
||||
|
||||
from . import dcmp0
|
||||
from . import dcmp1
|
||||
from . import dcmp2
|
||||
|
||||
from .common import DecompressError, CompressedApplicationHeaderInfo, CompressedHeaderInfo, CompressedSystemHeaderInfo
|
||||
from .common import DecompressError, CompressedHeaderInfo, CompressedType8HeaderInfo, CompressedType9HeaderInfo
|
||||
|
||||
__all__ = [
|
||||
"CompressedHeaderInfo",
|
||||
"CompressedType8HeaderInfo",
|
||||
"CompressedType9HeaderInfo",
|
||||
"DecompressError",
|
||||
"decompress",
|
||||
"decompress_parsed",
|
||||
"decompress_stream",
|
||||
"decompress_stream_parsed",
|
||||
]
|
||||
|
||||
|
||||
# Maps 'dcmp' IDs to their corresponding Python implementations.
|
||||
# Each decompressor has the signature (header_info: CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes.
|
||||
# Each decompressor has the signature (header_info: CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool=False) -> typing.Iterator[bytes].
|
||||
DECOMPRESSORS = {
|
||||
0: dcmp0.decompress,
|
||||
1: dcmp1.decompress,
|
||||
2: dcmp2.decompress,
|
||||
0: dcmp0.decompress_stream,
|
||||
1: dcmp1.decompress_stream,
|
||||
2: dcmp2.decompress_stream,
|
||||
}
|
||||
|
||||
|
||||
def decompress_parsed(header_info: CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
|
||||
"""Decompress the given compressed resource data, whose header has already been removed and parsed into a CompressedHeaderInfo object."""
|
||||
def decompress_stream_parsed(header_info: CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
|
||||
"""Decompress compressed resource data from a stream, whose header has already been read and parsed into a CompressedHeaderInfo object."""
|
||||
|
||||
try:
|
||||
decompress_func = DECOMPRESSORS[header_info.dcmp_id]
|
||||
except KeyError:
|
||||
raise DecompressError(f"Unsupported 'dcmp' ID: {header_info.dcmp_id}")
|
||||
|
||||
decompressed = decompress_func(header_info, data, debug=debug)
|
||||
if len(decompressed) != header_info.decompressed_length:
|
||||
raise DecompressError(f"Actual length of decompressed data ({len(decompressed)}) does not match length stored in resource ({header_info.decompressed_length})")
|
||||
return decompressed
|
||||
|
||||
|
||||
def decompress(data: bytes, *, debug: bool=False) -> bytes:
|
||||
"""Decompress the given compressed resource data."""
|
||||
decompressed_length = 0
|
||||
for chunk in decompress_func(header_info, stream, debug=debug):
|
||||
decompressed_length += len(chunk)
|
||||
yield chunk
|
||||
|
||||
header_info = CompressedHeaderInfo.parse(data)
|
||||
if decompressed_length != header_info.decompressed_length:
|
||||
raise DecompressError(f"Actual length of decompressed data ({decompressed_length}) does not match length stored in resource ({header_info.decompressed_length})")
|
||||
|
||||
|
||||
def decompress_parsed(header_info: CompressedHeaderInfo, data: bytes, *, debug: bool = False) -> bytes:
|
||||
"""Decompress the given compressed resource data, whose header has already been removed and parsed into a CompressedHeaderInfo object."""
|
||||
|
||||
return b"".join(decompress_stream_parsed(header_info, io.BytesIO(data), debug=debug))
|
||||
|
||||
|
||||
def decompress_stream(stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
|
||||
"""Decompress compressed resource data from a stream."""
|
||||
|
||||
header_info = CompressedHeaderInfo.parse_stream(stream)
|
||||
|
||||
if debug:
|
||||
print(f"Compressed resource data header: {header_info}")
|
||||
|
||||
return decompress_parsed(header_info, data[header_info.header_length:], debug=debug)
|
||||
yield from decompress_stream_parsed(header_info, stream, debug=debug)
|
||||
|
||||
|
||||
def decompress(data: bytes, *, debug: bool = False) -> bytes:
|
||||
"""Decompress the given compressed resource data."""
|
||||
|
||||
return b"".join(decompress_stream(io.BytesIO(data), debug=debug))
|
||||
|
@ -1,3 +1,4 @@
|
||||
import io
|
||||
import struct
|
||||
import typing
|
||||
|
||||
@ -8,58 +9,62 @@ class DecompressError(Exception):
|
||||
|
||||
# The signature of all compressed resource data, 0xa89f6572 in hex, or "®üer" in MacRoman.
|
||||
COMPRESSED_SIGNATURE = b"\xa8\x9fer"
|
||||
# The compression type commonly used for application resources.
|
||||
COMPRESSED_TYPE_APPLICATION = 0x0801
|
||||
# The compression type commonly used for System file resources.
|
||||
COMPRESSED_TYPE_SYSTEM = 0x0901
|
||||
# The number of the "type 8" compression type. This type is used in the Finder, ResEdit, and some other system files.
|
||||
COMPRESSED_TYPE_8 = 0x0801
|
||||
# The number of the "type 9" compression type. This type is used in the System file and System 7.5's Installer.
|
||||
COMPRESSED_TYPE_9 = 0x0901
|
||||
|
||||
# Common header for compressed resources of all types.
|
||||
# 4 bytes: Signature (see above).
|
||||
# 2 bytes: Length of the complete header (this common part and the type-specific part that follows it). (This meaning is just a guess - the field's value is always 0x0012, so there's no way to know for certain what it means.)
|
||||
# 2 bytes: Compression type. Known so far: 0x0901 is used in the System file's resources. 0x0801 is used in other files' resources.
|
||||
# 2 bytes: Compression type. Known so far: 0x0801 ("type 8") and 0x0901 ("type 9").
|
||||
# 4 bytes: Length of the data after decompression.
|
||||
# 6 bytes: Remainder of the header. The exact format varies depending on the compression type.
|
||||
STRUCT_COMPRESSED_HEADER = struct.Struct(">4sHHI6s")
|
||||
|
||||
# Remainder of header for an "application" compressed resource.
|
||||
# Remainder of header for a "type 8" compressed resource.
|
||||
# 1 byte: "Working buffer fractional size" - the ratio of the compressed data size to the uncompressed data size, times 256.
|
||||
# 1 byte: "Expansion buffer size" - the maximum number of bytes that the data might grow during decompression.
|
||||
# 2 bytes: The ID of the 'dcmp' resource that can decompress this resource. Currently only ID 0 is supported.
|
||||
# 2 bytes: Reserved (always zero).
|
||||
STRUCT_COMPRESSED_APPLICATION_HEADER = struct.Struct(">BBhH")
|
||||
STRUCT_COMPRESSED_TYPE_8_HEADER = struct.Struct(">BBhH")
|
||||
|
||||
# Remainder of header for a "system" compressed resource.
|
||||
# Remainder of header for a "type 9" compressed resource.
|
||||
# 2 bytes: The ID of the 'dcmp' resource that can decompress this resource. Currently only ID 2 is supported.
|
||||
# 4 bytes: Decompressor-specific parameters.
|
||||
STRUCT_COMPRESSED_SYSTEM_HEADER = struct.Struct(">h4s")
|
||||
STRUCT_COMPRESSED_TYPE_9_HEADER = struct.Struct(">h4s")
|
||||
|
||||
|
||||
class CompressedHeaderInfo(object):
|
||||
@classmethod
|
||||
def parse(cls, data: bytes) -> "CompressedHeaderInfo":
|
||||
def parse_stream(cls, stream: typing.BinaryIO) -> "CompressedHeaderInfo":
|
||||
try:
|
||||
signature, header_length, compression_type, decompressed_length, remainder = STRUCT_COMPRESSED_HEADER.unpack_from(data)
|
||||
signature, header_length, compression_type, decompressed_length, remainder = STRUCT_COMPRESSED_HEADER.unpack(stream.read(STRUCT_COMPRESSED_HEADER.size))
|
||||
except struct.error:
|
||||
raise DecompressError(f"Invalid header")
|
||||
raise DecompressError("Invalid header")
|
||||
if signature != COMPRESSED_SIGNATURE:
|
||||
raise DecompressError(f"Invalid signature: {signature!r}, expected {COMPRESSED_SIGNATURE}")
|
||||
raise DecompressError(f"Invalid signature: {signature!r}, expected {COMPRESSED_SIGNATURE!r}")
|
||||
if header_length != 0x12:
|
||||
raise DecompressError(f"Unsupported header length: 0x{header_length:>04x}, expected 0x12")
|
||||
|
||||
if compression_type == COMPRESSED_TYPE_APPLICATION:
|
||||
working_buffer_fractional_size, expansion_buffer_size, dcmp_id, reserved = STRUCT_COMPRESSED_APPLICATION_HEADER.unpack(remainder)
|
||||
if compression_type == COMPRESSED_TYPE_8:
|
||||
working_buffer_fractional_size, expansion_buffer_size, dcmp_id, reserved = STRUCT_COMPRESSED_TYPE_8_HEADER.unpack(remainder)
|
||||
|
||||
if reserved != 0:
|
||||
raise DecompressError(f"Reserved field should be 0, not 0x{reserved:>04x}")
|
||||
|
||||
return CompressedApplicationHeaderInfo(header_length, compression_type, decompressed_length, dcmp_id, working_buffer_fractional_size, expansion_buffer_size)
|
||||
elif compression_type == COMPRESSED_TYPE_SYSTEM:
|
||||
dcmp_id, parameters = STRUCT_COMPRESSED_SYSTEM_HEADER.unpack(remainder)
|
||||
return CompressedType8HeaderInfo(header_length, compression_type, decompressed_length, dcmp_id, working_buffer_fractional_size, expansion_buffer_size)
|
||||
elif compression_type == COMPRESSED_TYPE_9:
|
||||
dcmp_id, parameters = STRUCT_COMPRESSED_TYPE_9_HEADER.unpack(remainder)
|
||||
|
||||
return CompressedSystemHeaderInfo(header_length, compression_type, decompressed_length, dcmp_id, parameters)
|
||||
return CompressedType9HeaderInfo(header_length, compression_type, decompressed_length, dcmp_id, parameters)
|
||||
else:
|
||||
raise DecompressError(f"Unsupported compression type: 0x{compression_type:>04x}")
|
||||
|
||||
@classmethod
|
||||
def parse(cls, data: bytes) -> "CompressedHeaderInfo":
|
||||
return cls.parse_stream(io.BytesIO(data))
|
||||
|
||||
header_length: int
|
||||
compression_type: int
|
||||
decompressed_length: int
|
||||
@ -74,7 +79,7 @@ class CompressedHeaderInfo(object):
|
||||
self.dcmp_id = dcmp_id
|
||||
|
||||
|
||||
class CompressedApplicationHeaderInfo(CompressedHeaderInfo):
|
||||
class CompressedType8HeaderInfo(CompressedHeaderInfo):
|
||||
working_buffer_fractional_size: int
|
||||
expansion_buffer_size: int
|
||||
|
||||
@ -88,7 +93,7 @@ class CompressedApplicationHeaderInfo(CompressedHeaderInfo):
|
||||
return f"{type(self).__qualname__}(header_length={self.header_length}, compression_type=0x{self.compression_type:>04x}, decompressed_length={self.decompressed_length}, dcmp_id={self.dcmp_id}, working_buffer_fractional_size={self.working_buffer_fractional_size}, expansion_buffer_size={self.expansion_buffer_size})"
|
||||
|
||||
|
||||
class CompressedSystemHeaderInfo(CompressedHeaderInfo):
|
||||
class CompressedType9HeaderInfo(CompressedHeaderInfo):
|
||||
parameters: bytes
|
||||
|
||||
def __init__(self, header_length: int, compression_type: int, decompressed_length: int, dcmp_id: int, parameters: bytes) -> None:
|
||||
@ -100,19 +105,101 @@ class CompressedSystemHeaderInfo(CompressedHeaderInfo):
|
||||
return f"{type(self).__qualname__}(header_length={self.header_length}, compression_type=0x{self.compression_type:>04x}, decompressed_length={self.decompressed_length}, dcmp_id={self.dcmp_id}, parameters={self.parameters!r})"
|
||||
|
||||
|
||||
def _read_variable_length_integer(data: bytes, position: int) -> typing.Tuple[int, int]:
|
||||
"""Read a variable-length integer starting at the given position in the data, and return the integer as well as the number of bytes consumed.
|
||||
if typing.TYPE_CHECKING:
|
||||
class PeekableIO(typing.Protocol):
|
||||
"""Minimal protocol for binary IO streams that support the peek method.
|
||||
|
||||
The peek method is supported by various standard Python binary IO streams, such as io.BufferedReader. If a stream does not natively support the peek method, it may be wrapped using the custom helper function make_peekable.
|
||||
"""
|
||||
|
||||
def readable(self) -> bool:
|
||||
...
|
||||
|
||||
def read(self, size: typing.Optional[int] = ...) -> bytes:
|
||||
...
|
||||
|
||||
def peek(self, size: int = ...) -> bytes:
|
||||
...
|
||||
|
||||
|
||||
class _PeekableIOWrapper(object):
|
||||
"""Wrapper class to add peek support to an existing stream. Do not instantiate this class directly, use the make_peekable function instead.
|
||||
|
||||
Python provides a standard io.BufferedReader class, which supports the peek method. However, according to its documentation, it only supports wrapping io.RawIOBase subclasses, and not streams which are already otherwise buffered.
|
||||
|
||||
Warning: this class does not perform any buffering of its own, outside of what is required to make peek work. It is strongly recommended to only wrap streams that are already buffered or otherwise fast to read from. In particular, raw streams (io.RawIOBase subclasses) should be wrapped using io.BufferedReader instead.
|
||||
"""
|
||||
|
||||
_wrapped: typing.BinaryIO
|
||||
_readahead: bytes
|
||||
|
||||
def __init__(self, wrapped: typing.BinaryIO) -> None:
|
||||
super().__init__()
|
||||
|
||||
self._wrapped = wrapped
|
||||
self._readahead = b""
|
||||
|
||||
def readable(self) -> bool:
|
||||
return self._wrapped.readable()
|
||||
|
||||
def read(self, size: typing.Optional[int] = None) -> bytes:
|
||||
if size is None or size < 0:
|
||||
ret = self._readahead + self._wrapped.read()
|
||||
self._readahead = b""
|
||||
elif size <= len(self._readahead):
|
||||
ret = self._readahead[:size]
|
||||
self._readahead = self._readahead[size:]
|
||||
else:
|
||||
ret = self._readahead + self._wrapped.read(size - len(self._readahead))
|
||||
self._readahead = b""
|
||||
|
||||
return ret
|
||||
|
||||
def peek(self, size: int = -1) -> bytes:
|
||||
if not self._readahead:
|
||||
self._readahead = self._wrapped.read(io.DEFAULT_BUFFER_SIZE if size < 0 else size)
|
||||
return self._readahead
|
||||
|
||||
|
||||
def make_peekable(stream: typing.BinaryIO) -> "PeekableIO":
|
||||
"""Wrap an arbitrary binary IO stream so that it supports the peek method.
|
||||
|
||||
The stream is wrapped as efficiently as possible (or not at all if it already supports the peek method). However, in the worst case a custom wrapper class needs to be used, which may not be particularly efficient and only supports a very minimal interface. The only methods that are guaranteed to exist on the returned stream are readable, read, and peek.
|
||||
"""
|
||||
|
||||
if hasattr(stream, "peek"):
|
||||
# Stream is already peekable, nothing to be done.
|
||||
return typing.cast("PeekableIO", stream)
|
||||
elif not typing.TYPE_CHECKING and isinstance(stream, io.RawIOBase):
|
||||
# This branch is skipped when type checking - mypy incorrectly warns about this code being unreachable, because it thinks that a typing.BinaryIO cannot be an instance of io.RawIOBase.
|
||||
# Raw IO streams can be wrapped efficiently using BufferedReader.
|
||||
return io.BufferedReader(stream)
|
||||
else:
|
||||
# Other streams need to be wrapped using our custom wrapper class.
|
||||
return _PeekableIOWrapper(stream)
|
||||
|
||||
|
||||
def read_exact(stream: typing.BinaryIO, byte_count: int) -> bytes:
|
||||
"""Read byte_count bytes from the stream and raise an exception if too few bytes are read (i. e. if EOF was hit prematurely)."""
|
||||
|
||||
data = stream.read(byte_count)
|
||||
if len(data) != byte_count:
|
||||
raise DecompressError(f"Attempted to read {byte_count} bytes of data, but only got {len(data)} bytes")
|
||||
return data
|
||||
|
||||
|
||||
def read_variable_length_integer(stream: typing.BinaryIO) -> int:
|
||||
"""Read a variable-length integer from the stream.
|
||||
|
||||
This variable-length integer format is used by the 0xfe codes in the compression formats used by 'dcmp' (0) and 'dcmp' (1).
|
||||
"""
|
||||
|
||||
assert len(data) > position
|
||||
if data[position] == 0xff:
|
||||
assert len(data) > position + 4
|
||||
return int.from_bytes(data[position+1:position+5], "big", signed=True), 5
|
||||
elif data[position] >= 0x80:
|
||||
assert len(data) > position + 1
|
||||
data_modified = bytes([(data[position] - 0xc0) & 0xff, data[position+1]])
|
||||
return int.from_bytes(data_modified, "big", signed=True), 2
|
||||
head = read_exact(stream, 1)
|
||||
|
||||
if head[0] == 0xff:
|
||||
return int.from_bytes(read_exact(stream, 4), "big", signed=True)
|
||||
elif head[0] >= 0x80:
|
||||
data_modified = bytes([(head[0] - 0xc0) & 0xff]) + read_exact(stream, 1)
|
||||
return int.from_bytes(data_modified, "big", signed=True)
|
||||
else:
|
||||
return int.from_bytes(data[position:position+1], "big", signed=True), 1
|
||||
return int.from_bytes(head, "big", signed=True)
|
||||
|
@ -38,136 +38,103 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
|
||||
assert len(TABLE) == len(range(0x4b, 0xfe))
|
||||
|
||||
|
||||
def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
|
||||
"""Decompress compressed data in the format used by 'dcmp' (0)."""
|
||||
def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
|
||||
"""Internal helper function, implements the main decompression algorithm. Only called from decompress_stream, which performs some extra checks and debug logging."""
|
||||
|
||||
if not isinstance(header_info, common.CompressedApplicationHeaderInfo):
|
||||
if not isinstance(header_info, common.CompressedType8HeaderInfo):
|
||||
raise common.DecompressError(f"Incorrect header type: {type(header_info).__qualname__}")
|
||||
|
||||
prev_literals: typing.List[bytes] = []
|
||||
decompressed = b""
|
||||
|
||||
i = 0
|
||||
|
||||
while i < len(data):
|
||||
byte = data[i]
|
||||
while True: # Loop is terminated when the EOF marker (0xff) is encountered
|
||||
(byte,) = common.read_exact(stream, 1)
|
||||
if debug:
|
||||
print(f"Tag byte 0x{byte:>02x}, at 0x{i:x}, decompressing to 0x{len(decompressed):x}")
|
||||
print(f"Tag byte 0x{byte:>02x}")
|
||||
|
||||
if byte in range(0x00, 0x20):
|
||||
# Literal byte sequence.
|
||||
if byte in (0x00, 0x10):
|
||||
# The length of the literal data is stored in the next byte.
|
||||
count_div2 = data[i+1]
|
||||
begin = i + 2
|
||||
(count_div2,) = common.read_exact(stream, 1)
|
||||
else:
|
||||
# The length of the literal data is stored in the low nibble of the tag byte.
|
||||
count_div2 = byte >> 0 & 0xf
|
||||
begin = i + 1
|
||||
end = begin + 2*count_div2
|
||||
count = 2 * count_div2
|
||||
# Controls whether or not the literal is stored so that it can be referenced again later.
|
||||
do_store = byte >= 0x10
|
||||
literal = data[begin:end]
|
||||
literal = common.read_exact(stream, count)
|
||||
if debug:
|
||||
print(f"Literal (storing: {do_store})")
|
||||
print(f"\t-> {literal}")
|
||||
decompressed += literal
|
||||
if do_store:
|
||||
if debug:
|
||||
print(f"\t-> stored as literal number 0x{len(prev_literals):x}")
|
||||
print(f"\t-> storing as literal number 0x{len(prev_literals):x}")
|
||||
prev_literals.append(literal)
|
||||
i = end
|
||||
yield literal
|
||||
elif byte in (0x20, 0x21):
|
||||
# Backreference to a previous literal, 2-byte form.
|
||||
# This can reference literals with index in range(0x28, 0x228).
|
||||
table_index = 0x28 + ((byte - 0x20) << 8 | data[i+1])
|
||||
i += 2
|
||||
(next_byte,) = common.read_exact(stream, 1)
|
||||
table_index = 0x28 + ((byte - 0x20) << 8 | next_byte)
|
||||
if debug:
|
||||
print(f"Backreference (2-byte form) to 0x{table_index:>02x}")
|
||||
literal = prev_literals[table_index]
|
||||
if debug:
|
||||
print(f"\t-> {literal}")
|
||||
decompressed += literal
|
||||
yield prev_literals[table_index]
|
||||
elif byte == 0x22:
|
||||
# Backreference to a previous literal, 3-byte form.
|
||||
# This can reference any literal with index 0x28 and higher, but is only necessary for literals with index 0x228 and higher.
|
||||
table_index = 0x28 + int.from_bytes(data[i+1:i+3], "big", signed=False)
|
||||
i += 3
|
||||
table_index = 0x28 + int.from_bytes(common.read_exact(stream, 2), "big", signed=False)
|
||||
if debug:
|
||||
print(f"Backreference (3-byte form) to 0x{table_index:>02x}")
|
||||
literal = prev_literals[table_index]
|
||||
if debug:
|
||||
print(f"\t-> {literal}")
|
||||
decompressed += literal
|
||||
yield prev_literals[table_index]
|
||||
elif byte in range(0x23, 0x4b):
|
||||
# Backreference to a previous literal, 1-byte form.
|
||||
# This can reference literals with indices in range(0x28).
|
||||
table_index = byte - 0x23
|
||||
i += 1
|
||||
if debug:
|
||||
print(f"Backreference (1-byte form) to 0x{table_index:>02x}")
|
||||
literal = prev_literals[table_index]
|
||||
if debug:
|
||||
print(f"\t-> {literal}")
|
||||
decompressed += literal
|
||||
yield prev_literals[table_index]
|
||||
elif byte in range(0x4b, 0xfe):
|
||||
# Reference into a fixed table of two-byte literals.
|
||||
# All compressed resources use the same table.
|
||||
table_index = byte - 0x4b
|
||||
i += 1
|
||||
if debug:
|
||||
print(f"Fixed table reference to 0x{table_index:>02x}")
|
||||
entry = TABLE[table_index]
|
||||
if debug:
|
||||
print(f"\t-> {entry}")
|
||||
decompressed += entry
|
||||
yield TABLE[table_index]
|
||||
elif byte == 0xfe:
|
||||
# Extended code, whose meaning is controlled by the following byte.
|
||||
|
||||
i += 1
|
||||
kind = data[i]
|
||||
(kind,) = common.read_exact(stream, 1)
|
||||
if debug:
|
||||
print(f"Extended code: 0x{kind:>02x}")
|
||||
i += 1
|
||||
|
||||
if kind == 0x00:
|
||||
# Compact representation of (part of) a segment loader jump table, as used in 'CODE' (0) resources.
|
||||
|
||||
if debug:
|
||||
print(f"Segment loader jump table entries")
|
||||
print("Segment loader jump table entries")
|
||||
|
||||
# All generated jump table entries have the same segment number.
|
||||
segment_number_int, length = common._read_variable_length_integer(data, i)
|
||||
i += length
|
||||
segment_number_int = common.read_variable_length_integer(stream)
|
||||
if debug:
|
||||
print(f"\t-> segment number: {segment_number_int:#x}")
|
||||
|
||||
# The tail part of all jump table entries (i. e. everything except for the address).
|
||||
entry_tail = b"?<" + segment_number_int.to_bytes(2, "big", signed=True) + b"\xa9\xf0"
|
||||
if debug:
|
||||
print(f"\t-> tail of first entry: {entry_tail}")
|
||||
entry_tail = b"?<" + segment_number_int.to_bytes(2, "big", signed=False) + b"\xa9\xf0"
|
||||
# The tail is output once *without* an address in front, i. e. the first entry's address must be generated manually by a previous code.
|
||||
decompressed += entry_tail
|
||||
yield entry_tail
|
||||
|
||||
count, length = common._read_variable_length_integer(data, i)
|
||||
i += length
|
||||
count = common.read_variable_length_integer(stream)
|
||||
if count <= 0:
|
||||
raise common.DecompressError(f"Jump table entry count must be greater than 0, not {count}")
|
||||
|
||||
# The second entry's address is stored explicitly.
|
||||
current_int, length = common._read_variable_length_integer(data, i)
|
||||
i += length
|
||||
current_int = common.read_variable_length_integer(stream)
|
||||
if debug:
|
||||
print(f"-> address of second entry: {current_int:#x}")
|
||||
entry = current_int.to_bytes(2, "big", signed=False) + entry_tail
|
||||
if debug:
|
||||
print(f"-> second entry: {entry}")
|
||||
decompressed += entry
|
||||
print(f"\t-> address of second entry: {current_int:#x}")
|
||||
yield current_int.to_bytes(2, "big", signed=False) + entry_tail
|
||||
|
||||
for _ in range(1, count):
|
||||
# All further entries' addresses are stored as differences relative to the previous entry's address.
|
||||
diff, length = common._read_variable_length_integer(data, i)
|
||||
i += length
|
||||
diff = common.read_variable_length_integer(stream)
|
||||
# For some reason, each difference is 6 higher than it should be.
|
||||
diff -= 6
|
||||
|
||||
@ -175,10 +142,7 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
|
||||
current_int = (current_int + diff) & 0xffff
|
||||
if debug:
|
||||
print(f"\t-> difference {diff:#x}: {current_int:#x}")
|
||||
entry = current_int.to_bytes(2, "big", signed=False) + entry_tail
|
||||
if debug:
|
||||
print(f"\t-> {entry}")
|
||||
decompressed += entry
|
||||
yield current_int.to_bytes(2, "big", signed=False) + entry_tail
|
||||
elif kind in (0x02, 0x03):
|
||||
# Repeat 1 or 2 bytes a certain number of times.
|
||||
|
||||
@ -193,42 +157,36 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
|
||||
print(f"Repeat {byte_count}-byte value")
|
||||
|
||||
# The byte(s) to repeat, stored as a variable-length integer. The value is treated as unsigned, i. e. the integer is never negative.
|
||||
to_repeat_int, length = common._read_variable_length_integer(data, i)
|
||||
i += length
|
||||
to_repeat_int = common.read_variable_length_integer(stream)
|
||||
try:
|
||||
to_repeat = to_repeat_int.to_bytes(byte_count, "big", signed=False)
|
||||
except OverflowError:
|
||||
raise common.DecompressError(f"Value to repeat out of range for {byte_count}-byte repeat: {to_repeat_int:#x}")
|
||||
|
||||
count_m1, length = common._read_variable_length_integer(data, i)
|
||||
i += length
|
||||
count = count_m1 + 1
|
||||
count = common.read_variable_length_integer(stream) + 1
|
||||
if count <= 0:
|
||||
raise common.DecompressError(f"Repeat count must be positive: {count}")
|
||||
|
||||
repeated = to_repeat * count
|
||||
if debug:
|
||||
print(f"\t-> {to_repeat} * {count}: {repeated}")
|
||||
decompressed += repeated
|
||||
print(f"\t-> {to_repeat!r} * {count}")
|
||||
yield to_repeat * count
|
||||
elif kind == 0x04:
|
||||
# A sequence of 16-bit signed integers, with each integer encoded as a difference relative to the previous integer. The first integer is stored explicitly.
|
||||
|
||||
if debug:
|
||||
print(f"Difference-encoded 16-bit integers")
|
||||
print("Difference-encoded 16-bit integers")
|
||||
|
||||
# The first integer is stored explicitly, as a signed value.
|
||||
initial_int, length = common._read_variable_length_integer(data, i)
|
||||
i += length
|
||||
initial_int = common.read_variable_length_integer(stream)
|
||||
try:
|
||||
initial = initial_int.to_bytes(2, "big", signed=True)
|
||||
except OverflowError:
|
||||
raise common.DecompressError(f"Initial value out of range for 16-bit integer difference encoding: {initial_int:#x}")
|
||||
if debug:
|
||||
print(f"\t-> initial: {initial}")
|
||||
decompressed += initial
|
||||
print(f"\t-> initial: 0x{initial_int:>04x}")
|
||||
yield initial
|
||||
|
||||
count, length = common._read_variable_length_integer(data, i)
|
||||
i += length
|
||||
count = common.read_variable_length_integer(stream)
|
||||
if count < 0:
|
||||
raise common.DecompressError(f"Count cannot be negative: {count}")
|
||||
|
||||
@ -237,64 +195,75 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
|
||||
for _ in range(count):
|
||||
# The difference to the previous integer is stored as an 8-bit signed integer.
|
||||
# The usual variable-length integer format is *not* used here.
|
||||
diff = int.from_bytes(data[i:i+1], "big", signed=True)
|
||||
i += 1
|
||||
diff = int.from_bytes(common.read_exact(stream, 1), "big", signed=True)
|
||||
|
||||
# Simulate 16-bit integer wraparound.
|
||||
current_int = (current_int + diff) & 0xffff
|
||||
current = current_int.to_bytes(2, "big", signed=False)
|
||||
if debug:
|
||||
print(f"\t-> difference {diff:#x}: {current}")
|
||||
decompressed += current
|
||||
print(f"\t-> difference {diff:#x}: 0x{current_int:>04x}")
|
||||
yield current_int.to_bytes(2, "big", signed=False)
|
||||
elif kind == 0x06:
|
||||
# A sequence of 32-bit signed integers, with each integer encoded as a difference relative to the previous integer. The first integer is stored explicitly.
|
||||
|
||||
if debug:
|
||||
print(f"Difference-encoded 16-bit integers")
|
||||
print("Difference-encoded 32-bit integers")
|
||||
|
||||
# The first integer is stored explicitly, as a signed value.
|
||||
initial_int, length = common._read_variable_length_integer(data, i)
|
||||
i += length
|
||||
initial_int = common.read_variable_length_integer(stream)
|
||||
try:
|
||||
initial = initial_int.to_bytes(4, "big", signed=True)
|
||||
except OverflowError:
|
||||
raise common.DecompressError(f"Initial value out of range for 32-bit integer difference encoding: {initial_int:#x}")
|
||||
if debug:
|
||||
print(f"\t-> initial: {initial}")
|
||||
decompressed += initial
|
||||
print(f"\t-> initial: 0x{initial_int:>08x}")
|
||||
yield initial
|
||||
|
||||
count, length = common._read_variable_length_integer(data, i)
|
||||
i += length
|
||||
count = common.read_variable_length_integer(stream)
|
||||
assert count >= 0
|
||||
|
||||
# To make the following calculations simpler, the signed initial_int value is converted to unsigned.
|
||||
current_int = initial_int & 0xffffffff
|
||||
for _ in range(count):
|
||||
# The difference to the previous integer is stored as a variable-length integer, whose value may be negative.
|
||||
diff, length = common._read_variable_length_integer(data, i)
|
||||
i += length
|
||||
diff = common.read_variable_length_integer(stream)
|
||||
|
||||
# Simulate 32-bit integer wraparound.
|
||||
current_int = (current_int + diff) & 0xffffffff
|
||||
current = current_int.to_bytes(4, "big", signed=False)
|
||||
if debug:
|
||||
print(f"\t-> difference {diff:#x}: {current}")
|
||||
decompressed += current
|
||||
print(f"\t-> difference {diff:#x}: 0x{current_int:>08x}")
|
||||
yield current_int.to_bytes(4, "big", signed=False)
|
||||
else:
|
||||
raise common.DecompressError(f"Unknown extended code: 0x{kind:>02x}")
|
||||
elif byte == 0xff:
|
||||
# End of data marker, always occurs exactly once as the last byte of the compressed data.
|
||||
if debug:
|
||||
print("End marker")
|
||||
if i != len(data) - 1:
|
||||
raise common.DecompressError(f"End marker reached at {i}, before the expected end of data at {len(data) - 1}")
|
||||
i += 1
|
||||
|
||||
# Check that there really is no more data left.
|
||||
extra = stream.read(1)
|
||||
if extra:
|
||||
raise common.DecompressError(f"Extra data encountered after end of data marker (first extra byte: {extra!r})")
|
||||
break
|
||||
else:
|
||||
raise common.DecompressError(f"Unknown tag byte: 0x{data[i]:>02x}")
|
||||
raise common.DecompressError(f"Unknown tag byte: 0x{byte:>02x}")
|
||||
|
||||
|
||||
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
|
||||
"""Decompress compressed data in the format used by 'dcmp' (0)."""
|
||||
|
||||
if header_info.decompressed_length % 2 != 0 and len(decompressed) == header_info.decompressed_length + 1:
|
||||
# Special case: if the decompressed data length stored in the header is odd and one less than the length of the actual decompressed data, drop the last byte.
|
||||
# This is necessary because nearly all codes generate data in groups of 2 or 4 bytes, so it is basically impossible to represent data with an odd length using this compression format.
|
||||
decompressed = decompressed[:-1]
|
||||
|
||||
return decompressed
|
||||
decompressed_length = 0
|
||||
for chunk in decompress_stream_inner(header_info, stream, debug=debug):
|
||||
if debug:
|
||||
print(f"\t-> {chunk!r}")
|
||||
|
||||
if header_info.decompressed_length % 2 != 0 and decompressed_length + len(chunk) == header_info.decompressed_length + 1:
|
||||
# Special case: if the decompressed data length stored in the header is odd and one less than the length of the actual decompressed data, drop the last byte.
|
||||
# This is necessary because nearly all codes generate data in groups of 2 or 4 bytes, so it is basically impossible to represent data with an odd length using this compression format.
|
||||
decompressed_length += len(chunk) - 1
|
||||
yield chunk[:-1]
|
||||
else:
|
||||
decompressed_length += len(chunk)
|
||||
yield chunk
|
||||
|
||||
if debug:
|
||||
print(f"Decompressed {decompressed_length:#x} bytes so far")
|
||||
|
@ -21,99 +21,75 @@ TABLE = [TABLE_DATA[i:i + 2] for i in range(0, len(TABLE_DATA), 2)]
|
||||
assert len(TABLE) == len(range(0xd5, 0xfe))
|
||||
|
||||
|
||||
def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
|
||||
"""Decompress compressed data in the format used by 'dcmp' (1)."""
|
||||
def decompress_stream_inner(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
|
||||
"""Internal helper function, implements the main decompression algorithm. Only called from decompress_stream, which performs some extra checks and debug logging."""
|
||||
|
||||
if not isinstance(header_info, common.CompressedApplicationHeaderInfo):
|
||||
if not isinstance(header_info, common.CompressedType8HeaderInfo):
|
||||
raise common.DecompressError(f"Incorrect header type: {type(header_info).__qualname__}")
|
||||
|
||||
prev_literals: typing.List[bytes] = []
|
||||
decompressed = b""
|
||||
|
||||
i = 0
|
||||
|
||||
while i < len(data):
|
||||
byte = data[i]
|
||||
while True: # Loop is terminated when the EOF marker (0xff) is encountered
|
||||
(byte,) = common.read_exact(stream, 1)
|
||||
if debug:
|
||||
print(f"Tag byte 0x{byte:>02x}, at 0x{i:x}, decompressing to 0x{len(decompressed):x}")
|
||||
print(f"Tag byte 0x{byte:>02x}")
|
||||
|
||||
if byte in range(0x00, 0x20):
|
||||
# Literal byte sequence, 1-byte header.
|
||||
# The length of the literal data is stored in the low nibble of the tag byte.
|
||||
count = (byte >> 0 & 0xf) + 1
|
||||
begin = i + 1
|
||||
end = begin + count
|
||||
# Controls whether or not the literal is stored so that it can be referenced again later.
|
||||
do_store = byte >= 0x10
|
||||
literal = data[begin:end]
|
||||
literal = common.read_exact(stream, count)
|
||||
if debug:
|
||||
print(f"Literal (1-byte header, storing: {do_store})")
|
||||
print(f"\t-> {literal}")
|
||||
decompressed += literal
|
||||
if do_store:
|
||||
if debug:
|
||||
print(f"\t-> stored as literal number 0x{len(prev_literals):x}")
|
||||
print(f"\t-> storing as literal number 0x{len(prev_literals):x}")
|
||||
prev_literals.append(literal)
|
||||
i = end
|
||||
yield literal
|
||||
elif byte in range(0x20, 0xd0):
|
||||
# Backreference to a previous literal, 1-byte form.
|
||||
# This can reference literals with indices in range(0xb0).
|
||||
table_index = byte - 0x20
|
||||
i += 1
|
||||
if debug:
|
||||
print(f"Backreference (1-byte form) to 0x{table_index:>02x}")
|
||||
literal = prev_literals[table_index]
|
||||
if debug:
|
||||
print(f"\t-> {literal}")
|
||||
decompressed += literal
|
||||
yield prev_literals[table_index]
|
||||
elif byte in (0xd0, 0xd1):
|
||||
# Literal byte sequence, 2-byte header.
|
||||
# The length of the literal data is stored in the following byte.
|
||||
count = data[i+1]
|
||||
begin = i + 2
|
||||
end = begin + count
|
||||
(count,) = common.read_exact(stream, 1)
|
||||
# Controls whether or not the literal is stored so that it can be referenced again later.
|
||||
do_store = byte == 0xd1
|
||||
literal = data[begin:end]
|
||||
literal = common.read_exact(stream, count)
|
||||
if debug:
|
||||
print(f"Literal (2-byte header, storing: {do_store})")
|
||||
print(f"\t-> {literal}")
|
||||
decompressed += literal
|
||||
if do_store:
|
||||
if debug:
|
||||
print(f"\t-> stored as literal number 0x{len(prev_literals):x}")
|
||||
print(f"\t-> storing as literal number 0x{len(prev_literals):x}")
|
||||
prev_literals.append(literal)
|
||||
i = end
|
||||
yield literal
|
||||
elif byte == 0xd2:
|
||||
# Backreference to a previous literal, 2-byte form.
|
||||
# This can reference literals with indices in range(0xb0, 0x1b0).
|
||||
table_index = data[i+1] + 0xb0
|
||||
i += 2
|
||||
(next_byte,) = common.read_exact(stream, 1)
|
||||
table_index = next_byte + 0xb0
|
||||
if debug:
|
||||
print(f"Backreference (2-byte form) to 0x{table_index:>02x}")
|
||||
literal = prev_literals[table_index]
|
||||
if debug:
|
||||
print(f"\t-> {literal}")
|
||||
decompressed += literal
|
||||
yield prev_literals[table_index]
|
||||
elif byte in range(0xd5, 0xfe):
|
||||
# Reference into a fixed table of two-byte literals.
|
||||
# All compressed resources use the same table.
|
||||
table_index = byte - 0xd5
|
||||
i += 1
|
||||
if debug:
|
||||
print(f"Fixed table reference to 0x{table_index:>02x}")
|
||||
entry = TABLE[table_index]
|
||||
if debug:
|
||||
print(f"\t-> {entry}")
|
||||
decompressed += entry
|
||||
yield TABLE[table_index]
|
||||
elif byte == 0xfe:
|
||||
# Extended code, whose meaning is controlled by the following byte.
|
||||
|
||||
i += 1
|
||||
kind = data[i]
|
||||
(kind,) = common.read_exact(stream, 1)
|
||||
if debug:
|
||||
print(f"Extended code: 0x{kind:>02x}")
|
||||
i += 1
|
||||
|
||||
if kind == 0x02:
|
||||
# Repeat 1 byte a certain number of times.
|
||||
@ -124,33 +100,45 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
|
||||
print(f"Repeat {byte_count}-byte value")
|
||||
|
||||
# The byte(s) to repeat, stored as a variable-length integer. The value is treated as unsigned, i. e. the integer is never negative.
|
||||
to_repeat_int, length = common._read_variable_length_integer(data, i)
|
||||
i += length
|
||||
to_repeat_int = common.read_variable_length_integer(stream)
|
||||
try:
|
||||
to_repeat = to_repeat_int.to_bytes(byte_count, "big", signed=False)
|
||||
except OverflowError:
|
||||
raise common.DecompressError(f"Value to repeat out of range for {byte_count}-byte repeat: {to_repeat_int:#x}")
|
||||
|
||||
count_m1, length = common._read_variable_length_integer(data, i)
|
||||
i += length
|
||||
count = count_m1 + 1
|
||||
count = common.read_variable_length_integer(stream) + 1
|
||||
if count <= 0:
|
||||
raise common.DecompressError(f"Repeat count must be positive: {count}")
|
||||
|
||||
repeated = to_repeat * count
|
||||
if debug:
|
||||
print(f"\t-> {to_repeat} * {count}: {repeated}")
|
||||
decompressed += repeated
|
||||
print(f"\t-> {to_repeat!r} * {count}")
|
||||
yield to_repeat * count
|
||||
else:
|
||||
raise common.DecompressError(f"Unknown extended code: 0x{kind:>02x}")
|
||||
elif byte == 0xff:
|
||||
# End of data marker, always occurs exactly once as the last byte of the compressed data.
|
||||
if debug:
|
||||
print("End marker")
|
||||
if i != len(data) - 1:
|
||||
raise common.DecompressError(f"End marker reached at {i}, before the expected end of data at {len(data) - 1}")
|
||||
i += 1
|
||||
|
||||
# Check that there really is no more data left.
|
||||
extra = stream.read(1)
|
||||
if extra:
|
||||
raise common.DecompressError(f"Extra data encountered after end of data marker (first extra byte: {extra!r})")
|
||||
break
|
||||
else:
|
||||
raise common.DecompressError(f"Unknown tag byte: 0x{data[i]:>02x}")
|
||||
raise common.DecompressError(f"Unknown tag byte: 0x{byte:>02x}")
|
||||
|
||||
|
||||
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
|
||||
"""Decompress compressed data in the format used by 'dcmp' (1)."""
|
||||
|
||||
return decompressed
|
||||
decompressed_length = 0
|
||||
for chunk in decompress_stream_inner(header_info, stream, debug=debug):
|
||||
if debug:
|
||||
print(f"\t-> {chunk!r}")
|
||||
|
||||
decompressed_length += len(chunk)
|
||||
yield chunk
|
||||
|
||||
if debug:
|
||||
print(f"Decompressed {decompressed_length:#x} bytes so far")
|
||||
|
@ -73,68 +73,70 @@ def _split_bits(i: int) -> typing.Tuple[bool, bool, bool, bool, bool, bool, bool
|
||||
)
|
||||
|
||||
|
||||
def _decompress_system_untagged(data: bytes, decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> bytes:
|
||||
parts = []
|
||||
i = 0
|
||||
while i < len(data):
|
||||
if i == len(data) - 1 and decompressed_length % 2 != 0:
|
||||
def _decompress_untagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]:
|
||||
while True: # Loop is terminated when EOF is reached.
|
||||
table_index_data = stream.read(1)
|
||||
if not table_index_data:
|
||||
# End of compressed data.
|
||||
break
|
||||
elif not stream.peek(1) and decompressed_length % 2 != 0:
|
||||
# Special case: if we are at the last byte of the compressed data, and the decompressed data has an odd length, the last byte is a single literal byte, and not a table reference.
|
||||
if debug:
|
||||
print(f"Last byte: {data[-1:]}")
|
||||
parts.append(data[-1:])
|
||||
print(f"Last byte: {table_index_data!r}")
|
||||
yield table_index_data
|
||||
break
|
||||
|
||||
# Compressed data is untagged, every byte is a table reference.
|
||||
(table_index,) = table_index_data
|
||||
if debug:
|
||||
print(f"Reference: {data[i]} -> {table[data[i]]}")
|
||||
parts.append(table[data[i]])
|
||||
i += 1
|
||||
|
||||
return b"".join(parts)
|
||||
print(f"Reference: {table_index} -> {table[table_index]!r}")
|
||||
yield table[table_index]
|
||||
|
||||
def _decompress_system_tagged(data: bytes, decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool=False) -> bytes:
|
||||
parts = []
|
||||
i = 0
|
||||
while i < len(data):
|
||||
if i == len(data) - 1 and decompressed_length % 2 != 0:
|
||||
|
||||
def _decompress_tagged(stream: "common.PeekableIO", decompressed_length: int, table: typing.Sequence[bytes], *, debug: bool = False) -> typing.Iterator[bytes]:
|
||||
while True: # Loop is terminated when EOF is reached.
|
||||
tag_data = stream.read(1)
|
||||
if not tag_data:
|
||||
# End of compressed data.
|
||||
break
|
||||
elif not stream.peek(1) and decompressed_length % 2 != 0:
|
||||
# Special case: if we are at the last byte of the compressed data, and the decompressed data has an odd length, the last byte is a single literal byte, and not a tag or a table reference.
|
||||
if debug:
|
||||
print(f"Last byte: {data[-1:]}")
|
||||
parts.append(data[-1:])
|
||||
print(f"Last byte: {tag_data!r}")
|
||||
yield tag_data
|
||||
break
|
||||
|
||||
# Compressed data is tagged, each tag byte is followed by 8 table references and/or literals.
|
||||
tag = data[i]
|
||||
(tag,) = tag_data
|
||||
if debug:
|
||||
print(f"Tag: 0b{tag:>08b}")
|
||||
i += 1
|
||||
for is_ref in _split_bits(tag):
|
||||
if is_ref:
|
||||
# This is a table reference (a single byte that is an index into the table).
|
||||
table_index_data = stream.read(1)
|
||||
if not table_index_data:
|
||||
# End of compressed data.
|
||||
break
|
||||
(table_index,) = table_index_data
|
||||
if debug:
|
||||
print(f"Reference: {data[i]} -> {table[data[i]]}")
|
||||
parts.append(table[data[i]])
|
||||
i += 1
|
||||
print(f"Reference: {table_index} -> {table[table_index]!r}")
|
||||
yield table[table_index]
|
||||
else:
|
||||
# This is a literal (two uncompressed bytes that are literally copied into the output).
|
||||
# Note: if i == len(data)-1, the literal is actually only a single byte long.
|
||||
# This case is handled automatically - the slice extends one byte past the end of the data, and only one byte is returned.
|
||||
literal = stream.read(2)
|
||||
if not literal:
|
||||
# End of compressed data.
|
||||
break
|
||||
# Note: the literal may be only a single byte long if it is located exactly at EOF. This is intended and expected - the 1-byte literal is yielded normally, and on the next iteration, decompression is terminated as EOF is detected.
|
||||
if debug:
|
||||
print(f"Literal: {data[i:i+2]}")
|
||||
parts.append(data[i:i + 2])
|
||||
i += 2
|
||||
|
||||
# If the end of the compressed data is reached in the middle of a chunk, all further tag bits are ignored (they should be zero) and decompression ends.
|
||||
if i >= len(data):
|
||||
break
|
||||
|
||||
return b"".join(parts)
|
||||
print(f"Literal: {literal!r}")
|
||||
yield literal
|
||||
|
||||
|
||||
def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug: bool=False) -> bytes:
|
||||
def decompress_stream(header_info: common.CompressedHeaderInfo, stream: typing.BinaryIO, *, debug: bool = False) -> typing.Iterator[bytes]:
|
||||
"""Decompress compressed data in the format used by 'dcmp' (2)."""
|
||||
|
||||
if not isinstance(header_info, common.CompressedSystemHeaderInfo):
|
||||
if not isinstance(header_info, common.CompressedType9HeaderInfo):
|
||||
raise common.DecompressError(f"Incorrect header type: {type(header_info).__qualname__}")
|
||||
|
||||
unknown, table_count_m1, flags_raw = STRUCT_PARAMETERS.unpack(header_info.parameters)
|
||||
@ -155,24 +157,21 @@ def decompress(header_info: common.CompressedHeaderInfo, data: bytes, *, debug:
|
||||
print(f"Flags: {flags}")
|
||||
|
||||
if ParameterFlags.CUSTOM_TABLE in flags:
|
||||
table_start = 0
|
||||
data_start = table_start + table_count * 2
|
||||
table = []
|
||||
for i in range(table_start, data_start, 2):
|
||||
table.append(data[i:i + 2])
|
||||
for _ in range(table_count):
|
||||
table.append(common.read_exact(stream, 2))
|
||||
if debug:
|
||||
print(f"Using custom table: {table}")
|
||||
else:
|
||||
if table_count_m1 != 0:
|
||||
raise common.DecompressError(f"table_count_m1 field is {table_count_m1}, but must be zero when the default table is used")
|
||||
table = DEFAULT_TABLE
|
||||
data_start = 0
|
||||
if debug:
|
||||
print("Using default table")
|
||||
|
||||
if ParameterFlags.TAGGED in flags:
|
||||
decompress_func = _decompress_system_tagged
|
||||
decompress_func = _decompress_tagged
|
||||
else:
|
||||
decompress_func = _decompress_system_untagged
|
||||
decompress_func = _decompress_untagged
|
||||
|
||||
return decompress_func(data[data_start:], header_info.decompressed_length, table, debug=debug)
|
||||
yield from decompress_func(common.make_peekable(stream), header_info.decompressed_length, table, debug=debug)
|
||||
|
0
rsrcfork/py.typed
Normal file
32
setup.cfg
@ -18,8 +18,10 @@ classifiers =
|
||||
Programming Language :: Python :: 3 :: Only
|
||||
Programming Language :: Python :: 3.6
|
||||
Programming Language :: Python :: 3.7
|
||||
Programming Language :: Python :: 3.8
|
||||
license = MIT
|
||||
license_file = LICENSE
|
||||
license_files =
|
||||
LICENSE
|
||||
description = A pure Python, cross-platform library/tool for reading Macintosh resource data, as stored in resource forks and ``.rsrc`` files
|
||||
long_description = file: README.rst
|
||||
long_description_content_type = text/x-rst
|
||||
@ -33,11 +35,16 @@ keywords =
|
||||
macos
|
||||
|
||||
[options]
|
||||
setup_requires =
|
||||
setuptools>=39.2.0
|
||||
# mypy can only find type hints in the package if zip_safe is set to False,
|
||||
# see https://mypy.readthedocs.io/en/latest/installed_packages.html#making-pep-561-compatible-packages
|
||||
zip_safe = False
|
||||
python_requires = >=3.6
|
||||
packages = find:
|
||||
|
||||
[options.package_data]
|
||||
rsrcfork =
|
||||
py.typed
|
||||
|
||||
[options.packages.find]
|
||||
include =
|
||||
rsrcfork
|
||||
@ -47,6 +54,25 @@ include =
|
||||
console_scripts =
|
||||
rsrcfork = rsrcfork.__main__:main
|
||||
|
||||
[flake8]
|
||||
extend-exclude =
|
||||
.mypy_cache/,
|
||||
build/,
|
||||
dist/,
|
||||
|
||||
# The following issues are ignored because they do not match our code style:
|
||||
ignore =
|
||||
E226, # missing whitespace around arithmetic operator
|
||||
E261, # at least two spaces before inline comment
|
||||
E501, # line too long
|
||||
W293, # blank line contains whitespace
|
||||
W503, # line break before binary operator
|
||||
|
||||
# flake8-tabs configuration
|
||||
use-flake8-tabs = true
|
||||
blank-lines-indent = always
|
||||
indent-tabs-def = 1
|
||||
|
||||
[mypy]
|
||||
files=rsrcfork/**/*.py
|
||||
python_version = 3.6
|
||||
|
BIN
tests/data/compress/compressed/Finder Help.rsrc
Normal file
After Width: | Height: | Size: 35 KiB |
BIN
tests/data/compress/compressed/Finder.rsrc
Normal file
After Width: | Height: | Size: 355 KiB |
BIN
tests/data/compress/compressed/Install.rsrc
Normal file
After Width: | Height: | Size: 127 KiB |
BIN
tests/data/compress/compressed/System.rsrc
Normal file
After Width: | Height: | Size: 884 KiB |
BIN
tests/data/compress/uncompressed/Finder Help.rsrc
Normal file
After Width: | Height: | Size: 51 KiB |
BIN
tests/data/compress/uncompressed/Finder.rsrc
Normal file
After Width: | Height: | Size: 478 KiB |
BIN
tests/data/compress/uncompressed/Install.rsrc
Normal file
After Width: | Height: | Size: 159 KiB |
BIN
tests/data/compress/uncompressed/System.rsrc
Normal file
After Width: | Height: | Size: 1.1 MiB |
BIN
tests/data/empty.rsrc
Normal file
After Width: | Height: | Size: 286 B |
BIN
tests/data/testfile.rsrc
Normal file
After Width: | Height: | Size: 558 B |
BIN
tests/data/unicode.textClipping.rsrc
Normal file
After Width: | Height: | Size: 602 B |
290
tests/test_rsrcfork.py
Normal file
@ -0,0 +1,290 @@
|
||||
import collections
|
||||
import io
|
||||
import pathlib
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import typing
|
||||
import unittest
|
||||
|
||||
import rsrcfork
|
||||
|
||||
RESOURCE_FORKS_SUPPORTED = sys.platform.startswith("darwin")
|
||||
RESOURCE_FORKS_NOT_SUPPORTED_MESSAGE = "Resource forks are only supported on Mac"
|
||||
|
||||
DATA_DIR = pathlib.Path(__file__).parent / "data"
|
||||
EMPTY_RSRC_FILE = DATA_DIR / "empty.rsrc"
|
||||
TEXTCLIPPING_RSRC_FILE = DATA_DIR / "unicode.textClipping.rsrc"
|
||||
TESTFILE_RSRC_FILE = DATA_DIR / "testfile.rsrc"
|
||||
|
||||
COMPRESS_DATA_DIR = DATA_DIR / "compress"
|
||||
COMPRESSED_DIR = COMPRESS_DATA_DIR / "compressed"
|
||||
UNCOMPRESSED_DIR = COMPRESS_DATA_DIR / "uncompressed"
|
||||
COMPRESS_RSRC_FILE_NAMES = [
|
||||
"Finder.rsrc",
|
||||
"Finder Help.rsrc",
|
||||
# "Install.rsrc", # Commented out for performance - this file contains a lot of small resources.
|
||||
"System.rsrc",
|
||||
]
|
||||
|
||||
|
||||
def make_pascal_string(s):
|
||||
return bytes([len(s)]) + s
|
||||
|
||||
|
||||
UNICODE_TEXT = "Here is some text, including Üñïçø∂é!"
|
||||
DRAG_DATA = (
|
||||
b"\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03"
|
||||
b"utxt\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
b"utf8\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
b"TEXT\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
)
|
||||
TEXTCLIPPING_RESOURCES = collections.OrderedDict([
|
||||
(b"utxt", collections.OrderedDict([
|
||||
(256, UNICODE_TEXT.encode("utf-16-be")),
|
||||
])),
|
||||
(b"utf8", collections.OrderedDict([
|
||||
(256, UNICODE_TEXT.encode("utf-8")),
|
||||
])),
|
||||
(b"TEXT", collections.OrderedDict([
|
||||
(256, UNICODE_TEXT.encode("macroman")),
|
||||
])),
|
||||
(b"drag", collections.OrderedDict([
|
||||
(128, DRAG_DATA),
|
||||
]))
|
||||
])
|
||||
|
||||
TESTFILE_HEADER_SYSTEM_DATA = (
|
||||
b"\xa7F$\x08 <\x00\x00\xab\x03\xa7F <\x00\x00"
|
||||
b"\x01\x00\xb4\x88f\x06`\np\x00`\x06 <\x00\x00"
|
||||
b"\x08testfile\x00\x02\x00\x02\x00rs"
|
||||
b"rcRSED\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
b"\x02\x00rsrcRSED\x00\x00\x00\x00\x00\x00"
|
||||
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
b"\x00\x00\xdaIp~\x00\x00\x00\x00\x00\x00\x02.\xfe\x84"
|
||||
)
|
||||
TESTFILE_HEADER_APPLICATION_DATA = b"This is the application-specific header data section. Apparently I can write whatever nonsense I want here. A few more bytes...."
|
||||
TESTFILE_RESOURCES = collections.OrderedDict([
|
||||
(b"STR ", collections.OrderedDict([
|
||||
(128, (
|
||||
None, rsrcfork.ResourceAttrs(0),
|
||||
make_pascal_string(b"The String, without name or attributes"),
|
||||
)),
|
||||
(129, (
|
||||
b"The Name", rsrcfork.ResourceAttrs(0),
|
||||
make_pascal_string(b"The String, with name and no attributes"),
|
||||
)),
|
||||
(130, (
|
||||
None, rsrcfork.ResourceAttrs.resProtected | rsrcfork.ResourceAttrs.resPreload,
|
||||
make_pascal_string(b"The String, without name but with attributes"),
|
||||
)),
|
||||
(131, (
|
||||
b"The Name with Attributes", rsrcfork.ResourceAttrs.resSysHeap,
|
||||
make_pascal_string(b"The String, with both name and attributes"),
|
||||
)),
|
||||
])),
|
||||
])
|
||||
|
||||
|
||||
class UnseekableStreamWrapper(io.BufferedIOBase):
|
||||
_wrapped: typing.BinaryIO
|
||||
|
||||
def __init__(self, wrapped: typing.BinaryIO) -> None:
|
||||
super().__init__()
|
||||
|
||||
self._wrapped = wrapped
|
||||
|
||||
def read(self, size: typing.Optional[int] = -1) -> bytes:
|
||||
return self._wrapped.read(size)
|
||||
|
||||
|
||||
def open_resource_fork(path: pathlib.Path, mode: str) -> typing.BinaryIO:
|
||||
return (path / "..namedfork" / "rsrc").open(mode)
|
||||
|
||||
|
||||
class ResourceFileReadTests(unittest.TestCase):
|
||||
def test_empty(self) -> None:
|
||||
with rsrcfork.open(EMPTY_RSRC_FILE, fork="data") as rf:
|
||||
self.assertEqual(rf.header_system_data, bytes(112))
|
||||
self.assertEqual(rf.header_application_data, bytes(128))
|
||||
self.assertEqual(rf.file_attributes, rsrcfork.ResourceFileAttrs(0))
|
||||
self.assertEqual(list(rf), [])
|
||||
|
||||
def internal_test_textclipping(self, rf: rsrcfork.ResourceFile) -> None:
|
||||
self.assertEqual(rf.header_system_data, bytes(112))
|
||||
self.assertEqual(rf.header_application_data, bytes(128))
|
||||
self.assertEqual(rf.file_attributes, rsrcfork.ResourceFileAttrs(0))
|
||||
self.assertEqual(list(rf), list(TEXTCLIPPING_RESOURCES))
|
||||
|
||||
for (actual_type, actual_reses), (expected_type, expected_reses) in zip(rf.items(), TEXTCLIPPING_RESOURCES.items()):
|
||||
with self.subTest(type=expected_type):
|
||||
self.assertEqual(actual_type, expected_type)
|
||||
self.assertEqual(list(actual_reses), list(expected_reses))
|
||||
|
||||
for (actual_id, actual_res), (expected_id, expected_data) in zip(actual_reses.items(), expected_reses.items()):
|
||||
with self.subTest(id=expected_id):
|
||||
self.assertEqual(actual_res.type, expected_type)
|
||||
self.assertEqual(actual_id, expected_id)
|
||||
self.assertEqual(actual_res.id, expected_id)
|
||||
self.assertEqual(actual_res.name, None)
|
||||
self.assertEqual(actual_res.attributes, rsrcfork.ResourceAttrs(0))
|
||||
self.assertEqual(actual_res.data, expected_data)
|
||||
self.assertEqual(actual_res.compressed_info, None)
|
||||
|
||||
def test_textclipping_seekable_stream(self) -> None:
|
||||
with TEXTCLIPPING_RSRC_FILE.open("rb") as f:
|
||||
with rsrcfork.ResourceFile(f) as rf:
|
||||
self.internal_test_textclipping(rf)
|
||||
|
||||
def test_textclipping_unseekable_stream(self) -> None:
|
||||
with TEXTCLIPPING_RSRC_FILE.open("rb") as f:
|
||||
with UnseekableStreamWrapper(f) as usf:
|
||||
with rsrcfork.ResourceFile(usf) as rf:
|
||||
self.internal_test_textclipping(rf)
|
||||
|
||||
def test_textclipping_path_data_fork(self) -> None:
|
||||
with rsrcfork.open(TEXTCLIPPING_RSRC_FILE, fork="data") as rf:
|
||||
self.internal_test_textclipping(rf)
|
||||
|
||||
@unittest.skipUnless(RESOURCE_FORKS_SUPPORTED, RESOURCE_FORKS_NOT_SUPPORTED_MESSAGE)
|
||||
def test_textclipping_path_resource_fork(self) -> None:
|
||||
with tempfile.NamedTemporaryFile() as tempf:
|
||||
with TEXTCLIPPING_RSRC_FILE.open("rb") as dataf:
|
||||
with open_resource_fork(pathlib.Path(tempf.name), "wb") as rsrcf:
|
||||
shutil.copyfileobj(dataf, rsrcf)
|
||||
|
||||
with rsrcfork.open(tempf.name, fork="rsrc") as rf:
|
||||
self.internal_test_textclipping(rf)
|
||||
|
||||
@unittest.skipUnless(RESOURCE_FORKS_SUPPORTED, RESOURCE_FORKS_NOT_SUPPORTED_MESSAGE)
|
||||
def test_textclipping_path_auto_resource_fork(self) -> None:
|
||||
with tempfile.NamedTemporaryFile() as temp_data_fork:
|
||||
with TEXTCLIPPING_RSRC_FILE.open("rb") as source_file:
|
||||
with open_resource_fork(pathlib.Path(temp_data_fork.name), "wb") as temp_rsrc_fork:
|
||||
shutil.copyfileobj(source_file, temp_rsrc_fork)
|
||||
|
||||
with self.subTest(data_fork="empty"):
|
||||
# Resource fork is selected when data fork is empty.
|
||||
|
||||
with rsrcfork.open(temp_data_fork.name) as rf:
|
||||
self.internal_test_textclipping(rf)
|
||||
|
||||
with self.subTest(data_fork="non-resource data"):
|
||||
# Resource fork is selected when data fork contains non-resource data.
|
||||
|
||||
temp_data_fork.write(b"This is the file's data fork. It should not be read, as the file has a resource fork.")
|
||||
|
||||
with rsrcfork.open(temp_data_fork.name) as rf:
|
||||
self.internal_test_textclipping(rf)
|
||||
|
||||
with self.subTest(data_fork="valid resource data"):
|
||||
# Resource fork is selected even when data fork contains valid resource data.
|
||||
|
||||
with EMPTY_RSRC_FILE.open("rb") as source_file:
|
||||
shutil.copyfileobj(source_file, temp_data_fork)
|
||||
|
||||
with rsrcfork.open(temp_data_fork.name) as rf:
|
||||
self.internal_test_textclipping(rf)
|
||||
|
||||
@unittest.skipUnless(RESOURCE_FORKS_SUPPORTED, RESOURCE_FORKS_NOT_SUPPORTED_MESSAGE)
|
||||
def test_textclipping_path_auto_data_fork(self) -> None:
|
||||
with tempfile.NamedTemporaryFile() as temp_data_fork:
|
||||
with TEXTCLIPPING_RSRC_FILE.open("rb") as source_file:
|
||||
shutil.copyfileobj(source_file, temp_data_fork)
|
||||
# Have to flush the temporary file manually so that the data is visible to the other reads below.
|
||||
# Normally this happens automatically as part of the close method, but that would also delete the temporary file, which we don't want.
|
||||
temp_data_fork.flush()
|
||||
|
||||
with self.subTest(rsrc_fork="nonexistant"):
|
||||
# Data fork is selected when resource fork does not exist.
|
||||
|
||||
with rsrcfork.open(temp_data_fork.name) as rf:
|
||||
self.internal_test_textclipping(rf)
|
||||
|
||||
with self.subTest(rsrc_fork="empty"):
|
||||
# Data fork is selected when resource fork exists, but is empty.
|
||||
|
||||
with open_resource_fork(pathlib.Path(temp_data_fork.name), "wb") as temp_rsrc_fork:
|
||||
temp_rsrc_fork.write(b"")
|
||||
|
||||
with rsrcfork.open(temp_data_fork.name) as rf:
|
||||
self.internal_test_textclipping(rf)
|
||||
|
||||
with self.subTest(rsrc_fork="non-resource data"):
|
||||
# Data fork is selected when resource fork contains non-resource data.
|
||||
|
||||
with open_resource_fork(pathlib.Path(temp_data_fork.name), "wb") as temp_rsrc_fork:
|
||||
temp_rsrc_fork.write(b"This is the file's resource fork. It contains junk, so it should be ignored in favor of the data fork.")
|
||||
|
||||
with rsrcfork.open(temp_data_fork.name) as rf:
|
||||
self.internal_test_textclipping(rf)
|
||||
|
||||
def test_testfile(self) -> None:
|
||||
with rsrcfork.open(TESTFILE_RSRC_FILE, fork="data") as rf:
|
||||
self.assertEqual(rf.header_system_data, TESTFILE_HEADER_SYSTEM_DATA)
|
||||
self.assertEqual(rf.header_application_data, TESTFILE_HEADER_APPLICATION_DATA)
|
||||
self.assertEqual(rf.file_attributes, rsrcfork.ResourceFileAttrs.mapPrinterDriverMultiFinderCompatible | rsrcfork.ResourceFileAttrs.mapReadOnly)
|
||||
self.assertEqual(list(rf), list(TESTFILE_RESOURCES))
|
||||
|
||||
for (actual_type, actual_reses), (expected_type, expected_reses) in zip(rf.items(), TESTFILE_RESOURCES.items()):
|
||||
with self.subTest(type=expected_type):
|
||||
self.assertEqual(actual_type, expected_type)
|
||||
self.assertEqual(list(actual_reses), list(expected_reses))
|
||||
|
||||
for (actual_id, actual_res), (expected_id, (expected_name, expected_attrs, expected_data)) in zip(actual_reses.items(), expected_reses.items()):
|
||||
with self.subTest(id=expected_id):
|
||||
self.assertEqual(actual_res.type, expected_type)
|
||||
self.assertEqual(actual_id, expected_id)
|
||||
self.assertEqual(actual_res.id, expected_id)
|
||||
self.assertEqual(actual_res.name, expected_name)
|
||||
self.assertEqual(actual_res.attributes, expected_attrs)
|
||||
self.assertEqual(actual_res.data, expected_data)
|
||||
self.assertEqual(actual_res.compressed_info, None)
|
||||
|
||||
def test_compress_compare(self) -> None:
|
||||
# This test goes through pairs of resource files: one original file with both compressed and uncompressed resources, and one modified file where all compressed resources have been decompressed (using ResEdit on System 7.5.5).
|
||||
# It checks that the rsrcfork library performs automatic decompression on the compressed resources, so that the compressed resource file appears to the user like the uncompressed resource file (ignoring resource order, which was lost during decompression using ResEdit).
|
||||
|
||||
for name in COMPRESS_RSRC_FILE_NAMES:
|
||||
with self.subTest(name=name):
|
||||
with rsrcfork.open(COMPRESSED_DIR / name, fork="data") as compressed_rf, rsrcfork.open(UNCOMPRESSED_DIR / name, fork="data") as uncompressed_rf:
|
||||
self.assertEqual(sorted(compressed_rf), sorted(uncompressed_rf))
|
||||
|
||||
for (compressed_type, compressed_reses), (uncompressed_type, uncompressed_reses) in zip(sorted(compressed_rf.items()), sorted(uncompressed_rf.items())):
|
||||
with self.subTest(type=compressed_type):
|
||||
self.assertEqual(compressed_type, uncompressed_type)
|
||||
self.assertEqual(sorted(compressed_reses), sorted(uncompressed_reses))
|
||||
|
||||
for (compressed_id, compressed_res), (uncompressed_id, uncompressed_res) in zip(sorted(compressed_reses.items()), sorted(uncompressed_reses.items())):
|
||||
with self.subTest(id=compressed_id):
|
||||
# The metadata of the compressed and uncompressed resources must match.
|
||||
self.assertEqual(compressed_res.type, uncompressed_res.type)
|
||||
self.assertEqual(compressed_id, uncompressed_id)
|
||||
self.assertEqual(compressed_res.id, compressed_id)
|
||||
self.assertEqual(compressed_res.id, uncompressed_res.id)
|
||||
self.assertEqual(compressed_res.name, uncompressed_res.name)
|
||||
self.assertEqual(compressed_res.attributes & ~rsrcfork.ResourceAttrs.resCompressed, uncompressed_res.attributes)
|
||||
|
||||
# The uncompressed resource really has to be not compressed.
|
||||
self.assertNotIn(rsrcfork.ResourceAttrs.resCompressed, uncompressed_res.attributes)
|
||||
self.assertEqual(uncompressed_res.compressed_info, None)
|
||||
self.assertEqual(uncompressed_res.data, uncompressed_res.data_raw)
|
||||
self.assertEqual(uncompressed_res.length, uncompressed_res.length_raw)
|
||||
|
||||
# The compressed resource's (automatically decompressed) data must match the uncompressed data.
|
||||
self.assertEqual(compressed_res.data, uncompressed_res.data)
|
||||
self.assertEqual(compressed_res.length, uncompressed_res.length)
|
||||
|
||||
if rsrcfork.ResourceAttrs.resCompressed in compressed_res.attributes:
|
||||
# Resources with the compressed attribute must expose correct compression metadata.
|
||||
self.assertNotEqual(compressed_res.compressed_info, None)
|
||||
self.assertEqual(compressed_res.compressed_info.decompressed_length, compressed_res.length)
|
||||
else:
|
||||
# Some resources in the "compressed" files are not actually compressed, in which case there is no compression metadata.
|
||||
self.assertEqual(compressed_res.compressed_info, None)
|
||||
self.assertEqual(compressed_res.data, compressed_res.data_raw)
|
||||
self.assertEqual(compressed_res.length, compressed_res.length_raw)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
27
tox.ini
Normal file
@ -0,0 +1,27 @@
|
||||
[tox]
|
||||
# When adding a new Python version here, please also update the list of Python versions called by the GitHub Actions workflow (.github/workflows/ci.yml).
|
||||
envlist = py{36,37,38},flake8,mypy,package
|
||||
|
||||
[testenv]
|
||||
commands = python -m unittest discover --start-directory ./tests
|
||||
|
||||
[testenv:flake8]
|
||||
deps =
|
||||
flake8 >= 3.8.0
|
||||
flake8-bugbear
|
||||
flake8-tabs
|
||||
commands = flake8
|
||||
|
||||
[testenv:mypy]
|
||||
deps =
|
||||
mypy
|
||||
commands = mypy
|
||||
|
||||
[testenv:package]
|
||||
deps =
|
||||
twine
|
||||
wheel >= 0.32.0
|
||||
|
||||
commands =
|
||||
python setup.py sdist bdist_wheel
|
||||
twine check dist/*
|