From 08283a3716109568cfed99124a3b2848ca46e328 Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Fri, 29 May 2020 17:23:17 +0200 Subject: [PATCH] Add scheduler_active flag and PLS_SERIAL_ELUSION compile option. The flag can be used to run code annotated with PLS outside of a scheduler environment, i.e., the app does not crash if the code is called without a scheduler context. The compile option allows omitting all spawn and sync calls during compilation, creating the equivalent serial code. --- README.md | 13 +++++++++++++ app/benchmark_fft/main.cpp | 10 ++++------ app/benchmark_fib/main.cpp | 10 ++++------ lib/pls/include/pls/internal/scheduling/scheduler.h | 20 ++++++++++++++++++-- lib/pls/include/pls/internal/scheduling/scheduler_impl.h | 189 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------------------------------------------------------------------- lib/pls/include/pls/internal/scheduling/thread_state.h | 9 +++++++++ lib/pls/src/internal/scheduling/scheduler.cpp | 69 ++++++++++++++++++++++++++++++++++++++------------------------------- lib/pls/src/internal/scheduling/thread_state.cpp | 1 + 8 files changed, 184 insertions(+), 137 deletions(-) diff --git a/README.md b/README.md index 2318a7f..79b6a7a 100644 --- a/README.md +++ b/README.md @@ -141,8 +141,21 @@ Testing is done using [Catch2](https://github.com/catchorg/Catch2/) in the test subfolder. Tests are built into a target called `tests` and can be executed simply by building this executable and running it. +### PLS Profiler + +The PLS profiler records the DAG for each scheduler invocation. +Stats can be queried from it, and it can be printed in .dot format, +which can later be rendered by the dot tool to inspect the actually +executed graph. + +The most useful metrics to analyze are the maximum memory required per +coroutine stack, the computational depth, T_1 and T_inf. + ### Data Race Detection + +WARNING: the latest build of clang/thread sanitizer is required for this to work, +as it contains a recent bug-fix regarding user-level threads! + As this project contains a lot of concurrent code, we use [Thread Sanitizer](https://github.com/google/sanitizers/wiki/ThreadSanitizerCppManual) in our CI process and optionally in other builds.
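The two additions in this patch work together: the thread-local scheduler_active flag lets PLS-annotated code fall back to plain serial execution when no scheduler is running on the current thread, while defining PLS_SERIAL_ELUSION removes the spawn/sync machinery at compile time. The following is only a minimal usage sketch built from the public API that appears elsewhere in this patch (`pls::spawn`, `pls::sync`, `pls::scheduler`, `scheduler::perform_work`); the thread, task and stack-size values and the exact `perform_work` call site are illustrative assumptions, not taken from the benchmarks.

```cpp
#include "pls/pls.h"

// A PLS-annotated helper in the same style as the fib benchmark below.
int fib(int n) {
  if (n <= 1) return n;
  int a, b;
  pls::spawn([n, &a]() { a = fib(n - 1); });
  pls::spawn([n, &b]() { b = fib(n - 2); });
  pls::sync();
  return a + b;
}

int main() {
  // 1) No scheduler active on this thread: spawn()/sync() now degrade to
  //    plain serial calls instead of crashing (the scheduler_active flag
  //    defaults to false).
  int serial = fib(20);

  // 2) Inside a scheduler the exact same code is executed in parallel.
  //    The constructor arguments are example values only.
  pls::scheduler scheduler{4u, 64, 1u << 16u};
  int parallel = 0;
  scheduler.perform_work([&]() { parallel = fib(20); });

  // 3) Building the whole program with -DPLS_SERIAL_ELUSION additionally
  //    elides the spawn/sync calls at compile time, yielding the
  //    equivalent serial code.
  return serial == parallel ? 0 : 1;
}
```

The first call works because the thread_local flag is false by default, so spawn_internal()/sync_internal() simply execute the lambda in place.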
To setup CMake builds diff --git a/app/benchmark_fft/main.cpp b/app/benchmark_fft/main.cpp index 18b05ce..3ba38ea 100644 --- a/app/benchmark_fft/main.cpp +++ b/app/benchmark_fft/main.cpp @@ -1,7 +1,5 @@ #include "pls/pls.h" -using namespace pls; - #include "benchmark_runner.h" #include "benchmark_base/fft.h" @@ -17,13 +15,13 @@ void pls_conquer(fft::complex_vector::iterator data, fft::complex_vector::iterat fft::conquer(data, swap_array, n / 2); fft::conquer(data + n / 2, swap_array + n / 2, n / 2); } else { - spawn([data, n, swap_array]() { + pls::spawn([data, n, swap_array]() { pls_conquer(data, swap_array, n / 2); }); - spawn([data, n, swap_array]() { + pls::spawn([data, n, swap_array]() { pls_conquer(data + n / 2, swap_array + n / 2, n / 2); }); - sync(); + pls::sync(); } fft::combine(data, n); @@ -45,7 +43,7 @@ int main(int argc, char **argv) { fft::complex_vector swap_array(fft::SIZE); fft::fill_input(data); - scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE}; + pls::scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE}; // scheduler.get_profiler().disable_memory_measure(); runner.run_iterations(10, [&]() { diff --git a/app/benchmark_fib/main.cpp b/app/benchmark_fib/main.cpp index d0a4e40..67492d3 100644 --- a/app/benchmark_fib/main.cpp +++ b/app/benchmark_fib/main.cpp @@ -1,7 +1,5 @@ #include "pls/pls.h" -using namespace pls; - #include #include "benchmark_runner.h" @@ -18,13 +16,13 @@ int pls_fib(int n) { } int a, b; - spawn([n, &a]() { + pls::spawn([n, &a]() { a = pls_fib(n - 1); }); - spawn([n, &b]() { + pls::spawn([n, &b]() { b = pls_fib(n - 2); }); - sync(); + pls::sync(); return a + b; } @@ -41,7 +39,7 @@ int main(int argc, char **argv) { string full_directory = directory + "/PLS_v3/"; benchmark_runner runner{full_directory, test_name}; - scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE}; + pls::scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE}; volatile int res; // scheduler.get_profiler().disable_memory_measure(); diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h index fdd2c99..41b33cb 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler.h @@ -79,12 +79,24 @@ class scheduler { * @param lambda the lambda to be executed in parallel. */ template - static void spawn(Function &&lambda); + static void spawn(Function &&lambda) { +#ifdef PLS_SERIAL_ELUSION + lambda(); +#else + spawn_internal(std::forward(lambda)); +#endif + } /** * Waits for all potentially parallel child tasks created with spawn(...). */ - static void sync(); + static void sync() { +#ifdef PLS_SERIAL_ELUSION + return; +#else + sync_internal(); +#endif + } /** * Explicitly terminate the worker threads. Scheduler must not be used after this. 
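With PLS_SERIAL_ELUSION defined, the new inline wrappers above turn `pls::spawn(f)` into a direct call `f()` and `pls::sync()` into a no-op, so the fib benchmark collapses to ordinary recursion. The sketch below shows roughly what the preprocessed code is equivalent to; the base case is an assumption, since the first lines of `pls_fib` are not part of the hunk above.

```cpp
#include <cstdio>

// Serial equivalent of pls_fib when built with -DPLS_SERIAL_ELUSION.
int pls_fib_serial(int n) {
  if (n <= 1) {               // assumed base case, not shown in the hunk above
    return n;
  }
  int a, b;
  a = pls_fib_serial(n - 1);  // was: pls::spawn([n, &a]() { a = pls_fib(n - 1); });
  b = pls_fib_serial(n - 2);  // was: pls::spawn([n, &b]() { b = pls_fib(n - 2); });
  // pls::sync() is elided entirely.
  return a + b;
}

int main() {
  std::printf("fib(10) = %d\n", pls_fib_serial(10));
  return 0;
}
```

This is what the commit message means by "creating the equivalent serial code": the parallel annotations vanish at compile time, so no scheduler or worker threads are needed at all.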
@@ -108,6 +120,10 @@ class scheduler { #endif private: + template + static void spawn_internal(Function &&lambda); + static void sync_internal(); + static context_switcher::continuation slow_return(thread_state &calling_state); static void work_thread_main_loop(); diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index 71f6166..19cbd88 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -31,7 +31,7 @@ scheduler::scheduler(unsigned int num_threads, terminated_{false}, stack_allocator_{std::make_shared(std::forward(stack_allocator))} #if PLS_PROFILING_ENABLED - , profiler_{num_threads} +, profiler_{num_threads} #endif { @@ -144,119 +144,124 @@ void scheduler::perform_work(Function work_section) { } template -void scheduler::spawn(Function &&lambda) { - thread_state &spawning_state = thread_state::get(); +void scheduler::spawn_internal(Function &&lambda) { + if (thread_state::is_scheduler_active()) { + thread_state &spawning_state = thread_state::get(); - base_task *last_task = spawning_state.get_active_task(); - base_task *spawned_task = last_task->next_; + base_task *last_task = spawning_state.get_active_task(); + base_task *spawned_task = last_task->next_; #if PLS_PROFILING_ENABLED - spawning_state.get_scheduler().profiler_.task_prepare_stack_measure(spawning_state.get_thread_id(), - spawned_task->stack_memory_, - spawned_task->stack_size_); - auto *child_dag_node = spawning_state.get_scheduler().profiler_.task_spawn_child(spawning_state.get_thread_id(), - last_task->profiling_node_); - spawned_task->profiling_node_ = child_dag_node; + spawning_state.get_scheduler().profiler_.task_prepare_stack_measure(spawning_state.get_thread_id(), + spawned_task->stack_memory_, + spawned_task->stack_size_); + auto *child_dag_node = spawning_state.get_scheduler().profiler_.task_spawn_child(spawning_state.get_thread_id(), + last_task->profiling_node_); + spawned_task->profiling_node_ = child_dag_node; #endif - auto continuation = spawned_task->run_as_task([last_task, spawned_task, lambda, &spawning_state](auto cont) { - // allow stealing threads to continue the last task. - last_task->continuation_ = std::move(cont); + auto continuation = spawned_task->run_as_task([last_task, spawned_task, lambda, &spawning_state](auto cont) { + // allow stealing threads to continue the last task. + last_task->continuation_ = std::move(cont); - // we are now executing the new task, allow others to steal the last task continuation. - spawned_task->is_synchronized_ = true; - spawning_state.set_active_task(spawned_task); - spawning_state.get_task_manager().push_local_task(last_task); + // we are now executing the new task, allow others to steal the last task continuation. + spawned_task->is_synchronized_ = true; + spawning_state.set_active_task(spawned_task); + spawning_state.get_task_manager().push_local_task(last_task); #if PLS_SLEEP_WORKERS_ON_EMPTY - // TODO: relax atomic operations on empty flag - data_structures::stamped_integer queue_empty_flag = spawning_state.get_queue_empty_flag().load(); - switch (queue_empty_flag.value) { - case EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY: { - // The queue was not found empty, ignore it. - break; - } - case EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY: { - // Someone tries to mark us empty and might be re-stealing right now. 
- data_structures::stamped_integer - queue_non_empty_flag{queue_empty_flag.stamp++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY}; - auto actual_empty_flag = spawning_state.get_queue_empty_flag().exchange(queue_non_empty_flag); - if (actual_empty_flag.value == EMPTY_QUEUE_STATE::QUEUE_EMPTY) { + // TODO: relax atomic operations on empty flag + data_structures::stamped_integer queue_empty_flag = spawning_state.get_queue_empty_flag().load(); + switch (queue_empty_flag.value) { + case EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY: { + // The queue was not found empty, ignore it. + break; + } + case EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY: { + // Someone tries to mark us empty and might be re-stealing right now. + data_structures::stamped_integer + queue_non_empty_flag{queue_empty_flag.stamp++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY}; + auto actual_empty_flag = spawning_state.get_queue_empty_flag().exchange(queue_non_empty_flag); + if (actual_empty_flag.value == EMPTY_QUEUE_STATE::QUEUE_EMPTY) { + spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake(); + } + break; + } + case EMPTY_QUEUE_STATE::QUEUE_EMPTY: { + // Someone already marked the queue empty, we must revert its action on the central queue. + data_structures::stamped_integer + queue_non_empty_flag{queue_empty_flag.stamp++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY}; + spawning_state.get_queue_empty_flag().store(queue_non_empty_flag); spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake(); + break; } - break; - } - case EMPTY_QUEUE_STATE::QUEUE_EMPTY: { - // Someone already marked the queue empty, we must revert its action on the central queue. - data_structures::stamped_integer - queue_non_empty_flag{queue_empty_flag.stamp++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY}; - spawning_state.get_queue_empty_flag().store(queue_non_empty_flag); - spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake(); - break; } - } #endif - // execute the lambda itself, which could lead to a different thread returning. + // execute the lambda itself, which could lead to a different thread returning. #if PLS_PROFILING_ENABLED - spawning_state.get_scheduler().profiler_.task_stop_running(spawning_state.get_thread_id(), - last_task->profiling_node_); - spawning_state.get_scheduler().profiler_.task_start_running(spawning_state.get_thread_id(), - spawned_task->profiling_node_); + spawning_state.get_scheduler().profiler_.task_stop_running(spawning_state.get_thread_id(), + last_task->profiling_node_); + spawning_state.get_scheduler().profiler_.task_start_running(spawning_state.get_thread_id(), + spawned_task->profiling_node_); #endif - lambda(); - - thread_state &syncing_state = thread_state::get(); - PLS_ASSERT(syncing_state.get_active_task() == spawned_task, - "Task manager must always point its active task onto whats executing."); - - // try to pop a task of the syncing task manager. - // possible outcomes: - // - this is a different task manager, it must have an empty deque and fail - // - this is the same task manager and someone stole last tasks, thus this will fail - // - this is the same task manager and no one stole the last task, this this will succeed - base_task *popped_task = syncing_state.get_task_manager().pop_local_task(); - if (popped_task) { - // Fast path, simply continue execution where we left of before spawn. 
- PLS_ASSERT(popped_task == last_task, - "Fast path, nothing can have changed until here."); - PLS_ASSERT(&spawning_state == &syncing_state, - "Fast path, we must only return if the task has not been stolen/moved to other thread."); - PLS_ASSERT(last_task->continuation_.valid(), - "Fast path, no one can have continued working on the last task."); - - syncing_state.set_active_task(last_task); + lambda(); + + thread_state &syncing_state = thread_state::get(); + PLS_ASSERT(syncing_state.get_active_task() == spawned_task, + "Task manager must always point its active task onto whats executing."); + + // try to pop a task of the syncing task manager. + // possible outcomes: + // - this is a different task manager, it must have an empty deque and fail + // - this is the same task manager and someone stole last tasks, thus this will fail + // - this is the same task manager and no one stole the last task, this this will succeed + base_task *popped_task = syncing_state.get_task_manager().pop_local_task(); + if (popped_task) { + // Fast path, simply continue execution where we left of before spawn. + PLS_ASSERT(popped_task == last_task, + "Fast path, nothing can have changed until here."); + PLS_ASSERT(&spawning_state == &syncing_state, + "Fast path, we must only return if the task has not been stolen/moved to other thread."); + PLS_ASSERT(last_task->continuation_.valid(), + "Fast path, no one can have continued working on the last task."); + + syncing_state.set_active_task(last_task); #if PLS_PROFILING_ENABLED - syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(), - spawned_task->stack_memory_, - spawned_task->stack_size_, - spawned_task->profiling_node_); - syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(), - spawned_task->profiling_node_); + syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(), + spawned_task->stack_memory_, + spawned_task->stack_size_, + spawned_task->profiling_node_); + syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(), + spawned_task->profiling_node_); #endif - return std::move(last_task->continuation_); - } else { - // Slow path, the last task was stolen. This path is common to sync() events. + return std::move(last_task->continuation_); + } else { + // Slow path, the last task was stolen. This path is common to sync() events. 
#if PLS_PROFILING_ENABLED - syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(), - spawned_task->stack_memory_, - spawned_task->stack_size_, - spawned_task->profiling_node_); - syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(), - spawned_task->profiling_node_); + syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(), + spawned_task->stack_memory_, + spawned_task->stack_size_, + spawned_task->profiling_node_); + syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(), + spawned_task->profiling_node_); #endif - auto continuation = slow_return(syncing_state); - return continuation; - } - }); + auto continuation = slow_return(syncing_state); + return continuation; + } + }); #if PLS_PROFILING_ENABLED - thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(), - last_task->profiling_node_); + thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(), + last_task->profiling_node_); #endif - if (continuation.valid()) { - // We jumped in here from the main loop, keep track! - thread_state::get().main_continuation() = std::move(continuation); + if (continuation.valid()) { + // We jumped in here from the main loop, keep track! + thread_state::get().main_continuation() = std::move(continuation); + } + } else { + // Scheduler not active... + lambda(); } } diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h index 545bf34..8c7dcf3 100644 --- a/lib/pls/include/pls/internal/scheduling/thread_state.h +++ b/lib/pls/include/pls/internal/scheduling/thread_state.h @@ -44,6 +44,8 @@ struct PLS_CACHE_ALIGN thread_state { std::minstd_rand random_; base_task *active_task_; + static thread_local bool is_scheduler_active_; + #if PLS_SLEEP_WORKERS_ON_EMPTY PLS_CACHE_ALIGN std::atomic queue_empty_{EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY}; #endif @@ -76,6 +78,13 @@ struct PLS_CACHE_ALIGN thread_state { [[nodiscard]] static thread_state &PLS_NOINLINE get(); static void set(thread_state *); + [[nodiscard]] static bool is_scheduler_active() { + return is_scheduler_active_; + } + static void set_scheduler_active(bool active) { + is_scheduler_active_ = active; + } + [[nodiscard]] unsigned get_thread_id() const { return thread_id_; } [[nodiscard]] task_manager &get_task_manager() { return task_manager_; } [[nodiscard]] scheduler &get_scheduler() { return scheduler_; } diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp index c174f0c..d5fc96b 100644 --- a/lib/pls/src/internal/scheduling/scheduler.cpp +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -42,6 +42,7 @@ void scheduler::work_thread_main_loop() { void scheduler::work_thread_work_section() { thread_state &my_state = thread_state::get(); + my_state.set_scheduler_active(true); unsigned const num_threads = my_state.get_scheduler().num_threads(); if (my_state.get_thread_id() == 0) { @@ -145,48 +146,54 @@ void scheduler::work_thread_work_section() { } } } + my_state.set_scheduler_active(false); } -void scheduler::sync() { - thread_state &syncing_state = thread_state::get(); +void scheduler::sync_internal() { + if (thread_state::is_scheduler_active()) { + thread_state &syncing_state = thread_state::get(); - base_task *active_task = syncing_state.get_active_task(); - base_task *spawned_task = active_task->next_; 
+ base_task *active_task = syncing_state.get_active_task(); + base_task *spawned_task = active_task->next_; #if PLS_PROFILING_ENABLED - syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(), - active_task->stack_memory_, - active_task->stack_size_, - active_task->profiling_node_); - syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(), - active_task->profiling_node_); - auto *next_dag_node = - syncing_state.get_scheduler().profiler_.task_sync(syncing_state.get_thread_id(), active_task->profiling_node_); - active_task->profiling_node_ = next_dag_node; + syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(), + active_task->stack_memory_, + active_task->stack_size_, + active_task->profiling_node_); + syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(), + active_task->profiling_node_); + auto *next_dag_node = + syncing_state.get_scheduler().profiler_.task_sync(syncing_state.get_thread_id(), active_task->profiling_node_); + active_task->profiling_node_ = next_dag_node; #endif - if (active_task->is_synchronized_) { + if (active_task->is_synchronized_) { #if PLS_PROFILING_ENABLED - thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(), - thread_state::get().get_active_task()->profiling_node_); + thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(), + thread_state::get().get_active_task()->profiling_node_); #endif - return; // We are already the sole owner of last_task - } else { - auto continuation = - spawned_task->run_as_task([active_task, spawned_task, &syncing_state](context_switcher::continuation cont) { - active_task->continuation_ = std::move(cont); - syncing_state.set_active_task(spawned_task); - return slow_return(syncing_state); - }); - - PLS_ASSERT(!continuation.valid(), - "We only return to a sync point, never jump to it directly." - "This must therefore never return an unfinished fiber/continuation."); + return; // We are already the sole owner of last_task + } else { + auto continuation = + spawned_task->run_as_task([active_task, spawned_task, &syncing_state](context_switcher::continuation cont) { + active_task->continuation_ = std::move(cont); + syncing_state.set_active_task(spawned_task); + return slow_return(syncing_state); + }); + + PLS_ASSERT(!continuation.valid(), + "We only return to a sync point, never jump to it directly." 
+ "This must therefore never return an unfinished fiber/continuation."); #if PLS_PROFILING_ENABLED - thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(), - thread_state::get().get_active_task()->profiling_node_); + thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(), + thread_state::get().get_active_task()->profiling_node_); #endif - return; // We cleanly synced to the last one finishing work on last_task + return; // We cleanly synced to the last one finishing work on last_task + } + } else { + // Scheduler not active + return; } } diff --git a/lib/pls/src/internal/scheduling/thread_state.cpp b/lib/pls/src/internal/scheduling/thread_state.cpp index 85d1c94..acf603a 100644 --- a/lib/pls/src/internal/scheduling/thread_state.cpp +++ b/lib/pls/src/internal/scheduling/thread_state.cpp @@ -3,6 +3,7 @@ namespace pls::internal::scheduling { thread_local thread_state *my_thread_state{nullptr}; +thread_local bool thread_state::is_scheduler_active_{false}; thread_state &thread_state::get() { return *my_thread_state; } void thread_state::set(thread_state *new_state) { my_thread_state = new_state; } -- libgit2 0.26.0