Commit aba75f54 by Florian Fritz

Merge branch 'dataflow' into 'master'

Merge: Dataflow

See merge request !12
parents 938be84f 7c637562
Pipeline #1291 passed with stages
in 4 minutes 3 seconds
...@@ -4,6 +4,186 @@ A collection of stuff that we noticed during development.
Useful later on to write a project report and to go back
in time to find out why certain decisions were made.
## 19.07.2019 - Colored tokens, recursion and where to put memory
While implementing dataflow graphs we encountered some obstacles.
The most severe one, which impacts the internal design of the API,
end-user features and performance,
is how to handle token flows, especially colored tokens for parallel
and/or recursive calls.
The basic issue here is that each execution instance requires its
own isolated environment to execute in. Therefore, each invocation
instance (parallel or recursive) needs a unique identifier.
This identifier is usually realized using colored tokens,
as, for example, in the classic dataflow language implementation in \[1].
The problem with this is that in our environment we are constrained
to static memory allocation and lock-free programming.
To handle static memory allocation we first decided to follow the model
introduced by EMBB, associating a clock with each colored token,
which maps it to a slot in an array of possible parallel execution
instances. This works fine in EMBB, as they put two major limitations
on their execution model: no cycles and no recursion.
(Issues arise with correctly coloring tokens in a lock-free manner
without too much contention, leading us to believe that this
method scales poorly, especially with smaller workloads.)
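A minimal sketch of this clock-to-slot idea as we understand it (the names and the modulo mapping below are our own illustration, not EMBB's actual API):
```c++
#include <array>

// Illustration only: a token's clock selects a slot in a statically sized
// buffer array, bounding the number of concurrent invocations to NUM_PARALLEL.
constexpr int NUM_PARALLEL = 8;

template<typename T>
struct colored_token {
  int clock; // strictly increasing invocation counter, acts as the 'color'
  T value;
};

template<typename T>
struct node_buffer {
  // One slot per possible parallel execution instance.
  std::array<T, NUM_PARALLEL> slots;

  // Two invocations may only share a slot once the earlier one has
  // completely left the graph, which is what makes the coloring logic
  // tricky to do lock-free.
  T &slot_for(const colored_token<T> &token) {
    return slots[token.clock % NUM_PARALLEL];
  }
};
```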
While we could simply implement that, we wondered if there is a way to
better fit dataflow into our existing, task-stack based programming
approach.
After some investigation we arrived at the following core statements:
- The dataflow graph itself describes only program execution;
it should therefore not be tightly coupled with data/buffer management
- Each invocation of a dataflow graph is logically a task in our model;
it therefore makes sense to map the memory and coordination resources
required for one invocation instance directly to this task
- What we do is in fact not 'pure' dataflow programming, as we do not
try to re-create a full programming language (e.g. we do not care about
memory management and loops as described in \[1])
- What we do is closer to functional programming with single
assignment rules and recursion (see Elixir for a nice syntactic example)
Our plan is therefore the following:
Separate the structure of the dataflow graph from its execution and map
one execution instance to one active task in our runtime system.
This, conveniently, also mitigates most issues related to
memory allocation/parallelism in the graph and makes for a
nicer end-user API (no more buffer types/parallelism in templates).
\[1] J. B. Dennis, “First version of a data flow procedure language,” in Programming Symposium, vol. 19, B. Robinet, Ed. Berlin, Heidelberg: Springer Berlin Heidelberg, 1974, pp. 362–376.
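A rough sketch of the separation we have in mind (placeholder names, not the final API): the graph object describes only structure, while all per-invocation buffers live in memory owned by the task that runs that single invocation.
```c++
#include <cstddef>

// The graph only describes structure; it holds no execution state and
// merely reports how much buffer memory one invocation needs.
struct graph_structure {
  // nodes, edges, ...
  std::size_t invocation_buffer_bytes() const; // sum over all nodes
};

// One invocation == one task: the buffers for exactly one pass of values
// through the graph live in memory owned by the executing task (in our
// runtime, on the task stack), so no shared, color-indexed buffer arrays
// and no token-coloring logic are needed.
template<std::size_t BUFFER_BYTES>
struct graph_invocation_task {
  const graph_structure *graph_;
  unsigned char node_buffers_[BUFFER_BYTES]; // per-invocation buffers
};
```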
## 19.07.2019 - Variadic Templates for input/output groups
We searched for the correct way to represent a node's
or graph's input/output pairs for a while and found the
solution in partial specialization of templates.
```C++
template<typename INS, typename OUTS>
class graph {};
template<typename I0, typename ...I, typename O0, typename ...O>
class graph<inputs<I0, I...>, outputs<O0, O...>> { ... };
```
The above code allows us to enforce an end-user API that is
clean while still giving us full access to the input/output variadic
template types. The end-user API is the following:
```C++
graph<inputs<int, int>, outputs<std::string>> g;
```
## 03.07.2019 - Outline/Plan for our Dataflow API
The following describes our ideas for what we expect/want to build
in our dataflow API. The restrictions and decisions try to hold a
balance for usability, feasibility of the implementation and
feature richness (what kind of flows can even be represented using
the API).
We will model our dataflow closely on the EMBB implementation.
There are buffers that are controlled by a color/clock, and
a DF graph has sources (inputs) and sinks (outputs).
Some notable properties we want (and that can differ from EMBB):
- A DF graph must be well behaved, i.e. for each full set of input
values exactly one set of output values is produced (each parallel
invocation therefore corresponds to exactly one value in the in-
and outputs)
- A DF graph must be declared with its full interface, i.e. its
complete set of input types and output types must be known
when declaring it
```c++
dataflow<Inputs<String, Int>, Outputs<Int>, NUM_PARAL> df_instance;
```
- Nodes inside the DF graph are produced by the graph itself, allowing
it to propagate parallelism constraints (for example directly creating
the children with correct buffer capacities). This is done by having
factory functions on a concrete DF graph instance.
- A DF graph's sources/sinks are 'packed', i.e. the user must
provide a full set of inputs to trigger the DF graph and is provided
a full set of outputs when reading a result from the DF graph
(this highlights our intent for well behaved flow graphs, as users
do not even get the notion that it is ok to only return partial results)
```c++
auto source = df.source([](String &out1, Int &out2) {
if (elements_available) {
out1 = ...;
out2 = ...;
return true;
} else {
return false;
}
});
...
auto sink = df.sink([](const Image &image, const Int &value){
...;
});
```
- Easy APIs for working with array data are provided in the form of
iterator sources/sinks
```c++
auto source = df.iterator_source();
auto sink = df.iterator_sink();
...
source.reset(input.begin(), input.end());
sink.reset(output.begin());
df.wait_for_all();
```
- In the first version nodes are always fully parallel;
further versions might include the per-node property of
unordered_serial (at most one instance of the node executes at a time,
but in no particular order, e.g. for accessing shared memory) or
ordered_serial (e.g. for logging something in correct order to a file/console).
- Sinks/user-accessed outputs of a DF graph are always ordered_serial,
preventing confusion for the end user and ensuring deterministic
execution
- Memory management for all buffers and the DF graph itself is made
explicitly visible to the end user by requiring them to hold all
components of the graph in memory when using it. This keeps with
our philosophy of not having hidden memory allocations, making
development for e.g. embedded platforms simpler, as it is clear
where and what resources are used (one can simply sizeof(...) all
parts of the graph to find out how much memory the buffers and so on
require; see the sketch after this list)
- This model in principle allows recursive invocation. We will not
implement this at first, but keep the option open for later.
This will potentially allow different patterns, like stencil operations,
to be implemented with the system.
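As a concrete illustration of the 'no hidden allocations' point above, the memory footprint of a flow is simply the sum of its components' sizes. The snippet uses the graph/function_node types added in this merge request; the doubler node is just an example:
```c++
auto doubler = [](const int &in, int &out) { out = in * 2; };

graph<inputs<int>, outputs<int>> g;
function_node<inputs<int>, outputs<int>, decltype(doubler)> doubler_node{doubler};

// No hidden heap allocations: the full static footprint of this flow is
// visible to the user and can be computed up front.
auto footprint = sizeof(g) + sizeof(doubler_node);
```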
## 03.07.2019 - Dataflow in EMBB
EMBB's dataflow is a very simple but effective implementation
of k-colored dataflow (a maximum of k concurrent invocations, with
data/tokens marked by an individual color per parallel invocation).
They force an acyclic, recursion-free flow and require source
nodes and sink nodes to be set explicitly (acting as in- and outputs).
This allows them to send signals down arcs even if there is
no value, e.g. if there is a split in control flow the 'not used'
side of the flow will be fed an 'empty' token, signaling sinks
that the execution of this parallel instance has reached the sink.
Once one color of tokens (i.e. one parallel execution instance)
reaches ALL sinks, the model allows a new one to be input.
Requiring all tokens to reach the sinks, in order, before new ones
can enter potentially limits concurrency,
but at the same time makes for a very simple model to implement.
Computational nodes between sources and sinks are associated
with input buffers (having the same capacity as the number of
parallel invocations allowed). These can hold values from
predecessors until all inputs for the node are ready. The node
is started as a process as soon as the last needed input is provided
(this event is triggered by a reference counter of missing inputs).
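A minimal sketch of that reference-counter trigger as we understand it (our own illustration, not EMBB's actual code):
```c++
#include <atomic>

// Each node invocation keeps an atomic count of inputs that have not
// arrived yet; whoever delivers the last missing input starts the node.
struct node_invocation {
  std::atomic<int> inputs_missing;

  explicit node_invocation(int num_inputs) : inputs_missing{num_inputs} {}

  // Called by a predecessor after it stored its value in the input buffer.
  template<typename Body>
  void input_arrived(Body run_node) {
    if (--inputs_missing == 0) {
      run_node(); // last input arrived: the node becomes a runnable process
    }
  }
};
```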
## 26.06.2019 - Notes on Dataflow Implementation
### Dataflow in general
...
add_executable(benchmark_unbalanced main.cpp node.h function_node.cpp picosha2.h)
target_link_libraries(benchmark_unbalanced pls)
if (EASY_PROFILER)
target_link_libraries(benchmark_unbalanced easy_profiler)
...
// Headers are available because we added the pls target
#include <string>
#include <cstdio>
#include <tuple>
#include <array>
#include "pls/pls.h"
#include "pls/dataflow/internal/inputs.h"
#include "pls/dataflow/internal/outputs.h"
#include "pls/dataflow/internal/function_node.h"
#include "pls/dataflow/internal/graph.h"
#include "pls/dataflow/internal/switch_node.h"
#include "pls/dataflow/internal/split_node.h"
#include "pls/dataflow/internal/merge_node.h"
int main() {
using namespace pls::dataflow;
using namespace pls::dataflow::internal;
// Define
graph<inputs<int, int>, outputs<int>> graph;
auto triple = [](const int &i1, int &o1) {
o1 = i1 * 3;
};
function_node<inputs<int>, outputs<int>, decltype(triple)> triple_node{triple};
auto minus_one = [](const int &i1, int &o1) {
o1 = i1 - 1;
};
function_node<inputs<int>, outputs<int>, decltype(minus_one)> minus_one_node_1{minus_one};
function_node<inputs<int>, outputs<int>, decltype(minus_one)> minus_one_node_2{minus_one};
auto is_positive = [](const int &i1, bool &o1) {
o1 = i1 > 0;
};
function_node<inputs<int>, outputs<bool>, decltype(is_positive)> is_positive_node{is_positive};
auto recursion = [&](const int &i1, const int &i2, int &o1) {
std::tuple<int> out;
graph.run({i1, i2}, out);
pls::scheduler::wait_for_all();
o1 = std::get<0>(out);
};
function_node<inputs<int, int>, outputs<int>, decltype(recursion)> recursion_node{recursion};
split_node<int> minus_split;
split_node<bool> decision_split;
switch_node<int> recursion_split;
merge_node<int> recursion_merge;
// Connect
minus_one_node_1 >> minus_one_node_2;
// Inputs to first processing step
graph.input<0>() >> minus_one_node_1.in_port<0>();
graph.input<1>() >> triple_node.in_port<0>();
minus_one_node_2.out_port<0>() >> minus_split.value_in_port();
// Prepare decision...
minus_split.out_port_1() >> is_positive_node.in_port<0>();
is_positive_node.out_port<0>() >> decision_split.value_in_port();
triple_node.out_port<0>() >> recursion_split.value_in_port();
decision_split.out_port_1() >> recursion_split.condition_in_port();
// Connect true case (recursion)
minus_split.out_port_2() >> recursion_node.in_port<0>();
recursion_split.true_out_port() >> recursion_node.in_port<1>();
recursion_node.out_port<0>() >> recursion_merge.true_in_port();
// Connect false case (no recursion)
recursion_split.false_out_port() >> recursion_merge.false_in_port();
// Deliver final result (back from merge)
decision_split.out_port_2() >> recursion_merge.condition_in_port();
recursion_merge.value_out_port() >> graph.output<0>();
// Build
graph.build();
pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18u};
pls::scheduler scheduler{&my_scheduler_memory, 8};
scheduler.perform_work([&] {
// Schedule Execution
std::tuple<int> out1, out2, out3;
graph.run({1, 2}, out1);
graph.run({1, 1}, out2);
graph.run({5, 6}, out3);
// Wait for results and print
pls::scheduler::wait_for_all();
std::cout << std::get<0>(out1) << std::endl;
std::cout << std::get<0>(out2) << std::endl;
std::cout << std::get<0>(out3) << std::endl;
});
}
...@@ -9,6 +9,21 @@ add_library(pls STATIC
include/pls/algorithms/scan.h
include/pls/algorithms/scan_impl.h
include/pls/dataflow/dataflow.h
include/pls/dataflow/internal/inputs.h
include/pls/dataflow/internal/outputs.h
include/pls/dataflow/internal/token.h
include/pls/dataflow/internal/in_port.h
include/pls/dataflow/internal/out_port.h
include/pls/dataflow/internal/function_node.h
include/pls/dataflow/internal/node.h
include/pls/dataflow/internal/graph.h
include/pls/dataflow/internal/build_state.h
include/pls/dataflow/internal/function_node_impl.h
include/pls/dataflow/internal/graph_impl.h
include/pls/dataflow/internal/switch_node.h
include/pls/dataflow/internal/merge_node.h
include/pls/internal/base/spin_lock.h
include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp
include/pls/internal/base/ttas_spin_lock.h src/internal/base/ttas_spin_lock.cpp
...@@ -22,6 +37,7 @@ add_library(pls STATIC
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h
include/pls/internal/data_structures/deque.h
include/pls/internal/data_structures/locking_deque.h
include/pls/internal/data_structures/locking_deque_impl.h
include/pls/internal/data_structures/work_stealing_deque.h include/pls/internal/data_structures/work_stealing_deque_impl.h
...@@ -32,13 +48,15 @@ add_library(pls STATIC
include/pls/internal/helpers/mini_benchmark.h
include/pls/internal/helpers/unique_id.h
include/pls/internal/helpers/range.h
include/pls/internal/helpers/seqence.h
include/pls/internal/scheduling/thread_state.h
include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
include/pls/internal/scheduling/scheduler_impl.h
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
include/pls/internal/scheduling/lambda_task.h
include/pls/dataflow/internal/split_node.h
include/pls/internal/helpers/member_function.h)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
PUBLIC
...
#ifndef PLS_DATAFLOW_DATAFLOW_H_
#define PLS_DATAFLOW_DATAFLOW_H_
#include "internal/graph.h"
#include "internal/function_node.h"
#include "internal/merge_node.h"
#include "internal/switch_node.h"
#include "internal/split_node.h"
#include "internal/inputs.h"
#include "internal/outputs.h"
namespace pls {
namespace dataflow {
template<typename INS, typename OUTS>
using graph = internal::graph<INS, OUTS>;
template<typename INS, typename OUTS, typename F>
using function_node = internal::function_node<INS, OUTS, F>;
template<typename I>
using merge_node = internal::merge_node<I>;
template<typename I>
using switch_node = internal::switch_node<I>;
template<typename I>
using split_node = internal::split_node<I>;
template<typename ...I>
using inputs = internal::inputs<I...>;
template<typename ...O>
using outputs = internal::outputs<O...>;
}
}
#endif //PLS_DATAFLOW_DATAFLOW_H_
#ifndef PLS_DATAFLOW_INTERNAL_BUILD_STATE_H_
#define PLS_DATAFLOW_INTERNAL_BUILD_STATE_H_
namespace pls {
namespace dataflow {
namespace internal {
enum class build_state { fresh, building, built, teardown };
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_BUILD_STATE_H_
#ifndef PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_H_
#define PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_H_
#include <tuple>
#include <iostream>
#include <atomic>
#include "in_port.h"
#include "out_port.h"
#include "node.h"
#include "inputs.h"
#include "outputs.h"
#include "pls/internal/helpers/seqence.h"
#include "pls/pls.h"
namespace pls {
namespace dataflow {
namespace internal {
using namespace pls::internal::helpers;
// Forward Decl
template<typename INS, typename OUTS>
class graph;
template<typename INS, typename OUTS, typename F>
class function_node {};
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
class function_node<inputs<I0, I...>, outputs<O0, O...>, F> : public node {
private:
// Our own type
using self_type = function_node<inputs<I0, I...>, outputs<O0, O...>, F>;
// Input-Output port types
using multi_in_port_type = multi_in_port<self_type, 0, I0, I...>;
using multi_out_port_type = multi_out_port<O0, O...>;
// Input-Output value tuples
using input_tuple = std::tuple<token<I0>, token<I>...>;
using output_tuple = std::tuple<token<O0>, token<O>...>;
static constexpr int num_in_ports = std::tuple_size<input_tuple>();
static constexpr int num_out_ports = std::tuple_size<output_tuple>();
// Memory used for ONE active invocation of the dataflow-graph
struct invocation_memory {
std::atomic<int> inputs_missing_;
input_tuple input_buffer_;
};
public:
explicit function_node(F function) : in_port_{this, this}, out_port_{}, function_{function} {}
template<int POS>
using in_port_at = typename multi_in_port_type::template in_port_type_at<POS>;
template<int POS>
using out_port_at = typename multi_out_port_type::template out_port_type_at<POS>;
template<int POS>
in_port_at<POS> &in_port() {
return in_port_.template get<POS>();
}
multi_in_port_type &in_ports() {
return in_port_;
}
template<int POS>
out_port_at<POS> &out_port() {
return out_port_.template get<POS>();
}
multi_out_port_type &out_ports() {
return out_port_;
}
template<typename ...OS, typename FUNC>
function_node<inputs<O0, O...>, outputs<OS...>, FUNC>
&operator>>(function_node<inputs<O0, O...>, outputs<OS...>, FUNC> &other_node);
template<typename ...IS>
void operator>>(graph<inputs<IS...>, outputs<O0, O...>> &graph);
template<int POS, typename T>
void token_pushed(token<T> token);
int num_successors() const override {
return num_out_ports;
}
node *successor_at(int pos) const override {
return out_port_.next_node_at(pos);
}
bool is_fully_connected() const override {
return in_port_.fully_connected() && out_port_.fully_connected();
}
int instance_buffer_size() const override {
return sizeof(invocation_memory);
}
void init_instance_buffer(void *memory) const override {
auto invocation = new(memory) invocation_memory{};
invocation->inputs_missing_ = num_in_ports;
};
private:
multi_in_port_type in_port_;
multi_out_port_type out_port_;
F function_;
//////////////////////////////////////////////////////////////////
// Helpers for actually calling the work lambda
//////////////////////////////////////////////////////////////////
void execute_function(invocation_memory *invocation_memory, invocation_info invocation_info);
template<int N, typename ...OT>
struct propagate_output;
template<int ...IS, int ...OS>
void execute_function_internal(input_tuple &inputs, sequence<IS...>,
output_tuple &outputs, sequence<OS...>,
invocation_info invocation_info);
};
}
}
}
#include "function_node_impl.h"
#endif //PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_H_
#ifndef PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_IMPL_H_
#define PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_IMPL_H_
#include "graph.h"
namespace pls {
namespace dataflow {
namespace internal {
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
template<typename ...OS, typename FUNC>
function_node<inputs<O0, O...>, outputs<OS...>, FUNC>&
function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
operator>>(function_node<inputs<O0, O...>, outputs<OS...>, FUNC> &other_node) {
out_port_ >> other_node.in_ports();
return other_node;
}
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
template<typename ...IS>
void function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
operator>>(graph<inputs<IS...>, outputs<O0, O...>> &graph) {
out_port_ >> graph.output_ports();
}
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
template<int POS, typename T>
void function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
token_pushed(token<T> token) {
auto current_memory = get_invocation<invocation_memory>(token.invocation());
std::get<POS>(current_memory->input_buffer_) = token;
auto remaining_inputs = --(current_memory->inputs_missing_);
if (remaining_inputs == 0) {
execute_function(current_memory, token.invocation());
current_memory->inputs_missing_ = num_in_ports;
}
}
//////////////////////////////////////////////////////////////////
// Helpers for actually calling the work lambda
//////////////////////////////////////////////////////////////////
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
void function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
execute_function(invocation_memory *invocation_memory, invocation_info invocation_info) {
auto lambda = [&]() {
input_tuple &inputs = invocation_memory->input_buffer_;
output_tuple outputs{};
execute_function_internal(inputs, typename sequence_gen<1 + sizeof...(I)>::type(),
outputs, typename sequence_gen<1 + sizeof...(O)>::type(), invocation_info);
};
// TODO: maybe replace this with 'continuation' style invocation
pls::scheduler::spawn_child_and_wait<pls::lambda_task_by_reference<decltype(lambda)>>(lambda);
}
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
template<int N, typename ...OT>
struct function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
propagate_output {
propagate_output(multi_out_port_type &, output_tuple &, invocation_info&) {}
void propagate() {}
};
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
template<int N, typename OT1, typename ...OT>
struct function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
propagate_output<N, OT1, OT...> {
multi_out_port_type &out_port_;
output_tuple &output_tuple_;
invocation_info &invocation_info_;
propagate_output(multi_out_port_type &out_port, output_tuple &output_tuple, invocation_info& invocation_info) :
out_port_{out_port}, output_tuple_{output_tuple}, invocation_info_{invocation_info} {}
void propagate() {
std::get<N>(output_tuple_).set_invocation(invocation_info_);
out_port_.template get<N>().push_token(std::get<N>(output_tuple_));
propagate_output<N + 1, OT...>{out_port_, output_tuple_, invocation_info_}.propagate();
}
};
template<typename I0, typename ...I, typename O0, typename ...O, typename F>
template<int ...IS, int ...OS>
void function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
execute_function_internal(input_tuple &inputs, sequence<IS...>,
output_tuple &outputs, sequence<OS...>,
invocation_info invocation_info) {
function_(std::get<IS>(inputs).value()..., std::get<OS>(outputs).value()...);
propagate_output<0, O0, O...>{out_port_, outputs, invocation_info}.propagate();
}
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_IMPL_H_
#ifndef PLS_DATAFLOW_INTERNAL_GRAPH_H_
#define PLS_DATAFLOW_INTERNAL_GRAPH_H_
#include <atomic>
#include <tuple>
#include <iostream>
#include "build_state.h"
#include "node.h"
#include "function_node.h"
#include "in_port.h"
#include "out_port.h"
#include "inputs.h"
#include "outputs.h"
#include "pls/pls.h"
namespace pls {
namespace dataflow {
namespace internal {
template<typename INS, typename OUTS>
class graph {};
template<typename I0, typename ...I, typename O0, typename ...O>
class graph<inputs<I0, I...>, outputs<O0, O...>> : public node {
template<typename INS, typename OUTS, typename FUNC>
friend
class function_node;
private:
// Our own type
using self_type = graph<inputs<I0, I...>, outputs<O0, O...>>;
// Input-Output port types (internal)
using inputs_type = multi_out_port<I0, I...>;
using outputs_type = multi_in_port<self_type, 0, O0, O...>;
// Input-Output value tuples
using value_input_tuple = std::tuple<I0, I...>;
using input_tuple = std::tuple<token<I0>, token<I>...>;
using value_output_tuple = std::tuple<O0, O...>;
using output_tuple = std::tuple<token<O0>, token<O>...>;
static constexpr int num_in_ports = std::tuple_size<input_tuple>();
static constexpr int num_out_ports = std::tuple_size<output_tuple>();
// Memory used for ONE active invocation of the dataflow-graph
struct invocation_memory {
value_output_tuple *output_buffer_;
};
public:
template<int POS>
using input_at = typename inputs_type::template out_port_type_at<POS>;
template<int POS>
using output_at = typename outputs_type::template in_port_type_at<POS>;
template<int POS>
input_at<POS> &input() {
return inputs_.template get<POS>();
}
inputs_type &input_ports() {
return inputs_;
}
template<int POS>
output_at<POS> &output() {
return outputs_.template get<POS>();
}
outputs_type &output_ports() {
return outputs_;
}
template<typename ...OS, typename FUNC>
function_node<inputs<I0, I...>, outputs<OS...>, FUNC>
&operator>>(function_node<inputs<I0, I...>, outputs<OS...>, FUNC> &other_node);
void wait_for_all() {
pls::scheduler::wait_for_all();
}
template<int POS, typename T>
void token_pushed(token<T> token);
graph() : inputs_{}, outputs_{this, this} {}
void build();
void run(value_input_tuple input, value_output_tuple &output) {
PLS_ASSERT(build_state_ == build_state::built, "Must build graph before running it!")
pls::scheduler::spawn_child<run_graph_task>(this, input, &output);
}
int num_successors() const override {
return 0;
}
node *successor_at(int pos) const override {
PLS_ERROR("A graph instance has no direct successor!")
}
bool is_fully_connected() const override {
return inputs_.fully_connected() && outputs_.fully_connected();
}
int instance_buffer_size() const override {
return sizeof(invocation_memory);
}
void init_instance_buffer(void *memory) const override {
auto invocation = new(memory) invocation_memory{};
};
private:
inputs_type inputs_;
outputs_type outputs_;
// Information about building the graph
node *node_list_start_{nullptr};
node *node_list_current_{nullptr};
int num_nodes_{0};
// Internals required for building and running
void build_recursive(node *node);
void add_node(node *new_node);
template<int N, typename ...IT>
struct feed_inputs;
class run_graph_task;
};
}
}
}
#include "graph_impl.h"
#endif //PLS_DATAFLOW_INTERNAL_GRAPH_H_
#ifndef PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_
#define PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_
namespace pls {
namespace dataflow {
namespace internal {
template<typename I0, typename ...I, typename O0, typename ...O>
template<typename ...OS, typename FUNC>
function_node<inputs<I0, I...>, outputs<OS...>, FUNC> &graph<inputs<I0, I...>, outputs<O0, O...>>::
operator>>(function_node<inputs<I0, I...>, outputs<OS...>, FUNC> &other_node) {
inputs_ >> other_node.in_ports();
return other_node;
}
template<typename I0, typename ...I, typename O0, typename ...O>
template<int POS, typename T>
void graph<inputs<I0, I...>, outputs<O0, O...>>::
token_pushed(token<T> token) {
auto invocation = get_invocation<invocation_memory>(token.invocation());
std::get<POS>(*invocation->output_buffer_) = token.value();
}
template<typename I0, typename ...I, typename O0, typename ...O>
void graph<inputs<I0, I...>, outputs<O0, O...>>::
build() {
PLS_ASSERT(build_state_ == build_state::fresh, "Must only build a dataflow graph once!")
PLS_ASSERT(is_fully_connected(), "Must fully connect all inputs/outputs inside a dataflow graph!")
build_state_ = build_state::building;
node_list_start_ = node_list_current_ = this;
memory_index_ = 0;
num_nodes_ = 1;
for (int i = 0; i < num_in_ports; i++) {
build_recursive(inputs_.next_node_at(i));
}
build_state_ = build_state::built;
}
template<typename I0, typename ...I, typename O0, typename ...O>
void graph<inputs<I0, I...>, outputs<O0, O...>>::
build_recursive(node *node) {
if (node->build_state_ != build_state::fresh) {
return; // Already visited
}
node->build_state_ = build_state::building;
PLS_ASSERT(node->is_fully_connected(), "Must fully connect dataflow graph nodes!")
add_node(node);
for (int i = 0; i < node->num_successors(); i++) {
build_recursive(node->successor_at(i));
}
node->build_state_ = build_state::built;
}
template<typename I0, typename ...I, typename O0, typename ...O>
void graph<inputs<I0, I...>, outputs<O0, O...>>::
add_node(node *new_node) {
new_node->memory_index_ = num_nodes_++;
if (node_list_current_ == nullptr) {
node_list_current_ = new_node;
node_list_start_ = new_node;
} else {
node_list_current_->direct_successor_ = new_node;
node_list_current_ = new_node;
}
}
template<typename I0, typename ...I, typename O0, typename ...O>
template<int N, typename ...IT>
struct graph<inputs<I0, I...>, outputs<O0, O...>>::
feed_inputs {
feed_inputs(inputs_type &, value_input_tuple &, invocation_info &) {}
void run() {}
};
template<typename I0, typename ...I, typename O0, typename ...O>
template<int N, typename IT1, typename ...IT>
struct graph<inputs<I0, I...>, outputs<O0, O...>>::
feed_inputs<N, IT1, IT...> {
inputs_type &inputs_;
value_input_tuple &input_values_;
invocation_info &invocation_;
feed_inputs(inputs_type &inputs,
value_input_tuple &input_values,
invocation_info &invocation) : inputs_{inputs},
input_values_{input_values},
invocation_{invocation} {}
void run() {
inputs_.template get<N>().push_token(token<IT1>{std::get<N>(input_values_), invocation_});
feed_inputs<N + 1, IT...>{inputs_, input_values_, invocation_}.run();
}
};
template<typename I0, typename ...I, typename O0, typename ...O>
class graph<inputs<I0, I...>, outputs<O0, O...>>::run_graph_task : public pls::task {
graph *self_;
value_input_tuple input_;
value_output_tuple *output_;
// Buffers for actual execution
invocation_info invocation_;
public:
run_graph_task(graph *self, value_input_tuple &input, value_output_tuple *output) : self_{self},
input_{input},
output_{output},
invocation_{nullptr} {
void **buffers;
buffers = reinterpret_cast<void **>(allocate_memory(self_->num_nodes_ * sizeof(void *)));
node *iterator = self_->node_list_start_;
for (int i = 0; i < self_->num_nodes_; i++) {
auto required_size = iterator->instance_buffer_size();
buffers[i] = allocate_memory(required_size);
iterator->init_instance_buffer(buffers[i]);
iterator = iterator->direct_successor_;
}
invocation_ = invocation_info{buffers};
self_->get_invocation<invocation_memory>(invocation_)->output_buffer_ = output;
}
void execute_internal() override {
feed_inputs<0, I0, I...>{self_->inputs_, input_, invocation_}.run();
}
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_
#ifndef PLS_DATAFLOW_INTERNAL_INPUT_H_
#define PLS_DATAFLOW_INTERNAL_INPUT_H_
#include "pls/internal/base/error_handling.h"
#include "token.h"
#include "node.h"
namespace pls {
namespace dataflow {
namespace internal {
/**
* Represents a single, logical input port (no data store, simply signal propagation).
* @tparam T Type of the input port
*/
template<typename T>
class in_port {
template<typename OT>
friend
class out_port;
public:
explicit in_port(node *owning_node) : owning_node_{owning_node} {};
node *owning_node() const { return owning_node_; }
bool is_connected() const { return connected_; }
protected:
virtual void token_pushed(token<T> token) = 0;
private:
bool connected_{false};
node *const owning_node_;
void push_token(token<T> token) {
token_pushed(token);
}
void connect() {
if (connected_) {
PLS_ERROR("Must only connect on input once. Disconnect the output pointing to it before reconnecting.")
}
connected_ = true;
}
};
/**
* Represents multiple input ports bundled together (a tuple of inputs).
* Allows for a unified callback method to handle multiple typed inputs.
*
* template<int POS, typename T>
* void token_pushed(token<T> token) { Notified when tokens arrive }
*
* @tparam CB The class implementing a callback
* @tparam N Put 0 to start the recursive implementation
* @tparam I A variadic list of input types
*/
template<typename CB, int N, typename ...I>
class multi_in_port {
// end of template recursion
public:
explicit multi_in_port(node *, CB *) {};
bool fully_connected() const {
return true;
}
};
template<typename CB, int N, typename I0, typename ...I>
class multi_in_port<CB, N, I0, I...> : public in_port<I0> {
public:
// Helpers for managing recursive types
using my_type = multi_in_port<CB, N, I0, I...>;
using child_type = multi_in_port<CB, N + 1, I...>;
using value_type = I0;
explicit multi_in_port(node *owning_node, CB *cb) : in_port<I0>{owning_node}, cb_{cb}, rec_{owning_node, cb} {};
void token_pushed(token<I0> token) override {
cb_->template token_pushed<N, I0>(token);
}
// Helper struct required for recursive access to types by index
template<int POS, typename ...T>
struct type_at {
};
template<typename T0, typename ...T>
struct type_at<0, T0, T...> {
using type = T0;
using in_port_type = in_port<type>;
};
template<int POS, typename T0, typename ...T>
struct type_at<POS, T0, T...> {
using type = typename type_at<POS - 1, T...>::type;
using in_port_type = in_port<type>;
};
// Simple interface to get types by index
template<int POS>
using in_port_type_at = typename type_at<POS, I0, I...>::in_port_type;
template<int POS>
using value_type_at = typename type_at<POS, I0, I...>::type;
// Helper struct required for recursive access to inputs by index
template<int POS, typename RES_T, typename MY_T>
struct get_at {
static RES_T &get(MY_T &self) {
return get_at<POS - 1, RES_T, typename MY_T::child_type>::get(self.rec_);
}
};
template<typename RESULT, typename CURRENT>
struct get_at<0, RESULT, CURRENT> {
static RESULT &get(CURRENT &self) {
return self;
}
};
// Simple interface to access inputs by index
template<int POS>
in_port_type_at<POS> &get() {
return get_at<POS, in_port_type_at<POS>, my_type>::get(*this);
}
bool fully_connected() const {
return this->is_connected() && rec_.fully_connected();
}
child_type rec_;
CB *cb_;
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_INPUT_H_
#ifndef PLS_DATAFLOW_INTERNAL_INPUTS_H_
#define PLS_DATAFLOW_INTERNAL_INPUTS_H_
namespace pls {
namespace dataflow {
namespace internal {
template<typename I1, typename ...I>
struct inputs {
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_INPUTS_H_
#ifndef PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_
#define PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_
#include <atomic>
#include "in_port.h"
#include "out_port.h"
#include "node.h"
#include "token.h"
namespace pls {
namespace dataflow {
namespace internal {
template<typename I>
class merge_node : public node {
// Our own type
using self_type = merge_node<I>;
// Input-Output port types
using multi_in_port_type = multi_in_port<self_type, 0, I, I, bool>;
using multi_out_port_type = multi_out_port<I>;
// Input-Output tuples
using input_tuple = std::tuple<token<I>, token<I>, token<bool>>;
// Memory used for ONE active invocation of the dataflow-graph
struct invocation_memory {
std::atomic<unsigned int> inputs_missing_;
input_tuple input_buffer_;
};
// Encodings for current input state (needs to fit into an atomic)
static constexpr unsigned int IN_0_MISSING = 1u << 0u;
static constexpr unsigned int IN_1_MISSING = 1u << 1u;
static constexpr unsigned int COND_MISSING = 1u << 2u;
static constexpr unsigned int INITIAL_STATE = IN_0_MISSING + IN_1_MISSING + COND_MISSING;
public:
explicit merge_node() : in_port_{this, this}, out_port_{} {}
in_port<I> &true_in_port() {
return in_port_.template get<0>();
}
in_port<I> &false_in_port() {
return in_port_.template get<1>();
}
in_port<bool> &condition_in_port() {
return in_port_.template get<2>();
}
out_port<I> &value_out_port() {
return out_port_.template get<0>();
}
template<int POS, typename T>
void token_pushed(token<T> token) {
auto current_memory = get_invocation<invocation_memory>(token.invocation());
std::get<POS>(current_memory->input_buffer_) = token;
unsigned int remaining_inputs;
if (POS == 0) {
remaining_inputs = (current_memory->inputs_missing_).fetch_sub(IN_0_MISSING) - IN_0_MISSING;
} else if (POS == 1) {
remaining_inputs = (current_memory->inputs_missing_).fetch_sub(IN_1_MISSING) - IN_1_MISSING;
} else {
remaining_inputs = (current_memory->inputs_missing_).fetch_sub(COND_MISSING) - COND_MISSING;
}
if ((remaining_inputs & COND_MISSING) == 0) {
if ((remaining_inputs & IN_0_MISSING) == 0) {
auto &data = std::get<0>(current_memory->input_buffer_);
value_out_port().push_token(data);
current_memory->inputs_missing_ += IN_0_MISSING + COND_MISSING;
} else if ((remaining_inputs & IN_1_MISSING) == 0) {
auto &data = std::get<1>(current_memory->input_buffer_);
value_out_port().push_token(data);
current_memory->inputs_missing_ += IN_1_MISSING + COND_MISSING;
}
}
}
int num_successors() const override {
return 1;
}
node *successor_at(int pos) const override {
return out_port_.next_node_at(pos);
}
bool is_fully_connected() const override {
return in_port_.fully_connected() && out_port_.fully_connected();
}
int instance_buffer_size() const override {
return sizeof(invocation_memory);
}
void init_instance_buffer(void *memory) const override {
auto invocation = new(memory) invocation_memory{};
invocation->inputs_missing_ = INITIAL_STATE;
};
private:
multi_in_port_type in_port_;
multi_out_port_type out_port_;
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_MERGE_NODE_H_
#ifndef PLS_DATAFLOW_INTERNAL_NODE_H_
#define PLS_DATAFLOW_INTERNAL_NODE_H_
#include "build_state.h"
#include "token.h"
namespace pls {
namespace dataflow {
namespace internal {
/**
* Represents a single piece included in a dataflow graph.
* This can be anything connected to form the dataflow that requires
* memory during invocation and is present in the connected flow graph.
*/
class node {
template<typename INS, typename OUTS>
friend
class graph;
public:
virtual int num_successors() const = 0;
virtual node *successor_at(int pos) const = 0;
virtual int instance_buffer_size() const = 0;
virtual void init_instance_buffer(void *memory) const = 0;
virtual bool is_fully_connected() const = 0;
template<typename M>
M *get_invocation(invocation_info invocation) {
return invocation.template get_instance_buffer<M>(memory_index_);
}
private:
int memory_index_{0};
node *direct_successor_{nullptr};
build_state build_state_{build_state::fresh};
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_NODE_H_
#ifndef PLS_DATAFLOW_INTERNAL_OUTPUT_H_
#define PLS_DATAFLOW_INTERNAL_OUTPUT_H_
#include <tuple>
#include "in_port.h"
#include "node.h"
namespace pls {
namespace dataflow {
namespace internal {
template<class T>
class out_port {
public:
void connect(in_port<T> &target) {
if (connected_) {
PLS_ERROR("Must only connect output once. Please disconnect it before reconnecting.")
}
target.connect();
target_ = &target;
connected_ = true;
}
void operator>>(in_port<T> &input) {
connect(input);
}
void push_token(token<T> token) const {
target_->push_token(token);
}
node *next_node() const {
return target_->owning_node();
}
bool is_connected() const { return connected_; }
private:
bool connected_{false};
in_port<T> *target_{nullptr};
};
/**
* Represents multiple output ports bundled together (a tuple of outputs).
*
* @tparam O0, O A variadic list of output types
*/
template<typename O0, typename ...O>
class multi_out_port {
private:
// Helpers for managing recursive types
using value_tuple_type = std::tuple<O0, O...>;
using out_port_tupel_type = std::tuple<out_port<O0>, out_port<O>...>;
static constexpr int num_outputs = std::tuple_size<value_tuple_type>();
public:
// Simple interface to get types by index
template<int POS>
using out_port_type_at = typename std::tuple_element<POS, out_port_tupel_type>::type;
template<int POS>
using value_type_at = typename std::tuple_element<POS, value_tuple_type>::type;
// Simple interface to access outputs by index
template<int POS>
out_port_type_at<POS> &get() {
return std::get<POS>(outputs_);
}
// Simple interface to connect multiple outputs to matching multiple inputs
template<typename CB>
void operator>>(multi_in_port<CB, 0, O0, O...> &input) {
connect_to<CB, 0>{this, &input}.connect();
}
node *next_node_at(int pos) const {
return next_node<0>{&outputs_}.get(pos);
}
bool fully_connected() const {
return connected<0>{&outputs_}.get();
}
}
private:
out_port_tupel_type outputs_;
template<int POS, typename DUMMY=void>
struct next_node {
const out_port_tupel_type *outputs_;
node *get(int i) {
if (POS == i) {
return std::get<POS>(*outputs_).next_node();
} else {
return next_node<POS + 1>{outputs_}.get(i);
}
}
};
template<typename DUMMY>
struct next_node<num_outputs, DUMMY> {
const out_port_tupel_type *outputs_;
node *get(int i) {
PLS_ERROR("Try to access invalid successor node index!")
}
};
template<int POS, typename DUMMY=void>
struct connected {
const out_port_tupel_type *outputs_;
bool get() {
bool self = std::get<POS>(*outputs_).is_connected();
bool children = connected<POS + 1>{outputs_}.get();
return self && children;
}
};
template<typename DUMMY>
struct connected<num_outputs, DUMMY> {
const out_port_tupel_type *outputs_;
bool get() {
return true;
}
};
template<typename CB, int POS, typename DUMMY=void>
struct connect_to {
multi_out_port<O0, O...> *out_port_;
multi_in_port<CB, 0, O0, O...> *in_port_;
void connect() {
out_port_->template get<POS>() >> in_port_->template get<POS>();
connect_to<CB, POS + 1>{out_port_, in_port_}.connect();
}
};
template<typename CB, typename DUMMY>
struct connect_to<CB, num_outputs, DUMMY> {
multi_out_port<O0, O...> *out_port_;
multi_in_port<CB, 0, O0, O...> *in_port_;
void connect() {
};
};
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_OUTPUT_H_
#ifndef PLS_DATAFLOW_INTERNAL_OUTPUTS_H_
#define PLS_DATAFLOW_INTERNAL_OUTPUTS_H_
namespace pls {
namespace dataflow {
namespace internal {
template<typename O0, typename ...O>
struct outputs {
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_OUTPUTS_H_
#ifndef PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_
#define PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_
#include <atomic>
#include "in_port.h"
#include "out_port.h"
#include "node.h"
#include "token.h"
namespace pls {
namespace dataflow {
namespace internal {
template<typename I>
class split_node : public node {
// Our own type
using self_type = split_node<I>;
// Input-Output port types
using multi_in_port_type = multi_in_port<self_type, 0, I>;
using multi_out_port_type = multi_out_port<I, I>;
public:
explicit split_node() : in_port_{this, this}, out_port_{} {}
in_port<I> &value_in_port() {
return in_port_.template get<0>();
}
out_port<I> &out_port_1() {
return out_port_.template get<0>();
}
out_port<I> &out_port_2() {
return out_port_.template get<1>();
}
template<int POS, typename T>
void token_pushed(token<T> token) {
out_port_1().push_token(token);
out_port_2().push_token(token);
}
int num_successors() const override {
return 2;
}
node *successor_at(int pos) const override {
return out_port_.next_node_at(pos);
}
bool is_fully_connected() const override {
return in_port_.fully_connected() && out_port_.fully_connected();
}
int instance_buffer_size() const override {
return 0;
}
void init_instance_buffer(void *memory) const override {
// No need for memory, we simply forward
};
private:
multi_in_port_type in_port_;
multi_out_port_type out_port_;
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_SPLIT_NODE_H_
#ifndef PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_
#define PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_
#include <atomic>
#include "in_port.h"
#include "out_port.h"
#include "node.h"
#include "token.h"
namespace pls {
namespace dataflow {
namespace internal {
template<typename I>
class switch_node : public node {
// Our own type
using self_type = switch_node<I>;
// Input-Output port types
using multi_in_port_type = multi_in_port<self_type, 0, I, bool>;
using multi_out_port_type = multi_out_port<I, I>;
// Input-Output tuples
using input_tuple = std::tuple<token<I>, token<bool>>;
// Memory used for ONE active invocation of the dataflow-graph
struct invocation_memory {
std::atomic<int> inputs_missing_;
input_tuple input_buffer_;
};
public:
explicit switch_node() : in_port_{this, this}, out_port_{} {}
in_port<I> &value_in_port() {
return in_port_.template get<0>();
}
in_port<bool> &condition_in_port() {
return in_port_.template get<1>();
}
out_port<I> &true_out_port() {
return out_port_.template get<0>();
}
out_port<I> &false_out_port() {
return out_port_.template get<1>();
}
template<int POS, typename T>
void token_pushed(token<T> token) {
auto current_memory = get_invocation<invocation_memory>(token.invocation());
std::get<POS>(current_memory->input_buffer_) = token;
auto remaining_inputs = --(current_memory->inputs_missing_);
if (remaining_inputs == 0) {
bool condition = std::get<1>(current_memory->input_buffer_).value();
auto &data = std::get<0>(current_memory->input_buffer_);
if (condition) {
true_out_port().push_token(data);
} else {
false_out_port().push_token(data);
}
current_memory->inputs_missing_ = 2;
}
}
int num_successors() const override {
return 2;
}
node *successor_at(int pos) const override {
return out_port_.next_node_at(pos);
}
bool is_fully_connected() const override {
return in_port_.fully_connected() && out_port_.fully_connected();
}
int instance_buffer_size() const override {
return sizeof(invocation_memory);
}
void init_instance_buffer(void *memory) const override {
auto invocation = new(memory) invocation_memory{};
invocation->inputs_missing_ = 2;
};
private:
multi_in_port_type in_port_;
multi_out_port_type out_port_;
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_SWITCH_NODE_H_
#ifndef PLS_DATAFLOW_INTERNAL_TOKEN_H_
#define PLS_DATAFLOW_INTERNAL_TOKEN_H_
namespace pls {
namespace dataflow {
namespace internal {
class invocation_info {
public:
explicit invocation_info(void **memory) : memory_{memory} {};
invocation_info(invocation_info const &other) = default;
template<typename T>
T *get_instance_buffer(int pos) {
return reinterpret_cast<T *>(memory_[pos]);
}
private:
void **memory_;
};
template<typename T>
class token {
T value_;
invocation_info invocation_;
public:
token() : invocation_{nullptr} {}; // default constructor leaves the value uninitialized
token(T value, invocation_info color) : value_{value}, invocation_{color} {};
T &value() { return value_; }
invocation_info invocation() const { return invocation_; }
void set_invocation(invocation_info invocation_info) { invocation_ = invocation_info; }
};
}
}
}
#endif //PLS_DATAFLOW_INTERNAL_TOKEN_H_
...@@ -2,7 +2,8 @@
#ifndef PLS_ERROR_HANDLING_H
#define PLS_ERROR_HANDLING_H
#include <cstdio>
#include <cstdlib>
/**
* Called when there is a non-recoverable error/invariant in the scheduler.
...@@ -10,7 +11,7 @@
* The implementation can be changed if for example no iostream is available on a system
* (or its inclusion adds too much overhead).
*/
#define PLS_ERROR(msg) printf("%s\n", msg); exit(1);
#define PLS_ASSERT(cond, msg) if (!(cond)) { PLS_ERROR(msg) }
#endif //PLS_ERROR_HANDLING_H
...@@ -2,6 +2,8 @@
#ifndef PLS_ALIGNED_STACK_IMPL_H
#define PLS_ALIGNED_STACK_IMPL_H
#include <utility>
namespace pls {
namespace internal {
namespace data_structures {
...
...@@ -25,7 +25,7 @@ class work_stealing_deque_item {
// as the race occurs in 'pop_head', where ALL CASES reading a corrupt/old value are cases
// where the next CAS fails anyways, thus making these corrupted values have no influence on
// the overall program execution.
// ==> If we find performance problems in this queue, try removing the atomics again.
// Pointer to the actual data
std::atomic<pointer_t> data_;
// Index (relative to stack base) to the next and previous element
...
...@@ -2,6 +2,9 @@
#ifndef PLS_WORK_STEALING_DEQUE_IMPL_H_
#define PLS_WORK_STEALING_DEQUE_IMPL_H_
#include <utility>
#include <new>
namespace pls {
namespace internal {
namespace data_structures {
...
#ifndef PLS_INTERNAL_HELPERS_MEMBER_FUNCTION_H_
#define PLS_INTERNAL_HELPERS_MEMBER_FUNCTION_H_
namespace pls {
namespace internal {
namespace helpers {
template<class C, typename R, typename ...ARGS>
class member_function {
public:
using type = member_function<C, R, ARGS...>;
member_function(C *object, R (C::*function_pointer)(ARGS...)) : object_{object},
function_pointer_{function_pointer} {}
R operator()(ARGS... args) {
return ((*object_).*function_pointer_)(args...);
}
private:
C *object_;
R (C::*function_pointer_)(ARGS...);
};
template<typename C, typename R, typename ...ARGS>
static constexpr member_function<C, R, ARGS...> bind(C *object, R (C::*function_pointer)(ARGS...)) {
return {object, function_pointer};
}
}
}
}
#endif //PLS_INTERNAL_HELPERS_MEMBER_FUNCTION_H_
#ifndef PLS_INTERNAL_HELPERS_SEQENCE_H_
#define PLS_INTERNAL_HELPERS_SEQENCE_H_
// See: https://stackoverflow.com/questions/7858817/unpacking-a-tuple-to-call-a-matching-function-pointer
// Would be easy in C++ 14 (has index sequences), but seems to be the only way to do it in C++ 11
namespace pls {
namespace internal {
namespace helpers {
template<int ...>
struct sequence {};
template<int N, int ...S>
struct sequence_gen : sequence_gen<N - 1, N - 1, S...> {};
template<int ...S>
struct sequence_gen<0, S...> {
typedef sequence<S...> type;
};
}
}
}
#endif //PLS_INTERNAL_HELPERS_SEQENCE_H_
...@@ -7,6 +7,7 @@
#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/helpers/unique_id.h"
#include "pls/internal/helpers/member_function.h"
namespace pls {
...@@ -16,6 +17,8 @@ using internal::scheduling::malloc_scheduler_memory;
using internal::scheduling::scheduler;
using unique_id = internal::helpers::unique_id;
template<class C, typename R, typename ...ARGS>
using member_function = internal::helpers::member_function<C, R, ARGS...>;
using internal::scheduling::task;
using internal::scheduling::lambda_task_by_reference;
...
...@@ -2,5 +2,6 @@ add_executable(tests
main.cpp
data_structures_test.cpp
scheduling_tests.cpp
algorithm_test.cpp
dataflow_test.cpp)
target_link_libraries(tests catch2 pls)
#include <catch.hpp>
#include <array>
#include "pls/pls.h"
using namespace pls;
...
#include <catch.hpp>
#include "pls/internal/base/thread.h"
#include "pls/internal/base/spin_lock.h"
#include "pls/internal/base/system_details.h"
#include <vector>
#include <mutex>
...
#include <catch.hpp>
#include "pls/internal/base/system_details.h"
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/data_structures/locking_deque.h"
#include "pls/internal/data_structures/work_stealing_deque.h"
#include <vector>
#include <mutex>
using namespace pls::internal::data_structures;
...
#include <catch.hpp>
#include <array>
#include <tuple>
#include "pls/pls.h"
#include "pls/dataflow/dataflow.h"
using namespace pls;
using namespace pls::dataflow;
void step_1(const int &in, int &out) {
out = in * 2;
}
class member_call_test {
public:
void step_2(const int &in, int &out) {
out = in * 2;
}
};
TEST_CASE("dataflow functions correctly", "[dataflow/dataflow.h]") {
malloc_scheduler_memory my_scheduler_memory{8, 2u << 12u};
scheduler my_scheduler{&my_scheduler_memory, 8};
my_scheduler.perform_work([]() {
SECTION("linear pipelines") {
auto step_1 = [](const int &in, double &out1, double &out2) {
out1 = (double) in / 2.0;
out2 = (double) in / 3.0;
};
auto step_2 = [](const double &in1, const double &in2, double &out) {
out = in1 * in2;
};
graph<inputs<int>, outputs<double>> linear_graph;
function_node<inputs<int>, outputs<double, double>, decltype(step_1)> node_1{step_1};
function_node<inputs<double, double>, outputs<double>, decltype(step_2)> node_2{step_2};
linear_graph >> node_1 >> node_2 >> linear_graph;
linear_graph.build();
std::tuple<double> out{};
linear_graph.run(5, out);
linear_graph.wait_for_all();
REQUIRE(std::get<0>(out) == (5 / 2.0) * (5 / 3.0));
}
SECTION("member and function steps") {
member_call_test instance;
using member_func_type = member_function<member_call_test, void, const int &, int &>;
member_func_type func_1{&instance, &member_call_test::step_2};
graph<inputs<int>, outputs<int>> graph;
function_node<inputs<int>, outputs<int>, void (*)(const int &, int &)> node_1{&step_1};
function_node<inputs<int>, outputs<int>, member_func_type> node_2{func_1};
graph >> node_1 >> node_2 >> graph;
graph.build();
std::tuple<int> out{};
graph.run(1, out);
graph.wait_for_all();
REQUIRE(std::get<0>(out) == 4);
}
SECTION("non linear pipeline") {
auto path_one = [](const int &in, int &out) {
out = in + 1;
};
auto path_two = [](const int &in, int &out) {
out = in - 1;
};
graph<inputs<int, bool>, outputs<int>> graph;
function_node<inputs<int>, outputs<int>, decltype(path_one)> node_1{path_one};
function_node<inputs<int>, outputs<int>, decltype(path_two)> node_2{path_two};
switch_node<int> switch_node;
merge_node<int> merge_node;
split_node<bool> split;
// Split up boolean signal
graph.input<1>() >> split.value_in_port();
// Feed switch
graph.input<0>() >> switch_node.value_in_port();
split.out_port_1() >> switch_node.condition_in_port();
// True path
switch_node.true_out_port() >> node_1.in_port<0>();
node_1.out_port<0>() >> merge_node.true_in_port();
// False path
switch_node.false_out_port() >> node_2.in_port<0>();
node_2.out_port<0>() >> merge_node.false_in_port();
// Read Merge
split.out_port_2() >> merge_node.condition_in_port();
merge_node.value_out_port() >> graph.output<0>();
// Build and run
graph.build();
std::tuple<int> out1{}, out2{};
graph.run({0, true}, out1);
graph.run({0, false}, out2);
graph.wait_for_all();
REQUIRE(std::get<0>(out1) == 1);
REQUIRE(std::get<0>(out2) == -1);
}
});
}
#include <catch.hpp>
#include "pls/pls.h"
using namespace pls;
...