mirror of
https://github.com/mnaberez/py65.git
synced 2025-02-06 02:31:08 +00:00
Added trap for keyboard input.
This commit is contained in:
parent
de651a4053
commit
fda4631664
@ -12,22 +12,26 @@ class ObservableMemory:
|
||||
self._observers = []
|
||||
|
||||
def __setitem__(self, address, value):
|
||||
self._notify(self.WRITE, address, value)
|
||||
for oper, addr_range, callback in self._observers:
|
||||
if address in addr_range:
|
||||
if (oper == self.RW) or (oper == self.WRITE):
|
||||
result = callback(self.WRITE, address, value)
|
||||
if result is not None:
|
||||
value = result
|
||||
self._subject[address] = value
|
||||
|
||||
def __getitem__(self, address):
|
||||
self._notify(self.READ, address)
|
||||
for oper, addr_range, callback in self._observers:
|
||||
if address in addr_range:
|
||||
if (oper == self.RW) or (oper == self.READ):
|
||||
result = callback(self.READ, address, None)
|
||||
if result is not None:
|
||||
return result
|
||||
return self._subject[address]
|
||||
|
||||
def __getattr__(self, address):
|
||||
return getattr(self._subject, address)
|
||||
|
||||
def _notify(self, operation, address, value=None):
|
||||
for oper, addr_range, callback in self._observers:
|
||||
if address in addr_range:
|
||||
if (oper == self.RW) or (oper == operation):
|
||||
callback(operation, address, value)
|
||||
|
||||
def subscribe(self, operation, addr_range, callback):
|
||||
if operation not in (self.READ, self.WRITE, self.RW):
|
||||
raise ValueError("Unsupported operation")
|
||||
|
@ -5,8 +5,9 @@ import os
|
||||
import re
|
||||
import shlex
|
||||
import asyncore
|
||||
import sys
|
||||
from py65.mpu6502 import MPU
|
||||
from py65.util import itoa, AddressParser
|
||||
from py65.util import itoa, AddressParser, getch
|
||||
from py65.memory import ObservableMemory
|
||||
|
||||
class Monitor(cmd.Cmd):
|
||||
@ -54,11 +55,16 @@ class Monitor(cmd.Cmd):
|
||||
return line
|
||||
|
||||
def _install_mpu_observers(self):
|
||||
def printit(operation, address, value):
|
||||
def putc(operation, address, value):
|
||||
self.stdout.write(chr(value))
|
||||
self.stdout.flush()
|
||||
|
||||
def getc(operation, address, value):
|
||||
return getch(self.stdin)
|
||||
|
||||
m = ObservableMemory()
|
||||
m.subscribe(m.WRITE, [0xE001], printit)
|
||||
m.subscribe(m.WRITE, [0xF001], putc)
|
||||
m.subscribe(m.READ, [0xF004], getc)
|
||||
|
||||
self._mpu.memory = m
|
||||
|
||||
|
@ -1,4 +1,8 @@
|
||||
import re
|
||||
import select
|
||||
import termios
|
||||
import fcntl
|
||||
import os
|
||||
|
||||
class AddressParser:
|
||||
"""Parse user input into addresses or ranges of addresses.
|
||||
@ -123,3 +127,31 @@ bin2bcd = [
|
||||
0x80,0x81,0x82,0x83,0x84,0x85,0x86,0x87,0x88,0x89,
|
||||
0x90,0x91,0x92,0x93,0x94,0x95,0x96,0x97,0x98,0x99
|
||||
]
|
||||
|
||||
def getch(stdin):
|
||||
""" Performs a nonblocking read of one byte from stdin and returns
|
||||
its ordinal value. If no byte is available, 0 is returned.
|
||||
"""
|
||||
|
||||
fd = stdin.fileno()
|
||||
|
||||
oldterm = termios.tcgetattr(fd)
|
||||
newattr = oldterm[:]
|
||||
newattr[3] = newattr[3] & ~termios.ICANON & ~termios.ECHO
|
||||
termios.tcsetattr(fd, termios.TCSANOW, newattr)
|
||||
|
||||
oldflags = fcntl.fcntl(fd, fcntl.F_GETFL)
|
||||
fcntl.fcntl(fd, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)
|
||||
|
||||
try:
|
||||
byte = 0
|
||||
r, w, e = select.select([fd], [], [], 0.1)
|
||||
if r:
|
||||
c = stdin.read(1)
|
||||
byte = ord(c)
|
||||
if byte == 0x0a:
|
||||
byte = 0x0d
|
||||
finally:
|
||||
termios.tcsetattr(fd, termios.TCSAFLUSH, oldterm)
|
||||
fcntl.fcntl(fd, fcntl.F_SETFL, oldflags)
|
||||
return byte
|
||||
|
Loading…
x
Reference in New Issue
Block a user