#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/thread_state.h" #include "pls/internal/scheduling/task.h" #include "pls/internal/base/error_handling.h" namespace pls { namespace internal { namespace scheduling { scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads) : num_threads_{num_threads}, memory_{memory}, sync_barrier_{num_threads + 1}, terminated_{false} { if (num_threads_ > memory_->max_threads()) { PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory."); } for (unsigned int i = 0; i < num_threads_; i++) { // Placement new is required, as the memory of `memory_` is not required to be initialized. new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i}; new((void *) memory_->thread_for(i))base::thread(&scheduler::worker_routine, memory_->thread_state_for(i)); } } scheduler::~scheduler() { terminate(); } void scheduler::worker_routine() { auto my_state = thread_state::get(); auto scheduler = my_state->scheduler_; while (true) { // Wait to be triggered scheduler->sync_barrier_.wait(); // Check for shutdown if (scheduler->terminated_) { return; } // Execute work if (my_state->id_ == 0) { // Main Thread auto root_task = scheduler->main_thread_root_task_; root_task->parent_ = nullptr; root_task->deque_state_ = my_state->deque_.save_state(); root_task->execute(); scheduler->work_section_done_ = true; } else { // Worker Threads while (!scheduler->work_section_done_) { if (!scheduler->try_execute_local()) { scheduler->try_execute_stolen(); } } } // Sync back with main thread my_state->scheduler_->sync_barrier_.wait(); } } void scheduler::terminate(bool wait_for_workers) { if (terminated_) { return; } terminated_ = true; sync_barrier_.wait(); if (wait_for_workers) { for (unsigned int i = 0; i < num_threads_; i++) { memory_->thread_for(i)->join(); } } } task *scheduler::get_local_task() { PROFILE_STEALING("Get Local Task") return thread_state::get()->deque_.pop_tail(); } task *scheduler::steal_task() { PROFILE_STEALING("Steal Task") // Data for victim selection const auto my_state = thread_state::get(); const auto my_id = my_state->id_; const size_t offset = my_state->random_() % num_threads(); const size_t max_tries = num_threads(); // TODO: Tune this value // Current strategy: random start, then round robin from there for (size_t i = 0; i < max_tries; i++) { size_t target = (offset + i) % num_threads(); // Skip our self for stealing if (target == my_id) { continue; } auto target_state = thread_state_for(target); // TODO: See if we should re-try popping if it failed due to contention auto result = target_state->deque_.pop_head(); if (result != nullptr) { return result; } // TODO: See if we should backoff here (per missed steal) } // TODO: See if we should backoff here (after a 'round' of missed steals) return nullptr; } bool scheduler::try_execute_local() { task *local_task = get_local_task(); if (local_task != nullptr) { local_task->execute(); return true; } else { return false; } } bool scheduler::try_execute_stolen() { task *stolen_task = steal_task(); if (stolen_task != nullptr) { stolen_task->deque_state_ = thread_state::get()->deque_.save_state(); stolen_task->execute(); return true; } return false; } void scheduler::wait_for_all() { thread_state::get()->current_task_->wait_for_all(); } thread_state *scheduler::thread_state_for(size_t id) { return memory_->thread_state_for(id); } } } }