Commit 6150b321 by FritzFlorian

Pushing data through the new system works.

parent 94804014
Pipeline #1283 passed with stages
in 3 minutes 57 seconds
add_executable(benchmark_unbalanced main.cpp node.h node.cpp picosha2.h) add_executable(benchmark_unbalanced main.cpp node.h function_node.cpp picosha2.h)
target_link_libraries(benchmark_unbalanced pls) target_link_libraries(benchmark_unbalanced pls)
if (EASY_PROFILER) if (EASY_PROFILER)
target_link_libraries(benchmark_unbalanced easy_profiler) target_link_libraries(benchmark_unbalanced easy_profiler)
......
...@@ -6,48 +6,40 @@ ...@@ -6,48 +6,40 @@
#include <pls/dataflow/inputs.h> #include <pls/dataflow/inputs.h>
#include <pls/dataflow/outputs.h> #include <pls/dataflow/outputs.h>
#include <pls/dataflow/internal/graph.h> #include <pls/dataflow/internal/function_node.h>
#include <pls/dataflow/internal/out_port.h> #include <pls/dataflow/internal/out_port.h>
#include <pls/dataflow/internal/buffer.h>
int main() { int main() {
using namespace pls::dataflow; using namespace pls::dataflow;
using namespace pls::dataflow::internal; using namespace pls::dataflow::internal;
out_port<int> port1; out_port<int> external1;
out_port<int> port2; out_port<int> external2;
graph<static_buffer<4>::type, inputs<int, int>, outputs<int>> tmp{};
port1 >> tmp.in_port<0>(); auto func1 = [](const int &i1, const int &i2, int &o1) {
port2 >> tmp.in_port<1>(); std::cout << "Hello! " << i1 << ", " << i2 << std::endl;
o1 = i1 + i2;
};
function_node<inputs<int, int>, outputs<int>, decltype(func1)> node1{func1};
auto func2 = [](const int &i1, int &o1) {
std::cout << "We get! " << i1 << std::endl;
};
function_node<inputs<int>, outputs<int>, decltype(func2)> node2{func2};
port1.push_token({1, {}}); external1 >> node1.in_port<0>();
port2.push_token({2, {}}); external2 >> node1.in_port<1>();
node1.out_port<0>() >> node2.in_port<0>();
// using namespace pls::dataflow; // Simulate execution environment
// void *buffer1 = malloc(node1.instance_buffer_size());
// graph<inputs<int, int>, outputs<int, int>, 2> graph2; void *buffer2 = malloc(node2.instance_buffer_size());
// graph2.input<0>() >> graph2.output<1>(); void *memory[] = {buffer1, buffer2};
// graph2.input<1>() >> graph2.output<0>(); node1.init_instance_buffer(memory[0]);
// node1.set_memory_index(0);
// graph<inputs<int, int>, outputs<int, int>, 2> graph1; node2.init_instance_buffer(memory[1]);
// graph1.input<0>() >> graph2.external_input<0>(); node2.set_memory_index(1);
// graph1.input<1>() >> graph2.external_input<1>();
// graph2.external_output<0>() >> graph1.output<0>();
// graph2.external_output<1>() >> graph1.output<1>();
//
// graph1.push_input(1, 2);
// graph1.push_input(3, 4);
//
// auto result1 = graph1.get_output();
// std::cout << std::get<0>(result1) << ", " << std::get<1>(result1) << std::endl;
//
// graph1.push_input(5, 6);
//
// auto result2 = graph1.get_output();
// std::cout << std::get<0>(result2) << ", " << std::get<1>(result2) << std::endl;
//
// auto result3 = graph1.get_output();
// std::cout << std::get<0>(result3) << ", " << std::get<1>(result3) << std::endl;
invocation_info invocation{memory};
external1.push_token({1, invocation});
external2.push_token({2, invocation});
} }
...@@ -15,8 +15,9 @@ add_library(pls STATIC ...@@ -15,8 +15,9 @@ add_library(pls STATIC
include/pls/dataflow/internal/token.h include/pls/dataflow/internal/token.h
include/pls/dataflow/internal/in_port.h include/pls/dataflow/internal/in_port.h
include/pls/dataflow/internal/out_port.h include/pls/dataflow/internal/out_port.h
include/pls/dataflow/internal/function_node.h
include/pls/dataflow/internal/node.h
include/pls/dataflow/internal/graph.h include/pls/dataflow/internal/graph.h
include/pls/dataflow/internal/buffer.h
include/pls/internal/base/spin_lock.h include/pls/internal/base/spin_lock.h
include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp
...@@ -48,8 +49,7 @@ add_library(pls STATIC ...@@ -48,8 +49,7 @@ add_library(pls STATIC
include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/scheduler_impl.h
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
include/pls/internal/scheduling/lambda_task.h include/pls/internal/scheduling/lambda_task.h include/pls/internal/helpers/seqence.h)
)
# Add everything in `./include` to be in the include path of this project # Add everything in `./include` to be in the include path of this project
target_include_directories(pls target_include_directories(pls
PUBLIC PUBLIC
......
#ifndef PLS_DATAFLOW_GRAPH_H_
#define PLS_DATAFLOW_GRAPH_H_
#include "inputs.h"
#include "outputs.h"
#include "input.h"
#include "output.h"
#include "internal/token.h"
namespace pls {
namespace dataflow {
template<typename I, typename O, int P>
class graph {
/**
* Boilerplate for callbacks of dataflow input/output handlers.
*/
struct external_input_cb {
graph *graph_;
void one_input(int pos, internal::token_color color) {
graph_->one_external_input(pos, color);
}
void all_inputs(internal::token_color color) {
graph_->all_external_inputs(color);
}
};
struct internal_output_cb {
graph *graph_;
void one_input(int pos, internal::token_color color) {
graph_->one_internal_output(pos, color);
}
void all_inputs(internal::token_color color) {
graph_->all_internal_outputs(color);
}
};
/**
* Handling of buffers for dataflow.
*/
using exeternal_inputs = typename I::template internal_inputs<P, graph<I, O, P>::external_input_cb>;
using external_outputs = typename O::template internal_outputs<P>;
exeternal_inputs external_inputs_{external_input_cb{this}};
external_outputs external_outputs_{};
void one_external_input(int pos, internal::token_color color) {
// We do currently not support 'single external inputs'.
// This prevents some fine-grained parallelism, but eases implementation
}
void all_external_inputs(internal::token_color external_color) {
auto my_clock = input_clock_++;
internal::token_color internal_color{my_clock, external_color.depth_ + 1};
auto my_index = internal_color.get_index(P);
while (output_clear_[my_index] != clear) {
PLS_ERROR("Full inputs are not implemented jet. Place waiting code in here/yield to scheduler.")
}
output_clear_[my_index] = waiting;
input_output_color_matching_[my_index] = {external_color, internal_color};
external_inputs_.push_to_outputs(internal_inputs_, external_color.get_index(P), internal_color);
}
using internal_outputs = typename I::template internal_inputs<P, graph<I, O, P>::internal_output_cb>;
using internal_inputs = typename O::template internal_outputs<P>;
internal_outputs internal_outputs_{internal_output_cb{this}};
internal_inputs internal_inputs_{};
void one_internal_output(int pos, internal::token_color color) {
// We do not support 'single flow' output. Every graph is well behaved, e.g. will emmit exactly one output tuple.
}
void all_internal_outputs(internal::token_color color) {
auto color_matching = input_output_color_matching_[color.get_index(P)];
auto external_color = std::get<0>(color_matching);
auto internal_color = std::get<1>(color_matching);
if (external_color == internal_color) {
output_clear_[color.get_index(P)] = finished;
} else {
internal_outputs_.push_to_outputs(external_outputs_, internal_color.get_index(P), external_color);
output_clear_[color.get_index(P)] = clear;
}
}
/**
* Handling of colors/maximum concurrent tokens in the system.
*/
std::atomic<unsigned int> input_clock_{0};
std::atomic<unsigned int> output_clock_{0};
std::array<std::pair<internal::token_color, internal::token_color>, P> input_output_color_matching_{};
enum output_state { clear, waiting, finished };
std::array<output_state, P> output_clear_{clear};
public:
template<int N>
using external_output_at = output<P, typename external_outputs::template raw_type_at<N>>;
template<int N>
using external_input_at = input<P, typename exeternal_inputs::template raw_type_at<N>>;
template<int N>
external_output_at<N> external_output() {
external_output_at<N> o{external_outputs_.template get<N>()};
return o;
}
template<int N>
external_input_at<N> external_input() {
external_input_at<N> i{external_inputs_.template get<N>()};
return i;
}
template<int N>
using input_at = output<P, typename internal_inputs::template raw_type_at<N>>;
template<int N>
using output_at = input<P, typename internal_outputs::template raw_type_at<N>>;
template<int N>
output_at<N> output() {
output_at<N> o{internal_outputs_.template get<N>()};
return o;
}
template<int N>
input_at<N> input() {
input_at<N> i{internal_inputs_.template get<N>()};
return i;
}
template<typename ...ARGS>
void push_input(ARGS ...values) {
auto my_clock = input_clock_++;
internal::token_color color{my_clock, 0};
auto my_index = color.get_index(P);
while (output_clear_[my_index] != clear) {
PLS_ERROR("Full inputs are not implemented jet. Place waiting code in here/yield to scheduler.")
}
output_clear_[my_index] = waiting;
input_output_color_matching_[my_index] = {color, color};
push_input_rec<0, ARGS...>(color, values...);
}
template<int POS, typename ...TAIL>
void push_input_rec(internal::token_color color) {}
template<int POS, typename HEAD, typename ...TAIL>
void push_input_rec(internal::token_color color, HEAD h, TAIL ...t) {
internal_inputs_.template get<POS>().push_token(internal::token<HEAD>{h, color});
push_input_rec<POS + 1, TAIL...>(color, t...);
}
typename internal_outputs::raw_types get_output() {
while (true) {
if (output_clock_ > input_clock_) {
PLS_ERROR("Must not try to pull outputs if there are not enough inputs.")
}
auto color_matching = input_output_color_matching_[output_clock_ % P];
if (std::get<0>(color_matching) == std::get<1>(color_matching)) {
break;
}
output_clock_++;
}
while (output_clear_[output_clock_ % P] != finished) {
PLS_ERROR("Blocking/Unfinishe outputs are not implemented jet. Place waiting code in here/yield to scheduler.")
}
auto result = internal_outputs_.get_outputs(output_clock_ % P);
output_clear_[output_clock_ % P] = clear;
output_clock_++;
return result;
}
};
}
}
#endif //PLS_DATAFLOW_GRAPH_H_
#ifndef PLS_DATAFLOW_INTERNAL_BUFFER_H_
#define PLS_DATAFLOW_INTERNAL_BUFFER_H_
#include <array>
#include "in_port.h"
namespace pls {
namespace dataflow {
namespace internal {
template<int P, typename T>
struct static_buffer_impl {
std::array<T, P> buffer_;
const T &operator[](size_t i) const { return buffer_[i]; }
T &operator[](size_t i) { return buffer_[i]; }
static_buffer_impl() : buffer_{} {};
explicit static_buffer_impl(T init) : buffer_(init) {};
int capacity() const {
return P;
}
};
template<int P>
struct static_buffer {
template<typename T>
using type = static_buffer_impl<P, T>;
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_BUFFER_H_
#ifndef PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_H_
#define PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_H_
#include <tuple>
#include <iostream>
#include <atomic>
#include "in_port.h"
#include "out_port.h"
#include "node.h"
#include "pls/dataflow/inputs.h"
#include "pls/dataflow/outputs.h"
#include "pls/internal/helpers/seqence.h"
namespace pls {
namespace dataflow {
namespace internal {
using namespace pls::internal::helpers;
template<typename INS, typename OUTS, typename F>
class function_node {};
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
class function_node<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>, F> : public node {
private:
// Our own type
using self_type = function_node<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>, F>;
// Input-Output port types
using multi_in_port_type = multi_in_port<self_type, 0, I0, I...>;
using multi_out_port_type = multi_out_port<O0, O...>;
// Input-Output value tuples
using input_tuple = std::tuple<token<I0>, token<I>...>;
using output_tuple = std::tuple<token<O0>, token<O>...>;
static constexpr int num_in_ports = std::tuple_size<input_tuple>();
static constexpr int num_out_ports = std::tuple_size<output_tuple>();
// Memory used for ONE active invocation of the dataflow-graph
struct invocation_memory {
std::atomic<int> inputs_missing_;
input_tuple input_buffer_;
};
public:
template<int POS>
using in_port_at = typename multi_in_port_type::template in_port_type_at<POS>;
template<int POS>
using out_port_at = typename multi_out_port_type::template out_port_type_at<POS>;
template<int POS>
in_port_at<POS> &in_port() {
return in_port_.template get<POS>();
}
template<int POS>
out_port_at<POS> &out_port() {
return out_port_.template get<POS>();
}
template<int POS, typename T>
void token_pushed(token<T> token) {
auto current_memory = get_invocation<invocation_memory>(token);
std::get<POS>(current_memory->input_buffer_) = token;
auto remaining_inputs = --(current_memory->inputs_missing_);
if (remaining_inputs == 0) {
execute_function(current_memory, token.invocation());
current_memory->inputs_missing_ = num_in_ports;
}
}
void execute_function(invocation_memory *invocation_memory, invocation_info invocation_info) {
input_tuple &inputs = invocation_memory->input_buffer_;
output_tuple outputs;
execute_function_internal(inputs, typename sequence_gen<1 + sizeof...(I)>::type(),
outputs, typename sequence_gen<1 + sizeof...(O)>::type(), invocation_info);
}
template<typename T>
void set_invocation_info(token<T> &token, invocation_info invocation_info) {
token.set_invocation(invocation_info);
}
template<int N, typename ...OT>
struct propagate_output {
propagate_output(multi_out_port_type &, output_tuple &) {}
void propagate() {}
};
template<int N, typename OT1, typename ...OT>
struct propagate_output<N, OT1, OT...> {
multi_out_port_type &out_port_;
output_tuple &output_tuple_;
propagate_output(multi_out_port_type &out_port, output_tuple &output_tuple) : out_port_{out_port},
output_tuple_{output_tuple} {}
void propagate() {
out_port_.template get<N>().push_token(std::get<N>(output_tuple_));
propagate_output<N + 1, OT...>{out_port_, output_tuple_}.propagate();
}
};
template<int ...IS, int ...OS>
void execute_function_internal(input_tuple &inputs,
sequence<IS...>,
output_tuple &outputs,
sequence<OS...>,
invocation_info invocation_info) {
set_invocation_info(std::get<OS>(outputs)..., invocation_info);
function_(std::get<IS>(inputs).value()..., std::get<OS>(outputs).value()...);
propagate_output<0, O0, O...>{out_port_, outputs}.propagate();
}
explicit function_node(F function) : in_port_{this, this}, out_port_{}, function_{function} {}
//////////////////////////////////////////////////////////////////
// Overrides for generic node functionality (building the graph)
//////////////////////////////////////////////////////////////////
int num_successors() const override {
return num_out_ports;
}
node *successor_at(int pos) const override {
return out_port_.next_node_at(pos);
}
bool is_fully_connected() const override {
return in_port_.fully_connected() && out_port_.fully_connected();
}
int instance_buffer_size() const override {
return sizeof(invocation_memory);
}
void init_instance_buffer(void *memory) const override {
auto invocation = new(memory) invocation_memory{};
invocation->inputs_missing_ = num_in_ports;
};
private:
// Input/Output ports
multi_in_port_type in_port_;
multi_out_port_type out_port_;
F function_;
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_NODE_H_
...@@ -2,11 +2,11 @@ ...@@ -2,11 +2,11 @@
#ifndef PLS_DATAFLOW_INTERNAL_GRAPH_H_ #ifndef PLS_DATAFLOW_INTERNAL_GRAPH_H_
#define PLS_DATAFLOW_INTERNAL_GRAPH_H_ #define PLS_DATAFLOW_INTERNAL_GRAPH_H_
#include <atomic>
#include <tuple> #include <tuple>
#include <iostream> #include <iostream>
#include <atomic>
#include "buffer.h" #include "node.h"
#include "in_port.h" #include "in_port.h"
#include "out_port.h" #include "out_port.h"
...@@ -17,65 +17,32 @@ namespace pls { ...@@ -17,65 +17,32 @@ namespace pls {
namespace dataflow { namespace dataflow {
namespace internal { namespace internal {
struct graph_invocation { template<typename INS, typename OUTS>
enum state { clear, running, finished };
explicit graph_invocation(int num_outputs) : num_outputs_{num_outputs}, outputs_missing_{num_outputs} {};
void reset() {
state_ = clear;
internal_call_ = false;
previous_color_ = {};
outputs_missing_ = num_outputs_;
}
const int num_outputs_;
std::atomic<int> outputs_missing_;
state state_{clear};
bool internal_call_{false};
token_color previous_color_{};
};
template<template<typename> class B, typename INS, typename OUTS>
class graph {}; class graph {};
template<template<typename> class B, typename I0, typename ...I, typename O0, typename ...O> template<typename I0, typename ...I, typename O0, typename ...O>
class graph<B, pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> { class graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> : public node {
private:
using self_type = graph<B, pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>>;
struct in_port_cb {
self_type &self_;
explicit in_port_cb(self_type &self) : self_{self} {}; private:
// Our own type
using self_type = graph<pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>>;
template<int POS, typename T> // Input-Output port types
void token_pushed(token<T> token) { using multi_in_port_type = multi_in_port<self_type, 0, I0, I...>;
self_.in_port_token_pushed<POS, T>(token); using multi_out_port_type = multi_out_port<O0, O...>;
}
};
struct output_cb {
const self_type &self_;
explicit output_cb(self_type &self) : self_{self} {}; // Input-Output value tuples
using input_tuple = std::tuple<token<I0>, token<I>...>;
using output_tuple = std::tuple<token<O0>, token<O>...>;
static constexpr int num_in_ports = std::tuple_size<input_tuple>();
static constexpr int num_out_ports = std::tuple_size<output_tuple>();
template<int POS, typename T> // Memory used for ONE active invocation of the dataflow-graph
void token_pushed(token<T> token) { struct invocation_memory {
self_.output_token_pushed<POS, T>(token); std::atomic<int> inputs_missing_;
} input_tuple input_buffer_;
}; };
// Type-Defs used internally
using my_type = graph<B, pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>>;
using multi_in_port_type = multi_in_port<in_port_cb, 0, I0, I...>;
using multi_out_port_type = multi_out_port<O0, O...>;
using input_tuple = std::tuple<I0, I...>;
using output_tuple = std::tuple<O0, O...>;
public: public:
template<int POS> template<int POS>
using in_port_at = typename multi_in_port_type::template in_port_type_at<POS>; using in_port_at = typename multi_in_port_type::template in_port_type_at<POS>;
...@@ -93,51 +60,92 @@ class graph<B, pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...> ...@@ -93,51 +60,92 @@ class graph<B, pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>
} }
template<int POS, typename T> template<int POS, typename T>
void in_port_token_pushed(token<T> token) { void token_pushed(token<T> token) {
auto my_clock = input_clock_++; auto invocation = get_invocation<invocation_memory>(token);
auto index = token.color().get_index(buffer_size_);
std::cout << "Token pushed at " << POS << " with value " << token.value() << std::endl; std::cout << "Token pushed at " << POS << " with value " << token.value() << std::endl;
auto remaining_inputs = --(invocation->inputs_missing_);
if (remaining_inputs == 0) {
std::cout << "All inputs there!" << std::endl;
}
}
// std::get<POS>(inputs_[index]) = token; graph() : in_port_{this, this}, out_port_{} {
//
// auto remaining = --inputs_required_[index];
// if (remaining == 0) {
// std::cout << "All tokens at clock " << token.color().clock_ << std::endl;
// }
} }
template<int POS, typename T> /////////////////////////////////////////////////////////////////
void output_token_pushed(token<T> token) { // Graph building
/////////////////////////////////////////////////////////////////
void build() {
state_ = building;
PLS_ASSERT(is_fully_connected(), "Must fully connect dataflow graphs!")
num_nodes_ = 0;
for (int i = 0; i < num_in_ports; i++) {
build_recursive(successor_at(i));
} }
graph() : in_port_cb_{*this}, in_port_{in_port_cb_}, out_port_{}, invocations_{} { state_ = built;
buffer_size_ = invocations_.capacity();
} }
private: node *start_{nullptr};
void reset_input(int i) { node *current_{nullptr};
inputs_missing_[i] =
void build_recursive(node *node) {
if (node->state_ != fresh) {
return; // Already visited
} }
node->state_ = building;
PLS_ASSERT(node->is_fully_connected(), "Must fully connect dataflow graphs")
// Input/Output ports add_node(node);
in_port_cb in_port_cb_; for (int i = 0; i < node->num_successors(); i++) {
multi_in_port_type in_port_; build_recursive(node->successor_at(i));
multi_out_port_type out_port_; }
node->state_ = built;
}
void add_node(node *new_node) {
new_node->memory_index_ = num_nodes_++;
const int buffer_size_; if (start_ == nullptr) {
start_ = new_node;
current_ = new_node;
} else {
current_->direct_successor_ = new_node;
current_ = new_node;
}
}
// Clocks and state for execution //////////////////////////////////////////////////////////////////
std::atomic<unsigned int> input_clock_{0}; // Overrides for generic node functionality (building the graph)
std::atomic<unsigned int> output_clock_{0}; //////////////////////////////////////////////////////////////////
int num_successors() const override {
return num_out_ports;
}
node *successor_at(int pos) const override {
return out_port_.next_node_at(pos);
}
B<input_tuple> input_buffer_; bool is_fully_connected() const override {
B<std::atomic<int>> inputs_missing_; return in_port_.fully_connected() && out_port_.fully_connected();
}
int instance_buffer_size() const override {
return sizeof(invocation_memory);
}
void init_instance_buffer(void *memory) const override {
auto invocation = new(memory) invocation_memory{};
invocation->inputs_missing_ = num_in_ports;
};
private:
// Input/Output ports
multi_in_port_type in_port_;
multi_out_port_type out_port_;
B<graph_invocation> invocations_; int num_nodes_{0};
B<output_tuple> output_buffer_;
}; };
} }
......
...@@ -3,7 +3,9 @@ ...@@ -3,7 +3,9 @@
#define PLS_DATAFLOW_INTERNAL_INPUT_H_ #define PLS_DATAFLOW_INTERNAL_INPUT_H_
#include "pls/internal/base/error_handling.h" #include "pls/internal/base/error_handling.h"
#include "token.h" #include "token.h"
#include "node.h"
namespace pls { namespace pls {
namespace dataflow { namespace dataflow {
...@@ -19,11 +21,17 @@ class in_port { ...@@ -19,11 +21,17 @@ class in_port {
friend friend
class out_port; class out_port;
public:
explicit in_port(node *owning_node) : owning_node_{owning_node} {};
node *owning_node() const { return owning_node_; }
bool is_connected() const { return connected_; }
protected: protected:
virtual void token_pushed(token<T> token) = 0; virtual void token_pushed(token<T> token) = 0;
private: private:
bool connected_{false}; bool connected_{false};
node *const owning_node_;
void push_token(token<T> token) { void push_token(token<T> token) {
token_pushed(token); token_pushed(token);
...@@ -52,7 +60,11 @@ template<typename CB, int N, typename ...I> ...@@ -52,7 +60,11 @@ template<typename CB, int N, typename ...I>
class multi_in_port { class multi_in_port {
// end of template recursion // end of template recursion
public: public:
explicit multi_in_port(CB &) {}; explicit multi_in_port(node *, CB *) {};
bool fully_connected() const {
return true;
}
}; };
template<typename CB, int N, typename I0, typename ...I> template<typename CB, int N, typename I0, typename ...I>
class multi_in_port<CB, N, I0, I...> : public in_port<I0> { class multi_in_port<CB, N, I0, I...> : public in_port<I0> {
...@@ -63,10 +75,10 @@ class multi_in_port<CB, N, I0, I...> : public in_port<I0> { ...@@ -63,10 +75,10 @@ class multi_in_port<CB, N, I0, I...> : public in_port<I0> {
using value_type = I0; using value_type = I0;
public: public:
explicit multi_in_port(CB &cb) : cb_{cb}, rec_{cb} {}; explicit multi_in_port(node *owning_node, CB *cb) : in_port<I0>{owning_node}, cb_{cb}, rec_{owning_node, cb} {};
void token_pushed(token<I0> token) override { void token_pushed(token<I0> token) override {
cb_.template token_pushed<N, I0>(token); cb_->template token_pushed<N, I0>(token);
} }
// Helper struct required for recursive access to types by index // Helper struct required for recursive access to types by index
...@@ -110,9 +122,13 @@ class multi_in_port<CB, N, I0, I...> : public in_port<I0> { ...@@ -110,9 +122,13 @@ class multi_in_port<CB, N, I0, I...> : public in_port<I0> {
return get_at<POS, in_port_type_at<POS>, my_type>::get(*this); return get_at<POS, in_port_type_at<POS>, my_type>::get(*this);
} }
bool fully_connected() const {
return this->is_connected() && rec_.fully_connected();
}
private: private:
child_type rec_; child_type rec_;
CB &cb_; CB *cb_;
}; };
} }
......
#ifndef PLS_DATAFLOW_INTERNAL_NODE_H_
#define PLS_DATAFLOW_INTERNAL_NODE_H_
namespace pls {
namespace dataflow {
namespace internal {
class node {
template<typename INS, typename OUTS>
friend
class graph;
enum state { fresh, building, built, teardown };
public:
virtual int num_successors() const = 0;
virtual node *successor_at(int pos) const = 0;
virtual int instance_buffer_size() const = 0;
virtual void init_instance_buffer(void *memory) const = 0;
virtual bool is_fully_connected() const = 0;
template<typename M, typename T>
M *get_invocation(token<T> token) {
return token.invocation().template get_instance_buffer<M>(memory_index_);
}
// TODO: Remove
void set_memory_index(int i) { memory_index_ = i; }
private:
int memory_index_{0};
node *direct_successor_;
state state_{fresh};
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_NODE_H_
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <tuple> #include <tuple>
#include "in_port.h" #include "in_port.h"
#include "node.h"
namespace pls { namespace pls {
namespace dataflow { namespace dataflow {
...@@ -27,10 +28,16 @@ class out_port { ...@@ -27,10 +28,16 @@ class out_port {
connect(input); connect(input);
} }
void push_token(token<T> token) { void push_token(token<T> token) const {
target_->push_token(token); target_->push_token(token);
} }
node *next_node() const {
return target_->owning_node();
}
bool is_connected() const { return connected_; }
private: private:
bool connected_{false}; bool connected_{false};
in_port<T> *target_{nullptr}; in_port<T> *target_{nullptr};
...@@ -47,6 +54,7 @@ class multi_out_port { ...@@ -47,6 +54,7 @@ class multi_out_port {
// Helpers for managing recursive types // Helpers for managing recursive types
using value_tuple_type = std::tuple<O0, O...>; using value_tuple_type = std::tuple<O0, O...>;
using out_port_tupel_type = std::tuple<out_port<O0>, out_port<O>...>; using out_port_tupel_type = std::tuple<out_port<O0>, out_port<O>...>;
static constexpr int num_outputs = std::tuple_size<value_tuple_type>();
public: public:
// Simple interface to get types by index // Simple interface to get types by index
...@@ -61,8 +69,52 @@ class multi_out_port { ...@@ -61,8 +69,52 @@ class multi_out_port {
return std::get<POS>(outputs_); return std::get<POS>(outputs_);
} }
node *next_node_at(int pos) const {
return next_node < 0 > {&outputs_}.get(pos);
}
bool fully_connected() const {
return connected < 0 > {&outputs_}.get();
}
private: private:
out_port_tupel_type outputs_; out_port_tupel_type outputs_;
template<int POS, typename DUMMY=void>
struct next_node {
const out_port_tupel_type *outputs_;
node *get(int i) {
if (POS == i) {
return std::get<POS>(*outputs_).next_node();
} else {
return next_node<POS + 1>{outputs_}.get(i);
}
}
};
template<typename DUMMY>
struct next_node<num_outputs, DUMMY> {
const out_port_tupel_type *outputs_;
node *get(int i) {
PLS_ERROR("Try to access invalid successor node index!")
}
};
template<int POS, typename DUMMY=void>
struct connected {
const out_port_tupel_type *outputs_;
bool get() {
bool self = std::get<POS>(*outputs_).is_connected();
bool children = connected<POS + 1>{outputs_}.get();
return self && children;
}
};
template<typename DUMMY>
struct connected<num_outputs, DUMMY> {
const out_port_tupel_type *outputs_;
bool get() {
return true;
}
};
}; };
} }
......
...@@ -6,35 +6,33 @@ namespace pls { ...@@ -6,35 +6,33 @@ namespace pls {
namespace dataflow { namespace dataflow {
namespace internal { namespace internal {
/** class invocation_info {
* Parallel invocations of the same sub-graph are usually working with some kind of coloring public:
* for tokens to distinguishe different invocations. As this concept is abstract and we could explicit invocation_info(void **memory) : memory_{memory} {};
* change it in the future (for e.g. more advanced features/dataflows) we encapsulate it. invocation_info(invocation_info const &other) = default;
*/
struct token_color {
unsigned int clock_;
unsigned int depth_;
int get_index(int parallel_limit) const {
return clock_ % parallel_limit;
}
bool operator==(const token_color &other) { template<typename T>
return other.clock_ == clock_ && other.depth_ == depth_; T *get_instance_buffer(int pos) {
return reinterpret_cast<T *>(memory_[pos]);
} }
private:
void **memory_;
}; };
template<typename T> template<typename T>
class token { class token {
T value_; T value_;
token_color color_; invocation_info invocation_;
public: public:
token() : color_{} {}; // Default Constructor Stays uninitialized token() : invocation_{nullptr} {}; // Default Constructor Stays uninitialized
token(T value, token_color color) : value_{value}, color_{color} {}; token(T value, invocation_info color) : value_{value}, invocation_{color} {};
T &value() { return value_; }
invocation_info invocation() const { return invocation_; }
T value() const { return value_; } void set_invocation(invocation_info invocation_info) { invocation_ = invocation_info; }
token_color color() const { return color_; }
}; };
} }
......
#ifndef PLS_INTERNAL_HELPERS_SEQENCE_H_
#define PLS_INTERNAL_HELPERS_SEQENCE_H_
// See: https://stackoverflow.com/questions/7858817/unpacking-a-tuple-to-call-a-matching-function-pointer
// Would be easy in C++ 14 (has index sequences), but seems to be the only way to do it in C++ 11
namespace pls {
namespace internal {
namespace helpers {
template<int ...>
struct sequence {};
template<int N, int ...S>
struct sequence_gen : sequence_gen<N - 1, N - 1, S...> {};
template<int ...S>
struct sequence_gen<0, S...> {
typedef sequence<S...> type;
};
}
}
}
#endif //PLS_INTERNAL_HELPERS_SEQENCE_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment