#include "pls/internal/scheduling/scheduler.h"

#include "context_switcher/context_switcher.h"

#include "pls/internal/base/thread.h"
#include "pls/internal/base/error_handling.h"

namespace pls::internal::scheduling {

/**
 * Creates one task manager and one thread state per worker and starts the worker threads.
 *
 * @param num_threads Number of workers taking part in the scheduler.
 * @param computation_depth Forwarded unchanged to every task manager.
 * @param stack_size Forwarded unchanged to every task manager.
 * @param reuse_thread When true, no OS thread is started for worker 0; only an empty
 *                     placeholder is stored for it, and the barrier is sized one
 *                     participant smaller.
 */
scheduler::scheduler(unsigned int num_threads,
                     size_t computation_depth,
                     size_t stack_size,
                     bool reuse_thread) :
    num_threads_{num_threads},
    reuse_thread_{reuse_thread},
    // num_threads workers plus one extra participant, unless a thread is re-used as worker 0.
    sync_barrier_{num_threads + 1 - reuse_thread},
    worker_threads_{},
    thread_states_{},
    main_thread_starter_function_{nullptr},
    work_section_done_{false},
    terminated_{false} {

  worker_threads_.reserve(num_threads);
  task_managers_.reserve(num_threads);
  thread_states_.reserve(num_threads);
  for (unsigned int i = 0; i < num_threads_; i++) {
    // NOTE(review): the explicit template arguments were missing in the mangled source;
    // `task_manager` is inferred from get_task_manager() - confirm against the header.
    auto &this_task_manager =
        task_managers_.emplace_back(std::make_unique<task_manager>(i,
                                                                   computation_depth,
                                                                   stack_size,
                                                                   stack_allocator_));
    auto &this_thread_state =
        thread_states_.emplace_back(std::make_unique<thread_state>(*this, i, *this_task_manager));

    if (reuse_thread && i == 0) {
      // Keep worker_threads_ index-aligned with the thread ids by storing an empty placeholder.
      worker_threads_.emplace_back();
      // Skip over first/main thread when re-using the user's thread, as this one will replace the first one.
      continue;
    }

    // Capture the raw pointer by value: the lambda runs on another thread after this iteration ended.
    auto *this_thread_state_pointer = this_thread_state.get();
    worker_threads_.emplace_back([this_thread_state_pointer] {
      thread_state::set(this_thread_state_pointer);
      work_thread_main_loop();
    });
  }
}

/**
 * Shuts the scheduler down via terminate() (which returns early if already terminated).
 */
scheduler::~scheduler() {
  terminate();
}

/**
 * Body of every spawned worker thread.
 *
 * Repeats: wait on the barrier, leave if termination was requested, otherwise run one
 * work section and wait on the barrier a second time.
 * The scheduler instance is looked up through the calling thread's thread_state.
 */
void scheduler::work_thread_main_loop() {
  auto &scheduler = thread_state::get().get_scheduler();
  while (true) {
    // Wait to be triggered
    scheduler.sync_barrier_.wait();

    // Check for shutdown
    if (scheduler.terminated_) {
      return;
    }

    scheduler.work_thread_work_section();

    // Sync back with main thread
    scheduler.sync_barrier_.wait();
  }
}

/**
 * Runs one work section on the calling worker.
 *
 * The worker with thread id 0 first runs main_thread_starter_function_. Afterwards every
 * worker keeps picking a random other worker and tries to steal a task from it until
 * work_section_done_ is set. A stolen task is executed by switching to its continuation.
 * Once the number of consecutive failed steals reaches num_threads, the worker yields
 * after each further failed attempt.
 */
void scheduler::work_thread_work_section() {
  auto &my_state = thread_state::get();
  auto &my_task_manager = my_state.get_task_manager();

  auto const num_threads = my_state.get_scheduler().num_threads();

  if (my_state.get_thread_id() == 0) {
    // Main Thread, kick off by executing the user's main code block.
    main_thread_starter_function_->run();
  }

  unsigned int failed_steals = 0;
  while (!work_section_done_) {
    PLS_ASSERT(my_task_manager.check_task_chain(), "Must start stealing with a clean task chain.");

    // TODO: move steal routine into separate function
    const size_t target = my_state.get_rand() % num_threads;
    if (target == my_state.get_thread_id()) {
      // Never steal from ourselves; draw a new victim.
      continue;
    }

    auto &target_state = my_state.get_scheduler().thread_state_for(target);
    task *traded_task = target_state.get_task_manager().steal_task(my_task_manager);

    if (traded_task != nullptr) {
      // The stealing procedure correctly changed our chain and active task.
      // Now we need to perform the 'post steal' actions (manage resources and execute the stolen task).
      PLS_ASSERT(my_task_manager.check_task_chain_forward(&my_task_manager.get_active_task()),
                 "We are sole owner of this chain, it has to be valid!");

      // Move the traded in resource of this active task over to the stack of resources.
      auto *stolen_task = &my_task_manager.get_active_task();
      // Push the traded in resource on the resource stack to clear the traded_field for later steals/spawns.
      my_task_manager.push_resource_on_task(stolen_task, traded_task);

      auto optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task);
      if (optional_exchanged_task) {
        // All good, we pushed the task over to the stack, nothing more to do
        PLS_ASSERT(*optional_exchanged_task == traded_task,
                   "We are currently executing this, no one else can put another task in this field!");
      } else {
        // The last other active thread took it as its spare resource...
        // ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
        auto current_root = stolen_task->resource_stack_root_.load();
        current_root.stamp++;
        current_root.value = 0;
        stolen_task->resource_stack_root_.store(current_root);
      }

      // Execute the stolen task by jumping to its continuation.
      PLS_ASSERT(stolen_task->continuation_.valid(),
                 "A task that we can steal must have a valid continuation for us to start working.");
      stolen_task->is_synchronized_ = false;
      context_switcher::switch_context(std::move(stolen_task->continuation_));
      // We will continue execution in this line when we finished the stolen work.
      failed_steals = 0;
    } else {
      failed_steals++;
      if (failed_steals >= num_threads) {
        // Steals keep failing; give up the time slice instead of spinning.
        base::this_thread::yield();
      }
    }
  }
}

/**
 * Stops the worker threads and joins them. Calling it again is a no-op.
 *
 * Sets terminated_ and then waits on the barrier once; workers blocked at the top of
 * work_thread_main_loop() return when they see the flag.
 */
void scheduler::terminate() {
  if (terminated_) {
    return;
  }

  terminated_ = true;
  // Meet the workers at the barrier so they re-check terminated_.
  sync_barrier_.wait();

  for (unsigned int i = 0; i < num_threads_; i++) {
    if (reuse_thread_ && i == 0) {
      // Slot 0 only holds an empty placeholder in this mode; there is nothing to join.
      continue;
    }
    worker_threads_[i].join();
  }
}

/**
 * Forwards to sync() on the task manager of the calling thread.
 */
void scheduler::sync() {
  thread_state::get().get_task_manager().sync();
}

}