Commit 08bc7855 by FritzFlorian

Minor optimizations to the thread sleeping mechanism.

parent 3ad3a3d4
Pipeline #1572 passed with stages in 4 minutes 30 seconds
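
For context, below is the sleep/wake protocol this commit tunes, condensed into a standalone sketch. It is illustrative only: C++20 std::atomic wait/notify stands in for the library's base::futex_wait/base::futex_wakeup, num_threads is a plain field instead of the scheduler's num_threads(), the struct and member names are placeholders, and the profiling hooks are omitted.

#include <atomic>
#include <cstdint>
#include <thread>

struct sleep_sketch {
  std::atomic<int32_t> empty_queue_counter{0};
  int32_t num_threads;

  // Called by a worker right after it successfully marks its own queue flag EMPTY.
  // (The body of empty_queue_increase_counter() is not shown in the diff; a plain
  // fetch_add is assumed here.)
  void increase_counter() {
    empty_queue_counter.fetch_add(1);
  }

  // Called by a worker that failed to find any work. Once the counter reaches
  // num_threads, every queue has been marked empty and the worker may sleep.
  void try_sleep_worker() {
    int32_t counter_value = empty_queue_counter.load();
    if (counter_value >= num_threads) {
      // Block while the counter still holds the observed value...
      empty_queue_counter.wait(counter_value);
      // ...then pass the wake-up on, so a single notify fans out through all sleepers.
      std::this_thread::yield();
      empty_queue_counter.notify_one();
    }
  }

  // Called when a worker's queue becomes non-empty again.
  void decrease_counter_and_wake() {
    int32_t old_counter = empty_queue_counter.fetch_sub(1, std::memory_order_acq_rel);
    if (old_counter >= num_threads) {
      // Sleepers can only exist if the counter had reached num_threads; wake one.
      empty_queue_counter.notify_one();
    }
  }

  // Called once when the work section finishes: drop the counter far below any
  // reachable value so nobody can fall asleep again, then start the wake-up chain.
  void reset_and_wake_all() {
    if (num_threads == 1) {
      return;
    }
    empty_queue_counter.store(-1000);
    empty_queue_counter.notify_one();
  }
};

The design relies on a chained wake-up: a woken worker yields and re-notifies, so the single wake-up issued by empty_queue_reset_and_wake_all() eventually reaches every sleeper, while the -1000 store keeps the counter below num_threads() so no worker can go back to sleep during shutdown.
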
......@@ -120,7 +120,13 @@ class scheduler {
lambda();
#else
if (thread_state::is_scheduler_active()) {
#if PLS_PROFILING_ENABLED
// no need to re-do the profiling annotations for the optimized path
spawn_internal(std::forward<Function>(lambda));
sync_internal();
#else
spawn_and_sync_internal(std::forward<Function>(lambda));
#endif
} else {
lambda();
}
......@@ -206,11 +212,11 @@ class scheduler {
#if PLS_SLEEP_WORKERS_ON_EMPTY
PLS_CACHE_ALIGN std::atomic<int32_t> empty_queue_counter_{0};
std::atomic<int32_t> threads_sleeping_{0};
void empty_queue_try_sleep_worker();
void empty_queue_increase_counter();
void empty_queue_decrease_counter_and_wake();
void empty_queue_reset_and_wake_all();
#endif
};
......
......@@ -102,8 +102,7 @@ class scheduler::init_function_impl : public init_function {
PLS_ASSERT(thread_state::get().main_continuation().valid(), "Must return valid continuation from main task.");
#if PLS_SLEEP_WORKERS_ON_EMPTY
thread_state::get().get_scheduler().empty_queue_counter_.store(-1000);
thread_state::get().get_scheduler().empty_queue_decrease_counter_and_wake();
thread_state::get().get_scheduler().empty_queue_reset_and_wake_all();
#endif
#if PLS_PROFILING_ENABLED
......@@ -134,9 +133,9 @@ void scheduler::perform_work(Function work_section) {
root_task->profiling_node_ = root_node;
#endif
#if PLS_SLEEP_WORKERS_ON_EMPTY
empty_queue_counter_.store(0);
empty_queue_counter_.store(0, std::memory_order_relaxed);
for (auto &thread_state : thread_states_) {
thread_state->get_queue_empty_flag().store({EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY});
thread_state->get_queue_empty_flag().store({EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY}, std::memory_order_relaxed);
}
#endif
......@@ -321,15 +320,6 @@ void scheduler::spawn_and_sync_internal(Function &&lambda) {
auto *attached_resources = last_task->attached_resources_.load(std::memory_order_relaxed);
spawned_task->attached_resources_.store(attached_resources, std::memory_order_relaxed);
#if PLS_PROFILING_ENABLED
spawning_state.get_scheduler().profiler_.task_prepare_stack_measure(spawning_state.get_thread_id(),
spawned_task->stack_memory_,
spawned_task->stack_size_);
auto *child_dag_node = spawning_state.get_scheduler().profiler_.task_spawn_child(spawning_state.get_thread_id(),
last_task->profiling_node_);
spawned_task->profiling_node_ = child_dag_node;
#endif
auto continuation = spawned_task->run_as_task([last_task, spawned_task, lambda, &spawning_state](auto cont) {
// allow stealing threads to continue the last task.
last_task->continuation_ = std::move(cont);
......@@ -339,19 +329,6 @@ void scheduler::spawn_and_sync_internal(Function &&lambda) {
spawning_state.set_active_task(spawned_task);
// execute the lambda itself, which could lead to a different thread returning.
#if PLS_PROFILING_ENABLED
spawning_state.get_scheduler().profiler_.task_finish_stack_measure(spawning_state.get_thread_id(),
last_task->stack_memory_,
last_task->stack_size_,
last_task->profiling_node_);
spawning_state.get_scheduler().profiler_.task_stop_running(spawning_state.get_thread_id(),
last_task->profiling_node_);
auto *next_dag_node =
spawning_state.get_scheduler().profiler_.task_sync(spawning_state.get_thread_id(), last_task->profiling_node_);
last_task->profiling_node_ = next_dag_node;
spawning_state.get_scheduler().profiler_.task_start_running(spawning_state.get_thread_id(),
spawned_task->profiling_node_);
#endif
lambda();
thread_state &syncing_state = thread_state::get();
......@@ -364,35 +341,14 @@ void scheduler::spawn_and_sync_internal(Function &&lambda) {
"Fast path, no one can have continued working on the last task.");
syncing_state.set_active_task(last_task);
#if PLS_PROFILING_ENABLED
syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
spawned_task->stack_memory_,
spawned_task->stack_size_,
spawned_task->profiling_node_);
syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
spawned_task->profiling_node_);
#endif
return std::move(last_task->continuation_);
} else {
// Slow path: the last task was stolen. This path is the same one taken by sync() events.
#if PLS_PROFILING_ENABLED
syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
spawned_task->stack_memory_,
spawned_task->stack_size_,
spawned_task->profiling_node_);
syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
spawned_task->profiling_node_);
#endif
auto continuation = slow_return(syncing_state, false);
return continuation;
}
});
#if PLS_PROFILING_ENABLED
thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(),
last_task->profiling_node_);
#endif
PLS_ASSERT(!continuation.valid(), "We must not jump to a not-published (synced) spawn.");
}
......
......@@ -115,15 +115,8 @@ task *external_trading_deque::pop_top(task *offered_task, peek_result peek_resul
// ...we failed because the top tag lags behind...try to fix it.
// This means only updating the tag, as this location can still hold data we need.
data_structures::stamped_integer forwarded_top{forwarding_stamp, expected_top.value_};
if (top_.compare_exchange_strong(expected_top,
forwarded_top,
std::memory_order_relaxed)) {
// We could update the top as needed, continue the pop request
expected_top = forwarded_top;
} else {
// We did not get to update the top tag, back off
return nullptr;
}
top_.compare_exchange_strong(expected_top, forwarded_top, std::memory_order_relaxed);
return nullptr; // We might be in a consistent state again; however, the peeked task is no longer valid!
}
// Try to get it by CAS with the expected field entry, giving up our offered_task for it
......
......@@ -53,10 +53,19 @@ void scheduler::work_thread_work_section() {
main_thread_starter_function_->run();
}
unsigned int failed_steals = 0;
while (!work_section_done_) {
#if PLS_PROFILING_ENABLED
my_state.get_scheduler().profiler_.stealing_start(my_state.get_thread_id());
#if PLS_SLEEP_WORKERS_ON_EMPTY
// Mark ourselves empty when we begin stealing; this spares another thread from having to mark us (see the sketch after this hunk).
data_structures::stamped_integer my_empty_flag = my_state.get_queue_empty_flag().load(std::memory_order_relaxed);
if (my_empty_flag.value_ != EMPTY_QUEUE_STATE::QUEUE_EMPTY) {
data_structures::stamped_integer target_empty_flag{my_empty_flag.stamp_, EMPTY_QUEUE_STATE::QUEUE_EMPTY};
if (my_state.get_queue_empty_flag().compare_exchange_strong(my_empty_flag,
target_empty_flag,
std::memory_order_relaxed)) {
// Only increase the counter if we were the ones to mark ourselves empty (someone else might have marked us already!)
empty_queue_increase_counter();
}
}
#endif
PLS_ASSERT_EXPENSIVE(check_task_chain(*my_state.get_active_task()), "Must start stealing with a clean task chain.");
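
To make the new self-marking step easier to follow in isolation, here is a minimal sketch. stamped_int is a stand-in for data_structures::stamped_integer (a version stamp plus a small value in one atomic word), the enum mirrors EMPTY_QUEUE_STATE, and mark_self_empty is a hypothetical helper, not the library's API.

#include <atomic>
#include <cstdint>

enum : uint32_t { QUEUE_NON_EMPTY = 0, QUEUE_EMPTY = 1 };

// Stand-in for data_structures::stamped_integer.
struct stamped_int {
  uint32_t stamp;
  uint32_t value;
};

// Returns true if this thread was the one that flipped the flag to QUEUE_EMPTY
// and therefore owns the matching empty_queue_increase_counter() call.
bool mark_self_empty(std::atomic<stamped_int> &queue_empty_flag) {
  stamped_int my_flag = queue_empty_flag.load(std::memory_order_relaxed);
  if (my_flag.value == QUEUE_EMPTY) {
    return false;  // already marked, by us earlier or by a thread that found our queue empty
  }
  stamped_int target{my_flag.stamp, QUEUE_EMPTY};
  return queue_empty_flag.compare_exchange_strong(my_flag, target, std::memory_order_relaxed);
}

A worker would call mark_self_empty() at the top of its stealing loop and, only on true, bump the shared empty-queue counter, exactly as the hunk above does; this ensures each empty queue is counted once.
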
......@@ -68,8 +77,11 @@ void scheduler::work_thread_work_section() {
thread_state &target_state = my_state.get_scheduler().thread_state_for(target);
#if PLS_SLEEP_WORKERS_ON_EMPTY
queue_empty_flag_retry_steal:
data_structures::stamped_integer
target_queue_empty_flag = target_state.get_queue_empty_flag().load(std::memory_order_relaxed);
data_structures::stamped_integer target_queue_empty_flag =
target_state.get_queue_empty_flag().load(std::memory_order_relaxed);
#endif
#if PLS_PROFILING_ENABLED
my_state.get_scheduler().profiler_.stealing_start(my_state.get_thread_id());
#endif
base_task *stolen_task;
......@@ -106,13 +118,8 @@ void scheduler::work_thread_work_section() {
#endif
context_switcher::switch_context(std::move(stolen_task->continuation_));
// We will continue execution in this line when we finished the stolen work.
failed_steals = 0;
} else {
// TODO: tune value for when we start yielding
const unsigned YIELD_AFTER = 1;
failed_steals++;
if (failed_steals >= YIELD_AFTER) {
// Always yield on failed steals
std::this_thread::yield();
#if PLS_PROFILING_ENABLED
my_state.get_scheduler().profiler_.stealing_end(my_state.get_thread_id(), false);
......@@ -148,13 +155,16 @@ void scheduler::work_thread_work_section() {
// The queue was already marked empty, just do nothing
break;
}
default: {
PLS_ASSERT(false, "The sleeping flag only has three possible states!");
break;
}
}
// Regardless of whether we found the target thread's queue empty, check whether we can put ourselves to sleep
my_state.get_scheduler().empty_queue_try_sleep_worker();
#endif
}
}
}
my_state.set_scheduler_active(false);
}
......@@ -326,13 +336,15 @@ bool scheduler::check_task_chain(base_task &start_task) {
void scheduler::empty_queue_try_sleep_worker() {
int32_t counter_value = empty_queue_counter_.load();
if (counter_value == num_threads()) {
if ((int) counter_value >= (int) num_threads()) {
#if PLS_PROFILING_ENABLED
get_profiler().sleep_start(thread_state::get().get_thread_id());
#endif
threads_sleeping_++;
base::futex_wait((int32_t *) &empty_queue_counter_, num_threads());
threads_sleeping_--;
// Start sleeping
base::futex_wait((int32_t *) &empty_queue_counter_, counter_value);
// Stop sleeping and wake up others
std::this_thread::yield();
base::futex_wakeup((int32_t *) &empty_queue_counter_, 1);
#if PLS_PROFILING_ENABLED
get_profiler().sleep_stop(thread_state::get().get_thread_id());
......@@ -345,11 +357,20 @@ void scheduler::empty_queue_increase_counter() {
}
void scheduler::empty_queue_decrease_counter_and_wake() {
empty_queue_counter_.fetch_sub(1);
if (threads_sleeping_.load() > 0) {
auto old_counter = empty_queue_counter_.fetch_sub(1, std::memory_order_acq_rel);
if ((int) old_counter >= (int) num_threads()) {
base::futex_wakeup((int32_t *) &empty_queue_counter_, 1);
}
}
void scheduler::empty_queue_reset_and_wake_all() {
if (num_threads() == 1) {
return;
}
empty_queue_counter_.store(-1000);
base::futex_wakeup((int32_t *) &empty_queue_counter_, 1);
}
#endif
}