From bf03bda3144b24359a2943a0385b62b1024027ba Mon Sep 17 00:00:00 2001 From: Thomas Harte <thomas.harte@gmail.com> Date: Thu, 14 Jul 2022 16:39:26 -0400 Subject: [PATCH] Generalise AsyncTaskQueue, DeferringAsyncTaskQueue and AsyncUpdater into a single template. --- .../Implementation/MultiProducer.hpp | 2 +- ClockReceiver/JustInTime.hpp | 2 +- Components/6560/6560.cpp | 6 +- Components/6560/6560.hpp | 6 +- Components/AY38910/AY38910.cpp | 4 +- Components/AY38910/AY38910.hpp | 4 +- Components/AudioToggle/AudioToggle.cpp | 4 +- Components/AudioToggle/AudioToggle.hpp | 4 +- Components/KonamiSCC/KonamiSCC.cpp | 4 +- Components/KonamiSCC/KonamiSCC.hpp | 4 +- Components/OPx/Implementation/OPLBase.hpp | 4 +- Components/OPx/OPLL.cpp | 4 +- Components/OPx/OPLL.hpp | 2 +- Components/SN76489/SN76489.cpp | 4 +- Components/SN76489/SN76489.hpp | 4 +- Concurrency/AsyncTaskQueue.cpp | 110 ---------- Concurrency/AsyncTaskQueue.hpp | 194 ++++++++++++------ Concurrency/AsyncUpdater.hpp | 98 --------- Machines/Amiga/Audio.hpp | 2 +- Machines/AmstradCPC/AmstradCPC.cpp | 2 +- Machines/Apple/AppleII/AppleII.cpp | 2 +- Machines/Apple/AppleIIgs/AppleIIgs.cpp | 2 +- Machines/Apple/AppleIIgs/Sound.cpp | 6 +- Machines/Apple/AppleIIgs/Sound.hpp | 4 +- Machines/Apple/Macintosh/Audio.cpp | 6 +- Machines/Apple/Macintosh/Audio.hpp | 4 +- Machines/Apple/Macintosh/DeferredAudio.hpp | 2 +- Machines/Atari/2600/Bus.hpp | 2 +- Machines/Atari/2600/TIASound.cpp | 8 +- Machines/Atari/2600/TIASound.hpp | 4 +- Machines/Atari/ST/AtariST.cpp | 2 +- Machines/ColecoVision/ColecoVision.cpp | 2 +- Machines/Electron/Electron.cpp | 2 +- Machines/Electron/SoundGenerator.cpp | 6 +- Machines/Electron/SoundGenerator.hpp | 4 +- Machines/Enterprise/Dave.cpp | 6 +- Machines/Enterprise/Dave.hpp | 4 +- Machines/Enterprise/Enterprise.cpp | 2 +- Machines/MSX/MSX.cpp | 2 +- Machines/MasterSystem/MasterSystem.cpp | 4 +- Machines/Oric/Oric.cpp | 6 +- Machines/Sinclair/ZX8081/ZX8081.cpp | 2 +- Machines/Sinclair/ZXSpectrum/ZXSpectrum.cpp | 2 +- .../Clock Signal.xcodeproj/project.pbxproj | 10 - .../Mac/Clock Signal/Machine/CSMachine.mm | 10 +- .../Speaker/Implementation/LowpassSpeaker.hpp | 4 +- Storage/Disk/DiskImage/DiskImage.hpp | 2 +- .../DiskImage/DiskImageImplementation.hpp | 2 +- 48 files changed, 218 insertions(+), 358 deletions(-) delete mode 100644 Concurrency/AsyncTaskQueue.cpp delete mode 100644 Concurrency/AsyncUpdater.hpp diff --git a/Analyser/Dynamic/MultiMachine/Implementation/MultiProducer.hpp b/Analyser/Dynamic/MultiMachine/Implementation/MultiProducer.hpp index 543e24e5b..b03aa9401 100644 --- a/Analyser/Dynamic/MultiMachine/Implementation/MultiProducer.hpp +++ b/Analyser/Dynamic/MultiMachine/Implementation/MultiProducer.hpp @@ -47,7 +47,7 @@ template <typename MachineType> class MultiInterface { std::recursive_mutex &machines_mutex_; private: - std::vector<Concurrency::AsyncTaskQueue> queues_; + std::vector<Concurrency::TaskQueue<true>> queues_; }; class MultiTimedMachine: public MultiInterface<MachineTypes::TimedMachine>, public MachineTypes::TimedMachine { diff --git a/ClockReceiver/JustInTime.hpp b/ClockReceiver/JustInTime.hpp index a7bc2d6bd..a7c80e735 100644 --- a/ClockReceiver/JustInTime.hpp +++ b/ClockReceiver/JustInTime.hpp @@ -315,7 +315,7 @@ template <class T, class LocalTimeScale = HalfCycles, class TargetTimeScale = Lo LocalTimeScale time_since_update_; TargetTimeScale threshold_; bool is_flushed_ = true; - Concurrency::AsyncTaskQueue task_queue_; + Concurrency::TaskQueue<true> task_queue_; }; #endif /* JustInTime_h */ diff --git a/Components/6560/6560.cpp b/Components/6560/6560.cpp index 6e9f6be72..5a87e34a7 100644 --- a/Components/6560/6560.cpp +++ b/Components/6560/6560.cpp @@ -12,18 +12,18 @@ using namespace MOS::MOS6560; -AudioGenerator::AudioGenerator(Concurrency::DeferringAsyncTaskQueue &audio_queue) : +AudioGenerator::AudioGenerator(Concurrency::TaskQueue<false> &audio_queue) : audio_queue_(audio_queue) {} void AudioGenerator::set_volume(uint8_t volume) { - audio_queue_.defer([this, volume]() { + audio_queue_.enqueue([this, volume]() { volume_ = int16_t(volume) * range_multiplier_; }); } void AudioGenerator::set_control(int channel, uint8_t value) { - audio_queue_.defer([this, channel, value]() { + audio_queue_.enqueue([this, channel, value]() { control_registers_[channel] = value; }); } diff --git a/Components/6560/6560.hpp b/Components/6560/6560.hpp index 53e145f29..3facb0065 100644 --- a/Components/6560/6560.hpp +++ b/Components/6560/6560.hpp @@ -21,7 +21,7 @@ namespace MOS6560 { // audio state class AudioGenerator: public ::Outputs::Speaker::SampleSource { public: - AudioGenerator(Concurrency::DeferringAsyncTaskQueue &audio_queue); + AudioGenerator(Concurrency::TaskQueue<false> &audio_queue); void set_volume(uint8_t volume); void set_control(int channel, uint8_t value); @@ -33,7 +33,7 @@ class AudioGenerator: public ::Outputs::Speaker::SampleSource { static constexpr bool get_is_stereo() { return false; } private: - Concurrency::DeferringAsyncTaskQueue &audio_queue_; + Concurrency::TaskQueue<false> &audio_queue_; unsigned int counters_[4] = {2, 1, 0, 0}; // create a slight phase offset for the three channels unsigned int shift_registers_[4] = {0, 0, 0, 0}; @@ -433,7 +433,7 @@ template <class BusHandler> class MOS6560 { BusHandler &bus_handler_; Outputs::CRT::CRT crt_; - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue<false> audio_queue_; AudioGenerator audio_generator_; Outputs::Speaker::PullLowpass<AudioGenerator> speaker_; diff --git a/Components/AY38910/AY38910.cpp b/Components/AY38910/AY38910.cpp index 31a2e7ad6..535b1947f 100644 --- a/Components/AY38910/AY38910.cpp +++ b/Components/AY38910/AY38910.cpp @@ -16,7 +16,7 @@ using namespace GI::AY38910; template <bool is_stereo> -AY38910<is_stereo>::AY38910(Personality personality, Concurrency::DeferringAsyncTaskQueue &task_queue) : task_queue_(task_queue) { +AY38910<is_stereo>::AY38910(Personality personality, Concurrency::TaskQueue<false> &task_queue) : task_queue_(task_queue) { // Don't use the low bit of the envelope position if this is an AY. envelope_position_mask_ |= personality == Personality::AY38910; @@ -252,7 +252,7 @@ template <bool is_stereo> void AY38910<is_stereo>::set_register_value(uint8_t va // If this is a register that affects audio output, enqueue a mutation onto the // audio generation thread. if(selected_register_ < 14) { - task_queue_.defer([this, selected_register = selected_register_, value] () { + task_queue_.enqueue([this, selected_register = selected_register_, value] () { // Perform any register-specific mutation to output generation. uint8_t masked_value = value; switch(selected_register) { diff --git a/Components/AY38910/AY38910.hpp b/Components/AY38910/AY38910.hpp index 4ca99f6a2..22da85b21 100644 --- a/Components/AY38910/AY38910.hpp +++ b/Components/AY38910/AY38910.hpp @@ -71,7 +71,7 @@ enum class Personality { template <bool is_stereo> class AY38910: public ::Outputs::Speaker::SampleSource { public: /// Creates a new AY38910. - AY38910(Personality, Concurrency::DeferringAsyncTaskQueue &); + AY38910(Personality, Concurrency::TaskQueue<false> &); /// Sets the value the AY would read from its data lines if it were not outputting. void set_data_input(uint8_t r); @@ -114,7 +114,7 @@ template <bool is_stereo> class AY38910: public ::Outputs::Speaker::SampleSource static constexpr bool get_is_stereo() { return is_stereo; } private: - Concurrency::DeferringAsyncTaskQueue &task_queue_; + Concurrency::TaskQueue<false> &task_queue_; int selected_register_ = 0; uint8_t registers_[16] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; diff --git a/Components/AudioToggle/AudioToggle.cpp b/Components/AudioToggle/AudioToggle.cpp index d3bbe90bb..f9a3164c7 100644 --- a/Components/AudioToggle/AudioToggle.cpp +++ b/Components/AudioToggle/AudioToggle.cpp @@ -10,7 +10,7 @@ using namespace Audio; -Audio::Toggle::Toggle(Concurrency::DeferringAsyncTaskQueue &audio_queue) : +Audio::Toggle::Toggle(Concurrency::TaskQueue<false> &audio_queue) : audio_queue_(audio_queue) {} void Toggle::get_samples(std::size_t number_of_samples, std::int16_t *target) { @@ -28,7 +28,7 @@ void Toggle::skip_samples(std::size_t) {} void Toggle::set_output(bool enabled) { if(is_enabled_ == enabled) return; is_enabled_ = enabled; - audio_queue_.defer([this, enabled] { + audio_queue_.enqueue([this, enabled] { level_ = enabled ? volume_ : 0; }); } diff --git a/Components/AudioToggle/AudioToggle.hpp b/Components/AudioToggle/AudioToggle.hpp index 991431244..4d67c5b67 100644 --- a/Components/AudioToggle/AudioToggle.hpp +++ b/Components/AudioToggle/AudioToggle.hpp @@ -19,7 +19,7 @@ namespace Audio { */ class Toggle: public Outputs::Speaker::SampleSource { public: - Toggle(Concurrency::DeferringAsyncTaskQueue &audio_queue); + Toggle(Concurrency::TaskQueue<false> &audio_queue); void get_samples(std::size_t number_of_samples, std::int16_t *target); void set_sample_volume_range(std::int16_t range); @@ -31,7 +31,7 @@ class Toggle: public Outputs::Speaker::SampleSource { private: // Accessed on the calling thread. bool is_enabled_ = false; - Concurrency::DeferringAsyncTaskQueue &audio_queue_; + Concurrency::TaskQueue<false> &audio_queue_; // Accessed on the audio thread. int16_t level_ = 0, volume_ = 0; diff --git a/Components/KonamiSCC/KonamiSCC.cpp b/Components/KonamiSCC/KonamiSCC.cpp index ac2dc30d4..c87dd89c6 100644 --- a/Components/KonamiSCC/KonamiSCC.cpp +++ b/Components/KonamiSCC/KonamiSCC.cpp @@ -12,7 +12,7 @@ using namespace Konami; -SCC::SCC(Concurrency::DeferringAsyncTaskQueue &task_queue) : +SCC::SCC(Concurrency::TaskQueue<false> &task_queue) : task_queue_(task_queue) {} bool SCC::is_zero_level() const { @@ -55,7 +55,7 @@ void SCC::write(uint16_t address, uint8_t value) { address &= 0xff; if(address < 0x80) ram_[address] = value; - task_queue_.defer([this, address, value] { + task_queue_.enqueue([this, address, value] { // Check for a write into waveform memory. if(address < 0x80) { waves_[address >> 5].samples[address & 0x1f] = value; diff --git a/Components/KonamiSCC/KonamiSCC.hpp b/Components/KonamiSCC/KonamiSCC.hpp index f08bc5619..6aa04d8f7 100644 --- a/Components/KonamiSCC/KonamiSCC.hpp +++ b/Components/KonamiSCC/KonamiSCC.hpp @@ -24,7 +24,7 @@ namespace Konami { class SCC: public ::Outputs::Speaker::SampleSource { public: /// Creates a new SCC. - SCC(Concurrency::DeferringAsyncTaskQueue &task_queue); + SCC(Concurrency::TaskQueue<false> &task_queue); /// As per ::SampleSource; provides a broadphase test for silence. bool is_zero_level() const; @@ -41,7 +41,7 @@ class SCC: public ::Outputs::Speaker::SampleSource { uint8_t read(uint16_t address); private: - Concurrency::DeferringAsyncTaskQueue &task_queue_; + Concurrency::TaskQueue<false> &task_queue_; // State from here on down is accessed ony from the audio thread. int master_divider_ = 0; diff --git a/Components/OPx/Implementation/OPLBase.hpp b/Components/OPx/Implementation/OPLBase.hpp index 6b6a71038..5409b7cb2 100644 --- a/Components/OPx/Implementation/OPLBase.hpp +++ b/Components/OPx/Implementation/OPLBase.hpp @@ -26,9 +26,9 @@ template <typename Child> class OPLBase: public ::Outputs::Speaker::SampleSource } protected: - OPLBase(Concurrency::DeferringAsyncTaskQueue &task_queue) : task_queue_(task_queue) {} + OPLBase(Concurrency::TaskQueue<false> &task_queue) : task_queue_(task_queue) {} - Concurrency::DeferringAsyncTaskQueue &task_queue_; + Concurrency::TaskQueue<false> &task_queue_; private: uint8_t selected_register_ = 0; diff --git a/Components/OPx/OPLL.cpp b/Components/OPx/OPLL.cpp index b5abd1460..7f86c80c1 100644 --- a/Components/OPx/OPLL.cpp +++ b/Components/OPx/OPLL.cpp @@ -12,7 +12,7 @@ using namespace Yamaha::OPL; -OPLL::OPLL(Concurrency::DeferringAsyncTaskQueue &task_queue, int audio_divider, bool is_vrc7): +OPLL::OPLL(Concurrency::TaskQueue<false> &task_queue, int audio_divider, bool is_vrc7): OPLBase(task_queue), audio_divider_(audio_divider), is_vrc7_(is_vrc7) { // Due to the way that sound mixing works on the OPLL, the audio divider may not // be larger than 4. @@ -74,7 +74,7 @@ OPLL::OPLL(Concurrency::DeferringAsyncTaskQueue &task_queue, int audio_divider, void OPLL::write_register(uint8_t address, uint8_t value) { // The OPLL doesn't have timers or other non-audio functions, so all writes // go to the audio queue. - task_queue_.defer([this, address, value] { + task_queue_.enqueue([this, address, value] { // The first 8 locations are used to define the custom instrument, and have // exactly the same format as the patch set arrays at the head of this file. if(address < 8) { diff --git a/Components/OPx/OPLL.hpp b/Components/OPx/OPLL.hpp index beaa816d8..07072ff41 100644 --- a/Components/OPx/OPLL.hpp +++ b/Components/OPx/OPLL.hpp @@ -24,7 +24,7 @@ namespace OPL { class OPLL: public OPLBase<OPLL> { public: /// Creates a new OPLL or VRC7. - OPLL(Concurrency::DeferringAsyncTaskQueue &task_queue, int audio_divider = 1, bool is_vrc7 = false); + OPLL(Concurrency::TaskQueue<false> &task_queue, int audio_divider = 1, bool is_vrc7 = false); /// As per ::SampleSource; provides audio output. void get_samples(std::size_t number_of_samples, std::int16_t *target); diff --git a/Components/SN76489/SN76489.cpp b/Components/SN76489/SN76489.cpp index bd92d4d60..a413b143c 100644 --- a/Components/SN76489/SN76489.cpp +++ b/Components/SN76489/SN76489.cpp @@ -13,7 +13,7 @@ using namespace TI; -SN76489::SN76489(Personality personality, Concurrency::DeferringAsyncTaskQueue &task_queue, int additional_divider) : task_queue_(task_queue) { +SN76489::SN76489(Personality personality, Concurrency::TaskQueue<false> &task_queue, int additional_divider) : task_queue_(task_queue) { set_sample_volume_range(0); switch(personality) { @@ -49,7 +49,7 @@ void SN76489::set_sample_volume_range(std::int16_t range) { } void SN76489::write(uint8_t value) { - task_queue_.defer([value, this] () { + task_queue_.enqueue([value, this] () { if(value & 0x80) { active_register_ = value; } diff --git a/Components/SN76489/SN76489.hpp b/Components/SN76489/SN76489.hpp index 6f5b0c151..43da90661 100644 --- a/Components/SN76489/SN76489.hpp +++ b/Components/SN76489/SN76489.hpp @@ -23,7 +23,7 @@ class SN76489: public Outputs::Speaker::SampleSource { }; /// Creates a new SN76489. - SN76489(Personality personality, Concurrency::DeferringAsyncTaskQueue &task_queue, int additional_divider = 1); + SN76489(Personality personality, Concurrency::TaskQueue<false> &task_queue, int additional_divider = 1); /// Writes a new value to the SN76489. void write(uint8_t value); @@ -41,7 +41,7 @@ class SN76489: public Outputs::Speaker::SampleSource { void evaluate_output_volume(); int volumes_[16]; - Concurrency::DeferringAsyncTaskQueue &task_queue_; + Concurrency::TaskQueue<false> &task_queue_; struct ToneChannel { // Programmatically-set state; updated by the processor. diff --git a/Concurrency/AsyncTaskQueue.cpp b/Concurrency/AsyncTaskQueue.cpp deleted file mode 100644 index ccc44a990..000000000 --- a/Concurrency/AsyncTaskQueue.cpp +++ /dev/null @@ -1,110 +0,0 @@ -// -// AsyncTaskQueue.cpp -// Clock Signal -// -// Created by Thomas Harte on 07/10/2016. -// Copyright 2016 Thomas Harte. All rights reserved. -// - -#include "AsyncTaskQueue.hpp" - -using namespace Concurrency; - -AsyncTaskQueue::AsyncTaskQueue() -#ifndef USE_GCD - : - should_destruct_(false), - thread_([this] () { - while(!should_destruct_) { - std::function<void(void)> next_function; - - // Take lock, check for a new task. - std::unique_lock lock(queue_mutex_); - if(!pending_tasks_.empty()) { - next_function = pending_tasks_.front(); - pending_tasks_.pop_front(); - } - - if(next_function) { - // If there is a task, release lock and perform it. - lock.unlock(); - next_function(); - } else { - // If there isn't a task, atomically block on the processing condition and release the lock - // until there's something pending (and then release it again via scope). - processing_condition_.wait(lock); - } - } - }) -#else - : serial_dispatch_queue_(dispatch_queue_create("com.thomasharte.clocksignal.asyntaskqueue", DISPATCH_QUEUE_SERIAL)) -#endif -{} - -AsyncTaskQueue::~AsyncTaskQueue() { -#ifdef USE_GCD - flush(); - dispatch_release(serial_dispatch_queue_); -#else - // Set should destruct, and then give the thread a bit of a nudge - // via an empty enqueue. - should_destruct_ = true; - enqueue([](){}); - - // Wait for the thread safely to terminate. - thread_.join(); -#endif -} - -void AsyncTaskQueue::enqueue(std::function<void(void)> function) { -#ifdef USE_GCD - dispatch_async(serial_dispatch_queue_, ^{function();}); -#else - std::lock_guard lock(queue_mutex_); - pending_tasks_.push_back(function); - processing_condition_.notify_all(); -#endif -} - -void AsyncTaskQueue::flush() { -#ifdef USE_GCD - dispatch_sync(serial_dispatch_queue_, ^{}); -#else - auto flush_mutex = std::make_shared<std::mutex>(); - auto flush_condition = std::make_shared<std::condition_variable>(); - std::unique_lock lock(*flush_mutex); - enqueue([=] () { - std::unique_lock inner_lock(*flush_mutex); - flush_condition->notify_all(); - }); - flush_condition->wait(lock); -#endif -} - -DeferringAsyncTaskQueue::~DeferringAsyncTaskQueue() { - perform(); - flush(); -} - -void DeferringAsyncTaskQueue::defer(std::function<void(void)> function) { - if(!deferred_tasks_) { - deferred_tasks_ = std::make_unique<TaskList>(); - deferred_tasks_->reserve(16); - } - deferred_tasks_->push_back(function); -} - -void DeferringAsyncTaskQueue::perform() { - if(!deferred_tasks_) return; - enqueue([deferred_tasks_raw = deferred_tasks_.release()] { - std::unique_ptr<TaskList> deferred_tasks(deferred_tasks_raw); - for(const auto &function : *deferred_tasks) { - function(); - } - }); -} - -void DeferringAsyncTaskQueue::flush() { - perform(); - AsyncTaskQueue::flush(); -} diff --git a/Concurrency/AsyncTaskQueue.hpp b/Concurrency/AsyncTaskQueue.hpp index f301a8717..da56e4388 100644 --- a/Concurrency/AsyncTaskQueue.hpp +++ b/Concurrency/AsyncTaskQueue.hpp @@ -12,11 +12,11 @@ #include <atomic> #include <condition_variable> #include <functional> -#include <list> -#include <memory> #include <thread> #include <vector> +#include "../ClockReceiver/TimeTypes.hpp" + #if defined(__APPLE__) && !defined(IGNORE_APPLE) #include <dispatch/dispatch.h> #define USE_GCD @@ -24,81 +24,159 @@ namespace Concurrency { -using TaskList = std::vector<std::function<void(void)>>; +/// An implementation detail; provides the time-centric part of a TaskQueue with a real Performer. +template <typename Performer> struct TaskQueueStorage { + template <typename... Args> TaskQueueStorage(Args&&... args) : + performer(std::forward<Args>(args)...), + last_fired_(Time::nanos_now()) {} -/*! - An async task queue allows a caller to enqueue void(void) functions. Those functions are guaranteed - to be performed serially and asynchronously from the caller. A caller may also request to flush, - causing it to block until all previously-enqueued functions are complete. -*/ -class AsyncTaskQueue { - public: - AsyncTaskQueue(); - virtual ~AsyncTaskQueue(); + Performer performer; - /*! - Adds @c function to the queue. - - @discussion Functions will be performed serially and asynchronously. This method is safe to - call from multiple threads. - @parameter function The function to enqueue. - */ - void enqueue(std::function<void(void)> function); - - /*! - Blocks the caller until all previously-enqueud functions have completed. - */ - void flush(); + protected: + void update() { + auto time_now = Time::nanos_now(); + performer.perform(time_now - last_fired_); + last_fired_ = time_now; + } private: -#ifdef USE_GCD - dispatch_queue_t serial_dispatch_queue_; -#else - std::atomic_bool should_destruct_; - std::condition_variable processing_condition_; - std::mutex queue_mutex_; - std::list<std::function<void(void)>> pending_tasks_; + Time::Nanos last_fired_; +}; - std::thread thread_; -#endif +/// An implementation detail; provides a no-op implementation of time advances for TaskQueues without a Performer. +template <> struct TaskQueueStorage<int> { + TaskQueueStorage() {} + + protected: + void update() {} }; /*! - A deferring async task queue is one that accepts a list of functions to be performed but defers - any action until told to perform. It performs them by enquing a single asynchronous task that will - perform the deferred tasks in order. + A task queue allows a caller to enqueue void(void) functions. Those functions are guaranteed + to be performed serially and asynchronously from the caller. - It therefore offers similar semantics to an asynchronous task queue, but allows for management of - synchronisation costs, since neither defer nor perform make any effort to be thread safe. + If @c perform_automatically is true, functions will be performed as soon as is possible, + at the cost of thread synchronisation. + + If @c perform_automatically is false, functions will be queued up and not dispatched + until a call to perform(). + + If a @c Performer type is supplied then a public member, @c performer will be constructed + with the arguments supplied to TaskQueue's constructor, and that class will receive calls of the + form @c .perform(nanos) to update it to every batch of new actions. */ -class DeferringAsyncTaskQueue: public AsyncTaskQueue { +template <bool perform_automatically, typename Performer = int> class TaskQueue: public TaskQueueStorage<Performer> { public: - ~DeferringAsyncTaskQueue(); + template <typename... Args> TaskQueue(Args&&... args) : + TaskQueueStorage<Performer>(std::forward<Args>(args)...), + thread_{ + [this] { + ActionVector actions; - /*! - Adds a function to the deferral list. + while(!should_quit) { + // Wait for new actions to be signalled, and grab them. + std::unique_lock lock(condition_mutex_); + while(actions_.empty()) { + condition_.wait(lock); + } + std::swap(actions, actions_); + lock.unlock(); - This is not thread safe; it should be serialised with other calls to itself and to perform. - */ - void defer(std::function<void(void)> function); + // Update to now (which is possibly a no-op). + TaskQueueStorage<Performer>::update(); - /*! - Enqueues a function that will perform all currently deferred functions, in the - order that they were deferred. + // Perform the actions and destroy them. + for(const auto &action: actions) { + action(); + } + actions.clear(); + } + } + } {} - This is not thread safe; it should be serialised with other calls to itself and to defer. - */ - void perform(); + /// Enqueus @c post_action to be performed asynchronously at some point + /// in the future. If @c perform_automatically is @c true then the action + /// will be performed as soon as possible. Otherwise it will sit unsheculed until + /// a call to @c perform(). + /// + /// Actions may be elided. + /// + /// If this TaskQueue has a @c Performer then the action will be performed + /// on the same thread as the performer, after the performer has been updated + /// to 'now'. + void enqueue(const std::function<void(void)> &post_action) { + std::lock_guard guard(condition_mutex_); + actions_.push_back(post_action); - /*! - Blocks the caller until all previously-enqueud functions have completed. - */ - void flush(); + if constexpr (perform_automatically) { + condition_.notify_all(); + } + } + + /// Causes any enqueued actions that are not yet scheduled to be scheduled. + void perform() { + if(actions_.empty()) { + return; + } + condition_.notify_all(); + } + + /// Permanently stops this task queue, blocking until that has happened. + /// All pending actions will be performed first. + /// + /// The queue cannot be restarted; this is a destructive action. + void stop() { + if(thread_.joinable()) { + should_quit = true; + enqueue([] {}); + if constexpr (!perform_automatically) { + perform(); + } + thread_.join(); + } + } + + /// Schedules any remaining unscheduled work, then blocks synchronously + /// until all scheduled work has been performed. + void flush() { + std::mutex flush_mutex; + std::condition_variable flush_condition; + bool has_run = false; + std::unique_lock lock(flush_mutex); + + enqueue([&flush_mutex, &flush_condition, &has_run] () { + std::unique_lock inner_lock(flush_mutex); + has_run = true; + flush_condition.notify_all(); + }); + + if constexpr (!perform_automatically) { + perform(); + } + + flush_condition.wait(lock, [&has_run] { return has_run; }); + } + + ~TaskQueue() { + stop(); + } private: - std::unique_ptr<TaskList> deferred_tasks_; + // The list of actions waiting be performed. These will be elided, + // increasing their latency, if the emulation thread falls behind. + using ActionVector = std::vector<std::function<void(void)>>; + ActionVector actions_; + + // Necessary synchronisation parts. + std::atomic<bool> should_quit = false; + std::mutex condition_mutex_; + std::condition_variable condition_; + + // Ensure the thread isn't constructed until after the mutex + // and condition variable. + std::thread thread_; }; } -#endif /* Concurrency_hpp */ +#endif /* AsyncTaskQueue_hpp */ diff --git a/Concurrency/AsyncUpdater.hpp b/Concurrency/AsyncUpdater.hpp deleted file mode 100644 index cf241c590..000000000 --- a/Concurrency/AsyncUpdater.hpp +++ /dev/null @@ -1,98 +0,0 @@ -// -// AsyncUpdater.h -// Clock Signal -// -// Created by Thomas Harte on 06/07/2022. -// Copyright © 2022 Thomas Harte. All rights reserved. -// - -#ifndef AsyncUpdater_hpp -#define AsyncUpdater_hpp - -#include <atomic> -#include <condition_variable> -#include <mutex> - -#include "../ClockReceiver/TimeTypes.hpp" - -namespace Concurrency { - -template <typename Performer> class AsyncUpdater { - public: - template <typename... Args> AsyncUpdater(Args&&... args) : - performer(std::forward<Args>(args)...), - performer_thread_{ - [this] { - Time::Nanos last_fired = Time::nanos_now(); - ActionVector actions; - - while(!should_quit) { - // Wait for new actions to be signalled, and grab them. - std::unique_lock lock(condition_mutex_); - while(actions_.empty()) { - condition_.wait(lock); - } - std::swap(actions, actions_); - lock.unlock(); - - // Update to now. - auto time_now = Time::nanos_now(); - performer.perform(time_now - last_fired); - last_fired = time_now; - - // Perform the actions. - for(const auto& action: actions) { - action(); - } - actions.clear(); - } - } - } {} - - /// Run the performer up to 'now' and then perform @c post_action. - /// - /// @c post_action will be performed asynchronously, on the same - /// thread as the performer. - /// - /// Actions may be elided, - void update(const std::function<void(void)> &post_action) { - std::lock_guard guard(condition_mutex_); - actions_.push_back(post_action); - condition_.notify_all(); - } - - void stop() { - if(performer_thread_.joinable()) { - should_quit = true; - update([] {}); - performer_thread_.join(); - } - } - - ~AsyncUpdater() { - stop(); - } - - // The object that will actually receive time advances. - Performer performer; - - private: - // The list of actions waiting be performed. These will be elided, - // increasing their latency, if the emulation thread falls behind. - using ActionVector = std::vector<std::function<void(void)>>; - ActionVector actions_; - - // Necessary synchronisation parts. - std::atomic<bool> should_quit = false; - std::mutex condition_mutex_; - std::condition_variable condition_; - - // Ensure the thread isn't constructed until after the mutex - // and condition variable. - std::thread performer_thread_; -}; - - -} - -#endif /* AsyncUpdater_hpp */ diff --git a/Machines/Amiga/Audio.hpp b/Machines/Amiga/Audio.hpp index ae63339fb..41b414f65 100644 --- a/Machines/Amiga/Audio.hpp +++ b/Machines/Amiga/Audio.hpp @@ -149,7 +149,7 @@ class Audio: public DMADevice<4> { // Transient output state, and its destination. Outputs::Speaker::PushLowpass<true> speaker_; - Concurrency::AsyncTaskQueue queue_; + Concurrency::TaskQueue<true> queue_; using AudioBuffer = std::array<int16_t, 4096>; static constexpr int BufferCount = 3; diff --git a/Machines/AmstradCPC/AmstradCPC.cpp b/Machines/AmstradCPC/AmstradCPC.cpp index db4c88391..ca41eb1b2 100644 --- a/Machines/AmstradCPC/AmstradCPC.cpp +++ b/Machines/AmstradCPC/AmstradCPC.cpp @@ -156,7 +156,7 @@ class AYDeferrer { } private: - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue<false> audio_queue_; GI::AY38910::AY38910<true> ay_; Outputs::Speaker::PullLowpass<GI::AY38910::AY38910<true>> speaker_; HalfCycles cycles_since_update_; diff --git a/Machines/Apple/AppleII/AppleII.cpp b/Machines/Apple/AppleII/AppleII.cpp index 706e4cb6c..fbbc623d3 100644 --- a/Machines/Apple/AppleII/AppleII.cpp +++ b/Machines/Apple/AppleII/AppleII.cpp @@ -95,7 +95,7 @@ template <Analyser::Static::AppleII::Target::Model model> class ConcreteMachine: uint8_t ram_[65536], aux_ram_[65536]; std::vector<uint8_t> rom_; - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue<false> audio_queue_; Audio::Toggle audio_toggle_; Outputs::Speaker::PullLowpass<Audio::Toggle> speaker_; Cycles cycles_since_audio_update_; diff --git a/Machines/Apple/AppleIIgs/AppleIIgs.cpp b/Machines/Apple/AppleIIgs/AppleIIgs.cpp index 546eeb68a..7037af170 100644 --- a/Machines/Apple/AppleIIgs/AppleIIgs.cpp +++ b/Machines/Apple/AppleIIgs/AppleIIgs.cpp @@ -1150,7 +1150,7 @@ class ConcreteMachine: Apple::Disk::DiskIIDrive drives525_[2]; // The audio parts. - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue<false> audio_queue_; Apple::IIgs::Sound::GLU sound_glu_; Audio::Toggle audio_toggle_; using AudioSource = Outputs::Speaker::CompoundSource<Apple::IIgs::Sound::GLU, Audio::Toggle>; diff --git a/Machines/Apple/AppleIIgs/Sound.cpp b/Machines/Apple/AppleIIgs/Sound.cpp index 7e5dbf36e..942746257 100644 --- a/Machines/Apple/AppleIIgs/Sound.cpp +++ b/Machines/Apple/AppleIIgs/Sound.cpp @@ -16,7 +16,7 @@ using namespace Apple::IIgs::Sound; -GLU::GLU(Concurrency::DeferringAsyncTaskQueue &audio_queue) : audio_queue_(audio_queue) { +GLU::GLU(Concurrency::TaskQueue<false> &audio_queue) : audio_queue_(audio_queue) { // Reset all pending stores. MemoryWrite disabled_write; disabled_write.enabled = false; @@ -42,7 +42,7 @@ void GLU::set_data(uint8_t data) { // Register access. const auto address = address_; // To make sure I don't inadvertently 'capture' address_. local_.set_register(address, data); - audio_queue_.defer([this, address, data] () { + audio_queue_.enqueue([this, address, data] () { remote_.set_register(address, data); }); } @@ -191,7 +191,7 @@ void GLU::set_sample_volume_range(std::int16_t range) { void GLU::set_control(uint8_t control) { local_.control = control; - audio_queue_.defer([this, control] () { + audio_queue_.enqueue([this, control] () { remote_.control = control; }); } diff --git a/Machines/Apple/AppleIIgs/Sound.hpp b/Machines/Apple/AppleIIgs/Sound.hpp index 24f99c53c..7b822c3da 100644 --- a/Machines/Apple/AppleIIgs/Sound.hpp +++ b/Machines/Apple/AppleIIgs/Sound.hpp @@ -21,7 +21,7 @@ namespace Sound { class GLU: public Outputs::Speaker::SampleSource { public: - GLU(Concurrency::DeferringAsyncTaskQueue &audio_queue); + GLU(Concurrency::TaskQueue<false> &audio_queue); void set_control(uint8_t); uint8_t get_control(); @@ -42,7 +42,7 @@ class GLU: public Outputs::Speaker::SampleSource { void skip_samples(const std::size_t number_of_samples); private: - Concurrency::DeferringAsyncTaskQueue &audio_queue_; + Concurrency::TaskQueue<false> &audio_queue_; uint16_t address_ = 0; diff --git a/Machines/Apple/Macintosh/Audio.cpp b/Machines/Apple/Macintosh/Audio.cpp index fba2c11ea..076c2b857 100644 --- a/Machines/Apple/Macintosh/Audio.cpp +++ b/Machines/Apple/Macintosh/Audio.cpp @@ -18,7 +18,7 @@ const std::size_t sample_length = 352 / 2; } -Audio::Audio(Concurrency::DeferringAsyncTaskQueue &task_queue) : task_queue_(task_queue) {} +Audio::Audio(Concurrency::TaskQueue<false> &task_queue) : task_queue_(task_queue) {} // MARK: - Inputs @@ -35,7 +35,7 @@ void Audio::set_volume(int volume) { posted_volume_ = volume; // Post the volume change as a deferred event. - task_queue_.defer([this, volume] () { + task_queue_.enqueue([this, volume] () { volume_ = volume; set_volume_multiplier(); }); @@ -47,7 +47,7 @@ void Audio::set_enabled(bool on) { posted_enable_mask_ = int(on); // Post the enabled mask change as a deferred event. - task_queue_.defer([this, on] () { + task_queue_.enqueue([this, on] () { enabled_mask_ = int(on); set_volume_multiplier(); }); diff --git a/Machines/Apple/Macintosh/Audio.hpp b/Machines/Apple/Macintosh/Audio.hpp index a0ae25a80..5bb4b7057 100644 --- a/Machines/Apple/Macintosh/Audio.hpp +++ b/Machines/Apple/Macintosh/Audio.hpp @@ -27,7 +27,7 @@ namespace Macintosh { */ class Audio: public ::Outputs::Speaker::SampleSource { public: - Audio(Concurrency::DeferringAsyncTaskQueue &task_queue); + Audio(Concurrency::TaskQueue<false> &task_queue); /*! Macintosh audio is (partly) sourced by the same scanning @@ -58,7 +58,7 @@ class Audio: public ::Outputs::Speaker::SampleSource { constexpr static bool get_is_stereo() { return false; } private: - Concurrency::DeferringAsyncTaskQueue &task_queue_; + Concurrency::TaskQueue<false> &task_queue_; // A queue of fetched samples; read from by one thread, // written to by another. diff --git a/Machines/Apple/Macintosh/DeferredAudio.hpp b/Machines/Apple/Macintosh/DeferredAudio.hpp index 5488f579c..fe0ebdb3a 100644 --- a/Machines/Apple/Macintosh/DeferredAudio.hpp +++ b/Machines/Apple/Macintosh/DeferredAudio.hpp @@ -16,7 +16,7 @@ namespace Apple { namespace Macintosh { struct DeferredAudio { - Concurrency::DeferringAsyncTaskQueue queue; + Concurrency::TaskQueue<false> queue; Audio audio; Outputs::Speaker::PullLowpass<Audio> speaker; HalfCycles time_since_update; diff --git a/Machines/Atari/2600/Bus.hpp b/Machines/Atari/2600/Bus.hpp index 96b1a99dd..c4df283e5 100644 --- a/Machines/Atari/2600/Bus.hpp +++ b/Machines/Atari/2600/Bus.hpp @@ -39,7 +39,7 @@ class Bus { PIA mos6532_; TIA tia_; - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue<false> audio_queue_; TIASound tia_sound_; Outputs::Speaker::PullLowpass<TIASound> speaker_; diff --git a/Machines/Atari/2600/TIASound.cpp b/Machines/Atari/2600/TIASound.cpp index 463c19d93..62b60e988 100644 --- a/Machines/Atari/2600/TIASound.cpp +++ b/Machines/Atari/2600/TIASound.cpp @@ -10,7 +10,7 @@ using namespace Atari2600; -Atari2600::TIASound::TIASound(Concurrency::DeferringAsyncTaskQueue &audio_queue) : +Atari2600::TIASound::TIASound(Concurrency::TaskQueue<false> &audio_queue) : audio_queue_(audio_queue), poly4_counter_{0x00f, 0x00f}, poly5_counter_{0x01f, 0x01f}, @@ -18,20 +18,20 @@ Atari2600::TIASound::TIASound(Concurrency::DeferringAsyncTaskQueue &audio_queue) {} void Atari2600::TIASound::set_volume(int channel, uint8_t volume) { - audio_queue_.defer([target = &volume_[channel], volume]() { + audio_queue_.enqueue([target = &volume_[channel], volume]() { *target = volume & 0xf; }); } void Atari2600::TIASound::set_divider(int channel, uint8_t divider) { - audio_queue_.defer([this, channel, divider]() { + audio_queue_.enqueue([this, channel, divider]() { divider_[channel] = divider & 0x1f; divider_counter_[channel] = 0; }); } void Atari2600::TIASound::set_control(int channel, uint8_t control) { - audio_queue_.defer([target = &control_[channel], control]() { + audio_queue_.enqueue([target = &control_[channel], control]() { *target = control & 0xf; }); } diff --git a/Machines/Atari/2600/TIASound.hpp b/Machines/Atari/2600/TIASound.hpp index 8bc84cc2a..06c4b99bf 100644 --- a/Machines/Atari/2600/TIASound.hpp +++ b/Machines/Atari/2600/TIASound.hpp @@ -20,7 +20,7 @@ constexpr int CPUTicksPerAudioTick = 2; class TIASound: public Outputs::Speaker::SampleSource { public: - TIASound(Concurrency::DeferringAsyncTaskQueue &audio_queue); + TIASound(Concurrency::TaskQueue<false> &audio_queue); void set_volume(int channel, uint8_t volume); void set_divider(int channel, uint8_t divider); @@ -32,7 +32,7 @@ class TIASound: public Outputs::Speaker::SampleSource { static constexpr bool get_is_stereo() { return false; } private: - Concurrency::DeferringAsyncTaskQueue &audio_queue_; + Concurrency::TaskQueue<false> &audio_queue_; uint8_t volume_[2]; uint8_t divider_[2]; diff --git a/Machines/Atari/ST/AtariST.cpp b/Machines/Atari/ST/AtariST.cpp index 2721de88c..b82d5ef62 100644 --- a/Machines/Atari/ST/AtariST.cpp +++ b/Machines/Atari/ST/AtariST.cpp @@ -485,7 +485,7 @@ class ConcreteMachine: JustInTimeActor<Motorola::ACIA::ACIA, HalfCycles, 16> keyboard_acia_; JustInTimeActor<Motorola::ACIA::ACIA, HalfCycles, 16> midi_acia_; - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue<false> audio_queue_; GI::AY38910::AY38910<false> ay_; Outputs::Speaker::PullLowpass<GI::AY38910::AY38910<false>> speaker_; HalfCycles cycles_since_audio_update_; diff --git a/Machines/ColecoVision/ColecoVision.cpp b/Machines/ColecoVision/ColecoVision.cpp index 5123abdb5..a161b847c 100644 --- a/Machines/ColecoVision/ColecoVision.cpp +++ b/Machines/ColecoVision/ColecoVision.cpp @@ -381,7 +381,7 @@ class ConcreteMachine: CPU::Z80::Processor<ConcreteMachine, false, false> z80_; JustInTimeActor<TI::TMS::TMS9918> vdp_; - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue<false> audio_queue_; TI::SN76489 sn76489_; GI::AY38910::AY38910<false> ay_; Outputs::Speaker::CompoundSource<TI::SN76489, GI::AY38910::AY38910<false>> mixer_; diff --git a/Machines/Electron/Electron.cpp b/Machines/Electron/Electron.cpp index b0f3591e3..6f6fe4f86 100644 --- a/Machines/Electron/Electron.cpp +++ b/Machines/Electron/Electron.cpp @@ -770,7 +770,7 @@ template <bool has_scsi_bus> class ConcreteMachine: // Outputs JustInTimeActor<VideoOutput, Cycles> video_; - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue<false> audio_queue_; SoundGenerator sound_generator_; Outputs::Speaker::PullLowpass<SoundGenerator> speaker_; diff --git a/Machines/Electron/SoundGenerator.cpp b/Machines/Electron/SoundGenerator.cpp index 73bcac8ab..eb3bab3a5 100644 --- a/Machines/Electron/SoundGenerator.cpp +++ b/Machines/Electron/SoundGenerator.cpp @@ -12,7 +12,7 @@ using namespace Electron; -SoundGenerator::SoundGenerator(Concurrency::DeferringAsyncTaskQueue &audio_queue) : +SoundGenerator::SoundGenerator(Concurrency::TaskQueue<false> &audio_queue) : audio_queue_(audio_queue) {} void SoundGenerator::set_sample_volume_range(std::int16_t range) { @@ -36,13 +36,13 @@ void SoundGenerator::skip_samples(std::size_t number_of_samples) { } void SoundGenerator::set_divider(uint8_t divider) { - audio_queue_.defer([this, divider]() { + audio_queue_.enqueue([this, divider]() { divider_ = divider * 32 / clock_rate_divider; }); } void SoundGenerator::set_is_enabled(bool is_enabled) { - audio_queue_.defer([this, is_enabled]() { + audio_queue_.enqueue([this, is_enabled]() { is_enabled_ = is_enabled; counter_ = 0; }); diff --git a/Machines/Electron/SoundGenerator.hpp b/Machines/Electron/SoundGenerator.hpp index fff2ba15c..72aa6e929 100644 --- a/Machines/Electron/SoundGenerator.hpp +++ b/Machines/Electron/SoundGenerator.hpp @@ -16,7 +16,7 @@ namespace Electron { class SoundGenerator: public ::Outputs::Speaker::SampleSource { public: - SoundGenerator(Concurrency::DeferringAsyncTaskQueue &audio_queue); + SoundGenerator(Concurrency::TaskQueue<false> &audio_queue); void set_divider(uint8_t divider); @@ -31,7 +31,7 @@ class SoundGenerator: public ::Outputs::Speaker::SampleSource { static constexpr bool get_is_stereo() { return false; } private: - Concurrency::DeferringAsyncTaskQueue &audio_queue_; + Concurrency::TaskQueue<false> &audio_queue_; unsigned int counter_ = 0; unsigned int divider_ = 0; bool is_enabled_ = false; diff --git a/Machines/Enterprise/Dave.cpp b/Machines/Enterprise/Dave.cpp index f920822f2..150a6765c 100644 --- a/Machines/Enterprise/Dave.cpp +++ b/Machines/Enterprise/Dave.cpp @@ -12,12 +12,12 @@ using namespace Enterprise::Dave; // MARK: - Audio generator -Audio::Audio(Concurrency::DeferringAsyncTaskQueue &audio_queue) : +Audio::Audio(Concurrency::TaskQueue<false> &audio_queue) : audio_queue_(audio_queue) {} void Audio::write(uint16_t address, uint8_t value) { address &= 0x1f; - audio_queue_.defer([address, value, this] { + audio_queue_.enqueue([address, value, this] { switch(address) { case 0: case 2: case 4: channels_[address >> 1].reload = (channels_[address >> 1].reload & 0xff00) | value; @@ -63,7 +63,7 @@ void Audio::write(uint16_t address, uint8_t value) { } void Audio::set_sample_volume_range(int16_t range) { - audio_queue_.defer([range, this] { + audio_queue_.enqueue([range, this] { volume_ = range / (63*4); }); } diff --git a/Machines/Enterprise/Dave.hpp b/Machines/Enterprise/Dave.hpp index e2443779f..cafabdb5f 100644 --- a/Machines/Enterprise/Dave.hpp +++ b/Machines/Enterprise/Dave.hpp @@ -30,7 +30,7 @@ enum class Interrupt: uint8_t { */ class Audio: public Outputs::Speaker::SampleSource { public: - Audio(Concurrency::DeferringAsyncTaskQueue &audio_queue); + Audio(Concurrency::TaskQueue<false> &audio_queue); /// Modifies an register in the audio range; only the low 4 bits are /// used for register decoding so it's assumed that the caller has @@ -43,7 +43,7 @@ class Audio: public Outputs::Speaker::SampleSource { void get_samples(std::size_t number_of_samples, int16_t *target); private: - Concurrency::DeferringAsyncTaskQueue &audio_queue_; + Concurrency::TaskQueue<false> &audio_queue_; // Global divider (i.e. 8MHz/12Mhz switch). uint8_t global_divider_; diff --git a/Machines/Enterprise/Enterprise.cpp b/Machines/Enterprise/Enterprise.cpp index 1481d8e13..0435b6e94 100644 --- a/Machines/Enterprise/Enterprise.cpp +++ b/Machines/Enterprise/Enterprise.cpp @@ -705,7 +705,7 @@ template <bool has_disk_controller, bool is_6mhz> class ConcreteMachine: bool previous_nick_interrupt_line_ = false; // Cf. timing guesses above. - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue<false> audio_queue_; Dave::Audio dave_audio_; Outputs::Speaker::PullLowpass<Dave::Audio> speaker_; HalfCycles time_since_audio_update_; diff --git a/Machines/MSX/MSX.cpp b/Machines/MSX/MSX.cpp index 71c598c9f..6013881e5 100644 --- a/Machines/MSX/MSX.cpp +++ b/Machines/MSX/MSX.cpp @@ -747,7 +747,7 @@ class ConcreteMachine: JustInTimeActor<TI::TMS::TMS9918> vdp_; Intel::i8255::i8255<i8255PortHandler> i8255_; - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue<false> audio_queue_; GI::AY38910::AY38910<false> ay_; Audio::Toggle audio_toggle_; Konami::SCC scc_; diff --git a/Machines/MasterSystem/MasterSystem.cpp b/Machines/MasterSystem/MasterSystem.cpp index 3e5fab9c9..0eedcd77c 100644 --- a/Machines/MasterSystem/MasterSystem.cpp +++ b/Machines/MasterSystem/MasterSystem.cpp @@ -459,7 +459,7 @@ class ConcreteMachine: // This is as per the audio control register; // see https://www.smspower.org/Development/AudioControlPort update_audio(); - audio_queue_.defer([this, mode] { + audio_queue_.enqueue([this, mode] { switch(mode & 3) { case 0: // SN76489 only; the default. mixer_.set_relative_volumes({1.0f, 0.0f}); @@ -487,7 +487,7 @@ class ConcreteMachine: CPU::Z80::Processor<ConcreteMachine, false, false> z80_; JustInTimeActor<TI::TMS::TMS9918> vdp_; - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue<false> audio_queue_; TI::SN76489 sn76489_; Yamaha::OPL::OPLL opll_; Outputs::Speaker::CompoundSource<decltype(sn76489_), decltype(opll_)> mixer_; diff --git a/Machines/Oric/Oric.cpp b/Machines/Oric/Oric.cpp index 808c80725..fe6386ac4 100644 --- a/Machines/Oric/Oric.cpp +++ b/Machines/Oric/Oric.cpp @@ -178,7 +178,7 @@ class TapePlayer: public Storage::Tape::BinaryTapePlayer { */ class VIAPortHandler: public MOS::MOS6522::IRQDelegatePortHandler { public: - VIAPortHandler(Concurrency::DeferringAsyncTaskQueue &audio_queue, AY &ay8910, Speaker &speaker, TapePlayer &tape_player, Keyboard &keyboard) : + VIAPortHandler(Concurrency::TaskQueue<false> &audio_queue, AY &ay8910, Speaker &speaker, TapePlayer &tape_player, Keyboard &keyboard) : audio_queue_(audio_queue), ay8910_(ay8910), speaker_(speaker), tape_player_(tape_player), keyboard_(keyboard) { // Attach a couple of joysticks. @@ -254,7 +254,7 @@ class VIAPortHandler: public MOS::MOS6522::IRQDelegatePortHandler { uint8_t porta_output_ = 0xff; HalfCycles cycles_since_ay_update_; - Concurrency::DeferringAsyncTaskQueue &audio_queue_; + Concurrency::TaskQueue<false> &audio_queue_; AY &ay8910_; Speaker &speaker_; TapePlayer &tape_player_; @@ -711,7 +711,7 @@ template <Analyser::Static::Oric::Target::DiskInterface disk_interface, CPU::MOS // Outputs JustInTimeActor<VideoOutput, Cycles> video_; - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue<false> audio_queue_; GI::AY38910::AY38910<false> ay8910_; Speaker speaker_; diff --git a/Machines/Sinclair/ZX8081/ZX8081.cpp b/Machines/Sinclair/ZX8081/ZX8081.cpp index 2fbede437..4e6555118 100644 --- a/Machines/Sinclair/ZX8081/ZX8081.cpp +++ b/Machines/Sinclair/ZX8081/ZX8081.cpp @@ -468,7 +468,7 @@ template<bool is_zx81> class ConcreteMachine: } // MARK: - Audio - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue<false> audio_queue_; using AY = GI::AY38910::AY38910<false>; AY ay_; Outputs::Speaker::PullLowpass<AY> speaker_; diff --git a/Machines/Sinclair/ZXSpectrum/ZXSpectrum.cpp b/Machines/Sinclair/ZXSpectrum/ZXSpectrum.cpp index c01671947..f9a22231e 100644 --- a/Machines/Sinclair/ZXSpectrum/ZXSpectrum.cpp +++ b/Machines/Sinclair/ZXSpectrum/ZXSpectrum.cpp @@ -849,7 +849,7 @@ template<Model model> class ConcreteMachine: } // MARK: - Audio. - Concurrency::DeferringAsyncTaskQueue audio_queue_; + Concurrency::TaskQueue<false> audio_queue_; GI::AY38910::AY38910<false> ay_; Audio::Toggle audio_toggle_; Outputs::Speaker::CompoundSource<GI::AY38910::AY38910<false>, Audio::Toggle> mixer_; diff --git a/OSBindings/Mac/Clock Signal.xcodeproj/project.pbxproj b/OSBindings/Mac/Clock Signal.xcodeproj/project.pbxproj index 87026c868..89dc7e632 100644 --- a/OSBindings/Mac/Clock Signal.xcodeproj/project.pbxproj +++ b/OSBindings/Mac/Clock Signal.xcodeproj/project.pbxproj @@ -33,7 +33,6 @@ 4B05401F219D1618001BF69C /* ScanTarget.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B05401D219D1618001BF69C /* ScanTarget.cpp */; }; 4B055A7A1FAE78A00060FFFF /* SDL2.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 4B055A771FAE78210060FFFF /* SDL2.framework */; }; 4B055A7E1FAE84AA0060FFFF /* main.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B055A7C1FAE84A50060FFFF /* main.cpp */; }; - 4B055A8D1FAE85920060FFFF /* AsyncTaskQueue.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B3940E51DA83C8300427841 /* AsyncTaskQueue.cpp */; }; 4B055A8F1FAE85A90060FFFF /* FileHolder.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B5FADB81DE3151600AEC565 /* FileHolder.cpp */; }; 4B055A901FAE85A90060FFFF /* TimedEventLoop.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4BB697C91D4B6D3E00248BDF /* TimedEventLoop.cpp */; }; 4B055A911FAE85B50060FFFF /* Cartridge.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4BEE0A6A1D72496600532C7B /* Cartridge.cpp */; }; @@ -221,7 +220,6 @@ 4B322E041F5A2E3C004EB04C /* Z80Base.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B322E031F5A2E3C004EB04C /* Z80Base.cpp */; }; 4B37EE821D7345A6006A09A4 /* BinaryDump.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B37EE801D7345A6006A09A4 /* BinaryDump.cpp */; }; 4B38F3481F2EC11D00D9235D /* AmstradCPC.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B38F3461F2EC11D00D9235D /* AmstradCPC.cpp */; }; - 4B3940E71DA83C8300427841 /* AsyncTaskQueue.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B3940E51DA83C8300427841 /* AsyncTaskQueue.cpp */; }; 4B3BA0C31D318AEC005DD7A7 /* C1540Tests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4B3BA0C21D318AEB005DD7A7 /* C1540Tests.swift */; }; 4B3BA0CE1D318B44005DD7A7 /* C1540Bridge.mm in Sources */ = {isa = PBXBuildFile; fileRef = 4B3BA0C61D318B44005DD7A7 /* C1540Bridge.mm */; }; 4B3BA0CF1D318B44005DD7A7 /* MOS6522Bridge.mm in Sources */ = {isa = PBXBuildFile; fileRef = 4B3BA0C91D318B44005DD7A7 /* MOS6522Bridge.mm */; }; @@ -348,7 +346,6 @@ 4B7752C128217F490073E2C5 /* FAT.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B477709268FBE4D005C2340 /* FAT.cpp */; }; 4B7752C228217F5C0073E2C5 /* Spectrum.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B5D5C9525F56FC7001B4623 /* Spectrum.cpp */; }; 4B7752C328217F720073E2C5 /* Z80.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B8DD39526360DDF00B3C866 /* Z80.cpp */; }; - 4B778EEF23A5D6680000D260 /* AsyncTaskQueue.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B3940E51DA83C8300427841 /* AsyncTaskQueue.cpp */; }; 4B778EF023A5D68C0000D260 /* 68000Storage.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4BFF1D3822337B0300838EA1 /* 68000Storage.cpp */; }; 4B778EF123A5D6B50000D260 /* 9918.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B0E04F91FC9FA3100F43484 /* 9918.cpp */; }; 4B778EF323A5DB230000D260 /* PCMSegment.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B4518731F75E91800926311 /* PCMSegment.cpp */; }; @@ -1298,7 +1295,6 @@ 4B37EE811D7345A6006A09A4 /* BinaryDump.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = BinaryDump.hpp; sourceTree = "<group>"; }; 4B38F3461F2EC11D00D9235D /* AmstradCPC.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = AmstradCPC.cpp; sourceTree = "<group>"; }; 4B38F3471F2EC11D00D9235D /* AmstradCPC.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = AmstradCPC.hpp; sourceTree = "<group>"; }; - 4B3940E51DA83C8300427841 /* AsyncTaskQueue.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = AsyncTaskQueue.cpp; sourceTree = "<group>"; }; 4B3940E61DA83C8300427841 /* AsyncTaskQueue.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = AsyncTaskQueue.hpp; sourceTree = "<group>"; }; 4B3AF7D02413470E00873C0B /* Enum.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = Enum.hpp; sourceTree = "<group>"; }; 4B3AF7D12413472200873C0B /* Struct.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = Struct.hpp; sourceTree = "<group>"; }; @@ -2138,7 +2134,6 @@ 4BDCC5F81FB27A5E001220C5 /* ROMMachine.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = ROMMachine.hpp; sourceTree = "<group>"; }; 4BDDBA981EF3451200347E61 /* Z80MachineCycleTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Z80MachineCycleTests.swift; sourceTree = "<group>"; }; 4BE0151C286A8C8E00EA42E9 /* MemorySwitches.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = MemorySwitches.hpp; sourceTree = "<group>"; }; - 4BE0151E28766ECF00EA42E9 /* AsyncUpdater.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = AsyncUpdater.hpp; sourceTree = "<group>"; }; 4BE0A3EC237BB170002AB46F /* ST.cpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = ST.cpp; sourceTree = "<group>"; }; 4BE0A3ED237BB170002AB46F /* ST.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = ST.hpp; sourceTree = "<group>"; }; 4BE211DD253E4E4800435408 /* 65C02_no_Rockwell_test.bin */ = {isa = PBXFileReference; lastKnownFileType = archive.macbinary; name = 65C02_no_Rockwell_test.bin; path = "Klaus Dormann/65C02_no_Rockwell_test.bin"; sourceTree = "<group>"; }; @@ -2753,9 +2748,7 @@ 4B3940E81DA83C8700427841 /* Concurrency */ = { isa = PBXGroup; children = ( - 4B3940E51DA83C8300427841 /* AsyncTaskQueue.cpp */, 4B3940E61DA83C8300427841 /* AsyncTaskQueue.hpp */, - 4BE0151E28766ECF00EA42E9 /* AsyncUpdater.hpp */, ); name = Concurrency; path = ../../Concurrency; @@ -5637,7 +5630,6 @@ 4B0ACC2923775819008902D0 /* DMAController.cpp in Sources */, 4B055A951FAE85BB0060FFFF /* BitReverse.cpp in Sources */, 4B055ACE1FAE9B030060FFFF /* Plus3.cpp in Sources */, - 4B055A8D1FAE85920060FFFF /* AsyncTaskQueue.cpp in Sources */, 4BAD13441FF709C700FD114A /* MSX.cpp in Sources */, 4B055AC41FAE9AE80060FFFF /* Keyboard.cpp in Sources */, 4B8DF506254E3C9D00F3433C /* ADB.cpp in Sources */, @@ -5887,7 +5879,6 @@ 4B17B58B20A8A9D9007CCA8F /* StringSerialiser.cpp in Sources */, 4B2E2D9D1C3A070400138695 /* Electron.cpp in Sources */, 4B051CA826781D6500CA44E8 /* StaticAnalyser.cpp in Sources */, - 4B3940E71DA83C8300427841 /* AsyncTaskQueue.cpp in Sources */, 4B0E04FA1FC9FA3100F43484 /* 9918.cpp in Sources */, 4B69FB3D1C4D908A00B5F0AA /* Tape.cpp in Sources */, 4B4518841F75E91A00926311 /* UnformattedTrack.cpp in Sources */, @@ -6118,7 +6109,6 @@ 4B7752A728217E060073E2C5 /* Blitter.cpp in Sources */, 4B778F4F23A5F21C0000D260 /* StaticAnalyser.cpp in Sources */, 4B8DD3682633B2D400B3C866 /* SpectrumVideoContentionTests.mm in Sources */, - 4B778EEF23A5D6680000D260 /* AsyncTaskQueue.cpp in Sources */, 4B7752A928217E200073E2C5 /* 65816Storage.cpp in Sources */, 4B7752AC28217E6E0073E2C5 /* StaticAnalyser.cpp in Sources */, 4B778F1223A5EC720000D260 /* CRT.cpp in Sources */, diff --git a/OSBindings/Mac/Clock Signal/Machine/CSMachine.mm b/OSBindings/Mac/Clock Signal/Machine/CSMachine.mm index dd2323fef..295262a60 100644 --- a/OSBindings/Mac/Clock Signal/Machine/CSMachine.mm +++ b/OSBindings/Mac/Clock Signal/Machine/CSMachine.mm @@ -24,7 +24,7 @@ #include "../../../../ClockReceiver/TimeTypes.hpp" #include "../../../../ClockReceiver/ScanSynchroniser.hpp" -#include "../../../../Concurrency/AsyncUpdater.hpp" +#include "../../../../Concurrency/AsyncTaskQueue.hpp" #import "CSStaticAnalyser+TargetVector.h" #import "NSBundle+DataResource.h" @@ -122,7 +122,7 @@ struct ActivityObserver: public Activity::Observer { CSJoystickManager *_joystickManager; NSMutableArray<CSMachineLED *> *_leds; - Concurrency::AsyncUpdater<MachineUpdater> updater; + Concurrency::TaskQueue<true, MachineUpdater> updater; Time::ScanSynchroniser _scanSynchroniser; NSTimer *_joystickTimer; @@ -455,7 +455,7 @@ struct ActivityObserver: public Activity::Observer { } - (void)applyInputEvent:(dispatch_block_t)event { - updater.update([event] { + updater.enqueue([event] { event(); }); } @@ -669,13 +669,13 @@ struct ActivityObserver: public Activity::Observer { #pragma mark - Timer - (void)audioQueueIsRunningDry:(nonnull CSAudioQueue *)audioQueue { - updater.update([self] { + updater.enqueue([self] { updater.performer.machine->flush_output(MachineTypes::TimedMachine::Output::Audio); }); } - (void)scanTargetViewDisplayLinkDidFire:(CSScanTargetView *)view now:(const CVTimeStamp *)now outputTime:(const CVTimeStamp *)outputTime { - updater.update([self] { + updater.enqueue([self] { // Grab a pointer to the timed machine from somewhere where it has already // been dynamically cast, to avoid that cost here. MachineTypes::TimedMachine *const timed_machine = updater.performer.machine; diff --git a/Outputs/Speaker/Implementation/LowpassSpeaker.hpp b/Outputs/Speaker/Implementation/LowpassSpeaker.hpp index 4a7638a93..d9d6aaa46 100644 --- a/Outputs/Speaker/Implementation/LowpassSpeaker.hpp +++ b/Outputs/Speaker/Implementation/LowpassSpeaker.hpp @@ -367,12 +367,12 @@ template <typename SampleSource> class PullLowpass: public LowpassBase<PullLowpa The speaker will advance by obtaining data from the sample source supplied at construction, filtering it and passing it on to the speaker's delegate if there is one. */ - void run_for(Concurrency::DeferringAsyncTaskQueue &queue, const Cycles cycles) { + void run_for(Concurrency::TaskQueue<false> &queue, const Cycles cycles) { if(cycles == Cycles(0)) { return; } - queue.defer([this, cycles] { + queue.enqueue([this, cycles] { run_for(cycles); }); } diff --git a/Storage/Disk/DiskImage/DiskImage.hpp b/Storage/Disk/DiskImage/DiskImage.hpp index d12d4709a..527565537 100644 --- a/Storage/Disk/DiskImage/DiskImage.hpp +++ b/Storage/Disk/DiskImage/DiskImage.hpp @@ -80,7 +80,7 @@ class DiskImageHolderBase: public Disk { protected: std::set<Track::Address> unwritten_tracks_; std::map<Track::Address, std::shared_ptr<Track>> cached_tracks_; - std::unique_ptr<Concurrency::AsyncTaskQueue> update_queue_; + std::unique_ptr<Concurrency::TaskQueue<true>> update_queue_; }; /*! diff --git a/Storage/Disk/DiskImage/DiskImageImplementation.hpp b/Storage/Disk/DiskImage/DiskImageImplementation.hpp index 799687419..57b181d53 100644 --- a/Storage/Disk/DiskImage/DiskImageImplementation.hpp +++ b/Storage/Disk/DiskImage/DiskImageImplementation.hpp @@ -20,7 +20,7 @@ template <typename T> bool DiskImageHolder<T>::get_is_read_only() { template <typename T> void DiskImageHolder<T>::flush_tracks() { if(!unwritten_tracks_.empty()) { - if(!update_queue_) update_queue_ = std::make_unique<Concurrency::AsyncTaskQueue>(); + if(!update_queue_) update_queue_ = std::make_unique<Concurrency::TaskQueue<true>>(); using TrackMap = std::map<Track::Address, std::shared_ptr<Track>>; std::shared_ptr<TrackMap> track_copies(new TrackMap);