Commit 08283a37 by FritzFlorian

Add scheduler_active flag and PLS_SERIAL_ELUSION compile option.

The flag can be used to run code annotated with PLS outside of a scheduler environment,
i.e. the application does not crash if the annotated code is called without a scheduler context. The compile option
allows omitting all spawn and sync calls during compilation, producing the equivalent serial code.
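To illustrate the two additions, here is a minimal sketch (not part of the commit). `pls::spawn`, `pls::sync`, `pls::scheduler` and `perform_work` are the names used in the headers and benchmarks of this diff; the scheduler's task-count and stack-size constructor arguments below are placeholder values, not library constants.

```cpp
#include "pls/pls.h"

// PLS-annotated code, identical whether or not a scheduler is running.
int fib(int n) {
  if (n <= 1) return n;
  int a, b;
  pls::spawn([n, &a] { a = fib(n - 1); });
  pls::spawn([n, &b] { b = fib(n - 2); });
  pls::sync();
  return a + b;
}

int main() {
  // 1) No scheduler: with the new scheduler_active flag the spawn/sync calls
  //    fall back to invoking the lambdas directly, so this no longer crashes.
  int serial = fib(20);

  // 2) With a scheduler the same code executes in parallel.
  pls::scheduler scheduler{4u, 64, 4096};  // placeholder task count / stack size
  int parallel = 0;
  scheduler.perform_work([&] { parallel = fib(20); });

  // 3) If PLS_SERIAL_ELUSION is defined at compile time, every pls::spawn(...)
  //    simply calls its lambda and pls::sync() becomes a no-op, yielding the
  //    equivalent serial program with no scheduling overhead.
  return serial == parallel ? 0 : 1;
}
```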
parent f9ec6ecf
Pipeline #1497 passed with stages in 3 minutes 57 seconds
......@@ -141,8 +141,21 @@ Testing is done using [Catch2](https://github.com/catchorg/Catch2/)
in the test subfolder. Tests are built into a target called `tests`
and can be executed simply by building this executable and running it.
### PLS profiler
The PLS profiler records the DAG of each scheduler invocation.
Stats can be queried from it and it can be printed in .dot format,
which can later be rendered with the dot tool (Graphviz) to inspect the
actually executed graph.
The most useful statistics are the maximum memory required per
coroutine stack, the computational depth, T_1 and T_inf.
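A hedged sketch of a profiling workflow, assuming a build with PLS_PROFILING_ENABLED: `get_profiler()` and `disable_memory_measure()` appear in the benchmark diffs below, but the stats- and .dot-printing calls are hypothetical placeholder names, not the real profiler interface.

```cpp
#include "pls/pls.h"

// Sketch only: requires a build with PLS_PROFILING_ENABLED.
void profile_one_invocation(pls::scheduler &scheduler) {
  // Optionally skip the per-coroutine stack measurement (seen in the benchmarks):
  // scheduler.get_profiler().disable_memory_measure();

  scheduler.perform_work([] { /* ... parallel workload ... */ });

  // Hypothetical queries on the recorded DAG (placeholder method names):
  // auto &profiler = scheduler.get_profiler();
  // profiler.print_stats(std::cout);   // max stack usage, depth, T_1, T_inf
  // profiler.print_dag(dot_file);      // .dot output, render via: dot -Tpng dag.dot -o dag.png
}
```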
### Data Race Detection
WARNING: the latest build of clang/Thread Sanitizer is required for this to work,
as it contains a recent bug-fix regarding user-level threads!
As this project contains a lot of concurrent code, we use
[Thread Sanitizer](https://github.com/google/sanitizers/wiki/ThreadSanitizerCppManual)
in our CI process and optionally in other builds; a minimal example of the kind of race it reports follows after this section. To set up CMake builds
......
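To make the Data Race Detection section above concrete, here is a minimal, self-contained example (not from the repository) of the kind of bug Thread Sanitizer reports; the clang invocation in the first comment is the standard way to enable TSan.

```cpp
// race.cpp -- compile with: clang++ -std=c++17 -fsanitize=thread -g -pthread race.cpp
#include <iostream>
#include <thread>

int shared_counter = 0;  // shared state without any synchronization

int main() {
  std::thread a([] { ++shared_counter; });  // both threads write the counter
  std::thread b([] { ++shared_counter; });  // unguarded -> data race
  a.join();
  b.join();
  std::cout << shared_counter << "\n";
  // The instrumented binary prints a "WARNING: ThreadSanitizer: data race" report.
}
```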
#include "pls/pls.h"
using namespace pls;
#include "benchmark_runner.h"
#include "benchmark_base/fft.h"
......@@ -17,13 +15,13 @@ void pls_conquer(fft::complex_vector::iterator data, fft::complex_vector::iterat
fft::conquer(data, swap_array, n / 2);
fft::conquer(data + n / 2, swap_array + n / 2, n / 2);
} else {
spawn([data, n, swap_array]() {
pls::spawn([data, n, swap_array]() {
pls_conquer(data, swap_array, n / 2);
});
spawn([data, n, swap_array]() {
pls::spawn([data, n, swap_array]() {
pls_conquer(data + n / 2, swap_array + n / 2, n / 2);
});
sync();
pls::sync();
}
fft::combine(data, n);
......@@ -45,7 +43,7 @@ int main(int argc, char **argv) {
fft::complex_vector swap_array(fft::SIZE);
fft::fill_input(data);
scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
pls::scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
// scheduler.get_profiler().disable_memory_measure();
runner.run_iterations(10, [&]() {
......
#include "pls/pls.h"
using namespace pls;
#include <iostream>
#include "benchmark_runner.h"
......@@ -18,13 +16,13 @@ int pls_fib(int n) {
}
int a, b;
spawn([n, &a]() {
pls::spawn([n, &a]() {
a = pls_fib(n - 1);
});
spawn([n, &b]() {
pls::spawn([n, &b]() {
b = pls_fib(n - 2);
});
sync();
pls::sync();
return a + b;
}
......@@ -41,7 +39,7 @@ int main(int argc, char **argv) {
string full_directory = directory + "/PLS_v3/";
benchmark_runner runner{full_directory, test_name};
scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
pls::scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
volatile int res;
// scheduler.get_profiler().disable_memory_measure();
......
......@@ -79,12 +79,24 @@ class scheduler {
* @param lambda the lambda to be executed in parallel.
*/
template<typename Function>
static void spawn(Function &&lambda);
static void spawn(Function &&lambda) {
#ifdef PLS_SERIAL_ELUSION
lambda();
#else
spawn_internal(std::forward<Function>(lambda));
#endif
}
/**
* Waits for all potentially parallel child tasks created with spawn(...).
*/
static void sync();
static void sync() {
#ifdef PLS_SERIAL_ELUSION
return;
#else
sync_internal();
#endif
}
/**
* Explicitly terminate the worker threads. Scheduler must not be used after this.
......@@ -108,6 +120,10 @@ class scheduler {
#endif
private:
template<typename Function>
static void spawn_internal(Function &&lambda);
static void sync_internal();
static context_switcher::continuation slow_return(thread_state &calling_state);
static void work_thread_main_loop();
......
......@@ -31,7 +31,7 @@ scheduler::scheduler(unsigned int num_threads,
terminated_{false},
stack_allocator_{std::make_shared<ALLOC>(std::forward<ALLOC>(stack_allocator))}
#if PLS_PROFILING_ENABLED
, profiler_{num_threads}
, profiler_{num_threads}
#endif
{
......@@ -144,119 +144,124 @@ void scheduler::perform_work(Function work_section) {
}
template<typename Function>
void scheduler::spawn(Function &&lambda) {
thread_state &spawning_state = thread_state::get();
void scheduler::spawn_internal(Function &&lambda) {
if (thread_state::is_scheduler_active()) {
thread_state &spawning_state = thread_state::get();
base_task *last_task = spawning_state.get_active_task();
base_task *spawned_task = last_task->next_;
base_task *last_task = spawning_state.get_active_task();
base_task *spawned_task = last_task->next_;
#if PLS_PROFILING_ENABLED
spawning_state.get_scheduler().profiler_.task_prepare_stack_measure(spawning_state.get_thread_id(),
spawned_task->stack_memory_,
spawned_task->stack_size_);
auto *child_dag_node = spawning_state.get_scheduler().profiler_.task_spawn_child(spawning_state.get_thread_id(),
last_task->profiling_node_);
spawned_task->profiling_node_ = child_dag_node;
spawning_state.get_scheduler().profiler_.task_prepare_stack_measure(spawning_state.get_thread_id(),
spawned_task->stack_memory_,
spawned_task->stack_size_);
auto *child_dag_node = spawning_state.get_scheduler().profiler_.task_spawn_child(spawning_state.get_thread_id(),
last_task->profiling_node_);
spawned_task->profiling_node_ = child_dag_node;
#endif
auto continuation = spawned_task->run_as_task([last_task, spawned_task, lambda, &spawning_state](auto cont) {
// allow stealing threads to continue the last task.
last_task->continuation_ = std::move(cont);
auto continuation = spawned_task->run_as_task([last_task, spawned_task, lambda, &spawning_state](auto cont) {
// allow stealing threads to continue the last task.
last_task->continuation_ = std::move(cont);
// we are now executing the new task, allow others to steal the last task continuation.
spawned_task->is_synchronized_ = true;
spawning_state.set_active_task(spawned_task);
spawning_state.get_task_manager().push_local_task(last_task);
// we are now executing the new task, allow others to steal the last task continuation.
spawned_task->is_synchronized_ = true;
spawning_state.set_active_task(spawned_task);
spawning_state.get_task_manager().push_local_task(last_task);
#if PLS_SLEEP_WORKERS_ON_EMPTY
// TODO: relax atomic operations on empty flag
data_structures::stamped_integer queue_empty_flag = spawning_state.get_queue_empty_flag().load();
switch (queue_empty_flag.value) {
case EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY: {
// The queue was not found empty, ignore it.
break;
}
case EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY: {
// Someone tries to mark us empty and might be re-stealing right now.
data_structures::stamped_integer
queue_non_empty_flag{queue_empty_flag.stamp++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
auto actual_empty_flag = spawning_state.get_queue_empty_flag().exchange(queue_non_empty_flag);
if (actual_empty_flag.value == EMPTY_QUEUE_STATE::QUEUE_EMPTY) {
// TODO: relax atomic operations on empty flag
data_structures::stamped_integer queue_empty_flag = spawning_state.get_queue_empty_flag().load();
switch (queue_empty_flag.value) {
case EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY: {
// The queue was not found empty, ignore it.
break;
}
case EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY: {
// Someone tries to mark us empty and might be re-stealing right now.
data_structures::stamped_integer
queue_non_empty_flag{queue_empty_flag.stamp++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
auto actual_empty_flag = spawning_state.get_queue_empty_flag().exchange(queue_non_empty_flag);
if (actual_empty_flag.value == EMPTY_QUEUE_STATE::QUEUE_EMPTY) {
spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake();
}
break;
}
case EMPTY_QUEUE_STATE::QUEUE_EMPTY: {
// Someone already marked the queue empty, we must revert its action on the central queue.
data_structures::stamped_integer
queue_non_empty_flag{queue_empty_flag.stamp++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
spawning_state.get_queue_empty_flag().store(queue_non_empty_flag);
spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake();
break;
}
break;
}
case EMPTY_QUEUE_STATE::QUEUE_EMPTY: {
// Someone already marked the queue empty, we must revert its action on the central queue.
data_structures::stamped_integer
queue_non_empty_flag{queue_empty_flag.stamp++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
spawning_state.get_queue_empty_flag().store(queue_non_empty_flag);
spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake();
break;
}
}
#endif
// execute the lambda itself, which could lead to a different thread returning.
// execute the lambda itself, which could lead to a different thread returning.
#if PLS_PROFILING_ENABLED
spawning_state.get_scheduler().profiler_.task_stop_running(spawning_state.get_thread_id(),
last_task->profiling_node_);
spawning_state.get_scheduler().profiler_.task_start_running(spawning_state.get_thread_id(),
spawned_task->profiling_node_);
spawning_state.get_scheduler().profiler_.task_stop_running(spawning_state.get_thread_id(),
last_task->profiling_node_);
spawning_state.get_scheduler().profiler_.task_start_running(spawning_state.get_thread_id(),
spawned_task->profiling_node_);
#endif
lambda();
thread_state &syncing_state = thread_state::get();
PLS_ASSERT(syncing_state.get_active_task() == spawned_task,
"Task manager must always point its active task onto whats executing.");
// try to pop a task of the syncing task manager.
// possible outcomes:
// - this is a different task manager, it must have an empty deque and fail
// - this is the same task manager and someone stole last tasks, thus this will fail
// - this is the same task manager and no one stole the last task, this this will succeed
base_task *popped_task = syncing_state.get_task_manager().pop_local_task();
if (popped_task) {
// Fast path, simply continue execution where we left off before spawn.
PLS_ASSERT(popped_task == last_task,
"Fast path, nothing can have changed until here.");
PLS_ASSERT(&spawning_state == &syncing_state,
"Fast path, we must only return if the task has not been stolen/moved to other thread.");
PLS_ASSERT(last_task->continuation_.valid(),
"Fast path, no one can have continued working on the last task.");
syncing_state.set_active_task(last_task);
lambda();
thread_state &syncing_state = thread_state::get();
PLS_ASSERT(syncing_state.get_active_task() == spawned_task,
"Task manager must always point its active task onto whats executing.");
// try to pop a task of the syncing task manager.
// possible outcomes:
// - this is a different task manager, it must have an empty deque and fail
// - this is the same task manager and someone stole last tasks, thus this will fail
// - this is the same task manager and no one stole the last task, this this will succeed
base_task *popped_task = syncing_state.get_task_manager().pop_local_task();
if (popped_task) {
// Fast path, simply continue execution where we left off before spawn.
PLS_ASSERT(popped_task == last_task,
"Fast path, nothing can have changed until here.");
PLS_ASSERT(&spawning_state == &syncing_state,
"Fast path, we must only return if the task has not been stolen/moved to other thread.");
PLS_ASSERT(last_task->continuation_.valid(),
"Fast path, no one can have continued working on the last task.");
syncing_state.set_active_task(last_task);
#if PLS_PROFILING_ENABLED
syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
spawned_task->stack_memory_,
spawned_task->stack_size_,
spawned_task->profiling_node_);
syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
spawned_task->profiling_node_);
syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
spawned_task->stack_memory_,
spawned_task->stack_size_,
spawned_task->profiling_node_);
syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
spawned_task->profiling_node_);
#endif
return std::move(last_task->continuation_);
} else {
// Slow path, the last task was stolen. This path is common to sync() events.
return std::move(last_task->continuation_);
} else {
// Slow path, the last task was stolen. This path is common to sync() events.
#if PLS_PROFILING_ENABLED
syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
spawned_task->stack_memory_,
spawned_task->stack_size_,
spawned_task->profiling_node_);
syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
spawned_task->profiling_node_);
syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
spawned_task->stack_memory_,
spawned_task->stack_size_,
spawned_task->profiling_node_);
syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
spawned_task->profiling_node_);
#endif
auto continuation = slow_return(syncing_state);
return continuation;
}
});
auto continuation = slow_return(syncing_state);
return continuation;
}
});
#if PLS_PROFILING_ENABLED
thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(),
last_task->profiling_node_);
thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(),
last_task->profiling_node_);
#endif
if (continuation.valid()) {
// We jumped in here from the main loop, keep track!
thread_state::get().main_continuation() = std::move(continuation);
if (continuation.valid()) {
// We jumped in here from the main loop, keep track!
thread_state::get().main_continuation() = std::move(continuation);
}
} else {
// Scheduler not active...
lambda();
}
}
......
......@@ -44,6 +44,8 @@ struct PLS_CACHE_ALIGN thread_state {
std::minstd_rand random_;
base_task *active_task_;
static thread_local bool is_scheduler_active_;
#if PLS_SLEEP_WORKERS_ON_EMPTY
PLS_CACHE_ALIGN std::atomic<data_structures::stamped_integer> queue_empty_{EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
#endif
......@@ -76,6 +78,13 @@ struct PLS_CACHE_ALIGN thread_state {
[[nodiscard]] static thread_state &PLS_NOINLINE get();
static void set(thread_state *);
[[nodiscard]] static bool is_scheduler_active() {
return is_scheduler_active_;
}
static void set_scheduler_active(bool active) {
is_scheduler_active_ = active;
}
[[nodiscard]] unsigned get_thread_id() const { return thread_id_; }
[[nodiscard]] task_manager &get_task_manager() { return task_manager_; }
[[nodiscard]] scheduler &get_scheduler() { return scheduler_; }
......
......@@ -42,6 +42,7 @@ void scheduler::work_thread_main_loop() {
void scheduler::work_thread_work_section() {
thread_state &my_state = thread_state::get();
my_state.set_scheduler_active(true);
unsigned const num_threads = my_state.get_scheduler().num_threads();
if (my_state.get_thread_id() == 0) {
......@@ -145,48 +146,54 @@ void scheduler::work_thread_work_section() {
}
}
}
my_state.set_scheduler_active(false);
}
void scheduler::sync() {
thread_state &syncing_state = thread_state::get();
void scheduler::sync_internal() {
if (thread_state::is_scheduler_active()) {
thread_state &syncing_state = thread_state::get();
base_task *active_task = syncing_state.get_active_task();
base_task *spawned_task = active_task->next_;
base_task *active_task = syncing_state.get_active_task();
base_task *spawned_task = active_task->next_;
#if PLS_PROFILING_ENABLED
syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
active_task->stack_memory_,
active_task->stack_size_,
active_task->profiling_node_);
syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
active_task->profiling_node_);
auto *next_dag_node =
syncing_state.get_scheduler().profiler_.task_sync(syncing_state.get_thread_id(), active_task->profiling_node_);
active_task->profiling_node_ = next_dag_node;
syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
active_task->stack_memory_,
active_task->stack_size_,
active_task->profiling_node_);
syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
active_task->profiling_node_);
auto *next_dag_node =
syncing_state.get_scheduler().profiler_.task_sync(syncing_state.get_thread_id(), active_task->profiling_node_);
active_task->profiling_node_ = next_dag_node;
#endif
if (active_task->is_synchronized_) {
if (active_task->is_synchronized_) {
#if PLS_PROFILING_ENABLED
thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(),
thread_state::get().get_active_task()->profiling_node_);
thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(),
thread_state::get().get_active_task()->profiling_node_);
#endif
return; // We are already the sole owner of last_task
} else {
auto continuation =
spawned_task->run_as_task([active_task, spawned_task, &syncing_state](context_switcher::continuation cont) {
active_task->continuation_ = std::move(cont);
syncing_state.set_active_task(spawned_task);
return slow_return(syncing_state);
});
PLS_ASSERT(!continuation.valid(),
"We only return to a sync point, never jump to it directly."
"This must therefore never return an unfinished fiber/continuation.");
return; // We are already the sole owner of last_task
} else {
auto continuation =
spawned_task->run_as_task([active_task, spawned_task, &syncing_state](context_switcher::continuation cont) {
active_task->continuation_ = std::move(cont);
syncing_state.set_active_task(spawned_task);
return slow_return(syncing_state);
});
PLS_ASSERT(!continuation.valid(),
"We only return to a sync point, never jump to it directly."
"This must therefore never return an unfinished fiber/continuation.");
#if PLS_PROFILING_ENABLED
thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(),
thread_state::get().get_active_task()->profiling_node_);
thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(),
thread_state::get().get_active_task()->profiling_node_);
#endif
return; // We cleanly synced to the last one finishing work on last_task
return; // We cleanly synced to the last one finishing work on last_task
}
} else {
// Scheduler not active
return;
}
}
......
......@@ -3,6 +3,7 @@
namespace pls::internal::scheduling {
thread_local thread_state *my_thread_state{nullptr};
thread_local bool thread_state::is_scheduler_active_{false};
thread_state &thread_state::get() { return *my_thread_state; }
void thread_state::set(thread_state *new_state) { my_thread_state = new_state; }
......