Commit 3beb27ed by FritzFlorian

Basic flow of data through the system.

The data can now flow into a graph, follow its path on inptus/outputs and be fetched from the graph after execution.
Currently graphs are executed synchronous.
parent f374d93f
Pipeline #1280 failed with stages
in 27 seconds
...@@ -10,10 +10,27 @@ ...@@ -10,10 +10,27 @@
int main() { int main() {
using namespace pls::dataflow; using namespace pls::dataflow;
graph<inputs<int, int>, outputs<int, int>, 8> graph; graph<inputs<int, int>, outputs<int, int>, 2> graph2;
graph.output<0>().connect(graph.input<0>()); graph2.input<0>() >> graph2.output<1>();
graph.output<1>().connect(graph.input<1>()); graph2.input<1>() >> graph2.output<0>();
graph.input<0>().push_token(pls::dataflow::internal::token<int>()); graph<inputs<int, int>, outputs<int, int>, 2> graph1;
graph.input<1>().push_token(pls::dataflow::internal::token<int>()); graph1.input<0>() >> graph2.external_input<0>();
graph1.input<1>() >> graph2.external_input<1>();
graph2.external_output<0>() >> graph1.output<0>();
graph2.external_output<1>() >> graph1.output<1>();
graph1.push_input(1, 2);
graph1.push_input(3, 4);
auto result1 = graph1.get_output();
std::cout << std::get<0>(result1) << ", " << std::get<1>(result1) << std::endl;
graph1.push_input(5, 6);
auto result2 = graph1.get_output();
std::cout << std::get<0>(result2) << ", " << std::get<1>(result2) << std::endl;
auto result3 = graph1.get_output();
std::cout << std::get<0>(result3) << ", " << std::get<1>(result3) << std::endl;
} }
...@@ -47,7 +47,7 @@ add_library(pls STATIC ...@@ -47,7 +47,7 @@ add_library(pls STATIC
include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/scheduler_impl.h
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
include/pls/internal/scheduling/lambda_task.h include/pls/dataflow/inputs.h include/pls/dataflow/outputs.h) include/pls/internal/scheduling/lambda_task.h include/pls/dataflow/inputs.h include/pls/dataflow/outputs.h include/pls/dataflow/input.h include/pls/dataflow/output.h)
# Add everything in `./include` to be in the include path of this project # Add everything in `./include` to be in the include path of this project
target_include_directories(pls target_include_directories(pls
PUBLIC PUBLIC
......
...@@ -4,27 +4,172 @@ ...@@ -4,27 +4,172 @@
#include "inputs.h" #include "inputs.h"
#include "outputs.h" #include "outputs.h"
#include "input.h"
#include "output.h"
#include "internal/token.h"
namespace pls { namespace pls {
namespace dataflow { namespace dataflow {
template<typename I, typename O, int P> template<typename I, typename O, int P>
class graph { class graph {
using internal_inputs = typename I::template internal_inputs<P>; /**
using internal_outputs = typename O::template internal_outputs<P>; * Boilerplate for callbacks of dataflow input/output handlers.
*/
struct external_input_cb {
graph *graph_;
void one_input(int pos, internal::token_color color) {
graph_->one_external_input(pos, color);
}
void all_inputs(internal::token_color color) {
graph_->all_external_inputs(color);
}
};
struct internal_output_cb {
graph *graph_;
void one_input(int pos, internal::token_color color) {
graph_->one_internal_output(pos, color);
}
void all_inputs(internal::token_color color) {
graph_->all_internal_outputs(color);
}
};
/**
* Handling of buffers for dataflow.
*/
using exeternal_inputs = typename I::template internal_inputs<P, graph<I, O, P>::external_input_cb>;
using external_outputs = typename O::template internal_outputs<P>;
exeternal_inputs external_inputs_{external_input_cb{this}};
external_outputs external_outputs_{};
void one_external_input(int pos, internal::token_color color) {
// We do currently not support 'single external inputs'.
// This prevents some fine-grained parallelism, but eases implementation
}
void all_external_inputs(internal::token_color external_color) {
auto my_clock = input_clock_++;
internal::token_color internal_color{my_clock, external_color.depth_ + 1};
auto my_index = internal_color.get_index(P);
while (output_clear_[my_index] != clear) {
PLS_ERROR("Full inputs are not implemented jet. Place waiting code in here/yield to scheduler.")
}
output_clear_[my_index] = waiting;
input_output_color_matching_[my_index] = {external_color, internal_color};
external_inputs_.push_to_outputs(internal_inputs_, external_color.get_index(P), internal_color);
}
using internal_outputs = typename I::template internal_inputs<P, graph<I, O, P>::internal_output_cb>;
using internal_inputs = typename O::template internal_outputs<P>;
internal_outputs internal_outputs_{internal_output_cb{this}};
internal_inputs internal_inputs_{};
void one_internal_output(int pos, internal::token_color color) {
// We do not support 'single flow' output. Every graph is well behaved, e.g. will emmit exactly one output tuple.
}
void all_internal_outputs(internal::token_color color) {
auto color_matching = input_output_color_matching_[color.get_index(P)];
auto external_color = std::get<0>(color_matching);
auto internal_color = std::get<1>(color_matching);
if (external_color == internal_color) {
output_clear_[color.get_index(P)] = finished;
} else {
internal_outputs_.push_to_outputs(external_outputs_, internal_color.get_index(P), external_color);
output_clear_[color.get_index(P)] = clear;
}
}
/**
* Handling of colors/maximum concurrent tokens in the system.
*/
std::atomic<unsigned int> input_clock_{0};
std::atomic<unsigned int> output_clock_{0};
std::array<std::pair<internal::token_color, internal::token_color>, P> input_output_color_matching_{};
enum output_state { clear, waiting, finished };
std::array<output_state, P> output_clear_{clear};
public: public:
internal_inputs inputs_; template<int N>
internal_outputs outputs_; using external_output_at = output<P, typename external_outputs::template raw_type_at<N>>;
template<int N>
using external_input_at = input<P, typename exeternal_inputs::template raw_type_at<N>>;
template<int N> template<int N>
decltype(outputs_.template get<N>()) output() { external_output_at<N> external_output() {
return outputs_.template get<N>(); external_output_at<N> o{external_outputs_.template get<N>()};
return o;
} }
template<int N>
external_input_at<N> external_input() {
external_input_at<N> i{external_inputs_.template get<N>()};
return i;
}
template<int N>
using input_at = output<P, typename internal_inputs::template raw_type_at<N>>;
template<int N>
using output_at = input<P, typename internal_outputs::template raw_type_at<N>>;
template<int N> template<int N>
decltype(inputs_.template get<N>()) input() { output_at<N> output() {
return inputs_.template get<N>(); output_at<N> o{internal_outputs_.template get<N>()};
return o;
}
template<int N>
input_at<N> input() {
input_at<N> i{internal_inputs_.template get<N>()};
return i;
}
template<typename ...ARGS>
void push_input(ARGS ...values) {
auto my_clock = input_clock_++;
internal::token_color color{my_clock, 0};
auto my_index = color.get_index(P);
while (output_clear_[my_index] != clear) {
PLS_ERROR("Full inputs are not implemented jet. Place waiting code in here/yield to scheduler.")
}
output_clear_[my_index] = waiting;
input_output_color_matching_[my_index] = {color, color};
push_input_rec<0, ARGS...>(color, values...);
}
template<int POS, typename ...TAIL>
void push_input_rec(internal::token_color color) {}
template<int POS, typename HEAD, typename ...TAIL>
void push_input_rec(internal::token_color color, HEAD h, TAIL ...t) {
internal_inputs_.template get<POS>().push_token(internal::token<HEAD>{h, color});
push_input_rec<POS + 1, TAIL...>(color, t...);
}
typename internal_outputs::raw_types get_output() {
while (true) {
if (output_clock_ > input_clock_) {
PLS_ERROR("Must not try to pull outputs if there are not enough inputs.")
}
auto color_matching = input_output_color_matching_[output_clock_ % P];
if (std::get<0>(color_matching) == std::get<1>(color_matching)) {
break;
}
output_clock_++;
}
while (output_clear_[output_clock_ % P] != finished) {
PLS_ERROR("Blocking/Unfinishe outputs are not implemented jet. Place waiting code in here/yield to scheduler.")
}
auto result = internal_outputs_.get_outputs(output_clock_ % P);
output_clear_[output_clock_ % P] = clear;
output_clock_++;
return result;
} }
}; };
......
...@@ -11,8 +11,8 @@ namespace dataflow { ...@@ -11,8 +11,8 @@ namespace dataflow {
template<typename I1, typename ...I> template<typename I1, typename ...I>
struct inputs { struct inputs {
template<int P> template<int P, typename CB>
using internal_inputs = internal::inputs<P, I1, I...>; using internal_inputs = internal::inputs<P, CB, I1, I...>;
}; };
} }
......
...@@ -37,11 +37,17 @@ class input { ...@@ -37,11 +37,17 @@ class input {
} }
public: public:
input() = default;
void push_token(token<T> token) { void push_token(token<T> token) {
tokens_[token.color().get_index(P)] = token; tokens_[token.color().get_index(P)] = token;
cb_->token_pushed(input_pos_, token.color()); cb_->token_pushed(input_pos_, token.color());
} }
token<T> token_at(int p) {
return tokens_[p];
}
void set_cb(push_token_cb *cb, int input_pos) { void set_cb(push_token_cb *cb, int input_pos) {
cb_ = cb; cb_ = cb;
input_pos_ = input_pos; input_pos_ = input_pos;
......
...@@ -6,31 +6,50 @@ ...@@ -6,31 +6,50 @@
#include <array> #include <array>
#include "input.h" #include "input.h"
#include "outputs.h"
namespace pls { namespace pls {
namespace dataflow { namespace dataflow {
namespace internal { namespace internal {
template<int P, typename ...I> template<int P, typename CB, typename ...I>
class inputs : push_token_cb { class inputs : push_token_cb {
public:
using raw_types = std::tuple<I...>;
using values_type = std::tuple<input<P, I>...>; using values_type = std::tuple<input<P, I>...>;
values_type values_;
template<int N>
using raw_type_at = typename std::tuple_element<N, raw_types>::type;
template<int N>
using value_type_at = typename std::tuple_element<N, values_type>::type;
static constexpr unsigned int num_values = std::tuple_size<values_type>::value; static constexpr unsigned int num_values = std::tuple_size<values_type>::value;
private:
values_type values_;
std::array<std::atomic<unsigned int>, P> required_inputs_; std::array<std::atomic<unsigned int>, P> required_inputs_;
CB cb_;
public: public:
inputs() { explicit inputs(CB cb) : cb_{cb} {
for (int i = 0; i < P; i++) { for (int i = 0; i < P; i++) {
required_inputs_[i] = num_values; required_inputs_[i] = num_values;
} }
init_cb<0, I...>::call(this); init_cb<0, I...>::call(this);
} }
inputs(inputs &&other) = delete;
inputs(const inputs &other) = delete;
inputs &operator=(inputs &&other) = delete;
inputs &operator=(const inputs &other) = delete;
void token_pushed(int pos, token_color color) override {
auto index = color.get_index(P);
int current_required = --required_inputs_[index];
cb_.one_input(pos, color);
void token_pushed(int /*pos*/, token_color color) override {
int current_required = --required_inputs_[color.get_index(P)];
if (current_required == 0) { if (current_required == 0) {
std::cout << "All Inputs Avaliable" << std::endl; // TODO: Add proper callback in here cb_.all_inputs(color);
required_inputs_[index] = num_values;
} }
} }
...@@ -39,14 +58,47 @@ class inputs : push_token_cb { ...@@ -39,14 +58,47 @@ class inputs : push_token_cb {
return std::get<N>(values_); return std::get<N>(values_);
} }
raw_types get_outputs(int p) {
return fill_output<0, I...>::call(*this, p);
}
template<int POS, typename ...TAIL>
struct fill_output {
static std::tuple<> call(inputs<P, CB, I...> &self, int p) {
return {};
}
};
template<int POS, typename HEAD, typename ...TAIL>
struct fill_output<POS, HEAD, TAIL...> {
static std::tuple<HEAD, TAIL...> call(inputs<P, CB, I...> &self, int p) {
std::tuple<decltype(self.get<POS>().token_at(p).value())> our_tuple{self.get<POS>().token_at(p).value()};
return std::tuple_cat(our_tuple, fill_output<POS + 1, TAIL...>::call(self, p));
}
};
void push_to_outputs(outputs<P, I...> &outputs, int input_index, token_color new_color) {
push_to_outputs_rec<0, I...>::call(this, &outputs, input_index, new_color);
}
template<int POS, typename ...TAIL>
struct push_to_outputs_rec {
static void call(inputs<P, CB, I...> *inputs, outputs<P, I...> *outputs, int input_index, token_color new_color) {}
};
template<int POS, typename HEAD, typename ...TAIL>
struct push_to_outputs_rec<POS, HEAD, TAIL...> {
static void call(inputs<P, CB, I...> *inputs, outputs<P, I...> *outputs, int input_index, token_color new_color) {
auto token = inputs->get<POS>().token_at(input_index);
outputs->template get<POS>().push_token({token.value(), new_color});
push_to_outputs_rec<POS + 1, TAIL...>::call(inputs, outputs, input_index, new_color);
}
};
// TODO: Change CB code using proper templating to save method calls during execution... // TODO: Change CB code using proper templating to save method calls during execution...
template<int POS, typename ...TAIL> template<int POS, typename ...TAIL>
struct init_cb { struct init_cb {
static void call(inputs<P, I...> *inputs) { } static void call(inputs<P, CB, I...> *inputs) {}
}; };
template<int POS, typename HEAD, typename ...TAIL> template<int POS, typename HEAD, typename ...TAIL>
struct init_cb<POS, HEAD, TAIL...> { struct init_cb<POS, HEAD, TAIL...> {
static void call(inputs<P, I...> *inputs) { static void call(inputs<P, CB, I...> *inputs) {
inputs->get<POS>().set_cb(inputs, POS); inputs->get<POS>().set_cb(inputs, POS);
init_cb<POS + 1, TAIL...>::call(inputs); init_cb<POS + 1, TAIL...>::call(inputs);
} }
......
...@@ -12,12 +12,21 @@ namespace internal { ...@@ -12,12 +12,21 @@ namespace internal {
template<int P, typename ...O> template<int P, typename ...O>
class outputs { class outputs {
public:
using raw_types = std::tuple<O...>;
using values_type = std::tuple<output<P, O>...>; using values_type = std::tuple<output<P, O>...>;
template<int N>
using raw_type_at = typename std::tuple_element<N, raw_types>::type;
template<int N>
using value_type_at = typename std::tuple_element<N, values_type>::type;
private:
values_type values_; values_type values_;
public: public:
template<int N> template<int N>
typename std::tuple_element<N, values_type>::type &get() { value_type_at<N> &get() {
return std::get<N>(values_); return std::get<N>(values_);
} }
}; };
......
...@@ -13,10 +13,15 @@ namespace internal { ...@@ -13,10 +13,15 @@ namespace internal {
*/ */
struct token_color { struct token_color {
unsigned int clock_; unsigned int clock_;
unsigned int depth_;
int get_index(int parallel_limit) const { int get_index(int parallel_limit) const {
return clock_ % parallel_limit; return clock_ % parallel_limit;
} }
bool operator==(const token_color &other) {
return other.clock_ == clock_ && other.depth_ == depth_;
}
}; };
template<typename T> template<typename T>
...@@ -25,6 +30,9 @@ class token { ...@@ -25,6 +30,9 @@ class token {
token_color color_; token_color color_;
public: public:
token() : color_{} {}; // Default Constructor Stays uninitialized
token(T value, token_color color) : value_{value}, color_{color} {};
T value() const { return value_; } T value() const { return value_; }
token_color color() const { return color_; } token_color color() const { return color_; }
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment