From 183781da60a1fb8aa42ab297a7a4dcc1398320ab Mon Sep 17 00:00:00 2001
From: kris
Date: Fri, 22 Jul 2022 12:02:35 +0100
Subject: [PATCH] Tidy

---
 encode_audio.py | 188 ++++++++++++++++++++++++++++++++----------------
 1 file changed, 128 insertions(+), 60 deletions(-)

diff --git a/encode_audio.py b/encode_audio.py
index 109aa47..00eb1d7 100755
--- a/encode_audio.py
+++ b/encode_audio.py
@@ -54,25 +54,6 @@ import opcodes_generated
 FRAME_SIZE = 2048
 
 
-def total_error(positions: numpy.ndarray, data: numpy.ndarray) -> numpy.ndarray:
-    """Computes the total squared error for speaker position matrix vs data."""
-    return numpy.sum(numpy.square(positions - data), axis=-1)
-
-
-@functools.lru_cache(None)
-def frame_horizon(frame_offset: int, lookahead_steps: int):
-    """Optimize frame_offset when more than lookahead_steps from end of frame.
-
-    Candidate opcodes for all values of frame_offset are equal, until the
-    end-of-frame opcode comes within our lookahead horizon.
-    """
-    # TODO: This could be made tighter because a step is always at least 5
-    # cycles towards lookahead_steps.
-    if frame_offset < (FRAME_SIZE - lookahead_steps):
-        return 0
-    return frame_offset
-
-
 class Speaker:
     """Simulates the response of the Apple II speaker."""
 
@@ -113,11 +94,65 @@ class Speaker:
         self.b2 = b2
 
 
+def total_error(positions: numpy.ndarray, data: numpy.ndarray) -> numpy.ndarray:
+    """Computes the total squared error for speaker position matrix vs data."""
+    return numpy.sum(numpy.square(positions - data), axis=-1)
+
+
+@functools.lru_cache(None)
+def frame_horizon(frame_offset: int, lookahead_steps: int):
+    """Optimize frame_offset when more than lookahead_steps from end of frame.
+
+    Candidate opcodes for all values of frame_offset are equal, until the
+    end-of-frame opcode comes within our lookahead horizon.  This avoids
+    needing to recompute many copies of the same candidate opcodes.
+
+    TODO: this is a bit of a hack and we should be able to make the candidate
+    opcode selection itself smarter about avoiding duplicate work.
+    """
+    # TODO: This could be made tighter because a step is always at least 5
+    # cycles towards lookahead_steps.
+    if frame_offset < (FRAME_SIZE - lookahead_steps):
+        return 0
+    return frame_offset
+
+
 def audio_bytestream(data: numpy.ndarray, step: int, lookahead_steps: int,
                      sample_rate: int):
     """Computes optimal sequence of player opcodes to reproduce audio data."""
+    # At resonance freq the scale is about 22400 but we can only access about 7%
+    # of it across the frequency range.  This is also the equilibrium speaker
+    # position when voltage is held constant.  Normalize to this working
+    # range for convenience.
+    inv_scale = 22400 * 0.07759626164027278  # XXX
+
+    # Speaker response parameters can be fitted in several ways, starting from
+    # recordings of:
+    # - a single speaker click (impulse response)
+    # - a frequency sweep over the entire audible spectrum
+    #
+    # The resonant frequency can be read off from the frequency spectrum.  For
+    # my speaker there were two primary frequencies, at ~3875Hz and ~480Hz.
+    # Looking at the click waveform, the higher-frequency mode dominates at
+    # short time scales and the lower-frequency mode dominates at later
+    # times.  Since we're interested in short-timescale speaker control, we
+    # can ignore the latter.
+    #
+    # The damping factor can be estimated by fitting against the speaker click
+    # waveform or the spectrum, e.g. by simulating a speaker click and then
+    # computing its spectrum.
+    #
+    # TODO: other Apple II speakers almost certainly have different response
+    # characteristics, but hopefully not too widely varying.
+    sp = Speaker(sample_rate, freq=3875, damping=-1210, scale=1 / inv_scale)
+
+    # Starting speaker applied voltage.
+    voltage1 = voltage2 = 1.0
+
+    # last 2 speaker positions.
+    # XXX 0.0?
+    y1 = y2 = 1.0
-    dlen = len(data)
 
     # Leave enough padding at the end to look ahead from the last data value,
     # and in case we schedule an end-of-frame opcode towards the end.
     # TODO: avoid temporarily doubling memory footprint to concatenate
@@ -125,50 +160,64 @@ def audio_bytestream(data: numpy.ndarray, step: int, lookahead_steps: int,
         [data, numpy.zeros(max(lookahead_steps, opcodes.cycle_length(
             opcodes_generated.PlayerOps.TICK_00)), dtype=numpy.float32)]))
 
-    # At resonance freq the scale is about 22400 but we can only access about 7%
-    # of it across the frequency range.  This is also the equilibrium speaker
-    # position when voltage is held constant.  Normalize to this working
-    # range for convenience.
-    inv_scale = 22400 * 0.07759626164027278  # XXX
+    # index within input audio data
+    i = 0
+    # Position in 2048-byte TCP frame
+    frame_offset = 0
 
-    # Starting speaker applied voltage.
-    voltage1 = voltage2 = 1.0
-    # last 2 speaker positions.
-    # XXX 0.0?
-    y1 = y2 = 1.0
+    # Total squared error of audio output
+    total_err = 0.0
 
-    sp = Speaker(sample_rate, freq=3875, damping=-1210, scale=1 / inv_scale)
+    # Keep track of how many large deviations we see from the target waveform
+    # as another measure of audio quality
+    clicks = 0
+    # total squared error threshold to consider an operation as producing a
+    # click
+    click_threshold = 0.3
 
-    total_err = 0.0  # Total squared error of audio output
-    frame_offset = 0  # Position in 2048-byte TCP frame
-    i = 0  # index within input data
-    eta = ETA(total=1000, min_ms_between_updates=0)
-    next_tick = 0  # Value of i at which we should next update eta
-    # Keep track of how many opcodes we schedule
+    # Keep track of how many opcodes we schedule, so we can print summary
+    # statistics at the end
     opcode_counts = collections.defaultdict(int)
 
-    clicks = 0
+    # Always look ahead at least this many cycles.  We pick a higher value
+    # when approaching end of frame, so we can maximize the lookahead
+    # window when evaluating the long end-of-frame opcodes.
     min_lookahead_steps = lookahead_steps
 
-    while i < dlen // 1:
-        if i >= next_tick:
+    # Progress tracker
+    # TODO: find a more adaptive ETA tracker, this one doesn't estimate well
+    # if the processing rate changes (which it does for us, since the first
+    # few steps do extra work to precompute values that are cached for later
+    # steps)
+    eta = ETA(total=1000, min_ms_between_updates=0)
+    # Value of i at which we should next update eta
+    next_eta_tick = 0
+
+    while i < len(data):
+        if i >= next_eta_tick:
             eta.print_status()
-            next_tick = int(eta.i * dlen / 1000)
+            next_eta_tick = int(eta.i * len(data) / 1000)
 
         if frame_offset >= (FRAME_SIZE - 5):  # XXX
             lookahead_steps = min_lookahead_steps + 130  # XXX parametrize
         else:
            lookahead_steps = min_lookahead_steps
 
-        # Compute all possible opcode sequences for this frame offset
+        # The EOF opcodes need to act as a matched pair, so if we're emitting
+        # the second one we need to pass in its predecessor
         last_opcode = opcode if frame_offset == FRAME_SIZE - 1 else None
+
+        # Compute all possible opcode sequences we could emit starting at this
+        # frame offset
         next_candidate_opcodes, voltages, lookahead_steps = \
             opcodes.candidate_opcodes(
                 frame_horizon(frame_offset, lookahead_steps),
                 lookahead_steps, last_opcode)
+
+        # Simulate speaker trajectory for all of these candidate opcode
+        # sequences and pick the one that minimizes total error
         opcode_idx = lookahead.evolve_return_best(
             sp, y1, y2, voltage1, voltage2, voltage1 * voltages,
             data[i:i + lookahead_steps])
-
         opcode = next_candidate_opcodes[opcode_idx]
         opcode_length = opcodes.cycle_length(opcode)
         opcode_counts[opcode] += 1
@@ -182,36 +231,43 @@
         assert all_positions.shape[0] == 1
         assert all_positions.shape[1] == opcode_length
 
+        # Update to the new speaker state
         voltage1 = opcode_voltages[0, -1]
         voltage2 = opcode_voltages[0, -2]
         y1 = all_positions[0, -1]
         y2 = all_positions[0, -2]
+
+        # Track accumulated error between desired and actual speaker trajectory
         new_error = total_error(
             all_positions[0] * sp.scale, data[i:i + opcode_length]).item()
         total_err += new_error
 
-        if new_error > 0.3:
+        if new_error > click_threshold:
             clicks += 1
+            # XXX
             print(frame_offset, i / sample_rate, opcode, new_error,
                   numpy.mean(data[i:i + opcode_length]))
 
+        # Emit the chosen opcode and simulated audio samples for recording
         yield opcode, numpy.array(
             all_positions * sp.scale, dtype=numpy.float32).reshape(-1)
 
+        # Update input and output stream positions
        i += opcode_length
        frame_offset = (frame_offset + 1) % FRAME_SIZE
 
-    # Make sure we have at least 2k left in stream so player will do a
-    # complete read.
+    # Make sure we have at least 2k left in the stream so the player will do a
+    # complete read of the last frame.
     # for _ in range(frame_offset % 2048, 2048):
     #     yield opcodes.Opcode.EXIT
 
     eta.done()
-    print("Total error %f" % total_err)
+    # Print summary statistics
+    print("Total error %f" % total_err)
+    print("%d clicks" % clicks)
     print("Opcodes used:")
     for v, k in sorted(list(opcode_counts.items()), key=lambda kv: kv[1],
                        reverse=True):
         print("%s: %d" % (v, k))
-    print("%d clicks" % clicks)
 
 
 def preprocess(
@@ -228,22 +284,34 @@ def preprocess(
     return data
 
 
-def resample_output(output_buffer, input_audio, sample_rate, output_rate,
-                    noise_output=False):
-    resampled_output = librosa.resample(
-        numpy.array(output_buffer, dtype=numpy.float32),
-        orig_sr=sample_rate,
+def downsample_audio(simulated_audio, original_audio, input_rate, output_rate,
+                     noise_output=False):
+    """Downsample the 1MHz simulated audio output to a rate suitable for .wav.
+
+    :arg simulated_audio The simulated audio data to downsample
+    :arg original_audio The original audio data that was simulated
+    :arg input_rate Sample rate of input audio
+    :arg output_rate Desired sample rate of output audio
+    :arg noise_output Whether to also produce a noise waveform, i.e. the
+        difference between input and output audio
+    """
+    downsampled_output = librosa.resample(
+        numpy.array(simulated_audio, dtype=numpy.float32),
+        orig_sr=input_rate,
         target_sr=output_rate)
-    resampled_noise = None
+    downsampled_noise = None
     if noise_output:
-        noise_len = min(len(output_buffer), len(input_audio))
-        resampled_noise = librosa.resample(
-            numpy.array(output_buffer[:noise_len] - input_audio[:noise_len]),
-            orig_sr=sample_rate,
+        noise_len = min(len(simulated_audio), len(original_audio))
+        downsampled_noise = librosa.resample(
+            numpy.array(
+                simulated_audio[:noise_len] - original_audio[:noise_len]),
+            orig_sr=input_rate,
             target_sr=output_rate)
-    return resampled_output, resampled_noise
+    return downsampled_output, downsampled_noise
 
 
 def main():
@@ -323,7 +391,7 @@ def main():
             continue
         if (len(input_audio) - input_offset) < 1 * 1024 * 1024:
             continue
-        resampled_output_buffer, resampled_noise_buffer = resample_output(
+        resampled_output_buffer, resampled_noise_buffer = downsample_audio(
             output_buffer, input_audio[input_offset - len(output_buffer):],
             sample_rate, output_rate, bool(args.noise_output)
         )
@@ -337,7 +405,7 @@ def main():
                 output_buffer = []
 
     if output_buffer:
-        resampled_output_buffer, resampled_noise_buffer = resample_output(
+        resampled_output_buffer, resampled_noise_buffer = downsample_audio(
             output_buffer, input_audio[input_offset - len(output_buffer):],
             sample_rate, output_rate, bool(args.noise_output)
         )
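A small self-contained sketch (not part of the patch) of why frame_horizon() collapses frame_offset to zero: wrapping the candidate computation in an lru_cache then only sees a single key until the end-of-frame opcode comes within the lookahead horizon. The expensive_candidates() helper below is a hypothetical stand-in for opcodes.candidate_opcodes().

import functools

FRAME_SIZE = 2048

@functools.lru_cache(None)
def frame_horizon(frame_offset: int, lookahead_steps: int) -> int:
    # Same logic as the patch: all offsets far from end-of-frame share key 0.
    if frame_offset < (FRAME_SIZE - lookahead_steps):
        return 0
    return frame_offset

@functools.lru_cache(None)
def expensive_candidates(frame_offset: int, lookahead_steps: int):
    # Hypothetical stand-in for opcodes.candidate_opcodes().
    return ("candidates", frame_offset, lookahead_steps)

lookahead = 50
for offset in range(FRAME_SIZE):
    expensive_candidates(frame_horizon(offset, lookahead), lookahead)

# 51 distinct computations (offset 0 plus the last 50 offsets) instead of 2048.
print(expensive_candidates.cache_info().misses)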
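The fitting procedure sketched in the comments (simulate a speaker click, then read the resonance off its spectrum) can be illustrated as follows. This is not part of the patch: it assumes a plain damped two-pole resonator sampled at an assumed 1MHz rate, and the b1/b2 formulas are the textbook discretization of such a resonator, not necessarily the Speaker class's exact internals.

import numpy

def simulate_click(freq=3875.0, damping=-1210.0, sample_rate=1_000_000,
                   n_samples=1 << 15):
    """Impulse response of an assumed two-pole resonator model."""
    dt = 1.0 / sample_rate
    # Poles at exp((damping +/- 2j*pi*freq) * dt) give the recurrence
    # y[n] = b1*y[n-1] + b2*y[n-2], driven here by a unit impulse.
    b1 = 2.0 * numpy.exp(damping * dt) * numpy.cos(2.0 * numpy.pi * freq * dt)
    b2 = -numpy.exp(2.0 * damping * dt)
    y = numpy.zeros(n_samples)
    y[0] = 1.0
    y[1] = b1 * y[0]
    for n in range(2, n_samples):
        y[n] = b1 * y[n - 1] + b2 * y[n - 2]
    return y

def resonant_peak_hz(click, sample_rate=1_000_000):
    """Frequency of the largest spectral peak of the simulated click."""
    spectrum = numpy.abs(numpy.fft.rfft(click))
    freqs = numpy.fft.rfftfreq(len(click), d=1.0 / sample_rate)
    return freqs[numpy.argmax(spectrum)]

if __name__ == "__main__":
    print("resonant peak: %.0f Hz" % resonant_peak_hz(simulate_click()))
    # Expect ~3875 Hz; damping can then be fitted by comparing the simulated
    # click's decay envelope against a recorded one.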
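The selection rule described by the new comments ("pick the one that minimizes total error") reduces to an argmin over total_error() across the simulated candidate trajectories. A toy illustration with made-up numbers, not taken from the patch:

import numpy

def total_error(positions: numpy.ndarray, data: numpy.ndarray) -> numpy.ndarray:
    # Same helper as in the patch: squared error summed over the last axis.
    return numpy.sum(numpy.square(positions - data), axis=-1)

# One row of simulated speaker positions per candidate opcode sequence.
candidate_positions = numpy.array([
    [0.0, 0.10, 0.20, 0.30],
    [0.0, 0.20, 0.40, 0.60],
    [0.0, -0.10, -0.20, -0.30],
])
# The samples we are trying to reproduce over the lookahead window.
target = numpy.array([0.0, 0.18, 0.36, 0.54])

best = int(numpy.argmin(total_error(candidate_positions, target)))
print(best)  # 1: the second candidate tracks the target most closely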