diff --git a/app/playground/main.cpp b/app/playground/main.cpp index b370892..cd9e742 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -10,10 +10,27 @@ int main() { using namespace pls::dataflow; - graph, outputs, 8> graph; - graph.output<0>().connect(graph.input<0>()); - graph.output<1>().connect(graph.input<1>()); + graph, outputs, 2> graph2; + graph2.input<0>() >> graph2.output<1>(); + graph2.input<1>() >> graph2.output<0>(); - graph.input<0>().push_token(pls::dataflow::internal::token()); - graph.input<1>().push_token(pls::dataflow::internal::token()); + graph, outputs, 2> graph1; + graph1.input<0>() >> graph2.external_input<0>(); + graph1.input<1>() >> graph2.external_input<1>(); + graph2.external_output<0>() >> graph1.output<0>(); + graph2.external_output<1>() >> graph1.output<1>(); + + graph1.push_input(1, 2); + graph1.push_input(3, 4); + + auto result1 = graph1.get_output(); + std::cout << std::get<0>(result1) << ", " << std::get<1>(result1) << std::endl; + + graph1.push_input(5, 6); + + auto result2 = graph1.get_output(); + std::cout << std::get<0>(result2) << ", " << std::get<1>(result2) << std::endl; + + auto result3 = graph1.get_output(); + std::cout << std::get<0>(result3) << ", " << std::get<1>(result3) << std::endl; } diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 45b0740..41a974b 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -47,7 +47,7 @@ add_library(pls STATIC include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp - include/pls/internal/scheduling/lambda_task.h include/pls/dataflow/inputs.h include/pls/dataflow/outputs.h) + include/pls/internal/scheduling/lambda_task.h include/pls/dataflow/inputs.h include/pls/dataflow/outputs.h include/pls/dataflow/input.h include/pls/dataflow/output.h) # Add everything in `./include` to be in the include path of this project target_include_directories(pls PUBLIC diff --git a/lib/pls/include/pls/dataflow/graph.h b/lib/pls/include/pls/dataflow/graph.h index 46fab38..73b4e23 100644 --- a/lib/pls/include/pls/dataflow/graph.h +++ b/lib/pls/include/pls/dataflow/graph.h @@ -4,27 +4,172 @@ #include "inputs.h" #include "outputs.h" +#include "input.h" +#include "output.h" + +#include "internal/token.h" namespace pls { namespace dataflow { template class graph { - using internal_inputs = typename I::template internal_inputs

; - using internal_outputs = typename O::template internal_outputs

; + /** + * Boilerplate for callbacks of dataflow input/output handlers. + */ + struct external_input_cb { + graph *graph_; + + void one_input(int pos, internal::token_color color) { + graph_->one_external_input(pos, color); + } + void all_inputs(internal::token_color color) { + graph_->all_external_inputs(color); + } + }; + struct internal_output_cb { + graph *graph_; + + void one_input(int pos, internal::token_color color) { + graph_->one_internal_output(pos, color); + } + void all_inputs(internal::token_color color) { + graph_->all_internal_outputs(color); + } + }; + + /** + * Handling of buffers for dataflow. + */ + using exeternal_inputs = typename I::template internal_inputs::external_input_cb>; + using external_outputs = typename O::template internal_outputs

; + exeternal_inputs external_inputs_{external_input_cb{this}}; + external_outputs external_outputs_{}; + + void one_external_input(int pos, internal::token_color color) { + // We do currently not support 'single external inputs'. + // This prevents some fine-grained parallelism, but eases implementation + } + void all_external_inputs(internal::token_color external_color) { + auto my_clock = input_clock_++; + internal::token_color internal_color{my_clock, external_color.depth_ + 1}; + auto my_index = internal_color.get_index(P); + + while (output_clear_[my_index] != clear) { + PLS_ERROR("Full inputs are not implemented jet. Place waiting code in here/yield to scheduler.") + } + + output_clear_[my_index] = waiting; + input_output_color_matching_[my_index] = {external_color, internal_color}; + external_inputs_.push_to_outputs(internal_inputs_, external_color.get_index(P), internal_color); + } + + using internal_outputs = typename I::template internal_inputs::internal_output_cb>; + using internal_inputs = typename O::template internal_outputs

; + internal_outputs internal_outputs_{internal_output_cb{this}}; + internal_inputs internal_inputs_{}; + + void one_internal_output(int pos, internal::token_color color) { + // We do not support 'single flow' output. Every graph is well behaved, e.g. will emmit exactly one output tuple. + } + void all_internal_outputs(internal::token_color color) { + auto color_matching = input_output_color_matching_[color.get_index(P)]; + auto external_color = std::get<0>(color_matching); + auto internal_color = std::get<1>(color_matching); + if (external_color == internal_color) { + output_clear_[color.get_index(P)] = finished; + } else { + internal_outputs_.push_to_outputs(external_outputs_, internal_color.get_index(P), external_color); + output_clear_[color.get_index(P)] = clear; + } + } + + /** + * Handling of colors/maximum concurrent tokens in the system. + */ + std::atomic input_clock_{0}; + std::atomic output_clock_{0}; + std::array, P> input_output_color_matching_{}; + enum output_state { clear, waiting, finished }; + std::array output_clear_{clear}; public: - internal_inputs inputs_; - internal_outputs outputs_; + template + using external_output_at = output>; + template + using external_input_at = input>; template - decltype(outputs_.template get()) output() { - return outputs_.template get(); + external_output_at external_output() { + external_output_at o{external_outputs_.template get()}; + return o; + } + template + external_input_at external_input() { + external_input_at i{external_inputs_.template get()}; + return i; } template - decltype(inputs_.template get()) input() { - return inputs_.template get(); + using input_at = output>; + template + using output_at = input>; + + template + output_at output() { + output_at o{internal_outputs_.template get()}; + return o; + } + template + input_at input() { + input_at i{internal_inputs_.template get()}; + return i; + } + + template + void push_input(ARGS ...values) { + auto my_clock = input_clock_++; + internal::token_color color{my_clock, 0}; + auto my_index = color.get_index(P); + + while (output_clear_[my_index] != clear) { + PLS_ERROR("Full inputs are not implemented jet. Place waiting code in here/yield to scheduler.") + } + + output_clear_[my_index] = waiting; + input_output_color_matching_[my_index] = {color, color}; + + push_input_rec<0, ARGS...>(color, values...); + } + template + void push_input_rec(internal::token_color color) {} + template + void push_input_rec(internal::token_color color, HEAD h, TAIL ...t) { + internal_inputs_.template get().push_token(internal::token{h, color}); + push_input_rec(color, t...); + } + + typename internal_outputs::raw_types get_output() { + while (true) { + if (output_clock_ > input_clock_) { + PLS_ERROR("Must not try to pull outputs if there are not enough inputs.") + } + + auto color_matching = input_output_color_matching_[output_clock_ % P]; + if (std::get<0>(color_matching) == std::get<1>(color_matching)) { + break; + } + output_clock_++; + } + + while (output_clear_[output_clock_ % P] != finished) { + PLS_ERROR("Blocking/Unfinishe outputs are not implemented jet. Place waiting code in here/yield to scheduler.") + } + + auto result = internal_outputs_.get_outputs(output_clock_ % P); + output_clear_[output_clock_ % P] = clear; + output_clock_++; + return result; } }; diff --git a/lib/pls/include/pls/dataflow/inputs.h b/lib/pls/include/pls/dataflow/inputs.h index c7d453e..8e397fa 100644 --- a/lib/pls/include/pls/dataflow/inputs.h +++ b/lib/pls/include/pls/dataflow/inputs.h @@ -11,8 +11,8 @@ namespace dataflow { template struct inputs { - template - using internal_inputs = internal::inputs; + template + using internal_inputs = internal::inputs; }; } diff --git a/lib/pls/include/pls/dataflow/internal/input.h b/lib/pls/include/pls/dataflow/internal/input.h index 889351f..c5b0f97 100644 --- a/lib/pls/include/pls/dataflow/internal/input.h +++ b/lib/pls/include/pls/dataflow/internal/input.h @@ -37,11 +37,17 @@ class input { } public: + input() = default; + void push_token(token token) { tokens_[token.color().get_index(P)] = token; cb_->token_pushed(input_pos_, token.color()); } + token token_at(int p) { + return tokens_[p]; + } + void set_cb(push_token_cb *cb, int input_pos) { cb_ = cb; input_pos_ = input_pos; diff --git a/lib/pls/include/pls/dataflow/internal/inputs.h b/lib/pls/include/pls/dataflow/internal/inputs.h index 5c2218b..100665d 100644 --- a/lib/pls/include/pls/dataflow/internal/inputs.h +++ b/lib/pls/include/pls/dataflow/internal/inputs.h @@ -6,31 +6,50 @@ #include #include "input.h" +#include "outputs.h" namespace pls { namespace dataflow { namespace internal { -template +template class inputs : push_token_cb { + public: + using raw_types = std::tuple; using values_type = std::tuple...>; - values_type values_; + + template + using raw_type_at = typename std::tuple_element::type; + template + using value_type_at = typename std::tuple_element::type; static constexpr unsigned int num_values = std::tuple_size::value; + private: + values_type values_; std::array, P> required_inputs_; + CB cb_; public: - inputs() { + explicit inputs(CB cb) : cb_{cb} { for (int i = 0; i < P; i++) { required_inputs_[i] = num_values; } init_cb<0, I...>::call(this); } + inputs(inputs &&other) = delete; + inputs(const inputs &other) = delete; + inputs &operator=(inputs &&other) = delete; + inputs &operator=(const inputs &other) = delete; + + void token_pushed(int pos, token_color color) override { + auto index = color.get_index(P); + int current_required = --required_inputs_[index]; + + cb_.one_input(pos, color); - void token_pushed(int /*pos*/, token_color color) override { - int current_required = --required_inputs_[color.get_index(P)]; if (current_required == 0) { - std::cout << "All Inputs Avaliable" << std::endl; // TODO: Add proper callback in here + cb_.all_inputs(color); + required_inputs_[index] = num_values; } } @@ -39,14 +58,47 @@ class inputs : push_token_cb { return std::get(values_); } + raw_types get_outputs(int p) { + return fill_output<0, I...>::call(*this, p); + } + template + struct fill_output { + static std::tuple<> call(inputs &self, int p) { + return {}; + } + }; + template + struct fill_output { + static std::tuple call(inputs &self, int p) { + std::tuple().token_at(p).value())> our_tuple{self.get().token_at(p).value()}; + return std::tuple_cat(our_tuple, fill_output::call(self, p)); + } + }; + + void push_to_outputs(outputs &outputs, int input_index, token_color new_color) { + push_to_outputs_rec<0, I...>::call(this, &outputs, input_index, new_color); + } + template + struct push_to_outputs_rec { + static void call(inputs *inputs, outputs *outputs, int input_index, token_color new_color) {} + }; + template + struct push_to_outputs_rec { + static void call(inputs *inputs, outputs *outputs, int input_index, token_color new_color) { + auto token = inputs->get().token_at(input_index); + outputs->template get().push_token({token.value(), new_color}); + push_to_outputs_rec::call(inputs, outputs, input_index, new_color); + } + }; + // TODO: Change CB code using proper templating to save method calls during execution... template struct init_cb { - static void call(inputs *inputs) { } + static void call(inputs *inputs) {} }; template struct init_cb { - static void call(inputs *inputs) { + static void call(inputs *inputs) { inputs->get().set_cb(inputs, POS); init_cb::call(inputs); } diff --git a/lib/pls/include/pls/dataflow/internal/outputs.h b/lib/pls/include/pls/dataflow/internal/outputs.h index ca616ea..77ae3c1 100644 --- a/lib/pls/include/pls/dataflow/internal/outputs.h +++ b/lib/pls/include/pls/dataflow/internal/outputs.h @@ -12,12 +12,21 @@ namespace internal { template class outputs { + public: + using raw_types = std::tuple; using values_type = std::tuple...>; + + template + using raw_type_at = typename std::tuple_element::type; + template + using value_type_at = typename std::tuple_element::type; + + private: values_type values_; public: template - typename std::tuple_element::type &get() { + value_type_at &get() { return std::get(values_); } }; diff --git a/lib/pls/include/pls/dataflow/internal/token.h b/lib/pls/include/pls/dataflow/internal/token.h index a6243f6..edb36b6 100644 --- a/lib/pls/include/pls/dataflow/internal/token.h +++ b/lib/pls/include/pls/dataflow/internal/token.h @@ -13,10 +13,15 @@ namespace internal { */ struct token_color { unsigned int clock_; + unsigned int depth_; int get_index(int parallel_limit) const { return clock_ % parallel_limit; } + + bool operator==(const token_color &other) { + return other.clock_ == clock_ && other.depth_ == depth_; + } }; template @@ -25,6 +30,9 @@ class token { token_color color_; public: + token() : color_{} {}; // Default Constructor Stays uninitialized + token(T value, token_color color) : value_{value}, color_{color} {}; + T value() const { return value_; } token_color color() const { return color_; } };