ii-vision/transcoder/video.py

379 lines
13 KiB
Python
Raw Normal View History

2019-03-21 16:42:47 +00:00
"""Encode a sequence of images as an optimized stream of screen changes."""
import functools
import heapq
import random
from typing import List, Iterator, Tuple
import numpy as np
import opcodes
import screen
from frame_grabber import FrameGrabber
from palette import Palette
from video_mode import VideoMode
class Video:
"""Apple II screen memory map encoding a bitmapped frame."""
CLOCK_SPEED = 1024 * 1024 # type: int
def __init__(
self,
frame_grabber: FrameGrabber,
ticks_per_second: int,
mode: VideoMode = VideoMode.HGR,
2019-06-19 21:10:15 +00:00
palette: Palette = Palette.NTSC
):
self.mode = mode # type: VideoMode
self.frame_grabber = frame_grabber # type: FrameGrabber
self.ticks_per_second = float(ticks_per_second) # type: float
self.ticks_per_frame = (
2019-06-19 21:10:15 +00:00
self.ticks_per_second / frame_grabber.input_frame_rate) # type: float
self.frame_number = 0 # type: int
self.palette = palette # type: Palette
# Initialize empty screen
self.memory_map = screen.MemoryMap(
screen_page=1) # type: screen.MemoryMap
if self.mode == mode.DHGR:
self.aux_memory_map = screen.MemoryMap(
screen_page=1) # type: screen.MemoryMap
self.pixelmap = screen.DHGRBitmap(
main_memory=self.memory_map,
aux_memory=self.aux_memory_map
)
# Accumulates pending edit weights across frames
self.update_priority = np.zeros((32, 256), dtype=np.int)
if self.mode == mode.DHGR:
self.aux_update_priority = np.zeros((32, 256), dtype=np.int)
2019-06-19 21:28:31 +00:00
def tick(self, ticks: int) -> bool:
if ticks >= (self.ticks_per_frame * self.frame_number):
self.frame_number += 1
return True
return False
def encode_frame(
self,
target: screen.MemoryMap,
is_aux: bool,
) -> Iterator[opcodes.Opcode]:
"""Update to match content of frame within provided budget."""
if is_aux:
memory_map = self.aux_memory_map
update_priority = self.aux_update_priority
else:
memory_map = self.memory_map
update_priority = self.update_priority
print("Similarity %f" % (update_priority.mean()))
yield from self._index_changes(
memory_map, target, update_priority, is_aux)
def _index_changes(
self,
source: screen.MemoryMap,
target: screen.MemoryMap,
update_priority: np.array,
is_aux: True
) -> Iterator[Tuple[int, int, List[int]]]:
"""Transform encoded screen to sequence of change tuples."""
if is_aux:
target_pixelmap = screen.DHGRBitmap(
main_memory=self.memory_map,
aux_memory=target
)
else:
target_pixelmap = screen.DHGRBitmap(
main_memory=target,
aux_memory=self.aux_memory_map
)
diff_weights = self._diff_weights(
self.pixelmap, target_pixelmap, is_aux
)
# Clear any update priority entries that have resolved themselves
# with new frame
update_priority[diff_weights == 0] = 0
update_priority += diff_weights
priorities = self._heapify_priorities(update_priority)
content_deltas = {}
while priorities:
pri, _, page, offset = heapq.heappop(priorities)
# Check whether we've already cleared this diff while processing
# an earlier opcode
if update_priority[page, offset] == 0:
continue
offsets = [offset]
content = target.page_offset[page, offset]
assert content < 0x80 # DHGR palette bit not expected to be set
# Clear priority for the offset we're emitting
update_priority[page, offset] = 0
diff_weights[page, offset] = 0
# Update memory maps
source.page_offset[page, offset] = content
self.pixelmap.apply(page, offset, is_aux, content)
# Make sure we don't emit this offset as a side-effect of some
# other offset later.
for cd in content_deltas.values():
cd[page, offset] = 0
# TODO: what if we add another content_deltas entry later?
# We might clobber it again
# Need to find 3 more offsets to fill this opcode
for err, o in self._compute_error(
page,
content,
target_pixelmap,
diff_weights,
content_deltas,
is_aux
):
assert o != offset
if update_priority[page, o] == 0:
# print("Skipping page=%d, offset=%d" % (page, o))
continue
# Make sure we don't end up considering this (page, offset)
# again until the next image frame. Even if a better match
# comes along, it's probably better to fix up some other byte.
# TODO: or should we recompute it with new error?
for cd in content_deltas.values():
cd[page, o] = 0
byte_offset = target_pixelmap.interleaved_byte_offset(o, is_aux)
old_packed = target_pixelmap.packed[page, o // 2]
p = self._byte_pair_difference(
target_pixelmap, byte_offset, old_packed, content)
# Update priority for the offset we're emitting
update_priority[page, o] = p # 0
source.page_offset[page, o] = content
self.pixelmap.apply(page, o, is_aux, content)
if p:
# This content byte introduced an error, so put back on the
# heap in case we can get back to fixing it exactly
# during this frame. Otherwise we'll get to it later.
heapq.heappush(
priorities, (-p, random.getrandbits(16), page, o))
offsets.append(o)
if len(offsets) == 3:
break
# Pad to 4 if we didn't find enough
for _ in range(len(offsets), 4):
offsets.append(offsets[0])
yield (page + 32, content, offsets)
# TODO: there is still a bug causing residual diffs when we have
# apparently run out of work to do
if not np.array_equal(source.page_offset, target.page_offset):
diffs = np.nonzero(source.page_offset != target.page_offset)
for i in range(len(diffs[0])):
diff_p = diffs[0][i]
diff_o = diffs[1][i]
print("Diff at (%d, %d): %d != %d" % (
diff_p, diff_o, source.page_offset[diff_p, diff_o],
target.page_offset[diff_p, diff_o]
))
# assert False
# If we run out of things to do, pad forever
content = target.page_offset[0, 0]
while True:
yield (32, content, [0, 0, 0, 0])
@staticmethod
def _heapify_priorities(update_priority: np.array) -> List:
pages, offsets = update_priority.nonzero()
priorities = [tuple(data) for data in np.stack((
-update_priority[pages, offsets],
# Don't use deterministic order for page, offset
np.random.randint(0, 2**8, size=pages.shape[0]),
pages,
offsets)
).T.tolist()]
heapq.heapify(priorities)
return priorities
def _diff_weights(
self,
source: screen.DHGRBitmap,
target: screen.DHGRBitmap,
is_aux: bool
):
diff = np.ndarray((32, 256), dtype=np.int)
if is_aux:
# Pixels influenced by byte offset 0
source_pixels0 = source.mask_and_shift_data(source.packed, 0)
target_pixels0 = target.mask_and_shift_data(target.packed, 0)
# Concatenate 8-bit source and target into 16-bit values
pair0 = (source_pixels0 << 8) + target_pixels0
dist0 = source.edit_distances(self.palette)[0][pair0].reshape(
pair0.shape)
# Pixels influenced by byte offset 2
source_pixels2 = source.mask_and_shift_data(source.packed, 2)
target_pixels2 = target.mask_and_shift_data(target.packed, 2)
# Concatenate 12-bit source and target into 24-bit values
pair2 = (source_pixels2 << 12) + target_pixels2
dist2 = source.edit_distances(self.palette)[2][pair2].reshape(
pair2.shape)
diff[:, 0::2] = dist0
diff[:, 1::2] = dist2
else:
# Pixels influenced by byte offset 1
source_pixels1 = source.mask_and_shift_data(source.packed, 1)
target_pixels1 = target.mask_and_shift_data(target.packed, 1)
pair1 = (source_pixels1 << 12) + target_pixels1
dist1 = source.edit_distances(self.palette)[1][pair1].reshape(
pair1.shape)
# Pixels influenced by byte offset 3
source_pixels3 = source.mask_and_shift_data(source.packed, 3)
target_pixels3 = target.mask_and_shift_data(target.packed, 3)
pair3 = (source_pixels3 << 8) + target_pixels3
dist3 = source.edit_distances(self.palette)[3][pair3].reshape(
pair3.shape)
diff[:, 0::2] = dist1
diff[:, 1::2] = dist3
return diff
@functools.lru_cache(None)
def _byte_pair_difference(
self,
target_pixelmap,
byte_offset,
old_packed,
content
):
old_pixels = target_pixelmap.mask_and_shift_data(
old_packed, byte_offset)
new_pixels = target_pixelmap.mask_and_shift_data(
target_pixelmap.masked_update(
byte_offset, old_packed, content), byte_offset)
if byte_offset == 0 or byte_offset == 3:
pair = (old_pixels << 8) + new_pixels
else:
pair = (old_pixels << 12) + new_pixels
p = target_pixelmap.edit_distances(self.palette)[byte_offset][pair]
return p
def _compute_delta(
self,
content: int,
target: screen.DHGRBitmap,
old,
is_aux: bool
):
diff = np.ndarray((32, 256), dtype=np.int)
# TODO: use error edit distance
if is_aux:
# Pixels influenced by byte offset 0
source_pixels0 = target.mask_and_shift_data(
target.masked_update(0, target.packed, content), 0)
target_pixels0 = target.mask_and_shift_data(target.packed, 0)
# Concatenate 8-bit source and target into 16-bit values
pair0 = (source_pixels0 << 8) + target_pixels0
dist0 = target.edit_distances(self.palette)[0][pair0].reshape(
pair0.shape)
# Pixels influenced by byte offset 2
source_pixels2 = target.mask_and_shift_data(
target.masked_update(2, target.packed, content), 2)
target_pixels2 = target.mask_and_shift_data(target.packed, 2)
# Concatenate 12-bit source and target into 24-bit values
pair2 = (source_pixels2 << 12) + target_pixels2
dist2 = target.edit_distances(self.palette)[2][pair2].reshape(
pair2.shape)
diff[:, 0::2] = dist0
diff[:, 1::2] = dist2
else:
# Pixels influenced by byte offset 1
source_pixels1 = target.mask_and_shift_data(
target.masked_update(1, target.packed, content), 1)
target_pixels1 = target.mask_and_shift_data(target.packed, 1)
pair1 = (source_pixels1 << 12) + target_pixels1
dist1 = target.edit_distances(self.palette)[1][pair1].reshape(
pair1.shape)
# Pixels influenced by byte offset 3
source_pixels3 = target.mask_and_shift_data(
target.masked_update(3, target.packed, content), 3)
target_pixels3 = target.mask_and_shift_data(target.packed, 3)
pair3 = (source_pixels3 << 8) + target_pixels3
dist3 = target.edit_distances(self.palette)[3][pair3].reshape(
pair3.shape)
diff[:, 0::2] = dist1
diff[:, 1::2] = dist3
# TODO: try different weightings
return (diff * 5) - old
_OFFSETS = np.arange(256)
def _compute_error(self, page, content, target_pixelmap, old_error,
content_deltas, is_aux):
# TODO: move this up into parent
delta_screen = content_deltas.get(content)
if delta_screen is None:
delta_screen = self._compute_delta(
content, target_pixelmap, old_error, is_aux)
content_deltas[content] = delta_screen
delta_page = delta_screen[page]
cond = delta_page < 0
candidate_offsets = self._OFFSETS[cond]
priorities = delta_page[cond]
deltas = [
(priorities[i], random.getrandbits(16), candidate_offsets[i])
for i in range(len(candidate_offsets))
]
heapq.heapify(deltas)
while deltas:
pri, _, o = heapq.heappop(deltas)
assert pri < 0
assert o < 255
yield -pri, o