Commit f671c042 by FritzFlorian

Refactor to put all dataflow related items in one directory.

parent 6150b321
...@@ -4,42 +4,44 @@ ...@@ -4,42 +4,44 @@
#include <tuple> #include <tuple>
#include <array> #include <array>
#include <pls/dataflow/inputs.h> #include <pls/dataflow/internal/inputs.h>
#include <pls/dataflow/outputs.h> #include <pls/dataflow/internal/outputs.h>
#include <pls/dataflow/internal/function_node.h> #include <pls/dataflow/internal/function_node.h>
#include <pls/dataflow/internal/graph.h>
#include <pls/dataflow/internal/out_port.h> #include <pls/dataflow/internal/out_port.h>
int main() { int main() {
using namespace pls::dataflow; using namespace pls::dataflow;
using namespace pls::dataflow::internal; using namespace pls::dataflow::internal;
out_port<int> external1; // Define
out_port<int> external2; graph<inputs<int, int>, outputs<int>> graph;
auto func1 = [](const int &i1, const int &i2, int &o1) { auto func1 = [](const int &i1, const int &i2, int &o1) {
std::cout << "Hello! " << i1 << ", " << i2 << std::endl; std::cout << "Add up " << i1 << " and " << i2 << "..." << std::endl;
o1 = i1 + i2; o1 = i1 + i2;
}; };
function_node<inputs<int, int>, outputs<int>, decltype(func1)> node1{func1}; function_node<inputs<int, int>, outputs<int>, decltype(func1)> node1{func1};
auto func2 = [](const int &i1, int &o1) { auto func2 = [](const int &i1, int &o1) {
std::cout << "We get! " << i1 << std::endl; std::cout << "Print Result " << i1 << std::endl;
o1 = i1;
}; };
function_node<inputs<int>, outputs<int>, decltype(func2)> node2{func2}; function_node<inputs<int>, outputs<int>, decltype(func2)> node2{func2};
external1 >> node1.in_port<0>(); // Connect
external2 >> node1.in_port<1>(); graph.input<0>() >> node1.in_port<0>();
graph.input<1>() >> node1.in_port<1>();
node1.out_port<0>() >> node2.in_port<0>(); node1.out_port<0>() >> node2.in_port<0>();
// Simulate execution environment node2.out_port<0>() >> graph.output<0>();
void *buffer1 = malloc(node1.instance_buffer_size());
void *buffer2 = malloc(node2.instance_buffer_size()); // Build
void *memory[] = {buffer1, buffer2}; graph.build();
node1.init_instance_buffer(memory[0]);
node1.set_memory_index(0); // Execute
node2.init_instance_buffer(memory[1]); graph.run({1, 2});
node2.set_memory_index(1); graph.run({1, 1});
graph.run({5, 6});
invocation_info invocation{memory};
external1.push_token({1, invocation});
external2.push_token({2, invocation});
} }
...@@ -10,8 +10,8 @@ add_library(pls STATIC ...@@ -10,8 +10,8 @@ add_library(pls STATIC
include/pls/algorithms/scan_impl.h include/pls/algorithms/scan_impl.h
include/pls/dataflow/dataflow.h include/pls/dataflow/dataflow.h
include/pls/dataflow/inputs.h include/pls/dataflow/internal/inputs.h
include/pls/dataflow/outputs.h include/pls/dataflow/internal/outputs.h
include/pls/dataflow/internal/token.h include/pls/dataflow/internal/token.h
include/pls/dataflow/internal/in_port.h include/pls/dataflow/internal/in_port.h
include/pls/dataflow/internal/out_port.h include/pls/dataflow/internal/out_port.h
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
#define PLS_DATAFLOW_DATAFLOW_H_ #define PLS_DATAFLOW_DATAFLOW_H_
#include "graph.h" #include "graph.h"
#include "inputs.h" #include "pls/dataflow/internal/inputs.h"
#include "outputs.h" #include "pls/dataflow/internal/outputs.h"
#endif //PLS_DATAFLOW_DATAFLOW_H_ #endif //PLS_DATAFLOW_DATAFLOW_H_
...@@ -10,8 +10,8 @@ ...@@ -10,8 +10,8 @@
#include "out_port.h" #include "out_port.h"
#include "node.h" #include "node.h"
#include "pls/dataflow/inputs.h" #include "inputs.h"
#include "pls/dataflow/outputs.h" #include "outputs.h"
#include "pls/internal/helpers/seqence.h" #include "pls/internal/helpers/seqence.h"
......
...@@ -9,9 +9,8 @@ ...@@ -9,9 +9,8 @@
#include "node.h" #include "node.h"
#include "in_port.h" #include "in_port.h"
#include "out_port.h" #include "out_port.h"
#include "inputs.h"
#include "pls/dataflow/inputs.h" #include "outputs.h"
#include "pls/dataflow/outputs.h"
namespace pls { namespace pls {
namespace dataflow { namespace dataflow {
...@@ -22,18 +21,38 @@ class graph {}; ...@@ -22,18 +21,38 @@ class graph {};
template<typename I0, typename ...I, typename O0, typename ...O> template<typename I0, typename ...I, typename O0, typename ...O>
class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> : public node { class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> : public node {
private: private:
// Our own type // Our own type
using self_type = graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>>; using self_type = graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>>;
// Input-Output port types // Callbacks for input/output ports
using multi_in_port_type = multi_in_port<self_type, 0, I0, I...>; struct in_port_cb {
self_type *self_;
template<int POS, typename T>
void token_pushed(token<T> token) {
self_->in_port_token_pushed<POS, T>(token);
}
};
struct out_port_cb {
self_type *self_;
template<int POS, typename T>
void token_pushed(token<T> token) {
self_->out_port_token_pushed<POS, T>(token);
}
};
// Input-Output port types (external)
using multi_in_port_type = multi_in_port<in_port_cb, 0, I0, I...>;
using multi_out_port_type = multi_out_port<O0, O...>; using multi_out_port_type = multi_out_port<O0, O...>;
// Input-Output port types (internal)
using internal_in_port_type = multi_out_port<I0, I...>;
using internal_out_port_type = multi_in_port<out_port_cb, 0, O0, O...>;
// Input-Output value tuples // Input-Output value tuples
using input_tuple = std::tuple<token<I0>, token<I>...>; using value_input_tuple = std::tuple<I0, I...>;
using output_tuple = std::tuple<token<O0>, token<O>...>; using input_tuple = std::tuple<token < I0>, token<I>...>;
using output_tuple = std::tuple<token < O0>, token<O>...>;
static constexpr int num_in_ports = std::tuple_size<input_tuple>(); static constexpr int num_in_ports = std::tuple_size<input_tuple>();
static constexpr int num_out_ports = std::tuple_size<output_tuple>(); static constexpr int num_out_ports = std::tuple_size<output_tuple>();
...@@ -53,16 +72,16 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> : ...@@ -53,16 +72,16 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> :
in_port_at<POS> &in_port() { in_port_at<POS> &in_port() {
return in_port_.template get<POS>(); return in_port_.template get<POS>();
} }
template<int POS> template<int POS>
out_port_at<POS> &out_port() { out_port_at<POS> &out_port() {
return out_port_.template get<POS>(); return out_port_.template get<POS>();
} }
template<int POS, typename T> template<int POS, typename T>
void token_pushed(token<T> token) { void in_port_token_pushed(token<T> token) {
auto invocation = get_invocation<invocation_memory>(token); auto invocation = get_invocation<invocation_memory>(token);
// TODO: Invoke a new instance of our graph (recursive call) when all inputs are here
std::cout << "Token pushed at " << POS << " with value " << token.value() << std::endl; std::cout << "Token pushed at " << POS << " with value " << token.value() << std::endl;
auto remaining_inputs = --(invocation->inputs_missing_); auto remaining_inputs = --(invocation->inputs_missing_);
if (remaining_inputs == 0) { if (remaining_inputs == 0) {
...@@ -70,7 +89,37 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> : ...@@ -70,7 +89,37 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> :
} }
} }
graph() : in_port_{this, this}, out_port_{} { template<int POS>
using internal_in_port_at = typename internal_in_port_type::template out_port_type_at<POS>;
template<int POS>
using internal_out_port_at = typename internal_out_port_type::template in_port_type_at<POS>;
template<int POS>
internal_in_port_at<POS> &input() {
return internal_in_port_.template get<POS>();
}
template<int POS>
internal_out_port_at<POS> &output() {
return internal_out_port_.template get<POS>();
}
template<int POS, typename T>
void out_port_token_pushed(token<T> token) {
auto invocation = get_invocation<invocation_memory>(token);
std::cout << "Output produced at port " << POS << " with value " << token.value() << std::endl;
auto remaining_inputs = --(invocation->inputs_missing_);
if (remaining_inputs == 0) {
std::cout << "All inputs there!" << std::endl;
}
}
graph() : in_port_cb_{this},
out_port_cb_{this},
in_port_{this, &in_port_cb_},
out_port_{},
internal_in_port_{},
internal_out_port_{this, &out_port_cb_} {
} }
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
...@@ -78,11 +127,11 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> : ...@@ -78,11 +127,11 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> :
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
void build() { void build() {
state_ = building; state_ = building;
PLS_ASSERT(is_fully_connected(), "Must fully connect dataflow graphs!") PLS_ASSERT(is_internal_fully_connected(), "Must fully connect all inputs/outputs inside a dataflow graph!")
num_nodes_ = 0; num_nodes_ = 0;
for (int i = 0; i < num_in_ports; i++) { for (int i = 0; i < num_in_ports; i++) {
build_recursive(successor_at(i)); build_recursive(internal_in_port_.next_node_at(i));
} }
state_ = built; state_ = built;
...@@ -96,7 +145,7 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> : ...@@ -96,7 +145,7 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> :
return; // Already visited return; // Already visited
} }
node->state_ = building; node->state_ = building;
PLS_ASSERT(node->is_fully_connected(), "Must fully connect dataflow graphs") PLS_ASSERT(node->is_fully_connected(), "Must fully connect dataflow graph nodes!")
add_node(node); add_node(node);
for (int i = 0; i < node->num_successors(); i++) { for (int i = 0; i < node->num_successors(); i++) {
...@@ -118,6 +167,46 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> : ...@@ -118,6 +167,46 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> :
} }
} }
// TODO: Clean up + move to async task
template<int N, typename ...IT>
struct feed_inputs {
feed_inputs(internal_in_port_type &, value_input_tuple &, invocation_info &) {}
void run() {}
};
template<int N, typename IT1, typename ...IT>
struct feed_inputs<N, IT1, IT...> {
internal_in_port_type &inputs_;
value_input_tuple &input_values_;
invocation_info &invocation_;
feed_inputs(internal_in_port_type &inputs,
value_input_tuple &input_values,
invocation_info &invocation) : inputs_{inputs},
input_values_{input_values},
invocation_{invocation} {}
void run() {
inputs_.template get<N>().push_token(token<IT1>{std::get<N>(input_values_), invocation_});
feed_inputs<N + 1, IT...>{inputs_, input_values_, invocation_}.run();
}
};
void run(value_input_tuple input) {
// TODO: clearly move this onto the task stack instead fo malloc (without any free...)
void **buffers = reinterpret_cast<void **>(malloc(num_nodes_ * sizeof(void *)));
node *iterator = start_;
for (int i = 0; i < num_nodes_; i++) {
auto required_size = iterator->instance_buffer_size();
buffers[i] = malloc(required_size);
iterator->init_instance_buffer(buffers[i]);
iterator = iterator->direct_successor_;
}
invocation_info invocation{buffers};
feed_inputs<0, I0, I...>{internal_in_port_, input, invocation}.run();
}
////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////
// Overrides for generic node functionality (building the graph) // Overrides for generic node functionality (building the graph)
////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////
...@@ -132,6 +221,10 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> : ...@@ -132,6 +221,10 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> :
return in_port_.fully_connected() && out_port_.fully_connected(); return in_port_.fully_connected() && out_port_.fully_connected();
} }
bool is_internal_fully_connected() const {
return internal_in_port_.fully_connected() && internal_out_port_.fully_connected();
}
int instance_buffer_size() const override { int instance_buffer_size() const override {
return sizeof(invocation_memory); return sizeof(invocation_memory);
} }
...@@ -142,9 +235,15 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> : ...@@ -142,9 +235,15 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> :
private: private:
// Input/Output ports // Input/Output ports
in_port_cb in_port_cb_;
out_port_cb out_port_cb_;
multi_in_port_type in_port_; multi_in_port_type in_port_;
multi_out_port_type out_port_; multi_out_port_type out_port_;
internal_in_port_type internal_in_port_;
internal_out_port_type internal_out_port_;
int num_nodes_{0}; int num_nodes_{0};
}; };
......
#ifndef PLS_DATAFLOW_INPUTS_H_ #ifndef PLS_DATAFLOW_INTERNAL_INPUTS_H_
#define PLS_DATAFLOW_INPUTS_H_ #define PLS_DATAFLOW_INTERNAL_INPUTS_H_
namespace pls { namespace pls {
namespace dataflow { namespace dataflow {
...@@ -12,4 +12,4 @@ struct inputs { ...@@ -12,4 +12,4 @@ struct inputs {
} }
} }
#endif //PLS_DATAFLOW_INPUTS_H_ #endif //PLS_DATAFLOW_INTERNAL_INPUTS_H_
...@@ -32,7 +32,7 @@ class node { ...@@ -32,7 +32,7 @@ class node {
private: private:
int memory_index_{0}; int memory_index_{0};
node *direct_successor_; node *direct_successor_{nullptr};
state state_{fresh}; state state_{fresh};
}; };
......
#ifndef PLS_DATAFLOW_OUTPUTS_H_ #ifndef PLS_DATAFLOW_INTERNAL_OUTPUTS_H_
#define PLS_DATAFLOW_OUTPUTS_H_ #define PLS_DATAFLOW_INTERNAL_OUTPUTS_H_
namespace pls { namespace pls {
namespace dataflow { namespace dataflow {
...@@ -12,4 +12,4 @@ struct outputs { ...@@ -12,4 +12,4 @@ struct outputs {
} }
} }
#endif //PLS_DATAFLOW_OUTPUTS_H_ #endif //PLS_DATAFLOW_INTERNAL_OUTPUTS_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment