Rename FrameSequencer to FrameGrabber and break out into separate file.

Add a test case that the bmp2dhr output of input filenames containing
'.'  are handled correctly.

Break out video.Mode into video_mode.VideoMode to resolve circular
dependency.
This commit is contained in:
kris 2019-06-14 21:59:39 +01:00
parent d5f2482a0a
commit 549752e112
8 changed files with 202 additions and 156 deletions

144
transcoder/frame_grabber.py Normal file
View File

@ -0,0 +1,144 @@
"""Extracts sequence of still images from input video stream."""
import os
import queue
import subprocess
import threading
from typing import Iterator
import numpy as np
import skvideo.io
from PIL import Image
import screen
from video_mode import VideoMode
class FrameGrabber:
def __init__(self, mode: VideoMode):
self.video_mode = mode
self.input_frame_rate = 30
def frames(self) -> Iterator[screen.MemoryMap]:
raise NotImplementedError
class FileFrameGrabber(FrameGrabber):
def __init__(self, filename, mode: VideoMode):
super(FileFrameGrabber, self).__init__(mode)
self.filename = filename # type: str
self._reader = skvideo.io.FFmpegReader(filename)
# Compute frame rate from input video
# TODO: possible to compute time offset for each frame instead?
data = skvideo.io.ffprobe(self.filename)['video']
rate_data = data['@r_frame_rate'].split("/") # e.g. 12000/1001
self.input_frame_rate = float(
rate_data[0]) / float(rate_data[1]) # type: float
def _frame_grabber(self) -> Iterator[Image.Image]:
for frame_array in self._reader.nextFrame():
yield Image.fromarray(frame_array)
@staticmethod
def _output_dir(filename) -> str:
return ".".join(filename.split(".")[:-1])
def frames(self) -> Iterator[screen.MemoryMap]:
"""Encode frame to HGR using bmp2dhr.
We do the encoding in a background thread to parallelize.
"""
frame_dir = self._output_dir(self.filename)
try:
os.mkdir(frame_dir)
except FileExistsError:
pass
q = queue.Queue(maxsize=10)
def _hgr_decode(_idx, _frame):
outfile = "%s/%08dC.BIN" % (frame_dir, _idx)
bmpfile = "%s/%08d.bmp" % (frame_dir, _idx)
try:
os.stat(outfile)
except FileNotFoundError:
_frame = _frame.resize((280, 192), resample=Image.LANCZOS)
_frame.save(bmpfile)
# TODO: parametrize palette
subprocess.call([
"/usr/local/bin/bmp2dhr", bmpfile, "hgr",
"P5",
# "P0", # Kegs32 RGB Color palette(for //gs playback)
"D9" # Buckels dither
])
os.remove(bmpfile)
_main = np.fromfile(outfile, dtype=np.uint8)
return _main, None
def _dhgr_decode(_idx, _frame):
mainfile = "%s/%08d.BIN" % (frame_dir, _idx)
auxfile = "%s/%08d.AUX" % (frame_dir, _idx)
bmpfile = "%s/%08d.bmp" % (frame_dir, _idx)
try:
os.stat(mainfile)
os.stat(auxfile)
except FileNotFoundError:
_frame = _frame.resize((280, 192), resample=Image.LANCZOS)
_frame.save(bmpfile)
# TODO: parametrize palette
subprocess.call([
"/usr/local/bin/bmp2dhr", bmpfile, "dhgr", # "v",
"P5", # "P0", # Kegs32 RGB Color palette (for //gs
# playback)
"A", # Output separate .BIN and .AUX files
"D9" # Buckels dither
])
os.remove(bmpfile)
_main = np.fromfile(mainfile, dtype=np.uint8)
_aux = np.fromfile(auxfile, dtype=np.uint8)
return _main, _aux
def worker():
"""Invoke bmp2dhr to encode input image frames and push to queue."""
for _idx, _frame in enumerate(self._frame_grabber()):
if self.video_mode == VideoMode.DHGR:
res = _dhgr_decode(_idx, _frame)
else:
res = _hgr_decode(_idx, _frame)
q.put(res)
q.put((None, None))
t = threading.Thread(target=worker, daemon=True)
t.start()
while True:
main, aux = q.get()
if main is None:
break
main_map = screen.FlatMemoryMap(
screen_page=1, data=main).to_memory_map()
if aux is None:
aux_map = None
else:
aux_map = screen.FlatMemoryMap(
screen_page=1, data=aux).to_memory_map()
yield (main_map, aux_map)
q.task_done()
t.join()

View File

@ -0,0 +1,25 @@
import unittest
import frame_grabber
class TestFileFrameGrabber(unittest.TestCase):
def test_output_dir(self):
self.assertEqual(
"/foo/bar",
frame_grabber.FileFrameGrabber._output_dir("/foo/bar.mp4")
)
self.assertEqual(
"/foo/bar.blee",
frame_grabber.FileFrameGrabber._output_dir("/foo/bar.blee.mp4")
)
self.assertEqual(
"/foo/bar blee",
frame_grabber.FileFrameGrabber._output_dir("/foo/bar blee.mp4")
)
if __name__ == '__main__':
unittest.main()

View File

@ -3,7 +3,7 @@
import argparse
import movie
import video
import video_mode
parser = argparse.ArgumentParser(
description='Transcode videos to ][Vision format.')
@ -25,7 +25,7 @@ parser.add_argument(
'frame rate, which may give better quality for some videos.'
)
parser.add_argument(
'--video_mode', type=str, choices=video.Mode.__members__.keys(),
'--video_mode', type=str, choices=video_mode.VideoMode.__members__.keys(),
help='Video display mode to encode for (HGR/DHGR)'
)
@ -37,10 +37,10 @@ def main(args):
every_n_video_frames=args.every_n_video_frames,
audio_normalization=args.audio_normalization,
max_bytes_out=1024. * 1024 * args.max_output_mb,
video_mode=video.Mode[args.video_mode]
video_mode=video_mode.VideoMode[args.video_mode]
)
print("Input frame rate = %f" % m.frame_sequencer.input_frame_rate)
print("Input frame rate = %f" % m.frame_grabber.input_frame_rate)
if args.output:
out_filename = args.output

View File

@ -3,9 +3,11 @@
from typing import Iterable, Iterator
import audio
import frame_grabber
import machine
import opcodes
import video
from video_mode import VideoMode
class Movie:
@ -14,20 +16,20 @@ class Movie:
every_n_video_frames: int = 1,
audio_normalization: float = None,
max_bytes_out: int = None,
video_mode: video.Mode = video.Mode.HGR,
video_mode: VideoMode = VideoMode.HGR,
):
self.filename = filename # type: str
self.every_n_video_frames = every_n_video_frames # type: int
self.max_bytes_out = max_bytes_out # type: int
self.video_mode = video_mode # type: video.Mode
self.video_mode = video_mode # type: VideoMode
self.audio = audio.Audio(
filename, normalization=audio_normalization) # type: audio.Audio
self.frame_sequencer = video.FileFrameSequencer(
self.frame_grabber = frame_grabber.FileFrameGrabber(
filename, mode=video_mode)
self.video = video.Video(
self.frame_sequencer, mode=video_mode) # type: video.Video
self.frame_grabber, mode=video_mode) # type: video.Video
self.stream_pos = 0 # type: int
@ -49,7 +51,7 @@ class Movie:
:return:
"""
video_frames = self.frame_sequencer.frames()
video_frames = self.frame_grabber.frames()
main_seq = None
aux_seq = None
@ -104,7 +106,7 @@ class Movie:
if socket_pos >= 2044:
# 2 op_ack address bytes + 2 payload bytes from ACK must
# terminate 2K stream frame
if self.video_mode == video.Mode.DHGR:
if self.video_mode == VideoMode.DHGR:
# Flip-flop between MAIN and AUX banks
self.aux_memory_bank = not self.aux_memory_bank

View File

@ -4,6 +4,7 @@ import enum
from typing import Iterator, Tuple
import symbol_table
import video_mode
from machine import Machine
@ -64,7 +65,7 @@ class Header(Opcode):
"""Video header opcode."""
COMMAND = OpcodeCommand.HEADER
def __init__(self, mode: "video.Mode"):
def __init__(self, mode: video_mode.VideoMode):
self.video_mode = mode
def __data_eq__(self, other):

View File

@ -1,152 +1,16 @@
"""Encode a sequence of images as an optimized stream of screen changes."""
import enum
import functools
import heapq
import os
import queue
import random
import subprocess
import threading
from typing import List, Iterator, Tuple
import numpy as np
import skvideo.io
from PIL import Image
import opcodes
import screen
class Mode(enum.Enum):
HGR = 0
DHGR = 1
class FrameSequencer:
def __init__(self, mode: Mode):
self.video_mode = mode
self.input_frame_rate = 30
def frames(self) -> Iterator[screen.MemoryMap]:
raise NotImplementedError
class FileFrameSequencer(FrameSequencer):
def __init__(self, filename, mode: Mode):
super(FileFrameSequencer, self).__init__(mode)
self.filename = filename # type: str
self._reader = skvideo.io.FFmpegReader(filename)
# Compute frame rate from input video
# TODO: possible to compute time offset for each frame instead?
data = skvideo.io.ffprobe(self.filename)['video']
rate_data = data['@r_frame_rate'].split("/") # e.g. 12000/1001
self.input_frame_rate = float(
rate_data[0]) / float(rate_data[1]) # type: float
def _frame_grabber(self) -> Iterator[Image.Image]:
for frame_array in self._reader.nextFrame():
yield Image.fromarray(frame_array)
def frames(self) -> Iterator[screen.MemoryMap]:
"""Encode frame to HGR using bmp2dhr.
We do the encoding in a background thread to parallelize.
"""
frame_dir = ".".join(self.filename.split(".")[:-1])
try:
os.mkdir(frame_dir)
except FileExistsError:
pass
q = queue.Queue(maxsize=10)
def _hgr_decode(_idx, _frame):
outfile = "%s/%08dC.BIN" % (frame_dir, _idx)
bmpfile = "%s/%08d.bmp" % (frame_dir, _idx)
try:
os.stat(outfile)
except FileNotFoundError:
_frame = _frame.resize((280, 192), resample=Image.LANCZOS)
_frame.save(bmpfile)
# TODO: parametrize palette
subprocess.call([
"/usr/local/bin/bmp2dhr", bmpfile, "hgr",
"P5",
# "P0", # Kegs32 RGB Color palette(for //gs playback)
"D9" # Buckels dither
])
os.remove(bmpfile)
_main = np.fromfile(outfile, dtype=np.uint8)
return _main, None
def _dhgr_decode(_idx, _frame):
mainfile = "%s/%08d.BIN" % (frame_dir, _idx)
auxfile = "%s/%08d.AUX" % (frame_dir, _idx)
bmpfile = "%s/%08d.bmp" % (frame_dir, _idx)
try:
os.stat(mainfile)
os.stat(auxfile)
except FileNotFoundError:
_frame = _frame.resize((280, 192), resample=Image.LANCZOS)
_frame.save(bmpfile)
# TODO: parametrize palette
subprocess.call([
"/usr/local/bin/bmp2dhr", bmpfile, "dhgr", # "v",
"P5", # "P0", # Kegs32 RGB Color palette (for //gs
# playback)
"A", # Output separate .BIN and .AUX files
"D9" # Buckels dither
])
os.remove(bmpfile)
_main = np.fromfile(mainfile, dtype=np.uint8)
_aux = np.fromfile(auxfile, dtype=np.uint8)
return _main, _aux
def worker():
"""Invoke bmp2dhr to encode input image frames and push to queue."""
for _idx, _frame in enumerate(self._frame_grabber()):
if self.video_mode == Mode.DHGR:
res = _dhgr_decode(_idx, _frame)
else:
res = _hgr_decode(_idx, _frame)
q.put(res)
q.put((None, None))
t = threading.Thread(target=worker, daemon=True)
t.start()
while True:
main, aux = q.get()
if main is None:
break
main_map = screen.FlatMemoryMap(
screen_page=1, data=main).to_memory_map()
if aux is None:
aux_map = None
else:
aux_map = screen.FlatMemoryMap(
screen_page=1, data=aux).to_memory_map()
yield (main_map, aux_map)
q.task_done()
t.join()
from frame_grabber import FrameGrabber
from video_mode import VideoMode
class Video:
@ -156,13 +20,13 @@ class Video:
def __init__(
self,
frame_sequencer: FrameSequencer,
mode: Mode = Mode.HGR
frame_grabber: FrameGrabber,
mode: VideoMode = VideoMode.HGR
):
self.mode = mode # type: Mode
self.frame_sequencer = frame_sequencer # type: FrameSequencer
self.mode = mode # type: VideoMode
self.frame_grabber = frame_grabber # type: FrameGrabber
self.cycles_per_frame = (
self.CLOCK_SPEED / frame_sequencer.input_frame_rate
self.CLOCK_SPEED / frame_grabber.input_frame_rate
) # type: float
self.frame_number = 0 # type: int

8
transcoder/video_mode.py Normal file
View File

@ -0,0 +1,8 @@
"""Enum representing video encoding mode."""
import enum
class VideoMode(enum.Enum):
HGR = 0
DHGR = 1

View File

@ -2,14 +2,16 @@
import unittest
import frame_grabber
import screen
import video
import video_mode
class TestVideo(unittest.TestCase):
def test_diff_weights(self):
fs = video.FrameSequencer(mode=video.Mode.DHGR)
v = video.Video(fs, mode=video.Mode.DHGR)
fs = frame_grabber.FrameGrabber(mode=video_mode.VideoMode.DHGR)
v = video.Video(fs, mode=video_mode.VideoMode.DHGR)
frame = screen.MemoryMap(screen_page=1)
frame.page_offset[0, 0] = 0b1111111