- Parametrize frame size. 4KB has too much buffering, though.
- Correct speaker model to apply coefficients for a square wave impulse
- Parametrize speaker scaling factor
- Flush wav file output after 1MB
This commit is contained in:
parent
e79ed985bc
commit
f64b1a6e2c
106
encode_audio.py
106
encode_audio.py
|
@ -40,9 +40,16 @@ import opcodes_generated
|
|||
|
||||
def total_error(positions: numpy.ndarray, data: numpy.ndarray) -> numpy.ndarray:
    """Computes the total squared error for speaker position matrix vs data.

    The squared difference between ``positions`` and ``data`` is summed over
    the last axis, so the result has one fewer dimension than the broadcast
    of the two inputs.

    Args:
        positions: simulated speaker positions; the last axis is the one
            that is reduced.  (Presumably one row per candidate opcode
            sequence -- confirm against the caller.)
        data: target audio samples to compare against, aligned with the
            last axis of ``positions``.

    Returns:
        The sum of squared differences along the last axis.
    """
    positions = numpy.asarray(positions)
    data = numpy.asarray(data)

    # Handle gracefully the case where the opcode would take us beyond the
    # end of data: when the trailing lengths disagree (and neither side is a
    # length-1 axis that numpy would broadcast), compare only the overlapping
    # prefix instead of raising a broadcasting error.  The trailing axis is
    # used rather than len(), which would measure the first axis of a
    # multi-dimensional position matrix.
    if positions.ndim > 0 and data.ndim > 0:
        n_positions = positions.shape[-1]
        n_data = data.shape[-1]
        if n_positions != n_data and 1 not in (n_positions, n_data):
            min_len = min(n_positions, n_data)
            positions = positions[..., :min_len]
            data = data[..., :min_len]

    return numpy.sum(numpy.square(positions - data), axis=-1)
|
||||
|
||||
|
||||
FRAME_SIZE = 2048
|
||||
|
||||
|
||||
@functools.lru_cache(None)
|
||||
def frame_horizon(frame_offset: int, lookahead_steps: int):
|
||||
"""Optimize frame_offset when more than lookahead_steps from end of frame.
|
||||
|
@ -52,13 +59,14 @@ def frame_horizon(frame_offset: int, lookahead_steps: int):
|
|||
"""
|
||||
# TODO: This could be made tighter because a step is always at least 5
|
||||
# cycles towards lookahead_steps.
|
||||
if frame_offset < (2047 - lookahead_steps):
|
||||
if frame_offset < (FRAME_SIZE - lookahead_steps):
|
||||
return 0
|
||||
return frame_offset
|
||||
|
||||
|
||||
class Speaker:
|
||||
def __init__(self, sample_rate: float, freq: float, damping: float):
|
||||
def __init__(self, sample_rate: float, freq: float, damping: float,
|
||||
scale: float):
|
||||
self.sample_rate = sample_rate
|
||||
self.freq = freq
|
||||
self.damping = damping
|
||||
|
@ -72,19 +80,23 @@ class Speaker:
|
|||
c1 = 2 * e * numpy.cos(w)
|
||||
|
||||
c2 = e * e
|
||||
t0 = (1 - 2 * e * numpy.cos(w) + e * e) / (d * d + w * w)
|
||||
t = d * d + w * w - numpy.pi * numpy.pi
|
||||
t1 = (1 + 2 * e * numpy.cos(w) + e * e) / numpy.sqrt(t * t + 4 * d * d *
|
||||
numpy.pi * numpy.pi)
|
||||
b2 = (t1 - t0) / (t1 + t0)
|
||||
b1 = b2 * dt * dt * (t0 + t1) / 2
|
||||
# t0 = (1 - 2 * e * numpy.cos(w) + e * e) / (d * d + w * w)
|
||||
# t = d * d + w * w - numpy.pi * numpy.pi
|
||||
# t1 = (1 + 2 * e * numpy.cos(w) + e * e) / numpy.sqrt(t * t + 4 * d * d *
|
||||
# numpy.pi * numpy.pi)
|
||||
# b2 = (t1 - t0) / (t1 + t0)
|
||||
# b1 = b2 * dt * dt * (t0 + t1) / 2
|
||||
|
||||
# Square wave impulse
|
||||
b2 = 0.0
|
||||
b1 = 1.0
|
||||
|
||||
self.c1 = c1
|
||||
self.c2 = c2
|
||||
self.b1 = b1
|
||||
self.b2 = b2
|
||||
|
||||
self.scale = numpy.float64(1 / 800) # TODO: analytic expression
|
||||
self.scale = numpy.float64(scale) # TODO: analytic expression
|
||||
|
||||
|
||||
def audio_bytestream(data: numpy.ndarray, step: int, lookahead_steps: int,
|
||||
|
@ -99,14 +111,25 @@ def audio_bytestream(data: numpy.ndarray, step: int, lookahead_steps: int,
|
|||
[data, numpy.zeros(max(lookahead_steps, opcodes.cycle_length(
|
||||
opcodes_generated.PlayerOps.TICK_00)), dtype=numpy.float32)]))
|
||||
|
||||
# At resonance freq the scale is about 22400 but we can only access about 7%
|
||||
# of it across the frequency range. This is also the equilibrium speaker
|
||||
# position when voltage is held constant. Normalize to this working
|
||||
# range for convenience.
|
||||
inv_scale = 22400 * 0.07759626164027278 # XXX
|
||||
|
||||
# inv_scale = 15000 * 0.1102744481718292
|
||||
#
|
||||
# inv_scale = 115954.98423621713
|
||||
# Starting speaker applied voltage.
|
||||
voltage1 = voltage2 = 1.0
|
||||
# last 2 speaker positions
|
||||
# last 2 speaker positions.
|
||||
y1 = y2 = 1.0
|
||||
|
||||
toggles = 0
|
||||
|
||||
sp = Speaker(sample_rate, freq=3875, damping=-1210)
|
||||
sp = Speaker(sample_rate, freq=3875, damping=-1210, scale=1 / inv_scale)
|
||||
# sp = Speaker(sample_rate, freq=3968, damping=-1800, scale=1 / inv_scale)
|
||||
# sp = Speaker(sample_rate, freq=475, damping=-210, scale=1 / inv_scale)
|
||||
|
||||
total_err = 0.0 # Total squared error of audio output
|
||||
frame_offset = 0 # Position in 2048-byte TCP frame
|
||||
|
@ -118,21 +141,35 @@ def audio_bytestream(data: numpy.ndarray, step: int, lookahead_steps: int,
|
|||
|
||||
clicks = 0
|
||||
min_lookahead_steps = lookahead_steps
|
||||
# next_step = sample_rate
|
||||
|
||||
# data = (numpy.arange(sample_rate) / sample_rate - 0.5).astype(
|
||||
# numpy.float32)
|
||||
|
||||
# dlen = len(data)
|
||||
while i < dlen // 1:
|
||||
# if i > next_step:
|
||||
# next_step += sample_rate
|
||||
# inv_scale += 100
|
||||
# print("XXX scale %d" % inv_scale)
|
||||
# sp = Speaker(sample_rate, freq=3875, damping=-1210,
|
||||
# scale=1 / inv_scale)
|
||||
|
||||
if i >= next_tick:
|
||||
eta.print_status()
|
||||
next_tick = int(eta.i * dlen / 1000)
|
||||
|
||||
if frame_offset >= 2043: # XXX
|
||||
if frame_offset >= (FRAME_SIZE - 5): # XXX
|
||||
lookahead_steps = min_lookahead_steps + 130 # XXX parametrize
|
||||
else:
|
||||
lookahead_steps = min_lookahead_steps
|
||||
|
||||
# Compute all possible opcode sequences for this frame offset
|
||||
last_opcode = opcode if frame_offset == FRAME_SIZE - 1 else None
|
||||
next_candidate_opcodes, voltages, lookahead_steps = \
|
||||
opcodes.candidate_opcodes(
|
||||
frame_horizon(frame_offset, lookahead_steps),
|
||||
lookahead_steps, opcode if frame_offset == 2047 else None)
|
||||
lookahead_steps, last_opcode)
|
||||
opcode_idx = lookahead.evolve_return_best(
|
||||
sp, y1, y2, voltage1, voltage2, voltage1 * voltages,
|
||||
data[i:i + lookahead_steps])
|
||||
|
@ -165,14 +202,14 @@ def audio_bytestream(data: numpy.ndarray, step: int, lookahead_steps: int,
|
|||
|
||||
# print(frame_offset, i / sample_rate, opcode)
|
||||
# for v in all_positions[0]:
|
||||
# yield
|
||||
# # print(v * sp.scale)
|
||||
# print(v * sp.scale)
|
||||
# if frame_offset == 2047:
|
||||
# print(opcode)
|
||||
yield opcode, (all_positions * sp.scale).astype(numpy.float32)
|
||||
yield opcode, numpy.array(
|
||||
all_positions * sp.scale, dtype=numpy.float32).reshape(-1)
|
||||
|
||||
i += opcode_length
|
||||
frame_offset = (frame_offset + 1) % 2048
|
||||
frame_offset = (frame_offset + 1) % FRAME_SIZE
|
||||
|
||||
# Make sure we have at least 2k left in stream so player will do a
|
||||
# complete read.
|
||||
|
@ -197,6 +234,14 @@ def preprocess(
|
|||
|
||||
data, _ = librosa.load(filename, sr=target_sample_rate, mono=True)
|
||||
|
||||
# data = []
|
||||
# freq = 926
|
||||
# data.extend(numpy.sin(numpy.arange(target_sample_rate * 1) * (
|
||||
# 2 * numpy.pi / (target_sample_rate / freq))).astype(
|
||||
# numpy.float32))
|
||||
# # freq *= 1.05
|
||||
# data = numpy.array(data, dtype=numpy.float32)
|
||||
|
||||
max_value = numpy.percentile(data, normalization_percentile)
|
||||
data /= max_value
|
||||
data *= normalize
|
||||
|
@ -206,10 +251,15 @@ def preprocess(
|
|||
|
||||
def resample_output(output_buffer, input_audio, sample_rate, output_rate,
|
||||
noise_output=False):
|
||||
resampled_output = librosa.resample(
|
||||
numpy.array(output_buffer, dtype=numpy.float32),
|
||||
orig_sr=sample_rate,
|
||||
target_sr=output_rate)
|
||||
try:
|
||||
resampled_output = librosa.resample(
|
||||
numpy.array(output_buffer, dtype=numpy.float32),
|
||||
orig_sr=sample_rate,
|
||||
target_sr=output_rate)
|
||||
except:
|
||||
for i in output_buffer:
|
||||
print(i)
|
||||
raise
|
||||
|
||||
resampled_noise = None
|
||||
if noise_output:
|
||||
|
@ -238,7 +288,7 @@ def main():
|
|||
parser.add_argument("--lookahead_cycles", type=int,
|
||||
help="Number of clock cycles to look ahead in audio "
|
||||
"stream.")
|
||||
parser.add_argument("--normalization", default=1.0, type=float,
|
||||
parser.add_argument("--normalization", default=0.8, type=float,
|
||||
help="Overall multiplier to rescale input audio "
|
||||
"values.")
|
||||
parser.add_argument("--norm_percentile", default=100,
|
||||
|
@ -279,7 +329,7 @@ def main():
|
|||
# We're not creating a file but still need a context
|
||||
noise_context = contextlib.nullcontext
|
||||
|
||||
with wav_context as wav_f, noise_context as noise_f, opcode_context\
|
||||
with wav_context as wav_f, noise_context as noise_f, opcode_context \
|
||||
as opcode_f:
|
||||
for idx, sample_data in enumerate(audio_bytestream(
|
||||
input_audio, args.step_size, args.lookahead_cycles,
|
||||
|
@ -293,12 +343,12 @@ def main():
|
|||
|
||||
# TODO: don't bother computing if we're not writing wavs
|
||||
|
||||
# Keep accumulating as long as we have <10MB in the buffer, or are
|
||||
# within 10MB from the end. This ensures we have enough samples to
|
||||
# Keep accumulating as long as we have <1MB in the buffer, or are
|
||||
# within 1MB from the end. This ensures we have enough samples to
|
||||
# resample including the last (partial) buffer
|
||||
if len(output_buffer) < 10 * 1024 * 1024:
|
||||
if len(output_buffer) < 1 * 1024 * 1024:
|
||||
continue
|
||||
if (len(input_audio) - input_offset) < 10 * 1024 * 1024:
|
||||
if (len(input_audio) - input_offset) < 1 * 1024 * 1024:
|
||||
continue
|
||||
resampled_output_buffer, resampled_noise_buffer = resample_output(
|
||||
output_buffer, input_audio[input_offset - len(output_buffer):],
|
||||
|
@ -306,8 +356,10 @@ def main():
|
|||
)
|
||||
if args.wav_output:
|
||||
wav_f.write(resampled_output_buffer)
|
||||
wav_f.flush()
|
||||
if args.noise_output:
|
||||
noise_f.write(resampled_noise_buffer)
|
||||
noise_f.flush()
|
||||
|
||||
output_buffer = []
|
||||
|
||||
|
|
13
opcodes.py
13
opcodes.py
|
@ -19,6 +19,8 @@ def voltage_schedule(op: player_op.PlayerOp) -> numpy.ndarray:
|
|||
"""Returns the 65C02 applied voltage schedule of a player opcode."""
|
||||
return op.toggles
|
||||
|
||||
FRAME_SIZE = 2048
|
||||
|
||||
|
||||
#@functools.lru_cache(None)
|
||||
def opcode_choices(
|
||||
|
@ -30,15 +32,14 @@ def opcode_choices(
|
|||
good results, we'll pick the one with the longest cycle count to reduce the
|
||||
stream bitrate.
|
||||
"""
|
||||
if frame_offset == 2046:
|
||||
if frame_offset == FRAME_SIZE - 2:
|
||||
return opcodes_generated.EOF_STAGE_1_OPS
|
||||
if frame_offset == 2047:
|
||||
if frame_offset == FRAME_SIZE - 1:
|
||||
return opcodes_generated.EOF_STAGE_2_3_OPS[eof_stage_1_op]
|
||||
|
||||
return sorted(
|
||||
list(opcodes_generated.AUDIO_OPS), key=cycle_length, reverse=True)
|
||||
|
||||
|
||||
#@functools.lru_cache(None)
|
||||
def opcode_lookahead(
|
||||
frame_offset: int,
|
||||
|
@ -56,13 +57,13 @@ def opcode_lookahead(
|
|||
ops.append((op,))
|
||||
else:
|
||||
# XXX check this
|
||||
if frame_offset == 2046 and eof_stage_1_op is None:
|
||||
if frame_offset == FRAME_SIZE - 2 and eof_stage_1_op is None:
|
||||
temp_op = op
|
||||
else:
|
||||
temp_op = eof_stage_1_op
|
||||
|
||||
for res in opcode_lookahead(
|
||||
(frame_offset + 1) % 2048,
|
||||
(frame_offset + 1) % FRAME_SIZE,
|
||||
lookahead_cycles - cycle_length(op), temp_op):
|
||||
ops.append((op,) + res)
|
||||
return tuple(ops) # TODO: fix return type
|
||||
|
@ -110,7 +111,7 @@ def candidate_opcodes(
|
|||
pruned_cycles = []
|
||||
for ops in opcodes:
|
||||
cycles = cycle_lookahead(ops, lookahead_cycles)
|
||||
if frame_offset == 2046 and cycles in seen_cycles:
|
||||
if cycles in seen_cycles:
|
||||
# print("Dropping", ops, cycles, seen_cycles[cycles])
|
||||
continue
|
||||
seen_cycles[cycles] = ops
|
||||
|
|
Loading…
Reference in New Issue