diff --git a/app/playground/main.cpp b/app/playground/main.cpp index 37f3eb6..e38032c 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -17,31 +18,53 @@ int main() { // Define graph, outputs> graph; - auto func1 = [](const int &i1, const int &i2, int &o1) { - std::cout << "Add up " << i1 << " and " << i2 << "..." << std::endl; - o1 = i1 + i2; + auto triple = [](const int &i1, int &o1) { + o1 = i1 * 3; }; - function_node, outputs, decltype(func1)> node1{func1}; + function_node, outputs, decltype(triple)> triple_node{triple}; - auto func2 = [](const int &i1, int &o1) { - std::cout << "Print Result " << i1 << std::endl; - o1 = i1; + auto minus_one = [](const int &i1, int &o1) { + o1 = i1 - 1; }; - function_node, outputs, decltype(func2)> node2{func2}; + function_node, outputs, decltype(minus_one)> minus_one_node{minus_one}; + + auto recursion = [&](const int &i1, const int &i2, int &o1) { + if (i1 > 0) { + std::tuple out; + graph.run({i1, i2}, out); + pls::scheduler::wait_for_all(); + o1 = std::get<0>(out); + } else { + o1 = i2; + } + }; + function_node, outputs, decltype(recursion)> recursion_node{recursion}; // Connect - graph.input<0>() >> node1.in_port<0>(); - graph.input<1>() >> node1.in_port<1>(); + graph.input<0>() >> minus_one_node.in_port<0>(); + minus_one_node.out_port<0>() >> recursion_node.in_port<0>(); - node1.out_port<0>() >> node2.in_port<0>(); + graph.input<1>() >> triple_node.in_port<0>(); + triple_node.out_port<0>() >> recursion_node.in_port<1>(); - node2.out_port<0>() >> graph.output<0>(); + recursion_node.out_port<0>() >> graph.output<0>(); // Build graph.build(); - // Execute - graph.run({1, 2}); - graph.run({1, 1}); - graph.run({5, 6}); + pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18u}; + pls::scheduler scheduler{&my_scheduler_memory, 8}; + scheduler.perform_work([&] { + // Schedule Execution + std::tuple out1, out2, out3; + graph.run({1, 2}, out1); + graph.run({1, 1}, out2); + graph.run({5, 6}, out3); + + // Wait for results and print + pls::scheduler::wait_for_all(); + std::cout << std::get<0>(out1) << std::endl; + std::cout << std::get<0>(out2) << std::endl; + std::cout << std::get<0>(out3) << std::endl; + }); } diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 34e7e6f..03ffff8 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -49,7 +49,7 @@ add_library(pls STATIC include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp - include/pls/internal/scheduling/lambda_task.h include/pls/internal/helpers/seqence.h) + include/pls/internal/scheduling/lambda_task.h include/pls/internal/helpers/seqence.h include/pls/dataflow/internal/build_state.h) # Add everything in `./include` to be in the include path of this project target_include_directories(pls PUBLIC diff --git a/lib/pls/include/pls/dataflow/internal/function_node.h b/lib/pls/include/pls/dataflow/internal/function_node.h index d630382..6959d52 100644 --- a/lib/pls/include/pls/dataflow/internal/function_node.h +++ b/lib/pls/include/pls/dataflow/internal/function_node.h @@ -64,7 +64,7 @@ class function_node, pls::dataflow::outputs void token_pushed(token token) { - auto current_memory = get_invocation(token); + auto current_memory = get_invocation(token.invocation()); std::get(current_memory->input_buffer_) = token; auto remaining_inputs = --(current_memory->inputs_missing_); @@ -141,7 +141,6 @@ class function_node, pls::dataflow::outputs #include +#include "build_state.h" #include "node.h" +#include "function_node.h" #include "in_port.h" #include "out_port.h" #include "inputs.h" #include "outputs.h" +#include "pls/pls.h" + namespace pls { namespace dataflow { namespace internal { @@ -25,126 +29,70 @@ class graph, pls::dataflow::outputs> : // Our own type using self_type = graph, pls::dataflow::outputs>; - // Callbacks for input/output ports - struct in_port_cb { - self_type *self_; - template - void token_pushed(token token) { - self_->in_port_token_pushed(token); - } - }; - struct out_port_cb { - self_type *self_; - template - void token_pushed(token token) { - self_->out_port_token_pushed(token); - } - }; - - // Input-Output port types (external) - using multi_in_port_type = multi_in_port; - using multi_out_port_type = multi_out_port; - // Input-Output port types (internal) - using internal_in_port_type = multi_out_port; - using internal_out_port_type = multi_in_port; + using inputs_type = multi_out_port; + using outputs_type = multi_in_port; // Input-Output value tuples using value_input_tuple = std::tuple; using input_tuple = std::tuple, token...>; + + using value_output_tuple = std::tuple; using output_tuple = std::tuple, token...>; + static constexpr int num_in_ports = std::tuple_size(); static constexpr int num_out_ports = std::tuple_size(); // Memory used for ONE active invocation of the dataflow-graph struct invocation_memory { - std::atomic inputs_missing_; - input_tuple input_buffer_; + value_output_tuple *output_buffer_; }; public: template - using in_port_at = typename multi_in_port_type::template in_port_type_at; - template - using out_port_at = typename multi_out_port_type::template out_port_type_at; - - template - in_port_at &in_port() { - return in_port_.template get(); - } - template - out_port_at &out_port() { - return out_port_.template get(); - } - - template - void in_port_token_pushed(token token) { - auto invocation = get_invocation(token); - - // TODO: Invoke a new instance of our graph (recursive call) when all inputs are here - std::cout << "Token pushed at " << POS << " with value " << token.value() << std::endl; - auto remaining_inputs = --(invocation->inputs_missing_); - if (remaining_inputs == 0) { - std::cout << "All inputs there!" << std::endl; - } - } - - template - using internal_in_port_at = typename internal_in_port_type::template out_port_type_at; + using input_at = typename inputs_type::template out_port_type_at; template - using internal_out_port_at = typename internal_out_port_type::template in_port_type_at; + using output_at = typename outputs_type::template in_port_type_at; template - internal_in_port_at &input() { - return internal_in_port_.template get(); + input_at &input() { + return inputs_.template get(); } template - internal_out_port_at &output() { - return internal_out_port_.template get(); + output_at &output() { + return outputs_.template get(); } template - void out_port_token_pushed(token token) { - auto invocation = get_invocation(token); - - std::cout << "Output produced at port " << POS << " with value " << token.value() << std::endl; - auto remaining_inputs = --(invocation->inputs_missing_); - if (remaining_inputs == 0) { - std::cout << "All inputs there!" << std::endl; - } + void token_pushed(token token) { + auto invocation = get_invocation(token.invocation()); + std::get(*invocation->output_buffer_) = token.value(); } - graph() : in_port_cb_{this}, - out_port_cb_{this}, - in_port_{this, &in_port_cb_}, - out_port_{}, - internal_in_port_{}, - internal_out_port_{this, &out_port_cb_} { - } + graph() : inputs_{}, outputs_{this, this} {} ///////////////////////////////////////////////////////////////// // Graph building ///////////////////////////////////////////////////////////////// void build() { - state_ = building; - PLS_ASSERT(is_internal_fully_connected(), "Must fully connect all inputs/outputs inside a dataflow graph!") + PLS_ASSERT(build_state_ == build_state::fresh, "Must only build a dataflow graph once!") + PLS_ASSERT(is_fully_connected(), "Must fully connect all inputs/outputs inside a dataflow graph!") - num_nodes_ = 0; + build_state_ = build_state::building; + node_list_start_ = node_list_current_ = this; + memory_index_ = 0; + num_nodes_ = 1; for (int i = 0; i < num_in_ports; i++) { - build_recursive(internal_in_port_.next_node_at(i)); + build_recursive(inputs_.next_node_at(i)); } - - state_ = built; + build_state_ = build_state::built; } - node *start_{nullptr}; - node *current_{nullptr}; - void build_recursive(node *node) { - if (node->state_ != fresh) { + if (node->build_state_ != build_state::fresh) { return; // Already visited } - node->state_ = building; + node->build_state_ = build_state::building; PLS_ASSERT(node->is_fully_connected(), "Must fully connect dataflow graph nodes!") add_node(node); @@ -152,34 +100,33 @@ class graph, pls::dataflow::outputs> : build_recursive(node->successor_at(i)); } - node->state_ = built; + node->build_state_ = build_state::built; } void add_node(node *new_node) { new_node->memory_index_ = num_nodes_++; - if (start_ == nullptr) { - start_ = new_node; - current_ = new_node; + if (node_list_current_ == nullptr) { + node_list_current_ = new_node; + node_list_start_ = new_node; } else { - current_->direct_successor_ = new_node; - current_ = new_node; + node_list_current_->direct_successor_ = new_node; + node_list_current_ = new_node; } } - // TODO: Clean up + move to async task template struct feed_inputs { - feed_inputs(internal_in_port_type &, value_input_tuple &, invocation_info &) {} + feed_inputs(inputs_type &, value_input_tuple &, invocation_info &) {} void run() {} }; template struct feed_inputs { - internal_in_port_type &inputs_; + inputs_type &inputs_; value_input_tuple &input_values_; invocation_info &invocation_; - feed_inputs(internal_in_port_type &inputs, + feed_inputs(inputs_type &inputs, value_input_tuple &input_values, invocation_info &invocation) : inputs_{inputs}, input_values_{input_values}, @@ -191,38 +138,56 @@ class graph, pls::dataflow::outputs> : } }; - void run(value_input_tuple input) { - // TODO: clearly move this onto the task stack instead fo malloc (without any free...) - void **buffers = reinterpret_cast(malloc(num_nodes_ * sizeof(void *))); - node *iterator = start_; - for (int i = 0; i < num_nodes_; i++) { - auto required_size = iterator->instance_buffer_size(); - buffers[i] = malloc(required_size); - iterator->init_instance_buffer(buffers[i]); + class run_graph_task : public pls::task { + graph *self_; + value_input_tuple input_; + value_output_tuple *output_; + + // Buffers for actual execution + invocation_info invocation_; + + public: + run_graph_task(graph *self, value_input_tuple &input, value_output_tuple *output) : self_{self}, + input_{input}, + output_{output}, + invocation_{nullptr} { + void **buffers; + buffers = reinterpret_cast(allocate_memory(self_->num_nodes_ * sizeof(void *))); + + node *iterator = self_->node_list_start_; + for (int i = 0; i < self_->num_nodes_; i++) { + auto required_size = iterator->instance_buffer_size(); + buffers[i] = allocate_memory(required_size); + iterator->init_instance_buffer(buffers[i]); + + iterator = iterator->direct_successor_; + } + invocation_ = invocation_info{buffers}; + + self_->get_invocation(invocation_)->output_buffer_ = output; + } - iterator = iterator->direct_successor_; + void execute_internal() override { + feed_inputs<0, I0, I...>{self_->inputs_, input_, invocation_}.run(); } + }; - invocation_info invocation{buffers}; - feed_inputs<0, I0, I...>{internal_in_port_, input, invocation}.run(); + void run(value_input_tuple input, value_output_tuple &output) { + pls::scheduler::spawn_child(this, input, &output); } ////////////////////////////////////////////////////////////////// // Overrides for generic node functionality (building the graph) ////////////////////////////////////////////////////////////////// int num_successors() const override { - return num_out_ports; + return 0; } node *successor_at(int pos) const override { - return out_port_.next_node_at(pos); + PLS_ERROR("A graph instance has no direct successor!") } bool is_fully_connected() const override { - return in_port_.fully_connected() && out_port_.fully_connected(); - } - - bool is_internal_fully_connected() const { - return internal_in_port_.fully_connected() && internal_out_port_.fully_connected(); + return inputs_.fully_connected() && outputs_.fully_connected(); } int instance_buffer_size() const override { @@ -230,20 +195,15 @@ class graph, pls::dataflow::outputs> : } void init_instance_buffer(void *memory) const override { auto invocation = new(memory) invocation_memory{}; - invocation->inputs_missing_ = num_in_ports; }; private: - // Input/Output ports - in_port_cb in_port_cb_; - out_port_cb out_port_cb_; - - multi_in_port_type in_port_; - multi_out_port_type out_port_; - - internal_in_port_type internal_in_port_; - internal_out_port_type internal_out_port_; + inputs_type inputs_; + outputs_type outputs_; + // Information about building the graph + node *node_list_start_{nullptr}; + node *node_list_current_{nullptr}; int num_nodes_{0}; }; diff --git a/lib/pls/include/pls/dataflow/internal/node.h b/lib/pls/include/pls/dataflow/internal/node.h index 0d03488..b871183 100644 --- a/lib/pls/include/pls/dataflow/internal/node.h +++ b/lib/pls/include/pls/dataflow/internal/node.h @@ -2,17 +2,22 @@ #ifndef PLS_DATAFLOW_INTERNAL_NODE_H_ #define PLS_DATAFLOW_INTERNAL_NODE_H_ +#include "build_state.h" + namespace pls { namespace dataflow { namespace internal { +/** + * Represents a single piece included in a dataflow graph. + * This can be anything connected to form the dataflow that requires + * memory during invocation and is present in the connected flow graph. + */ class node { template friend class graph; - enum state { fresh, building, built, teardown }; - public: virtual int num_successors() const = 0; virtual node *successor_at(int pos) const = 0; @@ -22,18 +27,15 @@ class node { virtual bool is_fully_connected() const = 0; - template - M *get_invocation(token token) { - return token.invocation().template get_instance_buffer(memory_index_); + template + M *get_invocation(invocation_info invocation) { + return invocation.template get_instance_buffer(memory_index_); } - // TODO: Remove - void set_memory_index(int i) { memory_index_ = i; } - private: int memory_index_{0}; node *direct_successor_{nullptr}; - state state_{fresh}; + build_state build_state_{build_state::fresh}; }; }