mirror of
				https://github.com/TomHarte/CLK.git
				synced 2025-10-26 17:17:58 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			194 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			194 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| //
 | |
| //  AsyncTaskQueue.hpp
 | |
| //  Clock Signal
 | |
| //
 | |
| //  Created by Thomas Harte on 07/10/2016.
 | |
| //  Copyright 2016 Thomas Harte. All rights reserved.
 | |
| //
 | |
| 
 | |
| #ifndef AsyncTaskQueue_hpp
 | |
| #define AsyncTaskQueue_hpp
 | |
| 
 | |
| #include <atomic>
 | |
| #include <condition_variable>
 | |
| #include <functional>
 | |
| #include <thread>
 | |
| #include <vector>
 | |
| 
 | |
| #include "../ClockReceiver/TimeTypes.hpp"
 | |
| 
 | |
| namespace Concurrency {
 | |
| 
 | |
| /// An implementation detail; provides the time-centric part of a TaskQueue with a real Performer.
 | |
| template <typename Performer> struct TaskQueueStorage {
 | |
| 	template <typename... Args> TaskQueueStorage(Args&&... args) :
 | |
| 		performer(std::forward<Args>(args)...),
 | |
| 		last_fired_(Time::nanos_now()) {}
 | |
| 
 | |
| 	Performer performer;
 | |
| 
 | |
| 	protected:
 | |
| 		void update() {
 | |
| 			auto time_now = Time::nanos_now();
 | |
| 			performer.perform(time_now - last_fired_);
 | |
| 			last_fired_ = time_now;
 | |
| 		}
 | |
| 
 | |
| 	private:
 | |
| 		Time::Nanos last_fired_;
 | |
| };
 | |
| 
 | |
| /// An implementation detail; provides a no-op implementation of time advances for TaskQueues without a Performer.
 | |
| template <> struct TaskQueueStorage<void> {
 | |
| 	TaskQueueStorage() {}
 | |
| 
 | |
| 	protected:
 | |
| 		void update() {}
 | |
| };
 | |
| 
 | |
| /*!
 | |
| 	A task queue allows a caller to enqueue @c void(void) functions. Those functions are guaranteed
 | |
| 	to be performed serially and asynchronously from the caller.
 | |
| 
 | |
| 	If @c perform_automatically is true, functions will be performed as soon as is possible,
 | |
| 	at the cost of thread synchronisation.
 | |
| 
 | |
| 	If @c perform_automatically is false, functions will be queued up but not dispatched
 | |
| 	until a call to perform().
 | |
| 
 | |
| 	If a @c Performer type is supplied then a public member, @c performer will be constructed
 | |
| 	with the arguments supplied to TaskQueue's constructor. That instance will receive calls of the
 | |
| 	form @c .perform(nanos) before every batch of new actions, indicating how much time has
 | |
| 	passed since the previous @c perform.
 | |
| 
 | |
| 	@note Even if @c perform_automatically is true, actions may be batched, when a long-running
 | |
| 	action occupies the asynchronous thread for long enough. So it is not true that @c perform will be
 | |
| 	called once per action.
 | |
| */
 | |
| template <bool perform_automatically, bool start_immediately = true, typename Performer = void> class AsyncTaskQueue: public TaskQueueStorage<Performer> {
 | |
| 	public:
 | |
| 		template <typename... Args> AsyncTaskQueue(Args&&... args) :
 | |
| 			TaskQueueStorage<Performer>(std::forward<Args>(args)...) {
 | |
| 			if constexpr (start_immediately) {
 | |
| 				start();
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		/// Enqueus @c post_action to be performed asynchronously at some point
 | |
| 		/// in the future. If @c perform_automatically is @c true then the action
 | |
| 		/// will be performed as soon as possible. Otherwise it will sit unsheculed until
 | |
| 		/// a call to @c perform().
 | |
| 		///
 | |
| 		/// Actions may be elided.
 | |
| 		///
 | |
| 		/// If this TaskQueue has a @c Performer then the action will be performed
 | |
| 		/// on the same thread as the performer, after the performer has been updated
 | |
| 		/// to 'now'.
 | |
| 		void enqueue(const std::function<void(void)> &post_action) {
 | |
| 			std::lock_guard guard(condition_mutex_);
 | |
| 			actions_.push_back(post_action);
 | |
| 
 | |
| 			if constexpr (perform_automatically) {
 | |
| 				condition_.notify_all();
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		/// Causes any enqueued actions that are not yet scheduled to be scheduled.
 | |
| 		void perform() {
 | |
| 			if(actions_.empty()) {
 | |
| 				return;
 | |
| 			}
 | |
| 			condition_.notify_all();
 | |
| 		}
 | |
| 
 | |
| 		/// Permanently stops this task queue, blocking until that has happened.
 | |
| 		/// All pending actions will be performed first.
 | |
| 		///
 | |
| 		/// The queue cannot be restarted; this is a destructive action.
 | |
| 		void stop() {
 | |
| 			if(thread_.joinable()) {
 | |
| 				should_quit_ = true;
 | |
| 				enqueue([] {});
 | |
| 				if constexpr (!perform_automatically) {
 | |
| 					perform();
 | |
| 				}
 | |
| 				thread_.join();
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		/// Starts the queue if it has never been started before.
 | |
| 		///
 | |
| 		/// This is not guaranteed safely to restart a stopped queue.
 | |
| 		void start() {
 | |
| 			thread_ = std::move(std::thread{
 | |
| 				[this] {
 | |
| 					ActionVector actions;
 | |
| 
 | |
| 					// Continue until told to quit.
 | |
| 					while(!should_quit_) {
 | |
| 						// Wait for new actions to be signalled, and grab them.
 | |
| 						std::unique_lock lock(condition_mutex_);
 | |
| 						while(actions_.empty()) {
 | |
| 							condition_.wait(lock);
 | |
| 						}
 | |
| 						std::swap(actions, actions_);
 | |
| 						lock.unlock();
 | |
| 
 | |
| 						// Update to now (which is possibly a no-op).
 | |
| 						TaskQueueStorage<Performer>::update();
 | |
| 
 | |
| 						// Perform the actions and destroy them.
 | |
| 						for(const auto &action: actions) {
 | |
| 							action();
 | |
| 						}
 | |
| 						actions.clear();
 | |
| 					}
 | |
| 				}
 | |
| 			});
 | |
| 		}
 | |
| 
 | |
| 		/// Schedules any remaining unscheduled work, then blocks synchronously
 | |
| 		/// until all scheduled work has been performed.
 | |
| 		void flush() {
 | |
| 			std::mutex flush_mutex;
 | |
| 			std::condition_variable flush_condition;
 | |
| 			bool has_run = false;
 | |
| 			std::unique_lock lock(flush_mutex);
 | |
| 
 | |
| 			enqueue([&flush_mutex, &flush_condition, &has_run] () {
 | |
| 				std::unique_lock inner_lock(flush_mutex);
 | |
| 				has_run = true;
 | |
| 				flush_condition.notify_all();
 | |
| 			});
 | |
| 
 | |
| 			if constexpr (!perform_automatically) {
 | |
| 				perform();
 | |
| 			}
 | |
| 
 | |
| 			flush_condition.wait(lock, [&has_run] { return has_run; });
 | |
| 		}
 | |
| 
 | |
| 		~AsyncTaskQueue() {
 | |
| 			stop();
 | |
| 		}
 | |
| 
 | |
| 	private:
 | |
| 		// The list of actions waiting be performed. These will be elided,
 | |
| 		// increasing their latency, if the emulation thread falls behind.
 | |
| 		using ActionVector = std::vector<std::function<void(void)>>;
 | |
| 		ActionVector actions_;
 | |
| 
 | |
| 		// Necessary synchronisation parts.
 | |
| 		std::atomic<bool> should_quit_ = false;
 | |
| 		std::mutex condition_mutex_;
 | |
| 		std::condition_variable condition_;
 | |
| 
 | |
| 		// Ensure the thread isn't constructed until after the mutex
 | |
| 		// and condition variable.
 | |
| 		std::thread thread_;
 | |
| };
 | |
| 
 | |
| }
 | |
| 
 | |
| #endif /* AsyncTaskQueue_hpp */
 |