1
0
mirror of https://github.com/TomHarte/CLK.git synced 2024-11-23 03:32:32 +00:00

Merge pull request #736 from TomHarte/RunUntil

Implements a nascent `run_until`
This commit is contained in:
Thomas Harte 2020-01-20 17:48:13 -05:00 committed by GitHub
commit e74f37d6ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 184 additions and 110 deletions

View File

@ -53,7 +53,7 @@ void MultiSpeaker::speaker_did_complete_samples(Speaker *speaker, const std::vec
std::lock_guard<std::mutex> lock_guard(front_speaker_mutex_);
if(speaker != front_speaker_) return;
}
delegate_->speaker_did_complete_samples(this, buffer);
did_complete_samples(this, buffer);
}
void MultiSpeaker::speaker_did_change_input_clock(Speaker *speaker) {

View File

@ -12,61 +12,89 @@
using namespace Concurrency;
BestEffortUpdater::BestEffortUpdater() {
// ATOMIC_FLAG_INIT isn't necessarily safe to use, so establish default state by other means.
update_is_ongoing_.clear();
}
BestEffortUpdater::BestEffortUpdater() :
update_thread_([this]() {
this->update_loop();
}) {}
BestEffortUpdater::~BestEffortUpdater() {
// Don't allow further deconstruction until the task queue is stopped.
// Sever the delegate now, as soon as possible, then wait for any
// pending tasks to finish.
set_delegate(nullptr);
flush();
// Wind up the update thread.
should_quit_ = true;
update();
update_thread_.join();
}
void BestEffortUpdater::update() {
// Perform an update only if one is not currently ongoing.
if(!update_is_ongoing_.test_and_set()) {
async_task_queue_.enqueue([this]() {
// Get time now using the highest-resolution clock provided by the implementation, and determine
// the duration since the last time this section was entered.
const std::chrono::time_point<std::chrono::high_resolution_clock> now = std::chrono::high_resolution_clock::now();
const auto elapsed = now - previous_time_point_;
previous_time_point_ = now;
void BestEffortUpdater::update(int flags) {
// Bump the requested target time and set the update requested flag.
{
std::lock_guard<decltype(update_mutex_)> lock(update_mutex_);
has_skipped_ = update_requested_;
update_requested_ = true;
flags_ |= flags;
target_time_ = std::chrono::high_resolution_clock::now().time_since_epoch().count();
}
update_condition_.notify_one();
}
if(has_previous_time_point_) {
// If the duration is valid, convert it to integer cycles, maintaining a rolling error and call the delegate
// if there is one. Proceed only if the number of cycles is positive, and cap it to the per-second maximum as
// it's possible this is an adjustable clock so be ready to swallow unexpected adjustments.
const int64_t integer_duration = std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
if(integer_duration > 0) {
if(delegate_) {
// Cap running at 1/5th of a second, to avoid doing a huge amount of work after any
// brief system interruption.
const double duration = std::min(static_cast<double>(integer_duration) / 1e9, 0.2);
delegate_->update(this, duration, has_skipped_);
}
has_skipped_ = false;
}
} else {
has_previous_time_point_ = true;
}
void BestEffortUpdater::update_loop() {
while(true) {
std::unique_lock<decltype(update_mutex_)> lock(update_mutex_);
is_updating_ = false;
// Allow furthers updates to occur.
update_is_ongoing_.clear();
});
} else {
async_task_queue_.enqueue([this]() {
has_skipped_ = true;
// Wait to be signalled.
update_condition_.wait(lock, [this]() -> bool {
return update_requested_;
});
// Possibly this signalling really means 'quit'.
if(should_quit_) return;
// Note update started, crib the target time.
auto target_time = target_time_;
update_requested_ = false;
// If this was actually the first update request, silently swallow it.
if(!has_previous_time_point_) {
has_previous_time_point_ = true;
previous_time_point_ = target_time;
continue;
}
// Release the lock on requesting new updates.
is_updating_ = true;
const int flags = flags_;
flags_ = 0;
lock.unlock();
// Invoke the delegate, if supplied, in order to run.
const int64_t integer_duration = std::max(target_time - previous_time_point_, int64_t(0));
const auto delegate = delegate_.load();
if(delegate) {
// Cap running at 1/5th of a second, to avoid doing a huge amount of work after any
// brief system interruption.
const double duration = std::min(double(integer_duration) / 1e9, 0.2);
const double elapsed_duraation = delegate->update(this, duration, has_skipped_, flags);
previous_time_point_ += int64_t(elapsed_duraation * 1e9);
has_skipped_ = false;
}
}
}
void BestEffortUpdater::flush() {
async_task_queue_.flush();
// Spin lock; this is allowed to be slow.
while(true) {
std::lock_guard<decltype(update_mutex_)> lock(update_mutex_);
if(!is_updating_) return;
}
}
void BestEffortUpdater::set_delegate(Delegate *const delegate) {
async_task_queue_.enqueue([this, delegate]() {
delegate_ = delegate;
});
delegate_.store(delegate);
}

View File

@ -11,8 +11,10 @@
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include "AsyncTaskQueue.hpp"
#include "../ClockReceiver/TimeTypes.hpp"
namespace Concurrency {
@ -31,7 +33,13 @@ class BestEffortUpdater {
/// A delegate receives timing cues.
struct Delegate {
virtual void update(BestEffortUpdater *updater, Time::Seconds duration, bool did_skip_previous_update) = 0;
/*!
Instructs the delegate to run for at least @c duration, providing hints as to whether multiple updates were requested before the previous had completed
(as @c did_skip_previous_update) and providing the union of any flags supplied to @c update.
@returns The amount of time actually run for.
*/
virtual Time::Seconds update(BestEffortUpdater *updater, Time::Seconds duration, bool did_skip_previous_update, int flags) = 0;
};
/// Sets the current delegate.
@ -41,20 +49,32 @@ class BestEffortUpdater {
If the delegate is not currently in the process of an `update` call, calls it now to catch up to the current time.
The call is asynchronous; this method will return immediately.
*/
void update();
void update(int flags = 0);
/// Blocks until any ongoing update is complete.
/// Blocks until any ongoing update is complete; may spin.
void flush();
private:
std::atomic_flag update_is_ongoing_;
AsyncTaskQueue async_task_queue_;
std::atomic<bool> should_quit_;
std::atomic<bool> is_updating_;
std::chrono::time_point<std::chrono::high_resolution_clock> previous_time_point_;
int64_t target_time_;
int flags_ = 0;
bool update_requested_;
std::mutex update_mutex_;
std::condition_variable update_condition_;
decltype(target_time_) previous_time_point_;
bool has_previous_time_point_ = false;
bool has_skipped_ = false;
std::atomic<bool> has_skipped_ = false;
Delegate *delegate_ = nullptr;
std::atomic<Delegate *>delegate_ = nullptr;
void update_loop();
// This is deliberately at the bottom, to ensure it constructs after the various
// mutexs, conditions, etc, that it'll depend upon.
std::thread update_thread_;
};
}

View File

@ -51,6 +51,55 @@ class Machine {
run_for(Cycles(static_cast<int>(cycles)));
}
/*!
Runs for the machine for at least @c duration seconds, and then until @c condition is true.
@returns The amount of time run for.
*/
Time::Seconds run_until(Time::Seconds minimum_duration, std::function<bool()> condition) {
Time::Seconds total_runtime = minimum_duration;
run_for(minimum_duration);
while(!condition()) {
// Advance in increments of one 500th of a second until the condition
// is true; that's 1/10th of a 50Hz frame, but more like 1/8.33 of a
// 60Hz frame. Though most machines aren't exactly 50Hz or 60Hz, and some
// are arbitrary other refresh rates. So those observations are merely
// for scale.
run_for(0.002);
total_runtime += 0.002;
}
return total_runtime;
}
enum MachineEvent: int {
/// At least one new packet of audio has been delivered to the spaker's delegate.
NewSpeakerSamplesGenerated = 1 << 0
};
/*!
Runs for at least @c duration seconds, and then every one of the @c events has occurred at least once since this
call to @c run_until_event.
@param events A bitmask comprised of @c MachineEvent flags.
@returns The amount of time run for.
*/
Time::Seconds run_until(Time::Seconds minimum_duration, int events) {
// Tie up a wait-for-samples, if requested.
const Outputs::Speaker::Speaker *speaker = nullptr;
int sample_sets = 0;
if(events & MachineEvent::NewSpeakerSamplesGenerated) {
speaker = get_speaker();
if(!speaker) events &= ~MachineEvent::NewSpeakerSamplesGenerated;
sample_sets = speaker->completed_sample_sets();
}
// Run until all requested events are satisfied.
return run_until(minimum_duration, [=]() {
return
(!(events & MachineEvent::NewSpeakerSamplesGenerated) || (sample_sets != speaker->completed_sample_sets()));
});
}
protected:
/// Runs the machine for @c cycles.
virtual void run_for(const Cycles cycles) = 0;

View File

@ -67,7 +67,7 @@
</Testables>
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
buildConfiguration = "Release"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
enableASanStackUseAfterReturn = "YES"

View File

@ -15,7 +15,6 @@ class MachineDocument:
CSMachineDelegate,
CSOpenGLViewDelegate,
CSOpenGLViewResponderDelegate,
CSBestEffortUpdaterDelegate,
CSAudioQueueDelegate,
CSROMReciverViewDelegate
{
@ -98,7 +97,6 @@ class MachineDocument:
bestEffortLock.lock()
if let bestEffortUpdater = bestEffortUpdater {
bestEffortUpdater.delegate = nil
bestEffortUpdater.flush()
self.bestEffortUpdater = nil
}
@ -221,8 +219,8 @@ class MachineDocument:
openGLView.window!.makeKeyAndOrderFront(self)
openGLView.window!.makeFirstResponder(openGLView)
// Start accepting best effort updates.
self.bestEffortUpdater!.delegate = self
// Start forwarding best-effort updates.
self.bestEffortUpdater!.setMachine(machine)
}
}
@ -245,7 +243,7 @@ class MachineDocument:
/// Responds to the CSAudioQueueDelegate dry-queue warning message by requesting a machine update.
final func audioQueueIsRunningDry(_ audioQueue: CSAudioQueue) {
bestEffortLock.lock()
bestEffortUpdater?.update()
bestEffortUpdater?.update(with: .audioNeeded)
bestEffortLock.unlock()
}
@ -271,14 +269,6 @@ class MachineDocument:
}
}
/// Responds to CSBestEffortUpdaterDelegate update message by running the machine.
final func bestEffortUpdater(_ bestEffortUpdater: CSBestEffortUpdater!, runForInterval duration: TimeInterval, didSkipPreviousUpdate: Bool) {
if let machine = self.machine, actionLock.try() {
machine.run(forInterval: duration)
actionLock.unlock()
}
}
// MARK: - Pasteboard Forwarding.
/// Forwards any text currently on the pasteboard into the active machine.

View File

@ -57,7 +57,7 @@ typedef NS_ENUM(NSInteger, CSMachineKeyboardInputMode) {
*/
- (nullable instancetype)initWithAnalyser:(nonnull CSStaticAnalyser *)result missingROMs:(nullable inout NSMutableArray<CSMissingROM *> *)missingROMs NS_DESIGNATED_INITIALIZER;
- (void)runForInterval:(NSTimeInterval)interval;
- (NSTimeInterval)runForInterval:(NSTimeInterval)interval untilEvent:(int)events;
- (float)idealSamplingRateFromRange:(NSRange)range;
- (void)setAudioSamplingRate:(float)samplingRate bufferSize:(NSUInteger)bufferSize;

View File

@ -259,7 +259,7 @@ struct ActivityObserver: public Activity::Observer {
}
}
- (void)runForInterval:(NSTimeInterval)interval {
- (NSTimeInterval)runForInterval:(NSTimeInterval)interval untilEvent:(int)events {
@synchronized(self) {
if(_joystickMachine && _joystickManager) {
[_joystickManager update];
@ -309,7 +309,7 @@ struct ActivityObserver: public Activity::Observer {
}
}
}
_machine->crt_machine()->run_for(interval);
return _machine->crt_machine()->run_until(interval, events);
}
}

View File

@ -9,20 +9,19 @@
#import <Foundation/Foundation.h>
#import <CoreVideo/CoreVideo.h>
@class CSBestEffortUpdater;
@protocol CSBestEffortUpdaterDelegate <NSObject>
- (void)bestEffortUpdater:(CSBestEffortUpdater *)bestEffortUpdater runForInterval:(NSTimeInterval)interval didSkipPreviousUpdate:(BOOL)didSkipPreviousUpdate;
@end
#import "CSMachine.h"
// The following is coupled to the definitions in CRTMachine.hpp, but exposed here
// for the benefit of Swift.
typedef NS_ENUM(NSInteger, CSBestEffortUpdaterEvent) {
CSBestEffortUpdaterEventAudioNeeded = 1 << 0
};
@interface CSBestEffortUpdater : NSObject
@property (nonatomic, weak) id<CSBestEffortUpdaterDelegate> delegate;
- (void)update;
- (void)updateWithEvent:(CSBestEffortUpdaterEvent)event;
- (void)flush;
- (void)setMachine:(CSMachine *)machine;
@end

View File

@ -11,60 +11,41 @@
#include "BestEffortUpdater.hpp"
struct UpdaterDelegate: public Concurrency::BestEffortUpdater::Delegate {
__weak id<CSBestEffortUpdaterDelegate> delegate;
NSLock *delegateLock;
__weak CSMachine *machine;
void update(Concurrency::BestEffortUpdater *updater, Time::Seconds cycles, bool did_skip_previous_update) {
[delegateLock lock];
__weak id<CSBestEffortUpdaterDelegate> delegateCopy = delegate;
[delegateLock unlock];
[delegateCopy bestEffortUpdater:nil runForInterval:(NSTimeInterval)cycles didSkipPreviousUpdate:did_skip_previous_update];
Time::Seconds update(Concurrency::BestEffortUpdater *updater, Time::Seconds seconds, bool did_skip_previous_update, int flags) final {
return [machine runForInterval:seconds untilEvent:flags];
}
};
@implementation CSBestEffortUpdater {
Concurrency::BestEffortUpdater _updater;
UpdaterDelegate _updaterDelegate;
NSLock *_delegateLock;
}
- (instancetype)init {
self = [super init];
if(self) {
_delegateLock = [[NSLock alloc] init];
_updaterDelegate.delegateLock = _delegateLock;
_updater.set_delegate(&_updaterDelegate);
}
return self;
}
//- (void)dealloc {
// _updater.flush();
//}
- (void)update {
_updater.update();
}
- (void)updateWithEvent:(CSBestEffortUpdaterEvent)event {
_updater.update((int)event);
}
- (void)flush {
_updater.flush();
}
- (void)setDelegate:(id<CSBestEffortUpdaterDelegate>)delegate {
[_delegateLock lock];
_updaterDelegate.delegate = delegate;
[_delegateLock unlock];
}
- (id<CSBestEffortUpdaterDelegate>)delegate {
id<CSBestEffortUpdaterDelegate> delegate;
[_delegateLock lock];
delegate = _updaterDelegate.delegate;
[_delegateLock unlock];
return delegate;
- (void)setMachine:(CSMachine *)machine {
_updater.flush();
_updaterDelegate.machine = machine;
}
@end

View File

@ -34,8 +34,8 @@
namespace {
struct BestEffortUpdaterDelegate: public Concurrency::BestEffortUpdater::Delegate {
void update(Concurrency::BestEffortUpdater *updater, Time::Seconds duration, bool did_skip_previous_update) override {
machine->crt_machine()->run_for(duration);
Time::Seconds update(Concurrency::BestEffortUpdater *updater, Time::Seconds duration, bool did_skip_previous_update, int flags) override {
return machine->crt_machine()->run_until(duration, flags);
}
Machine::DynamicMachine *machine;

View File

@ -134,7 +134,7 @@ template <typename T> class LowpassSpeaker: public Speaker {
// announce to delegate if full
if(output_buffer_pointer_ == output_buffer_.size()) {
output_buffer_pointer_ = 0;
delegate_->speaker_did_complete_samples(this, output_buffer_);
did_complete_samples(this, output_buffer_);
}
cycles_remaining -= cycles_to_read;
@ -159,7 +159,7 @@ template <typename T> class LowpassSpeaker: public Speaker {
// Announce to delegate if full.
if(output_buffer_pointer_ == output_buffer_.size()) {
output_buffer_pointer_ = 0;
delegate_->speaker_did_complete_samples(this, output_buffer_);
did_complete_samples(this, output_buffer_);
}
// If the next loop around is going to reuse some of the samples just collected, use a memmove to

View File

@ -26,6 +26,8 @@ class Speaker {
virtual float get_ideal_clock_rate_in_range(float minimum, float maximum) = 0;
virtual void set_output_rate(float cycles_per_second, int buffer_size) = 0;
int completed_sample_sets() const { return completed_sample_sets_; }
struct Delegate {
virtual void speaker_did_complete_samples(Speaker *speaker, const std::vector<int16_t> &buffer) = 0;
virtual void speaker_did_change_input_clock(Speaker *speaker) {}
@ -35,7 +37,12 @@ class Speaker {
}
protected:
void did_complete_samples(Speaker *speaker, const std::vector<int16_t> &buffer) {
++completed_sample_sets_;
delegate_->speaker_did_complete_samples(this, buffer);
}
Delegate *delegate_ = nullptr;
int completed_sample_sets_ = 0;
};
}