diff --git a/README.md b/README.md
index 2318a7f..79b6a7a 100644
--- a/README.md
+++ b/README.md
@@ -141,8 +141,21 @@
 Testing is done using [Catch2](https://github.com/catchorg/Catch2/)
 in the test subfolder. Tests are build into a target called `tests`
 and can be executed simply by building this executabe and running it.
 
+### PLS profiler
+
+The PLS profiler records the DAG for each scheduler invocation.
+Stats can be queried from it, and the DAG can be printed in .dot format,
+which can later be rendered with the dot tool to inspect the
+actually executed graph.
+
+The most useful statistics are the maximum memory required per
+coroutine stack, the computational depth, and T_1 and T_inf.
+
 ### Data Race Detection
+WARNING: the latest build of clang/Thread Sanitizer is required for this to work,
+as it contains a recent bug-fix regarding user-level threads!
+
 As this project contains a lot concurrent code we use
 [Thread Sanitizer](https://github.com/google/sanitizers/wiki/ThreadSanitizerCppManual)
 in our CI process and optional in other builds. To setup CMake builds
diff --git a/app/benchmark_fft/main.cpp b/app/benchmark_fft/main.cpp
index 18b05ce..3ba38ea 100644
--- a/app/benchmark_fft/main.cpp
+++ b/app/benchmark_fft/main.cpp
@@ -1,7 +1,5 @@
 #include "pls/pls.h"
 
-using namespace pls;
-
 #include "benchmark_runner.h"
 #include "benchmark_base/fft.h"
 
@@ -17,13 +15,13 @@ void pls_conquer(fft::complex_vector::iterator data, fft::complex_vector::iterator swap_array, int n) {
     fft::conquer(data, swap_array, n / 2);
     fft::conquer(data + n / 2, swap_array + n / 2, n / 2);
   } else {
-    spawn([data, n, swap_array]() {
+    pls::spawn([data, n, swap_array]() {
       pls_conquer(data, swap_array, n / 2);
     });
-    spawn([data, n, swap_array]() {
+    pls::spawn([data, n, swap_array]() {
       pls_conquer(data + n / 2, swap_array + n / 2, n / 2);
     });
-    sync();
+    pls::sync();
   }
 
   fft::combine(data, n);
@@ -45,7 +43,7 @@ int main(int argc, char **argv) {
   fft::complex_vector swap_array(fft::SIZE);
   fft::fill_input(data);
 
-  scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
+  pls::scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
 
   // scheduler.get_profiler().disable_memory_measure();
   runner.run_iterations(10, [&]() {
diff --git a/app/benchmark_fib/main.cpp b/app/benchmark_fib/main.cpp
index d0a4e40..67492d3 100644
--- a/app/benchmark_fib/main.cpp
+++ b/app/benchmark_fib/main.cpp
@@ -1,7 +1,5 @@
 #include "pls/pls.h"
 
-using namespace pls;
-
 #include <string>
 
 #include "benchmark_runner.h"
@@ -18,13 +16,13 @@ int pls_fib(int n) {
   }
 
   int a, b;
-  spawn([n, &a]() {
+  pls::spawn([n, &a]() {
     a = pls_fib(n - 1);
   });
-  spawn([n, &b]() {
+  pls::spawn([n, &b]() {
     b = pls_fib(n - 2);
   });
-  sync();
+  pls::sync();
 
   return a + b;
 }
@@ -41,7 +39,7 @@ int main(int argc, char **argv) {
   string full_directory = directory + "/PLS_v3/";
   benchmark_runner runner{full_directory, test_name};
 
-  scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
+  pls::scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
   volatile int res;
   // scheduler.get_profiler().disable_memory_measure();
   runner.run_iterations(10, [&]() {
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h
index fdd2c99..41b33cb 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler.h
@@ -79,12 +79,24 @@ class scheduler {
   * @param lambda the lambda to be executed in parallel.
   */
  template<typename Function>
-  static void spawn(Function &&lambda);
+  static void spawn(Function &&lambda) {
+#ifdef PLS_SERIAL_ELUSION
+    lambda();
+#else
+    spawn_internal(std::forward<Function>(lambda));
+#endif
+  }
 
  /**
   * Waits for all potentially parallel child tasks created with spawn(...).
   */
-  static void sync();
+  static void sync() {
+#ifdef PLS_SERIAL_ELUSION
+    return;
+#else
+    sync_internal();
+#endif
+  }
 
  /**
   * Explicitly terminate the worker threads. Scheduler must not be used after this.
@@ -108,6 +120,10 @@ class scheduler {
 #endif
 
 private:
+  template<typename Function>
+  static void spawn_internal(Function &&lambda);
+  static void sync_internal();
+
   static context_switcher::continuation slow_return(thread_state &calling_state);
 
   static void work_thread_main_loop();
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index 71f6166..19cbd88 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -31,7 +31,7 @@ scheduler::scheduler(unsigned int num_threads,
       terminated_{false},
       stack_allocator_{std::make_shared<Allocator>(std::forward<Allocator>(stack_allocator))}
 #if PLS_PROFILING_ENABLED
-    , profiler_{num_threads}
+, profiler_{num_threads}
 #endif
 {
 
@@ -144,119 +144,124 @@ void scheduler::perform_work(Function work_section) {
 }
 
 template <typename Function>
-void scheduler::spawn(Function &&lambda) {
-  thread_state &spawning_state = thread_state::get();
+void scheduler::spawn_internal(Function &&lambda) {
+  if (thread_state::is_scheduler_active()) {
+    thread_state &spawning_state = thread_state::get();
 
-  base_task *last_task = spawning_state.get_active_task();
-  base_task *spawned_task = last_task->next_;
+    base_task *last_task = spawning_state.get_active_task();
+    base_task *spawned_task = last_task->next_;
 
 #if PLS_PROFILING_ENABLED
-  spawning_state.get_scheduler().profiler_.task_prepare_stack_measure(spawning_state.get_thread_id(),
-                                                                      spawned_task->stack_memory_,
-                                                                      spawned_task->stack_size_);
-  auto *child_dag_node = spawning_state.get_scheduler().profiler_.task_spawn_child(spawning_state.get_thread_id(),
-                                                                                   last_task->profiling_node_);
-  spawned_task->profiling_node_ = child_dag_node;
+    spawning_state.get_scheduler().profiler_.task_prepare_stack_measure(spawning_state.get_thread_id(),
+                                                                        spawned_task->stack_memory_,
+                                                                        spawned_task->stack_size_);
+    auto *child_dag_node = spawning_state.get_scheduler().profiler_.task_spawn_child(spawning_state.get_thread_id(),
+                                                                                     last_task->profiling_node_);
+    spawned_task->profiling_node_ = child_dag_node;
 #endif
 
-  auto continuation = spawned_task->run_as_task([last_task, spawned_task, lambda, &spawning_state](auto cont) {
-    // allow stealing threads to continue the last task.
-    last_task->continuation_ = std::move(cont);
+    auto continuation = spawned_task->run_as_task([last_task, spawned_task, lambda, &spawning_state](auto cont) {
+      // allow stealing threads to continue the last task.
+      last_task->continuation_ = std::move(cont);
 
-    // we are now executing the new task, allow others to steal the last task continuation.
-    spawned_task->is_synchronized_ = true;
-    spawning_state.set_active_task(spawned_task);
-    spawning_state.get_task_manager().push_local_task(last_task);
+      // we are now executing the new task, allow others to steal the last task continuation.
+      spawned_task->is_synchronized_ = true;
+      spawning_state.set_active_task(spawned_task);
+      spawning_state.get_task_manager().push_local_task(last_task);
 
 #if PLS_SLEEP_WORKERS_ON_EMPTY
-    // TODO: relax atomic operations on empty flag
-    data_structures::stamped_integer queue_empty_flag = spawning_state.get_queue_empty_flag().load();
-    switch (queue_empty_flag.value) {
-      case EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY: {
-        // The queue was not found empty, ignore it.
-        break;
-      }
-      case EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY: {
-        // Someone tries to mark us empty and might be re-stealing right now.
-        data_structures::stamped_integer
-            queue_non_empty_flag{queue_empty_flag.stamp++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
-        auto actual_empty_flag = spawning_state.get_queue_empty_flag().exchange(queue_non_empty_flag);
-        if (actual_empty_flag.value == EMPTY_QUEUE_STATE::QUEUE_EMPTY) {
+      // TODO: relax atomic operations on empty flag
+      data_structures::stamped_integer queue_empty_flag = spawning_state.get_queue_empty_flag().load();
+      switch (queue_empty_flag.value) {
+        case EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY: {
+          // The queue was not found empty, ignore it.
+          break;
+        }
+        case EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY: {
+          // Someone tries to mark us empty and might be re-stealing right now.
+          data_structures::stamped_integer
+              queue_non_empty_flag{queue_empty_flag.stamp++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
+          auto actual_empty_flag = spawning_state.get_queue_empty_flag().exchange(queue_non_empty_flag);
+          if (actual_empty_flag.value == EMPTY_QUEUE_STATE::QUEUE_EMPTY) {
+            spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake();
+          }
+          break;
+        }
+        case EMPTY_QUEUE_STATE::QUEUE_EMPTY: {
+          // Someone already marked the queue empty, we must revert its action on the central queue.
+          data_structures::stamped_integer
+              queue_non_empty_flag{queue_empty_flag.stamp++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
+          spawning_state.get_queue_empty_flag().store(queue_non_empty_flag);
           spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake();
+          break;
         }
-        break;
-      }
-      case EMPTY_QUEUE_STATE::QUEUE_EMPTY: {
-        // Someone already marked the queue empty, we must revert its action on the central queue.
-        data_structures::stamped_integer
-            queue_non_empty_flag{queue_empty_flag.stamp++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
-        spawning_state.get_queue_empty_flag().store(queue_non_empty_flag);
-        spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake();
-        break;
-      }
-    }
 #endif
 
-    // execute the lambda itself, which could lead to a different thread returning.
+      // execute the lambda itself, which could lead to a different thread returning.
 #if PLS_PROFILING_ENABLED
-    spawning_state.get_scheduler().profiler_.task_stop_running(spawning_state.get_thread_id(),
-                                                               last_task->profiling_node_);
-    spawning_state.get_scheduler().profiler_.task_start_running(spawning_state.get_thread_id(),
-                                                                spawned_task->profiling_node_);
+      spawning_state.get_scheduler().profiler_.task_stop_running(spawning_state.get_thread_id(),
+                                                                 last_task->profiling_node_);
+      spawning_state.get_scheduler().profiler_.task_start_running(spawning_state.get_thread_id(),
+                                                                  spawned_task->profiling_node_);
 #endif
-    lambda();
-
-    thread_state &syncing_state = thread_state::get();
-    PLS_ASSERT(syncing_state.get_active_task() == spawned_task,
-               "Task manager must always point its active task onto whats executing.");
-
-    // try to pop a task of the syncing task manager.
-    // possible outcomes:
-    // - this is a different task manager, it must have an empty deque and fail
-    // - this is the same task manager and someone stole last tasks, thus this will fail
-    // - this is the same task manager and no one stole the last task, this this will succeed
-    base_task *popped_task = syncing_state.get_task_manager().pop_local_task();
-    if (popped_task) {
-      // Fast path, simply continue execution where we left of before spawn.
-      PLS_ASSERT(popped_task == last_task,
-                 "Fast path, nothing can have changed until here.");
-      PLS_ASSERT(&spawning_state == &syncing_state,
-                 "Fast path, we must only return if the task has not been stolen/moved to other thread.");
-      PLS_ASSERT(last_task->continuation_.valid(),
-                 "Fast path, no one can have continued working on the last task.");
-
-      syncing_state.set_active_task(last_task);
+      lambda();
+
+      thread_state &syncing_state = thread_state::get();
+      PLS_ASSERT(syncing_state.get_active_task() == spawned_task,
+                 "Task manager must always point its active task onto what's executing.");
+
+      // try to pop a task off the syncing task manager.
+      // possible outcomes:
+      // - this is a different task manager, it must have an empty deque and fail
+      // - this is the same task manager and someone stole the last task, thus this will fail
+      // - this is the same task manager and no one stole the last task, thus this will succeed
+      base_task *popped_task = syncing_state.get_task_manager().pop_local_task();
+      if (popped_task) {
+        // Fast path, simply continue execution where we left off before spawn.
+        PLS_ASSERT(popped_task == last_task,
+                   "Fast path, nothing can have changed until here.");
+        PLS_ASSERT(&spawning_state == &syncing_state,
+                   "Fast path, we must only return if the task has not been stolen/moved to another thread.");
+        PLS_ASSERT(last_task->continuation_.valid(),
+                   "Fast path, no one can have continued working on the last task.");
+
+        syncing_state.set_active_task(last_task);
 #if PLS_PROFILING_ENABLED
-      syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
-                                                                        spawned_task->stack_memory_,
-                                                                        spawned_task->stack_size_,
-                                                                        spawned_task->profiling_node_);
-      syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
-                                                                spawned_task->profiling_node_);
+        syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
+                                                                          spawned_task->stack_memory_,
+                                                                          spawned_task->stack_size_,
+                                                                          spawned_task->profiling_node_);
+        syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
+                                                                  spawned_task->profiling_node_);
 #endif
-      return std::move(last_task->continuation_);
-    } else {
-      // Slow path, the last task was stolen. This path is common to sync() events.
+        return std::move(last_task->continuation_);
+      } else {
+        // Slow path, the last task was stolen. This path is common to sync() events.
 #if PLS_PROFILING_ENABLED
-      syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
-                                                                        spawned_task->stack_memory_,
-                                                                        spawned_task->stack_size_,
-                                                                        spawned_task->profiling_node_);
-      syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
-                                                                spawned_task->profiling_node_);
+        syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
+                                                                          spawned_task->stack_memory_,
+                                                                          spawned_task->stack_size_,
+                                                                          spawned_task->profiling_node_);
+        syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
+                                                                  spawned_task->profiling_node_);
 #endif
-      auto continuation = slow_return(syncing_state);
-      return continuation;
-    }
-  });
+        auto continuation = slow_return(syncing_state);
+        return continuation;
+      }
+    });
 
 #if PLS_PROFILING_ENABLED
-  thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(),
-                                                                   last_task->profiling_node_);
+    thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(),
+                                                                     last_task->profiling_node_);
 #endif
-  if (continuation.valid()) {
-    // We jumped in here from the main loop, keep track!
-    thread_state::get().main_continuation() = std::move(continuation);
+    if (continuation.valid()) {
+      // We jumped in here from the main loop, keep track!
+      thread_state::get().main_continuation() = std::move(continuation);
+    }
+  } else {
+    // Scheduler not active, simply run the lambda inline.
+    lambda();
   }
 }
diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h
index 545bf34..8c7dcf3 100644
--- a/lib/pls/include/pls/internal/scheduling/thread_state.h
+++ b/lib/pls/include/pls/internal/scheduling/thread_state.h
@@ -44,6 +44,8 @@ struct PLS_CACHE_ALIGN thread_state {
   std::minstd_rand random_;
   base_task *active_task_;
 
+  static thread_local bool is_scheduler_active_;
+
 #if PLS_SLEEP_WORKERS_ON_EMPTY
   PLS_CACHE_ALIGN std::atomic<data_structures::stamped_integer> queue_empty_{EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
 #endif
@@ -76,6 +78,13 @@ struct PLS_CACHE_ALIGN thread_state {
   [[nodiscard]] static thread_state &PLS_NOINLINE get();
   static void set(thread_state *);
 
+  [[nodiscard]] static bool is_scheduler_active() {
+    return is_scheduler_active_;
+  }
+  static void set_scheduler_active(bool active) {
+    is_scheduler_active_ = active;
+  }
+
   [[nodiscard]] unsigned get_thread_id() const { return thread_id_; }
   [[nodiscard]] task_manager &get_task_manager() { return task_manager_; }
   [[nodiscard]] scheduler &get_scheduler() { return scheduler_; }
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index c174f0c..d5fc96b 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -42,6 +42,7 @@ void scheduler::work_thread_main_loop() {
 
 void scheduler::work_thread_work_section() {
   thread_state &my_state = thread_state::get();
+  my_state.set_scheduler_active(true);
   unsigned const num_threads = my_state.get_scheduler().num_threads();
 
   if (my_state.get_thread_id() == 0) {
@@ -145,48 +146,54 @@ void scheduler::work_thread_work_section() {
       }
     }
   }
+  my_state.set_scheduler_active(false);
 }
 
-void scheduler::sync() {
-  thread_state &syncing_state = thread_state::get();
+void scheduler::sync_internal() {
+  if (thread_state::is_scheduler_active()) {
+    thread_state &syncing_state = thread_state::get();
 
-  base_task *active_task = syncing_state.get_active_task();
-  base_task *spawned_task = active_task->next_;
+    base_task *active_task = syncing_state.get_active_task();
+    base_task *spawned_task = active_task->next_;
 
 #if PLS_PROFILING_ENABLED
-  syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
-                                                                    active_task->stack_memory_,
-                                                                    active_task->stack_size_,
-                                                                    active_task->profiling_node_);
-  syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
-                                                            active_task->profiling_node_);
-  auto *next_dag_node =
-      syncing_state.get_scheduler().profiler_.task_sync(syncing_state.get_thread_id(), active_task->profiling_node_);
-  active_task->profiling_node_ = next_dag_node;
+    syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
+                                                                      active_task->stack_memory_,
+                                                                      active_task->stack_size_,
+                                                                      active_task->profiling_node_);
+    syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
+                                                              active_task->profiling_node_);
+    auto *next_dag_node =
+        syncing_state.get_scheduler().profiler_.task_sync(syncing_state.get_thread_id(), active_task->profiling_node_);
+    active_task->profiling_node_ = next_dag_node;
 #endif
 
-  if (active_task->is_synchronized_) {
+    if (active_task->is_synchronized_) {
 #if PLS_PROFILING_ENABLED
-    thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(),
-                                                                     thread_state::get().get_active_task()->profiling_node_);
+      thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(),
+                                                                       thread_state::get().get_active_task()->profiling_node_);
 #endif
-    return; // We are already the sole owner of last_task
-  } else {
-    auto continuation =
-        spawned_task->run_as_task([active_task, spawned_task, &syncing_state](context_switcher::continuation cont) {
-          active_task->continuation_ = std::move(cont);
-          syncing_state.set_active_task(spawned_task);
-          return slow_return(syncing_state);
-        });
-
-    PLS_ASSERT(!continuation.valid(),
-               "We only return to a sync point, never jump to it directly."
-               "This must therefore never return an unfinished fiber/continuation.");
+      return; // We are already the sole owner of last_task
+    } else {
+      auto continuation =
+          spawned_task->run_as_task([active_task, spawned_task, &syncing_state](context_switcher::continuation cont) {
+            active_task->continuation_ = std::move(cont);
+            syncing_state.set_active_task(spawned_task);
+            return slow_return(syncing_state);
+          });
+
+      PLS_ASSERT(!continuation.valid(),
+                 "We only return to a sync point, never jump to it directly. "
+ "This must therefore never return an unfinished fiber/continuation."); #if PLS_PROFILING_ENABLED - thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(), - thread_state::get().get_active_task()->profiling_node_); + thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(), + thread_state::get().get_active_task()->profiling_node_); #endif - return; // We cleanly synced to the last one finishing work on last_task + return; // We cleanly synced to the last one finishing work on last_task + } + } else { + // Scheduler not active + return; } } diff --git a/lib/pls/src/internal/scheduling/thread_state.cpp b/lib/pls/src/internal/scheduling/thread_state.cpp index 85d1c94..acf603a 100644 --- a/lib/pls/src/internal/scheduling/thread_state.cpp +++ b/lib/pls/src/internal/scheduling/thread_state.cpp @@ -3,6 +3,7 @@ namespace pls::internal::scheduling { thread_local thread_state *my_thread_state{nullptr}; +thread_local bool thread_state::is_scheduler_active_{false}; thread_state &thread_state::get() { return *my_thread_state; } void thread_state::set(thread_state *new_state) { my_thread_state = new_state; }