1
0
mirror of https://github.com/TomHarte/CLK.git synced 2025-01-26 15:32:04 +00:00

Not having read the C++ synchronisation primitives before, this async task queue is probably incorrect. But nevertheless, let's have a quick go at employing it — in a hideously thread unsafe fashion — for audio generation. What can possibly go wrong?

This commit is contained in:
Thomas Harte 2016-10-07 16:56:34 -04:00
parent 2d26feb073
commit e53455a936
4 changed files with 177 additions and 58 deletions

View File

@ -0,0 +1,62 @@
//
// AsyncTaskQueue.cpp
// Clock Signal
//
// Created by Thomas Harte on 07/10/2016.
// Copyright © 2016 Thomas Harte. All rights reserved.
//
#include "AsyncTaskQueue.hpp"
using namespace Concurrency;
AsyncTaskQueue::AsyncTaskQueue() : should_destruct_(false)
{
thread_.reset(new std::thread([this]() {
while(!should_destruct_)
{
std::function<void(void)> next_function;
queue_mutex_.lock();
if(!pending_tasks_.empty())
{
next_function = pending_tasks_.front();
pending_tasks_.pop_front();
}
queue_mutex_.unlock();
if(next_function)
{
next_function();
}
else
{
std::unique_lock<std::mutex> lock(queue_mutex_);
processing_condition_.wait(lock);
lock.unlock();
}
}
}));
}
AsyncTaskQueue::~AsyncTaskQueue()
{
should_destruct_ = true;
enqueue([](){});
}
void AsyncTaskQueue::enqueue(std::function<void(void)> function)
{
queue_mutex_.lock();
pending_tasks_.push_back(function);
queue_mutex_.unlock();
std::lock_guard<std::mutex> lock(queue_mutex_);
processing_condition_.notify_all();
}
void AsyncTaskQueue::synchronise()
{
// TODO
// std::mutex
}

View File

@ -0,0 +1,39 @@
//
// AsyncTaskQueue.hpp
// Clock Signal
//
// Created by Thomas Harte on 07/10/2016.
// Copyright © 2016 Thomas Harte. All rights reserved.
//
#ifndef AsyncTaskQueue_hpp
#define AsyncTaskQueue_hpp
#include <memory>
#include <thread>
#include <list>
#include <condition_variable>
namespace Concurrency {
class AsyncTaskQueue {
public:
AsyncTaskQueue();
~AsyncTaskQueue();
void enqueue(std::function<void(void)> function);
void synchronise();
private:
std::unique_ptr<std::thread> thread_;
std::mutex queue_mutex_;
std::list<std::function<void(void)>> pending_tasks_;
std::condition_variable processing_condition_;
bool should_destruct_;
};
}
#endif /* Concurrency_hpp */

View File

@ -30,6 +30,7 @@
4B30512D1D989E2200B4FED8 /* Drive.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B30512B1D989E2200B4FED8 /* Drive.cpp */; };
4B3051301D98ACC600B4FED8 /* Plus3.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B30512E1D98ACC600B4FED8 /* Plus3.cpp */; };
4B37EE821D7345A6006A09A4 /* BinaryDump.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B37EE801D7345A6006A09A4 /* BinaryDump.cpp */; };
4B3940E71DA83C8300427841 /* AsyncTaskQueue.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B3940E51DA83C8300427841 /* AsyncTaskQueue.cpp */; };
4B3BA0C31D318AEC005DD7A7 /* C1540Tests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4B3BA0C21D318AEB005DD7A7 /* C1540Tests.swift */; };
4B3BA0CE1D318B44005DD7A7 /* C1540Bridge.mm in Sources */ = {isa = PBXBuildFile; fileRef = 4B3BA0C61D318B44005DD7A7 /* C1540Bridge.mm */; };
4B3BA0CF1D318B44005DD7A7 /* MOS6522Bridge.mm in Sources */ = {isa = PBXBuildFile; fileRef = 4B3BA0C91D318B44005DD7A7 /* MOS6522Bridge.mm */; };
@ -438,6 +439,8 @@
4B30512F1D98ACC600B4FED8 /* Plus3.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = Plus3.hpp; path = Electron/Plus3.hpp; sourceTree = "<group>"; };
4B37EE801D7345A6006A09A4 /* BinaryDump.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = BinaryDump.cpp; sourceTree = "<group>"; };
4B37EE811D7345A6006A09A4 /* BinaryDump.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = BinaryDump.hpp; sourceTree = "<group>"; };
4B3940E51DA83C8300427841 /* AsyncTaskQueue.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = AsyncTaskQueue.cpp; path = ../../Concurrency/AsyncTaskQueue.cpp; sourceTree = "<group>"; };
4B3940E61DA83C8300427841 /* AsyncTaskQueue.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = AsyncTaskQueue.hpp; path = ../../Concurrency/AsyncTaskQueue.hpp; sourceTree = "<group>"; };
4B3BA0C21D318AEB005DD7A7 /* C1540Tests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = C1540Tests.swift; sourceTree = "<group>"; };
4B3BA0C51D318B44005DD7A7 /* C1540Bridge.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = C1540Bridge.h; sourceTree = "<group>"; };
4B3BA0C61D318B44005DD7A7 /* C1540Bridge.mm */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.objcpp; path = C1540Bridge.mm; sourceTree = "<group>"; };
@ -1003,6 +1006,15 @@
name = Outputs;
sourceTree = "<group>";
};
4B3940E81DA83C8700427841 /* Concurrency */ = {
isa = PBXGroup;
children = (
4B3940E51DA83C8300427841 /* AsyncTaskQueue.cpp */,
4B3940E61DA83C8300427841 /* AsyncTaskQueue.hpp */,
);
name = Concurrency;
sourceTree = "<group>";
};
4B3BA0C41D318B44005DD7A7 /* Bridges */ = {
isa = PBXGroup;
children = (
@ -1469,6 +1481,7 @@
4BB73E951B587A5100552FC2 = {
isa = PBXGroup;
children = (
4B3940E81DA83C8700427841 /* Concurrency */,
4BC76E6A1C98F43700E6EF73 /* Accelerate.framework */,
4BB73EA01B587A5100552FC2 /* Clock Signal */,
4BB73EB51B587A5100552FC2 /* Clock SignalTests */,
@ -2185,6 +2198,7 @@
4B1E85751D170228001EF87D /* Typer.cpp in Sources */,
4BF829631D8F536B001BAE39 /* SSD.cpp in Sources */,
4B2E2D9D1C3A070400138695 /* Electron.cpp in Sources */,
4B3940E71DA83C8300427841 /* AsyncTaskQueue.cpp in Sources */,
4BAB62B81D3302CA00DF5BA0 /* PCMTrack.cpp in Sources */,
4B69FB3D1C4D908A00B5F0AA /* Tape.cpp in Sources */,
4B8FE2291DA1EDDF0090D3CE /* ElectronOptionsPanel.swift in Sources */,

View File

@ -14,6 +14,7 @@
#include <time.h>
#include "../SignalProcessing/Stepper.hpp"
#include "../SignalProcessing/FIRFilter.hpp"
#include "../Concurrency/AsyncTaskQueue.hpp"
namespace Outputs {
@ -25,7 +26,7 @@ namespace Outputs {
Intended to be a parent class, allowing descendants to pick the strategy by which input samples are mapped to
output samples.
*/
class Speaker {
class Speaker: public Concurrency::AsyncTaskQueue {
public:
class Delegate {
public:
@ -124,49 +125,20 @@ template <class T> class Filter: public Speaker {
public:
void run_for_cycles(unsigned int input_cycles)
{
if(_coefficients_are_dirty) update_filter_coefficients();
enqueue([=]() {
unsigned int cycles_remaining = input_cycles;
if(_coefficients_are_dirty) update_filter_coefficients();
// if input and output rates exactly match, just accumulate results and pass on
if(_input_cycles_per_second == _output_cycles_per_second && _high_frequency_cut_off < 0.0)
{
while(input_cycles)
// if input and output rates exactly match, just accumulate results and pass on
if(_input_cycles_per_second == _output_cycles_per_second && _high_frequency_cut_off < 0.0)
{
unsigned int cycles_to_read = (unsigned int)(_buffer_size - _buffer_in_progress_pointer);
if(cycles_to_read > input_cycles) cycles_to_read = input_cycles;
static_cast<T *>(this)->get_samples(cycles_to_read, &_buffer_in_progress.get()[_buffer_in_progress_pointer]);
_buffer_in_progress_pointer += cycles_to_read;
// announce to delegate if full
if(_buffer_in_progress_pointer == _buffer_size)
while(cycles_remaining)
{
_buffer_in_progress_pointer = 0;
if(_delegate)
{
_delegate->speaker_did_complete_samples(this, _buffer_in_progress.get(), _buffer_size);
}
}
unsigned int cycles_to_read = (unsigned int)(_buffer_size - _buffer_in_progress_pointer);
if(cycles_to_read > cycles_remaining) cycles_to_read = cycles_remaining;
input_cycles -= cycles_to_read;
}
return;
}
// if the output rate is less than the input rate, use the filter
if(_input_cycles_per_second > _output_cycles_per_second)
{
while(input_cycles)
{
unsigned int cycles_to_read = (unsigned int)std::min((int)input_cycles, _number_of_taps - _input_buffer_depth);
static_cast<T *>(this)->get_samples(cycles_to_read, &_input_buffer.get()[_input_buffer_depth]);
input_cycles -= cycles_to_read;
_input_buffer_depth += cycles_to_read;
if(_input_buffer_depth == _number_of_taps)
{
_buffer_in_progress.get()[_buffer_in_progress_pointer] = _filter->apply(_input_buffer.get());
_buffer_in_progress_pointer++;
static_cast<T *>(this)->get_samples(cycles_to_read, &_buffer_in_progress.get()[_buffer_in_progress_pointer]);
_buffer_in_progress_pointer += cycles_to_read;
// announce to delegate if full
if(_buffer_in_progress_pointer == _buffer_size)
@ -178,29 +150,61 @@ template <class T> class Filter: public Speaker {
}
}
// If the next loop around is going to reuse some of the samples just collected, use a memmove to
// preserve them in the correct locations (TODO: use a longer buffer to fix that) and don't skip
// anything. Otherwise skip as required to get to the next sample batch and don't expect to reuse.
uint64_t steps = _stepper->step();
if(steps < _number_of_taps)
{
int16_t *input_buffer = _input_buffer.get();
memmove(input_buffer, &input_buffer[steps], sizeof(int16_t) * ((size_t)_number_of_taps - (size_t)steps));
_input_buffer_depth -= steps;
}
else
{
if(steps > _number_of_taps)
static_cast<T *>(this)->skip_samples((unsigned int)steps - (unsigned int)_number_of_taps);
_input_buffer_depth = 0;
}
cycles_remaining -= cycles_to_read;
}
return;
}
return;
}
// if the output rate is less than the input rate, use the filter
if(_input_cycles_per_second > _output_cycles_per_second)
{
while(cycles_remaining)
{
unsigned int cycles_to_read = (unsigned int)std::min((int)cycles_remaining, _number_of_taps - _input_buffer_depth);
static_cast<T *>(this)->get_samples(cycles_to_read, &_input_buffer.get()[_input_buffer_depth]);
cycles_remaining -= cycles_to_read;
_input_buffer_depth += cycles_to_read;
if(_input_buffer_depth == _number_of_taps)
{
_buffer_in_progress.get()[_buffer_in_progress_pointer] = _filter->apply(_input_buffer.get());
_buffer_in_progress_pointer++;
// announce to delegate if full
if(_buffer_in_progress_pointer == _buffer_size)
{
_buffer_in_progress_pointer = 0;
if(_delegate)
{
_delegate->speaker_did_complete_samples(this, _buffer_in_progress.get(), _buffer_size);
}
}
// If the next loop around is going to reuse some of the samples just collected, use a memmove to
// preserve them in the correct locations (TODO: use a longer buffer to fix that) and don't skip
// anything. Otherwise skip as required to get to the next sample batch and don't expect to reuse.
uint64_t steps = _stepper->step();
if(steps < _number_of_taps)
{
int16_t *input_buffer = _input_buffer.get();
memmove(input_buffer, &input_buffer[steps], sizeof(int16_t) * ((size_t)_number_of_taps - (size_t)steps));
_input_buffer_depth -= steps;
}
else
{
if(steps > _number_of_taps)
static_cast<T *>(this)->skip_samples((unsigned int)steps - (unsigned int)_number_of_taps);
_input_buffer_depth = 0;
}
}
}
return;
}
// TODO: input rate is less than output rate
});
}
private: