mirror of
https://github.com/KrisKennaway/ii-vision.git
synced 2024-12-21 05:30:20 +00:00
Style cleanups
This commit is contained in:
parent
01ffd034eb
commit
d90b865b16
12
audio.py
12
audio.py
@ -1,3 +1,5 @@
|
|||||||
|
from typing import Iterator
|
||||||
|
|
||||||
import audioread
|
import audioread
|
||||||
import librosa
|
import librosa
|
||||||
import numpy as np
|
import numpy as np
|
||||||
@ -6,18 +8,18 @@ import numpy as np
|
|||||||
class Audio:
|
class Audio:
|
||||||
def __init__(
|
def __init__(
|
||||||
self, filename: str, normalization: float = None):
|
self, filename: str, normalization: float = None):
|
||||||
self.filename = filename
|
self.filename = filename # type: str
|
||||||
|
|
||||||
# TODO: take into account that the available range is slightly offset
|
# TODO: take into account that the available range is slightly offset
|
||||||
# as fraction of total cycle count?
|
# as fraction of total cycle count?
|
||||||
self._tick_range = [4, 66]
|
self._tick_range = [4, 66]
|
||||||
self.cycles_per_tick = 73
|
self.cycles_per_tick = 73 # type: int
|
||||||
|
|
||||||
# TODO: round to divisor of video frame rate
|
# TODO: round to divisor of video frame rate
|
||||||
self.sample_rate = 14340 # int(1024. * 1024 / self.cycles_per_tick)
|
self.sample_rate = 14340 # int(1024. * 1024 / self.cycles_per_tick)
|
||||||
|
|
||||||
self.normalization = normalization or self._normalization()
|
self.normalization = (
|
||||||
print(self.normalization)
|
normalization or self._normalization()) # type: float
|
||||||
|
|
||||||
def _decode(self, f, buf) -> np.array:
|
def _decode(self, f, buf) -> np.array:
|
||||||
data = np.frombuffer(buf, dtype='int16').astype(
|
data = np.frombuffer(buf, dtype='int16').astype(
|
||||||
@ -47,7 +49,7 @@ class Audio:
|
|||||||
|
|
||||||
return 16384. / norm
|
return 16384. / norm
|
||||||
|
|
||||||
def audio_stream(self):
|
def audio_stream(self) -> Iterator[int]:
|
||||||
with audioread.audio_open(self.filename) as f:
|
with audioread.audio_open(self.filename) as f:
|
||||||
for buf in f.read_data(128 * 1024):
|
for buf in f.read_data(128 * 1024):
|
||||||
a = self._decode(f, buf)
|
a = self._decode(f, buf)
|
||||||
|
20
video.py
20
video.py
@ -19,7 +19,7 @@ import screen
|
|||||||
class Video:
|
class Video:
|
||||||
"""Apple II screen memory map encoding a bitmapped frame."""
|
"""Apple II screen memory map encoding a bitmapped frame."""
|
||||||
|
|
||||||
CLOCK_SPEED = 1024 * 1024
|
CLOCK_SPEED = 1024 * 1024 # type: int
|
||||||
|
|
||||||
def __init__(self, filename: str):
|
def __init__(self, filename: str):
|
||||||
self.filename = filename # type: str
|
self.filename = filename # type: str
|
||||||
@ -27,27 +27,30 @@ class Video:
|
|||||||
self._reader = skvideo.io.FFmpegReader(filename)
|
self._reader = skvideo.io.FFmpegReader(filename)
|
||||||
|
|
||||||
# Compute frame rate from input video
|
# Compute frame rate from input video
|
||||||
|
# TODO: possible to compute time offset for each frame instead?
|
||||||
data = skvideo.io.ffprobe(self.filename)['video']
|
data = skvideo.io.ffprobe(self.filename)['video']
|
||||||
rate_data = data['@r_frame_rate'].split("/") # e.g. 12000/1001
|
rate_data = data['@r_frame_rate'].split("/") # e.g. 12000/1001
|
||||||
self._input_frame_rate = float(rate_data[0]) / float(rate_data[1])
|
self._input_frame_rate = float(
|
||||||
|
rate_data[0]) / float(rate_data[1]) # type: float
|
||||||
|
|
||||||
self.cycles_per_frame = 1024. * 1024 / self._input_frame_rate
|
self.cycles_per_frame = (
|
||||||
self.frame_number = 0
|
1024. * 1024 / self._input_frame_rate) # type: float
|
||||||
|
self.frame_number = 0 # type: int
|
||||||
|
|
||||||
# Initialize empty
|
# Initialize empty screen
|
||||||
self.memory_map = screen.MemoryMap(
|
self.memory_map = screen.MemoryMap(
|
||||||
screen_page=1) # type: screen.MemoryMap
|
screen_page=1) # type: screen.MemoryMap
|
||||||
|
|
||||||
# Accumulates pending edit weights across frames
|
# Accumulates pending edit weights across frames
|
||||||
self.update_priority = np.zeros((32, 256), dtype=np.int64)
|
self.update_priority = np.zeros((32, 256), dtype=np.int64)
|
||||||
|
|
||||||
def tick(self, cycles) -> bool:
|
def tick(self, cycles: int) -> bool:
|
||||||
if cycles > (self.cycles_per_frame * self.frame_number):
|
if cycles > (self.cycles_per_frame * self.frame_number):
|
||||||
self.frame_number += 1
|
self.frame_number += 1
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _frame_grabber(self):
|
def _frame_grabber(self) -> Iterator[Image]:
|
||||||
for frame_array in self._reader.nextFrame():
|
for frame_array in self._reader.nextFrame():
|
||||||
yield Image.fromarray(frame_array)
|
yield Image.fromarray(frame_array)
|
||||||
|
|
||||||
@ -66,6 +69,7 @@ class Video:
|
|||||||
q = queue.Queue(maxsize=10)
|
q = queue.Queue(maxsize=10)
|
||||||
|
|
||||||
def worker():
|
def worker():
|
||||||
|
"""Invoke bmp2dhr to encode input image frames and push to queue."""
|
||||||
for _idx, _frame in enumerate(self._frame_grabber()):
|
for _idx, _frame in enumerate(self._frame_grabber()):
|
||||||
outfile = "%s/%08dC.BIN" % (frame_dir, _idx)
|
outfile = "%s/%08dC.BIN" % (frame_dir, _idx)
|
||||||
bmpfile = "%s/%08d.bmp" % (frame_dir, _idx)
|
bmpfile = "%s/%08d.bmp" % (frame_dir, _idx)
|
||||||
@ -86,7 +90,7 @@ class Video:
|
|||||||
|
|
||||||
q.put(None)
|
q.put(None)
|
||||||
|
|
||||||
t = threading.Thread(target=worker)
|
t = threading.Thread(target=worker, daemon=True)
|
||||||
t.start()
|
t.start()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
Loading…
Reference in New Issue
Block a user