Commit 22f4c598 by FritzFlorian

WIP: First running version of stealing.

The project is currently quite messy and there are sporadic SIGSEGVs, which indicates that we still have a race in our code. Thread Sanitizer does not work with our current implementation, as it requires annotations for our fibers.

The next step is to clean up the project and possibly add Thread Sanitizer support to our fiber implementation. This should help find the remaining bugs.
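
A possible shape for those annotations (a minimal sketch, not part of this commit; it assumes a Clang/TSan that ships <sanitizer/tsan_interface.h>, and the handle placement and annotated_switch hook are hypothetical):

#include <sanitizer/tsan_interface.h>

// One TSan fiber handle per task stack; it would live next to the stack in our scheduler memory.
struct tsan_fiber_annotation {
  void *handle = __tsan_create_fiber(0);                  // register the stack as a fiber with TSan
  ~tsan_fiber_annotation() { __tsan_destroy_fiber(handle); }
};

// Would be called right before the real register/stack switch in the context switcher.
void annotated_switch(tsan_fiber_annotation &target) {
  __tsan_switch_to_fiber(target.handle, 0);               // tell TSan that execution moves to another stack
  // ... perform the actual context switch here ...
}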
parent c85f2d0f
Pipeline #1389 failed with stages in 27 seconds
......@@ -12,7 +12,10 @@ using namespace pls::internal::scheduling;
using namespace comparison_benchmarks::base;
int pls_fib(int n) {
if (n <= 1) {
if (n == 0) {
return 0;
}
if (n == 1) {
return 1;
}
......@@ -23,14 +26,18 @@ int pls_fib(int n) {
scheduler::spawn([n, &b]() {
b = pls_fib(n - 2);
});
scheduler::sync();
return a + b;
}
constexpr int MAX_NUM_THREADS = 8;
constexpr int MAX_NUM_THREADS = 4;
constexpr int MAX_NUM_TASKS = 64;
constexpr int MAX_STACK_SIZE = 4096;
constexpr int MAX_STACK_SIZE = 4096 * 8;
static_scheduler_memory<MAX_NUM_THREADS,
MAX_NUM_TASKS,
MAX_STACK_SIZE> global_scheduler_memory;
int main(int argc, char **argv) {
int num_threads;
......@@ -41,26 +48,22 @@ int main(int argc, char **argv) {
string full_directory = directory + "/PLS_v3/";
benchmark_runner runner{full_directory, test_name};
static_scheduler_memory<MAX_NUM_THREADS,
MAX_NUM_TASKS,
MAX_STACK_SIZE> static_scheduler_memory;
scheduler scheduler{static_scheduler_memory, (unsigned) num_threads};
scheduler scheduler{global_scheduler_memory, (unsigned) num_threads};
volatile int res;
for (int i = 0; i < fib::NUM_WARMUP_ITERATIONS; i++) {
scheduler.perform_work([&]() {
scheduler.perform_work([&]() {
for (int i = 0; i < fib::NUM_WARMUP_ITERATIONS; i++) {
res = pls_fib(fib::INPUT_N);
});
}
}
});
for (int i = 0; i < fib::NUM_ITERATIONS; i++) {
scheduler.perform_work([&]() {
scheduler.perform_work([&]() {
for (int i = 0; i < fib::NUM_ITERATIONS; i++) {
runner.start_iteration();
res = pls_fib(fib::INPUT_N);
runner.end_iteration();
});
}
}
});
runner.commit_results(true);
return 0;
......
......@@ -7,4 +7,4 @@ if(THREAD_SANITIZER)
add_compile_options(-fsanitize=thread -g)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread")
endif()
message("-- Thread Sanitizer: ${THREAD_SANITIZER}")
\ No newline at end of file
message("-- Thread Sanitizer: ${THREAD_SANITIZER}")
......@@ -55,6 +55,7 @@ class scheduler {
template<typename Function>
static void spawn(Function &&lambda);
static void sync();
thread_state &thread_state_for(size_t id);
......
......@@ -25,7 +25,9 @@ class scheduler::init_function_impl : public init_function {
public:
explicit init_function_impl(F &function) : function_{function} {}
void run() override {
thread_state::get().get_task_manager().get_active_task().run_as_task([&](context_switcher::continuation cont) {
auto &root_task = thread_state::get().get_task_manager().get_active_task();
root_task.clean_ = true;
root_task.run_as_task([&](context_switcher::continuation cont) {
thread_state::get().set_main_continuation(std::move(cont));
function_();
thread_state::get().get_scheduler().work_section_done_.store(true);
......
......@@ -58,6 +58,7 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task {
std::atomic<traded_cas_field> traded_field_{};
task *resource_stack_next_{};
std::atomic<data_structures::stamped_integer> resource_stack_root_{{0, 0}};
bool clean_;
// Task Tree (we have a parent that we want to continue when we finish)
task *parent_task_;
......
......@@ -49,8 +49,15 @@ class task_manager {
template<typename F>
void spawn_child(F &&lambda);
void sync();
task *steal_task(task_manager &stealing_task_manager);
bool steal_task(task_manager &stealing_task_manager);
bool try_clean_return(context_switcher::continuation &result_cont);
bool check_task_chain_forward(task *start_task);
bool check_task_chain_backward(task *start_task);
bool check_task_chain();
private:
size_t num_tasks_;
......
......@@ -18,69 +18,42 @@ namespace scheduling {
template<typename F>
void task_manager::spawn_child(F &&lambda) {
auto continuation = active_task_->next_->run_as_task([lambda, this](context_switcher::continuation cont) {
auto *last_task = active_task_;
auto *this_task = active_task_->next_;
last_task->continuation_ = std::move(cont);
active_task_ = this_task;
traded_cas_field expected_cas_value = deque_.push_bot(last_task);
traded_cas_field empty_cas;
lambda();
if (last_task->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) {
// Fast path, simply continue execution where we left of before spawn.
// This requires no coordination with the resource stack.
active_task_ = last_task;
deque_.popped_bot();
return std::move(last_task->continuation_);
} else {
// Slow path, the continuation was stolen.
// First empty our own deque (everything below must have been stolen already).
deque_.empty_deque();
// TODO: This whole process can be taken out into a function, no need to copy it in every lambda.
// Also split smaller functions out, this is a mess right now...
// Try to get a clean resource chain to go back to the main stealing loop
task *clean_chain = pop_resource_from_task(last_task);
if (clean_chain == nullptr) {
// double-check if we are really last one or we only have unlucky timing
auto cas_field = last_task->traded_field_.load();
if (cas_field.is_filled_with_object()) {
traded_cas_field empty_target;
if (last_task->traded_field_.compare_exchange_strong(cas_field, empty_target)) {
clean_chain = cas_field.get_trade_object();
auto *spawning_task_manager = this;
auto continuation =
active_task_->next_->run_as_task([lambda, spawning_task_manager](context_switcher::continuation cont) {
auto *last_task = spawning_task_manager->active_task_;
auto *this_task = spawning_task_manager->active_task_->next_;
last_task->continuation_ = std::move(cont);
spawning_task_manager->active_task_ = this_task;
traded_cas_field expected_cas_value = spawning_task_manager->deque_.push_bot(last_task);
traded_cas_field empty_cas;
lambda();
auto *syncing_task_manager = &thread_state::get().get_task_manager();
if (last_task->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) {
// Fast path, simply continue execution where we left off before the spawn.
// This requires no coordination with the resource stack.
syncing_task_manager->active_task_ = last_task;
syncing_task_manager->deque_.popped_bot();
return std::move(last_task->continuation_);
} else {
// Slow path, the continuation was stolen.
// First empty our own deque (everything below must have been stolen already).
syncing_task_manager->deque_.empty_deque();
context_switcher::continuation result_cont;
if (syncing_task_manager->try_clean_return(result_cont)) {
// We return back to the main scheduling loop
return result_cont;
} else {
clean_chain = pop_resource_from_task(last_task);
// We finish up the last task and are the sole owner again
return result_cont;
}
}
}
if (clean_chain != nullptr) {
// We got a clean chain to continue working on.
PLS_ASSERT(this_task->prev_->depth_ == clean_chain->depth_,
"Resources must only reside in the correct depth!");
this_task->prev_ = clean_chain;
clean_chain->next_ = this_task;
// Walk back chain to make first task active
active_task_ = clean_chain;
while (active_task_->prev_ != nullptr) {
active_task_ = active_task_->prev_;
}
// jump back to continuation in main scheduling loop, time to steal some work
return std::move(thread_state::get().get_main_continuation());
} else {
// We are the last one working on this task. Thus the sync must be finished, continue working.
active_task_ = last_task;
return std::move(last_task->continuation_);
}
}
PLS_ERROR("Slow Path/Stealing not implemented!");
});
});
if (continuation.valid()) {
// We jumped in here from the main loop, keep track!
......
......@@ -70,6 +70,8 @@ void scheduler::work_thread_work_section() {
}
while (!work_section_done_) {
PLS_ASSERT(my_task_manager.check_task_chain(), "Must start stealing with a clean task chain.");
// Steal routine (will be executed continuously when there are no more fall-throughs).
// TODO: move into separate function
const size_t offset = my_state.get_rand() % num_threads;
......@@ -78,45 +80,42 @@ void scheduler::work_thread_work_section() {
// Perform steal
size_t target = (offset + i) % num_threads;
auto &target_state = my_state.get_scheduler().thread_state_for(target);
auto *stolen_task = target_state.get_task_manager().steal_task(my_task_manager);
// Handle successful steal
if (stolen_task != nullptr) {
// Adapt our task chain
// Note: This differs from how it worked before. The aquiring of new chains happens
// right at the steal. Whenever we start to work on an continuation we aquire the full
// 'dirty' chain below it. We fix this up at the sync points later on by popping of the resource stack.
auto *exchanged_task = &my_task_manager.get_active_task();
for (unsigned j = 0; j < stolen_task->depth_; j++) {
exchanged_task = exchanged_task->next_;
}
auto *next_own_task = exchanged_task->next_;
next_own_task->prev_ = stolen_task;
stolen_task->next_ = next_own_task;
my_task_manager.set_active_task(stolen_task);
// move the traded in resource of this active task over to the stack of resources.
my_task_manager.push_resource_on_task(stolen_task, exchanged_task);
traded_cas_field empty_field;
traded_cas_field expected_field;
expected_field.fill_with_trade_object(exchanged_task);
if (stolen_task->traded_field_.compare_exchange_strong(expected_field, empty_field)) {
// All good, nothing more to do
} else {
// The last other active thread took it as its spare resource...
// ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
PLS_ASSERT(expected_field.is_empty(),
"Must be empty, as otherwise no one will steal the 'spare traded task'.");
auto current_root = stolen_task->resource_stack_root_.load();
current_root.stamp++;
current_root.value = 0;
stolen_task->resource_stack_root_.store(current_root);
bool steal_success = target_state.get_task_manager().steal_task(my_task_manager);
if (steal_success) {
// The stealing procedure correctly changed our chain and active task.
// Now we need to perform the 'post steal' actions (manage resources and execute the stolen task).
PLS_ASSERT(my_task_manager.check_task_chain_forward(&my_task_manager.get_active_task()),
"We are sole owner of this chain, it has to be valid!");
// Move the traded-in resource of this active task over to the stack of resources.
auto *stolen_task = &my_task_manager.get_active_task();
traded_cas_field stolen_task_cas = stolen_task->traded_field_.load();
if (stolen_task_cas.is_filled_with_object()) {
// Push the traded-in resource onto the resource stack to clear the traded_field_ for later steals/spawns.
auto *exchanged_task = stolen_task_cas.get_trade_object();
my_task_manager.push_resource_on_task(stolen_task, exchanged_task);
traded_cas_field empty_field;
traded_cas_field expected_field;
expected_field.fill_with_trade_object(exchanged_task);
if (stolen_task->traded_field_.compare_exchange_strong(expected_field, empty_field)) {
// All good, nothing more to do
} else {
// The last other active thread took it as its spare resource...
// ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
PLS_ASSERT(expected_field.is_empty(),
"Must be empty, as otherwise no one will steal the 'spare traded task'.");
auto current_root = stolen_task->resource_stack_root_.load();
current_root.stamp++;
current_root.value = 0;
stolen_task->resource_stack_root_.store(current_root);
}
}
// execute the stolen task by jumping to it's continuation.
// Execute the stolen task by jumping to its continuation.
PLS_ASSERT(stolen_task->continuation_.valid(),
"A task that we can steal must have a valid continuation for us to start working.");
context_switcher::switch_context(std::move(stolen_task->continuation_));
......@@ -149,6 +148,10 @@ void scheduler::terminate() {
thread_state &scheduler::thread_state_for(size_t id) { return memory_.thread_state_for(id); }
void scheduler::sync() {
thread_state::get().get_task_manager().sync();
}
}
}
}
......@@ -32,27 +32,7 @@ static task *find_task(unsigned id, unsigned depth) {
return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_this_thread_task(depth);
}
void task_manager::push_resource_on_task(task *target_task, task *spare_task_chain) {
data_structures::stamped_integer current_root;
data_structures::stamped_integer target_root;
do {
current_root = target_task->resource_stack_root_.load();
target_root.stamp = current_root.stamp + 1;
target_root.value = spare_task_chain->thread_id_ + 1;
if (current_root.value == 0) {
// Empty, simply push in with no successor
spare_task_chain->resource_stack_next_ = nullptr;
} else {
// Already an entry. Find it's corresponding task and set it as our successor.
auto *current_root_task = find_task(current_root.value, target_task->depth_);
spare_task_chain->resource_stack_next_ = current_root_task;
}
} while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
}
task *task_manager::steal_task(task_manager &stealing_task_manager) {
bool task_manager::steal_task(task_manager &stealing_task_manager) {
PLS_ASSERT(stealing_task_manager.active_task_->depth_ == 0, "Must only steal with clean task chain.");
auto peek = deque_.peek_top();
......@@ -60,23 +40,65 @@ task *task_manager::steal_task(task_manager &stealing_task_manager) {
auto target_top = std::get<1>(peek);
if (optional_target_task) {
PLS_ASSERT(stealing_task_manager.check_task_chain(), "We are stealing, must not have a bad chain here!");
// search for the task we want to trade in
task *target_task = *optional_target_task;
task *traded_task = stealing_task_manager.active_task_;
for (unsigned i = 0; i < target_task->depth_; i++) {
traded_task = traded_task->next_;
}
// Keep a reference to the rest of the task chain that stays with us.
task *next_own_task = traded_task->next_;
// 'Unchain' the traded task (this only helps us find bugs).
traded_task->next_ = nullptr;
auto optional_result_task = deque_.pop_top(traded_task, target_top);
if (optional_result_task) {
return *optional_result_task;
PLS_ASSERT(*optional_result_task == target_task, "We must only steal the task that we peeked at!");
// the steal was a success, link the chain so we own the stolen part
target_task->next_ = next_own_task;
next_own_task->prev_ = target_task;
stealing_task_manager.set_active_task(target_task);
return true;
} else {
return nullptr;
// the steal failed, reset our chain to its old, clean state (re-link what we have broken)
traded_task->next_ = next_own_task;
return false;
}
} else {
return nullptr;
return false;
}
}
void task_manager::push_resource_on_task(task *target_task, task *spare_task_chain) {
PLS_ASSERT(check_task_chain_backward(spare_task_chain), "Must only push proper task chains.");
PLS_ASSERT(target_task->thread_id_ != spare_task_chain->thread_id_,
"Makes no sense to push task onto itself, as it is not clean by definition.");
PLS_ASSERT(target_task->depth_ == spare_task_chain->depth_, "Must only push tasks with correct depth.");
data_structures::stamped_integer current_root;
data_structures::stamped_integer target_root;
do {
current_root = target_task->resource_stack_root_.load();
target_root.stamp = current_root.stamp + 1;
target_root.value = spare_task_chain->thread_id_ + 1;
if (current_root.value == 0) {
// Empty, simply push in with no successor
spare_task_chain->resource_stack_next_ = nullptr;
} else {
// There is already an entry. Find its corresponding task and set it as our successor.
auto *current_root_task = find_task(current_root.value - 1, target_task->depth_);
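// Note: the stack root stores thread_id_ + 1 (0 is reserved for 'empty'), so we subtract 1 to recover the thread id.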
spare_task_chain->resource_stack_next_ = current_root_task;
}
} while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
}
task *task_manager::pop_resource_from_task(task *target_task) {
data_structures::stamped_integer current_root;
data_structures::stamped_integer target_root;
......@@ -91,15 +113,122 @@ task *task_manager::pop_resource_from_task(task *target_task) {
} else {
// Found something, try to pop it
auto *current_root_task = find_task(current_root.value - 1, target_task->depth_);
target_root.value = current_root_task->next_ != nullptr ? current_root_task->next_->thread_id_ + 1 : 0;
auto *next_stack_task = current_root_task->resource_stack_next_;
target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0;
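// Note: the resource stack is linked through resource_stack_next_, not through the task chain's next_ pointer.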
output_task = current_root_task;
}
} while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
PLS_ASSERT(check_task_chain_backward(output_task), "Must only pop proper task chains.");
return output_task;
}
void task_manager::sync() {
auto continuation = active_task_->next_->run_as_task([this](context_switcher::continuation cont) {
auto *last_task = active_task_;
auto *this_task = active_task_->next_;
last_task->continuation_ = std::move(cont);
active_task_ = this_task;
context_switcher::continuation result_cont;
if (try_clean_return(result_cont)) {
// We return back to the main scheduling loop
active_task_->clean_ = true;
return result_cont;
} else {
// We finish up the last task
active_task_->clean_ = false;
return result_cont;
}
});
if (continuation.valid()) {
// We jumped in here from the main loop, keep track!
thread_state::get().set_main_continuation(std::move(continuation));
}
}
bool task_manager::try_clean_return(context_switcher::continuation &result_cont) {
task *this_task = active_task_;
task *last_task = active_task_->prev_;
if (last_task == nullptr) {
// We finished the final task of the computation, return to the scheduling loop.
result_cont = thread_state::get().get_main_continuation();
return true;
}
// Try to get a clean resource chain to go back to the main stealing loop
task *clean_chain = pop_resource_from_task(last_task);
if (clean_chain == nullptr) {
// Double-check if we are really the last one or if we just had unlucky timing.
auto cas_field = last_task->traded_field_.load();
if (cas_field.is_filled_with_object()) {
traded_cas_field empty_target;
if (last_task->traded_field_.compare_exchange_strong(cas_field, empty_target)) {
clean_chain = cas_field.get_trade_object();
} else {
clean_chain = pop_resource_from_task(last_task);
}
}
}
if (clean_chain != nullptr) {
// We got a clean chain to continue working on.
PLS_ASSERT(last_task->depth_ == clean_chain->depth_,
"Resources must only reside in the correct depth!");
PLS_ASSERT(check_task_chain_backward(clean_chain), "Can only acquire clean chains for clean returns!");
this_task->prev_ = clean_chain;
clean_chain->next_ = this_task;
// Walk back chain to make first task active
active_task_ = clean_chain;
while (active_task_->prev_ != nullptr) {
active_task_ = active_task_->prev_;
}
PLS_ASSERT(check_task_chain(), "We just acquired a clean chain...");
// Jump back to the continuation in the main scheduling loop; time to steal some work.
result_cont = thread_state::get().get_main_continuation();
return true;
} else {
// We are the last one working on this task. Thus the sync must be finished, continue working.
active_task_ = last_task;
// Make sure that we are the owner of this full continuation/task chain.
active_task_->next_ = this_task;
this_task->prev_ = active_task_;
result_cont = std::move(last_task->continuation_);
return false;
}
}
bool task_manager::check_task_chain_forward(task *start_task) {
while (start_task->next_ != nullptr) {
PLS_ASSERT(start_task->next_->prev_ == start_task, "Chain must have correct prev/next fields for linked list!");
start_task = start_task->next_;
}
return true;
}
bool task_manager::check_task_chain_backward(task *start_task) {
while (start_task->prev_ != nullptr) {
PLS_ASSERT(start_task->prev_->next_ == start_task, "Chain must have correct prev/next fields for linked list!");
start_task = start_task->prev_;
}
return true;
}
bool task_manager::check_task_chain() {
check_task_chain_backward(active_task_);
check_task_chain_forward(active_task_);
return true;
}
}
}
}