From c2d4bc2544b31d5b85e05a76474a42d519796ff1 Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Tue, 19 Nov 2019 15:47:40 +0100 Subject: [PATCH] WIP: Fast path with 'outlined' slow path code in place. Everything so far is untested. We only made sure tha fast path still seems to function correctly. Next up is writing tests for both the fast and slow path to then introduce the slow path. After that we can look at performance optimizations. --- app/playground/main.cpp | 7 ++++--- lib/pls/CMakeLists.txt | 8 +++++--- lib/pls/include/pls/internal/base/alignment.h | 41 ++++++++++++++++++++++++++++++----------- lib/pls/include/pls/internal/data_structures/aligned_stack.h | 2 +- lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h | 2 +- lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/data_structures/delayed_initialization.h | 79 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/data_structures/delayed_initialization_wrapper.h | 75 --------------------------------------------------------------------------- lib/pls/include/pls/internal/data_structures/optional.h | 37 +++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/scheduling/cont_manager.h | 150 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------- lib/pls/include/pls/internal/scheduling/continuation.h | 209 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------------------------------------- lib/pls/include/pls/internal/scheduling/parallel_result.h | 34 ++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/scheduling/scheduler_impl.h | 72 
+++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------- lib/pls/include/pls/internal/scheduling/task.h | 13 ++++++------- lib/pls/include/pls/internal/scheduling/task_manager.h | 54 ++++++++++-------------------------------------------- lib/pls/src/internal/base/alignment.cpp | 29 +++++++++++++++++++++++++++++ test/CMakeLists.txt | 4 +--- test/algorithm_test.cpp | 88 ---------------------------------------------------------------------------------------- test/data_structures_test.cpp | 249 ++++++++++++++++++++++++++++++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- test/dataflow_test.cpp | 114 ------------------------------------------------------------------------------------------------------------------ test/scheduling_tests.cpp | 74 +------------------------------------------------------------------------- 21 files changed, 728 insertions(+), 728 deletions(-) create mode 100644 lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h create mode 100644 lib/pls/include/pls/internal/data_structures/delayed_initialization.h delete mode 100644 lib/pls/include/pls/internal/data_structures/delayed_initialization_wrapper.h create mode 100644 lib/pls/include/pls/internal/data_structures/optional.h create mode 100644 lib/pls/include/pls/internal/scheduling/parallel_result.h create mode 100644 lib/pls/src/internal/base/alignment.cpp delete mode 100644 test/algorithm_test.cpp delete mode 100644 test/dataflow_test.cpp diff --git a/app/playground/main.cpp b/app/playground/main.cpp index eb815c7..a3417b3 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -53,7 +53,7 @@ int main() { scheduling::scheduler scheduler{static_scheduler_memory, NUM_THREADS}; auto start = std::chrono::steady_clock::now(); - std::cout << "fib = " << fib_normal(41) << std::endl; + 
std::cout << "fib = " << fib_normal(39) << std::endl; auto end = std::chrono::steady_clock::now(); std::cout << "Normal: " << std::chrono::duration_cast(end - start).count() << std::endl; @@ -62,11 +62,12 @@ int main() { scheduler.perform_work([]() { return scheduling::scheduler::par([]() { - return fib(41); + return fib(39); }, []() { return scheduling::parallel_result{0}; }).then([](int a, int b) { - std::cout << "fib = " << a << std::endl; + std::cout << "fib = " << a + b << std::endl; + return a + b; }); }); diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 7c6176b..12e7e97 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -1,6 +1,6 @@ # List all required files here (cmake best practice to NOT automate this step!) add_library(pls STATIC -# include/pls/pls.h src/pls.cpp + # include/pls/pls.h src/pls.cpp # # include/pls/algorithms/invoke.h # include/pls/algorithms/invoke_impl.h @@ -39,7 +39,7 @@ add_library(pls STATIC include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp include/pls/internal/data_structures/aligned_stack_impl.h include/pls/internal/data_structures/stamped_integer.h - include/pls/internal/data_structures/delayed_initialization_wrapper.h + include/pls/internal/data_structures/delayed_initialization.h include/pls/internal/helpers/prohibit_new.h include/pls/internal/helpers/profiler.h @@ -49,6 +49,7 @@ add_library(pls STATIC include/pls/internal/helpers/seqence.h include/pls/internal/helpers/member_function.h + include/pls/internal/scheduling/parallel_result.h include/pls/internal/scheduling/thread_state.h include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler_impl.h @@ -56,7 +57,8 @@ add_library(pls STATIC include/pls/internal/scheduling/task_manager.h include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/cont_manager.h - 
include/pls/internal/scheduling/continuation.h include/pls/internal/scheduling/parallel_result.h src/internal/base/alignment.cpp) + include/pls/internal/scheduling/continuation.h + include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h src/internal/base/alignment.cpp src/internal/base/alignment.h) # Add everything in `./include` to be in the include path of this project target_include_directories(pls diff --git a/lib/pls/include/pls/internal/base/alignment.h b/lib/pls/include/pls/internal/base/alignment.h index 26c7762..4296d40 100644 --- a/lib/pls/include/pls/internal/base/alignment.h +++ b/lib/pls/include/pls/internal/base/alignment.h @@ -13,12 +13,16 @@ namespace internal { namespace base { namespace alignment { -system_details::pointer_t next_alignment(system_details::pointer_t size); -system_details::pointer_t previous_alignment(system_details::pointer_t size); -char *next_alignment(char *pointer); +system_details::pointer_t next_alignment(system_details::pointer_t size, + size_t alignment = system_details::CACHE_LINE_SIZE); + +system_details::pointer_t previous_alignment(system_details::pointer_t size, + size_t alignment = system_details::CACHE_LINE_SIZE); + +char *next_alignment(char *pointer, size_t alignment = system_details::CACHE_LINE_SIZE); /** - * Forces alignment requirements on a type equal to a cache line size. + * Forces alignment requirements on a type equal to the given size. * This can be useful to store the elements aligned in an array or to allocate them using new. * * The object is constructed using perfect forwarding. Thus initialization looks identical to the @@ -28,21 +32,36 @@ char *next_alignment(char *pointer); * (This is required, as C++11 does not allow 'over_aligned' types, meaning alignments higher * than the max_align_t (most times 16 bytes) wont be respected properly.) 
*/ -template -struct alignas(system_details::CACHE_LINE_SIZE) cache_alignment_wrapper { +template +struct alignment_wrapper { + static_assert(alignof(T) <= ALIGNMENT, "Must not wrap an type to an alignment smaller than required by the type!"); + private: - std::array memory_; - char *data_; + std::array memory_; + T *data_; public: template - explicit cache_alignment_wrapper(ARGS &&...args): memory_{}, data_{next_alignment(memory_.data())} { + explicit alignment_wrapper(ARGS &&...args) + : memory_{}, data_{reinterpret_cast(next_alignment(memory_.data(), ALIGNMENT))} { new(data_) T(std::forward(args)...); } + ~alignment_wrapper() { + data_->~T(); + } - T &object() { return *reinterpret_cast(data_); } - T *pointer() { return reinterpret_cast(data_); } + T &object() { return *data_; } + T *pointer() { return data_; } }; +/** + * Our common case or over aligning types is when we want to hit cache lines. + * The most portable way is to use the above alignment wrapper, removing any alignment + * requirements from the wrapped type itself (here the cache line) and manually filling + * up padding/fill bytes as needed. 
+ */ +template +using cache_alignment_wrapper = alignment_wrapper; + } } } diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack.h b/lib/pls/include/pls/internal/data_structures/aligned_stack.h index cbbadbc..d3ebe0e 100644 --- a/lib/pls/include/pls/internal/data_structures/aligned_stack.h +++ b/lib/pls/include/pls/internal/data_structures/aligned_stack.h @@ -68,8 +68,8 @@ class static_aligned_stack { aligned_stack &get_stack() { return aligned_stack_; } private: - aligned_stack aligned_stack_; alignas(base::system_details::CACHE_LINE_SIZE) std::array memory_; + aligned_stack aligned_stack_; }; class heap_aligned_stack { diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h index 41f6bf1..60fb0b8 100644 --- a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h +++ b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h @@ -24,7 +24,7 @@ void aligned_stack::pop() { current_offset_ -= num_cache_lines; auto result = *reinterpret_cast(memory_at_offset(current_offset_)); - ~result(); + result.~T(); } template diff --git a/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h b/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h new file mode 100644 index 0000000..ccc6bfa --- /dev/null +++ b/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h @@ -0,0 +1,115 @@ + +#ifndef PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_WS_DEQUE_H_ +#define PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_WS_DEQUE_H_ + +#include +#include +#include + +#include "pls/internal/base/system_details.h" +#include "pls/internal/data_structures/stamped_integer.h" +#include "pls/internal/data_structures/optional.h" + +namespace pls { +namespace internal { +namespace data_structures { + +/** + * Classic, text book ws bounded deque based on arrays. 
+ * Stores a fixed amount of fixed size objects in an array, + * allowing for local push/pop on the bottom and remote + * pop on the top. + * + * The local operations are cheap as long as head and tail are + * far enough apart, making it ideal to avoid cache problems. + * + * Depends on overaligned datatypes to be cache line friendly. + * This does not concern C++14 and upwards, but prevents you from properly + * allocating it on the heap in C++11 (see base::alignment::alignment_wrapper for a solution). + */ +// TODO: Relax memory orderings in here... +template +class bounded_ws_deque { + public: + bounded_ws_deque(T *item_array, size_t size) : size_{size}, item_array_{item_array} {} + + void push_bottom(T item) { + item_array_[bottom_] = item; + local_bottom_++; + bottom_.store(local_bottom_); + } + + bool is_empty() { + return top_.load().value < bottom_; + } + + optional pop_top() { + stamped_integer old_top = top_.load(); + unsigned int new_stamp = old_top.stamp + 1; + unsigned int new_value = old_top.value + 1; + + if (bottom_.load() <= old_top.value) { + return optional(); + } + + optional result(item_array_[old_top.value]); + if (top_.compare_exchange_strong(old_top, {new_stamp, new_value})) { + return result; + } + + return optional(); + } + + optional pop_bottom() { + if (local_bottom_ == 0) { + return optional(); + } + + local_bottom_--; + bottom_.store(local_bottom_); + + optional result(item_array_[local_bottom_]); + + stamped_integer old_top = top_.load(); + if (local_bottom_ > old_top.value) { + // Enough distance to just return the value + return result; + } + if (local_bottom_ == old_top.value) { + local_bottom_ = 0; + bottom_.store(local_bottom_); + if (top_.compare_exchange_strong(old_top, {old_top.stamp + 1, 0})) { + // We won the competition and the queue is empty + return result; + } + } + + // The queue is empty and we lost the competition + top_.store({old_top.stamp + 1, 0}); + return optional(); + } + + private: + 
alignas(base::system_details::CACHE_LINE_SIZE) std::atomic top_{stamped_integer{0, 0}}; + alignas(base::system_details::CACHE_LINE_SIZE) std::atomic bottom_{0}; + unsigned int local_bottom_{0}; + size_t size_; + T *item_array_; +}; + +template +class static_bounded_ws_deque { + public: + static_bounded_ws_deque() : items_{}, deque_{items_.data(), SIZE} {} + + bounded_ws_deque &get_deque() { return deque_; } + private: + std::array items_; + bounded_ws_deque deque_; +}; + +} +} +} + +#endif //PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_WS_DEQUE_H_ diff --git a/lib/pls/include/pls/internal/data_structures/delayed_initialization.h b/lib/pls/include/pls/internal/data_structures/delayed_initialization.h new file mode 100644 index 0000000..610d98a --- /dev/null +++ b/lib/pls/include/pls/internal/data_structures/delayed_initialization.h @@ -0,0 +1,79 @@ + +#ifndef PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_H_ +#define PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_H_ + +#include +#include + +#include "pls/internal/base/error_handling.h" + +namespace pls { +namespace internal { +namespace data_structures { + +/** + * Allows to reserve space for an uninitialized member variable. + * The member must be initialized before usage using the provided + * perfect forwarding constructor method. + * + * Takes care of the de-construction of the contained object if one is active. 
+ */ +template +class delayed_initialization { + public: + delayed_initialization() : memory_{}, initialized_{false} {} + delayed_initialization(delayed_initialization &&other) noexcept { + initialized_ = other.initialized_; + if (other.initialized_) { + object() = std::move(other.object()); + other.initialized_ = false; + } + } + + template + explicit delayed_initialization(ARGS &&...args): memory_{}, initialized_{true} { + new(memory_.data()) T(std::forward(args)...); + } + + ~delayed_initialization() { + if (initialized_) { + object().~T(); + } + } + + template + void initialize(ARGS &&...args) { + PLS_ASSERT(!initialized_, "Can only initialize delayed wrapper object once!") + + new((void *) memory_.data()) T(std::forward(args)...); + initialized_ = true; + } + + void destroy() { + PLS_ASSERT(initialized_, "Can only destroy initialized objects!") + + object().~T(); + initialized_ = false; + } + + T &object() { + PLS_ASSERT(initialized_, "Can not use an uninitialized delayed wrapper object!") + return *reinterpret_cast(memory_.data()); + } + + T &operator*() { + return object(); + } + + bool initialized() const { return initialized_; } + + private: + std::array memory_; + bool initialized_; +}; + +} +} +} + +#endif // PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_H_ diff --git a/lib/pls/include/pls/internal/data_structures/delayed_initialization_wrapper.h b/lib/pls/include/pls/internal/data_structures/delayed_initialization_wrapper.h deleted file mode 100644 index ac31ea0..0000000 --- a/lib/pls/include/pls/internal/data_structures/delayed_initialization_wrapper.h +++ /dev/null @@ -1,75 +0,0 @@ - -#ifndef PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_WRAPPER_H_ -#define PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_WRAPPER_H_ - -#include -#include - -#include "pls/internal/base/error_handling.h" - -namespace pls { -namespace internal { -namespace data_structures { - -/** - * Allows to reserve space for an uninitialized member variable. 
- * The member must be initialized before usage using the provided - * perfect forwarding constructor method. - * - * Makes sure to call the wrapped objects de-constructor when an object is wrapped. - */ -template -class delayed_initialization_wrapper { - public: - delayed_initialization_wrapper() : memory_{}, initialized_{false} {} - delayed_initialization_wrapper(delayed_initialization_wrapper &&other) noexcept { - initialized_ = other.initialized_; - if (other.initialized_) { - object() = std::move(other.object()); - other.initialized_ = false; - } - } - - template - explicit delayed_initialization_wrapper(ARGS &&...args): memory_{}, initialized_{true} { - new(memory_.data()) T(std::forward(args)...); - } - - ~delayed_initialization_wrapper() { - if (initialized_) { - object().~T(); - } - } - - template - void initialize(ARGS &&...args) { - PLS_ASSERT(!initialized_, "Can only initialize delayed wrapper object once!") - - new(memory_.data()) T(std::forward(args)...); - initialized_ = true; - } - - void destroy() { - PLS_ASSERT(initialized_, "Can only destroy initialized objects!") - - object().~T(); - initialized_ = false; - } - - T &object() { - PLS_ASSERT(initialized_, "Can not use an uninitialized delayed wrapper object!") - return *reinterpret_cast(memory_.data()); - } - - bool initialized() const { return initialized_; } - - private: - std::array memory_; - bool initialized_; -}; - -} -} -} - -#endif // PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_WRAPPER_H_ diff --git a/lib/pls/include/pls/internal/data_structures/optional.h b/lib/pls/include/pls/internal/data_structures/optional.h new file mode 100644 index 0000000..22f9eca --- /dev/null +++ b/lib/pls/include/pls/internal/data_structures/optional.h @@ -0,0 +1,37 @@ + +#ifndef PLS_INTERNAL_DATA_STRUCTURES_OPTIONAL_H_ +#define PLS_INTERNAL_DATA_STRUCTURES_OPTIONAL_H_ + +#include + +#include "pls/internal/data_structures/delayed_initialization.h" + +namespace pls { +namespace internal { +namespace 
data_structures { + +template +class optional { + public: + optional() = default; + + template + explicit optional(ARGS &&...args): data_{std::forward(args)...} {} + + operator bool() const { + return data_.initialized(); + } + + T &operator*() { + return *data_; + } + + private: + delayed_initialization data_; +}; + +} +} +} + +#endif //PLS_INTERNAL_DATA_STRUCTURES_OPTIONAL_H_ diff --git a/lib/pls/include/pls/internal/scheduling/cont_manager.h b/lib/pls/include/pls/internal/scheduling/cont_manager.h index dbc787d..b2b9878 100644 --- a/lib/pls/include/pls/internal/scheduling/cont_manager.h +++ b/lib/pls/include/pls/internal/scheduling/cont_manager.h @@ -20,7 +20,8 @@ class cont_manager { struct template_args {}; template - explicit cont_manager(data_structures::aligned_stack &cont_storage, template_args) { + explicit cont_manager(data_structures::aligned_stack &cont_storage, template_args) + : max_cont_size_{MAX_CONT_SIZE} { // First node is currently active and our local start start_node_ = active_node_ = init_cont_node(cont_storage, nullptr, nullptr); @@ -29,37 +30,166 @@ class cont_manager { continuation_node *current_node = start_node_; for (size_t i = 1; i < NUM_CONTS; i++) { continuation_node *next_node = init_cont_node(cont_storage, start_node_, current_node); - current_node->set_next(next_node); + current_node->next_ = next_node; current_node = next_node; } }; + // Fast-path methods continuation_node *fast_path_get_next() { - active_node_ = active_node_->get_next(); - return active_node_; + auto result = active_node_; + + active_node_ = active_node_->next_; + active_node_->cont_chain_start_ = start_node_; + + return result; } void fast_path_return() { - active_node_ = active_node_->get_prev(); + active_node_ = active_node_->prev_; + } + + // Slow-path methods + void slow_path_return() { + active_node_ = active_node_->prev_; + active_node_->destroy_continuation(); + } + bool falling_through() const { + return fall_through_; + } + void fall_through() { + 
fall_through_ = true; + fall_through_node_ = nullptr; } + void fall_through_and_execute(continuation_node *continuation_node) { + fall_through_ = true; + fall_through_node_ = continuation_node; + } + void aquire_clean_cont_chain(continuation_node *clean_chain) { + // Link active node backwards and other chain forwards + active_node_->prev_ = clean_chain; + active_node_->cont_chain_start_ = clean_chain; + clean_chain->next_ = active_node_; + // Update our local chain start AND reset the active node to the start (we operate on a clean chain) + start_node_ = clean_chain->cont_chain_start_; + active_node_ = clean_chain; + } + void aquire_blocked_cont_chain(continuation_node *blocked_chain) { + // Link active node backwards and other chain forwards + active_node_->prev_ = blocked_chain; + active_node_->cont_chain_start_ = blocked_chain; + blocked_chain->next_ = active_node_; + // Update our local chain start, NOT our active node (we continue execution on top of the blocking chain) + start_node_ = blocked_chain->cont_chain_start_; + } + + void execute_fall_through_continuation() { + PLS_ASSERT(fall_through_node_ != nullptr, + "Must not execute continuation node without associated continuation code."); + + // Reset our state to from falling through + continuation_node *executed_node = fall_through_node_; + fall_through_ = false; + fall_through_node_ = nullptr; + + // Grab everyone involved in the transaction + base_continuation *executed_continuation = executed_node->continuation_; + base_continuation *parent_continuation = executed_continuation->parent_; + continuation_node *parent_node = parent_continuation->node_; + + // Execute the continuation itself + executed_continuation->execute(); + // Slow path return destroys it and resets our current cont_node + slow_path_return(); + + if (falling_through()) { + return; + } + + if (executed_node->is_end_of_cont_chain()) { + // We are at the end of our local part of the cont chain and therefore + // not invested in the 
continuation_chain above. + + // Notify the next continuation of finishing a child... + if (parent_node->results_missing_.fetch_add(-1) == 1) { + // ... we finished the last continuation, thus we need to take responsibility of the above cont chain. + aquire_blocked_cont_chain(parent_node->cont_chain_start_); + fall_through_and_execute(parent_node); + return; + } else { + // ... we did not finish the last continuation, thus we are not concerned with this computational + // path anymore. Just ignore it and fall through to mind our own business. + fall_through(); + return; + } + } else { + // We are invested in the continuation_chain above (it's part of our local continuation node chain). + // We keep executing it if possible and need to 'clean our state' with a swapped chain if not. + + // ...we could be blocked (and then need a clean cont chain). + // For the rare case that the stealing process is still going on, we + // need to fight with it over directly executing the task that is currently being stolen. + continuation_node::stamped_state task_state{continuation_node::state::execute_local}; + parent_node->state_.exchange(task_state, std::memory_order_acq_rel); + if (task_state.value == continuation_node::state::stolen) { + // The other task strain is executing. + // Notify the parent continuation of finishing our child... + if (parent_node->results_missing_.fetch_add(-1) == 1) { + // ... we finished the last continuation, thus we keep the responsibility for the above cont chain. + fall_through_and_execute(parent_node); + return; + } else { + // ... we did not finish the last continuation, thus we need to give up our local cont chain in + // favour of the stolen one (leaving us with a clean chain for further operation). + aquire_clean_cont_chain(parent_node->cont_chain_start_); + fall_through(); + return; + } + } else { + // We 'overruled' the stealing thread and execute the other task ourselves. 
+ // We can be sure that only we were involved in executing the child tasks of the parent_continuation... + parent_continuation->node_->results_missing_.fetch_add(-1); + parent_continuation->execute_task(); + if (falling_through()) { + // ... if the second strain was interrupted we fall through without scheduling the parent_continuation + // (the currently pending/interrupted strain will do this itself). + return; + } + // ...if we could execute the second branch without interruption, + // we can schedule the parent safely for execution. + fall_through_and_execute(parent_continuation->node_); + } + } + } + + size_t get_max_cont_size() const { return max_cont_size_; } private: template static continuation_node *init_cont_node(data_structures::aligned_stack &cont_storage, continuation_node *cont_chain_start, continuation_node *prev) { - // Represents one cont node and its corresponding memory buffer (as one continuous block of memory). + // Represents one cont_node and its corresponding memory buffer (as one continuous block of memory). constexpr size_t buffer_size = MAX_CONT_SIZE - sizeof(continuation_node); - using cont_node_memory_pair = std::pair>; - char *pair_memory = cont_storage.push_bytes(); - char *cont_node_address = pair_memory; - char *cont_node_memory_address = pair_memory + sizeof(continuation_node); + char *cont_node_address = cont_storage.push_bytes(); + char *cont_node_memory_address = cont_storage.push_bytes(buffer_size); return new(cont_node_address) continuation_node(cont_node_memory_address, cont_chain_start, prev); } private: + const size_t max_cont_size_; + + /** + * Managing the continuation chain. + */ continuation_node *start_node_; continuation_node *active_node_; + + /** + * Managing falling through back to the scheduler. 
+ */ + bool fall_through_{false}; + continuation_node *fall_through_node_{nullptr}; }; template diff --git a/lib/pls/include/pls/internal/scheduling/continuation.h b/lib/pls/include/pls/internal/scheduling/continuation.h index 6a2ef5f..f5a9919 100644 --- a/lib/pls/include/pls/internal/scheduling/continuation.h +++ b/lib/pls/include/pls/internal/scheduling/continuation.h @@ -6,106 +6,197 @@ #include #include -#include "pls/internal/data_structures/delayed_initialization_wrapper.h" +#include "pls/internal/data_structures/stamped_integer.h" +#include "pls/internal/data_structures/delayed_initialization.h" +#include "pls/internal/base/alignment.h" +#include "parallel_result.h" namespace pls { namespace internal { namespace scheduling { +class continuation_node; + class base_continuation { + friend class cont_manager; + protected: + // We plan to only init the members for a continuation on the slow path. + // If we can execute everything inline we simply skip it saving runtime overhead. + template + using delayed_init = data_structures::delayed_initialization; + public: - virtual void run() = 0; - virtual ~base_continuation() = 0; + explicit base_continuation(base_continuation *parent, continuation_node *node, unsigned int result_index = 0) + : parent_{parent}, + node_{node}, + result_index_{result_index} {} + + virtual void execute() = 0; + virtual void execute_task() = 0; + virtual ~base_continuation() = default; + + virtual void *get_result_pointer(unsigned short index) = 0; + template + void store_result(unsigned short index, T &&result) { + using BASE_T = typename std::remove_cv::type>::type; + reinterpret_cast *>(get_result_pointer(index))->initialize(std::forward(result)); + } + + base_continuation *get_parent() { return parent_; } + continuation_node *get_cont_node() { return node_; } + + protected: + base_continuation *parent_; + continuation_node *node_; + unsigned int result_index_; }; class continuation_node { + friend class cont_manager; + public: - 
continuation_node(char *continuation_memory, continuation_node *cont_chain_start, continuation_node *prev) - : continuation_memory_{continuation_memory}, - continuation_{nullptr}, - cont_chain_start_{cont_chain_start}, + continuation_node(char *memory, continuation_node *cont_chain_start, continuation_node *prev) + : cont_chain_start_{cont_chain_start}, prev_{prev}, - next_{nullptr}, - offered_chain_{nullptr} {} + memory_{memory} {} - void set_cont_chain_start(continuation_node *cont_chain_start) { cont_chain_start_ = cont_chain_start; } - continuation_node *get_cont_chain_start() { return cont_chain_start_; } - void set_next(continuation_node *next) { next_ = next; } - continuation_node *get_next() { return next_; } - void set_prev(continuation_node *prev) { prev_ = prev; } - continuation_node *get_prev() { return prev_; } + // Management of the associated continuation + template + T *init_continuation(ARGS &&...args) { + PLS_ASSERT(continuation_ == nullptr, "Must only allocate one continuation at once per node.") - base_continuation *continuation() { return continuation_; } + auto *result = new(memory_) T(std::forward(args)...); + continuation_ = result; + return result; + } + void destroy_continuation() { + // Deconstruct Continuation + continuation_->~base_continuation(); + continuation_ = nullptr; + + // Reset Associated counters + results_missing_.store(2); + offered_chain_.store(nullptr); + auto old_state = state_.load(); + state_.store({old_state.stamp + 1, initialized}); + } + template + void destroy_continuation_fast() { + (*reinterpret_cast(continuation_)).~T(); + continuation_ = nullptr; + } + base_continuation *get_continuation() { + return continuation_; + } + continuation_node *get_prev() { + return prev_; + } - private: - // Pointer to memory region reserved for the companion continuation. - // Must be a buffer big enough to hold any continuation encountered in the program. 
- char *continuation_memory_; - base_continuation *continuation_; + bool is_end_of_cont_chain() { + return prev_ == continuation_->get_parent()->get_cont_node(); + } + private: // Linked list property of continuations (continuation chains as memory management). // Each continuation knows its chain start to allow stealing a whole chain in O(1) // without the need to traverse back to the chain start. continuation_node *cont_chain_start_; - continuation_node *prev_, *next_; + continuation_node *prev_, *next_{nullptr}; // When blocked on this continuation, we need to know what other chain we // got offered by the stealing thread. // For this we need only the head of the other chain (as each continuation is a // self describing entity for its chain up to the given node). - continuation_node *offered_chain_; + std::atomic offered_chain_{nullptr}; + + // Management for coordinating concurrent result writing and stealing. + // The result count decides atomically who gets to execute the continuation. + std::atomic results_missing_{2}; + + // The flag is needed for an ongoing stealing request. + // Stealing threads need to offer their continuation chain before the + // 'fully' own the stolen task. As long as that is not done the continuation + // chain can abort the steal request in order to be not blocked without a + // new, clean continuation chain to work with. + enum state { initialized, execute_local, stealing, stolen }; + using stamped_state = data_structures::stamped_integer; + std::atomic state_{{initialized}}; + + // Pointer to memory region reserved for the companion continuation. + // Must be a buffer big enough to hold any continuation encountered in the program. + // This memory is managed explicitly by the continuation manager and runtime system + // (they need to make sure to always call de-constructors and never allocate two continuations). 
+ char *memory_; + base_continuation *continuation_{nullptr}; }; -template +template class continuation : public base_continuation { - public: - void run() override { - // TODO: integrate this better into the runtime system. - // E.g. handle passing the result to the parent continuation - function_.object()(result_1_.object(), result_2_.object()); - } + private: + template + struct result_runner { + // Strip off unwanted modifiers... + using BASE_RES_TYPE = typename std::remove_cv::type>::type; + + static void execute(continuation &cont) { + parallel_result result{cont.function_((*cont.result_1_).value(), (*cont.result_2_).value())}; + if (result.fast_path()) { + cont.parent_->store_result(cont.result_index_, std::move(result)); + } + } + }; + template + struct result_runner> { + static void execute(continuation &cont) { + auto result = cont.function_((*cont.result_1_).value(), (*cont.result_2_).value()); + if (result.fast_path()) { + cont.parent_->store_result(cont.result_index_, std::move(result)); + } + } + }; - // Warning: De-constructor only called once the continuation is actually needed. - // This should be a non issue, as all memory in the continuation is uninitialized - // until the first use anyways to save runtime. 
+ public: + template + explicit continuation(base_continuation *parent, + continuation_node *node, + unsigned int result_index, + FARG &&function, + T2ARGS...task_2_args): + base_continuation(parent, node, result_index), + function_{std::forward(function)}, + task_{std::forward(task_2_args)...} {} ~continuation() override = default; - template - void store_result_1_(R1ARG &&result_1) { - static_assert(std::is_same::type>::value, - "must only copy/move objects in, not construct them"); - result_1_.initialize(std::forward(result_1)); + void execute() override { + using result_type = decltype(function_((*result_1_).value(), (*result_2_).value())); + result_runner::execute(*this); } - template - void store_result_2(R2ARG &&result_1) { - static_assert(std::is_same::type>::value, - "must only copy/move objects in, not construct them"); - result_2_.initialize(std::forward(result_1)); + void execute_task() override { + task_.execute(); } - template - void store_function(FARG &&function) { - static_assert(std::is_same::type>::value, - "must only copy/move objects in, not construct them"); - function_.initialize(function); + void *get_result_pointer(unsigned short index) override { + switch (index) { + case 0:return &result_1_; + case 1:return &result_2_; + default: PLS_ERROR("Unexpected Result Index!") + } } - private: - // We plan to only init the members for a continuation on the slow path. - // If we can execute everything inline we simply skip it saving runtime overhead. - template - using delayed_init = data_structures::delayed_initialization_wrapper; + T2 *get_task() { + return &task_; + } - // Also uninitialized at first, only take the atomic write on the slow path. - // The stealer will init it to 2 while stealing, the 'stolen' sync will then make sure - // everyone sees the value in correct order. - std::atomic results_missing_{}; + private: + // Initial data members. These slow down the fast path, try to init them lazy when possible. 
+ F function_; + T2 task_; - // All fields/actual values stay uninitialized (save time on the fast path if we don not need them). + // Some fields/actual values stay uninitialized (save time on the fast path if we do not need them). + // More fields untouched on the fast path is good, but for ease of an implementation we only keep some for now. delayed_init result_1_; delayed_init result_2_; - delayed_init function_; }; } diff --git a/lib/pls/include/pls/internal/scheduling/parallel_result.h b/lib/pls/include/pls/internal/scheduling/parallel_result.h new file mode 100644 index 0000000..f1ef320 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/parallel_result.h @@ -0,0 +1,34 @@ + +#ifndef PLS_INTERNAL_SCHEDULING_PARALLEL_RESULT_H_ +#define PLS_INTERNAL_SCHEDULING_PARALLEL_RESULT_H_ + +#include +#include "pls/internal/data_structures/delayed_initialization.h" + +namespace pls { +namespace internal { +namespace scheduling { + +template +class parallel_result { + public: + using value_type = T; + + parallel_result() = default; + parallel_result(parallel_result &&other) noexcept : val_{std::move(other.val_)} {} + parallel_result(const parallel_result &other) noexcept : val_{other.val_} {} + + parallel_result(T val) : val_{std::move(val)} {} + + bool fast_path() { return val_.initialized(); } + T &value() { return val_.object(); } + + private: + data_structures::delayed_initialization val_; +}; + +} +} +} + +#endif //PLS_INTERNAL_SCHEDULING_PARALLEL_RESULT_H_ diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index 664adc0..54ad872 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -22,31 +22,69 @@ struct scheduler::starter { explicit starter(F1ARG &&function_1, F2ARG &&function_2) : function_1_{std::forward(function_1)}, function_2_{std::forward(function_2)} {}; - template - auto then(F 
&&cont_function) + template + auto then(FCONT &&cont_function) -> decltype(cont_function(std::declval(), std::declval())) { - using continuation_type = continuation; + using continuation_type = continuation, return_type_1, return_type_2, FCONT>; + using result_type = decltype(cont_function(std::declval(), + std::declval())); + auto &my_state = thread_state::get(); + auto &cont_manager = my_state.get_cont_manager(); + + PLS_ASSERT(sizeof(continuation_type) <= cont_manager.get_max_cont_size(), + "Must stay within the size limit of the static memory configuration."); + + // Select current continuation. + // For now directly copy both the continuation function and the second task. + auto *current_cont_node = cont_manager.fast_path_get_next(); + + // TODO: Fix null pointers at very first spawn... + // In the fast path case we are always on the left side of the tree + // and our prev cont chain node always also holds our parent continuation. + base_continuation *parent_cont = + current_cont_node->get_prev() == nullptr ? nullptr : current_cont_node->get_prev()->get_continuation(); + unsigned short int result_index = 0; + auto *current_cont = current_cont_node->init_continuation(parent_cont, + current_cont_node, + result_index, + cont_function, + function_2_, + current_cont_node); + + // Publish the second task. + my_state.get_task_manager().publish_task(current_cont->get_task()); + + // Call first function on fast path + return_type_1 result_1 = function_1_(); + if (cont_manager.falling_through()) { + return result_type{}; // Unwind stack... + } - // Select current continuation - auto *current_cont = my_state.get_cont_manager().fast_path_get_next(); + // Try to call second function on fast path + if (my_state.get_task_manager().steal_local_task()) { + return_type_2 result_2 = function_2_(); + if (cont_manager.falling_through()) { + return result_type{}; // Unwind stack... + } - // Move function_2 directly onto the task stack (should in theory be constructed on there). 
- // Consider only copying it later on, as this could allow a more efficient fast path with inlining. - task task_2{function_2_, current_cont}; - my_state.get_task_manager().publish_task(task_2); + // We fully got all results, inline as good as possible. + // This is the common case, branch prediction should be rather good here. - // Call first function - auto result_1 = function_1_(); - // Try to pop from stack - if (my_state.get_task_manager().steal_local_task()) { + // Just return the cont object unused and directly call the function. + current_cont_node->destroy_continuation_fast(); my_state.get_cont_manager().fast_path_return(); - auto result_2 = function_2_(); - return cont_function(result_1.value(), result_2.value()); - } else { - PLS_ERROR("Slow Path Not Implemented!!!") + + auto cont_result = cont_function(result_1.value(), result_2.value()); + if (cont_manager.falling_through()) { + return result_type{}; // Unwind stack... + } + return cont_result; } + + cont_manager.fall_through(); + return result_type{}; }; }; diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h index 45abb03..ac1b998 100644 --- a/lib/pls/include/pls/internal/scheduling/task.h +++ b/lib/pls/include/pls/internal/scheduling/task.h @@ -24,25 +24,24 @@ class base_task { virtual void execute_internal() = 0; public: + /** + * Executes the task and stores its result in the correct continuation. + * The caller must handle finishing the continuation/informing it that task two was finished. 
+ */ void execute() { - // TODO: Figure out slow path execution execute_internal(); } }; -template +template class task : public base_task { - using continutation_type = continuation; - public: template explicit task(FARG &&function, continuation_node *continuation_node) : base_task{}, function_{std::forward(function)}, continuation_node_{continuation_node} {} void execute_internal() override { - auto *continuation = dynamic_cast(continuation_node_->continuation()); - continuation->store_result_2(function_()); - // TODO: Properly notify continuation on slow path + continuation_node_->get_continuation()->store_result(1, function_()); } private: diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h index 4b1d46d..6ab8c35 100644 --- a/lib/pls/include/pls/internal/scheduling/task_manager.h +++ b/lib/pls/include/pls/internal/scheduling/task_manager.h @@ -8,6 +8,7 @@ #include #include "pls/internal/scheduling/task.h" +#include "pls/internal/data_structures/bounded_ws_deque.h" #include "pls/internal/data_structures/stamped_integer.h" namespace pls { @@ -16,53 +17,25 @@ namespace scheduling { struct task_handle { public: - enum state { uninitialized, initialized, execute_local, stealing, execute_remote, finished }; - using stamped_state = data_structures::stamped_integer; - - std::atomic stamped_state_{uninitialized}; + task_handle() : task_{nullptr} {}; + explicit task_handle(base_task *task) : task_{task} {}; base_task *task_; }; /** * Handles management of tasks in the system. Each thread has a local task manager, * responsible for allocating, freeing and publishing tasks for stealing. - * - * The manager therefore acts as the deque found in work stealing, as well as the memory - * management for the tasks (as both are deeply intertwined in our implementation to - * integrate the memory management into the stealing procedure. */ class task_manager { public: // Publishes a task on the stack, i.e. 
makes it visible for other threads to steal. - // The task itself is located on the stack of the worker, as the stealer will copy it away before it is freed. - void publish_task(base_task &task) { - task_handle_stack_[tail_internal_].task_ = &task; - task_handle_stack_[tail_internal_].stamped_state_.store({stamp_internal_++, task_handle::initialized}, - std::memory_order_relaxed); - tail_internal_++; - tail_.store(tail_internal_, std::memory_order_release); // Linearization point, handle is published here + void publish_task(base_task *task) { + task_deque_.push_bottom(task_handle{task}); } // Try to pop a local task from this task managers stack. - // This should only be required on the fast path of the implementation, - // thus only returning if the operation was a success. - // Essentially this is an 'un-publish' of a task with a notion if it was successful. bool steal_local_task() { - tail_internal_--; - tail_.store(tail_internal_, std::memory_order_relaxed); - - task_handle::stamped_state swapped_state{task_handle::execute_local, stamp_internal_++}; - task_handle_stack_[tail_internal_].stamped_state_.exchange(swapped_state, std::memory_order_acq_rel); - - if (swapped_state.value == task_handle::execute_remote || - swapped_state.value == task_handle::finished) { - // Someone got the other task, return to 'non linear' execution path - // TODO: Properly handle slow path - return false; - } else { - // No one got the task so far, we are happy and continue our fast path - return true; - } + return task_deque_.pop_bottom(); } // Try to steal a task from a remote task_manager instance. The stolen task must be stored locally. 
@@ -70,27 +43,20 @@ class task_manager { // TODO: Re-implement after fast path is done // std::pair steal_remote_task(task_manager &other); - explicit task_manager(task_handle *task_handle_stack) : task_handle_stack_{task_handle_stack}, - head_{{0}}, - tail_{0}, - tail_internal_{0}, - stamp_internal_{0} {} + explicit task_manager(data_structures::bounded_ws_deque &task_deque) : task_deque_{task_deque} {} private: - task_handle *task_handle_stack_; - alignas(base::system_details::CACHE_LINE_SIZE) std::atomic head_; - alignas(base::system_details::CACHE_LINE_SIZE) std::atomic tail_; - alignas(base::system_details::CACHE_LINE_SIZE) unsigned int tail_internal_, stamp_internal_; + data_structures::bounded_ws_deque &task_deque_; }; template class static_task_manager { public: - static_task_manager() : static_task_handle_stack_{}, task_manager_{static_task_handle_stack_.data()} {}; + static_task_manager() : task_deque_{}, task_manager_{task_deque_.get_deque()} {}; task_manager &get_task_manager() { return task_manager_; } private: - std::array static_task_handle_stack_; + data_structures::static_bounded_ws_deque task_deque_; task_manager task_manager_; }; diff --git a/lib/pls/src/internal/base/alignment.cpp b/lib/pls/src/internal/base/alignment.cpp new file mode 100644 index 0000000..83802d1 --- /dev/null +++ b/lib/pls/src/internal/base/alignment.cpp @@ -0,0 +1,29 @@ +#include "pls/internal/base/alignment.h" + +namespace pls { +namespace internal { +namespace base { +namespace alignment { + +system_details::pointer_t next_alignment(system_details::pointer_t size, + size_t alignment) { + return (size % alignment) == 0 ? + size : + size + (alignment - (size % alignment)); +} + +system_details::pointer_t previous_alignment(system_details::pointer_t size, + size_t alignment) { + return (size % alignment) == 0 ? 
+ size : + size - (size % alignment); +} + +char *next_alignment(char *pointer, size_t alignment) { + return reinterpret_cast(next_alignment(reinterpret_cast(pointer), alignment)); +} + +} +} +} +} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 501e622..a648e7d 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -2,7 +2,5 @@ add_executable(tests main.cpp data_structures_test.cpp base_tests.cpp - scheduling_tests.cpp - algorithm_test.cpp - dataflow_test.cpp) + scheduling_tests.cpp) target_link_libraries(tests catch2 pls) diff --git a/test/algorithm_test.cpp b/test/algorithm_test.cpp deleted file mode 100644 index beb0cfe..0000000 --- a/test/algorithm_test.cpp +++ /dev/null @@ -1,88 +0,0 @@ -#include -#include - -#include "pls/pls.h" - -using namespace pls; - -TEST_CASE("for_each functions correctly", "[algorithms/for_each.h]") { - malloc_scheduler_memory my_scheduler_memory{8, 2 << 12}; - scheduler my_scheduler{&my_scheduler_memory, 8}; - my_scheduler.perform_work([]() { - constexpr int SIZE = 1000; - std::array result_array{}; - result_array.fill(0); - - SECTION("integer ranges are processed exactly once") { - pls::for_each_range(0, SIZE, [&result_array](int i) { - result_array[i]++; - }); - - bool all_equal = true; - for (int i = 0; i < SIZE; i++) { - all_equal &= result_array[i] == 1; - } - REQUIRE (all_equal); - } - - SECTION("iterators are processed exactly once") { - std::array iterator_array{}; - for (int i = 0; i < SIZE; i++) { - iterator_array[i] = i; - } - - pls::for_each(iterator_array.begin(), iterator_array.end(), [&result_array](int i) { - result_array[i]++; - }); - - bool all_equal = true; - for (int i = 0; i < SIZE; i++) { - all_equal &= result_array[i] == 1; - } - REQUIRE (all_equal); - } - }); -} - -TEST_CASE("scan functions correctly", "[algorithms/scan.h]") { - malloc_scheduler_memory my_scheduler_memory{8, 2 << 12}; - scheduler my_scheduler{&my_scheduler_memory, 8}; - my_scheduler.perform_work([]() { - constexpr int SIZE 
= 10000; - std::array input_array{}, result_array{}; - input_array.fill(1); - - pls::scan(input_array.begin(), input_array.end(), result_array.begin(), std::plus(), 0); - - bool all_correct = true; - for (int i = 0; i < SIZE; i++) { - all_correct &= result_array[i] == (i + 1); - } - REQUIRE (all_correct); - }); -} - -long fib(long n) { - if (n <= 2) { - return 1; - } - - long a, b; - - pls::invoke( - [&a, n]() { a = fib(n - 1); }, - [&b, n]() { b = fib(n - 2); } - ); - - return a + b; -} - -TEST_CASE("invoke functions correctly", "[algorithms/invoke.h]") { - constexpr long fib_30 = 832040; - - malloc_scheduler_memory my_scheduler_memory{8, 2u << 14}; - scheduler my_scheduler{&my_scheduler_memory, 8}; - my_scheduler.perform_work([=]() { - REQUIRE(fib(30) == fib_30); - }); -} diff --git a/test/data_structures_test.cpp b/test/data_structures_test.cpp index aa7f29c..6e12861 100644 --- a/test/data_structures_test.cpp +++ b/test/data_structures_test.cpp @@ -1,20 +1,40 @@ #include +#include #include "pls/internal/base/system_details.h" - #include "pls/internal/data_structures/aligned_stack.h" -#include - -using namespace pls::internal::scheduling::data_structures; +using namespace pls::internal::data_structures; using namespace pls::internal::base; using namespace std; +// Forward Declaration +void test_stack(aligned_stack &stack); + TEST_CASE("aligned stack stores objects correctly", "[internal/data_structures/aligned_stack.h]") { constexpr long data_size = 1024; - char data[data_size]; - aligned_stack stack{data, data_size}; + SECTION("plain aligned stack") { + char data[data_size]; + aligned_stack stack{data, data_size, data_size}; + + test_stack(stack); + } + + SECTION("static aligned stack") { + static_aligned_stack stack; + + test_stack(stack.get_stack()); + } + + SECTION("heap aligned stack") { + heap_aligned_stack stack{data_size}; + + test_stack(stack.get_stack()); + } +} + +void test_stack(aligned_stack &stack) { SECTION("stack correctly pushes sub linesize 
objects") { std::array small_data_one{'a', 'b', 'c', 'd', 'e'}; std::array small_data_two{}; @@ -43,10 +63,11 @@ TEST_CASE("aligned stack stores objects correctly", "[internal/data_structures/a SECTION("stack correctly stores and retrieves objects") { std::array data_one{'a', 'b', 'c', 'd', 'e'}; - stack.push(data_one); - auto retrieved_data = stack.pop>(); + auto *push_one = stack.push(data_one); + stack.pop>(); + auto *push_two = stack.push(data_one); - REQUIRE(retrieved_data == std::array{'a', 'b', 'c', 'd', 'e'}); + REQUIRE(push_one == push_two); } SECTION("stack can push and pop multiple times with correct alignment") { @@ -73,214 +94,4 @@ TEST_CASE("aligned stack stores objects correctly", "[internal/data_structures/a } } -TEST_CASE("work_stealing_deque functions correctly", "[internal/data_structures/work_stealing_deque.h]") { - SECTION("add and remove items form the tail") { - constexpr long data_size = 2 << 14; - char data[data_size]; - aligned_stack stack{data, data_size}; - work_stealing_deque deque{&stack}; - - int one = 1, two = 2, three = 3, four = 4; - - SECTION("add and remove items form the tail") { - deque.push_task(one); - deque.publish_last_task(); - deque.push_task(two); - deque.publish_last_task(); - deque.push_task(three); - deque.publish_last_task(); - - REQUIRE(*deque.pop_local_task() == three); - REQUIRE(*deque.pop_local_task() == two); - REQUIRE(*deque.pop_local_task() == one); - } - - SECTION("handles getting empty by popping the tail correctly") { - deque.push_task(one); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == one); - - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == two); - } - - SECTION("remove items form the head") { - deque.push_task(one); - deque.publish_last_task(); - deque.push_task(two); - deque.publish_last_task(); - deque.push_task(three); - deque.publish_last_task(); - - REQUIRE(*deque.pop_external_task() == one); - REQUIRE(*deque.pop_external_task() == two); - 
REQUIRE(*deque.pop_external_task() == three); - } - - SECTION("handles getting empty by popping the head correctly") { - deque.push_task(one); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == one); - - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == two); - } - - SECTION("handles getting empty by popping the head and tail correctly") { - deque.push_task(one); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == one); - - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == two); - - deque.push_task(three); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == three); - } - - SECTION("handles jumps bigger 1 correctly") { - deque.push_task(one); - deque.publish_last_task(); - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == two); - - deque.push_task(three); - deque.publish_last_task(); - deque.push_task(four); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == one); - REQUIRE(*deque.pop_external_task() == three); - REQUIRE(*deque.pop_external_task() == four); - } - - SECTION("handles stack reset 1 correctly when emptied by tail") { - deque.push_task(one); - deque.publish_last_task(); - auto state = deque.save_offset(); - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == two); - - deque.reset_offset(state); - REQUIRE(*deque.pop_local_task() == one); - - deque.push_task(three); - deque.publish_last_task(); - deque.push_task(four); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == three); - REQUIRE(*deque.pop_local_task() == four); - } - } -} - -TEST_CASE("locking_deque functions correctly", "[internal/data_structures/locking_deque.h]") { - SECTION("add and remove items form the tail") { - constexpr long data_size = 2 << 14; - char data[data_size]; - aligned_stack stack{data, data_size}; - locking_deque deque{&stack}; - - int one = 1, 
two = 2, three = 3, four = 4; - - SECTION("add and remove items form the tail") { - deque.push_task(one); - deque.publish_last_task(); - deque.push_task(two); - deque.publish_last_task(); - deque.push_task(three); - deque.publish_last_task(); - - REQUIRE(*deque.pop_local_task() == three); - REQUIRE(*deque.pop_local_task() == two); - REQUIRE(*deque.pop_local_task() == one); - } - - SECTION("handles getting empty by popping the tail correctly") { - deque.push_task(one); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == one); - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == two); - } - - SECTION("remove items form the head") { - deque.push_task(one); - deque.publish_last_task(); - deque.push_task(two); - deque.publish_last_task(); - deque.push_task(three); - deque.publish_last_task(); - - REQUIRE(*deque.pop_external_task() == one); - REQUIRE(*deque.pop_external_task() == two); - REQUIRE(*deque.pop_external_task() == three); - } - - SECTION("handles getting empty by popping the head correctly") { - deque.push_task(one); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == one); - - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == two); - } - - SECTION("handles getting empty by popping the head and tail correctly") { - deque.push_task(one); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == one); - - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == two); - - deque.push_task(three); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == three); - } - - SECTION("handles jumps bigger 1 correctly") { - deque.push_task(one); - deque.publish_last_task(); - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == two); - - deque.push_task(three); - deque.publish_last_task(); - deque.push_task(four); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == 
one); - REQUIRE(*deque.pop_external_task() == three); - REQUIRE(*deque.pop_external_task() == four); - } - - SECTION("handles stack reset 1 correctly when emptied by tail") { - deque.push_task(one); - deque.publish_last_task(); - auto state = deque.save_offset(); - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == two); - - deque.reset_offset(state); - REQUIRE(*deque.pop_local_task() == one); - - deque.push_task(three); - deque.publish_last_task(); - deque.push_task(four); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == three); - REQUIRE(*deque.pop_local_task() == four); - } - } -} diff --git a/test/dataflow_test.cpp b/test/dataflow_test.cpp deleted file mode 100644 index 58e26e9..0000000 --- a/test/dataflow_test.cpp +++ /dev/null @@ -1,114 +0,0 @@ -#include -#include -#include - -#include "pls/pls.h" -#include "pls/dataflow/dataflow.h" - -using namespace pls; -using namespace pls::dataflow; - -void step_1(const int &in, int &out) { - out = in * 2; -} - -class member_call_test { - public: - void step_2(const int &in, int &out) { - out = in * 2; - } -}; - -TEST_CASE("dataflow functions correctly", "[dataflow/dataflow.h]") { - malloc_scheduler_memory my_scheduler_memory{8, 2u << 12u}; - scheduler my_scheduler{&my_scheduler_memory, 8}; - my_scheduler.perform_work([]() { - SECTION("linear pipelines") { - auto step_1 = [](const int &in, double &out1, double &out2) { - out1 = (double) in / 2.0; - out2 = (double) in / 3.0; - }; - auto step_2 = [](const double &in1, const double &in2, double &out) { - out = in1 * in2; - }; - - graph, outputs> linear_graph; - function_node, outputs, decltype(step_1)> node_1{step_1}; - function_node, outputs, decltype(step_2)> node_2{step_2}; - - linear_graph >> node_1 >> node_2 >> linear_graph; - linear_graph.build(); - - std::tuple out{}; - linear_graph.run(5, &out); - linear_graph.wait_for_all(); - - REQUIRE(std::get<0>(out) == (5 / 2.0) * (5 / 3.0)); - } - - SECTION("member and 
function steps") { - member_call_test instance; - using member_func_type = member_function; - member_func_type func_1{&instance, &member_call_test::step_2}; - - graph, outputs> graph; - function_node, outputs, void (*)(const int &, int &)> node_1{&step_1}; - function_node, outputs, member_func_type> node_2{func_1}; - - graph >> node_1 >> node_2 >> graph; - graph.build(); - - std::tuple out{}; - graph.run(1, &out); - graph.wait_for_all(); - - REQUIRE(std::get<0>(out) == 4); - } - - SECTION("non linear pipeline") { - auto path_one = [](const int &in, int &out) { - out = in + 1; - }; - auto path_two = [](const int &in, int &out) { - out = in - 1; - }; - - graph, outputs> graph; - function_node, outputs, decltype(path_one)> node_1{path_one}; - function_node, outputs, decltype(path_two)> node_2{path_two}; - switch_node switch_node; - merge_node merge_node; - split_node split; - - // Split up boolean signal - graph.input<1>() >> split.value_in_port(); - - // Feed switch - graph.input<0>() >> switch_node.value_in_port(); - split.out_port_1() >> switch_node.condition_in_port(); - - // True path - switch_node.true_out_port() >> node_1.in_port<0>(); - node_1.out_port<0>() >> merge_node.true_in_port(); - // False path - switch_node.false_out_port() >> node_2.in_port<0>(); - node_2.out_port<0>() >> merge_node.false_in_port(); - - // Read Merge - split.out_port_2() >> merge_node.condition_in_port(); - merge_node.value_out_port() >> graph.output<0>(); - - - // Build and run - graph.build(); - std::tuple out1{}, out2{}; - graph.run({0, true}, &out1); - graph.run({0, false}, &out2); - graph.wait_for_all(); - - REQUIRE(std::get<0>(out1) == 1); - REQUIRE(std::get<0>(out2) == -1); - } - - }); -} diff --git a/test/scheduling_tests.cpp b/test/scheduling_tests.cpp index 2cce39a..349d63a 100644 --- a/test/scheduling_tests.cpp +++ b/test/scheduling_tests.cpp @@ -1,75 +1,3 @@ #include -#include "pls/pls.h" - -using namespace pls; - -class once_sub_task : public task { - std::atomic 
*counter_; - int children_; - - protected: - void execute_internal() override { - (*counter_)++; - for (int i = 0; i < children_; i++) { - spawn_child(counter_, children_ - 1); - } - } - - public: - explicit once_sub_task(std::atomic *counter, int children) : - task{}, - counter_{counter}, - children_{children} {} -}; - -class force_steal_sub_task : public task { - std::atomic *parent_counter_; - std::atomic *overall_counter_; - - protected: - void execute_internal() override { - (*overall_counter_)--; - if (overall_counter_->load() > 0) { - std::atomic counter{1}; - spawn_child(&counter, overall_counter_); - while (counter.load() > 0); // Spin... - } - - (*parent_counter_)--; - } - - public: - explicit force_steal_sub_task(std::atomic *parent_counter, std::atomic *overall_counter) : - task{}, - parent_counter_{parent_counter}, - overall_counter_{overall_counter} {} -}; - -TEST_CASE("tbb task are scheduled correctly", "[internal/scheduling/fork_join_task.h]") { - malloc_scheduler_memory my_scheduler_memory{8, 2 << 12}; - - SECTION("tasks are executed exactly once") { - scheduler my_scheduler{&my_scheduler_memory, 2}; - int start_counter = 4; - int total_tasks = 1 + 4 + 4 * 3 + 4 * 3 * 2 + 4 * 3 * 2 * 1; - std::atomic counter{0}; - - my_scheduler.perform_work([&]() { - scheduler::spawn_child(&counter, start_counter); - }); - - REQUIRE(counter.load() == total_tasks); - } - - SECTION("tasks can be stolen") { - scheduler my_scheduler{&my_scheduler_memory, 8}; - my_scheduler.perform_work([&]() { - std::atomic dummy_parent{1}, overall_counter{8}; - scheduler::spawn_child(&dummy_parent, &overall_counter); - - // Required, as child operates on our stack's memory!!! - scheduler::wait_for_all(); - }); - } -} +// TODO: Introduce actual tests once multiple threads work... -- libgit2 0.26.0