Commit c2d4bc25 by FritzFlorian

WIP: Fast path with 'outlined' slow path code in place.

Everything so far is untested. We only made sure that the fast path still seems to function correctly. Next up is writing tests for both the fast and the slow path, then introducing the slow path. After that we can look at performance optimizations.
parent 842b518f
Pipeline #1336 failed with stages in 31 seconds
@@ -53,7 +53,7 @@ int main() {
scheduling::scheduler scheduler{static_scheduler_memory, NUM_THREADS};
auto start = std::chrono::steady_clock::now();
std::cout << "fib = " << fib_normal(41) << std::endl;
std::cout << "fib = " << fib_normal(39) << std::endl;
auto end = std::chrono::steady_clock::now();
std::cout << "Normal: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
<< std::endl;
@@ -62,11 +62,12 @@ int main() {
scheduler.perform_work([]() {
return scheduling::scheduler::par([]() {
return fib(41);
return fib(39);
}, []() {
return scheduling::parallel_result<int>{0};
}).then([](int a, int b) {
std::cout << "fib = " << a << std::endl;
std::cout << "fib = " << a + b << std::endl;
return a + b;
});
});
......
# List all required files here (cmake best practice to NOT automate this step!)
add_library(pls STATIC
# include/pls/pls.h src/pls.cpp
# include/pls/pls.h src/pls.cpp
#
# include/pls/algorithms/invoke.h
# include/pls/algorithms/invoke_impl.h
@@ -39,7 +39,7 @@ add_library(pls STATIC
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h
include/pls/internal/data_structures/stamped_integer.h
include/pls/internal/data_structures/delayed_initialization_wrapper.h
include/pls/internal/data_structures/delayed_initialization.h
include/pls/internal/helpers/prohibit_new.h
include/pls/internal/helpers/profiler.h
@@ -49,6 +49,7 @@ add_library(pls STATIC
include/pls/internal/helpers/seqence.h
include/pls/internal/helpers/member_function.h
include/pls/internal/scheduling/parallel_result.h
include/pls/internal/scheduling/thread_state.h
include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
include/pls/internal/scheduling/scheduler_impl.h
@@ -56,7 +57,8 @@ add_library(pls STATIC
include/pls/internal/scheduling/task_manager.h
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/cont_manager.h
include/pls/internal/scheduling/continuation.h include/pls/internal/scheduling/parallel_result.h src/internal/base/alignment.cpp)
include/pls/internal/scheduling/continuation.h
include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h src/internal/base/alignment.cpp src/internal/base/alignment.h)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
......
@@ -13,12 +13,16 @@ namespace internal {
namespace base {
namespace alignment {
system_details::pointer_t next_alignment(system_details::pointer_t size);
system_details::pointer_t previous_alignment(system_details::pointer_t size);
char *next_alignment(char *pointer);
system_details::pointer_t next_alignment(system_details::pointer_t size,
size_t alignment = system_details::CACHE_LINE_SIZE);
system_details::pointer_t previous_alignment(system_details::pointer_t size,
size_t alignment = system_details::CACHE_LINE_SIZE);
char *next_alignment(char *pointer, size_t alignment = system_details::CACHE_LINE_SIZE);
/**
* Forces alignment requirements on a type equal to a cache line size.
* Forces alignment requirements on a type equal to the given size.
* This can be useful to store the elements aligned in an array or to allocate them using new.
*
* The object is constructed using perfect forwarding. Thus initialization looks identical to the
@@ -28,21 +32,36 @@ char *next_alignment(char *pointer);
* (This is required, as C++11 does not allow 'over_aligned' types, meaning alignments higher
* than the max_align_t (most times 16 bytes) wont be respected properly.)
*/
template<typename T>
struct alignas(system_details::CACHE_LINE_SIZE) cache_alignment_wrapper {
template<typename T, size_t ALIGNMENT>
struct alignment_wrapper {
static_assert(alignof(T) <= ALIGNMENT, "Must not wrap a type to an alignment smaller than required by the type!");
private:
std::array<char, sizeof(T) + system_details::CACHE_LINE_SIZE> memory_;
char *data_;
std::array<char, sizeof(T) + ALIGNMENT> memory_;
T *data_;
public:
template<typename ...ARGS>
explicit cache_alignment_wrapper(ARGS &&...args): memory_{}, data_{next_alignment(memory_.data())} {
explicit alignment_wrapper(ARGS &&...args)
: memory_{}, data_{reinterpret_cast<T *>(next_alignment(memory_.data(), ALIGNMENT))} {
new(data_) T(std::forward<ARGS>(args)...);
}
~alignment_wrapper() {
data_->~T();
}
T &object() { return *reinterpret_cast<T *>(data_); }
T *pointer() { return reinterpret_cast<T *>(data_); }
T &object() { return *data_; }
T *pointer() { return data_; }
};
/**
* Our common case for over-aligning types is when we want to hit cache lines.
* The most portable way is to use the above alignment wrapper, removing any over-alignment
* requirement from the wrapped type itself and manually filling
* up padding bytes as needed (here up to the cache line size).
*/
template<typename T>
using cache_alignment_wrapper = alignment_wrapper<T, system_details::CACHE_LINE_SIZE>;
}
}
}
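To illustrate, a minimal usage sketch (illustrative only; the 'counter' type and the example function are made up, the wrapper members are as declared above):
// Hypothetical usage sketch: give a plain struct its own cache line,
// even when it is allocated with ordinary C++11 'new'.
struct counter { int value; };
void alignment_wrapper_example() {
  using namespace pls::internal::base::alignment;
  auto *wrapped = new cache_alignment_wrapper<counter>{};  // plain new, no over-aligned allocation needed
  wrapped->object().value = 42;       // access the wrapped object by reference
  counter *raw = wrapped->pointer();  // raw pointer lands on a cache-line boundary
  (void) raw;
  delete wrapped;                     // ~alignment_wrapper() runs ~counter()
}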
......
@@ -68,8 +68,8 @@ class static_aligned_stack {
aligned_stack &get_stack() { return aligned_stack_; }
private:
aligned_stack aligned_stack_;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<char, SIZE> memory_;
aligned_stack aligned_stack_;
};
class heap_aligned_stack {
......
@@ -24,7 +24,7 @@ void aligned_stack::pop() {
current_offset_ -= num_cache_lines;
auto result = *reinterpret_cast<T *>(memory_at_offset(current_offset_));
~result();
result.~T();
}
template<size_t SIZE>
......
#ifndef PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_WS_DEQUE_H_
#define PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_WS_DEQUE_H_
#include <cstdio>
#include <array>
#include <atomic>
#include "pls/internal/base/system_details.h"
#include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/data_structures/optional.h"
namespace pls {
namespace internal {
namespace data_structures {
/**
* Classic, textbook bounded work-stealing deque based on arrays.
* Stores a fixed amount of fixed-size objects in an array,
* allowing for local push/pop on the bottom and remote
* pop on the top.
*
* The local operations are cheap as long as head and tail are
* far enough apart, making it well suited to avoid cache problems.
*
* Depends on over-aligned data types to be cache-line friendly.
* This is no concern on C++17 and upwards, but prevents you from properly
* allocating it on the heap in C++11 (see base::alignment::alignment_wrapper for a workaround).
*/
// TODO: Relax memory orderings in here...
template<typename T>
class bounded_ws_deque {
public:
bounded_ws_deque(T *item_array, size_t size) : size_{size}, item_array_{item_array} {}
void push_bottom(T item) {
item_array_[local_bottom_] = item;
local_bottom_++;
bottom_.store(local_bottom_);
}
bool is_empty() {
return top_.load().value >= bottom_.load();
}
optional<T> pop_top() {
stamped_integer old_top = top_.load();
unsigned int new_stamp = old_top.stamp + 1;
unsigned int new_value = old_top.value + 1;
if (bottom_.load() <= old_top.value) {
return optional<T>();
}
optional<T> result(item_array_[old_top.value]);
if (top_.compare_exchange_strong(old_top, {new_stamp, new_value})) {
return result;
}
return optional<T>();
}
optional<T> pop_bottom() {
if (local_bottom_ == 0) {
return optional<T>();
}
local_bottom_--;
bottom_.store(local_bottom_);
optional<T> result(item_array_[local_bottom_]);
stamped_integer old_top = top_.load();
if (local_bottom_ > old_top.value) {
// Enough distance to just return the value
return result;
}
if (local_bottom_ == old_top.value) {
local_bottom_ = 0;
bottom_.store(local_bottom_);
if (top_.compare_exchange_strong(old_top, {old_top.stamp + 1, 0})) {
// We won the competition and the queue is empty
return result;
}
}
// The queue is empty and we lost the competition
top_.store({old_top.stamp + 1, 0});
return optional<T>();
}
private:
alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<stamped_integer> top_{stamped_integer{0, 0}};
alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<unsigned int> bottom_{0};
unsigned int local_bottom_{0};
size_t size_;
T *item_array_;
};
template<typename T, size_t SIZE>
class static_bounded_ws_deque {
public:
static_bounded_ws_deque() : items_{}, deque_{items_.data(), SIZE} {}
bounded_ws_deque<T> &get_deque() { return deque_; }
private:
std::array<T, SIZE> items_;
bounded_ws_deque<T> deque_;
};
}
}
}
#endif //PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_WS_DEQUE_H_
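A single-threaded sketch of the intended owner/thief protocol (illustrative only; in real use the owner calls push_bottom/pop_bottom while remote threads race on pop_top):
// Illustrative, single-threaded walk through the deque protocol.
void bounded_ws_deque_example() {
  using namespace pls::internal::data_structures;
  static_bounded_ws_deque<int, 8> storage;
  bounded_ws_deque<int> &deque = storage.get_deque();
  deque.push_bottom(1);  // owner publishes the oldest item...
  deque.push_bottom(2);  // ...and the newest item
  optional<int> stolen = deque.pop_top();    // a thief takes from the top: 1
  optional<int> local = deque.pop_bottom();  // the owner pops from the bottom: 2
  if (stolen && local) {
    // both pops succeeded; further pops now return empty optionals
  }
}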
#ifndef PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_WRAPPER_H_
#define PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_WRAPPER_H_
#ifndef PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_H_
#define PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_H_
#include <array>
#include <utility>
@@ -16,13 +16,13 @@ namespace data_structures {
* The member must be initialized before usage using the provided
* perfect forwarding constructor method.
*
* Makes sure to call the wrapped objects de-constructor when an object is wrapped.
* Takes care of destructing the contained object if one is active.
*/
template<typename T>
class delayed_initialization_wrapper {
class delayed_initialization {
public:
delayed_initialization_wrapper() : memory_{}, initialized_{false} {}
delayed_initialization_wrapper(delayed_initialization_wrapper &&other) noexcept {
delayed_initialization() : memory_{}, initialized_{false} {}
delayed_initialization(delayed_initialization &&other) noexcept {
initialized_ = other.initialized_;
if (other.initialized_) {
new ((void *) memory_.data()) T(std::move(other.object()));
@@ -31,11 +31,11 @@ class delayed_initialization_wrapper {
}
template<typename ...ARGS>
explicit delayed_initialization_wrapper(ARGS &&...args): memory_{}, initialized_{true} {
explicit delayed_initialization(ARGS &&...args): memory_{}, initialized_{true} {
new(memory_.data()) T(std::forward<ARGS>(args)...);
}
~delayed_initialization_wrapper() {
~delayed_initialization() {
if (initialized_) {
object().~T();
}
@@ -45,7 +45,7 @@ class delayed_initialization_wrapper {
void initialize(ARGS &&...args) {
PLS_ASSERT(!initialized_, "Can only initialize delayed wrapper object once!")
new(memory_.data()) T(std::forward<ARGS>(args)...);
new((void *) memory_.data()) T(std::forward<ARGS>(args)...);
initialized_ = true;
}
@@ -61,6 +61,10 @@ class delayed_initialization_wrapper {
return *reinterpret_cast<T *>(memory_.data());
}
T &operator*() {
return object();
}
bool initialized() const { return initialized_; }
private:
@@ -72,4 +76,4 @@ class delayed_initialization_wrapper {
}
}
#endif // PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_WRAPPER_H_
#endif // PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_H_
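A minimal sketch of the intended life cycle (assuming the interface above; the fast path never touches the slot):
// Illustrative life cycle of a delayed_initialization slot.
#include <string>
void delayed_initialization_example() {
  pls::internal::data_structures::delayed_initialization<std::string> slot;
  // Fast path: 'slot' stays untouched and costs no constructor/destructor call.
  if (!slot.initialized()) {
    slot.initialize("slow path only");  // placement-new into the internal buffer
  }
  std::string &ref = slot.object();     // access is only valid after initialize()
  (void) ref;
}  // ~delayed_initialization runs ~std::string(), as initialized_ is set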
#ifndef PLS_INTERNAL_DATA_STRUCTURES_OPTIONAL_H_
#define PLS_INTERNAL_DATA_STRUCTURES_OPTIONAL_H_
#include <utility>
#include "pls/internal/data_structures/delayed_initialization.h"
namespace pls {
namespace internal {
namespace data_structures {
template<typename T>
class optional {
public:
optional() = default;
template<typename ...ARGS>
explicit optional(ARGS &&...args): data_{std::forward<ARGS>(args)...} {}
operator bool() const {
return data_.initialized();
}
T &operator*() {
return *data_;
}
private:
delayed_initialization<T> data_;
};
}
}
}
#endif //PLS_INTERNAL_DATA_STRUCTURES_OPTIONAL_H_
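Usage mirrors a stripped-down std::optional (a sketch, assuming the members above):
// Illustrative: optional is a thin veneer over delayed_initialization.
void optional_example() {
  pls::internal::data_structures::optional<int> empty;
  pls::internal::data_structures::optional<int> filled{42};
  if (filled) {           // operator bool reports whether a value is active
    int value = *filled;  // operator* hands out the contained value
    (void) value;
  }
  // 'empty' converts to false; dereferencing it would be undefined behaviour.
}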
@@ -20,7 +20,8 @@ class cont_manager {
struct template_args {};
template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
explicit cont_manager(data_structures::aligned_stack &cont_storage, template_args<NUM_CONTS, MAX_CONT_SIZE>) {
explicit cont_manager(data_structures::aligned_stack &cont_storage, template_args<NUM_CONTS, MAX_CONT_SIZE>)
: max_cont_size_{MAX_CONT_SIZE} {
// First node is currently active and our local start
start_node_ = active_node_ = init_cont_node<MAX_CONT_SIZE>(cont_storage, nullptr, nullptr);
@@ -29,37 +30,166 @@ class cont_manager {
continuation_node *current_node = start_node_;
for (size_t i = 1; i < NUM_CONTS; i++) {
continuation_node *next_node = init_cont_node<MAX_CONT_SIZE>(cont_storage, start_node_, current_node);
current_node->set_next(next_node);
current_node->next_ = next_node;
current_node = next_node;
}
};
// Fast-path methods
continuation_node *fast_path_get_next() {
active_node_ = active_node_->get_next();
return active_node_;
auto result = active_node_;
active_node_ = active_node_->next_;
active_node_->cont_chain_start_ = start_node_;
return result;
}
void fast_path_return() {
active_node_ = active_node_->get_prev();
active_node_ = active_node_->prev_;
}
// Slow-path methods
void slow_path_return() {
active_node_ = active_node_->prev_;
active_node_->destroy_continuation();
}
bool falling_through() const {
return fall_through_;
}
void fall_through() {
fall_through_ = true;
fall_through_node_ = nullptr;
}
void fall_through_and_execute(continuation_node *continuation_node) {
fall_through_ = true;
fall_through_node_ = continuation_node;
}
void aquire_clean_cont_chain(continuation_node *clean_chain) {
// Link active node backwards and other chain forwards
active_node_->prev_ = clean_chain;
active_node_->cont_chain_start_ = clean_chain;
clean_chain->next_ = active_node_;
// Update our local chain start AND reset the active node to the start (we operate on a clean chain)
start_node_ = clean_chain->cont_chain_start_;
active_node_ = clean_chain;
}
void aquire_blocked_cont_chain(continuation_node *blocked_chain) {
// Link active node backwards and other chain forwards
active_node_->prev_ = blocked_chain;
active_node_->cont_chain_start_ = blocked_chain;
blocked_chain->next_ = active_node_;
// Update our local chain start, NOT our active node (we continue execution on top of the blocking chain)
start_node_ = blocked_chain->cont_chain_start_;
}
void execute_fall_through_continuation() {
PLS_ASSERT(fall_through_node_ != nullptr,
"Must not execute continuation node without associated continuation code.");
// Reset our state back from falling through
continuation_node *executed_node = fall_through_node_;
fall_through_ = false;
fall_through_node_ = nullptr;
// Grab everyone involved in the transaction
base_continuation *executed_continuation = executed_node->continuation_;
base_continuation *parent_continuation = executed_continuation->parent_;
continuation_node *parent_node = parent_continuation->node_;
// Execute the continuation itself
executed_continuation->execute();
// Slow path return destroys it and resets our current cont_node
slow_path_return();
if (falling_through()) {
return;
}
if (executed_node->is_end_of_cont_chain()) {
// We are at the end of our local part of the cont chain and therefore
// not invested in the continuation_chain above.
// Notify the next continuation of finishing a child...
if (parent_node->results_missing_.fetch_add(-1) == 1) {
// ... we finished the last continuation, thus we need to take responsibility for the above cont chain.
aquire_blocked_cont_chain(parent_node->cont_chain_start_);
fall_through_and_execute(parent_node);
return;
} else {
// ... we did not finish the last continuation, thus we are not concerned with this computational
// path anymore. Just ignore it and fall through to mind our own business.
fall_through();
return;
}
} else {
// We are invested in the continuation_chain above (it's part of our local continuation node chain).
// We keep executing it if possible and need to 'clean our state' with a swapped chain if not.
// ...we could be blocked (and then need a clean cont chain).
// For the rare case that the stealing process is still going on, we
// need to fight with it over directly executing the task that is currently being stolen.
continuation_node::stamped_state task_state{continuation_node::state::execute_local};
task_state = parent_node->state_.exchange(task_state, std::memory_order_acq_rel);
if (task_state.value == continuation_node::state::stolen) {
// The other task strain is executing.
// Notify the parent continuation of finishing our child...
if (parent_node->results_missing_.fetch_add(-1) == 1) {
// ... we finished the last continuation, thus we keep the responsibility for the above cont chain.
fall_through_and_execute(parent_node);
return;
} else {
// ... we did not finish the last continuation, thus we need to give up our local cont chain in
// favour of the stolen one (leaving us with a clean chain for further operation).
aquire_clean_cont_chain(parent_node->cont_chain_start_);
fall_through();
return;
}
} else {
// We 'overruled' the stealing thread and execute the other task ourselves.
// We can be sure that only we were involved in executing the child tasks of the parent_continuation...
parent_continuation->node_->results_missing_.fetch_add(-1);
parent_continuation->execute_task();
if (falling_through()) {
// ... if the second strain was interrupted we fall through without scheduling the parent_continuation
// (the currently pending/interrupted strain will do this itself).
return;
}
// ...if we could execute the second branch without interruption,
// we can schedule the parent safely for execution.
fall_through_and_execute(parent_continuation->node_);
}
}
}
size_t get_max_cont_size() const { return max_cont_size_; }
private:
template<size_t MAX_CONT_SIZE>
static continuation_node *init_cont_node(data_structures::aligned_stack &cont_storage,
continuation_node *cont_chain_start,
continuation_node *prev) {
// Represents one cont node and its corresponding memory buffer (as one continuous block of memory).
// Represents one cont_node and its corresponding memory buffer (as one continuous block of memory).
constexpr size_t buffer_size = MAX_CONT_SIZE - sizeof(continuation_node);
using cont_node_memory_pair = std::pair<continuation_node, std::array<char, buffer_size>>;
char *pair_memory = cont_storage.push_bytes<cont_node_memory_pair>();
char *cont_node_address = pair_memory;
char *cont_node_memory_address = pair_memory + sizeof(continuation_node);
char *cont_node_address = cont_storage.push_bytes<continuation_node>();
char *cont_node_memory_address = cont_storage.push_bytes(buffer_size);
return new(cont_node_address) continuation_node(cont_node_memory_address, cont_chain_start, prev);
}
private:
const size_t max_cont_size_;
/**
* Managing the continuation chain.
*/
continuation_node *start_node_;
continuation_node *active_node_;
/**
* Managing falling through back to the scheduler.
*/
bool fall_through_{false};
continuation_node *fall_through_node_{nullptr};
};
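The decisive step in execute_fall_through_continuation above is the results_missing_.fetch_add(-1) == 1 test: both strains of a fork decrement the counter, and exactly one of them observes the transition to zero and must take responsibility for running the continuation. A stripped-down sketch of that join protocol (illustrative, hypothetical types, not the scheduler's own):
// Minimal sketch of the two-strain join counter used above (hypothetical types).
#include <atomic>
struct join_point {
  std::atomic<unsigned short> results_missing{2};
  // Each finishing strain calls arrive() once; exactly one caller sees the
  // counter drop from 1 to 0 and must then execute the continuation.
  bool arrive() {
    return results_missing.fetch_add(-1) == 1;
  }
};
void strain_finished(join_point &join) {
  if (join.arrive()) {
    // We were last: run the continuation (fall_through_and_execute above).
  } else {
    // The other strain still runs (or already took over): just return.
  }
}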
template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
......
@@ -6,106 +6,197 @@
#include <atomic>
#include <utility>
#include "pls/internal/data_structures/delayed_initialization_wrapper.h"
#include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/data_structures/delayed_initialization.h"
#include "pls/internal/base/alignment.h"
#include "parallel_result.h"
namespace pls {
namespace internal {
namespace scheduling {
class continuation_node;
class base_continuation {
friend class cont_manager;
protected:
// We plan to only init the members for a continuation on the slow path.
// If we can execute everything inline we simply skip it, saving runtime overhead.
template<typename T>
using delayed_init = data_structures::delayed_initialization<T>;
public:
virtual void run() = 0;
virtual ~base_continuation() = 0;
explicit base_continuation(base_continuation *parent, continuation_node *node, unsigned int result_index = 0)
: parent_{parent},
node_{node},
result_index_{result_index} {}
virtual void execute() = 0;
virtual void execute_task() = 0;
virtual ~base_continuation() = default;
virtual void *get_result_pointer(unsigned short index) = 0;
template<typename T>
void store_result(unsigned short index, T &&result) {
using BASE_T = typename std::remove_cv<typename std::remove_reference<T>::type>::type;
reinterpret_cast<delayed_init<BASE_T> *>(get_result_pointer(index))->initialize(std::forward<T>(result));
}
base_continuation *get_parent() { return parent_; }
continuation_node *get_cont_node() { return node_; }
protected:
base_continuation *parent_;
continuation_node *node_;
unsigned int result_index_;
};
class continuation_node {
friend class cont_manager;
public:
continuation_node(char *continuation_memory, continuation_node *cont_chain_start, continuation_node *prev)
: continuation_memory_{continuation_memory},
continuation_{nullptr},
cont_chain_start_{cont_chain_start},
continuation_node(char *memory, continuation_node *cont_chain_start, continuation_node *prev)
: cont_chain_start_{cont_chain_start},
prev_{prev},
next_{nullptr},
offered_chain_{nullptr} {}
memory_{memory} {}
void set_cont_chain_start(continuation_node *cont_chain_start) { cont_chain_start_ = cont_chain_start; }
continuation_node *get_cont_chain_start() { return cont_chain_start_; }
void set_next(continuation_node *next) { next_ = next; }
continuation_node *get_next() { return next_; }
void set_prev(continuation_node *prev) { prev_ = prev; }
continuation_node *get_prev() { return prev_; }
// Management of the associated continuation
template<typename T, typename ...ARGS>
T *init_continuation(ARGS &&...args) {
PLS_ASSERT(continuation_ == nullptr, "Must only allocate one continuation at a time per node.")
base_continuation *continuation() { return continuation_; }
auto *result = new(memory_) T(std::forward<ARGS>(args)...);
continuation_ = result;
return result;
}
void destroy_continuation() {
// Deconstruct Continuation
continuation_->~base_continuation();
continuation_ = nullptr;
// Reset Associated counters
results_missing_.store(2);
offered_chain_.store(nullptr);
auto old_state = state_.load();
state_.store({old_state.stamp + 1, initialized});
}
template<typename T>
void destroy_continuation_fast() {
(*reinterpret_cast<T *>(continuation_)).~T();
continuation_ = nullptr;
}
base_continuation *get_continuation() {
return continuation_;
}
continuation_node *get_prev() {
return prev_;
}
private:
// Pointer to memory region reserved for the companion continuation.
// Must be a buffer big enough to hold any continuation encountered in the program.
char *continuation_memory_;
base_continuation *continuation_;
bool is_end_of_cont_chain() {
return prev_ == continuation_->get_parent()->get_cont_node();
}
private:
// Linked list property of continuations (continuation chains as memory management).
// Each continuation knows its chain start to allow stealing a whole chain in O(1)
// without the need to traverse back to the chain start.
continuation_node *cont_chain_start_;
continuation_node *prev_, *next_;
continuation_node *prev_, *next_{nullptr};
// When blocked on this continuation, we need to know what other chain we
// got offered by the stealing thread.
// For this we need only the head of the other chain (as each continuation is a
// self describing entity for its chain up to the given node).
continuation_node *offered_chain_;
std::atomic<continuation_node *> offered_chain_{nullptr};
// Management for coordinating concurrent result writing and stealing.
// The result count decides atomically who gets to execute the continuation.
std::atomic<unsigned short> results_missing_{2};
// The flag is needed for an ongoing stealing request.
// Stealing threads need to offer their continuation chain before they
// 'fully' own the stolen task. As long as that is not done, the continuation
// chain can abort the steal request so that it is not left blocked without a
// new, clean continuation chain to work with.
enum state { initialized, execute_local, stealing, stolen };
using stamped_state = data_structures::stamped_integer;
std::atomic<stamped_state> state_{{initialized}};
// Pointer to memory region reserved for the companion continuation.
// Must be a buffer big enough to hold any continuation encountered in the program.
// This memory is managed explicitly by the continuation manager and runtime system
// (they need to make sure to always call destructors and never to allocate two continuations per node at once).
char *memory_;
base_continuation *continuation_{nullptr};
};
template<typename R1, typename R2, typename F>
template<typename T2, typename R1, typename R2, typename F>
class continuation : public base_continuation {
public:
void run() override {
// TODO: integrate this better into the runtime system.
// E.g. handle passing the result to the parent continuation
function_.object()(result_1_.object(), result_2_.object());
private:
template<typename RES_TYPE>
struct result_runner {
// Strip off unwanted modifiers...
using BASE_RES_TYPE = typename std::remove_cv<typename std::remove_reference<RES_TYPE>::type>::type;
static void execute(continuation &cont) {
parallel_result<BASE_RES_TYPE> result{cont.function_((*cont.result_1_).value(), (*cont.result_2_).value())};
if (result.fast_path()) {
cont.parent_->store_result(cont.result_index_, std::move(result));
}
}
};
template<typename INNER_TYPE>
struct result_runner<parallel_result<INNER_TYPE>> {
static void execute(continuation &cont) {
auto result = cont.function_((*cont.result_1_).value(), (*cont.result_2_).value());
if (result.fast_path()) {
cont.parent_->store_result(cont.result_index_, std::move(result));
}
}
};
// Warning: the destructor is only called once the continuation is actually needed.
// This should be a non-issue, as all memory in the continuation is uninitialized
// until the first use anyway, to save runtime.
public:
template<typename FARG, typename ...T2ARGS>
explicit continuation(base_continuation *parent,
continuation_node *node,
unsigned int result_index,
FARG &&function,
T2ARGS...task_2_args):
base_continuation(parent, node, result_index),
function_{std::forward<FARG>(function)},
task_{std::forward<T2ARGS>(task_2_args)...} {}
~continuation() override = default;
template<typename R1ARG>
void store_result_1_(R1ARG &&result_1) {
static_assert(std::is_same<R1, typename std::decay<R1ARG>::type>::value,
"must only copy/move objects in, not construct them");
result_1_.initialize(std::forward<R1ARG>(result_1));
void execute() override {
using result_type = decltype(function_((*result_1_).value(), (*result_2_).value()));
result_runner<result_type>::execute(*this);
}
template<typename R2ARG>
void store_result_2(R2ARG &&result_1) {
static_assert(std::is_same<R2, typename std::decay<R2ARG>::type>::value,
"must only copy/move objects in, not construct them");
result_2_.initialize(std::forward<R2ARG>(result_1));
void execute_task() override {
task_.execute();
}
template<typename FARG>
void store_function(FARG &&function) {
static_assert(std::is_same<F, typename std::decay<FARG>::type>::value,
"must only copy/move objects in, not construct them");
function_.initialize(function);
void *get_result_pointer(unsigned short index) override {
switch (index) {
case 0:return &result_1_;
case 1:return &result_2_;
default: PLS_ERROR("Unexpected Result Index!")
}
}
private:
// We plan to only init the members for a continuation on the slow path.
// If we can execute everything inline we simply skip it saving runtime overhead.
template<typename T>
using delayed_init = data_structures::delayed_initialization_wrapper<T>;
T2 *get_task() {
return &task_;
}
// Also uninitialized at first, only take the atomic write on the slow path.
// The stealer will init it to 2 while stealing, the 'stolen' sync will then make sure
// everyone sees the value in correct order.
std::atomic<unsigned short> results_missing_{};
private:
// Initial data members. These slow down the fast path, try to init them lazy when possible.
F function_;
T2 task_;
// All fields/actual values stay uninitialized (save time on the fast path if we do not need them).
// Some fields/actual values stay uninitialized (save time on the fast path if we do not need them).
// More fields untouched on the fast path is good, but for ease of implementation we only keep some for now.
delayed_init<R1> result_1_;
delayed_init<R2> result_2_;
delayed_init<F> function_;
};
}
......
#ifndef PLS_INTERNAL_SCHEDULING_PARALLEL_RESULT_H_
#define PLS_INTERNAL_SCHEDULING_PARALLEL_RESULT_H_
#include <utility>
#include "pls/internal/data_structures/delayed_initialization.h"
namespace pls {
namespace internal {
namespace scheduling {
template<typename T>
class parallel_result {
public:
using value_type = T;
parallel_result() = default;
parallel_result(parallel_result &&other) noexcept : val_{std::move(other.val_)} {}
parallel_result(const parallel_result &other) noexcept : val_{other.val_} {}
parallel_result(T val) : val_{std::move(val)} {}
bool fast_path() { return val_.initialized(); }
T &value() { return val_.object(); }
private:
data_structures::delayed_initialization<T> val_;
};
}
}
}
#endif //PLS_INTERNAL_SCHEDULING_PARALLEL_RESULT_H_
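A short sketch of the two states fast_path() distinguishes (illustrative only; the example function is made up):
// Illustrative: a parallel_result either already carries a value (fast path)
// or is default constructed and filled in later on the slow path.
void parallel_result_example() {
  using pls::internal::scheduling::parallel_result;
  parallel_result<int> done{42};  // value present, fast_path() == true
  parallel_result<int> pending;   // no value yet, fast_path() == false
  if (done.fast_path()) {
    int value = done.value();     // only valid on the fast path
    (void) value;
  }
  (void) pending;
}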
@@ -22,31 +22,69 @@ struct scheduler::starter {
explicit starter(F1ARG &&function_1, F2ARG &&function_2) : function_1_{std::forward<F1ARG>(function_1)},
function_2_{std::forward<F2ARG>(function_2)} {};
template<typename F>
auto then(F &&cont_function)
template<typename FCONT>
auto then(FCONT &&cont_function)
-> decltype(cont_function(std::declval<typename return_type_1::value_type>(),
std::declval<typename return_type_2::value_type>())) {
using continuation_type = continuation<return_type_2, return_type_2, F>;
using continuation_type = continuation<task<F2>, return_type_1, return_type_2, FCONT>;
using result_type = decltype(cont_function(std::declval<typename return_type_1::value_type>(),
std::declval<typename return_type_2::value_type>()));
auto &my_state = thread_state::get();
auto &cont_manager = my_state.get_cont_manager();
PLS_ASSERT(sizeof(continuation_type) <= cont_manager.get_max_cont_size(),
"Must stay within the size limit of the static memory configuration.");
// Select current continuation.
// For now directly copy both the continuation function and the second task.
auto *current_cont_node = cont_manager.fast_path_get_next();
// TODO: Fix null pointers at very first spawn...
// In the fast path case we are always on the left side of the tree
// and our prev cont chain node always also holds our parent continuation.
base_continuation *parent_cont =
current_cont_node->get_prev() == nullptr ? nullptr : current_cont_node->get_prev()->get_continuation();
unsigned short int result_index = 0;
auto *current_cont = current_cont_node->init_continuation<continuation_type>(parent_cont,
current_cont_node,
result_index,
cont_function,
function_2_,
current_cont_node);
// Publish the second task.
my_state.get_task_manager().publish_task(current_cont->get_task());
// Call first function on fast path
return_type_1 result_1 = function_1_();
if (cont_manager.falling_through()) {
return result_type{}; // Unwind stack...
}
// Select current continuation
auto *current_cont = my_state.get_cont_manager().fast_path_get_next();
// Try to call second function on fast path
if (my_state.get_task_manager().steal_local_task()) {
return_type_2 result_2 = function_2_();
if (cont_manager.falling_through()) {
return result_type{}; // Unwind stack...
}
// Move function_2 directly onto the task stack (should in theory be constructed on there).
// Consider only copying it later on, as this could allow a more efficient fast path with inlining.
task<F2, return_type_1, return_type_2, F> task_2{function_2_, current_cont};
my_state.get_task_manager().publish_task(task_2);
// We fully got all results, inline as much as possible.
// This is the common case, branch prediction should be rather good here.
// Call first function
auto result_1 = function_1_();
// Try to pop from stack
if (my_state.get_task_manager().steal_local_task()) {
// Just return the cont object unused and directly call the function.
current_cont_node->destroy_continuation_fast<continuation_type>();
my_state.get_cont_manager().fast_path_return();
auto result_2 = function_2_();
return cont_function(result_1.value(), result_2.value());
} else {
PLS_ERROR("Slow Path Not Implemented!!!")
auto cont_result = cont_function(result_1.value(), result_2.value());
if (cont_manager.falling_through()) {
return result_type{}; // Unwind stack...
}
return cont_result;
}
cont_manager.fall_through();
return result_type{};
};
};
......
@@ -24,25 +24,24 @@ class base_task {
virtual void execute_internal() = 0;
public:
/**
* Executes the task and stores its result in the correct continuation.
* The caller must handle finishing the continuation/informing it that task two was finished.
*/
void execute() {
// TODO: Figure out slow path execution
execute_internal();
}
};
template<typename F, typename R1, typename R2, typename CF>
template<typename F>
class task : public base_task {
using continutation_type = continuation<R1, R2, CF>;
public:
template<typename FARG>
explicit task(FARG &&function, continuation_node *continuation_node)
: base_task{}, function_{std::forward<FARG>(function)}, continuation_node_{continuation_node} {}
void execute_internal() override {
auto *continuation = dynamic_cast<continutation_type *>(continuation_node_->continuation());
continuation->store_result_2(function_());
// TODO: Properly notify continuation on slow path
continuation_node_->get_continuation()->store_result<decltype(function_())>(1, function_());
}
private:
......
@@ -8,6 +8,7 @@
#include <atomic>
#include "pls/internal/scheduling/task.h"
#include "pls/internal/data_structures/bounded_ws_deque.h"
#include "pls/internal/data_structures/stamped_integer.h"
namespace pls {
@@ -16,53 +17,25 @@ namespace scheduling {
struct task_handle {
public:
enum state { uninitialized, initialized, execute_local, stealing, execute_remote, finished };
using stamped_state = data_structures::stamped_integer;
std::atomic<stamped_state> stamped_state_{uninitialized};
task_handle() : task_{nullptr} {};
explicit task_handle(base_task *task) : task_{task} {};
base_task *task_;
};
/**
* Handles management of tasks in the system. Each thread has a local task manager,
* responsible for allocating, freeing and publishing tasks for stealing.
*
* The manager therefore acts as the deque found in work stealing, as well as the memory
* management for the tasks (both are deeply intertwined in our implementation,
* as the memory management is integrated into the stealing procedure).
*/
class task_manager {
public:
// Publishes a task on the stack, i.e. makes it visible for other threads to steal.
// The task itself is located on the stack of the worker, as the stealer will copy it away before it is freed.
void publish_task(base_task &task) {
task_handle_stack_[tail_internal_].task_ = &task;
task_handle_stack_[tail_internal_].stamped_state_.store({stamp_internal_++, task_handle::initialized},
std::memory_order_relaxed);
tail_internal_++;
tail_.store(tail_internal_, std::memory_order_release); // Linearization point, handle is published here
void publish_task(base_task *task) {
task_deque_.push_bottom(task_handle{task});
}
// Try to pop a local task from this task manager's stack.
// This should only be required on the fast path of the implementation,
// thus it only reports whether the operation was a success.
// Essentially this is an 'un-publish' of a task, with an indication of whether it was successful.
bool steal_local_task() {
tail_internal_--;
tail_.store(tail_internal_, std::memory_order_relaxed);
task_handle::stamped_state swapped_state{task_handle::execute_local, stamp_internal_++};
task_handle_stack_[tail_internal_].stamped_state_.exchange(swapped_state, std::memory_order_acq_rel);
if (swapped_state.value == task_handle::execute_remote ||
swapped_state.value == task_handle::finished) {
// Someone got the other task, return to 'non linear' execution path
// TODO: Properly handle slow path
return false;
} else {
// No one got the task so far, we are happy and continue our fast path
return true;
}
return task_deque_.pop_bottom();
}
// Try to steal a task from a remote task_manager instance. The stolen task must be stored locally.
@@ -70,27 +43,20 @@ class task_manager {
// TODO: Re-implement after fast path is done
// std::pair<task, bool> steal_remote_task(task_manager &other);
explicit task_manager(task_handle *task_handle_stack) : task_handle_stack_{task_handle_stack},
head_{{0}},
tail_{0},
tail_internal_{0},
stamp_internal_{0} {}
explicit task_manager(data_structures::bounded_ws_deque<task_handle> &task_deque) : task_deque_{task_deque} {}
private:
task_handle *task_handle_stack_;
alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<data_structures::stamped_integer> head_;
alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<unsigned int> tail_;
alignas(base::system_details::CACHE_LINE_SIZE) unsigned int tail_internal_, stamp_internal_;
data_structures::bounded_ws_deque<task_handle> &task_deque_;
};
template<size_t NUM_TASKS, size_t MAX_STACK_SIZE>
class static_task_manager {
public:
static_task_manager() : static_task_handle_stack_{}, task_manager_{static_task_handle_stack_.data()} {};
static_task_manager() : task_deque_{}, task_manager_{task_deque_.get_deque()} {};
task_manager &get_task_manager() { return task_manager_; }
private:
std::array<task_handle, NUM_TASKS> static_task_handle_stack_;
data_structures::static_bounded_ws_deque<task_handle, NUM_TASKS> task_deque_;
task_manager task_manager_;
};
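A sketch of the fast-path round trip through the deque-backed manager (illustrative; 'spawned' stands in for any concrete base_task, and a real run has remote workers racing on the top of the deque):
// Illustrative fast-path round trip: publish, then try to 'un-publish'.
void task_manager_example(pls::internal::scheduling::task_manager &manager,
                          pls::internal::scheduling::base_task *spawned) {
  manager.publish_task(spawned);     // make the task visible to thieves
  if (manager.steal_local_task()) {
    // Fast path: nobody stole the task, execute it inline on this worker.
  } else {
    // Slow path: a remote worker took it, synchronize via the continuation.
  }
}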
......
#include "pls/internal/base/alignment.h"
namespace pls {
namespace internal {
namespace base {
namespace alignment {
system_details::pointer_t next_alignment(system_details::pointer_t size,
size_t alignment) {
return (size % alignment) == 0 ?
size :
size + (alignment - (size % alignment));
}
system_details::pointer_t previous_alignment(system_details::pointer_t size,
size_t alignment) {
return (size % alignment) == 0 ?
size :
size - (size % alignment);
}
char *next_alignment(char *pointer, size_t alignment) {
return reinterpret_cast<char *>(next_alignment(reinterpret_cast<system_details::pointer_t >(pointer), alignment));
}
}
}
}
}
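Worked examples of the rounding arithmetic above, assuming a 64-byte cache line:
// next_alignment rounds up to the next multiple of 'alignment',
// previous_alignment rounds down; aligned inputs pass through unchanged.
//   next_alignment(100, 64)     == 128  (100 % 64 == 36, so 100 + (64 - 36))
//   next_alignment(128, 64)     == 128
//   previous_alignment(100, 64) == 64   (100 - 36)
//   previous_alignment(128, 64) == 128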
@@ -2,7 +2,5 @@ add_executable(tests
main.cpp
data_structures_test.cpp
base_tests.cpp
scheduling_tests.cpp
algorithm_test.cpp
dataflow_test.cpp)
scheduling_tests.cpp)
target_link_libraries(tests catch2 pls)
#include <catch.hpp>
#include <array>
#include "pls/pls.h"
using namespace pls;
TEST_CASE("for_each functions correctly", "[algorithms/for_each.h]") {
malloc_scheduler_memory my_scheduler_memory{8, 2 << 12};
scheduler my_scheduler{&my_scheduler_memory, 8};
my_scheduler.perform_work([]() {
constexpr int SIZE = 1000;
std::array<int, SIZE> result_array{};
result_array.fill(0);
SECTION("integer ranges are processed exactly once") {
pls::for_each_range(0, SIZE, [&result_array](int i) {
result_array[i]++;
});
bool all_equal = true;
for (int i = 0; i < SIZE; i++) {
all_equal &= result_array[i] == 1;
}
REQUIRE (all_equal);
}
SECTION("iterators are processed exactly once") {
std::array<int, SIZE> iterator_array{};
for (int i = 0; i < SIZE; i++) {
iterator_array[i] = i;
}
pls::for_each(iterator_array.begin(), iterator_array.end(), [&result_array](int i) {
result_array[i]++;
});
bool all_equal = true;
for (int i = 0; i < SIZE; i++) {
all_equal &= result_array[i] == 1;
}
REQUIRE (all_equal);
}
});
}
TEST_CASE("scan functions correctly", "[algorithms/scan.h]") {
malloc_scheduler_memory my_scheduler_memory{8, 2 << 12};
scheduler my_scheduler{&my_scheduler_memory, 8};
my_scheduler.perform_work([]() {
constexpr int SIZE = 10000;
std::array<int, SIZE> input_array{}, result_array{};
input_array.fill(1);
pls::scan(input_array.begin(), input_array.end(), result_array.begin(), std::plus<int>(), 0);
bool all_correct = true;
for (int i = 0; i < SIZE; i++) {
all_correct &= result_array[i] == (i + 1);
}
REQUIRE (all_correct);
});
}
long fib(long n) {
if (n <= 2) {
return 1;
}
long a, b;
pls::invoke(
[&a, n]() { a = fib(n - 1); },
[&b, n]() { b = fib(n - 2); }
);
return a + b;
}
TEST_CASE("invoke functions correctly", "[algorithms/invoke.h]") {
constexpr long fib_30 = 832040;
malloc_scheduler_memory my_scheduler_memory{8, 2u << 14};
scheduler my_scheduler{&my_scheduler_memory, 8};
my_scheduler.perform_work([=]() {
REQUIRE(fib(30) == fib_30);
});
}
#include <catch.hpp>
#include <mutex>
#include "pls/internal/base/system_details.h"
#include "pls/internal/data_structures/aligned_stack.h"
#include <mutex>
using namespace pls::internal::scheduling::data_structures;
using namespace pls::internal::data_structures;
using namespace pls::internal::base;
using namespace std;
// Forward Declaration
void test_stack(aligned_stack &stack);
TEST_CASE("aligned stack stores objects correctly", "[internal/data_structures/aligned_stack.h]") {
constexpr long data_size = 1024;
SECTION("plain aligned stack") {
char data[data_size];
aligned_stack stack{data, data_size};
aligned_stack stack{data, data_size, data_size};
test_stack(stack);
}
SECTION("static aligned stack") {
static_aligned_stack<data_size> stack;
test_stack(stack.get_stack());
}
SECTION("heap aligned stack") {
heap_aligned_stack stack{data_size};
test_stack(stack.get_stack());
}
}
void test_stack(aligned_stack &stack) {
SECTION("stack correctly pushes sub linesize objects") {
std::array<char, 5> small_data_one{'a', 'b', 'c', 'd', 'e'};
std::array<char, 64> small_data_two{};
@@ -43,10 +63,11 @@ TEST_CASE("aligned stack stores objects correctly", "[internal/data_structures/a
SECTION("stack correctly stores and retrieves objects") {
std::array<char, 5> data_one{'a', 'b', 'c', 'd', 'e'};
stack.push<decltype(data_one)>(data_one);
auto retrieved_data = stack.pop<std::array<char, 5>>();
auto *push_one = stack.push<decltype(data_one)>(data_one);
stack.pop<std::array<char, 5>>();
auto *push_two = stack.push<decltype(data_one)>(data_one);
REQUIRE(retrieved_data == std::array<char, 5>{'a', 'b', 'c', 'd', 'e'});
REQUIRE(push_one == push_two);
}
SECTION("stack can push and pop multiple times with correct alignment") {
@@ -73,214 +94,4 @@ TEST_CASE("aligned stack stores objects correctly", "[internal/data_structures/a
}
}
TEST_CASE("work_stealing_deque functions correctly", "[internal/data_structures/work_stealing_deque.h]") {
SECTION("add and remove items form the tail") {
constexpr long data_size = 2 << 14;
char data[data_size];
aligned_stack stack{data, data_size};
work_stealing_deque<int> deque{&stack};
int one = 1, two = 2, three = 3, four = 4;
SECTION("add and remove items form the tail") {
deque.push_task<int>(one);
deque.publish_last_task();
deque.push_task<int>(two);
deque.publish_last_task();
deque.push_task<int>(three);
deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == three);
REQUIRE(*deque.pop_local_task() == two);
REQUIRE(*deque.pop_local_task() == one);
}
SECTION("handles getting empty by popping the tail correctly") {
deque.push_task<int>(one);
deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == one);
deque.push_task<int>(two);
deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == two);
}
SECTION("remove items form the head") {
deque.push_task<int>(one);
deque.publish_last_task();
deque.push_task<int>(two);
deque.publish_last_task();
deque.push_task<int>(three);
deque.publish_last_task();
REQUIRE(*deque.pop_external_task() == one);
REQUIRE(*deque.pop_external_task() == two);
REQUIRE(*deque.pop_external_task() == three);
}
SECTION("handles getting empty by popping the head correctly") {
deque.push_task<int>(one);
deque.publish_last_task();
REQUIRE(*deque.pop_external_task() == one);
deque.push_task<int>(two);
deque.publish_last_task();
REQUIRE(*deque.pop_external_task() == two);
}
SECTION("handles getting empty by popping the head and tail correctly") {
deque.push_task<int>(one);
deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == one);
deque.push_task<int>(two);
deque.publish_last_task();
REQUIRE(*deque.pop_external_task() == two);
deque.push_task<int>(three);
deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == three);
}
SECTION("handles jumps bigger 1 correctly") {
deque.push_task<int>(one);
deque.publish_last_task();
deque.push_task<int>(two);
deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == two);
deque.push_task<int>(three);
deque.publish_last_task();
deque.push_task<int>(four);
deque.publish_last_task();
REQUIRE(*deque.pop_external_task() == one);
REQUIRE(*deque.pop_external_task() == three);
REQUIRE(*deque.pop_external_task() == four);
}
SECTION("handles stack reset 1 correctly when emptied by tail") {
deque.push_task<int>(one);
deque.publish_last_task();
auto state = deque.save_offset();
deque.push_task<int>(two);
deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == two);
deque.reset_offset(state);
REQUIRE(*deque.pop_local_task() == one);
deque.push_task<int>(three);
deque.publish_last_task();
deque.push_task<int>(four);
deque.publish_last_task();
REQUIRE(*deque.pop_external_task() == three);
REQUIRE(*deque.pop_local_task() == four);
}
}
}
TEST_CASE("locking_deque functions correctly", "[internal/data_structures/locking_deque.h]") {
SECTION("add and remove items form the tail") {
constexpr long data_size = 2 << 14;
char data[data_size];
aligned_stack stack{data, data_size};
locking_deque<int> deque{&stack};
int one = 1, two = 2, three = 3, four = 4;
SECTION("add and remove items form the tail") {
deque.push_task<int>(one);
deque.publish_last_task();
deque.push_task<int>(two);
deque.publish_last_task();
deque.push_task<int>(three);
deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == three);
REQUIRE(*deque.pop_local_task() == two);
REQUIRE(*deque.pop_local_task() == one);
}
SECTION("handles getting empty by popping the tail correctly") {
deque.push_task<int>(one);
deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == one);
deque.push_task<int>(two);
deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == two);
}
SECTION("remove items form the head") {
deque.push_task<int>(one);
deque.publish_last_task();
deque.push_task<int>(two);
deque.publish_last_task();
deque.push_task<int>(three);
deque.publish_last_task();
REQUIRE(*deque.pop_external_task() == one);
REQUIRE(*deque.pop_external_task() == two);
REQUIRE(*deque.pop_external_task() == three);
}
SECTION("handles getting empty by popping the head correctly") {
deque.push_task<int>(one);
deque.publish_last_task();
REQUIRE(*deque.pop_external_task() == one);
deque.push_task<int>(two);
deque.publish_last_task();
REQUIRE(*deque.pop_external_task() == two);
}
SECTION("handles getting empty by popping the head and tail correctly") {
deque.push_task<int>(one);
deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == one);
deque.push_task<int>(two);
deque.publish_last_task();
REQUIRE(*deque.pop_external_task() == two);
deque.push_task<int>(three);
deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == three);
}
SECTION("handles jumps bigger 1 correctly") {
deque.push_task<int>(one);
deque.publish_last_task();
deque.push_task<int>(two);
deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == two);
deque.push_task<int>(three);
deque.publish_last_task();
deque.push_task<int>(four);
deque.publish_last_task();
REQUIRE(*deque.pop_external_task() == one);
REQUIRE(*deque.pop_external_task() == three);
REQUIRE(*deque.pop_external_task() == four);
}
SECTION("handles stack reset 1 correctly when emptied by tail") {
deque.push_task<int>(one);
deque.publish_last_task();
auto state = deque.save_offset();
deque.push_task<int>(two);
deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == two);
deque.reset_offset(state);
REQUIRE(*deque.pop_local_task() == one);
deque.push_task<int>(three);
deque.publish_last_task();
deque.push_task<int>(four);
deque.publish_last_task();
REQUIRE(*deque.pop_external_task() == three);
REQUIRE(*deque.pop_local_task() == four);
}
}
}
#include <catch.hpp>
#include <array>
#include <tuple>
#include "pls/pls.h"
#include "pls/dataflow/dataflow.h"
using namespace pls;
using namespace pls::dataflow;
void step_1(const int &in, int &out) {
out = in * 2;
}
class member_call_test {
public:
void step_2(const int &in, int &out) {
out = in * 2;
}
};
TEST_CASE("dataflow functions correctly", "[dataflow/dataflow.h]") {
malloc_scheduler_memory my_scheduler_memory{8, 2u << 12u};
scheduler my_scheduler{&my_scheduler_memory, 8};
my_scheduler.perform_work([]() {
SECTION("linear pipelines") {
auto step_1 = [](const int &in, double &out1, double &out2) {
out1 = (double) in / 2.0;
out2 = (double) in / 3.0;
};
auto step_2 = [](const double &in1, const double &in2, double &out) {
out = in1 * in2;
};
graph<inputs<int>, outputs<double>> linear_graph;
function_node<inputs<int>, outputs<double, double>, decltype(step_1)> node_1{step_1};
function_node<inputs<double, double>, outputs<double>, decltype(step_2)> node_2{step_2};
linear_graph >> node_1 >> node_2 >> linear_graph;
linear_graph.build();
std::tuple<double> out{};
linear_graph.run(5, &out);
linear_graph.wait_for_all();
REQUIRE(std::get<0>(out) == (5 / 2.0) * (5 / 3.0));
}
SECTION("member and function steps") {
member_call_test instance;
using member_func_type = member_function<member_call_test, void, const int &, int &>;
member_func_type func_1{&instance, &member_call_test::step_2};
graph<inputs<int>, outputs<int>> graph;
function_node<inputs<int>, outputs<int>, void (*)(const int &, int &)> node_1{&step_1};
function_node<inputs<int>, outputs<int>, member_func_type> node_2{func_1};
graph >> node_1 >> node_2 >> graph;
graph.build();
std::tuple<int> out{};
graph.run(1, &out);
graph.wait_for_all();
REQUIRE(std::get<0>(out) == 4);
}
SECTION("non linear pipeline") {
auto path_one = [](const int &in, int &out) {
out = in + 1;
};
auto path_two = [](const int &in, int &out) {
out = in - 1;
};
graph<inputs<int, bool>, outputs<int>> graph;
function_node<inputs<int>, outputs<int>, decltype(path_one)> node_1{path_one};
function_node<inputs<int>, outputs<int>, decltype(path_two)> node_2{path_two};
switch_node<int> switch_node;
merge_node<int> merge_node;
split_node<bool> split;
// Split up boolean signal
graph.input<1>() >> split.value_in_port();
// Feed switch
graph.input<0>() >> switch_node.value_in_port();
split.out_port_1() >> switch_node.condition_in_port();
// True path
switch_node.true_out_port() >> node_1.in_port<0>();
node_1.out_port<0>() >> merge_node.true_in_port();
// False path
switch_node.false_out_port() >> node_2.in_port<0>();
node_2.out_port<0>() >> merge_node.false_in_port();
// Read Merge
split.out_port_2() >> merge_node.condition_in_port();
merge_node.value_out_port() >> graph.output<0>();
// Build and run
graph.build();
std::tuple<int> out1{}, out2{};
graph.run({0, true}, &out1);
graph.run({0, false}, &out2);
graph.wait_for_all();
REQUIRE(std::get<0>(out1) == 1);
REQUIRE(std::get<0>(out2) == -1);
}
});
}
#include <catch.hpp>
#include "pls/pls.h"
using namespace pls;
class once_sub_task : public task {
std::atomic<int> *counter_;
int children_;
protected:
void execute_internal() override {
(*counter_)++;
for (int i = 0; i < children_; i++) {
spawn_child<once_sub_task>(counter_, children_ - 1);
}
}
public:
explicit once_sub_task(std::atomic<int> *counter, int children) :
task{},
counter_{counter},
children_{children} {}
};
class force_steal_sub_task : public task {
std::atomic<int> *parent_counter_;
std::atomic<int> *overall_counter_;
protected:
void execute_internal() override {
(*overall_counter_)--;
if (overall_counter_->load() > 0) {
std::atomic<int> counter{1};
spawn_child<force_steal_sub_task>(&counter, overall_counter_);
while (counter.load() > 0); // Spin...
}
(*parent_counter_)--;
}
public:
explicit force_steal_sub_task(std::atomic<int> *parent_counter, std::atomic<int> *overall_counter) :
task{},
parent_counter_{parent_counter},
overall_counter_{overall_counter} {}
};
TEST_CASE("tbb task are scheduled correctly", "[internal/scheduling/fork_join_task.h]") {
malloc_scheduler_memory my_scheduler_memory{8, 2 << 12};
SECTION("tasks are executed exactly once") {
scheduler my_scheduler{&my_scheduler_memory, 2};
int start_counter = 4;
int total_tasks = 1 + 4 + 4 * 3 + 4 * 3 * 2 + 4 * 3 * 2 * 1;
std::atomic<int> counter{0};
my_scheduler.perform_work([&]() {
scheduler::spawn_child<once_sub_task>(&counter, start_counter);
});
REQUIRE(counter.load() == total_tasks);
}
SECTION("tasks can be stolen") {
scheduler my_scheduler{&my_scheduler_memory, 8};
my_scheduler.perform_work([&]() {
std::atomic<int> dummy_parent{1}, overall_counter{8};
scheduler::spawn_child<force_steal_sub_task>(&dummy_parent, &overall_counter);
// Required, as child operates on our stack's memory!!!
scheduler::wait_for_all();
});
}
}
// TODO: Introduce actual tests once multiple threads work...