Initial Commit

This commit is contained in:
Sam Fuller 2021-04-11 18:50:58 -07:00
commit de17ef8df8
11 changed files with 910 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
*.scsi
.idea
__pycache__

4
README.md Normal file
View File

@ -0,0 +1,4 @@
# Pimp My Plus
This repo contains a bunch of scripts for downloading all of the classic mac software you could ever want and creating HD images with that software for use with SCSI2SD. No floppy disks required!
*NOTE:* The `driver.bin` file contains the SCSI driver that is installed to the disk by Apple HD SC Setup 7.3.5. The licensing of this driver is not clear. Please consider this before using this repo for commercial purposes.

124
appledouble.py Normal file
View File

@ -0,0 +1,124 @@
"""
Reference:
http://formats.kaitai.io/apple_single_double/index.html
http://kaiser-edv.de/documents/AppleSingle_AppleDouble.pdf
above linke is dead, archive:
https://web.archive.org/web/20180311140826/http://kaiser-edv.de/documents/AppleSingle_AppleDouble.pdf
"""
import enum
from ctypes import BigEndianStructure, c_uint32, c_uint16, c_char
from io import RawIOBase
from typing import List, Optional
class AppleDoubleHeader(BigEndianStructure):
_pack_ = 1
_fields_ = [
('signature', c_uint32),
('version', c_uint32),
('reserved', c_uint32 * 4), # Must all be zero
('num_entries', c_uint16)
]
class AppleDoubleEntry(BigEndianStructure):
_pack_ = 1
_fields_ = [
('type', c_uint32),
('body_offset', c_uint32),
('body_length', c_uint32)
]
class EntryType(enum.Enum):
data_fork = 1
resource_fork = 2
real_name = 3 # File name on a file system that supports all the attributes.
comment = 4
icon_bw = 5
icon_color = 6
file_dates_info = 8 # File creation, modification, access date/timestamps.
finder_info = 9
macintosh_file_info = 10
prodos_file_info = 11
msdos_file_info = 12
afp_short_name = 13
afp_file_info = 14
afp_directory_id = 15
class Point(BigEndianStructure):
"""Specifies 2D coordinate in QuickDraw grid."""
_pack_ = 1
_fields_ = [
('x', c_uint16),
('y', c_uint16),
]
class FinderInfo(BigEndianStructure):
"""From the older Inside Macintosh publication, Volume II page 84 or Volume IV page 104."""
_pack_ = 1
_fields_ = [
('fdType', c_char * 4),
('fdCreator', c_char * 4),
('fdFlags', c_uint16),
('fdLocation', Point), # File icon's coordinates when displaying this folder.
('fdFolder', c_uint16) # File folder ID (=window).
]
class Entry(object):
def __init__(self):
self.info = None
self.data = None
class AppleDouble(object):
def __init__(self):
self.header = AppleDoubleHeader()
self.entries: List[Entry] = []
def get_entry(self, type: EntryType) -> Optional[Entry]:
for entry in self.entries:
if entry.info.type == type.value:
return entry
return None
SIGNATURE_APPLE_SINGLE = 0x00051600
SIGNATURE_APPLE_DOUBLE = 0x00051607
def parse(f: RawIOBase) -> AppleDouble:
header = AppleDoubleHeader()
f.readinto(header)
# Validate signature
if header.signature not in (SIGNATURE_APPLE_SINGLE, SIGNATURE_APPLE_DOUBLE):
raise ValueError('Invalid signature')
entries = []
for i in range(header.num_entries):
entry = Entry()
info = AppleDoubleEntry()
f.readinto(info)
entry.info = info
entries.append(entry)
for entry in entries:
info = entry.info
f.seek(info.body_offset)
data = f.read(info.body_length)
if info.type == EntryType.finder_info.value:
entry.data = FinderInfo.from_buffer_copy(data)
else:
entry.data = data
result = AppleDouble()
result.header = header
result.entries = entries
return result

30
applicationutil.py Normal file
View File

@ -0,0 +1,30 @@
# https://vintageapple.org/inside_r/pdf/PPC_System_Software_1994.pdf
from io import BytesIO
from typing import List
import rsrcfork
ARCH_68K = '68k'
ARCH_PPC = 'PPC'
def get_supported_archs(rsrc: bytes) -> List[str]:
with BytesIO(rsrc) as f:
resource_file = rsrcfork.ResourceFile(f)
archs = []
cfrg = resource_file.get(b'cfrg')
if cfrg:
# Assume it supports PPC if there's a cfrg lump.
# TODO: Check the actuall processor field in this cfrg lump?
archs.append(ARCH_PPC)
code = resource_file.get(b'code')
if code:
archs.append(ARCH_68K)
return archs

242
disk.py Normal file
View File

@ -0,0 +1,242 @@
# https://developer.apple.com/library/archive/documentation/mac/pdf/Devices/SCSI_Manager.pdf
import configparser
from ctypes import BigEndianStructure, c_uint16, c_uint32, c_char
from typing import BinaryIO, List
import machfs
class DriverIni(object):
def __init__(self):
self.partition_type = b'Apple_Driver43'
self.partition_flags = 0
self.booter = 0
self.bytes = 0
self.load_address_0 = 0
self.load_address_1 = 0
self.goto_address_0 = 0
self.goto_address_1 = 0
self.checksum = 0
self.processor = b'68000'
self.boot_args: List[int] = []
def driver_from_ini(section) -> DriverIni:
ini = DriverIni()
ini.partition_type = bytes(section['partition_type'], encoding='ascii')
ini.partition_flags = int(section['partition_flags'])
ini.booter = int(section['booter'])
ini.bytes = int(section['bytes'])
ini.load_address_0 = int(section['load_address_0'], 16)
ini.load_address_1 = int(section['load_address_1'], 16)
ini.goto_address_0 = int(section['goto_address_0'], 16)
ini.goto_address_1 = int(section['goto_address_1'], 16)
ini.checksum = int(section['checksum'], 16)
ini.processor = bytes(section['processor'], encoding='ascii')
ini.boot_args = [int(x, 0) for x in section['boot_args'].split(',')]
return ini
class DriverDescriptor(BigEndianStructure):
_pack_ = 1
_fields_ = [
('ddBlock', c_uint32),
('ddSize', c_uint16),
('ddType', c_uint16),
]
class Block0(BigEndianStructure):
_pack_ = 1
_fields_ = [
('sbSig', c_uint16),
('sbBlkSize', c_uint16),
('sbBlkCount', c_uint32),
# Dev Type and Dev Id both have no information from apple.
# Apple's hfdisk utility assigns zero for both, but System 6's disk utility sets these both to 1.
# I have no idea if these fields are used at all.
('sbDevType', c_uint16),
('sbDevId', c_uint16),
# Reserved. Seems to be unused by anything.
('sbData', c_uint32),
('sbDrvrCount', c_uint16),
('ddDrivers', DriverDescriptor * 61),
('_pad1', c_uint32),
('_pad2', c_uint16)
]
def __init__(self):
super().__init__()
self.sbSig = 0x4552 # sbSIGWord magic number.
class PartitionMapBlock(BigEndianStructure):
_pack_ = 1
_fields_ = [
('dpme_signature', c_uint16),
('dpme_sigPad', c_uint16),
('dpme_map_entries', c_uint32),
('dpme_pblock_start', c_uint32),
('dpme_pblocks', c_uint32),
('dpme_name', c_char * 32),
('dpme_type', c_char * 32),
('dpme_lblock_start', c_uint32),
('dpme_lblocks', c_uint32),
# Apple Docs say this is only used by A/UX. That is not 100% true.
('dpme_flags', c_uint32),
# Note: Below data appears to only be used for SCSI Driver partitions
('dpme_boot_block', c_uint32),
('dpme_boot_bytes', c_uint32),
('dpme_load_addr', c_uint32),
('dpme_load_addr_2', c_uint32),
('dpme_goto_addr', c_uint32),
('dpme_goto_addr_2', c_uint32),
('dpme_checksum', c_uint32),
('dpme_process_id', c_char * 16),
('dpme_boot_args', c_uint32 * 32),
('dpme_reserved_3', c_uint32 * 62)
]
def __init__(self):
super().__init__()
self.dpme_signature = 0x504d # "PM"
def create_basic_partition(name, type, start_block, block_count, flags) -> PartitionMapBlock:
block = PartitionMapBlock()
block.dpme_pblock_start = start_block
block.dpme_pblocks = block_count
block.dpme_name = name
block.dpme_type = type
block.dpme_lblocks = block_count
block.dpme_flags = flags
return block
def create_partition_map_partition() -> PartitionMapBlock:
block = PartitionMapBlock()
block.dpme_pblock_start = 1
block.dpme_pblocks = 63
block.dpme_name = b'Apple'
block.dpme_type = b'Apple_partition_map'
block.dpme_lblocks = 63
return block
def create_driver_partition_block(info: DriverIni, start_block: int, block_count: int) -> PartitionMapBlock:
block = PartitionMapBlock()
block.dpme_pblock_start = start_block
block.dpme_pblocks = block_count
block.dpme_name = b'Macintosh'
block.dpme_type = info.partition_type
block.dpme_lblocks = block_count
block.dpme_flags = info.partition_flags
block.dpme_boot_block = info.booter
block.dpme_boot_bytes = info.bytes
block.dpme_load_addr = info.load_address_0
block.dpme_load_addr_2 = info.load_address_1
block.dpme_goto_addr = info.goto_address_0
block.dpme_goto_addr_2 = info.goto_address_1
block.dpme_checksum = info.checksum
block.dpme_process_id = info.processor
for i, value in enumerate(info.boot_args):
block.dpme_boot_args[i] = value
return block
def create_bootable_disk(of: BinaryIO, volume: machfs.Volume, block_count: int):
"""
:param of:
:param volume:
:param block_count: Total blocks in the disk, including blocks used by block0, partition map, and all partitions.
:return:
"""
driver_ini = configparser.ConfigParser()
driver_ini.read('driver.ini')
driver_info = driver_from_ini(driver_ini['Driver'])
block0 = Block0()
block0.sbBlkSize = 512
block0.sbBlkCount = block_count
block0.sbDrvrCount = 1
descriptor = DriverDescriptor()
descriptor.ddBlock = 64
descriptor.ddSize = int(driver_info.bytes / int(512) + (1 if driver_info.bytes % int(512) != 0 else 0))
descriptor.ddType = 1 # Always 1
block0.ddDrivers[0] = descriptor
# Set these both to 1, just in case. See comment in Block0 class.
# TODO: Once we get a booting disk on a real MacPlus, try removing and see if it still works.
block0.sbDevType = 1
block0.sbDevId = 1
block0_bytes = bytes(block0)
if len(block0_bytes) != 512:
raise ValueError('ASSERTION FAILED! sizeof(Block0) != 512')
of.write(block0_bytes)
def write_partition_map_block(block: PartitionMapBlock):
block_bytes = bytes(block)
if len(block_bytes) != 512:
raise ValueError('ASSERTION FAILED! sizeof(PartitionMapBlock) != 512')
of.write(block_bytes)
volume_offset = 64 + 32 # Block0 + Partition Map + Driver
volume_block_count = block_count - volume_offset
partition_map_0 = create_basic_partition(
name=b'MacOS',
type=b'Apple_HFS',
start_block=volume_offset,
block_count=volume_block_count,
flags=0)
partition_map_0.dpme_map_entries = 3
write_partition_map_block(partition_map_0)
partition_map_1 = create_partition_map_partition()
partition_map_1.dpme_map_entries = 3
write_partition_map_block(partition_map_1)
partition_map_2 = create_driver_partition_block(driver_info, 64, 32)
partition_map_2.dpme_map_entries = 3
write_partition_map_block(partition_map_2)
# Write empty partition map entries
empty_block = b'\0' * 512
for i in range(1 + 3, 64): # 3 is partition map block count TODO: Kill all magic numbers
of.write(empty_block)
# Write Driver
with open('driver.bin', 'rb') as f:
for i in range(32):
of.write(f.read(512))
# Write HFS Volume
volume_data = volume.write(
size=volume_block_count * 512,
# desktopdb=False,
bootable=False
)
if len(volume_data) != volume_block_count * 512:
raise ValueError('ASSERTION FAILED! len(volume_data) != volume_block_count * 512')
of.write(volume_data)
if of.tell() != block_count * 512:
raise ValueError('Error! Output file is not expected size!')
def mb_block_count(mb: int) -> int:
kb = mb * 1024
return kb * 2 # 2 512-byte blocks = 1 kb.

37
diskcopyimage.py Normal file
View File

@ -0,0 +1,37 @@
# https://www.discferret.com/wiki/Apple_DiskCopy_4.2
from ctypes import BigEndianStructure, c_byte, c_char, c_uint32, c_uint16
from typing import BinaryIO
NAME_SIZE = 63
class DiskCopyImageHeader(BigEndianStructure):
_pack_ = 1
_fields_ = [
('name_length', c_byte),
('name', c_char * NAME_SIZE),
('data_size', c_uint32),
('tag_size', c_uint32),
('data_checksum', c_uint32),
('tag_checksum', c_uint32),
('disk_type', c_byte),
('format', c_byte),
('magic_number', c_uint16),
]
def read_data(self, f: BinaryIO) -> bytes:
if self.magic_number != 0x0100:
raise ValueError('Invalid Magic Number')
data = f.read(self.data_size)
if len(data) != self.data_size:
raise ValueError('Unexpected EOF')
# TODO: Checksum verification?
return data
@property
def image_name(self) -> bytes:
return self.name[:min(self.name_length, NAME_SIZE)]

BIN
driver.bin Normal file

Binary file not shown.

13
driver.ini Normal file
View File

@ -0,0 +1,13 @@
[Driver]
partition_type=Apple_Driver43
partition_flags=0
booter=0
bytes=9392
load_address_0=0
load_address_1=0
goto_address_0=0
goto_address_1=0
checksum=f624
processor=68000
boot_args=0x00010600,0,1,0x00070000

106
macftp.py Normal file
View File

@ -0,0 +1,106 @@
import os
import ftplib
import json
import argparse
# https://macintoshgarden.org/forum/public-access-file-repository
FTP_URL = 'repo1.macintoshgarden.org'
FTP_USER = 'macgarden'
FTP_PASS = 'publicdl'
DIR = 'Garden/apps'
OUTPUT_DIR = os.path.join('./macdl', DIR)
ALLOWED_EXTENSIONS = set(('.sit', '.dsk'))
MAX_SIZE = 1024 * 1024 * 10 # 10 MB
JSON_LIST_FILENAME = 'items.json'
argparser = argparse.ArgumentParser()
argparser.add_argument('--cached-list', help='Use cached listing of files', action='store_true')
def parse_item(line):
parts = [x for x in line.split(' ') if len(x) > 0]
size = parts[4]
name = parts[8]
return int(size), name
def should_include(size, name):
_, ext = os.path.splitext(name)
return size <= MAX_SIZE and ext in ALLOWED_EXTENSIONS
def exists(local_path, size, name):
local_path = os.path.join(OUTPUT_DIR, name)
if not os.path.isfile(local_path):
return False
return os.path.getsize(local_path) == size
def download(ftp, size, name):
local_path = os.path.join(OUTPUT_DIR, name)
if exists(local_path, size, name):
print(f'File already downloaded: {name} ({size})')
return
print(f'Downloading {name} ({size})')
with open(local_path, 'wb') as f:
result = ftp.retrbinary('RETR ' + name, f.write)
print(f' result: {result}')
args = argparser.parse_args()
try:
print('Connecting to FTP...')
ftp = ftplib.FTP(FTP_URL, user=FTP_USER, passwd=FTP_PASS)
print(f'Setting FTP directory to {DIR}')
ftp.cwd(DIR)
items = []
all_items = []
def add_item(item):
size, name = item
all_items.append(item)
will_include = should_include(size, name)
print(f'{"+" if will_include else "-"} {name} ({size})')
if will_include:
items.append(item)
def item_callback(line):
item = parse_item(line)
add_item(item)
if not args.cached_list:
ftp.retrlines('LIST', callback=item_callback)
print('Saving all_items as JSON...')
with open(JSON_LIST_FILENAME, 'w') as f:
json.dump(all_items, f)
else:
print('Loading items from JSON...')
with open(JSON_LIST_FILENAME) as f:
cached_items = json.load(f)
for item in cached_items:
add_item(item)
print('Downloading items')
os.makedirs(OUTPUT_DIR, exist_ok=True)
for size, name in items:
download(ftp, size, name)
print('done!')
finally:
print('Calling ftp.quit()')
ftp.quit()

348
preparevolume.py Normal file
View File

@ -0,0 +1,348 @@
# How to get HFS file type and creator data from the rsrc:
# http://bitsavers.org/pdf/apple/mac/Inside_Macintosh_Promotional_Edition_1985.pdf
# http://mirror.informatimago.com/next/developer.apple.com/documentation/mac/MoreToolbox/MoreToolbox-9.html
# https://developer.apple.com/library/archive/documentation/mac/pdf/MacintoshToolboxEssentials.pdf
# ^ See FInfo and FXInfo
import argparse
import os
import subprocess
import traceback
from typing import Dict, Tuple, Union, Optional
import machfs
import rsrcfork
from machfs.directory import AbstractFolder
from progress.bar import Bar
import appledouble
import applicationutil
import diskcopyimage
import disk
DEFAULT_BLOCK_TARGET = int((1024 * 1024 * 1024 * 1) / 512)
argparser = argparse.ArgumentParser()
argparser.add_argument('dl_folder')
argparser.add_argument('sit_dir')
argparser.add_argument('--target-blocks', type=int, default=DEFAULT_BLOCK_TARGET, help=f'Target size in 512 byte blocks. Default is {DEFAULT_BLOCK_TARGET} (1GiB)')
argparser.add_argument('--volume-start-index', type=int, default=0)
argparser.add_argument('--hfs-internals-ratio', type=float, default=0.85)
argparser.add_argument('--verbose', '-v', action='store_true')
args = argparser.parse_args()
class FilterException(Exception):
pass
class PreparationIssue(Exception):
pass
def sanitize_hfs_name(name: bytes, is_folder: bool) -> bytes:
if len(name) < 1:
raise ValueError('Invalid empty hfs name')
# name = b' '
val = name.replace(b':', b'?')
if is_folder:
return val[:17]
else:
return val[:31]
def sanitize_hfs_name_str(name: str, is_folder: bool) -> bytes:
return sanitize_hfs_name(name.encode('mac_roman', errors='replace'), is_folder=is_folder)
def get_hfs_file_size(file: machfs.File) -> int:
def block_align(size: int) -> int:
return (int(size / 512) + (1 if size % 512 != 0 else 0)) * 512
# Guessing 1K of Filesystem Junk for each file
return block_align(len(file.data)) + block_align(len(file.rsrc)) # + 1024
def add_disk_data(containing_folder: AbstractFolder, path: str, data: bytes) -> int:
dsk_volume = machfs.Volume()
try:
dsk_volume.read(data)
except Exception:
traceback.print_exc()
print(f'Issue reading file system from disk image at path: {path}. Skipping.')
return 0
for name, child in dsk_volume.items():
containing_folder[name] = child
total_bytes = 0
for path_tuple, dirnames, filenames in containing_folder.walk():
current_folder = containing_folder[path_tuple] if len(path_tuple) > 0 else containing_folder
for file in filenames:
current_file = current_folder[file]
total_bytes += get_hfs_file_size(current_file)
return total_bytes
def add_dsk(path: str) -> Tuple[machfs.Folder, bytes, int]:
if args.verbose:
print(f'* Adding DSK image at {path}')
base_path, dsk_filname = os.path.split(path)
folder_name, _ = os.path.splitext(dsk_filname)
dsk_folder = machfs.Folder()
sanitized_folder_name = sanitize_hfs_name_str(folder_name, is_folder=True)
with open(path, 'rb') as f:
flat = f.read()
return dsk_folder, sanitized_folder_name, add_disk_data(dsk_folder, path, flat)
def add_img(path: str) -> Tuple[machfs.Folder, bytes, int]:
if args.verbose:
print(f'* Adding DiskCopy image at {path}')
header = diskcopyimage.DiskCopyImageHeader()
with open(path, 'rb') as f:
f.readinto(header)
try:
data = header.read_data(f)
except ValueError as e:
raise PreparationIssue(f'Error reading DiskCopy file at {path}: {e}')
disk_folder = machfs.Folder()
folder_name = header.image_name
if len(folder_name) < 1:
_, filename = os.path.split(path)
folder_name = sanitize_hfs_name_str(os.path.splitext(filename)[0], is_folder=True)
else:
folder_name = sanitize_hfs_name(folder_name, is_folder=True)
return disk_folder, folder_name, add_disk_data(disk_folder, path, data)
def add_file(path: str) -> Optional[Tuple[Union[machfs.Folder, machfs.File], bytes, int]]:
base_path, filename = os.path.split(path)
base_filename, ext = os.path.splitext(filename)
has_data_file = True
if filename == '.DS_Store':
return None
if ext == '.dmg':
raise FilterException('Contains an OSX DMG')
if ext == '.rsrc':
if not os.path.isfile(os.path.join(base_path, base_filename)):
has_data_file = False
else:
# Skip .rsrc files, we handle resource forks while adding each normal file.
return None
# Try to Mount DiskCopy images
if ext == '.img' or ext == '.image':
return add_img(path)
# Expand sit files
if ext == '.sit':
return add_sit(path)
# Expand dsk files
if ext == '.dsk':
return add_dsk(path)
file = machfs.File()
if has_data_file:
with open(path, 'rb') as f:
size = f.seek(0, 2)
if size > 1024 * 1024 * 5: # >5 MiB, TODO: MAKE THIS TUNABLE
raise FilterException('Contains a file that is greater than 5 MiB')
f.seek(0)
file.data = f.read()
rsrc_path = path + '.rsrc'
else:
rsrc_path = path
if os.path.isfile(rsrc_path):
with open(rsrc_path, 'rb') as f:
try:
double = appledouble.parse(f)
except ValueError:
double = None
if double:
rsrc_entry = double.get_entry(appledouble.EntryType.resource_fork)
if rsrc_entry:
file.rsrc = rsrc_entry.data
finder_entry = double.get_entry(appledouble.EntryType.finder_info)
if finder_entry:
file.type = bytes(finder_entry.data.fdType)
file.creator = bytes(finder_entry.data.fdCreator)
file.flags = finder_entry.data.fdFlags
file.x = finder_entry.data.fdLocation.x
file.y = finder_entry.data.fdLocation.y
try:
supported_archs = applicationutil.get_supported_archs(file.rsrc)
except rsrcfork.api.InvalidResourceFileError:
print(f'Warning: Unable to parse resource fork from AppleDouble file at {rsrc_path}')
supported_archs = []
if len(supported_archs) > 0 and applicationutil.ARCH_68K not in supported_archs:
raise FilterException('Found a non-68k executable.')
if args.verbose:
print(f'* Adding file at path {path}')
hfs_filename = filename if has_data_file else base_filename
sanitized_name = sanitize_hfs_name_str(hfs_filename, is_folder=False)
return file, sanitized_name, get_hfs_file_size(file)
def add_files(root: AbstractFolder, path: str) -> int:
path_map: Dict[str, AbstractFolder] = {path: root}
total_bytes = 0
for dirpath, dirnames, filenames in os.walk(path):
containing_folder: AbstractFolder = path_map[dirpath]
for dirname in dirnames:
_, dirname_ext = os.path.splitext(dirname)
if dirname_ext == '.app':
raise FilterException(".app directory detected")
dirname_path = os.path.join(dirpath, dirname)
hfs_dir = machfs.Folder()
clean_dirname = sanitize_hfs_name_str(dirname, is_folder=True)
containing_folder[clean_dirname] = hfs_dir
path_map[dirname_path] = hfs_dir
for filename in filenames:
filepath = os.path.join(dirpath, filename)
result = add_file(filepath)
if not result:
continue
file, hfs_filename, file_bytes = result
containing_folder[hfs_filename] = file
total_bytes += file_bytes
return total_bytes
def add_sit(path: str) -> Tuple[machfs.Folder, bytes, int]:
_, filename = os.path.split(path)
folder_name, _ = os.path.splitext(filename)
output_dir = os.path.join(args.sit_dir, folder_name)
result = subprocess.run([
'unar',
'-o', args.sit_dir,
'-s', # Skip files which exist
'-d', # Force directory,
'-p', '', # Always use blank password
'-q', # Quiet
'-forks', 'visible',
path
])
if result.returncode != 0:
raise PreparationIssue(f'There was an error extracting {path}')
folder = machfs.Folder()
folder_name = sanitize_hfs_name_str(folder_name, is_folder=True)
return folder, folder_name, add_files(folder, output_dir)
def sizeof_fmt(num, suffix='B'):
"""https://stackoverflow.com/a/1094933/594760"""
for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
if abs(num) < 1024.0:
return "%3.1f%s%s" % (num, unit, suffix)
num /= 1024.0
return "%.1f%s%s" % (num, 'Yi', suffix)
class CoolBar(Bar):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.bytes_taken = 0
@property
def human_readable_bytes(self):
return sizeof_fmt(self.bytes_taken)
def to_blocks(byte_count: int) -> int:
return int(byte_count / 512) + (1 if byte_count % 512 != 0 else 0)
class VolumeManager:
def __init__(self, start_index: int):
self.volume_index = start_index
self.bytes_taken = 0
self.volume = machfs.Volume()
def write_volume(self):
self.volume.name = f'Pimp My Plus #{self.volume_index}'
volume_output_path = f'collection.{self.volume_index}.scsi'
print(f'Writing Volume to {volume_output_path} with {args.target_blocks} blocks')
with open(volume_output_path, 'wb') as f:
disk.create_bootable_disk(f, self.volume, args.target_blocks)
self.volume_index += 1
self.volume = machfs.Volume()
self.bytes_taken = 0
files = os.listdir(args.dl_folder)
files.sort(key=str.casefold)
volume_manager = VolumeManager(args.volume_start_index)
# Extra blocks are taken by the filesystem when writing the volume.
# In the future, we could be more smart about this (Do bookkeeping when adding files to the volume, might require modifying machfs library)
# For now, just cheese it and calculate usable space using a ratio of file data to filesystem data.
target_blocks_per_volume = int(args.target_blocks * args.hfs_internals_ratio) - 96
target_size = target_blocks_per_volume * 512
with CoolBar(max=len(files), suffix='%(percent)d%% -- %(index)d / %(max)d -- ~%(human_readable_bytes)s') as progress:
progress.start()
for file in files:
path = os.path.join(args.dl_folder, file)
result = None
try:
result = add_file(path)
except (PreparationIssue, FilterException) as e:
print(e)
if result:
result_file, result_filename, bytes_taken = result
# If this new entry would cause us to go over, write the current volume out and start a new volume.
if volume_manager.bytes_taken + bytes_taken > target_size:
print('Reached target size, writing a volume.')
volume_manager.write_volume()
progress.bytes_taken = 0
volume_manager.volume[result_filename] = result_file
print(f'\n* Added {file} (~{sizeof_fmt(bytes_taken)})')
progress.bytes_taken += bytes_taken
volume_manager.bytes_taken += bytes_taken
progress.next()
volume_manager.write_volume()
print('Done!')

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
machfs==1.2.4
rsrcfork==1.8.0
progress==1.5