// // AsyncTaskQueue.cpp // Clock Signal // // Created by Thomas Harte on 07/10/2016. // Copyright 2016 Thomas Harte. All rights reserved. // #include "AsyncTaskQueue.hpp" using namespace Concurrency; AsyncTaskQueue::AsyncTaskQueue() #ifndef USE_GCD : should_destruct_(false), thread_([this] () { while(!should_destruct_) { std::function next_function; // Take lock, check for a new task. std::unique_lock lock(queue_mutex_); if(!pending_tasks_.empty()) { next_function = pending_tasks_.front(); pending_tasks_.pop_front(); } if(next_function) { // If there is a task, release lock and perform it. lock.unlock(); next_function(); } else { // If there isn't a task, atomically block on the processing condition and release the lock // until there's something pending (and then release it again via scope). processing_condition_.wait(lock); } } }) #else : serial_dispatch_queue_(dispatch_queue_create("com.thomasharte.clocksignal.asyntaskqueue", DISPATCH_QUEUE_SERIAL)) #endif {} AsyncTaskQueue::~AsyncTaskQueue() { #ifdef USE_GCD flush(); dispatch_release(serial_dispatch_queue_); #else // Set should destruct, and then give the thread a bit of a nudge // via an empty enqueue. should_destruct_ = true; enqueue([](){}); // Wait for the thread safely to terminate. thread_.join(); #endif } void AsyncTaskQueue::enqueue(std::function function) { #ifdef USE_GCD dispatch_async(serial_dispatch_queue_, ^{function();}); #else std::lock_guard lock(queue_mutex_); pending_tasks_.push_back(function); processing_condition_.notify_all(); #endif } void AsyncTaskQueue::flush() { #ifdef USE_GCD dispatch_sync(serial_dispatch_queue_, ^{}); #else auto flush_mutex = std::make_shared(); auto flush_condition = std::make_shared(); std::unique_lock lock(*flush_mutex); enqueue([=] () { std::unique_lock inner_lock(*flush_mutex); flush_condition->notify_all(); }); flush_condition->wait(lock); #endif } DeferringAsyncTaskQueue::~DeferringAsyncTaskQueue() { perform(); flush(); } void DeferringAsyncTaskQueue::defer(std::function function) { if(!deferred_tasks_) { deferred_tasks_ = std::make_unique(); deferred_tasks_->reserve(16); } deferred_tasks_->push_back(function); } void DeferringAsyncTaskQueue::perform() { if(!deferred_tasks_) return; enqueue([deferred_tasks_raw = deferred_tasks_.release()] { std::unique_ptr deferred_tasks(deferred_tasks_raw); for(const auto &function : *deferred_tasks) { function(); } }); } void DeferringAsyncTaskQueue::flush() { perform(); AsyncTaskQueue::flush(); }