This commit is contained in:
kris 2022-07-22 12:12:46 +01:00
parent 183781da60
commit e4f20ea9c7

View File

@ -270,7 +270,7 @@ def audio_bytestream(data: numpy.ndarray, step: int, lookahead_steps: int,
print("%s: %d" % (v, k)) print("%s: %d" % (v, k))
def preprocess( def preprocess_audio(
filename: str, target_sample_rate: int, normalize: float, filename: str, target_sample_rate: int, normalize: float,
normalization_percentile: int) -> numpy.ndarray: normalization_percentile: int) -> numpy.ndarray:
"""Upscale input audio to target sample rate and normalize signal.""" """Upscale input audio to target sample rate and normalize signal."""
@ -295,7 +295,8 @@ def downsample_audio(simulated_audio, original_audio, input_rate, output_rate,
:arg noise_output Whether to also produce a noise waveform, i.e. difference :arg noise_output Whether to also produce a noise waveform, i.e. difference
between input and output audio between input and output audio
:returns Tuple of downsampled audio and noise data (or None
if noise_output==False)
""" """
downsampled_output = librosa.resample( downsampled_output = librosa.resample(
numpy.array(simulated_audio, dtype=numpy.float32), numpy.array(simulated_audio, dtype=numpy.float32),
@ -344,25 +345,29 @@ def main():
# Effective clock rate, including every-65 cycle "long cycle" that takes # Effective clock rate, including every-65 cycle "long cycle" that takes
# 16/14 as long. # 16/14 as long.
sample_rate = 1015657 if args.clock == 'pal' else 1020484 # NTSC cpu_clock_rate = 1015657 if args.clock == 'pal' else 1020484 # NTSC
input_audio = preprocess(args.input, sample_rate, args.normalization, input_audio = preprocess_audio(
args.norm_percentile) args.input, cpu_clock_rate, args.normalization, args.norm_percentile)
print("Done preprocessing audio") print("Done preprocessing audio")
# Sample rate for output .wav files
# TODO: flag
output_rate = 44100 output_rate = 44100
# Buffers simulated audio output so we can downsample it in suitably
# large chunks for writing to the output .wav file
output_buffer = [] output_buffer = []
input_offset = 0
# Python contexts for writing output files if requested
opcode_context = open(args.output, "wb+") opcode_context = open(args.output, "wb+")
if args.wav_output: if args.wav_output:
wav_context = sf.SoundFile( wav_context = sf.SoundFile(
args.wav_output, "w", output_rate, channels=1, format='WAV') args.wav_output, "w", output_rate, channels=1, format='WAV')
else: else:
# We're not creating a file but still need a context
# XXX does this work?
wav_context = contextlib.nullcontext wav_context = contextlib.nullcontext
if args.noise_output: if args.noise_output:
noise_context = sf.SoundFile( noise_context = sf.SoundFile(
args.noise_output, "w", output_rate, channels=1, args.noise_output, "w", output_rate, channels=1,
@ -373,46 +378,53 @@ def main():
with wav_context as wav_f, noise_context as noise_f, opcode_context \ with wav_context as wav_f, noise_context as noise_f, opcode_context \
as opcode_f: as opcode_f:
# Tracks current position in input audio waveform
input_offset = 0
# Process input audio, writing output to ][-Sound audio file
# and (if requested) .wav files of simulated speaker audio and
# noise (difference between original and simulated audio)
for idx, sample_data in enumerate(audio_bytestream( for idx, sample_data in enumerate(audio_bytestream(
input_audio, args.step_size, args.lookahead_cycles, input_audio, args.step_size, args.lookahead_cycles,
sample_rate)): cpu_clock_rate)):
opcode, samples = sample_data opcode, samples = sample_data
opcode_f.write(bytes([opcode.byte])) opcode_f.write(bytes([opcode.byte]))
output_buffer.extend(samples) output_buffer.extend(samples)
input_offset += len(samples) input_offset += len(samples)
# TODO: don't bother computing if we're not writing wavs
# Keep accumulating as long as we have <1MB in the buffer, or are # Keep accumulating as long as we have <1MB in the buffer, or are
# within 1MB from the end. This ensures we have enough samples to # within 1MB from the end. This ensures we have enough samples to
# resample including the last (partial) buffer # downsample, including the last (partial) buffer.
if len(output_buffer) < 1 * 1024 * 1024: if (
len(output_buffer) < 1 * 1024 * 1024 or (
len(input_audio) - input_offset) < 1 * 1024 * 1024
):
continue continue
if (len(input_audio) - input_offset) < 1 * 1024 * 1024:
continue # TODO: don't bother computing if we're not writing wavs
resampled_output_buffer, resampled_noise_buffer = downsampled_output( downsampled_audio, downsampled_noise = downsampled_audio(
output_buffer, input_audio[input_offset - len(output_buffer):], output_buffer, input_audio[input_offset - len(output_buffer):],
sample_rate, output_rate, bool(args.noise_output) cpu_clock_rate, output_rate, bool(args.noise_output)
) )
if args.wav_output: if args.wav_output:
wav_f.write(resampled_output_buffer) wav_f.write(downsampled_audio)
wav_f.flush() wav_f.flush()
if args.noise_output: if args.noise_output:
noise_f.write(resampled_noise_buffer) noise_f.write(downsampled_noise)
noise_f.flush() noise_f.flush()
output_buffer = [] output_buffer = []
# TODO: handle last buffer more cleanly than duplicating this code
if output_buffer: if output_buffer:
resampled_output_buffer, resampled_noise_buffer = downsampled_output( downsampled_audio, downsampled_noise = downsampled_audio(
output_buffer, input_audio[input_offset - len(output_buffer):], output_buffer, input_audio[input_offset - len(output_buffer):],
sample_rate, output_rate, bool(args.noise_output) cpu_clock_rate, output_rate, bool(args.noise_output)
) )
if args.wav_output: if args.wav_output:
wav_f.write(resampled_output_buffer) wav_f.write(downsampled_audio)
if args.noise_output: if args.noise_output:
noise_f.write(resampled_noise_buffer) noise_f.write(downsampled_noise)
if __name__ == "__main__": if __name__ == "__main__":