T. Joseph Carter f3b5fe7dcd Added hexdump function
What's wrong with b2a_hex() or hex()?  Well, hex() only converts integers.  And
while a2b_hex() ignores whitespace, b2a_hex() doesn't provide any, making for
difficult to read output for anything longer than about 8 bytes or so.

In the basic case, it seems like you want a classic hexdump.  I chose the xxd format:

xxxxxxxx: xxxx xxxx xxxx xxxx  xxxx xxxx xxxx xxxx |cccccccccccccccc|

Rather than hardcode all of the integers and strings (as I started doing), I
decided that I might as well use variables for these things if only for
readability.  And if they're locals, you might as well be able to override them.

The knobs you have to play with are therefore these:

- wordsize=2, how many bytes are grouped together
- sep=' ', the spacing between words
- sep2='  ', the midpoint spacing

I suppose I could've made everything else configurable too, but YAGNI.
2017-07-02 15:45:28 -07:00

1335 lines
40 KiB
Executable File

#!/usr/bin/env python3
# vim: set tabstop=4 shiftwidth=4 noexpandtab filetype=python:
"""cppo: Copy/catalog files from a ProDOS/DOS 3.3/ShrinkIt image/archive.
copy all files: cppo [options] imagefile target_directory
copy one file : cppo [options] imagefile /extract/path target_path
catalog image : cppo -cat [options] imagefile
-shk: ShrinkIt archive as source (also auto-enabled by filename).
-ad : Netatalk-compatible AppleDouble metadata files and resource forks.
-e : Nulib2-compatible filenames with type/auxtype and resource forks.
-uc : Copy GS/OS mixed case filenames as uppercase.
-pro: Adapt DOS 3.3 names to ProDOS and remove addr/len from file data.
/extract/path examples:
/FULL/PRODOS/PATH (ProDOS image source)
"MY FILENAME" (DOS 3.3 image source)
Dir:SubDir:FileName (ShrinkIt archive source)
+ after a file name indicates a GS/OS or Mac OS extended (forked) file.
Wildcard matching (*) is not supported and images are not validated.
ShrinkIt support requires Nulib2. cppo requires Python 2.6+ or 3.0+."""
# cppo by Ivan X, ivan@ivanx.com, ivanx.com/appleii
# Does anyone want to rewrite/refactor this? It works, but it's a mess.
import sys
import os
import datetime
import shutil
import errno
import uuid # for temp directory
import subprocess
#import tempfile # not used, but should be for temp directory?
import logging
import struct
from typing import Sequence
from collections import namedtuple
from binascii import a2b_hex, b2a_hex
# Attribute container for module-wide state: the single instance `g` holds
# the buffers, bookkeeping values and command-line options that the
# functions in this file read and modify.
# NOTE(review): the class body is not visible in this view of the file.
class Globals:
g = Globals()
# raw bytes of the source disk image (main() fills this via load_file)
g.image_data = b''
# data fork of the file currently being extracted
g.out_data = bytearray(b'')
# resource fork / AppleDouble buffer of the current file, or None
g.ex_data = None
g.activeDirBlock = None
g.activeFileName = None
g.activeFileSize = None
g.activeFileBytesCopied = 0
# 0 while copying a data fork; processForkedFile sets it to 256 for the
# resource fork
g.resourceFork = 0
g.shk_hasrf = False
g.DIRPATH = ""
g.target_name = None
g.target_dir = ""
g.appledouble_dir = None
g.extract_file = None
# runtime options
g.use_appledouble = False # -ad (AppleDouble headers + resource forks)
g.use_extended = False # -e (extended filenames + resource forks)
g.catalog_only = False # -cat (catalog only, no extract)
g.casefold_upper = False # -uc (GS/OS mixed case filenames extract as uppercase)
g.src_shk = False # -shk (ShrinkIt archive source)
g.prodos_names = False # -pro (adapt DOS 3.3 names to ProDOS)
g.afpsync_msg = True # -s (sets False to suppress afpsync message at end)
g.extract_in_place = False # -n (don't create parent dir for SHK, extract files in place)
g.dos33 = False # (DOS 3.3 image source, selected automatically)
# functions
def pack_u24be(buf: bytearray, offset: int, val: int):
    """Write the low 24 bits of val into buf at offset, big-endian."""
    # struct has no 3-byte integer code, so the value is emitted as one
    # byte (bits 16-23) followed by a 16-bit half (bits 0-15).
    top_byte = (val >> 16) & 0xff
    low_half = val & 0xffff
    struct.pack_into('>BH', buf, offset, top_byte, low_half)
def pack_u32be(buf: bytearray, offset: int, val: int):
    """Write val into buf at offset as an unsigned 32-bit big-endian int."""
    # Currently unused, will be needed for resource fork dates later
    struct.Struct('>L').pack_into(buf, offset, val)
def unpack_u16le(buf: bytes, offset: int = 0) -> int:
    """Read an unsigned 16-bit little-endian integer from buf at offset."""
    (value,) = struct.unpack_from('<H', buf, offset)
    return value
def unpack_u24le(buf: bytes, offset: int = 0) -> int:
    """Read an unsigned 24-bit little-endian integer from buf at offset."""
    # Read as a 16-bit low half followed by the high byte, then combine.
    fields = struct.unpack_from('<HB', buf, offset)
    return (fields[1] << 16) | fields[0]
# FIXME: I know -> namedtuple below is _wrong_. It's a hint, and I am doing
# things wrong here. I'm not 100% sure how to do this right. If you are,
# please submit a more pythonic fix.
# Parse a 2mg header from `buf` starting at `offset`.
# The header is unpacked with A2MG1_UNPACK into an A2mg1 namedtuple.  When
# the magic is b'2IMG' and the version is 1, the raw `vol` flags field is
# replaced by an A2mg1Vol(locked, dosvol).  For other versions only the
# first len(A2MG_UNK_ATTRS) fields are kept, as an A2mgUnk.  The visible
# code also sets the result to None on some paths, including the
# ValueError handler.
# NOTE(review): several lines are not visible in this view (the `try:`,
# the `else:` branches and the call that wraps the "Unrecognized 2mg
# version" message).  `name` is neither a parameter nor a local here, so
# formatting that message would raise NameError as written.
# NOTE(review): confirm that ValueError is what struct.unpack_from raises
# for a too-short buffer; the struct module documents struct.error.
def unpack_2mg(buf: bytes, offset: int = 0) -> namedtuple:
# 2017-07-01: Version 1 data is the only kind so far...
a2mg = A2mg1(*struct.unpack_from(A2MG1_UNPACK, buf, offset))
if a2mg.magic == b'2IMG':
if a2mg.version == 1:
# bit 31 of the flags word -> locked; if bit 8 is set, the low byte
# is taken as the DOS volume number, otherwise dosvol is None
vol = A2mg1Vol(
locked=bool(a2mg.vol & (1<<31)),
dosvol=a2mg.vol & 0xff if a2mg.vol & 0x100 else None
a2mg = a2mg._replace(vol=vol)
"Unrecognized 2mg version {}: '{}'".format(
a2mg.version, name
# keep only the fields assumed to exist in every 2mg version
a2mg = A2mgUnk(*a2mg[0:len(A2MG_UNK_ATTRS)])
a2mg = None
except ValueError:
a2mg = None
return a2mg
def date_prodos_to_unix(prodos_date: bytes) -> int:
    """Returns a UNIX timestamp given a raw ProDOS date

    The ProDOS date consists of two 16-bit words stored little-
    endian.  We receive them as raw bytes with this layout:

        mmmddddd yyyyyyym 00MMMMMM 000HHHHH

        year     yyyyyyy
        month    m mmm
        day      ddddd
        hour     HHHHH
        minute   MMMMMM

    Some notes about that:

    - The high bit of the month is the low bit of prodos_date[1], the
      rest of the lower bits are found in prodos_date[0].
    - The two-digit year treats 40-99 as being 19xx, else 20xx.
    - ProDOS has only minute-precision for its timestamps.  Data
      regarding seconds is lost.
    - ProDOS dates are naive in the sense they lack a timezone.  We
      (naively) assume these timestamps are in local time.
    - The unused bits in the time fields are masked off, just in case
      they're ever NOT zero.  2040 is coming.

    Returns None when the bytes do not describe a valid date (for example
    an all-zero "no date" field) or when fewer than four bytes are given.
    """
    try:
        year = (prodos_date[1] & 0xfe) >> 1
        year += 1900 if year >= 40 else 2000
        # The month is a 4-bit value: its high bit (weight 8) is bit 0 of
        # byte 1, so it must be shifted left by 3, not 4.  Shifting by 4
        # turned months 8-12 into 16-20, which datetime rejects.
        month = ((prodos_date[1] & 0x01) << 3) | ((prodos_date[0] & 0xe0) >> 5)
        day = prodos_date[0] & 0x1f
        hour = prodos_date[3] & 0x1f
        minute = prodos_date[2] & 0x3f
        return int(datetime.datetime(year, month, day,
                                     hour, minute).timestamp())
    except (IndexError, ValueError, OverflowError, OSError):
        # <NO DATE> is always an option
        return None
"""The number of seconds between 1970-01-01 amd 2000-01-01"""
# $ date --date="2000-01-01 00:00:00 GMT" +%s
# 946684800
def date_unix_to_appledouble(unix_date):
    """Convert a UNIX timestamp to a 4-byte AppleDouble date.

    unix_date is seconds since the UNIX epoch (1-Jan-1970 00:00:00 GMT).
    The result is seconds since 1-Jan-2000 00:00:00 GMT, encoded as four
    big-endian bytes.
    """
    # UNIX dates carry 30 years' worth of extra seconds compared with
    # Apple dates, so that offset is subtracted.
    seconds = int(unix_date - APPLE_EPOCH_OFFSET)
    if seconds < 0:
        # dates before 2000 are stored as a 32-bit two's complement value
        seconds += 1 << 32
    return seconds.to_bytes(length=4, byteorder='big')
# cppo support functions:
# arg1: directory block or [T,S] containing file entry, or shk file dir path
# arg2: file index in overall directory (if applicable), or shk file name
# returns byte position in disk image file
def getStartPos(arg1, arg2):
    """Return the byte offset in g.image_data of a directory entry.

    arg1 is the ProDOS directory block number, or a DOS 3.3 [track, sector]
    pair; arg2 is the index of the entry within the overall directory.
    """
    if not g.dos33:
        # ProDOS: 512-byte blocks and 39-byte entries.  Indexes 0-11 start
        # at byte 43 of the block; later indexes are shifted by one slot
        # and start at byte 4, cycling through 13 slots.
        later_block = arg2 > 11
        slot = (arg2 + later_block) % 13
        first_entry = 4 if later_block else 43
        return arg1 * 512 + 39 * slot + first_entry
    # DOS 3.3: 35-byte entries, seven per sector, starting at byte 11
    return ts(arg1) + 35 * (arg2 % 7) + 11
def getStorageType(arg1, arg2):
    """Return the storage type of the directory entry (arg1, arg2).

    For DOS 3.3 the result is 0 when the entry's first byte is 255 and 2
    otherwise; for ProDOS it is the high nibble of the first byte.
    """
    first_byte = g.image_data[getStartPos(arg1, arg2)]
    if g.dos33:
        return int(first_byte != 255) * 2
    return first_byte // 16
def getFileName(arg1, arg2):
    """Return the file name of directory entry (arg1, arg2) as bytes.

    DOS 3.3 names are the 30 bytes at entry offset 3 with the high bit
    cleared and trailing whitespace removed.  ProDOS names take their
    length from the low nibble of the entry's first byte; when the entry
    has a case mask and -uc is not in effect, the masked characters are
    lowercased.
    """
    entry = getStartPos(arg1, arg2)
    if g.dos33:
        raw_name = g.image_data[sli(entry + 3, 30)]
        return bytes(bytearray(b & 0x7f for b in raw_name)).rstrip()
    # ProDOS
    header = g.image_data[entry]
    name_len = header % 16
    name = g.image_data[sli(entry + 1, name_len)]
    mask = getCaseMask(arg1, arg2)
    if mask and not g.casefold_upper:
        chars = bytearray(name)
        for idx in range(len(chars)):
            if mask[idx] == "1":
                chars[idx:idx+1] = chars[idx:idx+1].lower()
        name = bytes(chars)
    return name
def getCaseMask(arg1, arg2):
    """Return the 15-character binary case mask of an entry, or None.

    The mask is the 16-bit little-endian word at entry offset 28; it only
    counts when its top bit (32768) is set.
    """
    word = unpack_u16le(g.image_data, getStartPos(arg1, arg2) + 28)
    if word >= 32768:
        return to_bin(word - 32768, 15)
    return None
def getFileType(arg1, arg2):
    """Return the file type of an entry as a two-digit hex string.

    ShrinkIt: the two characters after '#' in the file name arg2.
    DOS 3.3: the type byte at entry offset 2 (low 7 bits) mapped to a
    ProDOS type.  ProDOS: the byte at entry offset 16.
    """
    if g.src_shk:
        return arg2.split('#')[1][0:2]
    entry = getStartPos(arg1, arg2)
    if not g.dos33:
        # ProDOS
        return b2a_hex(g.image_data[entry+16:entry+17]).decode()
    dos_type = g.image_data[entry+2] & 127
    prodos_equivalent = {
        4: '06',  # BIN
        1: 'FA',  # INT
        2: 'FC',  # BAS
    }
    return prodos_equivalent.get(dos_type, '04')  # TXT or other
# Return the auxiliary type of an entry as a four-digit hex string.
# ShrinkIt: characters 2-5 after '#' in the file name arg2.
# DOS 3.3: derived from the ProDOS-equivalent file type; for BIN files it
# is read from the start of the file's data, other types get fixed values.
# ProDOS: the 16-bit little-endian word at entry offset 31.
# NOTE(review): the BIN return expression is cut off in this view (its
# second operand and closing lines are not visible).
def getAuxType(arg1, arg2):
if g.src_shk:
return arg2.split('#')[1][2:6]
start = getStartPos(arg1, arg2)
if g.dos33:
fileType = getFileType(arg1, arg2)
if fileType == '06': # BIN (B)
# file address is in first two bytes of file data
# first two bytes of the entry = [track, sector] of the T/S list
fileTSlist = list(g.image_data[sli(start+0,2)])
# offset 12 of the T/S list sector = [track, sector] of the first data sector
fileStart = list(g.image_data[sli(ts(fileTSlist)+12,2)])
return (
b2a_hex(g.image_data[sli(ts(fileStart)+1,1)]) +
elif fileType == 'FC': # BAS (A)
return '0801'
elif fileType == 'FA': # INT (I)
return '9600'
else: # TXT (T) or other
return '0000'
else: # ProDOS
return format(unpack_u16le(g.image_data, start + 31), '04x')
def getKeyPointer(arg1, arg2):
    """Return where the data of entry (arg1, arg2) begins.

    DOS 3.3: the first two bytes of the entry as a [track, sector] list.
    ProDOS: the 16-bit little-endian block number at entry offset 17.
    """
    entry = getStartPos(arg1, arg2)
    if not g.dos33:
        # ProDOS
        return unpack_u16le(g.image_data, entry + 17)
    return list(g.image_data[sli(entry, 2)])
# Return the length in bytes of the file described by entry (arg1, arg2).
# ProDOS: the 24-bit little-endian value at entry offset 21.
# DOS 3.3: BIN and BAS/INT files store their length in the first data
# sector (the returned size includes the 4- or 2-byte header); for other
# types the track/sector list is walked and the last sector is scanned
# backwards for its final non-zero byte.
# NOTE(review): several control-flow lines of the T/S-list walk (`else:`
# and `break` statements) are not visible in this view, so the exact
# nesting of the statements below cannot be confirmed from here.
def getFileLength(arg1, arg2):
start = getStartPos(arg1, arg2)
if g.dos33:
fileType = getFileType(arg1, arg2)
# [track, sector] of the file's T/S list, and of its first data sector
fileTSlist = list(g.image_data[sli(start,2)])
fileStart = list(g.image_data[sli(ts(fileTSlist)+12,2)])
if fileType == '06': # BIN (B)
# file length is in second two bytes of file data
file_size = unpack_u16le(g.image_data, ts(fileStart) + 2) + 4
elif fileType == 'FC' or fileType == 'FA': # BAS (A) or INT (I)
# file length is in first two bytes of file data
file_size = unpack_u16le(g.image_data, ts(fileStart)) + 2
else: # TXT (T) or other
# sadly, we have to walk the whole file
# length is determined by sectors in TSlist, minus wherever
# anything after the first zero in the last sector
file_size = 0
lastTSpair = None
prevTSpair = [0,0]
nextTSlistSector = fileTSlist
endFound = False
while not endFound:
pos = ts(nextTSlistSector)
# T/S pairs occupy bytes 12..255 of a T/S list sector, two bytes each
for tsPos in range(12, 256, 2):
cur_ts_pair = list(g.image_data[sli(pos+tsPos,2)])
if ts(cur_ts_pair) != 0:
file_size += 256
prevTSpair = cur_ts_pair
lastTSpair = prevTSpair
endFound = True
if not lastTSpair:
# bytes 1-2 of the T/S list sector link to the next T/S list sector
nextTSlistSector = list(g.image_data[sli(pos+1,2)])
if nextTSlistSector[0]+nextTSlistSector[1] == 0:
lastTSpair = prevTSpair
endFound = True
# the last sector is only partly used: drop its full 256 bytes and
# add back the used part found by the scan below
file_size -= 256
pos = ts(prevTSpair)
# now find out where the file really ends by finding the last 00
for offset in range(255, -1, -1):
#print("pos: " + to_hex(pos))
if g.image_data[pos+offset] != 0:
file_size += (offset + 1)
else: # ProDOS
file_size = unpack_u24le(g.image_data, start + 21)
return file_size
def getCreationDate(arg1, arg2):
    """Return an entry's ProDOS creation date/time as UNIX time, or None.

    ShrinkIt and DOS 3.3 sources have no creation date here, so None is
    returned for them.  For ProDOS the four bytes at entry offset 24 are
    decoded by date_prodos_to_unix.
    """
    if g.src_shk or g.dos33:
        return None
    # ProDOS
    entry = getStartPos(arg1, arg2)
    return date_prodos_to_unix(g.image_data[entry+24:entry+28])
def getModifiedDate(arg1, arg2):
    """Return an entry's modification date/time as UNIX time, or None.

    ShrinkIt: the mtime of the extracted file arg2 in directory arg1.
    DOS 3.3: always None.
    ProDOS: the four date/time bytes at entry offset 33, decoded by
    date_prodos_to_unix (which yields None when there is no valid date).
    """
    if g.src_shk:
        return int(os.path.getmtime(os.path.join(arg1, arg2)))
    elif g.dos33:
        return None
    else: # ProDOS
        start = getStartPos(arg1, arg2)
        # The field is four bytes long (33..36).  The previous slice end of
        # start+27 lay before the slice start, so the slice was always
        # empty and the modification date was never decoded.
        return date_prodos_to_unix(g.image_data[start+33:start+37])
def getVolumeName():
    """Return the directory name stored in block 2 of the image."""
    volume_dir_block = 2
    return getWorkingDirName(volume_dir_block)
# Return the name (bytes) stored in the directory header of block arg1.
# The header byte at block offset 4 holds the entry type in its high
# nibble and the name length in its low nibble.  For a volume directory
# (type 15) the case mask is read from the header; for a subdirectory the
# caller supplies it as arg2.  Masked characters are lowercased unless -uc
# is in effect.
# NOTE(review): an `else:` appears to be missing in this view between
# `caseMask = None` and the to_bin() assignment that follows it.
def getWorkingDirName(arg1, arg2=None):
# arg1:block, arg2:casemask (optional)
start = arg1 * 512
firstByte = g.image_data[start+4]
entryType = firstByte//16
nameLength = firstByte - entryType*16
workingDirName = g.image_data[sli(start+5, nameLength)]
if entryType == 15: # volume directory, get casemask from header
caseMaskDec = unpack_u16le(g.image_data, start + 26)
# the mask only counts when its top bit (32768) is set
if caseMaskDec < 32768:
caseMask = None
caseMask = to_bin(caseMaskDec - 32768,15)
else: # subdirectory, get casemask from arg2 (not available in header)
caseMask = arg2
if caseMask and not g.casefold_upper:
workingDirName = bytearray(workingDirName)
for i in range(0, len(workingDirName)):
if caseMask[i] == "1":
workingDirName[i:i+1] = workingDirName[i:i+1].lower()
workingDirName = bytes(workingDirName)
return workingDirName
def getDirEntryCount(arg1):
    """Return the number of active entries in the directory at arg1.

    ProDOS: the 16-bit little-endian count at offset 37 of block arg1.
    DOS 3.3: the catalog sectors are walked starting at [track, sector]
    arg1, counting entries whose first byte is neither 0 nor 255.
    """
    if not g.dos33:
        # ProDOS
        return unpack_u16le(g.image_data, arg1 * 512 + 37)
    count = 0
    sector = arg1
    while True:
        base = ts(sector)
        # seven 35-byte entries per catalog sector, starting at byte 11
        for slot in range(7):
            marker = g.image_data[base + 11 + 35 * slot]
            if marker == 0:
                return count  # no more file entries
            if marker != 255:
                count += 1  # increment if not deleted file
        sector = list(g.image_data[sli(base + 1, 2)])
        if sector == [0, 0]:  # no more catalog sectors
            return count
def getDirNextChunkPointer(arg1):
    """Return the link to the directory chunk that follows arg1.

    DOS 3.3: bytes 1-2 of the catalog sector as a [track, sector] list.
    ProDOS: the 16-bit little-endian block number at offset 2 of the block.
    """
    if not g.dos33:
        # ProDOS
        return unpack_u16le(g.image_data, arg1 * 512 + 2)
    return list(g.image_data[sli(ts(arg1) + 1, 2)])
def toProdosName(name):
    """Adapt a file name to ProDOS naming rules.

    One leading period is removed, every character that is neither a
    period nor alphanumeric is replaced by a period, and the result is
    cut to 15 characters.  An empty name is returned unchanged (the
    previous implementation raised IndexError on it).
    """
    if name.startswith('.'): # eliminate leading period
        name = name[1:]
    adapted = ''.join(
        c if c == '.' or c.isalnum() else '.'
        for c in name)
    return adapted[:15]
def ts(track, sector=None):
    """Return the byte offset of a DOS 3.3 track/sector in the image.

    track and sector can each be an int or a hex string.  For
    convenience a single [track, sector] pair may be passed as the first
    argument.  Sectors are 256 bytes and there are 16 per track.
    """
    if sector is None:
        (track, sector) = track
    if isinstance(track, str): # hex-ustr
        track = int(track, 16)
    if isinstance(sector, str): # hex-ustr
        sector = int(sector, 16)
    return track*16*256 + sector*256
def sli(start, length=1, ext=None):
    """Build a slice covering length items from start (optional step ext)."""
    stop = start + length
    return slice(start, stop, ext)
# --- main logic functions
# Copy the file identified by (arg1, arg2) into the global output buffers.
# ShrinkIt sources are read from the already-extracted files on disk; for
# ProDOS / DOS 3.3 images the copy is dispatched on the entry's storage
# type.  With -pro, the address/length header is stripped from DOS 3.3
# BIN / BAS / INT data afterwards.
# NOTE(review): the calls for the sapling, tree and extended storage types
# are not visible in this view (each `elif` has no body here).
def copyFile(arg1, arg2):
# ProDOS : directory block / file index in overall directory
# DOS 3.3 : [track, sector] / file index in overall VTOC
# ShrinkIt: directory path / file name
# copies file or dfork to g.out_data, rfork if any to g.ex_data
g.activeFileBytesCopied = 0
if g.src_shk:
with open(os.path.join(arg1, arg2), 'rb') as infile:
g.out_data += infile.read()
if g.shk_hasrf:
print(" [data fork]")
if g.use_extended or g.use_appledouble:
print(" [resource fork]")
if g.ex_data == None:
g.ex_data = bytearray(b'')
# the resource fork is read from the file named arg2 + "r"
with open(os.path.join(arg1, (arg2 + "r")), 'rb') as infile:
g.ex_data += infile.read()
else: # ProDOS or DOS 3.3
storageType = getStorageType(arg1, arg2)
keyPointer = getKeyPointer(arg1, arg2)
fileLen = getFileLength(arg1, arg2)
if storageType == 1: #seedling
copyBlock(keyPointer, fileLen)
elif storageType == 2: #sapling
elif storageType == 3: #tree
elif storageType == 5: #extended (forked)
if g.prodos_names:
# remove address/length data from DOS 3.3 file data if ProDOS target
if getFileType(arg1, arg2) == '06':
g.out_data = g.out_data[4:]
elif (getFileType(arg1, arg2) == 'FA'
or getFileType(arg1, arg2) == 'FC'):
g.out_data = g.out_data[2:]
# Copy arg2 bytes of one block/sector from g.image_data into the output
# buffers at position g.activeFileBytesCopied, then advance that counter.
# While g.resourceFork is non-zero the data is only stored when -ad or -e
# is in effect, with an offset of 741 for AppleDouble output.
# NOTE(review): the assignment targets of the two slice stores, and the
# `else:` lines, are not visible in this view.
def copyBlock(arg1, arg2):
#arg1: block number or [t,s] to copy
#arg2: bytes to write (should be 256 (DOS 3.3) or 512 (ProDOS),
# unless final block with less)
#print(arg1 + " " + arg2 + " " + g.activeFileBytesCopied)
# block 0 yields arg2 zero bytes instead of image data
if arg1 == 0:
outBytes = bytes(arg2)
outBytes = g.image_data[sli(ts(arg1) if g.dos33 else arg1*512, arg2)]
if g.resourceFork > 0:
if g.use_appledouble or g.use_extended:
offset = (741 if g.use_appledouble else 0)
if g.ex_data == None:
g.ex_data = bytearray(b'')
g.activeFileBytesCopied + offset
: g.activeFileBytesCopied + offset + arg2
] = outBytes
: g.activeFileBytesCopied + arg2
] = outBytes
g.activeFileBytesCopied += arg2
# Walk one directory chunk (a ProDOS block or a DOS 3.3 catalog sector),
# calling processEntry for every entry whose storage type is non-zero, and
# continue with the next chunk once the current one is exhausted.
# NOTE(review): the `else:` that separates the "continuation chunk" setup
# from the "key block" setup, and the start of the call that continues
# with the next chunk, are not visible in this view.
def process_dir(arg1, arg2=None, arg3=None, arg4=None, arg5=None):
# arg1: ProDOS directory block, or DOS 3.3 [track,sector]
# for key block (with directory header):
# arg2: casemask (optional), arg3:None, arg4:None, arg5:None
# for secondary directory blocks (non-key block):
# arg2/3/4/5: for non-key chunks: entryCount, entry#,
# workingDirName, processedEntryCount
entryCount = None
e = None
pe = None
workingDirName = None
if arg3:
entryCount = arg2
e = arg3
workingDirName = arg4
pe = arg5
e = 0
pe = 0
entryCount = getDirEntryCount(arg1)
if not g.dos33:
workingDirName = getWorkingDirName(arg1, arg2).decode("L1")
g.DIRPATH = (g.DIRPATH + "/" + workingDirName)
if ("/" + g.PDOSPATH_SEGMENT.lower()) != g.DIRPATH.lower():
print("ProDOS volume name does not match disk image.")
#else: print(g.DIRPATH)
# e indexes directory slots, pe counts the entries actually processed
while pe < entryCount:
if getStorageType(arg1, e) > 0:
#print(pe, e, entryCount)
processEntry(arg1, e)
pe += 1
e += 1
# end of this chunk: 7 entries per DOS 3.3 sector, 13 slots per ProDOS block
if not (e + (0 if g.dos33 else (e>11)) ) % (7 if g.dos33 else 13):
getDirNextChunkPointer(arg1), entryCount, e,
workingDirName, pe)
# Handle one directory entry: recurse into it when it is a ProDOS
# subdirectory, otherwise print its catalog line and, unless -cat is in
# effect, extract it (data fork, optional extended name / AppleDouble
# header / resource fork) and stamp the saved files with its dates.
# NOTE(review): many lines of this function are not visible in this view
# (directory creation calls, the `print(` that wraps the catalog line,
# several `else:` / `return` lines and parts of two conditions), so the
# control flow below cannot be fully confirmed from here.
def processEntry(arg1, arg2):
# arg1=block number, [t,s] if g.dos33=True, or subdir name if g.src_shk=1
# arg2=index number of entry in directory, or file name if g.src_shk=1
#print(getFileName(arg1, arg2), getStorageType(arg1, arg2),
# getFileType(arg1, arg2), getKeyPointer(arg1, arg2),
# getFileLength(arg1, arg2), getAuxType(arg1, arg2),
# getCreationDate(arg1, arg2), getModifiedDate(arg1, arg2))
eTargetName = None
# start every entry with empty output buffers
g.ex_data = None
g.out_data = bytearray(b'')
if g.src_shk: # ShrinkIt archive
g.activeFileName = (arg2 if g.use_extended else arg2.split('#')[0])
if g.casefold_upper:
g.activeFileName = g.activeFileName.upper()
origFileName = g.activeFileName
else: # ProDOS or DOS 3.3 image
g.activeFileName = getFileName(arg1 ,arg2).decode("L1")
origFileName = g.activeFileName
if g.prodos_names:
g.activeFileName = toProdosName(g.activeFileName)
g.activeFileSize = getFileLength(arg1, arg2)
if (not g.PDOSPATH_INDEX or
g.activeFileName.upper() == g.PDOSPATH_SEGMENT.upper()):
# if ProDOS directory, not file
if not g.src_shk and getStorageType(arg1, arg2) == 13:
g.target_dir = g.target_dir + "/" + g.activeFileName
g.appledouble_dir = g.target_dir + "/.AppleDouble"
if not g.catalog_only or os.path.isdir(g.target_dir):
if (not g.catalog_only and g.use_appledouble
and not os.path.isdir(g.appledouble_dir)):
process_dir(getKeyPointer(arg1, arg2), getCaseMask(arg1, arg2))
# back out of the subdirectory: drop the last path component again
g.DIRPATH = g.DIRPATH.rsplit("/", 1)[0]
g.target_dir = g.target_dir.rsplit("/", 1)[0]
g.appledouble_dir = (g.target_dir + "/.AppleDouble")
else: # ProDOS or DOS 3.3 file either from image or ShrinkIt archive
dirPrint = ""
dirPrint = g.DIRPATH + "/"
if g.src_shk:
# NOTE(review): dirName is not defined in this function; presumably
# a module-level name set by the os.walk loop in main() - confirm
if "/".join(dirName.split('/')[3:]):
dirPrint = ("/".join(dirName.split('/')[3:]) + "/")
if (not g.extract_file or (
== origFileName.split('#')[0].lower())):
filePrint = g.activeFileName.split("#")[0]
dirPrint + filePrint
+ ("+" if (g.shk_hasrf
or (not g.src_shk
and getStorageType(arg1, arg2) == 5))
else "")
+ ((" [" + origFileName + "] ")
if (g.prodos_names
and origFileName != g.activeFileName)
else ""))
if g.catalog_only:
if not g.target_name:
g.target_name = g.activeFileName
if g.use_extended:
if g.src_shk:
eTargetName = arg2
else: # ProDOS image
# extended name: target name + "#" + hex file type + hex aux type
eTargetName = (g.target_name + "#"
+ getFileType(arg1, arg2).lower()
+ getAuxType(arg1, arg2).lower())
# touch(g.target_dir + "/" + g.target_name)
if g.use_appledouble:
copyFile(arg1, arg2)
saveName = (g.target_dir + "/"
+ (eTargetName if eTargetName else g.target_name))
save_file(saveName, g.out_data)
d_created = getCreationDate(arg1, arg2)
d_modified = getModifiedDate(arg1, arg2)
# fall back: modified <- created <- now, then created <- modified
if not d_modified:
d_modified = (d_created
or int(datetime.datetime.today().timestamp()))
if not d_created:
d_created = d_modified
if g.use_appledouble: # AppleDouble
# set dates
ADfile_path = g.appledouble_dir + "/" + g.target_name
g.ex_data[637:641] = date_unix_to_appledouble(d_created)
g.ex_data[641:645] = date_unix_to_appledouble(d_modified)
g.ex_data[645] = 0x80
g.ex_data[649] = 0x80
#set type/creator
g.ex_data[653] = ord('p')
g.ex_data[654:657] = bytes.fromhex(
getFileType(arg1, arg2)
+ getAuxType(arg1, arg2))
g.ex_data[657:661] = b'pdos'
save_file(ADfile_path, g.ex_data)
touch(saveName, d_modified)
if g.use_extended: # extended name from ProDOS image
if g.ex_data:
save_file((saveName + "r"), g.ex_data)
touch((saveName + "r"), d_modified)
or (g.extract_file
and (g.extract_file.lower()
== origFileName.lower()))):
g.target_name = None
#else print(g.activeFileName + " doesn't match " + g.PDOSPATH_SEGMENT)
# Copy both forks of a ProDOS extended (forked) file whose key block is
# block arg1.  The loop runs once with offset 0 and once with offset 256
# into that block; g.resourceFork carries the offset so copyBlock can tell
# the forks apart.  Each pass reads the fork's storage type, key pointer
# and 24-bit length from the key block and dispatches on the storage type.
# NOTE(review): the four lines of the form `a[...], b[...]` below are bare
# tuple expressions with no effect as written; presumably they were meant
# to copy Finder info into g.ex_data - confirm against the original file.
# NOTE(review): fInfoA/B_entryType are read at absolute offsets 9 and 27
# of the image, not relative to arg1 - confirm this is intended.
# NOTE(review): the sapling/tree calls and at least one `else:` are not
# visible in this view.
def processForkedFile(arg1):
# finder info except type/creator
fInfoA_entryType = g.image_data[9]
fInfoB_entryType = g.image_data[27]
if fInfoA_entryType == 1:
g.image_data[661:669], g.image_data[18:26]
elif fInfoA_entryType == 2:
g.image_data[669:685], g.image_data[10:26]
if fInfoB_entryType == 1:
g.image_data[661:669], g.image_data[36:44]
elif fInfoB_entryType == 2:
g.image_data[669:685], g.image_data[28:44]
for f in (0, 256):
g.resourceFork = f
g.activeFileBytesCopied = 0
forkStart = arg1 * 512 # start of Forked File key block
#print("--" + forkStart)
forkStorageType = g.image_data[forkStart+f]
forkKeyPointer = unpack_u16le(g.image_data, forkStart + f + 1)
forkFileLen = unpack_u24le(g.image_data, forkStart + f + 5)
g.activeFileSize = forkFileLen
if g.resourceFork > 0:
rsrcForkLen = unpack_u24le(g.image_data, forkStart + f + 5)
#print(">>>", rsrcForkLen)
if g.use_appledouble or g.use_extended:
print(" [resource fork]")
if g.use_appledouble:
# store the fork length as 3 big-endian bytes at offset 35 of g.ex_data
pack_u24be(g.ex_data, 35, rsrcForkLen)
print(" [data fork]")
if forkStorageType == 1: #seedling
copyBlock(forkKeyPointer, forkFileLen)
elif forkStorageType == 2: #sapling
elif forkStorageType == 3: #tree
g.resourceFork = 0
def processMasterIndexBlock(arg1):
    """Process block arg1 as a Master Index Block.

    Delegates to processIndexBlock with its master-index flag set.
    """
    processIndexBlock(arg1, arg2=True)
# Copy the data blocks listed in an index block (ProDOS) or in a
# track/sector list (DOS 3.3) until g.activeFileSize bytes have been
# copied.  Each step copies at most one full block (512 bytes for ProDOS,
# 256 for DOS 3.3) through copyBlock.
# NOTE(review): several lines are not visible in this view: the step to
# the next T/S list sector, the second operand of the targetBlock
# expression, and the body/else of the `if arg2:` master-index test.
def processIndexBlock(arg1, arg2=False):
#arg1: indexBlock, or [t,s] of track/sector list
#arg2: if True, it's a Master Index Block
# DOS 3.3 T/S pairs start at byte 12 of the list sector
pos = 12 if g.dos33 else 0
bytesRemaining = g.activeFileSize
while g.activeFileBytesCopied < g.activeFileSize:
if g.dos33:
targetTS = list(g.image_data[sli(ts(arg1)+pos,2)])
bytesRemaining = (g.activeFileSize - g.activeFileBytesCopied)
bs = (bytesRemaining if bytesRemaining < 256 else 256)
copyBlock(targetTS, bs)
pos += 2
if pos > 255:
# continue with next T/S list sector
else: # ProDOS
# Note these are not consecutive bytes
targetBlock = (g.image_data[arg1*512+pos] +
if arg2:
bytesRemaining = (g.activeFileSize - g.activeFileBytesCopied)
bs = (bytesRemaining if bytesRemaining < 512 else 512)
copyBlock(targetBlock, bs)
pos += 1
if pos > 255:
break # go to next entry in Master Index Block (tree)
# Create the AppleDouble header file for the current target and fill
# g.ex_data with a 741-byte AppleDouble v2 header skeleton.  Each 12-byte
# group written below is one entry descriptor; the inline comments name
# the entry it describes.
# NOTE(review): the statement under `if not g.use_appledouble:` is not
# visible in this view (presumably an early return) - confirm.
def makeADfile():
if not g.use_appledouble:
touch(g.appledouble_dir + "/" + g.target_name)
g.ex_data = bytearray(741)
# ADv2 header
g.ex_data[sli(0x00,8)] = a2b_hex("0005160700020000")
# number of entries
g.ex_data[sli(0x18,2)] = a2b_hex("000D")
# Resource Fork
g.ex_data[sli(0x1a,12)] = a2b_hex("00000002000002E500000000")
# Real Name
g.ex_data[sli(0x26,12)] = a2b_hex("00000003000000B600000000")
# Comment
g.ex_data[sli(0x32,12)] = a2b_hex("00000004000001B500000000")
# Dates Info
g.ex_data[sli(0x3e,12)] = a2b_hex("000000080000027D00000010")
# Finder Info
g.ex_data[sli(0x4a,12)] = a2b_hex("000000090000028D00000020")
# ProDOS file info
g.ex_data[sli(0x56,12)] = a2b_hex("0000000B000002C100000008")
# AFP short name
g.ex_data[sli(0x62,12)] = a2b_hex("0000000D000002B500000000")
# AFP File Info
g.ex_data[sli(0x6e,12)] = a2b_hex("0000000E000002B100000004")
# AFP Directory ID
g.ex_data[sli(0x7a,12)] = a2b_hex("0000000F000002AD00000004")
# dbd (second time) will create DEV, INO, SYN, SV~
# Final clean-up before the program ends with `exitcode`.
# On success with -ad (and unless -s was given) an afpsync reminder is
# built when /usr/local/etc/netatalk exists.  For ShrinkIt sources the
# temporary extraction directories under /tmp are removed.
# NOTE(review): the call that prints the reminder and the final exit call
# are not visible in this view.
# NOTE(review): the clean-up removes every /tmp entry whose name starts
# with "cppo-", not only the ones created by this run.
def quit_now(exitcode=0):
if (exitcode == 0 and g.afpsync_msg and
g.use_appledouble and os.path.isdir("/usr/local/etc/netatalk")):
"File(s) have been copied to the target directory. "
"If the directory\n"
"is shared by Netatalk, please type 'afpsync' now.")
if g.src_shk: # clean up
for file in os.listdir('/tmp'):
if file.startswith("cppo-"):
shutil.rmtree('/tmp' + "/" + file)
# NOTE(review): the body of usage() is not visible in this view; judging
# by its name and `exitcode` parameter it presumably prints the usage text
# and exits - confirm against the original file.
def usage(exitcode=1):
def to_sys_name(name):
    """Return name adjusted for the host file system.

    On Windows (os.name == 'nt') a '-' is appended to a name ending in a
    period, and every './' inside the path becomes '.-/'.  On other
    systems the name is returned unchanged.  An empty name is returned
    as-is (indexing name[-1] used to raise IndexError for it on Windows).
    """
    if os.name == 'nt':
        if name.endswith('.'):
            name += '-'
        name = name.replace('./', '.-/')
    return name
#---- IvanX general purpose functions ----#
def to_hex(val):
    """convert bytes, decimal number, or [bin-ustr] to two-digit hex values
    unlike hex(), accepts bytes; has no leading 0x or trailing L"""
    if isinstance(val, list): # [bin-ustr]
        val = int(val[0], 2)
    if isinstance(val, bytes): # bytes
        return b2a_hex(val).decode()
    if not isnumber(val):
        raise Exception("to_hex() requires bytes, int/long, or [bin-ustr]")
    if val < 0:
        print ("val: " + str(val))
    # a number is converted as a single byte
    single = bytes([val])
    return b2a_hex(single).decode()
def to_dec(val):
    """convert bytes, hex-ustr or [bin-ustr] to decimal int/long"""
    if isinstance(val, list): # [bin-ustr]
        binary_text = val[0]
        return int(binary_text, 2)
    if isinstance(val, bytes): # bytes
        return int(to_hex(val), 16)
    if isinstance(val, str): # hex-ustr
        return int(val, 16)
    if isnumber(val): # int/long
        return val
    raise Exception("to_dec() requires bytes, hex-ustr or [bin-ustr]")
# Convert val to a string of binary digits (no "0b" prefix); when `fill`
# is given and non-zero the result is left-padded with zeros to that width.
# NOTE(review): an `else:` appears to be missing in this view before the
# `raise`; as shown, the raise would run for every input.
def to_bin(val, fill = None):
"""convert bytes, hex-ustr, or int/long to bin-ustr"""
if isinstance(val, bytes): # bytes
b = bin(to_dec(to_hex(val)))[2:]
elif isinstance(val, str): # hex-ustr
b = bin(int(val, 16))[2:]
elif isnumber(val): # int/long
b = bin(val)[2:]
raise Exception("to_bin() requires bytes, hex-ustr, or int/long")
return b if not fill else b.zfill(fill)
def to_bytes(val):
    """converts hex-ustr, int/long, or [bin-ustr] to bytes

    - bytes are returned unchanged
    - a hex string is decoded with a2b_hex
    - a one-element list holding a binary-digit string is read as a number
    - a non-negative int becomes its big-endian byte string (at least one
      byte long)

    Raises ValueError for a negative number and TypeError for any other
    kind of input.
    """
    if isinstance(val, list): # [bin-ustr]
        # The old code passed the digit string to to_hex(), which does not
        # accept strings, so this branch always raised.
        val = int(val[0], 2)
    if isinstance(val, bytes):
        return val
    if isinstance(val, str): # hex-ustr
        return a2b_hex(val.encode())
    if isinstance(val, int): # int/long
        if val < 0:
            raise ValueError("to_bytes() cannot convert a negative number")
        # chr(val).encode() produced two UTF-8 bytes for 128..255, and
        # values of 256 and above could not be converted at all.
        size = max(1, (val.bit_length() + 7) // 8)
        return val.to_bytes(size, 'big')
    raise TypeError("to_bytes() requires hex-ustr, int/long, or [bin-ustr]")
def touch(file_path, modTime=None):
    """Create file_path if needed and set its access/modification time.

    modTime is a UNIX timestamp used for both times; None means "now".
    The same host-adjusted name (see to_sys_name) is used for creating
    the file and for setting its times.  Previously os.utime() received
    the unadjusted name, which is a different path whenever to_sys_name
    changes it.
    """
    # http://stackoverflow.com/questions/1158076/implement-touch-using-python
    sys_path = to_sys_name(file_path)
    with open(sys_path, "ab"):
        os.utime(sys_path, None if modTime is None else (modTime, modTime))
# Directory-creation helper that handles FileExistsError.
# NOTE(review): the `try:` body and the handler body are not visible in
# this view; presumably it creates dirPath and ignores an existing
# directory - confirm against the original file.
def mkdir(dirPath):
except FileExistsError:
# Directory-tree creation helper; the handler singles out OSErrors whose
# errno is not EEXIST.
# NOTE(review): the `try:` body and the statement under the errno test are
# not visible in this view; presumably it creates dirPath recursively and
# re-raises anything other than "already exists" - confirm.
def makedirs(dirPath):
except OSError as e:
if e.errno != errno.EEXIST:
def load_file(file_path):
    """Return the entire contents of file_path as bytes.

    The path is passed through to_sys_name before it is opened.
    """
    with open(to_sys_name(file_path), mode="rb") as source:
        contents = source.read()
    return contents
# Open file_path (adjusted by to_sys_name) for binary writing.
# NOTE(review): the statement inside the `with` block is not visible in
# this view; presumably it writes fileData to the file - confirm.
def save_file(file_path, fileData):
with open(to_sys_name(file_path), "wb") as image_handle:
def dopo_swap(image_data):
    """Return image_data with the sectors of every track reordered.

    Within each of the 35 tracks, sectors 0 and 15 stay in place and
    every other sector s moves to position 15 - s, so that sector pairs
    line up as valid ProDOS blocks.  The result is built in a 143360-byte
    buffer.
    """
    swapped = bytearray(143360)
    for track in range(35):
        for sector in range(16):
            target = sector if sector in (0, 15) else 15 - sector
            read_at = ts(track, sector)
            write_at = ts(track, target)
            swapped[write_at:write_at+256] = image_data[read_at:read_at+256]
    return bytes(swapped)
# Report whether `number` is a numeric value rather than a string or
# other sequence; returns False on some paths and True at the end.
# NOTE(review): the tested expressions, a second `try:` and the TypeError
# handler body are not visible in this view, so the exact checks cannot be
# confirmed from here.
def isnumber(number):
try: # make sure it's not a string
return False
except TypeError:
except ValueError:
return False
return True
#---- end IvanX general purpose functions ----#
'<' # use little-endian numbers
'4s' # magic string '2IMG'
'4s' # creator string
'H' # header length
'H' # 2mg version
'L' # image format
'L' # flags (we unpack it into "vol")
'L' # number of 512 blocks
'L' # image data offset
'L' # image data length
'L' # comment offset
'L' # comment length
'L' # creator private use offset
'L' # creator private use length
'16x' # reserved for future use
'magic', 'creator', 'hdr_len', 'version',
'img_fmt', 'vol', 'num_blocks',
'data_offset', 'data_len',
'cmnt_offset', 'cmnt_len',
'creator_offset', 'creator_len'
# Field names of the decoded 2mg flags word (see unpack_2mg): `locked` and
# `dosvol`.
A2MG1_VOL_ATTRS = ('locked', 'dosvol')
# We assume 2mg files with unknown version have these fields
A2MG_UNK_ATTRS = ('magic', 'creator', 'hdr_len', 'version')
# Record types built from the attribute tuples.
# NOTE(review): the assignment that defines A2MG1_ATTRS is not visible in
# this view of the file.
A2mg1 = namedtuple('A2mg1', A2MG1_ATTRS)
A2mg1Vol = namedtuple('A2mg1Vol', A2MG1_VOL_ATTRS)
A2mgUnk = namedtuple('A2mgUnk', A2MG_UNK_ATTRS)
class Disk:
    """A disk image file loaded into memory.

    When constructed with a path, the instance records:
      pathname  the path as given
      path      its directory part
      filename  its file name part
      diskname  the file name without extension
      ext       the lowercased extension of the path
      image     the raw bytes of the file
      a2mg      the result of unpack_2mg() on those bytes
    Without a path, no attributes are set.
    """

    def __init__(self, name=None):
        if name is None:
            return
        self.pathname = name
        self.path, self.filename = os.path.split(name)
        self.diskname = os.path.splitext(self.filename)[0]
        self.ext = os.path.splitext(name)[1].lower()
        # FIXME: Handle compressed images?
        with open(to_sys_name(name), "rb") as image_file:
            self.image = image_file.read()
        self.a2mg = unpack_2mg(self.image)
### UTIL
def seqsplit(seq: Sequence, num: int) -> Sequence:
    """Yield consecutive slices of seq, each at most num items long."""
    starts = range(0, len(seq), num)
    yield from (seq[begin:begin + num] for begin in starts)
# Build a multi-line hex dump of `buf`, 16 bytes per output line: an
# offset column, the bytes as hex grouped into words of `wordsize` bytes
# (words joined by `sep`, the two 8-byte halves joined by `sep2`), and a
# character column in which bytes outside 0x20..0x7e are shown as '.'.
# With striphigh the high bit is cleared before the character column is
# built.  hlen is the width the hex column is padded to.
# NOTE(review): the closing quotes of the second string and the closing
# `])` of both list expressions are not visible in this view.
def hexdump(
buf: bytes,
striphigh: bool = False,
wordsize: int = 2,
sep: str = ' ',
sep2: str = ' '
) -> str:
"""return a multi-line debugging hexdump of a bytes object"""
'''Format is configurable but defaults to that of xxd:
########: #### #### #### #### #### #### #### #### |................|
wordsize is the number of bytes between separators
sep is the separator between words
sep2 is the midline separator
striphigh considers 0xa0-0xfe to be printable ASCII (as on Apple II)
out = []
hlen = 32 + len(sep2) + (16//wordsize-2) * len(sep)
wordlen = wordsize * 2
for i, vals in enumerate(seqsplit(buf, 16)):
hexs = sep2.join([
sep.join(seqsplit(b2a_hex(x).decode(), wordlen))
for x in seqsplit(vals,8)
if striphigh:
vals = [x & 0x7f for x in vals]
chars = ''.join([
chr(x) if x >= 0x20 and x < 0x7f else '.'
for x in vals
out.append('{i:07x}0: {hexs:{hlen}} |{chars}|'.format(**locals()))
return '\n'.join(out)
# *sigh* No clean/simple way to use str.format() type log strings without
# jumping through a few hoops
class Message(object):
    """Defers str.format() of a log message until it is rendered.

    fmt is a str.format()-style template and args the positional values
    for it; formatting only happens when the object is converted to str.
    """

    def __init__(self, fmt, args):
        self.fmt, self.args = fmt, args

    def __str__(self):
        template = self.fmt
        return template.format(*self.args)
class StyleAdapter(logging.LoggerAdapter):
    """LoggerAdapter whose messages use str.format()-style placeholders.

    The positional logging arguments are bundled with the template into a
    Message object, so the template is only formatted when the record is
    actually rendered.
    """

    def __init__(self, logger, extra=None):
        super().__init__(logger, extra if extra else {})

    def log(self, level, msg, *args, **kwargs):
        if not self.isEnabledFor(level):
            return
        msg, kwargs = self.process(msg, kwargs)
        # args travel inside the Message, so the logger itself gets none
        self.logger._log(level, Message(str(msg), args), (), **kwargs)
log = StyleAdapter(logging.getLogger(__name__))
# Program entry point: parse the command line in `args` (args[0] is the
# program name), open the source image/archive, and catalog or extract it.
# ShrinkIt archives are expanded with nulib2 into a temporary directory
# under /tmp and then walked; disk images are loaded into g.image_data,
# checked for ProDOS / DOS 3.3 format and sector order, and processed
# through process_dir.
# NOTE(review): a large number of lines of this function are not visible
# in this view (calls such as usage()/quit_now(), `try:`/`else:`/`break`
# lines, directory creation calls and parts of some expressions), so the
# comments below describe only what the visible lines show.
def main(args: list):
# Set up our logging facility
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(levelname)s: %(message)s')
# --- option parsing: each recognised option is consumed by dropping
# the first element of args
while True: # breaks when there are no more arguments starting with dash
if len(args) == 1:
elif args[1][0] != '-':
elif args[1] == '-s':
g.afpsync_msg = False
args = args[1:]
elif args[1] == '-n':
g.extract_in_place = True
args = args[1:]
elif args[1] == '-uc':
g.casefold_upper = True
args = args[1:]
elif args[1] == '-ad':
g.use_appledouble = True
g.prodos_names = True
args = args[1:]
elif args[1] == '-shk':
g.src_shk = True
args = args[1:]
elif args[1] == '-pro':
g.prodos_names = True
args = args[1:]
elif args[1] == '-e':
g.use_extended = True
g.prodos_names = True
args = args[1:]
elif args[1] == '-cat':
g.catalog_only = True
args = args[1:]
# --- argument validation
if g.use_appledouble and g.use_extended:
if g.catalog_only:
if len(args) != 2:
if len(args) not in (3, 4):
disk = Disk(args[1])
except IOError as e:
# automatically set ShrinkIt mode if extension suggests it
if g.src_shk or disk.ext in ('.shk', '.sdk', '.bxy'):
if os.name == "nt":
print("ShrinkIt archives cannot be extracted on Windows.")
# probe for nulib2 by running it with its output discarded
with open(os.devnull, "w") as fnull:
subprocess.call("nulib2", stdout = fnull, stderr = fnull)
g.src_shk = True
except Exception:
"Nulib2 is not available; not expanding "
"ShrinkIt archive.")
# four arguments means "extract one file": args[2] is the path inside
# the image, args[3] the target path
if len(args) == 4:
g.extract_file = args[2]
if g.extract_file:
targetPath = args[3]
if os.path.isdir(targetPath):
g.target_dir = targetPath
# NOTE(review): this compares a list with an int, which raises TypeError
# on Python 3; presumably len(targetPath.rsplit("/", 1)) > 1 was meant
elif targetPath.rsplit("/", 1) > 1:
g.target_dir, g.target_name = targetPath.rsplit("/", 1)
if not os.path.isdir(g.target_dir):
print("Target directory not found.")
if not g.catalog_only:
if not os.path.isdir(args[2]):
print("Target directory not found.")
# --- ShrinkIt archive source
if g.src_shk:
g.prodos_names = False
if not g.catalog_only:
targetDir = (args[3] if g.extract_file else args[2])
unshkdir = ('/tmp' + "/cppo-" + str(uuid.uuid4()))
# NOTE(review): the command line is assembled by string concatenation
# from the archive path and args[2] and handed to a shell; paths
# containing quotes, spaces or shell metacharacters will break the
# command or be interpreted by the shell
result = os.system(
"/bin/bash -c 'cd " + unshkdir + "; "
+ "result=$(nulib2 -xse " + os.path.abspath(disk.pathname)
+ ((" " + args[2].replace('/', ':'))
if g.extract_file else "") + " 2> /dev/null); "
+ "if [[ $result == \"Failed.\" ]]; then exit 3; "
+ "else if grep -q \"no records match\" <<< \"$result\""
+ " > /dev/null; then exit 2; else exit 0; fi; fi'")
# 512 is how os.system reports the script's "exit 2" (no records match)
if result == 512:
"File not found in ShrinkIt archive. "
"Try cppo -cat to get the path,\n"
" and omit any leading slash or colon.")
elif result != 0:
"ShrinkIt archive is invalid, "
"or some other problem happened.")
if g.extract_file:
g.extract_file = g.extract_file.replace(':', '/')
extractPath = (unshkdir + "/" + g.extract_file)
extractPathDir = os.path.dirname(extractPath)
# move the extracted file to the root
newunshkdir = ('/tmp' + "/cppo-" + str(uuid.uuid4()))
for filename in os.listdir(extractPathDir):
shutil.move(extractPathDir + "/" + filename, newunshkdir)
unshkdir = newunshkdir
fileNames = [name for name in sorted(os.listdir(unshkdir))
if not name.startswith(".")]
# decide whether to extract in place (curDir) or into a folder named
# after the volume/archive (volumeName)
if g.extract_in_place: # extract in place from "-n"
curDir = True
elif (len(fileNames) == 1 and
os.path.isdir(unshkdir + "/" + fileNames[0])):
curDir = True # only one folder at top level, so extract in place
volumeName = toProdosName(fileNames[0])
elif (len(fileNames) == 1 and # disk image, so extract in place
fileNames[0][-1:] == "i"):
curDir = True
volumeName = toProdosName(fileNames[0].split("#")[0])
else: # extract in folder based on disk image name
curDir = False
volumeName = toProdosName(os.path.basename(disk.pathname))
if volumeName[-4:].lower() in ('.shk', '.sdk', '.bxy'):
volumeName = volumeName[:-4]
if not g.catalog_only and not curDir and not g.extract_file:
print("Extracting into " + volumeName)
# recursively process unshrunk archive hierarchy
for dirName, subdirList, fileList in os.walk(unshkdir):
if not g.catalog_only:
g.target_dir = (
+ ("" if curDir else ("/" + volumeName))
+ ("/" if dirName.count('/') > 2 else "")
+ ("/".join(dirName.split('/')[3:]))) # chop tempdir
if g.extract_file: # solo item, so don't put it in the tree
g.target_dir = targetDir
if g.casefold_upper:
g.target_dir = g.target_dir.upper()
g.appledouble_dir = (g.target_dir + "/.AppleDouble")
if g.use_appledouble:
for fname in sorted(fileList):
if fname[-1:] == "i":
# disk image; rename to include suffix and correct
# type/auxtype
imagePath = os.path.join(dirName, fname).split("#")[0]
new_name = (
+ ("" if os.path.splitext(imagePath.lower())[1]
in ('.po', '.hdv') else ".PO") + "#e00005")
os.rename(os.path.join(dirName, fname), new_name)
fname = os.path.basename(new_name)
# a file named X + "r" next to X is X's resource fork: it is not
# processed on its own, and X is flagged as having one
g.shk_hasrf = False
rfork = False
if (fname[-1:] == "r"
and os.path.isfile(os.path.join(dirName, fname[:-1]))):
rfork = True
elif (os.path.isfile(os.path.join(dirName, (fname + "r")))):
g.shk_hasrf = True
if not rfork:
processEntry(dirName, fname)
shutil.rmtree(unshkdir, True)
# end script if SHK
# --- disk image source
g.image_data = load_file(disk.pathname)
# detect if image is 2mg and remove 64-byte header if so
if disk.ext in ('.2mg', '.2img'):
g.image_data = g.image_data[64:]
# handle 140k disk image
if len(g.image_data) == 143360:
log.debug("140k disk")
prodos_disk = False
fix_order = False
# is it ProDOS?
if g.image_data[sli(ts(0,0), 4)] == b'\x01\x38\xb0\x03':
log.debug("detected ProDOS by boot block")
# the position of the b'PRODOS' string tells the sector order apart
if g.image_data[sli(ts(0,1)+3, 6)] == b'PRODOS':
log.debug("order OK (PO)")
prodos_disk = True
elif g.image_data[sli(ts(0,14)+3, 6)] == b'PRODOS':
log.debug("order needs fixing (DO)")
prodos_disk = True
fix_order = True
# is it DOS 3.3?
log.debug("it's not ProDOS")
if g.image_data[ts(17,0)+3] == 3:
vtocT, vtocS = g.image_data[sli(ts(17,0) + 1,2)]
if vtocT < 35 and vtocS < 16:
log.debug("it's DOS 3.3")
g.dos33 = True
# it's DOS 3.3; check sector order next
if g.image_data[ts(17,14)+2] != 13:
log.debug("order needs fixing (PO)")
fix_order = True
log.debug("order OK (DO)")
# fall back on disk extension if weird boot block (e.g. AppleCommander)
if not prodos_disk and not g.dos33:
log.debug("format and ordering unknown, checking extension")
if disk.ext in ('.dsk', '.do'):
log.debug("extension indicates DO, changing to PO")
fix_order = True
if fix_order:
log.debug("fixing order")
g.image_data = dopo_swap(g.image_data)
#print("saving fixed order file as outfile.dsk")
#save_file("outfile.dsk", g.image_data)
if not prodos_disk and not g.dos33:
print("Warning: Unable to determine disk format, assuming ProDOS.")
# enforce leading slash if ProDOS
if (not g.src_shk and not g.dos33 and g.extract_file
and (args[2][0] not in ('/', ':'))):
# --- DOS 3.3 image
if g.dos33:
disk_name = (disk.diskname
if disk.ext in ('.dsk', '.do', '.po')
else disk.filename)
if g.prodos_names:
disk_name = toProdosName(disk_name)
if not g.catalog_only:
g.target_dir = (args[3]
if g.extract_file
else (args[2] + "/" + disk_name))
g.appledouble_dir = (g.target_dir + "/.AppleDouble")
if g.use_appledouble:
if not g.extract_file:
print("Extracting into " + disk_name)
if g.extract_file:
print("ProDOS file not found within image file.")
# below: ProDOS
g.activeDirBlock = 0
g.activeFileName = ""
g.activeFileSize = 0
g.activeFileBytesCopied = 0
g.resourceFork = 0
g.prodos_names = False
if g.extract_file:
# split the requested path into its components (':' is accepted as a
# separator too)
g.PDOSPATH = g.extract_file.replace(':', '/').split('/')
g.extract_file = None
if not g.PDOSPATH[0]:
g.appledouble_dir = (g.target_dir + "/.AppleDouble")
if g.use_appledouble and not os.path.isdir(g.appledouble_dir):
print("ProDOS file not found within image file.")
if not g.catalog_only:
g.target_dir = (args[2] + "/" + getVolumeName().decode())
g.appledouble_dir = (g.target_dir + "/.AppleDouble")
if not os.path.isdir(g.target_dir):
if g.use_appledouble and not os.path.isdir(g.appledouble_dir):
if __name__ == '__main__':