Bare-bones py65-based simulator for Apple //e with Uthernet (i.e.

simulating the W5100).  This will hopefully be useful for
troubleshooting and testing player behaviour more precisely, e.g.

- trapping read/write access to unexpected memory areas
- asserting invariants on the processor state across loops
- measuring cycle timing
- tracing program execution

This already gets as far as negotiating the TCP connect.  The major
remaining piece seems to be the TCP buffer management on the W5100 side.
This commit is contained in:
kris 2019-02-27 22:26:35 +00:00
parent 2b3343f374
commit 90f696b8e4
5 changed files with 785 additions and 0 deletions

196
simulator/apple2.py Normal file
View File

@ -0,0 +1,196 @@
import machine
import memory
import uthernet
from py65 import disassembler
from py65.devices import mpu65c02
class AppleII(machine.Machine):
def __init__(self, uthernet: uthernet.Uthernet):
memory_map = [
memory.MemoryRegion("Zero page", 0x0000, 0x00ff),
memory.MemoryRegion("Stack", 0x0100, 0x01ff),
memory.MemoryRegion(
"Text page 1", 0x0400, 0x7ff,
write_interceptor=self.TextPageWriteInterceptor),
memory.MemoryRegion("HiRes Page 1", 0x2000, 0x2fff),
memory.MemoryRegion("HiRes Page 2", 0x4000, 0x4fff, writable=False),
memory.MemoryRegion(
"IO page", 0xc000, 0xc0ff,
read_interceptor=self.io_interceptor,
write_interceptor=self.io_interceptor),
memory.MemoryRegion("Slot 1 ROM", 0xc100, 0xc1ff, writable=False),
memory.MemoryRegion("Slot 2 ROM", 0xc200, 0xc2ff, writable=False),
memory.MemoryRegion("Slot 3 ROM", 0xc300, 0xc3ff, writable=False),
memory.MemoryRegion("Slot 4 ROM", 0xc400, 0xc4ff, writable=False),
memory.MemoryRegion("Slot 5 ROM", 0xc500, 0xc5ff, writable=False),
memory.MemoryRegion("Slot 6 ROM", 0xc600, 0xc6ff, writable=False),
memory.MemoryRegion("Slot 7 ROM", 0xc700, 0xc7ff, writable=False),
memory.MemoryRegion(
"ROM", 0xd000, 0xffff,
entrypoints={
0xfca8: self._Wait,
0xfded: machine._Event("ROM", "COUT"),
0xfe89: machine._Event("ROM", "Select the keyboard (IN#0)")
},
writable=False
)
]
self.memory_manager = memory.MemoryManager(memory_map)
self.memory = self.memory_manager.memory
self.cpu = mpu65c02.MPU(memory=self.memory)
self.uthernet = uthernet # type: uthernet.Uthernet
self.disassembler = disassembler.Disassembler(self.cpu)
def _uther_wmode(mode, value):
if mode & machine.AccessMode.READ:
return self.uthernet.read_mode()
else:
return self.uthernet.write_mode(value)
def _uther_wadrh(mode, value):
old = self.uthernet.ptr
self.uthernet.ptr = (value << 8) | (self.uthernet.ptr & 0x7f)
machine.Log("WADRH", "%04x -> %04x" % (old, self.uthernet.ptr))
def _uther_wadrl(mode, value):
old = self.uthernet.ptr
self.uthernet.ptr = (self.uthernet.ptr & 0x7f00) | value
machine.Log("WADRL", "%04x -> %04x" % (old, self.uthernet.ptr))
def _uther_wdata(mode, value):
if mode & machine.AccessMode.READ:
return self.uthernet.read_data()
else:
return self.uthernet.write_data(value)
# Set up interceptors for accessing various interesting parts of the
# memory map
self.io_map = {
0xc094: (
machine.AccessMode.RW, "WMODE", _uther_wmode),
0xc095: (
machine.AccessMode.WRITE, "WADRH", _uther_wadrh),
0xc096: (
machine.AccessMode.WRITE, "WADRL", _uther_wadrl),
0xc097: (
machine.AccessMode.RW, "WDATA", _uther_wdata),
}
self.soft_switches = {}
for ss in [
machine.SoftSwitch(
"80Store",
clear_addr=0xc000,
set_addr=0xc001,
status_addr=0xc018,
callback=machine.SoftSwitch.unimplemented
),
machine.SoftSwitch(
"RamRd",
clear_addr=0xc002,
set_addr=0xc003,
status_addr=0xc013,
callback=machine.SoftSwitch.unimplemented
),
machine.SoftSwitch(
"RamWrt",
clear_addr=0xc004,
set_addr=0xc005,
status_addr=0xc014,
callback=machine.SoftSwitch.unimplemented
),
machine.SoftSwitch(
"IntCxROM",
clear_addr=0xc006,
set_addr=0xc007,
status_addr=0xc015,
callback=machine.SoftSwitch.unimplemented
),
machine.SoftSwitch(
"AltZP",
clear_addr=0xc008,
set_addr=0xc009,
status_addr=0xc016,
callback=machine.SoftSwitch.unimplemented
),
machine.SoftSwitch(
"SlotC3ROM",
clear_addr=0xc00a,
set_addr=0xc00b,
status_addr=0xc017,
callback=machine.SoftSwitch.unimplemented
),
machine.SoftSwitch(
"80Col",
clear_addr=0xc00c,
set_addr=0xc00d,
status_addr=0xc01f
),
machine.SoftSwitch(
"AltCharSet",
clear_addr=0xc00e,
set_addr=0xc00f,
status_addr=0xc01e
),
machine.SoftSwitch(
"Text",
clear_addr=0xc050,
set_addr=0xc051,
status_addr=0xc01a,
mode=machine.AccessMode.RW
),
machine.SoftSwitch(
"Mixed",
clear_addr=0xc052, set_addr=0xc053,
status_addr=0xc01b,
mode=machine.AccessMode.RW
),
machine.SoftSwitch(
"Page2",
clear_addr=0xc054, set_addr=0xc055,
status_addr=0xc01c,
mode=machine.AccessMode.RW
),
machine.SoftSwitch(
"Hires",
clear_addr=0xc056, set_addr=0xc057,
status_addr=0xc01d,
mode=machine.AccessMode.RW
)
]:
self.soft_switches[ss.name] = ss
ss.register(self.io_map)
@staticmethod
def _Wait(_):
print("Waiting")
# TODO: convert addresses to screen coordinates
# See e.g. https://retrocomputing.stackexchange.com/questions/2534/what-are-the-screen-holes-in-apple-ii-graphics
@staticmethod
def TextPageWriteInterceptor(address, value):
print('Wrote "%s" to text page address $%04X' % (chr(value & 0x7f),
address))
def Run(self, pc, trace=False):
self.cpu.pc = pc
old_pc = self.cpu.pc
while True:
self.memory_manager.MaybeInterceptExecution(self.cpu, old_pc)
old_pc = self.cpu.pc
if trace:
print(self.cpu)
print(" $%04X: %s" % (
self.cpu.pc,
self.disassembler.instruction_at(self.cpu.pc)[1]))
self.cpu.step()
if self.cpu.pc == old_pc:
break

134
simulator/machine.py Normal file
View File

@ -0,0 +1,134 @@
"""Abstract hardware machine with CPU and memory."""
import enum
from typing import Dict, Tuple, Callable, Optional
from py65 import memory as py65_memory
class AccessMode(enum.IntFlag):
READ = 0x1
WRITE = 0x2
RW = READ | WRITE
class Event(object):
def __init__(self, event_type, details):
self.event_type = event_type
self.details = details
def __str__(self) -> str:
return "Event(%s): %s" % (self.event_type, self.details)
def Log(region:str, message:str):
print("%s event: %s" % (region, message))
# TODO: why?
def _Event(region:str, message:str):
def _Event(_):
Log(region, message)
return _Event
class TrapException(Exception):
def __init__(self, address:int, msg:str):
self.address = address
self.msg = msg
def __str__(self) -> str:
return "$%04X: %s" % (self.address, self.msg)
class SoftSwitch:
def __init__(
self, name: str, clear_addr: int, set_addr: int,
status_addr: int, mode: AccessMode = AccessMode.WRITE,
callback=None):
self.name = name
self.clear_addr = clear_addr
self.set_addr = set_addr
self.status_addr = status_addr
# Whether switch is set/clear by READ/WRITE or both
self.mode = mode # type: AccessMode
self.state = False # type: bool
self.callback = callback # type: Callable[[bool], Optional[int]]
def set(self) -> Optional[int]:
self.state = True
Log(self.name, "Setting soft switch")
return self.callback(True)
def clear(self) -> Optional[int]:
self.state = False
Log(self.name, "Clearing soft switch")
return self.callback(False)
def get(self) -> int:
Log(self.name, "Reading soft switch (%s)" % (
"on" if self.state else "off"))
return 0x80 & self.state
@staticmethod
def unimplemented(_):
raise NotImplementedError
def register(self, io_map):
def _clear(mode, value):
return self.clear()
def _set(mode, value):
return self.set()
def _get(mode, value):
return self.get()
io_map[self.clear_addr] = (
self.mode, "%s OFF" % self.name, _clear)
io_map[self.set_addr] = (
self.mode, "%s ON" % self.name, _set)
io_map[self.status_addr] = (
AccessMode.READ, "%s READ" % self.name, _get)
class Machine:
def __init__(self):
self.memory_manager = None # type: memory.MemoryManager
self.memory = None # type: py65_memory.ObservableMemory
self.cpu = None
self.io_map = {} # type: Dict[int, Tuple[AccessMode, str, Callable]]
@staticmethod
def unimplemented_io_callback(mode, value):
raise NotImplementedError
def io_interceptor(self, address, value=None):
access_mode = (
AccessMode.READ if (value is None) else AccessMode.WRITE)
try:
(mode, name, callback) = self.io_map[address]
if access_mode & mode:
if access_mode & AccessMode.READ:
print("==== IO EVENT: READ %s" % name)
else:
print("==== IO EVENT: WRITE %s -> %02x" % (name, value))
if callback:
return callback(access_mode, value)
else:
return
else:
print("**** IO EVENT with unexpected mode: %s" % access_mode)
raise TrapException(address, access_mode)
except KeyError:
if value:
raise TrapException(
address, 'Wrote %02X ("%s")' % (value, chr(value)))
else:
raise TrapException(address, 'Read')

106
simulator/memory.py Normal file
View File

@ -0,0 +1,106 @@
from collections import defaultdict
from py65 import memory
class WriteProtectedException(Exception):
def __init__(self, name, addr, value):
self.name = name
self.addr = addr
self.value = value
def __str__(self):
return 'Write denied to %s ($%04X): $%02X' % (
self.name, self.addr, self.value)
class UndefinedEntryPointException(Exception):
def __init__(self, region, prev_addr, addr):
self.region = region
self.prev_addr = prev_addr
self.addr = addr
def __str__(self):
return 'Entered region %s via undefined entry point: $%04X --> $%04X' % (
self.region.name, self.prev_addr, self.addr)
class MemoryRegion:
def __init__(
self, name, start, end, read_interceptor=None,
write_interceptor=None, entrypoints=None,
writable=True):
self.name = name
self.start = start
self.end = end
self.read_interceptor = read_interceptor
self.write_interceptor = write_interceptor
self.writable = writable
# Maps PC to handler
self.entrypoints = entrypoints or {}
class MemoryManager:
def __init__(self, memory_map):
self.entrypoints = defaultdict(list)
default_region = MemoryRegion('default', 0x0, 0xffff)
def _default_region():
return default_region
self.regions = defaultdict(_default_region)
self.memory = memory.ObservableMemory()
self._memory_map = memory_map
def enable(self):
for region in self._memory_map:
self.RegisterRegion(region)
def MaybeInterceptExecution(self, cpu, old_pc):
pc = cpu.pc
if pc in self.entrypoints:
handlers = self.entrypoints[pc]
else:
handlers = []
if self.regions[old_pc] != self.regions[pc]:
print("Entering region %s" % self.regions[pc].name)
if not handlers:
raise UndefinedEntryPointException(self.regions[pc], old_pc, pc)
for handler in handlers:
handler(cpu)
def RegisterRegion(self, region):
addr_range = range(region.start, region.end + 1)
if region.read_interceptor:
self.memory.subscribe_to_read(addr_range, region.read_interceptor)
if region.write_interceptor:
self.memory.subscribe_to_write(addr_range, region.write_interceptor)
if not region.writable:
self.memory.subscribe_to_write(addr_range,
self.DenyWritesToRegion(region))
for addr in addr_range:
self.regions[addr] = region
for addr, handler in region.entrypoints.items():
self.entrypoints[addr].append(handler)
# TODO: should trap by default
@staticmethod
def DenyWritesToRegion(region):
def _DenyWritesInterceptor(addr, value):
raise WriteProtectedException(region.name, addr, value)
return _DenyWritesInterceptor

51
simulator/simulator.py Normal file
View File

@ -0,0 +1,51 @@
import subprocess
import sys
import apple2
import uthernet
def main():
stream = open("out.bin", "rb").read()
uth = uthernet.Uthernet(stream)
a2 = apple2.AppleII(uth)
# Read in Apple IIE ROM image
rom = open("simulator/APPLE2E.ROM", "rb").read()
# TODO: other slot ROMs; alternate Cx ROMs
# Slot 6 ROM
a2.memory.write(0xc600, rom[0x0600:0x6ff])
# Main ROM
a2.memory.write(0xd000, rom[0x5000:0x7fff])
# Load video player
# Extract ethernet.bin from disk image
cmd = "java -jar ethernet/ethernet/make/AppleCommander.jar -g " \
"ethernet/ethernet/ethernet.dsk ethernet " \
"ethernet/ethernet/ethernet.bin"
p = subprocess.run(cmd.split())
if p.returncode:
sys.exit(1)
load_addr = 0x8000
with open("ethernet/ethernet/ethernet.bin", "rb") as f:
code = f.read()
a2.memory.write(load_addr, code)
# COUT vector
a2.memory[0x36] = 0xf0
a2.memory[0x37] = 0xfd
a2.memory_manager.enable()
# TODO: why does this not use the 6502 reset vector?
a2.cpu.reset()
a2.Run(load_addr, trace=True)
if __name__ == "__main__":
main()

298
simulator/uthernet.py Normal file
View File

@ -0,0 +1,298 @@
import machine
import memory
class Uthernet(machine.Machine):
"""Uthernet device simulator."""
def __init__(self, stream:bytes):
memory_map = [
memory.MemoryRegion(
"Registers", 0x0000, 0x002f,
read_interceptor=self.io_interceptor,
write_interceptor=self.io_interceptor),
memory.MemoryRegion(
"Socket registers", 0x0400, 0x07ff,
read_interceptor=self.io_interceptor,
write_interceptor=self.io_interceptor),
memory.MemoryRegion("TX Memory", 0x4000, 0x5fff),
memory.MemoryRegion("RX Memory", 0x6000, 0x7fff),
]
self.memory_manager = memory.MemoryManager(memory_map)
self.memory = self.memory_manager.memory
self.memory_manager.enable()
self._indirect_bus_mode = False # type: bool
self._auto_increment = False # type: bool
# Address read pointer
self.ptr = 0x0000
# Inbound data to buffer via TCP socket
self.stream = stream
def _mode(mode, value):
if not (mode & machine.AccessMode.WRITE):
return
# 7 - reset
# 1 - address auto-increment
# 0 - indirect bus mode
assert value & 0b10000011 == value, value
self._indirect_bus_mode = bool(value & 1)
self._auto_increment = bool(value & (1 << 1))
machine.Log(
"Uthernet", "Indirect bus mode: %s, Auto-incr: %s" % (
self._indirect_bus_mode, self._auto_increment
))
if value & (1 << 7):
self.reset()
def _socket_mode(mode, value):
if not (mode & machine.AccessMode.WRITE):
return
# 5 - delayed ACK disabled
# 3 - 0 for TCP
# 2 - 0 for TCP
# 1 - 0 for TCP
# 0 - 1 for TCP
assert value == 0b100001, value
def _socket_command(mode, value):
if not (mode & machine.AccessMode.WRITE):
return
def _OPEN():
# Move TCP status to SOCK_INIT
self.memory[0x403] = 0x13
machine.Log("Uthernet", "Opening socket 0")
def _CONNECT():
# Move TCP status to SOCK_ESTABLISHED
self.memory[0x403] = 0x17
def _RECV():
raise NotImplementedError
commands = {
0x01: _OPEN,
# 0x02: _LISTEN,
0x04: _CONNECT,
# 0x08: _DISCON,
# 0x10: _CLOSE,
# 0x20: _SEND,
# 0x21: _SEND_MAC,
# 0x22: _SEND_KEEP,
0x40: _RECV,
}
handler = commands.get(value)
if handler:
handler()
self.io_map = {
# CONTROL REGISTERS
0x0000: (
machine.AccessMode.RW, "Mode", _mode),
0x0001: (
machine.AccessMode.RW, "Gateway Address 0", None),
0x0002: (
machine.AccessMode.RW, "Gateway Address 1", None),
0x0003: (
machine.AccessMode.RW, "Gateway Address 2", None),
0x0004: (
machine.AccessMode.RW, "Gateway Address 3", None),
0x0005: (
machine.AccessMode.RW, "Subnet Mask Address 0", None),
0x0006: (
machine.AccessMode.RW, "Subnet Mask Address 1", None),
0x0007: (
machine.AccessMode.RW, "Subnet Mask Address 2", None),
0x0008: (
machine.AccessMode.RW, "Subnet Mask Address 3", None),
0x0009: (
machine.AccessMode.RW, "Source Hardware Address 0", None),
0x000a: (
machine.AccessMode.RW, "Source Hardware Address 1", None),
0x000b: (
machine.AccessMode.RW, "Source Hardware Address 2", None),
0x000c: (
machine.AccessMode.RW, "Source Hardware Address 3", None),
0x000d: (
machine.AccessMode.RW, "Source Hardware Address 4", None),
0x000e: (
machine.AccessMode.RW, "Source Hardware Address 5", None),
0x000f: (
machine.AccessMode.RW, "Source IP Address 0", None),
0x0010: (
machine.AccessMode.RW, "Source IP Address 1", None),
0x0011: (
machine.AccessMode.RW, "Source IP Address 2", None),
0x0012: (
machine.AccessMode.RW, "Source IP Address 3", None),
0x0015: (
machine.AccessMode.RW, "Interrupt",
self.unimplemented_io_callback),
0x0016: (
machine.AccessMode.RW, "Interrupt Mask",
self.unimplemented_io_callback),
0x0017: (
machine.AccessMode.RW, "Retry Time 0",
self.unimplemented_io_callback),
0x0018: (
machine.AccessMode.RW, "Retry Time 1",
self.unimplemented_io_callback),
0x0019: (
machine.AccessMode.RW, "Retry Count",
self.unimplemented_io_callback),
0x001a: (
machine.AccessMode.RW, "RX Memory Size", None),
0x001b: (
machine.AccessMode.RW, "TX Memory Size", None),
0x001c: (
machine.AccessMode.RW, "PPPoE Auth Type 0",
self.unimplemented_io_callback),
0x001d: (
machine.AccessMode.RW, "PPPoE Auth Type 1",
self.unimplemented_io_callback),
0x0028: (
machine.AccessMode.RW, "PPP LCP Request Timer",
self.unimplemented_io_callback),
0x0029: (
machine.AccessMode.RW, "PPP LCP Magic Number",
self.unimplemented_io_callback),
0x002a: (
machine.AccessMode.RW, "Unreachable IP Address 0",
self.unimplemented_io_callback),
0x002b: (
machine.AccessMode.RW, "Unreachable IP Address 1",
self.unimplemented_io_callback),
0x002c: (
machine.AccessMode.RW, "Unreachable IP Address 2",
self.unimplemented_io_callback),
0x002d: (
machine.AccessMode.RW, "Unreachable IP Address 3",
self.unimplemented_io_callback),
0x002e: (
machine.AccessMode.RW, "Unreachable Port 0",
self.unimplemented_io_callback),
0x002f: (
machine.AccessMode.RW, "Unreachable Port 0",
self.unimplemented_io_callback),
# SOCKET 0 registers
0x0400: (
machine.AccessMode.RW, "Socket 0 Mode",
_socket_mode),
0x0401: (
machine.AccessMode.RW, "Socket 0 Command",
_socket_command),
0x0402: (
machine.AccessMode.RW, "Socket 0 Interrupt",
self.unimplemented_io_callback),
0x0403: (
machine.AccessMode.RW, "Socket 0 Status", None),
0x0404: (
machine.AccessMode.RW, "Socket 0 Source Port 0", None),
0x0405: (
machine.AccessMode.RW, "Socket 0 Source Port 1", None),
0x0406: (
machine.AccessMode.RW, "Socket 0 Dest HW Addr 0",
self.unimplemented_io_callback),
0x0407: (
machine.AccessMode.RW, "Socket 0 Dest HW Addr 1",
self.unimplemented_io_callback),
0x0408: (
machine.AccessMode.RW, "Socket 0 Dest HW Addr 2",
self.unimplemented_io_callback),
0x0409: (
machine.AccessMode.RW, "Socket 0 Dest HW Addr 3",
self.unimplemented_io_callback),
0x040a: (
machine.AccessMode.RW, "Socket 0 Dest HW Addr 4",
self.unimplemented_io_callback),
0x040b: (
machine.AccessMode.RW, "Socket 0 Dest HW Addr 5",
self.unimplemented_io_callback),
0x040c: (
machine.AccessMode.RW, "Socket 0 Dest IP Addr 0", None),
0x040d: (
machine.AccessMode.RW, "Socket 0 Dest IP Addr 1", None),
0x040e: (
machine.AccessMode.RW, "Socket 0 Dest IP Addr 2", None),
0x040f: (
machine.AccessMode.RW, "Socket 0 Dest IP Addr 3", None),
0x0410: (
machine.AccessMode.RW, "Socket 0 Dest Port 0", None),
0x0411: (
machine.AccessMode.RW, "Socket 0 Dest Port 1", None),
0x0412: (
machine.AccessMode.RW, "Socket 0 MSS 0",
self.unimplemented_io_callback),
0x0413: (
machine.AccessMode.RW, "Socket 0 MSS 1",
self.unimplemented_io_callback),
0x0414: (
machine.AccessMode.RW, "Socket 0 Protocol",
self.unimplemented_io_callback),
0x0415: (
machine.AccessMode.RW, "Socket 0 IP TOS",
self.unimplemented_io_callback),
0x0416: (
machine.AccessMode.RW, "Socket 0 IP TTL",
self.unimplemented_io_callback),
0x0420: (
machine.AccessMode.RW, "Socket 0 TX Free Size 0",
self.unimplemented_io_callback),
0x0421: (
machine.AccessMode.RW, "Socket 0 TX Free Size 1",
self.unimplemented_io_callback),
0x0422: (
machine.AccessMode.RW, "Socket 0 TX Read Ptr 0",
self.unimplemented_io_callback),
0x0423: (
machine.AccessMode.RW, "Socket 0 TX Read Ptr 1",
self.unimplemented_io_callback),
0x0424: (
machine.AccessMode.RW, "Socket 0 TX Write Ptr 0",
self.unimplemented_io_callback),
0x0425: (
machine.AccessMode.RW, "Socket 0 TX Write Ptr 1",
self.unimplemented_io_callback),
0x0426: (
machine.AccessMode.RW, "Socket 0 RX Received Size 0", None),
0x0427: (
machine.AccessMode.RW, "Socket 0 RX Received Size 1", None),
0x0428: (
machine.AccessMode.RW, "Socket 0 RX Read Ptr 0", None),
0x0429: (
machine.AccessMode.RW, "Socket 0 RX Read Ptr 1", None),
}
def read_data(self):
val = self.memory[self.ptr]
self.ptr = (self.ptr + 1) & 0x7fff
return val
def write_data(self, value):
self.memory[self.ptr] = value
self.ptr = (self.ptr + 1) & 0x7fff
return
def read_mode(self):
return self.memory[0x0000]
def write_mode(self, value):
self.memory[0x0000] = value
def reset(self):
# TODO: what state should be reset?
machine.Log("Uthernet", "Resetting")
def fill_socket(self):
# TODO: assumes 4k socket rx buffer
print("")