Commit c85f2d0f by FritzFlorian

WIP: first stable version of stealing outline.

parent 6027f7be
Pipeline #1388 failed with stages
in 24 seconds
...@@ -4,8 +4,7 @@ ...@@ -4,8 +4,7 @@
using namespace pls::internal::scheduling; using namespace pls::internal::scheduling;
#include <iostream> #include <iostream>
#include <complex> #include <cstdio>
#include <vector>
#include "benchmark_runner.h" #include "benchmark_runner.h"
#include "benchmark_base/fib.h" #include "benchmark_base/fib.h"
...@@ -25,12 +24,13 @@ int pls_fib(int n) { ...@@ -25,12 +24,13 @@ int pls_fib(int n) {
b = pls_fib(n - 2); b = pls_fib(n - 2);
}); });
return a + b; return a + b;
} }
constexpr int MAX_NUM_THREADS = 1; constexpr int MAX_NUM_THREADS = 8;
constexpr int MAX_NUM_TASKS = 64; constexpr int MAX_NUM_TASKS = 64;
constexpr int MAX_STACK_SIZE = 1024; constexpr int MAX_STACK_SIZE = 4096;
int main(int argc, char **argv) { int main(int argc, char **argv) {
int num_threads; int num_threads;
......
...@@ -25,12 +25,11 @@ class scheduler::init_function_impl : public init_function { ...@@ -25,12 +25,11 @@ class scheduler::init_function_impl : public init_function {
public: public:
explicit init_function_impl(F &function) : function_{function} {} explicit init_function_impl(F &function) : function_{function} {}
void run() override { void run() override {
auto &thread_state = thread_state::get(); thread_state::get().get_task_manager().get_active_task().run_as_task([&](context_switcher::continuation cont) {
thread_state.get_task_manager().get_active_task().run_as_task([&](context_switcher::continuation cont) { thread_state::get().set_main_continuation(std::move(cont));
thread_state.set_main_continuation(std::move(cont));
function_(); function_();
thread_state.get_scheduler().work_section_done_.store(true); thread_state::get().get_scheduler().work_section_done_.store(true);
return std::move(thread_state.get_main_continuation()); return std::move(thread_state::get().get_main_continuation());
}); });
} }
......
...@@ -28,7 +28,7 @@ class task_manager { ...@@ -28,7 +28,7 @@ class task_manager {
external_trading_deque &deque); external_trading_deque &deque);
void push_resource_on_task(task *target_task, task *spare_task_chain); void push_resource_on_task(task *target_task, task *spare_task_chain);
task* pop_resource_from_task(task *target_task); task *pop_resource_from_task(task *target_task);
task *get_this_thread_task(size_t depth) { task *get_this_thread_task(size_t depth) {
return &this_thread_tasks_[depth]; return &this_thread_tasks_[depth];
...@@ -43,10 +43,15 @@ class task_manager { ...@@ -43,10 +43,15 @@ class task_manager {
task &get_active_task() { task &get_active_task() {
return *active_task_; return *active_task_;
} }
void set_active_task(task *active_task) {
active_task_ = active_task;
}
template<typename F> template<typename F>
void spawn_child(F &&lambda); void spawn_child(F &&lambda);
task *steal_task(task_manager &stealing_task_manager);
private: private:
size_t num_tasks_; size_t num_tasks_;
......
...@@ -25,12 +25,12 @@ void task_manager::spawn_child(F &&lambda) { ...@@ -25,12 +25,12 @@ void task_manager::spawn_child(F &&lambda) {
last_task->continuation_ = std::move(cont); last_task->continuation_ = std::move(cont);
active_task_ = this_task; active_task_ = this_task;
traded_cas_field expected_cas_value = deque_.push_bot(active_task_); traded_cas_field expected_cas_value = deque_.push_bot(last_task);
traded_cas_field empty_cas; traded_cas_field empty_cas;
lambda(); lambda();
if (active_task_->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) { if (last_task->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) {
// Fast path, simply continue execution where we left of before spawn. // Fast path, simply continue execution where we left of before spawn.
// This requires no coordination with the resource stack. // This requires no coordination with the resource stack.
active_task_ = last_task; active_task_ = last_task;
...@@ -60,10 +60,12 @@ void task_manager::spawn_child(F &&lambda) { ...@@ -60,10 +60,12 @@ void task_manager::spawn_child(F &&lambda) {
if (clean_chain != nullptr) { if (clean_chain != nullptr) {
// We got a clean chain to continue working on. // We got a clean chain to continue working on.
PLS_ASSERT(active_task_->prev_->depth_ == clean_chain->depth_, PLS_ASSERT(this_task->prev_->depth_ == clean_chain->depth_,
"Resources must only reside in the correct depth!"); "Resources must only reside in the correct depth!");
active_task_->prev_ = clean_chain; this_task->prev_ = clean_chain;
clean_chain->next_ = this_task;
// Walk back chain to make first task active // Walk back chain to make first task active
active_task_ = clean_chain;
while (active_task_->prev_ != nullptr) { while (active_task_->prev_ != nullptr) {
active_task_ = active_task_->prev_; active_task_ = active_task_->prev_;
} }
......
#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h"
#include "context_switcher/context_switcher.h"
#include "pls/internal/scheduling/task_manager.h" #include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/thread_state.h" #include "pls/internal/scheduling/thread_state.h"
...@@ -67,24 +69,66 @@ void scheduler::work_thread_work_section() { ...@@ -67,24 +69,66 @@ void scheduler::work_thread_work_section() {
main_thread_starter_function_->run(); main_thread_starter_function_->run();
} }
do { while (!work_section_done_) {
// Steal Routine (will be continuously executed when there are no more fall through's). // Steal Routine (will be continuously executed when there are no more fall through's).
// TODO: move into separate function // TODO: move into separate function
// const size_t offset = my_state.get_rand() % num_threads; const size_t offset = my_state.get_rand() % num_threads;
// const size_t max_tries = num_threads; const size_t max_tries = num_threads;
// for (size_t i = 0; i < max_tries; i++) { for (size_t i = 0; i < max_tries; i++) {
// size_t target = (offset + i) % num_threads; // Perform steal
// auto &target_state = my_state.get_scheduler().thread_state_for(target); size_t target = (offset + i) % num_threads;
// auto &target_state = my_state.get_scheduler().thread_state_for(target);
// auto *stolen_task = target_state.get_task_manager().steal_remote_task(my_cont_manager); auto *stolen_task = target_state.get_task_manager().steal_task(my_task_manager);
// if (stolen_task != nullptr) {
// stolen_task->execute(); // Handle successful steal
// } if (stolen_task != nullptr) {
// } // Adapt our task chain
// Note: This differs from how it worked before. The aquiring of new chains happens
// right at the steal. Whenever we start to work on an continuation we aquire the full
// 'dirty' chain below it. We fix this up at the sync points later on by popping of the resource stack.
auto *exchanged_task = &my_task_manager.get_active_task();
for (unsigned j = 0; j < stolen_task->depth_; j++) {
exchanged_task = exchanged_task->next_;
}
auto *next_own_task = exchanged_task->next_;
next_own_task->prev_ = stolen_task;
stolen_task->next_ = next_own_task;
my_task_manager.set_active_task(stolen_task);
// move the traded in resource of this active task over to the stack of resources.
my_task_manager.push_resource_on_task(stolen_task, exchanged_task);
traded_cas_field empty_field;
traded_cas_field expected_field;
expected_field.fill_with_trade_object(exchanged_task);
if (stolen_task->traded_field_.compare_exchange_strong(expected_field, empty_field)) {
// All good, nothing more to do
} else {
// The last other active thread took it as its spare resource...
// ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
PLS_ASSERT(expected_field.is_empty(),
"Must be empty, as otherwise no one will steal the 'spare traded task'.");
auto current_root = stolen_task->resource_stack_root_.load();
current_root.stamp++;
current_root.value = 0;
stolen_task->resource_stack_root_.store(current_root);
}
// execute the stolen task by jumping to it's continuation.
PLS_ASSERT(stolen_task->continuation_.valid(),
"A task that we can steal must have a valid continuation for us to start working.");
context_switcher::switch_context(std::move(stolen_task->continuation_));
// ...now we are done with this steal attempt, loop over.
break;
}
}
// if (!my_cont_manager.falling_through()) { // if (!my_cont_manager.falling_through()) {
// base::this_thread::sleep(5); // base::this_thread::sleep(5);
// } // }
} while (!work_section_done_); }
} }
void scheduler::terminate() { void scheduler::terminate() {
......
#include <tuple>
#include "pls/internal/scheduling/task_manager.h" #include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/task.h"
...@@ -47,12 +49,38 @@ void task_manager::push_resource_on_task(task *target_task, task *spare_task_cha ...@@ -47,12 +49,38 @@ void task_manager::push_resource_on_task(task *target_task, task *spare_task_cha
spare_task_chain->resource_stack_next_ = current_root_task; spare_task_chain->resource_stack_next_ = current_root_task;
} }
} while (target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
}
task *task_manager::steal_task(task_manager &stealing_task_manager) {
PLS_ASSERT(stealing_task_manager.active_task_->depth_ == 0, "Must only steal with clean task chain.");
auto peek = deque_.peek_top();
auto optional_target_task = std::get<0>(peek);
auto target_top = std::get<1>(peek);
if (optional_target_task) {
task *target_task = *optional_target_task;
task *traded_task = stealing_task_manager.active_task_;
for (unsigned i = 0; i < target_task->depth_; i++) {
traded_task = traded_task->next_;
}
auto optional_result_task = deque_.pop_top(traded_task, target_top);
if (optional_result_task) {
return *optional_result_task;
} else {
return nullptr;
}
} else {
return nullptr;
}
} }
task *task_manager::pop_resource_from_task(task *target_task) { task *task_manager::pop_resource_from_task(task *target_task) {
data_structures::stamped_integer current_root; data_structures::stamped_integer current_root;
data_structures::stamped_integer target_root; data_structures::stamped_integer target_root;
task *output_task;
do { do {
current_root = target_task->resource_stack_root_.load(); current_root = target_task->resource_stack_root_.load();
target_root.stamp = current_root.stamp + 1; target_root.stamp = current_root.stamp + 1;
...@@ -64,8 +92,12 @@ task *task_manager::pop_resource_from_task(task *target_task) { ...@@ -64,8 +92,12 @@ task *task_manager::pop_resource_from_task(task *target_task) {
// Found something, try to pop it // Found something, try to pop it
auto *current_root_task = find_task(current_root.value - 1, target_task->depth_); auto *current_root_task = find_task(current_root.value - 1, target_task->depth_);
target_root.value = current_root_task->next_ != nullptr ? current_root_task->next_->thread_id_ + 1 : 0; target_root.value = current_root_task->next_ != nullptr ? current_root_task->next_->thread_id_ + 1 : 0;
output_task = current_root_task;
} }
} while (target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
return output_task;
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment