diff --git a/app/benchmark_fib/main.cpp b/app/benchmark_fib/main.cpp
index 63581f0..7e3976b 100644
--- a/app/benchmark_fib/main.cpp
+++ b/app/benchmark_fib/main.cpp
@@ -12,7 +12,10 @@ using namespace pls::internal::scheduling;
 using namespace comparison_benchmarks::base;
 
 int pls_fib(int n) {
-  if (n <= 1) {
+  if (n == 0) {
+    return 0;
+  }
+  if (n == 1) {
     return 1;
   }
 
@@ -23,14 +26,18 @@ int pls_fib(int n) {
   scheduler::spawn([n, &b]() {
     b = pls_fib(n - 2);
   });
-
+  scheduler::sync();
   return a + b;
 }
 
-constexpr int MAX_NUM_THREADS = 8;
+constexpr int MAX_NUM_THREADS = 4;
 constexpr int MAX_NUM_TASKS = 64;
-constexpr int MAX_STACK_SIZE = 4096;
+constexpr int MAX_STACK_SIZE = 4096 * 8;
+
+static_scheduler_memory<MAX_NUM_THREADS, MAX_NUM_TASKS, MAX_STACK_SIZE> global_scheduler_memory;
 
 int main(int argc, char **argv) {
   int num_threads;
@@ -41,26 +48,22 @@ int main(int argc, char **argv) {
   string full_directory = directory + "/PLS_v3/";
   benchmark_runner runner{full_directory, test_name};
 
-  static_scheduler_memory<MAX_NUM_THREADS, MAX_NUM_TASKS, MAX_STACK_SIZE> static_scheduler_memory;
-
-  scheduler scheduler{static_scheduler_memory, (unsigned) num_threads};
+  scheduler scheduler{global_scheduler_memory, (unsigned) num_threads};
 
   volatile int res;
-  for (int i = 0; i < fib::NUM_WARMUP_ITERATIONS; i++) {
-    scheduler.perform_work([&]() {
+  scheduler.perform_work([&]() {
+    for (int i = 0; i < fib::NUM_WARMUP_ITERATIONS; i++) {
       res = pls_fib(fib::INPUT_N);
-    });
-  }
+    }
+  });
 
-  for (int i = 0; i < fib::NUM_ITERATIONS; i++) {
-    scheduler.perform_work([&]() {
+  scheduler.perform_work([&]() {
+    for (int i = 0; i < fib::NUM_ITERATIONS; i++) {
       runner.start_iteration();
       res = pls_fib(fib::INPUT_N);
       runner.end_iteration();
-    });
-  }
+    }
+  });
 
   runner.commit_results(true);
   return 0;
diff --git a/cmake/SetupThreadSanitizer.cmake b/cmake/SetupThreadSanitizer.cmake
index 4a5debd..e833aa0 100644
--- a/cmake/SetupThreadSanitizer.cmake
+++ b/cmake/SetupThreadSanitizer.cmake
@@ -7,4 +7,4 @@ if(THREAD_SANITIZER)
     add_compile_options(-fsanitize=thread -g)
     set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread")
 endif()
-message("-- Thread Sanitizer: ${THREAD_SANITIZER}")
\ No newline at end of file
+message("-- Thread Sanitizer: ${THREAD_SANITIZER}")
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h
index 1fb55ac..5cb8f81 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler.h
@@ -55,6 +55,7 @@ class scheduler {
   template<typename Function>
   static void spawn(Function &&lambda);
 
+  static void sync();
 
   thread_state &thread_state_for(size_t id);
 
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index 0064006..2d6d347 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -25,7 +25,9 @@ class scheduler::init_function_impl : public init_function {
  public:
  explicit init_function_impl(F &function) : function_{function} {}
  void run() override {
-    thread_state::get().get_task_manager().get_active_task().run_as_task([&](context_switcher::continuation cont) {
+    auto &root_task = thread_state::get().get_task_manager().get_active_task();
+    root_task.clean_ = true;
+    root_task.run_as_task([&](context_switcher::continuation cont) {
      thread_state::get().set_main_continuation(std::move(cont));
      function_();
      thread_state::get().get_scheduler().work_section_done_.store(true);
diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h
index 5664046..4dc5881 100644
--- a/lib/pls/include/pls/internal/scheduling/task.h
+++ b/lib/pls/include/pls/internal/scheduling/task.h
@@ -58,6 +58,7 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task {
   std::atomic<traded_cas_field> traded_field_{};
   task *resource_stack_next_{};
   std::atomic<data_structures::stamped_integer> resource_stack_root_{{0, 0}};
+  bool clean_;
 
   // Task Tree (we have a parent that we want to continue when we finish)
   task *parent_task_;
diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h
index e001f44..b4de8d7 100644
--- a/lib/pls/include/pls/internal/scheduling/task_manager.h
+++ b/lib/pls/include/pls/internal/scheduling/task_manager.h
@@ -49,8 +49,15 @@ class task_manager {
   template<typename F>
   void spawn_child(F &&lambda);
+  void sync();
 
-  task *steal_task(task_manager &stealing_task_manager);
+  bool steal_task(task_manager &stealing_task_manager);
+
+  bool try_clean_return(context_switcher::continuation &result_cont);
+
+  bool check_task_chain_forward(task *start_task);
+  bool check_task_chain_backward(task *start_task);
+  bool check_task_chain();
 
  private:
   size_t num_tasks_;
diff --git a/lib/pls/include/pls/internal/scheduling/task_manager_impl.h b/lib/pls/include/pls/internal/scheduling/task_manager_impl.h
index 15811f2..84ce60e 100644
--- a/lib/pls/include/pls/internal/scheduling/task_manager_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/task_manager_impl.h
@@ -18,69 +18,42 @@ namespace scheduling {
 
 template<typename F>
 void task_manager::spawn_child(F &&lambda) {
-  auto continuation = active_task_->next_->run_as_task([lambda, this](context_switcher::continuation cont) {
-    auto *last_task = active_task_;
-    auto *this_task = active_task_->next_;
-
-    last_task->continuation_ = std::move(cont);
-    active_task_ = this_task;
-
-    traded_cas_field expected_cas_value = deque_.push_bot(last_task);
-    traded_cas_field empty_cas;
-
-    lambda();
-
-    if (last_task->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) {
-      // Fast path, simply continue execution where we left of before spawn.
-      // This requires no coordination with the resource stack.
-      active_task_ = last_task;
-      deque_.popped_bot();
-      return std::move(last_task->continuation_);
-    } else {
-      // Slow path, the continuation was stolen.
-      // First empty our own deque (everything below must have been stolen already).
-      deque_.empty_deque();
-
-      // TODO: This whole process can be taken out into a function, no need to copy it in every lambda.
-      //  Also split smaller functions out, this is a mess right now...
-      // Try to get a clean resource chain to go back to the main stealing loop
-      task *clean_chain = pop_resource_from_task(last_task);
-      if (clean_chain == nullptr) {
-        // double-check if we are really last one or we only have unlucky timing
-        auto cas_field = last_task->traded_field_.load();
-        if (cas_field.is_filled_with_object()) {
-          traded_cas_field empty_target;
-          if (last_task->traded_field_.compare_exchange_strong(cas_field, empty_target)) {
-            clean_chain = cas_field.get_trade_object();
+  auto *spawning_task_manager = this;
+  auto continuation =
+      active_task_->next_->run_as_task([lambda, spawning_task_manager](context_switcher::continuation cont) {
+        auto *last_task = spawning_task_manager->active_task_;
+        auto *this_task = spawning_task_manager->active_task_->next_;
+
+        last_task->continuation_ = std::move(cont);
+        spawning_task_manager->active_task_ = this_task;
+
+        traded_cas_field expected_cas_value = spawning_task_manager->deque_.push_bot(last_task);
+        traded_cas_field empty_cas;
+
+        lambda();
+        auto *syncing_task_manager = &thread_state::get().get_task_manager();
+
+        if (last_task->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) {
+          // Fast path, simply continue execution where we left of before spawn.
+          // This requires no coordination with the resource stack.
+          syncing_task_manager->active_task_ = last_task;
+          syncing_task_manager->deque_.popped_bot();
+          return std::move(last_task->continuation_);
+        } else {
+          // Slow path, the continuation was stolen.
+          // First empty our own deque (everything below must have been stolen already).
+          syncing_task_manager->deque_.empty_deque();
+
+          context_switcher::continuation result_cont;
+          if (syncing_task_manager->try_clean_return(result_cont)) {
+            // We return back to the main scheduling loop
+            return result_cont;
           } else {
-            clean_chain = pop_resource_from_task(last_task);
+            // We finish up the last task and are the sole owner again
+            return result_cont;
           }
         }
-      }
-
-      if (clean_chain != nullptr) {
-        // We got a clean chain to continue working on.
-        PLS_ASSERT(this_task->prev_->depth_ == clean_chain->depth_,
-                   "Resources must only reside in the correct depth!");
-        this_task->prev_ = clean_chain;
-        clean_chain->next_ = this_task;
-        // Walk back chain to make first task active
-        active_task_ = clean_chain;
-        while (active_task_->prev_ != nullptr) {
-          active_task_ = active_task_->prev_;
-        }
-
-        // jump back to continuation in main scheduling loop, time to steal some work
-        return std::move(thread_state::get().get_main_continuation());
-      } else {
-        // We are the last one working on this task. Thus the sync must be finished, continue working.
-        active_task_ = last_task;
-        return std::move(last_task->continuation_);
-      }
-    }
-
-    PLS_ERROR("Slow Path/Stealing not implemented!");
-  });
+      });
 
   if (continuation.valid()) {
     // We jumped in here from the main loop, keep track!
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index de91dd4..ac85e6e 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -70,6 +70,8 @@ void scheduler::work_thread_work_section() {
   }
 
   while (!work_section_done_) {
+    PLS_ASSERT(my_task_manager.check_task_chain(), "Must start stealing with a clean task chain.");
+
     // Steal Routine (will be continuously executed when there are no more fall through's).
     // TODO: move into separate function
     const size_t offset = my_state.get_rand() % num_threads;
@@ -78,45 +80,42 @@ void scheduler::work_thread_work_section() {
       // Perform steal
       size_t target = (offset + i) % num_threads;
       auto &target_state = my_state.get_scheduler().thread_state_for(target);
-      auto *stolen_task = target_state.get_task_manager().steal_task(my_task_manager);
-
-      // Handle successful steal
-      if (stolen_task != nullptr) {
-        // Adapt our task chain
-        // Note: This differs from how it worked before. The aquiring of new chains happens
-        // right at the steal. Whenever we start to work on an continuation we aquire the full
-        // 'dirty' chain below it. We fix this up at the sync points later on by popping of the resource stack.
-        auto *exchanged_task = &my_task_manager.get_active_task();
-        for (unsigned j = 0; j < stolen_task->depth_; j++) {
-          exchanged_task = exchanged_task->next_;
-        }
-        auto *next_own_task = exchanged_task->next_;
-
-        next_own_task->prev_ = stolen_task;
-        stolen_task->next_ = next_own_task;
-
-        my_task_manager.set_active_task(stolen_task);
-
-        // move the traded in resource of this active task over to the stack of resources.
-        my_task_manager.push_resource_on_task(stolen_task, exchanged_task);
-        traded_cas_field empty_field;
-        traded_cas_field expected_field;
-        expected_field.fill_with_trade_object(exchanged_task);
-        if (stolen_task->traded_field_.compare_exchange_strong(expected_field, empty_field)) {
-          // All good, nothing more to do
-        } else {
-          // The last other active thread took it as its spare resource...
-          // ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
-          PLS_ASSERT(expected_field.is_empty(),
-                     "Must be empty, as otherwise no one will steal the 'spare traded task'.");
-
-          auto current_root = stolen_task->resource_stack_root_.load();
-          current_root.stamp++;
-          current_root.value = 0;
-          stolen_task->resource_stack_root_.store(current_root);
+      bool steal_success = target_state.get_task_manager().steal_task(my_task_manager);
+
+      if (steal_success) {
+        // The stealing procedure correctly changed our chain and active task.
+        // Now we need to perform the 'post steal' actions (manage resources and execute the stolen task).
+        PLS_ASSERT(my_task_manager.check_task_chain_forward(&my_task_manager.get_active_task()),
+                   "We are sole owner of this chain, it has to be valid!");
+
+        // Move the traded in resource of this active task over to the stack of resources.
+        auto *stolen_task = &my_task_manager.get_active_task();
+        traded_cas_field stolen_task_cas = stolen_task->traded_field_.load();
+
+        if (stolen_task_cas.is_filled_with_object()) {
+          // Push the traded in resource on the resource stack to clear the traded_field for later steals/spawns.
+          auto *exchanged_task = stolen_task_cas.get_trade_object();
+
+          my_task_manager.push_resource_on_task(stolen_task, exchanged_task);
+          traded_cas_field empty_field;
+          traded_cas_field expected_field;
+          expected_field.fill_with_trade_object(exchanged_task);
+          if (stolen_task->traded_field_.compare_exchange_strong(expected_field, empty_field)) {
+            // All good, nothing more to do
+          } else {
+            // The last other active thread took it as its spare resource...
+            // ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
+            PLS_ASSERT(expected_field.is_empty(),
+                       "Must be empty, as otherwise no one will steal the 'spare traded task'.");
+
+            auto current_root = stolen_task->resource_stack_root_.load();
+            current_root.stamp++;
+            current_root.value = 0;
+            stolen_task->resource_stack_root_.store(current_root);
+          }
         }
 
-        // execute the stolen task by jumping to it's continuation.
+        // Execute the stolen task by jumping to it's continuation.
         PLS_ASSERT(stolen_task->continuation_.valid(),
                    "A task that we can steal must have a valid continuation for us to start working.");
         context_switcher::switch_context(std::move(stolen_task->continuation_));
@@ -149,6 +148,10 @@ void scheduler::terminate() {
 
 thread_state &scheduler::thread_state_for(size_t id) { return memory_.thread_state_for(id); }
 
+void scheduler::sync() {
+  thread_state::get().get_task_manager().sync();
+}
+
 }
 }
 }
diff --git a/lib/pls/src/internal/scheduling/task_manager.cpp b/lib/pls/src/internal/scheduling/task_manager.cpp
index 30bfcce..f538ce7 100644
--- a/lib/pls/src/internal/scheduling/task_manager.cpp
+++ b/lib/pls/src/internal/scheduling/task_manager.cpp
@@ -32,27 +32,7 @@ static task *find_task(unsigned id, unsigned depth) {
   return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_this_thread_task(depth);
 }
 
-void task_manager::push_resource_on_task(task *target_task, task *spare_task_chain) {
-  data_structures::stamped_integer current_root;
-  data_structures::stamped_integer target_root;
-  do {
-    current_root = target_task->resource_stack_root_.load();
-    target_root.stamp = current_root.stamp + 1;
-    target_root.value = spare_task_chain->thread_id_ + 1;
-
-    if (current_root.value == 0) {
-      // Empty, simply push in with no successor
-      spare_task_chain->resource_stack_next_ = nullptr;
-    } else {
-      // Already an entry. Find it's corresponding task and set it as our successor.
-      auto *current_root_task = find_task(current_root.value, target_task->depth_);
-      spare_task_chain->resource_stack_next_ = current_root_task;
-    }
-
-  } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
-}
-
-task *task_manager::steal_task(task_manager &stealing_task_manager) {
+bool task_manager::steal_task(task_manager &stealing_task_manager) {
   PLS_ASSERT(stealing_task_manager.active_task_->depth_ == 0, "Must only steal with clean task chain.");
 
   auto peek = deque_.peek_top();
@@ -60,23 +40,65 @@ task *task_manager::steal_task(task_manager &stealing_task_manager) {
   auto target_top = std::get<1>(peek);
 
   if (optional_target_task) {
+    PLS_ASSERT(stealing_task_manager.check_task_chain(), "We are stealing, must not have a bad chain here!");
+
+    // search for the task we want to trade in
     task *target_task = *optional_target_task;
     task *traded_task = stealing_task_manager.active_task_;
     for (unsigned i = 0; i < target_task->depth_; i++) {
       traded_task = traded_task->next_;
     }
 
+    // keep a reference to the rest of the task chain that we keep
+    task *next_own_task = traded_task->next_;
+    // 'unchain' the traded tasks (to help us find bugs only)
+    traded_task->next_ = nullptr;
+
     auto optional_result_task = deque_.pop_top(traded_task, target_top);
     if (optional_result_task) {
-      return *optional_result_task;
+      PLS_ASSERT(*optional_result_task == target_task, "We must only steal the task that we peeked at!");
+      // the steal was a success, link the chain so we own the stolen part
+      target_task->next_ = next_own_task;
+      next_own_task->prev_ = target_task;
+      stealing_task_manager.set_active_task(target_task);
+
+      return true;
     } else {
-      return nullptr;
+      // the steal failed, reset our chain to its old, clean state (re-link what we have broken)
+      traded_task->next_ = next_own_task;
+
+      return false;
     }
   } else {
-    return nullptr;
+    return false;
   }
 }
 
+void task_manager::push_resource_on_task(task *target_task, task *spare_task_chain) {
+  PLS_ASSERT(check_task_chain_backward(spare_task_chain), "Must only push proper task chains.");
+  PLS_ASSERT(target_task->thread_id_ != spare_task_chain->thread_id_,
+             "Makes no sense to push task onto itself, as it is not clean by definition.");
+  PLS_ASSERT(target_task->depth_ == spare_task_chain->depth_, "Must only push tasks with correct depth.");
+
+  data_structures::stamped_integer current_root;
+  data_structures::stamped_integer target_root;
+  do {
+    current_root = target_task->resource_stack_root_.load();
+    target_root.stamp = current_root.stamp + 1;
+    target_root.value = spare_task_chain->thread_id_ + 1;
+
+    if (current_root.value == 0) {
+      // Empty, simply push in with no successor
+      spare_task_chain->resource_stack_next_ = nullptr;
+    } else {
+      // Already an entry. Find it's corresponding task and set it as our successor.
+      auto *current_root_task = find_task(current_root.value - 1, target_task->depth_);
+      spare_task_chain->resource_stack_next_ = current_root_task;
+    }
+
+  } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
+}
+
 task *task_manager::pop_resource_from_task(task *target_task) {
   data_structures::stamped_integer current_root;
   data_structures::stamped_integer target_root;
@@ -91,15 +113,122 @@ task *task_manager::pop_resource_from_task(task *target_task) {
     } else {
       // Found something, try to pop it
       auto *current_root_task = find_task(current_root.value - 1, target_task->depth_);
-      target_root.value = current_root_task->next_ != nullptr ? current_root_task->next_->thread_id_ + 1 : 0;
+      auto *next_stack_task = current_root_task->resource_stack_next_;
+      target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0;
 
       output_task = current_root_task;
     }
   } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
 
+  PLS_ASSERT(check_task_chain_backward(output_task), "Must only pop proper task chains.");
   return output_task;
 }
 
+void task_manager::sync() {
+  auto continuation = active_task_->next_->run_as_task([this](context_switcher::continuation cont) {
+    auto *last_task = active_task_;
+    auto *this_task = active_task_->next_;
+
+    last_task->continuation_ = std::move(cont);
+    active_task_ = this_task;
+
+    context_switcher::continuation result_cont;
+    if (try_clean_return(result_cont)) {
+      // We return back to the main scheduling loop
+      active_task_->clean_ = true;
+      return result_cont;
+    } else {
+      // We finish up the last task
+      active_task_->clean_ = false;
+      return result_cont;
+    }
+  });
+
+  if (continuation.valid()) {
+    // We jumped in here from the main loop, keep track!
+    thread_state::get().set_main_continuation(std::move(continuation));
+  }
+}
+
+bool task_manager::try_clean_return(context_switcher::continuation &result_cont) {
+  task *this_task = active_task_;
+  task *last_task = active_task_->prev_;
+
+  if (last_task == nullptr) {
+    // We finished the final task of the computation, return to the scheduling loop.
+    result_cont = thread_state::get().get_main_continuation();
+    return true;
+  }
+
+  // Try to get a clean resource chain to go back to the main stealing loop
+  task *clean_chain = pop_resource_from_task(last_task);
+  if (clean_chain == nullptr) {
+    // double-check if we are really last one or we only have unlucky timing
+    auto cas_field = last_task->traded_field_.load();
+    if (cas_field.is_filled_with_object()) {
+      traded_cas_field empty_target;
+      if (last_task->traded_field_.compare_exchange_strong(cas_field, empty_target)) {
+        clean_chain = cas_field.get_trade_object();
+      } else {
+        clean_chain = pop_resource_from_task(last_task);
+      }
+    }
+  }
+
+  if (clean_chain != nullptr) {
+    // We got a clean chain to continue working on.
+    PLS_ASSERT(last_task->depth_ == clean_chain->depth_,
+               "Resources must only reside in the correct depth!");
+    PLS_ASSERT(check_task_chain_backward(clean_chain), "Can only aquire clean chains for clean returns!");
+    this_task->prev_ = clean_chain;
+    clean_chain->next_ = this_task;
+    // Walk back chain to make first task active
+    active_task_ = clean_chain;
+    while (active_task_->prev_ != nullptr) {
+      active_task_ = active_task_->prev_;
+    }
+
+    PLS_ASSERT(check_task_chain(), "We just aquired a clean chain...");
+
+    // jump back to continuation in main scheduling loop, time to steal some work
+    result_cont = thread_state::get().get_main_continuation();
+    return true;
+  } else {
+    // We are the last one working on this task. Thus the sync must be finished, continue working.
+    active_task_ = last_task;
+
+    // Make sure that we are owner fo this full continuation/task chain.
+    active_task_->next_ = this_task;
+    this_task->prev_ = active_task_;
+
+    result_cont = std::move(last_task->continuation_);
+    return false;
+  }
+}
+
+bool task_manager::check_task_chain_forward(task *start_task) {
+  while (start_task->next_ != nullptr) {
+    PLS_ASSERT(start_task->next_->prev_ == start_task, "Chain must have correct prev/next fields for linked list!");
+    start_task = start_task->next_;
+  }
+  return true;
+}
+
+bool task_manager::check_task_chain_backward(task *start_task) {
+  while (start_task->prev_ != nullptr) {
+    PLS_ASSERT(start_task->prev_->next_ == start_task, "Chain must have correct prev/next fields for linked list!");
+    start_task = start_task->prev_;
+  }
+  return true;
+}
+
+bool task_manager::check_task_chain() {
+  check_task_chain_backward(active_task_);
+  check_task_chain_forward(active_task_);
+
+  return true;
+}
+
 }
 }
 }