diff --git a/app/playground/main.cpp b/app/playground/main.cpp index eb815c7..a3417b3 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -53,7 +53,7 @@ int main() { scheduling::scheduler scheduler{static_scheduler_memory, NUM_THREADS}; auto start = std::chrono::steady_clock::now(); - std::cout << "fib = " << fib_normal(41) << std::endl; + std::cout << "fib = " << fib_normal(39) << std::endl; auto end = std::chrono::steady_clock::now(); std::cout << "Normal: " << std::chrono::duration_cast(end - start).count() << std::endl; @@ -62,11 +62,12 @@ int main() { scheduler.perform_work([]() { return scheduling::scheduler::par([]() { - return fib(41); + return fib(39); }, []() { return scheduling::parallel_result{0}; }).then([](int a, int b) { - std::cout << "fib = " << a << std::endl; + std::cout << "fib = " << a + b << std::endl; + return a + b; }); }); diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 7c6176b..12e7e97 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -1,6 +1,6 @@ # List all required files here (cmake best practice to NOT automate this step!) add_library(pls STATIC -# include/pls/pls.h src/pls.cpp + # include/pls/pls.h src/pls.cpp # # include/pls/algorithms/invoke.h # include/pls/algorithms/invoke_impl.h @@ -39,7 +39,7 @@ add_library(pls STATIC include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp include/pls/internal/data_structures/aligned_stack_impl.h include/pls/internal/data_structures/stamped_integer.h - include/pls/internal/data_structures/delayed_initialization_wrapper.h + include/pls/internal/data_structures/delayed_initialization.h include/pls/internal/helpers/prohibit_new.h include/pls/internal/helpers/profiler.h @@ -49,6 +49,7 @@ add_library(pls STATIC include/pls/internal/helpers/seqence.h include/pls/internal/helpers/member_function.h + include/pls/internal/scheduling/parallel_result.h include/pls/internal/scheduling/thread_state.h include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler_impl.h @@ -56,7 +57,8 @@ add_library(pls STATIC include/pls/internal/scheduling/task_manager.h include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/cont_manager.h - include/pls/internal/scheduling/continuation.h include/pls/internal/scheduling/parallel_result.h src/internal/base/alignment.cpp) + include/pls/internal/scheduling/continuation.h + include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h src/internal/base/alignment.cpp src/internal/base/alignment.h) # Add everything in `./include` to be in the include path of this project target_include_directories(pls diff --git a/lib/pls/include/pls/internal/base/alignment.h b/lib/pls/include/pls/internal/base/alignment.h index 26c7762..4296d40 100644 --- a/lib/pls/include/pls/internal/base/alignment.h +++ b/lib/pls/include/pls/internal/base/alignment.h @@ -13,12 +13,16 @@ namespace internal { namespace base { namespace alignment { -system_details::pointer_t next_alignment(system_details::pointer_t size); -system_details::pointer_t previous_alignment(system_details::pointer_t size); -char *next_alignment(char *pointer); +system_details::pointer_t next_alignment(system_details::pointer_t size, + size_t alignment = system_details::CACHE_LINE_SIZE); + +system_details::pointer_t previous_alignment(system_details::pointer_t size, + size_t alignment = system_details::CACHE_LINE_SIZE); + +char *next_alignment(char *pointer, size_t alignment = system_details::CACHE_LINE_SIZE); /** - * Forces alignment requirements on a type equal to a cache line size. + * Forces alignment requirements on a type equal to the given size. * This can be useful to store the elements aligned in an array or to allocate them using new. * * The object is constructed using perfect forwarding. Thus initialization looks identical to the @@ -28,21 +32,36 @@ char *next_alignment(char *pointer); * (This is required, as C++11 does not allow 'over_aligned' types, meaning alignments higher * than the max_align_t (most times 16 bytes) wont be respected properly.) */ -template -struct alignas(system_details::CACHE_LINE_SIZE) cache_alignment_wrapper { +template +struct alignment_wrapper { + static_assert(alignof(T) <= ALIGNMENT, "Must not wrap an type to an alignment smaller than required by the type!"); + private: - std::array memory_; - char *data_; + std::array memory_; + T *data_; public: template - explicit cache_alignment_wrapper(ARGS &&...args): memory_{}, data_{next_alignment(memory_.data())} { + explicit alignment_wrapper(ARGS &&...args) + : memory_{}, data_{reinterpret_cast(next_alignment(memory_.data(), ALIGNMENT))} { new(data_) T(std::forward(args)...); } + ~alignment_wrapper() { + data_->~T(); + } - T &object() { return *reinterpret_cast(data_); } - T *pointer() { return reinterpret_cast(data_); } + T &object() { return *data_; } + T *pointer() { return data_; } }; +/** + * Our common case or over aligning types is when we want to hit cache lines. + * The most portable way is to use the above alignment wrapper, removing any alignment + * requirements from the wrapped type itself (here the cache line) and manually filling + * up padding/fill bytes as needed. + */ +template +using cache_alignment_wrapper = alignment_wrapper; + } } } diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack.h b/lib/pls/include/pls/internal/data_structures/aligned_stack.h index cbbadbc..d3ebe0e 100644 --- a/lib/pls/include/pls/internal/data_structures/aligned_stack.h +++ b/lib/pls/include/pls/internal/data_structures/aligned_stack.h @@ -68,8 +68,8 @@ class static_aligned_stack { aligned_stack &get_stack() { return aligned_stack_; } private: - aligned_stack aligned_stack_; alignas(base::system_details::CACHE_LINE_SIZE) std::array memory_; + aligned_stack aligned_stack_; }; class heap_aligned_stack { diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h index 41f6bf1..60fb0b8 100644 --- a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h +++ b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h @@ -24,7 +24,7 @@ void aligned_stack::pop() { current_offset_ -= num_cache_lines; auto result = *reinterpret_cast(memory_at_offset(current_offset_)); - ~result(); + result.~T(); } template diff --git a/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h b/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h new file mode 100644 index 0000000..ccc6bfa --- /dev/null +++ b/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h @@ -0,0 +1,115 @@ + +#ifndef PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_WS_DEQUE_H_ +#define PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_WS_DEQUE_H_ + +#include +#include +#include + +#include "pls/internal/base/system_details.h" +#include "pls/internal/data_structures/stamped_integer.h" +#include "pls/internal/data_structures/optional.h" + +namespace pls { +namespace internal { +namespace data_structures { + +/** + * Classic, text book ws bounded deque based on arrays. + * Stores a fixed amount of fixed size objects in an array, + * allowing or local push/pop on the bottom and remote + * pop on the top. + * + * The local operations are cheap as long as head and tail are + * far enough apart, making it ideal to avoid cache problems. + * + * Depends on overaligned datatypes to be cache line friendly. + * This does not concern C++14 and upwards, but hinders you to properly + * allocate it on the heap in C++11 (see base::alignment::alignment_wrapper for a solution). + */ +// TODO: Relax memory orderings in here... +template +class bounded_ws_deque { + public: + bounded_ws_deque(T *item_array, size_t size) : size_{size}, item_array_{item_array} {} + + void push_bottom(T item) { + item_array_[bottom_] = item; + local_bottom_++; + bottom_.store(local_bottom_); + } + + bool is_empty() { + return top_.load().value < bottom_; + } + + optional pop_top() { + stamped_integer old_top = top_.load(); + unsigned int new_stamp = old_top.stamp + 1; + unsigned int new_value = old_top.value + 1; + + if (bottom_.load() <= old_top.value) { + return optional(); + } + + optional result(item_array_[old_top.value]); + if (top_.compare_exchange_strong(old_top, {new_stamp, new_value})) { + return result; + } + + return optional(); + } + + optional pop_bottom() { + if (local_bottom_ == 0) { + return optional(); + } + + local_bottom_--; + bottom_.store(local_bottom_); + + optional result(item_array_[local_bottom_]); + + stamped_integer old_top = top_.load(); + if (local_bottom_ > old_top.value) { + // Enough distance to just return the value + return result; + } + if (local_bottom_ == old_top.value) { + local_bottom_ = 0; + bottom_.store(local_bottom_); + if (top_.compare_exchange_strong(old_top, {old_top.stamp + 1, 0})) { + // We won the competition and the queue is empty + return result; + } + } + + // The queue is empty and we lost the competition + top_.store({old_top.stamp + 1, 0}); + return optional(); + } + + private: + alignas(base::system_details::CACHE_LINE_SIZE) std::atomic top_{stamped_integer{0, 0}}; + alignas(base::system_details::CACHE_LINE_SIZE) std::atomic bottom_{0}; + unsigned int local_bottom_{0}; + size_t size_; + T *item_array_; +}; + +template +class static_bounded_ws_deque { + public: + static_bounded_ws_deque() : items_{}, deque_{items_.data(), SIZE} {} + + bounded_ws_deque &get_deque() { return deque_; } + private: + std::array items_; + bounded_ws_deque deque_; +}; + +} +} +} + +#endif //PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_WS_DEQUE_H_ diff --git a/lib/pls/include/pls/internal/data_structures/delayed_initialization_wrapper.h b/lib/pls/include/pls/internal/data_structures/delayed_initialization.h similarity index 77% rename from lib/pls/include/pls/internal/data_structures/delayed_initialization_wrapper.h rename to lib/pls/include/pls/internal/data_structures/delayed_initialization.h index ac31ea0..610d98a 100644 --- a/lib/pls/include/pls/internal/data_structures/delayed_initialization_wrapper.h +++ b/lib/pls/include/pls/internal/data_structures/delayed_initialization.h @@ -1,6 +1,6 @@ -#ifndef PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_WRAPPER_H_ -#define PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_WRAPPER_H_ +#ifndef PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_H_ +#define PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_H_ #include #include @@ -16,13 +16,13 @@ namespace data_structures { * The member must be initialized before usage using the provided * perfect forwarding constructor method. * - * Makes sure to call the wrapped objects de-constructor when an object is wrapped. + * Takes care of the de-construction the contained object if one is active. */ template -class delayed_initialization_wrapper { +class delayed_initialization { public: - delayed_initialization_wrapper() : memory_{}, initialized_{false} {} - delayed_initialization_wrapper(delayed_initialization_wrapper &&other) noexcept { + delayed_initialization() : memory_{}, initialized_{false} {} + delayed_initialization(delayed_initialization &&other) noexcept { initialized_ = other.initialized_; if (other.initialized_) { object() = std::move(other.object()); @@ -31,11 +31,11 @@ class delayed_initialization_wrapper { } template - explicit delayed_initialization_wrapper(ARGS &&...args): memory_{}, initialized_{true} { + explicit delayed_initialization(ARGS &&...args): memory_{}, initialized_{true} { new(memory_.data()) T(std::forward(args)...); } - ~delayed_initialization_wrapper() { + ~delayed_initialization() { if (initialized_) { object().~T(); } @@ -45,7 +45,7 @@ class delayed_initialization_wrapper { void initialize(ARGS &&...args) { PLS_ASSERT(!initialized_, "Can only initialize delayed wrapper object once!") - new(memory_.data()) T(std::forward(args)...); + new((void *) memory_.data()) T(std::forward(args)...); initialized_ = true; } @@ -61,6 +61,10 @@ class delayed_initialization_wrapper { return *reinterpret_cast(memory_.data()); } + T &operator*() { + return object(); + } + bool initialized() const { return initialized_; } private: @@ -72,4 +76,4 @@ class delayed_initialization_wrapper { } } -#endif // PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_WRAPPER_H_ +#endif // PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_H_ diff --git a/lib/pls/include/pls/internal/data_structures/optional.h b/lib/pls/include/pls/internal/data_structures/optional.h new file mode 100644 index 0000000..22f9eca --- /dev/null +++ b/lib/pls/include/pls/internal/data_structures/optional.h @@ -0,0 +1,37 @@ + +#ifndef PLS_INTERNAL_DATA_STRUCTURES_OPTIONAL_H_ +#define PLS_INTERNAL_DATA_STRUCTURES_OPTIONAL_H_ + +#include + +#include "pls/internal/data_structures/delayed_initialization.h" + +namespace pls { +namespace internal { +namespace data_structures { + +template +class optional { + public: + optional() = default; + + template + explicit optional(ARGS &&...args): data_{std::forward(args)...} {} + + operator bool() const { + return data_.initialized(); + } + + T &operator*() { + return *data_; + } + + private: + delayed_initialization data_; +}; + +} +} +} + +#endif //PLS_INTERNAL_DATA_STRUCTURES_OPTIONAL_H_ diff --git a/lib/pls/include/pls/internal/scheduling/cont_manager.h b/lib/pls/include/pls/internal/scheduling/cont_manager.h index dbc787d..b2b9878 100644 --- a/lib/pls/include/pls/internal/scheduling/cont_manager.h +++ b/lib/pls/include/pls/internal/scheduling/cont_manager.h @@ -20,7 +20,8 @@ class cont_manager { struct template_args {}; template - explicit cont_manager(data_structures::aligned_stack &cont_storage, template_args) { + explicit cont_manager(data_structures::aligned_stack &cont_storage, template_args) + : max_cont_size_{MAX_CONT_SIZE} { // First node is currently active and our local start start_node_ = active_node_ = init_cont_node(cont_storage, nullptr, nullptr); @@ -29,37 +30,166 @@ class cont_manager { continuation_node *current_node = start_node_; for (size_t i = 1; i < NUM_CONTS; i++) { continuation_node *next_node = init_cont_node(cont_storage, start_node_, current_node); - current_node->set_next(next_node); + current_node->next_ = next_node; current_node = next_node; } }; + // Fast-path methods continuation_node *fast_path_get_next() { - active_node_ = active_node_->get_next(); - return active_node_; + auto result = active_node_; + + active_node_ = active_node_->next_; + active_node_->cont_chain_start_ = start_node_; + + return result; } void fast_path_return() { - active_node_ = active_node_->get_prev(); + active_node_ = active_node_->prev_; + } + + // Slow-path methods + void slow_path_return() { + active_node_ = active_node_->prev_; + active_node_->destroy_continuation(); + } + bool falling_through() const { + return fall_through_; + } + void fall_through() { + fall_through_ = true; + fall_through_node_ = nullptr; } + void fall_through_and_execute(continuation_node *continuation_node) { + fall_through_ = true; + fall_through_node_ = continuation_node; + } + void aquire_clean_cont_chain(continuation_node *clean_chain) { + // Link active node backwards and other chain forwards + active_node_->prev_ = clean_chain; + active_node_->cont_chain_start_ = clean_chain; + clean_chain->next_ = active_node_; + // Update our local chain start AND reset the active node to the start (we operate on a clean chain) + start_node_ = clean_chain->cont_chain_start_; + active_node_ = clean_chain; + } + void aquire_blocked_cont_chain(continuation_node *blocked_chain) { + // Link active node backwards and other chain forwards + active_node_->prev_ = blocked_chain; + active_node_->cont_chain_start_ = blocked_chain; + blocked_chain->next_ = active_node_; + // Update our local chain start, NOT our active node (we continue execution on top of the blocking chain) + start_node_ = blocked_chain->cont_chain_start_; + } + + void execute_fall_through_continuation() { + PLS_ASSERT(fall_through_node_ != nullptr, + "Must not execute continuation node without associated continuation code."); + + // Reset our state to from falling through + continuation_node *executed_node = fall_through_node_; + fall_through_ = false; + fall_through_node_ = nullptr; + + // Grab everyone involved in the transaction + base_continuation *executed_continuation = executed_node->continuation_; + base_continuation *parent_continuation = executed_continuation->parent_; + continuation_node *parent_node = parent_continuation->node_; + + // Execute the continuation itself + executed_continuation->execute(); + // Slow path return destroys it and resets our current cont_node + slow_path_return(); + + if (falling_through()) { + return; + } + + if (executed_node->is_end_of_cont_chain()) { + // We are at the end of our local part of the cont chain and therefore + // not invested in the continuation_chain above. + + // Notify the next continuation of finishing a child... + if (parent_node->results_missing_.fetch_add(-1) == 1) { + // ... we finished the last continuation, thus we need to take responsibility of the above cont chain. + aquire_blocked_cont_chain(parent_node->cont_chain_start_); + fall_through_and_execute(parent_node); + return; + } else { + // ... we did not finish the last continuation, thus we are not concerned with this computational + // path anymore. Just ignore it and fall through to mind our own business. + fall_through(); + return; + } + } else { + // We are invested in the continuation_chain above (its part of our local continuation node chain). + // We keep executing it if possible and need to 'clean our state' with a swapped chain if not. + + // ...we could be blocked (and then need a clean cont chain). + // For the rare case that the stealing processes is still going on, we + // need to fight with it over directly executing the task that is currently being stolen. + continuation_node::stamped_state task_state{continuation_node::state::execute_local}; + parent_node->state_.exchange(task_state, std::memory_order_acq_rel); + if (task_state.value == continuation_node::state::stolen) { + // The other task strain is executing. + // Notify the parent continuation of finishing our child... + if (parent_node->results_missing_.fetch_add(-1) == 1) { + // ... we finished the last continuation, thus we keep the responsibility for the above cont chain. + fall_through_and_execute(parent_node); + return; + } else { + // ... we did not finish the last continuation, thus we need to give up our local cont chain in + // favour of the stolen one (leaving us with a clean chain for further operation). + aquire_clean_cont_chain(parent_node->cont_chain_start_); + fall_through(); + return; + } + } else { + // We 'overruled' the stealing thread and execute the other task ourselfs. + // We can be sure that only we where involved in executing the child tasks of the parent_continuation... + parent_continuation->node_->results_missing_.fetch_add(-1); + parent_continuation->execute_task(); + if (falling_through()) { + // ... if the second strain was interrupted we fall through without scheduling the parent_continuation + // (the currently pending/interrupted strain will do this itself). + return; + } + // ...if we could execute the second branch without interruption, + // we can schedule the parent safely for execution. + fall_through_and_execute(parent_continuation->node_); + } + } + } + + size_t get_max_cont_size() const { return max_cont_size_; } private: template static continuation_node *init_cont_node(data_structures::aligned_stack &cont_storage, continuation_node *cont_chain_start, continuation_node *prev) { - // Represents one cont node and its corresponding memory buffer (as one continuous block of memory). + // Represents one cont_node and its corresponding memory buffer (as one continuous block of memory). constexpr size_t buffer_size = MAX_CONT_SIZE - sizeof(continuation_node); - using cont_node_memory_pair = std::pair>; - char *pair_memory = cont_storage.push_bytes(); - char *cont_node_address = pair_memory; - char *cont_node_memory_address = pair_memory + sizeof(continuation_node); + char *cont_node_address = cont_storage.push_bytes(); + char *cont_node_memory_address = cont_storage.push_bytes(buffer_size); return new(cont_node_address) continuation_node(cont_node_memory_address, cont_chain_start, prev); } private: + const size_t max_cont_size_; + + /** + * Managing the continuation chain. + */ continuation_node *start_node_; continuation_node *active_node_; + + /** + * Managing falling through back to the scheduler. + */ + bool fall_through_{false}; + continuation_node *fall_through_node_{nullptr}; }; template diff --git a/lib/pls/include/pls/internal/scheduling/continuation.h b/lib/pls/include/pls/internal/scheduling/continuation.h index 6a2ef5f..f5a9919 100644 --- a/lib/pls/include/pls/internal/scheduling/continuation.h +++ b/lib/pls/include/pls/internal/scheduling/continuation.h @@ -6,106 +6,197 @@ #include #include -#include "pls/internal/data_structures/delayed_initialization_wrapper.h" +#include "pls/internal/data_structures/stamped_integer.h" +#include "pls/internal/data_structures/delayed_initialization.h" +#include "pls/internal/base/alignment.h" +#include "parallel_result.h" namespace pls { namespace internal { namespace scheduling { +class continuation_node; + class base_continuation { + friend class cont_manager; + protected: + // We plan to only init the members for a continuation on the slow path. + // If we can execute everything inline we simply skip it saving runtime overhead. + template + using delayed_init = data_structures::delayed_initialization; + public: - virtual void run() = 0; - virtual ~base_continuation() = 0; + explicit base_continuation(base_continuation *parent, continuation_node *node, unsigned int result_index = 0) + : parent_{parent}, + node_{node}, + result_index_{result_index} {} + + virtual void execute() = 0; + virtual void execute_task() = 0; + virtual ~base_continuation() = default; + + virtual void *get_result_pointer(unsigned short index) = 0; + template + void store_result(unsigned short index, T &&result) { + using BASE_T = typename std::remove_cv::type>::type; + reinterpret_cast *>(get_result_pointer(index))->initialize(std::forward(result)); + } + + base_continuation *get_parent() { return parent_; } + continuation_node *get_cont_node() { return node_; } + + protected: + base_continuation *parent_; + continuation_node *node_; + unsigned int result_index_; }; class continuation_node { + friend class cont_manager; + public: - continuation_node(char *continuation_memory, continuation_node *cont_chain_start, continuation_node *prev) - : continuation_memory_{continuation_memory}, - continuation_{nullptr}, - cont_chain_start_{cont_chain_start}, + continuation_node(char *memory, continuation_node *cont_chain_start, continuation_node *prev) + : cont_chain_start_{cont_chain_start}, prev_{prev}, - next_{nullptr}, - offered_chain_{nullptr} {} + memory_{memory} {} - void set_cont_chain_start(continuation_node *cont_chain_start) { cont_chain_start_ = cont_chain_start; } - continuation_node *get_cont_chain_start() { return cont_chain_start_; } - void set_next(continuation_node *next) { next_ = next; } - continuation_node *get_next() { return next_; } - void set_prev(continuation_node *prev) { prev_ = prev; } - continuation_node *get_prev() { return prev_; } + // Management of the associated continuation + template + T *init_continuation(ARGS &&...args) { + PLS_ASSERT(continuation_ == nullptr, "Must only allocate one continuation at once per node.") - base_continuation *continuation() { return continuation_; } + auto *result = new(memory_) T(std::forward(args)...); + continuation_ = result; + return result; + } + void destroy_continuation() { + // Deconstruct Continuation + continuation_->~base_continuation(); + continuation_ = nullptr; + + // Reset Associated counters + results_missing_.store(2); + offered_chain_.store(nullptr); + auto old_state = state_.load(); + state_.store({old_state.stamp + 1, initialized}); + } + template + void destroy_continuation_fast() { + (*reinterpret_cast(continuation_)).~T(); + continuation_ = nullptr; + } + base_continuation *get_continuation() { + return continuation_; + } + continuation_node *get_prev() { + return prev_; + } - private: - // Pointer to memory region reserved for the companion continuation. - // Must be a buffer big enough to hold any continuation encountered in the program. - char *continuation_memory_; - base_continuation *continuation_; + bool is_end_of_cont_chain() { + return prev_ == continuation_->get_parent()->get_cont_node(); + } + private: // Linked list property of continuations (continuation chains as memory management). // Each continuation knows its chain start to allow stealing a whole chain in O(1) // without the need to traverse back to the chain start. continuation_node *cont_chain_start_; - continuation_node *prev_, *next_; + continuation_node *prev_, *next_{nullptr}; // When blocked on this continuation, we need to know what other chain we // got offered by the stealing thread. // For this we need only the head of the other chain (as each continuation is a // self describing entity for its chain up to the given node). - continuation_node *offered_chain_; + std::atomic offered_chain_{nullptr}; + + // Management for coordinating concurrent result writing and stealing. + // The result count decides atomically who gets to execute the continuation. + std::atomic results_missing_{2}; + + // The flag is needed for an ongoing stealing request. + // Stealing threads need to offer their continuation chain before the + // 'fully' own the stolen task. As long as that is not done the continuation + // chain can abort the steal request in order to be not blocked without a + // new, clean continuation chain to work with. + enum state { initialized, execute_local, stealing, stolen }; + using stamped_state = data_structures::stamped_integer; + std::atomic state_{{initialized}}; + + // Pointer to memory region reserved for the companion continuation. + // Must be a buffer big enough to hold any continuation encountered in the program. + // This memory is managed explicitly by the continuation manager and runtime system + // (they need to make sure to always call de-constructors and never allocate two continuations). + char *memory_; + base_continuation *continuation_{nullptr}; }; -template +template class continuation : public base_continuation { - public: - void run() override { - // TODO: integrate this better into the runtime system. - // E.g. handle passing the result to the parent continuation - function_.object()(result_1_.object(), result_2_.object()); - } + private: + template + struct result_runner { + // Strip off unwanted modifiers... + using BASE_RES_TYPE = typename std::remove_cv::type>::type; + + static void execute(continuation &cont) { + parallel_result result{cont.function_((*cont.result_1_).value(), (*cont.result_2_).value())}; + if (result.fast_path()) { + cont.parent_->store_result(cont.result_index_, std::move(result)); + } + } + }; + template + struct result_runner> { + static void execute(continuation &cont) { + auto result = cont.function_((*cont.result_1_).value(), (*cont.result_2_).value()); + if (result.fast_path()) { + cont.parent_->store_result(cont.result_index_, std::move(result)); + } + } + }; - // Warning: De-constructor only called once the continuation is actually needed. - // This should be a non issue, as all memory in the continuation is uninitialized - // until the first use anyways to save runtime. + public: + template + explicit continuation(base_continuation *parent, + continuation_node *node, + unsigned int result_index, + FARG &&function, + T2ARGS...task_2_args): + base_continuation(parent, node, result_index), + function_{std::forward(function)}, + task_{std::forward(task_2_args)...} {} ~continuation() override = default; - template - void store_result_1_(R1ARG &&result_1) { - static_assert(std::is_same::type>::value, - "must only copy/move objects in, not construct them"); - result_1_.initialize(std::forward(result_1)); + void execute() override { + using result_type = decltype(function_((*result_1_).value(), (*result_2_).value())); + result_runner::execute(*this); } - template - void store_result_2(R2ARG &&result_1) { - static_assert(std::is_same::type>::value, - "must only copy/move objects in, not construct them"); - result_2_.initialize(std::forward(result_1)); + void execute_task() override { + task_.execute(); } - template - void store_function(FARG &&function) { - static_assert(std::is_same::type>::value, - "must only copy/move objects in, not construct them"); - function_.initialize(function); + void *get_result_pointer(unsigned short index) override { + switch (index) { + case 0:return &result_1_; + case 1:return &result_2_; + default: PLS_ERROR("Unexpected Result Index!") + } } - private: - // We plan to only init the members for a continuation on the slow path. - // If we can execute everything inline we simply skip it saving runtime overhead. - template - using delayed_init = data_structures::delayed_initialization_wrapper; + T2 *get_task() { + return &task_; + } - // Also uninitialized at first, only take the atomic write on the slow path. - // The stealer will init it to 2 while stealing, the 'stolen' sync will then make sure - // everyone sees the value in correct order. - std::atomic results_missing_{}; + private: + // Initial data members. These slow down the fast path, try to init them lazy when possible. + F function_; + T2 task_; - // All fields/actual values stay uninitialized (save time on the fast path if we don not need them). + // Some fields/actual values stay uninitialized (save time on the fast path if we don not need them). + // More fields untouched on the fast path is good, but for ease of an implementation we only keep some for now. delayed_init result_1_; delayed_init result_2_; - delayed_init function_; }; } diff --git a/lib/pls/include/pls/internal/scheduling/parallel_result.h b/lib/pls/include/pls/internal/scheduling/parallel_result.h new file mode 100644 index 0000000..f1ef320 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/parallel_result.h @@ -0,0 +1,34 @@ + +#ifndef PLS_INTERNAL_SCHEDULING_PARALLEL_RESULT_H_ +#define PLS_INTERNAL_SCHEDULING_PARALLEL_RESULT_H_ + +#include +#include "pls/internal/data_structures/delayed_initialization.h" + +namespace pls { +namespace internal { +namespace scheduling { + +template +class parallel_result { + public: + using value_type = T; + + parallel_result() = default; + parallel_result(parallel_result &&other) noexcept : val_{std::move(other.val_)} {} + parallel_result(const parallel_result &other) noexcept : val_{other.val_} {} + + parallel_result(T val) : val_{std::move(val)} {} + + bool fast_path() { return val_.initialized(); } + T &value() { return val_.object(); } + + private: + data_structures::delayed_initialization val_; +}; + +} +} +} + +#endif //PLS_INTERNAL_SCHEDULING_PARALLEL_RESULT_H_ diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index 664adc0..54ad872 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -22,31 +22,69 @@ struct scheduler::starter { explicit starter(F1ARG &&function_1, F2ARG &&function_2) : function_1_{std::forward(function_1)}, function_2_{std::forward(function_2)} {}; - template - auto then(F &&cont_function) + template + auto then(FCONT &&cont_function) -> decltype(cont_function(std::declval(), std::declval())) { - using continuation_type = continuation; + using continuation_type = continuation, return_type_1, return_type_2, FCONT>; + using result_type = decltype(cont_function(std::declval(), + std::declval())); + auto &my_state = thread_state::get(); + auto &cont_manager = my_state.get_cont_manager(); + + PLS_ASSERT(sizeof(continuation_type) <= cont_manager.get_max_cont_size(), + "Must stay within the size limit of the static memory configuration."); + + // Select current continuation. + // For now directly copy both the continuation function and the second task. + auto *current_cont_node = cont_manager.fast_path_get_next(); + + // TODO: Fix null pointers at very first spawn... + // In the fast path case we are always on the left side of the tree + // and our prev cont chain node always also holds our parent continuation. + base_continuation *parent_cont = + current_cont_node->get_prev() == nullptr ? nullptr : current_cont_node->get_prev()->get_continuation(); + unsigned short int result_index = 0; + auto *current_cont = current_cont_node->init_continuation(parent_cont, + current_cont_node, + result_index, + cont_function, + function_2_, + current_cont_node); + + // Publish the second task. + my_state.get_task_manager().publish_task(current_cont->get_task()); + + // Call first function on fast path + return_type_1 result_1 = function_1_(); + if (cont_manager.falling_through()) { + return result_type{}; // Unwind stack... + } - // Select current continuation - auto *current_cont = my_state.get_cont_manager().fast_path_get_next(); + // Try to call second function on fast path + if (my_state.get_task_manager().steal_local_task()) { + return_type_2 result_2 = function_2_(); + if (cont_manager.falling_through()) { + return result_type{}; // Unwind stack... + } - // Move function_2 directly onto the task stack (should in theory be constructed on there). - // Consider only copying it later on, as this could allow a more efficient fast path with inlining. - task task_2{function_2_, current_cont}; - my_state.get_task_manager().publish_task(task_2); + // We fully got all results, inline as good as possible. + // This is the common case, branch prediction should be rather good here. - // Call first function - auto result_1 = function_1_(); - // Try to pop from stack - if (my_state.get_task_manager().steal_local_task()) { + // Just return the cont object unused and directly call the function. + current_cont_node->destroy_continuation_fast(); my_state.get_cont_manager().fast_path_return(); - auto result_2 = function_2_(); - return cont_function(result_1.value(), result_2.value()); - } else { - PLS_ERROR("Slow Path Not Implemented!!!") + + auto cont_result = cont_function(result_1.value(), result_2.value()); + if (cont_manager.falling_through()) { + return result_type{}; // Unwind stack... + } + return cont_result; } + + cont_manager.fall_through(); + return result_type{}; }; }; diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h index 45abb03..ac1b998 100644 --- a/lib/pls/include/pls/internal/scheduling/task.h +++ b/lib/pls/include/pls/internal/scheduling/task.h @@ -24,25 +24,24 @@ class base_task { virtual void execute_internal() = 0; public: + /** + * Executes the task and stores its result in the correct continuation. + * The caller must handle finishing the continuation/informing it that task two was finished. + */ void execute() { - // TODO: Figure out slow path execution execute_internal(); } }; -template +template class task : public base_task { - using continutation_type = continuation; - public: template explicit task(FARG &&function, continuation_node *continuation_node) : base_task{}, function_{std::forward(function)}, continuation_node_{continuation_node} {} void execute_internal() override { - auto *continuation = dynamic_cast(continuation_node_->continuation()); - continuation->store_result_2(function_()); - // TODO: Properly notify continuation on slow path + continuation_node_->get_continuation()->store_result(1, function_()); } private: diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h index 4b1d46d..6ab8c35 100644 --- a/lib/pls/include/pls/internal/scheduling/task_manager.h +++ b/lib/pls/include/pls/internal/scheduling/task_manager.h @@ -8,6 +8,7 @@ #include #include "pls/internal/scheduling/task.h" +#include "pls/internal/data_structures/bounded_ws_deque.h" #include "pls/internal/data_structures/stamped_integer.h" namespace pls { @@ -16,53 +17,25 @@ namespace scheduling { struct task_handle { public: - enum state { uninitialized, initialized, execute_local, stealing, execute_remote, finished }; - using stamped_state = data_structures::stamped_integer; - - std::atomic stamped_state_{uninitialized}; + task_handle() : task_{nullptr} {}; + explicit task_handle(base_task *task) : task_{task} {}; base_task *task_; }; /** * Handles management of tasks in the system. Each thread has a local task manager, * responsible for allocating, freeing and publishing tasks for stealing. - * - * The manager therefore acts as the deque found in work stealing, as well as the memory - * management for the tasks (as both are deeply intertwined in our implementation to - * integrate the memory management into the stealing procedure. */ class task_manager { public: // Publishes a task on the stack, i.e. makes it visible for other threads to steal. - // The task itself is located on the stack of the worker, as the stealer will copy it away before it is freed. - void publish_task(base_task &task) { - task_handle_stack_[tail_internal_].task_ = &task; - task_handle_stack_[tail_internal_].stamped_state_.store({stamp_internal_++, task_handle::initialized}, - std::memory_order_relaxed); - tail_internal_++; - tail_.store(tail_internal_, std::memory_order_release); // Linearization point, handle is published here + void publish_task(base_task *task) { + task_deque_.push_bottom(task_handle{task}); } // Try to pop a local task from this task managers stack. - // This should only be required on the fast path of the implementation, - // thus only returning if the operation was a success. - // Essentially this is an 'un-publish' of a task with a notion if it was successful. bool steal_local_task() { - tail_internal_--; - tail_.store(tail_internal_, std::memory_order_relaxed); - - task_handle::stamped_state swapped_state{task_handle::execute_local, stamp_internal_++}; - task_handle_stack_[tail_internal_].stamped_state_.exchange(swapped_state, std::memory_order_acq_rel); - - if (swapped_state.value == task_handle::execute_remote || - swapped_state.value == task_handle::finished) { - // Someone got the other task, return to 'non linear' execution path - // TODO: Properly handle slow path - return false; - } else { - // No one got the task so far, we are happy and continue our fast path - return true; - } + return task_deque_.pop_bottom(); } // Try to steal a task from a remote task_manager instance. The stolen task must be stored locally. @@ -70,27 +43,20 @@ class task_manager { // TODO: Re-implement after fast path is done // std::pair steal_remote_task(task_manager &other); - explicit task_manager(task_handle *task_handle_stack) : task_handle_stack_{task_handle_stack}, - head_{{0}}, - tail_{0}, - tail_internal_{0}, - stamp_internal_{0} {} + explicit task_manager(data_structures::bounded_ws_deque &task_deque) : task_deque_{task_deque} {} private: - task_handle *task_handle_stack_; - alignas(base::system_details::CACHE_LINE_SIZE) std::atomic head_; - alignas(base::system_details::CACHE_LINE_SIZE) std::atomic tail_; - alignas(base::system_details::CACHE_LINE_SIZE) unsigned int tail_internal_, stamp_internal_; + data_structures::bounded_ws_deque &task_deque_; }; template class static_task_manager { public: - static_task_manager() : static_task_handle_stack_{}, task_manager_{static_task_handle_stack_.data()} {}; + static_task_manager() : task_deque_{}, task_manager_{task_deque_.get_deque()} {}; task_manager &get_task_manager() { return task_manager_; } private: - std::array static_task_handle_stack_; + data_structures::static_bounded_ws_deque task_deque_; task_manager task_manager_; }; diff --git a/lib/pls/src/internal/base/alignment.cpp b/lib/pls/src/internal/base/alignment.cpp new file mode 100644 index 0000000..83802d1 --- /dev/null +++ b/lib/pls/src/internal/base/alignment.cpp @@ -0,0 +1,29 @@ +#include "pls/internal/base/alignment.h" + +namespace pls { +namespace internal { +namespace base { +namespace alignment { + +system_details::pointer_t next_alignment(system_details::pointer_t size, + size_t alignment) { + return (size % alignment) == 0 ? + size : + size + (alignment - (size % alignment)); +} + +system_details::pointer_t previous_alignment(system_details::pointer_t size, + size_t alignment) { + return (size % alignment) == 0 ? + size : + size - (size % alignment); +} + +char *next_alignment(char *pointer, size_t alignment) { + return reinterpret_cast(next_alignment(reinterpret_cast(pointer), alignment)); +} + +} +} +} +} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 501e622..a648e7d 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -2,7 +2,5 @@ add_executable(tests main.cpp data_structures_test.cpp base_tests.cpp - scheduling_tests.cpp - algorithm_test.cpp - dataflow_test.cpp) + scheduling_tests.cpp) target_link_libraries(tests catch2 pls) diff --git a/test/algorithm_test.cpp b/test/algorithm_test.cpp deleted file mode 100644 index beb0cfe..0000000 --- a/test/algorithm_test.cpp +++ /dev/null @@ -1,88 +0,0 @@ -#include -#include - -#include "pls/pls.h" - -using namespace pls; - -TEST_CASE("for_each functions correctly", "[algorithms/for_each.h]") { - malloc_scheduler_memory my_scheduler_memory{8, 2 << 12}; - scheduler my_scheduler{&my_scheduler_memory, 8}; - my_scheduler.perform_work([]() { - constexpr int SIZE = 1000; - std::array result_array{}; - result_array.fill(0); - - SECTION("integer ranges are processed exactly once") { - pls::for_each_range(0, SIZE, [&result_array](int i) { - result_array[i]++; - }); - - bool all_equal = true; - for (int i = 0; i < SIZE; i++) { - all_equal &= result_array[i] == 1; - } - REQUIRE (all_equal); - } - - SECTION("iterators are processed exactly once") { - std::array iterator_array{}; - for (int i = 0; i < SIZE; i++) { - iterator_array[i] = i; - } - - pls::for_each(iterator_array.begin(), iterator_array.end(), [&result_array](int i) { - result_array[i]++; - }); - - bool all_equal = true; - for (int i = 0; i < SIZE; i++) { - all_equal &= result_array[i] == 1; - } - REQUIRE (all_equal); - } - }); -} - -TEST_CASE("scan functions correctly", "[algorithms/scan.h]") { - malloc_scheduler_memory my_scheduler_memory{8, 2 << 12}; - scheduler my_scheduler{&my_scheduler_memory, 8}; - my_scheduler.perform_work([]() { - constexpr int SIZE = 10000; - std::array input_array{}, result_array{}; - input_array.fill(1); - - pls::scan(input_array.begin(), input_array.end(), result_array.begin(), std::plus(), 0); - - bool all_correct = true; - for (int i = 0; i < SIZE; i++) { - all_correct &= result_array[i] == (i + 1); - } - REQUIRE (all_correct); - }); -} - -long fib(long n) { - if (n <= 2) { - return 1; - } - - long a, b; - - pls::invoke( - [&a, n]() { a = fib(n - 1); }, - [&b, n]() { b = fib(n - 2); } - ); - - return a + b; -} - -TEST_CASE("invoke functions correctly", "[algorithms/invoke.h]") { - constexpr long fib_30 = 832040; - - malloc_scheduler_memory my_scheduler_memory{8, 2u << 14}; - scheduler my_scheduler{&my_scheduler_memory, 8}; - my_scheduler.perform_work([=]() { - REQUIRE(fib(30) == fib_30); - }); -} diff --git a/test/data_structures_test.cpp b/test/data_structures_test.cpp index aa7f29c..6e12861 100644 --- a/test/data_structures_test.cpp +++ b/test/data_structures_test.cpp @@ -1,20 +1,40 @@ #include +#include #include "pls/internal/base/system_details.h" - #include "pls/internal/data_structures/aligned_stack.h" -#include - -using namespace pls::internal::scheduling::data_structures; +using namespace pls::internal::data_structures; using namespace pls::internal::base; using namespace std; +// Forward Declaration +void test_stack(aligned_stack &stack); + TEST_CASE("aligned stack stores objects correctly", "[internal/data_structures/aligned_stack.h]") { constexpr long data_size = 1024; - char data[data_size]; - aligned_stack stack{data, data_size}; + SECTION("plain aligned stack") { + char data[data_size]; + aligned_stack stack{data, data_size, data_size}; + + test_stack(stack); + } + + SECTION("static aligned stack") { + static_aligned_stack stack; + + test_stack(stack.get_stack()); + } + + SECTION("heap aligned stack") { + heap_aligned_stack stack{data_size}; + + test_stack(stack.get_stack()); + } +} + +void test_stack(aligned_stack &stack) { SECTION("stack correctly pushes sub linesize objects") { std::array small_data_one{'a', 'b', 'c', 'd', 'e'}; std::array small_data_two{}; @@ -43,10 +63,11 @@ TEST_CASE("aligned stack stores objects correctly", "[internal/data_structures/a SECTION("stack correctly stores and retrieves objects") { std::array data_one{'a', 'b', 'c', 'd', 'e'}; - stack.push(data_one); - auto retrieved_data = stack.pop>(); + auto *push_one = stack.push(data_one); + stack.pop>(); + auto *push_two = stack.push(data_one); - REQUIRE(retrieved_data == std::array{'a', 'b', 'c', 'd', 'e'}); + REQUIRE(push_one == push_two); } SECTION("stack can push and pop multiple times with correct alignment") { @@ -73,214 +94,4 @@ TEST_CASE("aligned stack stores objects correctly", "[internal/data_structures/a } } -TEST_CASE("work_stealing_deque functions correctly", "[internal/data_structures/work_stealing_deque.h]") { - SECTION("add and remove items form the tail") { - constexpr long data_size = 2 << 14; - char data[data_size]; - aligned_stack stack{data, data_size}; - work_stealing_deque deque{&stack}; - - int one = 1, two = 2, three = 3, four = 4; - - SECTION("add and remove items form the tail") { - deque.push_task(one); - deque.publish_last_task(); - deque.push_task(two); - deque.publish_last_task(); - deque.push_task(three); - deque.publish_last_task(); - - REQUIRE(*deque.pop_local_task() == three); - REQUIRE(*deque.pop_local_task() == two); - REQUIRE(*deque.pop_local_task() == one); - } - - SECTION("handles getting empty by popping the tail correctly") { - deque.push_task(one); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == one); - - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == two); - } - - SECTION("remove items form the head") { - deque.push_task(one); - deque.publish_last_task(); - deque.push_task(two); - deque.publish_last_task(); - deque.push_task(three); - deque.publish_last_task(); - - REQUIRE(*deque.pop_external_task() == one); - REQUIRE(*deque.pop_external_task() == two); - REQUIRE(*deque.pop_external_task() == three); - } - - SECTION("handles getting empty by popping the head correctly") { - deque.push_task(one); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == one); - - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == two); - } - - SECTION("handles getting empty by popping the head and tail correctly") { - deque.push_task(one); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == one); - - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == two); - - deque.push_task(three); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == three); - } - - SECTION("handles jumps bigger 1 correctly") { - deque.push_task(one); - deque.publish_last_task(); - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == two); - - deque.push_task(three); - deque.publish_last_task(); - deque.push_task(four); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == one); - REQUIRE(*deque.pop_external_task() == three); - REQUIRE(*deque.pop_external_task() == four); - } - - SECTION("handles stack reset 1 correctly when emptied by tail") { - deque.push_task(one); - deque.publish_last_task(); - auto state = deque.save_offset(); - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == two); - - deque.reset_offset(state); - REQUIRE(*deque.pop_local_task() == one); - - deque.push_task(three); - deque.publish_last_task(); - deque.push_task(four); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == three); - REQUIRE(*deque.pop_local_task() == four); - } - } -} - -TEST_CASE("locking_deque functions correctly", "[internal/data_structures/locking_deque.h]") { - SECTION("add and remove items form the tail") { - constexpr long data_size = 2 << 14; - char data[data_size]; - aligned_stack stack{data, data_size}; - locking_deque deque{&stack}; - - int one = 1, two = 2, three = 3, four = 4; - - SECTION("add and remove items form the tail") { - deque.push_task(one); - deque.publish_last_task(); - deque.push_task(two); - deque.publish_last_task(); - deque.push_task(three); - deque.publish_last_task(); - - REQUIRE(*deque.pop_local_task() == three); - REQUIRE(*deque.pop_local_task() == two); - REQUIRE(*deque.pop_local_task() == one); - } - - SECTION("handles getting empty by popping the tail correctly") { - deque.push_task(one); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == one); - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == two); - } - - SECTION("remove items form the head") { - deque.push_task(one); - deque.publish_last_task(); - deque.push_task(two); - deque.publish_last_task(); - deque.push_task(three); - deque.publish_last_task(); - - REQUIRE(*deque.pop_external_task() == one); - REQUIRE(*deque.pop_external_task() == two); - REQUIRE(*deque.pop_external_task() == three); - } - - SECTION("handles getting empty by popping the head correctly") { - deque.push_task(one); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == one); - - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == two); - } - - SECTION("handles getting empty by popping the head and tail correctly") { - deque.push_task(one); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == one); - - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == two); - - deque.push_task(three); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == three); - } - - SECTION("handles jumps bigger 1 correctly") { - deque.push_task(one); - deque.publish_last_task(); - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == two); - - deque.push_task(three); - deque.publish_last_task(); - deque.push_task(four); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == one); - REQUIRE(*deque.pop_external_task() == three); - REQUIRE(*deque.pop_external_task() == four); - } - - SECTION("handles stack reset 1 correctly when emptied by tail") { - deque.push_task(one); - deque.publish_last_task(); - auto state = deque.save_offset(); - deque.push_task(two); - deque.publish_last_task(); - REQUIRE(*deque.pop_local_task() == two); - - deque.reset_offset(state); - REQUIRE(*deque.pop_local_task() == one); - - deque.push_task(three); - deque.publish_last_task(); - deque.push_task(four); - deque.publish_last_task(); - REQUIRE(*deque.pop_external_task() == three); - REQUIRE(*deque.pop_local_task() == four); - } - } -} diff --git a/test/dataflow_test.cpp b/test/dataflow_test.cpp deleted file mode 100644 index 58e26e9..0000000 --- a/test/dataflow_test.cpp +++ /dev/null @@ -1,114 +0,0 @@ -#include -#include -#include - -#include "pls/pls.h" -#include "pls/dataflow/dataflow.h" - -using namespace pls; -using namespace pls::dataflow; - -void step_1(const int &in, int &out) { - out = in * 2; -} - -class member_call_test { - public: - void step_2(const int &in, int &out) { - out = in * 2; - } -}; - -TEST_CASE("dataflow functions correctly", "[dataflow/dataflow.h]") { - malloc_scheduler_memory my_scheduler_memory{8, 2u << 12u}; - scheduler my_scheduler{&my_scheduler_memory, 8}; - my_scheduler.perform_work([]() { - SECTION("linear pipelines") { - auto step_1 = [](const int &in, double &out1, double &out2) { - out1 = (double) in / 2.0; - out2 = (double) in / 3.0; - }; - auto step_2 = [](const double &in1, const double &in2, double &out) { - out = in1 * in2; - }; - - graph, outputs> linear_graph; - function_node, outputs, decltype(step_1)> node_1{step_1}; - function_node, outputs, decltype(step_2)> node_2{step_2}; - - linear_graph >> node_1 >> node_2 >> linear_graph; - linear_graph.build(); - - std::tuple out{}; - linear_graph.run(5, &out); - linear_graph.wait_for_all(); - - REQUIRE(std::get<0>(out) == (5 / 2.0) * (5 / 3.0)); - } - - SECTION("member and function steps") { - member_call_test instance; - using member_func_type = member_function; - member_func_type func_1{&instance, &member_call_test::step_2}; - - graph, outputs> graph; - function_node, outputs, void (*)(const int &, int &)> node_1{&step_1}; - function_node, outputs, member_func_type> node_2{func_1}; - - graph >> node_1 >> node_2 >> graph; - graph.build(); - - std::tuple out{}; - graph.run(1, &out); - graph.wait_for_all(); - - REQUIRE(std::get<0>(out) == 4); - } - - SECTION("non linear pipeline") { - auto path_one = [](const int &in, int &out) { - out = in + 1; - }; - auto path_two = [](const int &in, int &out) { - out = in - 1; - }; - - graph, outputs> graph; - function_node, outputs, decltype(path_one)> node_1{path_one}; - function_node, outputs, decltype(path_two)> node_2{path_two}; - switch_node switch_node; - merge_node merge_node; - split_node split; - - // Split up boolean signal - graph.input<1>() >> split.value_in_port(); - - // Feed switch - graph.input<0>() >> switch_node.value_in_port(); - split.out_port_1() >> switch_node.condition_in_port(); - - // True path - switch_node.true_out_port() >> node_1.in_port<0>(); - node_1.out_port<0>() >> merge_node.true_in_port(); - // False path - switch_node.false_out_port() >> node_2.in_port<0>(); - node_2.out_port<0>() >> merge_node.false_in_port(); - - // Read Merge - split.out_port_2() >> merge_node.condition_in_port(); - merge_node.value_out_port() >> graph.output<0>(); - - - // Build and run - graph.build(); - std::tuple out1{}, out2{}; - graph.run({0, true}, &out1); - graph.run({0, false}, &out2); - graph.wait_for_all(); - - REQUIRE(std::get<0>(out1) == 1); - REQUIRE(std::get<0>(out2) == -1); - } - - }); -} diff --git a/test/scheduling_tests.cpp b/test/scheduling_tests.cpp index 2cce39a..349d63a 100644 --- a/test/scheduling_tests.cpp +++ b/test/scheduling_tests.cpp @@ -1,75 +1,3 @@ #include -#include "pls/pls.h" - -using namespace pls; - -class once_sub_task : public task { - std::atomic *counter_; - int children_; - - protected: - void execute_internal() override { - (*counter_)++; - for (int i = 0; i < children_; i++) { - spawn_child(counter_, children_ - 1); - } - } - - public: - explicit once_sub_task(std::atomic *counter, int children) : - task{}, - counter_{counter}, - children_{children} {} -}; - -class force_steal_sub_task : public task { - std::atomic *parent_counter_; - std::atomic *overall_counter_; - - protected: - void execute_internal() override { - (*overall_counter_)--; - if (overall_counter_->load() > 0) { - std::atomic counter{1}; - spawn_child(&counter, overall_counter_); - while (counter.load() > 0); // Spin... - } - - (*parent_counter_)--; - } - - public: - explicit force_steal_sub_task(std::atomic *parent_counter, std::atomic *overall_counter) : - task{}, - parent_counter_{parent_counter}, - overall_counter_{overall_counter} {} -}; - -TEST_CASE("tbb task are scheduled correctly", "[internal/scheduling/fork_join_task.h]") { - malloc_scheduler_memory my_scheduler_memory{8, 2 << 12}; - - SECTION("tasks are executed exactly once") { - scheduler my_scheduler{&my_scheduler_memory, 2}; - int start_counter = 4; - int total_tasks = 1 + 4 + 4 * 3 + 4 * 3 * 2 + 4 * 3 * 2 * 1; - std::atomic counter{0}; - - my_scheduler.perform_work([&]() { - scheduler::spawn_child(&counter, start_counter); - }); - - REQUIRE(counter.load() == total_tasks); - } - - SECTION("tasks can be stolen") { - scheduler my_scheduler{&my_scheduler_memory, 8}; - my_scheduler.perform_work([&]() { - std::atomic dummy_parent{1}, overall_counter{8}; - scheduler::spawn_child(&dummy_parent, &overall_counter); - - // Required, as child operates on our stack's memory!!! - scheduler::wait_for_all(); - }); - } -} +// TODO: Introduce actual tests once multiple threads work...