Tidy
This commit is contained in:
parent
cf7716d068
commit
183781da60
188
encode_audio.py
188
encode_audio.py
|
@ -54,25 +54,6 @@ import opcodes_generated
|
|||
FRAME_SIZE = 2048
|
||||
|
||||
|
||||
def total_error(positions: numpy.ndarray, data: numpy.ndarray) -> numpy.ndarray:
|
||||
"""Computes the total squared error for speaker position matrix vs data."""
|
||||
return numpy.sum(numpy.square(positions - data), axis=-1)
|
||||
|
||||
|
||||
@functools.lru_cache(None)
|
||||
def frame_horizon(frame_offset: int, lookahead_steps: int):
|
||||
"""Optimize frame_offset when more than lookahead_steps from end of frame.
|
||||
|
||||
Candidate opcodes for all values of frame_offset are equal, until the
|
||||
end-of-frame opcode comes within our lookahead horizon.
|
||||
"""
|
||||
# TODO: This could be made tighter because a step is always at least 5
|
||||
# cycles towards lookahead_steps.
|
||||
if frame_offset < (FRAME_SIZE - lookahead_steps):
|
||||
return 0
|
||||
return frame_offset
|
||||
|
||||
|
||||
class Speaker:
|
||||
"""Simulates the response of the Apple II speaker."""
|
||||
|
||||
|
@ -113,11 +94,65 @@ class Speaker:
|
|||
self.b2 = b2
|
||||
|
||||
|
||||
def total_error(positions: numpy.ndarray, data: numpy.ndarray) -> numpy.ndarray:
|
||||
"""Computes the total squared error for speaker position matrix vs data."""
|
||||
return numpy.sum(numpy.square(positions - data), axis=-1)
|
||||
|
||||
|
||||
@functools.lru_cache(None)
|
||||
def frame_horizon(frame_offset: int, lookahead_steps: int):
|
||||
"""Optimize frame_offset when more than lookahead_steps from end of frame.
|
||||
|
||||
Candidate opcodes for all values of frame_offset are equal, until the
|
||||
end-of-frame opcode comes within our lookahead horizon. This avoids
|
||||
needing to recompute many copies of the same candidate opcodes.
|
||||
|
||||
TODO: this is a bit of a hack and we should be able to make the candidate
|
||||
opcode selection itself smarter about avoiding duplicate work
|
||||
"""
|
||||
# TODO: This could be made tighter because a step is always at least 5
|
||||
# cycles towards lookahead_steps.
|
||||
if frame_offset < (FRAME_SIZE - lookahead_steps):
|
||||
return 0
|
||||
return frame_offset
|
||||
|
||||
|
||||
def audio_bytestream(data: numpy.ndarray, step: int, lookahead_steps: int,
|
||||
sample_rate: int):
|
||||
"""Computes optimal sequence of player opcodes to reproduce audio data."""
|
||||
# At resonance freq the scale is about 22400 but we can only access about 7%
|
||||
# of it across the frequency range. This is also the equilibrium speaker
|
||||
# position when voltage is held constant. Normalize to this working
|
||||
# range for convenience.
|
||||
inv_scale = 22400 * 0.07759626164027278 # XXX
|
||||
|
||||
# Speaker response parameters can be fitted in several ways. First, by
|
||||
# recording audio of
|
||||
# - a single speaker click (impulse response)
|
||||
# - a frequency sweep over the entire audible spectrum
|
||||
#
|
||||
# Resonant frequency can be read off from the frequency spectrum. For my
|
||||
# speaker there were two primary frequencies, at ~3875 and ~480Hz.
|
||||
# Looking at the click waveform the higher frequency mode dominates at
|
||||
# short time scales, and the lower frequency mode dominates at late
|
||||
# times. Since we're interested in short timescale speaker control we
|
||||
# can ignore the latter.
|
||||
#
|
||||
# Damping factor can be estimated by fitting against the speaker click
|
||||
# waveform or the spectrum, e.g. by simulating a speaker click and then
|
||||
# computing its spectrum.
|
||||
#
|
||||
# TODO: other Apple II speakers almost certainly have different response
|
||||
# characteristics, but hopefully not too widely varying.
|
||||
sp = Speaker(sample_rate, freq=3875, damping=-1210, scale=1 / inv_scale)
|
||||
|
||||
# Starting speaker applied voltage.
|
||||
voltage1 = voltage2 = 1.0
|
||||
|
||||
# last 2 speaker positions.
|
||||
# XXX 0.0?
|
||||
y1 = y2 = 1.0
|
||||
|
||||
dlen = len(data)
|
||||
# Leave enough padding at the end to look ahead from the last data value,
|
||||
# and in case we schedule an end-of-frame opcode towards the end.
|
||||
# TODO: avoid temporarily doubling memory footprint to concatenate
|
||||
|
@ -125,50 +160,64 @@ def audio_bytestream(data: numpy.ndarray, step: int, lookahead_steps: int,
|
|||
[data, numpy.zeros(max(lookahead_steps, opcodes.cycle_length(
|
||||
opcodes_generated.PlayerOps.TICK_00)), dtype=numpy.float32)]))
|
||||
|
||||
# At resonance freq the scale is about 22400 but we can only access about 7%
|
||||
# of it across the frequency range. This is also the equilibrium speaker
|
||||
# position when voltage is held constant. Normalize to this working
|
||||
# range for convenience.
|
||||
inv_scale = 22400 * 0.07759626164027278 # XXX
|
||||
# index within input audio data
|
||||
i = 0
|
||||
# Position in 2048-byte TCP frame
|
||||
frame_offset = 0
|
||||
|
||||
# Starting speaker applied voltage.
|
||||
voltage1 = voltage2 = 1.0
|
||||
# last 2 speaker positions.
|
||||
# XXX 0.0?
|
||||
y1 = y2 = 1.0
|
||||
# Total squared error of audio output
|
||||
total_err = 0.0
|
||||
|
||||
sp = Speaker(sample_rate, freq=3875, damping=-1210, scale=1 / inv_scale)
|
||||
# Keep track of how many large deviations we see from the target waveform
|
||||
# as another measure of audio quality
|
||||
clicks = 0
|
||||
# total squared error threshold to consider an operation as producing a
|
||||
# click
|
||||
click_threshold = 0.3
|
||||
|
||||
total_err = 0.0 # Total squared error of audio output
|
||||
frame_offset = 0 # Position in 2048-byte TCP frame
|
||||
i = 0 # index within input data
|
||||
eta = ETA(total=1000, min_ms_between_updates=0)
|
||||
next_tick = 0 # Value of i at which we should next update eta
|
||||
# Keep track of how many opcodes we schedule
|
||||
# Keep track of how many opcodes we schedule, so we can print summary
|
||||
# statistics at the end
|
||||
opcode_counts = collections.defaultdict(int)
|
||||
|
||||
clicks = 0
|
||||
# Always look ahead at least this many cycles. We pick a higher value
|
||||
# when approaching end of frame, so we can maximize the
|
||||
min_lookahead_steps = lookahead_steps
|
||||
while i < dlen // 1:
|
||||
if i >= next_tick:
|
||||
|
||||
# Progress tracker
|
||||
# TODO: find a more adaptive ETA tracker, this one doesn't estimate well
|
||||
# if the processing rate changes (which it does for us, since first few
|
||||
# steps do extra work to precompute values that are cached for later
|
||||
# steps)
|
||||
eta = ETA(total=1000, min_ms_between_updates=0)
|
||||
# Value of i at which we should next update eta
|
||||
next_eta_tick = 0
|
||||
|
||||
while i < len(data):
|
||||
if i >= next_eta_tick:
|
||||
eta.print_status()
|
||||
next_tick = int(eta.i * dlen / 1000)
|
||||
next_eta_tick = int(eta.i * len(data) / 1000)
|
||||
|
||||
if frame_offset >= (FRAME_SIZE - 5): # XXX
|
||||
lookahead_steps = min_lookahead_steps + 130 # XXX parametrize
|
||||
else:
|
||||
lookahead_steps = min_lookahead_steps
|
||||
|
||||
# Compute all possible opcode sequences for this frame offset
|
||||
# The EOF opcodes need to act as a matched pair, so if we're emitting
|
||||
# the second one we need to pass in its predecessor
|
||||
last_opcode = opcode if frame_offset == FRAME_SIZE - 1 else None
|
||||
|
||||
# Compute all possible opcode sequences we could emit starting at this
|
||||
# frame offset
|
||||
next_candidate_opcodes, voltages, lookahead_steps = \
|
||||
opcodes.candidate_opcodes(
|
||||
frame_horizon(frame_offset, lookahead_steps),
|
||||
lookahead_steps, last_opcode)
|
||||
|
||||
# Simulate speaker trajectory for all of these candidate opcode
|
||||
# sequences and pick the one that minimizes total error
|
||||
opcode_idx = lookahead.evolve_return_best(
|
||||
sp, y1, y2, voltage1, voltage2, voltage1 * voltages,
|
||||
data[i:i + lookahead_steps])
|
||||
|
||||
opcode = next_candidate_opcodes[opcode_idx]
|
||||
opcode_length = opcodes.cycle_length(opcode)
|
||||
opcode_counts[opcode] += 1
|
||||
|
@ -182,36 +231,43 @@ def audio_bytestream(data: numpy.ndarray, step: int, lookahead_steps: int,
|
|||
assert all_positions.shape[0] == 1
|
||||
assert all_positions.shape[1] == opcode_length
|
||||
|
||||
# Update to new speaker state
|
||||
voltage1 = opcode_voltages[0, -1]
|
||||
voltage2 = opcode_voltages[0, -2]
|
||||
y1 = all_positions[0, -1]
|
||||
y2 = all_positions[0, -2]
|
||||
|
||||
# Track accumulated error between desired and actual speaker trajectory
|
||||
new_error = total_error(
|
||||
all_positions[0] * sp.scale, data[i:i + opcode_length]).item()
|
||||
total_err += new_error
|
||||
if new_error > 0.3:
|
||||
if new_error > click_threshold:
|
||||
clicks += 1
|
||||
# XXX
|
||||
print(frame_offset, i / sample_rate, opcode, new_error,
|
||||
numpy.mean(data[i:i + opcode_length]))
|
||||
|
||||
# Emit chosen operation and simulated audio samples for recording
|
||||
yield opcode, numpy.array(
|
||||
all_positions * sp.scale, dtype=numpy.float32).reshape(-1)
|
||||
|
||||
# Update input and output stream positions
|
||||
i += opcode_length
|
||||
frame_offset = (frame_offset + 1) % FRAME_SIZE
|
||||
|
||||
# Make sure we have at least 2k left in stream so player will do a
|
||||
# complete read.
|
||||
# Make sure we have at least 2k left in stream so the player will do a
|
||||
# complete read of the last frame.
|
||||
# for _ in range(frame_offset % 2048, 2048):
|
||||
# yield opcodes.Opcode.EXIT
|
||||
eta.done()
|
||||
print("Total error %f" % total_err)
|
||||
|
||||
# Print summary statistics
|
||||
print("Total error %f" % total_err)
|
||||
print("%d clicks" % clicks)
|
||||
print("Opcodes used:")
|
||||
for v, k in sorted(list(opcode_counts.items()), key=lambda kv: kv[1],
|
||||
reverse=True):
|
||||
print("%s: %d" % (v, k))
|
||||
print("%d clicks" % clicks)
|
||||
|
||||
|
||||
def preprocess(
|
||||
|
@ -228,22 +284,34 @@ def preprocess(
|
|||
return data
|
||||
|
||||
|
||||
def resample_output(output_buffer, input_audio, sample_rate, output_rate,
|
||||
noise_output=False):
|
||||
resampled_output = librosa.resample(
|
||||
numpy.array(output_buffer, dtype=numpy.float32),
|
||||
orig_sr=sample_rate,
|
||||
def downsample_audio(simulated_audio, original_audio, input_rate, output_rate,
|
||||
noise_output=False):
|
||||
"""Downscale the 1MHz simulated audio output suitable for writing as .wav
|
||||
|
||||
:arg simulated_audio The simulated audio data to downsample
|
||||
:arg original_audio The original audio data that was simulated
|
||||
:arg input_rate Sample rate of input audio
|
||||
:arg output_rate Desired sample rate of output audio
|
||||
:arg noise_output Whether to also produce a noise waveform, i.e. difference
|
||||
between input and output audio
|
||||
|
||||
|
||||
"""
|
||||
downsampled_output = librosa.resample(
|
||||
numpy.array(simulated_audio, dtype=numpy.float32),
|
||||
orig_sr=input_rate,
|
||||
target_sr=output_rate)
|
||||
|
||||
resampled_noise = None
|
||||
downsampled_noise = None
|
||||
if noise_output:
|
||||
noise_len = min(len(output_buffer), len(input_audio))
|
||||
noise_len = min(len(simulated_audio), len(original_audio))
|
||||
resampled_noise = librosa.resample(
|
||||
numpy.array(output_buffer[:noise_len] - input_audio[:noise_len]),
|
||||
orig_sr=sample_rate,
|
||||
numpy.array(
|
||||
simulated_audio[:noise_len] - original_audio[:noise_len]),
|
||||
orig_sr=input_rate,
|
||||
target_sr=output_rate)
|
||||
|
||||
return resampled_output, resampled_noise
|
||||
return downsampled_output, downsampled_noise
|
||||
|
||||
|
||||
def main():
|
||||
|
@ -323,7 +391,7 @@ def main():
|
|||
continue
|
||||
if (len(input_audio) - input_offset) < 1 * 1024 * 1024:
|
||||
continue
|
||||
resampled_output_buffer, resampled_noise_buffer = resample_output(
|
||||
resampled_output_buffer, resampled_noise_buffer = downsampled_output(
|
||||
output_buffer, input_audio[input_offset - len(output_buffer):],
|
||||
sample_rate, output_rate, bool(args.noise_output)
|
||||
)
|
||||
|
@ -337,7 +405,7 @@ def main():
|
|||
output_buffer = []
|
||||
|
||||
if output_buffer:
|
||||
resampled_output_buffer, resampled_noise_buffer = resample_output(
|
||||
resampled_output_buffer, resampled_noise_buffer = downsampled_output(
|
||||
output_buffer, input_audio[input_offset - len(output_buffer):],
|
||||
sample_rate, output_rate, bool(args.noise_output)
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue