Commit df021243 by FritzFlorian

Working recursive invocations.

Recursion works by using a function node, calling the graph again. We separated an graph invocation form an function invocation within an graph, making the graph only handle one concern.
parent b48c46fc
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include <tuple> #include <tuple>
#include <array> #include <array>
#include <pls/pls.h>
#include <pls/dataflow/internal/inputs.h> #include <pls/dataflow/internal/inputs.h>
#include <pls/dataflow/internal/outputs.h> #include <pls/dataflow/internal/outputs.h>
#include <pls/dataflow/internal/function_node.h> #include <pls/dataflow/internal/function_node.h>
...@@ -17,31 +18,53 @@ int main() { ...@@ -17,31 +18,53 @@ int main() {
// Define // Define
graph<inputs<int, int>, outputs<int>> graph; graph<inputs<int, int>, outputs<int>> graph;
auto func1 = [](const int &i1, const int &i2, int &o1) { auto triple = [](const int &i1, int &o1) {
std::cout << "Add up " << i1 << " and " << i2 << "..." << std::endl; o1 = i1 * 3;
o1 = i1 + i2;
}; };
function_node<inputs<int, int>, outputs<int>, decltype(func1)> node1{func1}; function_node<inputs<int>, outputs<int>, decltype(triple)> triple_node{triple};
auto func2 = [](const int &i1, int &o1) { auto minus_one = [](const int &i1, int &o1) {
std::cout << "Print Result " << i1 << std::endl; o1 = i1 - 1;
o1 = i1;
}; };
function_node<inputs<int>, outputs<int>, decltype(func2)> node2{func2}; function_node<inputs<int>, outputs<int>, decltype(minus_one)> minus_one_node{minus_one};
auto recursion = [&](const int &i1, const int &i2, int &o1) {
if (i1 > 0) {
std::tuple<int> out;
graph.run({i1, i2}, out);
pls::scheduler::wait_for_all();
o1 = std::get<0>(out);
} else {
o1 = i2;
}
};
function_node<inputs<int, int>, outputs<int>, decltype(recursion)> recursion_node{recursion};
// Connect // Connect
graph.input<0>() >> node1.in_port<0>(); graph.input<0>() >> minus_one_node.in_port<0>();
graph.input<1>() >> node1.in_port<1>(); minus_one_node.out_port<0>() >> recursion_node.in_port<0>();
node1.out_port<0>() >> node2.in_port<0>(); graph.input<1>() >> triple_node.in_port<0>();
triple_node.out_port<0>() >> recursion_node.in_port<1>();
node2.out_port<0>() >> graph.output<0>(); recursion_node.out_port<0>() >> graph.output<0>();
// Build // Build
graph.build(); graph.build();
// Execute pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18u};
graph.run({1, 2}); pls::scheduler scheduler{&my_scheduler_memory, 8};
graph.run({1, 1}); scheduler.perform_work([&] {
graph.run({5, 6}); // Schedule Execution
std::tuple<int> out1, out2, out3;
graph.run({1, 2}, out1);
graph.run({1, 1}, out2);
graph.run({5, 6}, out3);
// Wait for results and print
pls::scheduler::wait_for_all();
std::cout << std::get<0>(out1) << std::endl;
std::cout << std::get<0>(out2) << std::endl;
std::cout << std::get<0>(out3) << std::endl;
});
} }
...@@ -49,7 +49,7 @@ add_library(pls STATIC ...@@ -49,7 +49,7 @@ add_library(pls STATIC
include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/scheduler_impl.h
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
include/pls/internal/scheduling/lambda_task.h include/pls/internal/helpers/seqence.h) include/pls/internal/scheduling/lambda_task.h include/pls/internal/helpers/seqence.h include/pls/dataflow/internal/build_state.h)
# Add everything in `./include` to be in the include path of this project # Add everything in `./include` to be in the include path of this project
target_include_directories(pls target_include_directories(pls
PUBLIC PUBLIC
......
...@@ -64,7 +64,7 @@ class function_node<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, ...@@ -64,7 +64,7 @@ class function_node<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0,
template<int POS, typename T> template<int POS, typename T>
void token_pushed(token<T> token) { void token_pushed(token<T> token) {
auto current_memory = get_invocation<invocation_memory>(token); auto current_memory = get_invocation<invocation_memory>(token.invocation());
std::get<POS>(current_memory->input_buffer_) = token; std::get<POS>(current_memory->input_buffer_) = token;
auto remaining_inputs = --(current_memory->inputs_missing_); auto remaining_inputs = --(current_memory->inputs_missing_);
...@@ -141,7 +141,6 @@ class function_node<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, ...@@ -141,7 +141,6 @@ class function_node<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0,
}; };
private: private:
// Input/Output ports
multi_in_port_type in_port_; multi_in_port_type in_port_;
multi_out_port_type out_port_; multi_out_port_type out_port_;
......
...@@ -6,12 +6,16 @@ ...@@ -6,12 +6,16 @@
#include <tuple> #include <tuple>
#include <iostream> #include <iostream>
#include "build_state.h"
#include "node.h" #include "node.h"
#include "function_node.h"
#include "in_port.h" #include "in_port.h"
#include "out_port.h" #include "out_port.h"
#include "inputs.h" #include "inputs.h"
#include "outputs.h" #include "outputs.h"
#include "pls/pls.h"
namespace pls { namespace pls {
namespace dataflow { namespace dataflow {
namespace internal { namespace internal {
...@@ -25,126 +29,70 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> : ...@@ -25,126 +29,70 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> :
// Our own type // Our own type
using self_type = graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>>; using self_type = graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>>;
// Callbacks for input/output ports
struct in_port_cb {
self_type *self_;
template<int POS, typename T>
void token_pushed(token<T> token) {
self_->in_port_token_pushed<POS, T>(token);
}
};
struct out_port_cb {
self_type *self_;
template<int POS, typename T>
void token_pushed(token<T> token) {
self_->out_port_token_pushed<POS, T>(token);
}
};
// Input-Output port types (external)
using multi_in_port_type = multi_in_port<in_port_cb, 0, I0, I...>;
using multi_out_port_type = multi_out_port<O0, O...>;
// Input-Output port types (internal) // Input-Output port types (internal)
using internal_in_port_type = multi_out_port<I0, I...>; using inputs_type = multi_out_port<I0, I...>;
using internal_out_port_type = multi_in_port<out_port_cb, 0, O0, O...>; using outputs_type = multi_in_port<self_type, 0, O0, O...>;
// Input-Output value tuples // Input-Output value tuples
using value_input_tuple = std::tuple<I0, I...>; using value_input_tuple = std::tuple<I0, I...>;
using input_tuple = std::tuple<token < I0>, token<I>...>; using input_tuple = std::tuple<token < I0>, token<I>...>;
using value_output_tuple = std::tuple<O0, O...>;
using output_tuple = std::tuple<token < O0>, token<O>...>; using output_tuple = std::tuple<token < O0>, token<O>...>;
static constexpr int num_in_ports = std::tuple_size<input_tuple>(); static constexpr int num_in_ports = std::tuple_size<input_tuple>();
static constexpr int num_out_ports = std::tuple_size<output_tuple>(); static constexpr int num_out_ports = std::tuple_size<output_tuple>();
// Memory used for ONE active invocation of the dataflow-graph // Memory used for ONE active invocation of the dataflow-graph
struct invocation_memory { struct invocation_memory {
std::atomic<int> inputs_missing_; value_output_tuple *output_buffer_;
input_tuple input_buffer_;
}; };
public: public:
template<int POS> template<int POS>
using in_port_at = typename multi_in_port_type::template in_port_type_at<POS>; using input_at = typename inputs_type::template out_port_type_at<POS>;
template<int POS>
using out_port_at = typename multi_out_port_type::template out_port_type_at<POS>;
template<int POS>
in_port_at<POS> &in_port() {
return in_port_.template get<POS>();
}
template<int POS>
out_port_at<POS> &out_port() {
return out_port_.template get<POS>();
}
template<int POS, typename T>
void in_port_token_pushed(token<T> token) {
auto invocation = get_invocation<invocation_memory>(token);
// TODO: Invoke a new instance of our graph (recursive call) when all inputs are here
std::cout << "Token pushed at " << POS << " with value " << token.value() << std::endl;
auto remaining_inputs = --(invocation->inputs_missing_);
if (remaining_inputs == 0) {
std::cout << "All inputs there!" << std::endl;
}
}
template<int POS>
using internal_in_port_at = typename internal_in_port_type::template out_port_type_at<POS>;
template<int POS> template<int POS>
using internal_out_port_at = typename internal_out_port_type::template in_port_type_at<POS>; using output_at = typename outputs_type::template in_port_type_at<POS>;
template<int POS> template<int POS>
internal_in_port_at<POS> &input() { input_at<POS> &input() {
return internal_in_port_.template get<POS>(); return inputs_.template get<POS>();
} }
template<int POS> template<int POS>
internal_out_port_at<POS> &output() { output_at<POS> &output() {
return internal_out_port_.template get<POS>(); return outputs_.template get<POS>();
} }
template<int POS, typename T> template<int POS, typename T>
void out_port_token_pushed(token<T> token) { void token_pushed(token<T> token) {
auto invocation = get_invocation<invocation_memory>(token); auto invocation = get_invocation<invocation_memory>(token.invocation());
std::get<POS>(*invocation->output_buffer_) = token.value();
std::cout << "Output produced at port " << POS << " with value " << token.value() << std::endl;
auto remaining_inputs = --(invocation->inputs_missing_);
if (remaining_inputs == 0) {
std::cout << "All inputs there!" << std::endl;
}
} }
graph() : in_port_cb_{this}, graph() : inputs_{}, outputs_{this, this} {}
out_port_cb_{this},
in_port_{this, &in_port_cb_},
out_port_{},
internal_in_port_{},
internal_out_port_{this, &out_port_cb_} {
}
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
// Graph building // Graph building
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
void build() { void build() {
state_ = building; PLS_ASSERT(build_state_ == build_state::fresh, "Must only build a dataflow graph once!")
PLS_ASSERT(is_internal_fully_connected(), "Must fully connect all inputs/outputs inside a dataflow graph!") PLS_ASSERT(is_fully_connected(), "Must fully connect all inputs/outputs inside a dataflow graph!")
num_nodes_ = 0; build_state_ = build_state::building;
node_list_start_ = node_list_current_ = this;
memory_index_ = 0;
num_nodes_ = 1;
for (int i = 0; i < num_in_ports; i++) { for (int i = 0; i < num_in_ports; i++) {
build_recursive(internal_in_port_.next_node_at(i)); build_recursive(inputs_.next_node_at(i));
} }
build_state_ = build_state::built;
state_ = built;
} }
node *start_{nullptr};
node *current_{nullptr};
void build_recursive(node *node) { void build_recursive(node *node) {
if (node->state_ != fresh) { if (node->build_state_ != build_state::fresh) {
return; // Already visited return; // Already visited
} }
node->state_ = building; node->build_state_ = build_state::building;
PLS_ASSERT(node->is_fully_connected(), "Must fully connect dataflow graph nodes!") PLS_ASSERT(node->is_fully_connected(), "Must fully connect dataflow graph nodes!")
add_node(node); add_node(node);
...@@ -152,34 +100,33 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> : ...@@ -152,34 +100,33 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> :
build_recursive(node->successor_at(i)); build_recursive(node->successor_at(i));
} }
node->state_ = built; node->build_state_ = build_state::built;
} }
void add_node(node *new_node) { void add_node(node *new_node) {
new_node->memory_index_ = num_nodes_++; new_node->memory_index_ = num_nodes_++;
if (start_ == nullptr) { if (node_list_current_ == nullptr) {
start_ = new_node; node_list_current_ = new_node;
current_ = new_node; node_list_start_ = new_node;
} else { } else {
current_->direct_successor_ = new_node; node_list_current_->direct_successor_ = new_node;
current_ = new_node; node_list_current_ = new_node;
} }
} }
// TODO: Clean up + move to async task
template<int N, typename ...IT> template<int N, typename ...IT>
struct feed_inputs { struct feed_inputs {
feed_inputs(internal_in_port_type &, value_input_tuple &, invocation_info &) {} feed_inputs(inputs_type &, value_input_tuple &, invocation_info &) {}
void run() {} void run() {}
}; };
template<int N, typename IT1, typename ...IT> template<int N, typename IT1, typename ...IT>
struct feed_inputs<N, IT1, IT...> { struct feed_inputs<N, IT1, IT...> {
internal_in_port_type &inputs_; inputs_type &inputs_;
value_input_tuple &input_values_; value_input_tuple &input_values_;
invocation_info &invocation_; invocation_info &invocation_;
feed_inputs(internal_in_port_type &inputs, feed_inputs(inputs_type &inputs,
value_input_tuple &input_values, value_input_tuple &input_values,
invocation_info &invocation) : inputs_{inputs}, invocation_info &invocation) : inputs_{inputs},
input_values_{input_values}, input_values_{input_values},
...@@ -191,38 +138,56 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> : ...@@ -191,38 +138,56 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> :
} }
}; };
void run(value_input_tuple input) { class run_graph_task : public pls::task {
// TODO: clearly move this onto the task stack instead fo malloc (without any free...) graph *self_;
void **buffers = reinterpret_cast<void **>(malloc(num_nodes_ * sizeof(void *))); value_input_tuple input_;
node *iterator = start_; value_output_tuple *output_;
for (int i = 0; i < num_nodes_; i++) {
auto required_size = iterator->instance_buffer_size(); // Buffers for actual execution
buffers[i] = malloc(required_size); invocation_info invocation_;
iterator->init_instance_buffer(buffers[i]);
public:
run_graph_task(graph *self, value_input_tuple &input, value_output_tuple *output) : self_{self},
input_{input},
output_{output},
invocation_{nullptr} {
void **buffers;
buffers = reinterpret_cast<void **>(allocate_memory(self_->num_nodes_ * sizeof(void *)));
node *iterator = self_->node_list_start_;
for (int i = 0; i < self_->num_nodes_; i++) {
auto required_size = iterator->instance_buffer_size();
buffers[i] = allocate_memory(required_size);
iterator->init_instance_buffer(buffers[i]);
iterator = iterator->direct_successor_;
}
invocation_ = invocation_info{buffers};
self_->get_invocation<invocation_memory>(invocation_)->output_buffer_ = output;
}
iterator = iterator->direct_successor_; void execute_internal() override {
feed_inputs<0, I0, I...>{self_->inputs_, input_, invocation_}.run();
} }
};
invocation_info invocation{buffers}; void run(value_input_tuple input, value_output_tuple &output) {
feed_inputs<0, I0, I...>{internal_in_port_, input, invocation}.run(); pls::scheduler::spawn_child<run_graph_task>(this, input, &output);
} }
////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////
// Overrides for generic node functionality (building the graph) // Overrides for generic node functionality (building the graph)
////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////
int num_successors() const override { int num_successors() const override {
return num_out_ports; return 0;
} }
node *successor_at(int pos) const override { node *successor_at(int pos) const override {
return out_port_.next_node_at(pos); PLS_ERROR("A graph instance has no direct successor!")
} }
bool is_fully_connected() const override { bool is_fully_connected() const override {
return in_port_.fully_connected() && out_port_.fully_connected(); return inputs_.fully_connected() && outputs_.fully_connected();
}
bool is_internal_fully_connected() const {
return internal_in_port_.fully_connected() && internal_out_port_.fully_connected();
} }
int instance_buffer_size() const override { int instance_buffer_size() const override {
...@@ -230,20 +195,15 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> : ...@@ -230,20 +195,15 @@ class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> :
} }
void init_instance_buffer(void *memory) const override { void init_instance_buffer(void *memory) const override {
auto invocation = new(memory) invocation_memory{}; auto invocation = new(memory) invocation_memory{};
invocation->inputs_missing_ = num_in_ports;
}; };
private: private:
// Input/Output ports inputs_type inputs_;
in_port_cb in_port_cb_; outputs_type outputs_;
out_port_cb out_port_cb_;
multi_in_port_type in_port_;
multi_out_port_type out_port_;
internal_in_port_type internal_in_port_;
internal_out_port_type internal_out_port_;
// Information about building the graph
node *node_list_start_{nullptr};
node *node_list_current_{nullptr};
int num_nodes_{0}; int num_nodes_{0};
}; };
......
...@@ -2,17 +2,22 @@ ...@@ -2,17 +2,22 @@
#ifndef PLS_DATAFLOW_INTERNAL_NODE_H_ #ifndef PLS_DATAFLOW_INTERNAL_NODE_H_
#define PLS_DATAFLOW_INTERNAL_NODE_H_ #define PLS_DATAFLOW_INTERNAL_NODE_H_
#include "build_state.h"
namespace pls { namespace pls {
namespace dataflow { namespace dataflow {
namespace internal { namespace internal {
/**
* Represents a single piece included in a dataflow graph.
* This can be anything connected to form the dataflow that requires
* memory during invocation and is present in the connected flow graph.
*/
class node { class node {
template<typename INS, typename OUTS> template<typename INS, typename OUTS>
friend friend
class graph; class graph;
enum state { fresh, building, built, teardown };
public: public:
virtual int num_successors() const = 0; virtual int num_successors() const = 0;
virtual node *successor_at(int pos) const = 0; virtual node *successor_at(int pos) const = 0;
...@@ -22,18 +27,15 @@ class node { ...@@ -22,18 +27,15 @@ class node {
virtual bool is_fully_connected() const = 0; virtual bool is_fully_connected() const = 0;
template<typename M, typename T> template<typename M>
M *get_invocation(token<T> token) { M *get_invocation(invocation_info invocation) {
return token.invocation().template get_instance_buffer<M>(memory_index_); return invocation.template get_instance_buffer<M>(memory_index_);
} }
// TODO: Remove
void set_memory_index(int i) { memory_index_ = i; }
private: private:
int memory_index_{0}; int memory_index_{0};
node *direct_successor_{nullptr}; node *direct_successor_{nullptr};
state state_{fresh}; build_state build_state_{build_state::fresh};
}; };
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment