diff --git a/lib/pls/include/pls/dataflow/dataflow.h b/lib/pls/include/pls/dataflow/dataflow.h deleted file mode 100644 index e61d8a5..0000000 --- a/lib/pls/include/pls/dataflow/dataflow.h +++ /dev/null @@ -1,36 +0,0 @@ - -#ifndef PLS_DATAFLOW_DATAFLOW_H_ -#define PLS_DATAFLOW_DATAFLOW_H_ - -#include "internal/graph.h" -#include "internal/function_node.h" -#include "internal/merge_node.h" -#include "internal/switch_node.h" -#include "internal/split_node.h" - -#include "internal/inputs.h" -#include "internal/outputs.h" - -namespace pls { -namespace dataflow { - -template -using graph = internal::graph; -template -using function_node = internal::function_node; -template -using merge_node = internal::merge_node; -template -using switch_node = internal::switch_node; -template -using split_node = internal::split_node; - -template -using inputs = internal::inputs; -template -using outputs = internal::outputs; - -} -} - -#endif //PLS_DATAFLOW_DATAFLOW_H_ diff --git a/lib/pls/include/pls/dataflow/internal/build_state.h b/lib/pls/include/pls/dataflow/internal/build_state.h deleted file mode 100644 index 753491c..0000000 --- a/lib/pls/include/pls/dataflow/internal/build_state.h +++ /dev/null @@ -1,15 +0,0 @@ - -#ifndef PLS_DATAFLOW_INTERNAL_BUILD_STATE_H_ -#define PLS_DATAFLOW_INTERNAL_BUILD_STATE_H_ - -namespace pls { -namespace dataflow { -namespace internal { - -enum class build_state { fresh, building, built, teardown }; - -} -} -} - -#endif //PLS_DATAFLOW_INTERNAL_BUILD_STATE_H_ diff --git a/lib/pls/include/pls/dataflow/internal/function_node.h b/lib/pls/include/pls/dataflow/internal/function_node.h deleted file mode 100644 index 47fed95..0000000 --- a/lib/pls/include/pls/dataflow/internal/function_node.h +++ /dev/null @@ -1,138 +0,0 @@ - -#ifndef PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_H_ -#define PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_H_ - -#include -#include -#include - -#include "in_port.h" -#include "out_port.h" -#include "node.h" - -#include "inputs.h" -#include "outputs.h" - -#include "pls/internal/helpers/seqence.h" -#include "pls/pls.h" - -namespace pls { -namespace dataflow { -namespace internal { - -using namespace pls::internal::helpers; - -// Forward Decl -template -class graph; - -template -class function_node {}; - -template -class function_node, outputs, F> : public node { - private: - // Our own type - using self_type = function_node, outputs, F>; - - // Input-Output port types - using multi_in_port_type = multi_in_port; - using multi_out_port_type = multi_out_port; - - // Input-Output value tuples - using input_tuple = std::tuple, token...>; - using output_tuple = std::tuple, token...>; - static constexpr int num_in_ports = std::tuple_size(); - static constexpr int num_out_ports = std::tuple_size(); - - // Memory used for ONE active invocation of the dataflow-graph - struct invocation_memory { - std::atomic inputs_missing_; - input_tuple input_buffer_; - }; - - public: - explicit function_node(F function) : in_port_{this, this}, out_port_{}, function_{function} {} - - template - using in_port_at = typename multi_in_port_type::template in_port_type_at; - template - using out_port_at = typename multi_out_port_type::template out_port_type_at; - - template - in_port_at &in_port() { - return in_port_.template get(); - } - - multi_in_port_type &in_ports() { - return in_port_; - } - - template - out_port_at &out_port() { - return out_port_.template get(); - } - - multi_out_port_type &out_ports() { - return out_port_; - } - - template - function_node, outputs, FUNC> - &operator>>(function_node, outputs, FUNC> &other_node); - - template - void operator>>(graph, outputs> &graph); - - template - void token_pushed(token token); - - int num_successors() const override { - return num_out_ports; - } - node *successor_at(int pos) const override { - return out_port_.next_node_at(pos); - } - - bool is_fully_connected() const override { - return in_port_.fully_connected() && out_port_.fully_connected(); - } - - int instance_buffer_size() const override { - return sizeof(invocation_memory); - } - void init_instance_buffer(void *memory) const override { - auto invocation = new(memory) invocation_memory{}; - invocation->inputs_missing_ = num_in_ports; - }; - void clean_up_instance_buffer(void *memory) const override { - auto invocation = reinterpret_cast(memory); - invocation->~invocation_memory(); - } - - private: - multi_in_port_type in_port_; - multi_out_port_type out_port_; - - F function_; - - ////////////////////////////////////////////////////////////////// - // Helpers for actually calling the work lambda - ////////////////////////////////////////////////////////////////// - void execute_function(invocation_memory *invocation_memory, invocation_info invocation_info); - - template - struct propagate_output; - - template - void execute_function_internal(input_tuple &inputs, sequence, - output_tuple &outputs, sequence, - invocation_info invocation_info); -}; - -} -} -} -#include "function_node_impl.h" - -#endif //PLS_DATAFLOW_INTERNAL_NODE_H_ diff --git a/lib/pls/include/pls/dataflow/internal/function_node_impl.h b/lib/pls/include/pls/dataflow/internal/function_node_impl.h deleted file mode 100644 index 35e17df..0000000 --- a/lib/pls/include/pls/dataflow/internal/function_node_impl.h +++ /dev/null @@ -1,100 +0,0 @@ - -#ifndef PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_IMPL_H_ -#define PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_IMPL_H_ - -#include "graph.h" -#include "pls/internal/helpers/easy_profiler.h" - -namespace pls { -namespace dataflow { -namespace internal { - -template -template -function_node, outputs, FUNC>& -function_node, outputs, F>:: -operator>>(function_node, outputs, FUNC> &other_node) { - out_port_ >> other_node.in_ports(); - return other_node; -} - -template -template -void function_node, outputs, F>:: -operator>>(graph, outputs> &graph) { - out_port_ >> graph.output_ports(); -} - -template -template -void function_node, outputs, F>:: -token_pushed(token token) { - auto current_memory = get_invocation(token.invocation()); - - std::get(current_memory->input_buffer_) = token; - auto remaining_inputs = --(current_memory->inputs_missing_); - if (remaining_inputs == 0) { - execute_function(current_memory, token.invocation()); - current_memory->inputs_missing_ = num_in_ports; - } -} - -////////////////////////////////////////////////////////////////// -// Helpers for actually calling the work lambda -////////////////////////////////////////////////////////////////// -template -void function_node, outputs, F>:: -execute_function(invocation_memory *invocation_memory, invocation_info invocation_info) { - auto lambda = [=]() { - PROFILE_WORK_BLOCK("Function Node") - input_tuple &inputs = invocation_memory->input_buffer_; - output_tuple outputs{}; - - execute_function_internal(inputs, typename sequence_gen<1 + sizeof...(I)>::type(), - outputs, typename sequence_gen<1 + sizeof...(O)>::type(), invocation_info); - PROFILE_END_BLOCK - }; - // TODO: maybe replace this with 'continuation' style invocation - pls::scheduler::spawn_child>(lambda); -} - -template -template -struct function_node, outputs, F>:: -propagate_output { - propagate_output(multi_out_port_type &, output_tuple &, invocation_info&) {} - void propagate() {} -}; -template -template -struct function_node, outputs, F>:: -propagate_output { - multi_out_port_type &out_port_; - output_tuple &output_tuple_; - invocation_info &invocation_info_; - - propagate_output(multi_out_port_type &out_port, output_tuple &output_tuple, invocation_info& invocation_info) : - out_port_{out_port}, output_tuple_{output_tuple}, invocation_info_{invocation_info} {} - - void propagate() { - std::get(output_tuple_).set_invocation(invocation_info_); - out_port_.template get().push_token(std::get(output_tuple_)); - propagate_output{out_port_, output_tuple_, invocation_info_}.propagate(); - } -}; - -template -template -void function_node, outputs, F>:: -execute_function_internal(input_tuple &inputs, sequence, - output_tuple &outputs, sequence, - invocation_info invocation_info) { - function_(std::get(inputs).value()..., std::get(outputs).value()...); - propagate_output<0, O0, O...>{out_port_, outputs, invocation_info}.propagate(); -} - -} -} -} - -#endif //PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_IMPL_H_ diff --git a/lib/pls/include/pls/dataflow/internal/graph.h b/lib/pls/include/pls/dataflow/internal/graph.h deleted file mode 100644 index d1580b8..0000000 --- a/lib/pls/include/pls/dataflow/internal/graph.h +++ /dev/null @@ -1,146 +0,0 @@ - -#ifndef PLS_DATAFLOW_INTERNAL_GRAPH_H_ -#define PLS_DATAFLOW_INTERNAL_GRAPH_H_ - -#include -#include -#include - -#include "build_state.h" -#include "node.h" -#include "function_node.h" -#include "in_port.h" -#include "out_port.h" -#include "inputs.h" -#include "outputs.h" - -#include "pls/pls.h" - -namespace pls { -namespace dataflow { -namespace internal { - -template -class graph {}; - -template -class graph, outputs> : public node { - template - friend - class function_node; - private: - // Our own type - using self_type = graph, outputs>; - - // Input-Output port types (internal) - using inputs_type = multi_out_port; - using outputs_type = multi_in_port; - - // Input-Output value tuples - using value_input_tuple = std::tuple; - using input_tuple = std::tuple, token...>; - - using value_output_tuple = std::tuple; - using output_tuple = std::tuple, token...>; - - static constexpr int num_in_ports = std::tuple_size(); - static constexpr int num_out_ports = std::tuple_size(); - - // Memory used for ONE active invocation of the dataflow-graph - struct invocation_memory { - value_output_tuple *output_buffer_; - }; - - public: - template - using input_at = typename inputs_type::template out_port_type_at; - template - using output_at = typename outputs_type::template in_port_type_at; - - template - input_at &input() { - return inputs_.template get(); - } - - inputs_type &input_ports() { - return inputs_; - } - - template - output_at &output() { - return outputs_.template get(); - } - - outputs_type &output_ports() { - return outputs_; - } - - template - function_node, outputs, FUNC> - &operator>>(function_node, outputs, FUNC> &other_node); - - void wait_for_all() { - pls::scheduler::wait_for_all(); - } - - template - void token_pushed(token token); - - graph() : inputs_{}, outputs_{this, this} {} - - void build(); - - void run(value_input_tuple input, value_output_tuple *output) { - PLS_ASSERT(build_state_ == build_state::built, "Must build graph before running it!") - const auto lambda = [=]() { - pls::scheduler::spawn_child(this, input, output); - }; - pls::scheduler::spawn_child>(lambda); - } - - int num_successors() const override { - return 0; - } - node *successor_at(int) const override { - PLS_ERROR("A graph instance has no direct successor!") - } - - bool is_fully_connected() const override { - return inputs_.fully_connected() && outputs_.fully_connected(); - } - - int instance_buffer_size() const override { - return sizeof(invocation_memory); - } - void init_instance_buffer(void *memory) const override { - new(memory) invocation_memory{}; - }; - void clean_up_instance_buffer(void *memory) const override { - auto invocation = reinterpret_cast(memory); - invocation->~invocation_memory(); - } - - private: - inputs_type inputs_; - outputs_type outputs_; - - // Information about building the graph - node *node_list_start_{nullptr}; - node *node_list_current_{nullptr}; - int num_nodes_{0}; - - // Internals required for building and running - void build_recursive(node *node); - void add_node(node *new_node); - - template - struct feed_inputs; - class run_graph_task; -}; - -} -} -} -#include "graph_impl.h" - -#endif //PLS_DATAFLOW_INTERNAL_GRAPH_H_ diff --git a/lib/pls/include/pls/dataflow/internal/graph_impl.h b/lib/pls/include/pls/dataflow/internal/graph_impl.h deleted file mode 100644 index 1a9e2a2..0000000 --- a/lib/pls/include/pls/dataflow/internal/graph_impl.h +++ /dev/null @@ -1,156 +0,0 @@ - -#ifndef PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_ -#define PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_ - -#include "pls/internal/helpers/easy_profiler.h" - -namespace pls { -namespace dataflow { -namespace internal { - -template -template -function_node, outputs, FUNC> &graph, outputs>:: -operator>>(function_node, outputs, FUNC> &other_node) { - inputs_ >> other_node.in_ports(); - return other_node; -} - -template -template -void graph, outputs>:: -token_pushed(token token) { - auto invocation = get_invocation(token.invocation()); - std::get(*invocation->output_buffer_) = token.value(); -} - -template -void graph, outputs>:: -build() { - PLS_ASSERT(build_state_ == build_state::fresh, "Must only build a dataflow graph once!") - PLS_ASSERT(is_fully_connected(), "Must fully connect all inputs/outputs inside a dataflow graph!") - - build_state_ = build_state::building; - node_list_start_ = node_list_current_ = this; - memory_index_ = 0; - num_nodes_ = 1; - for (int i = 0; i < num_in_ports; i++) { - build_recursive(inputs_.next_node_at(i)); - } - build_state_ = build_state::built; -} - -template -void graph, outputs>:: -build_recursive(node *node) { - if (node->build_state_ != build_state::fresh) { - return; // Already visited - } - node->build_state_ = build_state::building; - PLS_ASSERT(node->is_fully_connected(), "Must fully connect dataflow graph nodes!") - - add_node(node); - for (int i = 0; i < node->num_successors(); i++) { - build_recursive(node->successor_at(i)); - } - - node->build_state_ = build_state::built; -} - -template -void graph, outputs>:: -add_node(node *new_node) { - new_node->memory_index_ = num_nodes_++; - - if (node_list_current_ == nullptr) { - node_list_current_ = new_node; - node_list_start_ = new_node; - } else { - node_list_current_->direct_successor_ = new_node; - node_list_current_ = new_node; - } -} - -template -template -struct graph, outputs>:: -feed_inputs { - feed_inputs(inputs_type &, value_input_tuple &, invocation_info &) {} - void run() {} -}; - -template -template -struct graph, outputs>:: -feed_inputs { - inputs_type &inputs_; - value_input_tuple &input_values_; - invocation_info &invocation_; - - feed_inputs(inputs_type &inputs, - value_input_tuple &input_values, - invocation_info &invocation) : inputs_{inputs}, - input_values_{input_values}, - invocation_{invocation} {} - - void run() { - inputs_.template get().push_token(token{std::get(input_values_), invocation_}); - feed_inputs{inputs_, input_values_, invocation_}.run(); - } -}; - -template -class graph, outputs>::run_graph_task : public pls::task { - graph *self_; - value_input_tuple input_; - value_output_tuple *output_; - - // Buffers for actual execution - invocation_info invocation_; - - public: - run_graph_task(graph *self, value_input_tuple input, value_output_tuple *output) : self_{self}, - input_{input}, - output_{output}, - invocation_{nullptr} { - void **buffers; - buffers = reinterpret_cast(allocate_memory(self_->num_nodes_ * sizeof(void *))); - - node *iterator = self_->node_list_start_; - for (int i = 0; i < self_->num_nodes_; i++) { - auto required_size = iterator->instance_buffer_size(); - buffers[i] = allocate_memory(required_size); - iterator->init_instance_buffer(buffers[i]); - - iterator = iterator->direct_successor_; - } - invocation_ = invocation_info{buffers}; - - self_->get_invocation(invocation_)->output_buffer_ = output; - } - - ~run_graph_task() { - node *iterator = self_->node_list_start_; - for (int i = 0; i < self_->num_nodes_; i++) { - void* memory = invocation_.get_instance_buffer(i); - iterator->clean_up_instance_buffer(memory); - - iterator = iterator->direct_successor_; - } - } - - void execute_internal() override { - PROFILE_WORK_BLOCK("Graph Invocation") - feed_inputs<0, I0, I...>{self_->inputs_, input_, invocation_}.run(); - - wait_for_all(); - this->~run_graph_task(); - PROFILE_END_BLOCK - } -}; - -} -} -} - -#endif //PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_ diff --git a/lib/pls/include/pls/dataflow/internal/in_port.h b/lib/pls/include/pls/dataflow/internal/in_port.h deleted file mode 100644 index 981808b..0000000 --- a/lib/pls/include/pls/dataflow/internal/in_port.h +++ /dev/null @@ -1,136 +0,0 @@ - -#ifndef PLS_DATAFLOW_INTERNAL_INPUT_H_ -#define PLS_DATAFLOW_INTERNAL_INPUT_H_ - -#include "pls/internal/base/error_handling.h" - -#include "token.h" -#include "node.h" - -namespace pls { -namespace dataflow { -namespace internal { - -/** - * Represents a single, logical input port (no data store, simply signal propagation). - * @tparam T Type of the input port - */ -template -class in_port { - template - friend - class out_port; - - public: - explicit in_port(node *owning_node) : owning_node_{owning_node} {}; - node *owning_node() const { return owning_node_; } - bool is_connected() const { return connected_; } - - protected: - virtual void token_pushed(token token) = 0; - - private: - bool connected_{false}; - node *const owning_node_; - - void push_token(token token) { - token_pushed(token); - } - - void connect() { - if (connected_) { - PLS_ERROR("Must only connect on input once. Disconnect the output pointing to it before reconnecting.") - } - connected_ = true; - } -}; - -/** - * Represents multiple input ports bundled together (a tuple of inputs). - * Allows for a unified callback method to handle multiple typed inputs. - * - * template - * void token_pushed(token token) { Notified when tokens arrive } - * - * @tparam CB The class implementing a callback - * @tparam N Put 0 to start the recursive implementation - * @tparam I A variadic list of input types - */ -template -class multi_in_port { - // end of template recursion - public: - explicit multi_in_port(node *, CB *) {}; - - bool fully_connected() const { - return true; - } -}; -template -class multi_in_port : public in_port { - public: - // Helpers for managing recursive types - using my_type = multi_in_port; - using child_type = multi_in_port; - using value_type = I0; - - explicit multi_in_port(node *owning_node, CB *cb) : in_port{owning_node}, rec_{owning_node, cb}, cb_{cb} {}; - - void token_pushed(token token) override { - cb_->template token_pushed(token); - } - - // Helper struct required for recursive access to types by index - template - struct type_at { - }; - template - struct type_at<0, T0, T...> { - using type = T0; - using in_port_type = in_port; - }; - template - struct type_at { - using type = typename type_at::type; - using in_port_type = in_port; - }; - - // Simple interface to get types by index - template - using in_port_type_at = typename type_at::in_port_type; - template - using value_type_at = typename type_at::type; - - // Helper struct required for recursive access to input's by index - template - struct get_at { - static RES_T &get(MY_T &self) { - return get_at::get(self.rec_); - } - }; - template - struct get_at<0, RESULT, CURRENT> { - static RESULT &get(CURRENT &self) { - return self; - } - }; - - // Simple interface to access input's by index - template - in_port_type_at &get() { - return get_at, my_type>::get(*this); - } - - bool fully_connected() const { - return this->is_connected() && rec_.fully_connected(); - } - - child_type rec_; - CB *cb_; -}; - -} -} -} - -#endif //PLS_DATAFLOW_INTERNAL_INPUT_H_ diff --git a/lib/pls/include/pls/dataflow/internal/inputs.h b/lib/pls/include/pls/dataflow/internal/inputs.h deleted file mode 100644 index 9be844a..0000000 --- a/lib/pls/include/pls/dataflow/internal/inputs.h +++ /dev/null @@ -1,17 +0,0 @@ - -#ifndef PLS_DATAFLOW_INTERNAL_INPUTS_H_ -#define PLS_DATAFLOW_INTERNAL_INPUTS_H_ - -namespace pls { -namespace dataflow { -namespace internal { - -template -struct inputs { -}; - -} -} -} - -#endif //PLS_DATAFLOW_INTERNAL_INPUTS_H_ diff --git a/lib/pls/include/pls/dataflow/internal/merge_node.h b/lib/pls/include/pls/dataflow/internal/merge_node.h deleted file mode 100644 index 8da2e64..0000000 --- a/lib/pls/include/pls/dataflow/internal/merge_node.h +++ /dev/null @@ -1,119 +0,0 @@ - -#ifndef PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_ -#define PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_ - -#include - -#include "in_port.h" -#include "out_port.h" -#include "node.h" -#include "token.h" - -namespace pls { -namespace dataflow { -namespace internal { - -template -class merge_node : public node { - // Our own type - using self_type = merge_node; - - // Input-Output port types - using multi_in_port_type = multi_in_port; - using multi_out_port_type = multi_out_port; - - // Input-Output tuples - using input_tuple = std::tuple, token, token>; - - // Memory used for ONE active invocation of the dataflow-graph - struct invocation_memory { - std::atomic inputs_missing_; - input_tuple input_buffer_; - }; - - // Encodings for current input state (needs to fit into an atomic) - static constexpr unsigned int IN_0_MISSING = 1u << 0u; - static constexpr unsigned int IN_1_MISSING = 1u << 1u; - static constexpr unsigned int COND_MISSING = 1u << 2u; - static constexpr unsigned int INITIAL_STATE = IN_0_MISSING + IN_1_MISSING + COND_MISSING; - - public: - explicit merge_node() : in_port_{this, this}, out_port_{} {} - - in_port &true_in_port() { - return in_port_.template get<0>(); - } - - in_port &false_in_port() { - return in_port_.template get<1>(); - } - - in_port &condition_in_port() { - return in_port_.template get<2>(); - } - - out_port &value_out_port() { - return out_port_.template get<0>(); - } - - template - void token_pushed(token token) { - auto current_memory = get_invocation(token.invocation()); - - std::get(current_memory->input_buffer_) = token; - - unsigned int remaining_inputs; - if (POS == 0) { - remaining_inputs = (current_memory->inputs_missing_).fetch_sub(IN_0_MISSING) - IN_0_MISSING; - } else if (POS == 1) { - remaining_inputs = (current_memory->inputs_missing_).fetch_sub(IN_1_MISSING) - IN_1_MISSING; - } else { - remaining_inputs = (current_memory->inputs_missing_).fetch_sub(COND_MISSING) - COND_MISSING; - } - - if ((remaining_inputs & COND_MISSING) == 0) { - if ((remaining_inputs & IN_0_MISSING) == 0) { - auto &data = std::get<0>(current_memory->input_buffer_); - value_out_port().push_token(data); - current_memory->inputs_missing_ += IN_0_MISSING + COND_MISSING; - } else if ((remaining_inputs & IN_1_MISSING) == 0) { - auto &data = std::get<1>(current_memory->input_buffer_); - value_out_port().push_token(data); - current_memory->inputs_missing_ += IN_1_MISSING + COND_MISSING; - } - } - } - - int num_successors() const override { - return 1; - } - node *successor_at(int pos) const override { - return out_port_.next_node_at(pos); - } - - bool is_fully_connected() const override { - return in_port_.fully_connected() && out_port_.fully_connected(); - } - - int instance_buffer_size() const override { - return sizeof(invocation_memory); - } - void init_instance_buffer(void *memory) const override { - auto invocation = new(memory) invocation_memory{}; - invocation->inputs_missing_ = INITIAL_STATE; - }; - void clean_up_instance_buffer(void *memory) const override { - auto invocation = reinterpret_cast(memory); - invocation->~invocation_memory(); - } - - private: - multi_in_port_type in_port_; - multi_out_port_type out_port_; -}; - -} -} -} - -#endif //PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_ diff --git a/lib/pls/include/pls/dataflow/internal/node.h b/lib/pls/include/pls/dataflow/internal/node.h deleted file mode 100644 index 037a3eb..0000000 --- a/lib/pls/include/pls/dataflow/internal/node.h +++ /dev/null @@ -1,47 +0,0 @@ - -#ifndef PLS_DATAFLOW_INTERNAL_NODE_H_ -#define PLS_DATAFLOW_INTERNAL_NODE_H_ - -#include "build_state.h" -#include "token.h" - -namespace pls { -namespace dataflow { -namespace internal { - -/** - * Represents a single piece included in a dataflow graph. - * This can be anything connected to form the dataflow that requires - * memory during invocation and is present in the connected flow graph. - */ -class node { - template - friend - class graph; - - public: - virtual int num_successors() const = 0; - virtual node *successor_at(int pos) const = 0; - - virtual int instance_buffer_size() const = 0; - virtual void init_instance_buffer(void *memory) const = 0; - virtual void clean_up_instance_buffer(void *memory) const = 0; - - virtual bool is_fully_connected() const = 0; - - template - M *get_invocation(invocation_info invocation) { - return invocation.template get_instance_buffer(memory_index_); - } - - private: - int memory_index_{0}; - node *direct_successor_{nullptr}; - build_state build_state_{build_state::fresh}; -}; - -} -} -} - -#endif //PLS_DATAFLOW_INTERNAL_NODE_H_ diff --git a/lib/pls/include/pls/dataflow/internal/out_port.h b/lib/pls/include/pls/dataflow/internal/out_port.h deleted file mode 100644 index f8eb266..0000000 --- a/lib/pls/include/pls/dataflow/internal/out_port.h +++ /dev/null @@ -1,148 +0,0 @@ - -#ifndef PLS_DATAFLOW_INTERNAL_OUTPUT_H_ -#define PLS_DATAFLOW_INTERNAL_OUTPUT_H_ - -#include - -#include "in_port.h" -#include "node.h" - -namespace pls { -namespace dataflow { -namespace internal { - -template -class out_port { - public: - void connect(in_port &target) { - if (connected_) { - PLS_ERROR("Must only connect output once. Please disconnect it before reconnecting.") - } - - target.connect(); - target_ = ⌖ - connected_ = true; - } - - void operator>>(in_port &input) { - connect(input); - } - - void push_token(token token) const { - target_->push_token(token); - } - - node *next_node() const { - return target_->owning_node(); - } - - bool is_connected() const { return connected_; } - - private: - bool connected_{false}; - in_port *target_{nullptr}; -}; - -/** - * Represents multiple output ports bundled together (a tuple of inputs). - * - * @tparam I A variadic list of input types - */ -template -class multi_out_port { - private: - // Helpers for managing recursive types - using value_tuple_type = std::tuple; - using out_port_tupel_type = std::tuple, out_port...>; - static constexpr int num_outputs = std::tuple_size(); - - public: - // Simple interface to get types by index - template - using out_port_type_at = typename std::tuple_element::type; - template - using value_type_at = typename std::tuple_element::type; - - // Simple interface to access input's by index - template - out_port_type_at &get() { - return std::get(outputs_); - } - - // Simple interface to connect multiple intputs to matching, multiple outputs - template - void operator>>(multi_in_port &input) { - connect_to < CB, 0 > {this, &input}.connect(); - } - - node *next_node_at(int pos) const { - return next_node < 0 > {&outputs_}.get(pos); - } - - bool fully_connected() const { - return connected < 0 > {&outputs_}.get(); - } - - private: - out_port_tupel_type outputs_; - - template - struct next_node { - const out_port_tupel_type *outputs_; - node *get(int i) { - if (POS == i) { - return std::get(*outputs_).next_node(); - } else { - return next_node{outputs_}.get(i); - } - } - }; - template - struct next_node { - const out_port_tupel_type *outputs_; - node *get(int) { - PLS_ERROR("Try to access invalid successor node index!") - } - }; - - template - struct connected { - const out_port_tupel_type *outputs_; - bool get() { - bool self = std::get(*outputs_).is_connected(); - bool children = connected{outputs_}.get(); - return self && children; - } - }; - template - struct connected { - const out_port_tupel_type *outputs_; - bool get() { - return true; - } - }; - - template - struct connect_to { - multi_out_port *out_port_; - multi_in_port *in_port_; - - void connect() { - out_port_->template get() >> in_port_->template get(); - connect_to{out_port_, in_port_}.connect(); - } - }; - template - struct connect_to { - multi_out_port *out_port_; - multi_in_port *in_port_; - - void connect() { - }; - }; -}; -} -} -} - -#endif //PLS_DATAFLOW_INTERNAL_OUTPUT_H_ diff --git a/lib/pls/include/pls/dataflow/internal/outputs.h b/lib/pls/include/pls/dataflow/internal/outputs.h deleted file mode 100644 index 40237d9..0000000 --- a/lib/pls/include/pls/dataflow/internal/outputs.h +++ /dev/null @@ -1,17 +0,0 @@ - -#ifndef PLS_DATAFLOW_INTERNAL_OUTPUTS_H_ -#define PLS_DATAFLOW_INTERNAL_OUTPUTS_H_ - -namespace pls { -namespace dataflow { -namespace internal { - -template -struct outputs { -}; - -} -} -} - -#endif //PLS_DATAFLOW_INTERNAL_OUTPUTS_H_ diff --git a/lib/pls/include/pls/dataflow/internal/split_node.h b/lib/pls/include/pls/dataflow/internal/split_node.h deleted file mode 100644 index 363f32c..0000000 --- a/lib/pls/include/pls/dataflow/internal/split_node.h +++ /dev/null @@ -1,76 +0,0 @@ - -#ifndef PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_ -#define PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_ - -#include - -#include "in_port.h" -#include "out_port.h" -#include "node.h" -#include "token.h" - -namespace pls { -namespace dataflow { -namespace internal { - -template -class split_node : public node { - // Our own type - using self_type = split_node; - - // Input-Output port types - using multi_in_port_type = multi_in_port; - using multi_out_port_type = multi_out_port; - - public: - explicit split_node() : in_port_{this, this}, out_port_{} {} - - in_port &value_in_port() { - return in_port_.template get<0>(); - } - - out_port &out_port_1() { - return out_port_.template get<0>(); - } - - out_port &out_port_2() { - return out_port_.template get<1>(); - } - - template - void token_pushed(token token) { - out_port_1().push_token(token); - out_port_2().push_token(token); - } - - int num_successors() const override { - return 2; - } - node *successor_at(int pos) const override { - return out_port_.next_node_at(pos); - } - - bool is_fully_connected() const override { - return in_port_.fully_connected() && out_port_.fully_connected(); - } - - int instance_buffer_size() const override { - return 0; - } - void init_instance_buffer(void *) const override { - // No need for memory, we simply forward entries without buffering - }; - void clean_up_instance_buffer(void *memory) const override { - // No need for memory, we simply forward entries without buffering - } - - private: - multi_in_port_type in_port_; - multi_out_port_type out_port_; -}; - -} -} -} - -#endif //PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_ diff --git a/lib/pls/include/pls/dataflow/internal/switch_node.h b/lib/pls/include/pls/dataflow/internal/switch_node.h deleted file mode 100644 index b371c0b..0000000 --- a/lib/pls/include/pls/dataflow/internal/switch_node.h +++ /dev/null @@ -1,103 +0,0 @@ - -#ifndef PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_ -#define PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_ - -#include - -#include "in_port.h" -#include "out_port.h" -#include "node.h" -#include "token.h" - -namespace pls { -namespace dataflow { -namespace internal { - -template -class switch_node : public node { - // Our own type - using self_type = switch_node; - - // Input-Output port types - using multi_in_port_type = multi_in_port; - using multi_out_port_type = multi_out_port; - - // Input-Output tuples - using input_tuple = std::tuple, token>; - - // Memory used for ONE active invocation of the dataflow-graph - struct invocation_memory { - std::atomic inputs_missing_; - input_tuple input_buffer_; - }; - - public: - explicit switch_node() : in_port_{this, this}, out_port_{} {} - - in_port &value_in_port() { - return in_port_.template get<0>(); - } - - in_port &condition_in_port() { - return in_port_.template get<1>(); - } - - out_port &true_out_port() { - return out_port_.template get<0>(); - } - - out_port &false_out_port() { - return out_port_.template get<1>(); - } - - template - void token_pushed(token token) { - auto current_memory = get_invocation(token.invocation()); - - std::get(current_memory->input_buffer_) = token; - auto remaining_inputs = --(current_memory->inputs_missing_); - if (remaining_inputs == 0) { - bool condition = std::get<1>(current_memory->input_buffer_).value(); - auto &data = std::get<0>(current_memory->input_buffer_); - if (condition) { - true_out_port().push_token(data); - } else { - false_out_port().push_token(data); - } - current_memory->inputs_missing_ = 2; - } - } - - int num_successors() const override { - return 2; - } - node *successor_at(int pos) const override { - return out_port_.next_node_at(pos); - } - - bool is_fully_connected() const override { - return in_port_.fully_connected() && out_port_.fully_connected(); - } - - int instance_buffer_size() const override { - return sizeof(invocation_memory); - } - void init_instance_buffer(void *memory) const override { - auto invocation = new(memory) invocation_memory{}; - invocation->inputs_missing_ = 2; - }; - void clean_up_instance_buffer(void *memory) const override { - auto invocation = reinterpret_cast(memory); - invocation->~invocation_memory(); - } - - private: - multi_in_port_type in_port_; - multi_out_port_type out_port_; -}; - -} -} -} - -#endif //PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_ diff --git a/lib/pls/include/pls/dataflow/internal/token.h b/lib/pls/include/pls/dataflow/internal/token.h deleted file mode 100644 index ee40720..0000000 --- a/lib/pls/include/pls/dataflow/internal/token.h +++ /dev/null @@ -1,42 +0,0 @@ - -#ifndef PLS_DATAFLOW_INTERNAL_TOKEN_H_ -#define PLS_DATAFLOW_INTERNAL_TOKEN_H_ - -namespace pls { -namespace dataflow { -namespace internal { - -class invocation_info { - public: - explicit invocation_info(void **memory) : memory_{memory} {}; - invocation_info(invocation_info const &other) = default; - - template - T *get_instance_buffer(int pos) { - return reinterpret_cast(memory_[pos]); - } - - private: - void **memory_; -}; - -template -class token { - T value_; - invocation_info invocation_; - - public: - token() : invocation_{nullptr} {}; // Default Constructor Stays uninitialized - token(T value, invocation_info color) : value_{value}, invocation_{color} {}; - - T &value() { return value_; } - invocation_info invocation() const { return invocation_; } - - void set_invocation(invocation_info invocation_info) { invocation_ = invocation_info; } -}; - -} -} -} - -#endif //PLS_DATAFLOW_INTERNAL_TOKEN_H_