Improve cfmtool quite a bit

And the patch script to match
This commit is contained in:
Elliot Nunn 2019-10-16 10:41:53 +08:00
parent 707467529f
commit b62a504685
2 changed files with 272 additions and 78 deletions

View File

@ -32,6 +32,7 @@ import struct
import os
import re
import textwrap
import functools
from os import path
from ast import literal_eval as eval
@ -44,10 +45,6 @@ def dump(from_binary_or_path, to_path):
The first argument can be a bytes-like object, or a path to read from.
def write_txt(name, text):
with open(path.join(to_path, name + '.txt'), 'w') as f:
f.write(text + '\n')
from_binary = from_binary_or_path
@ -62,8 +59,8 @@ def dump(from_binary_or_path, to_path):
dateTimeStamp, *versions = struct.unpack_from('>4L', from_binary, 16)
write_txt('date', format_mac_date(dateTimeStamp))
write_txt('version', repr(dict(zip(('oldDefVersion', 'oldImpVersion', 'currentVersion'), versions))))
write_txt(format_mac_date(dateTimeStamp), to_path, 'date.txt')
write_txt(repr(dict(zip(('oldDefVersion', 'oldImpVersion', 'currentVersion'), versions))), to_path, 'version.txt')
section_list = []
section_count, = struct.unpack_from('>H', from_binary, 32)
@ -139,23 +136,19 @@ def dump(from_binary_or_path, to_path):
if sec['unpackedLength']:
zeropad = sec['totalLength'] - len(unpacked); unpacked += bytes(zeropad)
with open(path.join(to_path, sec['filename']), 'wb') as f: f.write(unpacked)
write_bin(unpacked, to_path, sec['filename'])
if packed is not None:
with open(path.join(to_path, 'packed-' + sec['filename']), 'wb') as f: f.write(packed)
write_bin(packed, to_path, 'packed-' + sec['filename'])
del sec['totalLength']
del sec['unpackedLength']
del sec['containerLength']
del sec['containerOffset']
for i, sec in enumerate(section_list):
if sec['sectionKind'] == 'loader':
dump_loader_section(section_list, path.join(to_path, sec['filename']), to_path)
write_txt('sections', repr(section_list))
dump_locations(to_path, to_path)
write_python(section_list, to_path, 'sections.txt')
def build(from_path, to_path=None):
@ -167,24 +160,20 @@ def build(from_path, to_path=None):
instead of being returned as a bytes object.
def read_txt(name):
with open(path.join(from_path, name + '.txt')) as f:
dateTimeStamp = parse_mac_date(read_txt('date'))
dateTimeStamp = parse_mac_date(read_txt(from_path, 'date.txt'))
dateTimeStamp = 0
versions = eval(read_txt('version'))
versions = read_python(from_path, 'version.txt')
versions = (versions['oldDefVersion'], versions['oldImpVersion'], versions['currentVersion'])
versions = (0, 0, 0)
section_list = eval(read_txt('sections'))
section_list = read_python(from_path, 'sections.txt')
# Hit the ground running
pef = bytearray(b'J o y ! peffpwpc\x00\x00\x00\x01'.replace(b' ', b''))
@ -274,7 +263,7 @@ def repr(obj):
return accum
elif isinstance(obj, dict):
if set(obj) in (set(('kind', 'weakFlag', 'name')), set(('section', 'offset', 'to')), set(('file', 'offset', 'function'))):
if set(obj) == set(('kind', 'weakFlag', 'name')) or 'offset' in obj:
oneline = True
oneline = False
@ -297,7 +286,7 @@ def repr(obj):
if oneline:
return '{' + ', '.join(accum) + '}'
return '{\n' + textwrap.indent('\n'.join(x + ',' for x in accum), ' ') + '\n}'
return '{\n' + textwrap.indent(''.join(x + ',\n' for x in accum), ' ') + '}'
elif isinstance(obj, tuple):
obj = [hex(el) if (i == 0 and isinstance(el, int)) else repr(el) for (i, el) in enumerate(obj)]
@ -307,6 +296,16 @@ def repr(obj):
return builtins.repr(obj)
def hex(obj):
"""Pad to 5 significant digits (up to a megabyte, plenty)
x = builtins.hex(obj)
while len(x.partition('x')[2]) < 5:
x = x.replace('x', 'x0')
return x
def unpack_pidata(packed):
"""Unpack pattern-initialized (compressed) data
@ -368,23 +367,30 @@ def unpack_pidata(packed):
return bytes(unpacked)
def dump_loader_section(section_list, from_binary_or_path, to_dir):
"""For a given loader section, dump: imports.txt, exports.txt (not yet), relocations.txt
def dump_lowlevel(basepath):
"""Dump from the loader section: exports.txt, imports.txt, mainvectors.txt, relocations.txt
loader = from_binary_or_path
except TypeError:
with open(from_binary_or_path, 'rb') as f:
loader =
section_list = read_python(basepath, 'sections.txt')
sec = dict(zip(('mainSection', 'mainOffset', 'initSection', 'initOffset', 'termSection', 'termOffset'),
struct.unpack_from('>lLlLlL', loader)))
for sec in section_list:
if sec['sectionKind'] == 'loader':
loader = read_bin(basepath, sec['filename'])
return # no loader section
importedLibraryCount, totalImportedSymbolCount, relocSectionCount, relocInstrOffset, loaderStringsOffset, \
exportHashOffset, exportHashTablePower, exportedSymbolCount = struct.unpack_from('>8L', loader, 24)
def get_mainvectors():
cardinals = {}
for ofs, knd in [(0, 'main'), (8, 'init'), (16, 'term')]:
vec_sec_idx, vec_offset = struct.unpack_from('>lL', loader, ofs)
if vec_sec_idx != -1:
cardinals[knd] = dict(section=section_list[vec_sec_idx]['filename'], offset=vec_offset)
return cardinals
def get_name(offset):
return loader[loaderStringsOffset+offset:].partition(b'\0')[0].decode('mac_roman')
@ -564,69 +570,140 @@ def dump_loader_section(section_list, from_binary_or_path, to_dir):
return relocations
to_dir = path.join(to_dir, 'loaderinfo')
os.makedirs(to_dir, exist_ok=True)
def get_exports():
ofs = exportHashOffset
with open(path.join(to_dir, 'imports.txt'), 'w') as f:
f.write(repr([get_imported_library(n) for n in range(importedLibraryCount)]) + '\n')
num_keys = 0
for i in range(2 ** exportHashTablePower):
htab_entry, = struct.unpack_from('>L', loader, ofs)
num_keys += htab_entry >> 18
ofs += 4
with open(path.join(to_dir, 'relocations.txt'), 'w') as f:
f.write(repr(get_relocations()) + '\n')
lengths = []
for i in range(num_keys):
sym_len, sym_hash = struct.unpack_from('>HH', loader, ofs)
ofs += 4
exports = []
for sym_len in lengths:
kind_and_name, sym_offset, sec_idx = struct.unpack_from('>LLh', loader, ofs)
kind = ('code', 'data', 'tvector', 'toc', 'glue')[kind_and_name >> 24]
name = loader[loaderStringsOffset+(kind_and_name&0xFFFFFF):][:sym_len].decode('mac_roman')
sec_name = section_list[sec_idx]['filename']
if sec_idx == -2:
# absolute address
elif sec_idx == -3:
# re-export
exports.append(dict(section=sec_name, offset=sym_offset, kind=kind, name=name))
ofs += 10
exports.sort(key=lambda dct: tuple(dct.values()))
return exports
write_python(get_mainvectors(), basepath, 'dump-lowlevel', 'mainvectors.txt')
write_python(get_exports(), basepath, 'dump-lowlevel', 'exports.txt')
write_python(get_relocations(), basepath, 'dump-lowlevel', 'relocations.txt')
write_python([get_imported_library(n) for n in range(importedLibraryCount)],
basepath, 'dump-lowlevel', 'imports.txt')
def dump_locations(from_path, to_path):
def dump_highlevel(basepath):
"""Create some useful files: glue.txt
to_path = path.join(to_path, 'locations')
os.makedirs(to_path, exist_ok=True)
section_list = read_python(basepath, 'sections.txt')
with open(path.join(from_path, 'sections.txt')) as f: section_list = eval(
# Relocations in lookup-able form
relocs = read_python(basepath, 'dump-lowlevel', 'relocations.txt')
relocs = {(rl['section'], rl['offset']): rl['to'] for rl in relocs}
gluelocs = []
# The base of the TOC is not guaranteed to be the base of the data section
tvectors = [dct for dct in read_python(basepath, 'dump-lowlevel', 'exports.txt') if dct['kind'] == 'tvector']
tvectors.extend(read_python(basepath, 'dump-lowlevel', 'mainvectors.txt').values())
tvectors = [(tv['section'], tv['offset']) for tv in tvectors]
for idx, sec in enumerate(section_list):
if sec['sectionKind'] != 'code': continue
table_of_contents = {}
for section, offset in tvectors: # (section, offset) tuple
reloc_kind, toc_section = relocs.get((section, offset + 4), (None, None))
if reloc_kind == 'section':
secdata = read_bin(basepath, section)
toc_offset, = struct.unpack_from('>L', secdata, offset + 4)
with open(path.join(from_path, sec['filename']), 'rb') as f: code =
table_of_contents = dict(section=toc_section, offset=toc_offset)
write_python(table_of_contents, basepath, 'dump-highlevel', 'table-of-contents.txt')
gluescan = []
for i in range(0, len(code) - 24, 4):
for a, b in zip(code[i:], b'\x81\x82\xff\xff\x90\x41\x00\x14\x80\x0c\x00\x00\x80\x4c\x00\x04\x7c\x09\x03\xa6\x4e\x80\x04\x20'):
if a != b and b != 0xFF: break
toc_ofs, = struct.unpack_from('>h', code, i+2)
gluescan.append((i, toc_ofs))
with open(path.join(from_path, 'loaderinfo', 'relocations.txt')) as f: relocs = eval(
with open(path.join(from_path, 'loaderinfo', 'imports.txt')) as f: imports = eval(
# Exports!
exports = read_python(basepath, 'dump-lowlevel', 'exports.txt')
codelocs_exported = []
# read_bin = functools.lru_cache(read_bin)
for exp in exports:
if exp['kind'] == 'tvector':
reloc_kind, reloc_targ_section = relocs.get((exp['section'], exp['offset']), (None, None))
if reloc_kind == 'section' and 'code' in reloc_targ_section:
secdata = read_bin(basepath, exp['section'])
code_offset, = struct.unpack_from('>L', secdata, exp['offset'])
codelocs_exported.append(dict(section=reloc_targ_section, offset=code_offset, function=exp['name']))
codelocs_exported.sort(key=lambda dct: tuple(dct.values()))
write_python(codelocs_exported, basepath, 'dump-highlevel', 'codelocs-exported.txt')
# Init, term and main functions
codelocs_main = []
for kind, dct in read_python(basepath, 'dump-lowlevel', 'mainvectors.txt').items():
reloc_kind, reloc_targ_section = relocs.get((dct['section'], dct['offset']), (None, None))
if reloc_kind == 'section' and 'code' in reloc_targ_section:
secdata = read_bin(basepath, dct['section'])
code_offset, = struct.unpack_from('>L', secdata, dct['offset'])
codelocs_main.append(dict(section=reloc_targ_section, offset=code_offset, function=kind))
codelocs_main.sort(key=lambda dct: tuple(dct.values()))
write_python(codelocs_main, basepath, 'dump-highlevel', 'codelocs-main.txt')
# Cross-toc glue
codelocs_xtocglue = []
if table_of_contents: # we might not have one if we export no functions!
imports = read_python(basepath, 'dump-lowlevel', 'imports.txt')
imports = [sym['name'] for lib in imports for sym in lib['symbols']]
toc_vectors = {}
for rel in relocs:
if 'data' in rel['section'] and rel['to'][0] == 'import':
toc_vectors[rel['offset']] = imports[rel['to'][1]]
toc_imports = {}
for (reloc_sec, reloc_offset), (reloc_kind, reloc_import_num) in relocs.items():
if reloc_sec == table_of_contents['section'] and reloc_kind == 'import':
toc_imports[reloc_offset - table_of_contents['offset']] = imports[reloc_import_num]
gluelocs = []
for code_ofs, toc_ofs in gluescan:
gluelocs.append(dict(file=sec['filename'], offset=code_ofs, function=toc_vectors[toc_ofs]))
except KeyError:
for sec in section_list:
if 'code' not in sec['filename']: continue
code = read_bin(basepath, sec['filename'])
gluelocs.sort(key=lambda dct: tuple(dct.values()))
gluescan = []
for ofs in range(0, len(code) - 23, 4):
for a, b in zip(code[ofs:ofs+24], b'\x81\x82\xff\xff\x90\x41\x00\x14\x80\x0c\x00\x00\x80\x4c\x00\x04\x7c\x09\x03\xa6\x4e\x80\x04\x20'):
if a != b and b != 0xFF: break
toc_ofs, = struct.unpack_from('>h', code, ofs+2)
codelocs_xtocglue.append(dict(section=sec['filename'], offset=ofs, function=toc_imports[toc_ofs]))
codelocs_xtocglue.sort(key=lambda dct: tuple(dct.values()))
write_python(codelocs_xtocglue, basepath, 'dump-highlevel', 'codelocs-xtocglue.txt')
with open(path.join(to_path, 'glue.txt'), 'w') as f:
f.write(repr(gluelocs) + '\n')
# MacsBug symbol locations
dbgsymlocs = []
codelocs_macsbug = []
for idx, sec in enumerate(section_list):
if sec['sectionKind'] != 'code': continue
with open(path.join(from_path, sec['filename']), 'rb') as f: code =
code = read_bin(basepath, sec['filename'])
end_offset = 0
for i in range(0, len(code) - 17, 4):
@ -648,12 +725,63 @@ def dump_locations(from_path, to_path):
code_ofs = i - guts[3]
code_len = guts[3]
dbgsymlocs.append(dict(file=sec['filename'], offset=code_ofs, function=name.decode('ascii')))
codelocs_macsbug.append(dict(section=sec['filename'], offset=code_ofs, function=name.decode('ascii')))
dbgsymlocs.sort(key=lambda dct: tuple(dct.values()))
codelocs_macsbug.sort(key=lambda dct: tuple(dct.values()))
write_python(codelocs_macsbug, basepath, 'dump-highlevel', 'codelocs-macsbug.txt')
with open(path.join(to_path, 'debugsyms.txt'), 'w') as f:
f.write(repr(dbgsymlocs) + '\n')
# Driver description
for exp in exports:
if exp['kind'] == 'data' and exp['name'] == 'TheDriverDescription':
secdata = read_bin(basepath, exp['section'])
ofs = exp['offset']
desc = list(struct.unpack_from('>4s L 32s L L 32s 32x L', secdata, ofs))
known_bits = {
0x1: 'kDriverIsLoadedUponDiscovery',
0x2: 'kDriverIsOpenedUponLoad',
0x4: 'kDriverIsUnderExpertControl',
0x8: 'kDriverIsConcurrent',
0x10: 'kDriverQueuesIOPB',
0x20: 'kDriverIsLoadedAtBoot',
0x40: 'kDriverIsForVirtualDevice',
bits = []
for i in range(32):
if desc[4] & (1 << i):
bits.append(known_bits.get(1 << i, hex(1 << i)))
bits = '|'.join(bits) or '0'
ofs += 0x74
services = []
for i in range(desc[6]): # nServices
svc = struct.unpack_from('>4s 4s L', secdata, ofs)
'serviceCategory': svc[0].decode('mac_roman'),
'serviceType': svc[1].decode('mac_roman'),
'serviceVersion': parse_mac_version(svc[2]),
ofs += 12
desc = {
'driverDescSignature': desc[0].decode('mac_roman'),
'driverDescVersion': desc[1],
'driverType': {
'nameInfoStr': pstring_or_cstring(desc[2]).decode('mac_roman'),
'version': parse_mac_version(desc[3]),
'driverOSRuntimeInfo': {
'driverRuntime': bits,
'driverName': pstring_or_cstring(desc[5]).decode('mac_roman'),
'driverServices': services,
write_python(desc, basepath, 'dump-highlevel', 'driver-description.txt')
def format_mac_date(srcint):
@ -685,6 +813,46 @@ def parse_mac_date(x):
return delta
def parse_mac_version(num):
maj, minbug, stage, unreleased = num.to_bytes(4, byteorder='big')
maj = '%x' % maj
minor, bugfix = '%02x' % minbug
if stage == 0x80:
stage = 'f'
elif stage == 0x60:
stage = 'b'
elif stage == 0x40:
stage = 'a'
elif stage == 0x20:
stage = 'd'
stage = '?'
unreleased = '%d' % unreleased
vers = maj + '.' + minor
if bugfix != '0':
vers += '.' + bugfix
if (stage, unreleased) != ('f', '0'):
vers += stage + unreleased
return vers
def pstring_or_cstring(s):
plen = s[0]
pstr = s[1:][:plen]
cstr = s.rstrip(b'\0')
if b'\0' in pstr or plen + 1 > len(s):
return cstr
return pstr
def _sec_kind_is_instantiated(sec_kind):
return sec_kind not in ('loader', 'debug', 'exception', 'traceback')
@ -713,11 +881,37 @@ def _sec_kind_min_align(sec_kind):
return 4
def read_python(*path_parts):
return eval(read_txt(*path_parts))
def read_txt(*path_parts):
with open(path.join(*path_parts), 'r') as f:
def read_bin(*path_parts):
with open(path.join(*path_parts), 'rb') as f:
def write_python(python, *path_parts):
write_txt(repr(python), *path_parts)
def write_txt(txt, *path_parts):
write_bin((txt + '\n').encode('utf-8'), *path_parts)
def write_bin(bin, *path_parts):
path_parts = path.join(*path_parts)
os.makedirs(path.dirname(path_parts), exist_ok=True)
with open(path_parts, 'wb') as f:
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='''
Convert between a Code Fragment Manager binary and an easily-edited dump directory.
# parser.add_argument('--gather', action='store_true', help='Binary or directory')
parser.add_argument('src', metavar='SOURCE', action='store', help='Binary or directory')
parser.add_argument('dest', metavar='DEST', action='store', help='Directory or binary')

View File

@ -352,10 +352,10 @@ def patch_rockhopper_ndrv(src, dest=None):
with tempfile.TemporaryDirectory() as tmp:
cfmtool.dump(src, tmp)
glue_file = eval(open(path.join(tmp, 'locations', 'glue.txt')).read())
glue_file = eval(open(path.join(tmp, 'dump-highlevel', 'codelocs-xtocglue.txt')).read())
glue_info = next(d for d in glue_file if d['function'] == 'ExpMgrConfigReadLong')
code_path = path.join(tmp, glue_info['file'])
code_path = path.join(tmp, glue_info['section'])
code = bytearray(open(code_path, 'rb').read())
while len(code) % 4: code.append(0) # align just in case