Commit 94804014 by FritzFlorian

Refine flow through system with better template programming.

We separated the structure (input-output flow) from the rest of the architecture and reworked some template programming to have better access to the types required at compile time.
parent 09349f1b
Pipeline #1282 failed with stages
in 34 seconds
...@@ -4,6 +4,80 @@ A collection of stuff that we noticed during development. ...@@ -4,6 +4,80 @@ A collection of stuff that we noticed during development.
Useful later on two write a project report and to go back Useful later on two write a project report and to go back
in time to find out why certain decisions where made. in time to find out why certain decisions where made.
## 19.07.2019 - Colored tokens, recursion and where to put memory
While implementing dataflow graphs we encountered some obstacles.
The most severe one the most severe one that impacts the
internal design of the API, end user features and performance
is how to handle token flows, especially colored tokens for parallel
and/or recursive calls.
The basic issue here is, that each execution instance requires its
own isolated environment to execute in. Therefore, each invocation
instance (parallel or recursive) needs an unique identifier.
This identifier is usually realized using colored tokens,
as for example in this \[1] classic dataflow language implementation.
The problem with this is, that in our environment we are constrained
to static memory allocation and lock-free programming.
To handle static memory allocation we first decided to follow an model
introduced by EMBB, associating an clock to each colored token,
which maps it to an slot in an array of possible parallel execution
instances. This works fine in EMBB, as they put two major limitations
on their execution model: no cycles and no recursion.
(Issues arise with correcly coloring tokens in an lock-free matter
without too much contention, leading us to believe that this
method scales very bad, especially with smaller workloads)
While we could simply implement that, we wondered if there is a way to
better fit dataflow in our existing, task-stack based programming
approach.
After some investigation we found some core statements:
- The dataflow graph itself describes only programming execution,
it should therefore not be tightly coupled with data/buffer management
- Each invocation of an dataflow graph is logically a task in our model,
it therefore makes sense to map the memory and coordination resources
required for one invocation instance directly to this task
- What we do is in fact not 'pure' dataflow programming, as we do not
try to re-create a full programming language (e.g. we do not care about
memory management and loops described in \[1])
- What we do is more close to functional programming with single
assignment rules and recursion (see Elixir for a nice syntactic example)
Our plan is therefore the following:
Separate structure of the dataflow from execution and map
one execution instance to one active task in our runtime system.
This, conveniently, also mitigates most issues related to
memory allocation/parallelism in the graph and makes for a
nicer end user API (no more buffer types/parallelism in templates).
\[1] J. B. Dennis, “First version of a data flow procedure language,” in Programming Symposium, vol. 19, B. Robinet, Ed. Berlin, Heidelberg: Springer Berlin Heidelberg, 1974, pp. 362–376.
## 19.07.2019 - Variadic Templates for input/output goups
We searched for the correct way to represent an nodes
or graphs input/output pairs for a while and found the
solution in partial specialization of templates.
```C++
template<typename INS, typename OUTS>
class graph {};
template<typename I0, typename ...I, typename O0, typename ...O>
class graph<inputs<I0, I...>, outputs<O0, O...>> { ... };
```
The above code allows us to enforce an end-user API that is
clean while still giving us full access on the input/output variadic
template types. The end user API is the following:
```C++
graph<inputs<int, int>, outputs<std::string>> g;
```
## 03.07.2019 - Outline/Plan for our Dataflow API ## 03.07.2019 - Outline/Plan for our Dataflow API
The following describes our ideas for what we expect/want to build The following describes our ideas for what we expect/want to build
......
...@@ -2,35 +2,52 @@ ...@@ -2,35 +2,52 @@
#include <string> #include <string>
#include <cstdio> #include <cstdio>
#include <tuple> #include <tuple>
#include <array>
#include <pls/pls.h> #include <pls/dataflow/inputs.h>
#include <pls/dataflow/graph.h> #include <pls/dataflow/outputs.h>
#include <pls/dataflow/internal/token.h> #include <pls/dataflow/internal/graph.h>
#include <pls/dataflow/internal/out_port.h>
#include <pls/dataflow/internal/buffer.h>
int main() { int main() {
using namespace pls::dataflow; using namespace pls::dataflow;
using namespace pls::dataflow::internal;
out_port<int> port1;
out_port<int> port2;
graph<static_buffer<4>::type, inputs<int, int>, outputs<int>> tmp{};
port1 >> tmp.in_port<0>();
port2 >> tmp.in_port<1>();
port1.push_token({1, {}});
port2.push_token({2, {}});
// using namespace pls::dataflow;
//
// graph<inputs<int, int>, outputs<int, int>, 2> graph2;
// graph2.input<0>() >> graph2.output<1>();
// graph2.input<1>() >> graph2.output<0>();
//
// graph<inputs<int, int>, outputs<int, int>, 2> graph1;
// graph1.input<0>() >> graph2.external_input<0>();
// graph1.input<1>() >> graph2.external_input<1>();
// graph2.external_output<0>() >> graph1.output<0>();
// graph2.external_output<1>() >> graph1.output<1>();
//
// graph1.push_input(1, 2);
// graph1.push_input(3, 4);
//
// auto result1 = graph1.get_output();
// std::cout << std::get<0>(result1) << ", " << std::get<1>(result1) << std::endl;
//
// graph1.push_input(5, 6);
//
// auto result2 = graph1.get_output();
// std::cout << std::get<0>(result2) << ", " << std::get<1>(result2) << std::endl;
//
// auto result3 = graph1.get_output();
// std::cout << std::get<0>(result3) << ", " << std::get<1>(result3) << std::endl;
graph<inputs<int, int>, outputs<int, int>, 2> graph2;
graph2.input<0>() >> graph2.output<1>();
graph2.input<1>() >> graph2.output<0>();
graph<inputs<int, int>, outputs<int, int>, 2> graph1;
graph1.input<0>() >> graph2.external_input<0>();
graph1.input<1>() >> graph2.external_input<1>();
graph2.external_output<0>() >> graph1.output<0>();
graph2.external_output<1>() >> graph1.output<1>();
graph1.push_input(1, 2);
graph1.push_input(3, 4);
auto result1 = graph1.get_output();
std::cout << std::get<0>(result1) << ", " << std::get<1>(result1) << std::endl;
graph1.push_input(5, 6);
auto result2 = graph1.get_output();
std::cout << std::get<0>(result2) << ", " << std::get<1>(result2) << std::endl;
auto result3 = graph1.get_output();
std::cout << std::get<0>(result3) << ", " << std::get<1>(result3) << std::endl;
} }
...@@ -10,12 +10,13 @@ add_library(pls STATIC ...@@ -10,12 +10,13 @@ add_library(pls STATIC
include/pls/algorithms/scan_impl.h include/pls/algorithms/scan_impl.h
include/pls/dataflow/dataflow.h include/pls/dataflow/dataflow.h
include/pls/dataflow/inputs.h
include/pls/dataflow/outputs.h
include/pls/dataflow/internal/token.h include/pls/dataflow/internal/token.h
include/pls/dataflow/graph.h include/pls/dataflow/internal/in_port.h
include/pls/dataflow/internal/inputs.h include/pls/dataflow/internal/out_port.h
include/pls/dataflow/internal/outputs.h include/pls/dataflow/internal/graph.h
include/pls/dataflow/internal/input.h include/pls/dataflow/internal/buffer.h
include/pls/dataflow/internal/output.h
include/pls/internal/base/spin_lock.h include/pls/internal/base/spin_lock.h
include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp
...@@ -47,7 +48,8 @@ add_library(pls STATIC ...@@ -47,7 +48,8 @@ add_library(pls STATIC
include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/scheduler_impl.h
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
include/pls/internal/scheduling/lambda_task.h include/pls/dataflow/inputs.h include/pls/dataflow/outputs.h include/pls/dataflow/input.h include/pls/dataflow/output.h) include/pls/internal/scheduling/lambda_task.h
)
# Add everything in `./include` to be in the include path of this project # Add everything in `./include` to be in the include path of this project
target_include_directories(pls target_include_directories(pls
PUBLIC PUBLIC
......
#ifndef PLS_DATAFLOW_INPUT_H_
#define PLS_DATAFLOW_INPUT_H_
#include "pls/dataflow/internal/input.h"
namespace pls {
namespace dataflow {
template<int P, typename T>
class input {
template<int OP, typename OT>
friend
class output;
using internal_type = internal::input<P, T>;
internal_type &internal_input_;
public:
explicit input(internal_type &internal_input) : internal_input_{internal_input} {};
};
}
}
#endif //PLS_DATAFLOW_INPUT_H_
...@@ -2,17 +2,11 @@ ...@@ -2,17 +2,11 @@
#ifndef PLS_DATAFLOW_INPUTS_H_ #ifndef PLS_DATAFLOW_INPUTS_H_
#define PLS_DATAFLOW_INPUTS_H_ #define PLS_DATAFLOW_INPUTS_H_
#include <tuple>
#include "internal/inputs.h"
namespace pls { namespace pls {
namespace dataflow { namespace dataflow {
template<typename I1, typename ...I> template<typename I1, typename ...I>
struct inputs { struct inputs {
template<int P, typename CB>
using internal_inputs = internal::inputs<P, CB, I1, I...>;
}; };
} }
......
#ifndef PLS_DATAFLOW_INTERNAL_BUFFER_H_
#define PLS_DATAFLOW_INTERNAL_BUFFER_H_
#include <array>
#include "in_port.h"
namespace pls {
namespace dataflow {
namespace internal {
template<int P, typename T>
struct static_buffer_impl {
std::array<T, P> buffer_;
const T &operator[](size_t i) const { return buffer_[i]; }
T &operator[](size_t i) { return buffer_[i]; }
static_buffer_impl() : buffer_{} {};
explicit static_buffer_impl(T init) : buffer_(init) {};
int capacity() const {
return P;
}
};
template<int P>
struct static_buffer {
template<typename T>
using type = static_buffer_impl<P, T>;
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_BUFFER_H_
#ifndef PLS_DATAFLOW_INTERNAL_GRAPH_H_
#define PLS_DATAFLOW_INTERNAL_GRAPH_H_
#include <tuple>
#include <iostream>
#include <atomic>
#include "buffer.h"
#include "in_port.h"
#include "out_port.h"
#include "pls/dataflow/inputs.h"
#include "pls/dataflow/outputs.h"
namespace pls {
namespace dataflow {
namespace internal {
struct graph_invocation {
enum state { clear, running, finished };
explicit graph_invocation(int num_outputs) : num_outputs_{num_outputs}, outputs_missing_{num_outputs} {};
void reset() {
state_ = clear;
internal_call_ = false;
previous_color_ = {};
outputs_missing_ = num_outputs_;
}
const int num_outputs_;
std::atomic<int> outputs_missing_;
state state_{clear};
bool internal_call_{false};
token_color previous_color_{};
};
template<template<typename> class B, typename INS, typename OUTS>
class graph {};
template<template<typename> class B, typename I0, typename ...I, typename O0, typename ...O>
class graph<B, pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>> {
private:
using self_type = graph<B, pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>>;
struct in_port_cb {
self_type &self_;
explicit in_port_cb(self_type &self) : self_{self} {};
template<int POS, typename T>
void token_pushed(token<T> token) {
self_.in_port_token_pushed<POS, T>(token);
}
};
struct output_cb {
const self_type &self_;
explicit output_cb(self_type &self) : self_{self} {};
template<int POS, typename T>
void token_pushed(token<T> token) {
self_.output_token_pushed<POS, T>(token);
}
};
// Type-Defs used internally
using my_type = graph<B, pls::dataflow::inputs<I0, I...>, pls::dataflow::outputs<O0, O...>>;
using multi_in_port_type = multi_in_port<in_port_cb, 0, I0, I...>;
using multi_out_port_type = multi_out_port<O0, O...>;
using input_tuple = std::tuple<I0, I...>;
using output_tuple = std::tuple<O0, O...>;
public:
template<int POS>
using in_port_at = typename multi_in_port_type::template in_port_type_at<POS>;
template<int POS>
using out_port_at = typename multi_out_port_type::template out_port_type_at<POS>;
template<int POS>
in_port_at<POS> &in_port() {
return in_port_.template get<POS>();
}
template<int POS>
out_port_at<POS> &out_port() {
return out_port_.template get<POS>();
}
template<int POS, typename T>
void in_port_token_pushed(token<T> token) {
auto my_clock = input_clock_++;
auto index = token.color().get_index(buffer_size_);
std::cout << "Token pushed at " << POS << " with value " << token.value() << std::endl;
// std::get<POS>(inputs_[index]) = token;
//
// auto remaining = --inputs_required_[index];
// if (remaining == 0) {
// std::cout << "All tokens at clock " << token.color().clock_ << std::endl;
// }
}
template<int POS, typename T>
void output_token_pushed(token<T> token) {
}
graph() : in_port_cb_{*this}, in_port_{in_port_cb_}, out_port_{}, invocations_{} {
buffer_size_ = invocations_.capacity();
}
private:
void reset_input(int i) {
inputs_missing_[i] =
}
// Input/Output ports
in_port_cb in_port_cb_;
multi_in_port_type in_port_;
multi_out_port_type out_port_;
const int buffer_size_;
// Clocks and state for execution
std::atomic<unsigned int> input_clock_{0};
std::atomic<unsigned int> output_clock_{0};
B<input_tuple> input_buffer_;
B<std::atomic<int>> inputs_missing_;
B<graph_invocation> invocations_;
B<output_tuple> output_buffer_;
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_GRAPH_H_
#ifndef PLS_DATAFLOW_INTERNAL_INPUT_H_
#define PLS_DATAFLOW_INTERNAL_INPUT_H_
#include "pls/internal/base/error_handling.h"
#include "token.h"
namespace pls {
namespace dataflow {
namespace internal {
/**
* Represents a single, logical input port (no data store, simply signal propagation).
* @tparam T Type of the input port
*/
template<typename T>
class in_port {
template<typename OT>
friend
class out_port;
protected:
virtual void token_pushed(token<T> token) = 0;
private:
bool connected_{false};
void push_token(token<T> token) {
token_pushed(token);
}
void connect() {
if (connected_) {
PLS_ERROR("Must only connect on input once. Disconnect the output pointing to it before reconnecting.")
}
connected_ = true;
}
};
/**
* Represents multiple input ports bundled together (a tuple of inputs).
* Allows for a unified callback method to handle multiple typed inputs.
*
* template<int POS, typename T>
* void token_pushed(token<T> token) { Notified when tokens arrive }
*
* @tparam CB The class implementing a callback
* @tparam N Put 0 to start the recursive implementation
* @tparam I A variadic list of input types
*/
template<typename CB, int N, typename ...I>
class multi_in_port {
// end of template recursion
public:
explicit multi_in_port(CB &) {};
};
template<typename CB, int N, typename I0, typename ...I>
class multi_in_port<CB, N, I0, I...> : public in_port<I0> {
private:
// Helpers for managing recursive types
using my_type = multi_in_port<CB, N, I0, I...>;
using child_type = multi_in_port<CB, N + 1, I...>;
using value_type = I0;
public:
explicit multi_in_port(CB &cb) : cb_{cb}, rec_{cb} {};
void token_pushed(token<I0> token) override {
cb_.template token_pushed<N, I0>(token);
}
// Helper struct required for recursive access to types by index
template<int POS, typename ...T>
struct type_at {
};
template<typename T0, typename ...T>
struct type_at<0, T0, T...> {
using type = T0;
using in_port_type = in_port<type>;
};
template<int POS, typename T0, typename ...T>
struct type_at<POS, T0, T...> {
using type = typename type_at<POS - 1, T...>::type;
using in_port_type = in_port<type>;
};
// Simple interface to get types by index
template<int POS>
using in_port_type_at = typename type_at<POS, I0, I...>::in_port_type;
template<int POS>
using value_type_at = typename type_at<POS, I0, I...>::type;
// Helper struct required for recursive access to input's by index
template<int POS, class RES_T, class MY_T>
struct get_at {
static RES_T &get(MY_T &self) {
return get_at<POS - 1, RES_T, typename MY_T::child_type>::get(self.rec_);
}
};
template<class RESULT, class CURRENT>
struct get_at<0, RESULT, CURRENT> {
static RESULT &get(CURRENT &self) {
return self;
}
};
// Simple interface to access input's by index
template<int POS>
in_port_type_at<POS> &get() {
return get_at<POS, in_port_type_at<POS>, my_type>::get(*this);
}
private:
child_type rec_;
CB &cb_;
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_INPUT_H_
#ifndef PLS_DATAFLOW_INTERNAL_INPUT_H_
#define PLS_DATAFLOW_INTERNAL_INPUT_H_
#include <array>
#include "pls/internal/base/error_handling.h"
#include "token.h"
namespace pls {
namespace dataflow {
namespace internal {
class push_token_cb {
public:
virtual void token_pushed(int pos, token_color color) = 0;
};
template<int P, typename T>
class input {
template<int OP, typename OT>
friend
class output;
std::array<token<T>, P> tokens_;
bool connected_{false};
push_token_cb *cb_{nullptr};
int input_pos_{0};
void connect() {
if (connected_) {
PLS_ERROR("Must only connect on input once. Disconnect the output pointing to it before reconnecting.")
}
connected_ = true;
}
public:
input() = default;
void push_token(token<T> token) {
tokens_[token.color().get_index(P)] = token;
cb_->token_pushed(input_pos_, token.color());
}
token<T> token_at(int p) {
return tokens_[p];
}
void set_cb(push_token_cb *cb, int input_pos) {
cb_ = cb;
input_pos_ = input_pos;
}
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_INPUT_H_
#ifndef PLS_DATAFLOW_INTERNAL_INPUTS_H_
#define PLS_DATAFLOW_INTERNAL_INPUTS_H_
#include <tuple>
#include <array>
#include "input.h"
#include "outputs.h"
namespace pls {
namespace dataflow {
namespace internal {
template<int P, typename CB, typename ...I>
class inputs : push_token_cb {
public:
using raw_types = std::tuple<I...>;
using values_type = std::tuple<input<P, I>...>;
template<int N>
using raw_type_at = typename std::tuple_element<N, raw_types>::type;
template<int N>
using value_type_at = typename std::tuple_element<N, values_type>::type;
static constexpr unsigned int num_values = std::tuple_size<values_type>::value;
private:
values_type values_;
std::array<std::atomic<unsigned int>, P> required_inputs_;
CB cb_;
public:
explicit inputs(CB cb) : cb_{cb} {
for (int i = 0; i < P; i++) {
required_inputs_[i] = num_values;
}
init_cb<0, I...>::call(this);
}
inputs(inputs &&other) = delete;
inputs(const inputs &other) = delete;
inputs &operator=(inputs &&other) = delete;
inputs &operator=(const inputs &other) = delete;
void token_pushed(int pos, token_color color) override {
auto index = color.get_index(P);
int current_required = --required_inputs_[index];
cb_.one_input(pos, color);
if (current_required == 0) {
cb_.all_inputs(color);
required_inputs_[index] = num_values;
}
}
template<int N>
typename std::tuple_element<N, values_type>::type &get() {
return std::get<N>(values_);
}
raw_types get_outputs(int p) {
return fill_output<0, I...>::call(*this, p);
}
template<int POS, typename ...TAIL>
struct fill_output {
static std::tuple<> call(inputs<P, CB, I...> &self, int p) {
return {};
}
};
template<int POS, typename HEAD, typename ...TAIL>
struct fill_output<POS, HEAD, TAIL...> {
static std::tuple<HEAD, TAIL...> call(inputs<P, CB, I...> &self, int p) {
std::tuple<decltype(self.get<POS>().token_at(p).value())> our_tuple{self.get<POS>().token_at(p).value()};
return std::tuple_cat(our_tuple, fill_output<POS + 1, TAIL...>::call(self, p));
}
};
void push_to_outputs(outputs<P, I...> &outputs, int input_index, token_color new_color) {
push_to_outputs_rec<0, I...>::call(this, &outputs, input_index, new_color);
}
template<int POS, typename ...TAIL>
struct push_to_outputs_rec {
static void call(inputs<P, CB, I...> *inputs, outputs<P, I...> *outputs, int input_index, token_color new_color) {}
};
template<int POS, typename HEAD, typename ...TAIL>
struct push_to_outputs_rec<POS, HEAD, TAIL...> {
static void call(inputs<P, CB, I...> *inputs, outputs<P, I...> *outputs, int input_index, token_color new_color) {
auto token = inputs->get<POS>().token_at(input_index);
outputs->template get<POS>().push_token({token.value(), new_color});
push_to_outputs_rec<POS + 1, TAIL...>::call(inputs, outputs, input_index, new_color);
}
};
// TODO: Change CB code using proper templating to save method calls during execution...
template<int POS, typename ...TAIL>
struct init_cb {
static void call(inputs<P, CB, I...> *inputs) {}
};
template<int POS, typename HEAD, typename ...TAIL>
struct init_cb<POS, HEAD, TAIL...> {
static void call(inputs<P, CB, I...> *inputs) {
inputs->get<POS>().set_cb(inputs, POS);
init_cb<POS + 1, TAIL...>::call(inputs);
}
};
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_INPUTS_H_
#ifndef PLS_DATAFLOW_INTERNAL_OUTPUT_H_
#define PLS_DATAFLOW_INTERNAL_OUTPUT_H_
#include <tuple>
#include "in_port.h"
namespace pls {
namespace dataflow {
namespace internal {
template<class T>
class out_port {
public:
void connect(in_port<T> &target) {
if (connected_) {
PLS_ERROR("Must only connect output once. Please disconnect it before reconnecting.")
}
target.connect();
target_ = &target;
connected_ = true;
}
void operator>>(in_port<T> &input) {
connect(input);
}
void push_token(token<T> token) {
target_->push_token(token);
}
private:
bool connected_{false};
in_port<T> *target_{nullptr};
};
/**
* Represents multiple output ports bundled together (a tuple of inputs).
*
* @tparam I A variadic list of input types
*/
template<typename O0, typename ...O>
class multi_out_port {
private:
// Helpers for managing recursive types
using value_tuple_type = std::tuple<O0, O...>;
using out_port_tupel_type = std::tuple<out_port<O0>, out_port<O>...>;
public:
// Simple interface to get types by index
template<int POS>
using out_port_type_at = typename std::tuple_element<POS, out_port_tupel_type>::type;
template<int POS>
using value_type_at = typename std::tuple_element<POS, value_tuple_type>::type;
// Simple interface to access input's by index
template<int POS>
out_port_type_at<POS> &get() {
return std::get<POS>(outputs_);
}
private:
out_port_tupel_type outputs_;
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_OUTPUT_H_
#ifndef PLS_DATAFLOW_INTERNAL_OUTPUT_H_
#define PLS_DATAFLOW_INTERNAL_OUTPUT_H_
#include "token.h"
#include "input.h"
#include "pls/internal/base/error_handling.h"
namespace pls {
namespace dataflow {
namespace internal {
template<int P, typename T>
class output {
input<P, T> *target_;
bool connected_;
public:
output() : target_{nullptr}, connected_{false} {};
void connect(input<P, T> &target) {
if (connected_) {
PLS_ERROR("Must only connect output once. Please disconnect it before reconnecting.")
}
target.connect();
target_ = &target;
connected_ = true;
}
void push_token(token<T> token) {
target_->push_token(token);
}
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_OUTPUT_H_
#ifndef PLS_DATAFLOW_INTERNAL_OUTPUTS_H_
#define PLS_DATAFLOW_INTERNAL_OUTPUTS_H_
#include <tuple>
#include "output.h"
namespace pls {
namespace dataflow {
namespace internal {
template<int P, typename ...O>
class outputs {
public:
using raw_types = std::tuple<O...>;
using values_type = std::tuple<output<P, O>...>;
template<int N>
using raw_type_at = typename std::tuple_element<N, raw_types>::type;
template<int N>
using value_type_at = typename std::tuple_element<N, values_type>::type;
private:
values_type values_;
public:
template<int N>
value_type_at<N> &get() {
return std::get<N>(values_);
}
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_OUTPUTS_H_
#ifndef PLS_DATAFLOW_OUTPUT_H_
#define PLS_DATAFLOW_OUTPUT_H_
#include "input.h"
#include "pls/dataflow/internal/output.h"
namespace pls {
namespace dataflow {
template<int P, typename T>
class output {
using internal_type = internal::output<P, T>;
internal_type &internal_output_;
public:
explicit output(internal_type &internal_output) : internal_output_{internal_output} {};
void operator>>(const input<P, T> &input) {
internal_output_.connect(input.internal_input_);
}
};
}
}
#endif //PLS_DATAFLOW_OUTPUT_H_
...@@ -2,16 +2,11 @@ ...@@ -2,16 +2,11 @@
#ifndef PLS_DATAFLOW_OUTPUTS_H_ #ifndef PLS_DATAFLOW_OUTPUTS_H_
#define PLS_DATAFLOW_OUTPUTS_H_ #define PLS_DATAFLOW_OUTPUTS_H_
#include "internal/outputs.h"
#include "outputs.h"
namespace pls { namespace pls {
namespace dataflow { namespace dataflow {
template<typename O1, typename ...O> template<typename O0, typename ...O>
struct outputs { struct outputs {
template<int P>
using internal_outputs = internal::outputs<P, O1, O...>;
}; };
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment