From 6ce5e24751383338b36f92a8be813bf2a008f986 Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Mon, 29 Jul 2019 14:07:20 +0200 Subject: [PATCH] Split graph and function_node impls into separate files. --- lib/pls/CMakeLists.txt | 2 +- lib/pls/include/pls/dataflow/internal/function_node.h | 80 ++++++++++++++++++++------------------------------------------------------------ lib/pls/include/pls/dataflow/internal/graph.h | 122 +++++++++++++------------------------------------------------------------------------------------------------------------- 3 files changed, 34 insertions(+), 170 deletions(-) diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 03ffff8..8b0b789 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -49,7 +49,7 @@ add_library(pls STATIC include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp - include/pls/internal/scheduling/lambda_task.h include/pls/internal/helpers/seqence.h include/pls/dataflow/internal/build_state.h) + include/pls/internal/scheduling/lambda_task.h include/pls/internal/helpers/seqence.h include/pls/dataflow/internal/build_state.h include/pls/dataflow/internal/function_node_impl.h include/pls/dataflow/internal/graph_impl.h) # Add everything in `./include` to be in the include path of this project target_include_directories(pls PUBLIC diff --git a/lib/pls/include/pls/dataflow/internal/function_node.h b/lib/pls/include/pls/dataflow/internal/function_node.h index ae6fc1c..dac72b5 100644 --- a/lib/pls/include/pls/dataflow/internal/function_node.h +++ b/lib/pls/include/pls/dataflow/internal/function_node.h @@ -48,6 +48,8 @@ class function_node, pls::dataflow::outputs using in_port_at = typename multi_in_port_type::template in_port_type_at; template @@ -64,67 +66,8 @@ class function_node, pls::dataflow::outputs - void token_pushed(token token) { - auto current_memory = get_invocation(token.invocation()); - - std::get(current_memory->input_buffer_) = token; - auto remaining_inputs = --(current_memory->inputs_missing_); - if (remaining_inputs == 0) { - execute_function(current_memory, token.invocation()); - current_memory->inputs_missing_ = num_in_ports; - } - } - - void execute_function(invocation_memory *invocation_memory, invocation_info invocation_info) { - auto lambda = [&]() { - input_tuple &inputs = invocation_memory->input_buffer_; - output_tuple outputs; - - execute_function_internal(inputs, typename sequence_gen<1 + sizeof...(I)>::type(), - outputs, typename sequence_gen<1 + sizeof...(O)>::type(), invocation_info); - }; - // TODO: maybe replace this with 'continuation' style invocation - pls::scheduler::spawn_child_and_wait>(lambda); - } - template - void set_invocation_info(token &token, invocation_info invocation_info) { - token.set_invocation(invocation_info); - } - template - struct propagate_output { - propagate_output(multi_out_port_type &, output_tuple &) {} - void propagate() {} - }; - template - struct propagate_output { - multi_out_port_type &out_port_; - output_tuple &output_tuple_; - - propagate_output(multi_out_port_type &out_port, output_tuple &output_tuple) : out_port_{out_port}, - output_tuple_{output_tuple} {} - - void propagate() { - out_port_.template get().push_token(std::get(output_tuple_)); - propagate_output{out_port_, output_tuple_}.propagate(); - } - }; - - template - void execute_function_internal(input_tuple &inputs, - sequence, - output_tuple &outputs, - sequence, - invocation_info invocation_info) { - set_invocation_info(std::get(outputs)..., invocation_info); - function_(std::get(inputs).value()..., std::get(outputs).value()...); - propagate_output<0, O0, O...>{out_port_, outputs}.propagate(); - } + void token_pushed(token token); - explicit function_node(F function) : in_port_{this, this}, out_port_{}, function_{function} {} - - ////////////////////////////////////////////////////////////////// - // Overrides for generic node functionality (building the graph) - ////////////////////////////////////////////////////////////////// int num_successors() const override { return num_out_ports; } @@ -149,10 +92,27 @@ class function_node, pls::dataflow::outputs + struct propagate_output; + + template + void set_invocation_info(token &token, invocation_info invocation_info); + + template + void execute_function_internal(input_tuple &inputs, sequence, + output_tuple &outputs, sequence, + invocation_info invocation_info); }; } } } +#include "function_node_impl.h" #endif //PLS_DATAFLOW_INTERNAL_NODE_H_ diff --git a/lib/pls/include/pls/dataflow/internal/graph.h b/lib/pls/include/pls/dataflow/internal/graph.h index f57c786..eb0830c 100644 --- a/lib/pls/include/pls/dataflow/internal/graph.h +++ b/lib/pls/include/pls/dataflow/internal/graph.h @@ -35,10 +35,10 @@ class graph, pls::dataflow::outputs> : // Input-Output value tuples using value_input_tuple = std::tuple; - using input_tuple = std::tuple, token...>; + using input_tuple = std::tuple, token...>; using value_output_tuple = std::tuple; - using output_tuple = std::tuple, token...>; + using output_tuple = std::tuple, token...>; static constexpr int num_in_ports = std::tuple_size(); static constexpr int num_out_ports = std::tuple_size(); @@ -64,121 +64,16 @@ class graph, pls::dataflow::outputs> : } template - void token_pushed(token token) { - auto invocation = get_invocation(token.invocation()); - std::get(*invocation->output_buffer_) = token.value(); - } + void token_pushed(token token); graph() : inputs_{}, outputs_{this, this} {} - ///////////////////////////////////////////////////////////////// - // Graph building - ///////////////////////////////////////////////////////////////// - void build() { - PLS_ASSERT(build_state_ == build_state::fresh, "Must only build a dataflow graph once!") - PLS_ASSERT(is_fully_connected(), "Must fully connect all inputs/outputs inside a dataflow graph!") - - build_state_ = build_state::building; - node_list_start_ = node_list_current_ = this; - memory_index_ = 0; - num_nodes_ = 1; - for (int i = 0; i < num_in_ports; i++) { - build_recursive(inputs_.next_node_at(i)); - } - build_state_ = build_state::built; - } - - void build_recursive(node *node) { - if (node->build_state_ != build_state::fresh) { - return; // Already visited - } - node->build_state_ = build_state::building; - PLS_ASSERT(node->is_fully_connected(), "Must fully connect dataflow graph nodes!") - - add_node(node); - for (int i = 0; i < node->num_successors(); i++) { - build_recursive(node->successor_at(i)); - } - - node->build_state_ = build_state::built; - } - - void add_node(node *new_node) { - new_node->memory_index_ = num_nodes_++; - - if (node_list_current_ == nullptr) { - node_list_current_ = new_node; - node_list_start_ = new_node; - } else { - node_list_current_->direct_successor_ = new_node; - node_list_current_ = new_node; - } - } - - template - struct feed_inputs { - feed_inputs(inputs_type &, value_input_tuple &, invocation_info &) {} - void run() {} - }; - template - struct feed_inputs { - inputs_type &inputs_; - value_input_tuple &input_values_; - invocation_info &invocation_; - - feed_inputs(inputs_type &inputs, - value_input_tuple &input_values, - invocation_info &invocation) : inputs_{inputs}, - input_values_{input_values}, - invocation_{invocation} {} - - void run() { - inputs_.template get().push_token(token{std::get(input_values_), invocation_}); - feed_inputs{inputs_, input_values_, invocation_}.run(); - } - }; - - class run_graph_task : public pls::task { - graph *self_; - value_input_tuple input_; - value_output_tuple *output_; - - // Buffers for actual execution - invocation_info invocation_; - - public: - run_graph_task(graph *self, value_input_tuple &input, value_output_tuple *output) : self_{self}, - input_{input}, - output_{output}, - invocation_{nullptr} { - void **buffers; - buffers = reinterpret_cast(allocate_memory(self_->num_nodes_ * sizeof(void *))); - - node *iterator = self_->node_list_start_; - for (int i = 0; i < self_->num_nodes_; i++) { - auto required_size = iterator->instance_buffer_size(); - buffers[i] = allocate_memory(required_size); - iterator->init_instance_buffer(buffers[i]); - - iterator = iterator->direct_successor_; - } - invocation_ = invocation_info{buffers}; - - self_->get_invocation(invocation_)->output_buffer_ = output; - } - - void execute_internal() override { - feed_inputs<0, I0, I...>{self_->inputs_, input_, invocation_}.run(); - } - }; + void build(); void run(value_input_tuple input, value_output_tuple &output) { pls::scheduler::spawn_child(this, input, &output); } - ////////////////////////////////////////////////////////////////// - // Overrides for generic node functionality (building the graph) - ////////////////////////////////////////////////////////////////// int num_successors() const override { return 0; } @@ -205,10 +100,19 @@ class graph, pls::dataflow::outputs> : node *node_list_start_{nullptr}; node *node_list_current_{nullptr}; int num_nodes_{0}; + + // Internals required for building and running + void build_recursive(node *node); + void add_node(node *new_node); + + template + struct feed_inputs; + class run_graph_task; }; } } } +#include "graph_impl.h" #endif //PLS_DATAFLOW_INTERNAL_GRAPH_H_ -- libgit2 0.26.0