diff --git a/Analyser/Dynamic/MultiMachine/Implementation/MultiSpeaker.cpp b/Analyser/Dynamic/MultiMachine/Implementation/MultiSpeaker.cpp
index 26ec1d4de..c5a8fa724 100644
--- a/Analyser/Dynamic/MultiMachine/Implementation/MultiSpeaker.cpp
+++ b/Analyser/Dynamic/MultiMachine/Implementation/MultiSpeaker.cpp
@@ -53,7 +53,7 @@ void MultiSpeaker::speaker_did_complete_samples(Speaker *speaker, const std::vec
 		std::lock_guard lock_guard(front_speaker_mutex_);
 		if(speaker != front_speaker_) return;
 	}
-	delegate_->speaker_did_complete_samples(this, buffer);
+	did_complete_samples(this, buffer);
 }
 
 void MultiSpeaker::speaker_did_change_input_clock(Speaker *speaker) {
diff --git a/Concurrency/BestEffortUpdater.cpp b/Concurrency/BestEffortUpdater.cpp
index 3d452a081..6eb3635a2 100644
--- a/Concurrency/BestEffortUpdater.cpp
+++ b/Concurrency/BestEffortUpdater.cpp
@@ -12,61 +12,89 @@
 using namespace Concurrency;
 
-BestEffortUpdater::BestEffortUpdater() {
-	// ATOMIC_FLAG_INIT isn't necessarily safe to use, so establish default state by other means.
-	update_is_ongoing_.clear();
-}
+BestEffortUpdater::BestEffortUpdater() :
+	update_thread_([this]() {
+		this->update_loop();
+	}) {}
 
 BestEffortUpdater::~BestEffortUpdater() {
-	// Don't allow further deconstruction until the task queue is stopped.
+	// Sever the delegate now, as soon as possible, then wait for any
+	// pending tasks to finish.
+	set_delegate(nullptr);
 	flush();
+
+	// Wind up the update thread.
+	should_quit_ = true;
+	update();
+	update_thread_.join();
 }
 
-void BestEffortUpdater::update() {
-	// Perform an update only if one is not currently ongoing.
-	if(!update_is_ongoing_.test_and_set()) {
-		async_task_queue_.enqueue([this]() {
-			// Get time now using the highest-resolution clock provided by the implementation, and determine
-			// the duration since the last time this section was entered.
-			const std::chrono::time_point<std::chrono::high_resolution_clock> now = std::chrono::high_resolution_clock::now();
-			const auto elapsed = now - previous_time_point_;
-			previous_time_point_ = now;
+void BestEffortUpdater::update(int flags) {
+	// Bump the requested target time and set the update requested flag.
+	{
+		std::lock_guard lock(update_mutex_);
+		has_skipped_ = update_requested_;
+		update_requested_ = true;
+		flags_ |= flags;
+		target_time_ = std::chrono::high_resolution_clock::now().time_since_epoch().count();
+	}
+	update_condition_.notify_one();
+}
 
-			if(has_previous_time_point_) {
-				// If the duration is valid, convert it to integer cycles, maintaining a rolling error and call the delegate
-				// if there is one. Proceed only if the number of cycles is positive, and cap it to the per-second maximum as
-				// it's possible this is an adjustable clock so be ready to swallow unexpected adjustments.
-				const int64_t integer_duration = std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
-				if(integer_duration > 0) {
-					if(delegate_) {
-						// Cap running at 1/5th of a second, to avoid doing a huge amount of work after any
-						// brief system interruption.
-						const double duration = std::min(static_cast<double>(integer_duration) / 1e9, 0.2);
-						delegate_->update(this, duration, has_skipped_);
-					}
-					has_skipped_ = false;
-				}
-			} else {
-				has_previous_time_point_ = true;
-			}
+void BestEffortUpdater::update_loop() {
+	while(true) {
+		std::unique_lock lock(update_mutex_);
+		is_updating_ = false;
 
-			// Allow furthers updates to occur.
-			update_is_ongoing_.clear();
-		});
-	} else {
-		async_task_queue_.enqueue([this]() {
-			has_skipped_ = true;
+		// Wait to be signalled.
+		update_condition_.wait(lock, [this]() -> bool {
+			return update_requested_;
 		});
+
+		// Possibly this signalling really means 'quit'.
+		if(should_quit_) return;
+
+		// Note update started, crib the target time.
+		auto target_time = target_time_;
+		update_requested_ = false;
+
+		// If this was actually the first update request, silently swallow it.
+		if(!has_previous_time_point_) {
+			has_previous_time_point_ = true;
+			previous_time_point_ = target_time;
+			continue;
+		}
+
+		// Release the lock on requesting new updates.
+		is_updating_ = true;
+		const int flags = flags_;
+		flags_ = 0;
+		lock.unlock();
+
+		// Invoke the delegate, if supplied, in order to run.
+		const int64_t integer_duration = std::max(target_time - previous_time_point_, int64_t(0));
+		const auto delegate = delegate_.load();
+		if(delegate) {
+			// Cap running at 1/5th of a second, to avoid doing a huge amount of work after any
+			// brief system interruption.
+			const double duration = std::min(double(integer_duration) / 1e9, 0.2);
+			const double elapsed_duration = delegate->update(this, duration, has_skipped_, flags);
+
+			previous_time_point_ += int64_t(elapsed_duration * 1e9);
+			has_skipped_ = false;
+		}
 	}
 }
 
 void BestEffortUpdater::flush() {
-	async_task_queue_.flush();
+	// Spin lock; this is allowed to be slow.
+	while(true) {
+		std::lock_guard lock(update_mutex_);
+		if(!is_updating_) return;
+	}
 }
 
 void BestEffortUpdater::set_delegate(Delegate *const delegate) {
-	async_task_queue_.enqueue([this, delegate]() {
-		delegate_ = delegate;
-	});
+	delegate_.store(delegate);
 }
diff --git a/Concurrency/BestEffortUpdater.hpp b/Concurrency/BestEffortUpdater.hpp
index 6979c6eaa..edf92edb7 100644
--- a/Concurrency/BestEffortUpdater.hpp
+++ b/Concurrency/BestEffortUpdater.hpp
@@ -11,8 +11,10 @@
 #include <atomic>
 #include <chrono>
+#include <condition_variable>
+#include <mutex>
+#include <thread>
 
-#include "AsyncTaskQueue.hpp"
 #include "../ClockReceiver/TimeTypes.hpp"
 
 namespace Concurrency {
@@ -31,7 +33,13 @@ class BestEffortUpdater {
 
 		/// A delegate receives timing cues.
 		struct Delegate {
-			virtual void update(BestEffortUpdater *updater, Time::Seconds duration, bool did_skip_previous_update) = 0;
+			/*!
+				Instructs the delegate to run for at least @c duration, providing hints as to whether multiple updates were requested before the previous had completed
+				(as @c did_skip_previous_update) and providing the union of any flags supplied to @c update.
+
+				@returns The amount of time actually run for.
+			*/
+			virtual Time::Seconds update(BestEffortUpdater *updater, Time::Seconds duration, bool did_skip_previous_update, int flags) = 0;
 		};
 
 		/// Sets the current delegate.
@@ -41,20 +49,32 @@ class BestEffortUpdater {
 			If the delegate is not currently in the process of an `update` call, calls it now to catch up to the current time.
 			The call is asynchronous; this method will return immediately.
 		*/
-		void update();
+		void update(int flags = 0);
 
-		/// Blocks until any ongoing update is complete.
+		/// Blocks until any ongoing update is complete; may spin.
 		void flush();
 
 	private:
-		std::atomic_flag update_is_ongoing_;
-		AsyncTaskQueue async_task_queue_;
+		std::atomic<bool> should_quit_;
+		std::atomic<bool> is_updating_;
 
-		std::chrono::time_point<std::chrono::high_resolution_clock> previous_time_point_;
+		int64_t target_time_;
+		int flags_ = 0;
+		bool update_requested_;
+		std::mutex update_mutex_;
+		std::condition_variable update_condition_;
+
+		decltype(target_time_) previous_time_point_;
 		bool has_previous_time_point_ = false;
-		bool has_skipped_ = false;
+		std::atomic<bool> has_skipped_ = false;
 
-		Delegate *delegate_ = nullptr;
+		std::atomic<Delegate *> delegate_ = nullptr;
+
+		void update_loop();
+
+		// This is deliberately at the bottom, to ensure it constructs after the various
+		// mutexes, conditions, etc, that it'll depend upon.
+		std::thread update_thread_;
 };
 
 }
diff --git a/Machines/CRTMachine.hpp b/Machines/CRTMachine.hpp
index 64be17500..db100c99c 100644
--- a/Machines/CRTMachine.hpp
+++ b/Machines/CRTMachine.hpp
@@ -51,6 +51,55 @@ class Machine {
 			run_for(Cycles(static_cast<int>(cycles)));
 		}
 
+		/*!
+			Runs the machine for at least @c minimum_duration seconds, and then until @c condition is true.
+
+			@returns The amount of time run for.
+		*/
+		Time::Seconds run_until(Time::Seconds minimum_duration, std::function<bool()> condition) {
+			Time::Seconds total_runtime = minimum_duration;
+			run_for(minimum_duration);
+			while(!condition()) {
+				// Advance in increments of one 500th of a second until the condition
+				// is true; that's 1/10th of a 50Hz frame, but more like 1/8.33 of a
+				// 60Hz frame. Though most machines aren't exactly 50Hz or 60Hz, and some
+				// are arbitrary other refresh rates. So those observations are merely
+				// for scale.
+				run_for(0.002);
+				total_runtime += 0.002;
+			}
+			return total_runtime;
+		}
+
+		enum MachineEvent: int {
+			/// At least one new packet of audio has been delivered to the speaker's delegate.
+			NewSpeakerSamplesGenerated = 1 << 0
+		};
+
+		/*!
+			Runs for at least @c minimum_duration seconds, and then until every one of the @c events has occurred at least once since this
+			call to @c run_until.
+
+			@param events A bitmask comprised of @c MachineEvent flags.
+			@returns The amount of time run for.
+		*/
+		Time::Seconds run_until(Time::Seconds minimum_duration, int events) {
+			// Tie up a wait-for-samples, if requested.
+			const Outputs::Speaker::Speaker *speaker = nullptr;
+			int sample_sets = 0;
+			if(events & MachineEvent::NewSpeakerSamplesGenerated) {
+				speaker = get_speaker();
+				if(!speaker) events &= ~MachineEvent::NewSpeakerSamplesGenerated;
+				else sample_sets = speaker->completed_sample_sets();
+			}
+
+			// Run until all requested events are satisfied.
+			return run_until(minimum_duration, [=]() {
+				return
+					(!(events & MachineEvent::NewSpeakerSamplesGenerated) || (sample_sets != speaker->completed_sample_sets()));
+			});
+		}
+
 	protected:
 		/// Runs the machine for @c cycles.
 		virtual void run_for(const Cycles cycles) = 0;
diff --git a/OSBindings/Mac/Clock Signal.xcodeproj/xcshareddata/xcschemes/Clock Signal.xcscheme b/OSBindings/Mac/Clock Signal.xcodeproj/xcshareddata/xcschemes/Clock Signal.xcscheme
index 47f9c7286..1465a4f62 100644
--- a/OSBindings/Mac/Clock Signal.xcodeproj/xcshareddata/xcschemes/Clock Signal.xcscheme
+++ b/OSBindings/Mac/Clock Signal.xcodeproj/xcshareddata/xcschemes/Clock Signal.xcscheme
@@ -67,7 +67,7 @@
diff --git a/OSBindings/Mac/Clock Signal/Machine/CSMachine.h b/OSBindings/Mac/Clock Signal/Machine/CSMachine.h
--- a/OSBindings/Mac/Clock Signal/Machine/CSMachine.h
+++ b/OSBindings/Mac/Clock Signal/Machine/CSMachine.h
 *)missingROMs NS_DESIGNATED_INITIALIZER;
-- (void)runForInterval:(NSTimeInterval)interval;
+- (NSTimeInterval)runForInterval:(NSTimeInterval)interval untilEvent:(int)events;
 
 - (float)idealSamplingRateFromRange:(NSRange)range;
 - (void)setAudioSamplingRate:(float)samplingRate bufferSize:(NSUInteger)bufferSize;
diff --git a/OSBindings/Mac/Clock Signal/Machine/CSMachine.mm b/OSBindings/Mac/Clock Signal/Machine/CSMachine.mm
index 9f36cd417..dbde81399 100644
--- a/OSBindings/Mac/Clock Signal/Machine/CSMachine.mm
+++ b/OSBindings/Mac/Clock Signal/Machine/CSMachine.mm
@@ -259,7 +259,7 @@ struct ActivityObserver: public Activity::Observer {
 	}
 }
 
-- (void)runForInterval:(NSTimeInterval)interval {
+- (NSTimeInterval)runForInterval:(NSTimeInterval)interval untilEvent:(int)events {
 	@synchronized(self) {
 		if(_joystickMachine && _joystickManager) {
 			[_joystickManager update];
@@ -309,7 +309,7 @@ struct ActivityObserver: public Activity::Observer {
 				}
 			}
 		}
-		_machine->crt_machine()->run_for(interval);
+		return _machine->crt_machine()->run_until(interval, events);
 	}
 }
diff --git a/OSBindings/Mac/Clock Signal/Updater/CSBestEffortUpdater.h b/OSBindings/Mac/Clock Signal/Updater/CSBestEffortUpdater.h
index b206e54b2..970104107 100644
--- a/OSBindings/Mac/Clock Signal/Updater/CSBestEffortUpdater.h
+++ b/OSBindings/Mac/Clock Signal/Updater/CSBestEffortUpdater.h
@@ -9,20 +9,19 @@
 #import
 #import
 
-@class CSBestEffortUpdater;
-
-@protocol CSBestEffortUpdaterDelegate
-
-- (void)bestEffortUpdater:(CSBestEffortUpdater *)bestEffortUpdater runForInterval:(NSTimeInterval)interval didSkipPreviousUpdate:(BOOL)didSkipPreviousUpdate;
-
-@end
+#import "CSMachine.h"
 
+// The following is coupled to the definitions in CRTMachine.hpp, but exposed here
+// for the benefit of Swift.
+typedef NS_ENUM(NSInteger, CSBestEffortUpdaterEvent) {
+	CSBestEffortUpdaterEventAudioNeeded = 1 << 0
+};
 
 @interface CSBestEffortUpdater : NSObject
 
-@property (nonatomic, weak) id delegate;
-
 - (void)update;
+- (void)updateWithEvent:(CSBestEffortUpdaterEvent)event;
 - (void)flush;
+- (void)setMachine:(CSMachine *)machine;
 
 @end
diff --git a/OSBindings/Mac/Clock Signal/Updater/CSBestEffortUpdater.mm b/OSBindings/Mac/Clock Signal/Updater/CSBestEffortUpdater.mm
index bc22986db..a0ba3e21f 100644
--- a/OSBindings/Mac/Clock Signal/Updater/CSBestEffortUpdater.mm
+++ b/OSBindings/Mac/Clock Signal/Updater/CSBestEffortUpdater.mm
@@ -11,60 +11,41 @@
 #include "BestEffortUpdater.hpp"
 
 struct UpdaterDelegate: public Concurrency::BestEffortUpdater::Delegate {
-	__weak id delegate;
-	NSLock *delegateLock;
+	__weak CSMachine *machine;
 
-	void update(Concurrency::BestEffortUpdater *updater, Time::Seconds cycles, bool did_skip_previous_update) {
-		[delegateLock lock];
-		__weak id delegateCopy = delegate;
-		[delegateLock unlock];
-
-		[delegateCopy bestEffortUpdater:nil runForInterval:(NSTimeInterval)cycles didSkipPreviousUpdate:did_skip_previous_update];
+	Time::Seconds update(Concurrency::BestEffortUpdater *updater, Time::Seconds seconds, bool did_skip_previous_update, int flags) final {
+		return [machine runForInterval:seconds untilEvent:flags];
 	}
 };
 
 @implementation CSBestEffortUpdater {
 	Concurrency::BestEffortUpdater _updater;
 	UpdaterDelegate _updaterDelegate;
-	NSLock *_delegateLock;
 }
 
 - (instancetype)init {
 	self = [super init];
 	if(self) {
-		_delegateLock = [[NSLock alloc] init];
-		_updaterDelegate.delegateLock = _delegateLock;
 		_updater.set_delegate(&_updaterDelegate);
 	}
 	return self;
 }
 
-//- (void)dealloc {
-//	_updater.flush();
-//}
-
 - (void)update {
 	_updater.update();
 }
 
+- (void)updateWithEvent:(CSBestEffortUpdaterEvent)event {
+	_updater.update((int)event);
+}
+
 - (void)flush {
 	_updater.flush();
 }
 
-- (void)setDelegate:(id)delegate {
-	[_delegateLock lock];
-	_updaterDelegate.delegate = delegate;
-	[_delegateLock unlock];
-}
-
-- (id)delegate {
-	id delegate;
-
-	[_delegateLock lock];
-	delegate = _updaterDelegate.delegate;
-	[_delegateLock unlock];
-
-	return delegate;
+- (void)setMachine:(CSMachine *)machine {
+	_updater.flush();
+	_updaterDelegate.machine = machine;
 }
 
 @end
diff --git a/OSBindings/SDL/main.cpp b/OSBindings/SDL/main.cpp
index 4a44d0789..6a70d98dd 100644
--- a/OSBindings/SDL/main.cpp
+++ b/OSBindings/SDL/main.cpp
@@ -34,8 +34,8 @@ namespace {
 
 struct BestEffortUpdaterDelegate: public Concurrency::BestEffortUpdater::Delegate {
-	void update(Concurrency::BestEffortUpdater *updater, Time::Seconds duration, bool did_skip_previous_update) override {
-		machine->crt_machine()->run_for(duration);
+	Time::Seconds update(Concurrency::BestEffortUpdater *updater, Time::Seconds duration, bool did_skip_previous_update, int flags) override {
+		return machine->crt_machine()->run_until(duration, flags);
 	}
 
 	Machine::DynamicMachine *machine;
diff --git a/Outputs/Speaker/Implementation/LowpassSpeaker.hpp b/Outputs/Speaker/Implementation/LowpassSpeaker.hpp
index 38c9fed23..ced763a56 100644
--- a/Outputs/Speaker/Implementation/LowpassSpeaker.hpp
+++ b/Outputs/Speaker/Implementation/LowpassSpeaker.hpp
@@ -134,7 +134,7 @@ template <typename SampleSource> class LowpassSpeaker: public Speaker {
 					// announce to delegate if full
 					if(output_buffer_pointer_ == output_buffer_.size()) {
 						output_buffer_pointer_ = 0;
-						delegate_->speaker_did_complete_samples(this, output_buffer_);
+						did_complete_samples(this, output_buffer_);
 					}
 
 					cycles_remaining -= cycles_to_read;
@@ -159,7 +159,7 @@ template <typename SampleSource> class LowpassSpeaker: public Speaker {
 				// Announce to delegate if full.
 				if(output_buffer_pointer_ == output_buffer_.size()) {
 					output_buffer_pointer_ = 0;
-					delegate_->speaker_did_complete_samples(this, output_buffer_);
+					did_complete_samples(this, output_buffer_);
 				}
 
 				// If the next loop around is going to reuse some of the samples just collected, use a memmove to
diff --git a/Outputs/Speaker/Speaker.hpp b/Outputs/Speaker/Speaker.hpp
index 11b06bfde..2f575880b 100644
--- a/Outputs/Speaker/Speaker.hpp
+++ b/Outputs/Speaker/Speaker.hpp
@@ -26,6 +26,8 @@ class Speaker {
 		virtual float get_ideal_clock_rate_in_range(float minimum, float maximum) = 0;
 		virtual void set_output_rate(float cycles_per_second, int buffer_size) = 0;
 
+		int completed_sample_sets() const { return completed_sample_sets_; }
+
 		struct Delegate {
 			virtual void speaker_did_complete_samples(Speaker *speaker, const std::vector<int16_t> &buffer) = 0;
 			virtual void speaker_did_change_input_clock(Speaker *speaker) {}
@@ -35,7 +37,12 @@ class Speaker {
 		}
 
 	protected:
+		void did_complete_samples(Speaker *speaker, const std::vector<int16_t> &buffer) {
+			++completed_sample_sets_;
+			delegate_->speaker_did_complete_samples(this, buffer);
+		}
 		Delegate *delegate_ = nullptr;
+		int completed_sample_sets_ = 0;
 };

}
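
The diff above applies one pattern twice over: the caller asks for time to pass, and the emulated machine reports back how much time it genuinely consumed, so the updater advances its notion of "now" by work actually done rather than by wall-clock guesswork. What follows is a minimal, self-contained sketch of the coalescing update-thread half of that pattern, assuming nothing beyond the C++ standard library; the class and member names (Updater, request_pending_ and so on) are invented for illustration and are not part of this repository.

// Sketch only: a worker thread that sleeps on a condition variable, coalesces
// overlapping update requests, and advances its clock by the time the callee
// says it actually ran for (cf. BestEffortUpdater::update_loop above).
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

class Updater {
	public:
		// The callback receives a requested duration in seconds and returns the
		// duration it actually ran for, mirroring Delegate::update in the diff.
		Updater(std::function<double(double)> callback) :
			callback_(std::move(callback)),
			thread_([this] { loop(); }) {}

		~Updater() {
			{
				std::lock_guard<std::mutex> lock(mutex_);
				should_quit_ = true;
				request_pending_ = true;
			}
			condition_.notify_one();
			thread_.join();
		}

		// May be called from any thread; calls that arrive while an update is
		// already in flight simply refresh the target time and coalesce.
		void update() {
			{
				std::lock_guard<std::mutex> lock(mutex_);
				request_pending_ = true;
				target_time_ = std::chrono::high_resolution_clock::now();
			}
			condition_.notify_one();
		}

	private:
		void loop() {
			auto previous_time = std::chrono::high_resolution_clock::now();
			while(true) {
				std::unique_lock<std::mutex> lock(mutex_);
				condition_.wait(lock, [this] { return request_pending_; });
				if(should_quit_) return;

				request_pending_ = false;
				const auto target = target_time_;
				lock.unlock();

				// Ask for the outstanding span of time, then advance the private
				// clock by however long the callee reports it really ran.
				const double requested = std::chrono::duration<double>(target - previous_time).count();
				const double elapsed = callback_(std::max(requested, 0.0));
				previous_time += std::chrono::duration_cast<std::chrono::high_resolution_clock::duration>(
					std::chrono::duration<double>(elapsed));
			}
		}

		std::function<double(double)> callback_;
		std::mutex mutex_;
		std::condition_variable condition_;
		bool request_pending_ = false;
		bool should_quit_ = false;
		std::chrono::high_resolution_clock::time_point target_time_;
		std::thread thread_;	// Deliberately last, so it starts only after everything it uses exists.
};

A caller would construct Updater with a lambda that runs its machine for at least the requested number of seconds and returns the amount genuinely run, which is the contract the new CRTMachine::Machine::run_until and BestEffortUpdater::Delegate::update establish in the diff.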