#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/thread_state.h" #include "pls/internal/scheduling/task.h" #include "pls/internal/base/error_handling.h" namespace pls { namespace internal { namespace scheduling { scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads, bool reuse_thread) : num_threads_{num_threads}, reuse_thread_{reuse_thread}, memory_{memory}, sync_barrier_{num_threads + 1 - reuse_thread}, terminated_{false} { if (num_threads_ > memory_->max_threads()) { PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory."); } for (unsigned int i = 0; i < num_threads_; i++) { // Placement new is required, as the memory of `memory_` is not required to be initialized. new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), memory_->cont_stack_for(i), i}; if (reuse_thread && i == 0) { continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one. } new((void *) memory_->thread_for(i))base::thread(&scheduler::worker_routine, memory_->thread_state_for(i)); } } scheduler::~scheduler() { terminate(); } void scheduler::worker_routine() { auto my_state = thread_state::get(); auto scheduler = my_state->scheduler_; while (true) { // Wait to be triggered scheduler->sync_barrier_.wait(); // Check for shutdown if (scheduler->terminated_) { return; } // Execute work if (my_state->id_ == 0) { // Main Thread auto root_task = scheduler->main_thread_root_task_; root_task->parent_ = nullptr; root_task->deque_offset_ = my_state->deque_.save_offset(); root_task->execute(); scheduler->work_section_done_ = true; } else { // Worker Threads while (!scheduler->work_section_done_) { if (!scheduler->try_execute_local()) { scheduler->try_execute_stolen(); } } } // Sync back with main thread my_state->scheduler_->sync_barrier_.wait(); } } void scheduler::terminate() { if (terminated_) { return; } terminated_ = true; sync_barrier_.wait(); for (unsigned int i = 0; i < num_threads_; i++) { if (reuse_thread_ && i == 0) { continue; } memory_->thread_for(i)->join(); } } task *scheduler::get_local_task() { PROFILE_STEALING("Get Local Task") return thread_state::get()->deque_.pop_local_task(); } task *scheduler::steal_task() { PROFILE_STEALING("Steal Task") // Data for victim selection const auto my_state = thread_state::get(); const auto my_id = my_state->id_; const size_t offset = my_state->random_() % num_threads(); const size_t max_tries = num_threads(); // TODO: Tune this value bool any_cas_fails_occured = false; // Current strategy: random start, then round robin from there for (size_t i = 0; i < max_tries; i++) { size_t target = (offset + i) % num_threads(); // Skip our self for stealing target = ((target == my_id) + target) % num_threads(); auto target_state = thread_state_for(target); bool cas_fail; auto result = target_state->deque_.pop_external_task(cas_fail); any_cas_fails_occured |= cas_fail; if (result != nullptr) { return result; } // TODO: See if we should backoff here (per missed steal) } if (!any_cas_fails_occured) { // Went through every task and we did not find any work. // Most likely there is non available right now, yield to other threads. 
    pls::internal::base::this_thread::yield();
  }

  return nullptr;
}

bool scheduler::try_execute_local() {
  task *local_task = get_local_task();
  if (local_task != nullptr) {
    local_task->execute();
    return true;
  } else {
    return false;
  }
}

bool scheduler::try_execute_stolen() {
  task *stolen_task = steal_task();
  if (stolen_task != nullptr) {
    stolen_task->deque_offset_ = thread_state::get()->deque_.save_offset();
    stolen_task->execute();
    return true;
  }
  return false;
}

void scheduler::wait_for_all() {
  thread_state::get()->current_task_->wait_for_all();
}

thread_state *scheduler::thread_state_for(size_t id) { return memory_->thread_state_for(id); }

}
}
}
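
// Illustrative sketch (not part of this translation unit): the main-thread side of the
// work-section protocol that worker_routine() above synchronizes with. The actual entry
// point is not defined in this file; the name perform_work and its signature below are
// assumptions, inferred only from the members used above (main_thread_root_task_,
// work_section_done_, sync_barrier_, reuse_thread_).
//
//   void scheduler::perform_work(task *root_task) {   // hypothetical name and signature
//     main_thread_root_task_ = root_task;             // publish the root task for thread 0
//     work_section_done_ = false;                     // reset the end-of-section flag
//     sync_barrier_.wait();                           // release the blocked worker threads
//     // When reuse_thread_ is true, the caller's thread acts as thread 0 and would execute
//     // the root task itself here (the id_ == 0 branch of worker_routine()).
//     sync_barrier_.wait();                           // wait until the work section finished
//   }
//
// The user's thread participates in both barrier waits of every work section, which is why
// the barrier is sized num_threads + 1 - reuse_thread in the constructor.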