From e53455a936cf422ec3e72d29fbb9ded085932790 Mon Sep 17 00:00:00 2001 From: Thomas Harte Date: Fri, 7 Oct 2016 16:56:34 -0400 Subject: [PATCH 1/5] =?UTF-8?q?Not=20having=20read=20the=20C++=20synchroni?= =?UTF-8?q?sation=20primitives=20before,=20this=20async=20task=20queue=20i?= =?UTF-8?q?s=20probably=20incorrect.=20But=20nevertheless,=20let's=20have?= =?UTF-8?q?=20a=20quick=20go=20at=20employing=20it=20=E2=80=94=20in=20a=20?= =?UTF-8?q?hideously=20thread=20unsafe=20fashion=20=E2=80=94=20for=20audio?= =?UTF-8?q?=20generation.=20What=20can=20possibly=20go=20wrong=3F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Concurrency/AsyncTaskQueue.cpp | 62 +++++++++ Concurrency/AsyncTaskQueue.hpp | 39 ++++++ .../Clock Signal.xcodeproj/project.pbxproj | 14 ++ Outputs/Speaker.hpp | 120 +++++++++--------- 4 files changed, 177 insertions(+), 58 deletions(-) create mode 100644 Concurrency/AsyncTaskQueue.cpp create mode 100644 Concurrency/AsyncTaskQueue.hpp diff --git a/Concurrency/AsyncTaskQueue.cpp b/Concurrency/AsyncTaskQueue.cpp new file mode 100644 index 000000000..7f5e5a68d --- /dev/null +++ b/Concurrency/AsyncTaskQueue.cpp @@ -0,0 +1,62 @@ +// +// AsyncTaskQueue.cpp +// Clock Signal +// +// Created by Thomas Harte on 07/10/2016. +// Copyright © 2016 Thomas Harte. All rights reserved. +// + +#include "AsyncTaskQueue.hpp" + +using namespace Concurrency; + +AsyncTaskQueue::AsyncTaskQueue() : should_destruct_(false) +{ + thread_.reset(new std::thread([this]() { + while(!should_destruct_) + { + std::function next_function; + + queue_mutex_.lock(); + if(!pending_tasks_.empty()) + { + next_function = pending_tasks_.front(); + pending_tasks_.pop_front(); + } + queue_mutex_.unlock(); + + if(next_function) + { + next_function(); + } + else + { + std::unique_lock lock(queue_mutex_); + processing_condition_.wait(lock); + lock.unlock(); + } + } + })); +} + +AsyncTaskQueue::~AsyncTaskQueue() +{ + should_destruct_ = true; + enqueue([](){}); +} + +void AsyncTaskQueue::enqueue(std::function function) +{ + queue_mutex_.lock(); + pending_tasks_.push_back(function); + queue_mutex_.unlock(); + + std::lock_guard lock(queue_mutex_); + processing_condition_.notify_all(); +} + +void AsyncTaskQueue::synchronise() +{ + // TODO +// std::mutex +} diff --git a/Concurrency/AsyncTaskQueue.hpp b/Concurrency/AsyncTaskQueue.hpp new file mode 100644 index 000000000..00653d420 --- /dev/null +++ b/Concurrency/AsyncTaskQueue.hpp @@ -0,0 +1,39 @@ +// +// AsyncTaskQueue.hpp +// Clock Signal +// +// Created by Thomas Harte on 07/10/2016. +// Copyright © 2016 Thomas Harte. All rights reserved. +// + +#ifndef AsyncTaskQueue_hpp +#define AsyncTaskQueue_hpp + +#include +#include +#include +#include + +namespace Concurrency { + +class AsyncTaskQueue { + + public: + AsyncTaskQueue(); + ~AsyncTaskQueue(); + + void enqueue(std::function function); + void synchronise(); + + private: + std::unique_ptr thread_; + + std::mutex queue_mutex_; + std::list> pending_tasks_; + std::condition_variable processing_condition_; + bool should_destruct_; +}; + +} + +#endif /* Concurrency_hpp */ diff --git a/OSBindings/Mac/Clock Signal.xcodeproj/project.pbxproj b/OSBindings/Mac/Clock Signal.xcodeproj/project.pbxproj index a7ea419a9..5f5d8f631 100644 --- a/OSBindings/Mac/Clock Signal.xcodeproj/project.pbxproj +++ b/OSBindings/Mac/Clock Signal.xcodeproj/project.pbxproj @@ -30,6 +30,7 @@ 4B30512D1D989E2200B4FED8 /* Drive.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B30512B1D989E2200B4FED8 /* Drive.cpp */; }; 4B3051301D98ACC600B4FED8 /* Plus3.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B30512E1D98ACC600B4FED8 /* Plus3.cpp */; }; 4B37EE821D7345A6006A09A4 /* BinaryDump.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B37EE801D7345A6006A09A4 /* BinaryDump.cpp */; }; + 4B3940E71DA83C8300427841 /* AsyncTaskQueue.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B3940E51DA83C8300427841 /* AsyncTaskQueue.cpp */; }; 4B3BA0C31D318AEC005DD7A7 /* C1540Tests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4B3BA0C21D318AEB005DD7A7 /* C1540Tests.swift */; }; 4B3BA0CE1D318B44005DD7A7 /* C1540Bridge.mm in Sources */ = {isa = PBXBuildFile; fileRef = 4B3BA0C61D318B44005DD7A7 /* C1540Bridge.mm */; }; 4B3BA0CF1D318B44005DD7A7 /* MOS6522Bridge.mm in Sources */ = {isa = PBXBuildFile; fileRef = 4B3BA0C91D318B44005DD7A7 /* MOS6522Bridge.mm */; }; @@ -438,6 +439,8 @@ 4B30512F1D98ACC600B4FED8 /* Plus3.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = Plus3.hpp; path = Electron/Plus3.hpp; sourceTree = ""; }; 4B37EE801D7345A6006A09A4 /* BinaryDump.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = BinaryDump.cpp; sourceTree = ""; }; 4B37EE811D7345A6006A09A4 /* BinaryDump.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = BinaryDump.hpp; sourceTree = ""; }; + 4B3940E51DA83C8300427841 /* AsyncTaskQueue.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = AsyncTaskQueue.cpp; path = ../../Concurrency/AsyncTaskQueue.cpp; sourceTree = ""; }; + 4B3940E61DA83C8300427841 /* AsyncTaskQueue.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = AsyncTaskQueue.hpp; path = ../../Concurrency/AsyncTaskQueue.hpp; sourceTree = ""; }; 4B3BA0C21D318AEB005DD7A7 /* C1540Tests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = C1540Tests.swift; sourceTree = ""; }; 4B3BA0C51D318B44005DD7A7 /* C1540Bridge.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = C1540Bridge.h; sourceTree = ""; }; 4B3BA0C61D318B44005DD7A7 /* C1540Bridge.mm */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.objcpp; path = C1540Bridge.mm; sourceTree = ""; }; @@ -1003,6 +1006,15 @@ name = Outputs; sourceTree = ""; }; + 4B3940E81DA83C8700427841 /* Concurrency */ = { + isa = PBXGroup; + children = ( + 4B3940E51DA83C8300427841 /* AsyncTaskQueue.cpp */, + 4B3940E61DA83C8300427841 /* AsyncTaskQueue.hpp */, + ); + name = Concurrency; + sourceTree = ""; + }; 4B3BA0C41D318B44005DD7A7 /* Bridges */ = { isa = PBXGroup; children = ( @@ -1469,6 +1481,7 @@ 4BB73E951B587A5100552FC2 = { isa = PBXGroup; children = ( + 4B3940E81DA83C8700427841 /* Concurrency */, 4BC76E6A1C98F43700E6EF73 /* Accelerate.framework */, 4BB73EA01B587A5100552FC2 /* Clock Signal */, 4BB73EB51B587A5100552FC2 /* Clock SignalTests */, @@ -2185,6 +2198,7 @@ 4B1E85751D170228001EF87D /* Typer.cpp in Sources */, 4BF829631D8F536B001BAE39 /* SSD.cpp in Sources */, 4B2E2D9D1C3A070400138695 /* Electron.cpp in Sources */, + 4B3940E71DA83C8300427841 /* AsyncTaskQueue.cpp in Sources */, 4BAB62B81D3302CA00DF5BA0 /* PCMTrack.cpp in Sources */, 4B69FB3D1C4D908A00B5F0AA /* Tape.cpp in Sources */, 4B8FE2291DA1EDDF0090D3CE /* ElectronOptionsPanel.swift in Sources */, diff --git a/Outputs/Speaker.hpp b/Outputs/Speaker.hpp index 413a37bf4..1d3b0a560 100644 --- a/Outputs/Speaker.hpp +++ b/Outputs/Speaker.hpp @@ -14,6 +14,7 @@ #include #include "../SignalProcessing/Stepper.hpp" #include "../SignalProcessing/FIRFilter.hpp" +#include "../Concurrency/AsyncTaskQueue.hpp" namespace Outputs { @@ -25,7 +26,7 @@ namespace Outputs { Intended to be a parent class, allowing descendants to pick the strategy by which input samples are mapped to output samples. */ -class Speaker { +class Speaker: public Concurrency::AsyncTaskQueue { public: class Delegate { public: @@ -124,49 +125,20 @@ template class Filter: public Speaker { public: void run_for_cycles(unsigned int input_cycles) { - if(_coefficients_are_dirty) update_filter_coefficients(); + enqueue([=]() { + unsigned int cycles_remaining = input_cycles; + if(_coefficients_are_dirty) update_filter_coefficients(); - // if input and output rates exactly match, just accumulate results and pass on - if(_input_cycles_per_second == _output_cycles_per_second && _high_frequency_cut_off < 0.0) - { - while(input_cycles) + // if input and output rates exactly match, just accumulate results and pass on + if(_input_cycles_per_second == _output_cycles_per_second && _high_frequency_cut_off < 0.0) { - unsigned int cycles_to_read = (unsigned int)(_buffer_size - _buffer_in_progress_pointer); - if(cycles_to_read > input_cycles) cycles_to_read = input_cycles; - - static_cast(this)->get_samples(cycles_to_read, &_buffer_in_progress.get()[_buffer_in_progress_pointer]); - _buffer_in_progress_pointer += cycles_to_read; - - // announce to delegate if full - if(_buffer_in_progress_pointer == _buffer_size) + while(cycles_remaining) { - _buffer_in_progress_pointer = 0; - if(_delegate) - { - _delegate->speaker_did_complete_samples(this, _buffer_in_progress.get(), _buffer_size); - } - } + unsigned int cycles_to_read = (unsigned int)(_buffer_size - _buffer_in_progress_pointer); + if(cycles_to_read > cycles_remaining) cycles_to_read = cycles_remaining; - input_cycles -= cycles_to_read; - } - - return; - } - - // if the output rate is less than the input rate, use the filter - if(_input_cycles_per_second > _output_cycles_per_second) - { - while(input_cycles) - { - unsigned int cycles_to_read = (unsigned int)std::min((int)input_cycles, _number_of_taps - _input_buffer_depth); - static_cast(this)->get_samples(cycles_to_read, &_input_buffer.get()[_input_buffer_depth]); - input_cycles -= cycles_to_read; - _input_buffer_depth += cycles_to_read; - - if(_input_buffer_depth == _number_of_taps) - { - _buffer_in_progress.get()[_buffer_in_progress_pointer] = _filter->apply(_input_buffer.get()); - _buffer_in_progress_pointer++; + static_cast(this)->get_samples(cycles_to_read, &_buffer_in_progress.get()[_buffer_in_progress_pointer]); + _buffer_in_progress_pointer += cycles_to_read; // announce to delegate if full if(_buffer_in_progress_pointer == _buffer_size) @@ -178,29 +150,61 @@ template class Filter: public Speaker { } } - // If the next loop around is going to reuse some of the samples just collected, use a memmove to - // preserve them in the correct locations (TODO: use a longer buffer to fix that) and don't skip - // anything. Otherwise skip as required to get to the next sample batch and don't expect to reuse. - uint64_t steps = _stepper->step(); - if(steps < _number_of_taps) - { - int16_t *input_buffer = _input_buffer.get(); - memmove(input_buffer, &input_buffer[steps], sizeof(int16_t) * ((size_t)_number_of_taps - (size_t)steps)); - _input_buffer_depth -= steps; - } - else - { - if(steps > _number_of_taps) - static_cast(this)->skip_samples((unsigned int)steps - (unsigned int)_number_of_taps); - _input_buffer_depth = 0; - } + cycles_remaining -= cycles_to_read; } + + return; } - return; - } + // if the output rate is less than the input rate, use the filter + if(_input_cycles_per_second > _output_cycles_per_second) + { + while(cycles_remaining) + { + unsigned int cycles_to_read = (unsigned int)std::min((int)cycles_remaining, _number_of_taps - _input_buffer_depth); + static_cast(this)->get_samples(cycles_to_read, &_input_buffer.get()[_input_buffer_depth]); + cycles_remaining -= cycles_to_read; + _input_buffer_depth += cycles_to_read; + + if(_input_buffer_depth == _number_of_taps) + { + _buffer_in_progress.get()[_buffer_in_progress_pointer] = _filter->apply(_input_buffer.get()); + _buffer_in_progress_pointer++; + + // announce to delegate if full + if(_buffer_in_progress_pointer == _buffer_size) + { + _buffer_in_progress_pointer = 0; + if(_delegate) + { + _delegate->speaker_did_complete_samples(this, _buffer_in_progress.get(), _buffer_size); + } + } + + // If the next loop around is going to reuse some of the samples just collected, use a memmove to + // preserve them in the correct locations (TODO: use a longer buffer to fix that) and don't skip + // anything. Otherwise skip as required to get to the next sample batch and don't expect to reuse. + uint64_t steps = _stepper->step(); + if(steps < _number_of_taps) + { + int16_t *input_buffer = _input_buffer.get(); + memmove(input_buffer, &input_buffer[steps], sizeof(int16_t) * ((size_t)_number_of_taps - (size_t)steps)); + _input_buffer_depth -= steps; + } + else + { + if(steps > _number_of_taps) + static_cast(this)->skip_samples((unsigned int)steps - (unsigned int)_number_of_taps); + _input_buffer_depth = 0; + } + } + } + + return; + } // TODO: input rate is less than output rate + }); } private: From de658b70d760da7a949d59277e79e3c42152e8d1 Mon Sep 17 00:00:00 2001 From: Thomas Harte Date: Fri, 7 Oct 2016 17:02:36 -0400 Subject: [PATCH 2/5] That's thread safety, subject to the async task queue being made to work... --- Machines/Electron/Electron.cpp | 13 +++++++++---- Machines/Electron/Electron.hpp | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/Machines/Electron/Electron.cpp b/Machines/Electron/Electron.cpp index d96f4d0da..69917f4f6 100644 --- a/Machines/Electron/Electron.cpp +++ b/Machines/Electron/Electron.cpp @@ -226,10 +226,11 @@ unsigned int Machine::perform_bus_operation(CPU6502::BusOperation operation, uin // update speaker mode bool new_speaker_is_enabled = (*value & 6) == 2; - if(new_speaker_is_enabled != _speaker->get_is_enabled()) + if(new_speaker_is_enabled != speaker_is_enabled_) { update_audio(); _speaker->set_is_enabled(new_speaker_is_enabled); + speaker_is_enabled_ = new_speaker_is_enabled; } _tape.set_is_enabled((*value & 6) != 6); @@ -958,13 +959,17 @@ void Speaker::skip_samples(unsigned int number_of_samples) void Speaker::set_divider(uint8_t divider) { - _divider = divider * 32 / clock_rate_audio_divider; + enqueue([=]() { + _divider = divider * 32 / clock_rate_audio_divider; + }); } void Speaker::set_is_enabled(bool is_enabled) { - _is_enabled = is_enabled; - _counter = 0; + enqueue([=]() { + _is_enabled = is_enabled; + _counter = 0; + }); } /* diff --git a/Machines/Electron/Electron.hpp b/Machines/Electron/Electron.hpp index 7b88f775a..e2659146e 100644 --- a/Machines/Electron/Electron.hpp +++ b/Machines/Electron/Electron.hpp @@ -121,7 +121,6 @@ class Speaker: public ::Outputs::Filter { void set_divider(uint8_t divider); void set_is_enabled(bool is_enabled); - inline bool get_is_enabled() { return _is_enabled; } void get_samples(unsigned int number_of_samples, int16_t *target); void skip_samples(unsigned int number_of_samples); @@ -238,6 +237,7 @@ class Machine: // Outputs std::shared_ptr _crt; std::shared_ptr _speaker; + bool speaker_is_enabled_; }; } From 002e923cf1306c7773942a0bf10fb5172ff22432 Mon Sep 17 00:00:00 2001 From: Thomas Harte Date: Fri, 7 Oct 2016 17:08:29 -0400 Subject: [PATCH 3/5] Added shutdown logic. --- Concurrency/AsyncTaskQueue.cpp | 1 + Concurrency/AsyncTaskQueue.hpp | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Concurrency/AsyncTaskQueue.cpp b/Concurrency/AsyncTaskQueue.cpp index 7f5e5a68d..6f9f40027 100644 --- a/Concurrency/AsyncTaskQueue.cpp +++ b/Concurrency/AsyncTaskQueue.cpp @@ -43,6 +43,7 @@ AsyncTaskQueue::~AsyncTaskQueue() { should_destruct_ = true; enqueue([](){}); + thread_->join(); } void AsyncTaskQueue::enqueue(std::function function) diff --git a/Concurrency/AsyncTaskQueue.hpp b/Concurrency/AsyncTaskQueue.hpp index 00653d420..5cb14125f 100644 --- a/Concurrency/AsyncTaskQueue.hpp +++ b/Concurrency/AsyncTaskQueue.hpp @@ -31,7 +31,7 @@ class AsyncTaskQueue { std::mutex queue_mutex_; std::list> pending_tasks_; std::condition_variable processing_condition_; - bool should_destruct_; + std::atomic_bool should_destruct_; }; } From 922dd6a58612c26a4524d596d1e6b79fe47e5e8c Mon Sep 17 00:00:00 2001 From: Thomas Harte Date: Fri, 7 Oct 2016 17:10:00 -0400 Subject: [PATCH 4/5] Hit up the other two kinds of audio generator. --- Components/6560/6560.cpp | 8 ++++++-- Machines/Atari2600/Atari2600.cpp | 14 ++++++++++---- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/Components/6560/6560.cpp b/Components/6560/6560.cpp index 7bb013ebb..8882decd3 100644 --- a/Components/6560/6560.cpp +++ b/Components/6560/6560.cpp @@ -19,12 +19,16 @@ Speaker::Speaker() : void Speaker::set_volume(uint8_t volume) { - _volume = volume; + enqueue([=]() { + _volume = volume; + }); } void Speaker::set_control(int channel, uint8_t value) { - _control_registers[channel] = value; + enqueue([=]() { + _control_registers[channel] = value; + }); } // Source: VICE. Not original. diff --git a/Machines/Atari2600/Atari2600.cpp b/Machines/Atari2600/Atari2600.cpp index 8970ad2d1..72b4c2bca 100644 --- a/Machines/Atari2600/Atari2600.cpp +++ b/Machines/Atari2600/Atari2600.cpp @@ -804,18 +804,24 @@ Atari2600::Speaker::~Speaker() void Atari2600::Speaker::set_volume(int channel, uint8_t volume) { - _volume[channel] = volume & 0xf; + enqueue([=]() { + _volume[channel] = volume & 0xf; + }); } void Atari2600::Speaker::set_divider(int channel, uint8_t divider) { - _divider[channel] = divider & 0x1f; - _divider_counter[channel] = 0; + enqueue([=]() { + _divider[channel] = divider & 0x1f; + _divider_counter[channel] = 0; + }); } void Atari2600::Speaker::set_control(int channel, uint8_t control) { - _control[channel] = control & 0xf; + enqueue([=]() { + _control[channel] = control & 0xf; + }); } #define advance_poly4(c) _poly4_counter[channel] = (_poly4_counter[channel] >> 1) | (((_poly4_counter[channel] << 3) ^ (_poly4_counter[channel] << 2))&0x008) From 4829b896f790b3d9439cbcf19b5580d4a8e52112 Mon Sep 17 00:00:00 2001 From: Thomas Harte Date: Fri, 7 Oct 2016 17:18:46 -0400 Subject: [PATCH 5/5] Documented the async task queue. --- Concurrency/AsyncTaskQueue.hpp | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/Concurrency/AsyncTaskQueue.hpp b/Concurrency/AsyncTaskQueue.hpp index 5cb14125f..6d06f2264 100644 --- a/Concurrency/AsyncTaskQueue.hpp +++ b/Concurrency/AsyncTaskQueue.hpp @@ -16,13 +16,29 @@ namespace Concurrency { +/*! + An async task queue allows a caller to enqueue void(void) functions. Those functions are guaranteed + to be performed serially and asynchronously from the caller. A caller may also request to synchronise, + causing it to block until all previously-enqueued functions are complete. +*/ class AsyncTaskQueue { public: AsyncTaskQueue(); ~AsyncTaskQueue(); + /*! + Adds @c function to the queue. + + @discussion Functions will be performed serially and asynchronously. This method is safe to + call from multiple threads. + @parameter function The function to enqueue. + */ void enqueue(std::function function); + + /*! + Blocks the caller until all previously-enqueud functions have completed. + */ void synchronise(); private: