ii-vision/transcoder/video.py

302 lines
11 KiB
Python

"""Encode a sequence of images as an optimized stream of screen changes."""
import heapq
import random
from typing import List, Iterator, Tuple
import numpy as np
import opcodes
import screen
from frame_grabber import FrameGrabber
from palette import Palette
from video_mode import VideoMode
class Video:
"""Encodes sequence of images into prioritized screen byte changes."""
CLOCK_SPEED = 1024 * 1024 # type: int
def __init__(
self,
frame_grabber: FrameGrabber,
ticks_per_second: float,
mode: VideoMode = VideoMode.HGR,
palette: Palette = Palette.NTSC
):
self.mode = mode # type: VideoMode
self.frame_grabber = frame_grabber # type: FrameGrabber
self.ticks_per_second = float(ticks_per_second) # type: float
self.ticks_per_frame = (
self.ticks_per_second / frame_grabber.input_frame_rate
) # type: float
self.frame_number = 0 # type: int
self.palette = palette # type: Palette
# Initialize empty screen
self.memory_map = screen.MemoryMap(
screen_page=1) # type: screen.MemoryMap
if self.mode == mode.DHGR:
self.aux_memory_map = screen.MemoryMap(
screen_page=1) # type: screen.MemoryMap
self.pixelmap = screen.DHGRBitmap(
palette=palette,
main_memory=self.memory_map,
aux_memory=self.aux_memory_map
)
else:
self.pixelmap = screen.HGRBitmap(
palette=palette,
main_memory=self.memory_map,
)
# Accumulates pending edit weights across frames
self.update_priority = np.zeros((32, 256), dtype=np.int32)
if self.mode == mode.DHGR:
self.aux_update_priority = np.zeros((32, 256), dtype=np.int32)
# Indicates whether we have run out of work for the main/aux banks.
# Key is True for aux bank and False for main bank
self.out_of_work = {True: False, False: False}
def tick(self, ticks: int) -> bool:
"""Keep track of when it is time for a new image frame."""
if ticks >= (self.ticks_per_frame * self.frame_number):
self.frame_number += 1
return True
return False
def encode_frame(
self,
target: screen.Bitmap,
is_aux: bool,
) -> Iterator[opcodes.Opcode]:
"""Converge towards target frame in priority order of edit distance."""
if is_aux:
memory_map = self.aux_memory_map
update_priority = self.aux_update_priority
else:
memory_map = self.memory_map
update_priority = self.update_priority
# Make sure nothing is leaking into screen holes
assert np.count_nonzero(
memory_map.page_offset[screen.SCREEN_HOLES]) == 0
print("Similarity %f" % (update_priority.mean()))
yield from self._index_changes(
memory_map, target, update_priority, is_aux)
def _index_changes(
self,
source: screen.MemoryMap,
target_pixelmap: screen.Bitmap,
update_priority: np.array,
is_aux: bool
) -> Iterator[Tuple[int, int, List[int]]]:
"""Transform encoded screen to sequence of change tuples."""
if self.mode == VideoMode.DHGR and is_aux:
target = target_pixelmap.aux_memory
else:
target = target_pixelmap.main_memory
diff_weights = target_pixelmap.diff_weights(self.pixelmap, is_aux)
# Don't bother storing into screen holes
diff_weights[screen.SCREEN_HOLES] = 0
# Clear any update priority entries that have resolved themselves
# with new frame
update_priority[diff_weights == 0] = 0
update_priority += diff_weights
assert np.all(update_priority >= 0)
priorities = self._heapify_priorities(update_priority)
while priorities:
pri, _, page, offset = heapq.heappop(priorities)
assert not screen.SCREEN_HOLES[page, offset], (
"Attempted to store into screen hole at (%d, %d)" % (
page, offset))
# Check whether we've already cleared this diff while processing
# an earlier opcode
if update_priority[page, offset] == 0:
continue
offsets = [offset]
content = target.page_offset[page, offset]
if self.mode == VideoMode.DHGR:
# DHGR palette bit not expected to be set
assert content < 0x80
# Clear priority for the offset we're emitting
update_priority[page, offset] = 0
diff_weights[page, offset] = 0
# Update memory maps
self.pixelmap.apply(page, offset, is_aux, content)
# Need to find 3 more offsets to fill this opcode
for err, o in self._compute_error(
page,
content,
target_pixelmap,
diff_weights,
is_aux
):
assert o != offset
assert not screen.SCREEN_HOLES[page, o], (
"Attempted to store into screen hole at (%d, %d)" % (
page, o))
if update_priority[page, o] == 0:
# Someone already resolved this diff.
continue
byte_offset = target_pixelmap.byte_offset(o, is_aux)
old_packed = target_pixelmap.packed[page, o // 2]
p = target_pixelmap.byte_pair_difference(
byte_offset, old_packed, content)
# Update priority for the offset we're emitting
update_priority[page, o] = p
self.pixelmap.apply(page, o, is_aux, content)
if p:
# This content byte introduced an error, so put back on the
# heap in case we can get back to fixing it exactly
# during this frame. Otherwise, we'll get to it later.
heapq.heappush(
priorities, (-p, random.getrandbits(8), page, o))
offsets.append(o)
if len(offsets) == 3:
break
# Pad to 4 if we didn't find enough
for _ in range(len(offsets), 4):
offsets.append(offsets[0])
yield page + 32, content, offsets
self.out_of_work[is_aux] = True
# These debugging assertions validate that when we are out of work,
# our source and target representations should be identical.
#
# They only work correctly for palettes that do not have identical
# colours (e.g. IIGS but not NTSC which has two identical greys).
#
# The problem is that if we have substituted one grey for the other
# there may be no diff if they are part of an extended run of greys.
#
# The only difference is at the end of the run where these produce
# different artifact colours, but this may only be visible in the
# other bank.
#
# It may take several iterations of main/aux before we will notice and
# correct all of these differences. That means we don't have a
# deterministic point in time when we can assert that all diffs should
# have been resolved.
# TODO: add flag to enable debug assertions
# if not np.array_equal(source.page_offset, target.page_offset):
# diffs = np.nonzero(source.page_offset != target.page_offset)
# for i in range(len(diffs[0])):
# diff_p = diffs[0][i]
# diff_o = diffs[1][i]
#
# # For HGR, 0x00 or 0x7f may be visually equivalent to the same
# # bytes with high bit set (depending on neighbours), so skip
# # them
# if (source.page_offset[diff_p, diff_o] & 0x7f) == 0 and \
# (target.page_offset[diff_p, diff_o] & 0x7f) == 0:
# continue
#
# if (source.page_offset[diff_p, diff_o] & 0x7f) == 0x7f and \
# (target.page_offset[diff_p, diff_o] & 0x7f) == 0x7f:
# continue
#
# print("Diff at (%d, %d): %d != %d" % (
# diff_p, diff_o, source.page_offset[diff_p, diff_o],
# target.page_offset[diff_p, diff_o]
# ))
# assert False
#
# # If we've finished both main and aux pages, there should be no residual
# # diffs in packed representation
# all_done = self.out_of_work[True] and self.out_of_work[False]
# if all_done and not np.array_equal(self.pixelmap.packed,
# target_pixelmap.packed):
# diffs = np.nonzero(
# self.pixelmap.packed != target_pixelmap.packed)
# print("is_aux: %s" % is_aux)
# for i in range(len(diffs[0])):
# diff_p = diffs[0][i]
# diff_o = diffs[1][i]
# print("(%d, %d): got %d want %d" % (
# diff_p, diff_o, self.pixelmap.packed[diff_p, diff_o],
# target_pixelmap.packed[diff_p, diff_o]))
# assert False
# If we run out of things to do, pad forever
content = target.page_offset[0, 0]
while True:
yield 32, content, [0, 0, 0, 0]
@staticmethod
def _heapify_priorities(update_priority: np.array) -> List:
"""Build priority queue of (page, offset) ordered by update priority."""
# Use numpy vectorization to efficiently compute the list of
# (priority, random nonce, page, offset) tuples to be heapified.
pages, offsets = update_priority.nonzero()
priorities = [tuple(data) for data in np.stack((
-update_priority[pages, offsets],
# Don't use deterministic order for page, offset. Otherwise,
# we get the "venetian blind" effect when filling large blocks of
# colour.
np.random.randint(0, 2 ** 8, size=pages.shape[0]),
pages,
offsets)
).T.tolist()]
heapq.heapify(priorities)
return priorities
_OFFSETS = np.arange(256)
def _compute_error(
self, page, content, target_pixelmap, diff_weights, is_aux):
"""Build priority queue of other offsets at which to store content.
Ordered by offsets which are closest to the target content value.
"""
delta_page = target_pixelmap.compute_delta_page(
page, content, diff_weights[page, :], is_aux)
cond = delta_page < 0
candidate_offsets = self._OFFSETS[cond]
priorities = delta_page[cond]
# Don't use deterministic order for page, offset. Otherwise,
# we get the "venetian blind" effect when filling large blocks of
# colour.
deltas = [
(priorities[i], random.getrandbits(8), candidate_offsets[i])
for i in range(len(candidate_offsets))
]
heapq.heapify(deltas)
while deltas:
pri, _, offset = heapq.heappop(deltas)
assert pri < 0
assert 0 <= offset <= 255
yield -pri, offset