2020-08-11 22:26:55 +00:00
|
|
|
#!/usr/bin/env python3
|
2020-08-11 22:23:33 +00:00
|
|
|
# Delta modulation audio encoder.
|
|
|
|
#
|
2020-08-16 22:15:30 +00:00
|
|
|
# Simulates the Apple II speaker at 1MHz (i.e. cycle-level) resolution,
|
|
|
|
# by modeling it as an RC circuit with given time constant. In order to
|
|
|
|
# reproduce a target audio waveform, we upscale it to 1MHz sample rate,
|
|
|
|
# and compute the sequence of player opcodes to best reproduce this waveform.
|
2020-08-11 22:23:33 +00:00
|
|
|
#
|
2020-08-16 22:15:30 +00:00
|
|
|
# Since the player opcodes are chosen to allow ticking the speaker during any
|
|
|
|
# given clock cycle (though with some limits on the minimum time
|
|
|
|
# between ticks), this means that we are able to control the Apple II speaker
|
|
|
|
# with cycle-level precision, which results in high audio fidelity with low
|
|
|
|
# noise.
|
|
|
|
#
|
|
|
|
# To further optimize the audio quality we look ahead some defined number of
|
|
|
|
# cycles and choose a speaker trajectory that minimizes errors over this range.
|
|
|
|
# For example, this allows us to anticipate large amplitude changes by pre-moving
|
2020-08-11 22:23:33 +00:00
|
|
|
# the speaker to better approximate them.
|
|
|
|
#
|
2020-08-16 22:15:30 +00:00
|
|
|
# This also needs to take into account scheduling the "slow path" opcode every
|
|
|
|
# 2048 output bytes, where the Apple II will manage the TCP socket buffer while
|
|
|
|
# ticking the speaker at a regular cadence of 13 cycles to keep it in a
|
|
|
|
# net-neutral position. When looking ahead we can also (partially)
|
|
|
|
# compensate for this "dead" period by pre-positioning.
|
2020-08-11 22:23:33 +00:00
|
|
|
|
2020-08-16 22:15:30 +00:00
|
|
|
import collections
|
2020-08-10 20:03:12 +00:00
|
|
|
import sys
|
2020-08-11 22:23:33 +00:00
|
|
|
import librosa
|
2020-08-10 20:03:12 +00:00
|
|
|
import numpy
|
2020-08-11 22:23:33 +00:00
|
|
|
from eta import ETA
|
2020-08-10 20:03:12 +00:00
|
|
|
|
2020-08-13 21:08:50 +00:00
|
|
|
import opcodes
|
2020-08-10 20:03:12 +00:00
|
|
|
|
2020-08-16 22:15:30 +00:00
|
|
|
# TODO: add flags to parametrize options
|
2020-08-10 20:03:12 +00:00
|
|
|
|
|
|
|
|
2020-08-11 22:23:33 +00:00
|
|
|
def lookahead(step_size: int, initial_position: float, data: numpy.ndarray,
              offset: int, voltages: numpy.ndarray):
    """Return the index of the candidate voltage profile with least error.

    Every row of `voltages` is one candidate sequence of applied voltages,
    one entry per simulated step.  Starting from `initial_position`, each
    candidate's speaker position is advanced one step at a time using

        position[i + 1] = voltage[i] / step_size
                          + position[i] * (1 - 1 / step_size)

    and the sum of squared differences between the simulated positions and
    `data[offset:offset + voltages.shape[1]]` is computed per candidate.

    All candidates are advanced together with numpy; only the loop over
    steps is sequential, since each position depends on the previous one.

    Args:
        step_size: divisor controlling how fast the position follows the
            applied voltage.
        initial_position: speaker position before the first step.
        data: target waveform; must hold at least `voltages.shape[1]`
            samples from `offset` onwards for the subtraction to broadcast.
        offset: index in `data` of the first target sample to compare.
        voltages: 2-D array of candidate voltage profiles, one per row.

    Returns:
        Row index (as returned by `numpy.argmin`) of the candidate with the
        smallest total squared error; the first such row on ties.
    """
    num_candidates = voltages.shape[0]
    horizon = voltages.shape[1]

    # Column 0 holds the shared starting position; column i + 1 holds the
    # position after applying step i.
    trajectory = numpy.empty((num_candidates, horizon + 1),
                             dtype=numpy.float32)
    trajectory[:, 0] = initial_position

    target = data[offset:offset + horizon]
    drive = voltages / step_size
    decay = (1 - 1 / step_size)

    for cycle in range(horizon):
        trajectory[:, cycle + 1] = (
            drive[:, cycle] + trajectory[:, cycle] * decay)

    squared_error = numpy.power(trajectory[:, 1:] - target, 2)
    return numpy.argmin(numpy.sum(squared_error, axis=1))
|
2020-08-10 20:03:12 +00:00
|
|
|
|
|
|
|
|
2020-08-16 22:15:30 +00:00
|
|
|
# TODO: share implementation with lookahead
|
2020-08-13 21:08:50 +00:00
|
|
|
def evolve(opcode: opcodes.Opcode, starting_position, starting_voltage,
           step_size, data, starting_idx):
    """Simulate playing one opcode in full and return the resulting state.

    The opcode's entry in `opcodes.VOLTAGE_SCHEDULE` is multiplied by
    `starting_voltage` to get the voltage applied at each step.  At every
    step the position moves 1/`step_size` of the way towards the applied
    voltage, and the squared difference from the matching sample of `data`
    is accumulated.

    Args:
        opcode: the opcode to play.
        starting_position: speaker position before the opcode runs.
        starting_voltage: applied voltage before the opcode runs.
        step_size: divisor controlling how fast the position follows the
            applied voltage.
        data: target waveform, indexed from `starting_idx`.
        starting_idx: index in `data` matching the opcode's first step.

    Returns:
        Tuple `(position, voltage, total_err, next_idx, num_flips)`:
        the final position, the last applied voltage (`starting_voltage` if
        the schedule is empty), the accumulated squared error,
        `starting_idx + opcodes.cycle_length(opcode)`, and the number of
        steps whose voltage differed from the previous one (the first step
        is compared against `starting_voltage`).
    """
    next_idx = starting_idx + opcodes.cycle_length(opcode)
    schedule = starting_voltage * opcodes.VOLTAGE_SCHEDULE[opcode]

    position = starting_position
    squared_error = 0.0
    flips = 0
    # `applied` keeps its initial value only if the schedule is empty.
    applied = previous = starting_voltage
    for cycle, applied in enumerate(schedule):
        if applied != previous:
            flips += 1
            previous = applied
        position += (applied - position) / step_size
        squared_error += (position - data[starting_idx + cycle]) ** 2

    return position, applied, squared_error, next_idx, flips
|
2020-08-13 21:08:50 +00:00
|
|
|
|
2020-08-16 22:15:30 +00:00
|
|
|
|
|
|
|
def audio_bytestream(data: numpy.ndarray, step: int, lookahead_steps: int):
    """Yield the player opcodes chosen to reproduce the audio in `data`.

    At each position the candidate opcode sequences for the current frame
    offset are scored by `lookahead`, the first opcode of the best sequence
    is emitted, and `evolve` advances the simulated speaker state past it.
    Once the input is consumed the current 2048-opcode frame is filled with
    `TICK_00` opcodes and terminated with `EXIT`, then summary statistics
    are printed.

    Args:
        data: 1-D array of target samples.
        step: step size handed to `lookahead` and `evolve`.
        lookahead_steps: number of samples of lookahead requested from
            `opcodes.candidate_opcodes`; the target is zero-padded by this
            many samples so the final lookahead window is full.

    Yields:
        `opcodes.Opcode` values, in playback order.
    """
    dlen = len(data)
    # Zero-pad so that a lookahead window starting on the last real sample
    # still has `lookahead_steps` target values to compare against.
    data = numpy.concatenate([data, numpy.zeros(lookahead_steps)]).astype(
        numpy.float32)

    voltage = -1.0
    position = -1.0

    total_err = 0.0
    frame_offset = 0
    # The status line is refreshed roughly once per 1/1000th of the input,
    # matching the total of 1000 given to the progress meter.
    eta = ETA(total=1000)
    i = 0
    last_updated = 0
    opcode_counts = collections.defaultdict(int)
    num_flips = 0
    # Encode the entire input.  The loop bound used to be int(dlen / 10),
    # which silently dropped the last 90% of the audio.
    while i < dlen:
        if (i - last_updated) > int(dlen / 1000):
            eta.print_status()
            last_updated = i

        candidate_opcodes, voltages = opcodes.candidate_opcodes(
            frame_offset, lookahead_steps)
        opcode_idx = lookahead(step, position, data, i, voltage * voltages)
        opcode = candidate_opcodes[opcode_idx].opcodes[0]
        opcode_counts[opcode] += 1
        yield opcode

        # An opcode chosen near the end of the input may run for more
        # cycles than the lookahead padding provides; extend the target
        # with silence so evolve() never indexes past the end of `data`.
        # NOTE(review): assumes the opcode's voltage schedule has
        # cycle_length(opcode) entries -- confirm in the opcodes module.
        opcode_end = i + opcodes.cycle_length(opcode)
        if opcode_end > len(data):
            data = numpy.concatenate(
                [data,
                 numpy.zeros(opcode_end - len(data), dtype=numpy.float32)])

        position, voltage, new_error, i, new_flips = evolve(
            opcode, position, voltage, step, data, i)
        total_err += new_error
        num_flips += new_flips
        # One opcode was emitted; track our position within the frame.
        frame_offset = (frame_offset + 1) % 2048

    # Fill the rest of the current frame so that EXIT is its final opcode.
    for _ in range(frame_offset, 2047):
        yield opcodes.Opcode.TICK_00
    yield opcodes.Opcode.EXIT
    eta.done()
    print("Total error %f" % total_err)
    print("%d speaker actuations" % num_flips)

    print("Opcodes used:")
    # Most frequently used opcodes first.
    for used_opcode, count in sorted(opcode_counts.items(),
                                     key=lambda kv: kv[1], reverse=True):
        print("%s: %d" % (used_opcode, count))
|
|
|
|
|
2020-08-10 20:03:12 +00:00
|
|
|
|
2020-08-11 22:23:33 +00:00
|
|
|
def preprocess(
        filename: str, target_sample_rate: int,
        normalize: float = 0.5) -> numpy.ndarray:
    """Load an audio file as mono at the target rate and rescale it.

    The samples are divided by their 100th percentile (i.e. the largest
    sample value) and then multiplied by `normalize`, so the largest sample
    ends up equal to `normalize`.

    NOTE(review): a signal whose largest sample is 0 divides by zero here.

    Args:
        filename: path of the audio file, passed to `librosa.load`.
        target_sample_rate: sample rate requested from `librosa.load`.
        normalize: value the largest sample is scaled to.

    Returns:
        The rescaled sample array.
    """
    samples, _ = librosa.load(filename, sr=target_sample_rate, mono=True)

    peak = numpy.percentile(samples, 100)
    # Scale in place so the array keeps the dtype librosa returned.
    numpy.divide(samples, peak, out=samples)
    numpy.multiply(samples, normalize, out=samples)

    return samples
|
|
|
|
|
2020-08-13 21:08:50 +00:00
|
|
|
|
2020-08-10 20:03:12 +00:00
|
|
|
def main(argv):
    """Encode an audio file into a file of player opcode bytes.

    Expects `argv` to hold, after the program name: the input audio path,
    the step size, the number of lookahead steps, and the output path.
    """
    input_path = argv[1]
    step_size = int(argv[2])

    # TODO: if we're not looking ahead beyond the longest (non-slowpath) opcode
    # then this will reduce quality, e.g. a long NOTICK and TICK will
    # both look the same over a too-short horizon, but have different results.
    horizon = int(argv[3])
    output_path = argv[4]

    # TODO: PAL Apple ][ clock rate is slightly different
    cycles_per_second = int(1024. * 1000)
    samples = preprocess(input_path, cycles_per_second)

    with open(output_path, "wb+") as stream:
        # Each opcode is written as a single byte holding its enum value.
        for emitted in audio_bytestream(samples, step_size, horizon):
            stream.write(bytes([emitted.value]))
|
2020-08-10 20:03:12 +00:00
|
|
|
|
|
|
|
|
|
|
|
# Run the encoder only when this file is executed as a script; the full
# argument vector (including the program name) is passed through to main().
if __name__ == "__main__":
    main(sys.argv)
|