From a26720bd1b0bebfeefb6528ae88e3047ac4306a5 Mon Sep 17 00:00:00 2001
From: FritzFlorian
Date: Tue, 26 May 2020 13:23:31 +0200
Subject: [PATCH] Add first, experimental version of 'sleep on empty flag'.

---
 lib/pls/CMakeLists.txt                                    |  2 +-
 lib/pls/include/pls/internal/base/futex_wrapper.h         | 28 ++++++++++++++++++++++++++++
 lib/pls/include/pls/internal/build_flavour.h              |  7 +++++++
 lib/pls/include/pls/internal/scheduling/scheduler.h       | 11 +++++++++++
 lib/pls/include/pls/internal/scheduling/scheduler_impl.h  | 40 ++++++++++++++++++++++++++++++++++++++++
 lib/pls/include/pls/internal/scheduling/thread_state.h    | 21 +++++++++++++++++++++
 lib/pls/src/internal/scheduling/scheduler.cpp             | 81 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
 7 files changed, 185 insertions(+), 5 deletions(-)
 create mode 100644 lib/pls/include/pls/internal/base/futex_wrapper.h
 create mode 100644 lib/pls/include/pls/internal/build_flavour.h

diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt
index f100c1c..9124753 100644
--- a/lib/pls/CMakeLists.txt
+++ b/lib/pls/CMakeLists.txt
@@ -43,7 +43,7 @@ add_library(pls STATIC
         include/pls/internal/profiling/dag_node.h src/internal/profiling/dag_node.cpp
         include/pls/internal/profiling/profiler.h
-        include/pls/internal/profiling/thread_stats.h src/internal/profiling/thread_stats.cpp src/internal/profiling/profiler.cpp)
+        include/pls/internal/profiling/thread_stats.h src/internal/profiling/thread_stats.cpp src/internal/profiling/profiler.cpp include/pls/internal/build_flavour.h include/pls/internal/base/futex_wrapper.h)
 
 # Dependencies for pls
 target_link_libraries(pls Threads::Threads)
diff --git a/lib/pls/include/pls/internal/base/futex_wrapper.h b/lib/pls/include/pls/internal/base/futex_wrapper.h
new file mode 100644
index 0000000..e07f2c0
--- /dev/null
+++ b/lib/pls/include/pls/internal/base/futex_wrapper.h
@@ -0,0 +1,28 @@
+
+#ifndef PLS_INTERNAL_BASE_FUTEX_WRAPPER_H_
+#define PLS_INTERNAL_BASE_FUTEX_WRAPPER_H_
+
+#include <cstdint>
+#include <ctime>
+#include <unistd.h>
+#include <sys/syscall.h>
+#include <sys/time.h>
+#include <linux/futex.h>
+
+namespace pls::internal::base {
+
+int futex(int *uaddr, int futex_op, int val, const struct timespec *timeout, int *uaddr2, int val3) {
+  return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3);
+}
+
+int futex_wait(int32_t *uaddr, int32_t val) {
+  return futex(uaddr, FUTEX_WAIT_PRIVATE, val, nullptr, nullptr, 0);
+}
+
+int futex_wakeup(int32_t *uaddr, int num_waked) {
+  return futex(uaddr, FUTEX_WAKE_PRIVATE, num_waked, nullptr, nullptr, 0);
+}
+
+}
+
+#endif //PLS_INTERNAL_BASE_FUTEX_WRAPPER_H_
diff --git a/lib/pls/include/pls/internal/build_flavour.h b/lib/pls/include/pls/internal/build_flavour.h
new file mode 100644
index 0000000..c0c7294
--- /dev/null
+++ b/lib/pls/include/pls/internal/build_flavour.h
@@ -0,0 +1,7 @@
+
+#ifndef PLS_INTERNAL_BUILD_FLAVOUR_H_
+#define PLS_INTERNAL_BUILD_FLAVOUR_H_
+
+#define PLS_SLEEP_WORKERS_ON_EMPTY true
+
+#endif //PLS_INTERNAL_BUILD_FLAVOUR_H_
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h
index b9affc5..7b39276 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler.h
@@ -7,6 +7,8 @@
 #include
 #include
 
+#include "pls/internal/build_flavour.h"
+
 #include "pls/internal/base/barrier.h"
 #include "pls/internal/base/stack_allocator.h"
@@ -130,6 +132,15 @@ class scheduler {
 
   std::shared_ptr<base::stack_allocator> stack_allocator_;
 
+#if PLS_SLEEP_WORKERS_ON_EMPTY
+  PLS_CACHE_ALIGN std::atomic<int32_t> empty_queue_counter_{0};
+  PLS_CACHE_ALIGN std::atomic<int32_t> threads_sleeping_{0};
+
+  void empty_queue_try_sleep_worker();
+  void empty_queue_increase_counter();
+  void empty_queue_decrease_counter_and_wake();
+#endif
+
 #if PLS_PROFILING_ENABLED
   profiling::profiler profiler_;
 #endif
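Side note on the futex pair introduced above: FUTEX_WAIT_PRIVATE only blocks while the watched 32-bit word still holds the expected value (a stale expectation makes the call return immediately), and FUTEX_WAKE_PRIVATE wakes up to the requested number of waiters after the word has been changed. A minimal, self-contained sketch of that usage pattern, assuming Linux; the names below (word, waiter) are illustrative and not part of the patch:

    #include <atomic>
    #include <cstdint>
    #include <thread>
    #include <unistd.h>
    #include <sys/syscall.h>
    #include <linux/futex.h>

    // Same syscall the wrapper above issues.
    static long futex(int32_t *uaddr, int op, int32_t val) {
      return syscall(SYS_futex, uaddr, op, val, nullptr, nullptr, 0);
    }

    static std::atomic<int32_t> word{0};

    int main() {
      std::thread waiter([] {
        // Blocks only while 'word' still reads 0; if the store below already
        // happened, the kernel returns immediately instead of sleeping.
        futex(reinterpret_cast<int32_t *>(&word), FUTEX_WAIT_PRIVATE, 0);
      });
      word.store(1);                                                    // publish the change first
      futex(reinterpret_cast<int32_t *>(&word), FUTEX_WAKE_PRIVATE, 1); // then wake one waiter
      waiter.join();
      return 0;
    }

This expected-value re-check inside the kernel is what empty_queue_try_sleep_worker relies on further down: a worker can only fall asleep while the counter still matches the value it observed.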
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index 55bb8e3..71f6166 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -86,6 +86,11 @@ class scheduler::init_function_impl : public init_function {
     thread_state::get().get_scheduler().work_section_done_.store(true);
     PLS_ASSERT(thread_state::get().main_continuation().valid(), "Must return valid continuation from main task.");
 
+#if PLS_SLEEP_WORKERS_ON_EMPTY
+    thread_state::get().get_scheduler().empty_queue_counter_.store(-1000);
+    thread_state::get().get_scheduler().empty_queue_decrease_counter_and_wake();
+#endif
+
 #if PLS_PROFILING_ENABLED
     thread_state::get().get_scheduler().profiler_.task_finish_stack_measure(thread_state::get().get_thread_id(),
                                                                             root_task->stack_memory_,
@@ -113,6 +118,13 @@ void scheduler::perform_work(Function work_section) {
   auto *root_node = profiler_.start_profiler_run();
   root_task->profiling_node_ = root_node;
 #endif
+#if PLS_SLEEP_WORKERS_ON_EMPTY
+  empty_queue_counter_.store(0);
+  for (auto &thread_state : thread_states_) {
+    thread_state->get_queue_empty_flag().store({EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY});
+  }
+#endif
+
   work_section_done_ = false;
   if (reuse_thread_) {
     auto &my_state = thread_state_for(0);
@@ -155,6 +167,34 @@ void scheduler::spawn(Function &&lambda) {
     spawned_task->is_synchronized_ = true;
     spawning_state.set_active_task(spawned_task);
     spawning_state.get_task_manager().push_local_task(last_task);
+#if PLS_SLEEP_WORKERS_ON_EMPTY
+    // TODO: relax atomic operations on empty flag
+    data_structures::stamped_integer queue_empty_flag = spawning_state.get_queue_empty_flag().load();
+    switch (queue_empty_flag.value) {
+      case EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY: {
+        // The queue was not found empty, ignore it.
+        break;
+      }
+      case EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY: {
+        // Someone is trying to mark us empty and might be re-stealing right now.
+        data_structures::stamped_integer
+            queue_non_empty_flag{queue_empty_flag.stamp + 1, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
+        auto actual_empty_flag = spawning_state.get_queue_empty_flag().exchange(queue_non_empty_flag);
+        if (actual_empty_flag.value == EMPTY_QUEUE_STATE::QUEUE_EMPTY) {
+          spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake();
+        }
+        break;
+      }
+      case EMPTY_QUEUE_STATE::QUEUE_EMPTY: {
+        // Someone already marked the queue empty, we must revert its action on the central counter.
+        data_structures::stamped_integer
+            queue_non_empty_flag{queue_empty_flag.stamp + 1, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
+        spawning_state.get_queue_empty_flag().store(queue_non_empty_flag);
+        spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake();
+        break;
+      }
+    }
+#endif
 
     // execute the lambda itself, which could lead to a different thread returning.
 #if PLS_PROFILING_ENABLED
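The switch added to scheduler::spawn is the owner's half of a small three-state protocol kept per work queue (the thief's half follows in scheduler.cpp): whenever the owner pushes new work it downgrades any QUEUE_MAYBE_EMPTY / QUEUE_EMPTY marking and, if a thief already counted the queue as empty, undoes that on the central counter. A condensed model of just this owner side, using a simplified stand-in for data_structures::stamped_integer (types and names here are illustrative, not library code):

    #include <atomic>
    #include <cstdint>

    enum queue_state : uint32_t { NON_EMPTY = 0, MAYBE_EMPTY = 1, EMPTY = 2 };

    // Stand-in for data_structures::stamped_integer: the stamp lets the thief's
    // two-step marking detect that the owner reset the flag in between.
    struct stamped { uint32_t stamp; uint32_t value; };

    // Run by the queue's owner right after pushing new local work.
    // Returns true if the central empty-queue counter must be decremented
    // (and sleeping workers possibly woken), mirroring the patch above.
    bool owner_reset_flag(std::atomic<stamped> &flag) {
      stamped current = flag.load();
      switch (current.value) {
        case NON_EMPTY:
          return false;                       // nobody marked us, nothing to undo
        case MAYBE_EMPTY: {
          // A thief is mid-marking; swap in a fresh stamp and only undo the
          // counter if the thief already completed the EMPTY step concurrently.
          stamped reset{current.stamp + 1, NON_EMPTY};
          stamped seen = flag.exchange(reset);
          return seen.value == EMPTY;
        }
        case EMPTY: {
          // The thief finished marking us empty; revert flag and counter.
          flag.store(stamped{current.stamp + 1, NON_EMPTY});
          return true;
        }
      }
      return false;
    }

The exchange in the MAYBE_EMPTY branch is what keeps the race benign: if the thief upgrades the flag to EMPTY between the owner's load and the exchange, the owner still observes that and compensates the counter exactly once.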
diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h
index 5df607c..545bf34 100644
--- a/lib/pls/include/pls/internal/scheduling/thread_state.h
+++ b/lib/pls/include/pls/internal/scheduling/thread_state.h
@@ -2,19 +2,29 @@
 #ifndef PLS_THREAD_STATE_H
 #define PLS_THREAD_STATE_H
 
+#include <atomic>
 #include
 #include
 #include
 
 #include "context_switcher/continuation.h"
 
+#include "pls/internal/build_flavour.h"
+
 #include "pls/internal/base/system_details.h"
+#include "pls/internal/data_structures/stamped_integer.h"
 
 #include "pls/internal/scheduling/base_task.h"
 #include "pls/internal/scheduling/task_manager.h"
 
 namespace pls::internal::scheduling {
 
+enum EMPTY_QUEUE_STATE : int32_t {
+  QUEUE_NON_EMPTY = 0,
+  QUEUE_MAYBE_EMPTY = 1,
+  QUEUE_EMPTY = 2,
+};
+
 class scheduler;
 
 /**
  * Proxy-Object for thread local state needed during scheduling.
@@ -34,6 +44,10 @@ struct PLS_CACHE_ALIGN thread_state {
   std::minstd_rand random_;
   base_task *active_task_;
 
+#if PLS_SLEEP_WORKERS_ON_EMPTY
+  PLS_CACHE_ALIGN std::atomic<data_structures::stamped_integer> queue_empty_{EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
+#endif
+
  public:
   explicit thread_state(scheduler &scheduler,
                         unsigned thread_id,
@@ -75,6 +89,13 @@ struct PLS_CACHE_ALIGN thread_state {
   void set_active_task(base_task *active_task) { active_task_ = active_task; }
   base_task *get_active_task() const { return active_task_; }
 
+#if PLS_SLEEP_WORKERS_ON_EMPTY
+  [[nodiscard]] std::atomic<data_structures::stamped_integer> &get_queue_empty_flag() {
+    return queue_empty_;
+  };
+#endif
+
 };
 
 }
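One property the new queue_empty_ member quietly relies on is that a stamped integer packs into a single machine word, so std::atomic on it stays lock-free and the load/exchange/compare_exchange calls in the scheduler do not fall back to a hidden mutex. A quick way to check that assumption on a given target, using an illustrative packed struct rather than pls' actual data_structures::stamped_integer layout:

    #include <atomic>
    #include <cstdint>

    // Illustrative stand-in; the real stamped_integer may pack stamp and value
    // differently, the point is only that the pair fits one 64-bit word.
    struct stamped_value {
      uint32_t stamp;
      int32_t value;
    };
    static_assert(sizeof(stamped_value) == 8, "stamp/value pair fits one word");

    int main() {
      std::atomic<stamped_value> flag{{0, 0}};
      // On common 64-bit targets this reports lock-free; if it did not, every
      // flag access in the hot stealing path would take a hidden lock.
      return flag.is_lock_free() ? 0 : 1;
    }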
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index e682e32..56c95d3 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -1,6 +1,9 @@
 #include "pls/internal/scheduling/scheduler.h"
 
 #include "context_switcher/context_switcher.h"
+
+#include "pls/internal/build_flavour.h"
+#include "pls/internal/base/futex_wrapper.h"
 #include "pls/internal/base/error_handling.h"
 
 #include
@@ -59,6 +62,11 @@ void scheduler::work_thread_work_section() {
       } while (target == my_state.get_thread_id());
       thread_state &target_state = my_state.get_scheduler().thread_state_for(target);
 
+#if PLS_SLEEP_WORKERS_ON_EMPTY
+      queue_empty_flag_retry_steal:
+      // TODO: relax atomics for empty flag
+      data_structures::stamped_integer target_queue_empty_flag = target_state.get_queue_empty_flag().load();
+#endif
       base_task *stolen_task;
       base_task *chain_after_stolen_task;
       bool cas_success;
@@ -69,7 +77,7 @@ void scheduler::work_thread_work_section() {
       do {
 #if PLS_PROFILING_ENABLED
 #endif
        std::tie(stolen_task, chain_after_stolen_task, cas_success) = target_state.get_task_manager().steal_task(my_state);
-      } while (!cas_success);
+      } while (!cas_success); // re-try on cas-conflicts, as in classic work stealing
 
      if (stolen_task) {
        // Keep task chain consistent. We want to appear as if we are working on a branch upwards of the stolen task.
@@ -93,13 +101,48 @@
        // We will continue execution in this line when we finished the stolen work.
        failed_steals = 0;
      } else {
+        // TODO: tune value for when we start yielding
+        const unsigned YIELD_AFTER = 1;
+
        failed_steals++;
-        if (failed_steals >= num_threads) {
+        if (failed_steals >= YIELD_AFTER) {
          std::this_thread::yield();
-        }
 #if PLS_PROFILING_ENABLED
-        my_state.get_scheduler().profiler_.stealing_end(my_state.get_thread_id(), false);
+          my_state.get_scheduler().profiler_.stealing_end(my_state.get_thread_id(), false);
 #endif
+#if PLS_SLEEP_WORKERS_ON_EMPTY
+          switch (target_queue_empty_flag.value) {
+            case EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY: {
+              // We found the queue empty, but the flag says it should still be full.
+              // We want to declare it empty, but we need to re-check the queue in a sub-step to avoid races.
+              data_structures::stamped_integer
+                  maybe_empty_flag{target_queue_empty_flag.stamp + 1, EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY};
+              if (target_state.get_queue_empty_flag().compare_exchange_strong(target_queue_empty_flag,
+                                                                              maybe_empty_flag)) {
+                goto queue_empty_flag_retry_steal;
+              }
+              break;
+            }
+            case EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY: {
+              // We found the queue empty and it was already marked as maybe empty.
+              // We can safely mark it empty and increment the central counter.
+              data_structures::stamped_integer
+                  empty_flag{target_queue_empty_flag.stamp + 1, EMPTY_QUEUE_STATE::QUEUE_EMPTY};
+              if (target_state.get_queue_empty_flag().compare_exchange_strong(target_queue_empty_flag, empty_flag)) {
+                // We marked it empty, now it's our duty to modify the central counter
+                my_state.get_scheduler().empty_queue_increase_counter();
+              }
+              break;
+            }
+            case EMPTY_QUEUE_STATE::QUEUE_EMPTY: {
+              // The queue was already marked empty, just do nothing
+              break;
+            }
+          }
+          // Regardless of whether we found the queue empty, check if we can put ourselves to sleep
+          my_state.get_scheduler().empty_queue_try_sleep_worker();
+#endif
+        }
      }
    }
  }
@@ -253,4 +296,34 @@ bool scheduler::check_task_chain(base_task &start_task) {
   return check_task_chain_backward(start_task) && check_task_chain_forward(start_task);
 }
 
+#if PLS_SLEEP_WORKERS_ON_EMPTY
+// TODO: relax memory orderings
+
+void scheduler::empty_queue_try_sleep_worker() {
+  int32_t counter_value = empty_queue_counter_.load();
+//  printf("try sleep thread %d...\n", counter_value);
+  if (counter_value == num_threads() - 1) {
+//    printf("sleeping thread...\n");
+    threads_sleeping_++;
+    base::futex_wait((int32_t *) &empty_queue_counter_, num_threads() - 1);
+    base::futex_wakeup((int32_t *) &empty_queue_counter_, 1);
+    threads_sleeping_--;
+//    printf("waked thread...\n");
+  }
+}
+
+void scheduler::empty_queue_increase_counter() {
+  int32_t old_value = empty_queue_counter_.fetch_add(1);
+//  printf("increase counter from %d", old_value);
+}
+
+void scheduler::empty_queue_decrease_counter_and_wake() {
+  empty_queue_counter_.fetch_sub(1);
+  if (threads_sleeping_.load() > 0) {
+    // Threads could be sleeping, we MUST wake them up
+    base::futex_wakeup((int32_t *) &empty_queue_counter_, 1);
+  }
+}
+#endif
+
 }
-- 
libgit2 0.26.0
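Taken together, the pieces at the end of scheduler.cpp work like this: empty_queue_counter_ counts queues currently marked QUEUE_EMPTY; a worker only tries to sleep after it has observed num_threads() - 1 empty queues (its own queue being the remaining one); the futex re-checks that expected value atomically, so a concurrent decrement aborts the sleep; and every woken worker passes one wake-up on before continuing, so a single futex_wakeup eventually releases all sleepers. A stripped-down model of that counter/futex interplay, assuming Linux; the names and the fixed thread count are illustrative, not pls API:

    #include <atomic>
    #include <cstdint>
    #include <unistd.h>
    #include <sys/syscall.h>
    #include <linux/futex.h>

    static long futex(int32_t *uaddr, int op, int32_t val) {
      return syscall(SYS_futex, uaddr, op, val, nullptr, nullptr, 0);
    }

    constexpr int32_t NUM_THREADS = 4;
    static std::atomic<int32_t> empty_counter{0};  // queues currently marked empty
    static std::atomic<int32_t> sleeping{0};       // workers currently parked

    // Worker side: called after a failed steal round, like empty_queue_try_sleep_worker().
    void try_sleep() {
      if (empty_counter.load() == NUM_THREADS - 1) {
        sleeping++;
        // The kernel re-checks the expected value before parking, so a decrement
        // that races with this call makes the wait return instead of blocking.
        futex(reinterpret_cast<int32_t *>(&empty_counter), FUTEX_WAIT_PRIVATE, NUM_THREADS - 1);
        // Wake chaining: each woken worker releases one more sleeper.
        futex(reinterpret_cast<int32_t *>(&empty_counter), FUTEX_WAKE_PRIVATE, 1);
        sleeping--;
      }
    }

    // Producer side: called after a marked-empty queue received work again,
    // like empty_queue_decrease_counter_and_wake().
    void wake() {
      empty_counter.fetch_sub(1);
      if (sleeping.load() > 0) {
        futex(reinterpret_cast<int32_t *>(&empty_counter), FUTEX_WAKE_PRIVATE, 1);
      }
    }

The store of -1000 in init_function_impl fits the same picture: pushing the counter far below the sleep threshold at the end of the work section guarantees no worker can pass the sleep check anymore, and the decrease-and-wake call right after it starts the wake chain that drains any workers still parked.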