mirror of
https://github.com/bobbimanners/ProDOS-Utils.git
synced 2024-12-13 15:29:57 +00:00
392 lines
11 KiB
Python
Executable File
392 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
###########################################################################
|
|
# Bobbi June 2020
|
|
#
|
|
# Alternative server for ADTPro's VEDRIVE.SYSTEM
|
|
# Virtual Ethernet Drive for Apple II / ProDOS
|
|
#
|
|
# See https://www.adtpro.com/protocolv1.html
|
|
#
|
|
###########################################################################
|
|
|
|
pd25 = False # Default to old-style date/time --prodos25 to use new format
|
|
file1 = "/home/bobbi/virtual-1.po" # Disk image drive 1 --disk1 to override
|
|
file2 = "/home/bobbi/virtual-2.po" # Disk image drive 2 --disk2 to override
|
|
serial_port = None # Serial port to use instead of ethernet
|
|
baud_rate = 115200 # Baud rate for serial mode
|
|
|
|
###########################################################################
|
|
|
|
import socket
|
|
import time
|
|
import os
|
|
import getopt
|
|
import sys
|
|
|
|
IP = "::"
|
|
PORT = 6502
|
|
BLKSZ = 512
|
|
|
|
# vt100 colour codes for pretty printing
|
|
BLK = '\033[90m'
|
|
RED = '\033[91m'
|
|
GRN = '\033[92m'
|
|
YEL = '\033[93m'
|
|
BLU = '\033[94m'
|
|
MAG = '\033[95m'
|
|
CYN = '\033[96m'
|
|
WHT = '\033[97m'
|
|
ENDC = '\033[0m'
|
|
|
|
# Globals
|
|
systemd = False # True if running under Systemd
|
|
packet = 1 # Sent packet counter
|
|
prevblk = -1 # Last block read/written
|
|
prevdrv = -1 # Last drive read/written
|
|
prevop = -1 # Last operation (read or write)
|
|
prevcs = -1 # Previous checksum
|
|
col = 0 # Used to control logging printout
|
|
skip1 = 0 # Bytes to skip over header (Drive 1)
|
|
skip2 = 0 # Bytes to skip over header (Drive 2)
|
|
|
|
#
|
|
# Get date/time bytes
|
|
#
|
|
def getDateTimeBytes():
|
|
global pd25
|
|
t = time.localtime()
|
|
dt = []
|
|
if pd25:
|
|
# ProDOS 2.5+
|
|
word1 = 2048 * t.tm_mday + 64 * t.tm_hour + t.tm_min
|
|
word2 = 4096 * (t.tm_mon + 1) + t.tm_year
|
|
else:
|
|
# Legacy ProDOS <2.5
|
|
word1 = t.tm_min + 256 * t.tm_hour
|
|
word2 = t.tm_mday + 32 * t.tm_mon + 512 * (t.tm_year - 2000)
|
|
dt.append(word1 & 0xff)
|
|
dt.append((word1 & 0xff00) >> 8)
|
|
dt.append(word2 & 0xff)
|
|
dt.append((word2 & 0xff00) >> 8)
|
|
return dt
|
|
|
|
#
|
|
# Append byte b to list l, return updated checksum
|
|
#
|
|
def appendbyte(l, b, csin):
|
|
l.append(b)
|
|
return csin ^ b
|
|
|
|
|
|
#
|
|
# Pretty print info about each request
|
|
#
|
|
def printinfo(drv, blknum, isWrite, isError, cs, filename):
|
|
global systemd, prevblk, prevdrv, prevop, prevcs, col
|
|
if drv != prevdrv:
|
|
if systemd:
|
|
print('\nDrive {} ({})'.format(drv, filename))
|
|
else:
|
|
print('\n{}Drive {} ({}){}'.format(BLU, drv, filename, ENDC))
|
|
col = 0
|
|
e = '+' if ((blknum == prevblk) and (drv == prevdrv) and (isWrite == prevop) and (cs == prevcs)) else ' '
|
|
e = 'X' if isError else e
|
|
if systemd:
|
|
c = 'W' if isWrite else 'R'
|
|
print(' {0}{1}{2:05d}'.format(e, c, blknum), end='', flush=True)
|
|
else:
|
|
c = RED if isWrite else GRN
|
|
print('{0} {1}{2:05d}{3}'.format(c, e, blknum, ENDC), end='', flush=True)
|
|
col += 1
|
|
if col == 8:
|
|
print('')
|
|
col = 0
|
|
prevblk = blknum
|
|
prevdrv = drv
|
|
prevop = isWrite
|
|
prevcs = cs
|
|
|
|
#
|
|
# Augment filename by adding IP
|
|
# If filename is basename.ext and IP is 192.168.2.3
|
|
# will return basename-192.168.2.3.ext
|
|
#
|
|
def augment_filename(filename, ip):
|
|
idx = filename.rfind(".")
|
|
basename = filename[0 : idx]
|
|
ext = filename[idx:]
|
|
return basename + "-" + ip + ext
|
|
|
|
#
|
|
# See if augmented filename exists, if so return that name
|
|
# otherwise return the unaugmented name
|
|
#
|
|
def select_filename(filename, addr):
|
|
if serial_port:
|
|
return filename
|
|
ip = addr[0]
|
|
ip = ip[ip.rfind(":")+1:]
|
|
filename_with_ip = augment_filename(filename, ip)
|
|
try:
|
|
with open(filename_with_ip, 'r+b'):
|
|
pass
|
|
except:
|
|
return filename
|
|
return filename_with_ip
|
|
|
|
#
|
|
# Read block with date/time update
|
|
#
|
|
def read3(dataport, addr, d):
|
|
global packet
|
|
|
|
d = dataport.recvmore(d, 3)
|
|
|
|
if d[1] == 0x03:
|
|
filename = file1
|
|
drv = 1
|
|
skip = skip1
|
|
else:
|
|
filename = file2
|
|
drv = 2
|
|
skip = skip2
|
|
|
|
filename = select_filename(filename, addr)
|
|
|
|
blknum = d[2] + 256 * d[3]
|
|
|
|
err = False
|
|
try:
|
|
with open(filename, 'r+b') as f:
|
|
b = blknum * BLKSZ + skip
|
|
f.seek(b)
|
|
block = f.read(BLKSZ)
|
|
except:
|
|
err = True
|
|
|
|
dt = getDateTimeBytes()
|
|
l = []
|
|
if not serial_port:
|
|
appendbyte(l, packet & 0xff, 0) # Packet number
|
|
packet += 1
|
|
cs = appendbyte(l, 0xc5, 0) # "E"
|
|
cs = appendbyte(l, d[1], cs) # 0x03 or 0x05
|
|
cs = appendbyte(l, d[2], cs) # Block num LSB
|
|
cs = appendbyte(l, d[3], cs) # Block num MSB
|
|
cs = appendbyte(l, dt[0], cs) # Time of day LSB
|
|
cs = appendbyte(l, dt[1], cs) # Time of day MSB
|
|
cs = appendbyte(l, dt[2], cs) # Date LSB
|
|
cs = appendbyte(l, dt[3], cs) # Date MSB
|
|
appendbyte(l, cs, cs) # Checksum for header
|
|
|
|
# Signal read errors by responding with incorrect checksum
|
|
if err:
|
|
cs += 1
|
|
else:
|
|
cs = 0
|
|
for i in range (0, BLKSZ):
|
|
cs = appendbyte(l, block[i], cs)
|
|
|
|
|
|
appendbyte(l, cs, cs) # Checksum for datablock
|
|
|
|
printinfo(drv, blknum, False, err, cs, filename)
|
|
|
|
b = dataport.sendto(bytearray(l), addr)
|
|
#print('Sent {} bytes to {}'.format(b, addr))
|
|
|
|
#
|
|
# Write block
|
|
#
|
|
def write(dataport, addr, d):
|
|
global packet
|
|
|
|
d = dataport.recvmore(d, BLKSZ + 4)
|
|
|
|
if d[1] == 0x02:
|
|
filename = file1
|
|
drv = 1
|
|
skip = skip1
|
|
else:
|
|
filename = file2
|
|
drv = 2
|
|
skip = skip2
|
|
|
|
filename = select_filename(filename, addr)
|
|
|
|
cs = 0
|
|
for i in range (0, BLKSZ):
|
|
cs ^= d[i+5]
|
|
|
|
blknum = d[2] + 256 * d[3]
|
|
|
|
err = False
|
|
if cs == d[517]:
|
|
try:
|
|
with open(filename, 'r+b') as f:
|
|
b = blknum * BLKSZ + skip
|
|
f.seek(b)
|
|
for i in range (0, BLKSZ):
|
|
f.write(bytes([d[i+5]]))
|
|
except:
|
|
err = True # Write error
|
|
else:
|
|
err == True # Bad checksum
|
|
|
|
# Signal write errors by responding with bad data checksum.
|
|
# Use sender's checksum + 1, so there is never an inadvertent match.
|
|
if err:
|
|
cs = d[517] + 1
|
|
|
|
l = []
|
|
if not serial_port:
|
|
appendbyte(l, packet & 0xff, 0) # Packet number
|
|
packet += 1
|
|
appendbyte(l, 0xc5, 0) # "E"
|
|
appendbyte(l, d[1], 0) # 0x02 or 0x04
|
|
appendbyte(l, d[2], 0) # Block num LSB
|
|
appendbyte(l, d[3], 0) # Block num MSB
|
|
appendbyte(l, cs, 0) # Checksum of datablock
|
|
|
|
printinfo(drv, blknum, True, err, cs, filename)
|
|
|
|
b = dataport.sendto(bytearray(l), addr)
|
|
#print('Sent {} bytes to {}'.format(b, addr))
|
|
|
|
#
|
|
# See if file is a 2MG and, if so, that it contains .PO image
|
|
# Returns bytes to skip over header
|
|
#
|
|
def check2MG(filename):
|
|
try:
|
|
with open(filename, 'r+b') as f:
|
|
hdr = f.read(16)
|
|
except:
|
|
return 0
|
|
if (hdr[0] == 0x32) and (hdr[1] == 0x49) and (hdr[2] == 0x4d) and (hdr[3] == 0x47):
|
|
print('** ' + filename + ' is a 2MG file **')
|
|
if hdr[0x0c] != 0x01:
|
|
print('** Warning NOT in ProDOS order **')
|
|
return 64
|
|
return 0
|
|
|
|
class DataPort:
|
|
class Timeout(Exception):
|
|
pass
|
|
|
|
def __init__(self, serial_port, baud_rate):
|
|
if serial_port:
|
|
import serial # Import locally to avoid hard dependency
|
|
|
|
# Use a short timeout so that the protocol resets on desync. This
|
|
# tends to happen if you reboot the client.
|
|
self.impl = serial.Serial(
|
|
port=serial_port, baudrate=baud_rate, bytesize=serial.EIGHTBITS,
|
|
parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,
|
|
timeout=1, xonxoff=False, rtscts=True, dsrdtr=True
|
|
)
|
|
self.stream = True
|
|
print("veserver - listening on serial port {}".format(serial_port))
|
|
else:
|
|
self.impl = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
|
|
self.impl.bind((IP, PORT))
|
|
self.stream = False
|
|
print("veserver - listening on UDP port {}".format(PORT))
|
|
|
|
def __enter__(self):
|
|
self.impl.__enter__()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
self.impl.__exit__(exc_type, exc_val, exc_tb)
|
|
|
|
def recvfrom(self, required_bytes):
|
|
if self.stream:
|
|
data = self.impl.read(required_bytes)
|
|
if len(data) < required_bytes:
|
|
raise DataPort.Timeout
|
|
else:
|
|
return data, None
|
|
else:
|
|
return self.impl.recvfrom(1024)
|
|
|
|
def recvmore(self, data, remaining_bytes):
|
|
if self.stream:
|
|
return data + self.impl.read(remaining_bytes)
|
|
else:
|
|
return data
|
|
|
|
def sendto(self, data, addr):
|
|
if self.stream:
|
|
return self.impl.write(data)
|
|
else:
|
|
return self.impl.sendto(data, addr)
|
|
|
|
def usage():
|
|
print('usage: veserver [OPTION]...')
|
|
print(' -h, --help Show this help');
|
|
print(' -p, --prodos25 Use ProDOS 2.5 date/time format');
|
|
print(' -1 FNAME, --disk1=FNAME Specify filename for disk 1 image');
|
|
print(' -2 FNAME, --disk2=FNAME Specify filename for disk 2 image');
|
|
print(' -s PORT, --serial=PORT Use a serial link instead of ethernet');
|
|
print(' -b BAUD, --baud=BAUD Baud rate for serial link');
|
|
|
|
#
|
|
# Entry point
|
|
#
|
|
|
|
# Check whether we are running under Systemd or not
|
|
if 'INVOCATION_ID' in os.environ:
|
|
systemd = True
|
|
|
|
short_opts = "hp1:2:s:b:"
|
|
long_opts = ["help", "prodos25", "disk1=", "disk2=", "serial=", "baud="]
|
|
try:
|
|
args, vals = getopt.getopt(sys.argv[1:], short_opts, long_opts)
|
|
except getopt.error as e:
|
|
print (str(e))
|
|
usage()
|
|
sys.exit(2)
|
|
|
|
for a, v in args:
|
|
if a in ('-h', '--help'):
|
|
usage()
|
|
sys.exit(0)
|
|
elif a in ('-p', '--prodos25'):
|
|
pd25 = True
|
|
elif a in ('-1', '--disk1'):
|
|
file1 = v
|
|
elif a in ('-2', '--disk2'):
|
|
file2 = v
|
|
elif a in ('-s', '--serial'):
|
|
serial_port = v
|
|
elif a in ('-b', '--baud'):
|
|
baud_rate = int(v)
|
|
|
|
print("VEServer v1.3")
|
|
if pd25:
|
|
print("ProDOS 2.5+ Clock Driver")
|
|
else:
|
|
print("Legacy ProDOS Clock Driver")
|
|
|
|
print("Disk 1: {}".format(file1))
|
|
skip1 = check2MG(file1)
|
|
print("Disk 2: {}".format(file2))
|
|
skip2 = check2MG(file2)
|
|
|
|
with DataPort(serial_port, baud_rate) as dataport:
|
|
while True:
|
|
try:
|
|
data, address = dataport.recvfrom(2)
|
|
#print('Received {} bytes from {}'.format(len(data), address))
|
|
if (data[0] == 0xc5):
|
|
if (data[1] == 0x03) or (data[1] == 0x05):
|
|
read3(dataport, address, data)
|
|
elif (data[1] == 0x02) or (data[1] == 0x04):
|
|
write(dataport, address, data)
|
|
except DataPort.Timeout:
|
|
pass
|
|
|
|
|