diff --git a/bin/py65m.py b/bin/py65m.py new file mode 100644 index 0000000..d6fba0a --- /dev/null +++ b/bin/py65m.py @@ -0,0 +1,910 @@ +#!/usr/bin/env python -u + +"""py65mon -- interact with a simulated 6502-based system + +Usage: %s [options] + +Options: +-h, --help : Show this message +-m, --mpu : Choose which MPU device (default is 6502) +-l, --load : Load a file at address 0 +-r, --rom : Load a rom at the top of address space and reset into it +-g, --goto
: Perform a goto command after loading any files +-i, --input
: define location of getc (default $f004) +-o, --output
: define location of putc (default $f001) +""" + +import cmd +import getopt +import os +import re +import shlex +import sys + +from asyncore import compact_traceback +from py65.devices.mpu6502 import MPU as NMOS6502 +from py65.devices.mpu65c02 import MPU as CMOS65C02 +from py65.devices.mpu65org16 import MPU as V65Org16 +from py65.disassembler import Disassembler +from py65.assembler import Assembler +from py65.utils.addressing import AddressParser +from py65.utils import console +from py65.utils.conversions import itoa +from py65.memory import ObservableMemory + +try: + from urllib2 import urlopen +except ImportError: # Python 3 + from urllib.request import urlopen + +class FileSystem(): + + def __init__(self): + self.registers = 8 + self.register = {} + + def write_mem(self, address, value): + self.register[address] = value; + + def read_mem(self, address): + return self.register[address] + +class Monitor(cmd.Cmd): + + Microprocessors = {'6502': NMOS6502, '65C02': CMOS65C02, + '65Org16': V65Org16} + + def __init__(self, mpu_type=NMOS6502, completekey='tab', stdin=None, + stdout=None, argv=None): + self.mpu_type = mpu_type + self.putc_addr = 0xF001 + self.getc_addr = 0xF004 + self.fsio_addr = 0xF100 + if argv is None: + argv = sys.argv + self._breakpoints = [] + self._width = 78 + self.prompt = "." + self._add_shortcuts() + cmd.Cmd.__init__(self, completekey, stdin, stdout) + self._parse_args(argv) + self._reset(self.mpu_type,self.getc_addr,self.putc_addr,self.fsio_addr) + + def _parse_args(self, argv): + try: + shortopts = 'hf:i:o:m:l:r:g:' + longopts = ['help', 'mpu=', 'fileio=', 'input=', 'output=', 'load=', 'rom=', 'goto='] + options, args = getopt.getopt(argv[1:], shortopts, longopts) + except getopt.GetoptError as exc: + self._output(exc.args[0]) + self._usage() + self._exit(1) + + load = None + rom = None + goto = None + mpu = None + + for opt, value in options: + if opt in ('-i', '--input'): + self.getc_addr = int(value, 16) + + if opt in ('-o', '--output'): + self.putc_addr = int(value, 16) + + if opt in ('-f', '--fileio'): + self.fsio_addr = int(value, 16) + + if opt in ('-l', '--load'): + load = value + + if opt in ('-r', '--rom'): + rom = value + + if opt in ('-g', '--goto'): + goto = value + + if opt in ('-m', '--mpu'): + mpu = value + + elif opt in ("-h", "--help"): + self._usage() + self._exit(0) + + if (mpu is not None) or (rom is not None): + if mpu is None: + mpu = "6502" + if self._get_mpu(mpu) is None: + mpus = list(self.Microprocessors.keys()) + mpus.sort() + msg = "Fatal: no such MPU. Available MPUs: %s" + self._output(msg % ', '.join(mpus)) + sys.exit(1) + cmd = "mpu %s" % mpu + self.onecmd(cmd) + + if load is not None: + cmd = "load %s" % load + self.onecmd(cmd) + + if goto is not None: + cmd = "goto %s" % goto + self.onecmd(cmd) + + if rom is not None: + # load a ROM and run from the reset vector + cmd = "load '%s' top" % rom + self.onecmd(cmd) + physMask = self._mpu.memory.physMask + reset = self._mpu.RESET & physMask + dest = self._mpu.memory[reset] + \ + (self._mpu.memory[reset + 1] << self.byteWidth) + cmd = "goto %08x" % dest + self.onecmd(cmd) + + def _usage(self): + usage = __doc__ % sys.argv[0] + self._output(usage) + + def onecmd(self, line): + line = self._preprocess_line(line) + + result = None + try: + result = cmd.Cmd.onecmd(self, line) + except KeyboardInterrupt: + self._output("Interrupt") + except Exception: + (file, fun, line), t, v, tbinfo = compact_traceback() + error = 'Error: %s, %s: file: %s line: %s' % (t, v, file, line) + self._output(error) + + if not line.startswith("quit"): + self._output_mpu_status() + + return result + + def _reset(self, mpu_type,getc_addr=0xF004,putc_addr=0xF001,fsio_addr=0xF100): + self._mpu = mpu_type() + self.addrWidth = self._mpu.ADDR_WIDTH + self.byteWidth = self._mpu.BYTE_WIDTH + self.addrFmt = self._mpu.ADDR_FORMAT + self.byteFmt = self._mpu.BYTE_FORMAT + self.addrMask = self._mpu.addrMask + self.byteMask = self._mpu.byteMask + self._install_mpu_observers(getc_addr,putc_addr,fsio_addr) + self._address_parser = AddressParser() + self._disassembler = Disassembler(self._mpu, self._address_parser) + self._assembler = Assembler(self._mpu, self._address_parser) + + def _add_shortcuts(self): + self._shortcuts = {'EOF': 'quit', + '~': 'tilde', + 'a': 'assemble', + 'ab': 'add_breakpoint', + 'al': 'add_label', + 'd': 'disassemble', + 'db': 'delete_breakpoint', + 'dl': 'delete_label', + 'exit': 'quit', + 'f': 'fill', + '>': 'fill', + 'g': 'goto', + 'h': 'help', + '?': 'help', + 'l': 'load', + 'm': 'mem', + 'q': 'quit', + 'r': 'registers', + 'ret': 'return', + 'rad': 'radix', + 's': 'save', + 'shb': 'show_breakpoints', + 'shl': 'show_labels', + 'x': 'quit', + 'z': 'step'} + + def _preprocess_line(self, line): + # line comments + quoted = False + for pos, char in enumerate(line): + if char in ('"', "'"): + quoted = not quoted + if (not quoted) and (char == ';'): + line = line[:pos] + break + + # whitespace & leading dots + line = line.strip(' \t').lstrip('.') + + # special case for vice compatibility + if line.startswith('~'): + line = self._shortcuts['~'] + ' ' + line[1:] + + # command shortcuts + for shortcut, command in self._shortcuts.items(): + if line == shortcut: + line = command + break + + pattern = '^%s\s+' % re.escape(shortcut) + matches = re.match(pattern, line) + if matches: + start, end = matches.span() + line = "%s %s" % (command, line[end:]) + break + + return line + + def _get_mpu(self, name): + requested = name.lower() + mpu = None + for key, klass in self.Microprocessors.items(): + if key.lower() == requested: + mpu = klass + break + return mpu + + def _install_mpu_observers(self,getc_addr,putc_addr,fsio_addr): + def putc(address, value): + try: + self.stdout.write(chr(value)) + except UnicodeEncodeError: # Python 3 + self.stdout.write("?") + self.stdout.flush() + + def getc(address): + char = console.getch_noblock(self.stdin) + if char: + byte = ord(char) + else: + byte = 0 + return byte + + + m = ObservableMemory(addrWidth=self.addrWidth) + m.subscribe_to_write([self.putc_addr], putc) + m.subscribe_to_read([self.getc_addr], getc) + + fs = FileSystem() + fsio_range = range(fsio_addr, fsio_addr+fs.registers) + m.subscribe_to_write(fsio_range, fs.write_mem) + m.subscribe_to_read(fsio_range, fs.read_mem) + + self._mpu.memory = m + + def _output_mpu_status(self): + self._output("\n" + repr(self._mpu)) + + def _output(self, stuff): + self.stdout.write("%s\n" % stuff) + + def _exit(self, exitcode=0): + sys.exit(exitcode) + + def do_help(self, args): + args = self._shortcuts.get(args.strip(), args) + return cmd.Cmd.do_help(self, args) + + def help_version(self): + self._output("version\t\tDisplay Py65 version information.") + + def do_version(self, args): + self._output("\nPy65 Monitor") + + def help_help(self): + self._output("help\t\tPrint a list of available actions.") + self._output("help \tPrint help for .") + + def help_reset(self): + self._output("reset\t\tReset the microprocessor") + + def do_reset(self, args): + klass = self._mpu.__class__ + self._reset(mpu_type=klass) + + def do_mpu(self, args): + def available_mpus(): + mpus = list(self.Microprocessors.keys()) + mpus.sort() + self._output("Available MPUs: %s" % ', '.join(mpus)) + + if args == '': + self._output("Current MPU is %s" % self._mpu.name) + available_mpus() + else: + new_mpu = self._get_mpu(args) + if new_mpu is None: + self._output("Unknown MPU: %s" % args) + available_mpus() + else: + self._reset(new_mpu,self.getc_addr,self.putc_addr,self.fsio_addr) + self._output("Reset with new MPU %s" % self._mpu.name) + + def help_mpu(self): + self._output("mpu\t\tPrint available microprocessors.") + self._output("mpu \tSelect a new microprocessor.") + + def do_quit(self, args): + self._output('') + return 1 + + def help_quit(self): + self._output("To quit, type ^D or use the quit command.") + + def do_assemble(self, args): + splitted = args.split(None, 1) + if len(splitted) != 2: + return self._interactive_assemble(args) + + statement = splitted[1] + try: + start = self._address_parser.number(splitted[0]) + bytes = self._assembler.assemble(statement, start) + end = start + len(bytes) + self._mpu.memory[start:end] = bytes + self.do_disassemble(self.addrFmt % start) + except KeyError as exc: + self._output(exc.args[0]) # "Label not found: foo" + except OverflowError: + self._output("Overflow error: %s" % args) + except SyntaxError: + self._output("Syntax error: %s" % statement) + + def help_assemble(self): + self._output("assemble\t\t\t" + "Start interactive assembly at the program counter.") + self._output("assemble
\t\t" + "Start interactive assembly at the address.") + self._output("assemble
\t" + "Assemble a statement at the address.") + + def _interactive_assemble(self, args): + if args == '': + start = self._mpu.pc + else: + try: + start = self._address_parser.number(args) + except KeyError as exc: + self._output(exc.args[0]) # "Label not found: foo" + return + + while True: + prompt = "\r$" + (self.addrFmt % start) + " " + \ + (" " * int(1 + self.byteWidth / 4) * 3) + + line = console.line_input(prompt, + stdin=self.stdin, stdout=self.stdout) + + if not line.strip(): + self.stdout.write("\n") + return + + # assemble into memory + try: + bytes = self._assembler.assemble(line, pc=start) + numbytes = len(bytes) + + end = start + numbytes + self._mpu.memory[start:end] = bytes + + # print disassembly + _, disasm = self._disassembler.instruction_at(start) + fdisasm = self._format_disassembly(start, numbytes, disasm) + indent = ' ' * (len(prompt + line) + 5) + self.stdout.write("\r" + indent + "\r") + self.stdout.write(fdisasm + "\n") + + # advance to next address + start += numbytes + if start >= (2 ** self._mpu.ADDR_WIDTH): + start = 0 + except KeyError: + addr = self.addrFmt % start + self.stdout.write("\r$%s ?Label\n" % addr) + except OverflowError: + addr = self.addrFmt % start + self.stdout.write("\r$%s ?Overflow\n" % addr) + except SyntaxError: + addr = self.addrFmt % start + self.stdout.write("\r$%s ?Syntax\n" % addr) + + def do_disassemble(self, args): + splitted = shlex.split(args) + if len(splitted) != 1: + return self.help_disassemble() + + address_parts = splitted[0].split(":") + start = self._address_parser.number(address_parts[0]) + if len(address_parts) > 1: + end = self._address_parser.number(address_parts[1]) + else: + end = start + + max_address = (2 ** self._mpu.ADDR_WIDTH) - 1 + cur_address = start + needs_wrap = start > end + + while needs_wrap or cur_address <= end: + length, disasm = self._disassembler.instruction_at(cur_address) + self._output(self._format_disassembly(cur_address, length, disasm)) + + remaining = length + while remaining: + remaining -= 1 + cur_address += 1 + if start > end and cur_address > max_address: + needs_wrap = False + cur_address = 0 + + def _format_disassembly(self, address, length, disasm): + cur_address = address + max_address = (2 ** self._mpu.ADDR_WIDTH) - 1 + + bytes_remaining = length + dump = '' + + while bytes_remaining: + if cur_address > max_address: + cur_address = 0 + dump += self.byteFmt % self._mpu.memory[cur_address] + " " + cur_address += 1 + bytes_remaining -= 1 + + fieldwidth = 1 + int(1 + self.byteWidth / 4) * 3 + fieldfmt = "%%-%ds" % fieldwidth + return "$" + self.addrFmt % address + " " + fieldfmt % dump + disasm + + def help_disassemble(self): + self._output("disassemble ") + self._output("Disassemble instructions in the address range.") + self._output('Range is specified like ":".') + + def help_step(self): + self._output("step") + self._output("Single-step through instructions.") + + def do_step(self, args): + self._mpu.step() + self.do_disassemble(self.addrFmt % self._mpu.pc) + + def help_return(self): + self._output("return") + self._output("Continues execution and returns to the monitor just") + self._output("before the next RTS or RTI is executed.") + + def do_return(self, args): + returns = [0x60, 0x40] # RTS, RTI + self._run(stopcodes=returns) + + def help_goto(self): + self._output("goto
") + self._output("Change the PC to address and continue execution.") + + def do_goto(self, args): + if args == '': + return self.help_goto() + + self._mpu.pc = self._address_parser.number(args) + brks = [0x00] # BRK + self._run(stopcodes=brks) + + def _run(self, stopcodes): + stopcodes = set(stopcodes) + breakpoints = set(self._breakpoints) + mpu = self._mpu + mem = self._mpu.memory + + if not breakpoints: + while True: + mpu.step() + if mem[mpu.pc] in stopcodes: + break + else: + while True: + mpu.step() + pc = mpu.pc + if mem[pc] in stopcodes: + break + if pc in breakpoints: + msg = "Breakpoint %d reached." + self._output(msg % self._breakpoints.index(pc)) + break + + def help_radix(self): + self._output("radix [H|D|O|B]") + self._output("Set default radix to hex, decimal, octal, or binary.") + self._output("With no argument, the current radix is printed.") + + def help_cycles(self): + self._output("Display the total number of cycles executed.") + + def do_cycles(self, args): + self._output(str(self._mpu.processorCycles)) + + def do_radix(self, args): + radixes = {'Hexadecimal': 16, 'Decimal': 10, 'Octal': 8, 'Binary': 2} + + if args != '': + new = args[0].lower() + changed = False + for name, radix in radixes.items(): + if name[0].lower() == new: + self._address_parser.radix = radix + changed = True + if not changed: + self._output("Illegal radix: %s" % args) + + for name, radix in radixes.items(): + if self._address_parser.radix == radix: + self._output("Default radix is %s" % name) + + def help_tilde(self): + self._output("~ ") + self._output("Display a number in decimal, hex, octal, and binary.") + + def do_tilde(self, args): + if args == '': + return self.help_tilde() + + try: + num = self._address_parser.number(args) + self._output("+%u" % num) + self._output("$" + self.byteFmt % num) + self._output("%04o" % num) + self._output(itoa(num, 2).zfill(8)) + except KeyError: + self._output("Bad label: %s" % args) + except OverflowError: + self._output("Overflow error: %s" % args) + + def help_registers(self): + self._output("registers[= [, =]*]") + self._output("Assign respective registers. With no parameters,") + self._output("display register values.") + + def do_registers(self, args): + if args == '': + return + + pairs = re.findall('([^=,\s]*)=([^=,\s]*)', args) + if pairs == []: + return self._output("Syntax error: %s" % args) + + for register, value in pairs: + if register not in ('pc', 'sp', 'a', 'x', 'y', 'p'): + self._output("Invalid register: %s" % register) + else: + try: + intval = self._address_parser.number(value) + except KeyError as exc: # label not found + self._output(exc.args[0]) + continue + except OverflowError as exc: # wider than address space + msg = "Overflow: %r too wide for register %r" + self._output(msg % (value, register)) + continue + + if register != 'pc': + if intval != (intval & self.byteMask): + msg = "Overflow: %r too wide for register %r" + self._output(msg % (value, register)) + continue + + setattr(self._mpu, register, intval) + + def help_cd(self): + self._output("cd ") + self._output("Change the working directory.") + + def do_cd(self, args): + if args == '': + return self.help_cd() + + try: + os.chdir(args) + except OSError as exc: + msg = "Cannot change directory: [%d] %s" % (exc.errno, + exc.strerror) + self._output(msg) + self.do_pwd() + + def help_pwd(self): + self._output("Show the current working directory.") + + def do_pwd(self, args=None): + cwd = os.getcwd() + self._output(cwd) + + def help_load(self): + self._output("load ") + self._output("Load a file into memory at the specified address.") + self._output('An address of "top" loads into the top of memory.') + self._output("Commodore-style load address bytes are ignored.") + + def do_load(self, args): + split = shlex.split(args) + if len(split) not in (1, 2): + self._output("Syntax error: %s" % args) + return + + filename = split[0] + + if "://" in filename: + try: + f = urlopen(filename) + bytes = f.read() + f.close() + except Exception as exc: + msg = "Cannot fetch remote file: %s" % str(exc) + self._output(msg) + return + else: + try: + f = open(filename, 'rb') + bytes = f.read() + f.close() + except (OSError, IOError) as exc: + msg = "Cannot load file: [%d] %s" % (exc.errno, exc.strerror) + self._output(msg) + return + + if len(split) == 2: + if split[1] == "top": + # load a ROM to top of memory + top_address = self.addrMask + program_size = len(bytes) // (self.byteWidth // 8) + start = top_address - program_size + 1 + else: + start = self._address_parser.number(split[1]) + else: + start = self._mpu.pc + + if self.byteWidth == 8: + if isinstance(bytes, str): + bytes = map(ord, bytes) + else: # Python 3 + bytes = [ b for b in bytes ] + + elif self.byteWidth == 16: + def format(msb, lsb): + if isinstance(bytes, str): + return (ord(msb) << 8) + ord(lsb) + else: # Python 3 + return (msb << 8) + lsb + bytes = list(map(format, bytes[0::2], bytes[1::2])) + + self._fill(start, start, bytes) + + def help_save(self): + self._output("save \"filename\" ") + self._output("Save the specified memory range as a binary file.") + self._output("Commodore-style load address bytes are not written.") + + def do_save(self, args): + split = shlex.split(args) + if len(split) != 3: + self._output("Syntax error: %s" % args) + return + + filename = split[0] + start = self._address_parser.number(split[1]) + end = self._address_parser.number(split[2]) + + mem = self._mpu.memory[start:end + 1] + try: + f = open(filename, 'wb') + for m in mem: + # output each octect from msb first + for shift in range(self.byteWidth - 8, -1, -8): + f.write(bytearray([(m >> shift) & 0xff])) + f.close() + except (OSError, IOError) as exc: + msg = "Cannot save file: [%d] %s" % (exc.errno, exc.strerror) + self._output(msg) + return + + self._output("Saved +%d bytes to %s" % (len(mem), filename)) + + def help_fill(self): + self._output("fill ") + self._output("Fill memory in the address range with the data in") + self._output(". If the size of the address range is") + self._output("greater than the size of the data_list, the data_list ") + self._output("is repeated.") + + def do_fill(self, args): + split = shlex.split(args) + if len(split) < 2: + return self.help_fill() + + try: + start, end = self._address_parser.range(split[0]) + filler = list(map(self._address_parser.number, split[1:])) + except KeyError as exc: + self._output(exc.args[0]) # "Label not found: foo" + else: + self._fill(start, end, filler) + + def _fill(self, start, end, filler): + address = start + length, index = len(filler), 0 + + if start == end: + end = start + length - 1 + if (end > self.addrMask): + end = self.addrMask + + while address <= end: + address &= self.addrMask + self._mpu.memory[address] = (filler[index] & self.byteMask) + index += 1 + if index == length: + index = 0 + address += 1 + + fmt = (end - start + 1, start, end) + starttoend = "$" + self.addrFmt + " to $" + self.addrFmt + self._output(("Wrote +%d bytes from " + starttoend) % fmt) + + def help_mem(self): + self._output("mem ") + self._output("Display the contents of memory.") + self._output('Range is specified like "".') + + def do_mem(self, args): + split = shlex.split(args) + if len(split) != 1: + return self.help_mem() + + start, end = self._address_parser.range(split[0]) + + line = self.addrFmt % start + ":" + for address in range(start, end + 1): + byte = self._mpu.memory[address] + more = " " + self.byteFmt % byte + + exceeded = len(line) + len(more) > self._width + if exceeded: + self._output(line) + line = self.addrFmt % address + ":" + line += more + self._output(line) + + def help_add_label(self): + self._output("add_label