ii-vision/transcoder/audio.py

83 lines
2.5 KiB
Python
Raw Normal View History

2019-03-21 16:42:47 +00:00
"""Encodes input audio stream into sequence of speaker duty cycle counts."""
2019-03-14 23:05:15 +00:00
from typing import Iterator
import audioread
import librosa
2019-03-14 21:45:28 +00:00
import numpy as np
class Audio:
def __init__(
self, filename: str, normalization: float = None):
2019-03-14 23:05:15 +00:00
self.filename = filename # type: str
# TODO: take into account that the available range is slightly offset
# as fraction of total cycle count?
self._tick_range = [4, 66]
# At 73 cycles/tick, true audio playback sample rate is
# roughly 1024*1024/73 = 14364 Hz (ignoring ACK slow path).
# Typical audio encoding is 44100Hz which is close to 14700*3
# Downscaling by 3x gives better results than trying to resample
# to a non-divisor. So we cheat a bit and play back the video a tiny
# bit (<2%) faster.
self.sample_rate = 14700. # type: float
2019-03-14 23:05:15 +00:00
self.normalization = (
normalization or self._normalization()) # type: float
def _decode(self, f, buf) -> np.array:
"""
:param f:
:param buf:
:return:
"""
data = np.frombuffer(buf, dtype='int16').astype(
'float32').reshape((f.channels, -1), order='F')
a = librosa.core.to_mono(data)
a = librosa.resample(a, f.samplerate,
self.sample_rate).flatten()
return a
2019-03-14 21:45:28 +00:00
def _normalization(self, read_bytes=1024 * 1024 * 10):
"""Read first read_bytes of audio stream and compute normalization.
We compute the 2.5th and 97.5th percentiles i.e. only 2.5% of samples
will clip.
:param read_bytes:
:return:
"""
raw = bytearray()
with audioread.audio_open(self.filename) as f:
for buf in f.read_data():
raw.extend(bytearray(buf))
if len(raw) > read_bytes:
break
a = self._decode(f, raw)
norm = np.max(np.abs(np.percentile(a, [2.5, 97.5])))
return 16384. / norm
2019-03-14 23:05:15 +00:00
def audio_stream(self) -> Iterator[int]:
"""
:return:
"""
with audioread.audio_open(self.filename) as f:
for buf in f.read_data(128 * 1024):
a = self._decode(f, buf)
a /= 16384 # normalize to -1.0 .. 1.0
a *= self.normalization
# Convert to -16 .. 16
a = (a * 16).astype(np.int)
a = np.clip(a, -15, 16)
yield from a