From e53455a936cf422ec3e72d29fbb9ded085932790 Mon Sep 17 00:00:00 2001 From: Thomas Harte Date: Fri, 7 Oct 2016 16:56:34 -0400 Subject: [PATCH] =?UTF-8?q?Not=20having=20read=20the=20C++=20synchronisati?= =?UTF-8?q?on=20primitives=20before,=20this=20async=20task=20queue=20is=20?= =?UTF-8?q?probably=20incorrect.=20But=20nevertheless,=20let's=20have=20a?= =?UTF-8?q?=20quick=20go=20at=20employing=20it=20=E2=80=94=20in=20a=20hide?= =?UTF-8?q?ously=20thread=20unsafe=20fashion=20=E2=80=94=20for=20audio=20g?= =?UTF-8?q?eneration.=20What=20can=20possibly=20go=20wrong=3F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Concurrency/AsyncTaskQueue.cpp | 62 +++++++++ Concurrency/AsyncTaskQueue.hpp | 39 ++++++ .../Clock Signal.xcodeproj/project.pbxproj | 14 ++ Outputs/Speaker.hpp | 120 +++++++++--------- 4 files changed, 177 insertions(+), 58 deletions(-) create mode 100644 Concurrency/AsyncTaskQueue.cpp create mode 100644 Concurrency/AsyncTaskQueue.hpp diff --git a/Concurrency/AsyncTaskQueue.cpp b/Concurrency/AsyncTaskQueue.cpp new file mode 100644 index 000000000..7f5e5a68d --- /dev/null +++ b/Concurrency/AsyncTaskQueue.cpp @@ -0,0 +1,62 @@ +// +// AsyncTaskQueue.cpp +// Clock Signal +// +// Created by Thomas Harte on 07/10/2016. +// Copyright © 2016 Thomas Harte. All rights reserved. +// + +#include "AsyncTaskQueue.hpp" + +using namespace Concurrency; + +AsyncTaskQueue::AsyncTaskQueue() : should_destruct_(false) +{ + thread_.reset(new std::thread([this]() { + while(!should_destruct_) + { + std::function next_function; + + queue_mutex_.lock(); + if(!pending_tasks_.empty()) + { + next_function = pending_tasks_.front(); + pending_tasks_.pop_front(); + } + queue_mutex_.unlock(); + + if(next_function) + { + next_function(); + } + else + { + std::unique_lock lock(queue_mutex_); + processing_condition_.wait(lock); + lock.unlock(); + } + } + })); +} + +AsyncTaskQueue::~AsyncTaskQueue() +{ + should_destruct_ = true; + enqueue([](){}); +} + +void AsyncTaskQueue::enqueue(std::function function) +{ + queue_mutex_.lock(); + pending_tasks_.push_back(function); + queue_mutex_.unlock(); + + std::lock_guard lock(queue_mutex_); + processing_condition_.notify_all(); +} + +void AsyncTaskQueue::synchronise() +{ + // TODO +// std::mutex +} diff --git a/Concurrency/AsyncTaskQueue.hpp b/Concurrency/AsyncTaskQueue.hpp new file mode 100644 index 000000000..00653d420 --- /dev/null +++ b/Concurrency/AsyncTaskQueue.hpp @@ -0,0 +1,39 @@ +// +// AsyncTaskQueue.hpp +// Clock Signal +// +// Created by Thomas Harte on 07/10/2016. +// Copyright © 2016 Thomas Harte. All rights reserved. +// + +#ifndef AsyncTaskQueue_hpp +#define AsyncTaskQueue_hpp + +#include +#include +#include +#include + +namespace Concurrency { + +class AsyncTaskQueue { + + public: + AsyncTaskQueue(); + ~AsyncTaskQueue(); + + void enqueue(std::function function); + void synchronise(); + + private: + std::unique_ptr thread_; + + std::mutex queue_mutex_; + std::list> pending_tasks_; + std::condition_variable processing_condition_; + bool should_destruct_; +}; + +} + +#endif /* Concurrency_hpp */ diff --git a/OSBindings/Mac/Clock Signal.xcodeproj/project.pbxproj b/OSBindings/Mac/Clock Signal.xcodeproj/project.pbxproj index a7ea419a9..5f5d8f631 100644 --- a/OSBindings/Mac/Clock Signal.xcodeproj/project.pbxproj +++ b/OSBindings/Mac/Clock Signal.xcodeproj/project.pbxproj @@ -30,6 +30,7 @@ 4B30512D1D989E2200B4FED8 /* Drive.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B30512B1D989E2200B4FED8 /* Drive.cpp */; }; 4B3051301D98ACC600B4FED8 /* Plus3.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B30512E1D98ACC600B4FED8 /* Plus3.cpp */; }; 4B37EE821D7345A6006A09A4 /* BinaryDump.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B37EE801D7345A6006A09A4 /* BinaryDump.cpp */; }; + 4B3940E71DA83C8300427841 /* AsyncTaskQueue.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B3940E51DA83C8300427841 /* AsyncTaskQueue.cpp */; }; 4B3BA0C31D318AEC005DD7A7 /* C1540Tests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4B3BA0C21D318AEB005DD7A7 /* C1540Tests.swift */; }; 4B3BA0CE1D318B44005DD7A7 /* C1540Bridge.mm in Sources */ = {isa = PBXBuildFile; fileRef = 4B3BA0C61D318B44005DD7A7 /* C1540Bridge.mm */; }; 4B3BA0CF1D318B44005DD7A7 /* MOS6522Bridge.mm in Sources */ = {isa = PBXBuildFile; fileRef = 4B3BA0C91D318B44005DD7A7 /* MOS6522Bridge.mm */; }; @@ -438,6 +439,8 @@ 4B30512F1D98ACC600B4FED8 /* Plus3.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = Plus3.hpp; path = Electron/Plus3.hpp; sourceTree = ""; }; 4B37EE801D7345A6006A09A4 /* BinaryDump.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = BinaryDump.cpp; sourceTree = ""; }; 4B37EE811D7345A6006A09A4 /* BinaryDump.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = BinaryDump.hpp; sourceTree = ""; }; + 4B3940E51DA83C8300427841 /* AsyncTaskQueue.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = AsyncTaskQueue.cpp; path = ../../Concurrency/AsyncTaskQueue.cpp; sourceTree = ""; }; + 4B3940E61DA83C8300427841 /* AsyncTaskQueue.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = AsyncTaskQueue.hpp; path = ../../Concurrency/AsyncTaskQueue.hpp; sourceTree = ""; }; 4B3BA0C21D318AEB005DD7A7 /* C1540Tests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = C1540Tests.swift; sourceTree = ""; }; 4B3BA0C51D318B44005DD7A7 /* C1540Bridge.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = C1540Bridge.h; sourceTree = ""; }; 4B3BA0C61D318B44005DD7A7 /* C1540Bridge.mm */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.objcpp; path = C1540Bridge.mm; sourceTree = ""; }; @@ -1003,6 +1006,15 @@ name = Outputs; sourceTree = ""; }; + 4B3940E81DA83C8700427841 /* Concurrency */ = { + isa = PBXGroup; + children = ( + 4B3940E51DA83C8300427841 /* AsyncTaskQueue.cpp */, + 4B3940E61DA83C8300427841 /* AsyncTaskQueue.hpp */, + ); + name = Concurrency; + sourceTree = ""; + }; 4B3BA0C41D318B44005DD7A7 /* Bridges */ = { isa = PBXGroup; children = ( @@ -1469,6 +1481,7 @@ 4BB73E951B587A5100552FC2 = { isa = PBXGroup; children = ( + 4B3940E81DA83C8700427841 /* Concurrency */, 4BC76E6A1C98F43700E6EF73 /* Accelerate.framework */, 4BB73EA01B587A5100552FC2 /* Clock Signal */, 4BB73EB51B587A5100552FC2 /* Clock SignalTests */, @@ -2185,6 +2198,7 @@ 4B1E85751D170228001EF87D /* Typer.cpp in Sources */, 4BF829631D8F536B001BAE39 /* SSD.cpp in Sources */, 4B2E2D9D1C3A070400138695 /* Electron.cpp in Sources */, + 4B3940E71DA83C8300427841 /* AsyncTaskQueue.cpp in Sources */, 4BAB62B81D3302CA00DF5BA0 /* PCMTrack.cpp in Sources */, 4B69FB3D1C4D908A00B5F0AA /* Tape.cpp in Sources */, 4B8FE2291DA1EDDF0090D3CE /* ElectronOptionsPanel.swift in Sources */, diff --git a/Outputs/Speaker.hpp b/Outputs/Speaker.hpp index 413a37bf4..1d3b0a560 100644 --- a/Outputs/Speaker.hpp +++ b/Outputs/Speaker.hpp @@ -14,6 +14,7 @@ #include #include "../SignalProcessing/Stepper.hpp" #include "../SignalProcessing/FIRFilter.hpp" +#include "../Concurrency/AsyncTaskQueue.hpp" namespace Outputs { @@ -25,7 +26,7 @@ namespace Outputs { Intended to be a parent class, allowing descendants to pick the strategy by which input samples are mapped to output samples. */ -class Speaker { +class Speaker: public Concurrency::AsyncTaskQueue { public: class Delegate { public: @@ -124,49 +125,20 @@ template class Filter: public Speaker { public: void run_for_cycles(unsigned int input_cycles) { - if(_coefficients_are_dirty) update_filter_coefficients(); + enqueue([=]() { + unsigned int cycles_remaining = input_cycles; + if(_coefficients_are_dirty) update_filter_coefficients(); - // if input and output rates exactly match, just accumulate results and pass on - if(_input_cycles_per_second == _output_cycles_per_second && _high_frequency_cut_off < 0.0) - { - while(input_cycles) + // if input and output rates exactly match, just accumulate results and pass on + if(_input_cycles_per_second == _output_cycles_per_second && _high_frequency_cut_off < 0.0) { - unsigned int cycles_to_read = (unsigned int)(_buffer_size - _buffer_in_progress_pointer); - if(cycles_to_read > input_cycles) cycles_to_read = input_cycles; - - static_cast(this)->get_samples(cycles_to_read, &_buffer_in_progress.get()[_buffer_in_progress_pointer]); - _buffer_in_progress_pointer += cycles_to_read; - - // announce to delegate if full - if(_buffer_in_progress_pointer == _buffer_size) + while(cycles_remaining) { - _buffer_in_progress_pointer = 0; - if(_delegate) - { - _delegate->speaker_did_complete_samples(this, _buffer_in_progress.get(), _buffer_size); - } - } + unsigned int cycles_to_read = (unsigned int)(_buffer_size - _buffer_in_progress_pointer); + if(cycles_to_read > cycles_remaining) cycles_to_read = cycles_remaining; - input_cycles -= cycles_to_read; - } - - return; - } - - // if the output rate is less than the input rate, use the filter - if(_input_cycles_per_second > _output_cycles_per_second) - { - while(input_cycles) - { - unsigned int cycles_to_read = (unsigned int)std::min((int)input_cycles, _number_of_taps - _input_buffer_depth); - static_cast(this)->get_samples(cycles_to_read, &_input_buffer.get()[_input_buffer_depth]); - input_cycles -= cycles_to_read; - _input_buffer_depth += cycles_to_read; - - if(_input_buffer_depth == _number_of_taps) - { - _buffer_in_progress.get()[_buffer_in_progress_pointer] = _filter->apply(_input_buffer.get()); - _buffer_in_progress_pointer++; + static_cast(this)->get_samples(cycles_to_read, &_buffer_in_progress.get()[_buffer_in_progress_pointer]); + _buffer_in_progress_pointer += cycles_to_read; // announce to delegate if full if(_buffer_in_progress_pointer == _buffer_size) @@ -178,29 +150,61 @@ template class Filter: public Speaker { } } - // If the next loop around is going to reuse some of the samples just collected, use a memmove to - // preserve them in the correct locations (TODO: use a longer buffer to fix that) and don't skip - // anything. Otherwise skip as required to get to the next sample batch and don't expect to reuse. - uint64_t steps = _stepper->step(); - if(steps < _number_of_taps) - { - int16_t *input_buffer = _input_buffer.get(); - memmove(input_buffer, &input_buffer[steps], sizeof(int16_t) * ((size_t)_number_of_taps - (size_t)steps)); - _input_buffer_depth -= steps; - } - else - { - if(steps > _number_of_taps) - static_cast(this)->skip_samples((unsigned int)steps - (unsigned int)_number_of_taps); - _input_buffer_depth = 0; - } + cycles_remaining -= cycles_to_read; } + + return; } - return; - } + // if the output rate is less than the input rate, use the filter + if(_input_cycles_per_second > _output_cycles_per_second) + { + while(cycles_remaining) + { + unsigned int cycles_to_read = (unsigned int)std::min((int)cycles_remaining, _number_of_taps - _input_buffer_depth); + static_cast(this)->get_samples(cycles_to_read, &_input_buffer.get()[_input_buffer_depth]); + cycles_remaining -= cycles_to_read; + _input_buffer_depth += cycles_to_read; + + if(_input_buffer_depth == _number_of_taps) + { + _buffer_in_progress.get()[_buffer_in_progress_pointer] = _filter->apply(_input_buffer.get()); + _buffer_in_progress_pointer++; + + // announce to delegate if full + if(_buffer_in_progress_pointer == _buffer_size) + { + _buffer_in_progress_pointer = 0; + if(_delegate) + { + _delegate->speaker_did_complete_samples(this, _buffer_in_progress.get(), _buffer_size); + } + } + + // If the next loop around is going to reuse some of the samples just collected, use a memmove to + // preserve them in the correct locations (TODO: use a longer buffer to fix that) and don't skip + // anything. Otherwise skip as required to get to the next sample batch and don't expect to reuse. + uint64_t steps = _stepper->step(); + if(steps < _number_of_taps) + { + int16_t *input_buffer = _input_buffer.get(); + memmove(input_buffer, &input_buffer[steps], sizeof(int16_t) * ((size_t)_number_of_taps - (size_t)steps)); + _input_buffer_depth -= steps; + } + else + { + if(steps > _number_of_taps) + static_cast(this)->skip_samples((unsigned int)steps - (unsigned int)_number_of_taps); + _input_buffer_depth = 0; + } + } + } + + return; + } // TODO: input rate is less than output rate + }); } private: