From e994910ff67cfa1965ae2274694b918be5213650 Mon Sep 17 00:00:00 2001 From: Thomas Harte Date: Thu, 2 Jun 2022 16:46:41 -0400 Subject: [PATCH 1/4] Switch to `unique_ptr`. --- Concurrency/AsyncTaskQueue.cpp | 7 ++++--- Concurrency/AsyncTaskQueue.hpp | 4 +--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/Concurrency/AsyncTaskQueue.cpp b/Concurrency/AsyncTaskQueue.cpp index 798683c39..66b0fdaa3 100644 --- a/Concurrency/AsyncTaskQueue.cpp +++ b/Concurrency/AsyncTaskQueue.cpp @@ -88,16 +88,17 @@ DeferringAsyncTaskQueue::~DeferringAsyncTaskQueue() { void DeferringAsyncTaskQueue::defer(std::function function) { if(!deferred_tasks_) { - deferred_tasks_ = std::make_shared>>(); + deferred_tasks_ = std::make_unique>>(); } deferred_tasks_->push_back(function); } void DeferringAsyncTaskQueue::perform() { if(!deferred_tasks_) return; - std::shared_ptr>> deferred_tasks = deferred_tasks_; + auto deferred_tasks_raw = deferred_tasks_.release(); deferred_tasks_.reset(); - enqueue([deferred_tasks] { + enqueue([deferred_tasks_raw] { + std::unique_ptr>> deferred_tasks(deferred_tasks_raw); for(const auto &function : *deferred_tasks) { function(); } diff --git a/Concurrency/AsyncTaskQueue.hpp b/Concurrency/AsyncTaskQueue.hpp index da5d052c8..2962e329c 100644 --- a/Concurrency/AsyncTaskQueue.hpp +++ b/Concurrency/AsyncTaskQueue.hpp @@ -93,9 +93,7 @@ class DeferringAsyncTaskQueue: public AsyncTaskQueue { void flush(); private: - // TODO: this is a shared_ptr because of the issues capturing moveables in C++11; - // switch to a unique_ptr if/when adapting to C++14 - std::shared_ptr>> deferred_tasks_; + std::unique_ptr>> deferred_tasks_; }; } From 9d278d80f196b13cc39ead85a700b9b4d32c970f Mon Sep 17 00:00:00 2001 From: Thomas Harte Date: Thu, 2 Jun 2022 16:50:59 -0400 Subject: [PATCH 2/4] Remove redundant `reset`. --- Concurrency/AsyncTaskQueue.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/Concurrency/AsyncTaskQueue.cpp b/Concurrency/AsyncTaskQueue.cpp index 66b0fdaa3..be63ca571 100644 --- a/Concurrency/AsyncTaskQueue.cpp +++ b/Concurrency/AsyncTaskQueue.cpp @@ -96,7 +96,6 @@ void DeferringAsyncTaskQueue::defer(std::function function) { void DeferringAsyncTaskQueue::perform() { if(!deferred_tasks_) return; auto deferred_tasks_raw = deferred_tasks_.release(); - deferred_tasks_.reset(); enqueue([deferred_tasks_raw] { std::unique_ptr>> deferred_tasks(deferred_tasks_raw); for(const auto &function : *deferred_tasks) { From e389dcb9120de28b7a60d1d461bd0322eea54326 Mon Sep 17 00:00:00 2001 From: Thomas Harte Date: Thu, 2 Jun 2022 16:52:03 -0400 Subject: [PATCH 3/4] Further simplify syntax. --- Concurrency/AsyncTaskQueue.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Concurrency/AsyncTaskQueue.cpp b/Concurrency/AsyncTaskQueue.cpp index be63ca571..38350d8df 100644 --- a/Concurrency/AsyncTaskQueue.cpp +++ b/Concurrency/AsyncTaskQueue.cpp @@ -95,8 +95,7 @@ void DeferringAsyncTaskQueue::defer(std::function function) { void DeferringAsyncTaskQueue::perform() { if(!deferred_tasks_) return; - auto deferred_tasks_raw = deferred_tasks_.release(); - enqueue([deferred_tasks_raw] { + enqueue([deferred_tasks_raw = deferred_tasks_.release()] { std::unique_ptr>> deferred_tasks(deferred_tasks_raw); for(const auto &function : *deferred_tasks) { function(); From 7f33a5ca0c37e1b658372b702dafc345f7f55935 Mon Sep 17 00:00:00 2001 From: Thomas Harte Date: Thu, 2 Jun 2022 17:02:36 -0400 Subject: [PATCH 4/4] Simplify: (i) repetitive type for `TaskList`; (ii) unnecessary `unique_ptr`. --- Concurrency/AsyncTaskQueue.cpp | 66 +++++++++++++++++----------------- Concurrency/AsyncTaskQueue.hpp | 14 ++++---- 2 files changed, 41 insertions(+), 39 deletions(-) diff --git a/Concurrency/AsyncTaskQueue.cpp b/Concurrency/AsyncTaskQueue.cpp index 38350d8df..12615430e 100644 --- a/Concurrency/AsyncTaskQueue.cpp +++ b/Concurrency/AsyncTaskQueue.cpp @@ -12,47 +12,47 @@ using namespace Concurrency; AsyncTaskQueue::AsyncTaskQueue() #ifndef USE_GCD - : should_destruct_(false) -#endif -{ -#ifdef USE_GCD - serial_dispatch_queue_ = dispatch_queue_create("com.thomasharte.clocksignal.asyntaskqueue", DISPATCH_QUEUE_SERIAL); + : + should_destruct_(false), + thread_([this] () { + while(!should_destruct_) { + std::function next_function; + + // Take lock, check for a new task. + std::unique_lock lock(queue_mutex_); + if(!pending_tasks_.empty()) { + next_function = pending_tasks_.front(); + pending_tasks_.pop_front(); + } + + if(next_function) { + // If there is a task, release lock and perform it. + lock.unlock(); + next_function(); + } else { + // If there isn't a task, atomically block on the processing condition and release the lock + // until there's something pending (and then release it again via scope). + processing_condition_.wait(lock); + } + } + }) #else - thread_ = std::make_unique([this]() { - while(!should_destruct_) { - std::function next_function; - - // Take lock, check for a new task - std::unique_lock lock(queue_mutex_); - if(!pending_tasks_.empty()) { - next_function = pending_tasks_.front(); - pending_tasks_.pop_front(); - } - - if(next_function) { - // If there is a task, release lock and perform it - lock.unlock(); - next_function(); - } else { - // If there isn't a task, atomically block on the processing condition and release the lock - // until there's something pending (and then release it again via scope) - processing_condition_.wait(lock); - } - } - }); + : serial_dispatch_queue_(dispatch_queue_create("com.thomasharte.clocksignal.asyntaskqueue", DISPATCH_QUEUE_SERIAL)) #endif -} +{} AsyncTaskQueue::~AsyncTaskQueue() { #ifdef USE_GCD flush(); dispatch_release(serial_dispatch_queue_); - serial_dispatch_queue_ = nullptr; #else + // Set should destruct, and then give the thread a bit of a nudge + // via an empty enqueue. should_destruct_ = true; enqueue([](){}); - thread_->join(); - thread_.reset(); + + // Wait for the thread safely to terminate. + thread_.join(); #endif } @@ -88,7 +88,7 @@ DeferringAsyncTaskQueue::~DeferringAsyncTaskQueue() { void DeferringAsyncTaskQueue::defer(std::function function) { if(!deferred_tasks_) { - deferred_tasks_ = std::make_unique>>(); + deferred_tasks_ = std::make_unique(); } deferred_tasks_->push_back(function); } @@ -96,7 +96,7 @@ void DeferringAsyncTaskQueue::defer(std::function function) { void DeferringAsyncTaskQueue::perform() { if(!deferred_tasks_) return; enqueue([deferred_tasks_raw = deferred_tasks_.release()] { - std::unique_ptr>> deferred_tasks(deferred_tasks_raw); + std::unique_ptr deferred_tasks(deferred_tasks_raw); for(const auto &function : *deferred_tasks) { function(); } diff --git a/Concurrency/AsyncTaskQueue.hpp b/Concurrency/AsyncTaskQueue.hpp index 2962e329c..225f49580 100644 --- a/Concurrency/AsyncTaskQueue.hpp +++ b/Concurrency/AsyncTaskQueue.hpp @@ -23,6 +23,8 @@ namespace Concurrency { +using TaskList = std::list>; + /*! An async task queue allows a caller to enqueue void(void) functions. Those functions are guaranteed to be performed serially and asynchronously from the caller. A caller may also request to flush, @@ -51,12 +53,12 @@ class AsyncTaskQueue { #ifdef USE_GCD dispatch_queue_t serial_dispatch_queue_; #else - std::unique_ptr thread_; - - std::mutex queue_mutex_; - std::list> pending_tasks_; - std::condition_variable processing_condition_; std::atomic_bool should_destruct_; + std::condition_variable processing_condition_; + std::mutex queue_mutex_; + TaskList pending_tasks_; + + std::thread thread_; #endif }; @@ -93,7 +95,7 @@ class DeferringAsyncTaskQueue: public AsyncTaskQueue { void flush(); private: - std::unique_ptr>> deferred_tasks_; + std::unique_ptr deferred_tasks_; }; }