diff --git a/lib/pls/include/pls/internal/scheduling/base_task.h b/lib/pls/include/pls/internal/scheduling/base_task.h
index 8fab083..2631028 100644
--- a/lib/pls/include/pls/internal/scheduling/base_task.h
+++ b/lib/pls/include/pls/internal/scheduling/base_task.h
@@ -34,6 +34,7 @@ struct base_task {
       stack_memory_{stack_memory},
       stack_size_{stack_size},
       is_synchronized_{false},
+      is_serial_section_{false},
       prev_{nullptr},
       next_{nullptr},
       attached_resources_{nullptr} {}
@@ -60,6 +61,7 @@ struct base_task {
   size_t stack_size_;
   context_switcher::continuation continuation_;
   bool is_synchronized_;
+  bool is_serial_section_;
 
   // Linked list for trading/memory management
   base_task *prev_;
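The new `is_serial_section_` flag marks a task whose continuation has entered a serial section; `spawn_internal` in scheduler_impl.h below consults it to demote nested spawns to plain calls. A toy sketch of that control flow (all names here are illustrative stand-ins, not the PLS API):

```cpp
#include <cstdio>
#include <functional>

struct toy_task {
  bool is_serial_section_ = false; // mirrors the new base_task flag
};

toy_task *active_task = nullptr; // stands in for thread_state::get_active_task()

void toy_spawn(const std::function<void()> &lambda) {
  if (active_task->is_serial_section_) {
    // Inside a serial section: degrade the spawn to a plain call.
    lambda();
    return;
  }
  // A real scheduler would make the continuation stealable here.
  lambda();
}

int main() {
  toy_task root;
  active_task = &root;

  root.is_serial_section_ = true;
  toy_spawn([] { std::printf("executed inline, not in parallel\n"); });
}
```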
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h
index a427b58..7a1b0ba 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler.h
@@ -14,6 +14,7 @@
 #include "pls/internal/scheduling/thread_state.h"
 #include "pls/internal/scheduling/task_manager.h"
+#include "pls/internal/scheduling/strain_local_resource.h"
 
 #include "pls/internal/profiling/profiler.h"
@@ -43,13 +44,15 @@ class scheduler {
   explicit scheduler(unsigned int num_threads,
                      size_t computation_depth,
                      size_t stack_size,
-                     bool reuse_thread = true);
+                     bool reuse_thread = true,
+                     size_t serial_stack_size = 4096 * 8);
 
   template <typename ALLOC>
   explicit scheduler(unsigned int num_threads,
                      size_t computation_depth,
                      size_t stack_size,
                      bool reuse_thread,
+                     size_t serial_stack_size,
                      ALLOC &&stack_allocator);
 
   /**
@@ -99,6 +102,19 @@ class scheduler {
   }
 
   /**
+   * Runs a function in the tail call portion of the stack.
+   * Can have nested parallelism if the scheduler is configured accordingly.
+   */
+  template <typename Function>
+  static void serial(Function &&lambda) {
+#ifdef PLS_SERIAL_ELUSION
+    lambda();
+#else
+    serial_internal(std::forward<Function>(lambda));
+#endif
+  }
+
+  /**
    * Explicitly terminate the worker threads. Scheduler must not be used after this.
    */
   void terminate();
@@ -123,6 +139,8 @@ class scheduler {
   template <typename Function>
   static void spawn_internal(Function &&lambda);
   static void sync_internal();
+  template <typename Function>
+  static void serial_internal(Function &&lambda);
 
   static context_switcher::continuation slow_return(thread_state &calling_state);
@@ -148,6 +166,8 @@ class scheduler {
 
   std::shared_ptr<base::stack_allocator> stack_allocator_;
 
+  size_t serial_stack_size_;
+
 #if PLS_PROFILING_ENABLED
   profiling::profiler profiler_;
 #endif
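A usage sketch for the new public API. It assumes `pls::scheduler` and `perform_work` are exposed as in the existing PLS interface; the constructor arguments follow the declaration above (`num_threads`, `computation_depth`, `stack_size`, `reuse_thread`, and the new `serial_stack_size`, defaulting to `4096 * 8` bytes):

```cpp
#include "pls/pls.h"
#include <cstdio>

int main() {
  // num_threads, computation_depth, stack_size, reuse_thread, serial_stack_size
  pls::scheduler scheduler{4, 16, 4096 * 8, true, 4096 * 8};

  scheduler.perform_work([] {
    pls::spawn([] { /* stealable parallel work */ });
    pls::serial([] {
      // Runs on the dedicated serial call stack; spawns reached from
      // here are executed inline for now.
      std::printf("inside a serial section\n");
    });
    pls::sync();
  });
}
```

Defining `PLS_SERIAL_ELUSION` at compile time reduces `serial()` to a direct call of the lambda, skipping the stack switch entirely.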
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index 2f55d79..48c1b9b 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -20,6 +20,7 @@ scheduler::scheduler(unsigned int num_threads,
                      size_t computation_depth,
                      size_t stack_size,
                      bool reuse_thread,
+                     size_t serial_stack_size,
                      ALLOC &&stack_allocator)
     : num_threads_{num_threads},
       reuse_thread_{reuse_thread},
@@ -29,7 +30,8 @@ scheduler::scheduler(unsigned int num_threads,
       main_thread_starter_function_{nullptr},
       work_section_done_{false},
       terminated_{false},
-      stack_allocator_{std::make_shared<ALLOC>(std::forward<ALLOC>(stack_allocator))}
+      stack_allocator_{std::make_shared<ALLOC>(std::forward<ALLOC>(stack_allocator))},
+      serial_stack_size_{serial_stack_size}
 #if PLS_PROFILING_ENABLED
     , profiler_{num_threads}
 #endif
@@ -44,7 +46,11 @@ scheduler::scheduler(unsigned int num_threads,
                                                              computation_depth,
                                                              stack_size,
                                                              stack_allocator_));
-    auto &this_thread_state = thread_states_.emplace_back(std::make_unique<thread_state>(*this, i, *this_task_manager));
+    auto &this_thread_state = thread_states_.emplace_back(std::make_unique<thread_state>(*this,
+                                                                                         i,
+                                                                                         *this_task_manager,
+                                                                                         stack_allocator_,
+                                                                                         serial_stack_size));
 
     if (reuse_thread && i == 0) {
       worker_threads_.emplace_back();
@@ -148,10 +154,24 @@ template <typename Function>
 void scheduler::spawn_internal(Function &&lambda) {
   if (thread_state::is_scheduler_active()) {
     thread_state &spawning_state = thread_state::get();
+    scheduler &scheduler = spawning_state.get_scheduler();
 
     base_task *last_task = spawning_state.get_active_task();
     base_task *spawned_task = last_task->next_;
 
+    // We are at the end of our allocated tasks, go over to serial execution.
+    if (spawned_task == nullptr) {
+      scheduler::serial_internal(std::forward<Function>(lambda));
+      return;
+    }
+    // We are in a serial section.
+    // For now we simply stay serial. Later versions could add nested parallel sections.
+    if (last_task->is_serial_section_) {
+      lambda();
+      return;
+    }
+
+    // Carry over the resources allocated by parents
     auto *attached_resources = last_task->attached_resources_.load(std::memory_order_relaxed);
     spawned_task->attached_resources_.store(attached_resources, std::memory_order_relaxed);
@@ -171,6 +191,7 @@ void scheduler::spawn_internal(Function &&lambda) {
           // we are now executing the new task, allow others to steal the last task continuation.
           spawned_task->is_synchronized_ = true;
           spawning_state.set_active_task(spawned_task);
+          spawning_state.get_task_manager().push_local_task(last_task);
 
 #if PLS_SLEEP_WORKERS_ON_EMPTY
           // TODO: relax atomic operations on empty flag
@@ -269,6 +290,29 @@ void scheduler::spawn_internal(Function &&lambda) {
   }
 }
 
+template <typename Function>
+void scheduler::serial_internal(Function &&lambda) {
+  if (thread_state::is_scheduler_active()) {
+    thread_state &spawning_state = thread_state::get();
+    base_task *active_task = spawning_state.get_active_task();
+
+    if (active_task->is_serial_section_) {
+      lambda();
+    } else {
+      active_task->is_serial_section_ = true;
+      context_switcher::enter_context(spawning_state.get_serial_call_stack(),
+                                      spawning_state.get_serial_call_stack_size(),
+                                      [&](auto cont) {
+                                        lambda();
+                                        return cont;
+                                      });
+      active_task->is_serial_section_ = false;
+    }
+  } else {
+    lambda();
+  }
+}
+
 }
 #endif //PLS_SCHEDULER_IMPL_H
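`serial_internal` switches execution onto the pre-allocated serial call stack via `context_switcher::enter_context` and resumes the original continuation once the lambda returns. For readers unfamiliar with the primitive, here is a self-contained sketch of the same idea written against POSIX `<ucontext.h>` instead of the PLS context switcher (illustrative only, and deprecated on some platforms, but fine for demonstration):

```cpp
#include <ucontext.h>
#include <cstdio>
#include <vector>

static ucontext_t main_context, serial_context;

static void serial_entry() {
  std::printf("running on the dedicated serial stack\n");
  // Returning from the entry function resumes uc_link (main_context).
}

int main() {
  std::vector<char> serial_stack(64 * 1024); // stand-in for the allocated serial stack

  getcontext(&serial_context);
  serial_context.uc_stack.ss_sp = serial_stack.data();
  serial_context.uc_stack.ss_size = serial_stack.size();
  serial_context.uc_link = &main_context; // continue here after serial_entry returns
  makecontext(&serial_context, serial_entry, 0);

  swapcontext(&main_context, &serial_context); // the "enter_context" step
  std::printf("back on the original stack\n");
}
```

The real implementation additionally toggles `is_serial_section_` around the switch, so any `spawn` reached from inside the lambda takes the serial path shown in `spawn_internal` above.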
diff --git a/lib/pls/include/pls/internal/scheduling/strain_local_resource.h b/lib/pls/include/pls/internal/scheduling/strain_local_resource.h
index 8d971d8..27f62d6 100644
--- a/lib/pls/include/pls/internal/scheduling/strain_local_resource.h
+++ b/lib/pls/include/pls/internal/scheduling/strain_local_resource.h
@@ -29,6 +29,15 @@ struct PLS_CACHE_ALIGN strain_resource {
 };
 
 class strain_local_resource {
+  struct local_item {
+    local_item(strain_local_resource *strain_local_resource, unsigned index, unsigned depth) :
+        initial_resource_{std::make_unique<strain_resource>(strain_local_resource, index, depth)},
+        resource_{initial_resource_.get()} {}
+
+    std::unique_ptr<strain_resource> initial_resource_;
+    strain_resource *resource_;
+  };
+
  public:
   // Handle to a stack-like resource (index of it).
   // MUST be handled in a RAII like manner on the stack
   // and must not be copied or moved for this to work.
@@ -42,19 +51,14 @@ class strain_local_resource {
     explicit item_handle(strain_resource *resource) : resource_{resource} {}
     ~item_handle();
 
+    unsigned get_strain_index() {
+      return resource_->index_;
+    }
+
    private:
     strain_resource *resource_;
   };
 
-  struct local_item {
-    local_item(strain_local_resource *strain_local_resource, unsigned index, unsigned depth) :
-        initial_resource_{std::make_unique<strain_resource>(strain_local_resource, index, depth)},
-        resource_{initial_resource_.get()} {}
-
-    std::unique_ptr<strain_resource> initial_resource_;
-    strain_resource *resource_;
-  };
-
  public:
   strain_local_resource(unsigned num_threads, unsigned depth) : local_items_() {
     local_items_.reserve(num_threads);
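The new `get_strain_index` accessor exposes the slot index a handle refers to. The handle's documented contract (created on the stack, never copied or moved, released on scope exit) is plain RAII; a hypothetical toy version, not the PLS API:

```cpp
#include <cstdio>

// Hypothetical stand-in for item_handle: acquired on the stack, pinned there
// (no copy/move), exposes its slot index, releases the slot on scope exit.
class toy_handle {
 public:
  explicit toy_handle(unsigned index) : index_{index} {}
  ~toy_handle() { std::printf("slot %u released\n", index_); }

  toy_handle(const toy_handle &) = delete;
  toy_handle &operator=(const toy_handle &) = delete;

  unsigned get_strain_index() const { return index_; }

 private:
  unsigned index_;
};

int main() {
  toy_handle handle{3};
  std::printf("using slot %u\n", handle.get_strain_index());
} // destructor releases the slot here
```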
diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h
index 8c7dcf3..b46b5d2 100644
--- a/lib/pls/include/pls/internal/scheduling/thread_state.h
+++ b/lib/pls/include/pls/internal/scheduling/thread_state.h
@@ -13,6 +13,7 @@
 #include "pls/internal/base/system_details.h"
 #include "pls/internal/data_structures/stamped_integer.h"
+#include "pls/internal/base/stack_allocator.h"
 
 #include "pls/internal/scheduling/base_task.h"
 #include "pls/internal/scheduling/task_manager.h"
@@ -46,6 +47,10 @@ struct PLS_CACHE_ALIGN thread_state {
 
   static thread_local bool is_scheduler_active_;
 
+  std::shared_ptr<base::stack_allocator> stack_allocator_;
+  char *serial_call_stack_;
+  size_t serial_call_stack_size_;
+
 #if PLS_SLEEP_WORKERS_ON_EMPTY
   PLS_CACHE_ALIGN std::atomic<EMPTY_QUEUE_STATE> queue_empty_{EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
 #endif
@@ -53,12 +58,22 @@ struct PLS_CACHE_ALIGN thread_state {
  public:
   explicit thread_state(scheduler &scheduler,
                         unsigned thread_id,
-                        task_manager &task_manager) :
+                        task_manager &task_manager,
+                        std::shared_ptr<base::stack_allocator> &stack_allocator,
+                        size_t serial_call_stack_size) :
       thread_id_{thread_id},
       scheduler_{scheduler},
       task_manager_{task_manager},
       random_{static_cast<unsigned long>(std::chrono::steady_clock::now().time_since_epoch().count()) + thread_id},
-      active_task_{task_manager.get_task(0)} {};
+      active_task_{task_manager.get_task(0)},
+      stack_allocator_{stack_allocator},
+      serial_call_stack_size_{serial_call_stack_size} {
+    serial_call_stack_ = stack_allocator->allocate_stack(serial_call_stack_size_);
+  };
+
+  ~thread_state() {
+    stack_allocator_->free_stack(serial_call_stack_size_, serial_call_stack_);
+  }
 
   // Do not allow accidental copy/move operations.
   thread_state(const thread_state &) = delete;
@@ -97,7 +112,10 @@ struct PLS_CACHE_ALIGN thread_state {
   }
 
   void set_active_task(base_task *active_task) { active_task_ = active_task; }
-  base_task *get_active_task() const { return active_task_; }
+  [[nodiscard]] base_task *get_active_task() const { return active_task_; }
+
+  char *get_serial_call_stack() { return serial_call_stack_; }
+  [[nodiscard]] size_t get_serial_call_stack_size() const { return serial_call_stack_size_; }
 
 #if PLS_SLEEP_WORKERS_ON_EMPTY
   [[nodiscard]] std::atomic<EMPTY_QUEUE_STATE> &get_queue_empty_flag() {
diff --git a/lib/pls/include/pls/pls.h b/lib/pls/include/pls/pls.h
index daff79d..645d65f 100644
--- a/lib/pls/include/pls/pls.h
+++ b/lib/pls/include/pls/pls.h
@@ -23,6 +23,10 @@ static void spawn(Function &&function) {
 static void sync() {
   scheduler::sync();
 }
+template <typename Function>
+static void serial(Function &&function) {
+  scheduler::serial(std::forward<Function>(function));
+}
 
 // general helpers that can be handy when using PLS
 template
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index 8f896a5..b797131 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -4,7 +4,6 @@
 #include "pls/internal/scheduling/strain_local_resource.h"
 #include "pls/internal/build_flavour.h"
 
-#include "pls/internal/base/futex_wrapper.h"
 #include "pls/internal/base/error_handling.h"
 
 #include
@@ -14,11 +13,13 @@ namespace pls::internal::scheduling {
 
 scheduler::scheduler(unsigned int num_threads,
                      size_t computation_depth,
                      size_t stack_size,
-                     bool reuse_thread) : scheduler(num_threads,
-                                                    computation_depth,
-                                                    stack_size,
-                                                    reuse_thread,
-                                                    base::mmap_stack_allocator{}) {}
+                     bool reuse_thread,
+                     size_t serial_stack_size) : scheduler(num_threads,
+                                                           computation_depth,
+                                                           stack_size,
+                                                           reuse_thread,
+                                                           serial_stack_size,
+                                                           base::mmap_stack_allocator{}) {}
 
 scheduler::~scheduler() {
   terminate();
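Each `thread_state` now owns a dedicated serial call stack: it is taken from the shared stack allocator in the constructor and handed back in the destructor. A minimal sketch of that ownership contract, with a malloc-based stand-in for the allocator interface (PLS itself defaults to `base::mmap_stack_allocator`, as the scheduler.cpp change above shows):

```cpp
#include <cstddef>
#include <cstdlib>

// Malloc-based stand-in for the PLS stack allocator interface.
struct toy_stack_allocator {
  char *allocate_stack(std::size_t size) {
    return static_cast<char *>(std::malloc(size));
  }
  void free_stack(std::size_t size, char *stack) {
    (void)size;
    std::free(stack);
  }
};

// Mirrors the thread_state changes: acquire the serial stack on construction,
// release it on destruction, so its lifetime matches the worker's.
struct toy_thread_state {
  toy_stack_allocator &allocator_;
  std::size_t serial_call_stack_size_;
  char *serial_call_stack_;

  toy_thread_state(toy_stack_allocator &allocator, std::size_t serial_call_stack_size)
      : allocator_{allocator},
        serial_call_stack_size_{serial_call_stack_size},
        serial_call_stack_{allocator.allocate_stack(serial_call_stack_size)} {}

  ~toy_thread_state() {
    allocator_.free_stack(serial_call_stack_size_, serial_call_stack_);
  }
};

int main() {
  toy_stack_allocator allocator;
  toy_thread_state state{allocator, 4096 * 8}; // stack freed on scope exit
}
```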