Commit fa4283eb by FritzFlorian

Add convenience API for linear pipelines.

parent ebf6fec7
......@@ -28,7 +28,8 @@ int main() {
auto minus_one = [](const int &i1, int &o1) {
o1 = i1 - 1;
function_node<inputs<int>, outputs<int>, decltype(minus_one)> minus_one_node{minus_one};
function_node<inputs<int>, outputs<int>, decltype(minus_one)> minus_one_node_1{minus_one};
function_node<inputs<int>, outputs<int>, decltype(minus_one)> minus_one_node_2{minus_one};
auto is_positive = [](const int &i1, bool &o1) {
o1 = i1 > 0;
......@@ -49,11 +50,12 @@ int main() {
merge_node<int> recursion_merge;
// Connect
minus_one_node_1 >> minus_one_node_2;
// Inputs to first processing step
graph.input<0>() >> minus_one_node.in_port<0>();
graph.input<0>() >> minus_one_node_1.in_port<0>();
graph.input<1>() >> triple_node.in_port<0>();
minus_one_node.out_port<0>() >> minus_split.value_in_port();
minus_one_node_2.out_port<0>() >> minus_split.value_in_port();
// Prepare decision...
minus_split.out_port_1() >> is_positive_node.in_port<0>();
......@@ -22,6 +22,10 @@ namespace internal {
using namespace pls::internal::helpers;
// Forward Decl
template<typename INS, typename OUTS>
class graph;
template<typename INS, typename OUTS, typename F>
class function_node {};
......@@ -60,11 +64,26 @@ class function_node<inputs<I0, I...>, outputs<O0, O...>, F> : public node {
return in_port_.template get<POS>();
multi_in_port_type &in_ports() {
return in_port_;
template<int POS>
out_port_at<POS> &out_port() {
return out_port_.template get<POS>();
multi_out_port_type &out_ports() {
return out_port_;
template<typename ...OS, typename FUNC>
function_node<inputs<O0, O...>, outputs<OS...>, FUNC>
&operator>>(function_node<inputs<O0, O...>, outputs<OS...>, FUNC> &other_node);
template<typename ...IS>
void operator>>(graph<inputs<IS...>, outputs<O0, O...>> &graph);
template<int POS, typename T>
void token_pushed(token<T> token);
......@@ -2,11 +2,29 @@
#include "graph.h"
namespace pls {
namespace dataflow {
namespace internal {
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
template<typename ...OS, typename FUNC>
function_node<inputs<O0, O...>, outputs<OS...>, FUNC>&
function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
operator>>(function_node<inputs<O0, O...>, outputs<OS...>, FUNC> &other_node) {
out_port_ >> other_node.in_ports();
return other_node;
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
template<typename ...IS>
void function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
operator>>(graph<inputs<IS...>, outputs<O0, O...>> &graph) {
out_port_ >> graph.output_ports();
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
template<int POS, typename T>
void function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
token_pushed(token<T> token) {
......@@ -25,6 +25,9 @@ class graph {};
template<typename I0, typename ...I, typename O0, typename ...O>
class graph<inputs<I0, I...>, outputs<O0, O...>> : public node {
template<typename INS, typename OUTS, typename FUNC>
class function_node;
// Our own type
using self_type = graph<inputs<I0, I...>, outputs<O0, O...>>;
......@@ -58,11 +61,24 @@ class graph<inputs<I0, I...>, outputs<O0, O...>> : public node {
input_at<POS> &input() {
return inputs_.template get<POS>();
inputs_type& input_ports() {
return inputs_;
template<int POS>
output_at<POS> &output() {
return outputs_.template get<POS>();
outputs_type& output_ports() {
return outputs_;
template<typename ...OS, typename FUNC>
function_node<inputs<I0, I...>, outputs<OS...>, FUNC>
&operator>>(function_node<inputs<I0, I...>, outputs<OS...>, FUNC> &other_node);
template<int POS, typename T>
void token_pushed(token<T> token);
......@@ -7,6 +7,14 @@ namespace dataflow {
namespace internal {
template<typename I0, typename ...I, typename O0, typename ...O>
template<typename ...OS, typename FUNC>
function_node<inputs<I0, I...>, outputs<OS...>, FUNC> &graph<inputs<I0, I...>, outputs<O0, O...>>::
operator>>(function_node<inputs<I0, I...>, outputs<OS...>, FUNC> &other_node) {
inputs_ >> other_node.in_ports();
return other_node;
template<typename I0, typename ...I, typename O0, typename ...O>
template<int POS, typename T>
void graph<inputs<I0, I...>, outputs<O0, O...>>::
token_pushed(token<T> token) {
......@@ -69,6 +69,12 @@ class multi_out_port {
return std::get<POS>(outputs_);
// Simple interface to connect multiple intputs to matching, multiple outputs
template<typename CB>
void operator>>(multi_in_port<CB, 0, O0, O...> &input) {
connect_to < CB, 0 > {this, &input}.connect();
node *next_node_at(int pos) const {
return next_node < 0 > {&outputs_}.get(pos);
......@@ -115,8 +121,26 @@ class multi_out_port {
return true;
template<typename CB, int POS, typename DUMMY=void>
struct connect_to {
multi_out_port<O0, O...> *out_port_;
multi_in_port<CB, 0, O0, O...> *in_port_;
void connect() {
out_port_->template get<POS>() >> in_port_->template get<POS>();
connect_to<CB, POS + 1>{out_port_, in_port_}.connect();
template<typename CB, typename DUMMY>
struct connect_to<CB, num_outputs, DUMMY> {
multi_out_port<O0, O...> *out_port_;
multi_in_port<CB, 0, O0, O...> *in_port_;
void connect() {
......@@ -25,7 +25,7 @@ class work_stealing_deque_item {
// as the race occurs in 'pop_head', where ALL CASES reading a corrupt/old value are cases
// where the next CAS fails anywas, thus making these corrupted values have no influence on
// the overall program execution.
// ==> If we find performance problems in this queue, try removing the atoimcs again.
// ==> If we find performance problems in this queue, try removing the atomics again.
// Pointer to the actual data
std::atomic<pointer_t> data_;
// Index (relative to stack base) to the next and previous element
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment