From 6150b3218a5f5e80e9705e04268c10773adac9ea Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Mon, 22 Jul 2019 11:36:30 +0200 Subject: [PATCH] Pushing data through the new system works. --- app/benchmark_unbalanced/CMakeLists.txt | 2 +- app/benchmark_unbalanced/function_node.cpp | 28 ++++++++++++++++++++++++++++ app/benchmark_unbalanced/node.cpp | 28 ---------------------------- app/playground/main.cpp | 60 ++++++++++++++++++++++++++---------------------------------- lib/pls/CMakeLists.txt | 6 +++--- lib/pls/include/pls/dataflow/graph.h | 179 ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- lib/pls/include/pls/dataflow/internal/buffer.h | 38 -------------------------------------- lib/pls/include/pls/dataflow/internal/function_node.h | 155 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/dataflow/internal/graph.h | 172 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------------------------------------------------------------- lib/pls/include/pls/dataflow/internal/in_port.h | 24 ++++++++++++++++++++---- lib/pls/include/pls/dataflow/internal/node.h | 43 +++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/dataflow/internal/out_port.h | 54 +++++++++++++++++++++++++++++++++++++++++++++++++++++- lib/pls/include/pls/dataflow/internal/token.h | 36 +++++++++++++++++------------------- lib/pls/include/pls/internal/helpers/seqence.h | 27 +++++++++++++++++++++++++++ 14 files changed, 463 insertions(+), 389 deletions(-) create mode 100644 app/benchmark_unbalanced/function_node.cpp delete mode 100644 app/benchmark_unbalanced/node.cpp delete mode 100644 lib/pls/include/pls/dataflow/graph.h delete mode 100644 lib/pls/include/pls/dataflow/internal/buffer.h create mode 100644 lib/pls/include/pls/dataflow/internal/function_node.h create mode 100644 lib/pls/include/pls/dataflow/internal/node.h create mode 100644 lib/pls/include/pls/internal/helpers/seqence.h diff --git a/app/benchmark_unbalanced/CMakeLists.txt b/app/benchmark_unbalanced/CMakeLists.txt index 00c95ab..d935ada 100644 --- a/app/benchmark_unbalanced/CMakeLists.txt +++ b/app/benchmark_unbalanced/CMakeLists.txt @@ -1,4 +1,4 @@ -add_executable(benchmark_unbalanced main.cpp node.h node.cpp picosha2.h) +add_executable(benchmark_unbalanced main.cpp node.h function_node.cpp picosha2.h) target_link_libraries(benchmark_unbalanced pls) if (EASY_PROFILER) target_link_libraries(benchmark_unbalanced easy_profiler) diff --git a/app/benchmark_unbalanced/function_node.cpp b/app/benchmark_unbalanced/function_node.cpp new file mode 100644 index 0000000..1cb931e --- /dev/null +++ b/app/benchmark_unbalanced/function_node.cpp @@ -0,0 +1,28 @@ +#include "node.h" + +namespace uts { +node_state node::generate_child_state(uint32_t index) { + node_state result; + + picosha2::hash256_one_by_one hasher; + hasher.process(state_.begin(), state_.end()); + auto index_begin = reinterpret_cast(&index); + hasher.process(index_begin, index_begin + 4); + hasher.finish(); + hasher.get_hash_bytes(result.begin(), result.end()); + + return result; +} + +double node::get_state_random() { + int32_t state_random_integer; + uint32_t b = ((uint32_t) state_[16] << 24) | + ((uint32_t) state_[17] << 16) | + ((uint32_t) state_[18] << 8) | + ((uint32_t) state_[19] << 0); + b = b & 0x7fffffff; // Mask out negative values + state_random_integer = static_cast(b); + + return (double) state_random_integer / (double) INT32_MAX; +} +} diff --git a/app/benchmark_unbalanced/node.cpp b/app/benchmark_unbalanced/node.cpp deleted file mode 100644 index 1cb931e..0000000 --- a/app/benchmark_unbalanced/node.cpp +++ /dev/null @@ -1,28 +0,0 @@ -#include "node.h" - -namespace uts { -node_state node::generate_child_state(uint32_t index) { - node_state result; - - picosha2::hash256_one_by_one hasher; - hasher.process(state_.begin(), state_.end()); - auto index_begin = reinterpret_cast(&index); - hasher.process(index_begin, index_begin + 4); - hasher.finish(); - hasher.get_hash_bytes(result.begin(), result.end()); - - return result; -} - -double node::get_state_random() { - int32_t state_random_integer; - uint32_t b = ((uint32_t) state_[16] << 24) | - ((uint32_t) state_[17] << 16) | - ((uint32_t) state_[18] << 8) | - ((uint32_t) state_[19] << 0); - b = b & 0x7fffffff; // Mask out negative values - state_random_integer = static_cast(b); - - return (double) state_random_integer / (double) INT32_MAX; -} -} diff --git a/app/playground/main.cpp b/app/playground/main.cpp index 74c05de..4aa4802 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -6,48 +6,40 @@ #include #include -#include +#include #include -#include int main() { using namespace pls::dataflow; using namespace pls::dataflow::internal; - out_port port1; - out_port port2; - graph::type, inputs, outputs> tmp{}; + out_port external1; + out_port external2; - port1 >> tmp.in_port<0>(); - port2 >> tmp.in_port<1>(); + auto func1 = [](const int &i1, const int &i2, int &o1) { + std::cout << "Hello! " << i1 << ", " << i2 << std::endl; + o1 = i1 + i2; + }; + function_node, outputs, decltype(func1)> node1{func1}; + auto func2 = [](const int &i1, int &o1) { + std::cout << "We get! " << i1 << std::endl; + }; + function_node, outputs, decltype(func2)> node2{func2}; - port1.push_token({1, {}}); - port2.push_token({2, {}}); + external1 >> node1.in_port<0>(); + external2 >> node1.in_port<1>(); + node1.out_port<0>() >> node2.in_port<0>(); -// using namespace pls::dataflow; -// -// graph, outputs, 2> graph2; -// graph2.input<0>() >> graph2.output<1>(); -// graph2.input<1>() >> graph2.output<0>(); -// -// graph, outputs, 2> graph1; -// graph1.input<0>() >> graph2.external_input<0>(); -// graph1.input<1>() >> graph2.external_input<1>(); -// graph2.external_output<0>() >> graph1.output<0>(); -// graph2.external_output<1>() >> graph1.output<1>(); -// -// graph1.push_input(1, 2); -// graph1.push_input(3, 4); -// -// auto result1 = graph1.get_output(); -// std::cout << std::get<0>(result1) << ", " << std::get<1>(result1) << std::endl; -// -// graph1.push_input(5, 6); -// -// auto result2 = graph1.get_output(); -// std::cout << std::get<0>(result2) << ", " << std::get<1>(result2) << std::endl; -// -// auto result3 = graph1.get_output(); -// std::cout << std::get<0>(result3) << ", " << std::get<1>(result3) << std::endl; + // Simulate execution environment + void *buffer1 = malloc(node1.instance_buffer_size()); + void *buffer2 = malloc(node2.instance_buffer_size()); + void *memory[] = {buffer1, buffer2}; + node1.init_instance_buffer(memory[0]); + node1.set_memory_index(0); + node2.init_instance_buffer(memory[1]); + node2.set_memory_index(1); + invocation_info invocation{memory}; + external1.push_token({1, invocation}); + external2.push_token({2, invocation}); } diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 6735058..1f0f3b7 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -15,8 +15,9 @@ add_library(pls STATIC include/pls/dataflow/internal/token.h include/pls/dataflow/internal/in_port.h include/pls/dataflow/internal/out_port.h + include/pls/dataflow/internal/function_node.h + include/pls/dataflow/internal/node.h include/pls/dataflow/internal/graph.h - include/pls/dataflow/internal/buffer.h include/pls/internal/base/spin_lock.h include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp @@ -48,8 +49,7 @@ add_library(pls STATIC include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp - include/pls/internal/scheduling/lambda_task.h - ) + include/pls/internal/scheduling/lambda_task.h include/pls/internal/helpers/seqence.h) # Add everything in `./include` to be in the include path of this project target_include_directories(pls PUBLIC diff --git a/lib/pls/include/pls/dataflow/graph.h b/lib/pls/include/pls/dataflow/graph.h deleted file mode 100644 index 73b4e23..0000000 --- a/lib/pls/include/pls/dataflow/graph.h +++ /dev/null @@ -1,179 +0,0 @@ - -#ifndef PLS_DATAFLOW_GRAPH_H_ -#define PLS_DATAFLOW_GRAPH_H_ - -#include "inputs.h" -#include "outputs.h" -#include "input.h" -#include "output.h" - -#include "internal/token.h" - -namespace pls { -namespace dataflow { - -template -class graph { - /** - * Boilerplate for callbacks of dataflow input/output handlers. - */ - struct external_input_cb { - graph *graph_; - - void one_input(int pos, internal::token_color color) { - graph_->one_external_input(pos, color); - } - void all_inputs(internal::token_color color) { - graph_->all_external_inputs(color); - } - }; - struct internal_output_cb { - graph *graph_; - - void one_input(int pos, internal::token_color color) { - graph_->one_internal_output(pos, color); - } - void all_inputs(internal::token_color color) { - graph_->all_internal_outputs(color); - } - }; - - /** - * Handling of buffers for dataflow. - */ - using exeternal_inputs = typename I::template internal_inputs::external_input_cb>; - using external_outputs = typename O::template internal_outputs

; - exeternal_inputs external_inputs_{external_input_cb{this}}; - external_outputs external_outputs_{}; - - void one_external_input(int pos, internal::token_color color) { - // We do currently not support 'single external inputs'. - // This prevents some fine-grained parallelism, but eases implementation - } - void all_external_inputs(internal::token_color external_color) { - auto my_clock = input_clock_++; - internal::token_color internal_color{my_clock, external_color.depth_ + 1}; - auto my_index = internal_color.get_index(P); - - while (output_clear_[my_index] != clear) { - PLS_ERROR("Full inputs are not implemented jet. Place waiting code in here/yield to scheduler.") - } - - output_clear_[my_index] = waiting; - input_output_color_matching_[my_index] = {external_color, internal_color}; - external_inputs_.push_to_outputs(internal_inputs_, external_color.get_index(P), internal_color); - } - - using internal_outputs = typename I::template internal_inputs::internal_output_cb>; - using internal_inputs = typename O::template internal_outputs

; - internal_outputs internal_outputs_{internal_output_cb{this}}; - internal_inputs internal_inputs_{}; - - void one_internal_output(int pos, internal::token_color color) { - // We do not support 'single flow' output. Every graph is well behaved, e.g. will emmit exactly one output tuple. - } - void all_internal_outputs(internal::token_color color) { - auto color_matching = input_output_color_matching_[color.get_index(P)]; - auto external_color = std::get<0>(color_matching); - auto internal_color = std::get<1>(color_matching); - if (external_color == internal_color) { - output_clear_[color.get_index(P)] = finished; - } else { - internal_outputs_.push_to_outputs(external_outputs_, internal_color.get_index(P), external_color); - output_clear_[color.get_index(P)] = clear; - } - } - - /** - * Handling of colors/maximum concurrent tokens in the system. - */ - std::atomic input_clock_{0}; - std::atomic output_clock_{0}; - std::array, P> input_output_color_matching_{}; - enum output_state { clear, waiting, finished }; - std::array output_clear_{clear}; - - public: - template - using external_output_at = output>; - template - using external_input_at = input>; - - template - external_output_at external_output() { - external_output_at o{external_outputs_.template get()}; - return o; - } - template - external_input_at external_input() { - external_input_at i{external_inputs_.template get()}; - return i; - } - - template - using input_at = output>; - template - using output_at = input>; - - template - output_at output() { - output_at o{internal_outputs_.template get()}; - return o; - } - template - input_at input() { - input_at i{internal_inputs_.template get()}; - return i; - } - - template - void push_input(ARGS ...values) { - auto my_clock = input_clock_++; - internal::token_color color{my_clock, 0}; - auto my_index = color.get_index(P); - - while (output_clear_[my_index] != clear) { - PLS_ERROR("Full inputs are not implemented jet. Place waiting code in here/yield to scheduler.") - } - - output_clear_[my_index] = waiting; - input_output_color_matching_[my_index] = {color, color}; - - push_input_rec<0, ARGS...>(color, values...); - } - template - void push_input_rec(internal::token_color color) {} - template - void push_input_rec(internal::token_color color, HEAD h, TAIL ...t) { - internal_inputs_.template get().push_token(internal::token{h, color}); - push_input_rec(color, t...); - } - - typename internal_outputs::raw_types get_output() { - while (true) { - if (output_clock_ > input_clock_) { - PLS_ERROR("Must not try to pull outputs if there are not enough inputs.") - } - - auto color_matching = input_output_color_matching_[output_clock_ % P]; - if (std::get<0>(color_matching) == std::get<1>(color_matching)) { - break; - } - output_clock_++; - } - - while (output_clear_[output_clock_ % P] != finished) { - PLS_ERROR("Blocking/Unfinishe outputs are not implemented jet. Place waiting code in here/yield to scheduler.") - } - - auto result = internal_outputs_.get_outputs(output_clock_ % P); - output_clear_[output_clock_ % P] = clear; - output_clock_++; - return result; - } -}; - -} -} - -#endif //PLS_DATAFLOW_GRAPH_H_ diff --git a/lib/pls/include/pls/dataflow/internal/buffer.h b/lib/pls/include/pls/dataflow/internal/buffer.h deleted file mode 100644 index 4546b27..0000000 --- a/lib/pls/include/pls/dataflow/internal/buffer.h +++ /dev/null @@ -1,38 +0,0 @@ - -#ifndef PLS_DATAFLOW_INTERNAL_BUFFER_H_ -#define PLS_DATAFLOW_INTERNAL_BUFFER_H_ - -#include - -#include "in_port.h" - -namespace pls { -namespace dataflow { -namespace internal { - -template -struct static_buffer_impl { - std::array buffer_; - - const T &operator[](size_t i) const { return buffer_[i]; } - T &operator[](size_t i) { return buffer_[i]; } - - static_buffer_impl() : buffer_{} {}; - explicit static_buffer_impl(T init) : buffer_(init) {}; - - int capacity() const { - return P; - } -}; - -template -struct static_buffer { - template - using type = static_buffer_impl; -}; - -} -} -} - -#endif //PLS_DATAFLOW_INTERNAL_BUFFER_H_ diff --git a/lib/pls/include/pls/dataflow/internal/function_node.h b/lib/pls/include/pls/dataflow/internal/function_node.h new file mode 100644 index 0000000..ea88b28 --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/function_node.h @@ -0,0 +1,155 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_H_ +#define PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_H_ + +#include +#include +#include + +#include "in_port.h" +#include "out_port.h" +#include "node.h" + +#include "pls/dataflow/inputs.h" +#include "pls/dataflow/outputs.h" + +#include "pls/internal/helpers/seqence.h" + +namespace pls { +namespace dataflow { +namespace internal { + +using namespace pls::internal::helpers; + +template +class function_node {}; + +template +class function_node, pls::dataflow::outputs, F> : public node { + private: + // Our own type + using self_type = function_node, pls::dataflow::outputs, F>; + + // Input-Output port types + using multi_in_port_type = multi_in_port; + using multi_out_port_type = multi_out_port; + + // Input-Output value tuples + using input_tuple = std::tuple, token...>; + using output_tuple = std::tuple, token...>; + static constexpr int num_in_ports = std::tuple_size(); + static constexpr int num_out_ports = std::tuple_size(); + + // Memory used for ONE active invocation of the dataflow-graph + struct invocation_memory { + std::atomic inputs_missing_; + input_tuple input_buffer_; + }; + + public: + template + using in_port_at = typename multi_in_port_type::template in_port_type_at; + template + using out_port_at = typename multi_out_port_type::template out_port_type_at; + + template + in_port_at &in_port() { + return in_port_.template get(); + } + + template + out_port_at &out_port() { + return out_port_.template get(); + } + + template + void token_pushed(token token) { + auto current_memory = get_invocation(token); + + std::get(current_memory->input_buffer_) = token; + auto remaining_inputs = --(current_memory->inputs_missing_); + if (remaining_inputs == 0) { + execute_function(current_memory, token.invocation()); + current_memory->inputs_missing_ = num_in_ports; + } + } + + void execute_function(invocation_memory *invocation_memory, invocation_info invocation_info) { + input_tuple &inputs = invocation_memory->input_buffer_; + output_tuple outputs; + + execute_function_internal(inputs, typename sequence_gen<1 + sizeof...(I)>::type(), + outputs, typename sequence_gen<1 + sizeof...(O)>::type(), invocation_info); + + } + template + void set_invocation_info(token &token, invocation_info invocation_info) { + token.set_invocation(invocation_info); + } + template + struct propagate_output { + propagate_output(multi_out_port_type &, output_tuple &) {} + void propagate() {} + }; + template + struct propagate_output { + multi_out_port_type &out_port_; + output_tuple &output_tuple_; + + propagate_output(multi_out_port_type &out_port, output_tuple &output_tuple) : out_port_{out_port}, + output_tuple_{output_tuple} {} + + void propagate() { + out_port_.template get().push_token(std::get(output_tuple_)); + propagate_output{out_port_, output_tuple_}.propagate(); + } + }; + + template + void execute_function_internal(input_tuple &inputs, + sequence, + output_tuple &outputs, + sequence, + invocation_info invocation_info) { + set_invocation_info(std::get(outputs)..., invocation_info); + function_(std::get(inputs).value()..., std::get(outputs).value()...); + propagate_output<0, O0, O...>{out_port_, outputs}.propagate(); + } + + explicit function_node(F function) : in_port_{this, this}, out_port_{}, function_{function} {} + + ////////////////////////////////////////////////////////////////// + // Overrides for generic node functionality (building the graph) + ////////////////////////////////////////////////////////////////// + int num_successors() const override { + return num_out_ports; + } + node *successor_at(int pos) const override { + return out_port_.next_node_at(pos); + } + + bool is_fully_connected() const override { + return in_port_.fully_connected() && out_port_.fully_connected(); + } + + int instance_buffer_size() const override { + return sizeof(invocation_memory); + } + void init_instance_buffer(void *memory) const override { + auto invocation = new(memory) invocation_memory{}; + invocation->inputs_missing_ = num_in_ports; + }; + + private: + // Input/Output ports + multi_in_port_type in_port_; + multi_out_port_type out_port_; + + F function_; +}; + +} +} +} + +#endif //PLS_DATAFLOW_INTERNAL_NODE_H_ diff --git a/lib/pls/include/pls/dataflow/internal/graph.h b/lib/pls/include/pls/dataflow/internal/graph.h index 1cb7905..0ae153a 100644 --- a/lib/pls/include/pls/dataflow/internal/graph.h +++ b/lib/pls/include/pls/dataflow/internal/graph.h @@ -2,11 +2,11 @@ #ifndef PLS_DATAFLOW_INTERNAL_GRAPH_H_ #define PLS_DATAFLOW_INTERNAL_GRAPH_H_ +#include #include #include -#include -#include "buffer.h" +#include "node.h" #include "in_port.h" #include "out_port.h" @@ -17,65 +17,32 @@ namespace pls { namespace dataflow { namespace internal { -struct graph_invocation { - enum state { clear, running, finished }; - - explicit graph_invocation(int num_outputs) : num_outputs_{num_outputs}, outputs_missing_{num_outputs} {}; - - void reset() { - state_ = clear; - internal_call_ = false; - previous_color_ = {}; - - outputs_missing_ = num_outputs_; - } - - const int num_outputs_; - std::atomic outputs_missing_; - - state state_{clear}; - - bool internal_call_{false}; - token_color previous_color_{}; -}; - -template class B, typename INS, typename OUTS> +template class graph {}; -template class B, typename I0, typename ...I, typename O0, typename ...O> -class graph, pls::dataflow::outputs> { - private: - using self_type = graph, pls::dataflow::outputs>; - struct in_port_cb { - self_type &self_; +template +class graph, pls::dataflow::outputs> : public node { - explicit in_port_cb(self_type &self) : self_{self} {}; + private: + // Our own type + using self_type = graph, pls::dataflow::outputs>; - template - void token_pushed(token token) { - self_.in_port_token_pushed(token); - } - }; - struct output_cb { - const self_type &self_; + // Input-Output port types + using multi_in_port_type = multi_in_port; + using multi_out_port_type = multi_out_port; - explicit output_cb(self_type &self) : self_{self} {}; + // Input-Output value tuples + using input_tuple = std::tuple, token...>; + using output_tuple = std::tuple, token...>; + static constexpr int num_in_ports = std::tuple_size(); + static constexpr int num_out_ports = std::tuple_size(); - template - void token_pushed(token token) { - self_.output_token_pushed(token); - } + // Memory used for ONE active invocation of the dataflow-graph + struct invocation_memory { + std::atomic inputs_missing_; + input_tuple input_buffer_; }; - // Type-Defs used internally - using my_type = graph, pls::dataflow::outputs>; - - using multi_in_port_type = multi_in_port; - using multi_out_port_type = multi_out_port; - - using input_tuple = std::tuple; - using output_tuple = std::tuple; - public: template using in_port_at = typename multi_in_port_type::template in_port_type_at; @@ -93,51 +60,92 @@ class graph, pls::dataflow::outputs } template - void in_port_token_pushed(token token) { - auto my_clock = input_clock_++; - - auto index = token.color().get_index(buffer_size_); + void token_pushed(token token) { + auto invocation = get_invocation(token); std::cout << "Token pushed at " << POS << " with value " << token.value() << std::endl; + auto remaining_inputs = --(invocation->inputs_missing_); + if (remaining_inputs == 0) { + std::cout << "All inputs there!" << std::endl; + } + } -// std::get(inputs_[index]) = token; -// -// auto remaining = --inputs_required_[index]; -// if (remaining == 0) { -// std::cout << "All tokens at clock " << token.color().clock_ << std::endl; -// } + graph() : in_port_{this, this}, out_port_{} { } - template - void output_token_pushed(token token) { + ///////////////////////////////////////////////////////////////// + // Graph building + ///////////////////////////////////////////////////////////////// + void build() { + state_ = building; + PLS_ASSERT(is_fully_connected(), "Must fully connect dataflow graphs!") + + num_nodes_ = 0; + for (int i = 0; i < num_in_ports; i++) { + build_recursive(successor_at(i)); + } + state_ = built; } - graph() : in_port_cb_{*this}, in_port_{in_port_cb_}, out_port_{}, invocations_{} { - buffer_size_ = invocations_.capacity(); + node *start_{nullptr}; + node *current_{nullptr}; + + void build_recursive(node *node) { + if (node->state_ != fresh) { + return; // Already visited + } + node->state_ = building; + PLS_ASSERT(node->is_fully_connected(), "Must fully connect dataflow graphs") + + add_node(node); + for (int i = 0; i < node->num_successors(); i++) { + build_recursive(node->successor_at(i)); + } + + node->state_ = built; } - private: - void reset_input(int i) { - inputs_missing_[i] = + void add_node(node *new_node) { + new_node->memory_index_ = num_nodes_++; + + if (start_ == nullptr) { + start_ = new_node; + current_ = new_node; + } else { + current_->direct_successor_ = new_node; + current_ = new_node; + } } - // Input/Output ports - in_port_cb in_port_cb_; - multi_in_port_type in_port_; - multi_out_port_type out_port_; + ////////////////////////////////////////////////////////////////// + // Overrides for generic node functionality (building the graph) + ////////////////////////////////////////////////////////////////// + int num_successors() const override { + return num_out_ports; + } + node *successor_at(int pos) const override { + return out_port_.next_node_at(pos); + } - const int buffer_size_; + bool is_fully_connected() const override { + return in_port_.fully_connected() && out_port_.fully_connected(); + } - // Clocks and state for execution - std::atomic input_clock_{0}; - std::atomic output_clock_{0}; + int instance_buffer_size() const override { + return sizeof(invocation_memory); + } + void init_instance_buffer(void *memory) const override { + auto invocation = new(memory) invocation_memory{}; + invocation->inputs_missing_ = num_in_ports; + }; - B input_buffer_; - B> inputs_missing_; + private: + // Input/Output ports + multi_in_port_type in_port_; + multi_out_port_type out_port_; - B invocations_; - B output_buffer_; + int num_nodes_{0}; }; } diff --git a/lib/pls/include/pls/dataflow/internal/in_port.h b/lib/pls/include/pls/dataflow/internal/in_port.h index 89f9ebe..629c1f7 100644 --- a/lib/pls/include/pls/dataflow/internal/in_port.h +++ b/lib/pls/include/pls/dataflow/internal/in_port.h @@ -3,7 +3,9 @@ #define PLS_DATAFLOW_INTERNAL_INPUT_H_ #include "pls/internal/base/error_handling.h" + #include "token.h" +#include "node.h" namespace pls { namespace dataflow { @@ -19,11 +21,17 @@ class in_port { friend class out_port; + public: + explicit in_port(node *owning_node) : owning_node_{owning_node} {}; + node *owning_node() const { return owning_node_; } + bool is_connected() const { return connected_; } + protected: virtual void token_pushed(token token) = 0; private: bool connected_{false}; + node *const owning_node_; void push_token(token token) { token_pushed(token); @@ -52,7 +60,11 @@ template class multi_in_port { // end of template recursion public: - explicit multi_in_port(CB &) {}; + explicit multi_in_port(node *, CB *) {}; + + bool fully_connected() const { + return true; + } }; template class multi_in_port : public in_port { @@ -63,10 +75,10 @@ class multi_in_port : public in_port { using value_type = I0; public: - explicit multi_in_port(CB &cb) : cb_{cb}, rec_{cb} {}; + explicit multi_in_port(node *owning_node, CB *cb) : in_port{owning_node}, cb_{cb}, rec_{owning_node, cb} {}; void token_pushed(token token) override { - cb_.template token_pushed(token); + cb_->template token_pushed(token); } // Helper struct required for recursive access to types by index @@ -110,9 +122,13 @@ class multi_in_port : public in_port { return get_at, my_type>::get(*this); } + bool fully_connected() const { + return this->is_connected() && rec_.fully_connected(); + } + private: child_type rec_; - CB &cb_; + CB *cb_; }; } diff --git a/lib/pls/include/pls/dataflow/internal/node.h b/lib/pls/include/pls/dataflow/internal/node.h new file mode 100644 index 0000000..8f7ac4d --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/node.h @@ -0,0 +1,43 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_NODE_H_ +#define PLS_DATAFLOW_INTERNAL_NODE_H_ + +namespace pls { +namespace dataflow { +namespace internal { + +class node { + template + friend + class graph; + + enum state { fresh, building, built, teardown }; + + public: + virtual int num_successors() const = 0; + virtual node *successor_at(int pos) const = 0; + + virtual int instance_buffer_size() const = 0; + virtual void init_instance_buffer(void *memory) const = 0; + + virtual bool is_fully_connected() const = 0; + + template + M *get_invocation(token token) { + return token.invocation().template get_instance_buffer(memory_index_); + } + + // TODO: Remove + void set_memory_index(int i) { memory_index_ = i; } + + private: + int memory_index_{0}; + node *direct_successor_; + state state_{fresh}; +}; + +} +} +} + +#endif //PLS_DATAFLOW_INTERNAL_NODE_H_ diff --git a/lib/pls/include/pls/dataflow/internal/out_port.h b/lib/pls/include/pls/dataflow/internal/out_port.h index 0904547..dc8f816 100644 --- a/lib/pls/include/pls/dataflow/internal/out_port.h +++ b/lib/pls/include/pls/dataflow/internal/out_port.h @@ -5,6 +5,7 @@ #include #include "in_port.h" +#include "node.h" namespace pls { namespace dataflow { @@ -27,10 +28,16 @@ class out_port { connect(input); } - void push_token(token token) { + void push_token(token token) const { target_->push_token(token); } + node *next_node() const { + return target_->owning_node(); + } + + bool is_connected() const { return connected_; } + private: bool connected_{false}; in_port *target_{nullptr}; @@ -47,6 +54,7 @@ class multi_out_port { // Helpers for managing recursive types using value_tuple_type = std::tuple; using out_port_tupel_type = std::tuple, out_port...>; + static constexpr int num_outputs = std::tuple_size(); public: // Simple interface to get types by index @@ -61,8 +69,52 @@ class multi_out_port { return std::get(outputs_); } + node *next_node_at(int pos) const { + return next_node < 0 > {&outputs_}.get(pos); + } + + bool fully_connected() const { + return connected < 0 > {&outputs_}.get(); + } + private: out_port_tupel_type outputs_; + + template + struct next_node { + const out_port_tupel_type *outputs_; + node *get(int i) { + if (POS == i) { + return std::get(*outputs_).next_node(); + } else { + return next_node{outputs_}.get(i); + } + } + }; + template + struct next_node { + const out_port_tupel_type *outputs_; + node *get(int i) { + PLS_ERROR("Try to access invalid successor node index!") + } + }; + + template + struct connected { + const out_port_tupel_type *outputs_; + bool get() { + bool self = std::get(*outputs_).is_connected(); + bool children = connected{outputs_}.get(); + return self && children; + } + }; + template + struct connected { + const out_port_tupel_type *outputs_; + bool get() { + return true; + } + }; }; } diff --git a/lib/pls/include/pls/dataflow/internal/token.h b/lib/pls/include/pls/dataflow/internal/token.h index edb36b6..ee40720 100644 --- a/lib/pls/include/pls/dataflow/internal/token.h +++ b/lib/pls/include/pls/dataflow/internal/token.h @@ -6,35 +6,33 @@ namespace pls { namespace dataflow { namespace internal { -/** - * Parallel invocations of the same sub-graph are usually working with some kind of coloring - * for tokens to distinguishe different invocations. As this concept is abstract and we could - * change it in the future (for e.g. more advanced features/dataflows) we encapsulate it. - */ -struct token_color { - unsigned int clock_; - unsigned int depth_; - - int get_index(int parallel_limit) const { - return clock_ % parallel_limit; - } +class invocation_info { + public: + explicit invocation_info(void **memory) : memory_{memory} {}; + invocation_info(invocation_info const &other) = default; - bool operator==(const token_color &other) { - return other.clock_ == clock_ && other.depth_ == depth_; + template + T *get_instance_buffer(int pos) { + return reinterpret_cast(memory_[pos]); } + + private: + void **memory_; }; template class token { T value_; - token_color color_; + invocation_info invocation_; public: - token() : color_{} {}; // Default Constructor Stays uninitialized - token(T value, token_color color) : value_{value}, color_{color} {}; + token() : invocation_{nullptr} {}; // Default Constructor Stays uninitialized + token(T value, invocation_info color) : value_{value}, invocation_{color} {}; + + T &value() { return value_; } + invocation_info invocation() const { return invocation_; } - T value() const { return value_; } - token_color color() const { return color_; } + void set_invocation(invocation_info invocation_info) { invocation_ = invocation_info; } }; } diff --git a/lib/pls/include/pls/internal/helpers/seqence.h b/lib/pls/include/pls/internal/helpers/seqence.h new file mode 100644 index 0000000..1a63736 --- /dev/null +++ b/lib/pls/include/pls/internal/helpers/seqence.h @@ -0,0 +1,27 @@ + +#ifndef PLS_INTERNAL_HELPERS_SEQENCE_H_ +#define PLS_INTERNAL_HELPERS_SEQENCE_H_ + +// See: https://stackoverflow.com/questions/7858817/unpacking-a-tuple-to-call-a-matching-function-pointer +// Would be easy in C++ 14 (has index sequences), but seems to be the only way to do it in C++ 11 + +namespace pls { +namespace internal { +namespace helpers { + +template +struct sequence {}; + +template +struct sequence_gen : sequence_gen {}; + +template +struct sequence_gen<0, S...> { + typedef sequence type; +}; + +} +} +} + +#endif //PLS_INTERNAL_HELPERS_SEQENCE_H_ -- libgit2 0.26.0