diff --git a/apple2/apple2config.py b/apple2/apple2config.py index 722ed35..5cc83e0 100644 --- a/apple2/apple2config.py +++ b/apple2/apple2config.py @@ -2,6 +2,7 @@ from generic.generic_config import BaseConfig class Apple2Config(BaseConfig): + # Memory ROM_START = 0xD000 ROM_SIZE = 0x3000 ROM_END = ROM_START + ROM_SIZE # 0x10000 @@ -10,6 +11,12 @@ class Apple2Config(BaseConfig): RAM_SIZE = 0xC000 RAM_END = RAM_START + RAM_SIZE # 0xc000 + # CPU + STACK_PAGE = 0x100 + RESET_VECTOR = 0xFFFC + + HTTPSERVER_PORT = 6502 + if __name__ == "__main__": cfg = Apple2Config() diff --git a/cpu6502.py b/cpu6502.py index a168fd9..95d2d6f 100644 --- a/cpu6502.py +++ b/cpu6502.py @@ -12,12 +12,7 @@ import sys from apple2.apple2memory import Memory, RAM, ROM from apple2.apple2config import Apple2Config - - -def signed(x): - if x > 0x7F: - x = x - 0x100 - return x +from generic.cpu import BaseCPU, signed class Disassemble: @@ -273,178 +268,17 @@ class Disassemble: return r, info[0] -class ControlHandler(BaseHTTPServer.BaseHTTPRequestHandler): - - def __init__(self, request, client_address, server, cpu): - self.cpu = cpu - self.disassemble = Disassemble(self.cpu, self.cpu.memory) - - self.get_urls = { - r"/disassemble/(\d+)$": self.get_disassemble, - r"/memory/(\d+)(-(\d+))?$": self.get_memory, - r"/memory/(\d+)(-(\d+))?/raw$": self.get_memory_raw, - r"/status$": self.get_status, - } - - self.post_urls = { - r"/memory/(\d+)(-(\d+))?$": self.post_memory, - r"/memory/(\d+)(-(\d+))?/raw$": self.post_memory_raw, - r"/quit$": self.post_quit, - r"/reset$": self.post_reset, - } - - BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, request, client_address, server) - - def log_request(self, code, size=0): - pass - - def dispatch(self, urls): - for r, f in urls.items(): - m = re.match(r, self.path) - if m is not None: - f(m) - break - else: - self.send_response(404) - self.end_headers() - - def response(self, s): - self.send_response(200) - self.send_header("Content-Length", str(len(s))) - self.end_headers() - self.wfile.write(s) - - def do_GET(self): - self.dispatch(self.get_urls) - - def do_POST(self): - self.dispatch(self.post_urls) - - def get_disassemble(self, m): - addr = int(m.group(1)) - r = [] - n = 20 - while n > 0: - dis, length = self.disassemble.disasm(addr) - r.append(dis) - addr += length - n -= 1 - self.response(json.dumps(r)) - - def get_memory_raw(self, m): - addr = int(m.group(1)) - e = m.group(3) - if e is not None: - end = int(e) - else: - end = addr - self.response("".join([chr(self.cpu.read_byte(x)) for x in range(addr, end + 1)])) - - def get_memory(self, m): - addr = int(m.group(1)) - e = m.group(3) - if e is not None: - end = int(e) - else: - end = addr - self.response(json.dumps(list(map(self.cpu.read_byte, range(addr, end + 1))))) - - def get_status(self, m): - self.response(json.dumps(dict((x, getattr(self.cpu, x)) for x in ( - "accumulator", - "x_index", - "y_index", - "stack_pointer", - "program_counter", - "sign_flag", - "overflow_flag", - "break_flag", - "decimal_mode_flag", - "interrupt_disable_flag", - "zero_flag", - "carry_flag", - )))) - - def post_memory(self, m): - addr = int(m.group(1)) - e = m.group(3) - if e is not None: - end = int(e) - else: - end = addr - data = json.loads(self.rfile.read(int(self.headers["Content-Length"]))) - for i, a in enumerate(range(addr, end + 1)): - self.cpu.write_byte(a, data[i]) - self.response("") - - def post_memory_raw(self, m): - addr = int(m.group(1)) - e = m.group(3) - if e is not None: - end = int(e) - else: - end = addr - data = self.rfile.read(int(self.headers["Content-Length"])) - for i, a in enumerate(range(addr, end + 1)): - self.cpu.write_byte(a, data[i]) - self.response("") - - def post_quit(self, m): - self.cpu.quit = True - self.response("") - - def post_reset(self, m): - self.cpu.reset() - self.cpu.running = True - self.response("") -class ControlHandlerFactory: - - def __init__(self, cpu): - self.cpu = cpu - - def __call__(self, request, client_address, server): - return ControlHandler(request, client_address, server, self.cpu) -class CPU: - - STACK_PAGE = 0x100 - RESET_VECTOR = 0xFFFC +class CPU6502(BaseCPU): - def __init__(self, cfg, bus, options, memory): - self.cfg = cfg - self.bus = bus - self.memory = memory + def __init__(self, *args, **kwargs): + super(CPU6502, self).__init__(*args, **kwargs) - self.control_server = BaseHTTPServer.HTTPServer( - (self.cfg.LOCAL_HOST_IP, 6502), ControlHandlerFactory(self) - ) - - self.accumulator = 0x00 - self.x_index = 0x00 - self.y_index = 0x00 - - self.carry_flag = 0 - self.zero_flag = 0 - self.interrupt_disable_flag = 0 - self.decimal_mode_flag = 0 - self.break_flag = 1 - self.overflow_flag = 0 - self.sign_flag = 0 - self.stack_pointer = 0xFF - - self.cycles = 0 - - self.setup_ops() - self.reset() - if options.pc is not None: - self.program_counter = options.pc - self.running = True - self.quit = False - + def setup_ops(self): self.ops = [None] * 0x100 self.ops[0x00] = lambda: self.BRK() @@ -598,486 +432,7 @@ class CPU: self.ops[0xF9] = lambda: self.SBC(self.absolute_y_mode()) self.ops[0xFD] = lambda: self.SBC(self.absolute_x_mode()) self.ops[0xFE] = lambda: self.INC(self.absolute_x_mode(rmw=True)) - - def reset(self): - self.program_counter = self.read_word(self.RESET_VECTOR) - def run(self): - sys.stdout.flush() - while not self.quit: - - timeout = 0 - if not self.running: - timeout = 1 - # Currently this handler blocks from the moment - # a connection is accepted until the response - # is sent. TODO: use an async HTTP server that - # handles input data asynchronously. - sockets = [self.control_server] - rs, _, _ = select.select(sockets, [], [], timeout) - for s in rs: - if s is self.control_server: - self.control_server._handle_request_noblock() - else: - pass - - count = 1000 - while count > 0 and self.running: - self.cycles += 2 # all instructions take this as a minimum - op = self.read_pc_byte() - func = self.ops[op] - if func is None: - print "UNKNOWN OP" - print hex(self.program_counter - 1) - print hex(op) - break - else: - self.ops[op]() - count -= 1 - - def test_run(self, start, end): - self.program_counter = start - while True: - self.cycles += 2 # all instructions take this as a minimum - if self.program_counter == end: - break - op = self.read_pc_byte() - func = self.ops[op] - if func is None: - print "UNKNOWN OP" - print hex(self.program_counter - 1) - print hex(op) - break - else: - self.ops[op]() - - #### - - def get_pc(self, inc=1): - pc = self.program_counter - self.program_counter += inc - return pc - - def read_byte(self, address): - return self.memory.read_byte(self.cycles, address) - - def read_word(self, address): - return self.memory.read_word(self.cycles, address) - - def read_word_bug(self, address): - return self.memory.read_word_bug(self.cycles, address) - - def read_pc_byte(self): - return self.read_byte(self.get_pc()) - - def read_pc_word(self): - return self.read_word(self.get_pc(2)) - - def write_byte(self, address, value): - self.memory.write_byte(self.cycles, address, value) - - #### - - def status_from_byte(self, status): - self.carry_flag = [0, 1][0 != status & 1] - self.zero_flag = [0, 1][0 != status & 2] - self.interrupt_disable_flag = [0, 1][0 != status & 4] - self.decimal_mode_flag = [0, 1][0 != status & 8] - self.break_flag = [0, 1][0 != status & 16] - self.overflow_flag = [0, 1][0 != status & 64] - self.sign_flag = [0, 1][0 != status & 128] - - def status_as_byte(self): - return self.carry_flag | self.zero_flag << 1 | self.interrupt_disable_flag << 2 | self.decimal_mode_flag << 3 | self.break_flag << 4 | 1 << 5 | self.overflow_flag << 6 | self.sign_flag << 7 - - #### - - def push_byte(self, byte): - self.write_byte(self.STACK_PAGE + self.stack_pointer, byte) - self.stack_pointer = (self.stack_pointer - 1) % 0x100 - - def pull_byte(self): - self.stack_pointer = (self.stack_pointer + 1) % 0x100 - return self.read_byte(self.STACK_PAGE + self.stack_pointer) - - def push_word(self, word): - hi, lo = divmod(word, 0x100) - self.push_byte(hi) - self.push_byte(lo) - - def pull_word(self): - s = self.STACK_PAGE + self.stack_pointer + 1 - self.stack_pointer += 2 - return self.read_word(s) - - #### - - def immediate_mode(self): - return self.get_pc() - - def absolute_mode(self): - self.cycles += 2 - return self.read_pc_word() - - def absolute_x_mode(self, rmw=False): - if rmw: - self.cycles += 1 - return self.absolute_mode() + self.x_index - - def absolute_y_mode(self, rmw=False): - if rmw: - self.cycles += 1 - return self.absolute_mode() + self.y_index - - def zero_page_mode(self): - self.cycles += 1 - return self.read_pc_byte() - - def zero_page_x_mode(self): - self.cycles += 1 - return (self.zero_page_mode() + self.x_index) % 0x100 - - def zero_page_y_mode(self): - self.cycles += 1 - return (self.zero_page_mode() + self.y_index) % 0x100 - - def indirect_mode(self): - self.cycles += 2 - return self.read_word_bug(self.absolute_mode()) - - def indirect_x_mode(self): - self.cycles += 4 - return self.read_word_bug((self.read_pc_byte() + self.x_index) % 0x100) - - def indirect_y_mode(self, rmw=False): - if rmw: - self.cycles += 4 - else: - self.cycles += 3 - return self.read_word_bug(self.read_pc_byte()) + self.y_index - - def relative_mode(self): - pc = self.get_pc() - return pc + 1 + signed(self.read_byte(pc)) - - #### - - def update_nz(self, value): - value = value % 0x100 - self.zero_flag = [0, 1][(value == 0)] - self.sign_flag = [0, 1][((value & 0x80) != 0)] - return value - - def update_nzc(self, value): - self.carry_flag = [0, 1][(value > 0xFF)] - return self.update_nz(value) - - #### - - # LOAD / STORE - - def LDA(self, operand_address): - self.accumulator = self.update_nz(self.read_byte(operand_address)) - - def LDX(self, operand_address): - self.x_index = self.update_nz(self.read_byte(operand_address)) - - def LDY(self, operand_address): - self.y_index = self.update_nz(self.read_byte(operand_address)) - - def STA(self, operand_address): - self.write_byte(operand_address, self.accumulator) - - def STX(self, operand_address): - self.write_byte(operand_address, self.x_index) - - def STY(self, operand_address): - self.write_byte(operand_address, self.y_index) - - # TRANSFER - - def TAX(self): - self.x_index = self.update_nz(self.accumulator) - - def TXA(self): - self.accumulator = self.update_nz(self.x_index) - - def TAY(self): - self.y_index = self.update_nz(self.accumulator) - - def TYA(self): - self.accumulator = self.update_nz(self.y_index) - - def TSX(self): - self.x_index = self.update_nz(self.stack_pointer) - - def TXS(self): - self.stack_pointer = self.x_index - - # SHIFTS / ROTATES - - def ASL(self, operand_address=None): - if operand_address is None: - self.accumulator = self.update_nzc(self.accumulator << 1) - else: - self.cycles += 2 - self.write_byte(operand_address, self.update_nzc(self.read_byte(operand_address) << 1)) - - def ROL(self, operand_address=None): - if operand_address is None: - a = self.accumulator << 1 - if self.carry_flag: - a = a | 0x01 - self.accumulator = self.update_nzc(a) - else: - self.cycles += 2 - m = self.read_byte(operand_address) << 1 - if self.carry_flag: - m = m | 0x01 - self.write_byte(operand_address, self.update_nzc(m)) - - def ROR(self, operand_address=None): - if operand_address is None: - if self.carry_flag: - self.accumulator = self.accumulator | 0x100 - self.carry_flag = self.accumulator % 2 - self.accumulator = self.update_nz(self.accumulator >> 1) - else: - self.cycles += 2 - m = self.read_byte(operand_address) - if self.carry_flag: - m = m | 0x100 - self.carry_flag = m % 2 - self.write_byte(operand_address, self.update_nz(m >> 1)) - - def LSR(self, operand_address=None): - if operand_address is None: - self.carry_flag = self.accumulator % 2 - self.accumulator = self.update_nz(self.accumulator >> 1) - else: - self.cycles += 2 - self.carry_flag = self.read_byte(operand_address) % 2 - self.write_byte(operand_address, self.update_nz(self.read_byte(operand_address) >> 1)) - - # JUMPS / RETURNS - - def JMP(self, operand_address): - self.cycles -= 1 - self.program_counter = operand_address - - def JSR(self, operand_address): - self.cycles += 2 - self.push_word(self.program_counter - 1) - self.program_counter = operand_address - - def RTS(self): - self.cycles += 4 - self.program_counter = self.pull_word() + 1 - - # BRANCHES - - def BCC(self, operand_address): - if not self.carry_flag: - self.cycles += 1 - self.program_counter = operand_address - - def BCS(self, operand_address): - if self.carry_flag: - self.cycles += 1 - self.program_counter = operand_address - - def BEQ(self, operand_address): - if self.zero_flag: - self.cycles += 1 - self.program_counter = operand_address - - def BNE(self, operand_address): - if not self.zero_flag: - self.cycles += 1 - self.program_counter = operand_address - - def BMI(self, operand_address): - if self.sign_flag: - self.cycles += 1 - self.program_counter = operand_address - - def BPL(self, operand_address): - if not self.sign_flag: - self.cycles += 1 - self.program_counter = operand_address - - def BVC(self, operand_address): - if not self.overflow_flag: - self.cycles += 1 - self.program_counter = operand_address - - def BVS(self, operand_address): - if self.overflow_flag: - self.cycles += 1 - self.program_counter = operand_address - - # SET / CLEAR FLAGS - - def CLC(self): - self.carry_flag = 0 - - def CLD(self): - self.decimal_mode_flag = 0 - - def CLI(self): - self.interrupt_disable_flag = 0 - - def CLV(self): - self.overflow_flag = 0 - - def SEC(self): - self.carry_flag = 1 - - def SED(self): - self.decimal_mode_flag = 1 - - def SEI(self): - self.interrupt_disable_flag = 1 - - # INCREMENT / DECREMENT - - def DEC(self, operand_address): - self.cycles += 2 - self.write_byte(operand_address, self.update_nz(self.read_byte(operand_address) - 1)) - - def DEX(self): - self.x_index = self.update_nz(self.x_index - 1) - - def DEY(self): - self.y_index = self.update_nz(self.y_index - 1) - - def INC(self, operand_address): - self.cycles += 2 - self.write_byte(operand_address, self.update_nz(self.read_byte(operand_address) + 1)) - - def INX(self): - self.x_index = self.update_nz(self.x_index + 1) - - def INY(self): - self.y_index = self.update_nz(self.y_index + 1) - - # PUSH / PULL - - def PHA(self): - self.cycles += 1 - self.push_byte(self.accumulator) - - def PHP(self): - self.cycles += 1 - self.push_byte(self.status_as_byte()) - - def PLA(self): - self.cycles += 2 - self.accumulator = self.update_nz(self.pull_byte()) - - def PLP(self): - self.cycles += 2 - self.status_from_byte(self.pull_byte()) - - # LOGIC - - def AND(self, operand_address): - self.accumulator = self.update_nz(self.accumulator & self.read_byte(operand_address)) - - def ORA(self, operand_address): - self.accumulator = self.update_nz(self.accumulator | self.read_byte(operand_address)) - - def EOR(self, operand_address): - self.accumulator = self.update_nz(self.accumulator ^ self.read_byte(operand_address)) - - # ARITHMETIC - - def ADC(self, operand_address): - # @@@ doesn't handle BCD yet - assert not self.decimal_mode_flag - - a2 = self.accumulator - a1 = signed(a2) - m2 = self.read_byte(operand_address) - m1 = signed(m2) - - # twos complement addition - result1 = a1 + m1 + self.carry_flag - - # unsigned addition - result2 = a2 + m2 + self.carry_flag - - self.accumulator = self.update_nzc(result2) - - # perhaps this could be calculated from result2 but result1 is more intuitive - self.overflow_flag = [0, 1][(result1 > 127) | (result1 < -128)] - - def SBC(self, operand_address): - # @@@ doesn't handle BCD yet - assert not self.decimal_mode_flag - - a2 = self.accumulator - a1 = signed(a2) - m2 = self.read_byte(operand_address) - m1 = signed(m2) - - # twos complement subtraction - result1 = a1 - m1 - [1, 0][self.carry_flag] - - # unsigned subtraction - result2 = a2 - m2 - [1, 0][self.carry_flag] - - self.accumulator = self.update_nz(result2) - self.carry_flag = [0, 1][(result2 >= 0)] - - # perhaps this could be calculated from result2 but result1 is more intuitive - self.overflow_flag = [0, 1][(result1 > 127) | (result1 < -128)] - - # BIT - - def BIT(self, operand_address): - value = self.read_byte(operand_address) - self.sign_flag = ((value >> 7) % 2) # bit 7 - self.overflow_flag = ((value >> 6) % 2) # bit 6 - self.zero_flag = [0, 1][((self.accumulator & value) == 0)] - - # COMPARISON - - def CMP(self, operand_address): - result = self.accumulator - self.read_byte(operand_address) - self.carry_flag = [0, 1][(result >= 0)] - self.update_nz(result) - - def CPX(self, operand_address): - result = self.x_index - self.read_byte(operand_address) - self.carry_flag = [0, 1][(result >= 0)] - self.update_nz(result) - - def CPY(self, operand_address): - result = self.y_index - self.read_byte(operand_address) - self.carry_flag = [0, 1][(result >= 0)] - self.update_nz(result) - - # SYSTEM - - def NOP(self): - pass - - def BRK(self): - self.cycles += 5 - self.push_word(self.program_counter + 1) - self.push_byte(self.status_as_byte()) - self.program_counter = self.read_word(0xFFFE) - self.break_flag = 1 - - def RTI(self): - self.cycles += 4 - self.status_from_byte(self.pull_byte()) - self.program_counter = self.pull_word() - - - # @@@ IRQ - # @@@ NMI def usage(): @@ -1142,5 +497,5 @@ if __name__ == "__main__": mem = Memory(cfg, bus, RAM, ROM, options) - cpu = CPU(cfg, bus, options, mem) + cpu = CPU6502(cfg, bus, options, mem, Disassemble) cpu.run() diff --git a/generic/cpu.py b/generic/cpu.py index e69de29..d4c0085 100644 --- a/generic/cpu.py +++ b/generic/cpu.py @@ -0,0 +1,666 @@ +import BaseHTTPServer +import re +import json +import select +import sys + + +def signed(x): + if x > 0x7F: + x = x - 0x100 + return x + + +class ControlHandler(BaseHTTPServer.BaseHTTPRequestHandler): + + def __init__(self, request, client_address, server, cpu, disassemble): + self.cpu = cpu + self.disassemble = disassemble + + self.get_urls = { + r"/disassemble/(\d+)$": self.get_disassemble, + r"/memory/(\d+)(-(\d+))?$": self.get_memory, + r"/memory/(\d+)(-(\d+))?/raw$": self.get_memory_raw, + r"/status$": self.get_status, + } + + self.post_urls = { + r"/memory/(\d+)(-(\d+))?$": self.post_memory, + r"/memory/(\d+)(-(\d+))?/raw$": self.post_memory_raw, + r"/quit$": self.post_quit, + r"/reset$": self.post_reset, + } + + BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, request, client_address, server) + + def log_request(self, code, size=0): + pass + + def dispatch(self, urls): + for r, f in urls.items(): + m = re.match(r, self.path) + if m is not None: + f(m) + break + else: + self.send_response(404) + self.end_headers() + + def response(self, s): + self.send_response(200) + self.send_header("Content-Length", str(len(s))) + self.end_headers() + self.wfile.write(s) + + def do_GET(self): + self.dispatch(self.get_urls) + + def do_POST(self): + self.dispatch(self.post_urls) + + def get_disassemble(self, m): + addr = int(m.group(1)) + r = [] + n = 20 + while n > 0: + dis, length = self.disassemble.disasm(addr) + r.append(dis) + addr += length + n -= 1 + self.response(json.dumps(r)) + + def get_memory_raw(self, m): + addr = int(m.group(1)) + e = m.group(3) + if e is not None: + end = int(e) + else: + end = addr + self.response("".join([chr(self.cpu.read_byte(x)) for x in range(addr, end + 1)])) + + def get_memory(self, m): + addr = int(m.group(1)) + e = m.group(3) + if e is not None: + end = int(e) + else: + end = addr + self.response(json.dumps(list(map(self.cpu.read_byte, range(addr, end + 1))))) + + def get_status(self, m): + self.response(json.dumps(dict((x, getattr(self.cpu, x)) for x in ( + "accumulator", + "x_index", + "y_index", + "stack_pointer", + "program_counter", + "sign_flag", + "overflow_flag", + "break_flag", + "decimal_mode_flag", + "interrupt_disable_flag", + "zero_flag", + "carry_flag", + )))) + + def post_memory(self, m): + addr = int(m.group(1)) + e = m.group(3) + if e is not None: + end = int(e) + else: + end = addr + data = json.loads(self.rfile.read(int(self.headers["Content-Length"]))) + for i, a in enumerate(range(addr, end + 1)): + self.cpu.write_byte(a, data[i]) + self.response("") + + def post_memory_raw(self, m): + addr = int(m.group(1)) + e = m.group(3) + if e is not None: + end = int(e) + else: + end = addr + data = self.rfile.read(int(self.headers["Content-Length"])) + for i, a in enumerate(range(addr, end + 1)): + self.cpu.write_byte(a, data[i]) + self.response("") + + def post_quit(self, m): + self.cpu.quit = True + self.response("") + + def post_reset(self, m): + self.cpu.reset() + self.cpu.running = True + self.response("") + + +class ControlHandlerFactory: + + def __init__(self, cpu, disassemble): + self.cpu = cpu + self.disassemble = disassemble + + def __call__(self, request, client_address, server): + return ControlHandler( + request, client_address, server, self.cpu, self.disassemble + ) + + +class BaseCPU(object): + + def __init__(self, cfg, bus, options, memory, DisassembleClass): + self.cfg = cfg + self.bus = bus + self.memory = memory + + disassemble = DisassembleClass(self, memory) + + self.control_server = BaseHTTPServer.HTTPServer( + (self.cfg.LOCAL_HOST_IP, self.cfg.HTTPSERVER_PORT), + ControlHandlerFactory(self, disassemble) + ) + + self.accumulator = 0x00 + self.x_index = 0x00 + self.y_index = 0x00 + + self.carry_flag = 0 + self.zero_flag = 0 + self.interrupt_disable_flag = 0 + self.decimal_mode_flag = 0 + self.break_flag = 1 + self.overflow_flag = 0 + self.sign_flag = 0 + + self.cycles = 0 + + self.setup_ops() + self.reset() + if options.pc is not None: + self.program_counter = options.pc + self.running = True + self.quit = False + + + def reset(self): + self.program_counter = self.read_word(self.cfg.RESET_VECTOR) + + def run(self): + sys.stdout.flush() + while not self.quit: + + timeout = 0 + if not self.running: + timeout = 1 + # Currently this handler blocks from the moment + # a connection is accepted until the response + # is sent. TODO: use an async HTTP server that + # handles input data asynchronously. + sockets = [self.control_server] + rs, _, _ = select.select(sockets, [], [], timeout) + for s in rs: + if s is self.control_server: + self.control_server._handle_request_noblock() + else: + pass + + count = 1000 + while count > 0 and self.running: + self.cycles += 2 # all instructions take this as a minimum + op = self.read_pc_byte() + func = self.ops[op] + if func is None: + print "UNKNOWN OP" + print hex(self.program_counter - 1) + print hex(op) + break + else: + self.ops[op]() + count -= 1 + + def test_run(self, start, end): + self.program_counter = start + while True: + self.cycles += 2 # all instructions take this as a minimum + if self.program_counter == end: + break + op = self.read_pc_byte() + func = self.ops[op] + if func is None: + print "UNKNOWN OP" + print hex(self.program_counter - 1) + print hex(op) + break + else: + self.ops[op]() + + #### + + def get_pc(self, inc=1): + pc = self.program_counter + self.program_counter += inc + return pc + + def read_byte(self, address): + return self.memory.read_byte(self.cycles, address) + + def read_word(self, address): + return self.memory.read_word(self.cycles, address) + + def read_word_bug(self, address): + return self.memory.read_word_bug(self.cycles, address) + + def read_pc_byte(self): + return self.read_byte(self.get_pc()) + + def read_pc_word(self): + return self.read_word(self.get_pc(2)) + + def write_byte(self, address, value): + self.memory.write_byte(self.cycles, address, value) + + #### + + def status_from_byte(self, status): + self.carry_flag = [0, 1][0 != status & 1] + self.zero_flag = [0, 1][0 != status & 2] + self.interrupt_disable_flag = [0, 1][0 != status & 4] + self.decimal_mode_flag = [0, 1][0 != status & 8] + self.break_flag = [0, 1][0 != status & 16] + self.overflow_flag = [0, 1][0 != status & 64] + self.sign_flag = [0, 1][0 != status & 128] + + def status_as_byte(self): + return self.carry_flag | self.zero_flag << 1 | self.interrupt_disable_flag << 2 | self.decimal_mode_flag << 3 | self.break_flag << 4 | 1 << 5 | self.overflow_flag << 6 | self.sign_flag << 7 + + #### + + def push_byte(self, byte): + self.write_byte(self.cfg.STACK_PAGE + self.stack_pointer, byte) + self.stack_pointer = (self.stack_pointer - 1) % 0x100 + + def pull_byte(self): + self.stack_pointer = (self.stack_pointer + 1) % 0x100 + return self.read_byte(self.cfg.STACK_PAGE + self.stack_pointer) + + def push_word(self, word): + hi, lo = divmod(word, 0x100) + self.push_byte(hi) + self.push_byte(lo) + + def pull_word(self): + s = self.cfg.STACK_PAGE + self.stack_pointer + 1 + self.stack_pointer += 2 + return self.read_word(s) + + #### + + def immediate_mode(self): + return self.get_pc() + + def absolute_mode(self): + self.cycles += 2 + return self.read_pc_word() + + def absolute_x_mode(self, rmw=False): + if rmw: + self.cycles += 1 + return self.absolute_mode() + self.x_index + + def absolute_y_mode(self, rmw=False): + if rmw: + self.cycles += 1 + return self.absolute_mode() + self.y_index + + def zero_page_mode(self): + self.cycles += 1 + return self.read_pc_byte() + + def zero_page_x_mode(self): + self.cycles += 1 + return (self.zero_page_mode() + self.x_index) % 0x100 + + def zero_page_y_mode(self): + self.cycles += 1 + return (self.zero_page_mode() + self.y_index) % 0x100 + + def indirect_mode(self): + self.cycles += 2 + return self.read_word_bug(self.absolute_mode()) + + def indirect_x_mode(self): + self.cycles += 4 + return self.read_word_bug((self.read_pc_byte() + self.x_index) % 0x100) + + def indirect_y_mode(self, rmw=False): + if rmw: + self.cycles += 4 + else: + self.cycles += 3 + return self.read_word_bug(self.read_pc_byte()) + self.y_index + + def relative_mode(self): + pc = self.get_pc() + return pc + 1 + signed(self.read_byte(pc)) + + #### + + def update_nz(self, value): + value = value % 0x100 + self.zero_flag = [0, 1][(value == 0)] + self.sign_flag = [0, 1][((value & 0x80) != 0)] + return value + + def update_nzc(self, value): + self.carry_flag = [0, 1][(value > 0xFF)] + return self.update_nz(value) + + #### + + # LOAD / STORE + + def LDA(self, operand_address): + self.accumulator = self.update_nz(self.read_byte(operand_address)) + + def LDX(self, operand_address): + self.x_index = self.update_nz(self.read_byte(operand_address)) + + def LDY(self, operand_address): + self.y_index = self.update_nz(self.read_byte(operand_address)) + + def STA(self, operand_address): + self.write_byte(operand_address, self.accumulator) + + def STX(self, operand_address): + self.write_byte(operand_address, self.x_index) + + def STY(self, operand_address): + self.write_byte(operand_address, self.y_index) + + # TRANSFER + + def TAX(self): + self.x_index = self.update_nz(self.accumulator) + + def TXA(self): + self.accumulator = self.update_nz(self.x_index) + + def TAY(self): + self.y_index = self.update_nz(self.accumulator) + + def TYA(self): + self.accumulator = self.update_nz(self.y_index) + + def TSX(self): + self.x_index = self.update_nz(self.stack_pointer) + + def TXS(self): + self.stack_pointer = self.x_index + + # SHIFTS / ROTATES + + def ASL(self, operand_address=None): + if operand_address is None: + self.accumulator = self.update_nzc(self.accumulator << 1) + else: + self.cycles += 2 + self.write_byte(operand_address, self.update_nzc(self.read_byte(operand_address) << 1)) + + def ROL(self, operand_address=None): + if operand_address is None: + a = self.accumulator << 1 + if self.carry_flag: + a = a | 0x01 + self.accumulator = self.update_nzc(a) + else: + self.cycles += 2 + m = self.read_byte(operand_address) << 1 + if self.carry_flag: + m = m | 0x01 + self.write_byte(operand_address, self.update_nzc(m)) + + def ROR(self, operand_address=None): + if operand_address is None: + if self.carry_flag: + self.accumulator = self.accumulator | 0x100 + self.carry_flag = self.accumulator % 2 + self.accumulator = self.update_nz(self.accumulator >> 1) + else: + self.cycles += 2 + m = self.read_byte(operand_address) + if self.carry_flag: + m = m | 0x100 + self.carry_flag = m % 2 + self.write_byte(operand_address, self.update_nz(m >> 1)) + + def LSR(self, operand_address=None): + if operand_address is None: + self.carry_flag = self.accumulator % 2 + self.accumulator = self.update_nz(self.accumulator >> 1) + else: + self.cycles += 2 + self.carry_flag = self.read_byte(operand_address) % 2 + self.write_byte(operand_address, self.update_nz(self.read_byte(operand_address) >> 1)) + + # JUMPS / RETURNS + + def JMP(self, operand_address): + self.cycles -= 1 + self.program_counter = operand_address + + def JSR(self, operand_address): + self.cycles += 2 + self.push_word(self.program_counter - 1) + self.program_counter = operand_address + + def RTS(self): + self.cycles += 4 + self.program_counter = self.pull_word() + 1 + + # BRANCHES + + def BCC(self, operand_address): + if not self.carry_flag: + self.cycles += 1 + self.program_counter = operand_address + + def BCS(self, operand_address): + if self.carry_flag: + self.cycles += 1 + self.program_counter = operand_address + + def BEQ(self, operand_address): + if self.zero_flag: + self.cycles += 1 + self.program_counter = operand_address + + def BNE(self, operand_address): + if not self.zero_flag: + self.cycles += 1 + self.program_counter = operand_address + + def BMI(self, operand_address): + if self.sign_flag: + self.cycles += 1 + self.program_counter = operand_address + + def BPL(self, operand_address): + if not self.sign_flag: + self.cycles += 1 + self.program_counter = operand_address + + def BVC(self, operand_address): + if not self.overflow_flag: + self.cycles += 1 + self.program_counter = operand_address + + def BVS(self, operand_address): + if self.overflow_flag: + self.cycles += 1 + self.program_counter = operand_address + + # SET / CLEAR FLAGS + + def CLC(self): + self.carry_flag = 0 + + def CLD(self): + self.decimal_mode_flag = 0 + + def CLI(self): + self.interrupt_disable_flag = 0 + + def CLV(self): + self.overflow_flag = 0 + + def SEC(self): + self.carry_flag = 1 + + def SED(self): + self.decimal_mode_flag = 1 + + def SEI(self): + self.interrupt_disable_flag = 1 + + # INCREMENT / DECREMENT + + def DEC(self, operand_address): + self.cycles += 2 + self.write_byte(operand_address, self.update_nz(self.read_byte(operand_address) - 1)) + + def DEX(self): + self.x_index = self.update_nz(self.x_index - 1) + + def DEY(self): + self.y_index = self.update_nz(self.y_index - 1) + + def INC(self, operand_address): + self.cycles += 2 + self.write_byte(operand_address, self.update_nz(self.read_byte(operand_address) + 1)) + + def INX(self): + self.x_index = self.update_nz(self.x_index + 1) + + def INY(self): + self.y_index = self.update_nz(self.y_index + 1) + + # PUSH / PULL + + def PHA(self): + self.cycles += 1 + self.push_byte(self.accumulator) + + def PHP(self): + self.cycles += 1 + self.push_byte(self.status_as_byte()) + + def PLA(self): + self.cycles += 2 + self.accumulator = self.update_nz(self.pull_byte()) + + def PLP(self): + self.cycles += 2 + self.status_from_byte(self.pull_byte()) + + # LOGIC + + def AND(self, operand_address): + self.accumulator = self.update_nz(self.accumulator & self.read_byte(operand_address)) + + def ORA(self, operand_address): + self.accumulator = self.update_nz(self.accumulator | self.read_byte(operand_address)) + + def EOR(self, operand_address): + self.accumulator = self.update_nz(self.accumulator ^ self.read_byte(operand_address)) + + # ARITHMETIC + + def ADC(self, operand_address): + # @@@ doesn't handle BCD yet + assert not self.decimal_mode_flag + + a2 = self.accumulator + a1 = signed(a2) + m2 = self.read_byte(operand_address) + m1 = signed(m2) + + # twos complement addition + result1 = a1 + m1 + self.carry_flag + + # unsigned addition + result2 = a2 + m2 + self.carry_flag + + self.accumulator = self.update_nzc(result2) + + # perhaps this could be calculated from result2 but result1 is more intuitive + self.overflow_flag = [0, 1][(result1 > 127) | (result1 < -128)] + + def SBC(self, operand_address): + # @@@ doesn't handle BCD yet + assert not self.decimal_mode_flag + + a2 = self.accumulator + a1 = signed(a2) + m2 = self.read_byte(operand_address) + m1 = signed(m2) + + # twos complement subtraction + result1 = a1 - m1 - [1, 0][self.carry_flag] + + # unsigned subtraction + result2 = a2 - m2 - [1, 0][self.carry_flag] + + self.accumulator = self.update_nz(result2) + self.carry_flag = [0, 1][(result2 >= 0)] + + # perhaps this could be calculated from result2 but result1 is more intuitive + self.overflow_flag = [0, 1][(result1 > 127) | (result1 < -128)] + + # BIT + + def BIT(self, operand_address): + value = self.read_byte(operand_address) + self.sign_flag = ((value >> 7) % 2) # bit 7 + self.overflow_flag = ((value >> 6) % 2) # bit 6 + self.zero_flag = [0, 1][((self.accumulator & value) == 0)] + + # COMPARISON + + def CMP(self, operand_address): + result = self.accumulator - self.read_byte(operand_address) + self.carry_flag = [0, 1][(result >= 0)] + self.update_nz(result) + + def CPX(self, operand_address): + result = self.x_index - self.read_byte(operand_address) + self.carry_flag = [0, 1][(result >= 0)] + self.update_nz(result) + + def CPY(self, operand_address): + result = self.y_index - self.read_byte(operand_address) + self.carry_flag = [0, 1][(result >= 0)] + self.update_nz(result) + + # SYSTEM + + def NOP(self): + pass + + def BRK(self): + self.cycles += 5 + self.push_word(self.program_counter + 1) + self.push_byte(self.status_as_byte()) + self.program_counter = self.read_word(0xFFFE) + self.break_flag = 1 + + def RTI(self): + self.cycles += 4 + self.status_from_byte(self.pull_byte()) + self.program_counter = self.pull_word() + + + # @@@ IRQ + # @@@ NMI diff --git a/tests.py b/tests.py index 6a5d606..e9c6613 100644 --- a/tests.py +++ b/tests.py @@ -1,5 +1,5 @@ import unittest -from cpu6502 import Memory, CPU +from cpu6502 import Memory, CPU6502 class TestMemory(unittest.TestCase): @@ -26,7 +26,7 @@ class TestLoadStoreOperations(unittest.TestCase): def setUp(self): self.memory = Memory(use_bus=False) - self.cpu = CPU(self.memory) + self.cpu = CPU6502(self.memory) self.memory.load(0x1000, [0x00, 0x01, 0x7F, 0x80, 0xFF]) def test_LDA(self): @@ -115,7 +115,7 @@ class TestRegisterTransferOperations(unittest.TestCase): def setUp(self): self.memory = Memory(use_bus=False) - self.cpu = CPU(self.memory) + self.cpu = CPU6502(self.memory) def test_TAX(self): self.cpu.accumulator = 0x00 @@ -190,7 +190,7 @@ class TestStackOperations(unittest.TestCase): def setUp(self): self.memory = Memory(use_bus=False) - self.cpu = CPU(self.memory) + self.cpu = CPU6502(self.memory) def test_TSX(self): s = self.cpu.stack_pointer @@ -238,7 +238,7 @@ class TestLogicalOperations(unittest.TestCase): def setUp(self): self.memory = Memory(use_bus=False) - self.cpu = CPU(self.memory) + self.cpu = CPU6502(self.memory) def test_AND(self): self.memory.write_byte(None, 0x1000, 0x37) @@ -326,7 +326,7 @@ class TestArithmeticOperations(unittest.TestCase): def setUp(self): self.memory = Memory(use_bus=False) - self.cpu = CPU(self.memory) + self.cpu = CPU6502(self.memory) def test_ADC_without_BCD(self): @@ -545,7 +545,7 @@ class TestIncrementDecrementOperations(unittest.TestCase): def setUp(self): self.memory = Memory(use_bus=False) - self.cpu = CPU(self.memory) + self.cpu = CPU6502(self.memory) def test_INC(self): self.memory.write_byte(None, 0x1000, 0x00) @@ -654,7 +654,7 @@ class TestShiftOperations(unittest.TestCase): def setUp(self): self.memory = Memory(use_bus=False) - self.cpu = CPU(self.memory) + self.cpu = CPU6502(self.memory) def test_ASL(self): self.cpu.accumulator = 0x01 @@ -761,7 +761,7 @@ class TestJumpCallOperations(unittest.TestCase): def setUp(self): self.memory = Memory(use_bus=False) - self.cpu = CPU(self.memory) + self.cpu = CPU6502(self.memory) def test_JMP(self): self.cpu.JMP(0x1000) @@ -793,7 +793,7 @@ class TestBranchOperations(unittest.TestCase): def setUp(self): self.memory = Memory(use_bus=False) - self.cpu = CPU(self.memory) + self.cpu = CPU6502(self.memory) def test_BCC(self): self.cpu.program_counter = 0x1000 @@ -880,7 +880,7 @@ class TestStatusFlagOperations(unittest.TestCase): def setUp(self): self.memory = Memory(use_bus=False) - self.cpu = CPU(self.memory) + self.cpu = CPU6502(self.memory) def test_CLC(self): self.cpu.carry_flag = 1 @@ -922,7 +922,7 @@ class TestSystemFunctionOperations(unittest.TestCase): def setUp(self): self.memory = Memory(use_bus=False) - self.cpu = CPU(self.memory) + self.cpu = CPU6502(self.memory) def test_BRK(self): self.cpu.program_counter = 0x1000 @@ -952,7 +952,7 @@ class Test6502Bugs(unittest.TestCase): def setUp(self): self.memory = Memory(use_bus=False) - self.cpu = CPU(self.memory) + self.cpu = CPU6502(self.memory) def test_zero_page_x(self): self.cpu.x_index = 0x01