Commit eca0dd4d by FritzFlorian

Rework spawn ordering/waiting on pipeline tasks

parent 722ddf41
Pipeline #1324 passed with stages
in 4 minutes 51 seconds
...@@ -37,6 +37,7 @@ add_subdirectory(app/benchmark_fft) ...@@ -37,6 +37,7 @@ add_subdirectory(app/benchmark_fft)
add_subdirectory(app/benchmark_unbalanced) add_subdirectory(app/benchmark_unbalanced)
add_subdirectory(app/benchmark_matrix) add_subdirectory(app/benchmark_matrix)
add_subdirectory(app/benchmark_prefix) add_subdirectory(app/benchmark_prefix)
add_subdirectory(app/benchmark_pipeline)
# Add optional tests # Add optional tests
option(PACKAGE_TESTS "Build the tests" ON) option(PACKAGE_TESTS "Build the tests" ON)
......
add_executable(benchmark_pipeline main.cpp)
target_link_libraries(benchmark_pipeline pls)
if (EASY_PROFILER)
target_link_libraries(benchmark_pipeline easy_profiler)
endif ()
#include <pls/pls.h>
#include <pls/dataflow/dataflow.h>
#include <pls/internal/helpers/profiler.h>
#include <pls/internal/helpers/mini_benchmark.h>
#include <iostream>
#include <complex>
#include <vector>
#include <tuple>
#include <atomic>
static constexpr int INPUT_SIZE = 8192;
typedef std::vector<std::complex<double>> complex_vector;
using namespace pls::dataflow;
void divide(complex_vector::iterator data, int n) {
complex_vector tmp_odd_elements(n / 2);
for (int i = 0; i < n / 2; i++) {
tmp_odd_elements[i] = data[i * 2 + 1];
}
for (int i = 0; i < n / 2; i++) {
data[i] = data[i * 2];
}
for (int i = 0; i < n / 2; i++) {
data[i + n / 2] = tmp_odd_elements[i];
}
}
void combine(complex_vector::iterator data, int n) {
for (int i = 0; i < n / 2; i++) {
std::complex<double> even = data[i];
std::complex<double> odd = data[i + n / 2];
// w is the "twiddle-factor".
// this could be cached, but we run the same 'data_structures' algorithm parallel/serial,
// so it won't impact the performance comparison.
std::complex<double> w = exp(std::complex<double>(0, -2. * M_PI * i / n));
data[i] = even + w * odd;
data[i + n / 2] = even - w * odd;
}
}
void fft(complex_vector::iterator data, int n) {
if (n < 2) {
return;
}
divide(data, n);
fft(data, n / 2);
fft(data + n / 2, n / 2);
combine(data, n);
}
complex_vector prepare_input(int input_size) {
std::vector<double> known_frequencies{2, 11, 52, 88, 256};
complex_vector data(input_size);
// Set our input data to match a time series of the known_frequencies.
// When applying fft to this time-series we should find these frequencies.
for (int i = 0; i < input_size; i++) {
data[i] = std::complex<double>(0.0, 0.0);
for (auto frequencie : known_frequencies) {
data[i] += sin(2 * M_PI * frequencie * i / input_size);
}
}
return data;
}
int main() {
PROFILE_ENABLE
pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18u};
pls::scheduler scheduler{&my_scheduler_memory, 4};
graph<inputs<int>, outputs<int>> graph;
std::atomic<int> count{0};
auto lambda = [&](const int &in, int &out) {
PROFILE_WORK_BLOCK("Work Lambda")
auto tmp = in;
out = tmp;
complex_vector input = prepare_input(INPUT_SIZE);
fft(input.begin(), input.size());
count++;
};
function_node<inputs<int>, outputs<int>, decltype(lambda)> step_1{lambda};
function_node<inputs<int>, outputs<int>, decltype(lambda)> step_2{lambda};
function_node<inputs<int>, outputs<int>, decltype(lambda)> step_3{lambda};
function_node<inputs<int>, outputs<int>, decltype(lambda)> step_4{lambda};
graph >> step_1 >> step_2 >> step_3 >> step_4 >> graph;
graph.build();
const int num_elements = 10;
std::vector<std::tuple<int>> results(num_elements);
pls::internal::helpers::run_mini_benchmark([&] {
PROFILE_WORK_BLOCK("Top Level")
for (int j = 0; j < num_elements; j++) {
graph.run(std::tuple<int>{j}, &results[j]);
}
pls::scheduler::wait_for_all();
}, 8, 1000);
PROFILE_SAVE("test_profile.prof")
}
//int main() {
// PROFILE_ENABLE
// pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18u};
// pls::scheduler scheduler{&my_scheduler_memory, 4};
//
// graph<inputs<int>, outputs<int>> graph;
// std::atomic<int> count{0};
// auto lambda = [&](const int &in, int &out) {
// PROFILE_WORK_BLOCK("Work Lambda")
// out = in;
// complex_vector input = prepare_input(INPUT_SIZE);
// fft(input.begin(), input.size());
// count++;
// };
// function_node<inputs<int>, outputs<int>, decltype(lambda)> step_1{lambda};
// function_node<inputs<int>, outputs<int>, decltype(lambda)> step_2{lambda};
// function_node<inputs<int>, outputs<int>, decltype(lambda)> step_3{lambda};
// function_node<inputs<int>, outputs<int>, decltype(lambda)> step_4{lambda};
//
// graph >> step_1 >> step_2 >> step_3 >> step_4 >> graph;
// graph.build();
//
// const int num_elements = 10;
// std::vector<std::tuple<int>> results(num_elements);
//
// scheduler.perform_work([&] {
// PROFILE_MAIN_THREAD
// for (int i = 0; i < 10; i++) {
// PROFILE_WORK_BLOCK("Top Level")
// for (int j = 0; j < num_elements; j++) {
// graph.run(std::tuple<int>{j}, &results[j]);
// }
// pls::scheduler::wait_for_all();
// }
// });
//
// std::cout << count << std::endl;
//
// PROFILE_SAVE("test_profile.prof")
//}
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
#include <vector> #include <vector>
#include <functional> #include <functional>
static constexpr int INPUT_SIZE = 100; static constexpr int INPUT_SIZE = 10e7;
int main() { int main() {
PROFILE_ENABLE PROFILE_ENABLE
......
...@@ -55,8 +55,8 @@ void fft(complex_vector::iterator data, int n) { ...@@ -55,8 +55,8 @@ void fft(complex_vector::iterator data, int n) {
fft(data + n / 2, n / 2); fft(data + n / 2, n / 2);
} else { } else {
pls::invoke( pls::invoke(
[&] { fft(data, n / 2); }, [n, &data] { fft(data, n / 2); },
[&] { fft(data + n / 2, n / 2); } [n, &data] { fft(data + n / 2, n / 2); }
); );
} }
PROFILE_END_BLOCK PROFILE_END_BLOCK
......
// Headers are available because we added the pls target // Headers are available because we added the pls target
#include <string> const long NUM_THREADS = 8;
#include <cstdio> const long MEMORY_PER_THREAD = 2u << 12u;
#include <tuple>
#include <array>
#include "pls/pls.h" #include "pls/pls.h"
#include "pls/dataflow/internal/inputs.h"
#include "pls/dataflow/internal/outputs.h"
#include "pls/dataflow/internal/function_node.h"
#include "pls/dataflow/internal/graph.h"
#include "pls/dataflow/internal/switch_node.h"
#include "pls/dataflow/internal/split_node.h"
#include "pls/dataflow/internal/merge_node.h"
int main() { pls::static_scheduler_memory<NUM_THREADS, MEMORY_PER_THREAD> memory;
using namespace pls::dataflow;
using namespace pls::dataflow::internal;
// Define
graph<inputs<int, int>, outputs<int>> graph;
auto triple = [](const int &i1, int &o1) { int main() {
o1 = i1 * 3; pls::scheduler scheduler{&memory, NUM_THREADS};
};
function_node<inputs<int>, outputs<int>, decltype(triple)> triple_node{triple};
auto minus_one = [](const int &i1, int &o1) {
o1 = i1 - 1;
};
function_node<inputs<int>, outputs<int>, decltype(minus_one)> minus_one_node_1{minus_one};
function_node<inputs<int>, outputs<int>, decltype(minus_one)> minus_one_node_2{minus_one};
auto is_positive = [](const int &i1, bool &o1) {
o1 = i1 > 0;
};
function_node<inputs<int>, outputs<bool>, decltype(is_positive)> is_positive_node{is_positive};
auto recursion = [&](const int &i1, const int &i2, int &o1) { scheduler.perform_work([]() {
std::tuple<int> out; auto lambda = []() {
graph.run({i1, i2}, out); // Do work
pls::scheduler::wait_for_all();
o1 = std::get<0>(out);
}; };
function_node<inputs<int, int>, outputs<int>, decltype(recursion)> recursion_node{recursion}; using lambda_task = pls::lambda_task_by_value<decltype(lambda)>;
split_node<int> minus_split; pls::scheduler::spawn_child<lambda_task>(lambda);
split_node<bool> decision_split; pls::scheduler::spawn_child<lambda_task>(lambda);
switch_node<int> recursion_split;
merge_node<int> recursion_merge;
// Connect
minus_one_node_1 >> minus_one_node_2;
// Inputs to first processing step
graph.input<0>() >> minus_one_node_1.in_port<0>();
graph.input<1>() >> triple_node.in_port<0>();
minus_one_node_2.out_port<0>() >> minus_split.value_in_port();
// Prepare decision...
minus_split.out_port_1() >> is_positive_node.in_port<0>();
is_positive_node.out_port<0>() >> decision_split.value_in_port();
triple_node.out_port<0>() >> recursion_split.value_in_port();
decision_split.out_port_1() >> recursion_split.condition_in_port();
// Connect true case (recursion)
minus_split.out_port_2() >> recursion_node.in_port<0>();
recursion_split.true_out_port() >> recursion_node.in_port<1>();
recursion_node.out_port<0>() >> recursion_merge.true_in_port();
// Connect false case (no recursion)
recursion_split.false_out_port() >> recursion_merge.false_in_port();
// Deliver final result (back from merge)
decision_split.out_port_2() >> recursion_merge.condition_in_port();
recursion_merge.value_out_port() >> graph.output<0>();
// Build
graph.build();
pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18u};
pls::scheduler scheduler{&my_scheduler_memory, 8};
scheduler.perform_work([&] {
// Schedule Execution
std::tuple<int> out1, out2, out3;
graph.run({1, 2}, out1);
graph.run({1, 1}, out2);
graph.run({5, 6}, out3);
// Wait for results and print
pls::scheduler::wait_for_all(); pls::scheduler::wait_for_all();
std::cout << std::get<0>(out1) << std::endl;
std::cout << std::get<0>(out2) << std::endl;
std::cout << std::get<0>(out3) << std::endl;
}); });
scheduler.terminate();
return 0;
} }
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#define PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_IMPL_H_ #define PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_IMPL_H_
#include "graph.h" #include "graph.h"
#include "pls/internal/helpers/profiler.h"
namespace pls { namespace pls {
namespace dataflow { namespace dataflow {
...@@ -44,15 +45,17 @@ token_pushed(token<T> token) { ...@@ -44,15 +45,17 @@ token_pushed(token<T> token) {
template<typename I0, typename ...I, typename O0, typename ...O, typename F> template<typename I0, typename ...I, typename O0, typename ...O, typename F>
void function_node<inputs<I0, I...>, outputs<O0, O...>, F>:: void function_node<inputs<I0, I...>, outputs<O0, O...>, F>::
execute_function(invocation_memory *invocation_memory, invocation_info invocation_info) { execute_function(invocation_memory *invocation_memory, invocation_info invocation_info) {
auto lambda = [&]() { auto lambda = [=]() {
PROFILE_WORK_BLOCK("Function Node")
input_tuple &inputs = invocation_memory->input_buffer_; input_tuple &inputs = invocation_memory->input_buffer_;
output_tuple outputs{}; output_tuple outputs{};
execute_function_internal(inputs, typename sequence_gen<1 + sizeof...(I)>::type(), execute_function_internal(inputs, typename sequence_gen<1 + sizeof...(I)>::type(),
outputs, typename sequence_gen<1 + sizeof...(O)>::type(), invocation_info); outputs, typename sequence_gen<1 + sizeof...(O)>::type(), invocation_info);
PROFILE_END_BLOCK
}; };
// TODO: maybe replace this with 'continuation' style invocation // TODO: maybe replace this with 'continuation' style invocation
pls::scheduler::spawn_child_and_wait<pls::lambda_task_by_reference<decltype(lambda)>>(lambda); pls::scheduler::spawn_child<pls::lambda_task_by_value<decltype(lambda)>>(lambda);
} }
template<typename I0, typename ...I, typename O0, typename ...O, typename F> template<typename I0, typename ...I, typename O0, typename ...O, typename F>
......
...@@ -90,9 +90,12 @@ class graph<inputs<I0, I...>, outputs<O0, O...>> : public node { ...@@ -90,9 +90,12 @@ class graph<inputs<I0, I...>, outputs<O0, O...>> : public node {
void build(); void build();
void run(value_input_tuple input, value_output_tuple &output) { void run(value_input_tuple input, value_output_tuple *output) {
PLS_ASSERT(build_state_ == build_state::built, "Must build graph before running it!") PLS_ASSERT(build_state_ == build_state::built, "Must build graph before running it!")
pls::scheduler::spawn_child<run_graph_task>(this, input, &output); const auto lambda = [=]() {
pls::scheduler::spawn_child<run_graph_task>(this, input, output);
};
pls::scheduler::spawn_child<lambda_task_by_value<decltype(lambda)>>(lambda);
} }
int num_successors() const override { int num_successors() const override {
......
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
#ifndef PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_ #ifndef PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_
#define PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_ #define PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_
#include "pls/internal/helpers/profiler.h"
namespace pls { namespace pls {
namespace dataflow { namespace dataflow {
namespace internal { namespace internal {
...@@ -107,7 +109,7 @@ class graph<inputs<I0, I...>, outputs<O0, O...>>::run_graph_task : public pls::t ...@@ -107,7 +109,7 @@ class graph<inputs<I0, I...>, outputs<O0, O...>>::run_graph_task : public pls::t
invocation_info invocation_; invocation_info invocation_;
public: public:
run_graph_task(graph *self, value_input_tuple &input, value_output_tuple *output) : self_{self}, run_graph_task(graph *self, value_input_tuple input, value_output_tuple *output) : self_{self},
input_{input}, input_{input},
output_{output}, output_{output},
invocation_{nullptr} { invocation_{nullptr} {
...@@ -128,7 +130,9 @@ class graph<inputs<I0, I...>, outputs<O0, O...>>::run_graph_task : public pls::t ...@@ -128,7 +130,9 @@ class graph<inputs<I0, I...>, outputs<O0, O...>>::run_graph_task : public pls::t
} }
void execute_internal() override { void execute_internal() override {
PROFILE_WORK_BLOCK("Graph Invocation")
feed_inputs<0, I0, I...>{self_->inputs_, input_, invocation_}.run(); feed_inputs<0, I0, I...>{self_->inputs_, input_, invocation_}.run();
PROFILE_END_BLOCK
} }
}; };
......
...@@ -40,7 +40,7 @@ TEST_CASE("dataflow functions correctly", "[dataflow/dataflow.h]") { ...@@ -40,7 +40,7 @@ TEST_CASE("dataflow functions correctly", "[dataflow/dataflow.h]") {
linear_graph.build(); linear_graph.build();
std::tuple<double> out{}; std::tuple<double> out{};
linear_graph.run(5, out); linear_graph.run(5, &out);
linear_graph.wait_for_all(); linear_graph.wait_for_all();
REQUIRE(std::get<0>(out) == (5 / 2.0) * (5 / 3.0)); REQUIRE(std::get<0>(out) == (5 / 2.0) * (5 / 3.0));
...@@ -59,7 +59,7 @@ TEST_CASE("dataflow functions correctly", "[dataflow/dataflow.h]") { ...@@ -59,7 +59,7 @@ TEST_CASE("dataflow functions correctly", "[dataflow/dataflow.h]") {
graph.build(); graph.build();
std::tuple<int> out{}; std::tuple<int> out{};
graph.run(1, out); graph.run(1, &out);
graph.wait_for_all(); graph.wait_for_all();
REQUIRE(std::get<0>(out) == 4); REQUIRE(std::get<0>(out) == 4);
...@@ -102,8 +102,8 @@ TEST_CASE("dataflow functions correctly", "[dataflow/dataflow.h]") { ...@@ -102,8 +102,8 @@ TEST_CASE("dataflow functions correctly", "[dataflow/dataflow.h]") {
// Build and run // Build and run
graph.build(); graph.build();
std::tuple<int> out1{}, out2{}; std::tuple<int> out1{}, out2{};
graph.run({0, true}, out1); graph.run({0, true}, &out1);
graph.run({0, false}, out2); graph.run({0, false}, &out2);
graph.wait_for_all(); graph.wait_for_all();
REQUIRE(std::get<0>(out1) == 1); REQUIRE(std::get<0>(out1) == 1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment