diff --git a/Concurrency/AsyncTaskQueue.cpp b/Concurrency/AsyncTaskQueue.cpp index 798683c39..12615430e 100644 --- a/Concurrency/AsyncTaskQueue.cpp +++ b/Concurrency/AsyncTaskQueue.cpp @@ -12,47 +12,47 @@ using namespace Concurrency; AsyncTaskQueue::AsyncTaskQueue() #ifndef USE_GCD - : should_destruct_(false) -#endif -{ -#ifdef USE_GCD - serial_dispatch_queue_ = dispatch_queue_create("com.thomasharte.clocksignal.asyntaskqueue", DISPATCH_QUEUE_SERIAL); + : + should_destruct_(false), + thread_([this] () { + while(!should_destruct_) { + std::function next_function; + + // Take lock, check for a new task. + std::unique_lock lock(queue_mutex_); + if(!pending_tasks_.empty()) { + next_function = pending_tasks_.front(); + pending_tasks_.pop_front(); + } + + if(next_function) { + // If there is a task, release lock and perform it. + lock.unlock(); + next_function(); + } else { + // If there isn't a task, atomically block on the processing condition and release the lock + // until there's something pending (and then release it again via scope). + processing_condition_.wait(lock); + } + } + }) #else - thread_ = std::make_unique([this]() { - while(!should_destruct_) { - std::function next_function; - - // Take lock, check for a new task - std::unique_lock lock(queue_mutex_); - if(!pending_tasks_.empty()) { - next_function = pending_tasks_.front(); - pending_tasks_.pop_front(); - } - - if(next_function) { - // If there is a task, release lock and perform it - lock.unlock(); - next_function(); - } else { - // If there isn't a task, atomically block on the processing condition and release the lock - // until there's something pending (and then release it again via scope) - processing_condition_.wait(lock); - } - } - }); + : serial_dispatch_queue_(dispatch_queue_create("com.thomasharte.clocksignal.asyntaskqueue", DISPATCH_QUEUE_SERIAL)) #endif -} +{} AsyncTaskQueue::~AsyncTaskQueue() { #ifdef USE_GCD flush(); dispatch_release(serial_dispatch_queue_); - serial_dispatch_queue_ = nullptr; #else + // Set should destruct, and then give the thread a bit of a nudge + // via an empty enqueue. should_destruct_ = true; enqueue([](){}); - thread_->join(); - thread_.reset(); + + // Wait for the thread safely to terminate. + thread_.join(); #endif } @@ -88,16 +88,15 @@ DeferringAsyncTaskQueue::~DeferringAsyncTaskQueue() { void DeferringAsyncTaskQueue::defer(std::function function) { if(!deferred_tasks_) { - deferred_tasks_ = std::make_shared>>(); + deferred_tasks_ = std::make_unique(); } deferred_tasks_->push_back(function); } void DeferringAsyncTaskQueue::perform() { if(!deferred_tasks_) return; - std::shared_ptr>> deferred_tasks = deferred_tasks_; - deferred_tasks_.reset(); - enqueue([deferred_tasks] { + enqueue([deferred_tasks_raw = deferred_tasks_.release()] { + std::unique_ptr deferred_tasks(deferred_tasks_raw); for(const auto &function : *deferred_tasks) { function(); } diff --git a/Concurrency/AsyncTaskQueue.hpp b/Concurrency/AsyncTaskQueue.hpp index da5d052c8..225f49580 100644 --- a/Concurrency/AsyncTaskQueue.hpp +++ b/Concurrency/AsyncTaskQueue.hpp @@ -23,6 +23,8 @@ namespace Concurrency { +using TaskList = std::list>; + /*! An async task queue allows a caller to enqueue void(void) functions. Those functions are guaranteed to be performed serially and asynchronously from the caller. A caller may also request to flush, @@ -51,12 +53,12 @@ class AsyncTaskQueue { #ifdef USE_GCD dispatch_queue_t serial_dispatch_queue_; #else - std::unique_ptr thread_; - - std::mutex queue_mutex_; - std::list> pending_tasks_; - std::condition_variable processing_condition_; std::atomic_bool should_destruct_; + std::condition_variable processing_condition_; + std::mutex queue_mutex_; + TaskList pending_tasks_; + + std::thread thread_; #endif }; @@ -93,9 +95,7 @@ class DeferringAsyncTaskQueue: public AsyncTaskQueue { void flush(); private: - // TODO: this is a shared_ptr because of the issues capturing moveables in C++11; - // switch to a unique_ptr if/when adapting to C++14 - std::shared_ptr>> deferred_tasks_; + std::unique_ptr deferred_tasks_; }; }