Rename FrameSequencer to FrameGrabber and break out into separate file.

Add a test case that the bmp2dhr output of input filenames containing
'.'  are handled correctly.

Break out video.Mode into video_mode.VideoMode to resolve circular
dependency.
This commit is contained in:
kris
2019-06-14 21:59:39 +01:00
parent fac2089206
commit 33aa4d46c4
8 changed files with 200 additions and 160 deletions

View File

@@ -1,158 +1,16 @@
"""Encode a sequence of images as an optimized stream of screen changes."""
import enum
import functools
import heapq
import os
import queue
import random
import subprocess
import threading
from typing import List, Iterator, Tuple
import numpy as np
import skvideo.io
from PIL import Image
import opcodes
import screen
class Mode(enum.Enum):
HGR = 0
DHGR = 1
class FrameSequencer:
def __init__(self, mode: Mode):
self.video_mode = mode
self.input_frame_rate = 30
def frames(self) -> Iterator[screen.MemoryMap]:
raise NotImplementedError
class FileFrameSequencer(FrameSequencer):
def __init__(
self,
filename: str,
mode: Mode = Mode.HGR,
):
super(FileFrameSequencer, self).__init__(mode)
self.filename = filename # type: str
self.mode = mode # type: Mode
self._reader = skvideo.io.FFmpegReader(filename)
# Compute frame rate from input video
# TODO: possible to compute time offset for each frame instead?
data = skvideo.io.ffprobe(self.filename)['video']
rate_data = data['@r_frame_rate'].split("/") # e.g. 12000/1001
self.input_frame_rate = float(
rate_data[0]) / float(rate_data[1]) # type: float
def _frame_grabber(self) -> Iterator[Image.Image]:
for frame_array in self._reader.nextFrame():
yield Image.fromarray(frame_array)
def frames(self) -> Iterator[screen.MemoryMap]:
"""Encode frame to HGR using bmp2dhr.
We do the encoding in a background thread to parallelize.
"""
frame_dir = ".".join(self.filename.split(".")[:-1])
try:
os.mkdir(frame_dir)
except FileExistsError:
pass
q = queue.Queue(maxsize=10)
def _hgr_decode(_idx, _frame):
outfile = "%s/%08dC.BIN" % (frame_dir, _idx)
bmpfile = "%s/%08d.bmp" % (frame_dir, _idx)
try:
os.stat(outfile)
except FileNotFoundError:
_frame = _frame.resize((280, 192), resample=Image.LANCZOS)
_frame.save(bmpfile)
# TODO: parametrize palette
subprocess.call([
"/usr/local/bin/bmp2dhr", bmpfile, "hgr",
"P5",
# "P0", # Kegs32 RGB Color palette(for //gs playback)
"D9" # Buckels dither
])
os.remove(bmpfile)
_main = np.fromfile(outfile, dtype=np.uint8)
return _main, None
def _dhgr_decode(_idx, _frame):
mainfile = "%s/%08d.BIN" % (frame_dir, _idx)
auxfile = "%s/%08d.AUX" % (frame_dir, _idx)
bmpfile = "%s/%08d.bmp" % (frame_dir, _idx)
try:
os.stat(mainfile)
os.stat(auxfile)
except FileNotFoundError:
_frame = _frame.resize((280, 192), resample=Image.LANCZOS)
_frame.save(bmpfile)
# TODO: parametrize palette
subprocess.call([
"/usr/local/bin/bmp2dhr", bmpfile, "dhgr", # "v",
"P5", # "P0", # Kegs32 RGB Color palette (for //gs
# playback)
"A", # Output separate .BIN and .AUX files
"D9" # Buckels dither
])
os.remove(bmpfile)
_main = np.fromfile(mainfile, dtype=np.uint8)
_aux = np.fromfile(auxfile, dtype=np.uint8)
return _main, _aux
def worker():
"""Invoke bmp2dhr to encode input image frames and push to queue."""
for _idx, _frame in enumerate(self._frame_grabber()):
if self.video_mode == Mode.DHGR:
res = _dhgr_decode(_idx, _frame)
else:
res = _hgr_decode(_idx, _frame)
q.put(res)
q.put((None, None))
t = threading.Thread(target=worker, daemon=True)
t.start()
while True:
main, aux = q.get()
if main is None:
break
main_map = screen.FlatMemoryMap(
screen_page=1, data=main).to_memory_map()
if aux is None:
aux_map = None
else:
aux_map = screen.FlatMemoryMap(
screen_page=1, data=aux).to_memory_map()
yield (main_map, aux_map)
q.task_done()
t.join()
from frame_grabber import FrameGrabber
from video_mode import VideoMode
class Video:
@@ -162,11 +20,11 @@ class Video:
def __init__(
self,
frame_sequencer: FrameSequencer,
mode: Mode = Mode.HGR
frame_grabber: FrameGrabber,
mode: VideoMode = VideoMode.HGR
):
self.mode = mode # type: Mode
self.frame_sequencer = frame_sequencer # type: FrameSequencer
self.mode = mode # type: VideoMode
self.frame_grabber = frame_grabber # type: FrameGrabber
self.ticks_per_frame = (
self.ticks_per_second / self.input_frame_rate) # type: float
self.frame_number = 0 # type: int