diff --git a/app/benchmark_fib/main.cpp b/app/benchmark_fib/main.cpp
index bf79171..7117455 100644
--- a/app/benchmark_fib/main.cpp
+++ b/app/benchmark_fib/main.cpp
@@ -1,7 +1,5 @@
 #include "pls/internal/scheduling/scheduler.h"
-#include "pls/internal/scheduling/parallel_result.h"
 #include "pls/internal/scheduling/scheduler_memory.h"
-#include "pls/internal/helpers/profiler.h"
 
 using namespace pls::internal::scheduling;
 
@@ -14,24 +12,20 @@ using namespace pls::internal::scheduling;
 using namespace comparison_benchmarks::base;
 
-parallel_result<int> pls_fib(int n) {
+int pls_fib(int n) {
   if (n <= 1) {
-    return parallel_result<int>{1};
+    return 1;
   }
 
-  return scheduler::par([=]() {
-    return pls_fib(n - 1);
-  }, [=]() {
-    return pls_fib(n - 2);
-  }).then([=](int a, int b) {
-    return parallel_result<int>{a + b};
-  });
+  int a = pls_fib(n - 1);
+  int b = pls_fib(n - 2);
+
+  return a + b;
 }
 
-constexpr int MAX_NUM_THREADS = 8;
+constexpr int MAX_NUM_THREADS = 1;
 constexpr int MAX_NUM_TASKS = 64;
-constexpr int MAX_NUM_CONTS = 64;
-constexpr int MAX_CONT_SIZE = 256;
+constexpr int MAX_STACK_SIZE = 128;
 
 int main(int argc, char **argv) {
   int num_threads;
@@ -39,43 +33,27 @@
   benchmark_runner::read_args(argc, argv, num_threads, directory);
 
   string test_name = to_string(num_threads) + ".csv";
-  string full_directory = directory + "/PLS_v2/";
+  string full_directory = directory + "/PLS_v3/";
   benchmark_runner runner{full_directory, test_name};
 
   static_scheduler_memory<MAX_NUM_THREADS,
                           MAX_NUM_TASKS,
-                          MAX_NUM_CONTS,
-                          MAX_CONT_SIZE> static_scheduler_memory;
+                          MAX_STACK_SIZE> static_scheduler_memory;
 
-  scheduler scheduler{static_scheduler_memory, (unsigned int) num_threads};
+  scheduler scheduler{static_scheduler_memory, (unsigned) num_threads};
 
   volatile int res;
   for (int i = 0; i < fib::NUM_WARMUP_ITERATIONS; i++) {
     scheduler.perform_work([&]() {
-      return scheduler::par([&]() {
-        return pls_fib(fib::INPUT_N);
-      }, []() {
-        return parallel_result<int>{0};
-      }).then([&](int result, int) {
-        res = result;
-        return parallel_result<int>{0};
-      });
+      res = pls_fib(fib::INPUT_N);
     });
   }
 
   for (int i = 0; i < fib::NUM_ITERATIONS; i++) {
     scheduler.perform_work([&]() {
       runner.start_iteration();
-
-      return scheduler::par([&]() {
-        return pls_fib(fib::INPUT_N);
-      }, []() {
-        return parallel_result<int>{0};
-      }).then([&](int result, int) {
-        res = result;
-        runner.end_iteration();
-        return parallel_result<int>{0};
-      });
+      res = pls_fib(fib::INPUT_N);
+      runner.end_iteration();
     });
   }
   runner.commit_results(true);
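With the continuation-passing API removed, the benchmark temporarily degrades to plain sequential recursion on a single worker (MAX_NUM_THREADS = 1), which makes it a clean fast-path baseline for the new task model. Once the spawn() stub introduced further down in this commit is functional, the kernel would presumably regain a fork-join shape along these lines — a sketch only: spawn() is written as a static helper in the style of the old scheduler::par, and sync() is a hypothetical join primitive that is not part of this commit.

int pls_fib(int n) {
  if (n <= 1) {
    return 1;
  }

  int a, b;
  scheduler::spawn([&]() { a = pls_fib(n - 1); });  // spawn() stub from this commit
  scheduler::spawn([&]() { b = pls_fib(n - 2); });
  scheduler::sync();  // hypothetical: wait for both children before reading a and b

  return a + b;
}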
diff --git a/lib/context_switcher/include/context_switcher/context_switcher.h b/lib/context_switcher/include/context_switcher/context_switcher.h
index 757deda..0e1be46 100644
--- a/lib/context_switcher/include/context_switcher/context_switcher.h
+++ b/lib/context_switcher/include/context_switcher/context_switcher.h
@@ -24,11 +24,7 @@ continuation enter_context(assembly_bindings::stack_pointer_t stack_memory, size
   return continuation{assembly_bindings::__cs_enter_context(stack_base, captured_lambda, callback, stack_limit)};
 }
 
-continuation switch_context(continuation &&cont) {
-  assembly_bindings::continuation_t cont_pointer = cont.consume();
-
-  return continuation{assembly_bindings::__cs_switch_context(cont_pointer)};
-}
+continuation switch_context(continuation &&cont);
 
 }
diff --git a/lib/context_switcher/include/context_switcher/lambda_capture.h b/lib/context_switcher/include/context_switcher/lambda_capture.h
index ad0fad2..cf286cc 100644
--- a/lib/context_switcher/include/context_switcher/lambda_capture.h
+++ b/lib/context_switcher/include/context_switcher/lambda_capture.h
@@ -19,6 +19,7 @@ namespace context_switcher {
 
 template <typename F>
 struct lambda_capture {
+  // TODO: Check if we need an extra template here to perform the move
   explicit lambda_capture(F &&lambda) : lambda_{std::forward<F>(lambda)} {}
 
   assembly_bindings::continuation_t operator()(assembly_bindings::continuation_t continuation_pointer) {
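The TODO above asks whether the constructor needs its own template parameter to move correctly. A minimal sketch of that variant, assuming lambda_capture should also accept lvalue callables: with the constructor templated on FARG, FARG && becomes a forwarding reference, whereas the current F && (with F fixed by the class) is a plain rvalue reference.

#include <utility>

template <typename F>
struct lambda_capture_sketch {  // hypothetical name, not the library's type
  // FARG is deduced per call, so std::forward<FARG> moves rvalues and
  // copies lvalues; the class-level F no longer constrains the argument.
  template <typename FARG>
  explicit lambda_capture_sketch(FARG &&lambda) : lambda_{std::forward<FARG>(lambda)} {}

 private:
  F lambda_;
};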
diff --git a/lib/context_switcher/src/context_switcher.cpp b/lib/context_switcher/src/context_switcher.cpp
index 8b13789..28c3cd9 100644
--- a/lib/context_switcher/src/context_switcher.cpp
+++ b/lib/context_switcher/src/context_switcher.cpp
@@ -1 +1,11 @@
+#include "context_switcher/context_switcher.h"
 
+namespace context_switcher {
+
+continuation switch_context(continuation &&cont) {
+  assembly_bindings::continuation_t cont_pointer = cont.consume();
+
+  return continuation{assembly_bindings::__cs_switch_context(cont_pointer)};
+}
+
+}
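Moving switch_context() from the header into its own translation unit is the classic fix for a one-definition-rule violation: unlike enter_context(), which can stay header-only because it is presumably templated over the captured lambda, switch_context() is a plain function, so every file including the header previously emitted its own external definition and linking two such files fails with duplicate symbols. The header-only alternative (not taken here) would be to mark the definition inline:

// In context_switcher.h; ODR-equivalent to the out-of-line definition above.
inline continuation switch_context(continuation &&cont) {
  assembly_bindings::continuation_t cont_pointer = cont.consume();
  return continuation{assembly_bindings::__cs_switch_context(cont_pointer)};
}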
diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt
index b22231d..7ec8d16 100644
--- a/lib/pls/CMakeLists.txt
+++ b/lib/pls/CMakeLists.txt
@@ -58,7 +58,12 @@ add_library(pls STATIC
         include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
         include/pls/internal/scheduling/cont_manager.h include/pls/internal/scheduling/cont.h
-        include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/thread_state_static.h src/internal/base/error_handling.cpp include/pls/internal/data_structures/bounded_trading_deque.h ../context_switcher/src/context_switcher.cpp)
+        include/pls/internal/data_structures/bounded_ws_deque.h
+        include/pls/internal/data_structures/optional.h
+        include/pls/internal/scheduling/memory_block.h
+        include/pls/internal/scheduling/thread_state_static.h
+        src/internal/base/error_handling.cpp
+        include/pls/internal/data_structures/bounded_trading_deque.h)
 
 # Add everything in `./include` to be in the include path of this project
 target_include_directories(pls
@@ -72,6 +77,7 @@ target_include_directories(pls
 # Add cmake dependencies here if needed
 target_link_libraries(pls
         Threads::Threads # pthread support
+        context_switcher # coroutine support
         )
 if (EASY_PROFILER)
   target_link_libraries(pls easy_profiler)
@@ -79,7 +85,7 @@ endif ()
 
 # Rules for istalling the library on a system
 # ...binaries
-INSTALL(TARGETS pls
+INSTALL(TARGETS pls context_switcher
         EXPORT pls-targets
         LIBRARY
         DESTINATION lib/pls
diff --git a/lib/pls/include/pls/internal/scheduling/cont.h b/lib/pls/include/pls/internal/scheduling/cont.h
deleted file mode 100644
index 1b981e8..0000000
--- a/lib/pls/include/pls/internal/scheduling/cont.h
+++ /dev/null
@@ -1,163 +0,0 @@
-
-#ifndef PLS_INTERNAL_SCHEDULING_CONT_H_
-#define PLS_INTERNAL_SCHEDULING_CONT_H_
-
-#include
-#include
-#include
-
-#include "pls/internal/data_structures/stamped_integer.h"
-#include "pls/internal/data_structures/delayed_initialization.h"
-#include "pls/internal/base/alignment.h"
-#include "pls/internal/base/error_handling.h"
-
-#include "pls/internal/helpers/profiler.h"
-
-#include "parallel_result.h"
-#include "memory_block.h"
-
-namespace pls {
-namespace internal {
-namespace scheduling {
-
-class base_cont {
- protected:
-  // We plan to only init the members for a continuation on the slow path.
-  // If we can execute everything inline we simply skip it saving runtime overhead.
-  template<typename T>
-  using delayed_init = data_structures::delayed_initialization<T>;
-
- public:
-  explicit base_cont(base_cont *parent, memory_block *memory_block, bool is_right_child)
-      : parent_{parent},
-        memory_block_{memory_block},
-        is_right_child_{is_right_child} {
-    PLS_ASSERT(parent_ == nullptr || parent_->memory_block_->get_depth() == memory_block_->get_depth() - 1,
-               "Must only build cont chains with matching depth!")
-  };
-
-  /**
-   * Execute the continuation itself.
-   * Make sure to only call when all required results are in.
-   * Will store the result in it's parent, but not mess with any counters.
-   */
-  virtual void execute() = 0;
-
-  /**
-   * Execute the right hand side task associated with the continuation.
-   * Will store the result in it's parent, but not mess with any counters.
-   */
-  virtual void execute_task() = 0;
-  virtual base_task *get_task() = 0;
-
-  virtual void *get_right_result_pointer() = 0;
-  virtual void *get_left_result_pointer() = 0;
-
-  template<typename T>
-  void store_right_result(T &&result) {
-    using BASE_T = typename std::remove_cv<typename std::remove_reference<T>::type>::type;
-    reinterpret_cast<delayed_init<BASE_T> *>(get_right_result_pointer())->initialize(std::forward<T>(result));
-  }
-
-  template<typename T>
-  void store_left_result(T &&result) {
-    using BASE_T = typename std::remove_cv<typename std::remove_reference<T>::type>::type;
-    reinterpret_cast<delayed_init<BASE_T> *>(get_left_result_pointer())->initialize(std::forward<T>(result));
-  }
-
-  base_cont *get_parent() { return parent_; }
-  memory_block *get_memory_block() { return memory_block_; }
-  bool is_right_child() const { return is_right_child_; }
-
- protected:
-  base_cont *parent_;
-  memory_block *memory_block_;
-  bool is_right_child_;
-};
-
-template<typename T2, typename RES_1, typename RES_2, typename F>
-class cont : public base_cont {
- private:
-  template<typename RES_TYPE>
-  struct result_runner {
-    // Strip off unwanted modifiers...
-    using BASE_RES_TYPE = typename std::remove_cv<typename std::remove_reference<RES_TYPE>::type>::type;
-
-    static void execute(cont &cont) {
-      parallel_result<BASE_RES_TYPE>
-          result{cont.function_((*cont.left_result_).value(), (*cont.right_result_).value())};
-      if (result.fast_path() && cont.parent_ != nullptr) {
-        if (cont.is_right_child()) {
-          cont.parent_->store_right_result(std::move(result));
-        } else {
-          cont.parent_->store_left_result(std::move(result));
-        }
-      }
-    }
-  };
-  template<typename INNER_TYPE>
-  struct result_runner<parallel_result<INNER_TYPE>> {
-    static void execute(cont &cont) {
-      auto result = cont.function_((*cont.left_result_).value(), (*cont.right_result_).value());
-      if (result.fast_path() && cont.parent_) {
-        if (cont.is_right_child()) {
-          cont.parent_->store_right_result(std::move(result));
-        } else {
-          cont.parent_->store_left_result(std::move(result));
-        }
-      }
-    }
-  };
-
- public:
-  template<typename FARG, typename ...T2ARGS>
-  explicit cont(base_cont *parent,
-                memory_block *memory_block,
-                bool is_right_child,
-                FARG &&function,
-                T2ARGS...task_2_args):
-      base_cont(parent, memory_block, is_right_child),
-      function_{std::forward<FARG>(function)},
-      task_{std::forward<T2ARGS>(task_2_args)..., this} {};
-
-  void execute() override {
-    PROFILE_CONTINUATION("execute_cont");
-    using result_type = decltype(function_((*left_result_).value(), (*right_result_).value()));
-    result_runner<result_type>::execute(*this);
-
-    this->~cont();
-    auto *memory_block = this->get_memory_block();
-    memory_block->free_buffer();
-    memory_block->reset_state();
-  }
-
-  void execute_task() override {
-    task_.execute();
-  }
-  base_task *get_task() override {
-    return &task_;
-  }
-
-  void *get_left_result_pointer() override {
-    return &left_result_;
-  }
-  void *get_right_result_pointer() override {
-    return &right_result_;
-  }
-
- private:
-  // Initial data members. These slow down the fast path, try to init them lazy when possible.
-  F function_;
-  T2 task_;
-
-  // Some fields/actual values stay uninitialized (save time on the fast path if we don not need them).
-  // More fields untouched on the fast path is good, but for ease of an implementation we only keep some for now.
-  delayed_init<RES_1> left_result_;
-  delayed_init<RES_2> right_result_;
-};
-
-}
-}
-}
-
-#endif //PLS_INTERNAL_SCHEDULING_CONT_H_
diff --git a/lib/pls/include/pls/internal/scheduling/cont_manager.h b/lib/pls/include/pls/internal/scheduling/cont_manager.h
deleted file mode 100644
index 3fa279c..0000000
--- a/lib/pls/include/pls/internal/scheduling/cont_manager.h
+++ /dev/null
@@ -1,203 +0,0 @@
-
-#ifndef PLS_CONT_MANAGER_H_
-#define PLS_CONT_MANAGER_H_
-
-#include
-#include
-#include
-
-#include "pls/internal/data_structures/aligned_stack.h"
-#include "pls/internal/scheduling/cont.h"
-#include "pls/internal/scheduling/thread_state.h"
-
-namespace pls {
-namespace internal {
-namespace scheduling {
-
-class cont_manager {
- public:
-  // Helper to pass the compile time constants to the constructor.
-  template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
-  struct template_args {};
-
-  template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
-  explicit cont_manager(data_structures::aligned_stack &cont_storage, template_args<NUM_CONTS, MAX_CONT_SIZE>)
-      : max_cont_size_{MAX_CONT_SIZE}, num_conts_{NUM_CONTS} {
-
-    // First node is currently active and our local start
-    active_node_ = init_memory_block<MAX_CONT_SIZE>(cont_storage, nullptr, 0);
-
-    // Build up chain after it
-    memory_block *current_node = active_node_;
-    for (size_t i = 1; i < NUM_CONTS; i++) {
-      memory_block *next_node = init_memory_block<MAX_CONT_SIZE>(cont_storage, current_node, i);
-      current_node->set_next(next_node);
-      current_node = next_node;
-    }
-  };
-
-  // Aquire and release memory blocks...
-  memory_block *get_next_memory_block() {
-    auto result = active_node_;
-    active_node_ = active_node_->get_next();
-    return result;
-  }
-  void return_memory_block() {
-    active_node_ = active_node_->get_prev();
-  }
-  void move_active_node(int depth) {
-    if (depth < 0) {
-      for (long i = 0; i < (depth * -1); i++) {
-        active_node_ = active_node_->get_prev();
-      }
-    } else {
-      for (long i = 0; i < depth; i++) {
-        active_node_ = active_node_->get_next();
-      }
-    }
-  }
-  void move_active_node_to_start() {
-    move_active_node(-1 * active_node_->get_depth());
-  }
-  memory_block *get_active_node() {
-    return active_node_;
-  }
-
-  bool is_clean() {
-    if (get_active_node()->get_depth() == 0) {
-      memory_block *current_node = active_node_;
-      for (size_t i = 1; i < num_conts_; i++) {
-        if (current_node->get_prev() != nullptr && current_node->get_prev()->get_next() != current_node) {
-          return false;
-        }
-        if (current_node->is_buffer_used()) {
-          return false;
-        }
-        current_node = current_node->get_next();
-      }
-    } else {
-      return false;
-    }
-
-    return true;
-  }
-
-  // Manage the fall through behaviour/slow path behaviour
-  bool falling_through() const {
-    return fall_through_;
-  }
-  void fall_through_and_notify_cont(base_cont *notified_cont, bool is_child_right) {
-    fall_through_ = true;
-    fall_through_cont_ = notified_cont;
-    fall_through_child_right = is_child_right;
-  }
-
-  void aquire_memory_chain(memory_block *target_chain) {
-    PLS_ASSERT(active_node_->get_depth() == target_chain->get_depth() + 1,
-               "Can only steal aquire chain parts with correct depth.");
-
-    active_node_->set_prev(target_chain);
-    target_chain->set_next(active_node_);
-  }
-
-  void execute_fall_through_code() {
-    PLS_ASSERT(falling_through(), "Must be falling through to execute the associated code.")
-
-    auto &my_state = thread_state::get();
-
-    // Copy fall through status and reset it (for potentially nested execution paths).
-    auto *notified_cont = fall_through_cont_;
-
-    fall_through_cont_ = nullptr;
-    fall_through_ = false;
-
-    // Keep the target chain before we execute, as this potentially frees the memory
-    auto *target_memory_block = notified_cont->get_memory_block();
-    auto *target_chain = target_memory_block->get_offered_chain().load();
-
-    // Notify the next continuation of finishing a child...
-    if (target_memory_block->get_results_missing().fetch_add(-1) == 1) {
-      // ... we finished the continuation.
-      // We are now in charge continuing to execute the above continuation chain.
-
-      PLS_ASSERT(active_node_->get_prev()->get_depth() == target_memory_block->get_depth(),
-                 "We must hold the system invariant to be in the correct depth.")
-      if (active_node_->get_prev() != target_memory_block) {
-        // We do not own the thing we will execute.
-        // Own it by swapping the chain belonging to it in.
-        aquire_memory_chain(target_memory_block);
-      }
-      my_state.parent_cont_ = notified_cont->get_parent();
-      my_state.right_spawn_ = notified_cont->is_right_child();
-      active_node_ = target_memory_block;
-      notified_cont->execute();
-      if (!falling_through() && notified_cont->get_parent() != nullptr) {
-        fall_through_and_notify_cont(notified_cont->get_parent(), notified_cont->is_right_child());
-      }
-      return;
-    } else {
-      // ... we did not finish the last continuation.
-      // We are no longer in charge of executing the above continuation chain.
-
-      PLS_ASSERT(active_node_->get_prev()->get_depth() == target_memory_block->get_depth(),
-                 "We must hold the system invariant to be in the correct depth.")
-      if (active_node_->get_prev() == target_memory_block) {
-        // We own the thing we are not allowed to execute.
-        // Get rid of the ownership by using the offered chain.
-        aquire_memory_chain(target_chain);
-      }
-
-      move_active_node_to_start();
-      // We are done here...nothing more to execute
-      return;
-    }
-  }
-
- private:
-  template<size_t MAX_CONT_SIZE>
-  static memory_block *init_memory_block(data_structures::aligned_stack &cont_storage,
-                                         memory_block *prev,
-                                         unsigned long depth) {
-    // Represents one cont_node and its corresponding memory buffer (as one continuous block of memory).
-    constexpr size_t buffer_size = MAX_CONT_SIZE - base::alignment::next_alignment(sizeof(memory_block));
-    char *memory_block_ptr = cont_storage.push_bytes<memory_block>();
-    char *memory_block_buffer_ptr = cont_storage.push_bytes(buffer_size);
-
-    return new(memory_block_ptr) memory_block(memory_block_buffer_ptr, buffer_size, prev, depth);
-  }
-
- private:
-  const size_t max_cont_size_;
-  const size_t num_conts_;
-
-  /**
-   * Managing the continuation chain.
-   */
-  memory_block *active_node_;
-
-  /**
-   * Managing falling through back to the scheduler.
-   */
-  bool fall_through_{false};
-  bool fall_through_child_right{false};
-  base_cont *fall_through_cont_{nullptr};
-};
-
-template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
-class static_cont_manager {
- public:
-  static_cont_manager()
-      : static_cont_storage_{},
-        cont_manager_(static_cont_storage_.get_stack(), cont_manager::template_args<NUM_CONTS, MAX_CONT_SIZE>{}) {}
-  cont_manager &get_cont_manager() { return cont_manager_; }
-
- private:
-  data_structures::static_aligned_stack<NUM_CONTS * MAX_CONT_SIZE> static_cont_storage_;
-  cont_manager cont_manager_;
-};
-
-}
-}
-}
-
-#endif //PLS_CONT_MANAGER_H_
diff --git a/lib/pls/include/pls/internal/scheduling/memory_block.h b/lib/pls/include/pls/internal/scheduling/memory_block.h
deleted file mode 100644
index f4d42c1..0000000
--- a/lib/pls/include/pls/internal/scheduling/memory_block.h
+++ /dev/null
@@ -1,121 +0,0 @@
-
-#ifndef PLS_INTERNAL_SCHEDULING_CONT_NODE_H_
-#define PLS_INTERNAL_SCHEDULING_CONT_NODE_H_
-
-namespace pls {
-namespace internal {
-namespace scheduling {
-
-/**
- * A block of memory that can be used to store tasks and continuations.
- * Threads trade these blocks while executing and stealing tasks.
- *
- * Each block has an associated, raw memory buffer. The user can place his object
- * in this memory region as needed. He is responsible for calling deconstructors of the
- * placed objects.
- */
-class memory_block {
- public:
-  memory_block(char *memory_buffer,
-               size_t memory_buffer_size,
-               memory_block *prev,
-               int depth)
-      : prev_{prev},
-        next_{nullptr},
-        offered_chain_{nullptr},
-        results_missing_{2},
-        memory_buffer_{memory_buffer},
-        memory_buffer_size_{memory_buffer_size},
-        memory_buffer_used_{false},
-        depth_{depth} {};
-
-  template<typename T, typename ...ARGS>
-  T *place_in_buffer(ARGS &&...args) {
-    PLS_ASSERT(!memory_buffer_used_, "Must only allocate one continuation at once per node.");
-
-    memory_buffer_used_ = true;
-    auto *result = new(memory_buffer_) T(std::forward<ARGS>(args)...);
-    continuation_ = result;
-
-    return result;
-  }
-  void free_buffer() {
-    PLS_ASSERT(memory_buffer_used_, "Can only free a memory spot when it is in use.")
-    memory_buffer_used_ = false;
-  }
-  bool is_buffer_used() {
-    return memory_buffer_used_;
-  }
-  base_cont *get_cont() {
-    PLS_ASSERT(is_buffer_used(), "Can only read initialized buffer!");
-    return continuation_;
-  }
-
-  memory_block *get_prev() {
-    return prev_;
-  }
-  void set_prev(memory_block *prev) {
-    prev_ = prev;
-  }
-  memory_block *get_next() {
-    return next_;
-  }
-  void set_next(memory_block *next) {
-    next_ = next;
-  }
-
-  std::atomic<memory_block *> &get_offered_chain() {
-    return offered_chain_;
-  }
-
-  std::atomic<int> &get_results_missing() {
-    return results_missing_;
-  }
-
-  int get_depth() const noexcept {
-    return depth_;
-  }
-
-  void reset_state() {
-    offered_chain_.store(nullptr);
-    results_missing_.store(2);
-  }
-
- private:
-  // Linked list property of memory blocks (a complete list represents a threads currently owned memory).
-  // Each block knows its chain start to allow stealing a whole chain in O(1)
-  // without the need to traverse back to the chain start.
-  memory_block *prev_, *next_;
-
-  // When blocked on this chain element, we need to know what other chain of memory we
-  // got offered by the stealing thread.
-  // For this we need the offered chain's element up to the point we can steal.
-  std::atomic<memory_block *> offered_chain_;
-
-  // Management for coordinating concurrent result writing and stealing.
-  // The result count decides atomically who gets to execute the continuation
-  // and who therefore get's to own this memory block chain.
-  std::atomic<int> results_missing_;
-
-  // Pointer to memory region reserved for the companion continuation.
-  // Must be a buffer big enough to hold any continuation encountered in the program.
-  // This memory is managed explicitly by the continuation manager and runtime system
-  // (they need to make sure to always call de-constructors and never allocate two continuations).
-  char *memory_buffer_;
-  base_cont *continuation_;
-
-  // These two are only helper properties helping with bugs during development.
-  size_t memory_buffer_size_;
-  bool memory_buffer_used_;
-
-  // Each element stays at a fixed depth for the entire application run.
-  // Swapping parts of a memory chain will not reorder it, as always parts of
-  // the same size are exchanged.
-  const int depth_;
-};
-
-}
-}
-}
-
-#endif //PLS_INTERNAL_SCHEDULING_CONT_NODE_H_
diff --git a/lib/pls/include/pls/internal/scheduling/parallel_result.h b/lib/pls/include/pls/internal/scheduling/parallel_result.h
deleted file mode 100644
index 9cf0358..0000000
--- a/lib/pls/include/pls/internal/scheduling/parallel_result.h
+++ /dev/null
@@ -1,37 +0,0 @@
-
-#ifndef PLS_INTERNAL_SCHEDULING_PARALLEL_RESULT_H_
-#define PLS_INTERNAL_SCHEDULING_PARALLEL_RESULT_H_
-
-#include
-#include "pls/internal/data_structures/delayed_initialization.h"
-
-namespace pls {
-namespace internal {
-namespace scheduling {
-
-// Used to more enforce the use of parallel_results
-class parallel_result_base {};
-
-template<typename T>
-class parallel_result : public parallel_result_base {
- public:
-  using value_type = T;
-
-  parallel_result() = default;
-  parallel_result(parallel_result &&other) noexcept : val_{std::move(other.val_)} {}
-  parallel_result(const parallel_result &other) noexcept : val_{other.val_} {}
-
-  parallel_result(T val) : val_{std::move(val)} {}
-
-  bool fast_path() { return val_.initialized(); }
-  T &value() { return val_.object(); }
-
- private:
-  data_structures::delayed_initialization<T> val_;
-};
-
-}
-}
-}
-
-#endif //PLS_INTERNAL_SCHEDULING_PARALLEL_RESULT_H_
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h
index 33081d0..2c9e44c 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler.h
@@ -53,23 +53,20 @@ class scheduler {
   template<typename Function>
   void perform_work(Function work_section);
 
+  template<typename Function>
+  void spawn(Function &&lambda) {
+    // TODO: place function on next active
+    // TODO: capture continuation in current active
+    // TODO: advance current active
+    // TODO: after finish, return to last active (if not stolen)
+    // TODO: revert current active
+  }
+
   /**
    * Explicitly terminate the worker threads. Scheduler must not be used after this.
    */
   void terminate();
 
-  /**
-   * Temporary object used for the parallel(...).then(...) API.
-   */
-  template<typename F1, typename F2>
-  struct starter;
-
-  template<typename F1, typename F2>
-  starter<F1, F2> invoke_parallel(F1 &&function_1, F2 &&function_2);
-
-  template<typename F1, typename F2>
-  static starter<F1, F2> par(F1 &&function_1, F2 &&function_2);
-
   unsigned int num_threads() const { return num_threads_; }
 
  private:
@@ -77,7 +74,6 @@ class scheduler {
   void work_thread_work_section();
 
   thread_state &thread_state_for(size_t id);
-  friend class base_task;
 
   const unsigned int num_threads_;
   const bool reuse_thread_;
   scheduler_memory &memory_;
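The five TODOs in spawn() describe the planned fork path one step at a time. Read against the primitives this commit introduces (task::run_as_task, task_manager::get_active_task), they map onto roughly the following shape — a sketch only, not the committed implementation: the depth accessor and the not-stolen check do not exist yet, and the stolen case is left entirely open.

template<typename Function>
void scheduler::spawn(Function &&lambda) {
  auto &task_manager = thread_state::get().get_task_manager();
  task &last_active = task_manager.get_active_task();
  task &next_active = task_manager.get_this_thread_task(depth_of(last_active) + 1);  // depth_of(): hypothetical

  // Place the function on the next task and capture the continuation of the
  // current one; entering the new context advances execution onto its stack.
  next_active.run_as_task([&](context_switcher::continuation cont) {
    lambda();
    // After finishing, return to the last active task. Only valid if it was
    // not stolen in the meantime -- that check is exactly what is still TODO.
    return std::move(cont);
  });
  // Back on the spawning task's stack: revert the active task to last_active.
}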
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index 8b2e8d7..30414ee 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -3,134 +3,17 @@
 #define PLS_SCHEDULER_IMPL_H
 
 #include
-#include "pls/internal/scheduling/cont.h"
-#include "pls/internal/scheduling/parallel_result.h"
-#include "pls/internal/scheduling/task.h"
+#include "context_switcher/context_switcher.h"
+#include "context_switcher/continuation.h"
+
+#include "pls/internal/scheduling/task.h"
 
 #include "pls/internal/helpers/profiler.h"
 
 namespace pls {
 namespace internal {
 namespace scheduling {
 
-template<typename F1, typename F2>
-struct scheduler::starter {
-  F1 function_1_;
-  F2 function_2_;
-  using return_type_1 = decltype(function_1_());
-  using return_type_2 = decltype(function_2_());
-
-  // Enforce correct return types of lambdas (parallel_result)
-  static_assert(std::is_base_of<parallel_result_base, return_type_1>::value,
-                "Must only return parallel results in parallel code");
-  static_assert(std::is_base_of<parallel_result_base, return_type_2>::value,
-                "Must only return parallel results in parallel code");
-
-  template<typename F1ARG, typename F2ARG>
-  explicit starter(F1ARG &&function_1, F2ARG &&function_2) : function_1_{std::forward<F1ARG>(function_1)},
-                                                             function_2_{std::forward<F2ARG>(function_2)} {};
-
-  template<typename FCONT>
-  auto then(FCONT &&cont_function)
-  -> decltype(cont_function(std::declval<typename return_type_1::value_type>(),
-                            std::declval<typename return_type_2::value_type>())) {
-    PROFILE_FAST_PATH("then");
-    using continuation_type = cont<task<F2>, return_type_1, return_type_2, FCONT>;
-    using result_type = decltype(cont_function(std::declval<typename return_type_1::value_type>(),
-                                               std::declval<typename return_type_2::value_type>()));
-
-    auto &my_state = thread_state::get();
-    auto &cont_manager = my_state.get_cont_manager();
-
-    // Select current memory block.
-    // For now directly copy both the continuation function and the second task.
-    // (We might optimize this in the future to require less memory copies.)
-    auto *current_memory_block = cont_manager.get_next_memory_block();
-
-
-    // We keep track of the last spawn to build up the parent_cont chain
-    const bool is_right_cont = my_state.right_spawn_;
-    base_cont *parent_cont = my_state.parent_cont_;
-
-    continuation_type *current_cont = current_memory_block->place_in_buffer<continuation_type>(parent_cont,
-                                                                                               current_memory_block,
-                                                                                               is_right_cont,
-                                                                                               cont_function,
-                                                                                               function_2_);
-    my_state.parent_cont_ = current_cont;
-
-    // Publish the second task.
-    my_state.get_task_manager().publish_task(current_cont->get_task());
-
-    // Call first function on fast path
-    my_state.right_spawn_ = false;
-    return_type_1 result_1 = function_1_();
-    if (!result_1.fast_path()) {
-      // Get our replacement from the task stack and store it for later use when we are actually blocked.
-      auto traded_memory = my_state.get_task_manager().try_pop_local();
-      current_cont->get_memory_block()->get_offered_chain().store(*traded_memory);
-
-      // Unwind stack...
-      return result_type{};
-    }
-
-    // Try to call second function on fast path
-    auto traded_memory = my_state.get_task_manager().try_pop_local();
-    if (traded_memory) {
-      // The task got stolen...
-      // ...but we got a memory block that can be used if we block on this one.
-      current_cont->get_memory_block()->get_offered_chain().store(*traded_memory);
-
-      // Main scheduling loop is responsible for entering the result to the slow path...
-      current_cont->store_left_result(std::move(result_1));
-      cont_manager.fall_through_and_notify_cont(current_cont, false);
-      // Unwind stack...
-      return result_type{};
-    } else {
-      my_state.right_spawn_ = true;
-      return_type_2 result_2 = function_2_();
-      if (!result_2.fast_path()) {
-        // Main scheduling loop is responsible for entering the result to the slow path...
-        current_cont->store_left_result(std::move(result_1));
-        current_cont->get_memory_block()->get_results_missing().fetch_add(-1);
-        // Unwind stack...
-        return result_type{};
-      }
-
-      // We fully got all results, inline as good as possible.
-      // This is the common case, branch prediction should be rather good here.
-
-      // Just return the cont object unused and directly call the function.
-      current_cont->~continuation_type();
-      current_memory_block->free_buffer();
-      cont_manager.return_memory_block();
-
-      // The continuation has the same execution environment as we had for the children.
-      // We need this to allow spawns in there.
-      my_state.parent_cont_ = parent_cont;
-      my_state.right_spawn_ = is_right_cont;
-
-      auto cont_result = cont_function(result_1.value(), result_2.value());
-      if (!cont_result.fast_path()) {
-        // Unwind stack...
-        return result_type{};
-      }
-      return cont_result;
-    }
-  };
-};
-
-template<typename F1, typename F2>
-scheduler::starter<F1, F2> scheduler::invoke_parallel(F1 &&function_1, F2 &&function_2) {
-  return scheduler::starter<F1, F2>{std::forward<F1>(function_1), std::forward<F2>(function_2)};
-}
-
-template<typename F1, typename F2>
-scheduler::starter<F1, F2> scheduler::par(F1 &&function_1, F2 &&function_2) {
-  return thread_state::get().get_scheduler().invoke_parallel(std::forward<F1>(function_1),
-                                                             std::forward<F2>(function_2));
-}
-
 class scheduler::init_function {
  public:
   virtual void run() = 0;
 };
 
@@ -140,15 +23,12 @@ class scheduler::init_function_impl : public init_function {
  public:
  explicit init_function_impl(F &function) : function_{function} {}
  void run() override {
-    scheduler::par([]() {
-      return parallel_result<int>{0};
-    }, [=]() {
-      return function_();
-    }).then([=](int, int b) {
-      thread_state::get().get_scheduler().work_section_done_ = true;
-      return parallel_result<int>{b};
+    auto &thread_state = thread_state::get();
+    thread_state.get_task_manager().get_active_task().run_as_task([&](context_switcher::continuation cont) {
+      function_();
+      return std::move(cont);
     });
-
+    thread_state.get_scheduler().work_section_done_.store(true);
   }
  private:
  F &function_;
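run_as_task() wraps enter_context(), so the lambda contract seen in run() above is the context switcher's: the callable receives the continuation of the code that entered the new stack and must return the continuation to resume when it finishes. Returning the received continuation unchanged makes the work section behave like an ordinary call that merely executes on the task's stack. A standalone usage sketch — stack size and setup are illustrative, not from this commit:

#include <cstdio>
#include "context_switcher/context_switcher.h"

void usage_sketch() {
  static char stack[4096];  // illustrative buffer; real tasks use their stack_memory_

  context_switcher::enter_context(stack, sizeof(stack), [](context_switcher::continuation cont) {
    std::printf("running on the separate stack\n");
    return std::move(cont);  // switch straight back to the caller when done
  });
  // execution continues here once the lambda returns the captured continuation
}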
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
index 25c8063..6db97d3 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
@@ -11,8 +11,6 @@ namespace pls {
 namespace internal {
 namespace scheduling {
 
-void worker_routine();
-
 class scheduler_memory {
   // Note: scheduler_memory is a pure interface and has no data.
   // By not having an initialization routine we can do our 'static and heap specialization'
@@ -29,7 +27,7 @@ class scheduler_memory {
   }
 };
 
-template<size_t MAX_THREADS, size_t NUM_TASKS, size_t NUM_CONTS, size_t MAX_CONT_SIZE>
+template<size_t MAX_THREADS, size_t NUM_TASKS, size_t STACK_SIZE>
 class static_scheduler_memory : public scheduler_memory {
  public:
   static_scheduler_memory() : scheduler_memory{} {
@@ -47,14 +45,14 @@ class static_scheduler_memory : public scheduler_memory {
     return threads_[id];
   }
  private:
-  using thread_state_type = thread_state_static<NUM_TASKS, NUM_CONTS, MAX_CONT_SIZE>;
+  using thread_state_type = thread_state_static<NUM_TASKS, STACK_SIZE>;
 
   alignas(base::system_details::CACHE_LINE_SIZE) std::array<base::thread, MAX_THREADS> threads_;
   alignas(base::system_details::CACHE_LINE_SIZE) std::array<thread_state_type, MAX_THREADS> thread_states_;
   alignas(base::system_details::CACHE_LINE_SIZE) std::array<thread_state *, MAX_THREADS> thread_state_pointers_;
 };
 
-template<size_t NUM_TASKS, size_t NUM_CONTS, size_t MAX_CONT_SIZE>
+template<size_t NUM_TASKS, size_t STACK_SIZE>
 class heap_scheduler_memory : public scheduler_memory {
  public:
   explicit heap_scheduler_memory(size_t max_threads) : max_threads_{max_threads},
@@ -80,7 +78,7 @@ class heap_scheduler_memory : public scheduler_memory {
     return thread_vector_[id];
   }
  private:
-  using thread_state_type = thread_state_static<NUM_TASKS, NUM_CONTS, MAX_CONT_SIZE>;
+  using thread_state_type = thread_state_static<NUM_TASKS, STACK_SIZE>;
   // thread_state_type is aligned at the cache line and therefore overaligned (C++ 11 does not require
   // the new operator to obey alignments bigger than 16, cache lines are usually 64).
   // To allow this object to be allocated using 'new' (which the vector does internally),
diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h
index 372a18e..c896a64 100644
--- a/lib/pls/include/pls/internal/scheduling/task.h
+++ b/lib/pls/include/pls/internal/scheduling/task.h
@@ -1,64 +1,85 @@
 
 #ifndef PLS_TASK_H
 #define PLS_TASK_H
 
-#include "pls/internal/scheduling/cont.h"
-#include "pls/internal/scheduling/memory_block.h"
+#include
 
-#include "pls/internal/helpers/profiler.h"
+#include "context_switcher/continuation.h"
+#include "context_switcher/context_switcher.h"
+
+#include "pls/internal/base/system_details.h"
 
 namespace pls {
 namespace internal {
 namespace scheduling {
 
 /**
- * A task to be executed by the runtime system.
- * Tasks are guaranteed to be executed exactly once.
+ * A task is the smallest unit of execution seen by the runtime system.
+ *
+ * Tasks represent an action dispatched by a potentially parallel call.
+ * Tasks have their own execution context (stack and register state), making them stackful coroutines.
+ * Tasks can be suspended and resumed (stealing happens by resuming a task).
  *
- * Override the execute_internal() method for your custom code.
+ * Being coroutines, tasks go through a very deliberate state machine:
+ * - initialized (no execution state)
+ * - running (currently executing user code)
+ * - suspended (suspended by switching to a different task).
  */
-class base_task {
- public:
-  /**
-   * Executes the task and stores its result in the correct continuation.
-   * The caller must handle finishing the continuation/informing it that task two was finished.
-   */
-  void execute() {
-    execute_internal();
+struct alignas(base::system_details::CACHE_LINE_SIZE) task {
+  void init(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) {
+    stack_memory_ = stack_memory;
+    stack_size_ = stack_size;
+
+    depth_ = depth;
+    thread_id_ = thread_id;
   }
 
-  base_cont *get_cont() {
-    return cont_;
+  unsigned get_thread_id() const {
+    return thread_id_;
+  }
+  void set_thread_id(unsigned thread_id) {
+    thread_id_ = thread_id;
   }
 
- protected:
-  explicit base_task(base_cont *cont) : cont_{cont} {};
+  task *get_prev() const {
+    return prev_;
+  }
+  void set_prev(task *prev) {
+    prev_ = prev;
+  }
 
-  /**
-   * Overwrite this with the actual behaviour of concrete tasks.
-   */
-  virtual void execute_internal() = 0;
+  task *get_next() const {
+    return next_;
+  }
+  void set_next(task *next) {
+    next_ = next;
+  }
 
-  base_cont *cont_;
-};
+  task *get_parent_task() const {
+    return parent_task_;
+  }
+  void set_parent_task(task *parent_task) {
+    parent_task_ = parent_task;
+  }
 
-template<typename F>
-class task : public base_task {
- public:
-  template<typename FARG>
-  explicit task(FARG &&function, base_cont *cont)
-      : base_task{cont}, function_{std::forward<FARG>(function)} {}
-
-  void execute_internal() override {
-    PROFILE_TASK("execute_internal")
-    auto result = function_();
-    if (result.fast_path()) {
-      cont_->store_right_result(std::move(result));
-    }
+  template<typename F>
+  context_switcher::continuation run_as_task(F &&lambda) {
+    return context_switcher::enter_context(stack_memory_, stack_size_, std::forward<F>(lambda));
   }
 
  private:
-  F function_;
+  // Stack/Continuation Management
+  char *stack_memory_;
+  size_t stack_size_; // TODO: We do not need this in every single task...
+  context_switcher::continuation continuation_;
+
+  // Task Tree (we have a parent that we want to continue when we finish)
+  task *parent_task_;
+  unsigned depth_;
+  unsigned thread_id_;
+  // Memory Linked List
+  task *prev_;
+  task *next_;
 };
 
 }
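The suspended state described in the new task comment is entered through context_switcher::switch_context(): a running task hands over a continuation and the thread resumes whoever that continuation belongs to; the suspended task itself resumes when some thread later switches back to its continuation. The continuation_ member is evidently where that handle is meant to be parked, though nothing stores into it yet in this commit. A sketch of the pattern, written as a hypothetical task member:

// Hypothetical helper, not in this commit: suspend the calling task and
// resume 'other'. When another thread later resumes *this* task, the call
// returns and hands us the continuation of whoever resumed us.
context_switcher::continuation yield_to(task &other) {
  return context_switcher::switch_context(std::move(other.continuation_));
}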
diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h
index 5eeb73d..139c842 100644
--- a/lib/pls/include/pls/internal/scheduling/task_manager.h
+++ b/lib/pls/include/pls/internal/scheduling/task_manager.h
@@ -5,18 +5,11 @@
 #include
 #include
 #include
-#include
-#include
 
 #include "pls/internal/scheduling/task.h"
-#include "pls/internal/scheduling/cont_manager.h"
-#include "pls/internal/scheduling/memory_block.h"
 
 #include "pls/internal/data_structures/bounded_trading_deque.h"
-#include "pls/internal/data_structures/stamped_integer.h"
-#include "pls/internal/data_structures/optional.h"
-
-#include "pls/internal/base/spin_lock.h"
+#include "pls/internal/data_structures/aligned_stack.h"
 
 namespace pls {
 namespace internal {
@@ -28,58 +21,59 @@ namespace scheduling {
  */
 class task_manager {
  public:
-  // Publishes a task on the stack, i.e. makes it visible for other threads to steal.
-  void publish_task(base_task *task) {
-    task_deque_.push_bot(task->get_cont()->get_memory_block());
-  }
+  explicit task_manager(task *tasks,
+                        data_structures::aligned_stack static_stack_space,
+                        size_t num_tasks,
+                        size_t stack_size) {
+    for (size_t i = 0; i < num_tasks - 1; i++) {
+      tasks[i].init(static_stack_space.push_bytes(stack_size), stack_size, i, 0);
+      if (i > 0) {
+        tasks[i].set_prev(&tasks[i - 1]);
+      }
+      if (i < num_tasks - 2) {
+        tasks[i].set_next(&tasks[i + 1]);
+      }
+    }
 
-  // Try to pop a local task from this task managers stack.
-  data_structures::optional<memory_block *> try_pop_local() {
-    return task_deque_.pop_bot().traded_;
+    num_tasks_ = num_tasks;
+    this_thread_tasks_ = tasks;
+    active_task_ = &tasks[0];
   }
 
-  // Try to steal a task from a remote task_manager instance. The stolen task must be stored locally.
-  // Returns a pair containing the actual task and if the steal was successful.
-  base_task *steal_remote_task(cont_manager &stealing_cont_manager) {
-    auto peek = task_deque_.peek_top();
-
-    if (std::get<0>(peek)) {
-      memory_block *peeked_memory_block = (*std::get<0>(peek));
-      auto peeked_depth = peeked_memory_block->get_depth();
-
-      stealing_cont_manager.move_active_node(peeked_depth);
-      auto offered_chain = stealing_cont_manager.get_active_node();
-      stealing_cont_manager.move_active_node(1);
-
-      auto stolen_memory_block = task_deque_.pop_top(offered_chain, std::get<1>(peek));
-      if (stolen_memory_block) {
-        PLS_ASSERT(*stolen_memory_block == peeked_memory_block, "Steal must only work if it is equal!");
+  task &get_this_thread_task(size_t depth) {
+    return this_thread_tasks_[depth];
+  }
 
-        return (*stolen_memory_block)->get_cont()->get_task();
-      } else {
-        stealing_cont_manager.move_active_node(-(peeked_depth + 1));
-        return nullptr;
-      }
+  void set_thread_id(unsigned id) {
+    for (size_t i = 0; i < num_tasks_; i++) {
+      this_thread_tasks_[i].set_thread_id(id);
     }
-
-    return nullptr;
   }
 
-  explicit task_manager(data_structures::bounded_trading_deque &task_deque) :
-      task_deque_{task_deque} {}
+  task &get_active_task() {
+    return *active_task_;
+  }
 
  private:
-  data_structures::bounded_trading_deque &task_deque_;
+  size_t num_tasks_;
+
+  task *this_thread_tasks_;
+  task *active_task_;
 };
 
-template<size_t NUM_TASKS>
+template<size_t NUM_TASKS, size_t STACK_SIZE>
 class static_task_manager {
  public:
-  static_task_manager() : task_deque_{}, task_manager_{task_deque_.get_deque()} {};
+  static_task_manager()
+      : tasks_{},
+        static_stack_storage_{},
+        task_manager_{tasks_.data(), static_stack_storage_.get_stack(), NUM_TASKS, STACK_SIZE} {};
   task_manager &get_task_manager() { return task_manager_; }
 
  private:
-  data_structures::static_bounded_trading_deque task_deque_;
+  std::array<task, NUM_TASKS> tasks_;
+  data_structures::static_aligned_stack<NUM_TASKS * STACK_SIZE> static_stack_storage_;
   task_manager task_manager_;
 };
diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h
index 85a4f0a..32eda14 100644
--- a/lib/pls/include/pls/internal/scheduling/thread_state.h
+++ b/lib/pls/include/pls/internal/scheduling/thread_state.h
@@ -3,54 +3,34 @@
 #define PLS_THREAD_STATE_H
 
 #include
-#include
-#include
 #include
 
+#include "pls/internal/scheduling/task_manager.h"
+
 namespace pls {
 namespace internal {
 namespace scheduling {
 
-// forward declaration
-class task_manager;
-class cont_manager;
 class scheduler;
-class base_task;
-class base_cont;
+struct task;
 
 struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
+ private:
   scheduler *scheduler_;
-  size_t id_;
-
-  // Keep track of the last spawn state (needed to chain tasks/conts correctly)
-  bool right_spawn_;
-  base_cont *parent_cont_;
-  // TODO: Set this when spawning!
-  // See if we should move this to the cont manager...seems like a better fit!
-
+  unsigned id_;
   task_manager &task_manager_;
-  cont_manager &cont_manager_;
 
-  alignas(base::system_details::CACHE_LINE_SIZE) base_task *current_task_;
+  alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_;
   alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
 
  public:
-  thread_state(task_manager &task_manager,
-               cont_manager &cont_manager) :
+  explicit thread_state(task_manager &task_manager) :
       scheduler_{nullptr},
       id_{0},
-      right_spawn_{false},
-      parent_cont_{nullptr},
       task_manager_{task_manager},
-      cont_manager_{cont_manager},
       current_task_{nullptr},
       random_{static_cast<unsigned long>(std::chrono::steady_clock::now().time_since_epoch().count())} {};
 
-  void reset() {
-    right_spawn_ = false;
-    parent_cont_ = nullptr;
-  }
-
   /**
    * Convenience helper to get the thread_state instance associated with this thread.
    * Must only be called on threads that are associated with a thread_state,
@@ -60,10 +40,19 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
    */
   static thread_state &get() { return *base::this_thread::state(); }
 
-  size_t get_id() { return id_; }
+  unsigned get_id() { return id_; }
+  void set_id(unsigned id) {
+    id_ = id;
+    task_manager_.set_thread_id(id);
+  }
   task_manager &get_task_manager() { return task_manager_; }
-  cont_manager &get_cont_manager() { return cont_manager_; }
   scheduler &get_scheduler() { return *scheduler_; }
+  void set_scheduler(scheduler *scheduler) {
+    scheduler_ = scheduler;
+  }
+  long get_rand() {
+    return random_();
+  }
 
   // Do not allow move/copy operations.
   // State is a pure memory container with references/pointers into it from all over the code.
@@ -73,6 +62,7 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
   thread_state(const thread_state &) = delete;
   thread_state &operator=(const thread_state &) = delete;
+
 };
 
 }
diff --git a/lib/pls/include/pls/internal/scheduling/thread_state_static.h b/lib/pls/include/pls/internal/scheduling/thread_state_static.h
index 4ac295a..93539fb 100644
--- a/lib/pls/include/pls/internal/scheduling/thread_state_static.h
+++ b/lib/pls/include/pls/internal/scheduling/thread_state_static.h
@@ -3,8 +3,6 @@
 #define PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_
 
 #include "pls/internal/scheduling/task_manager.h"
-#include "pls/internal/scheduling/cont_manager.h"
-
 #include "pls/internal/base/system_details.h"
 
 #include "thread_state.h"
@@ -13,18 +11,16 @@ namespace pls {
 namespace internal {
 namespace scheduling {
 
-template<size_t NUM_TASKS, size_t NUM_CONTS, size_t MAX_CONT_SIZE>
+template<size_t NUM_TASKS, size_t STACK_SIZE>
 struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state_static {
  public:
   thread_state_static()
       : static_task_manager_{},
-        static_cont_manager_{},
-        thread_state_{static_task_manager_.get_task_manager(), static_cont_manager_.get_cont_manager()} {}
+        thread_state_{static_task_manager_.get_task_manager()} {}
 
   thread_state &get_thread_state() { return thread_state_; }
 
  private:
-  alignas(base::system_details::CACHE_LINE_SIZE) static_task_manager<NUM_TASKS> static_task_manager_;
-  alignas(base::system_details::CACHE_LINE_SIZE) static_cont_manager<NUM_CONTS, MAX_CONT_SIZE> static_cont_manager_;
+  alignas(base::system_details::CACHE_LINE_SIZE) static_task_manager<NUM_TASKS, STACK_SIZE> static_task_manager_;
   alignas(base::system_details::CACHE_LINE_SIZE) thread_state thread_state_;
 };
 
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index 960a216..ae2036c 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -21,8 +21,8 @@ scheduler::scheduler(scheduler_memory &memory, const unsigned int num_threads, b
   for (unsigned int i = 0; i < num_threads_; i++) {
     // Placement new is required, as the memory of `memory_` is not required to be initialized.
-    memory.thread_state_for(i).scheduler_ = this;
-    memory.thread_state_for(i).id_ = i;
+    memory.thread_state_for(i).set_scheduler(this);
+    memory.thread_state_for(i).set_id(i);
 
     if (reuse_thread && i == 0) {
       continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one.
@@ -55,8 +55,7 @@ void scheduler::work_thread_main_loop() {
 void scheduler::work_thread_work_section() {
   auto &my_state = thread_state::get();
-  my_state.reset();
-  auto &my_cont_manager = my_state.get_cont_manager();
+  auto &my_task_manager = my_state.get_task_manager();
 
   auto const num_threads = my_state.get_scheduler().num_threads();
   auto const my_id = my_state.get_id();
@@ -67,41 +66,23 @@
   }
 
   do {
-    // Work off pending continuations we need to execute locally
-    while (my_cont_manager.falling_through()) {
-      my_cont_manager.execute_fall_through_code();
-    }
-
     // Steal Routine (will be continuously executed when there are no more fall through's).
     // TODO: move into separate function
-    const size_t offset = my_state.random_() % num_threads;
-    const size_t max_tries = num_threads;
-    for (size_t i = 0; i < max_tries; i++) {
-      size_t target = (offset + i) % num_threads;
-      auto &target_state = my_state.get_scheduler().thread_state_for(target);
-
-      PLS_ASSERT(my_cont_manager.is_clean(), "Only steal with clean chain!");
-      PROFILE_STEALING("steal")
-      auto *stolen_task = target_state.get_task_manager().steal_remote_task(my_cont_manager);
-      PROFILE_END_BLOCK;
-      if (stolen_task != nullptr) {
-        my_state.parent_cont_ = stolen_task->get_cont();
-        my_state.right_spawn_ = true;
-        stolen_task->execute();
-        if (my_cont_manager.falling_through()) {
-          break;
-        } else {
-          my_cont_manager.fall_through_and_notify_cont(stolen_task->get_cont(), true);
-          break;
-        }
-      }
-    }
+//    const size_t offset = my_state.get_rand() % num_threads;
+//    const size_t max_tries = num_threads;
+//    for (size_t i = 0; i < max_tries; i++) {
+//      size_t target = (offset + i) % num_threads;
+//      auto &target_state = my_state.get_scheduler().thread_state_for(target);
+//
+//      auto *stolen_task = target_state.get_task_manager().steal_remote_task(my_cont_manager);
+//      if (stolen_task != nullptr) {
+//        stolen_task->execute();
+//      }
+//    }
 //    if (!my_cont_manager.falling_through()) {
 //      base::this_thread::sleep(5);
 //    }
   } while (!work_section_done_);
-
-  PLS_ASSERT(my_cont_manager.is_clean(), "Only finish work section with clean chain!");
 }
 
 void scheduler::terminate() {
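The stealing loop is parked in comments because task_manager no longer has a steal operation; steal_remote_task() was deleted together with the cont_manager it depended on. Against the new task model, its eventual shape would plausibly trade task descriptors between workers and resume the stolen task's continuation instead of calling execute(). A sketch under those assumptions — steal_task() and the empty-continuation check are hypothetical, not in this commit:

    const size_t offset = my_state.get_rand() % num_threads;
    for (size_t i = 0; i < num_threads; i++) {
      size_t target = (offset + i) % num_threads;
      auto &target_state = my_state.get_scheduler().thread_state_for(target);

      // Hypothetical: trade one of our free task descriptors for the victim's
      // oldest task and receive its captured continuation on success.
      context_switcher::continuation stolen =
          target_state.get_task_manager().steal_task(my_task_manager);
      if (stolen.valid()) {  // hypothetical check for a successful steal
        context_switcher::switch_context(std::move(stolen));
        break;
      }
    }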