1
0
mirror of https://github.com/TomHarte/CLK.git synced 2024-07-24 12:29:06 +00:00

Merge pull request #1039 from TomHarte/UniqueAsync

Switch DeferringAsyncTaskQueue to `unique_ptr`.
This commit is contained in:
Thomas Harte 2022-06-02 17:06:59 -04:00 committed by GitHub
commit 103de74063
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 43 deletions

View File

@ -12,47 +12,47 @@ using namespace Concurrency;
AsyncTaskQueue::AsyncTaskQueue() AsyncTaskQueue::AsyncTaskQueue()
#ifndef USE_GCD #ifndef USE_GCD
: should_destruct_(false) :
#endif should_destruct_(false),
{ thread_([this] () {
#ifdef USE_GCD while(!should_destruct_) {
serial_dispatch_queue_ = dispatch_queue_create("com.thomasharte.clocksignal.asyntaskqueue", DISPATCH_QUEUE_SERIAL); std::function<void(void)> next_function;
// Take lock, check for a new task.
std::unique_lock lock(queue_mutex_);
if(!pending_tasks_.empty()) {
next_function = pending_tasks_.front();
pending_tasks_.pop_front();
}
if(next_function) {
// If there is a task, release lock and perform it.
lock.unlock();
next_function();
} else {
// If there isn't a task, atomically block on the processing condition and release the lock
// until there's something pending (and then release it again via scope).
processing_condition_.wait(lock);
}
}
})
#else #else
thread_ = std::make_unique<std::thread>([this]() { : serial_dispatch_queue_(dispatch_queue_create("com.thomasharte.clocksignal.asyntaskqueue", DISPATCH_QUEUE_SERIAL))
while(!should_destruct_) {
std::function<void(void)> next_function;
// Take lock, check for a new task
std::unique_lock lock(queue_mutex_);
if(!pending_tasks_.empty()) {
next_function = pending_tasks_.front();
pending_tasks_.pop_front();
}
if(next_function) {
// If there is a task, release lock and perform it
lock.unlock();
next_function();
} else {
// If there isn't a task, atomically block on the processing condition and release the lock
// until there's something pending (and then release it again via scope)
processing_condition_.wait(lock);
}
}
});
#endif #endif
} {}
AsyncTaskQueue::~AsyncTaskQueue() { AsyncTaskQueue::~AsyncTaskQueue() {
#ifdef USE_GCD #ifdef USE_GCD
flush(); flush();
dispatch_release(serial_dispatch_queue_); dispatch_release(serial_dispatch_queue_);
serial_dispatch_queue_ = nullptr;
#else #else
// Set should destruct, and then give the thread a bit of a nudge
// via an empty enqueue.
should_destruct_ = true; should_destruct_ = true;
enqueue([](){}); enqueue([](){});
thread_->join();
thread_.reset(); // Wait for the thread safely to terminate.
thread_.join();
#endif #endif
} }
@ -88,16 +88,15 @@ DeferringAsyncTaskQueue::~DeferringAsyncTaskQueue() {
void DeferringAsyncTaskQueue::defer(std::function<void(void)> function) { void DeferringAsyncTaskQueue::defer(std::function<void(void)> function) {
if(!deferred_tasks_) { if(!deferred_tasks_) {
deferred_tasks_ = std::make_shared<std::list<std::function<void(void)>>>(); deferred_tasks_ = std::make_unique<TaskList>();
} }
deferred_tasks_->push_back(function); deferred_tasks_->push_back(function);
} }
void DeferringAsyncTaskQueue::perform() { void DeferringAsyncTaskQueue::perform() {
if(!deferred_tasks_) return; if(!deferred_tasks_) return;
std::shared_ptr<std::list<std::function<void(void)>>> deferred_tasks = deferred_tasks_; enqueue([deferred_tasks_raw = deferred_tasks_.release()] {
deferred_tasks_.reset(); std::unique_ptr<TaskList> deferred_tasks(deferred_tasks_raw);
enqueue([deferred_tasks] {
for(const auto &function : *deferred_tasks) { for(const auto &function : *deferred_tasks) {
function(); function();
} }

View File

@ -23,6 +23,8 @@
namespace Concurrency { namespace Concurrency {
using TaskList = std::list<std::function<void(void)>>;
/*! /*!
An async task queue allows a caller to enqueue void(void) functions. Those functions are guaranteed An async task queue allows a caller to enqueue void(void) functions. Those functions are guaranteed
to be performed serially and asynchronously from the caller. A caller may also request to flush, to be performed serially and asynchronously from the caller. A caller may also request to flush,
@ -51,12 +53,12 @@ class AsyncTaskQueue {
#ifdef USE_GCD #ifdef USE_GCD
dispatch_queue_t serial_dispatch_queue_; dispatch_queue_t serial_dispatch_queue_;
#else #else
std::unique_ptr<std::thread> thread_;
std::mutex queue_mutex_;
std::list<std::function<void(void)>> pending_tasks_;
std::condition_variable processing_condition_;
std::atomic_bool should_destruct_; std::atomic_bool should_destruct_;
std::condition_variable processing_condition_;
std::mutex queue_mutex_;
TaskList pending_tasks_;
std::thread thread_;
#endif #endif
}; };
@ -93,9 +95,7 @@ class DeferringAsyncTaskQueue: public AsyncTaskQueue {
void flush(); void flush();
private: private:
// TODO: this is a shared_ptr because of the issues capturing moveables in C++11; std::unique_ptr<TaskList> deferred_tasks_;
// switch to a unique_ptr if/when adapting to C++14
std::shared_ptr<std::list<std::function<void(void)>>> deferred_tasks_;
}; };
} }