1
0
mirror of https://github.com/TomHarte/CLK.git synced 2025-11-24 13:17:41 +00:00

Enforce perform_automatically, start_immediately; relax Boolean access order.

This commit is contained in:
Thomas Harte
2025-11-09 00:17:39 -05:00
parent c5704aaaff
commit 902f388cb1

View File

@@ -74,7 +74,7 @@ public:
template <typename... Args> AsyncTaskQueue(Args&&... args) : template <typename... Args> AsyncTaskQueue(Args&&... args) :
TaskQueueStorage<Performer>(std::forward<Args>(args)...) { TaskQueueStorage<Performer>(std::forward<Args>(args)...) {
if constexpr (start_immediately) { if constexpr (start_immediately) {
start(); start_impl();
} }
} }
@@ -99,6 +99,7 @@ public:
/// Causes any enqueued actions that are not yet scheduled to be scheduled. /// Causes any enqueued actions that are not yet scheduled to be scheduled.
void perform() { void perform() {
static_assert(!perform_automatically);
if(actions_.empty()) { if(actions_.empty()) {
return; return;
} }
@@ -111,7 +112,7 @@ public:
/// The queue cannot be restarted; this is a destructive action. /// The queue cannot be restarted; this is a destructive action.
void stop() { void stop() {
if(thread_.joinable()) { if(thread_.joinable()) {
should_quit_ = true; should_quit_.store(true, std::memory_order_relaxed);
enqueue([] {}); enqueue([] {});
if constexpr (!perform_automatically) { if constexpr (!perform_automatically) {
perform(); perform();
@@ -124,31 +125,8 @@ public:
/// ///
/// This is not guaranteed safely to restart a stopped queue. /// This is not guaranteed safely to restart a stopped queue.
void start() { void start() {
thread_ = std::thread{ static_assert(!start_immediately);
[this] { start_impl();
ActionVector actions;
// Continue until told to quit.
while(!should_quit_) {
// Wait for new actions to be signalled, and grab them.
std::unique_lock lock(condition_mutex_);
while(actions_.empty() && !should_quit_) {
condition_.wait(lock);
}
std::swap(actions, actions_);
lock.unlock();
// Update to now (which is possibly a no-op).
TaskQueueStorage<Performer>::update();
// Perform the actions and destroy them.
for(const auto &action: actions) {
action();
}
actions.clear();
}
}
};
} }
/// Schedules any remaining unscheduled work, then blocks synchronously /// Schedules any remaining unscheduled work, then blocks synchronously
@@ -177,6 +155,34 @@ public:
} }
private: private:
void start_impl() {
thread_ = std::thread{
[this] {
ActionVector actions;
// Continue until told to quit.
while(!should_quit_.load(std::memory_order_relaxed)) {
// Wait for new actions to be signalled, and grab them.
std::unique_lock lock(condition_mutex_);
while(actions_.empty() && !should_quit_.load(std::memory_order_relaxed)) {
condition_.wait(lock);
}
std::swap(actions, actions_);
lock.unlock();
// Update to now (which is possibly a no-op).
TaskQueueStorage<Performer>::update();
// Perform the actions and destroy them.
for(const auto &action: actions) {
action();
}
actions.clear();
}
}
};
}
// The list of actions waiting be performed. These will be elided, // The list of actions waiting be performed. These will be elided,
// increasing their latency, if the emulation thread falls behind. // increasing their latency, if the emulation thread falls behind.
using ActionVector = std::vector<std::function<void(void)>>; using ActionVector = std::vector<std::function<void(void)>>;