mirror of
https://github.com/KrisKennaway/ii-sound.git
synced 2024-09-27 10:59:55 +00:00
Merge preprocess_audio back in. We didn't end up getting a speed
boost from pypy, perhaps because the critical path is in numpy code. Add some comments and clean up a bit.
This commit is contained in:
parent
7ff129346e
commit
1bbbd8c154
219
encode_audio.py
Normal file → Executable file
219
encode_audio.py
Normal file → Executable file
@ -1,147 +1,164 @@
|
||||
# Delta modulation audio encoder.
|
||||
#
|
||||
# Models the Apple II speaker as an RC circuit with given time constant
|
||||
# and computes a sequence of speaker ticks at multiples of 13-cycle intervals
|
||||
# to approximate the target audio waveform.
|
||||
#
|
||||
# To optimize the audio quality we look ahead some defined number of steps and
|
||||
# choose a speaker trajectory that minimizes errors over this range. e.g.
|
||||
# this allows us to anticipate large amplitude changes by pre-moving
|
||||
# the speaker to better approximate them.
|
||||
#
|
||||
# This also needs to take into account scheduling the "slow path" every 2048
|
||||
# output bytes, where the Apple II will manage the TCP socket buffer while
|
||||
# ticking the speaker every 13 cycles. Since we know this is happening
|
||||
# we can compensate for it, i.e. look ahead to this upcoming slow path and
|
||||
# pre-position the speaker so that it introduces the least error during
|
||||
# this "dead" period when we're keeping the speaker in a net-neutral position.
|
||||
|
||||
import sys
|
||||
import functools
|
||||
import librosa
|
||||
import numpy
|
||||
import soundfile
|
||||
from typing import List
|
||||
from eta import ETA
|
||||
|
||||
PORT = 1977
|
||||
|
||||
OPCODES = {
|
||||
'tick': 0x00,
|
||||
'notick_page1': 0x08,
|
||||
'notick_page2': 0x10,
|
||||
'exit': 0x18,
|
||||
'slowpath': 0x28
|
||||
'tick_page1': 0x00,
|
||||
'notick_page1': 0x09,
|
||||
'notick_page2': 0x11,
|
||||
'exit': 0x19,
|
||||
'slowpath': 0x29
|
||||
}
|
||||
|
||||
# Errors
|
||||
# 1000 LAH 1 = 704632.752909
|
||||
|
||||
# 100 LAH 1 Norm 0.5 = 6783135.377118
|
||||
|
||||
# 100 LAH 1 Norm 0.3 = 3251821.285253
|
||||
# 100 LAH 2 Norm 0.3 = 6446570.565330
|
||||
|
||||
# 100 LAH 1 Norm 0.2 = 1798442.832489
|
||||
# 100 LAH 7 Norm 0.2 = 1624865.370107
|
||||
|
||||
# 10 LAH 1 Norm 0.5 = 3251142.161679
|
||||
# 20 LAH 1 Norm 0.5 = 3518009.983899
|
||||
# 30 ... = 4137179.898981
|
||||
|
||||
# 40 LAH 1 Norm 0.5 = 4667834.501476
|
||||
# 40 LAH 7 Norm 0.5 = 3614627.124544
|
||||
|
||||
# 50 ... = 5118520.178948
|
||||
# 60 ... = 5513928.786507
|
||||
# 70 ... = 5872060.623297
|
||||
# 80 ... = 6198922.754212
|
||||
# 90 ... = 6501755.264692
|
||||
# 100 ... = 6782164.670392
|
||||
# 200 = 8708489.044314
|
||||
# 300 = 9686398.703931
|
||||
# 400 = 10233308.826519
|
||||
# 500 = 10567966.087857
|
||||
|
||||
# TODO: synchronize to VBL
|
||||
# TODO: tick now has space to also flip a soft-switch
|
||||
# TODO: notick also has room to flip another softswitch, what can I do with it?
|
||||
|
||||
|
||||
# @profile
|
||||
def lookahead(step_size: int, initial_position: float,
|
||||
initial_voltage: float, data: numpy.ndarray, offset: int,
|
||||
toggles: List):
|
||||
|
||||
# TODO: construct list of voltage values directly
|
||||
# - construct voltage trajectories from position and vectorize comparison
|
||||
voltage = initial_voltage
|
||||
position = initial_position
|
||||
|
||||
total_error = 0.0
|
||||
for i, t in enumerate(toggles):
|
||||
target_val = data[i+offset]
|
||||
if t:
|
||||
voltage = -voltage
|
||||
position += (voltage - position) / step_size
|
||||
err = position - target_val
|
||||
total_error += abs(err)
|
||||
return total_error
|
||||
|
||||
|
||||
# TODO: test slowpath
|
||||
# TODO: test
|
||||
@functools.lru_cache(None)
|
||||
def lookahead_patterns(lookahead: int, slowpath: int):
|
||||
patterns = []
|
||||
def lookahead_patterns(
|
||||
lookahead: int, slowpath_distance: int,
|
||||
voltage: float) -> numpy.ndarray:
|
||||
initial_voltage = voltage
|
||||
patterns = set()
|
||||
|
||||
num_bits = max(lookahead - slowpath, 0)
|
||||
for i in range(2 ** num_bits):
|
||||
slowpath_pre_bits = 0
|
||||
slowpath_post_bits = 0
|
||||
if slowpath_distance <= 0:
|
||||
slowpath_pre_bits = min(12 + slowpath_distance, lookahead)
|
||||
elif slowpath_distance <= lookahead:
|
||||
slowpath_post_bits = lookahead - slowpath_distance
|
||||
|
||||
enumerate_bits = lookahead - slowpath_pre_bits - slowpath_post_bits
|
||||
assert slowpath_pre_bits + enumerate_bits + slowpath_post_bits == lookahead
|
||||
|
||||
for i in range(2 ** enumerate_bits):
|
||||
voltage = initial_voltage
|
||||
pattern = []
|
||||
for j in range(num_bits):
|
||||
pattern.append(bool((i >> j) & 1))
|
||||
pattern.extend([True for _ in range(min(slowpath, lookahead))])
|
||||
patterns.append(pattern)
|
||||
for j in range(slowpath_pre_bits):
|
||||
voltage = -voltage
|
||||
pattern.append(voltage)
|
||||
|
||||
for j in range(enumerate_bits):
|
||||
voltage = 1.0 if ((i >> j) & 1) else -1.0
|
||||
pattern.append(voltage)
|
||||
|
||||
for j in range(slowpath_post_bits):
|
||||
voltage = -voltage
|
||||
pattern.append(voltage)
|
||||
|
||||
patterns.add(tuple(pattern))
|
||||
|
||||
res = numpy.array(list(patterns), dtype=numpy.float32)
|
||||
return res
|
||||
|
||||
|
||||
def lookahead(step_size: int, initial_position: float, data: numpy.ndarray,
|
||||
offset: int,
|
||||
voltages: numpy.ndarray):
|
||||
positions = numpy.full(voltages.shape[0], initial_position,
|
||||
dtype=numpy.float32)
|
||||
target_val = data[offset:offset + voltages.shape[1]]
|
||||
total_error = numpy.zeros(shape=voltages.shape[0], dtype=numpy.float32)
|
||||
for i in range(0, voltages.shape[1]):
|
||||
positions += (voltages[:, i] - positions) / step_size
|
||||
err = numpy.power(numpy.abs(positions - target_val[i]), 2)
|
||||
total_error += err
|
||||
# err = numpy.abs(positions[:, 1:] - target_val)
|
||||
# total_error = numpy.sum(err, axis=1)
|
||||
|
||||
best = numpy.argmin(total_error)
|
||||
return voltages[best, 0]
|
||||
|
||||
return patterns
|
||||
|
||||
def sample(data: numpy.ndarray, step: int, lookahead_steps: int):
|
||||
dlen = len(data)
|
||||
data = numpy.concatenate([data, numpy.zeros(lookahead_steps)])
|
||||
data = numpy.concatenate([data, numpy.zeros(lookahead_steps)]).astype(
|
||||
numpy.float32)
|
||||
|
||||
voltage = -1.0
|
||||
position = -1.0
|
||||
|
||||
total_err = 0.0
|
||||
slowpath = 0
|
||||
slowpath_distance = 2047
|
||||
cnt = 0
|
||||
eta = ETA(total=1000)
|
||||
for i, val in enumerate(data[:dlen]):
|
||||
if i % int((dlen / 100)) == 0:
|
||||
print("%d%% complete" % (i * 100 / dlen))
|
||||
if (cnt % 2048) == 2047:
|
||||
slowpath = 12
|
||||
if i and i % int((dlen / 1000)) == 0:
|
||||
eta.print_status()
|
||||
|
||||
min_err = 1e9
|
||||
best_pattern = None
|
||||
voltages = lookahead_patterns(
|
||||
lookahead_steps, slowpath_distance, voltage)
|
||||
new_voltage = lookahead(step, position, data, i, voltages)
|
||||
|
||||
# TODO: double-check this
|
||||
slowpath_bits = max((cnt % 2048) + lookahead_steps - 2047, 0)
|
||||
for lh in lookahead_patterns(lookahead_steps, slowpath_bits):
|
||||
err = lookahead(step, position, voltage, data, i, lh)
|
||||
# print(err, lh)
|
||||
if err < min_err:
|
||||
min_err = err
|
||||
best_pattern = lh
|
||||
# "BEST: %f, %s --> %s" % (err, lh, lh[0]))
|
||||
|
||||
if slowpath:
|
||||
if slowpath == 12:
|
||||
yield OPCODES['slowpath']
|
||||
if slowpath_distance == 0:
|
||||
yield OPCODES['slowpath']
|
||||
cnt += 1
|
||||
elif slowpath_distance > 0:
|
||||
if new_voltage != voltage:
|
||||
yield OPCODES['tick_page1']
|
||||
cnt += 1
|
||||
slowpath -= 1
|
||||
elif best_pattern[0]:
|
||||
voltage = -voltage
|
||||
yield OPCODES['tick']
|
||||
cnt += 1
|
||||
else:
|
||||
yield OPCODES['notick_page1']
|
||||
cnt += 1
|
||||
else:
|
||||
yield OPCODES['notick_page2']
|
||||
cnt += 1
|
||||
|
||||
slowpath_distance -= 1
|
||||
if slowpath_distance == -12:
|
||||
# End of slowpath
|
||||
slowpath_distance = 2047
|
||||
|
||||
voltage = new_voltage
|
||||
position += (voltage - position) / step
|
||||
err = position - val
|
||||
err = (position - val) ** 2
|
||||
total_err += abs(err)
|
||||
# print("State: ", sp.voltage, sp.position, val)
|
||||
|
||||
for _ in range(cnt % 2048, 2047):
|
||||
yield OPCODES['notick_page1']
|
||||
yield OPCODES['exit']
|
||||
eta.done()
|
||||
print("Total error %f" % total_err)
|
||||
|
||||
|
||||
def preprocess(
|
||||
filename: str, target_sample_rate: int,
|
||||
normalize: float = 0.5) -> numpy.ndarray:
|
||||
data, _ = librosa.load(filename, sr=target_sample_rate, mono=True)
|
||||
|
||||
max_value = numpy.percentile(data, 90)
|
||||
data /= max_value
|
||||
data *= normalize
|
||||
|
||||
return data
|
||||
|
||||
def main(argv):
|
||||
serve_file = argv[1]
|
||||
step = int(argv[2])
|
||||
lookahead_steps = int(argv[3])
|
||||
out = argv[4]
|
||||
|
||||
data, sr = soundfile.read(serve_file)
|
||||
sample_rate = int(1024. * 1000 / 13)
|
||||
data = preprocess(serve_file, sample_rate)
|
||||
with open(out, "wb+") as f:
|
||||
for b in sample(data, step, lookahead_steps):
|
||||
f.write(bytes([b]))
|
||||
|
0
play_audio.py
Normal file → Executable file
0
play_audio.py
Normal file → Executable file
@ -1,27 +0,0 @@
|
||||
import sys
|
||||
import librosa
|
||||
import numpy
|
||||
import soundfile as sf
|
||||
|
||||
|
||||
def preprocess(
|
||||
filename: str, target_sample_rate: int,
|
||||
normalize: float = 0.5) -> numpy.ndarray:
|
||||
data, _ = librosa.load(filename, sr=target_sample_rate, mono=True)
|
||||
|
||||
max_value = numpy.percentile(data, 90)
|
||||
data /= max_value
|
||||
data *= normalize
|
||||
|
||||
return data
|
||||
|
||||
def main(argv):
|
||||
serve_file = argv[1]
|
||||
out = argv[2]
|
||||
sample_rate = int(1024. * 1000 / 13)
|
||||
|
||||
sf.write(out, preprocess(serve_file, sample_rate), sample_rate)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main(sys.argv)
|
Loading…
Reference in New Issue
Block a user