2009-04-05 06:10:22 +00:00
|
|
|
import unittest
|
|
|
|
from py65.memory import ObservableMemory
|
|
|
|
|
2012-11-19 20:44:30 +00:00
|
|
|
|
2009-04-05 06:10:22 +00:00
|
|
|
class ObservableMemoryTests(unittest.TestCase):
|
|
|
|
|
|
|
|
# __setitem__
|
|
|
|
|
|
|
|
def test___setitem__with_no_listeners_changes_memory(self):
|
|
|
|
subject = self._make_subject()
|
|
|
|
mem = ObservableMemory(subject=subject)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 06:10:22 +00:00
|
|
|
mem[0xC000] = 0xAB
|
|
|
|
self.assertEqual(0xAB, subject[0xC000])
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def test___setitem__ignores_subscribers_returning_none(self):
|
2009-04-05 06:10:22 +00:00
|
|
|
subject = self._make_subject()
|
|
|
|
mem = ObservableMemory(subject=subject)
|
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def write_subscriber_1(address, value):
|
2009-04-05 06:10:22 +00:00
|
|
|
return None
|
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def write_subscriber_2(address, value):
|
2009-04-05 06:10:22 +00:00
|
|
|
return None
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
mem.subscribe_to_write([0xC000], write_subscriber_1)
|
|
|
|
mem.subscribe_to_write([0xC000], write_subscriber_2)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 06:10:22 +00:00
|
|
|
mem[0xC000] = 0xAB
|
|
|
|
self.assertEqual(0xAB, subject[0xC000])
|
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def test___setitem__uses_result_of_last_subscriber(self):
|
2009-04-05 06:10:22 +00:00
|
|
|
subject = self._make_subject()
|
|
|
|
mem = ObservableMemory(subject=subject)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def write_subscriber_1(address, value):
|
2009-04-05 06:10:22 +00:00
|
|
|
return 0x01
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def write_subscriber_2(address, value):
|
2009-04-05 06:10:22 +00:00
|
|
|
return 0x02
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
mem.subscribe_to_write([0xC000], write_subscriber_1)
|
|
|
|
mem.subscribe_to_write([0xC000], write_subscriber_2)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 06:10:22 +00:00
|
|
|
mem[0xC000] = 0xAB
|
|
|
|
self.assertEqual(0x02, subject[0xC000])
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
# subscribe_to_read
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def test_subscribe_to_read_covers_all_addresses_in_range(self):
|
2009-04-05 06:10:22 +00:00
|
|
|
subject = self._make_subject()
|
|
|
|
mem = ObservableMemory(subject=subject)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def read_subscriber(address, value):
|
2009-04-05 06:10:22 +00:00
|
|
|
return 0xAB
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2013-10-26 21:08:21 +00:00
|
|
|
mem.subscribe_to_read(range(0xC000, 0xC001 + 1), read_subscriber)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 06:10:22 +00:00
|
|
|
mem[0xC000] = 0xAB
|
|
|
|
mem[0xC001] = 0xAB
|
|
|
|
self.assertEqual(0xAB, subject[0xC001])
|
|
|
|
self.assertEqual(0xAB, subject[0xC001])
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def test__subscribe_to_read_does_not_register_same_listener_twice(self):
|
2009-04-05 06:10:22 +00:00
|
|
|
subject = self._make_subject()
|
|
|
|
mem = ObservableMemory(subject=subject)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 19:30:34 +00:00
|
|
|
calls = []
|
2012-11-19 20:44:30 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def read_subscriber(address):
|
|
|
|
calls.append('read_subscriber')
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
mem.subscribe_to_read([0xC000], read_subscriber)
|
|
|
|
mem.subscribe_to_read([0xC000], read_subscriber)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
value = mem[0xC000]
|
2013-10-26 23:55:53 +00:00
|
|
|
value = value # pyflakes
|
2009-04-05 21:57:13 +00:00
|
|
|
self.assertEqual(['read_subscriber'], calls)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 06:10:22 +00:00
|
|
|
# __getitem__
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def test___getitem__with_no_write_subscribers_changes_memory(self):
|
2009-04-05 06:10:22 +00:00
|
|
|
subject = self._make_subject()
|
|
|
|
mem = ObservableMemory(subject=subject)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 06:10:22 +00:00
|
|
|
subject[0xC000] = 0xAB
|
|
|
|
self.assertEqual(0xAB, mem[0xC000])
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def test___getitem__ignores_read_subscribers_returning_none(self):
|
2009-04-05 06:10:22 +00:00
|
|
|
subject = self._make_subject()
|
|
|
|
mem = ObservableMemory(subject=subject)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def read_subscriber_1(address):
|
2009-04-05 06:10:22 +00:00
|
|
|
return None
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def read_subscriber_2(address):
|
2009-04-05 06:10:22 +00:00
|
|
|
return None
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
mem.subscribe_to_read([0xC000], read_subscriber_1)
|
|
|
|
mem.subscribe_to_read([0xC000], read_subscriber_2)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 06:10:22 +00:00
|
|
|
mem[0xC000] = 0xAB
|
|
|
|
self.assertEqual(0xAB, subject[0xC000])
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def test___getitem__calls_all_read_subscribers_uses_last_result(self):
|
2009-04-05 06:10:22 +00:00
|
|
|
subject = self._make_subject()
|
|
|
|
mem = ObservableMemory(subject=subject)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 19:30:34 +00:00
|
|
|
calls = []
|
2012-11-19 20:44:30 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def read_subscriber_1(address):
|
|
|
|
calls.append('read_subscriber_1')
|
2009-04-05 18:08:44 +00:00
|
|
|
return 0x01
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def read_subscriber_2(address):
|
|
|
|
calls.append('read_subscriber_2')
|
2009-04-05 18:08:44 +00:00
|
|
|
return 0x02
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
mem.subscribe_to_read([0xC000], read_subscriber_1)
|
|
|
|
mem.subscribe_to_read([0xC000], read_subscriber_2)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 06:10:22 +00:00
|
|
|
subject[0xC000] = 0xAB
|
2009-04-05 18:08:44 +00:00
|
|
|
self.assertEqual(0x02, mem[0xC000])
|
2009-04-05 06:10:22 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
expected_calls = ['read_subscriber_1', 'read_subscriber_2']
|
|
|
|
self.assertEqual(expected_calls, calls)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 06:10:22 +00:00
|
|
|
# __getattr__
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 06:10:22 +00:00
|
|
|
def test__getattr__proxies_subject(self):
|
|
|
|
subject = self._make_subject()
|
|
|
|
mem = ObservableMemory(subject=subject)
|
|
|
|
self.assertEqual(subject.count, mem.count)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 06:10:22 +00:00
|
|
|
# write
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 06:10:22 +00:00
|
|
|
def test_write_directly_writes_values_to_subject(self):
|
|
|
|
subject = self._make_subject()
|
|
|
|
mem = ObservableMemory(subject=subject)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2009-04-05 21:57:13 +00:00
|
|
|
def write_subscriber(address, value):
|
2009-04-05 06:10:22 +00:00
|
|
|
return 0xFF
|
2012-11-19 20:44:30 +00:00
|
|
|
mem.subscribe_to_write([0xC000, 0xC001], write_subscriber)
|
2012-11-19 18:26:03 +00:00
|
|
|
|
2013-10-26 20:56:06 +00:00
|
|
|
mem.write(0xC000, [0x01, 0x02])
|
2009-04-05 06:10:22 +00:00
|
|
|
self.assertEqual(0x01, subject[0xC000])
|
|
|
|
self.assertEqual(0x02, subject[0xC001])
|
|
|
|
|
|
|
|
# Test Helpers
|
2012-11-19 18:26:03 +00:00
|
|
|
|
|
|
|
def _make_subject(self):
|
|
|
|
subject = 0x10000 * [0x00]
|
2009-04-05 06:10:22 +00:00
|
|
|
return subject
|
|
|
|
|
2011-08-19 16:33:21 +00:00
|
|
|
if __name__ == '__main__':
|
|
|
|
unittest.main()
|