mirror of
https://github.com/KrisKennaway/ii-vision.git
synced 2024-12-21 05:30:20 +00:00
Video() is now aware of target frame rate, and will continue to emit
opcodes until the cycle budget for the frame is exhausted. Output stream is also now aware of TCP framing, and schedules an ACK opcode every 2048 output bytes to instruct the client to perform TCP ACK and buffer management. Fixes several serious bugs in RLE encoding, including: - we were emitting the RLE opcode with the next content byte after the run completed! - we were looking at the wrong field for the start offset! - handle the case where the entire page is a single run - stop trying to allow accumulating error when RLE -- this does not respect the Apple II colour encoding, i.e. may introduce colour fringing. - also because of this we're unlikely to actually be able to find many runs because odd and even columns are encoded differently. In a followup we should start encoding odd and even columns separately Optimize after profiling -- encoder is now about 2x faster Add tests.
This commit is contained in:
parent
cc6c92335d
commit
1b54c9c864
241
video.py
241
video.py
@ -1,8 +1,5 @@
|
||||
import functools
|
||||
from collections import defaultdict
|
||||
from typing import Iterator, Set, Tuple
|
||||
|
||||
import numpy as np
|
||||
from typing import Iterator, Tuple, Iterable
|
||||
|
||||
import opcodes
|
||||
import scheduler
|
||||
@ -10,8 +7,7 @@ import memory_map
|
||||
import screen
|
||||
|
||||
|
||||
@functools.lru_cache(None)
|
||||
def hamming_weight(n: int) -> int:
|
||||
def hamming_weight(n):
|
||||
"""Compute hamming weight of 8-bit int"""
|
||||
n = (n & 0x55) + ((n & 0xAA) >> 1)
|
||||
n = (n & 0x33) + ((n & 0xCC) >> 2)
|
||||
@ -22,7 +18,9 @@ def hamming_weight(n: int) -> int:
|
||||
class Video:
|
||||
"""Apple II screen memory map encoding a bitmapped frame."""
|
||||
|
||||
def __init__(self, screen_page: int = 0,
|
||||
CLOCK_SPEED = 1024 * 1024
|
||||
|
||||
def __init__(self, frame_rate: int = 15, screen_page: int = 0,
|
||||
opcode_scheduler: scheduler.OpcodeScheduler = None):
|
||||
self.screen_page = screen_page
|
||||
|
||||
@ -31,18 +29,29 @@ class Video:
|
||||
|
||||
self.memory_map = memory_map.MemoryMap(screen_page, self.screen)
|
||||
|
||||
self.cycle_counter = opcodes.CycleCounter()
|
||||
self.state = opcodes.State(self.cycle_counter, self.memory_map)
|
||||
|
||||
self.scheduler = (
|
||||
opcode_scheduler or scheduler.HeuristicPageFirstScheduler())
|
||||
|
||||
def update(self, frame: screen.Bitmap,
|
||||
cycle_budget: int, fullness: float) -> Iterator[int]:
|
||||
self.cycle_counter = opcodes.CycleCounter()
|
||||
|
||||
self.state = opcodes.State(self.cycle_counter, self.memory_map)
|
||||
|
||||
self.frame_rate = frame_rate
|
||||
self.stream_pos = 0
|
||||
if self.frame_rate:
|
||||
self.cycles_per_frame = self.CLOCK_SPEED // self.frame_rate
|
||||
else:
|
||||
self.cycles_per_frame = None
|
||||
|
||||
self._last_op = opcodes.Nop()
|
||||
|
||||
def encode_frame(self, frame: screen.Bitmap) -> Iterator[opcodes.Opcode]:
|
||||
"""Update to match content of frame within provided budget.
|
||||
|
||||
Emits encoded byte stream for rendering the image.
|
||||
|
||||
XXX update
|
||||
|
||||
The byte stream consists of offsets against a selected page (e.g. $20xx)
|
||||
at which to write a selected content byte. Those selections are
|
||||
controlled by special opcodes emitted to the stream
|
||||
@ -53,128 +62,148 @@ class Video:
|
||||
TICK - tick the speaker
|
||||
DONE - terminate the video decoding
|
||||
|
||||
In order to "make room" for these opcodes we make use of the fact that
|
||||
each page has 2 sets of 8-byte "screen holes", at page offsets
|
||||
0x78-0x7f and 0xf8-0xff. Currently we only use the latter range as
|
||||
this allows for efficient matching in the critical path of the decoder.
|
||||
|
||||
We group by offsets from page boundary (cf some other more
|
||||
optimal starting point) because STA (..),y has 1 extra cycle if
|
||||
crossing the page boundary. Though maybe this would be worthwhile if
|
||||
it optimizes the bytestream.
|
||||
"""
|
||||
|
||||
self.cycle_counter.reset()
|
||||
# Target screen memory map for new frame
|
||||
target = frame.pack()
|
||||
|
||||
# Estimate number of opcodes that will end up fitting in the cycle
|
||||
# budget.
|
||||
byte_cycles = opcodes.Store(0).cycles
|
||||
est_opcodes = int(cycle_budget / fullness / byte_cycles)
|
||||
|
||||
# Sort by highest xor weight and take the estimated number of change
|
||||
# operations
|
||||
# TODO: changes should be a class
|
||||
changes = list(
|
||||
sorted(self.index_changes(self.screen, target), reverse=True)
|
||||
)[:est_opcodes]
|
||||
changes = sorted(list(self._index_changes(self.screen, target)),
|
||||
reverse=True)
|
||||
|
||||
for op in self.scheduler.schedule(changes):
|
||||
yield from self.state.emit(op)
|
||||
yield from self.scheduler.schedule(changes)
|
||||
|
||||
def index_changes(self, source: screen.Bytemap,
|
||||
target: screen.Bytemap) -> Set[
|
||||
Tuple[int, int, int, int, int]]:
|
||||
"""Transform encoded screen to sequence of change tuples.
|
||||
|
||||
Change tuple is (xor_weight, page, offset, content)
|
||||
"""
|
||||
|
||||
changes = set()
|
||||
# TODO: don't use 256 bytes if XMAX is smaller, or we may compute RLE
|
||||
# over the full page!
|
||||
memmap = defaultdict(lambda: [(0, 0, 0)] * 256)
|
||||
|
||||
it = np.nditer(target.bytemap, flags=['multi_index'])
|
||||
while not it.finished:
|
||||
y, x_byte = it.multi_index
|
||||
|
||||
page, offset = self.memory_map.to_page_offset(x_byte, y)
|
||||
|
||||
src_content = source.bytemap[y][x_byte]
|
||||
target_content = np.asscalar(it[0])
|
||||
|
||||
bits_different = hamming_weight(src_content ^ target_content)
|
||||
|
||||
memmap[page][offset] = (bits_different, src_content, target_content)
|
||||
it.iternext()
|
||||
@functools.lru_cache()
|
||||
def _rle_cycles(self, run_length):
|
||||
return opcodes.RLE(0, run_length).cycles
|
||||
|
||||
def _index_page(self, bits_different, target_content):
|
||||
byte_cycles = opcodes.Store(0).cycles
|
||||
|
||||
for page, offsets in memmap.items():
|
||||
cur_content = None
|
||||
run_length = 0
|
||||
maybe_run = []
|
||||
for offset, data in enumerate(offsets):
|
||||
bits_different, src_content, target_content = data
|
||||
cur_content = None
|
||||
run_length = 0
|
||||
run = []
|
||||
|
||||
# TODO: allowing odd bit errors introduces colour error
|
||||
if maybe_run and hamming_weight(
|
||||
cur_content ^ target_content) > 2:
|
||||
# End of run
|
||||
# Number of changes in run for which >0 bits differ
|
||||
num_changes_in_run = 0
|
||||
|
||||
# Decide if it's worth emitting as a run vs single stores
|
||||
# Total weight of differences accumulated in run
|
||||
total_xor_in_run = 0
|
||||
|
||||
# Number of changes in run for which >0 bits differ
|
||||
num_changes = len([c for c in maybe_run if c[0]])
|
||||
run_cost = opcodes.RLE(0, run_length).cycles
|
||||
single_cost = byte_cycles * num_changes
|
||||
# print("Run of %d cheaper than %d singles" % (
|
||||
# run_length, num_changes))
|
||||
def end_run():
|
||||
# Decide if it's worth emitting as a run vs single stores
|
||||
run_cost = self._rle_cycles(run_length)
|
||||
single_cost = byte_cycles * num_changes_in_run
|
||||
# print("Run of %d cheaper than %d singles" % (
|
||||
# run_length, num_changes_in_run))
|
||||
|
||||
# TODO: don't allow too much error to accumulate
|
||||
if run_cost < single_cost:
|
||||
start_offset = run[0][1]
|
||||
|
||||
if run_cost < single_cost:
|
||||
# Compute median bit value over run
|
||||
median_bits = np.median(
|
||||
np.vstack(
|
||||
np.unpackbits(
|
||||
np.array(r[3], dtype=np.uint8)
|
||||
)
|
||||
for r in maybe_run
|
||||
), axis=0
|
||||
) > 0.5
|
||||
# print("Found run of %d * %2x at %2x" % (
|
||||
# run_length, cur_content, offset - run_length)
|
||||
# )
|
||||
# print(run)
|
||||
yield (
|
||||
total_xor_in_run, start_offset, cur_content, run_length)
|
||||
else:
|
||||
for ch in run:
|
||||
if ch[0]:
|
||||
yield ch
|
||||
|
||||
typical_content = np.asscalar(np.packbits(median_bits))
|
||||
for offset in range(256):
|
||||
bd = bits_different[offset]
|
||||
tc = target_content[offset]
|
||||
if run and cur_content != tc:
|
||||
# End of run
|
||||
|
||||
total_xor = sum(ch[0] for ch in maybe_run)
|
||||
start_offset = maybe_run[0][2]
|
||||
yield from end_run()
|
||||
|
||||
change = (total_xor, page, start_offset,
|
||||
typical_content, run_length)
|
||||
# print("Found run of %d * %2x at %2x:%2x" % (
|
||||
# run_length, cur_content, page, offset - run_length)
|
||||
# )
|
||||
# print(maybe_run)
|
||||
# print("change =", change)
|
||||
changes.add(change)
|
||||
else:
|
||||
changes.update(ch for ch in maybe_run if ch[0])
|
||||
maybe_run = []
|
||||
run_length = 0
|
||||
cur_content = target_content
|
||||
run = []
|
||||
run_length = 0
|
||||
num_changes_in_run = 0
|
||||
total_xor_in_run = 0
|
||||
cur_content = tc
|
||||
|
||||
if cur_content is None:
|
||||
cur_content = target_content
|
||||
if cur_content is None:
|
||||
cur_content = tc
|
||||
|
||||
run_length += 1
|
||||
maybe_run.append(
|
||||
(bits_different, page, offset, target_content, 1))
|
||||
run_length += 1
|
||||
run.append((bd, offset, tc, 1))
|
||||
if bd:
|
||||
num_changes_in_run += 1
|
||||
total_xor_in_run += bd
|
||||
|
||||
return changes
|
||||
if run:
|
||||
# End of run
|
||||
yield from end_run()
|
||||
|
||||
def _index_changes(
|
||||
self,
|
||||
source: screen.Bytemap,
|
||||
target: screen.Bytemap) -> Iterator[Tuple[int, int, int, int, int]]:
|
||||
"""Transform encoded screen to sequence of change tuples.
|
||||
|
||||
Change tuple is (xor_weight, page, offset, content, run_length)
|
||||
"""
|
||||
|
||||
# TODO: work with memory maps directly?
|
||||
source_memmap = memory_map.MemoryMap.to_memory_map(source.bytemap)
|
||||
target_memmap = memory_map.MemoryMap.to_memory_map(target.bytemap)
|
||||
|
||||
# TODO: don't use 256 bytes if XMAX is smaller, or we may compute RLE
|
||||
# (with bit errors) over the full page!
|
||||
diff_weights = hamming_weight(source_memmap ^ target_memmap)
|
||||
|
||||
for page in range(32):
|
||||
for change in self._index_page(
|
||||
diff_weights[page], target_memmap[page]):
|
||||
total_xor_in_run, start_offset, target_content, run_length = \
|
||||
change
|
||||
|
||||
# TODO: handle screen page
|
||||
yield (
|
||||
total_xor_in_run, page + 32, start_offset,
|
||||
target_content, run_length
|
||||
)
|
||||
|
||||
def _emit_bytes(self, _op):
|
||||
# print("%04X:" % self.stream_pos)
|
||||
for b in self.state.emit(self._last_op, _op):
|
||||
yield b
|
||||
self.stream_pos += 1
|
||||
self._last_op = _op
|
||||
|
||||
def emit_stream(self, ops: Iterable[opcodes.Opcode]) -> Iterator[int]:
|
||||
self.cycle_counter.reset()
|
||||
for op in ops:
|
||||
# Keep track of where we are in TCP client socket buffer
|
||||
socket_pos = self.stream_pos % 2048
|
||||
if socket_pos >= 2045:
|
||||
# May be about to emit a 3-byte opcode, pad out to last byte
|
||||
# in frame
|
||||
nops = 2047 - socket_pos
|
||||
# print("At position %04x, padding with %d nops" % (
|
||||
# socket_pos, nops))
|
||||
for _ in range(nops):
|
||||
yield from self._emit_bytes(opcodes.Nop())
|
||||
yield from self._emit_bytes(opcodes.Ack())
|
||||
# Ack falls through to nop
|
||||
self._last_op = opcodes.Nop()
|
||||
yield from self._emit_bytes(op)
|
||||
|
||||
if self.cycles_per_frame and (
|
||||
self.cycle_counter.cycles > self.cycles_per_frame):
|
||||
print("Out of cycle budget")
|
||||
return
|
||||
# TODO: pad to cycles_per_frame with NOPs
|
||||
|
||||
def done(self) -> Iterator[int]:
|
||||
"""Terminate opcode stream."""
|
||||
|
||||
yield from self.state.emit(opcodes.Terminate())
|
||||
yield from self._emit_bytes(opcodes.Terminate())
|
||||
|
168
video_test.py
Normal file
168
video_test.py
Normal file
@ -0,0 +1,168 @@
|
||||
import unittest
|
||||
|
||||
import numpy as np
|
||||
|
||||
import opcodes
|
||||
import screen
|
||||
import video
|
||||
|
||||
|
||||
class TestHammingWeight(unittest.TestCase):
|
||||
def testHammingWeight(self):
|
||||
self.assertEqual(0, video.hamming_weight(0))
|
||||
self.assertEqual(1, video.hamming_weight(0b1))
|
||||
self.assertEqual(1, video.hamming_weight(0b100))
|
||||
self.assertEqual(3, video.hamming_weight(0b101100))
|
||||
self.assertEqual(7, video.hamming_weight(0b11111110))
|
||||
|
||||
|
||||
class TestVideo(unittest.TestCase):
|
||||
def testEncodeEmptyFrame(self):
|
||||
f = screen.HGR140Bitmap()
|
||||
v = video.Video()
|
||||
|
||||
self.assertEqual([], list(v.encode_frame(f)))
|
||||
|
||||
def testEncodeOnePixel(self):
|
||||
f = screen.HGR140Bitmap()
|
||||
a = np.zeros((f.YMAX, f.XMAX), dtype=bool)
|
||||
a[0, 0] = True
|
||||
|
||||
f = screen.HGR140Bitmap(a)
|
||||
|
||||
v = video.Video()
|
||||
|
||||
want = [
|
||||
opcodes.SetPage(0x20),
|
||||
opcodes.SetContent(0x03),
|
||||
opcodes.Store(0x00),
|
||||
]
|
||||
got = list(v.encode_frame(f))
|
||||
self.assertListEqual(want, got)
|
||||
|
||||
|
||||
class TestIndexPage(unittest.TestCase):
|
||||
def testFullPageSameValue(self):
|
||||
"""Constant data with nonzero weights should return single run"""
|
||||
v = video.Video()
|
||||
|
||||
data = np.ones((256,), dtype=np.uint8)
|
||||
|
||||
# total_xor_difference, start_offset, content, run_length
|
||||
want = [(256, 0, 1, 256)]
|
||||
got = list(v._index_page(video.hamming_weight(data), data))
|
||||
|
||||
self.assertEqual(want, got)
|
||||
|
||||
def testFullPageZeroValue(self):
|
||||
"""Zero data with 0 weights should return nothing"""
|
||||
v = video.Video()
|
||||
|
||||
data = np.zeros((256,), dtype=np.uint8)
|
||||
|
||||
# total_xor_difference, start_offset, content, run_length
|
||||
want = []
|
||||
got = list(v._index_page(video.hamming_weight(data), data))
|
||||
|
||||
self.assertEqual(want, got)
|
||||
|
||||
def testFullPageZeroValueWithDiff(self):
|
||||
"""Zero data with nonzero weights should return single run"""
|
||||
v = video.Video()
|
||||
|
||||
old_data = np.ones((256,), dtype=np.uint8)
|
||||
|
||||
data = np.zeros((256,), dtype=np.uint8)
|
||||
|
||||
# total_xor_difference, start_offset, content, run_length
|
||||
want = [(256, 0, 0, 256)]
|
||||
got = list(v._index_page(video.hamming_weight(old_data), data))
|
||||
|
||||
self.assertEqual(want, got)
|
||||
|
||||
def testSingleRun(self):
|
||||
"""Single run of nonzero data"""
|
||||
v = video.Video()
|
||||
|
||||
data = np.zeros((256,), dtype=np.uint8)
|
||||
for i in range(5):
|
||||
data[i] = 1
|
||||
|
||||
# total_xor_difference, start_offset, content, run_length
|
||||
want = [(5, 0, 1, 5)]
|
||||
got = list(v._index_page(video.hamming_weight(data), data))
|
||||
|
||||
self.assertEqual(want, got)
|
||||
|
||||
def testTwoRuns(self):
|
||||
"""Two consecutive runs of nonzero data"""
|
||||
v = video.Video()
|
||||
|
||||
data = np.zeros((256,), dtype=np.uint8)
|
||||
for i in range(5):
|
||||
data[i] = 1
|
||||
for i in range(5, 10):
|
||||
data[i] = 2
|
||||
|
||||
# total_xor_difference, start_offset, content, run_length
|
||||
want = [(5, 0, 1, 5), (5, 5, 2, 5)]
|
||||
got = list(v._index_page(video.hamming_weight(data), data))
|
||||
|
||||
self.assertEqual(want, got)
|
||||
|
||||
def testShortRun(self):
|
||||
"""Run that is too short to encode as RLE opcode"""
|
||||
v = video.Video()
|
||||
|
||||
data = np.zeros((256,), dtype=np.uint8)
|
||||
for i in range(2):
|
||||
data[i] = 1
|
||||
|
||||
# total_xor_difference, start_offset, content, run_length
|
||||
want = [(1, 0, 1, 1), (1, 1, 1, 1)]
|
||||
got = list(v._index_page(video.hamming_weight(data), data))
|
||||
|
||||
self.assertEqual(want, got)
|
||||
|
||||
|
||||
class TestEncodeDecode(unittest.TestCase):
|
||||
def testEncodeDecode(self):
|
||||
for _ in range(10):
|
||||
s = video.Video(frame_rate=1)
|
||||
screen_cls = screen.HGR140Bitmap
|
||||
|
||||
im = np.random.randint(
|
||||
0, 2, (screen_cls.YMAX, screen_cls.XMAX), dtype=np.bool)
|
||||
f = screen_cls(im)
|
||||
|
||||
_ = bytes(s.emit_stream(s.encode_frame(f)))
|
||||
|
||||
# assert that the screen decodes to the original bitmap
|
||||
bm = screen_cls.from_bytemap(s.screen).bitmap
|
||||
|
||||
self.assertTrue(np.array_equal(bm, im))
|
||||
|
||||
def testEncodeDecodeTwoFrames(self):
|
||||
|
||||
for _ in range(10):
|
||||
s = video.Video(frame_rate=1)
|
||||
screen_cls = screen.HGR140Bitmap
|
||||
|
||||
im = np.random.randint(
|
||||
0, 2, (screen_cls.YMAX, screen_cls.XMAX), dtype=np.bool)
|
||||
f = screen_cls(im)
|
||||
_ = bytes(s.emit_stream(s.encode_frame(f)))
|
||||
|
||||
im2 = np.random.randint(
|
||||
0, 2, (screen_cls.YMAX, screen_cls.XMAX), dtype=np.bool)
|
||||
f = screen_cls(im2)
|
||||
_ = bytes(s.emit_stream(s.encode_frame(f)))
|
||||
|
||||
# assert that the screen decodes to the original bitmap
|
||||
bm = screen_cls.from_bytemap(s.screen).bitmap
|
||||
|
||||
self.assertTrue(np.array_equal(bm, im2))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue
Block a user