From 126838e7c74f2a8e9fa9333049eccc4585d53024 Mon Sep 17 00:00:00 2001 From: Thomas Harte Date: Thu, 14 Jul 2022 15:52:31 -0400 Subject: [PATCH 1/2] Thanks to `std::swap` and move semantics, there's no need for indirection here. --- Concurrency/AsyncUpdater.hpp | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/Concurrency/AsyncUpdater.hpp b/Concurrency/AsyncUpdater.hpp index 62625d452..cf241c590 100644 --- a/Concurrency/AsyncUpdater.hpp +++ b/Concurrency/AsyncUpdater.hpp @@ -21,16 +21,15 @@ template class AsyncUpdater { public: template AsyncUpdater(Args&&... args) : performer(std::forward(args)...), - actions_(std::make_unique()), performer_thread_{ [this] { Time::Nanos last_fired = Time::nanos_now(); - auto actions = std::make_unique(); + ActionVector actions; while(!should_quit) { // Wait for new actions to be signalled, and grab them. std::unique_lock lock(condition_mutex_); - while(actions_->empty()) { + while(actions_.empty()) { condition_.wait(lock); } std::swap(actions, actions_); @@ -42,10 +41,10 @@ template class AsyncUpdater { last_fired = time_now; // Perform the actions. - for(const auto& action: *actions) { + for(const auto& action: actions) { action(); } - actions->clear(); + actions.clear(); } } } {} @@ -58,7 +57,7 @@ template class AsyncUpdater { /// Actions may be elided, void update(const std::function &post_action) { std::lock_guard guard(condition_mutex_); - actions_->push_back(post_action); + actions_.push_back(post_action); condition_.notify_all(); } @@ -81,7 +80,7 @@ template class AsyncUpdater { // The list of actions waiting be performed. These will be elided, // increasing their latency, if the emulation thread falls behind. using ActionVector = std::vector>; - std::unique_ptr actions_; + ActionVector actions_; // Necessary synchronisation parts. std::atomic should_quit = false; From bf03bda3144b24359a2943a0385b62b1024027ba Mon Sep 17 00:00:00 2001 From: Thomas Harte Date: Thu, 14 Jul 2022 16:39:26 -0400 Subject: [PATCH 2/2] Generalise AsyncTaskQueue, DeferringAsyncTaskQueue and AsyncUpdater into a single template. --- .../Implementation/MultiProducer.hpp | 2 +- ClockReceiver/JustInTime.hpp | 2 +- Components/6560/6560.cpp | 6 +- Components/6560/6560.hpp | 6 +- Components/AY38910/AY38910.cpp | 4 +- Components/AY38910/AY38910.hpp | 4 +- Components/AudioToggle/AudioToggle.cpp | 4 +- Components/AudioToggle/AudioToggle.hpp | 4 +- Components/KonamiSCC/KonamiSCC.cpp | 4 +- Components/KonamiSCC/KonamiSCC.hpp | 4 +- Components/OPx/Implementation/OPLBase.hpp | 4 +- Components/OPx/OPLL.cpp | 4 +- Components/OPx/OPLL.hpp | 2 +- Components/SN76489/SN76489.cpp | 4 +- Components/SN76489/SN76489.hpp | 4 +- Concurrency/AsyncTaskQueue.cpp | 110 ---------- Concurrency/AsyncTaskQueue.hpp | 194 ++++++++++++------ Concurrency/AsyncUpdater.hpp | 98 --------- Machines/Amiga/Audio.hpp | 2 +- Machines/AmstradCPC/AmstradCPC.cpp | 2 +- Machines/Apple/AppleII/AppleII.cpp | 2 +- Machines/Apple/AppleIIgs/AppleIIgs.cpp | 2 +- Machines/Apple/AppleIIgs/Sound.cpp | 6 +- Machines/Apple/AppleIIgs/Sound.hpp | 4 +- Machines/Apple/Macintosh/Audio.cpp | 6 +- Machines/Apple/Macintosh/Audio.hpp | 4 +- Machines/Apple/Macintosh/DeferredAudio.hpp | 2 +- Machines/Atari/2600/Bus.hpp | 2 +- Machines/Atari/2600/TIASound.cpp | 8 +- Machines/Atari/2600/TIASound.hpp | 4 +- Machines/Atari/ST/AtariST.cpp | 2 +- Machines/ColecoVision/ColecoVision.cpp | 2 +- Machines/Electron/Electron.cpp | 2 +- Machines/Electron/SoundGenerator.cpp | 6 +- Machines/Electron/SoundGenerator.hpp | 4 +- Machines/Enterprise/Dave.cpp | 6 +- Machines/Enterprise/Dave.hpp | 4 +- Machines/Enterprise/Enterprise.cpp | 2 +- Machines/MSX/MSX.cpp | 2 +- Machines/MasterSystem/MasterSystem.cpp | 4 +- Machines/Oric/Oric.cpp | 6 +- Machines/Sinclair/ZX8081/ZX8081.cpp | 2 +- Machines/Sinclair/ZXSpectrum/ZXSpectrum.cpp | 2 +- .../Clock Signal.xcodeproj/project.pbxproj | 10 - .../Mac/Clock Signal/Machine/CSMachine.mm | 10 +- .../Speaker/Implementation/LowpassSpeaker.hpp | 4 +- Storage/Disk/DiskImage/DiskImage.hpp | 2 +- .../DiskImage/DiskImageImplementation.hpp | 2 +- 48 files changed, 218 insertions(+), 358 deletions(-) delete mode 100644 Concurrency/AsyncTaskQueue.cpp delete mode 100644 Concurrency/AsyncUpdater.hpp diff --git a/Analyser/Dynamic/MultiMachine/Implementation/MultiProducer.hpp b/Analyser/Dynamic/MultiMachine/Implementation/MultiProducer.hpp index 543e24e5b..b03aa9401 100644 --- a/Analyser/Dynamic/MultiMachine/Implementation/MultiProducer.hpp +++ b/Analyser/Dynamic/MultiMachine/Implementation/MultiProducer.hpp @@ -47,7 +47,7 @@ template class MultiInterface { std::recursive_mutex &machines_mutex_; private: - std::vector queues_; + std::vector> queues_; }; class MultiTimedMachine: public MultiInterface, public MachineTypes::TimedMachine { diff --git a/ClockReceiver/JustInTime.hpp b/ClockReceiver/JustInTime.hpp index a7bc2d6bd..a7c80e735 100644 --- a/ClockReceiver/JustInTime.hpp +++ b/ClockReceiver/JustInTime.hpp @@ -315,7 +315,7 @@ template task_queue_; }; #endif /* JustInTime_h */ diff --git a/Components/6560/6560.cpp b/Components/6560/6560.cpp index 6e9f6be72..5a87e34a7 100644 --- a/Components/6560/6560.cpp +++ b/Components/6560/6560.cpp @@ -12,18 +12,18 @@ using namespace MOS::MOS6560; -AudioGenerator::AudioGenerator(Concurrency::DeferringAsyncTaskQueue &audio_queue) : +AudioGenerator::AudioGenerator(Concurrency::TaskQueue &audio_queue) : audio_queue_(audio_queue) {} void AudioGenerator::set_volume(uint8_t volume) { - audio_queue_.defer([this, volume]() { + audio_queue_.enqueue([this, volume]() { volume_ = int16_t(volume) * range_multiplier_; }); } void AudioGenerator::set_control(int channel, uint8_t value) { - audio_queue_.defer([this, channel, value]() { + audio_queue_.enqueue([this, channel, value]() { control_registers_[channel] = value; }); } diff --git a/Components/6560/6560.hpp b/Components/6560/6560.hpp index 53e145f29..3facb0065 100644 --- a/Components/6560/6560.hpp +++ b/Components/6560/6560.hpp @@ -21,7 +21,7 @@ namespace MOS6560 { // audio state class AudioGenerator: public ::Outputs::Speaker::SampleSource { public: - AudioGenerator(Concurrency::DeferringAsyncTaskQueue &audio_queue); + AudioGenerator(Concurrency::TaskQueue &audio_queue); void set_volume(uint8_t volume); void set_control(int channel, uint8_t value); @@ -33,7 +33,7 @@ class AudioGenerator: public ::Outputs::Speaker::SampleSource { static constexpr bool get_is_stereo() { return false; } private: - Concurrency::DeferringAsyncTaskQueue &audio_queue_; + Concurrency::TaskQueue &audio_queue_; unsigned int counters_[4] = {2, 1, 0, 0}; // create a slight phase offset for the three channels unsigned int shift_registers_[4] = {0, 0, 0, 0}; @@ -433,7 +433,7 @@ template class MOS6560 { BusHandler &bus_handler_; Outputs::CRT::CRT crt_; - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue audio_queue_; AudioGenerator audio_generator_; Outputs::Speaker::PullLowpass speaker_; diff --git a/Components/AY38910/AY38910.cpp b/Components/AY38910/AY38910.cpp index 31a2e7ad6..535b1947f 100644 --- a/Components/AY38910/AY38910.cpp +++ b/Components/AY38910/AY38910.cpp @@ -16,7 +16,7 @@ using namespace GI::AY38910; template -AY38910::AY38910(Personality personality, Concurrency::DeferringAsyncTaskQueue &task_queue) : task_queue_(task_queue) { +AY38910::AY38910(Personality personality, Concurrency::TaskQueue &task_queue) : task_queue_(task_queue) { // Don't use the low bit of the envelope position if this is an AY. envelope_position_mask_ |= personality == Personality::AY38910; @@ -252,7 +252,7 @@ template void AY38910::set_register_value(uint8_t va // If this is a register that affects audio output, enqueue a mutation onto the // audio generation thread. if(selected_register_ < 14) { - task_queue_.defer([this, selected_register = selected_register_, value] () { + task_queue_.enqueue([this, selected_register = selected_register_, value] () { // Perform any register-specific mutation to output generation. uint8_t masked_value = value; switch(selected_register) { diff --git a/Components/AY38910/AY38910.hpp b/Components/AY38910/AY38910.hpp index 4ca99f6a2..22da85b21 100644 --- a/Components/AY38910/AY38910.hpp +++ b/Components/AY38910/AY38910.hpp @@ -71,7 +71,7 @@ enum class Personality { template class AY38910: public ::Outputs::Speaker::SampleSource { public: /// Creates a new AY38910. - AY38910(Personality, Concurrency::DeferringAsyncTaskQueue &); + AY38910(Personality, Concurrency::TaskQueue &); /// Sets the value the AY would read from its data lines if it were not outputting. void set_data_input(uint8_t r); @@ -114,7 +114,7 @@ template class AY38910: public ::Outputs::Speaker::SampleSource static constexpr bool get_is_stereo() { return is_stereo; } private: - Concurrency::DeferringAsyncTaskQueue &task_queue_; + Concurrency::TaskQueue &task_queue_; int selected_register_ = 0; uint8_t registers_[16] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; diff --git a/Components/AudioToggle/AudioToggle.cpp b/Components/AudioToggle/AudioToggle.cpp index d3bbe90bb..f9a3164c7 100644 --- a/Components/AudioToggle/AudioToggle.cpp +++ b/Components/AudioToggle/AudioToggle.cpp @@ -10,7 +10,7 @@ using namespace Audio; -Audio::Toggle::Toggle(Concurrency::DeferringAsyncTaskQueue &audio_queue) : +Audio::Toggle::Toggle(Concurrency::TaskQueue &audio_queue) : audio_queue_(audio_queue) {} void Toggle::get_samples(std::size_t number_of_samples, std::int16_t *target) { @@ -28,7 +28,7 @@ void Toggle::skip_samples(std::size_t) {} void Toggle::set_output(bool enabled) { if(is_enabled_ == enabled) return; is_enabled_ = enabled; - audio_queue_.defer([this, enabled] { + audio_queue_.enqueue([this, enabled] { level_ = enabled ? volume_ : 0; }); } diff --git a/Components/AudioToggle/AudioToggle.hpp b/Components/AudioToggle/AudioToggle.hpp index 991431244..4d67c5b67 100644 --- a/Components/AudioToggle/AudioToggle.hpp +++ b/Components/AudioToggle/AudioToggle.hpp @@ -19,7 +19,7 @@ namespace Audio { */ class Toggle: public Outputs::Speaker::SampleSource { public: - Toggle(Concurrency::DeferringAsyncTaskQueue &audio_queue); + Toggle(Concurrency::TaskQueue &audio_queue); void get_samples(std::size_t number_of_samples, std::int16_t *target); void set_sample_volume_range(std::int16_t range); @@ -31,7 +31,7 @@ class Toggle: public Outputs::Speaker::SampleSource { private: // Accessed on the calling thread. bool is_enabled_ = false; - Concurrency::DeferringAsyncTaskQueue &audio_queue_; + Concurrency::TaskQueue &audio_queue_; // Accessed on the audio thread. int16_t level_ = 0, volume_ = 0; diff --git a/Components/KonamiSCC/KonamiSCC.cpp b/Components/KonamiSCC/KonamiSCC.cpp index ac2dc30d4..c87dd89c6 100644 --- a/Components/KonamiSCC/KonamiSCC.cpp +++ b/Components/KonamiSCC/KonamiSCC.cpp @@ -12,7 +12,7 @@ using namespace Konami; -SCC::SCC(Concurrency::DeferringAsyncTaskQueue &task_queue) : +SCC::SCC(Concurrency::TaskQueue &task_queue) : task_queue_(task_queue) {} bool SCC::is_zero_level() const { @@ -55,7 +55,7 @@ void SCC::write(uint16_t address, uint8_t value) { address &= 0xff; if(address < 0x80) ram_[address] = value; - task_queue_.defer([this, address, value] { + task_queue_.enqueue([this, address, value] { // Check for a write into waveform memory. if(address < 0x80) { waves_[address >> 5].samples[address & 0x1f] = value; diff --git a/Components/KonamiSCC/KonamiSCC.hpp b/Components/KonamiSCC/KonamiSCC.hpp index f08bc5619..6aa04d8f7 100644 --- a/Components/KonamiSCC/KonamiSCC.hpp +++ b/Components/KonamiSCC/KonamiSCC.hpp @@ -24,7 +24,7 @@ namespace Konami { class SCC: public ::Outputs::Speaker::SampleSource { public: /// Creates a new SCC. - SCC(Concurrency::DeferringAsyncTaskQueue &task_queue); + SCC(Concurrency::TaskQueue &task_queue); /// As per ::SampleSource; provides a broadphase test for silence. bool is_zero_level() const; @@ -41,7 +41,7 @@ class SCC: public ::Outputs::Speaker::SampleSource { uint8_t read(uint16_t address); private: - Concurrency::DeferringAsyncTaskQueue &task_queue_; + Concurrency::TaskQueue &task_queue_; // State from here on down is accessed ony from the audio thread. int master_divider_ = 0; diff --git a/Components/OPx/Implementation/OPLBase.hpp b/Components/OPx/Implementation/OPLBase.hpp index 6b6a71038..5409b7cb2 100644 --- a/Components/OPx/Implementation/OPLBase.hpp +++ b/Components/OPx/Implementation/OPLBase.hpp @@ -26,9 +26,9 @@ template class OPLBase: public ::Outputs::Speaker::SampleSource } protected: - OPLBase(Concurrency::DeferringAsyncTaskQueue &task_queue) : task_queue_(task_queue) {} + OPLBase(Concurrency::TaskQueue &task_queue) : task_queue_(task_queue) {} - Concurrency::DeferringAsyncTaskQueue &task_queue_; + Concurrency::TaskQueue &task_queue_; private: uint8_t selected_register_ = 0; diff --git a/Components/OPx/OPLL.cpp b/Components/OPx/OPLL.cpp index b5abd1460..7f86c80c1 100644 --- a/Components/OPx/OPLL.cpp +++ b/Components/OPx/OPLL.cpp @@ -12,7 +12,7 @@ using namespace Yamaha::OPL; -OPLL::OPLL(Concurrency::DeferringAsyncTaskQueue &task_queue, int audio_divider, bool is_vrc7): +OPLL::OPLL(Concurrency::TaskQueue &task_queue, int audio_divider, bool is_vrc7): OPLBase(task_queue), audio_divider_(audio_divider), is_vrc7_(is_vrc7) { // Due to the way that sound mixing works on the OPLL, the audio divider may not // be larger than 4. @@ -74,7 +74,7 @@ OPLL::OPLL(Concurrency::DeferringAsyncTaskQueue &task_queue, int audio_divider, void OPLL::write_register(uint8_t address, uint8_t value) { // The OPLL doesn't have timers or other non-audio functions, so all writes // go to the audio queue. - task_queue_.defer([this, address, value] { + task_queue_.enqueue([this, address, value] { // The first 8 locations are used to define the custom instrument, and have // exactly the same format as the patch set arrays at the head of this file. if(address < 8) { diff --git a/Components/OPx/OPLL.hpp b/Components/OPx/OPLL.hpp index beaa816d8..07072ff41 100644 --- a/Components/OPx/OPLL.hpp +++ b/Components/OPx/OPLL.hpp @@ -24,7 +24,7 @@ namespace OPL { class OPLL: public OPLBase { public: /// Creates a new OPLL or VRC7. - OPLL(Concurrency::DeferringAsyncTaskQueue &task_queue, int audio_divider = 1, bool is_vrc7 = false); + OPLL(Concurrency::TaskQueue &task_queue, int audio_divider = 1, bool is_vrc7 = false); /// As per ::SampleSource; provides audio output. void get_samples(std::size_t number_of_samples, std::int16_t *target); diff --git a/Components/SN76489/SN76489.cpp b/Components/SN76489/SN76489.cpp index bd92d4d60..a413b143c 100644 --- a/Components/SN76489/SN76489.cpp +++ b/Components/SN76489/SN76489.cpp @@ -13,7 +13,7 @@ using namespace TI; -SN76489::SN76489(Personality personality, Concurrency::DeferringAsyncTaskQueue &task_queue, int additional_divider) : task_queue_(task_queue) { +SN76489::SN76489(Personality personality, Concurrency::TaskQueue &task_queue, int additional_divider) : task_queue_(task_queue) { set_sample_volume_range(0); switch(personality) { @@ -49,7 +49,7 @@ void SN76489::set_sample_volume_range(std::int16_t range) { } void SN76489::write(uint8_t value) { - task_queue_.defer([value, this] () { + task_queue_.enqueue([value, this] () { if(value & 0x80) { active_register_ = value; } diff --git a/Components/SN76489/SN76489.hpp b/Components/SN76489/SN76489.hpp index 6f5b0c151..43da90661 100644 --- a/Components/SN76489/SN76489.hpp +++ b/Components/SN76489/SN76489.hpp @@ -23,7 +23,7 @@ class SN76489: public Outputs::Speaker::SampleSource { }; /// Creates a new SN76489. - SN76489(Personality personality, Concurrency::DeferringAsyncTaskQueue &task_queue, int additional_divider = 1); + SN76489(Personality personality, Concurrency::TaskQueue &task_queue, int additional_divider = 1); /// Writes a new value to the SN76489. void write(uint8_t value); @@ -41,7 +41,7 @@ class SN76489: public Outputs::Speaker::SampleSource { void evaluate_output_volume(); int volumes_[16]; - Concurrency::DeferringAsyncTaskQueue &task_queue_; + Concurrency::TaskQueue &task_queue_; struct ToneChannel { // Programmatically-set state; updated by the processor. diff --git a/Concurrency/AsyncTaskQueue.cpp b/Concurrency/AsyncTaskQueue.cpp deleted file mode 100644 index ccc44a990..000000000 --- a/Concurrency/AsyncTaskQueue.cpp +++ /dev/null @@ -1,110 +0,0 @@ -// -// AsyncTaskQueue.cpp -// Clock Signal -// -// Created by Thomas Harte on 07/10/2016. -// Copyright 2016 Thomas Harte. All rights reserved. -// - -#include "AsyncTaskQueue.hpp" - -using namespace Concurrency; - -AsyncTaskQueue::AsyncTaskQueue() -#ifndef USE_GCD - : - should_destruct_(false), - thread_([this] () { - while(!should_destruct_) { - std::function next_function; - - // Take lock, check for a new task. - std::unique_lock lock(queue_mutex_); - if(!pending_tasks_.empty()) { - next_function = pending_tasks_.front(); - pending_tasks_.pop_front(); - } - - if(next_function) { - // If there is a task, release lock and perform it. - lock.unlock(); - next_function(); - } else { - // If there isn't a task, atomically block on the processing condition and release the lock - // until there's something pending (and then release it again via scope). - processing_condition_.wait(lock); - } - } - }) -#else - : serial_dispatch_queue_(dispatch_queue_create("com.thomasharte.clocksignal.asyntaskqueue", DISPATCH_QUEUE_SERIAL)) -#endif -{} - -AsyncTaskQueue::~AsyncTaskQueue() { -#ifdef USE_GCD - flush(); - dispatch_release(serial_dispatch_queue_); -#else - // Set should destruct, and then give the thread a bit of a nudge - // via an empty enqueue. - should_destruct_ = true; - enqueue([](){}); - - // Wait for the thread safely to terminate. - thread_.join(); -#endif -} - -void AsyncTaskQueue::enqueue(std::function function) { -#ifdef USE_GCD - dispatch_async(serial_dispatch_queue_, ^{function();}); -#else - std::lock_guard lock(queue_mutex_); - pending_tasks_.push_back(function); - processing_condition_.notify_all(); -#endif -} - -void AsyncTaskQueue::flush() { -#ifdef USE_GCD - dispatch_sync(serial_dispatch_queue_, ^{}); -#else - auto flush_mutex = std::make_shared(); - auto flush_condition = std::make_shared(); - std::unique_lock lock(*flush_mutex); - enqueue([=] () { - std::unique_lock inner_lock(*flush_mutex); - flush_condition->notify_all(); - }); - flush_condition->wait(lock); -#endif -} - -DeferringAsyncTaskQueue::~DeferringAsyncTaskQueue() { - perform(); - flush(); -} - -void DeferringAsyncTaskQueue::defer(std::function function) { - if(!deferred_tasks_) { - deferred_tasks_ = std::make_unique(); - deferred_tasks_->reserve(16); - } - deferred_tasks_->push_back(function); -} - -void DeferringAsyncTaskQueue::perform() { - if(!deferred_tasks_) return; - enqueue([deferred_tasks_raw = deferred_tasks_.release()] { - std::unique_ptr deferred_tasks(deferred_tasks_raw); - for(const auto &function : *deferred_tasks) { - function(); - } - }); -} - -void DeferringAsyncTaskQueue::flush() { - perform(); - AsyncTaskQueue::flush(); -} diff --git a/Concurrency/AsyncTaskQueue.hpp b/Concurrency/AsyncTaskQueue.hpp index f301a8717..da56e4388 100644 --- a/Concurrency/AsyncTaskQueue.hpp +++ b/Concurrency/AsyncTaskQueue.hpp @@ -12,11 +12,11 @@ #include #include #include -#include -#include #include #include +#include "../ClockReceiver/TimeTypes.hpp" + #if defined(__APPLE__) && !defined(IGNORE_APPLE) #include #define USE_GCD @@ -24,81 +24,159 @@ namespace Concurrency { -using TaskList = std::vector>; +/// An implementation detail; provides the time-centric part of a TaskQueue with a real Performer. +template struct TaskQueueStorage { + template TaskQueueStorage(Args&&... args) : + performer(std::forward(args)...), + last_fired_(Time::nanos_now()) {} -/*! - An async task queue allows a caller to enqueue void(void) functions. Those functions are guaranteed - to be performed serially and asynchronously from the caller. A caller may also request to flush, - causing it to block until all previously-enqueued functions are complete. -*/ -class AsyncTaskQueue { - public: - AsyncTaskQueue(); - virtual ~AsyncTaskQueue(); + Performer performer; - /*! - Adds @c function to the queue. - - @discussion Functions will be performed serially and asynchronously. This method is safe to - call from multiple threads. - @parameter function The function to enqueue. - */ - void enqueue(std::function function); - - /*! - Blocks the caller until all previously-enqueud functions have completed. - */ - void flush(); + protected: + void update() { + auto time_now = Time::nanos_now(); + performer.perform(time_now - last_fired_); + last_fired_ = time_now; + } private: -#ifdef USE_GCD - dispatch_queue_t serial_dispatch_queue_; -#else - std::atomic_bool should_destruct_; - std::condition_variable processing_condition_; - std::mutex queue_mutex_; - std::list> pending_tasks_; + Time::Nanos last_fired_; +}; - std::thread thread_; -#endif +/// An implementation detail; provides a no-op implementation of time advances for TaskQueues without a Performer. +template <> struct TaskQueueStorage { + TaskQueueStorage() {} + + protected: + void update() {} }; /*! - A deferring async task queue is one that accepts a list of functions to be performed but defers - any action until told to perform. It performs them by enquing a single asynchronous task that will - perform the deferred tasks in order. + A task queue allows a caller to enqueue void(void) functions. Those functions are guaranteed + to be performed serially and asynchronously from the caller. - It therefore offers similar semantics to an asynchronous task queue, but allows for management of - synchronisation costs, since neither defer nor perform make any effort to be thread safe. + If @c perform_automatically is true, functions will be performed as soon as is possible, + at the cost of thread synchronisation. + + If @c perform_automatically is false, functions will be queued up and not dispatched + until a call to perform(). + + If a @c Performer type is supplied then a public member, @c performer will be constructed + with the arguments supplied to TaskQueue's constructor, and that class will receive calls of the + form @c .perform(nanos) to update it to every batch of new actions. */ -class DeferringAsyncTaskQueue: public AsyncTaskQueue { +template class TaskQueue: public TaskQueueStorage { public: - ~DeferringAsyncTaskQueue(); + template TaskQueue(Args&&... args) : + TaskQueueStorage(std::forward(args)...), + thread_{ + [this] { + ActionVector actions; - /*! - Adds a function to the deferral list. + while(!should_quit) { + // Wait for new actions to be signalled, and grab them. + std::unique_lock lock(condition_mutex_); + while(actions_.empty()) { + condition_.wait(lock); + } + std::swap(actions, actions_); + lock.unlock(); - This is not thread safe; it should be serialised with other calls to itself and to perform. - */ - void defer(std::function function); + // Update to now (which is possibly a no-op). + TaskQueueStorage::update(); - /*! - Enqueues a function that will perform all currently deferred functions, in the - order that they were deferred. + // Perform the actions and destroy them. + for(const auto &action: actions) { + action(); + } + actions.clear(); + } + } + } {} - This is not thread safe; it should be serialised with other calls to itself and to defer. - */ - void perform(); + /// Enqueus @c post_action to be performed asynchronously at some point + /// in the future. If @c perform_automatically is @c true then the action + /// will be performed as soon as possible. Otherwise it will sit unsheculed until + /// a call to @c perform(). + /// + /// Actions may be elided. + /// + /// If this TaskQueue has a @c Performer then the action will be performed + /// on the same thread as the performer, after the performer has been updated + /// to 'now'. + void enqueue(const std::function &post_action) { + std::lock_guard guard(condition_mutex_); + actions_.push_back(post_action); - /*! - Blocks the caller until all previously-enqueud functions have completed. - */ - void flush(); + if constexpr (perform_automatically) { + condition_.notify_all(); + } + } + + /// Causes any enqueued actions that are not yet scheduled to be scheduled. + void perform() { + if(actions_.empty()) { + return; + } + condition_.notify_all(); + } + + /// Permanently stops this task queue, blocking until that has happened. + /// All pending actions will be performed first. + /// + /// The queue cannot be restarted; this is a destructive action. + void stop() { + if(thread_.joinable()) { + should_quit = true; + enqueue([] {}); + if constexpr (!perform_automatically) { + perform(); + } + thread_.join(); + } + } + + /// Schedules any remaining unscheduled work, then blocks synchronously + /// until all scheduled work has been performed. + void flush() { + std::mutex flush_mutex; + std::condition_variable flush_condition; + bool has_run = false; + std::unique_lock lock(flush_mutex); + + enqueue([&flush_mutex, &flush_condition, &has_run] () { + std::unique_lock inner_lock(flush_mutex); + has_run = true; + flush_condition.notify_all(); + }); + + if constexpr (!perform_automatically) { + perform(); + } + + flush_condition.wait(lock, [&has_run] { return has_run; }); + } + + ~TaskQueue() { + stop(); + } private: - std::unique_ptr deferred_tasks_; + // The list of actions waiting be performed. These will be elided, + // increasing their latency, if the emulation thread falls behind. + using ActionVector = std::vector>; + ActionVector actions_; + + // Necessary synchronisation parts. + std::atomic should_quit = false; + std::mutex condition_mutex_; + std::condition_variable condition_; + + // Ensure the thread isn't constructed until after the mutex + // and condition variable. + std::thread thread_; }; } -#endif /* Concurrency_hpp */ +#endif /* AsyncTaskQueue_hpp */ diff --git a/Concurrency/AsyncUpdater.hpp b/Concurrency/AsyncUpdater.hpp deleted file mode 100644 index cf241c590..000000000 --- a/Concurrency/AsyncUpdater.hpp +++ /dev/null @@ -1,98 +0,0 @@ -// -// AsyncUpdater.h -// Clock Signal -// -// Created by Thomas Harte on 06/07/2022. -// Copyright © 2022 Thomas Harte. All rights reserved. -// - -#ifndef AsyncUpdater_hpp -#define AsyncUpdater_hpp - -#include -#include -#include - -#include "../ClockReceiver/TimeTypes.hpp" - -namespace Concurrency { - -template class AsyncUpdater { - public: - template AsyncUpdater(Args&&... args) : - performer(std::forward(args)...), - performer_thread_{ - [this] { - Time::Nanos last_fired = Time::nanos_now(); - ActionVector actions; - - while(!should_quit) { - // Wait for new actions to be signalled, and grab them. - std::unique_lock lock(condition_mutex_); - while(actions_.empty()) { - condition_.wait(lock); - } - std::swap(actions, actions_); - lock.unlock(); - - // Update to now. - auto time_now = Time::nanos_now(); - performer.perform(time_now - last_fired); - last_fired = time_now; - - // Perform the actions. - for(const auto& action: actions) { - action(); - } - actions.clear(); - } - } - } {} - - /// Run the performer up to 'now' and then perform @c post_action. - /// - /// @c post_action will be performed asynchronously, on the same - /// thread as the performer. - /// - /// Actions may be elided, - void update(const std::function &post_action) { - std::lock_guard guard(condition_mutex_); - actions_.push_back(post_action); - condition_.notify_all(); - } - - void stop() { - if(performer_thread_.joinable()) { - should_quit = true; - update([] {}); - performer_thread_.join(); - } - } - - ~AsyncUpdater() { - stop(); - } - - // The object that will actually receive time advances. - Performer performer; - - private: - // The list of actions waiting be performed. These will be elided, - // increasing their latency, if the emulation thread falls behind. - using ActionVector = std::vector>; - ActionVector actions_; - - // Necessary synchronisation parts. - std::atomic should_quit = false; - std::mutex condition_mutex_; - std::condition_variable condition_; - - // Ensure the thread isn't constructed until after the mutex - // and condition variable. - std::thread performer_thread_; -}; - - -} - -#endif /* AsyncUpdater_hpp */ diff --git a/Machines/Amiga/Audio.hpp b/Machines/Amiga/Audio.hpp index ae63339fb..41b414f65 100644 --- a/Machines/Amiga/Audio.hpp +++ b/Machines/Amiga/Audio.hpp @@ -149,7 +149,7 @@ class Audio: public DMADevice<4> { // Transient output state, and its destination. Outputs::Speaker::PushLowpass speaker_; - Concurrency::AsyncTaskQueue queue_; + Concurrency::TaskQueue queue_; using AudioBuffer = std::array; static constexpr int BufferCount = 3; diff --git a/Machines/AmstradCPC/AmstradCPC.cpp b/Machines/AmstradCPC/AmstradCPC.cpp index db4c88391..ca41eb1b2 100644 --- a/Machines/AmstradCPC/AmstradCPC.cpp +++ b/Machines/AmstradCPC/AmstradCPC.cpp @@ -156,7 +156,7 @@ class AYDeferrer { } private: - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue audio_queue_; GI::AY38910::AY38910 ay_; Outputs::Speaker::PullLowpass> speaker_; HalfCycles cycles_since_update_; diff --git a/Machines/Apple/AppleII/AppleII.cpp b/Machines/Apple/AppleII/AppleII.cpp index 706e4cb6c..fbbc623d3 100644 --- a/Machines/Apple/AppleII/AppleII.cpp +++ b/Machines/Apple/AppleII/AppleII.cpp @@ -95,7 +95,7 @@ template class ConcreteMachine: uint8_t ram_[65536], aux_ram_[65536]; std::vector rom_; - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue audio_queue_; Audio::Toggle audio_toggle_; Outputs::Speaker::PullLowpass speaker_; Cycles cycles_since_audio_update_; diff --git a/Machines/Apple/AppleIIgs/AppleIIgs.cpp b/Machines/Apple/AppleIIgs/AppleIIgs.cpp index 546eeb68a..7037af170 100644 --- a/Machines/Apple/AppleIIgs/AppleIIgs.cpp +++ b/Machines/Apple/AppleIIgs/AppleIIgs.cpp @@ -1150,7 +1150,7 @@ class ConcreteMachine: Apple::Disk::DiskIIDrive drives525_[2]; // The audio parts. - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue audio_queue_; Apple::IIgs::Sound::GLU sound_glu_; Audio::Toggle audio_toggle_; using AudioSource = Outputs::Speaker::CompoundSource; diff --git a/Machines/Apple/AppleIIgs/Sound.cpp b/Machines/Apple/AppleIIgs/Sound.cpp index 7e5dbf36e..942746257 100644 --- a/Machines/Apple/AppleIIgs/Sound.cpp +++ b/Machines/Apple/AppleIIgs/Sound.cpp @@ -16,7 +16,7 @@ using namespace Apple::IIgs::Sound; -GLU::GLU(Concurrency::DeferringAsyncTaskQueue &audio_queue) : audio_queue_(audio_queue) { +GLU::GLU(Concurrency::TaskQueue &audio_queue) : audio_queue_(audio_queue) { // Reset all pending stores. MemoryWrite disabled_write; disabled_write.enabled = false; @@ -42,7 +42,7 @@ void GLU::set_data(uint8_t data) { // Register access. const auto address = address_; // To make sure I don't inadvertently 'capture' address_. local_.set_register(address, data); - audio_queue_.defer([this, address, data] () { + audio_queue_.enqueue([this, address, data] () { remote_.set_register(address, data); }); } @@ -191,7 +191,7 @@ void GLU::set_sample_volume_range(std::int16_t range) { void GLU::set_control(uint8_t control) { local_.control = control; - audio_queue_.defer([this, control] () { + audio_queue_.enqueue([this, control] () { remote_.control = control; }); } diff --git a/Machines/Apple/AppleIIgs/Sound.hpp b/Machines/Apple/AppleIIgs/Sound.hpp index 24f99c53c..7b822c3da 100644 --- a/Machines/Apple/AppleIIgs/Sound.hpp +++ b/Machines/Apple/AppleIIgs/Sound.hpp @@ -21,7 +21,7 @@ namespace Sound { class GLU: public Outputs::Speaker::SampleSource { public: - GLU(Concurrency::DeferringAsyncTaskQueue &audio_queue); + GLU(Concurrency::TaskQueue &audio_queue); void set_control(uint8_t); uint8_t get_control(); @@ -42,7 +42,7 @@ class GLU: public Outputs::Speaker::SampleSource { void skip_samples(const std::size_t number_of_samples); private: - Concurrency::DeferringAsyncTaskQueue &audio_queue_; + Concurrency::TaskQueue &audio_queue_; uint16_t address_ = 0; diff --git a/Machines/Apple/Macintosh/Audio.cpp b/Machines/Apple/Macintosh/Audio.cpp index fba2c11ea..076c2b857 100644 --- a/Machines/Apple/Macintosh/Audio.cpp +++ b/Machines/Apple/Macintosh/Audio.cpp @@ -18,7 +18,7 @@ const std::size_t sample_length = 352 / 2; } -Audio::Audio(Concurrency::DeferringAsyncTaskQueue &task_queue) : task_queue_(task_queue) {} +Audio::Audio(Concurrency::TaskQueue &task_queue) : task_queue_(task_queue) {} // MARK: - Inputs @@ -35,7 +35,7 @@ void Audio::set_volume(int volume) { posted_volume_ = volume; // Post the volume change as a deferred event. - task_queue_.defer([this, volume] () { + task_queue_.enqueue([this, volume] () { volume_ = volume; set_volume_multiplier(); }); @@ -47,7 +47,7 @@ void Audio::set_enabled(bool on) { posted_enable_mask_ = int(on); // Post the enabled mask change as a deferred event. - task_queue_.defer([this, on] () { + task_queue_.enqueue([this, on] () { enabled_mask_ = int(on); set_volume_multiplier(); }); diff --git a/Machines/Apple/Macintosh/Audio.hpp b/Machines/Apple/Macintosh/Audio.hpp index a0ae25a80..5bb4b7057 100644 --- a/Machines/Apple/Macintosh/Audio.hpp +++ b/Machines/Apple/Macintosh/Audio.hpp @@ -27,7 +27,7 @@ namespace Macintosh { */ class Audio: public ::Outputs::Speaker::SampleSource { public: - Audio(Concurrency::DeferringAsyncTaskQueue &task_queue); + Audio(Concurrency::TaskQueue &task_queue); /*! Macintosh audio is (partly) sourced by the same scanning @@ -58,7 +58,7 @@ class Audio: public ::Outputs::Speaker::SampleSource { constexpr static bool get_is_stereo() { return false; } private: - Concurrency::DeferringAsyncTaskQueue &task_queue_; + Concurrency::TaskQueue &task_queue_; // A queue of fetched samples; read from by one thread, // written to by another. diff --git a/Machines/Apple/Macintosh/DeferredAudio.hpp b/Machines/Apple/Macintosh/DeferredAudio.hpp index 5488f579c..fe0ebdb3a 100644 --- a/Machines/Apple/Macintosh/DeferredAudio.hpp +++ b/Machines/Apple/Macintosh/DeferredAudio.hpp @@ -16,7 +16,7 @@ namespace Apple { namespace Macintosh { struct DeferredAudio { - Concurrency::DeferringAsyncTaskQueue queue; + Concurrency::TaskQueue queue; Audio audio; Outputs::Speaker::PullLowpass