Commit 5978c910 by FritzFlorian

Remove old data-flow code from PLS_v1.

The code could build up a statically type checked data-flow graph without memory allocation.
It might be worth wile to bring the code back in future iterations, however, for now it does not work and clutters up the project.
parent 7a8f320b
Pipeline #1589 passed with stages
in 4 minutes 43 seconds
#ifndef PLS_DATAFLOW_DATAFLOW_H_
#define PLS_DATAFLOW_DATAFLOW_H_
#include "internal/graph.h"
#include "internal/function_node.h"
#include "internal/merge_node.h"
#include "internal/switch_node.h"
#include "internal/split_node.h"
#include "internal/inputs.h"
#include "internal/outputs.h"
namespace pls {
namespace dataflow {
template<typename INS, typename OUTS>
using graph = internal::graph<INS, OUTS>;
template<typename INS, typename OUTS, typename F>
using function_node = internal::function_node<INS, OUTS, F>;
template<typename I>
using merge_node = internal::merge_node<I>;
template<typename I>
using switch_node = internal::switch_node<I>;
template<typename I>
using split_node = internal::split_node<I>;
template<typename ...I>
using inputs = internal::inputs<I...>;
template<typename ...O>
using outputs = internal::outputs<O...>;
}
}
#endif //PLS_DATAFLOW_DATAFLOW_H_
#ifndef PLS_DATAFLOW_INTERNAL_BUILD_STATE_H_
#define PLS_DATAFLOW_INTERNAL_BUILD_STATE_H_
namespace pls {
namespace dataflow {
namespace internal {
enum class build_state { fresh, building, built, teardown };
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_BUILD_STATE_H_
#ifndef PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_H_
#define PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_H_
#include <tuple>
#include <iostream>
#include <atomic>
#include "in_port.h"
#include "out_port.h"
#include "node.h"
#include "inputs.h"
#include "outputs.h"
#include "pls/internal/helpers/seqence.h"
#include "pls/pls.h"
namespace pls {
namespace dataflow {
namespace internal {
using namespace pls::internal::helpers;
// Forward Decl
template<typename INS, typename OUTS>
class graph;
template<typename INS, typename OUTS, typename F>
class function_node {};
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
class function_node<inputs<I0, I...>, outputs<O0, O...>, F> : public node {
private:
// Our own type
using self_type = function_node<inputs<I0, I...>, outputs<O0, O...>, F>;
// Input-Output port types
using multi_in_port_type = multi_in_port<self_type, 0, I0, I...>;
using multi_out_port_type = multi_out_port<O0, O...>;
// Input-Output value tuples
using input_tuple = std::tuple<token<I0>, token<I>...>;
using output_tuple = std::tuple<token<O0>, token<O>...>;
static constexpr int num_in_ports = std::tuple_size<input_tuple>();
static constexpr int num_out_ports = std::tuple_size<output_tuple>();
// Memory used for ONE active invocation of the dataflow-graph
struct invocation_memory {
std::atomic<int> inputs_missing_;
input_tuple input_buffer_;
};
public:
explicit function_node(F function) : in_port_{this, this}, out_port_{}, function_{function} {}
template<int POS>
using in_port_at = typename multi_in_port_type::template in_port_type_at<POS>;
template<int POS>
using out_port_at = typename multi_out_port_type::template out_port_type_at<POS>;
template<int POS>
in_port_at<POS> &in_port() {
return in_port_.template get<POS>();
}
multi_in_port_type &in_ports() {
return in_port_;
}
template<int POS>
out_port_at<POS> &out_port() {
return out_port_.template get<POS>();
}
multi_out_port_type &out_ports() {
return out_port_;
}
template<typename ...OS, typename FUNC>
function_node<inputs<O0, O...>, outputs<OS...>, FUNC>
&operator>>(function_node<inputs<O0, O...>, outputs<OS...>, FUNC> &other_node);
template<typename ...IS>
void operator>>(graph<inputs<IS...>, outputs<O0, O...>> &graph);
template<int POS, typename T>
void token_pushed(token<T> token);
int num_successors() const override {
return num_out_ports;
}
node *successor_at(int pos) const override {
return out_port_.next_node_at(pos);
}
bool is_fully_connected() const override {
return in_port_.fully_connected() && out_port_.fully_connected();
}
int instance_buffer_size() const override {
return sizeof(invocation_memory);
}
void init_instance_buffer(void *memory) const override {
auto invocation = new(memory) invocation_memory{};
invocation->inputs_missing_ = num_in_ports;
};
void clean_up_instance_buffer(void *memory) const override {
auto invocation = reinterpret_cast<invocation_memory *>(memory);
invocation->~invocation_memory();
}
private:
multi_in_port_type in_port_;
multi_out_port_type out_port_;
F function_;
//////////////////////////////////////////////////////////////////
// Helpers for actually calling the work lambda
//////////////////////////////////////////////////////////////////
void execute_function(invocation_memory *invocation_memory, invocation_info invocation_info);
template<int N, typename ...OT>
struct propagate_output;
template<int ...IS, int ...OS>
void execute_function_internal(input_tuple &inputs, sequence<IS...>,
output_tuple &outputs, sequence<OS...>,
invocation_info invocation_info);
};
}
}
}
#include "function_node_impl.h"
#endif //PLS_DATAFLOW_INTERNAL_NODE_H_
#ifndef PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_IMPL_H_
#define PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_IMPL_H_
#include "graph.h"
#include "pls/internal/helpers/easy_profiler.h"
namespace pls {
namespace dataflow {
namespace internal {
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
template<typename ...OS, typename FUNC>
function_node<inputs<O0, O...>, outputs<OS...>, FUNC>&
function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
operator>>(function_node<inputs<O0, O...>, outputs<OS...>, FUNC> &other_node) {
out_port_ >> other_node.in_ports();
return other_node;
}
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
template<typename ...IS>
void function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
operator>>(graph<inputs<IS...>, outputs<O0, O...>> &graph) {
out_port_ >> graph.output_ports();
}
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
template<int POS, typename T>
void function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
token_pushed(token<T> token) {
auto current_memory = get_invocation<invocation_memory>(token.invocation());
std::get<POS>(current_memory->input_buffer_) = token;
auto remaining_inputs = --(current_memory->inputs_missing_);
if (remaining_inputs == 0) {
execute_function(current_memory, token.invocation());
current_memory->inputs_missing_ = num_in_ports;
}
}
//////////////////////////////////////////////////////////////////
// Helpers for actually calling the work lambda
//////////////////////////////////////////////////////////////////
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
void function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
execute_function(invocation_memory *invocation_memory, invocation_info invocation_info) {
auto lambda = [=]() {
PROFILE_WORK_BLOCK("Function Node")
input_tuple &inputs = invocation_memory->input_buffer_;
output_tuple outputs{};
execute_function_internal(inputs, typename sequence_gen<1 + sizeof...(I)>::type(),
outputs, typename sequence_gen<1 + sizeof...(O)>::type(), invocation_info);
PROFILE_END_BLOCK
};
// TODO: maybe replace this with 'continuation' style invocation
pls::scheduler::spawn_child<pls::lambda_task_by_value<decltype(lambda)>>(lambda);
}
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
template<int N, typename ...OT>
struct function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
propagate_output {
propagate_output(multi_out_port_type &, output_tuple &, invocation_info&) {}
void propagate() {}
};
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
template<int N, typename OT1, typename ...OT>
struct function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
propagate_output<N, OT1, OT...> {
multi_out_port_type &out_port_;
output_tuple &output_tuple_;
invocation_info &invocation_info_;
propagate_output(multi_out_port_type &out_port, output_tuple &output_tuple, invocation_info& invocation_info) :
out_port_{out_port}, output_tuple_{output_tuple}, invocation_info_{invocation_info} {}
void propagate() {
std::get<N>(output_tuple_).set_invocation(invocation_info_);
out_port_.template get<N>().push_token(std::get<N>(output_tuple_));
propagate_output<N + 1, OT...>{out_port_, output_tuple_, invocation_info_}.propagate();
}
};
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
template<int ...IS, int ...OS>
void function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
execute_function_internal(input_tuple &inputs, sequence<IS...>,
output_tuple &outputs, sequence<OS...>,
invocation_info invocation_info) {
function_(std::get<IS>(inputs).value()..., std::get<OS>(outputs).value()...);
propagate_output<0, O0, O...>{out_port_, outputs, invocation_info}.propagate();
}
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_IMPL_H_
#ifndef PLS_DATAFLOW_INTERNAL_GRAPH_H_
#define PLS_DATAFLOW_INTERNAL_GRAPH_H_
#include <atomic>
#include <tuple>
#include <iostream>
#include "build_state.h"
#include "node.h"
#include "function_node.h"
#include "in_port.h"
#include "out_port.h"
#include "inputs.h"
#include "outputs.h"
#include "pls/pls.h"
namespace pls {
namespace dataflow {
namespace internal {
template<typename INS, typename OUTS>
class graph {};
template<typename I0, typename ...I, typename O0, typename ...O>
class graph<inputs<I0, I...>, outputs<O0, O...>> : public node {
template<typename INS, typename OUTS, typename FUNC>
friend
class function_node;
private:
// Our own type
using self_type = graph<inputs<I0, I...>, outputs<O0, O...>>;
// Input-Output port types (internal)
using inputs_type = multi_out_port<I0, I...>;
using outputs_type = multi_in_port<self_type, 0, O0, O...>;
// Input-Output value tuples
using value_input_tuple = std::tuple<I0, I...>;
using input_tuple = std::tuple<token<I0>, token<I>...>;
using value_output_tuple = std::tuple<O0, O...>;
using output_tuple = std::tuple<token<O0>, token<O>...>;
static constexpr int num_in_ports = std::tuple_size<input_tuple>();
static constexpr int num_out_ports = std::tuple_size<output_tuple>();
// Memory used for ONE active invocation of the dataflow-graph
struct invocation_memory {
value_output_tuple *output_buffer_;
};
public:
template<int POS>
using input_at = typename inputs_type::template out_port_type_at<POS>;
template<int POS>
using output_at = typename outputs_type::template in_port_type_at<POS>;
template<int POS>
input_at<POS> &input() {
return inputs_.template get<POS>();
}
inputs_type &input_ports() {
return inputs_;
}
template<int POS>
output_at<POS> &output() {
return outputs_.template get<POS>();
}
outputs_type &output_ports() {
return outputs_;
}
template<typename ...OS, typename FUNC>
function_node<inputs<I0, I...>, outputs<OS...>, FUNC>
&operator>>(function_node<inputs<I0, I...>, outputs<OS...>, FUNC> &other_node);
void wait_for_all() {
pls::scheduler::wait_for_all();
}
template<int POS, typename T>
void token_pushed(token<T> token);
graph() : inputs_{}, outputs_{this, this} {}
void build();
void run(value_input_tuple input, value_output_tuple *output) {
PLS_ASSERT(build_state_ == build_state::built, "Must build graph before running it!")
const auto lambda = [=]() {
pls::scheduler::spawn_child<run_graph_task>(this, input, output);
};
pls::scheduler::spawn_child<lambda_task_by_value<decltype(lambda)>>(lambda);
}
int num_successors() const override {
return 0;
}
node *successor_at(int) const override {
PLS_ERROR("A graph instance has no direct successor!")
}
bool is_fully_connected() const override {
return inputs_.fully_connected() && outputs_.fully_connected();
}
int instance_buffer_size() const override {
return sizeof(invocation_memory);
}
void init_instance_buffer(void *memory) const override {
new(memory) invocation_memory{};
};
void clean_up_instance_buffer(void *memory) const override {
auto invocation = reinterpret_cast<invocation_memory *>(memory);
invocation->~invocation_memory();
}
private:
inputs_type inputs_;
outputs_type outputs_;
// Information about building the graph
node *node_list_start_{nullptr};
node *node_list_current_{nullptr};
int num_nodes_{0};
// Internals required for building and running
void build_recursive(node *node);
void add_node(node *new_node);
template<int N, typename ...IT>
struct feed_inputs;
class run_graph_task;
};
}
}
}
#include "graph_impl.h"
#endif //PLS_DATAFLOW_INTERNAL_GRAPH_H_
#ifndef PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_
#define PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_
#include "pls/internal/helpers/easy_profiler.h"
namespace pls {
namespace dataflow {
namespace internal {
template<typename I0, typename ...I, typename O0, typename ...O>
template<typename ...OS, typename FUNC>
function_node<inputs<I0, I...>, outputs<OS...>, FUNC> &graph<inputs<I0, I...>, outputs<O0, O...>>::
operator>>(function_node<inputs<I0, I...>, outputs<OS...>, FUNC> &other_node) {
inputs_ >> other_node.in_ports();
return other_node;
}
template<typename I0, typename ...I, typename O0, typename ...O>
template<int POS, typename T>
void graph<inputs<I0, I...>, outputs<O0, O...>>::
token_pushed(token<T> token) {
auto invocation = get_invocation<invocation_memory>(token.invocation());
std::get<POS>(*invocation->output_buffer_) = token.value();
}
template<typename I0, typename ...I, typename O0, typename ...O>
void graph<inputs<I0, I...>, outputs<O0, O...>>::
build() {
PLS_ASSERT(build_state_ == build_state::fresh, "Must only build a dataflow graph once!")
PLS_ASSERT(is_fully_connected(), "Must fully connect all inputs/outputs inside a dataflow graph!")
build_state_ = build_state::building;
node_list_start_ = node_list_current_ = this;
memory_index_ = 0;
num_nodes_ = 1;
for (int i = 0; i < num_in_ports; i++) {
build_recursive(inputs_.next_node_at(i));
}
build_state_ = build_state::built;
}
template<typename I0, typename ...I, typename O0, typename ...O>
void graph<inputs<I0, I...>, outputs<O0, O...>>::
build_recursive(node *node) {
if (node->build_state_ != build_state::fresh) {
return; // Already visited
}
node->build_state_ = build_state::building;
PLS_ASSERT(node->is_fully_connected(), "Must fully connect dataflow graph nodes!")
add_node(node);
for (int i = 0; i < node->num_successors(); i++) {
build_recursive(node->successor_at(i));
}
node->build_state_ = build_state::built;
}
template<typename I0, typename ...I, typename O0, typename ...O>
void graph<inputs<I0, I...>, outputs<O0, O...>>::
add_node(node *new_node) {
new_node->memory_index_ = num_nodes_++;
if (node_list_current_ == nullptr) {
node_list_current_ = new_node;
node_list_start_ = new_node;
} else {
node_list_current_->direct_successor_ = new_node;
node_list_current_ = new_node;
}
}
template<typename I0, typename ...I, typename O0, typename ...O>
template<int N, typename ...IT>
struct graph<inputs<I0, I...>, outputs<O0, O...>>::
feed_inputs {
feed_inputs(inputs_type &, value_input_tuple &, invocation_info &) {}
void run() {}
};
template<typename I0, typename ...I, typename O0, typename ...O>
template<int N, typename IT1, typename ...IT>
struct graph<inputs<I0, I...>, outputs<O0, O...>>::
feed_inputs<N, IT1, IT...> {
inputs_type &inputs_;
value_input_tuple &input_values_;
invocation_info &invocation_;
feed_inputs(inputs_type &inputs,
value_input_tuple &input_values,
invocation_info &invocation) : inputs_{inputs},
input_values_{input_values},
invocation_{invocation} {}
void run() {
inputs_.template get<N>().push_token(token<IT1>{std::get<N>(input_values_), invocation_});
feed_inputs<N + 1, IT...>{inputs_, input_values_, invocation_}.run();
}
};
template<typename I0, typename ...I, typename O0, typename ...O>
class graph<inputs<I0, I...>, outputs<O0, O...>>::run_graph_task : public pls::task {
graph *self_;
value_input_tuple input_;
value_output_tuple *output_;
// Buffers for actual execution
invocation_info invocation_;
public:
run_graph_task(graph *self, value_input_tuple input, value_output_tuple *output) : self_{self},
input_{input},
output_{output},
invocation_{nullptr} {
void **buffers;
buffers = reinterpret_cast<void **>(allocate_memory(self_->num_nodes_ * sizeof(void *)));
node *iterator = self_->node_list_start_;
for (int i = 0; i < self_->num_nodes_; i++) {
auto required_size = iterator->instance_buffer_size();
buffers[i] = allocate_memory(required_size);
iterator->init_instance_buffer(buffers[i]);
iterator = iterator->direct_successor_;
}
invocation_ = invocation_info{buffers};
self_->get_invocation<invocation_memory>(invocation_)->output_buffer_ = output;
}
~run_graph_task() {
node *iterator = self_->node_list_start_;
for (int i = 0; i < self_->num_nodes_; i++) {
void* memory = invocation_.get_instance_buffer<void>(i);
iterator->clean_up_instance_buffer(memory);
iterator = iterator->direct_successor_;
}
}
void execute_internal() override {
PROFILE_WORK_BLOCK("Graph Invocation")
feed_inputs<0, I0, I...>{self_->inputs_, input_, invocation_}.run();
wait_for_all();
this->~run_graph_task();
PROFILE_END_BLOCK
}
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_
#ifndef PLS_DATAFLOW_INTERNAL_INPUT_H_
#define PLS_DATAFLOW_INTERNAL_INPUT_H_
#include "pls/internal/base/error_handling.h"
#include "token.h"
#include "node.h"
namespace pls {
namespace dataflow {
namespace internal {
/**
* Represents a single, logical input port (no data store, simply signal propagation).
* @tparam T Type of the input port
*/
template<typename T>
class in_port {
template<typename OT>
friend
class out_port;
public:
explicit in_port(node *owning_node) : owning_node_{owning_node} {};
node *owning_node() const { return owning_node_; }
bool is_connected() const { return connected_; }
protected:
virtual void token_pushed(token<T> token) = 0;
private:
bool connected_{false};
node *const owning_node_;
void push_token(token<T> token) {
token_pushed(token);
}
void connect() {
if (connected_) {
PLS_ERROR("Must only connect on input once. Disconnect the output pointing to it before reconnecting.")
}
connected_ = true;
}
};
/**
* Represents multiple input ports bundled together (a tuple of inputs).
* Allows for a unified callback method to handle multiple typed inputs.
*
* template<int POS, typename T>
* void token_pushed(token<T> token) { Notified when tokens arrive }
*
* @tparam CB The class implementing a callback
* @tparam N Put 0 to start the recursive implementation
* @tparam I A variadic list of input types
*/
template<typename CB, int N, typename ...I>
class multi_in_port {
// end of template recursion
public:
explicit multi_in_port(node *, CB *) {};
bool fully_connected() const {
return true;
}
};
template<typename CB, int N, typename I0, typename ...I>
class multi_in_port<CB, N, I0, I...> : public in_port<I0> {
public:
// Helpers for managing recursive types
using my_type = multi_in_port<CB, N, I0, I...>;
using child_type = multi_in_port<CB, N + 1, I...>;
using value_type = I0;
explicit multi_in_port(node *owning_node, CB *cb) : in_port<I0>{owning_node}, rec_{owning_node, cb}, cb_{cb} {};
void token_pushed(token<I0> token) override {
cb_->template token_pushed<N, I0>(token);
}
// Helper struct required for recursive access to types by index
template<int POS, typename ...T>
struct type_at {
};
template<typename T0, typename ...T>
struct type_at<0, T0, T...> {
using type = T0;
using in_port_type = in_port<type>;
};
template<int POS, typename T0, typename ...T>
struct type_at<POS, T0, T...> {
using type = typename type_at<POS - 1, T...>::type;
using in_port_type = in_port<type>;
};
// Simple interface to get types by index
template<int POS>
using in_port_type_at = typename type_at<POS, I0, I...>::in_port_type;
template<int POS>
using value_type_at = typename type_at<POS, I0, I...>::type;
// Helper struct required for recursive access to input's by index
template<int POS, typename RES_T, typename MY_T>
struct get_at {
static RES_T &get(MY_T &self) {
return get_at<POS - 1, RES_T, typename MY_T::child_type>::get(self.rec_);
}
};
template<typename RESULT, typename CURRENT>
struct get_at<0, RESULT, CURRENT> {
static RESULT &get(CURRENT &self) {
return self;
}
};
// Simple interface to access input's by index
template<int POS>
in_port_type_at<POS> &get() {
return get_at<POS, in_port_type_at<POS>, my_type>::get(*this);
}
bool fully_connected() const {
return this->is_connected() && rec_.fully_connected();
}
child_type rec_;
CB *cb_;
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_INPUT_H_
#ifndef PLS_DATAFLOW_INTERNAL_INPUTS_H_
#define PLS_DATAFLOW_INTERNAL_INPUTS_H_
namespace pls {
namespace dataflow {
namespace internal {
template<typename I1, typename ...I>
struct inputs {
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_INPUTS_H_
#ifndef PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_
#define PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_
#include <atomic>
#include "in_port.h"
#include "out_port.h"
#include "node.h"
#include "token.h"
namespace pls {
namespace dataflow {
namespace internal {
template<typename I>
class merge_node : public node {
// Our own type
using self_type = merge_node<I>;
// Input-Output port types
using multi_in_port_type = multi_in_port<self_type, 0, I, I, bool>;
using multi_out_port_type = multi_out_port<I>;
// Input-Output tuples
using input_tuple = std::tuple<token < I>, token<I>, token<bool>>;
// Memory used for ONE active invocation of the dataflow-graph
struct invocation_memory {
std::atomic<unsigned int> inputs_missing_;
input_tuple input_buffer_;
};
// Encodings for current input state (needs to fit into an atomic)
static constexpr unsigned int IN_0_MISSING = 1u << 0u;
static constexpr unsigned int IN_1_MISSING = 1u << 1u;
static constexpr unsigned int COND_MISSING = 1u << 2u;
static constexpr unsigned int INITIAL_STATE = IN_0_MISSING + IN_1_MISSING + COND_MISSING;
public:
explicit merge_node() : in_port_{this, this}, out_port_{} {}
in_port<I> &true_in_port() {
return in_port_.template get<0>();
}
in_port<I> &false_in_port() {
return in_port_.template get<1>();
}
in_port<bool> &condition_in_port() {
return in_port_.template get<2>();
}
out_port<I> &value_out_port() {
return out_port_.template get<0>();
}
template<int POS, typename T>
void token_pushed(token<T> token) {
auto current_memory = get_invocation<invocation_memory>(token.invocation());
std::get<POS>(current_memory->input_buffer_) = token;
unsigned int remaining_inputs;
if (POS == 0) {
remaining_inputs = (current_memory->inputs_missing_).fetch_sub(IN_0_MISSING) - IN_0_MISSING;
} else if (POS == 1) {
remaining_inputs = (current_memory->inputs_missing_).fetch_sub(IN_1_MISSING) - IN_1_MISSING;
} else {
remaining_inputs = (current_memory->inputs_missing_).fetch_sub(COND_MISSING) - COND_MISSING;
}
if ((remaining_inputs & COND_MISSING) == 0) {
if ((remaining_inputs & IN_0_MISSING) == 0) {
auto &data = std::get<0>(current_memory->input_buffer_);
value_out_port().push_token(data);
current_memory->inputs_missing_ += IN_0_MISSING + COND_MISSING;
} else if ((remaining_inputs & IN_1_MISSING) == 0) {
auto &data = std::get<1>(current_memory->input_buffer_);
value_out_port().push_token(data);
current_memory->inputs_missing_ += IN_1_MISSING + COND_MISSING;
}
}
}
int num_successors() const override {
return 1;
}
node *successor_at(int pos) const override {
return out_port_.next_node_at(pos);
}
bool is_fully_connected() const override {
return in_port_.fully_connected() && out_port_.fully_connected();
}
int instance_buffer_size() const override {
return sizeof(invocation_memory);
}
void init_instance_buffer(void *memory) const override {
auto invocation = new(memory) invocation_memory{};
invocation->inputs_missing_ = INITIAL_STATE;
};
void clean_up_instance_buffer(void *memory) const override {
auto invocation = reinterpret_cast<invocation_memory *>(memory);
invocation->~invocation_memory();
}
private:
multi_in_port_type in_port_;
multi_out_port_type out_port_;
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_
#ifndef PLS_DATAFLOW_INTERNAL_NODE_H_
#define PLS_DATAFLOW_INTERNAL_NODE_H_
#include "build_state.h"
#include "token.h"
namespace pls {
namespace dataflow {
namespace internal {
/**
* Represents a single piece included in a dataflow graph.
* This can be anything connected to form the dataflow that requires
* memory during invocation and is present in the connected flow graph.
*/
class node {
template<typename INS, typename OUTS>
friend
class graph;
public:
virtual int num_successors() const = 0;
virtual node *successor_at(int pos) const = 0;
virtual int instance_buffer_size() const = 0;
virtual void init_instance_buffer(void *memory) const = 0;
virtual void clean_up_instance_buffer(void *memory) const = 0;
virtual bool is_fully_connected() const = 0;
template<typename M>
M *get_invocation(invocation_info invocation) {
return invocation.template get_instance_buffer<M>(memory_index_);
}
private:
int memory_index_{0};
node *direct_successor_{nullptr};
build_state build_state_{build_state::fresh};
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_NODE_H_
#ifndef PLS_DATAFLOW_INTERNAL_OUTPUT_H_
#define PLS_DATAFLOW_INTERNAL_OUTPUT_H_
#include <tuple>
#include "in_port.h"
#include "node.h"
namespace pls {
namespace dataflow {
namespace internal {
template<class T>
class out_port {
public:
void connect(in_port<T> &target) {
if (connected_) {
PLS_ERROR("Must only connect output once. Please disconnect it before reconnecting.")
}
target.connect();
target_ = &target;
connected_ = true;
}
void operator>>(in_port<T> &input) {
connect(input);
}
void push_token(token<T> token) const {
target_->push_token(token);
}
node *next_node() const {
return target_->owning_node();
}
bool is_connected() const { return connected_; }
private:
bool connected_{false};
in_port<T> *target_{nullptr};
};
/**
* Represents multiple output ports bundled together (a tuple of inputs).
*
* @tparam I A variadic list of input types
*/
template<typename O0, typename ...O>
class multi_out_port {
private:
// Helpers for managing recursive types
using value_tuple_type = std::tuple<O0, O...>;
using out_port_tupel_type = std::tuple<out_port<O0>, out_port<O>...>;
static constexpr int num_outputs = std::tuple_size<value_tuple_type>();
public:
// Simple interface to get types by index
template<int POS>
using out_port_type_at = typename std::tuple_element<POS, out_port_tupel_type>::type;
template<int POS>
using value_type_at = typename std::tuple_element<POS, value_tuple_type>::type;
// Simple interface to access input's by index
template<int POS>
out_port_type_at<POS> &get() {
return std::get<POS>(outputs_);
}
// Simple interface to connect multiple intputs to matching, multiple outputs
template<typename CB>
void operator>>(multi_in_port<CB, 0, O0, O...> &input) {
connect_to < CB, 0 > {this, &input}.connect();
}
node *next_node_at(int pos) const {
return next_node < 0 > {&outputs_}.get(pos);
}
bool fully_connected() const {
return connected < 0 > {&outputs_}.get();
}
private:
out_port_tupel_type outputs_;
template<int POS, typename DUMMY=void>
struct next_node {
const out_port_tupel_type *outputs_;
node *get(int i) {
if (POS == i) {
return std::get<POS>(*outputs_).next_node();
} else {
return next_node<POS + 1>{outputs_}.get(i);
}
}
};
template<typename DUMMY>
struct next_node<num_outputs, DUMMY> {
const out_port_tupel_type *outputs_;
node *get(int) {
PLS_ERROR("Try to access invalid successor node index!")
}
};
template<int POS, typename DUMMY=void>
struct connected {
const out_port_tupel_type *outputs_;
bool get() {
bool self = std::get<POS>(*outputs_).is_connected();
bool children = connected<POS + 1>{outputs_}.get();
return self && children;
}
};
template<typename DUMMY>
struct connected<num_outputs, DUMMY> {
const out_port_tupel_type *outputs_;
bool get() {
return true;
}
};
template<typename CB, int POS, typename DUMMY=void>
struct connect_to {
multi_out_port<O0, O...> *out_port_;
multi_in_port<CB, 0, O0, O...> *in_port_;
void connect() {
out_port_->template get<POS>() >> in_port_->template get<POS>();
connect_to<CB, POS + 1>{out_port_, in_port_}.connect();
}
};
template<typename CB, typename DUMMY>
struct connect_to<CB, num_outputs, DUMMY> {
multi_out_port<O0, O...> *out_port_;
multi_in_port<CB, 0, O0, O...> *in_port_;
void connect() {
};
};
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_OUTPUT_H_
#ifndef PLS_DATAFLOW_INTERNAL_OUTPUTS_H_
#define PLS_DATAFLOW_INTERNAL_OUTPUTS_H_
namespace pls {
namespace dataflow {
namespace internal {
template<typename O0, typename ...O>
struct outputs {
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_OUTPUTS_H_
#ifndef PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_
#define PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_
#include <atomic>
#include "in_port.h"
#include "out_port.h"
#include "node.h"
#include "token.h"
namespace pls {
namespace dataflow {
namespace internal {
template<typename I>
class split_node : public node {
// Our own type
using self_type = split_node<I>;
// Input-Output port types
using multi_in_port_type = multi_in_port<self_type, 0, I>;
using multi_out_port_type = multi_out_port<I, I>;
public:
explicit split_node() : in_port_{this, this}, out_port_{} {}
in_port<I> &value_in_port() {
return in_port_.template get<0>();
}
out_port<I> &out_port_1() {
return out_port_.template get<0>();
}
out_port<I> &out_port_2() {
return out_port_.template get<1>();
}
template<int POS, typename T>
void token_pushed(token<T> token) {
out_port_1().push_token(token);
out_port_2().push_token(token);
}
int num_successors() const override {
return 2;
}
node *successor_at(int pos) const override {
return out_port_.next_node_at(pos);
}
bool is_fully_connected() const override {
return in_port_.fully_connected() && out_port_.fully_connected();
}
int instance_buffer_size() const override {
return 0;
}
void init_instance_buffer(void *) const override {
// No need for memory, we simply forward entries without buffering
};
void clean_up_instance_buffer(void *memory) const override {
// No need for memory, we simply forward entries without buffering
}
private:
multi_in_port_type in_port_;
multi_out_port_type out_port_;
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_
#ifndef PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_
#define PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_
#include <atomic>
#include "in_port.h"
#include "out_port.h"
#include "node.h"
#include "token.h"
namespace pls {
namespace dataflow {
namespace internal {
template<typename I>
class switch_node : public node {
// Our own type
using self_type = switch_node<I>;
// Input-Output port types
using multi_in_port_type = multi_in_port<self_type, 0, I, bool>;
using multi_out_port_type = multi_out_port<I, I>;
// Input-Output tuples
using input_tuple = std::tuple<token < I>, token<bool>>;
// Memory used for ONE active invocation of the dataflow-graph
struct invocation_memory {
std::atomic<int> inputs_missing_;
input_tuple input_buffer_;
};
public:
explicit switch_node() : in_port_{this, this}, out_port_{} {}
in_port<I> &value_in_port() {
return in_port_.template get<0>();
}
in_port<bool> &condition_in_port() {
return in_port_.template get<1>();
}
out_port<I> &true_out_port() {
return out_port_.template get<0>();
}
out_port<I> &false_out_port() {
return out_port_.template get<1>();
}
template<int POS, typename T>
void token_pushed(token<T> token) {
auto current_memory = get_invocation<invocation_memory>(token.invocation());
std::get<POS>(current_memory->input_buffer_) = token;
auto remaining_inputs = --(current_memory->inputs_missing_);
if (remaining_inputs == 0) {
bool condition = std::get<1>(current_memory->input_buffer_).value();
auto &data = std::get<0>(current_memory->input_buffer_);
if (condition) {
true_out_port().push_token(data);
} else {
false_out_port().push_token(data);
}
current_memory->inputs_missing_ = 2;
}
}
int num_successors() const override {
return 2;
}
node *successor_at(int pos) const override {
return out_port_.next_node_at(pos);
}
bool is_fully_connected() const override {
return in_port_.fully_connected() && out_port_.fully_connected();
}
int instance_buffer_size() const override {
return sizeof(invocation_memory);
}
void init_instance_buffer(void *memory) const override {
auto invocation = new(memory) invocation_memory{};
invocation->inputs_missing_ = 2;
};
void clean_up_instance_buffer(void *memory) const override {
auto invocation = reinterpret_cast<invocation_memory *>(memory);
invocation->~invocation_memory();
}
private:
multi_in_port_type in_port_;
multi_out_port_type out_port_;
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_
#ifndef PLS_DATAFLOW_INTERNAL_TOKEN_H_
#define PLS_DATAFLOW_INTERNAL_TOKEN_H_
namespace pls {
namespace dataflow {
namespace internal {
class invocation_info {
public:
explicit invocation_info(void **memory) : memory_{memory} {};
invocation_info(invocation_info const &other) = default;
template<typename T>
T *get_instance_buffer(int pos) {
return reinterpret_cast<T *>(memory_[pos]);
}
private:
void **memory_;
};
template<typename T>
class token {
T value_;
invocation_info invocation_;
public:
token() : invocation_{nullptr} {}; // Default Constructor Stays uninitialized
token(T value, invocation_info color) : value_{value}, invocation_{color} {};
T &value() { return value_; }
invocation_info invocation() const { return invocation_; }
void set_invocation(invocation_info invocation_info) { invocation_ = invocation_info; }
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_TOKEN_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment