From 4cf258f9521069c9c4a19e11e430350fb3aa3662 Mon Sep 17 00:00:00 2001 From: Thomas Harte Date: Thu, 8 Feb 2018 20:33:57 -0500 Subject: [PATCH] Parallelises MultiMachine running, and ensures errors propagate. --- .../Implementation/MultiCRTMachine.cpp | 59 +++++++++++++------ .../Implementation/MultiCRTMachine.hpp | 17 ++++++ Machines/Utility/MachineForTarget.cpp | 5 ++ 3 files changed, 64 insertions(+), 17 deletions(-) diff --git a/Analyser/Dynamic/MultiMachine/Implementation/MultiCRTMachine.cpp b/Analyser/Dynamic/MultiMachine/Implementation/MultiCRTMachine.cpp index d83b937e6..4321c3070 100644 --- a/Analyser/Dynamic/MultiMachine/Implementation/MultiCRTMachine.cpp +++ b/Analyser/Dynamic/MultiMachine/Implementation/MultiCRTMachine.cpp @@ -8,29 +8,55 @@ #include "MultiCRTMachine.hpp" +#include +#include + using namespace Analyser::Dynamic; MultiCRTMachine::MultiCRTMachine(const std::vector> &machines) : - machines_(machines) {} + machines_(machines), queues_(machines.size()) {} -void MultiCRTMachine::setup_output(float aspect_ratio) { -// auto reverse_iterator = machines_.rbegin(); -// while(reverse_iterator != machines_.rend()) { -// CRTMachine::Machine *crt_machine = (*reverse_iterator)->crt_machine(); -// if(crt_machine) crt_machine->setup_output(aspect_ratio); -// reverse_iterator++; -// } +void MultiCRTMachine::perform_parallel(const std::function &function) { + // Apply a blunt force parallelisation of the machines; each run_for is dispatched + // to a separate queue and this queue will block until all are done. + std::condition_variable condition; + std::mutex mutex; + std::size_t outstanding_machines = machines_.size(); + + for(std::size_t index = 0; index < machines_.size(); ++index) { + queues_[index].enqueue([&mutex, &condition, this, index, function, &outstanding_machines]() { + CRTMachine::Machine *crt_machine = machines_[index]->crt_machine(); + if(crt_machine) function(crt_machine); + + std::unique_lock lock(mutex); + outstanding_machines--; + condition.notify_all(); + }); + } + + do { + std::unique_lock lock(mutex); + condition.wait(lock); + } while(outstanding_machines); +} + +void MultiCRTMachine::perform_serial(const std::function &function) { for(const auto &machine: machines_) { CRTMachine::Machine *crt_machine = machine->crt_machine(); - if(crt_machine) crt_machine->setup_output(aspect_ratio); + if(crt_machine) function(crt_machine); } } +void MultiCRTMachine::setup_output(float aspect_ratio) { + perform_serial([=](::CRTMachine::Machine *machine) { + machine->setup_output(aspect_ratio); + }); +} + void MultiCRTMachine::close_output() { - for(const auto &machine: machines_) { - CRTMachine::Machine *crt_machine = machine->crt_machine(); - if(crt_machine) crt_machine->close_output(); - } + perform_serial([=](::CRTMachine::Machine *machine) { + machine->close_output(); + }); } Outputs::CRT::CRT *MultiCRTMachine::get_crt() { @@ -44,10 +70,9 @@ Outputs::Speaker::Speaker *MultiCRTMachine::get_speaker() { } void MultiCRTMachine::run_for(const Cycles cycles) { - for(const auto &machine: machines_) { - CRTMachine::Machine *crt_machine = machine->crt_machine(); - if(crt_machine) crt_machine->run_for(cycles); - } + perform_parallel([=](::CRTMachine::Machine *machine) { + machine->run_for(cycles); + }); if(delegate_) delegate_->multi_crt_did_run_machines(); } diff --git a/Analyser/Dynamic/MultiMachine/Implementation/MultiCRTMachine.hpp b/Analyser/Dynamic/MultiMachine/Implementation/MultiCRTMachine.hpp index a667ef9ff..9480fb50a 100644 --- a/Analyser/Dynamic/MultiMachine/Implementation/MultiCRTMachine.hpp +++ b/Analyser/Dynamic/MultiMachine/Implementation/MultiCRTMachine.hpp @@ -9,9 +9,11 @@ #ifndef MultiCRTMachine_hpp #define MultiCRTMachine_hpp +#include "../../../../Concurrency/AsyncTaskQueue.hpp" #include "../../../../Machines/CRTMachine.hpp" #include "../../../../Machines/DynamicMachine.hpp" + #include #include @@ -45,7 +47,22 @@ struct MultiCRTMachine: public ::CRTMachine::Machine, public ::CRTMachine::Machi private: const std::vector> &machines_; + std::vector queues_; Delegate *delegate_ = nullptr; + + /*! + Performs a parallel for operation across all machines, performing the supplied + function on each and returning only once all applications have completed. + + No guarantees are extended as to which thread operations will occur on. + */ + void perform_parallel(const std::function &); + + /*! + Performs a serial for operation across all machines, performing the supplied + function on each on the calling thread. + */ + void perform_serial(const std::function &); }; } diff --git a/Machines/Utility/MachineForTarget.cpp b/Machines/Utility/MachineForTarget.cpp index c523032ed..3c1641fcc 100644 --- a/Machines/Utility/MachineForTarget.cpp +++ b/Machines/Utility/MachineForTarget.cpp @@ -70,6 +70,11 @@ namespace { std::vector> machines; for(const auto &target: targets) { machines.emplace_back(MachineForTarget(*target, rom_fetcher, error)); + + // Exit early if any errors have occurred. + if(error != Error::None) { + return nullptr; + } } return new Analyser::Dynamic::MultiMachine(std::move(machines));