diff --git a/README b/README index 15e688b..9bdd5d1 100644 --- a/README +++ b/README @@ -31,4 +31,5 @@ runs all the programs I've tried so far. The only I/O supported is the keyboard and screen but 40-column text, LORES and HIRES graphics are all supported. -ApplePy currently requires Pygame. \ No newline at end of file +ApplePy currently requires Pygame (although there is a minimal applepy_curses.py +that uses curses to display text mode only). \ No newline at end of file diff --git a/applepy.py b/applepy.py index 9d1795c..08f2123 100644 --- a/applepy.py +++ b/applepy.py @@ -5,10 +5,13 @@ import numpy import pygame +import select +import socket import struct import subprocess import sys import time +import wave class Display: @@ -106,6 +109,10 @@ class Display: self.flash_time = time.time() self.flash_on = False self.flash_chars = [[0] * 0x400] * 2 + + self.page = 1 + self.text = True + self.colour = False self.chargen = [] for c in self.characters: @@ -288,12 +295,33 @@ class Speaker: self.play() +class Cassette: + + def __init__(self, fn): + wav = wave.open(fn, "r") + self.raw = wav.readframes(wav.getnframes()) + self.start_cycle = 0 + self.start_offset = 0 + + for i, b in enumerate(self.raw): + if ord(b) > 0xA0: + self.start_offset = i + break + + def read_byte(self, cycle): + if self.start_cycle == 0: + self.start_cycle = cycle + offset = self.start_offset + (cycle - self.start_cycle) * 22000 / 1000000 + return ord(self.raw[offset]) if offset < len(self.raw) else 0x80 + + class SoftSwitches: - def __init__(self, display, speaker): + def __init__(self, display, speaker, cassette): self.kbd = 0x00 self.display = display self.speaker = speaker + self.cassette = cassette def read_byte(self, cycle, address): assert 0xC000 <= address <= 0xCFFF @@ -320,6 +348,9 @@ class SoftSwitches: self.display.lores() elif address == 0xC057: self.display.hires() + elif address == 0xC060: + if self.cassette: + return self.cassette.read_byte(cycle) else: pass # print "%04X" % address return 0x00 @@ -327,25 +358,36 @@ class SoftSwitches: class Apple2: - def __init__(self, options, display, speaker): + def __init__(self, options, display, speaker, cassette): self.display = display self.speaker = speaker - self.softswitches = SoftSwitches(display, speaker) + self.softswitches = SoftSwitches(display, speaker, cassette) + + listener = socket.socket() + listener.bind(("127.0.0.1", 0)) + listener.listen(0) args = [ "pypy", "cpu6502.py", + "--bus", str(listener.getsockname()[1]), "--rom", options.rom, ] if options.ram: args.extend([ "--ram", options.ram, ]) - self.core = subprocess.Popen( - args=args, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - ) + if options.pc is not None: + args.extend([ + "--pc", str(options.pc), + ]) + self.core = subprocess.Popen(args) + + rs, _, _ = select.select([listener], [], [], 2) + if not rs: + print >>sys.stderr, "CPU module did not start" + sys.exit(1) + self.cpu, _ = listener.accept() def run(self): update_cycle = 0 @@ -354,11 +396,12 @@ class Apple2: quit = False while not quit: - op = self.core.stdout.read(8) + op = self.cpu.recv(8) + if len(op) == 0: + break cycle, rw, addr, val = struct.unpack(">sys.stderr print >>sys.stderr, "Usage: applepy.py [options]" print >>sys.stderr + print >>sys.stderr, " -c, --cassette Cassette wav file to load" print >>sys.stderr, " -R, --rom ROM file to use (default A2ROM.BIN)" print >>sys.stderr, " -r, --ram RAM file to load (default none)" + print >>sys.stderr, " -p, --pc Initial PC value" print >>sys.stderr, " -q, --quiet Quiet mode, no sounds (default sounds)" sys.exit(1) @@ -406,20 +451,28 @@ def usage(): def get_options(): class Options: def __init__(self): + self.cassette = None self.rom = "A2ROM.BIN" self.ram = None + self.pc = None self.quiet = False options = Options() a = 1 while a < len(sys.argv): if sys.argv[a].startswith("-"): - if sys.argv[a] in ("-R", "--rom"): + if sys.argv[a] in ("-c", "--cassette"): + a += 1 + options.cassette = sys.argv[a] + elif sys.argv[a] in ("-R", "--rom"): a += 1 options.rom = sys.argv[a] elif sys.argv[a] in ("-r", "--ram"): a += 1 options.ram = sys.argv[a] + elif sys.argv[a] in ("-p", "--pc"): + a += 1 + options.pc = int(sys.argv[a]) elif sys.argv[a] in ("-q", "--quiet"): options.quiet = True else: @@ -435,6 +488,7 @@ if __name__ == "__main__": options = get_options() display = Display() speaker = None if options.quiet else Speaker() + cassette = Cassette(options.cassette) if options.cassette else None - apple = Apple2(options, display, speaker) + apple = Apple2(options, display, speaker, cassette) apple.run() diff --git a/applepy_curses.py b/applepy_curses.py index 8b234e6..a232c40 100644 --- a/applepy_curses.py +++ b/applepy_curses.py @@ -4,6 +4,7 @@ import curses +import socket import struct import subprocess import sys @@ -55,20 +56,29 @@ def write(win, addr, val): def run(win): global kbd - p = subprocess.Popen( - args=[sys.executable, "cpu6502.py"], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - ) + + listener = socket.socket() + listener.bind(("127.0.0.1", 0)) + listener.listen(0) + + args = [ + sys.executable, + "cpu6502.py", + "--bus", str(listener.getsockname()[1]), + "--rom", options.rom, + ] + + p = subprocess.Popen(args) + cpu, _ = listener.accept() + win.clear() curses.noecho() win.nodelay(True) while True: - op = p.stdout.read(8) + op = cpu.recv(8) cycle, rw, addr, val = struct.unpack(">sys.stderr, "ApplePy - an Apple ][ emulator in Python" + print >>sys.stderr, "James Tauber / http://jtauber.com/" + print >>sys.stderr + print >>sys.stderr, "Usage: applepy_curses.py [options]" + print >>sys.stderr + print >>sys.stderr, " -R, --rom ROM file to use (default A2ROM.BIN)" + sys.exit(1) + + +def get_options(): + class Options: + def __init__(self): + self.rom = "A2ROM.BIN" + + options = Options() + a = 1 + while a < len(sys.argv): + if sys.argv[a].startswith("-"): + if sys.argv[a] in ("-R", "--rom"): + a += 1 + options.rom = sys.argv[a] + else: + usage() + else: + usage() + a += 1 + + return options if __name__ == "__main__": + options = get_options() curses.wrapper(run) diff --git a/control.py b/control.py new file mode 100644 index 0000000..115f44c --- /dev/null +++ b/control.py @@ -0,0 +1,157 @@ +import json +import readline +import sys +import urllib + +URL_PREFIX = "http://localhost:6502" + +def get(url): + return json.loads(urllib.urlopen(URL_PREFIX + url).read()) + +def post(url, data=None): + return urllib.urlopen(URL_PREFIX + url, json.dumps(data) if data is not None else "") + +def value(s): + if s.startswith("$"): + return int(s[1:], 16) + if s.startswith("0x"): + return int(s[2:], 16) + return int(s) + +def format_disassemble(dis): + r = "%04X- " % dis["address"] + for i in range(3): + if i < len(dis["bytes"]): + r += "%02X " % dis["bytes"][i] + else: + r += " " + r += " %s" % dis["mnemonic"] + if "operand" in dis: + r += " %-10s" % dis["operand"] + if "memory" in dis: + r += "[%04X] = %0*X" % tuple(dis["memory"]) + return r + +def cmd_disassemble(a): + """Disassemble""" + if len(a) > 1: + addr = value(a[1]) + else: + status = get("/status") + addr = status["program_counter"] + disasm = get("/disassemble/%d" % addr) + for d in disasm: + print format_disassemble(d) + +def cmd_dump(a): + """Dump memory""" + start = value(a[1]) + if len(a) > 2: + end = value(a[2]) + else: + end = start + 15 + data = get("/memory/%d-%d" % (start, end)) + addr = start & ~0xF + while addr <= end: + s = "%04X-" % addr + for i in range(16): + if start <= addr + i <= end: + s += " %02X" % data[addr + i - start] + else: + s += " " + s += " " + for i in range(16): + if start <= addr + i <= end: + c = data[addr + i - start] + + # adjust for apple character set + c &= 0x3f + if c < 0x20: + c += 0x40 + + if 0x20 <= c < 0x7f: + s += chr(c) + else: + s += "." + else: + s += " " + print s + addr += 16 + +def cmd_help(a): + """Help commands""" + if len(a) > 1: + f = Commands.get(a[1]) + if f is not None: + print f.__doc__ + else: + print "Unknown command:", a[1] + else: + print "Commands:" + for c in sorted(Commands): + print " ", c + +def cmd_peek(a): + """Peek memory location""" + addr = value(a[1]) + dump = get("/memory/%d" % addr) + print "%04X: %02X" % (addr, dump[0]) + +def cmd_poke(a): + """Poke memory location""" + addr = value(a[1]) + val = value(a[2]) + post("/memory/%d" % addr, [val]) + +def cmd_status(a): + """CPU status""" + status = get("/status") + print "A=%02X X=%02X Y=%02X S=%02X PC=%04X F=%c%c0%c%c%c%c%c" % ( + status["accumulator"], + status["x_index"], + status["y_index"], + status["stack_pointer"], + status["program_counter"], + "N" if status["sign_flag"] else "n", + "V" if status["overflow_flag"] else "v", + "B" if status["break_flag"] else "b", + "D" if status["decimal_mode_flag"] else "d", + "I" if status["interrupt_disable_flag"] else "i", + "Z" if status["zero_flag"] else "z", + "C" if status["carry_flag"] else "c", + ) + disasm = get("/disassemble/%d" % status["program_counter"]) + print format_disassemble(disasm[0]) + +def cmd_quit(a): + """Quit""" + sys.exit(0) + +def cmd_reset(a): + """Reset""" + post("/reset") + +Commands = { + "disassemble": cmd_disassemble, + "dump": cmd_dump, + "help": cmd_help, + "peek": cmd_peek, + "poke": cmd_poke, + "status": cmd_status, + "quit": cmd_quit, + "reset": cmd_reset, +} + +def main(): + print "ApplePy control console" + while True: + s = raw_input("6502> ") + a = s.strip().split() + f = Commands.get(a[0]) + if f is not None: + f(a) + else: + print "Unknown command:", s + +if __name__ == "__main__": + main() diff --git a/cpu6502.py b/cpu6502.py index 0a3064d..c29c4cc 100644 --- a/cpu6502.py +++ b/cpu6502.py @@ -3,10 +3,18 @@ # originally written 2001, updated 2011 +import BaseHTTPServer +import json +import re +import select +import socket import struct import sys +bus = None # socket for bus I/O + + def signed(x): if x > 0x7F: x = x - 0x100 @@ -25,7 +33,7 @@ class ROM: self._mem[address - self.start + offset] = datum def load_file(self, address, filename): - with open(filename) as f: + with open(filename, "rb") as f: for offset, datum in enumerate(f.read()): self._mem[address - self.start + offset] = ord(datum) @@ -42,8 +50,8 @@ class RAM(ROM): class Memory: - def __init__(self, options=None, use_stdio=True): - self.use_stdio = use_stdio + def __init__(self, options=None, use_bus=True): + self.use_bus = use_bus self.rom = ROM(0xD000, 0x3000) if options: @@ -82,26 +90,24 @@ class Memory: self.bus_write(cycle, address, value) def bus_read(self, cycle, address): - if not self.use_stdio: + if not self.use_bus: return 0 op = struct.pack(" 1: - s += " " + info[1](pc) - return s + r = { + "address": pc, + "bytes": [self.cpu.read_byte(pc + i) for i in range(info[0])], + "mnemonic": info[1], + } + if len(info) > 2: + r.update(info[2](pc)) + return r, info[0] + + +class ControlHandler(BaseHTTPServer.BaseHTTPRequestHandler): + + def __init__(self, request, client_address, server, cpu): + self.cpu = cpu + self.disassemble = Disassemble(self.cpu, self.cpu.memory) + + self.get_urls = { + r"/disassemble/(\d+)$": self.get_disassemble, + r"/memory/(\d+)(-(\d+))?$": self.get_memory, + r"/memory/(\d+)(-(\d+))?/raw$": self.get_memory_raw, + r"/status$": self.get_status, + } + + self.post_urls = { + r"/memory/(\d+)(-(\d+))?$": self.post_memory, + r"/memory/(\d+)(-(\d+))?/raw$": self.post_memory_raw, + r"/quit$": self.post_quit, + r"/reset$": self.post_reset, + } + + BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, request, client_address, server) + + def log_request(self, code, size=0): + pass + + def dispatch(self, urls): + for r, f in urls.items(): + m = re.match(r, self.path) + if m is not None: + f(m) + break + else: + self.send_response(404) + self.end_headers() + + def response(self, s): + self.send_response(200) + self.send_header("Content-Length", str(len(s))) + self.end_headers() + self.wfile.write(s) + + def do_GET(self): + self.dispatch(self.get_urls) + + def do_POST(self): + self.dispatch(self.post_urls) + + def get_disassemble(self, m): + addr = int(m.group(1)) + r = [] + n = 20 + while n > 0: + dis, length = self.disassemble.disasm(addr) + r.append(dis) + addr += length + n -= 1 + self.response(json.dumps(r)) + + def get_memory_raw(self, m): + addr = int(m.group(1)) + e = m.group(3) + if e is not None: + end = int(e) + else: + end = addr + self.response("".join([chr(self.cpu.read_byte(x)) for x in range(addr, end + 1)])) + + def get_memory(self, m): + addr = int(m.group(1)) + e = m.group(3) + if e is not None: + end = int(e) + else: + end = addr + self.response(json.dumps(list(map(self.cpu.read_byte, range(addr, end + 1))))) + + def get_status(self, m): + self.response(json.dumps(dict((x, getattr(self.cpu, x)) for x in ( + "accumulator", + "x_index", + "y_index", + "stack_pointer", + "program_counter", + "sign_flag", + "overflow_flag", + "break_flag", + "decimal_mode_flag", + "interrupt_disable_flag", + "zero_flag", + "carry_flag", + )))) + + def post_memory(self, m): + addr = int(m.group(1)) + e = m.group(3) + if e is not None: + end = int(e) + else: + end = addr + data = json.loads(self.rfile.read(int(self.headers["Content-Length"]))) + for i, a in enumerate(range(addr, end + 1)): + self.cpu.write_byte(a, data[i]) + self.response("") + + def post_memory_raw(self, m): + addr = int(m.group(1)) + e = m.group(3) + if e is not None: + end = int(e) + else: + end = addr + data = self.rfile.read(int(self.headers["Content-Length"])) + for i, a in enumerate(range(addr, end + 1)): + self.cpu.write_byte(a, data[i]) + self.response("") + + def post_quit(self, m): + self.cpu.quit = True + self.response("") + + def post_reset(self, m): + self.cpu.reset() + self.cpu.running = True + self.response("") + + +class ControlHandlerFactory: + + def __init__(self, cpu): + self.cpu = cpu + + def __call__(self, request, client_address, server): + return ControlHandler(request, client_address, server, self.cpu) class CPU: @@ -329,9 +505,10 @@ class CPU: STACK_PAGE = 0x100 RESET_VECTOR = 0xFFFC - def __init__(self, memory): + def __init__(self, options, memory): self.memory = memory - self.disassemble = Disassemble(self, memory) + + self.control_server = BaseHTTPServer.HTTPServer(("127.0.0.1", 6502), ControlHandlerFactory(self)) self.accumulator = 0x00 self.x_index = 0x00 @@ -351,6 +528,10 @@ class CPU: self.setup_ops() self.reset() + if options.pc is not None: + self.program_counter = options.pc + self.running = True + self.quit = False def setup_ops(self): self.ops = [None] * 0x100 @@ -509,18 +690,41 @@ class CPU: def reset(self): self.program_counter = self.read_word(self.RESET_VECTOR) - def run(self): - while True: - self.cycles += 2 # all instructions take this as a minimum - op = self.read_pc_byte() - func = self.ops[op] - if func is None: - print "UNKNOWN OP" - print hex(self.program_counter - 1) - print hex(op) - break - else: - self.ops[op]() + def run(self, bus_port): + global bus + bus = socket.socket() + bus.connect(("127.0.0.1", bus_port)) + + while not self.quit: + + timeout = 0 + if not self.running: + timeout = 1 + # Currently this handler blocks from the moment + # a connection is accepted until the response + # is sent. TODO: use an async HTTP server that + # handles input data asynchronously. + sockets = [self.control_server] + rs, _, _ = select.select(sockets, [], [], timeout) + for s in rs: + if s is self.control_server: + self.control_server._handle_request_noblock() + else: + pass + + count = 1000 + while count > 0 and self.running: + self.cycles += 2 # all instructions take this as a minimum + op = self.read_pc_byte() + func = self.ops[op] + if func is None: + print "UNKNOWN OP" + print hex(self.program_counter - 1) + print hex(op) + break + else: + self.ops[op]() + count -= 1 def test_run(self, start, end): self.program_counter = start @@ -973,6 +1177,8 @@ def usage(): print >>sys.stderr print >>sys.stderr, "Usage: cpu6502.py [options]" print >>sys.stderr + print >>sys.stderr, " -b, --bus Bus port number" + print >>sys.stderr, " -p, --pc Initial PC value" print >>sys.stderr, " -R, --rom ROM file to use (default A2ROM.BIN)" print >>sys.stderr, " -r, --ram RAM file to load (default none)" sys.exit(1) @@ -983,12 +1189,20 @@ def get_options(): def __init__(self): self.rom = "A2ROM.BIN" self.ram = None + self.bus = None + self.pc = None options = Options() a = 1 while a < len(sys.argv): if sys.argv[a].startswith("-"): - if sys.argv[a] in ("-R", "--rom"): + if sys.argv[a] in ("-b", "--bus"): + a += 1 + options.bus = int(sys.argv[a]) + elif sys.argv[a] in ("-p", "--pc"): + a += 1 + options.pc = int(sys.argv[a]) + elif sys.argv[a] in ("-R", "--rom"): a += 1 options.rom = sys.argv[a] elif sys.argv[a] in ("-r", "--ram"): @@ -1004,13 +1218,13 @@ def get_options(): if __name__ == "__main__": - if sys.stdout.isatty(): + options = get_options() + if options.bus is None: print "ApplePy cpu core" print "Run applepy.py instead" sys.exit(0) - options = get_options() mem = Memory(options) - cpu = CPU(mem) - cpu.run() + cpu = CPU(options, mem) + cpu.run(options.bus) diff --git a/tests.py b/tests.py index 3d8dedd..6a5d606 100644 --- a/tests.py +++ b/tests.py @@ -5,7 +5,7 @@ from cpu6502 import Memory, CPU class TestMemory(unittest.TestCase): def setUp(self): - self.memory = Memory(use_stdio=False) + self.memory = Memory(use_bus=False) def test_load(self): self.memory.load(0x1000, [0x01, 0x02, 0x03]) @@ -25,7 +25,7 @@ class TestMemory(unittest.TestCase): class TestLoadStoreOperations(unittest.TestCase): def setUp(self): - self.memory = Memory(use_stdio=False) + self.memory = Memory(use_bus=False) self.cpu = CPU(self.memory) self.memory.load(0x1000, [0x00, 0x01, 0x7F, 0x80, 0xFF]) @@ -114,7 +114,7 @@ class TestLoadStoreOperations(unittest.TestCase): class TestRegisterTransferOperations(unittest.TestCase): def setUp(self): - self.memory = Memory(use_stdio=False) + self.memory = Memory(use_bus=False) self.cpu = CPU(self.memory) def test_TAX(self): @@ -189,7 +189,7 @@ class TestRegisterTransferOperations(unittest.TestCase): class TestStackOperations(unittest.TestCase): def setUp(self): - self.memory = Memory(use_stdio=False) + self.memory = Memory(use_bus=False) self.cpu = CPU(self.memory) def test_TSX(self): @@ -237,7 +237,7 @@ class TestStackOperations(unittest.TestCase): class TestLogicalOperations(unittest.TestCase): def setUp(self): - self.memory = Memory(use_stdio=False) + self.memory = Memory(use_bus=False) self.cpu = CPU(self.memory) def test_AND(self): @@ -325,7 +325,7 @@ class TestLogicalOperations(unittest.TestCase): class TestArithmeticOperations(unittest.TestCase): def setUp(self): - self.memory = Memory(use_stdio=False) + self.memory = Memory(use_bus=False) self.cpu = CPU(self.memory) def test_ADC_without_BCD(self): @@ -544,7 +544,7 @@ class TestArithmeticOperations(unittest.TestCase): class TestIncrementDecrementOperations(unittest.TestCase): def setUp(self): - self.memory = Memory(use_stdio=False) + self.memory = Memory(use_bus=False) self.cpu = CPU(self.memory) def test_INC(self): @@ -653,7 +653,7 @@ class TestIncrementDecrementOperations(unittest.TestCase): class TestShiftOperations(unittest.TestCase): def setUp(self): - self.memory = Memory(use_stdio=False) + self.memory = Memory(use_bus=False) self.cpu = CPU(self.memory) def test_ASL(self): @@ -760,7 +760,7 @@ class TestShiftOperations(unittest.TestCase): class TestJumpCallOperations(unittest.TestCase): def setUp(self): - self.memory = Memory(use_stdio=False) + self.memory = Memory(use_bus=False) self.cpu = CPU(self.memory) def test_JMP(self): @@ -792,7 +792,7 @@ class TestJumpCallOperations(unittest.TestCase): class TestBranchOperations(unittest.TestCase): def setUp(self): - self.memory = Memory(use_stdio=False) + self.memory = Memory(use_bus=False) self.cpu = CPU(self.memory) def test_BCC(self): @@ -879,7 +879,7 @@ class TestBranchOperations(unittest.TestCase): class TestStatusFlagOperations(unittest.TestCase): def setUp(self): - self.memory = Memory(use_stdio=False) + self.memory = Memory(use_bus=False) self.cpu = CPU(self.memory) def test_CLC(self): @@ -921,7 +921,7 @@ class TestStatusFlagOperations(unittest.TestCase): class TestSystemFunctionOperations(unittest.TestCase): def setUp(self): - self.memory = Memory(use_stdio=False) + self.memory = Memory(use_bus=False) self.cpu = CPU(self.memory) def test_BRK(self): @@ -951,7 +951,7 @@ class TestSystemFunctionOperations(unittest.TestCase): class Test6502Bugs(unittest.TestCase): def setUp(self): - self.memory = Memory(use_stdio=False) + self.memory = Memory(use_bus=False) self.cpu = CPU(self.memory) def test_zero_page_x(self):