Optimize encoder: improve memoization and vectorize the position recurrence

Expand the recurrence relation for the speaker-position difference equation
into a closed-form solution that can be vectorized better. Encoding is now
about 3x faster.
kris 2020-12-03 14:02:36 +00:00
parent 5fb995c435
commit 468d47966d
2 changed files with 75 additions and 93 deletions
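
The speedup comes from replacing the per-sample Python loop for the position
recurrence p_{i+1} = p_i + (v_i - p_i) / s with its closed-form expansion,
which evaluates every position at once via numpy.cumprod and numpy.cumsum.
A minimal standalone sketch of the equivalence (illustrative only; the
function names and toy values below are not part of the commit):

import numpy

def positions_loop(p0, voltages, s):
    # Reference: iterate the difference equation p_{i+1} = p_i + (v_i - p_i) / s
    p, out = p0, []
    for v in voltages:
        p = p + (v - p) / s
        out.append(p)
    return numpy.array(out)

def positions_closed_form(p0, voltages, s):
    # Closed form: p_{i+1} = (1 - 1/s)^(i+1) * ((1/s) * Sum_j v_j / (1 - 1/s)^(j+1) + p0)
    delta_powers = numpy.cumprod(numpy.full(len(voltages), 1 - 1 / s))
    return delta_powers * (numpy.cumsum(voltages / delta_powers) / s + p0)

voltages = numpy.array([1.0, -1.0, 1.0, 1.0, -1.0])
assert numpy.allclose(positions_loop(-1.0, voltages, 500),
                      positions_closed_form(-1.0, voltages, 500))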

View File

@@ -24,16 +24,25 @@
# compensate for this "dead" period by pre-positioning.
import collections
import functools
import sys
import librosa
import numpy
from eta import ETA
from typing import Tuple
import opcodes
# TODO: add flags to parametrize options
@functools.lru_cache(None)
def _delta_powers(shape, step_size: int) -> Tuple[float, numpy.ndarray]:
delta = (1 - 1 / step_size)
return delta, numpy.cumprod(numpy.full(shape, delta), axis=-1)
def lookahead(step_size: int, initial_position: float, data: numpy.ndarray,
offset: int, voltages: numpy.ndarray):
"""Evaluate effects of multiple potential opcode sequences and pick best.
@@ -46,53 +55,76 @@ def lookahead(step_size: int, initial_position: float, data: numpy.ndarray,
performance with more opcode choices, although also has a larger fixed
overhead.
"""
positions = numpy.empty((voltages.shape[0], voltages.shape[1] + 1),
dtype=numpy.float32)
positions[:, 0] = initial_position
# The speaker position p_i evolves according to
# p_{i+1} = p_i + (v_i - p_i) / s
# where v_i is the i'th applied voltage, s is the speaker step size
#
# Rearranging, we get p_{i+1} = v_i / s + (1-1/s) p_i
# and if we expand the recurrence relation
# p_{i+1} = Sum_{j=0}^i (1-1/s)^(i-j) v_j / s + (1-1/s)^(i+1) p_0
# = (1-1/s)^(i+1)(1/s * Sum_{j=0}^i v_j / (1-1/s)^(j+1) + p0)
delta, delta_powers = _delta_powers(voltages.shape, step_size)
target_val = data[offset:offset + voltages.shape[1]]
scaled_voltages = voltages / step_size
for i in range(0, voltages.shape[1]):
positions[:, i + 1] = (
scaled_voltages[:, i] + positions[:, i] * (1 - 1 / step_size))
err = positions[:, 1:] - target_val
total_error = numpy.sum(numpy.power(err, 2), axis=1)
positions = delta_powers * (
numpy.cumsum(voltages / delta_powers, axis=1) / step_size +
initial_position)
total_error = numpy.sum(
numpy.square(positions - data[offset:offset + voltages.shape[1]]),
axis=1)
best = numpy.argmin(total_error)
return best
# TODO: share implementation with lookahead
# TODO: Merge with lookahead
def evolve(opcode: opcodes.Opcode, starting_position, starting_voltage,
step_size, data, starting_idx):
"""Apply the effects of playing a single opcode to completion.
Returns new state.
"""
opcode_length = opcodes.cycle_length(opcode)
voltages = starting_voltage * opcodes.VOLTAGE_SCHEDULE[opcode]
position = starting_position
total_err = 0.0
v = starting_voltage
for i, v in enumerate(voltages):
position += (v - position) / step_size
err = position - data[starting_idx + i]
total_err += err ** 2
return position, v, total_err, starting_idx + opcode_length
delta, delta_powers = _delta_powers(opcode_length, step_size)
positions = delta_powers * (
numpy.cumsum(voltages / delta_powers) / step_size +
starting_position)
# TODO: compute error once at the end?
total_err = numpy.sum(numpy.square(
positions - data[starting_idx:starting_idx + opcode_length]))
return positions[-1], voltages[-1], total_err, starting_idx + opcode_length
@functools.lru_cache(None)
def frame_horizon(frame_offset: int, lookahead_steps: int):
"""Optimize frame_offset when we're not within lookahead_steps of slowpath.
When computing candidate opcodes, all frame offsets are the same until the
end-of-frame slowpath comes within our lookahead horizon.
"""
if frame_offset < (2047 - lookahead_steps):
return 0
return frame_offset
def audio_bytestream(data: numpy.ndarray, step: int, lookahead_steps: int):
"""Computes optimal sequence of player opcodes to reproduce audio data."""
dlen = len(data)
data = numpy.concatenate([data, numpy.zeros(lookahead_steps)]).astype(
numpy.float32)
# TODO: avoid temporarily doubling memory footprint to concatenate
data = numpy.concatenate(
[data, numpy.zeros(lookahead_steps, dtype=numpy.float32)])
voltage = -1.0
position = -1.0
# Pre-warm cache so we don't skew ETA during encoding
for i in range(2048):
_, _ = opcodes.candidate_opcodes(frame_horizon(i, lookahead_steps),
lookahead_steps)
total_err = 0.0
frame_offset = 0
eta = ETA(total=1000)
@@ -105,17 +137,13 @@ def audio_bytestream(data: numpy.ndarray, step: int, lookahead_steps: int):
eta.print_status()
last_updated = i
candidate_opcodes = opcodes.opcode_lookahead(
frame_offset, lookahead_steps)
pruned_opcodes, voltages = opcodes.prune_opcodes(
candidate_opcodes, lookahead_steps)
candidate_opcodes, voltages = opcodes.candidate_opcodes(
frame_horizon(frame_offset, lookahead_steps), lookahead_steps)
opcode_idx = lookahead(step, position, data, i, voltage * voltages)
opcode = pruned_opcodes[opcode_idx].opcodes[0]
opcode = candidate_opcodes[opcode_idx][0]
opcode_counts[opcode] += 1
yield opcode
# TODO: round position and memoize, and use in lookahead too
position, voltage, new_error, i = evolve(
opcode, position, voltage, step, data, i)
@@ -135,13 +163,13 @@ def audio_bytestream(data: numpy.ndarray, step: int, lookahead_steps: int):
def preprocess(
filename: str, target_sample_rate: int,
normalize: float = 0.5) -> numpy.ndarray:
filename: str, target_sample_rate: int, normalize: float = 0.5,
normalization_percentile: int = 100) -> numpy.ndarray:
"""Upscale input audio to target sample rate and normalize signal."""
data, _ = librosa.load(filename, sr=target_sample_rate, mono=True)
max_value = numpy.percentile(data, 100)
max_value = numpy.percentile(data, normalization_percentile)
data /= max_value
data *= normalize
@@ -160,10 +188,10 @@ def main(argv):
# TODO: PAL Apple ][ clock rate is slightly different
sample_rate = int(1024. * 1000)
data = preprocess(serve_file, sample_rate)
with open(out, "wb+") as f:
for opcode in audio_bytestream(data, step, lookahead_steps):
for opcode in audio_bytestream(
preprocess(serve_file, sample_rate), step, lookahead_steps):
f.write(bytes([opcode.value]))

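The other memoization win is frame_horizon() above: candidate_opcodes() is
wrapped in functools.lru_cache, and collapsing every frame offset that is far
from the end-of-frame slowpath onto the single key 0 keeps that cache small.
A rough, illustrative sketch of the effect on the number of distinct cache
keys per 2048-cycle frame (distinct_keys is a made-up helper, not part of the
code):

def distinct_keys(lookahead_steps, use_horizon):
    keys = set()
    for offset in range(2048):
        if use_horizon and offset < 2047 - lookahead_steps:
            keys.add(0)  # same candidates for every offset far from the slowpath
        else:
            keys.add(offset)
    return len(keys)

print(distinct_keys(20, use_horizon=False))  # 2048
print(distinct_keys(20, use_horizon=True))   # 22 (offsets 2027..2047 plus key 0)
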
View File

@@ -66,27 +66,6 @@ def cycle_length(op: Opcode) -> int:
return len(VOLTAGE_SCHEDULE[op])
class _Opcodes:
"""Container for immutable Iterable[Opcode], to improve hash performance."""
def __init__(self, opcodes: Iterable[Opcode]):
self.opcodes = tuple(opcodes)
self._hash = hash(self.opcodes)
def __hash__(self):
return self._hash
# Guarantees each Tuple[Opcode] has a unique _Opcodes representation
_OPCODES_SINGLETON = {}
@functools.lru_cache(None)
def Opcodes(opcodes: Tuple[Opcode]):
"""Returns unique _Opcodes representation for Tuple[Opcode]."""
return _OPCODES_SINGLETON.setdefault(opcodes, _Opcodes(opcodes))
@functools.lru_cache(None)
def opcode_choices(frame_offset: int) -> List[Opcode]:
"""Returns sorted list of valid opcodes for given frame offset.
@@ -104,16 +83,6 @@ def opcode_choices(frame_offset: int) -> List[Opcode]:
@functools.lru_cache(None)
def opcode_lookahead(
frame_offset: int,
lookahead_cycles: int) -> Tuple[_Opcodes]:
"""Computes all valid sequences of opcodes spanning lookahead_cycles."""
return tuple(Opcodes(ops) for ops in
_opcode_lookahead(frame_offset, lookahead_cycles))
@functools.lru_cache(None)
def _opcode_lookahead(
frame_offset: int,
lookahead_cycles: int) -> Tuple[Tuple[Opcode]]:
"""Recursively enumerates all valid opcode sequences."""
@@ -124,53 +93,38 @@ def _opcode_lookahead(
if cycle_length(op) >= lookahead_cycles:
ops.append((op,))
else:
for res in _opcode_lookahead((frame_offset + 1) % 2048,
lookahead_cycles - cycle_length(op)):
for res in opcode_lookahead((frame_offset + 1) % 2048,
lookahead_cycles - cycle_length(op)):
ops.append((op,) + res)
return tuple(ops) # TODO: fix return type
class Cycles:
"""Container for immutable Tuple[float], to improve hash performance."""
def __init__(self, cycles: Tuple[float]):
self.cycles = cycles
self._hash = hash(cycles)
def __hash__(self):
return self._hash
# Guarantees each Tuple[float] has a unique Cycles representation
_CYCLES_SINGLETON = {}
@functools.lru_cache(None)
def cycle_lookahead(
opcodes: _Opcodes,
opcodes: Tuple[Opcode],
lookahead_cycles: int
) -> Cycles:
) -> Tuple[float]:
"""Computes the applied voltage effects of a sequence of opcodes.
i.e. produces the sequence of applied voltage changes that will result
from executing these opcodes, limited to the next lookahead_cycles.
"""
cycles = []
for op in opcodes.opcodes:
for op in opcodes:
cycles.extend(VOLTAGE_SCHEDULE[op])
trunc_cycles = tuple(cycles[:lookahead_cycles])
return _CYCLES_SINGLETON.setdefault(trunc_cycles, Cycles(trunc_cycles))
return tuple(cycles[:lookahead_cycles])
@functools.lru_cache(None)
def prune_opcodes(
opcodes: Tuple[_Opcodes], lookahead_cycles: int
) -> Tuple[List[_Opcodes], numpy.ndarray]:
def candidate_opcodes(
frame_offset: int, lookahead_cycles: int
) -> Tuple[List[Opcode], numpy.ndarray]:
"""Deduplicate a tuple of opcode sequences that are equivalent.
For each opcode sequence whose effect is the same when truncated to
lookahead_cycles, retains the first such opcode sequence.
"""
opcodes = opcode_lookahead(frame_offset, lookahead_cycles)
seen_cycles = set()
pruned_opcodes = []
pruned_cycles = []
@@ -180,6 +134,6 @@ def prune_opcodes(
continue
seen_cycles.add(cycles)
pruned_opcodes.append(ops)
pruned_cycles.append(cycles.cycles)
pruned_cycles.append(cycles)
return pruned_opcodes, numpy.array(pruned_cycles, dtype=numpy.float32)
return pruned_opcodes, numpy.array(pruned_cycles, dtype=numpy.float32)
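
The deduplication that candidate_opcodes() now performs can be sketched
independently of the opcode machinery: two opcode sequences are
interchangeable if their voltage schedules agree within the lookahead window,
so only the first representative of each truncated pattern is kept. A
self-contained sketch under that assumption (dedupe_by_effect and the toy
schedules are illustrative, not this module's API):

def dedupe_by_effect(sequences, schedules, lookahead_cycles):
    # Keep the first sequence for each distinct truncated voltage pattern.
    seen, kept, patterns = set(), [], []
    for ops in sequences:
        cycles = []
        for op in ops:
            cycles.extend(schedules[op])
        key = tuple(cycles[:lookahead_cycles])
        if key in seen:
            continue
        seen.add(key)
        kept.append(ops)
        patterns.append(key)
    return kept, patterns

# Toy example: the two schedules only differ after the 2-cycle window.
schedules = {"A": (1.0, 1.0, -1.0), "B": (1.0, 1.0, 1.0)}
kept, patterns = dedupe_by_effect([("A",), ("B",)], schedules, lookahead_cycles=2)
assert kept == [("A",)] and patterns == [(1.0, 1.0)]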