From 82ccdc910c7b934e8c9c5e98ac30374f13048e23 Mon Sep 17 00:00:00 2001
From: FritzFlorian
Date: Tue, 30 Jun 2020 14:18:09 +0200
Subject: [PATCH] Update: add spawn_and_sync to final version for benchmarks.

---
 app/benchmark_fft/main.cpp                                           |  3 +--
 app/benchmark_fib/main.cpp                                           |  9 ++-------
 lib/pls/include/pls/algorithms/invoke_impl.h                         |  6 ++----
 lib/pls/include/pls/internal/scheduling/scheduler_impl.h             |  9 +++++----
 lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp | 12 ++++++++++--
 lib/pls/src/internal/scheduling/scheduler.cpp                        | 13 ++++++++-----
 6 files changed, 28 insertions(+), 24 deletions(-)

diff --git a/app/benchmark_fft/main.cpp b/app/benchmark_fft/main.cpp
index 50f4419..03090d4 100644
--- a/app/benchmark_fft/main.cpp
+++ b/app/benchmark_fft/main.cpp
@@ -18,10 +18,9 @@ void pls_conquer(fft::complex_vector::iterator data, fft::complex_vector::iterat
     pls::spawn([data, n, swap_array]() {
       pls_conquer(data, swap_array, n / 2);
     });
-    pls::spawn([data, n, swap_array]() {
+    pls::spawn_and_sync([data, n, swap_array]() {
       pls_conquer(data + n / 2, swap_array + n / 2, n / 2);
     });
-    pls::sync();
   }
 
   fft::combine(data, n);
diff --git a/app/benchmark_fib/main.cpp b/app/benchmark_fib/main.cpp
index 6e3452b..d46ff8f 100644
--- a/app/benchmark_fib/main.cpp
+++ b/app/benchmark_fib/main.cpp
@@ -1,11 +1,7 @@
 #include "pls/pls.h"
 #include "benchmark_runner.h"
 
-#include "benchmark_base/fib.h"
 
-using namespace comparison_benchmarks::base;
-
-constexpr int MAX_NUM_TASKS = 32;
 constexpr int MAX_STACK_SIZE = 4096 * 1;
 
 int pls_fib(int n) {
@@ -20,10 +16,9 @@ int pls_fib(int n) {
   pls::spawn([n, &a]() {
     a = pls_fib(n - 1);
   });
-  pls::spawn([n, &b]() {
+  pls::spawn_and_sync([n, &b]() {
     b = pls_fib(n - 2);
   });
-  pls::sync();
 
   return a + b;
 }
@@ -35,7 +30,7 @@ int main(int argc, char **argv) {
   string full_directory = settings.output_directory_ + "/PLS_v3/";
   benchmark_runner runner{full_directory, test_name};
 
-  pls::scheduler scheduler{(unsigned) settings.num_threads_, MAX_NUM_TASKS, MAX_STACK_SIZE};
+  pls::scheduler scheduler{(unsigned) settings.num_threads_, settings.size_ + 2, MAX_STACK_SIZE};
 
   volatile int res;
   if (settings.type_ == benchmark_runner::benchmark_settings::ISOLATED) {
diff --git a/lib/pls/include/pls/algorithms/invoke_impl.h b/lib/pls/include/pls/algorithms/invoke_impl.h
index 83c4126..d1562ae 100644
--- a/lib/pls/include/pls/algorithms/invoke_impl.h
+++ b/lib/pls/include/pls/algorithms/invoke_impl.h
@@ -11,8 +11,7 @@ void invoke(Function1 &&function1, Function2 &&function2) {
   using namespace ::pls::internal::scheduling;
 
   scheduler::spawn(std::forward<Function1>(function1));
-  scheduler::spawn(std::forward<Function2>(function2));
-  scheduler::sync();
+  scheduler::spawn_and_sync(std::forward<Function2>(function2));
 }
 
 template<typename Function1, typename Function2, typename Function3>
@@ -21,8 +20,7 @@ void invoke(Function1 &&function1, Function2 &&function2, Function3 &&function3)
 
   scheduler::spawn(std::forward<Function1>(function1));
   scheduler::spawn(std::forward<Function2>(function2));
-  scheduler::spawn(std::forward<Function3>(function3));
-  scheduler::sync();
+  scheduler::spawn_and_sync(std::forward<Function3>(function3));
 }
 
 }
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index 8101c05..28ddf35 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -207,8 +207,8 @@ void scheduler::spawn_internal(Function &&lambda) {
   spawning_state.get_task_manager().push_local_task(last_task);
 
 #if PLS_SLEEP_WORKERS_ON_EMPTY
-  // TODO: relax atomic operations on empty flag
-  data_structures::stamped_integer queue_empty_flag = spawning_state.get_queue_empty_flag().load();
+  data_structures::stamped_integer
+      queue_empty_flag = spawning_state.get_queue_empty_flag().load(std::memory_order_relaxed);
   switch (queue_empty_flag.value_) {
     case EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY: {
       // The queue was not found empty, ignore it.
@@ -218,7 +218,8 @@ void scheduler::spawn_internal(Function &&lambda) {
       // Someone tries to mark us empty and might be re-stealing right now.
       data_structures::stamped_integer
          queue_non_empty_flag{queue_empty_flag.stamp_++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
-      auto actual_empty_flag = spawning_state.get_queue_empty_flag().exchange(queue_non_empty_flag);
+      auto actual_empty_flag =
+          spawning_state.get_queue_empty_flag().exchange(queue_non_empty_flag, std::memory_order_acq_rel);
       if (actual_empty_flag.value_ == EMPTY_QUEUE_STATE::QUEUE_EMPTY) {
         spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake();
       }
@@ -228,7 +229,7 @@ void scheduler::spawn_internal(Function &&lambda) {
      // Someone already marked the queue empty, we must revert its action on the central queue.
      data_structures::stamped_integer
          queue_non_empty_flag{queue_empty_flag.stamp_++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
-      spawning_state.get_queue_empty_flag().store(queue_non_empty_flag);
+      spawning_state.get_queue_empty_flag().store(queue_non_empty_flag, std::memory_order_release);
       spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake();
       break;
     }
diff --git a/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp b/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp
index 9bf2e4e..f8b21d7 100644
--- a/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp
+++ b/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp
@@ -114,8 +114,16 @@ task *external_trading_deque::pop_top(task *offered_task, peek_result peek_resul
   if (forwarding_stamp != expected_top.stamp_) {
     // ...we failed because the top tag lags behind...try to fix it.
     // This means only updating the tag, as this location can still hold data we need.
-    top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value_}, std::memory_order_relaxed);
-    return nullptr;
+    data_structures::stamped_integer forwarded_top{forwarding_stamp, expected_top.value_};
+    if (top_.compare_exchange_strong(expected_top,
+                                     forwarded_top,
+                                     std::memory_order_relaxed)) {
+      // We could update the top as needed, continue the pop request
+      expected_top = forwarded_top;
+    } else {
+      // We did not get to update the top tag, back off
+      return nullptr;
+    }
   }
 
   // Try to get it by CAS with the expected field entry, giving up our offered_task for it
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index 6596c6f..279b166 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -68,8 +68,8 @@ void scheduler::work_thread_work_section() {
     thread_state &target_state = my_state.get_scheduler().thread_state_for(target);
 #if PLS_SLEEP_WORKERS_ON_EMPTY
     queue_empty_flag_retry_steal:
-    // TODO: relax atomics for empty flag
-    data_structures::stamped_integer target_queue_empty_flag = target_state.get_queue_empty_flag().load();
+    data_structures::stamped_integer
+        target_queue_empty_flag = target_state.get_queue_empty_flag().load(std::memory_order_relaxed);
 #endif
 
     base_task *stolen_task;
@@ -93,7 +93,7 @@ void scheduler::work_thread_work_section() {
       strain_local_resource::acquire_locally(stolen_resources, my_state.get_thread_id());
 
       PLS_ASSERT_EXPENSIVE(check_task_chain_forward(*my_state.get_active_task()),
-                 "We are sole owner of this chain, it has to be valid!");
+                           "We are sole owner of this chain, it has to be valid!");
 
       // Execute the stolen task by jumping to it's continuation.
       PLS_ASSERT(stolen_task->continuation_.valid(),
@@ -125,7 +125,8 @@ void scheduler::work_thread_work_section() {
           data_structures::stamped_integer maybe_empty_flag{target_queue_empty_flag.stamp_ + 1,
                                                             EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY};
          if (target_state.get_queue_empty_flag().compare_exchange_strong(target_queue_empty_flag,
-                                                                          maybe_empty_flag)) {
+                                                                          maybe_empty_flag,
+                                                                          std::memory_order_acq_rel)) {
            goto queue_empty_flag_retry_steal;
          }
          break;
@@ -135,7 +136,9 @@ void scheduler::work_thread_work_section() {
          // We can safely mark it empty and increment the central counter.
          data_structures::stamped_integer empty_flag{target_queue_empty_flag.stamp_ + 1,
                                                      EMPTY_QUEUE_STATE::QUEUE_EMPTY};
-         if (target_state.get_queue_empty_flag().compare_exchange_strong(target_queue_empty_flag, empty_flag)) {
+         if (target_state.get_queue_empty_flag().compare_exchange_strong(target_queue_empty_flag,
+                                                                         empty_flag,
+                                                                         std::memory_order_acq_rel)) {
            // We marked it empty, now its our duty to modify the central counter
            my_state.get_scheduler().empty_queue_increase_counter();
          }
--
libgit2 0.26.0
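
Note on the relaxed/acq_rel orderings in the scheduler_impl.h and scheduler.cpp hunks: the per-worker empty flag is a stamped integer, so every transition bumps the stamp, the flag itself can be read with relaxed loads, and only the exchange/CAS that publishes a transition needs acquire-release ordering. The standalone sketch below illustrates that protocol; it is not the PLS source. The pack/stamp_of/state_of helpers and the two free functions are invented stand-ins for data_structures::stamped_integer and the thread_state/scheduler plumbing, and the owner path is simplified (it folds the QUEUE_MAYBE_EMPTY and QUEUE_EMPTY branches of spawn_internal into one exchange).

```cpp
#include <atomic>
#include <cstdint>

// Three states of the per-worker empty flag, as used in the patch.
enum EmptyState : std::uint32_t { QUEUE_NON_EMPTY = 0, QUEUE_MAYBE_EMPTY = 1, QUEUE_EMPTY = 2 };

// Pack a 32 bit stamp and a 32 bit state into one lock-free 64 bit word,
// standing in for pls' data_structures::stamped_integer.
constexpr std::uint64_t pack(std::uint32_t stamp, std::uint32_t state) {
  return (static_cast<std::uint64_t>(stamp) << 32u) | state;
}
constexpr std::uint32_t stamp_of(std::uint64_t flag) { return static_cast<std::uint32_t>(flag >> 32u); }
constexpr std::uint32_t state_of(std::uint64_t flag) { return static_cast<std::uint32_t>(flag); }

std::atomic<std::uint64_t> queue_empty_flag{pack(0, QUEUE_NON_EMPTY)};

// Thief side (cf. work_thread_work_section): after a failed steal, advance
// NON_EMPTY -> MAYBE_EMPTY (then retry the steal) or MAYBE_EMPTY -> EMPTY.
// Returns true when the caller just marked the queue EMPTY and therefore
// has to bump the central sleep counter.
bool mark_queue_maybe_empty_or_empty() {
  std::uint64_t observed = queue_empty_flag.load(std::memory_order_relaxed);
  switch (state_of(observed)) {
    case QUEUE_NON_EMPTY: {
      std::uint64_t maybe_empty = pack(stamp_of(observed) + 1, QUEUE_MAYBE_EMPTY);
      queue_empty_flag.compare_exchange_strong(observed, maybe_empty, std::memory_order_acq_rel);
      return false;  // not EMPTY yet, the caller retries the steal first
    }
    case QUEUE_MAYBE_EMPTY: {
      std::uint64_t empty = pack(stamp_of(observed) + 1, QUEUE_EMPTY);
      return queue_empty_flag.compare_exchange_strong(observed, empty, std::memory_order_acq_rel);
    }
    default:
      return false;  // already EMPTY, someone else owns the counter update
  }
}

// Owner side (cf. spawn_internal): after pushing new work, revert any
// (MAYBE_)EMPTY marking. Returns true when sleepers must be woken up again.
bool revert_empty_flag_on_spawn() {
  std::uint64_t observed = queue_empty_flag.load(std::memory_order_relaxed);
  if (state_of(observed) == QUEUE_NON_EMPTY) {
    return false;  // nobody marked us empty, nothing to do
  }
  std::uint64_t non_empty = pack(stamp_of(observed) + 1, QUEUE_NON_EMPTY);
  std::uint64_t previous = queue_empty_flag.exchange(non_empty, std::memory_order_acq_rel);
  return state_of(previous) == QUEUE_EMPTY;  // EMPTY was published -> decrease counter and wake
}
```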
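The external_trading_deque.cpp hunk changes what a thief does when the top tag lags behind: a successful tag-forwarding CAS now lets the pop continue with the corrected top, and only a failed CAS backs off. Below is a minimal sketch of just that decision, again with invented stand-ins (StampedTop, pack, forward_top_stamp) rather than the real stamped_integer and deque members.

```cpp
#include <atomic>
#include <cstdint>
#include <optional>

struct StampedTop {
  std::uint32_t stamp;
  std::uint32_t index;
};

constexpr std::uint64_t pack(StampedTop top) {
  return (static_cast<std::uint64_t>(top.stamp) << 32u) | top.index;
}

// Returns the top value the pop should continue with, or std::nullopt if the
// caller lost the race and has to back off (mirrors the patched pop_top logic).
std::optional<StampedTop> forward_top_stamp(std::atomic<std::uint64_t> &top,
                                            StampedTop expected_top,
                                            std::uint32_t forwarding_stamp) {
  if (forwarding_stamp == expected_top.stamp) {
    return expected_top;  // stamp already up to date, nothing to fix
  }
  // Only the stamp lags behind; the index may still reference live data,
  // so forward the stamp and keep the index unchanged.
  std::uint64_t expected = pack(expected_top);
  StampedTop forwarded{forwarding_stamp, expected_top.index};
  if (top.compare_exchange_strong(expected, pack(forwarded), std::memory_order_relaxed)) {
    return forwarded;  // we fixed the tag ourselves: continue the pop request
  }
  return std::nullopt;  // someone else modified top concurrently: back off
}
```

In the patched pop_top, the continued path then attempts the actual CAS that trades offered_task for the popped entry, as the context line following the hunk shows.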