Commit 7e388ea8 by FritzFlorian

Add merge and split nodes.

This makes the programming model a full dataflow implementation, as it allows for branching and recursion.
parent 6d262a62
Pipeline #1286 passed with stages
in 3 minutes 55 seconds
......@@ -9,7 +9,9 @@
#include <pls/dataflow/internal/outputs.h>
#include <pls/dataflow/internal/function_node.h>
#include <pls/dataflow/internal/graph.h>
#include <pls/dataflow/internal/out_port.h>
#include <pls/dataflow/internal/switch_node.h>
#include <pls/dataflow/internal/split_node.h>
#include <pls/dataflow/internal/merge_node.h>
int main() {
using namespace pls::dataflow;
......@@ -28,26 +30,48 @@ int main() {
};
function_node<inputs<int>, outputs<int>, decltype(minus_one)> minus_one_node{minus_one};
auto is_positive = [](const int &i1, bool &o1) {
o1 = i1 > 0;
};
function_node<inputs<int>, outputs<bool>, decltype(is_positive)> is_positive_node{is_positive};
auto recursion = [&](const int &i1, const int &i2, int &o1) {
if (i1 > 0) {
std::tuple<int> out;
graph.run({i1, i2}, out);
pls::scheduler::wait_for_all();
o1 = std::get<0>(out);
} else {
o1 = i2;
}
std::tuple<int> out;
graph.run({i1, i2}, out);
pls::scheduler::wait_for_all();
o1 = std::get<0>(out);
};
function_node<inputs<int, int>, outputs<int>, decltype(recursion)> recursion_node{recursion};
split_node<int> minus_split;
split_node<bool> decision_split;
switch_node<int> recursion_split;
merge_node<int> recursion_merge;
// Connect
graph.input<0>() >> minus_one_node.in_port<0>();
minus_one_node.out_port<0>() >> recursion_node.in_port<0>();
// Inputs to first processing step
graph.input<0>() >> minus_one_node.in_port<0>();
graph.input<1>() >> triple_node.in_port<0>();
triple_node.out_port<0>() >> recursion_node.in_port<1>();
minus_one_node.out_port<0>() >> minus_split.value_in_port();
// Prepare decision...
minus_split.out_port_1() >> is_positive_node.in_port<0>();
is_positive_node.out_port<0>() >> decision_split.value_in_port();
triple_node.out_port<0>() >> recursion_split.value_in_port();
decision_split.out_port_1() >> recursion_split.condition_in_port();
// Connect true case (recursion)
minus_split.out_port_2() >> recursion_node.in_port<0>();
recursion_split.true_out_port() >> recursion_node.in_port<1>();
recursion_node.out_port<0>() >> recursion_merge.true_in_port();
// Connect false case (no recursion)
recursion_split.false_out_port() >> recursion_merge.false_in_port();
recursion_node.out_port<0>() >> graph.output<0>();
// Deliver final result (back from merge)
decision_split.out_port_2() >> recursion_merge.condition_in_port();
recursion_merge.value_out_port() >> graph.output<0>();
// Build
graph.build();
......
......@@ -18,6 +18,11 @@ add_library(pls STATIC
include/pls/dataflow/internal/function_node.h
include/pls/dataflow/internal/node.h
include/pls/dataflow/internal/graph.h
include/pls/dataflow/internal/build_state.h
include/pls/dataflow/internal/function_node_impl.h
include/pls/dataflow/internal/graph_impl.h
include/pls/dataflow/internal/switch_node.h
include/pls/dataflow/internal/merge_node.h
include/pls/internal/base/spin_lock.h
include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp
......@@ -43,13 +48,15 @@ add_library(pls STATIC
include/pls/internal/helpers/mini_benchmark.h
include/pls/internal/helpers/unique_id.h
include/pls/internal/helpers/range.h
include/pls/internal/helpers/seqence.h
include/pls/internal/scheduling/thread_state.h
include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
include/pls/internal/scheduling/scheduler_impl.h
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
include/pls/internal/scheduling/lambda_task.h include/pls/internal/helpers/seqence.h include/pls/dataflow/internal/build_state.h include/pls/dataflow/internal/function_node_impl.h include/pls/dataflow/internal/graph_impl.h)
include/pls/internal/scheduling/lambda_task.h
include/pls/dataflow/internal/split_node.h)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
PUBLIC
......
......@@ -68,13 +68,12 @@ class multi_in_port {
};
template<typename CB, int N, typename I0, typename ...I>
class multi_in_port<CB, N, I0, I...> : public in_port<I0> {
private:
public:
// Helpers for managing recursive types
using my_type = multi_in_port<CB, N, I0, I...>;
using child_type = multi_in_port<CB, N + 1, I...>;
using value_type = I0;
public:
explicit multi_in_port(node *owning_node, CB *cb) : in_port<I0>{owning_node}, cb_{cb}, rec_{owning_node, cb} {};
void token_pushed(token<I0> token) override {
......@@ -103,13 +102,13 @@ class multi_in_port<CB, N, I0, I...> : public in_port<I0> {
using value_type_at = typename type_at<POS, I0, I...>::type;
// Helper struct required for recursive access to input's by index
template<int POS, class RES_T, class MY_T>
template<int POS, typename RES_T, typename MY_T>
struct get_at {
static RES_T &get(MY_T &self) {
return get_at<POS - 1, RES_T, typename MY_T::child_type>::get(self.rec_);
}
};
template<class RESULT, class CURRENT>
template<typename RESULT, typename CURRENT>
struct get_at<0, RESULT, CURRENT> {
static RESULT &get(CURRENT &self) {
return self;
......@@ -126,7 +125,6 @@ class multi_in_port<CB, N, I0, I...> : public in_port<I0> {
return this->is_connected() && rec_.fully_connected();
}
private:
child_type rec_;
CB *cb_;
};
......
#ifndef PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_
#define PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_
#include <atomic>
#include "in_port.h"
#include "out_port.h"
#include "node.h"
#include "token.h"
namespace pls {
namespace dataflow {
namespace internal {
template<typename I>
class merge_node : public node {
// Our own type
using self_type = merge_node<I>;
// Input-Output port types
using multi_in_port_type = multi_in_port<self_type, 0, I, I, bool>;
using multi_out_port_type = multi_out_port<I>;
// Input-Output tuples
using input_tuple = std::tuple<token < I>, token<I>, token<bool>>;
// Memory used for ONE active invocation of the dataflow-graph
struct invocation_memory {
std::atomic<unsigned int> inputs_missing_;
input_tuple input_buffer_;
};
// Encodings for current input state (needs to fit into an atomic)
static constexpr unsigned int IN_0_MISSING = 1u << 0u;
static constexpr unsigned int IN_1_MISSING = 1u << 1u;
static constexpr unsigned int COND_MISSING = 1u << 2u;
static constexpr unsigned int INITIAL_STATE = IN_0_MISSING + IN_1_MISSING + COND_MISSING;
public:
explicit merge_node() : in_port_{this, this}, out_port_{} {}
in_port<I> &true_in_port() {
return in_port_.template get<0>();
}
in_port<I> &false_in_port() {
return in_port_.template get<1>();
}
in_port<bool> &condition_in_port() {
return in_port_.template get<2>();
}
out_port<I> &value_out_port() {
return out_port_.template get<0>();
}
template<int POS, typename T>
void token_pushed(token<T> token) {
auto current_memory = get_invocation<invocation_memory>(token.invocation());
std::get<POS>(current_memory->input_buffer_) = token;
unsigned int remaining_inputs;
if (POS == 0) {
remaining_inputs = (current_memory->inputs_missing_).fetch_sub(IN_0_MISSING) - IN_0_MISSING;
} else if (POS == 1) {
remaining_inputs = (current_memory->inputs_missing_).fetch_sub(IN_1_MISSING) - IN_1_MISSING;
} else {
remaining_inputs = (current_memory->inputs_missing_).fetch_sub(COND_MISSING) - COND_MISSING;
}
if ((remaining_inputs & COND_MISSING) == 0) {
if ((remaining_inputs & IN_0_MISSING) == 0) {
auto &data = std::get<0>(current_memory->input_buffer_);
value_out_port().push_token(data);
current_memory->inputs_missing_ += IN_0_MISSING + COND_MISSING;
} else if ((remaining_inputs & IN_1_MISSING) == 0) {
auto &data = std::get<1>(current_memory->input_buffer_);
value_out_port().push_token(data);
current_memory->inputs_missing_ += IN_1_MISSING + COND_MISSING;
}
}
}
int num_successors() const override {
return 1;
}
node *successor_at(int pos) const override {
return out_port_.next_node_at(pos);
}
bool is_fully_connected() const override {
return in_port_.fully_connected() && out_port_.fully_connected();
}
int instance_buffer_size() const override {
return sizeof(invocation_memory);
}
void init_instance_buffer(void *memory) const override {
auto invocation = new(memory) invocation_memory{};
invocation->inputs_missing_ = INITIAL_STATE;
};
private:
multi_in_port_type in_port_;
multi_out_port_type out_port_;
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_
#ifndef PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_
#define PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_
#include <atomic>
#include "in_port.h"
#include "out_port.h"
#include "node.h"
#include "token.h"
namespace pls {
namespace dataflow {
namespace internal {
template<typename I>
class split_node : public node {
// Our own type
using self_type = split_node<I>;
// Input-Output port types
using multi_in_port_type = multi_in_port<self_type, 0, I>;
using multi_out_port_type = multi_out_port<I, I>;
public:
explicit split_node() : in_port_{this, this}, out_port_{} {}
in_port<I> &value_in_port() {
return in_port_.template get<0>();
}
out_port<I> &out_port_1() {
return out_port_.template get<0>();
}
out_port<I> &out_port_2() {
return out_port_.template get<1>();
}
template<int POS, typename T>
void token_pushed(token<T> token) {
out_port_1().push_token(token);
out_port_2().push_token(token);
}
int num_successors() const override {
return 2;
}
node *successor_at(int pos) const override {
return out_port_.next_node_at(pos);
}
bool is_fully_connected() const override {
return in_port_.fully_connected() && out_port_.fully_connected();
}
int instance_buffer_size() const override {
return 0;
}
void init_instance_buffer(void *memory) const override {
// No need for memory, we simply forward
};
private:
multi_in_port_type in_port_;
multi_out_port_type out_port_;
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_
#ifndef PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_
#define PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_
#include <atomic>
#include "in_port.h"
#include "out_port.h"
#include "node.h"
#include "token.h"
namespace pls {
namespace dataflow {
namespace internal {
template<typename I>
class switch_node : public node {
// Our own type
using self_type = switch_node<I>;
// Input-Output port types
using multi_in_port_type = multi_in_port<self_type, 0, I, bool>;
using multi_out_port_type = multi_out_port<I, I>;
// Input-Output tuples
using input_tuple = std::tuple<token < I>, token<bool>>;
// Memory used for ONE active invocation of the dataflow-graph
struct invocation_memory {
std::atomic<int> inputs_missing_;
input_tuple input_buffer_;
};
public:
explicit switch_node() : in_port_{this, this}, out_port_{} {}
in_port<I> &value_in_port() {
return in_port_.template get<0>();
}
in_port<bool> &condition_in_port() {
return in_port_.template get<1>();
}
out_port<I> &true_out_port() {
return out_port_.template get<0>();
}
out_port<I> &false_out_port() {
return out_port_.template get<1>();
}
template<int POS, typename T>
void token_pushed(token<T> token) {
auto current_memory = get_invocation<invocation_memory>(token.invocation());
std::get<POS>(current_memory->input_buffer_) = token;
auto remaining_inputs = --(current_memory->inputs_missing_);
if (remaining_inputs == 0) {
bool condition = std::get<1>(current_memory->input_buffer_).value();
auto &data = std::get<0>(current_memory->input_buffer_);
if (condition) {
true_out_port().push_token(data);
} else {
false_out_port().push_token(data);
}
current_memory->inputs_missing_ = 2;
}
}
int num_successors() const override {
return 2;
}
node *successor_at(int pos) const override {
return out_port_.next_node_at(pos);
}
bool is_fully_connected() const override {
return in_port_.fully_connected() && out_port_.fully_connected();
}
int instance_buffer_size() const override {
return sizeof(invocation_memory);
}
void init_instance_buffer(void *memory) const override {
auto invocation = new(memory) invocation_memory{};
invocation->inputs_missing_ = 2;
};
private:
multi_in_port_type in_port_;
multi_out_port_type out_port_;
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment