Add some docstrings

Clean up naming in edit_distance

In video encoder, when we emit additional offsets as part of an opcode,
reinsert back into the priority heapq if the new edit distance is
nonzero, in case we get the chance to fix it up later in the frame.

Also make sure to zero out the diff_weights and content_deltas
so we don't consider the offset again as a side-effect of some other
opcode.

Instead of prioritizing side-effect offsets by their previous update
priority, prioritize by those with the lowest (error - edit) delta i.e.
not introducing too much error relative to their edit distance.
This commit is contained in:
kris 2019-03-21 22:56:45 +00:00
parent fe10f98128
commit eebbccf711
2 changed files with 114 additions and 24 deletions

View File

@ -1,4 +1,18 @@
"""Computes visual differences between screen image data."""
"""Computes visual differences between screen image data.
This is the core of the video encoding, for three reasons:
- The edit distance between old and new frames is used to prioritize which
screen bytes to send
- When deciding which other offset bytes to send along with a chosen screen
byte, we minimize the error introduced by sending this (probably non-optimal)
byte instead of the actual target screen byte. This needs to account for the
colour artifacts introduced by this byte as well as weighting perceived
errors introduced (e.g. long runs of colour)
- The byte_screen_error_distance function is on the critical path of the encoding.
"""
import functools
@ -21,6 +35,11 @@ def byte_to_nominal_colour_string(b: int, is_odd_offset: bool) -> str:
There are also even weirder colour artifacts that happen when
neighbouring bytes have mismatched colour palettes, which also cross the
odd/even boundary. But these may not be worth worrying about.
:param b: byte to encode
:param is_odd_offset: whether byte is at an odd screen column
:return: string encoding nominal colour of pixels in the byte, with "0"
or "1" for the "hanging" bit that spans the neighbouring byte.
"""
pixels = []
@ -80,6 +99,10 @@ def byte_to_colour_string_with_white_coalescing(
It also ignores other colour fringing e.g. from NTSC artifacts.
TODO: this needs more work.
:param b:
:param is_odd_offset:
:return:
"""
pixels = []
@ -156,6 +179,14 @@ delete_costs = np.ones(128, dtype=np.float64) * 1000
def _edit_weight(a: int, b: int, is_odd_offset: bool, error: bool):
"""
:param a:
:param b:
:param is_odd_offset:
:param error:
:return:
"""
a_pixels = byte_to_colour_string_with_white_coalescing(a, is_odd_offset)
b_pixels = byte_to_colour_string_with_white_coalescing(b, is_odd_offset)
@ -168,7 +199,13 @@ def _edit_weight(a: int, b: int, is_odd_offset: bool, error: bool):
return np.int64(dist)
def edit_weight_matrixes(error: bool) -> np.array:
@functools.lru_cache(None)
def _edit_weight_matrices(error: bool) -> np.array:
"""
:param error:
:return:
"""
ewm = np.zeros(shape=(256, 256, 2), dtype=np.int64)
for a in range(256):
for b in range(256):
@ -179,14 +216,18 @@ def edit_weight_matrixes(error: bool) -> np.array:
return ewm
_ewm = edit_weight_matrixes(False)
_error_ewm = edit_weight_matrixes(True)
@functools.lru_cache(None)
def edit_weight(a: int, b: int, is_odd_offset: bool, error: bool):
e = _error_ewm if error else _ewm
return e[a, b, int(is_odd_offset)]
"""
:param a: first content value
:param b: second content value
:param is_odd_offset: whether this content byte is at an odd screen
byte offset
:param error: whether to compute error distance or edit distance
:return: the corresponding distance value
"""
return _edit_weight_matrices(error)[a, b, int(is_odd_offset)]
_even_ewm = {}
@ -203,18 +244,31 @@ for a in range(256):
@functools.lru_cache(None)
def _content_a_array(content: int, shape) -> np.array:
return (np.ones(shape, dtype=np.uint16) * content) << 8
def _constant_array(content: int, shape) -> np.array:
"""
:param content:
:param shape:
:return:
"""
return np.ones(shape, dtype=np.uint16) * content
def content_edit_weight(content: int, b: np.array) -> np.array:
def byte_screen_error_distance(content: int, b: np.array) -> np.array:
"""
:param content: byte for which to compute error distance
:param b: np.array of size (32, 256) representing existing screen memory.
:return: np.array of size (32, 256) representing error distance from
content byte to each byte of b
"""
assert b.shape == (32, 256), b.shape
# Extract even and off column offsets (128,)
even_b = b[:, ::2]
odd_b = b[:, 1::2]
a = _content_a_array(content, even_b.shape)
a = _constant_array(content << 8, even_b.shape)
even = a + even_b
odd = a + odd_b
@ -229,7 +283,13 @@ def content_edit_weight(content: int, b: np.array) -> np.array:
return res
def array_edit_weight(a: np.array, b: np.array) -> np.array:
def screen_edit_distance(a: np.array, b: np.array) -> np.array:
"""
:param a:
:param b:
:return:
"""
# Extract even and off column offsets (32, 128)
even_a = a[:, ::2]
odd_a = a[:, 1::2]

View File

@ -23,7 +23,10 @@ class Video:
CLOCK_SPEED = 1024 * 1024 # type: int
def __init__(self, filename: str):
def __init__(
self,
filename: str,
):
self.filename = filename # type: str
self._reader = skvideo.io.FFmpegReader(filename)
@ -185,9 +188,15 @@ class Video:
# Clear priority for the offset we're emitting
self.update_priority[page, offset] = 0
self.memory_map.page_offset[page, offset] = content
diff_weights[page, offset] = 0
# Make sure we don't emit this offset as a side-effect of some
# other offset later.
for cd in content_deltas.values():
cd[page, offset] = 0
# Need to find 3 more offsets to fill this opcode
for o, p in self._compute_error(
for o in self._compute_error(
page,
content,
target,
@ -195,11 +204,24 @@ class Video:
content_deltas
):
offsets.append(o)
# Compute new edit distance between new content and target
# byte, so we can reinsert with this value
p = edit_distance.edit_weight(
content, target.page_offset[page, o], o % 2 == 1,
error=False)
# Update priority for the offset we're emitting
self.update_priority[page, o] = 0 # XXX p
self.update_priority[page, o] = p # 0
self.memory_map.page_offset[page, o] = content
# heapq.heappush(priorities, (-p, random.random(), page,
# offset))
if p:
# This content byte introduced an error, so put back on the
# heap in case we can get back to fixing it exactly
# during this frame. Otherwise we'll get to it later.
heapq.heappush(
priorities, (-p, random.random(), page, offset))
# Pad to 4 if we didn't find enough
for _ in range(len(offsets), 4):
@ -217,7 +239,7 @@ class Video:
source: screen.MemoryMap,
target: screen.MemoryMap
):
return edit_distance.array_edit_weight(
return edit_distance.screen_edit_distance(
source.page_offset, target.page_offset)
def _heapify_priorities(self) -> List:
@ -244,13 +266,14 @@ class Video:
"""
This function is the critical path for the video encoding.
"""
return edit_distance.content_edit_weight(content, target) - old
return edit_distance.byte_screen_error_distance(content, target) - old
_OFFSETS = np.arange(256)
def _compute_error(self, page, content, target, old_error, content_deltas):
offsets = []
# TODO: move this up into parent
delta_screen = content_deltas.get(content)
if delta_screen is None:
delta_screen = self._compute_delta(
@ -260,17 +283,24 @@ class Video:
delta_page = delta_screen[page]
cond = delta_page < 0
candidate_offsets = self._OFFSETS[cond]
priorities = self.update_priority[page][cond]
priorities = delta_page[cond]
l = [
(-priorities[i], random.random(), candidate_offsets[i])
(priorities[i], random.random(), candidate_offsets[i])
for i in range(len(candidate_offsets))
]
heapq.heapify(l)
while l:
p, _, o = heapq.heappop(l)
offsets.append((o, -p))
_, _, o = heapq.heappop(l)
offsets.append(o)
# Make sure we don't end up considering this (page, offset) again
# until the next image frame. Even if a better match comes along,
# it's probably better to fix up some other byte.
for cd in content_deltas.values():
cd[page, o] = 0
if len(offsets) == 3:
break