1
0
mirror of https://github.com/TomHarte/CLK.git synced 2024-11-29 12:50:28 +00:00

Parallelises MultiMachine running, and ensures errors propagate.

This commit is contained in:
Thomas Harte 2018-02-08 20:33:57 -05:00
parent 4e720d57b2
commit 4cf258f952
3 changed files with 64 additions and 17 deletions

View File

@ -8,29 +8,55 @@
#include "MultiCRTMachine.hpp" #include "MultiCRTMachine.hpp"
#include <condition_variable>
#include <mutex>
using namespace Analyser::Dynamic; using namespace Analyser::Dynamic;
MultiCRTMachine::MultiCRTMachine(const std::vector<std::unique_ptr<::Machine::DynamicMachine>> &machines) : MultiCRTMachine::MultiCRTMachine(const std::vector<std::unique_ptr<::Machine::DynamicMachine>> &machines) :
machines_(machines) {} machines_(machines), queues_(machines.size()) {}
void MultiCRTMachine::setup_output(float aspect_ratio) { void MultiCRTMachine::perform_parallel(const std::function<void(::CRTMachine::Machine *)> &function) {
// auto reverse_iterator = machines_.rbegin(); // Apply a blunt force parallelisation of the machines; each run_for is dispatched
// while(reverse_iterator != machines_.rend()) { // to a separate queue and this queue will block until all are done.
// CRTMachine::Machine *crt_machine = (*reverse_iterator)->crt_machine(); std::condition_variable condition;
// if(crt_machine) crt_machine->setup_output(aspect_ratio); std::mutex mutex;
// reverse_iterator++; std::size_t outstanding_machines = machines_.size();
// }
for(std::size_t index = 0; index < machines_.size(); ++index) {
queues_[index].enqueue([&mutex, &condition, this, index, function, &outstanding_machines]() {
CRTMachine::Machine *crt_machine = machines_[index]->crt_machine();
if(crt_machine) function(crt_machine);
std::unique_lock<std::mutex> lock(mutex);
outstanding_machines--;
condition.notify_all();
});
}
do {
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock);
} while(outstanding_machines);
}
void MultiCRTMachine::perform_serial(const std::function<void (::CRTMachine::Machine *)> &function) {
for(const auto &machine: machines_) { for(const auto &machine: machines_) {
CRTMachine::Machine *crt_machine = machine->crt_machine(); CRTMachine::Machine *crt_machine = machine->crt_machine();
if(crt_machine) crt_machine->setup_output(aspect_ratio); if(crt_machine) function(crt_machine);
} }
} }
void MultiCRTMachine::setup_output(float aspect_ratio) {
perform_serial([=](::CRTMachine::Machine *machine) {
machine->setup_output(aspect_ratio);
});
}
void MultiCRTMachine::close_output() { void MultiCRTMachine::close_output() {
for(const auto &machine: machines_) { perform_serial([=](::CRTMachine::Machine *machine) {
CRTMachine::Machine *crt_machine = machine->crt_machine(); machine->close_output();
if(crt_machine) crt_machine->close_output(); });
}
} }
Outputs::CRT::CRT *MultiCRTMachine::get_crt() { Outputs::CRT::CRT *MultiCRTMachine::get_crt() {
@ -44,10 +70,9 @@ Outputs::Speaker::Speaker *MultiCRTMachine::get_speaker() {
} }
void MultiCRTMachine::run_for(const Cycles cycles) { void MultiCRTMachine::run_for(const Cycles cycles) {
for(const auto &machine: machines_) { perform_parallel([=](::CRTMachine::Machine *machine) {
CRTMachine::Machine *crt_machine = machine->crt_machine(); machine->run_for(cycles);
if(crt_machine) crt_machine->run_for(cycles); });
}
if(delegate_) delegate_->multi_crt_did_run_machines(); if(delegate_) delegate_->multi_crt_did_run_machines();
} }

View File

@ -9,9 +9,11 @@
#ifndef MultiCRTMachine_hpp #ifndef MultiCRTMachine_hpp
#define MultiCRTMachine_hpp #define MultiCRTMachine_hpp
#include "../../../../Concurrency/AsyncTaskQueue.hpp"
#include "../../../../Machines/CRTMachine.hpp" #include "../../../../Machines/CRTMachine.hpp"
#include "../../../../Machines/DynamicMachine.hpp" #include "../../../../Machines/DynamicMachine.hpp"
#include <memory> #include <memory>
#include <vector> #include <vector>
@ -45,7 +47,22 @@ struct MultiCRTMachine: public ::CRTMachine::Machine, public ::CRTMachine::Machi
private: private:
const std::vector<std::unique_ptr<::Machine::DynamicMachine>> &machines_; const std::vector<std::unique_ptr<::Machine::DynamicMachine>> &machines_;
std::vector<Concurrency::AsyncTaskQueue> queues_;
Delegate *delegate_ = nullptr; Delegate *delegate_ = nullptr;
/*!
Performs a parallel for operation across all machines, performing the supplied
function on each and returning only once all applications have completed.
No guarantees are extended as to which thread operations will occur on.
*/
void perform_parallel(const std::function<void(::CRTMachine::Machine *)> &);
/*!
Performs a serial for operation across all machines, performing the supplied
function on each on the calling thread.
*/
void perform_serial(const std::function<void(::CRTMachine::Machine *)> &);
}; };
} }

View File

@ -70,6 +70,11 @@ namespace {
std::vector<std::unique_ptr<Machine::DynamicMachine>> machines; std::vector<std::unique_ptr<Machine::DynamicMachine>> machines;
for(const auto &target: targets) { for(const auto &target: targets) {
machines.emplace_back(MachineForTarget(*target, rom_fetcher, error)); machines.emplace_back(MachineForTarget(*target, rom_fetcher, error));
// Exit early if any errors have occurred.
if(error != Error::None) {
return nullptr;
}
} }
return new Analyser::Dynamic::MultiMachine(std::move(machines)); return new Analyser::Dynamic::MultiMachine(std::move(machines));