1
0
mirror of https://github.com/mnaberez/py65.git synced 2025-01-04 16:30:42 +00:00

Changed ObservableMemory interface. (offe)

This commit is contained in:
Mike Naberezny 2009-04-04 19:40:22 -07:00
parent 354117fef1
commit 1ed2582247

View File

@ -1,44 +1,45 @@
class ObservableMemory:
READ = 0
WRITE = 1
RW = 2
from collections import defaultdict
class ObservableMemory:
def __init__(self, subject=None):
if subject is None:
subject = []
for addr in range(0x0000, 0xFFFF+1):
subject.insert(addr, 0x00)
subject = 0x10000 * [0x00]
self._subject = subject
self._observers = []
self._listeners = defaultdict(list)
self._providers = defaultdict(list)
def __setitem__(self, address, value):
for oper, addr_range, callback in self._observers:
if address in addr_range:
if (oper == self.RW) or (oper == self.WRITE):
result = callback(self.WRITE, address, value)
if result is not None:
value = result
self._subject[address] = value
callbacks = self._listeners[address]
for callback in callbacks:
result = callback(address, value)
if result is not None:
value = result
def __getitem__(self, address):
for oper, addr_range, callback in self._observers:
if address in addr_range:
if (oper == self.RW) or (oper == self.READ):
result = callback(self.READ, address, None)
if result is not None:
return result
return self._subject[address]
callbacks = self._providers[address]
final_result = None
for callback in callbacks:
result = callback(address)
if result is not None:
final_result = result
if final_result:
return final_result
else:
return self._subject[address]
def __getattr__(self, address):
return getattr(self._subject, address)
def subscribe(self, operation, addr_range, callback):
if operation not in (self.READ, self.WRITE, self.RW):
raise ValueError("Unsupported operation")
self._observers.append([operation, addr_range, callback])
def register_provider(self, address_range, callback):
for address in address_range:
self._providers[address].append(callback)
def dma_read(self, key):
return self._subject[key]
def dma_write(self, key, value):
self._subject[key] = value
def register_listener(self, address_range, callback):
for address in address_range:
self._listeners[address].append(callback)
def write(self, start_address, bytes):
self._subject[start_address:start_address+len(bytes)] = bytes