From 7e388ea8af21c0374953f4de354654fd2acee6e4 Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Mon, 29 Jul 2019 15:35:09 +0200 Subject: [PATCH] Add merge and split nodes. This makes the programming model a full dataflow implementation, as it allows for branching and recursion. --- app/playground/main.cpp | 50 +++++++++++++++++++++++++++++++++++++------------- lib/pls/CMakeLists.txt | 9 ++++++++- lib/pls/include/pls/dataflow/internal/in_port.h | 8 +++----- lib/pls/include/pls/dataflow/internal/merge_node.h | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/dataflow/internal/split_node.h | 73 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/dataflow/internal/switch_node.h | 99 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 335 insertions(+), 19 deletions(-) create mode 100644 lib/pls/include/pls/dataflow/internal/merge_node.h create mode 100644 lib/pls/include/pls/dataflow/internal/split_node.h create mode 100644 lib/pls/include/pls/dataflow/internal/switch_node.h diff --git a/app/playground/main.cpp b/app/playground/main.cpp index e38032c..e5e6c7b 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -9,7 +9,9 @@ #include #include #include -#include +#include +#include +#include int main() { using namespace pls::dataflow; @@ -28,26 +30,48 @@ int main() { }; function_node, outputs, decltype(minus_one)> minus_one_node{minus_one}; + auto is_positive = [](const int &i1, bool &o1) { + o1 = i1 > 0; + }; + function_node, outputs, decltype(is_positive)> is_positive_node{is_positive}; + auto recursion = [&](const int &i1, const int &i2, int &o1) { - if (i1 > 0) { - std::tuple out; - graph.run({i1, i2}, out); - pls::scheduler::wait_for_all(); - o1 = std::get<0>(out); - } else { - o1 = i2; - } + std::tuple out; + graph.run({i1, i2}, out); + pls::scheduler::wait_for_all(); + o1 = std::get<0>(out); }; function_node, outputs, decltype(recursion)> recursion_node{recursion}; + split_node minus_split; + split_node decision_split; + switch_node recursion_split; + merge_node recursion_merge; + // Connect - graph.input<0>() >> minus_one_node.in_port<0>(); - minus_one_node.out_port<0>() >> recursion_node.in_port<0>(); + // Inputs to first processing step + graph.input<0>() >> minus_one_node.in_port<0>(); graph.input<1>() >> triple_node.in_port<0>(); - triple_node.out_port<0>() >> recursion_node.in_port<1>(); + minus_one_node.out_port<0>() >> minus_split.value_in_port(); + + // Prepare decision... + minus_split.out_port_1() >> is_positive_node.in_port<0>(); + is_positive_node.out_port<0>() >> decision_split.value_in_port(); + triple_node.out_port<0>() >> recursion_split.value_in_port(); + decision_split.out_port_1() >> recursion_split.condition_in_port(); + + // Connect true case (recursion) + minus_split.out_port_2() >> recursion_node.in_port<0>(); + recursion_split.true_out_port() >> recursion_node.in_port<1>(); + recursion_node.out_port<0>() >> recursion_merge.true_in_port(); + + // Connect false case (no recursion) + recursion_split.false_out_port() >> recursion_merge.false_in_port(); - recursion_node.out_port<0>() >> graph.output<0>(); + // Deliver final result (back from merge) + decision_split.out_port_2() >> recursion_merge.condition_in_port(); + recursion_merge.value_out_port() >> graph.output<0>(); // Build graph.build(); diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 8b0b789..9e314cb 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -18,6 +18,11 @@ add_library(pls STATIC include/pls/dataflow/internal/function_node.h include/pls/dataflow/internal/node.h include/pls/dataflow/internal/graph.h + include/pls/dataflow/internal/build_state.h + include/pls/dataflow/internal/function_node_impl.h + include/pls/dataflow/internal/graph_impl.h + include/pls/dataflow/internal/switch_node.h + include/pls/dataflow/internal/merge_node.h include/pls/internal/base/spin_lock.h include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp @@ -43,13 +48,15 @@ add_library(pls STATIC include/pls/internal/helpers/mini_benchmark.h include/pls/internal/helpers/unique_id.h include/pls/internal/helpers/range.h + include/pls/internal/helpers/seqence.h include/pls/internal/scheduling/thread_state.h include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp - include/pls/internal/scheduling/lambda_task.h include/pls/internal/helpers/seqence.h include/pls/dataflow/internal/build_state.h include/pls/dataflow/internal/function_node_impl.h include/pls/dataflow/internal/graph_impl.h) + include/pls/internal/scheduling/lambda_task.h + include/pls/dataflow/internal/split_node.h) # Add everything in `./include` to be in the include path of this project target_include_directories(pls PUBLIC diff --git a/lib/pls/include/pls/dataflow/internal/in_port.h b/lib/pls/include/pls/dataflow/internal/in_port.h index 629c1f7..8aaace0 100644 --- a/lib/pls/include/pls/dataflow/internal/in_port.h +++ b/lib/pls/include/pls/dataflow/internal/in_port.h @@ -68,13 +68,12 @@ class multi_in_port { }; template class multi_in_port : public in_port { - private: + public: // Helpers for managing recursive types using my_type = multi_in_port; using child_type = multi_in_port; using value_type = I0; - public: explicit multi_in_port(node *owning_node, CB *cb) : in_port{owning_node}, cb_{cb}, rec_{owning_node, cb} {}; void token_pushed(token token) override { @@ -103,13 +102,13 @@ class multi_in_port : public in_port { using value_type_at = typename type_at::type; // Helper struct required for recursive access to input's by index - template + template struct get_at { static RES_T &get(MY_T &self) { return get_at::get(self.rec_); } }; - template + template struct get_at<0, RESULT, CURRENT> { static RESULT &get(CURRENT &self) { return self; @@ -126,7 +125,6 @@ class multi_in_port : public in_port { return this->is_connected() && rec_.fully_connected(); } - private: child_type rec_; CB *cb_; }; diff --git a/lib/pls/include/pls/dataflow/internal/merge_node.h b/lib/pls/include/pls/dataflow/internal/merge_node.h new file mode 100644 index 0000000..2a7a6c5 --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/merge_node.h @@ -0,0 +1,115 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_ +#define PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_ + +#include + +#include "in_port.h" +#include "out_port.h" +#include "node.h" +#include "token.h" + +namespace pls { +namespace dataflow { +namespace internal { + +template +class merge_node : public node { + // Our own type + using self_type = merge_node; + + // Input-Output port types + using multi_in_port_type = multi_in_port; + using multi_out_port_type = multi_out_port; + + // Input-Output tuples + using input_tuple = std::tuple, token, token>; + + // Memory used for ONE active invocation of the dataflow-graph + struct invocation_memory { + std::atomic inputs_missing_; + input_tuple input_buffer_; + }; + + // Encodings for current input state (needs to fit into an atomic) + static constexpr unsigned int IN_0_MISSING = 1u << 0u; + static constexpr unsigned int IN_1_MISSING = 1u << 1u; + static constexpr unsigned int COND_MISSING = 1u << 2u; + static constexpr unsigned int INITIAL_STATE = IN_0_MISSING + IN_1_MISSING + COND_MISSING; + + public: + explicit merge_node() : in_port_{this, this}, out_port_{} {} + + in_port &true_in_port() { + return in_port_.template get<0>(); + } + + in_port &false_in_port() { + return in_port_.template get<1>(); + } + + in_port &condition_in_port() { + return in_port_.template get<2>(); + } + + out_port &value_out_port() { + return out_port_.template get<0>(); + } + + template + void token_pushed(token token) { + auto current_memory = get_invocation(token.invocation()); + + std::get(current_memory->input_buffer_) = token; + + unsigned int remaining_inputs; + if (POS == 0) { + remaining_inputs = (current_memory->inputs_missing_).fetch_sub(IN_0_MISSING) - IN_0_MISSING; + } else if (POS == 1) { + remaining_inputs = (current_memory->inputs_missing_).fetch_sub(IN_1_MISSING) - IN_1_MISSING; + } else { + remaining_inputs = (current_memory->inputs_missing_).fetch_sub(COND_MISSING) - COND_MISSING; + } + + if ((remaining_inputs & COND_MISSING) == 0) { + if ((remaining_inputs & IN_0_MISSING) == 0) { + auto &data = std::get<0>(current_memory->input_buffer_); + value_out_port().push_token(data); + current_memory->inputs_missing_ += IN_0_MISSING + COND_MISSING; + } else if ((remaining_inputs & IN_1_MISSING) == 0) { + auto &data = std::get<1>(current_memory->input_buffer_); + value_out_port().push_token(data); + current_memory->inputs_missing_ += IN_1_MISSING + COND_MISSING; + } + } + } + + int num_successors() const override { + return 1; + } + node *successor_at(int pos) const override { + return out_port_.next_node_at(pos); + } + + bool is_fully_connected() const override { + return in_port_.fully_connected() && out_port_.fully_connected(); + } + + int instance_buffer_size() const override { + return sizeof(invocation_memory); + } + void init_instance_buffer(void *memory) const override { + auto invocation = new(memory) invocation_memory{}; + invocation->inputs_missing_ = INITIAL_STATE; + }; + + private: + multi_in_port_type in_port_; + multi_out_port_type out_port_; +}; + +} +} +} + +#endif //PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_ diff --git a/lib/pls/include/pls/dataflow/internal/split_node.h b/lib/pls/include/pls/dataflow/internal/split_node.h new file mode 100644 index 0000000..09d106d --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/split_node.h @@ -0,0 +1,73 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_ +#define PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_ + +#include + +#include "in_port.h" +#include "out_port.h" +#include "node.h" +#include "token.h" + +namespace pls { +namespace dataflow { +namespace internal { + +template +class split_node : public node { + // Our own type + using self_type = split_node; + + // Input-Output port types + using multi_in_port_type = multi_in_port; + using multi_out_port_type = multi_out_port; + + public: + explicit split_node() : in_port_{this, this}, out_port_{} {} + + in_port &value_in_port() { + return in_port_.template get<0>(); + } + + out_port &out_port_1() { + return out_port_.template get<0>(); + } + + out_port &out_port_2() { + return out_port_.template get<1>(); + } + + template + void token_pushed(token token) { + out_port_1().push_token(token); + out_port_2().push_token(token); + } + + int num_successors() const override { + return 2; + } + node *successor_at(int pos) const override { + return out_port_.next_node_at(pos); + } + + bool is_fully_connected() const override { + return in_port_.fully_connected() && out_port_.fully_connected(); + } + + int instance_buffer_size() const override { + return 0; + } + void init_instance_buffer(void *memory) const override { + // No need for memory, we simply forward + }; + + private: + multi_in_port_type in_port_; + multi_out_port_type out_port_; +}; + +} +} +} + +#endif //PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_ diff --git a/lib/pls/include/pls/dataflow/internal/switch_node.h b/lib/pls/include/pls/dataflow/internal/switch_node.h new file mode 100644 index 0000000..598275b --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/switch_node.h @@ -0,0 +1,99 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_ +#define PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_ + +#include + +#include "in_port.h" +#include "out_port.h" +#include "node.h" +#include "token.h" + +namespace pls { +namespace dataflow { +namespace internal { + +template +class switch_node : public node { + // Our own type + using self_type = switch_node; + + // Input-Output port types + using multi_in_port_type = multi_in_port; + using multi_out_port_type = multi_out_port; + + // Input-Output tuples + using input_tuple = std::tuple, token>; + + // Memory used for ONE active invocation of the dataflow-graph + struct invocation_memory { + std::atomic inputs_missing_; + input_tuple input_buffer_; + }; + + public: + explicit switch_node() : in_port_{this, this}, out_port_{} {} + + in_port &value_in_port() { + return in_port_.template get<0>(); + } + + in_port &condition_in_port() { + return in_port_.template get<1>(); + } + + out_port &true_out_port() { + return out_port_.template get<0>(); + } + + out_port &false_out_port() { + return out_port_.template get<1>(); + } + + template + void token_pushed(token token) { + auto current_memory = get_invocation(token.invocation()); + + std::get(current_memory->input_buffer_) = token; + auto remaining_inputs = --(current_memory->inputs_missing_); + if (remaining_inputs == 0) { + bool condition = std::get<1>(current_memory->input_buffer_).value(); + auto &data = std::get<0>(current_memory->input_buffer_); + if (condition) { + true_out_port().push_token(data); + } else { + false_out_port().push_token(data); + } + current_memory->inputs_missing_ = 2; + } + } + + int num_successors() const override { + return 2; + } + node *successor_at(int pos) const override { + return out_port_.next_node_at(pos); + } + + bool is_fully_connected() const override { + return in_port_.fully_connected() && out_port_.fully_connected(); + } + + int instance_buffer_size() const override { + return sizeof(invocation_memory); + } + void init_instance_buffer(void *memory) const override { + auto invocation = new(memory) invocation_memory{}; + invocation->inputs_missing_ = 2; + }; + + private: + multi_in_port_type in_port_; + multi_out_port_type out_port_; +}; + +} +} +} + +#endif //PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_ -- libgit2 0.26.0