From 08bc78550934d4ebb68f9f582ab9b5509a0e6955 Mon Sep 17 00:00:00 2001
From: FritzFlorian
Date: Sat, 4 Jul 2020 17:55:45 +0200
Subject: [PATCH] Minor optimizations to the thread sleeping mechanism.

---
 lib/pls/include/pls/internal/scheduling/scheduler.h                  |   8 +++++++-
 lib/pls/include/pls/internal/scheduling/scheduler_impl.h             |  50 +++-----------------------------------------------
 lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp |  11 ++---------
 lib/pls/src/internal/scheduling/scheduler.cpp                        | 121 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------------------------
 4 files changed, 83 insertions(+), 107 deletions(-)

diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h
index f6219c5..833f2d3 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler.h
@@ -120,7 +120,13 @@ class scheduler {
     lambda();
 #else
     if (thread_state::is_scheduler_active()) {
+#if PLS_PROFILING_ENABLED
+      // No need to re-do the profiling annotations for the optimized path.
+      spawn_internal(std::forward<Function>(lambda));
+      sync_internal();
+#else
       spawn_and_sync_internal(std::forward<Function>(lambda));
+#endif
     } else {
       lambda();
     }
@@ -206,11 +212,11 @@ class scheduler {
 
 #if PLS_SLEEP_WORKERS_ON_EMPTY
   PLS_CACHE_ALIGN std::atomic<int32_t> empty_queue_counter_{0};
-  std::atomic<int32_t> threads_sleeping_{0};
 
   void empty_queue_try_sleep_worker();
   void empty_queue_increase_counter();
   void empty_queue_decrease_counter_and_wake();
+  void empty_queue_reset_and_wake_all();
 #endif
 };
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index eeaa771..937c71a 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -102,8 +102,7 @@ class scheduler::init_function_impl : public init_function {
     PLS_ASSERT(thread_state::get().main_continuation().valid(), "Must return valid continuation from main task.");
 
 #if PLS_SLEEP_WORKERS_ON_EMPTY
-    thread_state::get().get_scheduler().empty_queue_counter_.store(-1000);
-    thread_state::get().get_scheduler().empty_queue_decrease_counter_and_wake();
+    thread_state::get().get_scheduler().empty_queue_reset_and_wake_all();
 #endif
 
 #if PLS_PROFILING_ENABLED
@@ -134,9 +133,9 @@ void scheduler::perform_work(Function work_section) {
   root_task->profiling_node_ = root_node;
 #endif
 #if PLS_SLEEP_WORKERS_ON_EMPTY
-  empty_queue_counter_.store(0);
+  empty_queue_counter_.store(0, std::memory_order_relaxed);
   for (auto &thread_state : thread_states_) {
-    thread_state->get_queue_empty_flag().store({EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY});
+    thread_state->get_queue_empty_flag().store({EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY}, std::memory_order_relaxed);
   }
 #endif
 
@@ -321,15 +320,6 @@ void scheduler::spawn_and_sync_internal(Function &&lambda) {
   auto *attached_resources = last_task->attached_resources_.load(std::memory_order_relaxed);
   spawned_task->attached_resources_.store(attached_resources, std::memory_order_relaxed);
 
-#if PLS_PROFILING_ENABLED
-  spawning_state.get_scheduler().profiler_.task_prepare_stack_measure(spawning_state.get_thread_id(),
-                                                                      spawned_task->stack_memory_,
-                                                                      spawned_task->stack_size_);
-  auto *child_dag_node = spawning_state.get_scheduler().profiler_.task_spawn_child(spawning_state.get_thread_id(),
-                                                                                   last_task->profiling_node_);
-  spawned_task->profiling_node_ = child_dag_node;
-#endif
-
   auto continuation = spawned_task->run_as_task([last_task, spawned_task, lambda, &spawning_state](auto cont) {
     // allow stealing threads to continue the last task.
     last_task->continuation_ = std::move(cont);
@@ -339,19 +329,6 @@ void scheduler::spawn_and_sync_internal(Function &&lambda) {
     spawning_state.set_active_task(spawned_task);
 
     // execute the lambda itself, which could lead to a different thread returning.
-#if PLS_PROFILING_ENABLED
-    spawning_state.get_scheduler().profiler_.task_finish_stack_measure(spawning_state.get_thread_id(),
-                                                                       last_task->stack_memory_,
-                                                                       last_task->stack_size_,
-                                                                       last_task->profiling_node_);
-    spawning_state.get_scheduler().profiler_.task_stop_running(spawning_state.get_thread_id(),
-                                                               last_task->profiling_node_);
-    auto *next_dag_node =
-        spawning_state.get_scheduler().profiler_.task_sync(spawning_state.get_thread_id(), last_task->profiling_node_);
-    last_task->profiling_node_ = next_dag_node;
-    spawning_state.get_scheduler().profiler_.task_start_running(spawning_state.get_thread_id(),
-                                                                spawned_task->profiling_node_);
-#endif
     lambda();
 
     thread_state &syncing_state = thread_state::get();
@@ -364,35 +341,14 @@ void scheduler::spawn_and_sync_internal(Function &&lambda) {
                  "Fast path, no one can have continued working on the last task.");
       syncing_state.set_active_task(last_task);
 
-#if PLS_PROFILING_ENABLED
-      syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
-                                                                        spawned_task->stack_memory_,
-                                                                        spawned_task->stack_size_,
-                                                                        spawned_task->profiling_node_);
-      syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
-                                                                spawned_task->profiling_node_);
-#endif
       return std::move(last_task->continuation_);
     } else {
      // Slow path, the last task was stolen. This path is common to sync() events.
-#if PLS_PROFILING_ENABLED
-      syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
-                                                                        spawned_task->stack_memory_,
-                                                                        spawned_task->stack_size_,
-                                                                        spawned_task->profiling_node_);
-      syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
-                                                                spawned_task->profiling_node_);
-#endif
       auto continuation = slow_return(syncing_state, false);
       return continuation;
     }
   });
 
-#if PLS_PROFILING_ENABLED
-  thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(),
-                                                                   last_task->profiling_node_);
-#endif
-
   PLS_ASSERT(!continuation.valid(), "We must not jump to a not-published (synced) spawn.");
 }
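Note on the scheduler.h hunk above: profiling builds keep the fully annotated spawn_internal()/sync_internal() pair, while other builds take the combined spawn_and_sync_internal() fast path. A rough, self-contained sketch of that compile-time dispatch pattern follows; the run_profiled/run_fast bodies and the PROFILING_ENABLED macro are hypothetical stand-ins, not PLS code.

#include <iostream>
#include <utility>

// Toggle mirroring the idea of PLS_PROFILING_ENABLED (name assumed for this sketch only).
#define PROFILING_ENABLED 1

template <typename Function>
void run_profiled(Function &&lambda) {
  // Stand-in for spawn_internal() + sync_internal(): keeps every profiling hook.
  std::cout << "profiled path\n";
  std::forward<Function>(lambda)();
}

template <typename Function>
void run_fast(Function &&lambda) {
  // Stand-in for spawn_and_sync_internal(): the optimized combined path.
  std::forward<Function>(lambda)();
}

template <typename Function>
void spawn_and_sync(Function &&lambda) {
#if PROFILING_ENABLED
  run_profiled(std::forward<Function>(lambda));
#else
  run_fast(std::forward<Function>(lambda));
#endif
}

int main() {
  spawn_and_sync([] { std::cout << "task body\n"; });
}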
diff --git a/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp b/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp
index f8b21d7..2e4dbed 100644
--- a/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp
+++ b/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp
@@ -115,15 +115,8 @@ task *external_trading_deque::pop_top(task *offered_task, peek_result peek_result) {
     // ...we failed because the top tag lags behind...try to fix it.
     // This means only updating the tag, as this location can still hold data we need.
     data_structures::stamped_integer forwarded_top{forwarding_stamp, expected_top.value_};
-    if (top_.compare_exchange_strong(expected_top,
-                                     forwarded_top,
-                                     std::memory_order_relaxed)) {
-      // We could update the top as needed, continue the pop request
-      expected_top = forwarded_top;
-    } else {
-      // We did not get to update the top tag, back off
-      return nullptr;
-    }
+    top_.compare_exchange_strong(expected_top, forwarded_top, std::memory_order_relaxed);
+    return nullptr;  // We might be in a consistent state again; however, the peeked task is no longer valid!
   }
 
   // Try to get it by CAS with the expected field entry, giving up our offered_task for it
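The deque hunk above always abandons the pop attempt once the top tag is found lagging: the CAS only forwards the stamp, and the caller backs off because the entry it peeked earlier can no longer be trusted. A minimal sketch of that fix-the-tag-then-retreat pattern on a stamped word; the stamped type and function names are illustrative stand-ins, not the PLS data structures.

#include <atomic>
#include <cstdint>
#include <iostream>
#include <optional>

// Illustrative stand-in for data_structures::stamped_integer: a value plus an ABA-protection stamp.
struct stamped {
  std::uint32_t stamp;
  std::uint32_t value;
};

std::atomic<stamped> top{stamped{0, 0}};

std::optional<std::uint32_t> try_pop_top(stamped expected, std::uint32_t forwarding_stamp) {
  if (expected.stamp != forwarding_stamp) {
    // The tag lags behind: try once to forward it so later attempts start from a
    // consistent state, then back off either way, since the peeked data is stale.
    stamped forwarded{forwarding_stamp, expected.value};
    top.compare_exchange_strong(expected, forwarded, std::memory_order_relaxed);
    return std::nullopt;
  }
  return expected.value;  // tag is current, the pop could proceed in a real deque
}

int main() {
  auto popped = try_pop_top(top.load(), /*forwarding_stamp=*/1);
  std::cout << (popped ? "popped" : "backed off") << "\n";  // prints "backed off"
}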
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index 279b166..35915f4 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -53,10 +53,19 @@ void scheduler::work_thread_work_section() {
     main_thread_starter_function_->run();
   }
 
-  unsigned int failed_steals = 0;
   while (!work_section_done_) {
-#if PLS_PROFILING_ENABLED
-    my_state.get_scheduler().profiler_.stealing_start(my_state.get_thread_id());
+#if PLS_SLEEP_WORKERS_ON_EMPTY
+    // Mark us empty when beginning to steal; this spares another thread from finding us.
+    data_structures::stamped_integer my_empty_flag = my_state.get_queue_empty_flag().load(std::memory_order_relaxed);
+    if (my_empty_flag.value_ != EMPTY_QUEUE_STATE::QUEUE_EMPTY) {
+      data_structures::stamped_integer target_empty_flag{my_empty_flag.stamp_, EMPTY_QUEUE_STATE::QUEUE_EMPTY};
+      if (my_state.get_queue_empty_flag().compare_exchange_strong(my_empty_flag,
+                                                                  target_empty_flag,
+                                                                  std::memory_order_relaxed)) {
+        // Only increase the counter if we managed to mark ourselves empty (someone else might have already marked us!)
+        empty_queue_increase_counter();
+      }
+    }
 #endif
     PLS_ASSERT_EXPENSIVE(check_task_chain(*my_state.get_active_task()),
                          "Must start stealing with a clean task chain.");
@@ -68,8 +77,11 @@ void scheduler::work_thread_work_section() {
       thread_state &target_state = my_state.get_scheduler().thread_state_for(target);
 #if PLS_SLEEP_WORKERS_ON_EMPTY
       queue_empty_flag_retry_steal:
-      data_structures::stamped_integer
-          target_queue_empty_flag = target_state.get_queue_empty_flag().load(std::memory_order_relaxed);
+      data_structures::stamped_integer target_queue_empty_flag =
+          target_state.get_queue_empty_flag().load(std::memory_order_relaxed);
+#endif
+#if PLS_PROFILING_ENABLED
+      my_state.get_scheduler().profiler_.stealing_start(my_state.get_thread_id());
 #endif
 
       base_task *stolen_task;
@@ -106,53 +118,51 @@ void scheduler::work_thread_work_section() {
 #endif
         context_switcher::switch_context(std::move(stolen_task->continuation_));
         // We will continue execution in this line when we finished the stolen work.
-        failed_steals = 0;
       } else {
-        // TODO: tune value for when we start yielding
-        const unsigned YIELD_AFTER = 1;
-
-        failed_steals++;
-        if (failed_steals >= YIELD_AFTER) {
-          std::this_thread::yield();
+        // Always yield on failed steals
+        std::this_thread::yield();
 #if PLS_PROFILING_ENABLED
-          my_state.get_scheduler().profiler_.stealing_end(my_state.get_thread_id(), false);
+        my_state.get_scheduler().profiler_.stealing_end(my_state.get_thread_id(), false);
 #endif
 #if PLS_SLEEP_WORKERS_ON_EMPTY
-          switch (target_queue_empty_flag.value_) {
-            case EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY: {
-              // We found the queue empty, but the flag says it should still be full.
-              // We want to declare it empty, bet we need to re-check the queue in a sub-step to avoid races.
-              data_structures::stamped_integer
-                  maybe_empty_flag{target_queue_empty_flag.stamp_ + 1, EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY};
-              if (target_state.get_queue_empty_flag().compare_exchange_strong(target_queue_empty_flag,
-                                                                              maybe_empty_flag,
-                                                                              std::memory_order_acq_rel)) {
-                goto queue_empty_flag_retry_steal;
-              }
-              break;
-            }
-            case EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY: {
-              // We found the queue empty and it was already marked as maybe empty.
-              // We can safely mark it empty and increment the central counter.
-              data_structures::stamped_integer
-                  empty_flag{target_queue_empty_flag.stamp_ + 1, EMPTY_QUEUE_STATE::QUEUE_EMPTY};
-              if (target_state.get_queue_empty_flag().compare_exchange_strong(target_queue_empty_flag,
-                                                                              empty_flag,
-                                                                              std::memory_order_acq_rel)) {
-                // We marked it empty, now its our duty to modify the central counter
-                my_state.get_scheduler().empty_queue_increase_counter();
-              }
-              break;
-            }
-            case EMPTY_QUEUE_STATE::QUEUE_EMPTY: {
-              // The queue was already marked empty, just do nothing
-              break;
-            }
-          }
-          // Disregarding if we found the thread empty, we should check if we can put ourself to sleep
-          my_state.get_scheduler().empty_queue_try_sleep_worker();
-#endif
-        }
+        switch (target_queue_empty_flag.value_) {
+          case EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY: {
+            // We found the queue empty, but the flag says it should still be full.
+            // We want to declare it empty, but we need to re-check the queue in a sub-step to avoid races.
+            data_structures::stamped_integer
+                maybe_empty_flag{target_queue_empty_flag.stamp_ + 1, EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY};
+            if (target_state.get_queue_empty_flag().compare_exchange_strong(target_queue_empty_flag,
+                                                                            maybe_empty_flag,
+                                                                            std::memory_order_acq_rel)) {
+              goto queue_empty_flag_retry_steal;
+            }
+            break;
+          }
+          case EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY: {
+            // We found the queue empty and it was already marked as maybe empty.
+            // We can safely mark it empty and increment the central counter.
+            data_structures::stamped_integer
+                empty_flag{target_queue_empty_flag.stamp_ + 1, EMPTY_QUEUE_STATE::QUEUE_EMPTY};
+            if (target_state.get_queue_empty_flag().compare_exchange_strong(target_queue_empty_flag,
+                                                                            empty_flag,
+                                                                            std::memory_order_acq_rel)) {
+              // We marked it empty, now it's our duty to modify the central counter
+              my_state.get_scheduler().empty_queue_increase_counter();
+            }
+            break;
+          }
+          case EMPTY_QUEUE_STATE::QUEUE_EMPTY: {
+            // The queue was already marked empty, just do nothing
+            break;
+          }
+          default: {
+            PLS_ASSERT(false, "The sleeping flag only has three possible states!");
+            break;
+          }
+        }
+        // Regardless of whether we found the thread empty, we should check if we can put ourselves to sleep
+        my_state.get_scheduler().empty_queue_try_sleep_worker();
+#endif
       }
     }
   my_state.set_scheduler_active(false);
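The switch above implements a per-queue, three-state protocol: a thief that finds a victim empty first downgrades the flag from QUEUE_NON_EMPTY to QUEUE_MAYBE_EMPTY and retries the steal, and only a second empty observation marks the queue QUEUE_EMPTY and bumps the central counter. A compact, self-contained sketch of that two-step transition, assuming a simplified stamped flag type rather than the PLS stamped_integer:

#include <atomic>
#include <cstdint>
#include <iostream>

enum queue_state : std::uint32_t { QUEUE_NON_EMPTY, QUEUE_MAYBE_EMPTY, QUEUE_EMPTY };

// Simplified stand-in for the per-thread stamped flag: state plus an ABA stamp.
struct stamped_flag {
  std::uint32_t stamp;
  std::uint32_t state;
};

std::atomic<stamped_flag> flag{stamped_flag{0, QUEUE_NON_EMPTY}};
std::atomic<std::int32_t> empty_queue_counter{0};

// Stand-in for re-checking the victim's deque; hard-coded empty for the sketch.
bool victim_queue_is_empty() { return true; }

void on_failed_steal() {
  stamped_flag observed = flag.load(std::memory_order_relaxed);
  switch (observed.state) {
    case QUEUE_NON_EMPTY: {
      // First empty observation: downgrade to MAYBE_EMPTY so a racing push by the
      // owner is not missed; the caller retries the steal once before trusting it.
      stamped_flag maybe{observed.stamp + 1, QUEUE_MAYBE_EMPTY};
      flag.compare_exchange_strong(observed, maybe, std::memory_order_acq_rel);
      break;
    }
    case QUEUE_MAYBE_EMPTY: {
      // Second empty observation: safe to mark EMPTY and account for it centrally.
      stamped_flag empty{observed.stamp + 1, QUEUE_EMPTY};
      if (victim_queue_is_empty() &&
          flag.compare_exchange_strong(observed, empty, std::memory_order_acq_rel)) {
        empty_queue_counter.fetch_add(1, std::memory_order_acq_rel);
      }
      break;
    }
    case QUEUE_EMPTY:
      break;  // already accounted for, nothing to do
  }
}

int main() {
  on_failed_steal();  // QUEUE_NON_EMPTY -> QUEUE_MAYBE_EMPTY
  on_failed_steal();  // QUEUE_MAYBE_EMPTY -> QUEUE_EMPTY, counter becomes 1
  std::cout << empty_queue_counter.load() << "\n";
}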
@@ -326,13 +336,15 @@ bool scheduler::check_task_chain(base_task &start_task) {
 
 void scheduler::empty_queue_try_sleep_worker() {
   int32_t counter_value = empty_queue_counter_.load();
-  if (counter_value == num_threads()) {
+  if ((int) counter_value >= (int) num_threads()) {
 #if PLS_PROFILING_ENABLED
     get_profiler().sleep_start(thread_state::get().get_thread_id());
 #endif
-    threads_sleeping_++;
-    base::futex_wait((int32_t *) &empty_queue_counter_, num_threads());
-    threads_sleeping_--;
+
+    // Start sleeping
+    base::futex_wait((int32_t *) &empty_queue_counter_, counter_value);
+    // Stop sleeping and wake up others
+    std::this_thread::yield();
     base::futex_wakeup((int32_t *) &empty_queue_counter_, 1);
 #if PLS_PROFILING_ENABLED
     get_profiler().sleep_stop(thread_state::get().get_thread_id());
@@ -345,11 +357,20 @@ void scheduler::empty_queue_increase_counter() {
 }
 
 void scheduler::empty_queue_decrease_counter_and_wake() {
-  empty_queue_counter_.fetch_sub(1);
-  if (threads_sleeping_.load() > 0) {
+  auto old_counter = empty_queue_counter_.fetch_sub(1, std::memory_order_acq_rel);
+  if ((int) old_counter >= (int) num_threads()) {
    base::futex_wakeup((int32_t *) &empty_queue_counter_, 1);
   }
 }
+
+void scheduler::empty_queue_reset_and_wake_all() {
+  if (num_threads() == 1) {
+    return;
+  }
+
+  empty_queue_counter_.store(-1000);
+  base::futex_wakeup((int32_t *) &empty_queue_counter_, 1);
+}
 #endif
 
 }
-- 
libgit2 0.26.0
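Taken together, the sleeping scheme parks a worker only when the central counter reports every queue empty (counter >= num_threads), waits on the counter's address, and has wakers either decrement the counter when work appears or drive it far below the threshold at shutdown. A Linux-only sketch of that futex-backed counter follows; the raw futex syscalls are stand-ins for base::futex_wait()/base::futex_wakeup(), and num_threads is a fixed constant here.

#include <atomic>
#include <cstdint>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

static std::atomic<std::int32_t> empty_queue_counter{0};
static const std::int32_t num_threads = 4;

static void futex_wait(std::atomic<std::int32_t> *addr, std::int32_t expected) {
  // Blocks only while *addr still equals expected, so a wake-up that races with
  // the counter check cannot be lost.
  syscall(SYS_futex, reinterpret_cast<std::int32_t *>(addr), FUTEX_WAIT, expected, nullptr, nullptr, 0);
}

static void futex_wakeup(std::atomic<std::int32_t> *addr, std::int32_t count) {
  syscall(SYS_futex, reinterpret_cast<std::int32_t *>(addr), FUTEX_WAKE, count, nullptr, nullptr, 0);
}

// Worker side: only sleep while every queue is reported empty.
void try_sleep_worker() {
  std::int32_t counter_value = empty_queue_counter.load();
  if (counter_value >= num_threads) {
    futex_wait(&empty_queue_counter, counter_value);
    // Pass the wake-up along in case several workers are parked.
    futex_wakeup(&empty_queue_counter, 1);
  }
}

// Work appeared in some queue: drop the counter and wake one sleeper if needed.
void decrease_counter_and_wake() {
  std::int32_t old_counter = empty_queue_counter.fetch_sub(1, std::memory_order_acq_rel);
  if (old_counter >= num_threads) {
    futex_wakeup(&empty_queue_counter, 1);
  }
}

// Shutdown: drive the counter far below the sleep threshold and wake everyone.
void reset_and_wake_all() {
  empty_queue_counter.store(-1000);
  futex_wakeup(&empty_queue_counter, 1);
}

int main() {
  decrease_counter_and_wake();  // counter goes to -1, no sleeper to wake
  reset_and_wake_all();
  return 0;
}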