#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/base/error_handling.h" namespace pls { namespace internal { namespace scheduling { scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads) : num_threads_{num_threads}, memory_{memory}, sync_barrier_{num_threads + 1}, terminated_{false} { if (num_threads_ > memory_->max_threads()) { PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory."); } for (unsigned int i = 0; i < num_threads_; i++) { // Placement new is required, as the memory of `memory_` is not required to be initialized. new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i}; new((void *) memory_->thread_for(i))base::thread(&worker_routine, memory_->thread_state_for(i)); } } scheduler::~scheduler() { terminate(); } void worker_routine() { auto my_state = base::this_thread::state(); while (true) { my_state->scheduler_->sync_barrier_.wait(); if (my_state->scheduler_->terminated_) { return; } // The root task must only return when all work is done, // because of this a simple call is enough to ensure the // fork-join-section is done (logically joined back into our main thread). my_state->root_task_->execute(); my_state->scheduler_->sync_barrier_.wait(); } } void scheduler::terminate(bool wait_for_workers) { if (terminated_) { return; } terminated_ = true; sync_barrier_.wait(); if (wait_for_workers) { for (unsigned int i = 0; i < num_threads_; i++) { memory_->thread_for(i)->join(); } } } } } }