1
0
mirror of https://github.com/TomHarte/CLK.git synced 2026-04-25 11:17:26 +00:00

Generalise AsyncTaskQueue, DeferringAsyncTaskQueue and AsyncUpdater into a single template.

This commit is contained in:
Thomas Harte
2022-07-14 16:39:26 -04:00
parent 126838e7c7
commit bf03bda314
48 changed files with 218 additions and 358 deletions
+136 -58
View File
@@ -12,11 +12,11 @@
#include <atomic>
#include <condition_variable>
#include <functional>
#include <list>
#include <memory>
#include <thread>
#include <vector>
#include "../ClockReceiver/TimeTypes.hpp"
#if defined(__APPLE__) && !defined(IGNORE_APPLE)
#include <dispatch/dispatch.h>
#define USE_GCD
@@ -24,81 +24,159 @@
namespace Concurrency {
using TaskList = std::vector<std::function<void(void)>>;
/// An implementation detail; provides the time-centric part of a TaskQueue with a real Performer.
template <typename Performer> struct TaskQueueStorage {
template <typename... Args> TaskQueueStorage(Args&&... args) :
performer(std::forward<Args>(args)...),
last_fired_(Time::nanos_now()) {}
/*!
An async task queue allows a caller to enqueue void(void) functions. Those functions are guaranteed
to be performed serially and asynchronously from the caller. A caller may also request to flush,
causing it to block until all previously-enqueued functions are complete.
*/
class AsyncTaskQueue {
public:
AsyncTaskQueue();
virtual ~AsyncTaskQueue();
Performer performer;
/*!
Adds @c function to the queue.
@discussion Functions will be performed serially and asynchronously. This method is safe to
call from multiple threads.
@parameter function The function to enqueue.
*/
void enqueue(std::function<void(void)> function);
/*!
Blocks the caller until all previously-enqueud functions have completed.
*/
void flush();
protected:
void update() {
auto time_now = Time::nanos_now();
performer.perform(time_now - last_fired_);
last_fired_ = time_now;
}
private:
#ifdef USE_GCD
dispatch_queue_t serial_dispatch_queue_;
#else
std::atomic_bool should_destruct_;
std::condition_variable processing_condition_;
std::mutex queue_mutex_;
std::list<std::function<void(void)>> pending_tasks_;
Time::Nanos last_fired_;
};
std::thread thread_;
#endif
/// An implementation detail; provides a no-op implementation of time advances for TaskQueues without a Performer.
template <> struct TaskQueueStorage<int> {
TaskQueueStorage() {}
protected:
void update() {}
};
/*!
A deferring async task queue is one that accepts a list of functions to be performed but defers
any action until told to perform. It performs them by enquing a single asynchronous task that will
perform the deferred tasks in order.
A task queue allows a caller to enqueue void(void) functions. Those functions are guaranteed
to be performed serially and asynchronously from the caller.
It therefore offers similar semantics to an asynchronous task queue, but allows for management of
synchronisation costs, since neither defer nor perform make any effort to be thread safe.
If @c perform_automatically is true, functions will be performed as soon as is possible,
at the cost of thread synchronisation.
If @c perform_automatically is false, functions will be queued up and not dispatched
until a call to perform().
If a @c Performer type is supplied then a public member, @c performer will be constructed
with the arguments supplied to TaskQueue's constructor, and that class will receive calls of the
form @c .perform(nanos) to update it to every batch of new actions.
*/
class DeferringAsyncTaskQueue: public AsyncTaskQueue {
template <bool perform_automatically, typename Performer = int> class TaskQueue: public TaskQueueStorage<Performer> {
public:
~DeferringAsyncTaskQueue();
template <typename... Args> TaskQueue(Args&&... args) :
TaskQueueStorage<Performer>(std::forward<Args>(args)...),
thread_{
[this] {
ActionVector actions;
/*!
Adds a function to the deferral list.
while(!should_quit) {
// Wait for new actions to be signalled, and grab them.
std::unique_lock lock(condition_mutex_);
while(actions_.empty()) {
condition_.wait(lock);
}
std::swap(actions, actions_);
lock.unlock();
This is not thread safe; it should be serialised with other calls to itself and to perform.
*/
void defer(std::function<void(void)> function);
// Update to now (which is possibly a no-op).
TaskQueueStorage<Performer>::update();
/*!
Enqueues a function that will perform all currently deferred functions, in the
order that they were deferred.
// Perform the actions and destroy them.
for(const auto &action: actions) {
action();
}
actions.clear();
}
}
} {}
This is not thread safe; it should be serialised with other calls to itself and to defer.
*/
void perform();
/// Enqueus @c post_action to be performed asynchronously at some point
/// in the future. If @c perform_automatically is @c true then the action
/// will be performed as soon as possible. Otherwise it will sit unsheculed until
/// a call to @c perform().
///
/// Actions may be elided.
///
/// If this TaskQueue has a @c Performer then the action will be performed
/// on the same thread as the performer, after the performer has been updated
/// to 'now'.
void enqueue(const std::function<void(void)> &post_action) {
std::lock_guard guard(condition_mutex_);
actions_.push_back(post_action);
/*!
Blocks the caller until all previously-enqueud functions have completed.
*/
void flush();
if constexpr (perform_automatically) {
condition_.notify_all();
}
}
/// Causes any enqueued actions that are not yet scheduled to be scheduled.
void perform() {
if(actions_.empty()) {
return;
}
condition_.notify_all();
}
/// Permanently stops this task queue, blocking until that has happened.
/// All pending actions will be performed first.
///
/// The queue cannot be restarted; this is a destructive action.
void stop() {
if(thread_.joinable()) {
should_quit = true;
enqueue([] {});
if constexpr (!perform_automatically) {
perform();
}
thread_.join();
}
}
/// Schedules any remaining unscheduled work, then blocks synchronously
/// until all scheduled work has been performed.
void flush() {
std::mutex flush_mutex;
std::condition_variable flush_condition;
bool has_run = false;
std::unique_lock lock(flush_mutex);
enqueue([&flush_mutex, &flush_condition, &has_run] () {
std::unique_lock inner_lock(flush_mutex);
has_run = true;
flush_condition.notify_all();
});
if constexpr (!perform_automatically) {
perform();
}
flush_condition.wait(lock, [&has_run] { return has_run; });
}
~TaskQueue() {
stop();
}
private:
std::unique_ptr<TaskList> deferred_tasks_;
// The list of actions waiting be performed. These will be elided,
// increasing their latency, if the emulation thread falls behind.
using ActionVector = std::vector<std::function<void(void)>>;
ActionVector actions_;
// Necessary synchronisation parts.
std::atomic<bool> should_quit = false;
std::mutex condition_mutex_;
std::condition_variable condition_;
// Ensure the thread isn't constructed until after the mutex
// and condition variable.
std::thread thread_;
};
}
#endif /* Concurrency_hpp */
#endif /* AsyncTaskQueue_hpp */