#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/parallel_result.h"
#include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/scheduling/static_scheduler_memory.h"
#include "pls/internal/helpers/profiler.h"
using namespace pls::internal::scheduling;
......@@ -14,33 +13,35 @@ using namespace pls::internal::scheduling;
using namespace comparison_benchmarks::base;
parallel_result<short> conquer(fft::complex_vector::iterator data, int n) {
void conquer(fft::complex_vector::iterator data, int n) {
if (n < 2) {
return parallel_result<short>{0};
fft::divide(data, n);
if (n <= fft::RECURSIVE_CUTOFF) {
fft::conquer(data, n / 2);
fft::conquer(data + n / 2, n / 2);
fft::combine(data, n);
return parallel_result<short>{0};
} else {
return scheduler::par([=]() {
return conquer(data, n / 2);
}, [=]() {
return conquer(data + n / 2, n / 2);
}).then([=](int, int) {
fft::combine(data, n);
return parallel_result<short>{0};
scheduler::spawn([data, n]() {
conquer(data, n / 2);
scheduler::spawn([data, n]() {
conquer(data + n / 2, n / 2);
fft::combine(data, n);
constexpr int MAX_NUM_THREADS = 8;
constexpr int MAX_NUM_TASKS = 64;
constexpr int MAX_NUM_CONTS = 64;
constexpr int MAX_CONT_SIZE = 256;
constexpr int MAX_NUM_TASKS = 32;
constexpr int MAX_STACK_SIZE = 1024 * 8;
MAX_STACK_SIZE> global_scheduler_memory;
int main(int argc, char **argv) {
int num_threads;
......@@ -53,39 +54,21 @@ int main(int argc, char **argv) {
fft::complex_vector data = fft::generate_input();
MAX_CONT_SIZE> static_scheduler_memory;
scheduler scheduler{static_scheduler_memory, (unsigned int) num_threads};
for (int i = 0; i < fft::NUM_WARMUP_ITERATIONS; i++) {
scheduler.perform_work([&]() {
return scheduler::par([&]() {
return conquer(data.begin(), fft::SIZE);
}, []() {
return parallel_result<short>{0};
}).then([&](short, short) {
return parallel_result<int>{0};
scheduler scheduler{global_scheduler_memory, (unsigned) num_threads};
for (int i = 0; i < fft::NUM_ITERATIONS; i++) {
scheduler.perform_work([&]() {
scheduler.perform_work([&]() {
for (int i = 0; i < fft::NUM_WARMUP_ITERATIONS; i++) {
conquer(data.begin(), fft::SIZE);
return scheduler::par([&]() {
return conquer(data.begin(), fft::SIZE);
}, []() {
return parallel_result<short>{0};
}).then([&](short, short) {
return parallel_result<int>{0};
scheduler.perform_work([&]() {
for (int i = 0; i < fft::NUM_ITERATIONS; i++) {
conquer(data.begin(), fft::SIZE);
return 0;
......@@ -31,9 +31,9 @@ int pls_fib(int n) {
return a + b;
constexpr int MAX_NUM_THREADS = 4;
constexpr int MAX_NUM_THREADS = 8;
constexpr int MAX_NUM_TASKS = 32;
constexpr int MAX_STACK_SIZE = 1024 * 32;
constexpr int MAX_STACK_SIZE = 1024 * 1;
......@@ -48,7 +48,7 @@ int main(int argc, char **argv) {
string full_directory = directory + "/PLS_v3/";
benchmark_runner runner{full_directory, test_name};
scheduler scheduler{global_scheduler_memory, (unsigned) num_threads, false};
scheduler scheduler{global_scheduler_memory, (unsigned) num_threads};
volatile int res;
scheduler.perform_work([&]() {
......@@ -58,7 +58,7 @@ int main(int argc, char **argv) {
scheduler.perform_work([&]() {
for (int i = 0; i < fib::NUM_ITERATIONS * 100; i++) {
for (int i = 0; i < fib::NUM_ITERATIONS; i++) {
res = pls_fib(fib::INPUT_N);
......@@ -16,6 +16,6 @@
void pls_error(const char *msg);
// TODO: Distinguish between debug/internal asserts and production asserts.
#define PLS_ASSERT(cond, msg) if (!(cond)) { pls_error(msg); }
#define PLS_ASSERT(cond, msg) // if (!(cond)) { pls_error(msg); }
......@@ -46,6 +46,15 @@ class external_trading_deque {
thread_id_ = id;
// Reads target_task's external_trading_deque_cas_ field once and reports what it holds:
// the traded-in task when the field is filled with an object, an empty optional otherwise.
// The field is only loaded here; this function does not write to it.
static optional<task *> peek_traded_object(task *target_task) {
// Single atomic load; the check below works on this snapshot.
traded_cas_field current_cas = target_task->external_trading_deque_cas_.load();
if (current_cas.is_filled_with_object()) {
return optional<task *>{current_cas.get_trade_object()};
} else {
// No object was in the field at the time of the load.
return optional<task *>{};
static optional<task *> get_trade_object(task *target_task) {
traded_cas_field current_cas = target_task->external_trading_deque_cas_.load();
if (current_cas.is_filled_with_object()) {
......@@ -129,7 +138,7 @@ class external_trading_deque {
struct peek_result {
peek_result(optional<task *> top_task, stamped_integer top_pointer) : top_task_{std::move(top_task)},
top_pointer_{top_pointer} {};
top_pointer_{top_pointer} {};
optional<task *> top_task_;
stamped_integer top_pointer_;
......@@ -56,7 +56,7 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task {
// Work-Stealing
std::atomic<traded_cas_field> external_trading_deque_cas_{};
task *resource_stack_next_{};
std::atomic<task *> resource_stack_next_{};
std::atomic<data_structures::stamped_integer> resource_stack_root_{{0, 0}};
bool clean_;
......@@ -52,7 +52,7 @@ class task_manager {
void spawn_child(F &&lambda);
void sync();
bool steal_task(task_manager &stealing_task_manager);
task* steal_task(task_manager &stealing_task_manager);
bool try_clean_return(context_switcher::continuation &result_cont);
......@@ -19,18 +19,29 @@ namespace scheduling {
// Spawns `lambda` as a child task: the task following the active one in the chain
// (active_task_->next_) is started through run_as_task, and the continuation handed to
// that task is stored on the spawning task (last_task->continuation_).
// Afterwards deque_.pop_bot() on the executing thread's task manager selects between
// continuing the spawning task directly and going through try_clean_return().
// NOTE(review): this span carries two overlapping variants of the setup code
// (`this_task` vs. `spawned_task`, two `continuation` declarations) - confirm which is current.
template<typename F>
void task_manager::spawn_child(F &&lambda) {
auto *spawning_task_manager = this;
auto continuation =
active_task_->next_->run_as_task([lambda, spawning_task_manager](context_switcher::continuation cont) {
auto *last_task = spawning_task_manager->active_task_;
auto *this_task = spawning_task_manager->active_task_->next_;
auto *last_task = spawning_task_manager->active_task_;
auto *spawned_task = spawning_task_manager->active_task_->next_;
auto continuation =
spawned_task->run_as_task([=](context_switcher::continuation cont) {
// allow stealing threads to continue the last task.
last_task->continuation_ = std::move(cont);
spawning_task_manager->active_task_ = this_task;
// we are now executing the new task, allow others to steal the last task continuation.
spawning_task_manager->active_task_ = spawned_task;
// execute the lambda itself, which could lead to a different thread returning.
auto *syncing_task_manager = &thread_state::get().get_task_manager();
PLS_ASSERT(syncing_task_manager->active_task_ == spawned_task,
"Task manager must always point its active task onto whats executing.");
// try to pop a task of the syncing task manager.
// possible outcomes:
// - this is a different task manager, it must have an empty deque and fail
// - this is the same task manager and someone stole the last task, thus this will fail
// - this is the same task manager and no one stole the last task, thus this will succeed
auto pop_result = syncing_task_manager->deque_.pop_bot();
if (pop_result) {
// Fast path, simply continue execution where we left off before the spawn.
......@@ -44,7 +55,7 @@ void task_manager::spawn_child(F &&lambda) {
syncing_task_manager->active_task_ = last_task;
return std::move(last_task->continuation_);
} else {
// Slow path, the continuation was stolen.
// Slow path, the last task was stolen. Sync using the resource stack.
context_switcher::continuation result_cont;
if (syncing_task_manager->try_clean_return(result_cont)) {
// We return back to the main scheduling loop
......@@ -62,7 +62,6 @@ void scheduler::work_thread_work_section() {
auto &my_task_manager = my_state.get_task_manager();
auto const num_threads = my_state.get_scheduler().num_threads();
auto const my_id = my_state.get_id();
if (my_state.get_id() == 0) {
// Main Thread, kick off by executing the user's main code block.
......@@ -72,60 +71,46 @@ void scheduler::work_thread_work_section() {
while (!work_section_done_) {
PLS_ASSERT(my_task_manager.check_task_chain(), "Must start stealing with a clean task chain.");
// Steal Routine (will be continuously executed when there are no more fall-throughs).
// TODO: move into separate function
const size_t offset = my_state.get_rand() % num_threads;
const size_t max_tries = num_threads;
for (size_t i = 0; i < max_tries; i++) {
// Perform steal
size_t target = (offset + i) % num_threads;
auto &target_state = my_state.get_scheduler().thread_state_for(target);
bool steal_success = target_state.get_task_manager().steal_task(my_task_manager);
if (steal_success) {
// The stealing procedure correctly changed our chain and active task.
// Now we need to perform the 'post steal' actions (manage resources and execute the stolen task).
"We are sole owner of this chain, it has to be valid!");
// Move the traded in resource of this active task over to the stack of resources.
auto *stolen_task = &my_task_manager.get_active_task();
traded_cas_field stolen_task_cas = stolen_task->external_trading_deque_cas_.load();
if (stolen_task_cas.is_filled_with_object()) {
// Push the traded in resource on the resource stack to clear the traded_field for later steals/spawns.
auto *exchanged_task = stolen_task_cas.get_trade_object();
my_task_manager.push_resource_on_task(stolen_task, exchanged_task);
traded_cas_field empty_field;
traded_cas_field expected_field;
if (stolen_task->external_trading_deque_cas_.compare_exchange_strong(expected_field, empty_field)) {
// All good, nothing more to do
} else {
// The last other active thread took it as its spare resource...
// ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
"Must be empty, as otherwise no one will steal the 'spare traded task'.");
auto current_root = stolen_task->resource_stack_root_.load();
current_root.value = 0;
// Execute the stolen task by jumping to its continuation.
"A task that we can steal must have a valid continuation for us to start working.");
// we are done with this steal attempt, loop over.
// TODO: move steal routine into separate function
const size_t target = my_state.get_rand() % num_threads;
if (target == my_state.get_id()) {
auto &target_state = my_state.get_scheduler().thread_state_for(target);
task *traded_task = target_state.get_task_manager().steal_task(my_task_manager);
if (traded_task != nullptr) {
// The stealing procedure correctly changed our chain and active task.
// Now we need to perform the 'post steal' actions (manage resources and execute the stolen task).
"We are sole owner of this chain, it has to be valid!");
// Move the traded in resource of this active task over to the stack of resources.
auto *stolen_task = &my_task_manager.get_active_task();
// Push the traded in resource on the resource stack to clear the traded_field for later steals/spawns.
my_task_manager.push_resource_on_task(stolen_task, traded_task);
auto optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task);
if (optional_exchanged_task) {
// All good, we pushed the task over to the stack, nothing more to do
PLS_ASSERT(*optional_exchanged_task == traded_task,
"We are currently executing this, no one else can put another task in this field!");
} else {
// The last other active thread took it as its spare resource...
// ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
auto current_root = stolen_task->resource_stack_root_.load();
current_root.value = 0;
// Execute the stolen task by jumping to its continuation.
"A task that we can steal must have a valid continuation for us to start working.");
// We will continue execution in this line when we finished the stolen work.
// if (!my_cont_manager.falling_through()) {
// base::this_thread::sleep(5);
// }
......@@ -32,48 +32,46 @@ static task *find_task(unsigned id, unsigned depth) {
return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_this_thread_task(depth);
bool task_manager::steal_task(task_manager &stealing_task_manager) {
task *task_manager::steal_task(task_manager &stealing_task_manager) {
PLS_ASSERT(stealing_task_manager.active_task_->depth_ == 0, "Must only steal with clean task chain.");
PLS_ASSERT(stealing_task_manager.check_task_chain(), "Must only steal with clean task chain.");
auto peek = deque_.peek_top();
auto optional_target_task = peek.top_task_;
auto target_top = peek.top_pointer_;
if (optional_target_task) {
PLS_ASSERT(stealing_task_manager.check_task_chain(), "We are stealing, must not have a bad chain here!");
if (peek.top_task_) {
// search for the task we want to trade in
task *target_task = *optional_target_task;
task *stolen_task = *peek.top_task_;
task *traded_task = stealing_task_manager.active_task_;
for (unsigned i = 0; i < target_task->depth_; i++) {
for (unsigned i = 0; i < stolen_task->depth_; i++) {
traded_task = traded_task->next_;
// keep a reference to the rest of the task chain that we keep
task *next_own_task = traded_task->next_;
// 'unchain' the traded tasks (to help us find bugs only)
// 'unchain' the traded tasks (to help us find bugs)
traded_task->next_ = nullptr;
auto optional_result_task = deque_.pop_top(traded_task, target_top);
if (optional_result_task) {
PLS_ASSERT(target_task->thread_id_ != traded_task->thread_id_,
// perform the actual pop operation
auto pop_result_task = deque_.pop_top(traded_task, peek.top_pointer_);
if (pop_result_task) {
PLS_ASSERT(stolen_task->thread_id_ != traded_task->thread_id_,
"It is impossible to steal an task we already own!");
PLS_ASSERT(*optional_result_task == target_task,
PLS_ASSERT(*pop_result_task == stolen_task,
"We must only steal the task that we peeked at!");
// the steal was a success, link the chain so we own the stolen part
target_task->next_ = next_own_task;
next_own_task->prev_ = target_task;
stolen_task->next_ = next_own_task;
next_own_task->prev_ = stolen_task;
stealing_task_manager.active_task_ = stolen_task;
return true;
return traded_task;
} else {
// the steal failed, reset our chain to its old, clean state (re-link what we have broken)
traded_task->next_ = next_own_task;
return false;
return nullptr;
} else {
return false;
return nullptr;
......@@ -92,11 +90,11 @@ void task_manager::push_resource_on_task(task *target_task, task *spare_task_cha
if (current_root.value == 0) {
// Empty, simply push in with no successor
spare_task_chain->resource_stack_next_ = nullptr;
spare_task_chain->, std::memory_order_relaxed);
} else {
// Already an entry. Find its corresponding task and set it as our successor.
auto *current_root_task = find_task(current_root.value - 1, target_task->depth_);
spare_task_chain->resource_stack_next_ = current_root_task;
spare_task_chain->, std::memory_order_relaxed);
} while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
......@@ -108,15 +106,15 @@ task *task_manager::pop_resource_from_task(task *target_task) {
task *output_task;
do {
current_root = target_task->resource_stack_root_.load();
target_root.stamp = current_root.stamp + 1;
if (current_root.value == 0) {
// Empty...
return nullptr;
} else {
// Found something, try to pop it
auto *current_root_task = find_task(current_root.value - 1, target_task->depth_);
auto *next_stack_task = current_root_task->resource_stack_next_;
auto *next_stack_task = current_root_task->resource_stack_next_.load(std::memory_order_relaxed);
target_root.stamp = current_root.stamp + 1;
target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0;
output_task = current_root_task;
......@@ -124,19 +122,21 @@ task *task_manager::pop_resource_from_task(task *target_task) {
} while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
PLS_ASSERT(check_task_chain_backward(output_task), "Must only pop proper task chains.");
output_task->, std::memory_order_relaxed);
return output_task;
void task_manager::sync() {
auto continuation = active_task_->next_->run_as_task([this](context_switcher::continuation cont) {
auto *last_task = active_task_;
auto *this_task = active_task_->next_;
auto *spawning_task_manager = this;
auto *last_task = spawning_task_manager->active_task_;
auto *spawned_task = spawning_task_manager->active_task_->next_;
auto continuation = spawned_task->run_as_task([=](context_switcher::continuation cont) {
last_task->continuation_ = std::move(cont);
active_task_ = this_task;
spawning_task_manager->active_task_ = spawned_task;
context_switcher::continuation result_cont;
if (try_clean_return(result_cont)) {
if (spawning_task_manager->try_clean_return(result_cont)) {
// We return back to the main scheduling loop
active_task_->clean_ = true;
return result_cont;
......@@ -147,21 +147,17 @@ void task_manager::sync() {
if (continuation.valid()) {
// We jumped in here from the main loop, keep track!
"We only return to a sync point, never jump to it directly."
"This must therefore never return an unfinished fiber/continuation.");
// Decides where execution continues once the active task is finished and writes the
// continuation to resume into result_cont.
// Returns true when result_cont was set to the thread's main continuation (a clean chain
// was popped from last_task's resource stack), and false when result_cont was set to
// last_task's own continuation.
// NOTE(review): the nullptr branch and the PLS_ASSERT below handle the same condition
// (no previous task) in two different ways - confirm which variant is current.
bool task_manager::try_clean_return(context_switcher::continuation &result_cont) {
task *this_task = active_task_;
task *last_task = active_task_->prev_;
if (last_task == nullptr) {
// We finished the final task of the computation, return to the scheduling loop.
result_cont = thread_state::get().get_main_continuation();
return true;
PLS_ASSERT(last_task != nullptr,
"Must never try to return from a task at level 0 (no last task), as we must have a target to return to.");
// Try to get a clean resource chain to go back to the main stealing loop
task *clean_chain = pop_resource_from_task(last_task);
......@@ -179,28 +175,30 @@ bool task_manager::try_clean_return(context_switcher::continuation &result_cont)
// We got a clean chain to continue working on.
PLS_ASSERT(last_task->depth_ == clean_chain->depth_,
"Resources must only reside in the correct depth!");
PLS_ASSERT(check_task_chain_backward(clean_chain), "Can only aquire clean chains for clean returns!");
PLS_ASSERT(clean_chain != last_task,
"We want to swap out the last task and its chain to use a clean one, thus they must differ.");
"Can only acquire clean chains for clean returns!");
this_task->prev_ = clean_chain;
clean_chain->next_ = this_task;
// Walk back chain to make first task active
active_task_ = clean_chain;
while (active_task_->prev_ != nullptr) {
active_task_ = active_task_->prev_;
PLS_ASSERT(check_task_chain(), "We just aquired a clean chain...");
// jump back to continuation in main scheduling loop, time to steal some work
// jump back to the continuation in main scheduling loop, time to steal some work
result_cont = thread_state::get().get_main_continuation();
return true;
} else {
// Make sure that we are the owner of this full continuation/task chain.
last_task->next_ = this_task;
this_task->prev_ = last_task;
// We are the last one working on this task. Thus the sync must be finished, continue working.
active_task_ = last_task;
// Make sure that we are the owner of this full continuation/task chain.
active_task_->next_ = this_task;
this_task->prev_ = active_task_;
result_cont = std::move(last_task->continuation_);
return false;
