Commit a26720bd by FritzFlorian

Add first, experimental version of 'sleep on empty flag'.

parent 3202323c
Pipeline #1485 failed with stages in 61 minutes 2 seconds
...@@ -43,7 +43,7 @@ add_library(pls STATIC
include/pls/internal/profiling/dag_node.h src/internal/profiling/dag_node.cpp
include/pls/internal/profiling/profiler.h
include/pls/internal/profiling/thread_stats.h src/internal/profiling/thread_stats.cpp src/internal/profiling/profiler.cpp include/pls/internal/build_flavour.h include/pls/internal/base/futex_wrapper.h)
# Dependencies for pls
target_link_libraries(pls Threads::Threads)
...
#ifndef PLS_INTERNAL_BASE_FUTEX_WRAPPER_H_
#define PLS_INTERNAL_BASE_FUTEX_WRAPPER_H_
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <ctime>
#include <cerrno>
#include <cstdint>
namespace pls::internal::base {
// Thin wrappers around the raw Linux futex syscall (glibc offers no wrapper).
// Marked inline, as this header is included from multiple translation units.
inline int futex(int32_t *uaddr, int futex_op, int32_t val, const struct timespec *timeout, int32_t *uaddr2, int32_t val3) {
return static_cast<int>(syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3));
}
// Blocks the calling thread as long as *uaddr still equals val (checked atomically by the kernel).
inline int futex_wait(int32_t *uaddr, int32_t val) {
return futex(uaddr, FUTEX_WAIT_PRIVATE, val, nullptr, nullptr, 0);
}
// Wakes at most num_waked threads currently blocked on uaddr.
inline int futex_wakeup(int32_t *uaddr, int num_waked) {
return futex(uaddr, FUTEX_WAKE_PRIVATE, num_waked, nullptr, nullptr, 0);
}
}
#endif //PLS_INTERNAL_BASE_FUTEX_WRAPPER_H_
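For context, the wait/wake pattern these wrappers enable looks as follows. This is a minimal, hypothetical sketch for illustration only, not part of the commit:

// Hypothetical demo of the wrappers above; not part of this commit.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <thread>
#include "pls/internal/base/futex_wrapper.h"

int main() {
  std::atomic<int32_t> word{0};

  std::thread sleeper([&word] {
    // Sleeps only while word still holds 0; the kernel checks this atomically,
    // so a wakeup between our check and the sleep cannot be lost.
    pls::internal::base::futex_wait(reinterpret_cast<int32_t *>(&word), 0);
    std::printf("woken, word = %d\n", word.load());
  });

  std::this_thread::sleep_for(std::chrono::milliseconds(10));
  word.store(1);  // change the watched value first ...
  // ... then wake one waiter blocked on the word's address.
  pls::internal::base::futex_wakeup(reinterpret_cast<int32_t *>(&word), 1);
  sleeper.join();
  return 0;
}

The key property is that futex_wait only blocks if the watched word still holds the expected value, which lets the scheduler publish a counter update and wake sleepers without losing a wakeup.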
#ifndef PLS_INTERNAL_BUILD_FLAVOUR_H_
#define PLS_INTERNAL_BUILD_FLAVOUR_H_
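// Experimental build flavour: when enabled, worker threads sleep on a futex once
// all other work queues are marked empty, instead of spinning while stealing.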
#define PLS_SLEEP_WORKERS_ON_EMPTY true
#endif //PLS_INTERNAL_BUILD_FLAVOUR_H_
...@@ -7,6 +7,8 @@
#include <vector>
#include <memory>

#include "pls/internal/build_flavour.h"

#include "pls/internal/base/barrier.h"
#include "pls/internal/base/stack_allocator.h"
...@@ -130,6 +132,15 @@ class scheduler {
std::shared_ptr<base::stack_allocator> stack_allocator_;
#if PLS_SLEEP_WORKERS_ON_EMPTY
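// empty_queue_counter_ counts per-thread queues currently marked empty;
// threads_sleeping_ counts workers blocked on it via futex_wait.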
PLS_CACHE_ALIGN std::atomic<int32_t> empty_queue_counter_{0};
PLS_CACHE_ALIGN std::atomic<int32_t> threads_sleeping_{0};
void empty_queue_try_sleep_worker();
void empty_queue_increase_counter();
void empty_queue_decrease_counter_and_wake();
#endif
#if PLS_PROFILING_ENABLED
profiling::profiler profiler_;
#endif
...
...@@ -86,6 +86,11 @@ class scheduler::init_function_impl : public init_function {
thread_state::get().get_scheduler().work_section_done_.store(true);
PLS_ASSERT(thread_state::get().main_continuation().valid(), "Must return valid continuation from main task.");
#if PLS_SLEEP_WORKERS_ON_EMPTY
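// The work section is over: drive the counter far below any reachable value so no
// worker can observe it equal to num_threads() - 1 and fall asleep again, then wake
// any sleeping workers (the wakeup cascades from one sleeper to the next).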
thread_state::get().get_scheduler().empty_queue_counter_.store(-1000);
thread_state::get().get_scheduler().empty_queue_decrease_counter_and_wake();
#endif
#if PLS_PROFILING_ENABLED
thread_state::get().get_scheduler().profiler_.task_finish_stack_measure(thread_state::get().get_thread_id(),
root_task->stack_memory_,
...@@ -113,6 +118,13 @@ void scheduler::perform_work(Function work_section) {
auto *root_node = profiler_.start_profiler_run();
root_task->profiling_node_ = root_node;
#endif
#if PLS_SLEEP_WORKERS_ON_EMPTY
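// Fresh work section: reset the sleep bookkeeping, no queue is marked empty yet.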
empty_queue_counter_.store(0);
for (auto &thread_state : thread_states_) {
thread_state->get_queue_empty_flag().store({EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY});
}
#endif
work_section_done_ = false;
if (reuse_thread_) {
auto &my_state = thread_state_for(0);
...@@ -155,6 +167,34 @@ void scheduler::spawn(Function &&lambda) {
spawned_task->is_synchronized_ = true;
spawning_state.set_active_task(spawned_task);
spawning_state.get_task_manager().push_local_task(last_task);
#if PLS_SLEEP_WORKERS_ON_EMPTY
// TODO: relax atomic operations on empty flag
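// This spawn makes our queue non-empty again: undo any in-flight or completed
// 'empty' marking that a thief may have placed on it.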
data_structures::stamped_integer queue_empty_flag = spawning_state.get_queue_empty_flag().load();
switch (queue_empty_flag.value) {
case EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY: {
// The queue is not marked empty, nothing to do.
break;
}
case EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY: {
// Someone is trying to mark us empty and might be re-checking the queue right now.
data_structures::stamped_integer
queue_non_empty_flag{queue_empty_flag.stamp + 1, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
auto actual_empty_flag = spawning_state.get_queue_empty_flag().exchange(queue_non_empty_flag);
if (actual_empty_flag.value == EMPTY_QUEUE_STATE::QUEUE_EMPTY) {
spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake();
}
break;
}
case EMPTY_QUEUE_STATE::QUEUE_EMPTY: {
// Someone already marked the queue empty; we must revert their increment of the central counter.
data_structures::stamped_integer
queue_non_empty_flag{queue_empty_flag.stamp + 1, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
spawning_state.get_queue_empty_flag().store(queue_non_empty_flag);
spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake();
break;
}
}
#endif
// execute the lambda itself, which could lead to a different thread returning.
#if PLS_PROFILING_ENABLED
...
...@@ -2,19 +2,29 @@
#ifndef PLS_THREAD_STATE_H
#define PLS_THREAD_STATE_H

#include <atomic>
#include <random>
#include <chrono>
#include <utility>

#include "context_switcher/continuation.h"

#include "pls/internal/build_flavour.h"
#include "pls/internal/base/system_details.h"
#include "pls/internal/data_structures/stamped_integer.h"

#include "pls/internal/scheduling/base_task.h"
#include "pls/internal/scheduling/task_manager.h"

namespace pls::internal::scheduling {
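// Tri-state flag for the sleep-on-empty protocol: thieves move a queue from
// QUEUE_NON_EMPTY to QUEUE_MAYBE_EMPTY (then re-check the queue) to QUEUE_EMPTY,
// while a spawn moves it back to QUEUE_NON_EMPTY and undoes the central counter if needed.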
enum EMPTY_QUEUE_STATE : int32_t {
QUEUE_NON_EMPTY = 0,
QUEUE_MAYBE_EMPTY = 1,
QUEUE_EMPTY = 2,
};
class scheduler;

/**
 * Proxy object for the thread-local state needed during scheduling.
...@@ -34,6 +44,10 @@ struct PLS_CACHE_ALIGN thread_state {
std::minstd_rand random_;
base_task *active_task_;
#if PLS_SLEEP_WORKERS_ON_EMPTY
PLS_CACHE_ALIGN std::atomic<data_structures::stamped_integer> queue_empty_{EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
#endif
public:
explicit thread_state(scheduler &scheduler,
unsigned thread_id,
...@@ -75,6 +89,13 @@ struct PLS_CACHE_ALIGN thread_state {
void set_active_task(base_task *active_task) { active_task_ = active_task; }
base_task *get_active_task() const { return active_task_; }
#if PLS_SLEEP_WORKERS_ON_EMPTY
[[nodiscard]] std::atomic<data_structures::stamped_integer> &get_queue_empty_flag() {
return queue_empty_;
}
#endif
};
}
...
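The queue_empty_ flag above is a data_structures::stamped_integer, which pairs the payload with a version stamp so compare-and-swap operations can detect ABA races (a flag changed away and back between the load and the CAS). A rough, hypothetical sketch of such a type is shown below; the actual pls implementation may differ in layout and member types:

// Hypothetical sketch of a stamped integer for ABA-safe CAS; the real
// pls::internal::data_structures::stamped_integer may differ in detail.
#include <cstdint>

struct stamped_integer {
  uint32_t stamp;  // bumped on every logical transition of the flag
  uint32_t value;  // payload, e.g. one of the EMPTY_QUEUE_STATE values

  bool operator==(const stamped_integer &other) const noexcept {
    return stamp == other.stamp && value == other.value;
  }
};

Because the whole struct fits into a single 64-bit machine word, std::atomic<stamped_integer> can remain lock-free on common targets, which the exchange and compare_exchange_strong calls in this commit rely on.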
#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h"
#include "context_switcher/context_switcher.h" #include "context_switcher/context_switcher.h"
#include "pls/internal/build_flavour.h"
#include "pls/internal/base/futex_wrapper.h"
#include "pls/internal/base/error_handling.h" #include "pls/internal/base/error_handling.h"
#include <thread> #include <thread>
...@@ -59,6 +62,11 @@ void scheduler::work_thread_work_section() {
} while (target == my_state.get_thread_id());
thread_state &target_state = my_state.get_scheduler().thread_state_for(target);
#if PLS_SLEEP_WORKERS_ON_EMPTY
queue_empty_flag_retry_steal:
// TODO: relax atomics for empty flag
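// Load the target's empty flag before the steal attempt; it is re-examined
// below whenever the steal finds no work.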
data_structures::stamped_integer target_queue_empty_flag = target_state.get_queue_empty_flag().load();
#endif
base_task *stolen_task;
base_task *chain_after_stolen_task;
...@@ -69,7 +77,7 @@ void scheduler::work_thread_work_section() {
#endif
std::tie(stolen_task, chain_after_stolen_task, cas_success) =
target_state.get_task_manager().steal_task(my_state);
} while (!cas_success); // re-try on CAS conflicts, as in classic work stealing
if (stolen_task) {
// Keep the task chain consistent. We want to appear as if we are working on a branch upwards of the stolen task.
...@@ -93,13 +101,48 @@ void scheduler::work_thread_work_section() {
// We will continue execution on this line once we have finished the stolen work.
failed_steals = 0;
} else {
// TODO: tune value for when we start yielding
const unsigned YIELD_AFTER = 1;
failed_steals++;
if (failed_steals >= YIELD_AFTER) {
std::this_thread::yield();
}
#if PLS_PROFILING_ENABLED
my_state.get_scheduler().profiler_.stealing_end(my_state.get_thread_id(), false);
#endif
#if PLS_SLEEP_WORKERS_ON_EMPTY
switch (target_queue_empty_flag.value) {
case EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY: {
// We found the queue empty, but the flag says it should still be non-empty.
// We want to declare it empty, but we need to re-check the queue in a sub-step to avoid races.
data_structures::stamped_integer
maybe_empty_flag{target_queue_empty_flag.stamp + 1, EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY};
if (target_state.get_queue_empty_flag().compare_exchange_strong(target_queue_empty_flag,
maybe_empty_flag)) {
goto queue_empty_flag_retry_steal;
}
break;
}
case EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY: {
// We found the queue empty and it was already marked as maybe empty.
// We can safely mark it empty and increment the central counter.
data_structures::stamped_integer
empty_flag{target_queue_empty_flag.stamp + 1, EMPTY_QUEUE_STATE::QUEUE_EMPTY};
if (target_state.get_queue_empty_flag().compare_exchange_strong(target_queue_empty_flag, empty_flag)) {
// We marked it empty, so it is now our duty to update the central counter.
my_state.get_scheduler().empty_queue_increase_counter();
}
break;
}
case EMPTY_QUEUE_STATE::QUEUE_EMPTY: {
// The queue was already marked empty, just do nothing
break;
}
}
// Regardless of whether we found the queue empty, check if we can put ourselves to sleep.
my_state.get_scheduler().empty_queue_try_sleep_worker();
#endif
}
}
}
}
...@@ -253,4 +296,34 @@ bool scheduler::check_task_chain(base_task &start_task) {
return check_task_chain_backward(start_task) && check_task_chain_forward(start_task);
}
#if PLS_SLEEP_WORKERS_ON_EMPTY
// TODO: relax memory orderings
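// Note: the (int32_t *) casts below assume std::atomic<int32_t> is layout-compatible
// with a plain int32_t, which holds on the Linux targets this futex wrapper supports.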
void scheduler::empty_queue_try_sleep_worker() {
int32_t counter_value = empty_queue_counter_.load();
// printf("try sleep thread %d...\n", counter_value);
if (counter_value == num_threads() - 1) {
// printf("sleeping thread...\n");
threads_sleeping_++;
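// Sleep only while the counter still equals num_threads() - 1; the kernel
// re-checks this atomically, so a concurrent wakeup cannot be lost.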
base::futex_wait((int32_t *) &empty_queue_counter_, num_threads() - 1);
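// Once woken, cascade the wakeup: each woken worker wakes at most one further sleeper.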
base::futex_wakeup((int32_t *) &empty_queue_counter_, 1);
threads_sleeping_--;
// printf("waked thread...\n");
}
}
void scheduler::empty_queue_increase_counter() {
int32_t old_value = empty_queue_counter_.fetch_add(1);
// printf("increase counter from %d", old_value);
}
void scheduler::empty_queue_decrease_counter_and_wake() {
empty_queue_counter_.fetch_sub(1);
if (threads_sleeping_.load() > 0) {
// Threads could be sleeping, we MUST wake them up
base::futex_wakeup((int32_t *) &empty_queue_counter_, 1);
}
}
#endif
}