1
0
mirror of https://github.com/TomHarte/CLK.git synced 2024-07-10 12:29:01 +00:00

Merge pull request #55 from TomHarte/AsyncQueue

Introduces an asynchronous task queue
This commit is contained in:
Thomas Harte 2016-10-07 17:20:35 -04:00 committed by GitHub
commit cffdbc7c00
8 changed files with 220 additions and 69 deletions

View File

@ -19,12 +19,16 @@ Speaker::Speaker() :
void Speaker::set_volume(uint8_t volume)
{
_volume = volume;
enqueue([=]() {
_volume = volume;
});
}
void Speaker::set_control(int channel, uint8_t value)
{
_control_registers[channel] = value;
enqueue([=]() {
_control_registers[channel] = value;
});
}
// Source: VICE. Not original.

View File

@ -0,0 +1,63 @@
//
// AsyncTaskQueue.cpp
// Clock Signal
//
// Created by Thomas Harte on 07/10/2016.
// Copyright © 2016 Thomas Harte. All rights reserved.
//
#include "AsyncTaskQueue.hpp"
using namespace Concurrency;
AsyncTaskQueue::AsyncTaskQueue() : should_destruct_(false)
{
thread_.reset(new std::thread([this]() {
while(!should_destruct_)
{
std::function<void(void)> next_function;
queue_mutex_.lock();
if(!pending_tasks_.empty())
{
next_function = pending_tasks_.front();
pending_tasks_.pop_front();
}
queue_mutex_.unlock();
if(next_function)
{
next_function();
}
else
{
std::unique_lock<std::mutex> lock(queue_mutex_);
processing_condition_.wait(lock);
lock.unlock();
}
}
}));
}
AsyncTaskQueue::~AsyncTaskQueue()
{
should_destruct_ = true;
enqueue([](){});
thread_->join();
}
void AsyncTaskQueue::enqueue(std::function<void(void)> function)
{
queue_mutex_.lock();
pending_tasks_.push_back(function);
queue_mutex_.unlock();
std::lock_guard<std::mutex> lock(queue_mutex_);
processing_condition_.notify_all();
}
void AsyncTaskQueue::synchronise()
{
// TODO
// std::mutex
}

View File

@ -0,0 +1,55 @@
//
// AsyncTaskQueue.hpp
// Clock Signal
//
// Created by Thomas Harte on 07/10/2016.
// Copyright © 2016 Thomas Harte. All rights reserved.
//
#ifndef AsyncTaskQueue_hpp
#define AsyncTaskQueue_hpp
#include <memory>
#include <thread>
#include <list>
#include <condition_variable>
namespace Concurrency {
/*!
An async task queue allows a caller to enqueue void(void) functions. Those functions are guaranteed
to be performed serially and asynchronously from the caller. A caller may also request to synchronise,
causing it to block until all previously-enqueued functions are complete.
*/
class AsyncTaskQueue {
public:
AsyncTaskQueue();
~AsyncTaskQueue();
/*!
Adds @c function to the queue.
@discussion Functions will be performed serially and asynchronously. This method is safe to
call from multiple threads.
@parameter function The function to enqueue.
*/
void enqueue(std::function<void(void)> function);
/*!
Blocks the caller until all previously-enqueud functions have completed.
*/
void synchronise();
private:
std::unique_ptr<std::thread> thread_;
std::mutex queue_mutex_;
std::list<std::function<void(void)>> pending_tasks_;
std::condition_variable processing_condition_;
std::atomic_bool should_destruct_;
};
}
#endif /* Concurrency_hpp */

View File

@ -804,18 +804,24 @@ Atari2600::Speaker::~Speaker()
void Atari2600::Speaker::set_volume(int channel, uint8_t volume)
{
_volume[channel] = volume & 0xf;
enqueue([=]() {
_volume[channel] = volume & 0xf;
});
}
void Atari2600::Speaker::set_divider(int channel, uint8_t divider)
{
_divider[channel] = divider & 0x1f;
_divider_counter[channel] = 0;
enqueue([=]() {
_divider[channel] = divider & 0x1f;
_divider_counter[channel] = 0;
});
}
void Atari2600::Speaker::set_control(int channel, uint8_t control)
{
_control[channel] = control & 0xf;
enqueue([=]() {
_control[channel] = control & 0xf;
});
}
#define advance_poly4(c) _poly4_counter[channel] = (_poly4_counter[channel] >> 1) | (((_poly4_counter[channel] << 3) ^ (_poly4_counter[channel] << 2))&0x008)

View File

@ -226,10 +226,11 @@ unsigned int Machine::perform_bus_operation(CPU6502::BusOperation operation, uin
// update speaker mode
bool new_speaker_is_enabled = (*value & 6) == 2;
if(new_speaker_is_enabled != _speaker->get_is_enabled())
if(new_speaker_is_enabled != speaker_is_enabled_)
{
update_audio();
_speaker->set_is_enabled(new_speaker_is_enabled);
speaker_is_enabled_ = new_speaker_is_enabled;
}
_tape.set_is_enabled((*value & 6) != 6);
@ -958,13 +959,17 @@ void Speaker::skip_samples(unsigned int number_of_samples)
void Speaker::set_divider(uint8_t divider)
{
_divider = divider * 32 / clock_rate_audio_divider;
enqueue([=]() {
_divider = divider * 32 / clock_rate_audio_divider;
});
}
void Speaker::set_is_enabled(bool is_enabled)
{
_is_enabled = is_enabled;
_counter = 0;
enqueue([=]() {
_is_enabled = is_enabled;
_counter = 0;
});
}
/*

View File

@ -121,7 +121,6 @@ class Speaker: public ::Outputs::Filter<Speaker> {
void set_divider(uint8_t divider);
void set_is_enabled(bool is_enabled);
inline bool get_is_enabled() { return _is_enabled; }
void get_samples(unsigned int number_of_samples, int16_t *target);
void skip_samples(unsigned int number_of_samples);
@ -238,6 +237,7 @@ class Machine:
// Outputs
std::shared_ptr<Outputs::CRT::CRT> _crt;
std::shared_ptr<Speaker> _speaker;
bool speaker_is_enabled_;
};
}

View File

@ -30,6 +30,7 @@
4B30512D1D989E2200B4FED8 /* Drive.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B30512B1D989E2200B4FED8 /* Drive.cpp */; };
4B3051301D98ACC600B4FED8 /* Plus3.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B30512E1D98ACC600B4FED8 /* Plus3.cpp */; };
4B37EE821D7345A6006A09A4 /* BinaryDump.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B37EE801D7345A6006A09A4 /* BinaryDump.cpp */; };
4B3940E71DA83C8300427841 /* AsyncTaskQueue.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 4B3940E51DA83C8300427841 /* AsyncTaskQueue.cpp */; };
4B3BA0C31D318AEC005DD7A7 /* C1540Tests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4B3BA0C21D318AEB005DD7A7 /* C1540Tests.swift */; };
4B3BA0CE1D318B44005DD7A7 /* C1540Bridge.mm in Sources */ = {isa = PBXBuildFile; fileRef = 4B3BA0C61D318B44005DD7A7 /* C1540Bridge.mm */; };
4B3BA0CF1D318B44005DD7A7 /* MOS6522Bridge.mm in Sources */ = {isa = PBXBuildFile; fileRef = 4B3BA0C91D318B44005DD7A7 /* MOS6522Bridge.mm */; };
@ -438,6 +439,8 @@
4B30512F1D98ACC600B4FED8 /* Plus3.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = Plus3.hpp; path = Electron/Plus3.hpp; sourceTree = "<group>"; };
4B37EE801D7345A6006A09A4 /* BinaryDump.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = BinaryDump.cpp; sourceTree = "<group>"; };
4B37EE811D7345A6006A09A4 /* BinaryDump.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = BinaryDump.hpp; sourceTree = "<group>"; };
4B3940E51DA83C8300427841 /* AsyncTaskQueue.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = AsyncTaskQueue.cpp; path = ../../Concurrency/AsyncTaskQueue.cpp; sourceTree = "<group>"; };
4B3940E61DA83C8300427841 /* AsyncTaskQueue.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = AsyncTaskQueue.hpp; path = ../../Concurrency/AsyncTaskQueue.hpp; sourceTree = "<group>"; };
4B3BA0C21D318AEB005DD7A7 /* C1540Tests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = C1540Tests.swift; sourceTree = "<group>"; };
4B3BA0C51D318B44005DD7A7 /* C1540Bridge.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = C1540Bridge.h; sourceTree = "<group>"; };
4B3BA0C61D318B44005DD7A7 /* C1540Bridge.mm */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.objcpp; path = C1540Bridge.mm; sourceTree = "<group>"; };
@ -1003,6 +1006,15 @@
name = Outputs;
sourceTree = "<group>";
};
4B3940E81DA83C8700427841 /* Concurrency */ = {
isa = PBXGroup;
children = (
4B3940E51DA83C8300427841 /* AsyncTaskQueue.cpp */,
4B3940E61DA83C8300427841 /* AsyncTaskQueue.hpp */,
);
name = Concurrency;
sourceTree = "<group>";
};
4B3BA0C41D318B44005DD7A7 /* Bridges */ = {
isa = PBXGroup;
children = (
@ -1469,6 +1481,7 @@
4BB73E951B587A5100552FC2 = {
isa = PBXGroup;
children = (
4B3940E81DA83C8700427841 /* Concurrency */,
4BC76E6A1C98F43700E6EF73 /* Accelerate.framework */,
4BB73EA01B587A5100552FC2 /* Clock Signal */,
4BB73EB51B587A5100552FC2 /* Clock SignalTests */,
@ -2185,6 +2198,7 @@
4B1E85751D170228001EF87D /* Typer.cpp in Sources */,
4BF829631D8F536B001BAE39 /* SSD.cpp in Sources */,
4B2E2D9D1C3A070400138695 /* Electron.cpp in Sources */,
4B3940E71DA83C8300427841 /* AsyncTaskQueue.cpp in Sources */,
4BAB62B81D3302CA00DF5BA0 /* PCMTrack.cpp in Sources */,
4B69FB3D1C4D908A00B5F0AA /* Tape.cpp in Sources */,
4B8FE2291DA1EDDF0090D3CE /* ElectronOptionsPanel.swift in Sources */,

View File

@ -14,6 +14,7 @@
#include <time.h>
#include "../SignalProcessing/Stepper.hpp"
#include "../SignalProcessing/FIRFilter.hpp"
#include "../Concurrency/AsyncTaskQueue.hpp"
namespace Outputs {
@ -25,7 +26,7 @@ namespace Outputs {
Intended to be a parent class, allowing descendants to pick the strategy by which input samples are mapped to
output samples.
*/
class Speaker {
class Speaker: public Concurrency::AsyncTaskQueue {
public:
class Delegate {
public:
@ -124,49 +125,20 @@ template <class T> class Filter: public Speaker {
public:
void run_for_cycles(unsigned int input_cycles)
{
if(_coefficients_are_dirty) update_filter_coefficients();
enqueue([=]() {
unsigned int cycles_remaining = input_cycles;
if(_coefficients_are_dirty) update_filter_coefficients();
// if input and output rates exactly match, just accumulate results and pass on
if(_input_cycles_per_second == _output_cycles_per_second && _high_frequency_cut_off < 0.0)
{
while(input_cycles)
// if input and output rates exactly match, just accumulate results and pass on
if(_input_cycles_per_second == _output_cycles_per_second && _high_frequency_cut_off < 0.0)
{
unsigned int cycles_to_read = (unsigned int)(_buffer_size - _buffer_in_progress_pointer);
if(cycles_to_read > input_cycles) cycles_to_read = input_cycles;
static_cast<T *>(this)->get_samples(cycles_to_read, &_buffer_in_progress.get()[_buffer_in_progress_pointer]);
_buffer_in_progress_pointer += cycles_to_read;
// announce to delegate if full
if(_buffer_in_progress_pointer == _buffer_size)
while(cycles_remaining)
{
_buffer_in_progress_pointer = 0;
if(_delegate)
{
_delegate->speaker_did_complete_samples(this, _buffer_in_progress.get(), _buffer_size);
}
}
unsigned int cycles_to_read = (unsigned int)(_buffer_size - _buffer_in_progress_pointer);
if(cycles_to_read > cycles_remaining) cycles_to_read = cycles_remaining;
input_cycles -= cycles_to_read;
}
return;
}
// if the output rate is less than the input rate, use the filter
if(_input_cycles_per_second > _output_cycles_per_second)
{
while(input_cycles)
{
unsigned int cycles_to_read = (unsigned int)std::min((int)input_cycles, _number_of_taps - _input_buffer_depth);
static_cast<T *>(this)->get_samples(cycles_to_read, &_input_buffer.get()[_input_buffer_depth]);
input_cycles -= cycles_to_read;
_input_buffer_depth += cycles_to_read;
if(_input_buffer_depth == _number_of_taps)
{
_buffer_in_progress.get()[_buffer_in_progress_pointer] = _filter->apply(_input_buffer.get());
_buffer_in_progress_pointer++;
static_cast<T *>(this)->get_samples(cycles_to_read, &_buffer_in_progress.get()[_buffer_in_progress_pointer]);
_buffer_in_progress_pointer += cycles_to_read;
// announce to delegate if full
if(_buffer_in_progress_pointer == _buffer_size)
@ -178,29 +150,61 @@ template <class T> class Filter: public Speaker {
}
}
// If the next loop around is going to reuse some of the samples just collected, use a memmove to
// preserve them in the correct locations (TODO: use a longer buffer to fix that) and don't skip
// anything. Otherwise skip as required to get to the next sample batch and don't expect to reuse.
uint64_t steps = _stepper->step();
if(steps < _number_of_taps)
{
int16_t *input_buffer = _input_buffer.get();
memmove(input_buffer, &input_buffer[steps], sizeof(int16_t) * ((size_t)_number_of_taps - (size_t)steps));
_input_buffer_depth -= steps;
}
else
{
if(steps > _number_of_taps)
static_cast<T *>(this)->skip_samples((unsigned int)steps - (unsigned int)_number_of_taps);
_input_buffer_depth = 0;
}
cycles_remaining -= cycles_to_read;
}
return;
}
return;
}
// if the output rate is less than the input rate, use the filter
if(_input_cycles_per_second > _output_cycles_per_second)
{
while(cycles_remaining)
{
unsigned int cycles_to_read = (unsigned int)std::min((int)cycles_remaining, _number_of_taps - _input_buffer_depth);
static_cast<T *>(this)->get_samples(cycles_to_read, &_input_buffer.get()[_input_buffer_depth]);
cycles_remaining -= cycles_to_read;
_input_buffer_depth += cycles_to_read;
if(_input_buffer_depth == _number_of_taps)
{
_buffer_in_progress.get()[_buffer_in_progress_pointer] = _filter->apply(_input_buffer.get());
_buffer_in_progress_pointer++;
// announce to delegate if full
if(_buffer_in_progress_pointer == _buffer_size)
{
_buffer_in_progress_pointer = 0;
if(_delegate)
{
_delegate->speaker_did_complete_samples(this, _buffer_in_progress.get(), _buffer_size);
}
}
// If the next loop around is going to reuse some of the samples just collected, use a memmove to
// preserve them in the correct locations (TODO: use a longer buffer to fix that) and don't skip
// anything. Otherwise skip as required to get to the next sample batch and don't expect to reuse.
uint64_t steps = _stepper->step();
if(steps < _number_of_taps)
{
int16_t *input_buffer = _input_buffer.get();
memmove(input_buffer, &input_buffer[steps], sizeof(int16_t) * ((size_t)_number_of_taps - (size_t)steps));
_input_buffer_depth -= steps;
}
else
{
if(steps > _number_of_taps)
static_cast<T *>(this)->skip_samples((unsigned int)steps - (unsigned int)_number_of_taps);
_input_buffer_depth = 0;
}
}
}
return;
}
// TODO: input rate is less than output rate
});
}
private: