From 22f4c5984c5d856ec5d60ce1aa6b276b7cf10bfa Mon Sep 17 00:00:00 2001
From: FritzFlorian
Date: Mon, 27 Jan 2020 19:32:00 +0100
Subject: [PATCH] WIP: First running version of stealing.

The project is currently really messy and there are sporadic segfaults
(SIGSEGVs), which indicates that we still have a race in our code.
Thread Sanitizer does not work with our current implementation, as it
needs annotations for fibers. The next step is to clean up the project
and maybe add Thread Sanitizer support to our fiber implementation.
This should help find the remaining bugs.

---
 app/benchmark_fib/main.cpp                                   |  37 ++++++++++++++++++++-----------------
 cmake/SetupThreadSanitizer.cmake                             |   2 +-
 lib/pls/include/pls/internal/scheduling/scheduler.h          |   1 +
 lib/pls/include/pls/internal/scheduling/scheduler_impl.h     |   4 +++-
 lib/pls/include/pls/internal/scheduling/task.h               |   1 +
 lib/pls/include/pls/internal/scheduling/task_manager.h       |   9 ++++++++-
 lib/pls/include/pls/internal/scheduling/task_manager_impl.h  |  93 +++++++++++++++++++++++++++++++++------------------------------------------------------------
 lib/pls/src/internal/scheduling/scheduler.cpp                |  77 ++++++++++++++++++++++++++++++++++++++++-------------------------------------
 lib/pls/src/internal/scheduling/task_manager.cpp             | 179 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------
 9 files changed, 261 insertions(+), 142 deletions(-)

diff --git a/app/benchmark_fib/main.cpp b/app/benchmark_fib/main.cpp
index 63581f0..7e3976b 100644
--- a/app/benchmark_fib/main.cpp
+++ b/app/benchmark_fib/main.cpp
@@ -12,7 +12,10 @@ using namespace pls::internal::scheduling;
 using namespace comparison_benchmarks::base;
 
 int pls_fib(int n) {
-  if (n <= 1) {
+  if (n == 0) {
+    return 0;
+  }
+  if (n == 1) {
     return 1;
   }
 
@@ -23,14 +26,18 @@ int pls_fib(int n) {
   scheduler::spawn([n, &b]() {
     b = pls_fib(n - 2);
   });
 
-
+  scheduler::sync();
   return a + b;
 }
 
-constexpr int MAX_NUM_THREADS = 8;
+constexpr int MAX_NUM_THREADS = 4;
 constexpr int MAX_NUM_TASKS = 64;
-constexpr int MAX_STACK_SIZE = 4096;
+constexpr int MAX_STACK_SIZE = 4096 * 8;
+
+static_scheduler_memory<MAX_NUM_THREADS,
+                        MAX_NUM_TASKS,
+                        MAX_STACK_SIZE> global_scheduler_memory;
 
 int main(int argc, char **argv) {
   int num_threads;
@@ -41,26 +48,22 @@ int main(int argc, char **argv) {
   string full_directory = directory + "/PLS_v3/";
   benchmark_runner runner{full_directory, test_name};
 
-  static_scheduler_memory<MAX_NUM_THREADS,
-                          MAX_NUM_TASKS,
-                          MAX_STACK_SIZE> static_scheduler_memory;
-
-  scheduler scheduler{static_scheduler_memory, (unsigned) num_threads};
+  scheduler scheduler{global_scheduler_memory, (unsigned) num_threads};
 
   volatile int res;
-  for (int i = 0; i < fib::NUM_WARMUP_ITERATIONS; i++) {
-    scheduler.perform_work([&]() {
+  scheduler.perform_work([&]() {
+    for (int i = 0; i < fib::NUM_WARMUP_ITERATIONS; i++) {
       res = pls_fib(fib::INPUT_N);
-    });
-  }
+    }
+  });
 
-  for (int i = 0; i < fib::NUM_ITERATIONS; i++) {
-    scheduler.perform_work([&]() {
+  scheduler.perform_work([&]() {
+    for (int i = 0; i < fib::NUM_ITERATIONS; i++) {
       runner.start_iteration();
       res = pls_fib(fib::INPUT_N);
       runner.end_iteration();
-    });
-  }
+    }
+  });
 
   runner.commit_results(true);
   return 0;
diff --git a/cmake/SetupThreadSanitizer.cmake b/cmake/SetupThreadSanitizer.cmake
index 4a5debd..e833aa0 100644
--- a/cmake/SetupThreadSanitizer.cmake
+++ b/cmake/SetupThreadSanitizer.cmake
@@ -7,4 +7,4 @@ if(THREAD_SANITIZER)
   add_compile_options(-fsanitize=thread -g)
   set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread")
 endif()
-message("-- Thread Sanitizer: ${THREAD_SANITIZER}")
\ No newline at end of file
+message("-- Thread Sanitizer: ${THREAD_SANITIZER}")
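// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): the ThreadSanitizer fiber
// annotations the commit message refers to. This assumes a clang/compiler-rt
// that ships <sanitizer/tsan_interface.h> with fiber support (clang >= 9) and
// that the code is built with -fsanitize=thread. The fiber_handle type and
// the switch function are invented placeholders, not the project's
// context_switcher API.
// ---------------------------------------------------------------------------
#include <sanitizer/tsan_interface.h>

struct fiber_handle {
  void *tsan_fiber;  // TSan's per-fiber state, created once per fiber stack
};

fiber_handle create_annotated_fiber() {
  return fiber_handle{__tsan_create_fiber(/*flags=*/0)};
}

void switch_annotated(const fiber_handle &to) {
  // Must be called right before the stack/register switch; otherwise TSan
  // sees one thread running on two stacks and reports false positives.
  __tsan_switch_to_fiber(to.tsan_fiber, /*flags=*/0);
  // ... perform the actual context switch here ...
}

void destroy_annotated_fiber(fiber_handle &f) {
  __tsan_destroy_fiber(f.tsan_fiber);
}
// ---------------------------------------------------------------------------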
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h
index 1fb55ac..5cb8f81 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler.h
@@ -55,6 +55,7 @@ class scheduler {
   template<typename Function>
   static void spawn(Function &&lambda);
 
+  static void sync();
 
   thread_state &thread_state_for(size_t id);
 
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index 0064006..2d6d347 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -25,7 +25,9 @@ class scheduler::init_function_impl : public init_function {
  public:
   explicit init_function_impl(F &function) : function_{function} {}
   void run() override {
-    thread_state::get().get_task_manager().get_active_task().run_as_task([&](context_switcher::continuation cont) {
+    auto &root_task = thread_state::get().get_task_manager().get_active_task();
+    root_task.clean_ = true;
+    root_task.run_as_task([&](context_switcher::continuation cont) {
       thread_state::get().set_main_continuation(std::move(cont));
       function_();
       thread_state::get().get_scheduler().work_section_done_.store(true);
diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h
index 5664046..4dc5881 100644
--- a/lib/pls/include/pls/internal/scheduling/task.h
+++ b/lib/pls/include/pls/internal/scheduling/task.h
@@ -58,6 +58,7 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task {
   std::atomic<traded_cas_field> traded_field_{};
   task *resource_stack_next_{};
   std::atomic<data_structures::stamped_integer> resource_stack_root_{{0, 0}};
+  bool clean_;
 
   // Task Tree (we have a parent that we want to continue when we finish)
   task *parent_task_;
diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h
index e001f44..b4de8d7 100644
--- a/lib/pls/include/pls/internal/scheduling/task_manager.h
+++ b/lib/pls/include/pls/internal/scheduling/task_manager.h
@@ -49,8 +49,15 @@ class task_manager {
   template<typename F>
   void spawn_child(F &&lambda);
 
+  void sync();
 
-  task *steal_task(task_manager &stealing_task_manager);
+  bool steal_task(task_manager &stealing_task_manager);
+
+  bool try_clean_return(context_switcher::continuation &result_cont);
+
+  bool check_task_chain_forward(task *start_task);
+  bool check_task_chain_backward(task *start_task);
+  bool check_task_chain();
 
  private:
   size_t num_tasks_;
diff --git a/lib/pls/include/pls/internal/scheduling/task_manager_impl.h b/lib/pls/include/pls/internal/scheduling/task_manager_impl.h
index 15811f2..84ce60e 100644
--- a/lib/pls/include/pls/internal/scheduling/task_manager_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/task_manager_impl.h
@@ -18,69 +18,42 @@ namespace scheduling {
 
 template<typename F>
 void task_manager::spawn_child(F &&lambda) {
-  auto continuation = active_task_->next_->run_as_task([lambda, this](context_switcher::continuation cont) {
-    auto *last_task = active_task_;
-    auto *this_task = active_task_->next_;
-
-    last_task->continuation_ = std::move(cont);
-    active_task_ = this_task;
-
-    traded_cas_field expected_cas_value = deque_.push_bot(last_task);
-    traded_cas_field empty_cas;
-
-    lambda();
-
-    if (last_task->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) {
-      // Fast path, simply continue execution where we left of before spawn.
-      // This requires no coordination with the resource stack.
-      active_task_ = last_task;
-      deque_.popped_bot();
-      return std::move(last_task->continuation_);
-    } else {
-      // Slow path, the continuation was stolen.
-      // First empty our own deque (everything below must have been stolen already).
-      deque_.empty_deque();
-
-      // TODO: This whole process can be taken out into a function, no need to copy it in every lambda.
-      //       Also split smaller functions out, this is a mess right now...
-      // Try to get a clean resource chain to go back to the main stealing loop
-      task *clean_chain = pop_resource_from_task(last_task);
-      if (clean_chain == nullptr) {
-        // double-check if we are really last one or we only have unlucky timing
-        auto cas_field = last_task->traded_field_.load();
-        if (cas_field.is_filled_with_object()) {
-          traded_cas_field empty_target;
-          if (last_task->traded_field_.compare_exchange_strong(cas_field, empty_target)) {
-            clean_chain = cas_field.get_trade_object();
+  auto *spawning_task_manager = this;
+  auto continuation =
+      active_task_->next_->run_as_task([lambda, spawning_task_manager](context_switcher::continuation cont) {
+        auto *last_task = spawning_task_manager->active_task_;
+        auto *this_task = spawning_task_manager->active_task_->next_;
+
+        last_task->continuation_ = std::move(cont);
+        spawning_task_manager->active_task_ = this_task;
+
+        traded_cas_field expected_cas_value = spawning_task_manager->deque_.push_bot(last_task);
+        traded_cas_field empty_cas;
+
+        lambda();
+        auto *syncing_task_manager = &thread_state::get().get_task_manager();
+
+        if (last_task->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) {
+          // Fast path, simply continue execution where we left off before the spawn.
+          // This requires no coordination with the resource stack.
+          syncing_task_manager->active_task_ = last_task;
+          syncing_task_manager->deque_.popped_bot();
+          return std::move(last_task->continuation_);
+        } else {
+          // Slow path, the continuation was stolen.
+          // First empty our own deque (everything below must have been stolen already).
+          syncing_task_manager->deque_.empty_deque();
+
+          context_switcher::continuation result_cont;
+          if (syncing_task_manager->try_clean_return(result_cont)) {
+            // We return back to the main scheduling loop
+            return result_cont;
           } else {
-            clean_chain = pop_resource_from_task(last_task);
+            // We finish up the last task and are the sole owner again
+            return result_cont;
           }
         }
-      }
-
-      if (clean_chain != nullptr) {
-        // We got a clean chain to continue working on.
-        PLS_ASSERT(this_task->prev_->depth_ == clean_chain->depth_,
-                   "Resources must only reside in the correct depth!");
-        this_task->prev_ = clean_chain;
-        clean_chain->next_ = this_task;
-        // Walk back chain to make first task active
-        active_task_ = clean_chain;
-        while (active_task_->prev_ != nullptr) {
-          active_task_ = active_task_->prev_;
-        }
-
-        // jump back to continuation in main scheduling loop, time to steal some work
-        return std::move(thread_state::get().get_main_continuation());
-      } else {
-        // We are the last one working on this task. Thus the sync must be finished, continue working.
-        active_task_ = last_task;
-        return std::move(last_task->continuation_);
-      }
-    }
-
-    PLS_ERROR("Slow Path/Stealing not implemented!");
-  });
+      });
 
   if (continuation.valid()) {
     // We jumped in here from the main loop, keep track!
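// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): the fast-path/slow-path idea
// used by spawn_child() above, reduced to a single atomic slot. The real code
// trades whole task objects through traded_cas_field; here an int stands in
// for the offered continuation, and all names are invented for the example.
// ---------------------------------------------------------------------------
#include <atomic>
#include <cstdio>

std::atomic<int> offered_continuation{0};  // 0 = empty, 1 = offered, 2 = taken by a thief

void thief_side() {
  int expected = 1;
  if (offered_continuation.compare_exchange_strong(expected, 2)) {
    // The thief now owns the parent's continuation and leaves a trade object behind.
  }
}

void spawn_like_parent() {
  offered_continuation.store(1);  // offer the parent continuation while the child runs
  // ... execute the spawned child task here ...
  int expected = 1;
  if (offered_continuation.compare_exchange_strong(expected, 0)) {
    std::printf("fast path: nobody stole the continuation, keep running inline\n");
  } else {
    std::printf("slow path: a thief won the CAS, coordinate via the resource stack\n");
  }
}

int main() {
  spawn_like_parent();
  return 0;
}
// ---------------------------------------------------------------------------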
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index de91dd4..ac85e6e 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -70,6 +70,8 @@ void scheduler::work_thread_work_section() {
   }
 
   while (!work_section_done_) {
+    PLS_ASSERT(my_task_manager.check_task_chain(), "Must start stealing with a clean task chain.");
+
     // Steal Routine (will be continuously executed when there are no more fall through's).
     // TODO: move into separate function
     const size_t offset = my_state.get_rand() % num_threads;
@@ -78,45 +80,42 @@
       // Perform steal
       size_t target = (offset + i) % num_threads;
       auto &target_state = my_state.get_scheduler().thread_state_for(target);
-      auto *stolen_task = target_state.get_task_manager().steal_task(my_task_manager);
-
-      // Handle successful steal
-      if (stolen_task != nullptr) {
-        // Adapt our task chain
-        // Note: This differs from how it worked before. The aquiring of new chains happens
-        // right at the steal. Whenever we start to work on an continuation we aquire the full
-        // 'dirty' chain below it. We fix this up at the sync points later on by popping of the resource stack.
-        auto *exchanged_task = &my_task_manager.get_active_task();
-        for (unsigned j = 0; j < stolen_task->depth_; j++) {
-          exchanged_task = exchanged_task->next_;
-        }
-        auto *next_own_task = exchanged_task->next_;
-
-        next_own_task->prev_ = stolen_task;
-        stolen_task->next_ = next_own_task;
-
-        my_task_manager.set_active_task(stolen_task);
-
-        // move the traded in resource of this active task over to the stack of resources.
-        my_task_manager.push_resource_on_task(stolen_task, exchanged_task);
-        traded_cas_field empty_field;
-        traded_cas_field expected_field;
-        expected_field.fill_with_trade_object(exchanged_task);
-        if (stolen_task->traded_field_.compare_exchange_strong(expected_field, empty_field)) {
-          // All good, nothing more to do
-        } else {
-          // The last other active thread took it as its spare resource...
-          // ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
-          PLS_ASSERT(expected_field.is_empty(),
-                     "Must be empty, as otherwise no one will steal the 'spare traded task'.");
-
-          auto current_root = stolen_task->resource_stack_root_.load();
-          current_root.stamp++;
-          current_root.value = 0;
-          stolen_task->resource_stack_root_.store(current_root);
+      bool steal_success = target_state.get_task_manager().steal_task(my_task_manager);
+
+      if (steal_success) {
+        // The stealing procedure correctly changed our chain and active task.
+        // Now we need to perform the 'post steal' actions (manage resources and execute the stolen task).
+        PLS_ASSERT(my_task_manager.check_task_chain_forward(&my_task_manager.get_active_task()),
+                   "We are sole owner of this chain, it has to be valid!");
+
+        // Move the traded in resource of this active task over to the stack of resources.
+        auto *stolen_task = &my_task_manager.get_active_task();
+        traded_cas_field stolen_task_cas = stolen_task->traded_field_.load();
+
+        if (stolen_task_cas.is_filled_with_object()) {
+          // Push the traded in resource on the resource stack to clear the traded_field for later steals/spawns.
+          auto *exchanged_task = stolen_task_cas.get_trade_object();
+
+          my_task_manager.push_resource_on_task(stolen_task, exchanged_task);
+          traded_cas_field empty_field;
+          traded_cas_field expected_field;
+          expected_field.fill_with_trade_object(exchanged_task);
+          if (stolen_task->traded_field_.compare_exchange_strong(expected_field, empty_field)) {
+            // All good, nothing more to do
+          } else {
+            // The last other active thread took it as its spare resource...
+            // ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
+            PLS_ASSERT(expected_field.is_empty(),
+                       "Must be empty, as otherwise no one will steal the 'spare traded task'.");
+
+            auto current_root = stolen_task->resource_stack_root_.load();
+            current_root.stamp++;
+            current_root.value = 0;
+            stolen_task->resource_stack_root_.store(current_root);
+          }
         }
-        // execute the stolen task by jumping to it's continuation.
+        // Execute the stolen task by jumping to its continuation.
         PLS_ASSERT(stolen_task->continuation_.valid(),
                    "A task that we can steal must have a valid continuation for us to start working.");
         context_switcher::switch_context(std::move(stolen_task->continuation_));
 
@@ -149,6 +148,10 @@ void scheduler::terminate() {
 
 thread_state &scheduler::thread_state_for(size_t id) { return memory_.thread_state_for(id); }
 
+void scheduler::sync() {
+  thread_state::get().get_task_manager().sync();
+}
+
 }
 }
 }
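// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): the stamped {stamp, value}
// root used by resource_stack_root_ / resource_stack_next_ in the following
// diff, reduced to a lock-free stack of array indices. Bumping the stamp on
// every update protects the CAS loops against ABA; storing "index + 1" (like
// "thread_id_ + 1" in the real code) keeps 0 free as the empty marker. All
// names here are invented for the example.
// ---------------------------------------------------------------------------
#include <atomic>
#include <cstdint>

struct node {
  uint32_t next = 0;  // successor encoded as index + 1, 0 marks the end of the list
};

node nodes[16];
std::atomic<uint64_t> stack_root{0};  // high 32 bits: stamp, low 32 bits: top as index + 1

void push(uint32_t index) {
  uint64_t current, target;
  do {
    current = stack_root.load();
    nodes[index].next = static_cast<uint32_t>(current);  // old top becomes our successor
    uint64_t stamp = (current >> 32u) + 1;               // bump the stamp on every update
    target = (stamp << 32u) | (index + 1);
  } while (!stack_root.compare_exchange_strong(current, target));
}

uint32_t pop() {  // returns index + 1 of the popped node, 0 if the stack was empty
  uint64_t current, target;
  uint32_t popped;
  do {
    current = stack_root.load();
    popped = static_cast<uint32_t>(current);
    if (popped == 0) {
      return 0;                                          // empty, nothing to pop
    }
    uint64_t stamp = (current >> 32u) + 1;
    target = (stamp << 32u) | nodes[popped - 1].next;    // successor becomes the new top
  } while (!stack_root.compare_exchange_strong(current, target));
  return popped;
}
// ---------------------------------------------------------------------------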
diff --git a/lib/pls/src/internal/scheduling/task_manager.cpp b/lib/pls/src/internal/scheduling/task_manager.cpp
index 30bfcce..f538ce7 100644
--- a/lib/pls/src/internal/scheduling/task_manager.cpp
+++ b/lib/pls/src/internal/scheduling/task_manager.cpp
@@ -32,27 +32,7 @@ static task *find_task(unsigned id, unsigned depth) {
   return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_this_thread_task(depth);
 }
 
-void task_manager::push_resource_on_task(task *target_task, task *spare_task_chain) {
-  data_structures::stamped_integer current_root;
-  data_structures::stamped_integer target_root;
-  do {
-    current_root = target_task->resource_stack_root_.load();
-    target_root.stamp = current_root.stamp + 1;
-    target_root.value = spare_task_chain->thread_id_ + 1;
-
-    if (current_root.value == 0) {
-      // Empty, simply push in with no successor
-      spare_task_chain->resource_stack_next_ = nullptr;
-    } else {
-      // Already an entry. Find it's corresponding task and set it as our successor.
-      auto *current_root_task = find_task(current_root.value, target_task->depth_);
-      spare_task_chain->resource_stack_next_ = current_root_task;
-    }
-
-  } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
-}
-
-task *task_manager::steal_task(task_manager &stealing_task_manager) {
+bool task_manager::steal_task(task_manager &stealing_task_manager) {
   PLS_ASSERT(stealing_task_manager.active_task_->depth_ == 0, "Must only steal with clean task chain.");
 
   auto peek = deque_.peek_top();
@@ -60,23 +40,65 @@
   auto target_top = std::get<1>(peek);
 
   if (optional_target_task) {
+    PLS_ASSERT(stealing_task_manager.check_task_chain(), "We are stealing, must not have a bad chain here!");
+
+    // search for the task we want to trade in
     task *target_task = *optional_target_task;
     task *traded_task = stealing_task_manager.active_task_;
     for (unsigned i = 0; i < target_task->depth_; i++) {
       traded_task = traded_task->next_;
     }
 
+    // keep a reference to the rest of the task chain that we keep
+    task *next_own_task = traded_task->next_;
+    // 'unchain' the traded tasks (to help us find bugs only)
+    traded_task->next_ = nullptr;
+
     auto optional_result_task = deque_.pop_top(traded_task, target_top);
     if (optional_result_task) {
-      return *optional_result_task;
+      PLS_ASSERT(*optional_result_task == target_task, "We must only steal the task that we peeked at!");
+      // the steal was a success, link the chain so we own the stolen part
+      target_task->next_ = next_own_task;
+      next_own_task->prev_ = target_task;
+      stealing_task_manager.set_active_task(target_task);
+
+      return true;
     } else {
-      return nullptr;
+      // the steal failed, reset our chain to its old, clean state (re-link what we have broken)
+      traded_task->next_ = next_own_task;
+
+      return false;
    }
   } else {
-    return nullptr;
+    return false;
   }
 }
 
+void task_manager::push_resource_on_task(task *target_task, task *spare_task_chain) {
+  PLS_ASSERT(check_task_chain_backward(spare_task_chain), "Must only push proper task chains.");
+  PLS_ASSERT(target_task->thread_id_ != spare_task_chain->thread_id_,
+             "Makes no sense to push a task onto itself, as it is not clean by definition.");
+  PLS_ASSERT(target_task->depth_ == spare_task_chain->depth_, "Must only push tasks with correct depth.");
+
+  data_structures::stamped_integer current_root;
+  data_structures::stamped_integer target_root;
+  do {
+    current_root = target_task->resource_stack_root_.load();
+    target_root.stamp = current_root.stamp + 1;
+    target_root.value = spare_task_chain->thread_id_ + 1;
+
+    if (current_root.value == 0) {
+      // Empty, simply push in with no successor
+      spare_task_chain->resource_stack_next_ = nullptr;
+    } else {
+      // Already an entry. Find its corresponding task and set it as our successor.
+      auto *current_root_task = find_task(current_root.value - 1, target_task->depth_);
+      spare_task_chain->resource_stack_next_ = current_root_task;
+    }
+
+  } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
+}
+
 task *task_manager::pop_resource_from_task(task *target_task) {
   data_structures::stamped_integer current_root;
   data_structures::stamped_integer target_root;
@@ -91,15 +113,122 @@
     } else {
       // Found something, try to pop it
       auto *current_root_task = find_task(current_root.value - 1, target_task->depth_);
-      target_root.value = current_root_task->next_ != nullptr ? current_root_task->next_->thread_id_ + 1 : 0;
+      auto *next_stack_task = current_root_task->resource_stack_next_;
+      target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0;
 
       output_task = current_root_task;
     }
   } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
 
+  PLS_ASSERT(check_task_chain_backward(output_task), "Must only pop proper task chains.");
   return output_task;
 }
 
+void task_manager::sync() {
+  auto continuation = active_task_->next_->run_as_task([this](context_switcher::continuation cont) {
+    auto *last_task = active_task_;
+    auto *this_task = active_task_->next_;
+
+    last_task->continuation_ = std::move(cont);
+    active_task_ = this_task;
+
+    context_switcher::continuation result_cont;
+    if (try_clean_return(result_cont)) {
+      // We return back to the main scheduling loop
+      active_task_->clean_ = true;
+      return result_cont;
+    } else {
+      // We finish up the last task
+      active_task_->clean_ = false;
+      return result_cont;
+    }
+  });
+
+  if (continuation.valid()) {
+    // We jumped in here from the main loop, keep track!
+    thread_state::get().set_main_continuation(std::move(continuation));
+  }
+}
+
+bool task_manager::try_clean_return(context_switcher::continuation &result_cont) {
+  task *this_task = active_task_;
+  task *last_task = active_task_->prev_;
+
+  if (last_task == nullptr) {
+    // We finished the final task of the computation, return to the scheduling loop.
+    result_cont = thread_state::get().get_main_continuation();
+    return true;
+  }
+
+  // Try to get a clean resource chain to go back to the main stealing loop
+  task *clean_chain = pop_resource_from_task(last_task);
+  if (clean_chain == nullptr) {
+    // double-check if we are really the last one or we only have unlucky timing
+    auto cas_field = last_task->traded_field_.load();
+    if (cas_field.is_filled_with_object()) {
+      traded_cas_field empty_target;
+      if (last_task->traded_field_.compare_exchange_strong(cas_field, empty_target)) {
+        clean_chain = cas_field.get_trade_object();
+      } else {
+        clean_chain = pop_resource_from_task(last_task);
+      }
+    }
+  }
+
+  if (clean_chain != nullptr) {
+    // We got a clean chain to continue working on.
+    PLS_ASSERT(last_task->depth_ == clean_chain->depth_,
+               "Resources must only reside in the correct depth!");
+    PLS_ASSERT(check_task_chain_backward(clean_chain), "Can only acquire clean chains for clean returns!");
+    this_task->prev_ = clean_chain;
+    clean_chain->next_ = this_task;
+    // Walk back chain to make first task active
+    active_task_ = clean_chain;
+    while (active_task_->prev_ != nullptr) {
+      active_task_ = active_task_->prev_;
+    }
+
+    PLS_ASSERT(check_task_chain(), "We just acquired a clean chain...");
+
+    // jump back to continuation in main scheduling loop, time to steal some work
+    result_cont = thread_state::get().get_main_continuation();
+    return true;
+  } else {
+    // We are the last one working on this task. Thus the sync must be finished, continue working.
+    active_task_ = last_task;
+
+    // Make sure that we are the owner of this full continuation/task chain.
+    active_task_->next_ = this_task;
+    this_task->prev_ = active_task_;
+
+    result_cont = std::move(last_task->continuation_);
+    return false;
+  }
+}
+
+bool task_manager::check_task_chain_forward(task *start_task) {
+  while (start_task->next_ != nullptr) {
+    PLS_ASSERT(start_task->next_->prev_ == start_task, "Chain must have correct prev/next fields for linked list!");
+    start_task = start_task->next_;
+  }
+  return true;
+}
+
+bool task_manager::check_task_chain_backward(task *start_task) {
+  while (start_task->prev_ != nullptr) {
+    PLS_ASSERT(start_task->prev_->next_ == start_task, "Chain must have correct prev/next fields for linked list!");
+    start_task = start_task->prev_;
+  }
+  return true;
+}
+
+bool task_manager::check_task_chain() {
+  check_task_chain_backward(active_task_);
+  check_task_chain_forward(active_task_);
+
+  return true;
+}
+
 }
 }
 }
--
libgit2 0.26.0