2019-03-21 16:42:47 +00:00
|
|
|
"""Encodes input audio stream into sequence of speaker duty cycle counts."""
|
|
|
|
|
2019-03-14 23:05:15 +00:00
|
|
|
from typing import Iterator
|
|
|
|
|
2019-03-05 20:47:34 +00:00
|
|
|
import audioread
|
2019-02-27 14:49:21 +00:00
|
|
|
import librosa
|
2019-03-14 21:45:28 +00:00
|
|
|
import numpy as np
|
2019-02-27 14:49:21 +00:00
|
|
|
|
|
|
|
|
|
|
|
class Audio:
|
2019-03-07 23:07:24 +00:00
|
|
|
def __init__(
|
2019-03-14 21:40:09 +00:00
|
|
|
self, filename: str, normalization: float = None):
|
2019-03-14 23:05:15 +00:00
|
|
|
self.filename = filename # type: str
|
2019-02-27 14:49:21 +00:00
|
|
|
|
2019-03-07 23:07:24 +00:00
|
|
|
# TODO: take into account that the available range is slightly offset
|
|
|
|
# as fraction of total cycle count?
|
|
|
|
self._tick_range = [4, 66]
|
2019-03-14 23:05:15 +00:00
|
|
|
self.cycles_per_tick = 73 # type: int
|
2019-02-27 14:49:21 +00:00
|
|
|
|
2019-03-07 23:07:24 +00:00
|
|
|
# TODO: round to divisor of video frame rate
|
2019-03-21 15:36:29 +00:00
|
|
|
self.sample_rate = 14340 # int(1024. * 1024 /
|
|
|
|
# self.cycles_per_tick)
|
2019-02-27 14:49:21 +00:00
|
|
|
|
2019-03-14 23:05:15 +00:00
|
|
|
self.normalization = (
|
|
|
|
normalization or self._normalization()) # type: float
|
2019-03-14 21:40:09 +00:00
|
|
|
|
|
|
|
def _decode(self, f, buf) -> np.array:
|
2019-03-21 22:41:05 +00:00
|
|
|
"""
|
|
|
|
|
|
|
|
:param f:
|
|
|
|
:param buf:
|
|
|
|
:return:
|
|
|
|
"""
|
2019-03-14 21:40:09 +00:00
|
|
|
data = np.frombuffer(buf, dtype='int16').astype(
|
|
|
|
'float32').reshape((f.channels, -1), order='F')
|
|
|
|
|
|
|
|
a = librosa.core.to_mono(data)
|
|
|
|
a = librosa.resample(a, f.samplerate,
|
|
|
|
self.sample_rate).flatten()
|
|
|
|
|
|
|
|
return a
|
|
|
|
|
2019-03-14 21:45:28 +00:00
|
|
|
def _normalization(self, read_bytes=1024 * 1024 * 10):
|
2019-03-14 21:40:09 +00:00
|
|
|
"""Read first read_bytes of audio stream and compute normalization.
|
|
|
|
|
|
|
|
We compute the 2.5th and 97.5th percentiles i.e. only 2.5% of samples
|
|
|
|
will clip.
|
2019-03-21 22:41:05 +00:00
|
|
|
|
|
|
|
:param read_bytes:
|
|
|
|
:return:
|
2019-03-14 21:40:09 +00:00
|
|
|
"""
|
|
|
|
raw = bytearray()
|
2019-03-07 23:07:24 +00:00
|
|
|
with audioread.audio_open(self.filename) as f:
|
2019-03-14 21:40:09 +00:00
|
|
|
for buf in f.read_data():
|
|
|
|
raw.extend(bytearray(buf))
|
|
|
|
if len(raw) > read_bytes:
|
|
|
|
break
|
|
|
|
a = self._decode(f, raw)
|
|
|
|
norm = np.max(np.abs(np.percentile(a, [2.5, 97.5])))
|
2019-03-05 20:47:34 +00:00
|
|
|
|
2019-03-14 21:40:09 +00:00
|
|
|
return 16384. / norm
|
2019-03-05 20:47:34 +00:00
|
|
|
|
2019-03-14 23:05:15 +00:00
|
|
|
def audio_stream(self) -> Iterator[int]:
|
2019-03-21 22:41:05 +00:00
|
|
|
"""
|
|
|
|
|
|
|
|
:return:
|
|
|
|
"""
|
2019-03-14 21:40:09 +00:00
|
|
|
with audioread.audio_open(self.filename) as f:
|
|
|
|
for buf in f.read_data(128 * 1024):
|
|
|
|
a = self._decode(f, buf)
|
2019-03-07 23:07:24 +00:00
|
|
|
|
|
|
|
a /= 16384 # normalize to -1.0 .. 1.0
|
|
|
|
a *= self.normalization
|
2019-03-05 20:47:34 +00:00
|
|
|
|
2019-03-07 23:07:24 +00:00
|
|
|
# Convert to -16 .. 16
|
|
|
|
a = (a * 16).astype(np.int)
|
|
|
|
a = np.clip(a, -15, 16)
|
2019-03-05 20:47:34 +00:00
|
|
|
|
2019-03-07 23:07:24 +00:00
|
|
|
yield from a
|