mirror of
https://github.com/KrisKennaway/ii-vision.git
synced 2024-12-22 12:29:31 +00:00
371 lines
12 KiB
Python
371 lines
12 KiB
Python
"""Encode a sequence of images as an optimized stream of screen changes."""
|
|
|
|
import enum
|
|
import heapq
|
|
import os
|
|
import queue
|
|
import random
|
|
import subprocess
|
|
import threading
|
|
from typing import List, Iterator, Tuple
|
|
|
|
# import hitherdither
|
|
import numpy as np
|
|
import skvideo.io
|
|
from PIL import Image
|
|
|
|
import edit_distance
|
|
import opcodes
|
|
import screen
|
|
|
|
|
|
class Mode(enum.Enum):
|
|
HGR = 0
|
|
DHGR = 1
|
|
|
|
|
|
class Video:
|
|
"""Apple II screen memory map encoding a bitmapped frame."""
|
|
|
|
CLOCK_SPEED = 1024 * 1024 # type: int
|
|
|
|
def __init__(
|
|
self,
|
|
filename: str,
|
|
mode: Mode = Mode.HGR
|
|
):
|
|
self.filename = filename # type: str
|
|
self.mode = mode # type: Mode
|
|
|
|
self._reader = skvideo.io.FFmpegReader(filename)
|
|
|
|
# Compute frame rate from input video
|
|
# TODO: possible to compute time offset for each frame instead?
|
|
data = skvideo.io.ffprobe(self.filename)['video']
|
|
rate_data = data['@r_frame_rate'].split("/") # e.g. 12000/1001
|
|
self.input_frame_rate = float(
|
|
rate_data[0]) / float(rate_data[1]) # type: float
|
|
|
|
self.cycles_per_frame = (
|
|
self.CLOCK_SPEED / self.input_frame_rate) # type: float
|
|
self.frame_number = 0 # type: int
|
|
|
|
# Initialize empty screen
|
|
self.memory_map = screen.MemoryMap(
|
|
screen_page=1) # type: screen.MemoryMap
|
|
if self.mode == mode.DHGR:
|
|
self.aux_memory_map = screen.MemoryMap(
|
|
screen_page=1) # type: screen.MemoryMap
|
|
|
|
# Accumulates pending edit weights across frames
|
|
self.update_priority = np.zeros((32, 256), dtype=np.int64)
|
|
if self.mode == mode.DHGR:
|
|
self.aux_update_priority = np.zeros((32, 256), dtype=np.int64)
|
|
|
|
def tick(self, cycles: int) -> bool:
|
|
if cycles > (self.cycles_per_frame * self.frame_number):
|
|
self.frame_number += 1
|
|
return True
|
|
return False
|
|
|
|
def _frame_grabber(self) -> Iterator[Image.Image]:
|
|
for frame_array in self._reader.nextFrame():
|
|
yield Image.fromarray(frame_array)
|
|
|
|
@staticmethod
|
|
def _rgb(r, g, b):
|
|
return (r << 16) + (g << 8) + b
|
|
|
|
# def dither_framesframes(self) -> Iterator[screen.MemoryMap]:
|
|
# palette = hitherdither.palette.Palette(
|
|
# [
|
|
# self._rgb(0,0,0), # black */
|
|
# self._rgb(148,12,125), # red - hgr 0*/
|
|
# self._rgb(32,54,212), # dk blue - hgr 0 */
|
|
# self._rgb(188,55,255), # purple - default HGR overlay color */
|
|
# self._rgb(51,111,0), # dk green - hgr 0 */
|
|
# self._rgb(126,126,126), # gray - hgr 0 */
|
|
# self._rgb(7,168,225), # med blue - alternate HGR overlay
|
|
# # color */
|
|
# self._rgb(158,172,255), # lt blue - hgr 0 */
|
|
# self._rgb(99,77,0), # brown - hgr 0 */
|
|
# self._rgb(249,86,29), # orange */
|
|
# self._rgb(126,126,126), # grey - hgr 0 */
|
|
# self._rgb(255,129,236), # pink - hgr 0 */
|
|
# self._rgb(67,200,0), # lt green */
|
|
# self._rgb(221,206,23), # yellow - hgr 0 */
|
|
# self._rgb(93,248,133), # aqua - hgr 0 */
|
|
# self._rgb(255,255,255) # white
|
|
# ]
|
|
# )
|
|
# for _idx, _frame in enumerate(self._frame_grabber()):
|
|
# if _idx % 60 == 0:
|
|
# img_dithered = hitherdither.ordered.yliluoma.yliluomas_1_ordered_dithering(
|
|
# _frame.resize((280,192), resample=Image.NEAREST),
|
|
# palette, order=8)
|
|
#
|
|
# yield img_dithered
|
|
|
|
def frames(self) -> Iterator[screen.MemoryMap]:
|
|
"""Encode frame to HGR using bmp2dhr.
|
|
|
|
We do the encoding in a background thread to parallelize.
|
|
"""
|
|
|
|
frame_dir = self.filename.split(".")[0]
|
|
try:
|
|
os.mkdir(frame_dir)
|
|
except FileExistsError:
|
|
pass
|
|
|
|
q = queue.Queue(maxsize=10)
|
|
|
|
def _hgr_decode(_idx, _frame):
|
|
outfile = "%s/%08dC.BIN" % (frame_dir, _idx)
|
|
bmpfile = "%s/%08d.bmp" % (frame_dir, _idx)
|
|
|
|
try:
|
|
os.stat(outfile)
|
|
except FileNotFoundError:
|
|
_frame = _frame.resize((280, 192), resample=Image.LANCZOS)
|
|
_frame.save(bmpfile)
|
|
|
|
# TODO: parametrize palette
|
|
subprocess.call([
|
|
"/usr/local/bin/bmp2dhr", bmpfile, "hgr",
|
|
"P0", # Kegs32 RGB Color palette(for //gs playback)
|
|
"D9" # Buckels dither
|
|
])
|
|
|
|
os.remove(bmpfile)
|
|
|
|
_main = np.fromfile(outfile, dtype=np.uint8)
|
|
|
|
return _main, None
|
|
|
|
def _dhgr_decode(_idx, _frame):
|
|
mainfile = "%s/%08d.BIN" % (frame_dir, _idx)
|
|
auxfile = "%s/%08d.AUX" % (frame_dir, _idx)
|
|
|
|
bmpfile = "%s/%08d.bmp" % (frame_dir, _idx)
|
|
|
|
try:
|
|
os.stat(mainfile)
|
|
os.stat(auxfile)
|
|
except FileNotFoundError:
|
|
_frame = _frame.resize((280, 192), resample=Image.LANCZOS)
|
|
_frame.save(bmpfile)
|
|
|
|
# TODO: parametrize palette
|
|
subprocess.call([
|
|
"/usr/local/bin/bmp2dhr", bmpfile, "dhgr",
|
|
"P0", # Kegs32 RGB Color palette (for //gs playback)
|
|
"A", # Output separate .BIN and .AUX files
|
|
"D9" # Buckels dither
|
|
])
|
|
|
|
os.remove(bmpfile)
|
|
|
|
_main = np.fromfile(mainfile, dtype=np.uint8)
|
|
_aux = np.fromfile(auxfile, dtype=np.uint8)
|
|
|
|
return _main, _aux
|
|
|
|
def worker():
|
|
"""Invoke bmp2dhr to encode input image frames and push to queue."""
|
|
for _idx, _frame in enumerate(self._frame_grabber()):
|
|
if self.mode == Mode.DHGR:
|
|
res = _dhgr_decode(_idx, _frame)
|
|
else:
|
|
res = _hgr_decode(_idx, _frame)
|
|
q.put(res)
|
|
|
|
q.put((None, None))
|
|
|
|
t = threading.Thread(target=worker, daemon=True)
|
|
t.start()
|
|
|
|
while True:
|
|
|
|
main, aux = q.get()
|
|
if main is None:
|
|
break
|
|
|
|
main_map = screen.FlatMemoryMap(
|
|
screen_page=1, data=main).to_memory_map()
|
|
if aux is None:
|
|
aux_map = None
|
|
else:
|
|
aux_map = screen.FlatMemoryMap(
|
|
screen_page=1, data=aux).to_memory_map()
|
|
yield (main_map, aux_map)
|
|
q.task_done()
|
|
|
|
t.join()
|
|
|
|
def encode_frame(
|
|
self, target: screen.MemoryMap,
|
|
memory_map: screen.MemoryMap,
|
|
update_priority: np.array,
|
|
) -> Iterator[opcodes.Opcode]:
|
|
"""Update to match content of frame within provided budget."""
|
|
|
|
print("Similarity %f" % (update_priority.mean()))
|
|
yield from self._index_changes(memory_map, target, update_priority)
|
|
|
|
def _index_changes(
|
|
self,
|
|
source: screen.MemoryMap,
|
|
target: screen.MemoryMap,
|
|
update_priority: np.array
|
|
) -> Iterator[Tuple[int, int, List[int]]]:
|
|
"""Transform encoded screen to sequence of change tuples."""
|
|
|
|
diff_weights = self._diff_weights(source, target)
|
|
|
|
# Clear any update priority entries that have resolved themselves
|
|
# with new frame
|
|
update_priority[diff_weights == 0] = 0
|
|
|
|
# Halve existing weights to increase bias to new diffs.
|
|
# In particular this means that existing updates with diff 1 will
|
|
# become diff 0, i.e. will only be prioritized if they are still
|
|
# diffs in the new frame.
|
|
# self.update_priority >>= 1
|
|
update_priority += diff_weights
|
|
|
|
priorities = self._heapify_priorities(update_priority)
|
|
|
|
content_deltas = {}
|
|
|
|
while priorities:
|
|
_, _, page, offset = heapq.heappop(priorities)
|
|
# Check whether we've already cleared this diff while processing
|
|
# an earlier opcode
|
|
if update_priority[page, offset] == 0:
|
|
continue
|
|
|
|
offsets = [offset]
|
|
content = target.page_offset[page, offset]
|
|
|
|
# Clear priority for the offset we're emitting
|
|
update_priority[page, offset] = 0
|
|
source.page_offset[page, offset] = content
|
|
diff_weights[page, offset] = 0
|
|
|
|
# Make sure we don't emit this offset as a side-effect of some
|
|
# other offset later.
|
|
for cd in content_deltas.values():
|
|
cd[page, offset] = 0
|
|
|
|
# Need to find 3 more offsets to fill this opcode
|
|
for o in self._compute_error(
|
|
page,
|
|
content,
|
|
target,
|
|
diff_weights,
|
|
content_deltas
|
|
):
|
|
offsets.append(o)
|
|
|
|
# Compute new edit distance between new content and target
|
|
# byte, so we can reinsert with this value
|
|
p = edit_distance.edit_weight(
|
|
content, target.page_offset[page, o], o % 2 == 1,
|
|
error=False)
|
|
|
|
# Update priority for the offset we're emitting
|
|
update_priority[page, o] = p # 0
|
|
|
|
source.page_offset[page, o] = content
|
|
|
|
if p:
|
|
# This content byte introduced an error, so put back on the
|
|
# heap in case we can get back to fixing it exactly
|
|
# during this frame. Otherwise we'll get to it later.
|
|
heapq.heappush(
|
|
priorities, (-p, random.random(), page, offset))
|
|
|
|
# Pad to 4 if we didn't find enough
|
|
for _ in range(len(offsets), 4):
|
|
offsets.append(offsets[0])
|
|
|
|
yield (page + 32, content, offsets)
|
|
|
|
# If we run out of things to do, pad forever
|
|
content = target.page_offset[(0, 0)]
|
|
while True:
|
|
yield (32, content, [0, 0, 0, 0])
|
|
|
|
@staticmethod
|
|
def _diff_weights(
|
|
source: screen.MemoryMap,
|
|
target: screen.MemoryMap
|
|
):
|
|
return edit_distance.screen_edit_distance(
|
|
source.page_offset, target.page_offset)
|
|
|
|
def _heapify_priorities(self, update_priority: np.array) -> List:
|
|
priorities = []
|
|
it = np.nditer(update_priority, flags=['multi_index'])
|
|
while not it.finished:
|
|
priority = it[0]
|
|
if not priority:
|
|
it.iternext()
|
|
continue
|
|
|
|
page, offset = it.multi_index
|
|
|
|
# Don't use deterministic order for page, offset
|
|
nonce = random.random()
|
|
priorities.append((-priority, nonce, page, offset))
|
|
it.iternext()
|
|
|
|
heapq.heapify(priorities)
|
|
return priorities
|
|
|
|
@staticmethod
|
|
def _compute_delta(content, target, old):
|
|
"""
|
|
This function is the critical path for the video encoding.
|
|
"""
|
|
return edit_distance.byte_screen_error_distance(content, target) - old
|
|
|
|
_OFFSETS = np.arange(256)
|
|
|
|
def _compute_error(self, page, content, target, old_error, content_deltas):
|
|
offsets = []
|
|
|
|
# TODO: move this up into parent
|
|
delta_screen = content_deltas.get(content)
|
|
if delta_screen is None:
|
|
delta_screen = self._compute_delta(
|
|
content, target.page_offset, old_error)
|
|
content_deltas[content] = delta_screen
|
|
|
|
delta_page = delta_screen[page]
|
|
cond = delta_page < 0
|
|
candidate_offsets = self._OFFSETS[cond]
|
|
priorities = delta_page[cond]
|
|
|
|
l = [
|
|
(priorities[i], random.random(), candidate_offsets[i])
|
|
for i in range(len(candidate_offsets))
|
|
]
|
|
heapq.heapify(l)
|
|
|
|
while l:
|
|
_, _, o = heapq.heappop(l)
|
|
offsets.append(o)
|
|
|
|
# Make sure we don't end up considering this (page, offset) again
|
|
# until the next image frame. Even if a better match comes along,
|
|
# it's probably better to fix up some other byte.
|
|
for cd in content_deltas.values():
|
|
cd[page, o] = 0
|
|
|
|
if len(offsets) == 3:
|
|
break
|
|
|
|
return offsets
|