diff --git a/CMakeLists.txt b/CMakeLists.txt index 2b4d1da..98cb69c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -37,6 +37,7 @@ add_subdirectory(app/benchmark_fft) add_subdirectory(app/benchmark_unbalanced) add_subdirectory(app/benchmark_matrix) add_subdirectory(app/benchmark_prefix) +add_subdirectory(app/benchmark_pipeline) # Add optional tests option(PACKAGE_TESTS "Build the tests" ON) diff --git a/app/benchmark_pipeline/CMakeLists.txt b/app/benchmark_pipeline/CMakeLists.txt new file mode 100644 index 0000000..d531b74 --- /dev/null +++ b/app/benchmark_pipeline/CMakeLists.txt @@ -0,0 +1,5 @@ +add_executable(benchmark_pipeline main.cpp) +target_link_libraries(benchmark_pipeline pls) +if (EASY_PROFILER) + target_link_libraries(benchmark_pipeline easy_profiler) +endif () diff --git a/app/benchmark_pipeline/main.cpp b/app/benchmark_pipeline/main.cpp new file mode 100644 index 0000000..6752d17 --- /dev/null +++ b/app/benchmark_pipeline/main.cpp @@ -0,0 +1,148 @@ +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +static constexpr int INPUT_SIZE = 8192; +typedef std::vector> complex_vector; + +using namespace pls::dataflow; + +void divide(complex_vector::iterator data, int n) { + complex_vector tmp_odd_elements(n / 2); + for (int i = 0; i < n / 2; i++) { + tmp_odd_elements[i] = data[i * 2 + 1]; + } + for (int i = 0; i < n / 2; i++) { + data[i] = data[i * 2]; + } + for (int i = 0; i < n / 2; i++) { + data[i + n / 2] = tmp_odd_elements[i]; + } +} + +void combine(complex_vector::iterator data, int n) { + for (int i = 0; i < n / 2; i++) { + std::complex even = data[i]; + std::complex odd = data[i + n / 2]; + + // w is the "twiddle-factor". + // this could be cached, but we run the same 'data_structures' algorithm parallel/serial, + // so it won't impact the performance comparison. + std::complex w = exp(std::complex(0, -2. * M_PI * i / n)); + + data[i] = even + w * odd; + data[i + n / 2] = even - w * odd; + } +} + +void fft(complex_vector::iterator data, int n) { + if (n < 2) { + return; + } + + divide(data, n); + fft(data, n / 2); + fft(data + n / 2, n / 2); + combine(data, n); +} + +complex_vector prepare_input(int input_size) { + std::vector known_frequencies{2, 11, 52, 88, 256}; + complex_vector data(input_size); + + // Set our input data to match a time series of the known_frequencies. + // When applying fft to this time-series we should find these frequencies. + for (int i = 0; i < input_size; i++) { + data[i] = std::complex(0.0, 0.0); + for (auto frequencie : known_frequencies) { + data[i] += sin(2 * M_PI * frequencie * i / input_size); + } + } + + return data; +} + +int main() { + PROFILE_ENABLE + pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18u}; + pls::scheduler scheduler{&my_scheduler_memory, 4}; + + graph, outputs> graph; + std::atomic count{0}; + auto lambda = [&](const int &in, int &out) { + PROFILE_WORK_BLOCK("Work Lambda") + auto tmp = in; + out = tmp; + complex_vector input = prepare_input(INPUT_SIZE); + fft(input.begin(), input.size()); + count++; + }; + function_node, outputs, decltype(lambda)> step_1{lambda}; + function_node, outputs, decltype(lambda)> step_2{lambda}; + function_node, outputs, decltype(lambda)> step_3{lambda}; + function_node, outputs, decltype(lambda)> step_4{lambda}; + + graph >> step_1 >> step_2 >> step_3 >> step_4 >> graph; + graph.build(); + + const int num_elements = 10; + std::vector> results(num_elements); + + pls::internal::helpers::run_mini_benchmark([&] { + PROFILE_WORK_BLOCK("Top Level") + for (int j = 0; j < num_elements; j++) { + graph.run(std::tuple{j}, &results[j]); + } + pls::scheduler::wait_for_all(); + }, 8, 1000); + + PROFILE_SAVE("test_profile.prof") +} + +//int main() { +// PROFILE_ENABLE +// pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18u}; +// pls::scheduler scheduler{&my_scheduler_memory, 4}; +// +// graph, outputs> graph; +// std::atomic count{0}; +// auto lambda = [&](const int &in, int &out) { +// PROFILE_WORK_BLOCK("Work Lambda") +// out = in; +// complex_vector input = prepare_input(INPUT_SIZE); +// fft(input.begin(), input.size()); +// count++; +// }; +// function_node, outputs, decltype(lambda)> step_1{lambda}; +// function_node, outputs, decltype(lambda)> step_2{lambda}; +// function_node, outputs, decltype(lambda)> step_3{lambda}; +// function_node, outputs, decltype(lambda)> step_4{lambda}; +// +// graph >> step_1 >> step_2 >> step_3 >> step_4 >> graph; +// graph.build(); +// +// const int num_elements = 10; +// std::vector> results(num_elements); +// +// scheduler.perform_work([&] { +// PROFILE_MAIN_THREAD +// for (int i = 0; i < 10; i++) { +// PROFILE_WORK_BLOCK("Top Level") +// for (int j = 0; j < num_elements; j++) { +// graph.run(std::tuple{j}, &results[j]); +// } +// pls::scheduler::wait_for_all(); +// } +// }); +// +// std::cout << count << std::endl; +// +// PROFILE_SAVE("test_profile.prof") +//} diff --git a/app/benchmark_prefix/main.cpp b/app/benchmark_prefix/main.cpp index d7e3bdb..a7cd7be 100644 --- a/app/benchmark_prefix/main.cpp +++ b/app/benchmark_prefix/main.cpp @@ -6,7 +6,7 @@ #include #include -static constexpr int INPUT_SIZE = 100; +static constexpr int INPUT_SIZE = 10e7; int main() { PROFILE_ENABLE diff --git a/app/invoke_parallel/main.cpp b/app/invoke_parallel/main.cpp index 903f704..be158cb 100644 --- a/app/invoke_parallel/main.cpp +++ b/app/invoke_parallel/main.cpp @@ -55,8 +55,8 @@ void fft(complex_vector::iterator data, int n) { fft(data + n / 2, n / 2); } else { pls::invoke( - [&] { fft(data, n / 2); }, - [&] { fft(data + n / 2, n / 2); } + [n, &data] { fft(data, n / 2); }, + [n, &data] { fft(data + n / 2, n / 2); } ); } PROFILE_END_BLOCK diff --git a/app/playground/main.cpp b/app/playground/main.cpp index eb9384b..03b3bbf 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -1,96 +1,26 @@ // Headers are available because we added the pls target -#include -#include -#include -#include +const long NUM_THREADS = 8; +const long MEMORY_PER_THREAD = 2u << 12u; #include "pls/pls.h" -#include "pls/dataflow/internal/inputs.h" -#include "pls/dataflow/internal/outputs.h" -#include "pls/dataflow/internal/function_node.h" -#include "pls/dataflow/internal/graph.h" -#include "pls/dataflow/internal/switch_node.h" -#include "pls/dataflow/internal/split_node.h" -#include "pls/dataflow/internal/merge_node.h" -int main() { - using namespace pls::dataflow; - using namespace pls::dataflow::internal; - - // Define - graph, outputs> graph; - - auto triple = [](const int &i1, int &o1) { - o1 = i1 * 3; - }; - function_node, outputs, decltype(triple)> triple_node{triple}; - - auto minus_one = [](const int &i1, int &o1) { - o1 = i1 - 1; - }; - function_node, outputs, decltype(minus_one)> minus_one_node_1{minus_one}; - function_node, outputs, decltype(minus_one)> minus_one_node_2{minus_one}; - - auto is_positive = [](const int &i1, bool &o1) { - o1 = i1 > 0; - }; - function_node, outputs, decltype(is_positive)> is_positive_node{is_positive}; - - auto recursion = [&](const int &i1, const int &i2, int &o1) { - std::tuple out; - graph.run({i1, i2}, out); - pls::scheduler::wait_for_all(); - o1 = std::get<0>(out); - }; - function_node, outputs, decltype(recursion)> recursion_node{recursion}; - - split_node minus_split; - split_node decision_split; - switch_node recursion_split; - merge_node recursion_merge; +pls::static_scheduler_memory memory; - // Connect - minus_one_node_1 >> minus_one_node_2; - - // Inputs to first processing step - graph.input<0>() >> minus_one_node_1.in_port<0>(); - graph.input<1>() >> triple_node.in_port<0>(); - minus_one_node_2.out_port<0>() >> minus_split.value_in_port(); - - // Prepare decision... - minus_split.out_port_1() >> is_positive_node.in_port<0>(); - is_positive_node.out_port<0>() >> decision_split.value_in_port(); - triple_node.out_port<0>() >> recursion_split.value_in_port(); - decision_split.out_port_1() >> recursion_split.condition_in_port(); - - // Connect true case (recursion) - minus_split.out_port_2() >> recursion_node.in_port<0>(); - recursion_split.true_out_port() >> recursion_node.in_port<1>(); - recursion_node.out_port<0>() >> recursion_merge.true_in_port(); - - // Connect false case (no recursion) - recursion_split.false_out_port() >> recursion_merge.false_in_port(); - - // Deliver final result (back from merge) - decision_split.out_port_2() >> recursion_merge.condition_in_port(); - recursion_merge.value_out_port() >> graph.output<0>(); +int main() { + pls::scheduler scheduler{&memory, NUM_THREADS}; - // Build - graph.build(); + scheduler.perform_work([]() { + auto lambda = []() { + // Do work + }; + using lambda_task = pls::lambda_task_by_value; - pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18u}; - pls::scheduler scheduler{&my_scheduler_memory, 8}; - scheduler.perform_work([&] { - // Schedule Execution - std::tuple out1, out2, out3; - graph.run({1, 2}, out1); - graph.run({1, 1}, out2); - graph.run({5, 6}, out3); + pls::scheduler::spawn_child(lambda); + pls::scheduler::spawn_child(lambda); - // Wait for results and print pls::scheduler::wait_for_all(); - std::cout << std::get<0>(out1) << std::endl; - std::cout << std::get<0>(out2) << std::endl; - std::cout << std::get<0>(out3) << std::endl; }); + + scheduler.terminate(); + return 0; } diff --git a/lib/pls/include/pls/dataflow/internal/function_node_impl.h b/lib/pls/include/pls/dataflow/internal/function_node_impl.h index c5e058e..e2e80d8 100644 --- a/lib/pls/include/pls/dataflow/internal/function_node_impl.h +++ b/lib/pls/include/pls/dataflow/internal/function_node_impl.h @@ -3,6 +3,7 @@ #define PLS_DATAFLOW_INTERNAL_FUNCTION_NODE_IMPL_H_ #include "graph.h" +#include "pls/internal/helpers/profiler.h" namespace pls { namespace dataflow { @@ -44,15 +45,17 @@ token_pushed(token token) { template void function_node, outputs, F>:: execute_function(invocation_memory *invocation_memory, invocation_info invocation_info) { - auto lambda = [&]() { + auto lambda = [=]() { + PROFILE_WORK_BLOCK("Function Node") input_tuple &inputs = invocation_memory->input_buffer_; output_tuple outputs{}; execute_function_internal(inputs, typename sequence_gen<1 + sizeof...(I)>::type(), outputs, typename sequence_gen<1 + sizeof...(O)>::type(), invocation_info); + PROFILE_END_BLOCK }; // TODO: maybe replace this with 'continuation' style invocation - pls::scheduler::spawn_child_and_wait>(lambda); + pls::scheduler::spawn_child>(lambda); } template diff --git a/lib/pls/include/pls/dataflow/internal/graph.h b/lib/pls/include/pls/dataflow/internal/graph.h index 67d2a95..fa984dc 100644 --- a/lib/pls/include/pls/dataflow/internal/graph.h +++ b/lib/pls/include/pls/dataflow/internal/graph.h @@ -90,9 +90,12 @@ class graph, outputs> : public node { void build(); - void run(value_input_tuple input, value_output_tuple &output) { + void run(value_input_tuple input, value_output_tuple *output) { PLS_ASSERT(build_state_ == build_state::built, "Must build graph before running it!") - pls::scheduler::spawn_child(this, input, &output); + const auto lambda = [=]() { + pls::scheduler::spawn_child(this, input, output); + }; + pls::scheduler::spawn_child>(lambda); } int num_successors() const override { diff --git a/lib/pls/include/pls/dataflow/internal/graph_impl.h b/lib/pls/include/pls/dataflow/internal/graph_impl.h index e675752..377673f 100644 --- a/lib/pls/include/pls/dataflow/internal/graph_impl.h +++ b/lib/pls/include/pls/dataflow/internal/graph_impl.h @@ -2,6 +2,8 @@ #ifndef PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_ #define PLS_DATAFLOW_INTERNAL_GRAPH_IMPL_H_ +#include "pls/internal/helpers/profiler.h" + namespace pls { namespace dataflow { namespace internal { @@ -107,7 +109,7 @@ class graph, outputs>::run_graph_task : public pls::t invocation_info invocation_; public: - run_graph_task(graph *self, value_input_tuple &input, value_output_tuple *output) : self_{self}, + run_graph_task(graph *self, value_input_tuple input, value_output_tuple *output) : self_{self}, input_{input}, output_{output}, invocation_{nullptr} { @@ -128,7 +130,9 @@ class graph, outputs>::run_graph_task : public pls::t } void execute_internal() override { + PROFILE_WORK_BLOCK("Graph Invocation") feed_inputs<0, I0, I...>{self_->inputs_, input_, invocation_}.run(); + PROFILE_END_BLOCK } }; diff --git a/test/dataflow_test.cpp b/test/dataflow_test.cpp index 114c93d..58e26e9 100644 --- a/test/dataflow_test.cpp +++ b/test/dataflow_test.cpp @@ -40,7 +40,7 @@ TEST_CASE("dataflow functions correctly", "[dataflow/dataflow.h]") { linear_graph.build(); std::tuple out{}; - linear_graph.run(5, out); + linear_graph.run(5, &out); linear_graph.wait_for_all(); REQUIRE(std::get<0>(out) == (5 / 2.0) * (5 / 3.0)); @@ -59,7 +59,7 @@ TEST_CASE("dataflow functions correctly", "[dataflow/dataflow.h]") { graph.build(); std::tuple out{}; - graph.run(1, out); + graph.run(1, &out); graph.wait_for_all(); REQUIRE(std::get<0>(out) == 4); @@ -102,8 +102,8 @@ TEST_CASE("dataflow functions correctly", "[dataflow/dataflow.h]") { // Build and run graph.build(); std::tuple out1{}, out2{}; - graph.run({0, true}, out1); - graph.run({0, false}, out2); + graph.run({0, true}, &out1); + graph.run({0, false}, &out2); graph.wait_for_all(); REQUIRE(std::get<0>(out1) == 1);