mirror of
https://github.com/mnaberez/py65.git
synced 2024-08-06 23:28:59 +00:00
74 lines
2.3 KiB
Python
74 lines
2.3 KiB
Python
from collections import defaultdict
|
|
|
|
|
|
class ObservableMemory:
    """Memory wrapper whose reads and writes can be observed per address.

    Item access (``mem[addr]`` and ``mem[addr] = value``) masks the address
    with ``physMask`` and routes it through any callbacks registered with
    ``subscribe_to_read`` / ``subscribe_to_write``.  Attributes that are not
    defined on this class are looked up on the wrapped subject.
    """

    def __init__(self, subject=None, addrWidth=16):
        """Wrap *subject*, or allocate zero-filled memory when it is None.

        subject   -- indexable backing store.  When None, a list of zeros
                     with ``physMask + 1`` entries is created.
        addrWidth -- address width in bits.  Any width above 16 selects the
                     0x3ffff mask; otherwise the mask is 0xffff.
        """
        self.physMask = 0xffff
        if addrWidth > 16:
            # even with 32-bit address space, model only 256k memory
            self.physMask = 0x3ffff

        if subject is None:
            subject = (self.physMask + 1) * [0x00]
        self._subject = subject

        # Both tables map a masked address to its callbacks, kept in
        # subscription order.
        self._read_subscribers = defaultdict(list)
        self._write_subscribers = defaultdict(list)

    def __setitem__(self, address, value):
        """Store *value* at *address* after consulting write subscribers.

        For a slice, *value* is iterated and each element is stored through
        this same method, so subscribers see every individual write; pairing
        stops at the shorter of the slice range and *value*.

        For a single address, each subscribed callback is called as
        ``callback(address, value)``.  A non-None return replaces the value
        handed to later callbacks and finally stored.
        """
        if isinstance(address, slice):
            targets = range(*address.indices(self.physMask + 1))
            for target, item in zip(targets, value):
                self[target] = item
            return

        address &= self.physMask

        # Use .get() instead of indexing: indexing a defaultdict would
        # insert an empty list for every address that is ever written.
        for callback in self._write_subscribers.get(address, ()):
            result = callback(address, value)
            if result is not None:
                value = result

        self._subject[address] = value

    def __getitem__(self, address):
        """Return the value at *address* after consulting read subscribers.

        For a slice, a list is returned with one entry per address, each
        obtained through this same method.

        For a single address, each subscribed callback is called as
        ``callback(address)``.  The last non-None return wins; when every
        callback returns None (or there are none) the stored value is
        returned.
        """
        if isinstance(address, slice):
            targets = range(*address.indices(self.physMask + 1))
            return [self[target] for target in targets]

        address &= self.physMask

        # Use .get() instead of indexing: indexing a defaultdict would
        # insert an empty list for every address that is ever read.
        final_result = None
        for callback in self._read_subscribers.get(address, ()):
            result = callback(address)
            if result is not None:
                final_result = result

        if final_result is None:
            return self._subject[address]
        return final_result

    def __getattr__(self, attribute):
        """Delegate attributes not found on this object to the subject.

        Raises AttributeError when the subject itself has not been set.
        """
        # __getattr__ only runs after normal lookup has failed.  If
        # ``_subject`` is what is missing (for example on an instance created
        # with __new__ before its state is restored), delegating would
        # re-enter this method forever, so fail with AttributeError instead.
        if attribute == "_subject":
            raise AttributeError(attribute)
        return getattr(self._subject, attribute)

    def _subscribe(self, subscribers, address_range, callback):
        """Add *callback* to *subscribers* once for each masked address."""
        for address in address_range:
            address &= self.physMask
            callbacks = subscribers.setdefault(address, [])
            if callback not in callbacks:
                callbacks.append(callback)

    def subscribe_to_write(self, address_range, callback):
        """Call ``callback(address, value)`` on writes to *address_range*.

        *address_range* is any iterable of addresses; each is masked with
        ``physMask``.  Subscribing the same callback to the same address
        again has no effect.
        """
        self._subscribe(self._write_subscribers, address_range, callback)

    def subscribe_to_read(self, address_range, callback):
        """Call ``callback(address)`` on reads from *address_range*.

        *address_range* is any iterable of addresses; each is masked with
        ``physMask``.  Subscribing the same callback to the same address
        again has no effect.
        """
        self._subscribe(self._read_subscribers, address_range, callback)

    def write(self, start_address, bytes):
        """Copy *bytes* into the subject starting at *start_address*.

        The data is assigned straight to the subject as one slice, so write
        subscribers are not called.  Only *start_address* is masked; the end
        of the slice is passed to the subject as is, with no wrap-around or
        bounds handling here.
        """
        start_address &= self.physMask
        self._subject[start_address:start_address + len(bytes)] = bytes
|