start splitting for https://github.com/jtauber/applepy/issues/18
This commit is contained in:
parent
fe80bc897d
commit
82dcec24b6
|
@ -0,0 +1,16 @@
|
|||
from generic.generic_config import BaseConfig
|
||||
|
||||
|
||||
class Apple2Config(BaseConfig):
|
||||
ROM_START = 0xD000
|
||||
ROM_SIZE = 0x3000
|
||||
ROM_END = ROM_START + ROM_SIZE # 0x10000
|
||||
|
||||
RAM_START = 0x0000
|
||||
RAM_SIZE = 0xC000
|
||||
RAM_END = RAM_START + RAM_SIZE # 0xc000
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cfg = Apple2Config()
|
||||
cfg.print_debug_info()
|
|
@ -0,0 +1,21 @@
|
|||
from generic.base_memory import MemoryBase, ROMBase, RAMBase
|
||||
|
||||
class ROM(ROMBase):
|
||||
pass
|
||||
|
||||
class RAM(RAMBase):
|
||||
pass
|
||||
|
||||
class Memory(MemoryBase):
|
||||
|
||||
def read_word_bug(self, cycle, address):
|
||||
if address % 0x100 == 0xFF:
|
||||
return self.read_byte(cycle, address) + (self.read_byte(cycle + 1, address & 0xFF00) << 8)
|
||||
else:
|
||||
return self.read_word(cycle, address)
|
||||
|
||||
def write_byte(self, cycle, address, value):
|
||||
if address < self.cfg.RAM_SIZE:
|
||||
self.ram.write_byte(address, value)
|
||||
if 0x400 <= address < 0x800 or 0x2000 <= address < 0x5FFF:
|
||||
self.bus_write(cycle, address, value)
|
19
applepy.py
19
applepy.py
|
@ -15,6 +15,8 @@ import sys
|
|||
import time
|
||||
import wave
|
||||
|
||||
from apple2.apple2config import Apple2Config
|
||||
|
||||
|
||||
class Display:
|
||||
|
||||
|
@ -364,19 +366,23 @@ class SoftSwitches:
|
|||
|
||||
class Apple2:
|
||||
|
||||
def __init__(self, options, display, speaker, cassette):
|
||||
def __init__(self, cfg, options, display, speaker, cassette):
|
||||
self.cfg = cfg
|
||||
self.display = display
|
||||
self.speaker = speaker
|
||||
self.softswitches = SoftSwitches(display, speaker, cassette)
|
||||
|
||||
listener = socket.socket()
|
||||
listener.bind(("127.0.0.1", 0))
|
||||
listener.bind((self.cfg.LOCAL_HOST_IP, 0))
|
||||
listener.listen(0)
|
||||
bus_port = listener.getsockname()[1]
|
||||
|
||||
print "bus I/O listen on %s:%s" % (self.cfg.LOCAL_HOST_IP, bus_port)
|
||||
|
||||
args = [
|
||||
sys.executable,
|
||||
"cpu6502.py",
|
||||
"--bus", str(listener.getsockname()[1]),
|
||||
"--bus", str(bus_port),
|
||||
"--rom", options.rom,
|
||||
]
|
||||
if options.ram:
|
||||
|
@ -391,11 +397,12 @@ class Apple2:
|
|||
|
||||
rs, _, _ = select.select([listener], [], [], 2)
|
||||
if not rs:
|
||||
print >>sys.stderr, "CPU module did not start"
|
||||
print >> sys.stderr, "CPU module did not start '%s'" % " ".join(args)
|
||||
sys.exit(1)
|
||||
self.cpu, _ = listener.accept()
|
||||
|
||||
def run(self):
|
||||
sys.stdout.flush()
|
||||
update_cycle = 0
|
||||
quit = False
|
||||
while not quit:
|
||||
|
@ -485,10 +492,12 @@ def get_options():
|
|||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cfg = Apple2Config()
|
||||
|
||||
options = get_options()
|
||||
display = Display()
|
||||
speaker = None if options.quiet else Speaker()
|
||||
cassette = Cassette(options.cassette) if options.cassette else None
|
||||
|
||||
apple = Apple2(options, display, speaker, cassette)
|
||||
apple = Apple2(cfg, options, display, speaker, cassette)
|
||||
apple.run()
|
||||
|
|
134
cpu6502.py
134
cpu6502.py
|
@ -8,11 +8,10 @@ import json
|
|||
import re
|
||||
import select
|
||||
import socket
|
||||
import struct
|
||||
import sys
|
||||
|
||||
|
||||
bus = None # socket for bus I/O
|
||||
from apple2.apple2memory import Memory, RAM, ROM
|
||||
from apple2.apple2config import Apple2Config
|
||||
|
||||
|
||||
def signed(x):
|
||||
|
@ -21,97 +20,6 @@ def signed(x):
|
|||
return x
|
||||
|
||||
|
||||
class ROM:
|
||||
|
||||
def __init__(self, start, size):
|
||||
self.start = start
|
||||
self.end = start + size - 1
|
||||
self._mem = [0x00] * size
|
||||
|
||||
def load(self, address, data):
|
||||
for offset, datum in enumerate(data):
|
||||
self._mem[address - self.start + offset] = datum
|
||||
|
||||
def load_file(self, address, filename):
|
||||
with open(filename, "rb") as f:
|
||||
for offset, datum in enumerate(f.read()):
|
||||
self._mem[address - self.start + offset] = ord(datum)
|
||||
|
||||
def read_byte(self, address):
|
||||
assert self.start <= address <= self.end
|
||||
return self._mem[address - self.start]
|
||||
|
||||
|
||||
class RAM(ROM):
|
||||
|
||||
def write_byte(self, address, value):
|
||||
self._mem[address] = value
|
||||
|
||||
|
||||
class Memory:
|
||||
|
||||
def __init__(self, options=None, use_bus=True):
|
||||
self.use_bus = use_bus
|
||||
self.rom = ROM(0xD000, 0x3000)
|
||||
|
||||
if options:
|
||||
self.rom.load_file(0xD000, options.rom)
|
||||
|
||||
self.ram = RAM(0x0000, 0xC000)
|
||||
|
||||
if options and options.ram:
|
||||
self.ram.load_file(0x0000, options.ram)
|
||||
|
||||
def load(self, address, data):
|
||||
if address < 0xC000:
|
||||
self.ram.load(address, data)
|
||||
|
||||
def read_byte(self, cycle, address):
|
||||
if address < 0xC000:
|
||||
return self.ram.read_byte(address)
|
||||
elif address < 0xD000:
|
||||
return self.bus_read(cycle, address)
|
||||
else:
|
||||
return self.rom.read_byte(address)
|
||||
|
||||
def read_word(self, cycle, address):
|
||||
return self.read_byte(cycle, address) + (self.read_byte(cycle + 1, address + 1) << 8)
|
||||
|
||||
def read_word_bug(self, cycle, address):
|
||||
if address % 0x100 == 0xFF:
|
||||
return self.read_byte(cycle, address) + (self.read_byte(cycle + 1, address & 0xFF00) << 8)
|
||||
else:
|
||||
return self.read_word(cycle, address)
|
||||
|
||||
def write_byte(self, cycle, address, value):
|
||||
if address < 0xC000:
|
||||
self.ram.write_byte(address, value)
|
||||
if 0x400 <= address < 0x800 or 0x2000 <= address < 0x5FFF:
|
||||
self.bus_write(cycle, address, value)
|
||||
|
||||
def bus_read(self, cycle, address):
|
||||
if not self.use_bus:
|
||||
return 0
|
||||
op = struct.pack("<IBHB", cycle, 0, address, 0)
|
||||
try:
|
||||
bus.send(op)
|
||||
b = bus.recv(1)
|
||||
if len(b) == 0:
|
||||
sys.exit(0)
|
||||
return ord(b)
|
||||
except socket.error:
|
||||
sys.exit(0)
|
||||
|
||||
def bus_write(self, cycle, address, value):
|
||||
if not self.use_bus:
|
||||
return
|
||||
op = struct.pack("<IBHB", cycle, 1, address, value)
|
||||
try:
|
||||
bus.send(op)
|
||||
except IOError:
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
class Disassemble:
|
||||
def __init__(self, cpu, memory):
|
||||
self.cpu = cpu
|
||||
|
@ -504,12 +412,16 @@ class CPU:
|
|||
|
||||
STACK_PAGE = 0x100
|
||||
RESET_VECTOR = 0xFFFC
|
||||
|
||||
def __init__(self, options, memory):
|
||||
|
||||
def __init__(self, cfg, bus, options, memory):
|
||||
self.cfg = cfg
|
||||
self.bus = bus
|
||||
self.memory = memory
|
||||
|
||||
self.control_server = BaseHTTPServer.HTTPServer(("127.0.0.1", 6502), ControlHandlerFactory(self))
|
||||
|
||||
self.control_server = BaseHTTPServer.HTTPServer(
|
||||
(self.cfg.LOCAL_HOST_IP, 6502), ControlHandlerFactory(self)
|
||||
)
|
||||
|
||||
self.accumulator = 0x00
|
||||
self.x_index = 0x00
|
||||
self.y_index = 0x00
|
||||
|
@ -689,12 +601,9 @@ class CPU:
|
|||
|
||||
def reset(self):
|
||||
self.program_counter = self.read_word(self.RESET_VECTOR)
|
||||
|
||||
def run(self, bus_port):
|
||||
global bus
|
||||
bus = socket.socket()
|
||||
bus.connect(("127.0.0.1", bus_port))
|
||||
|
||||
def run(self):
|
||||
sys.stdout.flush()
|
||||
while not self.quit:
|
||||
|
||||
timeout = 0
|
||||
|
@ -1198,7 +1107,7 @@ def get_options():
|
|||
if sys.argv[a].startswith("-"):
|
||||
if sys.argv[a] in ("-b", "--bus"):
|
||||
a += 1
|
||||
options.bus = int(sys.argv[a])
|
||||
options.bus_port = int(sys.argv[a])
|
||||
elif sys.argv[a] in ("-p", "--pc"):
|
||||
a += 1
|
||||
options.pc = int(sys.argv[a])
|
||||
|
@ -1219,12 +1128,19 @@ def get_options():
|
|||
|
||||
if __name__ == "__main__":
|
||||
options = get_options()
|
||||
if options.bus is None:
|
||||
if options.bus_port is None:
|
||||
print "ApplePy cpu core"
|
||||
print "Run applepy.py instead"
|
||||
sys.exit(0)
|
||||
|
||||
mem = Memory(options)
|
||||
|
||||
cpu = CPU(options, mem)
|
||||
cpu.run(options.bus)
|
||||
cfg = Apple2Config()
|
||||
|
||||
bus = socket.socket()
|
||||
bus_address = (cfg.LOCAL_HOST_IP, options.bus_port)
|
||||
print "bus I/O connect to %s" % repr(bus_address)
|
||||
bus.connect(bus_address)
|
||||
|
||||
mem = Memory(cfg, bus, RAM, ROM, options)
|
||||
|
||||
cpu = CPU(cfg, bus, options, mem)
|
||||
cpu.run()
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
import sys
|
||||
import socket
|
||||
import struct
|
||||
|
||||
class ROMBase(object):
|
||||
|
||||
def __init__(self, start, size):
|
||||
self.start = start
|
||||
self.end = start + size - 1
|
||||
self._mem = [0x00] * size
|
||||
|
||||
def load(self, address, data):
|
||||
for offset, datum in enumerate(data):
|
||||
self._mem[address - self.start + offset] = datum
|
||||
|
||||
def load_file(self, address, filename):
|
||||
with open(filename, "rb") as f:
|
||||
for offset, datum in enumerate(f.read()):
|
||||
self._mem[address - self.start + offset] = ord(datum)
|
||||
|
||||
def read_byte(self, address):
|
||||
assert self.start <= address <= self.end
|
||||
return self._mem[address - self.start]
|
||||
|
||||
|
||||
class RAMBase(ROMBase):
|
||||
|
||||
def write_byte(self, address, value):
|
||||
self._mem[address] = value
|
||||
|
||||
|
||||
class MemoryBase(object):
|
||||
def __init__(self, cfg, bus, ram_class, rom_class, options=None, use_bus=True):
|
||||
self.cfg = cfg
|
||||
if use_bus:
|
||||
self.bus = bus
|
||||
self.use_bus = use_bus
|
||||
|
||||
self.rom = rom_class(self.cfg.ROM_START, self.cfg.ROM_SIZE)
|
||||
|
||||
if options:
|
||||
self.rom.load_file(self.cfg.ROM_START, options.rom)
|
||||
|
||||
self.ram = ram_class(self.cfg.RAM_START, self.cfg.RAM_SIZE)
|
||||
|
||||
if options and options.ram:
|
||||
self.ram.load_file(self.cfg.RAM_START, options.ram)
|
||||
|
||||
def load(self, address, data):
|
||||
if address < self.cfg.RAM_SIZE:
|
||||
self.ram.load(address, data)
|
||||
|
||||
def read_byte(self, cycle, address):
|
||||
if address < self.cfg.RAM_SIZE:
|
||||
return self.ram.read_byte(address)
|
||||
elif address < self.cfg.ROM_START:
|
||||
return self.bus_read(cycle, address)
|
||||
else:
|
||||
return self.rom.read_byte(address)
|
||||
|
||||
def read_word(self, cycle, address):
|
||||
return self.read_byte(cycle, address) + (self.read_byte(cycle + 1, address + 1) << 8)
|
||||
|
||||
def bus_read(self, cycle, address):
|
||||
if not self.use_bus:
|
||||
return 0
|
||||
op = struct.pack("<IBHB", cycle, 0, address, 0)
|
||||
try:
|
||||
self.bus.send(op)
|
||||
b = self.bus.recv(1)
|
||||
if len(b) == 0:
|
||||
sys.exit(0)
|
||||
return ord(b)
|
||||
except socket.error:
|
||||
sys.exit(0)
|
||||
|
||||
def bus_write(self, cycle, address, value):
|
||||
if not self.use_bus:
|
||||
return
|
||||
op = struct.pack("<IBHB", cycle, 1, address, value)
|
||||
try:
|
||||
self.bus.send(op)
|
||||
except IOError:
|
||||
sys.exit(0)
|
|
@ -0,0 +1,22 @@
|
|||
import inspect
|
||||
|
||||
|
||||
class BaseConfig(object):
|
||||
LOCAL_HOST_IP = "127.0.0.1"
|
||||
|
||||
|
||||
def print_debug_info(self):
|
||||
print "Config: '%s'" % self.__class__.__name__
|
||||
|
||||
for name, value in inspect.getmembers(self): # , inspect.isdatadescriptor):
|
||||
if name.startswith("_"):
|
||||
continue
|
||||
# print name, type(value)
|
||||
if not isinstance(value, (int, basestring, list, tuple, dict)):
|
||||
continue
|
||||
if isinstance(value, (int,)):
|
||||
print "%20s = %-4s (in hex: %7s)" % (
|
||||
name, value, repr(hex(value))
|
||||
)
|
||||
else:
|
||||
print "%20s = %s" % (name, value)
|
Loading…
Reference in New Issue