From 69fd7e0c0f6c40bff7892cd6e8f319022df4fd89 Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Wed, 27 Nov 2019 19:14:11 +0100 Subject: [PATCH] WIP: Refactor memory manager to reduce redundancy. It is still not working, however we now have no more redundant code, making debugging it simpler. --- app/playground/main.cpp | 26 +++++++++++++------------- lib/pls/CMakeLists.txt | 4 ++-- lib/pls/include/pls/internal/base/alignment.h | 16 ++++++++++++---- lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h | 2 +- lib/pls/include/pls/internal/data_structures/delayed_initialization.h | 8 ++++++-- lib/pls/include/pls/internal/data_structures/stamped_integer.h | 8 ++++++++ lib/pls/include/pls/internal/scheduling/cont.h | 153 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/scheduling/cont_manager.h | 226 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------------------------------------------------------------------------------------------- lib/pls/include/pls/internal/scheduling/continuation.h | 206 -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- lib/pls/include/pls/internal/scheduling/parallel_result.h | 5 ++++- lib/pls/include/pls/internal/scheduling/scheduler_impl.h | 87 +++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------------- lib/pls/include/pls/internal/scheduling/scheduler_memory.h | 5 +++-- lib/pls/include/pls/internal/scheduling/task.h | 37 +++++++++++++++++++++++-------------- lib/pls/include/pls/internal/scheduling/task_manager.h | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++---- lib/pls/include/pls/internal/scheduling/thread_state.h | 29 +++++++++++------------------ lib/pls/src/internal/base/alignment.cpp | 14 -------------- lib/pls/src/internal/scheduling/scheduler.cpp | 38 +++++++++++++++++++++++++++++++------- 17 files changed, 485 insertions(+), 434 deletions(-) create mode 100644 lib/pls/include/pls/internal/scheduling/cont.h delete mode 100644 lib/pls/include/pls/internal/scheduling/continuation.h diff --git a/app/playground/main.cpp b/app/playground/main.cpp index a3417b3..0578201 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -13,9 +13,11 @@ constexpr size_t NUM_TASKS = 64; constexpr size_t MAX_TASK_STACK_SIZE = 0; constexpr size_t NUM_CONTS = 64; -constexpr size_t MAX_CONT_SIZE = 128; +constexpr size_t MAX_CONT_SIZE = 256; +std::atomic count{0}; scheduling::parallel_result fib(int n) { + std::cout << "Fib(" << n << "): " << count++ << ", " << scheduling::thread_state::get().get_id() << std::endl; if (n == 0) { return 0; } @@ -27,12 +29,15 @@ scheduling::parallel_result fib(int n) { return fib(n - 1); }, [=]() { return fib(n - 2); - }).then([](int a, int b) { - return a + b; + }).then([=](int a, int b) { + scheduling::parallel_result result{a + b}; + std::cout << "Done Fib(" << n << "): " << (a + b) << ", " << scheduling::thread_state::get().get_id() << std::endl; + return result; }); } int fib_normal(int n) { + std::cout << "Fib(" << n << "): " << count++ << std::endl; if (n == 0) { return 0; } @@ -40,7 +45,9 @@ int fib_normal(int n) { return 1; } - return fib_normal(n - 1) + fib_normal(n - 2); + int result = fib_normal(n - 1) + fib_normal(n - 2); + std::cout << "Done Fib(" << n << "): " << result << std::endl; + return result; } int main() { @@ -53,7 +60,7 @@ int main() { scheduling::scheduler scheduler{static_scheduler_memory, NUM_THREADS}; auto start = std::chrono::steady_clock::now(); - std::cout << "fib = " << fib_normal(39) << std::endl; +// std::cout << "fib = " << fib_normal(10) << std::endl; auto end = std::chrono::steady_clock::now(); std::cout << "Normal: " << std::chrono::duration_cast(end - start).count() << std::endl; @@ -61,14 +68,7 @@ int main() { start = std::chrono::steady_clock::now(); scheduler.perform_work([]() { - return scheduling::scheduler::par([]() { - return fib(39); - }, []() { - return scheduling::parallel_result{0}; - }).then([](int a, int b) { - std::cout << "fib = " << a + b << std::endl; - return a + b; - }); + return fib(10); }); end = std::chrono::steady_clock::now(); diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 9e9dd85..9ec640a 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -57,8 +57,8 @@ add_library(pls STATIC include/pls/internal/scheduling/task_manager.h include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/cont_manager.h - include/pls/internal/scheduling/continuation.h - include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h) + include/pls/internal/scheduling/cont.h + include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/cont_manager_impl.h include/pls/internal/scheduling/thread_state_static.h) # Add everything in `./include` to be in the include path of this project target_include_directories(pls diff --git a/lib/pls/include/pls/internal/base/alignment.h b/lib/pls/include/pls/internal/base/alignment.h index 4296d40..699aaf0 100644 --- a/lib/pls/include/pls/internal/base/alignment.h +++ b/lib/pls/include/pls/internal/base/alignment.h @@ -13,11 +13,19 @@ namespace internal { namespace base { namespace alignment { -system_details::pointer_t next_alignment(system_details::pointer_t size, - size_t alignment = system_details::CACHE_LINE_SIZE); +constexpr system_details::pointer_t next_alignment(system_details::pointer_t size, + size_t alignment = system_details::CACHE_LINE_SIZE) { + return (size % alignment) == 0 ? + size : + size + (alignment - (size % alignment)); +} -system_details::pointer_t previous_alignment(system_details::pointer_t size, - size_t alignment = system_details::CACHE_LINE_SIZE); +constexpr system_details::pointer_t previous_alignment(system_details::pointer_t size, + size_t alignment = system_details::CACHE_LINE_SIZE) { + return (size % alignment) == 0 ? + size : + size - (size % alignment); +} char *next_alignment(char *pointer, size_t alignment = system_details::CACHE_LINE_SIZE); diff --git a/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h b/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h index 7a37f7b..f3c20fb 100644 --- a/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h +++ b/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h @@ -34,7 +34,7 @@ class bounded_ws_deque { bounded_ws_deque(T *item_array, size_t size) : size_{size}, item_array_{item_array} {} void push_bottom(T item) { - item_array_[bottom_] = item; + item_array_[local_bottom_] = item; local_bottom_++; bottom_.store(local_bottom_, std::memory_order_release); } diff --git a/lib/pls/include/pls/internal/data_structures/delayed_initialization.h b/lib/pls/include/pls/internal/data_structures/delayed_initialization.h index 610d98a..03b0f51 100644 --- a/lib/pls/include/pls/internal/data_structures/delayed_initialization.h +++ b/lib/pls/include/pls/internal/data_structures/delayed_initialization.h @@ -43,7 +43,9 @@ class delayed_initialization { template void initialize(ARGS &&...args) { - PLS_ASSERT(!initialized_, "Can only initialize delayed wrapper object once!") + if (initialized_) { + PLS_ASSERT(!initialized_, "Can only initialize delayed wrapper object once!"); + } new((void *) memory_.data()) T(std::forward(args)...); initialized_ = true; @@ -57,7 +59,9 @@ class delayed_initialization { } T &object() { - PLS_ASSERT(initialized_, "Can not use an uninitialized delayed wrapper object!") + if (!initialized_) { + PLS_ASSERT(initialized_, "Can not use an uninitialized delayed wrapper object!") + } return *reinterpret_cast(memory_.data()); } diff --git a/lib/pls/include/pls/internal/data_structures/stamped_integer.h b/lib/pls/include/pls/internal/data_structures/stamped_integer.h index cc68398..511f774 100644 --- a/lib/pls/include/pls/internal/data_structures/stamped_integer.h +++ b/lib/pls/include/pls/internal/data_structures/stamped_integer.h @@ -18,6 +18,14 @@ struct stamped_integer { stamped_integer() : stamp{0}, value{0} {}; stamped_integer(member_t new_value) : stamp{0}, value{new_value} {}; stamped_integer(member_t new_stamp, member_t new_value) : stamp{new_stamp}, value{new_value} {}; + + bool operator==(const stamped_integer &other) const noexcept { + return stamp == other.stamp && value == other.value; + } + + bool operator!=(const stamped_integer &other) const noexcept { + return !(*this == other); + } }; } diff --git a/lib/pls/include/pls/internal/scheduling/cont.h b/lib/pls/include/pls/internal/scheduling/cont.h new file mode 100644 index 0000000..3254746 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/cont.h @@ -0,0 +1,153 @@ + +#ifndef PLS_INTERNAL_SCHEDULING_CONT_H_ +#define PLS_INTERNAL_SCHEDULING_CONT_H_ + +#include +#include +#include + +#include "pls/internal/data_structures/stamped_integer.h" +#include "pls/internal/data_structures/delayed_initialization.h" +#include "pls/internal/base/alignment.h" + +#include "parallel_result.h" +#include "memory_block.h" + +namespace pls { +namespace internal { +namespace scheduling { + +class base_cont { + protected: + // We plan to only init the members for a continuation on the slow path. + // If we can execute everything inline we simply skip it saving runtime overhead. + template + using delayed_init = data_structures::delayed_initialization; + + public: + explicit base_cont(base_cont *parent, memory_block *memory_block, bool is_right_child) + : parent_{parent}, + memory_block_{memory_block}, + is_right_child_{is_right_child} {}; + + /** + * Execute the continuation itself. + * Make sure to only call when all required results are in. + * Will store the result in it's parent, but not mess with any counters. + */ + virtual void execute() = 0; + + /** + * Execute the right hand side task associated with the continuation. + * Will store the result in it's parent, but not mess with any counters. + */ + virtual void execute_task() = 0; + + virtual void *get_right_result_pointer() = 0; + virtual void *get_left_result_pointer() = 0; + + template + void store_right_result(T &&result) { + using BASE_T = typename std::remove_cv::type>::type; + reinterpret_cast *>(get_right_result_pointer())->initialize(std::forward(result)); + } + + template + void store_left_result(T &&result) { + using BASE_T = typename std::remove_cv::type>::type; + reinterpret_cast *>(get_left_result_pointer())->initialize(std::forward(result)); + } + + base_cont *get_parent() { return parent_; } + memory_block *get_memory_block() { return memory_block_; } + bool is_right_child() const { return is_right_child_; } + + protected: + base_cont *parent_; + memory_block *memory_block_; + bool is_right_child_; +}; + +template +class cont : public base_cont { + private: + template + struct result_runner { + // Strip off unwanted modifiers... + using BASE_RES_TYPE = typename std::remove_cv::type>::type; + + static void execute(cont &cont) { + parallel_result result{cont.function_((*cont.result_1_).value(), (*cont.result_2_).value())}; + if (result.fast_path() && cont.parent_ != nullptr) { + if (cont.is_right_child()) { + cont.parent_->store_right_result(std::move(result)); + } else { + cont.parent_->store_left_result(std::move(result)); + } + } + } + }; + template + struct result_runner> { + static void execute(cont &cont) { + auto result = cont.function_((*cont.result_1_).value(), (*cont.result_2_).value()); + if (result.fast_path() && cont.parent_) { + if (cont.is_right_child()) { + cont.parent_->store_right_result(std::move(result)); + } else { + cont.parent_->store_left_result(std::move(result)); + } + } + } + }; + + public: + template + explicit cont(base_cont *parent, + memory_block *memory_block, + bool is_right_child, + FARG &&function, + T2ARGS...task_2_args): + base_cont(parent, memory_block, is_right_child), + function_{std::forward(function)}, + task_{std::forward(task_2_args)..., this} {}; + + void execute() override { + using result_type = decltype(function_((*result_1_).value(), (*result_2_).value())); + result_runner::execute(*this); + this->get_memory_block()->free_buffer(); + this->~cont(); + } + + void execute_task() override { + task_.execute(); + } + + void *get_right_result_pointer() override { + return &result_1_; + } + + void *get_left_result_pointer() override { + return &result_2_; + } + + T2 *get_task() { + return &task_; + } + + private: + // Initial data members. These slow down the fast path, try to init them lazy when possible. + F function_; + T2 task_; + + // Some fields/actual values stay uninitialized (save time on the fast path if we don not need them). + // More fields untouched on the fast path is good, but for ease of an implementation we only keep some for now. + delayed_init result_1_; + delayed_init result_2_; +}; + +} +} +} + +#endif //PLS_INTERNAL_SCHEDULING_CONT_H_ diff --git a/lib/pls/include/pls/internal/scheduling/cont_manager.h b/lib/pls/include/pls/internal/scheduling/cont_manager.h index b2b9878..792593e 100644 --- a/lib/pls/include/pls/internal/scheduling/cont_manager.h +++ b/lib/pls/include/pls/internal/scheduling/cont_manager.h @@ -7,7 +7,8 @@ #include #include "pls/internal/data_structures/aligned_stack.h" -#include "pls/internal/scheduling/continuation.h" +#include "pls/internal/scheduling/cont.h" +#include "pls/internal/scheduling/thread_state.h" namespace pls { namespace internal { @@ -24,156 +25,155 @@ class cont_manager { : max_cont_size_{MAX_CONT_SIZE} { // First node is currently active and our local start - start_node_ = active_node_ = init_cont_node(cont_storage, nullptr, nullptr); + start_node_ = active_node_ = init_memory_block(cont_storage, nullptr, nullptr, 0); // Build up chain after it - continuation_node *current_node = start_node_; + memory_block *current_node = start_node_; for (size_t i = 1; i < NUM_CONTS; i++) { - continuation_node *next_node = init_cont_node(cont_storage, start_node_, current_node); - current_node->next_ = next_node; + memory_block *next_node = init_memory_block(cont_storage, start_node_, current_node, i); + current_node->set_next(next_node); current_node = next_node; } }; - // Fast-path methods - continuation_node *fast_path_get_next() { + // Aquire and release memory blocks... + memory_block *get_next_memory_block() { + active_node_->set_start(start_node_); auto result = active_node_; - active_node_ = active_node_->next_; - active_node_->cont_chain_start_ = start_node_; + active_node_ = active_node_->get_next(); return result; } - void fast_path_return() { - active_node_ = active_node_->prev_; + void return_memory_block() { + active_node_ = active_node_->get_prev(); } - // Slow-path methods - void slow_path_return() { - active_node_ = active_node_->prev_; - active_node_->destroy_continuation(); - } + // Manage the fall through behaviour/slow path behaviour bool falling_through() const { return fall_through_; } - void fall_through() { - fall_through_ = true; - fall_through_node_ = nullptr; - } - void fall_through_and_execute(continuation_node *continuation_node) { + void fall_through_and_notify_cont(base_cont *notified_cont, bool is_child_right) { fall_through_ = true; - fall_through_node_ = continuation_node; + fall_through_cont_ = notified_cont; + fall_through_child_right = is_child_right; } - void aquire_clean_cont_chain(continuation_node *clean_chain) { - // Link active node backwards and other chain forwards - active_node_->prev_ = clean_chain; - active_node_->cont_chain_start_ = clean_chain; - clean_chain->next_ = active_node_; - // Update our local chain start AND reset the active node to the start (we operate on a clean chain) - start_node_ = clean_chain->cont_chain_start_; - active_node_ = clean_chain; + + void aquire_memory_chain(memory_block *target_chain) { + auto *our_next_node = get_node(target_chain->get_depth() + 1); + + our_next_node->set_prev(target_chain); + target_chain->set_next(our_next_node); + + start_node_ = target_chain->get_start(); } - void aquire_blocked_cont_chain(continuation_node *blocked_chain) { - // Link active node backwards and other chain forwards - active_node_->prev_ = blocked_chain; - active_node_->cont_chain_start_ = blocked_chain; - blocked_chain->next_ = active_node_; - // Update our local chain start, NOT our active node (we continue execution on top of the blocking chain) - start_node_ = blocked_chain->cont_chain_start_; + + memory_block *get_node(unsigned int depth) { + // TODO: Remove this O(n) factor to avoid the + // T_1/P + (T_lim + D) stealing time bound. + memory_block *current = start_node_; + for (unsigned int i = 0; i < depth; i++) { + current->set_start(start_node_); + current = current->get_next(); + } + + return current; } - void execute_fall_through_continuation() { - PLS_ASSERT(fall_through_node_ != nullptr, - "Must not execute continuation node without associated continuation code."); + void set_active_depth(unsigned int depth) { + active_node_ = get_node(depth); + } - // Reset our state to from falling through - continuation_node *executed_node = fall_through_node_; - fall_through_ = false; - fall_through_node_ = nullptr; + void execute_fall_through_code() { + PLS_ASSERT(falling_through(), "Must be falling through to execute the associated code.") - // Grab everyone involved in the transaction - base_continuation *executed_continuation = executed_node->continuation_; - base_continuation *parent_continuation = executed_continuation->parent_; - continuation_node *parent_node = parent_continuation->node_; + auto &my_state = thread_state::get(); - // Execute the continuation itself - executed_continuation->execute(); - // Slow path return destroys it and resets our current cont_node - slow_path_return(); + // Copy fall through status and reset it (for potentially nested execution paths). + auto *notified_cont = fall_through_cont_; + bool notifier_is_right_child = fall_through_child_right; - if (falling_through()) { - return; - } + fall_through_cont_ = nullptr; + fall_through_ = false; - if (executed_node->is_end_of_cont_chain()) { - // We are at the end of our local part of the cont chain and therefore - // not invested in the continuation_chain above. - - // Notify the next continuation of finishing a child... - if (parent_node->results_missing_.fetch_add(-1) == 1) { - // ... we finished the last continuation, thus we need to take responsibility of the above cont chain. - aquire_blocked_cont_chain(parent_node->cont_chain_start_); - fall_through_and_execute(parent_node); - return; - } else { - // ... we did not finish the last continuation, thus we are not concerned with this computational - // path anymore. Just ignore it and fall through to mind our own business. - fall_through(); - return; - } - } else { - // We are invested in the continuation_chain above (its part of our local continuation node chain). - // We keep executing it if possible and need to 'clean our state' with a swapped chain if not. - - // ...we could be blocked (and then need a clean cont chain). - // For the rare case that the stealing processes is still going on, we - // need to fight with it over directly executing the task that is currently being stolen. - continuation_node::stamped_state task_state{continuation_node::state::execute_local}; - parent_node->state_.exchange(task_state, std::memory_order_acq_rel); - if (task_state.value == continuation_node::state::stolen) { - // The other task strain is executing. - // Notify the parent continuation of finishing our child... - if (parent_node->results_missing_.fetch_add(-1) == 1) { - // ... we finished the last continuation, thus we keep the responsibility for the above cont chain. - fall_through_and_execute(parent_node); - return; - } else { - // ... we did not finish the last continuation, thus we need to give up our local cont chain in - // favour of the stolen one (leaving us with a clean chain for further operation). - aquire_clean_cont_chain(parent_node->cont_chain_start_); - fall_through(); - return; - } - } else { + // Special case for lock free implementation. + // When we finish a 'left strain' it can happen that the 'right strain' + // is currently being stolen. We need to be sure that this steal is finished + // as we need the thieves memory blocks in case we get blocked by it. + if (!notifier_is_right_child) { + // Check to execute right child directly... + auto &atomic_state = notified_cont->get_memory_block()->get_state(); + memory_block::stamped_state target_state{atomic_state.load().stamp + 1, memory_block::state::execute_local}; + atomic_state.exchange(target_state); + if (target_state.value != memory_block::state::stolen) { // We 'overruled' the stealing thread and execute the other task ourselfs. // We can be sure that only we where involved in executing the child tasks of the parent_continuation... - parent_continuation->node_->results_missing_.fetch_add(-1); - parent_continuation->execute_task(); + notified_cont->get_memory_block()->get_results_missing().fetch_add(-1); + + my_state.parent_cont_ = notified_cont; + my_state.right_spawn_ = true; + notified_cont->execute_task(); if (falling_through()) { // ... if the second strain was interrupted we fall through without scheduling the parent_continuation // (the currently pending/interrupted strain will do this itself). return; + } else { + // ... we could finish the second strain. + // Register the parent continuation for being notified. + // (This is not the most efficient, as we could simply execute it. However, + // this way of doing it spares us from duplicating a lot of code). + fall_through_and_notify_cont(notified_cont, true); + return; } - // ...if we could execute the second branch without interruption, - // we can schedule the parent safely for execution. - fall_through_and_execute(parent_continuation->node_); } + + // Right side is 'fully' stolen. We can continue to inform the parent like we would do normally. } - } - size_t get_max_cont_size() const { return max_cont_size_; } + // Notify the next continuation of finishing a child... + if (notified_cont->get_memory_block()->get_results_missing().fetch_add(-1) == 1) { + // ... we finished the continuation. + // We are now in charge continuing to execute the above continuation chain. + + if (get_node(notified_cont->get_memory_block()->get_depth()) != notified_cont->get_memory_block()) { + // We do not own the thing we will execute. + // Own it by swapping the chain belonging to it in. + aquire_memory_chain(notified_cont->get_memory_block()); + } + my_state.parent_cont_ = notified_cont->get_parent(); + my_state.right_spawn_ = notified_cont->is_right_child(); + active_node_ = notified_cont->get_memory_block(); + notified_cont->execute(); + if (!falling_through() && notified_cont->get_parent() != nullptr) { + fall_through_and_notify_cont(notified_cont->get_parent(), notified_cont->is_right_child()); + } + return; + } else { + // ... we did not finish the last continuation. + // We are no longer in charge of executing the above continuation chain. + + if (get_node(notified_cont->get_memory_block()->get_depth()) == notified_cont->get_memory_block()) { + // We own the thing we are not allowed to execute. + // Get rid of the ownership by using the offered chain. + aquire_memory_chain(notified_cont->get_memory_block()->get_offered_chain().load()); + } + // We are done here...nothing more to execute + return; + } + } private: template - static continuation_node *init_cont_node(data_structures::aligned_stack &cont_storage, - continuation_node *cont_chain_start, - continuation_node *prev) { + static memory_block *init_memory_block(data_structures::aligned_stack &cont_storage, + memory_block *memory_chain_start, + memory_block *prev, + unsigned long depth) { // Represents one cont_node and its corresponding memory buffer (as one continuous block of memory). - constexpr size_t buffer_size = MAX_CONT_SIZE - sizeof(continuation_node); - char *cont_node_address = cont_storage.push_bytes(); - char *cont_node_memory_address = cont_storage.push_bytes(buffer_size); + constexpr size_t buffer_size = MAX_CONT_SIZE - base::alignment::next_alignment(sizeof(memory_block)); + char *memory_block_ptr = cont_storage.push_bytes(); + char *memory_block_buffer_ptr = cont_storage.push_bytes(buffer_size); - return new(cont_node_address) continuation_node(cont_node_memory_address, cont_chain_start, prev); + return new(memory_block_ptr) memory_block(memory_block_buffer_ptr, buffer_size, memory_chain_start, prev, depth); } private: @@ -182,14 +182,15 @@ class cont_manager { /** * Managing the continuation chain. */ - continuation_node *start_node_; - continuation_node *active_node_; + memory_block *start_node_; + memory_block *active_node_; /** * Managing falling through back to the scheduler. */ bool fall_through_{false}; - continuation_node *fall_through_node_{nullptr}; + bool fall_through_child_right{false}; + base_cont *fall_through_cont_{nullptr}; }; template @@ -208,4 +209,5 @@ class static_cont_manager { } } } + #endif //PLS_CONT_MANAGER_H_ diff --git a/lib/pls/include/pls/internal/scheduling/continuation.h b/lib/pls/include/pls/internal/scheduling/continuation.h deleted file mode 100644 index f5a9919..0000000 --- a/lib/pls/include/pls/internal/scheduling/continuation.h +++ /dev/null @@ -1,206 +0,0 @@ - -#ifndef PLS_INTERNAL_SCHEDULING_CONTINUATION_H_ -#define PLS_INTERNAL_SCHEDULING_CONTINUATION_H_ - -#include -#include -#include - -#include "pls/internal/data_structures/stamped_integer.h" -#include "pls/internal/data_structures/delayed_initialization.h" -#include "pls/internal/base/alignment.h" -#include "parallel_result.h" - -namespace pls { -namespace internal { -namespace scheduling { - -class continuation_node; - -class base_continuation { - friend class cont_manager; - protected: - // We plan to only init the members for a continuation on the slow path. - // If we can execute everything inline we simply skip it saving runtime overhead. - template - using delayed_init = data_structures::delayed_initialization; - - public: - explicit base_continuation(base_continuation *parent, continuation_node *node, unsigned int result_index = 0) - : parent_{parent}, - node_{node}, - result_index_{result_index} {} - - virtual void execute() = 0; - virtual void execute_task() = 0; - virtual ~base_continuation() = default; - - virtual void *get_result_pointer(unsigned short index) = 0; - template - void store_result(unsigned short index, T &&result) { - using BASE_T = typename std::remove_cv::type>::type; - reinterpret_cast *>(get_result_pointer(index))->initialize(std::forward(result)); - } - - base_continuation *get_parent() { return parent_; } - continuation_node *get_cont_node() { return node_; } - - protected: - base_continuation *parent_; - continuation_node *node_; - unsigned int result_index_; -}; - -class continuation_node { - friend class cont_manager; - - public: - continuation_node(char *memory, continuation_node *cont_chain_start, continuation_node *prev) - : cont_chain_start_{cont_chain_start}, - prev_{prev}, - memory_{memory} {} - - // Management of the associated continuation - template - T *init_continuation(ARGS &&...args) { - PLS_ASSERT(continuation_ == nullptr, "Must only allocate one continuation at once per node.") - - auto *result = new(memory_) T(std::forward(args)...); - continuation_ = result; - return result; - } - void destroy_continuation() { - // Deconstruct Continuation - continuation_->~base_continuation(); - continuation_ = nullptr; - - // Reset Associated counters - results_missing_.store(2); - offered_chain_.store(nullptr); - auto old_state = state_.load(); - state_.store({old_state.stamp + 1, initialized}); - } - template - void destroy_continuation_fast() { - (*reinterpret_cast(continuation_)).~T(); - continuation_ = nullptr; - } - base_continuation *get_continuation() { - return continuation_; - } - continuation_node *get_prev() { - return prev_; - } - - bool is_end_of_cont_chain() { - return prev_ == continuation_->get_parent()->get_cont_node(); - } - - private: - // Linked list property of continuations (continuation chains as memory management). - // Each continuation knows its chain start to allow stealing a whole chain in O(1) - // without the need to traverse back to the chain start. - continuation_node *cont_chain_start_; - continuation_node *prev_, *next_{nullptr}; - - // When blocked on this continuation, we need to know what other chain we - // got offered by the stealing thread. - // For this we need only the head of the other chain (as each continuation is a - // self describing entity for its chain up to the given node). - std::atomic offered_chain_{nullptr}; - - // Management for coordinating concurrent result writing and stealing. - // The result count decides atomically who gets to execute the continuation. - std::atomic results_missing_{2}; - - // The flag is needed for an ongoing stealing request. - // Stealing threads need to offer their continuation chain before the - // 'fully' own the stolen task. As long as that is not done the continuation - // chain can abort the steal request in order to be not blocked without a - // new, clean continuation chain to work with. - enum state { initialized, execute_local, stealing, stolen }; - using stamped_state = data_structures::stamped_integer; - std::atomic state_{{initialized}}; - - // Pointer to memory region reserved for the companion continuation. - // Must be a buffer big enough to hold any continuation encountered in the program. - // This memory is managed explicitly by the continuation manager and runtime system - // (they need to make sure to always call de-constructors and never allocate two continuations). - char *memory_; - base_continuation *continuation_{nullptr}; -}; - -template -class continuation : public base_continuation { - private: - template - struct result_runner { - // Strip off unwanted modifiers... - using BASE_RES_TYPE = typename std::remove_cv::type>::type; - - static void execute(continuation &cont) { - parallel_result result{cont.function_((*cont.result_1_).value(), (*cont.result_2_).value())}; - if (result.fast_path()) { - cont.parent_->store_result(cont.result_index_, std::move(result)); - } - } - }; - template - struct result_runner> { - static void execute(continuation &cont) { - auto result = cont.function_((*cont.result_1_).value(), (*cont.result_2_).value()); - if (result.fast_path()) { - cont.parent_->store_result(cont.result_index_, std::move(result)); - } - } - }; - - public: - template - explicit continuation(base_continuation *parent, - continuation_node *node, - unsigned int result_index, - FARG &&function, - T2ARGS...task_2_args): - base_continuation(parent, node, result_index), - function_{std::forward(function)}, - task_{std::forward(task_2_args)...} {} - ~continuation() override = default; - - void execute() override { - using result_type = decltype(function_((*result_1_).value(), (*result_2_).value())); - result_runner::execute(*this); - } - - void execute_task() override { - task_.execute(); - } - - void *get_result_pointer(unsigned short index) override { - switch (index) { - case 0:return &result_1_; - case 1:return &result_2_; - default: PLS_ERROR("Unexpected Result Index!") - } - } - - T2 *get_task() { - return &task_; - } - - private: - // Initial data members. These slow down the fast path, try to init them lazy when possible. - F function_; - T2 task_; - - // Some fields/actual values stay uninitialized (save time on the fast path if we don not need them). - // More fields untouched on the fast path is good, but for ease of an implementation we only keep some for now. - delayed_init result_1_; - delayed_init result_2_; -}; - -} -} -} - -#endif //PLS_INTERNAL_SCHEDULING_CONTINUATION_H_ diff --git a/lib/pls/include/pls/internal/scheduling/parallel_result.h b/lib/pls/include/pls/internal/scheduling/parallel_result.h index f1ef320..9cf0358 100644 --- a/lib/pls/include/pls/internal/scheduling/parallel_result.h +++ b/lib/pls/include/pls/internal/scheduling/parallel_result.h @@ -9,8 +9,11 @@ namespace pls { namespace internal { namespace scheduling { +// Used to more enforce the use of parallel_results +class parallel_result_base {}; + template -class parallel_result { +class parallel_result : public parallel_result_base { public: using value_type = T; diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index 54ad872..8a6dfc9 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -3,7 +3,7 @@ #define PLS_SCHEDULER_IMPL_H #include -#include "pls/internal/scheduling/continuation.h" +#include "pls/internal/scheduling/cont.h" #include "pls/internal/scheduling/parallel_result.h" #include "pls/internal/scheduling/task.h" @@ -18,6 +18,12 @@ struct scheduler::starter { using return_type_1 = decltype(function_1_()); using return_type_2 = decltype(function_2_()); + // Enforce correct return types of lambdas (parallel_result) + static_assert(std::is_base_of::value, + "Must only return parallel results in parallel code"); + static_assert(std::is_base_of::value, + "Must only return parallel results in parallel code"); + template explicit starter(F1ARG &&function_1, F2ARG &&function_2) : function_1_{std::forward(function_1)}, function_2_{std::forward(function_2)} {}; @@ -26,64 +32,78 @@ struct scheduler::starter { auto then(FCONT &&cont_function) -> decltype(cont_function(std::declval(), std::declval())) { - using continuation_type = continuation, return_type_1, return_type_2, FCONT>; + using continuation_type = cont, return_type_1, return_type_2, FCONT>; using result_type = decltype(cont_function(std::declval(), std::declval())); auto &my_state = thread_state::get(); auto &cont_manager = my_state.get_cont_manager(); - PLS_ASSERT(sizeof(continuation_type) <= cont_manager.get_max_cont_size(), - "Must stay within the size limit of the static memory configuration."); - - // Select current continuation. + // Select current memory block. // For now directly copy both the continuation function and the second task. - auto *current_cont_node = cont_manager.fast_path_get_next(); - - // TODO: Fix null pointers at very first spawn... - // In the fast path case we are always on the left side of the tree - // and our prev cont chain node always also holds our parent continuation. - base_continuation *parent_cont = - current_cont_node->get_prev() == nullptr ? nullptr : current_cont_node->get_prev()->get_continuation(); - unsigned short int result_index = 0; - auto *current_cont = current_cont_node->init_continuation(parent_cont, - current_cont_node, - result_index, - cont_function, - function_2_, - current_cont_node); + // (We might optimize this in the future to require less memory copies.) + auto *current_memory_block = cont_manager.get_next_memory_block(); + + // We set the correct side when invoking user code. + const bool is_right_cont = my_state.right_spawn_; + + // We keep track of the last spawn to build up the parent_cont chain + base_cont *parent_cont = my_state.parent_cont_; + continuation_type *current_cont = current_memory_block->place_in_buffer(parent_cont, + current_memory_block, + is_right_cont, + cont_function, + function_2_); + my_state.parent_cont_ = current_cont; // Publish the second task. my_state.get_task_manager().publish_task(current_cont->get_task()); // Call first function on fast path + my_state.right_spawn_ = false; return_type_1 result_1 = function_1_(); if (cont_manager.falling_through()) { - return result_type{}; // Unwind stack... + // Unwind stack... + return result_type{}; } // Try to call second function on fast path if (my_state.get_task_manager().steal_local_task()) { + my_state.right_spawn_ = true; return_type_2 result_2 = function_2_(); if (cont_manager.falling_through()) { - return result_type{}; // Unwind stack... + // Main scheduling loop is responsible for entering the result to the slow path... + current_cont->store_left_result(std::move(result_1)); + cont_manager.fall_through_and_notify_cont(current_cont, false); + // Unwind stack... + return result_type{}; } // We fully got all results, inline as good as possible. // This is the common case, branch prediction should be rather good here. // Just return the cont object unused and directly call the function. - current_cont_node->destroy_continuation_fast(); - my_state.get_cont_manager().fast_path_return(); + current_cont->~continuation_type(); + current_memory_block->free_buffer(); + cont_manager.return_memory_block(); + + // The continuation has the same execution environment as we had for the children. + // We need this to allow spawns in there. + my_state.parent_cont_ = parent_cont; + my_state.right_spawn_ = is_right_cont; auto cont_result = cont_function(result_1.value(), result_2.value()); if (cont_manager.falling_through()) { - return result_type{}; // Unwind stack... + // Unwind stack... + return result_type{}; } return cont_result; } - cont_manager.fall_through(); + // Main scheduling loop is responsible for entering the result to the slow path... + current_cont->store_left_result(std::move(result_1)); + cont_manager.fall_through_and_notify_cont(current_cont, false); + // Unwind stack... return result_type{}; }; }; @@ -101,24 +121,23 @@ scheduler::starter scheduler::par(F1 &&function_1, F2 &&function_2) { class scheduler::init_function { public: - virtual parallel_result run() = 0; + virtual void run() = 0; }; template class scheduler::init_function_impl : public init_function { public: explicit init_function_impl(F &function) : function_{function} {} - parallel_result run() override { - return scheduler::par([]() { - // No-op + void run() override { + scheduler::par([]() { + std::cout << "Dummy Strain, " << scheduling::thread_state::get().get_id() << std::endl; return parallel_result{0}; }, [=]() { - function_(); - return parallel_result{0}; - }).then([](const int &, const int &) { - // Notify that work is done after finishing the last user continuation. + return function_(); + }).then([=](int, int) { thread_state::get().get_scheduler().work_section_done_ = true; return parallel_result{0}; }); + } private: F &function_; diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h index 9fa177a..d6878dd 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h @@ -5,6 +5,7 @@ #include "pls/internal/base/thread.h" #include "pls/internal/scheduling/thread_state.h" +#include "pls/internal/scheduling/thread_state_static.h" namespace pls { namespace internal { @@ -43,7 +44,7 @@ class static_scheduler_memory : public scheduler_memory { } private: - using thread_state_type = static_thread_state; + using thread_state_type = thread_state_static; alignas(base::system_details::CACHE_LINE_SIZE) std::array threads_; alignas(base::system_details::CACHE_LINE_SIZE) std::array thread_states_; @@ -77,7 +78,7 @@ class heap_scheduler_memory : public scheduler_memory { } private: - using thread_state_type = static_thread_state; + using thread_state_type = thread_state_static; // thread_state_type is aligned at the cache line and therefore overaligned (C++ 11 does not require // the new operator to obey alignments bigger than 16, cache lines are usually 64). // To allow this object to be allocated using 'new' (which the vector does internally), diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h index ac1b998..8b10e90 100644 --- a/lib/pls/include/pls/internal/scheduling/task.h +++ b/lib/pls/include/pls/internal/scheduling/task.h @@ -1,8 +1,8 @@ #ifndef PLS_TASK_H #define PLS_TASK_H -#include "pls/internal/scheduling/continuation.h" -#include "pls/internal/scheduling/cont_manager.h" +#include "pls/internal/scheduling/cont.h" +#include "pls/internal/scheduling/memory_block.h" namespace pls { namespace internal { @@ -15,14 +15,6 @@ namespace scheduling { * Override the execute_internal() method for your custom code. */ class base_task { - protected: - base_task() = default; - - /** - * Overwrite this with the actual behaviour of concrete tasks. - */ - virtual void execute_internal() = 0; - public: /** * Executes the task and stores its result in the correct continuation. @@ -31,22 +23,39 @@ class base_task { void execute() { execute_internal(); } + + base_cont *get_cont() { + return cont_; + } + + protected: + explicit base_task(base_cont *cont) : cont_{cont} {}; + + /** + * Overwrite this with the actual behaviour of concrete tasks. + */ + virtual void execute_internal() = 0; + + base_cont *cont_; }; template class task : public base_task { public: template - explicit task(FARG &&function, continuation_node *continuation_node) - : base_task{}, function_{std::forward(function)}, continuation_node_{continuation_node} {} + explicit task(FARG &&function, base_cont *cont) + : base_task{cont}, function_{std::forward(function)} {} void execute_internal() override { - continuation_node_->get_continuation()->store_result(1, function_()); + auto result = function_(); + if (result.fast_path()) { + cont_->store_right_result(std::move(result)); + } } private: F function_; - continuation_node *continuation_node_; + }; } diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h index 6ab8c35..41117f3 100644 --- a/lib/pls/include/pls/internal/scheduling/task_manager.h +++ b/lib/pls/include/pls/internal/scheduling/task_manager.h @@ -8,6 +8,8 @@ #include #include "pls/internal/scheduling/task.h" +#include "pls/internal/scheduling/cont_manager.h" +#include "pls/internal/scheduling/memory_block.h" #include "pls/internal/data_structures/bounded_ws_deque.h" #include "pls/internal/data_structures/stamped_integer.h" @@ -17,9 +19,15 @@ namespace scheduling { struct task_handle { public: - task_handle() : task_{nullptr} {}; - explicit task_handle(base_task *task) : task_{task} {}; + task_handle() : task_{nullptr}, task_memory_block_{nullptr} {}; + explicit task_handle(base_task *task) : task_{task}, + task_memory_block_{task->get_cont()->get_memory_block()} {}; + base_task *task_; + // This seems redundant first, but is needed for a race-free steal. + // It could happen that the task's memory is overwritten and the pointer to it's memory block gets invalid. + // We can do this more elegantly in the future. + memory_block *task_memory_block_; }; /** @@ -40,8 +48,47 @@ class task_manager { // Try to steal a task from a remote task_manager instance. The stolen task must be stored locally. // Returns a pair containing the actual task and if the steal was successful. - // TODO: Re-implement after fast path is done -// std::pair steal_remote_task(task_manager &other); + base_task *steal_remote_task(cont_manager &stealing_cont_manager) { + auto stolen_task_handle = task_deque_.pop_top(); + if (stolen_task_handle) { + base_task *stolen_task = (*stolen_task_handle).task_; + auto &atomic_state = (*stolen_task_handle).task_memory_block_->get_state(); + auto &atomic_offered_chain = (*stolen_task_handle).task_memory_block_->get_offered_chain(); + auto offered_chain = stealing_cont_manager.get_node((*stolen_task_handle).task_memory_block_->get_depth()); + + auto last_state = atomic_state.load(); + if (last_state.value != memory_block::initialized) { + return nullptr; + } + + auto last_offered_chain = atomic_offered_chain.load(); + memory_block::stamped_state loop_state = {last_state.stamp + 1, memory_block::stealing}; + + if (atomic_state.compare_exchange_strong(last_state, loop_state)) { + while (true) { + if (atomic_offered_chain.compare_exchange_strong(last_offered_chain, offered_chain)) { + break; + } + + last_offered_chain = atomic_offered_chain.load(); + last_state = atomic_state.load(); + if (last_state != loop_state) { + return nullptr; + } + } + + if (atomic_state.compare_exchange_strong(loop_state, {loop_state.stamp + 1, memory_block::stolen})) { + return stolen_task; + } else { + return nullptr; + } + } else { + return nullptr; + } + } else { + return nullptr; + } + } explicit task_manager(data_structures::bounded_ws_deque &task_deque) : task_deque_{task_deque} {} diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h index b1b3867..5b665a0 100644 --- a/lib/pls/include/pls/internal/scheduling/thread_state.h +++ b/lib/pls/include/pls/internal/scheduling/thread_state.h @@ -7,21 +7,27 @@ #include #include -#include "pls/internal/scheduling/task_manager.h" -#include "pls/internal/scheduling/cont_manager.h" - namespace pls { namespace internal { namespace scheduling { // forward declaration +class task_manager; +class cont_manager; class scheduler; class base_task; +class base_cont; struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state { scheduler *scheduler_; size_t id_; + // Keep track of the last spawn state (needed to chain tasks/conts correctly) + bool right_spawn_; + base_cont *parent_cont_; + // TODO: Set this when spawning! + // See if we should move this to the cont manager...seems like a better fit! + task_manager &task_manager_; cont_manager &cont_manager_; @@ -33,6 +39,8 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state { cont_manager &cont_manager) : scheduler_{nullptr}, id_{0}, + right_spawn_{false}, + parent_cont_{nullptr}, task_manager_{task_manager}, cont_manager_{cont_manager}, current_task_{nullptr}, @@ -62,21 +70,6 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state { thread_state &operator=(const thread_state &) = delete; }; -template -struct static_thread_state { - public: - static_thread_state() - : static_task_manager_{}, - static_cont_manager_{}, - thread_state_{static_task_manager_.get_task_manager(), static_cont_manager_.get_cont_manager()} {} - thread_state &get_thread_state() { return thread_state_; } - - private: - static_task_manager static_task_manager_; - static_cont_manager static_cont_manager_; - thread_state thread_state_; -}; - } } } diff --git a/lib/pls/src/internal/base/alignment.cpp b/lib/pls/src/internal/base/alignment.cpp index 83802d1..ea8a22d 100644 --- a/lib/pls/src/internal/base/alignment.cpp +++ b/lib/pls/src/internal/base/alignment.cpp @@ -5,20 +5,6 @@ namespace internal { namespace base { namespace alignment { -system_details::pointer_t next_alignment(system_details::pointer_t size, - size_t alignment) { - return (size % alignment) == 0 ? - size : - size + (alignment - (size % alignment)); -} - -system_details::pointer_t previous_alignment(system_details::pointer_t size, - size_t alignment) { - return (size % alignment) == 0 ? - size : - size - (size % alignment); -} - char *next_alignment(char *pointer, size_t alignment) { return reinterpret_cast(next_alignment(reinterpret_cast(pointer), alignment)); } diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp index 9344a8b..78d2719 100644 --- a/lib/pls/src/internal/scheduling/scheduler.cpp +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -54,20 +54,44 @@ void scheduler::work_thread_main_loop() { void scheduler::work_thread_work_section() { auto &my_state = thread_state::get(); + auto &my_cont_manager = my_state.get_cont_manager(); + + auto const num_threads = my_state.get_scheduler().num_threads(); + auto const my_id = my_state.get_id(); if (my_state.get_id() == 0) { - // Main Thread, kick of by executing the user's main code block. + // Main Thread, kick off by executing the user's main code block. main_thread_starter_function_->run(); } do { - // TODO: Implement other threads, for now we are happy if it compiles and runs on one thread - // For now we can test without this, as the fast path should never hit this. - // 1) Try Steal - // 2) Copy Over - // 3) Finish Steal - // 4) Execute Local Copy + // Work off pending continuations we need to execute locally + while (my_cont_manager.falling_through()) { + my_cont_manager.execute_fall_through_code(); + } + + // Steal Routine (will be continuously executed when there are no more fall through's). + // TODO: move into separate function + const size_t offset = my_state.random_() % num_threads; + const size_t max_tries = num_threads - 1; + for (size_t i = 0; i < max_tries; i++) { + size_t target = (offset + i) % num_threads; + + // Skip our self for stealing + target = ((target == my_id) + target) % num_threads; + + auto &target_state = my_state.get_scheduler().thread_state_for(target); + + auto *stolen_task = target_state.get_task_manager().steal_remote_task(my_cont_manager); + if (stolen_task != nullptr) { + my_state.parent_cont_ = stolen_task->get_cont(); + my_state.right_spawn_ = true; + my_cont_manager.set_active_depth(stolen_task->get_cont()->get_memory_block()->get_depth() + 1); + stolen_task->execute(); + } + } } while (!work_section_done_); + } void scheduler::terminate() { -- libgit2 0.26.0