diff --git a/NOTES.md b/NOTES.md
index 1a70ff5..a4481d5 100644
--- a/NOTES.md
+++ b/NOTES.md
@@ -4,6 +4,186 @@ A collection of stuff that we noticed during development.
 Useful later on to write a project report and to go back
 in time to find out why certain decisions were made.
 
+## 19.07.2019 - Colored tokens, recursion and where to put memory
+
+While implementing dataflow graphs we encountered some obstacles.
+The most severe one, which impacts the internal design of the API,
+end-user features and performance, is how to handle token flows,
+especially colored tokens for parallel and/or recursive calls.
+
+The basic issue is that each execution instance requires its
+own isolated environment to execute in. Therefore, each invocation
+instance (parallel or recursive) needs a unique identifier.
+This identifier is usually realized using colored tokens,
+as for example in the classic dataflow language implementation in \[1].
+The problem with this is that in our environment we are constrained
+to static memory allocation and lock-free programming.
+To handle static memory allocation we first decided to follow a model
+introduced by EMBB, associating a clock with each colored token,
+which maps it to a slot in an array of possible parallel execution
+instances. This works fine in EMBB, as they put two major limitations
+on their execution model: no cycles and no recursion.
+(Issues arise with correctly coloring tokens in a lock-free manner
+without too much contention, leading us to believe that this
+method scales badly, especially with smaller workloads.)
+
+While we could simply implement that, we wondered if there is a way to
+better fit dataflow into our existing, task-stack based programming
+approach.
+
+After some investigation we arrived at the following core statements:
+- The dataflow graph itself describes only program execution;
+it should therefore not be tightly coupled with data/buffer management
+- Each invocation of a dataflow graph is logically a task in our model;
+it therefore makes sense to map the memory and coordination resources
+required for one invocation instance directly to this task
+- What we do is in fact not 'pure' dataflow programming, as we do not
+try to re-create a full programming language (e.g. we do not care about
+the memory management and loops described in \[1])
+- What we do is closer to functional programming with single-assignment
+rules and recursion (see Elixir for a nice syntactic example)
+
+Our plan is therefore the following:
+Separate the structure of the dataflow graph from its execution and map
+one execution instance to one active task in our runtime system.
+
+This, conveniently, also mitigates most issues related to
+memory allocation/parallelism in the graph and makes for a
+nicer end-user API (no more buffer types/parallelism in templates).
+
+
+\[1] J. B. Dennis, “First version of a data flow procedure language,” in Programming Symposium, vol. 19, B. Robinet, Ed. Berlin, Heidelberg: Springer Berlin Heidelberg, 1974, pp. 362–376.
+
+
+## 19.07.2019 - Variadic Templates for input/output groups
+
+We searched for the correct way to represent a node's
+or graph's input/output pairs for a while and found the
+solution in partial specialization of templates.
+
+```C++
+template <typename INS, typename OUTS>
+class graph {};
+
+template <typename... I, typename... O>
+class graph<inputs<I...>, outputs<O...>> { ... };
+```
+
+The above code allows us to enforce a clean end-user API
+while still giving us full access to the input/output variadic
+template types.
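+
+To make this concrete, here is a small, self-contained sketch (not our
+actual headers; the member names are made up for illustration, and the
+inputs/outputs tags mirror our empty tag structs) of what the
+specialization gives us access to once the packs are unpacked:
+
+```C++
+#include <cstddef>
+#include <tuple>
+
+// Empty tag types, mirroring the inputs<...>/outputs<...> structs.
+template <typename... T> struct inputs {};
+template <typename... T> struct outputs {};
+
+// The primary template is never instantiated directly...
+template <typename INS, typename OUTS>
+class graph;
+
+// ...only the specialization, which sees the individual port types.
+template <typename... I, typename... O>
+class graph<inputs<I...>, outputs<O...>> {
+ public:
+  static constexpr std::size_t num_inputs = sizeof...(I);
+  static constexpr std::size_t num_outputs = sizeof...(O);
+
+  // Typed storage for one full set of input values...
+  using input_tuple = std::tuple<I...>;
+  // ...and per-port type lookup by index.
+  template <std::size_t POS>
+  using input_at = typename std::tuple_element<POS, input_tuple>::type;
+};
+
+// The user only ever spells out the clean interface type:
+using my_graph = graph<inputs<int, int>, outputs<int>>;
+static_assert(my_graph::num_inputs == 2, "two inputs");
+```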
+The end user API is the following:
+
+```C++
+graph<inputs<...>, outputs<...>> g;
+```
+
+## 03.07.2019 - Outline/Plan for our Dataflow API
+
+The following describes our ideas for what we expect/want to build
+in our dataflow API. The restrictions and decisions try to strike a
+balance between usability, feasibility of the implementation and
+feature richness (what kind of flows can even be represented using
+the API).
+
+We will model our dataflow closely after the EMBB implementation.
+There are buffers that are controlled by a color/clock (a sketch of
+this idea follows at the end of this section) and a DF graph has
+sources (inputs) and sinks (outputs).
+
+Some notable properties we want (and that can differ from EMBB):
+- A DF graph must be well behaved, i.e. for each full set of input
+values exactly one set of output values is produced (each parallel
+invocation therefore corresponds to exactly one value in the in-
+and outputs)
+- A DF graph must be declared with its full interface, i.e. its
+full set of input types and output types must be known
+when declaring it
+```c++
+dataflow<Inputs<String, Int>, Outputs<Image, Int>, NUM_PARAL> df_instance;
+```
+- Nodes inside the DF graph are produced by the graph itself, allowing
+it to propagate parallelism constraints (for example directly creating
+the children with correct buffer capacities). This is done by having
+factory functions on a concrete DF graph instance.
+- A DF graph's sources/sinks are 'packed', i.e. the user must
+provide a full set of inputs to trigger the DF graph and is provided
+a full set of outputs when reading a result from the DF graph
+(this highlights our intent for well-behaved flow graphs, as users
+do not even get the notion that it is OK to only return partial results)
+```c++
+auto source = df.source([](String &out1, Int &out2) {
+  if (elements_available) {
+    out1 = ...;
+    out2 = ...;
+    return true;
+  } else {
+    return false;
+  }
+});
+
+...
+
+auto sink = df.sink([](const Image &, const Int &){
+  ...;
+});
+```
+- Easy APIs for working with array data are provided in the form of
+iterator sources/sinks
+```c++
+auto source = df.iterator_source();
+auto sink = df.iterator_sink();
+
+...
+
+source.reset(input.begin(), input.end());
+sink.reset(output.begin());
+df.wait_for_all();
+```
+- In the first version nodes are always fully parallel;
+further versions might include the per-node properties
+unordered_serial (at most one invocation of the node executes at a
+time, but in no particular order, e.g. for accessing shared memory)
+or ordered_serial (e.g. for logging something in the correct order
+to a file/console).
+- Sinks/user-accessed outputs of a DF graph are always ordered_serial,
+preventing confusion for the end user and ensuring deterministic
+execution
+- Memory management for all buffers and the DF graph itself is made
+explicitly visible to the end user by requiring them to hold all
+components of the graph in memory when using it. This keeps with
+our philosophy of not having hidden memory allocations, making
+development for e.g. embedded platforms simpler, as it is clear
+where and what resources are used (one can simply sizeof(...) all
+parts of the graph and find out how much memory the buffers and so on
+require)
+- This model in principle allows recursive invocation. We will not
+implement this in the first version, but keep the option open for later.
+This will potentially allow different patterns, like stencil operations,
+to be implemented with the system.
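+
+To illustrate the color/clock idea mentioned above, here is a rough,
+self-contained sketch (made-up names, not our actual implementation):
+a token carries the clock of its invocation, the clock selects one of
+NUM_PARALLEL buffer slots, and a counter of missing inputs fires the
+node once the last input of that color has arrived.
+
+```c++
+#include <array>
+#include <atomic>
+#include <cstddef>
+#include <tuple>
+
+// A token carries its value plus the clock (color) of its invocation.
+template <typename T>
+struct token {
+  T value;
+  std::size_t clock;
+};
+
+// Per-invocation state of a node with two inputs: the values gathered
+// so far and a count of how many inputs are still missing.
+template <typename T1, typename T2>
+struct slot {
+  std::tuple<T1, T2> inputs;
+  std::atomic<int> missing{2};
+};
+
+// Buffer for NUM_PARALLEL concurrent invocations. The clock picks the
+// slot, so tokens of different colors never interfere with each other.
+template <typename T1, typename T2, std::size_t NUM_PARALLEL>
+class input_buffer {
+ public:
+  // Each push returns true if it completed the input set for its
+  // color, i.e. the node may now fire for that invocation.
+  bool push_first(token<T1> t) {
+    slot<T1, T2> &s = slots_[t.clock % NUM_PARALLEL];
+    std::get<0>(s.inputs) = t.value;
+    return s.missing.fetch_sub(1) == 1;
+  }
+
+  bool push_second(token<T2> t) {
+    slot<T1, T2> &s = slots_[t.clock % NUM_PARALLEL];
+    std::get<1>(s.inputs) = t.value;
+    return s.missing.fetch_sub(1) == 1;
+  }
+
+ private:
+  std::array<slot<T1, T2>, NUM_PARALLEL> slots_;
+};
+```
+
+A real implementation would additionally have to recycle slots and
+assign clocks in a lock-free way, which is exactly the contention
+problem discussed in the 19.07 note above.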
+
+## 03.07.2019 - Dataflow in EMBB
+
+EMBB's dataflow is a very simple but effective implementation of
+k-colored dataflow (a maximum of k concurrent invocations, with
+data/tokens marked by an individual color per parallel invocation).
+They enforce an acyclic, recursion-free flow and require source
+nodes and sink nodes to be set explicitly (acting as in- and outputs).
+
+This allows them to send signals down arcs even if there is
+no value, e.g. if there is a split in control flow the 'not used'
+side of the flow will be fed an 'empty' token, signaling sinks
+that the execution of this parallel instance reached the sink.
+Once one color of tokens (so one parallel execution instance)
+reaches ALL sinks the model allows a new one to be input.
+Forcing all tokens to reach the sinks before new ones can enter
+keeps execution ordered, thus potentially limiting concurrency,
+but at the same time makes for a model that is very simple to implement.
+Computational nodes between sources and sinks are associated
+with input buffers (having the same capacity as the number of
+parallel invocations allowed). These can hold values from
+predecessors until all inputs for the node are ready. The node
+is started as a process as soon as the last needed input is provided
+(this event is triggered by a reference counter of missing inputs).
+
 ## 26.06.2019 - Notes on Dataflow Implementation
 
 ### Dataflow in general
diff --git a/app/benchmark_unbalanced/CMakeLists.txt b/app/benchmark_unbalanced/CMakeLists.txt index 00c95ab..d935ada 100644 --- a/app/benchmark_unbalanced/CMakeLists.txt +++ b/app/benchmark_unbalanced/CMakeLists.txt @@ -1,4 +1,4 @@ -add_executable(benchmark_unbalanced main.cpp node.h node.cpp picosha2.h) +add_executable(benchmark_unbalanced main.cpp node.h function_node.cpp picosha2.h) target_link_libraries(benchmark_unbalanced pls) if (EASY_PROFILER) target_link_libraries(benchmark_unbalanced easy_profiler) diff --git a/app/benchmark_unbalanced/node.cpp b/app/benchmark_unbalanced/function_node.cpp similarity index 100% rename from app/benchmark_unbalanced/node.cpp rename to app/benchmark_unbalanced/function_node.cpp diff --git a/app/playground/main.cpp b/app/playground/main.cpp index cde3abd..eb9384b 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -1,14 +1,96 @@ // Headers are available because we added the pls target -#include -#include -#include -#include -#include -#include +#include +#include #include + -#include +#include "pls/pls.h" +#include "pls/dataflow/internal/inputs.h" +#include "pls/dataflow/internal/outputs.h" +#include "pls/dataflow/internal/function_node.h" +#include "pls/dataflow/internal/graph.h" +#include "pls/dataflow/internal/switch_node.h" +#include "pls/dataflow/internal/split_node.h" +#include "pls/dataflow/internal/merge_node.h" int main() { + using namespace pls::dataflow; + using namespace pls::dataflow::internal; + + // Define + graph, outputs> graph; + + auto triple = [](const int &i1, int &o1) { + o1 = i1 * 3; + }; + function_node, outputs, decltype(triple)> triple_node{triple}; + + auto minus_one = [](const int &i1, int &o1) { + o1 = i1 - 1; + }; + function_node, outputs, decltype(minus_one)> minus_one_node_1{minus_one}; + function_node, outputs, decltype(minus_one)> minus_one_node_2{minus_one}; + + auto is_positive = [](const int &i1, bool &o1) { + o1 = i1 > 0; + }; + function_node, outputs, decltype(is_positive)> is_positive_node{is_positive}; + + auto recursion = [&](const int &i1, const int &i2, int &o1) { + std::tuple out; + graph.run({i1, i2}, out); +
pls::scheduler::wait_for_all(); + o1 = std::get<0>(out); + }; + function_node, outputs, decltype(recursion)> recursion_node{recursion}; + + split_node minus_split; + split_node decision_split; + switch_node recursion_split; + merge_node recursion_merge; + + // Connect + minus_one_node_1 >> minus_one_node_2; + + // Inputs to first processing step + graph.input<0>() >> minus_one_node_1.in_port<0>(); + graph.input<1>() >> triple_node.in_port<0>(); + minus_one_node_2.out_port<0>() >> minus_split.value_in_port(); + + // Prepare decision... + minus_split.out_port_1() >> is_positive_node.in_port<0>(); + is_positive_node.out_port<0>() >> decision_split.value_in_port(); + triple_node.out_port<0>() >> recursion_split.value_in_port(); + decision_split.out_port_1() >> recursion_split.condition_in_port(); + + // Connect true case (recursion) + minus_split.out_port_2() >> recursion_node.in_port<0>(); + recursion_split.true_out_port() >> recursion_node.in_port<1>(); + recursion_node.out_port<0>() >> recursion_merge.true_in_port(); + + // Connect false case (no recursion) + recursion_split.false_out_port() >> recursion_merge.false_in_port(); + + // Deliver final result (back from merge) + decision_split.out_port_2() >> recursion_merge.condition_in_port(); + recursion_merge.value_out_port() >> graph.output<0>(); + + // Build + graph.build(); + + pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18u}; + pls::scheduler scheduler{&my_scheduler_memory, 8}; + scheduler.perform_work([&] { + // Schedule Execution + std::tuple out1, out2, out3; + graph.run({1, 2}, out1); + graph.run({1, 1}, out2); + graph.run({5, 6}, out3); + // Wait for results and print + pls::scheduler::wait_for_all(); + std::cout << std::get<0>(out1) << std::endl; + std::cout << std::get<0>(out2) << std::endl; + std::cout << std::get<0>(out3) << std::endl; + }); } diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 8d46447..67e9cd8 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -9,6 +9,21 @@ add_library(pls STATIC include/pls/algorithms/scan.h include/pls/algorithms/scan_impl.h + include/pls/dataflow/dataflow.h + include/pls/dataflow/internal/inputs.h + include/pls/dataflow/internal/outputs.h + include/pls/dataflow/internal/token.h + include/pls/dataflow/internal/in_port.h + include/pls/dataflow/internal/out_port.h + include/pls/dataflow/internal/function_node.h + include/pls/dataflow/internal/node.h + include/pls/dataflow/internal/graph.h + include/pls/dataflow/internal/build_state.h + include/pls/dataflow/internal/function_node_impl.h + include/pls/dataflow/internal/graph_impl.h + include/pls/dataflow/internal/switch_node.h + include/pls/dataflow/internal/merge_node.h + include/pls/internal/base/spin_lock.h include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp include/pls/internal/base/ttas_spin_lock.h src/internal/base/ttas_spin_lock.cpp @@ -22,6 +37,7 @@ add_library(pls STATIC include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp include/pls/internal/data_structures/aligned_stack_impl.h + include/pls/internal/data_structures/deque.h include/pls/internal/data_structures/locking_deque.h include/pls/internal/data_structures/locking_deque_impl.h include/pls/internal/data_structures/work_stealing_deque.h include/pls/internal/data_structures/work_stealing_deque_impl.h @@ -32,13 +48,15 @@ add_library(pls STATIC include/pls/internal/helpers/mini_benchmark.h include/pls/internal/helpers/unique_id.h include/pls/internal/helpers/range.h + 
include/pls/internal/helpers/seqence.h include/pls/internal/scheduling/thread_state.h include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp - include/pls/internal/scheduling/lambda_task.h include/pls/internal/data_structures/deque.h) + include/pls/internal/scheduling/lambda_task.h + include/pls/dataflow/internal/split_node.h include/pls/internal/helpers/member_function.h) # Add everything in `./include` to be in the include path of this project target_include_directories(pls PUBLIC diff --git a/lib/pls/include/pls/dataflow/dataflow.h b/lib/pls/include/pls/dataflow/dataflow.h new file mode 100644 index 0000000..e61d8a5 --- /dev/null +++ b/lib/pls/include/pls/dataflow/dataflow.h @@ -0,0 +1,36 @@ + +#ifndef PLS_DATAFLOW_DATAFLOW_H_ +#define PLS_DATAFLOW_DATAFLOW_H_ + +#include "internal/graph.h" +#include "internal/function_node.h" +#include "internal/merge_node.h" +#include "internal/switch_node.h" +#include "internal/split_node.h" + +#include "internal/inputs.h" +#include "internal/outputs.h" + +namespace pls { +namespace dataflow { + +template +using graph = internal::graph; +template +using function_node = internal::function_node; +template +using merge_node = internal::merge_node; +template +using switch_node = internal::switch_node; +template +using split_node = internal::split_node; + +template +using inputs = internal::inputs; +template +using outputs = internal::outputs; + +} +} + +#endif //PLS_DATAFLOW_DATAFLOW_H_ diff --git a/lib/pls/include/pls/dataflow/internal/build_state.h b/lib/pls/include/pls/dataflow/internal/build_state.h new file mode 100644 index 0000000..753491c --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/build_state.h @@ -0,0 +1,15 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_BUILD_STATE_H_ +#define PLS_DATAFLOW_INTERNAL_BUILD_STATE_H_ + +namespace pls { +namespace dataflow { +namespace internal { + +enum class build_state { fresh, building, built, teardown }; + +} +} +} + +#endif //PLS_DATAFLOW_INTERNAL_BUILD_STATE_H_ diff --git a/lib/pls/include/pls/dataflow/internal/function_node.h b/lib/pls/include/pls/dataflow/internal/function_node.h new file mode 100644 index 0000000..d7490b7 --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/function_node.h @@ -0,0 +1,134 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_H_ +#define PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_H_ + +#include +#include +#include + +#include "in_port.h" +#include "out_port.h" +#include "node.h" + +#include "inputs.h" +#include "outputs.h" + +#include "pls/internal/helpers/seqence.h" +#include "pls/pls.h" + +namespace pls { +namespace dataflow { +namespace internal { + +using namespace pls::internal::helpers; + +// Forward Decl +template +class graph; + +template +class function_node {}; + +template +class function_node, outputs, F> : public node { + private: + // Our own type + using self_type = function_node, outputs, F>; + + // Input-Output port types + using multi_in_port_type = multi_in_port; + using multi_out_port_type = multi_out_port; + + // Input-Output value tuples + using input_tuple = std::tuple, token...>; + using output_tuple = std::tuple, token...>; + static constexpr int num_in_ports = std::tuple_size(); + static constexpr int num_out_ports = std::tuple_size(); + + // Memory used for ONE active invocation of the dataflow-graph + struct 
invocation_memory { + std::atomic inputs_missing_; + input_tuple input_buffer_; + }; + + public: + explicit function_node(F function) : in_port_{this, this}, out_port_{}, function_{function} {} + + template + using in_port_at = typename multi_in_port_type::template in_port_type_at; + template + using out_port_at = typename multi_out_port_type::template out_port_type_at; + + template + in_port_at &in_port() { + return in_port_.template get(); + } + + multi_in_port_type &in_ports() { + return in_port_; + } + + template + out_port_at &out_port() { + return out_port_.template get(); + } + + multi_out_port_type &out_ports() { + return out_port_; + } + + template + function_node, outputs, FUNC> + &operator>>(function_node, outputs, FUNC> &other_node); + + template + void operator>>(graph, outputs> &graph); + + template + void token_pushed(token token); + + int num_successors() const override { + return num_out_ports; + } + node *successor_at(int pos) const override { + return out_port_.next_node_at(pos); + } + + bool is_fully_connected() const override { + return in_port_.fully_connected() && out_port_.fully_connected(); + } + + int instance_buffer_size() const override { + return sizeof(invocation_memory); + } + void init_instance_buffer(void *memory) const override { + auto invocation = new(memory) invocation_memory{}; + invocation->inputs_missing_ = num_in_ports; + }; + + private: + multi_in_port_type in_port_; + multi_out_port_type out_port_; + + F function_; + + ////////////////////////////////////////////////////////////////// + // Helpers for actually calling the work lambda + ////////////////////////////////////////////////////////////////// + void execute_function(invocation_memory *invocation_memory, invocation_info invocation_info); + + template + struct propagate_output; + + template + void execute_function_internal(input_tuple &inputs, sequence, + output_tuple &outputs, sequence, + invocation_info invocation_info); +}; + +} +} +} +#include "function_node_impl.h" + +#endif //PLS_DATAFLOW_INTERNAL_NODE_H_ diff --git a/lib/pls/include/pls/dataflow/internal/function_node_impl.h b/lib/pls/include/pls/dataflow/internal/function_node_impl.h new file mode 100644 index 0000000..c5e058e --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/function_node_impl.h @@ -0,0 +1,97 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_IMPL_H_ +#define PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_IMPL_H_ + +#include "graph.h" + +namespace pls { +namespace dataflow { +namespace internal { + +template +template +function_node, outputs, FUNC>& +function_node, outputs, F>:: +operator>>(function_node, outputs, FUNC> &other_node) { + out_port_ >> other_node.in_ports(); + return other_node; +} + +template +template +void function_node, outputs, F>:: +operator>>(graph, outputs> &graph) { + out_port_ >> graph.output_ports(); +} + +template +template +void function_node, outputs, F>:: +token_pushed(token token) { + auto current_memory = get_invocation(token.invocation()); + + std::get(current_memory->input_buffer_) = token; + auto remaining_inputs = --(current_memory->inputs_missing_); + if (remaining_inputs == 0) { + execute_function(current_memory, token.invocation()); + current_memory->inputs_missing_ = num_in_ports; + } +} + +////////////////////////////////////////////////////////////////// +// Helpers for actually calling the work lambda +////////////////////////////////////////////////////////////////// +template +void function_node, outputs, F>:: +execute_function(invocation_memory *invocation_memory, 
invocation_info invocation_info) { + auto lambda = [&]() { + input_tuple &inputs = invocation_memory->input_buffer_; + output_tuple outputs{}; + + execute_function_internal(inputs, typename sequence_gen<1 + sizeof...(I)>::type(), + outputs, typename sequence_gen<1 + sizeof...(O)>::type(), invocation_info); + }; + // TODO: maybe replace this with 'continuation' style invocation + pls::scheduler::spawn_child_and_wait>(lambda); +} + +template +template +struct function_node, outputs, F>:: +propagate_output { + propagate_output(multi_out_port_type &, output_tuple &, invocation_info&) {} + void propagate() {} +}; +template +template +struct function_node, outputs, F>:: +propagate_output { + multi_out_port_type &out_port_; + output_tuple &output_tuple_; + invocation_info &invocation_info_; + + propagate_output(multi_out_port_type &out_port, output_tuple &output_tuple, invocation_info& invocation_info) : + out_port_{out_port}, output_tuple_{output_tuple}, invocation_info_{invocation_info} {} + + void propagate() { + std::get(output_tuple_).set_invocation(invocation_info_); + out_port_.template get().push_token(std::get(output_tuple_)); + propagate_output{out_port_, output_tuple_, invocation_info_}.propagate(); + } +}; + +template +template +void function_node, outputs, F>:: +execute_function_internal(input_tuple &inputs, sequence, + output_tuple &outputs, sequence, + invocation_info invocation_info) { + function_(std::get(inputs).value()..., std::get(outputs).value()...); + propagate_output<0, O0, O...>{out_port_, outputs, invocation_info}.propagate(); +} + +} +} +} + +#endif //PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_IMPL_H_ diff --git a/lib/pls/include/pls/dataflow/internal/graph.h b/lib/pls/include/pls/dataflow/internal/graph.h new file mode 100644 index 0000000..0004ed3 --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/graph.h @@ -0,0 +1,139 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_GRAPH_H_ +#define PLS_DATAFLOW_INTERNAL_GRAPH_H_ + +#include +#include +#include + +#include "build_state.h" +#include "node.h" +#include "function_node.h" +#include "in_port.h" +#include "out_port.h" +#include "inputs.h" +#include "outputs.h" + +#include "pls/pls.h" + +namespace pls { +namespace dataflow { +namespace internal { + +template +class graph {}; + +template +class graph, outputs> : public node { + template + friend + class function_node; + private: + // Our own type + using self_type = graph, outputs>; + + // Input-Output port types (internal) + using inputs_type = multi_out_port; + using outputs_type = multi_in_port; + + // Input-Output value tuples + using value_input_tuple = std::tuple; + using input_tuple = std::tuple, token...>; + + using value_output_tuple = std::tuple; + using output_tuple = std::tuple, token...>; + + static constexpr int num_in_ports = std::tuple_size(); + static constexpr int num_out_ports = std::tuple_size(); + + // Memory used for ONE active invocation of the dataflow-graph + struct invocation_memory { + value_output_tuple *output_buffer_; + }; + + public: + template + using input_at = typename inputs_type::template out_port_type_at; + template + using output_at = typename outputs_type::template in_port_type_at; + + template + input_at &input() { + return inputs_.template get(); + } + + inputs_type &input_ports() { + return inputs_; + } + + template + output_at &output() { + return outputs_.template get(); + } + + outputs_type &output_ports() { + return outputs_; + } + + template + function_node, outputs, FUNC> + &operator>>(function_node, outputs, FUNC> &other_node); + + void 
wait_for_all() { + pls::scheduler::wait_for_all(); + } + + template + void token_pushed(token token); + + graph() : inputs_{}, outputs_{this, this} {} + + void build(); + + void run(value_input_tuple input, value_output_tuple &output) { + PLS_ASSERT(build_state_ == build_state::built, "Must build graph before running it!") + pls::scheduler::spawn_child(this, input, &output); + } + + int num_successors() const override { + return 0; + } + node *successor_at(int pos) const override { + PLS_ERROR("A graph instance has no direct successor!") + } + + bool is_fully_connected() const override { + return inputs_.fully_connected() && outputs_.fully_connected(); + } + + int instance_buffer_size() const override { + return sizeof(invocation_memory); + } + void init_instance_buffer(void *memory) const override { + auto invocation = new(memory) invocation_memory{}; + }; + + private: + inputs_type inputs_; + outputs_type outputs_; + + // Information about building the graph + node *node_list_start_{nullptr}; + node *node_list_current_{nullptr}; + int num_nodes_{0}; + + // Internals required for building and running + void build_recursive(node *node); + void add_node(node *new_node); + + template + struct feed_inputs; + class run_graph_task; +}; + +} +} +} +#include "graph_impl.h" + +#endif //PLS_DATAFLOW_INTERNAL_GRAPH_H_ diff --git a/lib/pls/include/pls/dataflow/internal/graph_impl.h b/lib/pls/include/pls/dataflow/internal/graph_impl.h new file mode 100644 index 0000000..e675752 --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/graph_impl.h @@ -0,0 +1,139 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_ +#define PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_ + +namespace pls { +namespace dataflow { +namespace internal { + +template +template +function_node, outputs, FUNC> &graph, outputs>:: +operator>>(function_node, outputs, FUNC> &other_node) { + inputs_ >> other_node.in_ports(); + return other_node; +} + +template +template +void graph, outputs>:: +token_pushed(token token) { + auto invocation = get_invocation(token.invocation()); + std::get(*invocation->output_buffer_) = token.value(); +} + +template +void graph, outputs>:: +build() { + PLS_ASSERT(build_state_ == build_state::fresh, "Must only build a dataflow graph once!") + PLS_ASSERT(is_fully_connected(), "Must fully connect all inputs/outputs inside a dataflow graph!") + + build_state_ = build_state::building; + node_list_start_ = node_list_current_ = this; + memory_index_ = 0; + num_nodes_ = 1; + for (int i = 0; i < num_in_ports; i++) { + build_recursive(inputs_.next_node_at(i)); + } + build_state_ = build_state::built; +} + +template +void graph, outputs>:: +build_recursive(node *node) { + if (node->build_state_ != build_state::fresh) { + return; // Already visited + } + node->build_state_ = build_state::building; + PLS_ASSERT(node->is_fully_connected(), "Must fully connect dataflow graph nodes!") + + add_node(node); + for (int i = 0; i < node->num_successors(); i++) { + build_recursive(node->successor_at(i)); + } + + node->build_state_ = build_state::built; +} + +template +void graph, outputs>:: +add_node(node *new_node) { + new_node->memory_index_ = num_nodes_++; + + if (node_list_current_ == nullptr) { + node_list_current_ = new_node; + node_list_start_ = new_node; + } else { + node_list_current_->direct_successor_ = new_node; + node_list_current_ = new_node; + } +} + +template +template +struct graph, outputs>:: +feed_inputs { + feed_inputs(inputs_type &, value_input_tuple &, invocation_info &) {} + void run() {} +}; + +template +template 
+struct graph, outputs>:: +feed_inputs { + inputs_type &inputs_; + value_input_tuple &input_values_; + invocation_info &invocation_; + + feed_inputs(inputs_type &inputs, + value_input_tuple &input_values, + invocation_info &invocation) : inputs_{inputs}, + input_values_{input_values}, + invocation_{invocation} {} + + void run() { + inputs_.template get().push_token(token{std::get(input_values_), invocation_}); + feed_inputs{inputs_, input_values_, invocation_}.run(); + } +}; + +template +class graph, outputs>::run_graph_task : public pls::task { + graph *self_; + value_input_tuple input_; + value_output_tuple *output_; + + // Buffers for actual execution + invocation_info invocation_; + + public: + run_graph_task(graph *self, value_input_tuple &input, value_output_tuple *output) : self_{self}, + input_{input}, + output_{output}, + invocation_{nullptr} { + void **buffers; + buffers = reinterpret_cast(allocate_memory(self_->num_nodes_ * sizeof(void *))); + + node *iterator = self_->node_list_start_; + for (int i = 0; i < self_->num_nodes_; i++) { + auto required_size = iterator->instance_buffer_size(); + buffers[i] = allocate_memory(required_size); + iterator->init_instance_buffer(buffers[i]); + + iterator = iterator->direct_successor_; + } + invocation_ = invocation_info{buffers}; + + self_->get_invocation(invocation_)->output_buffer_ = output; + } + + void execute_internal() override { + feed_inputs<0, I0, I...>{self_->inputs_, input_, invocation_}.run(); + } +}; + +} +} +} + +#endif //PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_ diff --git a/lib/pls/include/pls/dataflow/internal/in_port.h b/lib/pls/include/pls/dataflow/internal/in_port.h new file mode 100644 index 0000000..8aaace0 --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/in_port.h @@ -0,0 +1,136 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_INPUT_H_ +#define PLS_DATAFLOW_INTERNAL_INPUT_H_ + +#include "pls/internal/base/error_handling.h" + +#include "token.h" +#include "node.h" + +namespace pls { +namespace dataflow { +namespace internal { + +/** + * Represents a single, logical input port (no data store, simply signal propagation). + * @tparam T Type of the input port + */ +template +class in_port { + template + friend + class out_port; + + public: + explicit in_port(node *owning_node) : owning_node_{owning_node} {}; + node *owning_node() const { return owning_node_; } + bool is_connected() const { return connected_; } + + protected: + virtual void token_pushed(token token) = 0; + + private: + bool connected_{false}; + node *const owning_node_; + + void push_token(token token) { + token_pushed(token); + } + + void connect() { + if (connected_) { + PLS_ERROR("Must only connect on input once. Disconnect the output pointing to it before reconnecting.") + } + connected_ = true; + } +}; + +/** + * Represents multiple input ports bundled together (a tuple of inputs). + * Allows for a unified callback method to handle multiple typed inputs. 
+ * + * template + * void token_pushed(token token) { Notified when tokens arrive } + * + * @tparam CB The class implementing a callback + * @tparam N Put 0 to start the recursive implementation + * @tparam I A variadic list of input types + */ +template +class multi_in_port { + // end of template recursion + public: + explicit multi_in_port(node *, CB *) {}; + + bool fully_connected() const { + return true; + } +}; +template +class multi_in_port : public in_port { + public: + // Helpers for managing recursive types + using my_type = multi_in_port; + using child_type = multi_in_port; + using value_type = I0; + + explicit multi_in_port(node *owning_node, CB *cb) : in_port{owning_node}, cb_{cb}, rec_{owning_node, cb} {}; + + void token_pushed(token token) override { + cb_->template token_pushed(token); + } + + // Helper struct required for recursive access to types by index + template + struct type_at { + }; + template + struct type_at<0, T0, T...> { + using type = T0; + using in_port_type = in_port; + }; + template + struct type_at { + using type = typename type_at::type; + using in_port_type = in_port; + }; + + // Simple interface to get types by index + template + using in_port_type_at = typename type_at::in_port_type; + template + using value_type_at = typename type_at::type; + + // Helper struct required for recursive access to input's by index + template + struct get_at { + static RES_T &get(MY_T &self) { + return get_at::get(self.rec_); + } + }; + template + struct get_at<0, RESULT, CURRENT> { + static RESULT &get(CURRENT &self) { + return self; + } + }; + + // Simple interface to access input's by index + template + in_port_type_at &get() { + return get_at, my_type>::get(*this); + } + + bool fully_connected() const { + return this->is_connected() && rec_.fully_connected(); + } + + child_type rec_; + CB *cb_; +}; + +} +} +} + +#endif //PLS_DATAFLOW_INTERNAL_INPUT_H_ diff --git a/lib/pls/include/pls/dataflow/internal/inputs.h b/lib/pls/include/pls/dataflow/internal/inputs.h new file mode 100644 index 0000000..9be844a --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/inputs.h @@ -0,0 +1,17 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_INPUTS_H_ +#define PLS_DATAFLOW_INTERNAL_INPUTS_H_ + +namespace pls { +namespace dataflow { +namespace internal { + +template +struct inputs { +}; + +} +} +} + +#endif //PLS_DATAFLOW_INTERNAL_INPUTS_H_ diff --git a/lib/pls/include/pls/dataflow/internal/merge_node.h b/lib/pls/include/pls/dataflow/internal/merge_node.h new file mode 100644 index 0000000..2a7a6c5 --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/merge_node.h @@ -0,0 +1,115 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_ +#define PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_ + +#include + +#include "in_port.h" +#include "out_port.h" +#include "node.h" +#include "token.h" + +namespace pls { +namespace dataflow { +namespace internal { + +template +class merge_node : public node { + // Our own type + using self_type = merge_node; + + // Input-Output port types + using multi_in_port_type = multi_in_port; + using multi_out_port_type = multi_out_port; + + // Input-Output tuples + using input_tuple = std::tuple, token, token>; + + // Memory used for ONE active invocation of the dataflow-graph + struct invocation_memory { + std::atomic inputs_missing_; + input_tuple input_buffer_; + }; + + // Encodings for current input state (needs to fit into an atomic) + static constexpr unsigned int IN_0_MISSING = 1u << 0u; + static constexpr unsigned int IN_1_MISSING = 1u << 1u; + static constexpr unsigned int 
COND_MISSING = 1u << 2u; + static constexpr unsigned int INITIAL_STATE = IN_0_MISSING + IN_1_MISSING + COND_MISSING; + + public: + explicit merge_node() : in_port_{this, this}, out_port_{} {} + + in_port &true_in_port() { + return in_port_.template get<0>(); + } + + in_port &false_in_port() { + return in_port_.template get<1>(); + } + + in_port &condition_in_port() { + return in_port_.template get<2>(); + } + + out_port &value_out_port() { + return out_port_.template get<0>(); + } + + template + void token_pushed(token token) { + auto current_memory = get_invocation(token.invocation()); + + std::get(current_memory->input_buffer_) = token; + + unsigned int remaining_inputs; + if (POS == 0) { + remaining_inputs = (current_memory->inputs_missing_).fetch_sub(IN_0_MISSING) - IN_0_MISSING; + } else if (POS == 1) { + remaining_inputs = (current_memory->inputs_missing_).fetch_sub(IN_1_MISSING) - IN_1_MISSING; + } else { + remaining_inputs = (current_memory->inputs_missing_).fetch_sub(COND_MISSING) - COND_MISSING; + } + + if ((remaining_inputs & COND_MISSING) == 0) { + if ((remaining_inputs & IN_0_MISSING) == 0) { + auto &data = std::get<0>(current_memory->input_buffer_); + value_out_port().push_token(data); + current_memory->inputs_missing_ += IN_0_MISSING + COND_MISSING; + } else if ((remaining_inputs & IN_1_MISSING) == 0) { + auto &data = std::get<1>(current_memory->input_buffer_); + value_out_port().push_token(data); + current_memory->inputs_missing_ += IN_1_MISSING + COND_MISSING; + } + } + } + + int num_successors() const override { + return 1; + } + node *successor_at(int pos) const override { + return out_port_.next_node_at(pos); + } + + bool is_fully_connected() const override { + return in_port_.fully_connected() && out_port_.fully_connected(); + } + + int instance_buffer_size() const override { + return sizeof(invocation_memory); + } + void init_instance_buffer(void *memory) const override { + auto invocation = new(memory) invocation_memory{}; + invocation->inputs_missing_ = INITIAL_STATE; + }; + + private: + multi_in_port_type in_port_; + multi_out_port_type out_port_; +}; + +} +} +} + +#endif //PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_ diff --git a/lib/pls/include/pls/dataflow/internal/node.h b/lib/pls/include/pls/dataflow/internal/node.h new file mode 100644 index 0000000..0b9c447 --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/node.h @@ -0,0 +1,46 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_NODE_H_ +#define PLS_DATAFLOW_INTERNAL_NODE_H_ + +#include "build_state.h" +#include "token.h" + +namespace pls { +namespace dataflow { +namespace internal { + +/** + * Represents a single piece included in a dataflow graph. + * This can be anything connected to form the dataflow that requires + * memory during invocation and is present in the connected flow graph. 
+ */ +class node { + template + friend + class graph; + + public: + virtual int num_successors() const = 0; + virtual node *successor_at(int pos) const = 0; + + virtual int instance_buffer_size() const = 0; + virtual void init_instance_buffer(void *memory) const = 0; + + virtual bool is_fully_connected() const = 0; + + template + M *get_invocation(invocation_info invocation) { + return invocation.template get_instance_buffer(memory_index_); + } + + private: + int memory_index_{0}; + node *direct_successor_{nullptr}; + build_state build_state_{build_state::fresh}; +}; + +} +} +} + +#endif //PLS_DATAFLOW_INTERNAL_NODE_H_ diff --git a/lib/pls/include/pls/dataflow/internal/out_port.h b/lib/pls/include/pls/dataflow/internal/out_port.h new file mode 100644 index 0000000..b499150 --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/out_port.h @@ -0,0 +1,148 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_OUTPUT_H_ +#define PLS_DATAFLOW_INTERNAL_OUTPUT_H_ + +#include + +#include "in_port.h" +#include "node.h" + +namespace pls { +namespace dataflow { +namespace internal { + +template +class out_port { + public: + void connect(in_port &target) { + if (connected_) { + PLS_ERROR("Must only connect output once. Please disconnect it before reconnecting.") + } + + target.connect(); + target_ = ⌖ + connected_ = true; + } + + void operator>>(in_port &input) { + connect(input); + } + + void push_token(token token) const { + target_->push_token(token); + } + + node *next_node() const { + return target_->owning_node(); + } + + bool is_connected() const { return connected_; } + + private: + bool connected_{false}; + in_port *target_{nullptr}; +}; + +/** + * Represents multiple output ports bundled together (a tuple of inputs). + * + * @tparam I A variadic list of input types + */ +template +class multi_out_port { + private: + // Helpers for managing recursive types + using value_tuple_type = std::tuple; + using out_port_tupel_type = std::tuple, out_port...>; + static constexpr int num_outputs = std::tuple_size(); + + public: + // Simple interface to get types by index + template + using out_port_type_at = typename std::tuple_element::type; + template + using value_type_at = typename std::tuple_element::type; + + // Simple interface to access input's by index + template + out_port_type_at &get() { + return std::get(outputs_); + } + + // Simple interface to connect multiple intputs to matching, multiple outputs + template + void operator>>(multi_in_port &input) { + connect_to < CB, 0 > {this, &input}.connect(); + } + + node *next_node_at(int pos) const { + return next_node < 0 > {&outputs_}.get(pos); + } + + bool fully_connected() const { + return connected < 0 > {&outputs_}.get(); + } + + private: + out_port_tupel_type outputs_; + + template + struct next_node { + const out_port_tupel_type *outputs_; + node *get(int i) { + if (POS == i) { + return std::get(*outputs_).next_node(); + } else { + return next_node{outputs_}.get(i); + } + } + }; + template + struct next_node { + const out_port_tupel_type *outputs_; + node *get(int i) { + PLS_ERROR("Try to access invalid successor node index!") + } + }; + + template + struct connected { + const out_port_tupel_type *outputs_; + bool get() { + bool self = std::get(*outputs_).is_connected(); + bool children = connected{outputs_}.get(); + return self && children; + } + }; + template + struct connected { + const out_port_tupel_type *outputs_; + bool get() { + return true; + } + }; + + template + struct connect_to { + multi_out_port *out_port_; + multi_in_port *in_port_; + + void 
connect() { + out_port_->template get() >> in_port_->template get(); + connect_to{out_port_, in_port_}.connect(); + } + }; + template + struct connect_to { + multi_out_port *out_port_; + multi_in_port *in_port_; + + void connect() { + }; + }; +}; +} +} +} + +#endif //PLS_DATAFLOW_INTERNAL_OUTPUT_H_ diff --git a/lib/pls/include/pls/dataflow/internal/outputs.h b/lib/pls/include/pls/dataflow/internal/outputs.h new file mode 100644 index 0000000..40237d9 --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/outputs.h @@ -0,0 +1,17 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_OUTPUTS_H_ +#define PLS_DATAFLOW_INTERNAL_OUTPUTS_H_ + +namespace pls { +namespace dataflow { +namespace internal { + +template +struct outputs { +}; + +} +} +} + +#endif //PLS_DATAFLOW_INTERNAL_OUTPUTS_H_ diff --git a/lib/pls/include/pls/dataflow/internal/split_node.h b/lib/pls/include/pls/dataflow/internal/split_node.h new file mode 100644 index 0000000..09d106d --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/split_node.h @@ -0,0 +1,73 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_ +#define PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_ + +#include + +#include "in_port.h" +#include "out_port.h" +#include "node.h" +#include "token.h" + +namespace pls { +namespace dataflow { +namespace internal { + +template +class split_node : public node { + // Our own type + using self_type = split_node; + + // Input-Output port types + using multi_in_port_type = multi_in_port; + using multi_out_port_type = multi_out_port; + + public: + explicit split_node() : in_port_{this, this}, out_port_{} {} + + in_port &value_in_port() { + return in_port_.template get<0>(); + } + + out_port &out_port_1() { + return out_port_.template get<0>(); + } + + out_port &out_port_2() { + return out_port_.template get<1>(); + } + + template + void token_pushed(token token) { + out_port_1().push_token(token); + out_port_2().push_token(token); + } + + int num_successors() const override { + return 2; + } + node *successor_at(int pos) const override { + return out_port_.next_node_at(pos); + } + + bool is_fully_connected() const override { + return in_port_.fully_connected() && out_port_.fully_connected(); + } + + int instance_buffer_size() const override { + return 0; + } + void init_instance_buffer(void *memory) const override { + // No need for memory, we simply forward + }; + + private: + multi_in_port_type in_port_; + multi_out_port_type out_port_; +}; + +} +} +} + +#endif //PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_ diff --git a/lib/pls/include/pls/dataflow/internal/switch_node.h b/lib/pls/include/pls/dataflow/internal/switch_node.h new file mode 100644 index 0000000..598275b --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/switch_node.h @@ -0,0 +1,99 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_ +#define PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_ + +#include + +#include "in_port.h" +#include "out_port.h" +#include "node.h" +#include "token.h" + +namespace pls { +namespace dataflow { +namespace internal { + +template +class switch_node : public node { + // Our own type + using self_type = switch_node; + + // Input-Output port types + using multi_in_port_type = multi_in_port; + using multi_out_port_type = multi_out_port; + + // Input-Output tuples + using input_tuple = std::tuple, token>; + + // Memory used for ONE active invocation of the dataflow-graph + struct invocation_memory { + std::atomic inputs_missing_; + input_tuple input_buffer_; + }; + + public: + explicit switch_node() : in_port_{this, this}, out_port_{} {} + + in_port &value_in_port() { + 
return in_port_.template get<0>(); + } + + in_port &condition_in_port() { + return in_port_.template get<1>(); + } + + out_port &true_out_port() { + return out_port_.template get<0>(); + } + + out_port &false_out_port() { + return out_port_.template get<1>(); + } + + template + void token_pushed(token token) { + auto current_memory = get_invocation(token.invocation()); + + std::get(current_memory->input_buffer_) = token; + auto remaining_inputs = --(current_memory->inputs_missing_); + if (remaining_inputs == 0) { + bool condition = std::get<1>(current_memory->input_buffer_).value(); + auto &data = std::get<0>(current_memory->input_buffer_); + if (condition) { + true_out_port().push_token(data); + } else { + false_out_port().push_token(data); + } + current_memory->inputs_missing_ = 2; + } + } + + int num_successors() const override { + return 2; + } + node *successor_at(int pos) const override { + return out_port_.next_node_at(pos); + } + + bool is_fully_connected() const override { + return in_port_.fully_connected() && out_port_.fully_connected(); + } + + int instance_buffer_size() const override { + return sizeof(invocation_memory); + } + void init_instance_buffer(void *memory) const override { + auto invocation = new(memory) invocation_memory{}; + invocation->inputs_missing_ = 2; + }; + + private: + multi_in_port_type in_port_; + multi_out_port_type out_port_; +}; + +} +} +} + +#endif //PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_ diff --git a/lib/pls/include/pls/dataflow/internal/token.h b/lib/pls/include/pls/dataflow/internal/token.h new file mode 100644 index 0000000..ee40720 --- /dev/null +++ b/lib/pls/include/pls/dataflow/internal/token.h @@ -0,0 +1,42 @@ + +#ifndef PLS_DATAFLOW_INTERNAL_TOKEN_H_ +#define PLS_DATAFLOW_INTERNAL_TOKEN_H_ + +namespace pls { +namespace dataflow { +namespace internal { + +class invocation_info { + public: + explicit invocation_info(void **memory) : memory_{memory} {}; + invocation_info(invocation_info const &other) = default; + + template + T *get_instance_buffer(int pos) { + return reinterpret_cast(memory_[pos]); + } + + private: + void **memory_; +}; + +template +class token { + T value_; + invocation_info invocation_; + + public: + token() : invocation_{nullptr} {}; // Default Constructor Stays uninitialized + token(T value, invocation_info color) : value_{value}, invocation_{color} {}; + + T &value() { return value_; } + invocation_info invocation() const { return invocation_; } + + void set_invocation(invocation_info invocation_info) { invocation_ = invocation_info; } +}; + +} +} +} + +#endif //PLS_DATAFLOW_INTERNAL_TOKEN_H_ diff --git a/lib/pls/include/pls/internal/base/error_handling.h b/lib/pls/include/pls/internal/base/error_handling.h index 381758a..15a2df6 100644 --- a/lib/pls/include/pls/internal/base/error_handling.h +++ b/lib/pls/include/pls/internal/base/error_handling.h @@ -2,7 +2,8 @@ #ifndef PLS_ERROR_HANDLING_H #define PLS_ERROR_HANDLING_H -#include +#include +#include /** * Called when there is an non-recoverable error/invariant in the scheduler. @@ -10,7 +11,7 @@ * The implementation can be changed if for example no iostream is available on a system * (or its inclusion adds too much overhead). 
*/ -#define PLS_ERROR(msg) std::cout << msg << std::endl; exit(1); -#define PLS_ASSERT(cond, msg) if (!cond) { PLS_ERROR(msg) } +#define PLS_ERROR(msg) printf("%s\n", msg); exit(1); +#define PLS_ASSERT(cond, msg) if (!(cond)) { PLS_ERROR(msg) } #endif //PLS_ERROR_HANDLING_H diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h index 0952968..e04567b 100644 --- a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h +++ b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h @@ -2,6 +2,8 @@ #ifndef PLS_ALIGNED_STACK_IMPL_H #define PLS_ALIGNED_STACK_IMPL_H +#include + namespace pls { namespace internal { namespace data_structures { diff --git a/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h index f6338f9..d985632 100644 --- a/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h +++ b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h @@ -25,7 +25,7 @@ class work_stealing_deque_item { // as the race occurs in 'pop_head', where ALL CASES reading a corrupt/old value are cases // where the next CAS fails anywas, thus making these corrupted values have no influence on // the overall program execution. - // ==> If we find performance problems in this queue, try removing the atoimcs again. + // ==> If we find performance problems in this queue, try removing the atomics again. // Pointer to the actual data std::atomic data_; // Index (relative to stack base) to the next and previous element diff --git a/lib/pls/include/pls/internal/data_structures/work_stealing_deque_impl.h b/lib/pls/include/pls/internal/data_structures/work_stealing_deque_impl.h index d3316be..0730674 100644 --- a/lib/pls/include/pls/internal/data_structures/work_stealing_deque_impl.h +++ b/lib/pls/include/pls/internal/data_structures/work_stealing_deque_impl.h @@ -2,6 +2,9 @@ #ifndef PLS_WORK_STEALING_DEQUE_IMPL_H_ #define PLS_WORK_STEALING_DEQUE_IMPL_H_ +#include +#include + namespace pls { namespace internal { namespace data_structures { diff --git a/lib/pls/include/pls/internal/helpers/member_function.h b/lib/pls/include/pls/internal/helpers/member_function.h new file mode 100644 index 0000000..d78e142 --- /dev/null +++ b/lib/pls/include/pls/internal/helpers/member_function.h @@ -0,0 +1,35 @@ + +#ifndef PLS_INTERNAL_HELPERS_MEMBER_FUNCTION_H_ +#define PLS_INTERNAL_HELPERS_MEMBER_FUNCTION_H_ + +namespace pls { +namespace internal { +namespace helpers { + +template +class member_function { + public: + using type = member_function; + + member_function(C *object, R (C::*function_pointer)(ARGS...)) : object_{object}, + function_pointer_{function_pointer} {} + + R operator()(ARGS... 
args) { + ((*object_).*function_pointer_)(args...); + } + + private: + C *object_; + R (C::*function_pointer_)(ARGS...); +}; + +template +static constexpr member_function bind(C *object, R (C::*function_pointer)(ARGS...)) { + return {object, function_pointer}; +} + +} +} +} + +#endif //PLS_INTERNAL_HELPERS_MEMBER_FUNCTION_H_ diff --git a/lib/pls/include/pls/internal/helpers/seqence.h b/lib/pls/include/pls/internal/helpers/seqence.h new file mode 100644 index 0000000..1a63736 --- /dev/null +++ b/lib/pls/include/pls/internal/helpers/seqence.h @@ -0,0 +1,27 @@ + +#ifndef PLS_INTERNAL_HELPERS_SEQENCE_H_ +#define PLS_INTERNAL_HELPERS_SEQENCE_H_ + +// See: https://stackoverflow.com/questions/7858817/unpacking-a-tuple-to-call-a-matching-function-pointer +// Would be easy in C++ 14 (has index sequences), but seems to be the only way to do it in C++ 11 + +namespace pls { +namespace internal { +namespace helpers { + +template +struct sequence {}; + +template +struct sequence_gen : sequence_gen {}; + +template +struct sequence_gen<0, S...> { + typedef sequence type; +}; + +} +} +} + +#endif //PLS_INTERNAL_HELPERS_SEQENCE_H_ diff --git a/lib/pls/include/pls/pls.h b/lib/pls/include/pls/pls.h index f90d55e..cbaef30 100644 --- a/lib/pls/include/pls/pls.h +++ b/lib/pls/include/pls/pls.h @@ -7,6 +7,7 @@ #include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/scheduler.h" #include "pls/internal/helpers/unique_id.h" +#include "pls/internal/helpers/member_function.h" namespace pls { @@ -16,6 +17,8 @@ using internal::scheduling::malloc_scheduler_memory; using internal::scheduling::scheduler; using unique_id = internal::helpers::unique_id; +template +using member_function = internal::helpers::member_function; using internal::scheduling::task; using internal::scheduling::lambda_task_by_reference; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index f9af66a..9023079 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -2,5 +2,6 @@ add_executable(tests main.cpp data_structures_test.cpp scheduling_tests.cpp - algorithm_test.cpp) + algorithm_test.cpp + dataflow_test.cpp) target_link_libraries(tests catch2 pls) diff --git a/test/algorithm_test.cpp b/test/algorithm_test.cpp index 2909a37..beb0cfe 100644 --- a/test/algorithm_test.cpp +++ b/test/algorithm_test.cpp @@ -1,7 +1,7 @@ #include #include -#include +#include "pls/pls.h" using namespace pls; diff --git a/test/base_tests.cpp b/test/base_tests.cpp index 61435d5..ba3789e 100644 --- a/test/base_tests.cpp +++ b/test/base_tests.cpp @@ -1,7 +1,7 @@ #include -#include -#include -#include +#include "pls/internal/base/thread.h" +#include "pls/internal/base/spin_lock.h" +#include "pls/internal/base/system_details.h" #include #include diff --git a/test/data_structures_test.cpp b/test/data_structures_test.cpp index 2327fb9..4117139 100644 --- a/test/data_structures_test.cpp +++ b/test/data_structures_test.cpp @@ -1,12 +1,11 @@ #include -#include +#include "pls/internal/base/system_details.h" -#include -#include -#include +#include "pls/internal/data_structures/aligned_stack.h" +#include "pls/internal/data_structures/locking_deque.h" +#include "pls/internal/data_structures/work_stealing_deque.h" -#include #include using namespace pls::internal::data_structures; diff --git a/test/dataflow_test.cpp b/test/dataflow_test.cpp new file mode 100644 index 0000000..114c93d --- /dev/null +++ b/test/dataflow_test.cpp @@ -0,0 +1,114 @@ +#include +#include +#include + +#include "pls/pls.h" +#include "pls/dataflow/dataflow.h" + +using namespace pls; 
+using namespace pls::dataflow; + +void step_1(const int &in, int &out) { + out = in * 2; +} + +class member_call_test { + public: + void step_2(const int &in, int &out) { + out = in * 2; + } +}; + +TEST_CASE("dataflow functions correctly", "[dataflow/dataflow.h]") { + malloc_scheduler_memory my_scheduler_memory{8, 2u << 12u}; + scheduler my_scheduler{&my_scheduler_memory, 8}; + my_scheduler.perform_work([]() { + SECTION("linear pipelines") { + auto step_1 = [](const int &in, double &out1, double &out2) { + out1 = (double) in / 2.0; + out2 = (double) in / 3.0; + }; + auto step_2 = [](const double &in1, const double &in2, double &out) { + out = in1 * in2; + }; + + graph, outputs> linear_graph; + function_node, outputs, decltype(step_1)> node_1{step_1}; + function_node, outputs, decltype(step_2)> node_2{step_2}; + + linear_graph >> node_1 >> node_2 >> linear_graph; + linear_graph.build(); + + std::tuple out{}; + linear_graph.run(5, out); + linear_graph.wait_for_all(); + + REQUIRE(std::get<0>(out) == (5 / 2.0) * (5 / 3.0)); + } + + SECTION("member and function steps") { + member_call_test instance; + using member_func_type = member_function; + member_func_type func_1{&instance, &member_call_test::step_2}; + + graph, outputs> graph; + function_node, outputs, void (*)(const int &, int &)> node_1{&step_1}; + function_node, outputs, member_func_type> node_2{func_1}; + + graph >> node_1 >> node_2 >> graph; + graph.build(); + + std::tuple out{}; + graph.run(1, out); + graph.wait_for_all(); + + REQUIRE(std::get<0>(out) == 4); + } + + SECTION("non linear pipeline") { + auto path_one = [](const int &in, int &out) { + out = in + 1; + }; + auto path_two = [](const int &in, int &out) { + out = in - 1; + }; + + graph, outputs> graph; + function_node, outputs, decltype(path_one)> node_1{path_one}; + function_node, outputs, decltype(path_two)> node_2{path_two}; + switch_node switch_node; + merge_node merge_node; + split_node split; + + // Split up boolean signal + graph.input<1>() >> split.value_in_port(); + + // Feed switch + graph.input<0>() >> switch_node.value_in_port(); + split.out_port_1() >> switch_node.condition_in_port(); + + // True path + switch_node.true_out_port() >> node_1.in_port<0>(); + node_1.out_port<0>() >> merge_node.true_in_port(); + // False path + switch_node.false_out_port() >> node_2.in_port<0>(); + node_2.out_port<0>() >> merge_node.false_in_port(); + + // Read Merge + split.out_port_2() >> merge_node.condition_in_port(); + merge_node.value_out_port() >> graph.output<0>(); + + + // Build and run + graph.build(); + std::tuple out1{}, out2{}; + graph.run({0, true}, out1); + graph.run({0, false}, out2); + graph.wait_for_all(); + + REQUIRE(std::get<0>(out1) == 1); + REQUIRE(std::get<0>(out2) == -1); + } + + }); +} diff --git a/test/scheduling_tests.cpp b/test/scheduling_tests.cpp index 1334d6e..2cce39a 100644 --- a/test/scheduling_tests.cpp +++ b/test/scheduling_tests.cpp @@ -1,6 +1,6 @@ #include -#include +#include "pls/pls.h" using namespace pls;