From 42584013841751df69b0e97584b7b701e6e7ff6e Mon Sep 17 00:00:00 2001 From: Thomas Harte Date: Wed, 19 Oct 2016 21:15:04 -0400 Subject: [PATCH] Implemented `flush`, added a call to it from the filter speaker's destructor, to ensure no race conditions on accessing the various bits of instance state there and below. --- Concurrency/AsyncTaskQueue.cpp | 14 ++++++++++---- Concurrency/AsyncTaskQueue.hpp | 2 +- Outputs/Speaker.hpp | 20 ++++++++++++++++++-- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/Concurrency/AsyncTaskQueue.cpp b/Concurrency/AsyncTaskQueue.cpp index 75ae57175..1aa544c5e 100644 --- a/Concurrency/AsyncTaskQueue.cpp +++ b/Concurrency/AsyncTaskQueue.cpp @@ -46,7 +46,7 @@ AsyncTaskQueue::~AsyncTaskQueue() should_destruct_ = true; enqueue([](){}); thread_->join(); - thread_.reset( ); + thread_.reset(); } void AsyncTaskQueue::enqueue(std::function function) @@ -56,8 +56,14 @@ void AsyncTaskQueue::enqueue(std::function function) processing_condition_.notify_all(); } -void AsyncTaskQueue::synchronise() +void AsyncTaskQueue::flush() { - // TODO -// std::mutex + std::shared_ptr flush_mutex(new std::mutex); + std::shared_ptr flush_condition(new std::condition_variable); + std::unique_lock lock(*flush_mutex); + enqueue([=] () { + std::unique_lock inner_lock(*flush_mutex); + flush_condition->notify_all(); + }); + flush_condition->wait(lock); } diff --git a/Concurrency/AsyncTaskQueue.hpp b/Concurrency/AsyncTaskQueue.hpp index 6d06f2264..77485265a 100644 --- a/Concurrency/AsyncTaskQueue.hpp +++ b/Concurrency/AsyncTaskQueue.hpp @@ -39,7 +39,7 @@ class AsyncTaskQueue { /*! Blocks the caller until all previously-enqueud functions have completed. */ - void synchronise(); + void flush(); private: std::unique_ptr thread_; diff --git a/Outputs/Speaker.hpp b/Outputs/Speaker.hpp index 04ff49c8f..918dbe4a6 100644 --- a/Outputs/Speaker.hpp +++ b/Outputs/Speaker.hpp @@ -26,7 +26,7 @@ namespace Outputs { Intended to be a parent class, allowing descendants to pick the strategy by which input samples are mapped to output samples. */ -class Speaker: public Concurrency::AsyncTaskQueue { +class Speaker { public: class Delegate { public: @@ -85,9 +85,18 @@ class Speaker: public Concurrency::AsyncTaskQueue { set_needs_updated_filter_coefficients(); } - Speaker() : _buffer_in_progress_pointer(0), _requested_number_of_taps(0), _high_frequency_cut_off(-1.0) {} + Speaker() : _buffer_in_progress_pointer(0), _requested_number_of_taps(0), _high_frequency_cut_off(-1.0), _queue(new Concurrency::AsyncTaskQueue) {} protected: + void enqueue(std::function function) + { + _queue->enqueue(function); + } + void flush() + { + _queue->flush(); + } + std::unique_ptr _buffer_in_progress; float _high_frequency_cut_off; int _buffer_size; @@ -109,6 +118,8 @@ class Speaker: public Concurrency::AsyncTaskQueue { int16_t throwaway_samples[quantity]; get_samples(quantity, throwaway_samples); } + + std::unique_ptr _queue; }; /*! @@ -123,6 +134,11 @@ class Speaker: public Concurrency::AsyncTaskQueue { */ template class Filter: public Speaker { public: + ~Filter() + { + flush(); + } + void run_for_cycles(unsigned int input_cycles) { enqueue([=]() {