mirror of
https://github.com/elliotnunn/machfs.git
synced 2024-06-09 20:29:32 +00:00
fb493a154f
Specifically: - A data fork dump file will always exist - A resource fork dump file will exist iff there are any bytes in that fork (but this dump file may be empty) - A Finder info dump will exist if there are any nonzero bytes in the type or creator code
308 lines
10 KiB
Python
308 lines
10 KiB
Python
from collections.abc import MutableMapping
|
|
import os
|
|
from os import path
|
|
from macresources import make_rez_code, parse_rez_code, make_file, parse_file
|
|
from warnings import warn
|
|
|
|
|
|
TEXT_TYPES = [b'TEXT', b'ttro'] # Teach Text read-only
|
|
|
|
|
|
def _unsyncability(name): # files named '_' reserved for directory Finder info
|
|
return name.endswith(('.rdump', '.idump')) or name.startswith('.') or name == '_'
|
|
|
|
def _fuss_if_unsyncable(name):
|
|
if _unsyncability(name):
|
|
raise ValueError('Unsyncable name: %r' % name)
|
|
|
|
def _try_delete(name):
|
|
try:
|
|
os.remove(name)
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
|
|
class AbstractFolder(MutableMapping):
|
|
def __init__(self, from_dict=()):
|
|
self._prefdict = {} # lowercase to preferred
|
|
self._maindict = {} # lowercase to contents
|
|
self.update(from_dict)
|
|
|
|
def __setitem__(self, key, value):
|
|
if isinstance(key, tuple):
|
|
if len(key) == 1:
|
|
self[key[0]] = value
|
|
else:
|
|
self[key[0]][key[1:]] = value
|
|
|
|
try:
|
|
key = key.decode('mac_roman')
|
|
except AttributeError:
|
|
pass
|
|
|
|
key.encode('mac_roman')
|
|
|
|
lower = key.lower()
|
|
self._prefdict[lower] = key
|
|
self._maindict[lower] = value
|
|
|
|
def __getitem__(self, key):
|
|
if isinstance(key, tuple):
|
|
if len(key) == 1:
|
|
return self[key[0]]
|
|
else:
|
|
return self[key[0]][key[1:]]
|
|
|
|
try:
|
|
key = key.decode('mac_roman')
|
|
except AttributeError:
|
|
pass
|
|
|
|
lower = key.lower()
|
|
return self._maindict[lower]
|
|
|
|
def __delitem__(self, key):
|
|
if isinstance(key, tuple):
|
|
if len(key) == 1:
|
|
del self[key[0]]
|
|
else:
|
|
del self[key[0]][key[1:]]
|
|
|
|
try:
|
|
key = key.decode('mac_roman')
|
|
except AttributeError:
|
|
pass
|
|
|
|
lower = key.lower()
|
|
del self._maindict[lower]
|
|
del self._prefdict[lower]
|
|
|
|
def __iter__(self):
|
|
return iter(self._prefdict.values())
|
|
|
|
def __len__(self):
|
|
return len(self._maindict)
|
|
|
|
def __repr__(self):
|
|
the_dict = {self._prefdict[k]: v for (k, v) in self._maindict.items()}
|
|
return repr(the_dict)
|
|
|
|
def __str__(self):
|
|
lines = []
|
|
for k, v in self.items():
|
|
v = str(v)
|
|
if '\n' in v:
|
|
lines.append(k + ':')
|
|
for l in v.split('\n'):
|
|
lines.append(' ' + l)
|
|
else:
|
|
lines.append(k + ': ' + v)
|
|
return '\n'.join(lines)
|
|
|
|
def iter_paths(self):
|
|
for name, child in self.items():
|
|
yield ((name,), child)
|
|
try:
|
|
childs_children = child.iter_paths()
|
|
except AttributeError:
|
|
pass
|
|
else:
|
|
for each_path, each_child in childs_children:
|
|
yield (name,) + each_path, each_child
|
|
|
|
def walk(self, topdown=True):
|
|
result = self._recursive_walk(my_path=(), topdown=topdown)
|
|
|
|
if not topdown:
|
|
result = list(result)
|
|
result.reverse()
|
|
|
|
return result
|
|
|
|
def _recursive_walk(self, my_path, topdown): # like os.walk, except dirpath is a tuple
|
|
dirnames = [n for (n, obj) in self.items() if isinstance(obj, AbstractFolder)]
|
|
filenames = [n for (n, obj) in self.items() if not isinstance(obj, AbstractFolder)]
|
|
|
|
yield (my_path, dirnames, filenames)
|
|
|
|
if not topdown: dirnames.reverse() # hack to account for reverse() in walk()
|
|
|
|
for dn in dirnames: # the caller can change dirnames in a loop
|
|
yield from self[dn]._recursive_walk(my_path=my_path+(dn,), topdown=topdown)
|
|
|
|
def read_folder(self, folder_path, date=0, mpw_dates=False):
|
|
def includefilter(n):
|
|
if n.startswith('.'): return False
|
|
if n.endswith('.rdump'): return True
|
|
if n.endswith('.idump'): return True
|
|
return True
|
|
|
|
def swapsep(n):
|
|
return n.replace(':', path.sep)
|
|
|
|
def mkbasename(n):
|
|
base, ext = path.splitext(n)
|
|
if ext in ('.rdump', '.idump'):
|
|
return base
|
|
else:
|
|
return n
|
|
|
|
self.crdate = self.mddate = self.bkdate = date
|
|
|
|
tmptree = {folder_path: self}
|
|
|
|
for dirpath, dirnames, filenames in os.walk(folder_path):
|
|
dirnames[:] = [swapsep(x) for x in dirnames if includefilter(x)]
|
|
filenames[:] = [swapsep(x) for x in filenames if includefilter(x)]
|
|
|
|
for dn in dirnames:
|
|
_fuss_if_unsyncable(dn)
|
|
|
|
newdir = Folder()
|
|
newdir.crdate = newdir.mddate = newdir.bkdate = date
|
|
tmptree[dirpath][dn] = newdir
|
|
tmptree[path.join(dirpath, dn)] = newdir
|
|
|
|
for fn in filenames:
|
|
basename = mkbasename(fn)
|
|
_fuss_if_unsyncable(basename)
|
|
|
|
fullbase = path.join(dirpath, basename)
|
|
fullpath = path.join(dirpath, fn)
|
|
|
|
try:
|
|
thefile = tmptree[fullbase]
|
|
except KeyError:
|
|
thefile = File()
|
|
thefile.real_t = 0 # for the MPW hack
|
|
thefile.crdate = thefile.mddate = thefile.bkdate = date
|
|
thefile.contributors = []
|
|
tmptree[fullbase] = thefile
|
|
|
|
if fn.endswith('.idump'):
|
|
with open(fullpath, 'rb') as f:
|
|
thefile.type = f.read(4)
|
|
thefile.creator = f.read(4)
|
|
elif fn.endswith('rdump'):
|
|
rez = open(fullpath, 'rb').read()
|
|
resources = parse_rez_code(rez)
|
|
resfork = make_file(resources, align=4)
|
|
thefile.rsrc = resfork
|
|
else:
|
|
thefile.data = open(fullpath, 'rb').read()
|
|
|
|
thefile.contributors.append(fullpath)
|
|
if mpw_dates:
|
|
thefile.real_t = max(thefile.real_t, path.getmtime(fullpath))
|
|
|
|
tmptree[dirpath][basename] = thefile
|
|
|
|
for pathtpl, obj in self.iter_paths():
|
|
try:
|
|
if obj.type in TEXT_TYPES:
|
|
obj.data = obj.data.decode('utf8').replace('\r\n', '\r').replace('\n', '\r').encode('mac_roman')
|
|
except AttributeError:
|
|
pass
|
|
|
|
if mpw_dates:
|
|
all_real_times = set()
|
|
for pathtpl, obj in self.iter_paths():
|
|
try:
|
|
all_real_times.add(obj.real_t)
|
|
except AttributeError:
|
|
pass
|
|
ts2idx = {ts: idx for (idx, ts) in enumerate(sorted(set(all_real_times)))}
|
|
|
|
for pathtpl, obj in self.iter_paths():
|
|
try:
|
|
real_t = obj.real_t
|
|
except AttributeError:
|
|
pass
|
|
else:
|
|
fake_t = obj.crdate + 60 * ts2idx[real_t]
|
|
obj.crdate = obj.mddate = obj.bkdate = fake_t
|
|
|
|
def write_folder(self, folder_path):
|
|
def any_exists(at_path):
|
|
if path.exists(at_path): return True
|
|
if path.exists(at_path + '.rdump'): return True
|
|
if path.exists(at_path + '.idump'): return True
|
|
return False
|
|
|
|
written = []
|
|
blacklist = list()
|
|
for p, obj in self.iter_paths():
|
|
blacklist_test = ':'.join(p) + ':'
|
|
if blacklist_test.startswith(tuple(blacklist)): continue
|
|
if _unsyncability(p[-1]):
|
|
warn('Ignoring unsyncable name: %r' % (':' + ':'.join(p)))
|
|
blacklist.append(blacklist_test)
|
|
continue
|
|
|
|
nativepath = path.join(folder_path, *(comp.replace(path.sep, ':') for comp in p))
|
|
info_path = nativepath + '.idump'
|
|
rsrc_path = nativepath + '.rdump'
|
|
|
|
if isinstance(obj, Folder):
|
|
os.makedirs(nativepath, exist_ok=True)
|
|
|
|
elif obj.mddate != obj.bkdate or not any_exists(nativepath):
|
|
# always write the data fork
|
|
data = obj.data
|
|
if obj.type in TEXT_TYPES:
|
|
data = data.decode('mac_roman').replace('\r', os.linesep).encode('utf8')
|
|
with open(nativepath, 'wb') as f:
|
|
f.write(data)
|
|
|
|
# write a resource dump iff that fork has any bytes (dump may still be empty)
|
|
if obj.rsrc:
|
|
with open(rsrc_path, 'wb') as f:
|
|
rdump = make_rez_code(parse_file(obj.rsrc), ascii_clean=True)
|
|
f.write(rdump)
|
|
else:
|
|
_try_delete(rsrc_path)
|
|
|
|
# write an info dump iff either field is non-null
|
|
idump = obj.type + obj.creator
|
|
if any(idump):
|
|
with open(info_path, 'wb') as f:
|
|
f.write(idump)
|
|
else:
|
|
_try_delete(info_path)
|
|
|
|
if written:
|
|
t = path.getmtime(written[-1])
|
|
for w in written:
|
|
os.utime(w, (t, t))
|
|
|
|
|
|
class Folder(AbstractFolder):
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
self.flags = 0 # help me!
|
|
self.x = 0 # where to put this spatially?
|
|
self.y = 0
|
|
|
|
self.crdate = self.mddate = self.bkdate = 0
|
|
|
|
|
|
class File:
|
|
def __init__(self):
|
|
self.type = b'????'
|
|
self.creator = b'????'
|
|
self.flags = 0 # help me!
|
|
self.x = 0 # where to put this spatially?
|
|
self.y = 0
|
|
|
|
self.locked = False
|
|
self.crdate = self.mddate = self.bkdate = 0
|
|
|
|
self.rsrc = bytearray()
|
|
self.data = bytearray()
|
|
|
|
def __str__(self):
|
|
typestr, creatorstr = (x.decode('mac_roman') for x in (self.type, self.creator))
|
|
dstr, rstr = (repr(bytes(x)) if 1 <= len(x) <= 32 else '%db' % len(x) for x in (self.data, self.rsrc))
|
|
return '[%s/%s] data=%s rsrc=%s' % (typestr, creatorstr, dstr, rstr)
|