Commit a81e082d by FritzFlorian

Add support for serial tail calls.

Currently, once we are serial, no more parallel strains are executed. However, this could be changed using strain-local resources in the future.
parent 6394b203
Pipeline #1499 passed with stages in 4 minutes 0 seconds
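For orientation before the diff: a minimal usage sketch of the feature this commit adds. This is not part of the commit; `fib` and all constants are illustrative, the header path is assumed, and `pls::spawn`/`pls::sync`/`pls::serial` are the public wrappers touched in pls.h below.

```cpp
#include "pls/pls.h"  // assumed public header exposing the spawn/sync/serial wrappers

// Illustrative divide-and-conquer task: parallel near the root of the
// recursion, finished off with the new serial tail call near the leaves.
long fib(long n) {
  if (n <= 1) return n;
  long a, b;
  pls::spawn([&] { a = fib(n - 1); });
  pls::spawn([&] { b = fib(n - 2); });
  pls::sync();
  return a + b;
}

long fib_serial_tail(long n) {
  long result;
  // Runs on the worker's dedicated serial call stack; any pls::spawn issued
  // inside this section executes inline instead of becoming stealable.
  pls::serial([&] { result = fib(n); });
  return result;
}
```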
@@ -34,6 +34,7 @@ struct base_task {
       stack_memory_{stack_memory},
       stack_size_{stack_size},
       is_synchronized_{false},
+      is_serial_section_{false},
       prev_{nullptr},
       next_{nullptr},
       attached_resources_{nullptr} {}
@@ -60,6 +61,7 @@ struct base_task {
   size_t stack_size_;
   context_switcher::continuation continuation_;
   bool is_synchronized_;
+  bool is_serial_section_;

   // Linked list for trading/memory management
   base_task *prev_;
...
@@ -14,6 +14,7 @@
 #include "pls/internal/scheduling/thread_state.h"
 #include "pls/internal/scheduling/task_manager.h"
+#include "pls/internal/scheduling/strain_local_resource.h"
 #include "pls/internal/profiling/profiler.h"
@@ -43,13 +44,15 @@ class scheduler {
   explicit scheduler(unsigned int num_threads,
                      size_t computation_depth,
                      size_t stack_size,
-                     bool reuse_thread = true);
+                     bool reuse_thread = true,
+                     size_t serial_stack_size = 4096 * 8);

   template<typename ALLOC>
   explicit scheduler(unsigned int num_threads,
                      size_t computation_depth,
                      size_t stack_size,
                      bool reuse_thread,
+                     size_t serial_stack_size,
                      ALLOC &&stack_allocator);

   /**
@@ -99,6 +102,19 @@ class scheduler {
   }

   /**
+   * Runs a function in the tail call portion of the stack.
+   * Can have nested parallelism if the scheduler is configured accordingly.
+   */
+  template<typename Function>
+  static void serial(Function &&lambda) {
+#ifdef PLS_SERIAL_ELUSION
+    lambda();
+#else
+    serial_internal(std::forward<Function>(lambda));
+#endif
+  }
+
+  /**
    * Explicitly terminate the worker threads. Scheduler must not be used after this.
    */
   void terminate();
@@ -123,6 +139,8 @@ class scheduler {
   template<typename Function>
   static void spawn_internal(Function &&lambda);
   static void sync_internal();
+  template<typename Function>
+  static void serial_internal(Function &&lambda);

   static context_switcher::continuation slow_return(thread_state &calling_state);
@@ -148,6 +166,8 @@ class scheduler {
   std::shared_ptr<base::stack_allocator> stack_allocator_;
+  size_t serial_stack_size_;
+
 #if PLS_PROFILING_ENABLED
   profiling::profiler profiler_;
 #endif
...
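A sketch of constructing a scheduler with the new defaulted parameter. All values are illustrative and the header path is assumed; the declaration above defaults `serial_stack_size` to `4096 * 8` bytes.

```cpp
#include "pls/internal/scheduling/scheduler.h"  // assumed header path

int main() {
  // Omitting the last argument falls back to the 4096 * 8 byte default.
  pls::internal::scheduling::scheduler scheduler{
      8,           // num_threads
      6,           // computation_depth
      4096 * 16,   // stack_size of each task's coroutine stack
      true,        // reuse_thread: the calling thread doubles as worker 0
      4096 * 32};  // serial_stack_size: per-worker serial call stack
  // ... run a work section; the destructor then calls terminate() ...
}
```

Alternatively, defining `PLS_SERIAL_ELUSION` compiles `scheduler::serial` down to a plain inline call, skipping the serial stack entirely.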
@@ -20,6 +20,7 @@ scheduler::scheduler(unsigned int num_threads,
                      size_t computation_depth,
                      size_t stack_size,
                      bool reuse_thread,
+                     size_t serial_stack_size,
                      ALLOC &&stack_allocator) :
     num_threads_{num_threads},
     reuse_thread_{reuse_thread},
@@ -29,7 +30,8 @@ scheduler::scheduler(unsigned int num_threads,
     main_thread_starter_function_{nullptr},
     work_section_done_{false},
     terminated_{false},
-    stack_allocator_{std::make_shared<ALLOC>(std::forward<ALLOC>(stack_allocator))}
+    stack_allocator_{std::make_shared<ALLOC>(std::forward<ALLOC>(stack_allocator))},
+    serial_stack_size_{serial_stack_size}
 #if PLS_PROFILING_ENABLED
     , profiler_{num_threads}
 #endif
@@ -44,7 +46,11 @@ scheduler::scheduler(unsigned int num_threads,
                                  computation_depth,
                                  stack_size,
                                  stack_allocator_));
-    auto &this_thread_state = thread_states_.emplace_back(std::make_unique<thread_state>(*this, i, *this_task_manager));
+    auto &this_thread_state = thread_states_.emplace_back(std::make_unique<thread_state>(*this,
+                                                                                         i,
+                                                                                         *this_task_manager,
+                                                                                         stack_allocator_,
+                                                                                         serial_stack_size));

     if (reuse_thread && i == 0) {
       worker_threads_.emplace_back();
@@ -148,10 +154,24 @@ template<typename Function>
 void scheduler::spawn_internal(Function &&lambda) {
   if (thread_state::is_scheduler_active()) {
     thread_state &spawning_state = thread_state::get();
+    scheduler &scheduler = spawning_state.get_scheduler();

     base_task *last_task = spawning_state.get_active_task();
     base_task *spawned_task = last_task->next_;

+    // We are at the end of our allocated tasks, go over to serial execution.
+    if (spawned_task == nullptr) {
+      scheduler::serial_internal(std::forward<Function>(lambda));
+      return;
+    }
+    // We are in a serial section.
+    // For now we simply stay serial. Later versions could add nested parallel sections.
+    if (last_task->is_serial_section_) {
+      lambda();
+      return;
+    }
+
+    // Carry over the resources allocated by parents
     auto *attached_resources = last_task->attached_resources_.load(std::memory_order_relaxed);
     spawned_task->attached_resources_.store(attached_resources, std::memory_order_relaxed);
@@ -171,6 +191,7 @@ void scheduler::spawn_internal(Function &&lambda) {
     // we are now executing the new task, allow others to steal the last task continuation.
     spawned_task->is_synchronized_ = true;
     spawning_state.set_active_task(spawned_task);

     spawning_state.get_task_manager().push_local_task(last_task);

 #if PLS_SLEEP_WORKERS_ON_EMPTY
     // TODO: relax atomic operations on empty flag
@@ -269,6 +290,29 @@ void scheduler::spawn_internal(Function &&lambda) {
   }
 }

+template<typename Function>
+void scheduler::serial_internal(Function &&lambda) {
+  if (thread_state::is_scheduler_active()) {
+    thread_state &spawning_state = thread_state::get();
+    base_task *active_task = spawning_state.get_active_task();
+
+    if (active_task->is_serial_section_) {
+      lambda();
+    } else {
+      active_task->is_serial_section_ = true;
+      context_switcher::enter_context(spawning_state.get_serial_call_stack(),
+                                      spawning_state.get_serial_call_stack_size(),
+                                      [&](auto cont) {
+                                        lambda();
+                                        return cont;
+                                      });
+      active_task->is_serial_section_ = false;
+    }
+  } else {
+    lambda();
+  }
+}

 }
 #endif //PLS_SCHEDULER_IMPL_H
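The `is_serial_section_` flag gives serial sections a cheap re-entrancy rule: only the outermost `serial()` pays for the switch onto the per-worker serial stack, and the flag is reset when that outer section returns. A sketch of the resulting nesting behavior (`step_one`/`step_two` are hypothetical user functions, not part of this commit):

```cpp
void step_one();  // hypothetical
void step_two();  // hypothetical

void nested_serial_sections() {
  using pls::internal::scheduling::scheduler;
  scheduler::serial([] {
    step_one();  // executes on the worker's serial call stack
    scheduler::serial([] {
      step_two();  // is_serial_section_ is already set: runs inline,
    });            // no second context switch
  });
  // Flag cleared again; later spawns from this task are stealable as usual.
}
```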
@@ -29,6 +29,15 @@ struct PLS_CACHE_ALIGN strain_resource {
 };

 class strain_local_resource {
+  struct local_item {
+    local_item(strain_local_resource *strain_local_resource, unsigned index, unsigned depth) :
+        initial_resource_{std::make_unique<strain_resource>(strain_local_resource, index, depth)},
+        resource_{initial_resource_.get()} {}
+
+    std::unique_ptr<strain_resource> initial_resource_;
+    strain_resource *resource_;
+  };
+
+ public:
   // Handle to a stack-like resource (index of it).
   // MUST be handled in a RAII like manner on the stack
   // and must not be copied or moved for this to work.
@@ -42,19 +51,14 @@ class strain_local_resource {
     explicit item_handle(strain_resource *resource) : resource_{resource} {}
     ~item_handle();

-    unsigned get_strain_index() {
-      return resource_->index_;
-    }
-
    private:
     strain_resource *resource_;
   };

-  struct local_item {
-    local_item(strain_local_resource *strain_local_resource, unsigned index, unsigned depth) :
-        initial_resource_{std::make_unique<strain_resource>(strain_local_resource, index, depth)},
-        resource_{initial_resource_.get()} {}
-
-    std::unique_ptr<strain_resource> initial_resource_;
-    strain_resource *resource_;
-  };
-
- public:
   strain_local_resource(unsigned num_threads,
                         unsigned depth) : local_items_() {
     local_items_.reserve(num_threads);
...
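Given the RAII contract stated in the comment above, `item_handle` usage is expected to look roughly like the following. How a `strain_resource` pointer is acquired is outside this diff, so the parameter below is a stand-in:

```cpp
#include "pls/internal/scheduling/strain_local_resource.h"

namespace sched = pls::internal::scheduling;

void with_strain_resource(sched::strain_resource *resource) {
  // The handle stays a stack local and is never copied or moved;
  // ~item_handle() releases the resource when the scope ends.
  sched::strain_local_resource::item_handle handle{resource};
  // ... work that relies on the strain-local resource ...
}  // handle destroyed here
```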
@@ -13,6 +13,7 @@
 #include "pls/internal/base/system_details.h"
 #include "pls/internal/data_structures/stamped_integer.h"
+#include "pls/internal/base/stack_allocator.h"

 #include "pls/internal/scheduling/base_task.h"
 #include "pls/internal/scheduling/task_manager.h"
@@ -46,6 +47,10 @@ struct PLS_CACHE_ALIGN thread_state {
   static thread_local bool is_scheduler_active_;

+  std::shared_ptr<base::stack_allocator> stack_allocator_;
+  char *serial_call_stack_;
+  size_t serial_call_stack_size_;
+
 #if PLS_SLEEP_WORKERS_ON_EMPTY
   PLS_CACHE_ALIGN std::atomic<data_structures::stamped_integer> queue_empty_{EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
 #endif
@@ -53,12 +58,22 @@ struct PLS_CACHE_ALIGN thread_state {
  public:
   explicit thread_state(scheduler &scheduler,
                         unsigned thread_id,
-                        task_manager &task_manager) :
+                        task_manager &task_manager,
+                        std::shared_ptr<base::stack_allocator> &stack_allocator,
+                        size_t serial_call_stack_size) :
       thread_id_{thread_id},
       scheduler_{scheduler},
       task_manager_{task_manager},
       random_{static_cast<unsigned long>(std::chrono::steady_clock::now().time_since_epoch().count()) + thread_id},
-      active_task_{task_manager.get_task(0)} {};
+      active_task_{task_manager.get_task(0)},
+      stack_allocator_{stack_allocator},
+      serial_call_stack_size_{serial_call_stack_size} {
+    serial_call_stack_ = stack_allocator->allocate_stack(serial_call_stack_size_);
+  };
+
+  ~thread_state() {
+    stack_allocator_->free_stack(serial_call_stack_size_, serial_call_stack_);
+  }

   // Do not allow accidental copy/move operations.
   thread_state(const thread_state &) = delete;
@@ -97,7 +112,10 @@ struct PLS_CACHE_ALIGN thread_state {
   }

   void set_active_task(base_task *active_task) { active_task_ = active_task; }
-  base_task *get_active_task() const { return active_task_; }
+  [[nodiscard]] base_task *get_active_task() const { return active_task_; }
+
+  char *get_serial_call_stack() { return serial_call_stack_; }
+  [[nodiscard]] size_t get_serial_call_stack_size() const { return serial_call_stack_size_; }

 #if PLS_SLEEP_WORKERS_ON_EMPTY
   [[nodiscard]] std::atomic<data_structures::stamped_integer> &get_queue_empty_flag() {
...
@@ -23,6 +23,10 @@ static void spawn(Function &&function) {
 static void sync() {
   scheduler::sync();
 }

+template<typename Function>
+static void serial(Function &&function) {
+  scheduler::serial(std::forward<Function>(function));
+}

 // general helpers that can be handy when using PLS
 template<class C, typename R, typename ...ARGS>
...
@@ -4,7 +4,6 @@
 #include "pls/internal/scheduling/strain_local_resource.h"

 #include "pls/internal/build_flavour.h"
-#include "pls/internal/base/futex_wrapper.h"
 #include "pls/internal/base/error_handling.h"

 #include <thread>
@@ -14,11 +13,13 @@ namespace pls::internal::scheduling {
 scheduler::scheduler(unsigned int num_threads,
                      size_t computation_depth,
                      size_t stack_size,
-                     bool reuse_thread) : scheduler(num_threads,
-                                                    computation_depth,
-                                                    stack_size,
-                                                    reuse_thread,
-                                                    base::mmap_stack_allocator{}) {}
+                     bool reuse_thread,
+                     size_t serial_stack_size) : scheduler(num_threads,
+                                                           computation_depth,
+                                                           stack_size,
+                                                           reuse_thread,
+                                                           serial_stack_size,
+                                                           base::mmap_stack_allocator{}) {}

 scheduler::~scheduler() {
   terminate();
...