From c85f2d0facdf5a7e27d09658d012179f7017d226 Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Mon, 27 Jan 2020 10:44:37 +0100 Subject: [PATCH] WIP: first stable version of stealing outline. --- app/benchmark_fib/main.cpp | 8 ++++---- lib/pls/include/pls/internal/scheduling/scheduler_impl.h | 9 ++++----- lib/pls/include/pls/internal/scheduling/task_manager.h | 7 ++++++- lib/pls/include/pls/internal/scheduling/task_manager_impl.h | 10 ++++++---- lib/pls/src/internal/scheduling/scheduler.cpp | 70 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------- lib/pls/src/internal/scheduling/task_manager.cpp | 36 ++++++++++++++++++++++++++++++++++-- 6 files changed, 111 insertions(+), 29 deletions(-) diff --git a/app/benchmark_fib/main.cpp b/app/benchmark_fib/main.cpp index 47ce0a1..63581f0 100644 --- a/app/benchmark_fib/main.cpp +++ b/app/benchmark_fib/main.cpp @@ -4,8 +4,7 @@ using namespace pls::internal::scheduling; #include -#include -#include +#include #include "benchmark_runner.h" #include "benchmark_base/fib.h" @@ -25,12 +24,13 @@ int pls_fib(int n) { b = pls_fib(n - 2); }); + return a + b; } -constexpr int MAX_NUM_THREADS = 1; +constexpr int MAX_NUM_THREADS = 8; constexpr int MAX_NUM_TASKS = 64; -constexpr int MAX_STACK_SIZE = 1024; +constexpr int MAX_STACK_SIZE = 4096; int main(int argc, char **argv) { int num_threads; diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index feb6ca4..0064006 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -25,12 +25,11 @@ class scheduler::init_function_impl : public init_function { public: explicit init_function_impl(F &function) : function_{function} {} void run() override { - auto &thread_state = thread_state::get(); - thread_state.get_task_manager().get_active_task().run_as_task([&](context_switcher::continuation cont) { - thread_state.set_main_continuation(std::move(cont)); + thread_state::get().get_task_manager().get_active_task().run_as_task([&](context_switcher::continuation cont) { + thread_state::get().set_main_continuation(std::move(cont)); function_(); - thread_state.get_scheduler().work_section_done_.store(true); - return std::move(thread_state.get_main_continuation()); + thread_state::get().get_scheduler().work_section_done_.store(true); + return std::move(thread_state::get().get_main_continuation()); }); } diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h index d1462b2..e001f44 100644 --- a/lib/pls/include/pls/internal/scheduling/task_manager.h +++ b/lib/pls/include/pls/internal/scheduling/task_manager.h @@ -28,7 +28,7 @@ class task_manager { external_trading_deque &deque); void push_resource_on_task(task *target_task, task *spare_task_chain); - task* pop_resource_from_task(task *target_task); + task *pop_resource_from_task(task *target_task); task *get_this_thread_task(size_t depth) { return &this_thread_tasks_[depth]; @@ -43,10 +43,15 @@ class task_manager { task &get_active_task() { return *active_task_; } + void set_active_task(task *active_task) { + active_task_ = active_task; + } template void spawn_child(F &&lambda); + task *steal_task(task_manager &stealing_task_manager); + private: size_t num_tasks_; diff --git a/lib/pls/include/pls/internal/scheduling/task_manager_impl.h b/lib/pls/include/pls/internal/scheduling/task_manager_impl.h index 8439bc1..15811f2 100644 --- a/lib/pls/include/pls/internal/scheduling/task_manager_impl.h +++ b/lib/pls/include/pls/internal/scheduling/task_manager_impl.h @@ -25,12 +25,12 @@ void task_manager::spawn_child(F &&lambda) { last_task->continuation_ = std::move(cont); active_task_ = this_task; - traded_cas_field expected_cas_value = deque_.push_bot(active_task_); + traded_cas_field expected_cas_value = deque_.push_bot(last_task); traded_cas_field empty_cas; lambda(); - if (active_task_->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) { + if (last_task->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) { // Fast path, simply continue execution where we left of before spawn. // This requires no coordination with the resource stack. active_task_ = last_task; @@ -60,10 +60,12 @@ void task_manager::spawn_child(F &&lambda) { if (clean_chain != nullptr) { // We got a clean chain to continue working on. - PLS_ASSERT(active_task_->prev_->depth_ == clean_chain->depth_, + PLS_ASSERT(this_task->prev_->depth_ == clean_chain->depth_, "Resources must only reside in the correct depth!"); - active_task_->prev_ = clean_chain; + this_task->prev_ = clean_chain; + clean_chain->next_ = this_task; // Walk back chain to make first task active + active_task_ = clean_chain; while (active_task_->prev_ != nullptr) { active_task_ = active_task_->prev_; } diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp index 479c724..de91dd4 100644 --- a/lib/pls/src/internal/scheduling/scheduler.cpp +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -1,5 +1,7 @@ #include "pls/internal/scheduling/scheduler.h" +#include "context_switcher/context_switcher.h" + #include "pls/internal/scheduling/task_manager.h" #include "pls/internal/scheduling/thread_state.h" @@ -67,24 +69,66 @@ void scheduler::work_thread_work_section() { main_thread_starter_function_->run(); } - do { + while (!work_section_done_) { // Steal Routine (will be continuously executed when there are no more fall through's). // TODO: move into separate function -// const size_t offset = my_state.get_rand() % num_threads; -// const size_t max_tries = num_threads; -// for (size_t i = 0; i < max_tries; i++) { -// size_t target = (offset + i) % num_threads; -// auto &target_state = my_state.get_scheduler().thread_state_for(target); -// -// auto *stolen_task = target_state.get_task_manager().steal_remote_task(my_cont_manager); -// if (stolen_task != nullptr) { -// stolen_task->execute(); -// } -// } + const size_t offset = my_state.get_rand() % num_threads; + const size_t max_tries = num_threads; + for (size_t i = 0; i < max_tries; i++) { + // Perform steal + size_t target = (offset + i) % num_threads; + auto &target_state = my_state.get_scheduler().thread_state_for(target); + auto *stolen_task = target_state.get_task_manager().steal_task(my_task_manager); + + // Handle successful steal + if (stolen_task != nullptr) { + // Adapt our task chain + // Note: This differs from how it worked before. The aquiring of new chains happens + // right at the steal. Whenever we start to work on an continuation we aquire the full + // 'dirty' chain below it. We fix this up at the sync points later on by popping of the resource stack. + auto *exchanged_task = &my_task_manager.get_active_task(); + for (unsigned j = 0; j < stolen_task->depth_; j++) { + exchanged_task = exchanged_task->next_; + } + auto *next_own_task = exchanged_task->next_; + + next_own_task->prev_ = stolen_task; + stolen_task->next_ = next_own_task; + + my_task_manager.set_active_task(stolen_task); + + // move the traded in resource of this active task over to the stack of resources. + my_task_manager.push_resource_on_task(stolen_task, exchanged_task); + traded_cas_field empty_field; + traded_cas_field expected_field; + expected_field.fill_with_trade_object(exchanged_task); + if (stolen_task->traded_field_.compare_exchange_strong(expected_field, empty_field)) { + // All good, nothing more to do + } else { + // The last other active thread took it as its spare resource... + // ...remove our traded object from the stack again (it must be empty now and no one must access it anymore). + PLS_ASSERT(expected_field.is_empty(), + "Must be empty, as otherwise no one will steal the 'spare traded task'."); + + auto current_root = stolen_task->resource_stack_root_.load(); + current_root.stamp++; + current_root.value = 0; + stolen_task->resource_stack_root_.store(current_root); + } + + // execute the stolen task by jumping to it's continuation. + PLS_ASSERT(stolen_task->continuation_.valid(), + "A task that we can steal must have a valid continuation for us to start working."); + context_switcher::switch_context(std::move(stolen_task->continuation_)); + + // ...now we are done with this steal attempt, loop over. + break; + } + } // if (!my_cont_manager.falling_through()) { // base::this_thread::sleep(5); // } - } while (!work_section_done_); + } } void scheduler::terminate() { diff --git a/lib/pls/src/internal/scheduling/task_manager.cpp b/lib/pls/src/internal/scheduling/task_manager.cpp index f70eeb4..30bfcce 100644 --- a/lib/pls/src/internal/scheduling/task_manager.cpp +++ b/lib/pls/src/internal/scheduling/task_manager.cpp @@ -1,3 +1,5 @@ +#include + #include "pls/internal/scheduling/task_manager.h" #include "pls/internal/scheduling/task.h" @@ -47,12 +49,38 @@ void task_manager::push_resource_on_task(task *target_task, task *spare_task_cha spare_task_chain->resource_stack_next_ = current_root_task; } - } while (target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); + } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); +} + +task *task_manager::steal_task(task_manager &stealing_task_manager) { + PLS_ASSERT(stealing_task_manager.active_task_->depth_ == 0, "Must only steal with clean task chain."); + + auto peek = deque_.peek_top(); + auto optional_target_task = std::get<0>(peek); + auto target_top = std::get<1>(peek); + + if (optional_target_task) { + task *target_task = *optional_target_task; + task *traded_task = stealing_task_manager.active_task_; + for (unsigned i = 0; i < target_task->depth_; i++) { + traded_task = traded_task->next_; + } + + auto optional_result_task = deque_.pop_top(traded_task, target_top); + if (optional_result_task) { + return *optional_result_task; + } else { + return nullptr; + } + } else { + return nullptr; + } } task *task_manager::pop_resource_from_task(task *target_task) { data_structures::stamped_integer current_root; data_structures::stamped_integer target_root; + task *output_task; do { current_root = target_task->resource_stack_root_.load(); target_root.stamp = current_root.stamp + 1; @@ -64,8 +92,12 @@ task *task_manager::pop_resource_from_task(task *target_task) { // Found something, try to pop it auto *current_root_task = find_task(current_root.value - 1, target_task->depth_); target_root.value = current_root_task->next_ != nullptr ? current_root_task->next_->thread_id_ + 1 : 0; + + output_task = current_root_task; } - } while (target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); + } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); + + return output_task; } } -- libgit2 0.26.0