2020-08-11 22:26:55 +00:00
|
|
|
#!/usr/bin/env python3
|
2020-08-11 22:23:33 +00:00
|
|
|
# Delta modulation audio encoder.
|
|
|
|
#
|
2020-08-16 22:15:30 +00:00
|
|
|
# Simulates the Apple II speaker at 1MHz (i.e. cycle-level) resolution,
|
|
|
|
# by modeling it as an RC circuit with given time constant. In order to
|
|
|
|
# reproduce a target audio waveform, we upscale it to 1MHz sample rate,
|
|
|
|
# and compute the sequence of player opcodes to best reproduce this waveform.
|
2020-08-11 22:23:33 +00:00
|
|
|
#
|
2020-12-28 22:42:34 +00:00
|
|
|
# XXX
|
2020-08-16 22:15:30 +00:00
|
|
|
# Since the player opcodes are chosen to allow ticking the speaker during any
|
|
|
|
# given clock cycle (though with some limits on the minimum time
|
|
|
|
# between ticks), this means that we are able to control the Apple II speaker
|
|
|
|
# with cycle-level precision, which results in high audio fidelity with low
|
|
|
|
# noise.
|
|
|
|
#
|
|
|
|
# To further optimize the audio quality we look ahead some defined number of
|
|
|
|
# cycles and choose a speaker trajectory that minimizes errors over this range.
|
|
|
|
# e.g. this allows us to anticipate large amplitude changes by pre-moving
|
2020-08-11 22:23:33 +00:00
|
|
|
# the speaker to better approximate them.
|
|
|
|
#
|
2020-12-28 22:42:34 +00:00
|
|
|
# This also needs to take into account scheduling the "end of frame" opcode
|
|
|
|
# every 2048 output bytes, where the Apple II will manage the TCP socket buffer
|
|
|
|
# while ticking the speaker at a regular cadence to keep it in a net-neutral
|
|
|
|
# position. When looking ahead we can also (partially) compensate for this
|
|
|
|
# "dead" period by pre-positioning.
|
2020-08-11 22:23:33 +00:00
|
|
|
|
2020-12-28 12:54:44 +00:00
|
|
|
import argparse
|
2020-08-16 22:15:30 +00:00
|
|
|
import collections
|
2020-08-24 21:28:28 +00:00
|
|
|
import functools
|
2020-08-11 22:23:33 +00:00
|
|
|
import librosa
|
2020-08-10 20:03:12 +00:00
|
|
|
import numpy
|
2022-07-02 15:50:42 +00:00
|
|
|
import soundfile as sf
|
2020-08-11 22:23:33 +00:00
|
|
|
from eta import ETA
|
2020-08-10 20:03:12 +00:00
|
|
|
|
2022-05-16 20:11:17 +00:00
|
|
|
import lookahead
|
2022-07-02 15:50:42 +00:00
|
|
|
import opcodes
|
2022-07-02 13:15:41 +00:00
|
|
|
import opcodes_generated
|
|
|
|
|
2020-08-24 21:28:28 +00:00
|
|
|
|
2020-12-07 20:48:15 +00:00
|
|
|
def total_error(positions: numpy.ndarray, data: numpy.ndarray) -> numpy.ndarray:
|
|
|
|
"""Computes the total squared error for speaker position matrix vs data."""
|
|
|
|
return numpy.sum(numpy.square(positions - data), axis=-1)
|
2020-08-13 21:08:50 +00:00
|
|
|
|
2020-08-16 22:15:30 +00:00
|
|
|
|
2020-08-25 19:46:00 +00:00
|
|
|
@functools.lru_cache(None)
|
2020-10-15 12:08:33 +00:00
|
|
|
def frame_horizon(frame_offset: int, lookahead_steps: int):
|
2020-12-28 22:42:34 +00:00
|
|
|
"""Optimize frame_offset when more than lookahead_steps from end of frame.
|
2020-08-25 19:46:00 +00:00
|
|
|
|
2020-12-28 22:42:34 +00:00
|
|
|
Candidate opcodes for all values of frame_offset are equal, until the
|
|
|
|
end-of-frame opcode comes within our lookahead horizon.
|
2020-08-25 19:46:00 +00:00
|
|
|
"""
|
2020-12-07 20:48:15 +00:00
|
|
|
# TODO: This could be made tighter because a step is always at least 5
|
|
|
|
# cycles towards lookahead_steps.
|
2020-08-25 19:46:00 +00:00
|
|
|
if frame_offset < (2047 - lookahead_steps):
|
|
|
|
return 0
|
|
|
|
return frame_offset
|
|
|
|
|
|
|
|
|
2022-05-16 20:11:17 +00:00
|
|
|
class Speaker:
|
|
|
|
def __init__(self, sample_rate: float, freq: float, damping: float):
|
|
|
|
self.sample_rate = sample_rate
|
|
|
|
self.freq = freq
|
|
|
|
self.damping = damping
|
|
|
|
|
|
|
|
dt = numpy.float64(1 / sample_rate)
|
|
|
|
|
|
|
|
w = numpy.float64(freq * 2 * numpy.pi * dt)
|
|
|
|
|
|
|
|
d = damping * dt
|
|
|
|
e = numpy.exp(d)
|
|
|
|
c1 = 2 * e * numpy.cos(w)
|
|
|
|
|
|
|
|
c2 = e * e
|
|
|
|
t0 = (1 - 2 * e * numpy.cos(w) + e * e) / (d * d + w * w)
|
|
|
|
t = d * d + w * w - numpy.pi * numpy.pi
|
|
|
|
t1 = (1 + 2 * e * numpy.cos(w) + e * e) / numpy.sqrt(t * t + 4 * d * d *
|
|
|
|
numpy.pi * numpy.pi)
|
|
|
|
b2 = (t1 - t0) / (t1 + t0)
|
|
|
|
b1 = b2 * dt * dt * (t0 + t1) / 2
|
|
|
|
|
|
|
|
self.c1 = c1
|
|
|
|
self.c2 = c2
|
|
|
|
self.b1 = b1
|
|
|
|
self.b2 = b2
|
|
|
|
# print(dt, w, d, e, c1,c2,b1,b2)
|
|
|
|
|
2022-06-11 16:27:52 +00:00
|
|
|
# 3000 - 241
|
|
|
|
# 2500 - 97
|
|
|
|
# 2000 - 24
|
|
|
|
# 1700 - 9.6
|
|
|
|
# 1600 - 8.8
|
|
|
|
# 1500 - 9.39
|
|
|
|
# 1400 - 10.46
|
|
|
|
# 1000 - 21.56
|
2022-05-16 20:11:17 +00:00
|
|
|
|
2022-06-11 16:27:52 +00:00
|
|
|
# 1600 - 3603
|
|
|
|
# 1000 - 708
|
|
|
|
# 800 - 802
|
|
|
|
self.scale = numpy.float64(1 / 800) # TODO: analytic expression
|
2022-05-16 20:11:17 +00:00
|
|
|
|
|
|
|
|
2020-12-28 12:29:05 +00:00
|
|
|
def audio_bytestream(data: numpy.ndarray, step: int, lookahead_steps: int,
|
2022-07-02 13:15:41 +00:00
|
|
|
sample_rate: int):
|
2020-08-16 22:15:30 +00:00
|
|
|
"""Computes optimal sequence of player opcodes to reproduce audio data."""
|
|
|
|
|
2020-08-10 20:03:12 +00:00
|
|
|
dlen = len(data)
|
2020-12-07 20:48:15 +00:00
|
|
|
# Leave enough padding at the end to look ahead from the last data value,
|
2020-12-28 22:42:34 +00:00
|
|
|
# and in case we schedule an end-of-frame opcode towards the end.
|
2020-08-25 19:46:00 +00:00
|
|
|
# TODO: avoid temporarily doubling memory footprint to concatenate
|
2020-12-07 20:48:15 +00:00
|
|
|
data = numpy.ascontiguousarray(numpy.concatenate(
|
|
|
|
[data, numpy.zeros(max(lookahead_steps, opcodes.cycle_length(
|
2022-07-02 13:15:41 +00:00
|
|
|
opcodes_generated.PlayerOps.TICK_00)), dtype=numpy.float32)]))
|
2020-08-10 20:03:12 +00:00
|
|
|
|
2022-06-11 16:27:52 +00:00
|
|
|
# Starting speaker applied voltage.
|
|
|
|
voltage1 = voltage2 = -1.0 # * 2.5
|
2020-08-10 20:03:12 +00:00
|
|
|
|
2020-12-28 12:29:05 +00:00
|
|
|
toggles = 0
|
2022-05-16 20:11:17 +00:00
|
|
|
|
|
|
|
sp = Speaker(sample_rate, freq=3875, damping=-1210)
|
2020-12-07 20:48:15 +00:00
|
|
|
|
2020-12-07 20:58:18 +00:00
|
|
|
total_err = 0.0 # Total squared error of audio output
|
2020-12-07 20:48:15 +00:00
|
|
|
frame_offset = 0 # Position in 2048-byte TCP frame
|
2020-12-07 20:58:18 +00:00
|
|
|
i = 0 # index within input data
|
2020-12-07 20:48:15 +00:00
|
|
|
eta = ETA(total=1000, min_ms_between_updates=0)
|
|
|
|
next_tick = 0 # Value of i at which we should next update eta
|
|
|
|
# Keep track of how many opcodes we schedule
|
2020-08-16 22:15:30 +00:00
|
|
|
opcode_counts = collections.defaultdict(int)
|
2022-05-16 20:11:17 +00:00
|
|
|
|
2022-06-28 21:04:57 +00:00
|
|
|
y1 = y2 = 1.0 # last 2 speaker positions
|
2022-06-11 16:27:52 +00:00
|
|
|
# data = numpy.full(data.shape, 0.0)
|
|
|
|
# data = numpy.sin(
|
|
|
|
# numpy.arange(len(data)) * (2 * numpy.pi / (sample_rate / 3875)))
|
2022-06-04 13:06:52 +00:00
|
|
|
|
2022-06-11 16:27:52 +00:00
|
|
|
clicks = 0
|
2022-06-21 20:33:19 +00:00
|
|
|
min_lookahead_steps = lookahead_steps
|
2022-07-02 15:50:42 +00:00
|
|
|
while i < dlen // 20:
|
2022-06-04 13:06:52 +00:00
|
|
|
# XXX handle end of data cleanly
|
2022-06-11 20:13:36 +00:00
|
|
|
if i >= next_tick:
|
|
|
|
eta.print_status()
|
|
|
|
next_tick = int(eta.i * dlen / 1000)
|
2022-06-04 13:06:52 +00:00
|
|
|
|
2022-07-02 13:15:41 +00:00
|
|
|
# XXX
|
2022-07-02 15:50:42 +00:00
|
|
|
if frame_offset >= 2045: # XXX
|
|
|
|
lookahead_steps = min_lookahead_steps + 120 # XXX parametrize
|
2022-06-21 20:33:19 +00:00
|
|
|
else:
|
|
|
|
lookahead_steps = min_lookahead_steps
|
|
|
|
|
2020-12-07 20:48:15 +00:00
|
|
|
# Compute all possible opcode sequences for this frame offset
|
2022-07-02 13:15:41 +00:00
|
|
|
candidate_opcodes, voltages, lookahead_steps = \
|
2022-05-24 21:19:40 +00:00
|
|
|
opcodes.candidate_opcodes(
|
2022-06-21 20:33:19 +00:00
|
|
|
frame_horizon(frame_offset, lookahead_steps),
|
2022-07-02 13:15:41 +00:00
|
|
|
lookahead_steps, opcode if frame_offset == 2047 else None)
|
2022-06-11 16:27:52 +00:00
|
|
|
all_positions = lookahead.evolve(
|
|
|
|
sp, y1, y2, voltage1, voltage2, voltage1 * voltages)
|
2020-12-07 20:48:15 +00:00
|
|
|
|
|
|
|
# Pick the opcode sequence that minimizes the total squared error
|
2022-06-11 16:27:52 +00:00
|
|
|
# relative to the data waveform.
|
2022-06-04 13:06:52 +00:00
|
|
|
errors = total_error(
|
|
|
|
all_positions * sp.scale, data[i:i + lookahead_steps])
|
|
|
|
opcode_idx = numpy.argmin(errors).item()
|
2022-06-28 21:04:57 +00:00
|
|
|
# if frame_offset == 2046:
|
2022-07-02 13:15:41 +00:00
|
|
|
# print("XXX", lookahead_steps)
|
2022-06-28 21:04:57 +00:00
|
|
|
# print(opcode_idx)
|
|
|
|
# for i, e in enumerate(errors):
|
|
|
|
# print(i, e, candidate_opcodes[i])
|
2020-12-07 20:48:15 +00:00
|
|
|
# Next opcode
|
2020-08-25 19:46:00 +00:00
|
|
|
opcode = candidate_opcodes[opcode_idx][0]
|
2022-06-04 13:06:52 +00:00
|
|
|
# opcode = opcode_seq.__next__()
|
|
|
|
|
2022-07-02 13:15:41 +00:00
|
|
|
opcode_length = opcodes.cycle_length(opcode)
|
2020-08-16 22:15:30 +00:00
|
|
|
opcode_counts[opcode] += 1
|
2022-05-24 21:19:40 +00:00
|
|
|
# toggles += opcodes.TOGGLES[opcode]
|
2020-08-13 21:08:50 +00:00
|
|
|
|
2020-12-07 20:48:15 +00:00
|
|
|
# Apply this opcode to evolve the speaker position
|
2022-05-16 20:11:17 +00:00
|
|
|
opcode_voltages = (voltage1 * opcodes.voltage_schedule(
|
2022-07-02 13:15:41 +00:00
|
|
|
opcode)).reshape((1, -1))
|
2022-06-11 16:27:52 +00:00
|
|
|
all_positions = lookahead.evolve(
|
|
|
|
sp, y1, y2, voltage1, voltage2, opcode_voltages)
|
2022-05-16 20:11:17 +00:00
|
|
|
|
|
|
|
assert all_positions.shape[0] == 1
|
|
|
|
assert all_positions.shape[1] == opcode_length
|
|
|
|
|
|
|
|
voltage1 = opcode_voltages[0, -1]
|
|
|
|
voltage2 = opcode_voltages[0, -2]
|
|
|
|
y1 = all_positions[0, -1]
|
|
|
|
y2 = all_positions[0, -2]
|
2022-05-24 21:19:40 +00:00
|
|
|
new_error = total_error(
|
2022-05-16 20:11:17 +00:00
|
|
|
all_positions[0] * sp.scale, data[i:i + opcode_length]).item()
|
2022-05-24 21:19:40 +00:00
|
|
|
total_err += new_error
|
2022-06-11 16:27:52 +00:00
|
|
|
if new_error > 0.3:
|
|
|
|
clicks += 1
|
|
|
|
print(frame_offset, i / sample_rate, opcode, new_error,
|
|
|
|
numpy.mean(data[i:i + opcode_length])) # , "<----" if \
|
|
|
|
# new_error > 0.3 else "")
|
|
|
|
|
2022-07-02 13:15:41 +00:00
|
|
|
# print(frame_offset, i / sample_rate, opcode)
|
2022-06-11 16:27:52 +00:00
|
|
|
for v in all_positions[0]:
|
2022-06-28 21:04:57 +00:00
|
|
|
yield (v * sp.scale).astype(numpy.float32)
|
2022-06-11 16:27:52 +00:00
|
|
|
# # print(v * sp.scale)
|
2022-06-11 20:13:36 +00:00
|
|
|
# if frame_offset == 2047:
|
|
|
|
# print(opcode)
|
2022-06-11 16:27:52 +00:00
|
|
|
# yield opcode
|
2022-06-04 13:06:52 +00:00
|
|
|
|
2020-12-07 20:48:15 +00:00
|
|
|
i += opcode_length
|
2020-08-13 21:08:50 +00:00
|
|
|
frame_offset = (frame_offset + 1) % 2048
|
|
|
|
|
2020-12-07 20:48:15 +00:00
|
|
|
# Make sure we have at least 2k left in stream so player will do a
|
|
|
|
# complete read.
|
2022-06-11 16:27:52 +00:00
|
|
|
# for _ in range(frame_offset % 2048, 2048):
|
|
|
|
# yield opcodes.Opcode.EXIT
|
2020-08-11 22:23:33 +00:00
|
|
|
eta.done()
|
2020-08-10 20:03:12 +00:00
|
|
|
print("Total error %f" % total_err)
|
2020-12-28 12:29:05 +00:00
|
|
|
toggles_per_sec = toggles / dlen * sample_rate
|
|
|
|
print("%d speaker toggles/sec" % toggles_per_sec)
|
2020-08-10 20:03:12 +00:00
|
|
|
|
2020-08-16 22:15:30 +00:00
|
|
|
print("Opcodes used:")
|
|
|
|
for v, k in sorted(list(opcode_counts.items()), key=lambda kv: kv[1],
|
|
|
|
reverse=True):
|
|
|
|
print("%s: %d" % (v, k))
|
2022-06-11 16:27:52 +00:00
|
|
|
print("%d clicks" % clicks)
|
2020-08-16 22:15:30 +00:00
|
|
|
|
2020-08-10 20:03:12 +00:00
|
|
|
|
2020-08-11 22:23:33 +00:00
|
|
|
def preprocess(
|
2020-12-28 22:42:34 +00:00
|
|
|
filename: str, target_sample_rate: int, normalize: float,
|
|
|
|
normalization_percentile: int) -> numpy.ndarray:
|
2020-08-16 22:15:30 +00:00
|
|
|
"""Upscale input audio to target sample rate and normalize signal."""
|
|
|
|
|
2020-08-11 22:23:33 +00:00
|
|
|
data, _ = librosa.load(filename, sr=target_sample_rate, mono=True)
|
|
|
|
|
2022-06-21 20:33:19 +00:00
|
|
|
max_value = numpy.percentile(data, normalization_percentile)
|
2020-08-11 22:23:33 +00:00
|
|
|
data /= max_value
|
|
|
|
data *= normalize
|
|
|
|
|
|
|
|
return data
|
|
|
|
|
2020-08-13 21:08:50 +00:00
|
|
|
|
2020-12-28 12:54:44 +00:00
|
|
|
def main():
|
|
|
|
parser = argparse.ArgumentParser()
|
|
|
|
parser.add_argument("--clock", choices=['pal', 'ntsc'],
|
|
|
|
help="Whether target machine clock speed is PAL ("
|
|
|
|
"1015657Hz) or NTSC (1020484)",
|
|
|
|
required=True)
|
2022-06-11 16:27:52 +00:00
|
|
|
# TODO: implement 6502 - JMP indirect takes 5 cycles instead of 6
|
2020-12-28 12:54:44 +00:00
|
|
|
parser.add_argument("--step_size", type=int,
|
|
|
|
help="Delta encoding step size")
|
2020-12-28 22:42:34 +00:00
|
|
|
# TODO: if we're not looking ahead beyond the longest (non-end-of-frame)
|
2020-12-29 14:33:22 +00:00
|
|
|
# opcode then this will reduce quality, e.g. two opcodes may truncate to
|
|
|
|
# the same prefix, but have different results when we apply them
|
|
|
|
# fully.
|
2020-12-28 12:54:44 +00:00
|
|
|
parser.add_argument("--lookahead_cycles", type=int,
|
|
|
|
help="Number of clock cycles to look ahead in audio "
|
|
|
|
"stream.")
|
2020-12-28 22:42:34 +00:00
|
|
|
parser.add_argument("--normalization", default=1.0, type=float,
|
|
|
|
help="Overall multiplier to rescale input audio "
|
|
|
|
"values.")
|
2022-06-21 20:33:19 +00:00
|
|
|
parser.add_argument("--norm_percentile", default=100,
|
2020-12-28 22:42:34 +00:00
|
|
|
help="Normalize to specified percentile value of input "
|
|
|
|
"audio")
|
2022-07-02 15:50:42 +00:00
|
|
|
parser.add_argument("--noise_output", type=str, help="output audio file")
|
2020-12-28 12:54:44 +00:00
|
|
|
parser.add_argument("input", type=str, help="input audio file to convert")
|
|
|
|
parser.add_argument("output", type=str, help="output audio file")
|
|
|
|
args = parser.parse_args()
|
2020-08-10 20:03:12 +00:00
|
|
|
|
2020-12-24 14:43:00 +00:00
|
|
|
# Effective clock rate, including every-65 cycle "long cycle" that takes
|
|
|
|
# 16/14 as long.
|
2020-12-28 12:54:44 +00:00
|
|
|
sample_rate = 1015657 if args.clock == 'pal' else 1020484 # NTSC
|
2020-08-16 22:15:30 +00:00
|
|
|
|
2022-06-28 21:04:57 +00:00
|
|
|
input_audio = preprocess(args.input, sample_rate, args.normalization,
|
|
|
|
args.norm_percentile)
|
|
|
|
print("Done preprocessing audio")
|
2022-07-02 15:50:42 +00:00
|
|
|
|
|
|
|
output_rate = 44100
|
|
|
|
|
2022-06-28 21:04:57 +00:00
|
|
|
output = numpy.array(list(
|
|
|
|
audio_bytestream(input_audio, args.step_size, args.lookahead_cycles,
|
2022-07-02 15:50:42 +00:00
|
|
|
sample_rate)), dtype=numpy.float32)
|
|
|
|
if args.noise_output:
|
|
|
|
noise = numpy.array(output - input_audio[:len(output)])
|
|
|
|
noise = librosa.resample(noise, orig_sr=sample_rate,
|
|
|
|
target_sr=output_rate)
|
|
|
|
with sf.SoundFile(
|
|
|
|
args.noise_output, "w", output_rate, channels=1,
|
|
|
|
format='WAV') as f:
|
|
|
|
f.write(noise)
|
|
|
|
|
2022-06-11 16:27:52 +00:00
|
|
|
output = librosa.resample(output, orig_sr=sample_rate,
|
|
|
|
target_sr=output_rate)
|
|
|
|
with sf.SoundFile(
|
|
|
|
args.output, "w", output_rate, channels=1, format='WAV') \
|
|
|
|
as f:
|
|
|
|
f.write(output)
|
2022-07-02 15:50:42 +00:00
|
|
|
|
2022-06-11 16:27:52 +00:00
|
|
|
# with open(args.output, "wb+") as f:
|
|
|
|
# for opcode in audio_bytestream(
|
|
|
|
# preprocess(args.input, sample_rate, args.normalization,
|
|
|
|
# args.norm_percentile), args.step_size,
|
|
|
|
# args.lookahead_cycles, sample_rate, args.cpu == '6502'):
|
|
|
|
# f.write(bytes([opcode.value]))
|
2020-08-10 20:03:12 +00:00
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
2020-12-28 12:54:44 +00:00
|
|
|
main()
|