# py65/py65/memory.py
#
# 74 lines
# 2.3 KiB
# Python

from collections import defaultdict
class ObservableMemory:
    """Memory wrapper whose reads and writes can be observed by callbacks.

    The wrapped storage (the "subject") is any indexable, sliceable
    sequence; by default a list of zeros covering the whole physical
    address range.  Every address is masked with ``physMask`` before use,
    so out-of-range addresses wrap into the modelled range.

    Callbacks registered per address may observe an access and may also
    override the value that is read or stored.  Attributes that are not
    defined here are looked up on the subject.
    """

    def __init__(self, subject=None, addrWidth=16):
        """Create the memory.

        subject   -- backing storage; when None a zero-filled list of
                     ``physMask + 1`` entries is allocated.
        addrWidth -- address width in bits; anything wider than 16 selects
                     the larger (256k) physical mask.
        """
        self.physMask = 0xffff
        if addrWidth > 16:
            # even with 32-bit address space, model only 256k memory
            self.physMask = 0x3ffff

        if subject is None:
            subject = (self.physMask + 1) * [0x00]
        self._subject = subject

        # address -> list of callbacks, in subscription order
        self._read_subscribers = defaultdict(list)
        self._write_subscribers = defaultdict(list)

    def __setitem__(self, address, value):
        """Store ``value`` at ``address``, notifying write subscribers.

        A slice address stores element by element through this same method,
        so each individual address triggers its own subscribers.  Pairing is
        done with ``zip``, so surplus addresses or surplus values are
        ignored.

        Each subscriber is called as ``callback(address, value)``; a return
        value other than None replaces the value passed to the following
        subscribers and finally stored.
        """
        if isinstance(address, slice):
            r = range(*address.indices(self.physMask + 1))
            for n, v in zip(r, value):
                self[n] = v
            return

        address &= self.physMask
        callbacks = self._write_subscribers[address]

        for callback in callbacks:
            result = callback(address, value)
            if result is not None:
                value = result

        self._subject[address] = value

    def __getitem__(self, address):
        """Return the value at ``address``, consulting read subscribers.

        A slice address returns a list built by reading each address
        through this same method.

        Every subscriber is called as ``callback(address)``.  If any of them
        returns something other than None, the last such result is returned
        instead of the stored value.
        """
        if isinstance(address, slice):
            r = range(*address.indices(self.physMask + 1))
            return [self[n] for n in r]

        address &= self.physMask
        callbacks = self._read_subscribers[address]
        final_result = None

        for callback in callbacks:
            result = callback(address)
            if result is not None:
                final_result = result

        if final_result is None:
            return self._subject[address]
        return final_result

    def __getattr__(self, attribute):
        """Delegate attributes not found on this object to the subject.

        Raises AttributeError if the subject lacks the attribute, or if the
        subject itself has not been set yet.
        """
        # __getattr__ only runs after normal lookup has failed, so being
        # asked for "_subject" means the instance has no subject (for
        # example copy and pickle create the object via __new__ and then
        # probe it for __setstate__).  Reading self._subject below would
        # re-enter this method until RecursionError, which hasattr() does
        # not swallow; report the attribute as missing instead.
        if attribute == "_subject":
            raise AttributeError(attribute)
        return getattr(self._subject, attribute)

    def _subscribe(self, subscribers, address_range, callback):
        """Add ``callback`` to ``subscribers`` for each masked address."""
        mask = self.physMask
        for address in address_range:
            callbacks = subscribers[address & mask]
            # registering the same callback twice must not call it twice
            if callback not in callbacks:
                callbacks.append(callback)

    def subscribe_to_write(self, address_range, callback):
        """Register ``callback(address, value)`` for writes.

        address_range -- iterable of addresses to observe.
        A callback already registered for an address is not added again.
        """
        self._subscribe(self._write_subscribers, address_range, callback)

    def subscribe_to_read(self, address_range, callback):
        """Register ``callback(address)`` for reads.

        address_range -- iterable of addresses to observe.
        A callback already registered for an address is not added again.
        """
        self._subscribe(self._read_subscribers, address_range, callback)

    def write(self, start_address, bytes):
        """Copy ``bytes`` into the subject without notifying subscribers.

        The data is assigned to the subject as a single slice starting at
        the masked ``start_address``.  The slice assignment is handed to the
        subject unchanged: data running past the end is neither wrapped nor
        clipped here (a list subject would grow).
        """
        start_address &= self.physMask
        self._subject[start_address:start_address + len(bytes)] = bytes