From f671c042dcd1d65d561f55ca2f044b72c4d67c3a Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Wed, 24 Jul 2019 11:03:11 +0200 Subject: [PATCH] Refactor to put all dataflow related items in one directory. --- app/playground/main.cpp | 42 ++++++++++++++++++++++-------------------- lib/pls/CMakeLists.txt | 4 ++-- lib/pls/include/pls/dataflow/dataflow.h | 4 ++-- lib/pls/include/pls/dataflow/inputs.h | 15 --------------- lib/pls/include/pls/dataflow/internal/function_node.h | 4 ++-- lib/pls/include/pls/dataflow/internal/graph.h | 127 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------- lib/pls/include/pls/dataflow/internal/inputs.h | 15 +++++++++++++++ lib/pls/include/pls/dataflow/internal/node.h | 2 +- lib/pls/include/pls/dataflow/internal/outputs.h | 15 +++++++++++++++ lib/pls/include/pls/dataflow/outputs.h | 15 --------------- 10 files changed, 172 insertions(+), 71 deletions(-) delete mode 100644 lib/pls/include/pls/dataflow/inputs.h create mode 100644 lib/pls/include/pls/dataflow/internal/inputs.h create mode 100644 lib/pls/include/pls/dataflow/internal/outputs.h delete mode 100644 lib/pls/include/pls/dataflow/outputs.h diff --git a/app/playground/main.cpp b/app/playground/main.cpp index 4aa4802..37f3eb6 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -4,42 +4,44 @@ #include #include -#include -#include +#include +#include #include +#include #include int main() { using namespace pls::dataflow; using namespace pls::dataflow::internal; - out_port external1; - out_port external2; + // Define + graph, outputs> graph; auto func1 = [](const int &i1, const int &i2, int &o1) { - std::cout << "Hello! " << i1 << ", " << i2 << std::endl; + std::cout << "Add up " << i1 << " and " << i2 << "..." << std::endl; o1 = i1 + i2; }; function_node, outputs, decltype(func1)> node1{func1}; + auto func2 = [](const int &i1, int &o1) { - std::cout << "We get! " << i1 << std::endl; + std::cout << "Print Result " << i1 << std::endl; + o1 = i1; }; function_node, outputs, decltype(func2)> node2{func2}; - external1 >> node1.in_port<0>(); - external2 >> node1.in_port<1>(); + // Connect + graph.input<0>() >> node1.in_port<0>(); + graph.input<1>() >> node1.in_port<1>(); + node1.out_port<0>() >> node2.in_port<0>(); - // Simulate execution environment - void *buffer1 = malloc(node1.instance_buffer_size()); - void *buffer2 = malloc(node2.instance_buffer_size()); - void *memory[] = {buffer1, buffer2}; - node1.init_instance_buffer(memory[0]); - node1.set_memory_index(0); - node2.init_instance_buffer(memory[1]); - node2.set_memory_index(1); - - invocation_info invocation{memory}; - external1.push_token({1, invocation}); - external2.push_token({2, invocation}); + node2.out_port<0>() >> graph.output<0>(); + + // Build + graph.build(); + + // Execute + graph.run({1, 2}); + graph.run({1, 1}); + graph.run({5, 6}); } diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 1f0f3b7..34e7e6f 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -10,8 +10,8 @@ add_library(pls STATIC include/pls/algorithms/scan_impl.h include/pls/dataflow/dataflow.h - include/pls/dataflow/inputs.h - include/pls/dataflow/outputs.h + include/pls/dataflow/internal/inputs.h + include/pls/dataflow/internal/outputs.h include/pls/dataflow/internal/token.h include/pls/dataflow/internal/in_port.h include/pls/dataflow/internal/out_port.h diff --git a/lib/pls/include/pls/dataflow/dataflow.h b/lib/pls/include/pls/dataflow/dataflow.h index bc9ce86..a02c54a 100644 --- a/lib/pls/include/pls/dataflow/dataflow.h +++ b/lib/pls/include/pls/dataflow/dataflow.h @@ -3,7 +3,7 @@ #define PLS_DATAFLOW_DATAFLOW_H_ #include "graph.h" -#include "inputs.h" -#include "outputs.h" +#include "pls/dataflow/internal/inputs.h" +#include "pls/dataflow/internal/outputs.h" #endif //PLS_DATAFLOW_DATAFLOW_H_ diff --git a/lib/pls/include/pls/dataflow/inputs.h b/lib/pls/include/pls/dataflow/inputs.h deleted file mode 100644 index d1bb05d..0000000 --- a/lib/pls/include/pls/dataflow/inputs.h +++ /dev/null @@ -1,15 +0,0 @@ - -#ifndef PLS_DATAFLOW_INPUTS_H_ -#define PLS_DATAFLOW_INPUTS_H_ - -namespace pls { -namespace dataflow { - -template -struct inputs { -}; - -} -} - -#endif //PLS_DATAFLOW_INPUTS_H_ diff --git a/lib/pls/include/pls/dataflow/internal/function_node.h b/lib/pls/include/pls/dataflow/internal/function_node.h index ea88b28..d630382 100644 --- a/lib/pls/include/pls/dataflow/internal/function_node.h +++ b/lib/pls/include/pls/dataflow/internal/function_node.h @@ -10,8 +10,8 @@ #include "out_port.h" #include "node.h" -#include "pls/dataflow/inputs.h" -#include "pls/dataflow/outputs.h" +#include "inputs.h" +#include "outputs.h" #include "pls/internal/helpers/seqence.h" diff --git a/lib/pls/include/pls/dataflow/internal/graph.h b/lib/pls/include/pls/dataflow/internal/graph.h index 0ae153a..84e2839 100644 --- a/lib/pls/include/pls/dataflow/internal/graph.h +++ b/lib/pls/include/pls/dataflow/internal/graph.h @@ -9,9 +9,8 @@ #include "node.h" #include "in_port.h" #include "out_port.h" - -#include "pls/dataflow/inputs.h" -#include "pls/dataflow/outputs.h" +#include "inputs.h" +#include "outputs.h" namespace pls { namespace dataflow { @@ -22,18 +21,38 @@ class graph {}; template class graph, pls::dataflow::outputs> : public node { - private: // Our own type using self_type = graph, pls::dataflow::outputs>; - // Input-Output port types - using multi_in_port_type = multi_in_port; + // Callbacks for input/output ports + struct in_port_cb { + self_type *self_; + template + void token_pushed(token token) { + self_->in_port_token_pushed(token); + } + }; + struct out_port_cb { + self_type *self_; + template + void token_pushed(token token) { + self_->out_port_token_pushed(token); + } + }; + + // Input-Output port types (external) + using multi_in_port_type = multi_in_port; using multi_out_port_type = multi_out_port; + // Input-Output port types (internal) + using internal_in_port_type = multi_out_port; + using internal_out_port_type = multi_in_port; + // Input-Output value tuples - using input_tuple = std::tuple, token...>; - using output_tuple = std::tuple, token...>; + using value_input_tuple = std::tuple; + using input_tuple = std::tuple, token...>; + using output_tuple = std::tuple, token...>; static constexpr int num_in_ports = std::tuple_size(); static constexpr int num_out_ports = std::tuple_size(); @@ -53,16 +72,16 @@ class graph, pls::dataflow::outputs> : in_port_at &in_port() { return in_port_.template get(); } - template out_port_at &out_port() { return out_port_.template get(); } template - void token_pushed(token token) { + void in_port_token_pushed(token token) { auto invocation = get_invocation(token); + // TODO: Invoke a new instance of our graph (recursive call) when all inputs are here std::cout << "Token pushed at " << POS << " with value " << token.value() << std::endl; auto remaining_inputs = --(invocation->inputs_missing_); if (remaining_inputs == 0) { @@ -70,7 +89,37 @@ class graph, pls::dataflow::outputs> : } } - graph() : in_port_{this, this}, out_port_{} { + template + using internal_in_port_at = typename internal_in_port_type::template out_port_type_at; + template + using internal_out_port_at = typename internal_out_port_type::template in_port_type_at; + + template + internal_in_port_at &input() { + return internal_in_port_.template get(); + } + template + internal_out_port_at &output() { + return internal_out_port_.template get(); + } + + template + void out_port_token_pushed(token token) { + auto invocation = get_invocation(token); + + std::cout << "Output produced at port " << POS << " with value " << token.value() << std::endl; + auto remaining_inputs = --(invocation->inputs_missing_); + if (remaining_inputs == 0) { + std::cout << "All inputs there!" << std::endl; + } + } + + graph() : in_port_cb_{this}, + out_port_cb_{this}, + in_port_{this, &in_port_cb_}, + out_port_{}, + internal_in_port_{}, + internal_out_port_{this, &out_port_cb_} { } ///////////////////////////////////////////////////////////////// @@ -78,11 +127,11 @@ class graph, pls::dataflow::outputs> : ///////////////////////////////////////////////////////////////// void build() { state_ = building; - PLS_ASSERT(is_fully_connected(), "Must fully connect dataflow graphs!") + PLS_ASSERT(is_internal_fully_connected(), "Must fully connect all inputs/outputs inside a dataflow graph!") num_nodes_ = 0; for (int i = 0; i < num_in_ports; i++) { - build_recursive(successor_at(i)); + build_recursive(internal_in_port_.next_node_at(i)); } state_ = built; @@ -96,7 +145,7 @@ class graph, pls::dataflow::outputs> : return; // Already visited } node->state_ = building; - PLS_ASSERT(node->is_fully_connected(), "Must fully connect dataflow graphs") + PLS_ASSERT(node->is_fully_connected(), "Must fully connect dataflow graph nodes!") add_node(node); for (int i = 0; i < node->num_successors(); i++) { @@ -118,6 +167,46 @@ class graph, pls::dataflow::outputs> : } } + // TODO: Clean up + move to async task + template + struct feed_inputs { + feed_inputs(internal_in_port_type &, value_input_tuple &, invocation_info &) {} + void run() {} + }; + template + struct feed_inputs { + internal_in_port_type &inputs_; + value_input_tuple &input_values_; + invocation_info &invocation_; + + feed_inputs(internal_in_port_type &inputs, + value_input_tuple &input_values, + invocation_info &invocation) : inputs_{inputs}, + input_values_{input_values}, + invocation_{invocation} {} + + void run() { + inputs_.template get().push_token(token{std::get(input_values_), invocation_}); + feed_inputs{inputs_, input_values_, invocation_}.run(); + } + }; + + void run(value_input_tuple input) { + // TODO: clearly move this onto the task stack instead fo malloc (without any free...) + void **buffers = reinterpret_cast(malloc(num_nodes_ * sizeof(void *))); + node *iterator = start_; + for (int i = 0; i < num_nodes_; i++) { + auto required_size = iterator->instance_buffer_size(); + buffers[i] = malloc(required_size); + iterator->init_instance_buffer(buffers[i]); + + iterator = iterator->direct_successor_; + } + + invocation_info invocation{buffers}; + feed_inputs<0, I0, I...>{internal_in_port_, input, invocation}.run(); + } + ////////////////////////////////////////////////////////////////// // Overrides for generic node functionality (building the graph) ////////////////////////////////////////////////////////////////// @@ -132,6 +221,10 @@ class graph, pls::dataflow::outputs> : return in_port_.fully_connected() && out_port_.fully_connected(); } + bool is_internal_fully_connected() const { + return internal_in_port_.fully_connected() && internal_out_port_.fully_connected(); + } + int instance_buffer_size() const override { return sizeof(invocation_memory); } @@ -142,9 +235,15 @@ class graph, pls::dataflow::outputs> : private: // Input/Output ports + in_port_cb in_port_cb_; + out_port_cb out_port_cb_; + multi_in_port_type in_port_; multi_out_port_type out_port_; + internal_in_port_type internal_in_port_; + internal_out_port_type internal_out_port_; + int num_nodes_{0}; }; diff --git a/lib/pls/include/pls/dataflow/internal/inputs.h b/lib/pls/include/pls/dataflow/internal/inputs.h new file mode 100644 index 0000000..5bb9e61 --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/inputs.h @@ -0,0 +1,15 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_INPUTS_H_ +#define PLS_DATAFLOW_INTERNAL_INPUTS_H_ + +namespace pls { +namespace dataflow { + +template +struct inputs { +}; + +} +} + +#endif //PLS_DATAFLOW_INTERNAL_INPUTS_H_ diff --git a/lib/pls/include/pls/dataflow/internal/node.h b/lib/pls/include/pls/dataflow/internal/node.h index 8f7ac4d..0d03488 100644 --- a/lib/pls/include/pls/dataflow/internal/node.h +++ b/lib/pls/include/pls/dataflow/internal/node.h @@ -32,7 +32,7 @@ class node { private: int memory_index_{0}; - node *direct_successor_; + node *direct_successor_{nullptr}; state state_{fresh}; }; diff --git a/lib/pls/include/pls/dataflow/internal/outputs.h b/lib/pls/include/pls/dataflow/internal/outputs.h new file mode 100644 index 0000000..9090885 --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/outputs.h @@ -0,0 +1,15 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_OUTPUTS_H_ +#define PLS_DATAFLOW_INTERNAL_OUTPUTS_H_ + +namespace pls { +namespace dataflow { + +template +struct outputs { +}; + +} +} + +#endif //PLS_DATAFLOW_INTERNAL_OUTPUTS_H_ diff --git a/lib/pls/include/pls/dataflow/outputs.h b/lib/pls/include/pls/dataflow/outputs.h deleted file mode 100644 index d4a7967..0000000 --- a/lib/pls/include/pls/dataflow/outputs.h +++ /dev/null @@ -1,15 +0,0 @@ - -#ifndef PLS_DATAFLOW_OUTPUTS_H_ -#define PLS_DATAFLOW_OUTPUTS_H_ - -namespace pls { -namespace dataflow { - -template -struct outputs { -}; - -} -} - -#endif //PLS_DATAFLOW_OUTPUTS_H_ -- libgit2 0.26.0