From e2de954a6e30587ede65b73ae29e7b2343ffad07 Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Mon, 30 Mar 2020 12:10:12 +0200 Subject: [PATCH] Re-add parallel invoke pattern. --- app/benchmark_fib/main.cpp | 3 +-- lib/pls/include/pls/algorithms/invoke.h | 9 +++------ lib/pls/include/pls/algorithms/invoke_impl.h | 26 ++++++++------------------ lib/pls/include/pls/algorithms/scan.h | 15 --------------- lib/pls/include/pls/algorithms/scan_impl.h | 115 ------------------------------------------------------------------------------------------------------------------- lib/pls/include/pls/internal/scheduling/task_manager.h | 2 +- lib/pls/include/pls/pls.h | 23 ++++++++++++----------- lib/pls/src/internal/scheduling/task_manager.cpp | 2 +- test/CMakeLists.txt | 1 + test/base_tests.cpp | 35 ++++++++++++++++++++++++++++++++++- test/patterns_test.cpp | 28 ++++++++++++++++++++++++++++ test/scheduling_tests.cpp | 2 +- 12 files changed, 90 insertions(+), 171 deletions(-) delete mode 100644 lib/pls/include/pls/algorithms/scan.h delete mode 100644 lib/pls/include/pls/algorithms/scan_impl.h create mode 100644 test/patterns_test.cpp diff --git a/app/benchmark_fib/main.cpp b/app/benchmark_fib/main.cpp index 4ad8b45..b304bb6 100644 --- a/app/benchmark_fib/main.cpp +++ b/app/benchmark_fib/main.cpp @@ -3,7 +3,6 @@ using namespace pls::internal::scheduling; #include -#include #include "benchmark_runner.h" #include "benchmark_base/fib.h" @@ -31,7 +30,7 @@ int pls_fib(int n) { } constexpr int MAX_NUM_TASKS = 32; -constexpr int MAX_STACK_SIZE = 1024 * 4; +constexpr int MAX_STACK_SIZE = 1024 * 32; int main(int argc, char **argv) { int num_threads; diff --git a/lib/pls/include/pls/algorithms/invoke.h b/lib/pls/include/pls/algorithms/invoke.h index 3197a97..a2829e7 100644 --- a/lib/pls/include/pls/algorithms/invoke.h +++ b/lib/pls/include/pls/algorithms/invoke.h @@ -2,22 +2,19 @@ #ifndef PLS_PARALLEL_INVOKE_H #define PLS_PARALLEL_INVOKE_H -#include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/scheduler.h" -namespace pls { -namespace algorithm { +namespace pls::algorithm { template -void invoke(const Function1 &function1, const Function2 &function2); +void invoke(const Function1 &&function1, const Function2 &&function2); template -void invoke(const Function1 &function1, const Function2 &function2, const Function3 &function3); +void invoke(const Function1 &&function1, const Function2 &&function2, const Function3 &&function3); // ...and so on, add more if we decide to keep this design } -} #include "invoke_impl.h" #endif //PLS_PARALLEL_INVOKE_H diff --git a/lib/pls/include/pls/algorithms/invoke_impl.h b/lib/pls/include/pls/algorithms/invoke_impl.h index fe64cd7..83c4126 100644 --- a/lib/pls/include/pls/algorithms/invoke_impl.h +++ b/lib/pls/include/pls/algorithms/invoke_impl.h @@ -2,39 +2,29 @@ #ifndef PLS_INVOKE_PARALLEL_IMPL_H #define PLS_INVOKE_PARALLEL_IMPL_H -#include "pls/internal/scheduling/task.h" -#include "pls/internal/scheduling/lambda_task.h" #include "pls/internal/scheduling/scheduler.h" -#include "pls/internal/scheduling/thread_state.h" -namespace pls { -namespace algorithm { +namespace pls::algorithm { template void invoke(Function1 &&function1, Function2 &&function2) { using namespace ::pls::internal::scheduling; - using task_1_t = lambda_task_by_value; - using task_2_t = lambda_task_by_value; - - scheduler::spawn_child(std::forward(function2)); - scheduler::spawn_child_and_wait(std::forward(function1)); + scheduler::spawn(std::forward(function1)); + scheduler::spawn(std::forward(function2)); + scheduler::sync(); } template void invoke(Function1 &&function1, Function2 &&function2, Function3 &&function3) { using namespace ::pls::internal::scheduling; - using task_1_t = lambda_task_by_value; - using task_2_t = lambda_task_by_value; - using task_3_t = lambda_task_by_value; - - scheduler::spawn_child(std::forward(function3)); - scheduler::spawn_child(std::forward(function2)); - scheduler::spawn_child_and_wait(std::forward(function1)); + scheduler::spawn(std::forward(function1)); + scheduler::spawn(std::forward(function2)); + scheduler::spawn(std::forward(function3)); + scheduler::sync(); } } -} #endif //PLS_INVOKE_PARALLEL_IMPL_H diff --git a/lib/pls/include/pls/algorithms/scan.h b/lib/pls/include/pls/algorithms/scan.h deleted file mode 100644 index 1db358b..0000000 --- a/lib/pls/include/pls/algorithms/scan.h +++ /dev/null @@ -1,15 +0,0 @@ - -#ifndef PLS_PARALLEL_SCAN_H_ -#define PLS_PARALLEL_SCAN_H_ - -namespace pls { -namespace algorithm { - -template -void scan(InIter in_start, const InIter in_end, OutIter out, BinaryOp op, Type neutral_elem); - -} -} -#include "scan_impl.h" - -#endif //PLS_PARALLEL_SCAN_H_ diff --git a/lib/pls/include/pls/algorithms/scan_impl.h b/lib/pls/include/pls/algorithms/scan_impl.h deleted file mode 100644 index 1af0418..0000000 --- a/lib/pls/include/pls/algorithms/scan_impl.h +++ /dev/null @@ -1,115 +0,0 @@ - -#ifndef PLS_PARALLEL_SCAN_IMPL_H_ -#define PLS_PARALLEL_SCAN_IMPL_H_ - -#include -#include - -#include "pls/pls.h" -#include "pls/internal/scheduling/thread_state.h" -#include "pls/internal/scheduling/task.h" - -namespace pls { -namespace algorithm { -namespace internal { - -using namespace pls::internal::scheduling; - -template -void serial_scan(InIter input_start, const InIter input_end, OutIter output, BinaryOp op, Type neutral_element) { - - auto current_output = output; - auto last_value = neutral_element; - - for (auto current_input = input_start; current_input != input_end; current_input++) { - last_value = op(last_value, *current_input); - *current_output = last_value; - current_output++; - } -} - -template -class scan_task : public pls::internal::scheduling::task { - const InIter in_start_; - const InIter in_end_; - const OutIter out_; - const BinaryOp op_; - const Type neutral_elem_; - - long size_, chunks_; - long items_per_chunk_; - Type *chunk_sums_; - - public: - scan_task(const InIter in_start, const InIter in_end, const OutIter out, const BinaryOp op, const Type neutral_elem) : - in_start_{in_start}, - in_end_{in_end}, - out_{out}, - op_{op}, - neutral_elem_{neutral_elem} { - constexpr auto chunks_per_thread = 1; - - size_ = std::distance(in_start, in_end); - auto num_threads = thread_state::get()->scheduler_->num_threads(); - chunks_ = num_threads * chunks_per_thread + 1; - items_per_chunk_ = size_ / chunks_ + 1; - - chunk_sums_ = reinterpret_cast(allocate_memory(sizeof(Type) * chunks_ - 1)); - }; - - void execute_internal() override { - // First Pass = calculate each chunks individual prefix sum - for_each_range(0, chunks_ - 1, [&](int i) { - auto chunk_start = in_start_ + items_per_chunk_ * i; - auto chunk_end = std::min(in_end_, chunk_start + items_per_chunk_); - auto chunk_size = std::distance(chunk_start, chunk_end); - auto chunk_output = out_ + items_per_chunk_ * i; - - internal::serial_scan(chunk_start, chunk_end, chunk_output, op_, neutral_elem_); - auto last_chunk_value = *(chunk_output + chunk_size - 1); - chunk_sums_[i] = last_chunk_value; - }, fixed_strategy{1}); - - // Calculate prefix sums of each chunks sum - // (effectively the prefix sum at the end of each chunk, then used to correct the following chunk). - internal::serial_scan(chunk_sums_, chunk_sums_ + chunks_ - 1, chunk_sums_, op_, neutral_elem_); - - // Second Pass = Use results from first pass to correct each chunks sum - auto output_start = out_; - auto output_end = out_ + size_; - for_each_range(1, chunks_, [&](int i) { - if (i == chunks_ - 1) { - auto chunk_start = in_start_ + items_per_chunk_ * i; - auto chunk_end = std::min(in_end_, chunk_start + items_per_chunk_); - auto chunk_output = output_start + items_per_chunk_ * i; - - *chunk_start += chunk_sums_[i - 1]; - internal::serial_scan(chunk_start, chunk_end, chunk_output, op_, neutral_elem_); - } else { - auto chunk_start = output_start + items_per_chunk_ * i; - auto chunk_end = std::min(output_end, chunk_start + items_per_chunk_); - for (; chunk_start != chunk_end; chunk_start++) { - *chunk_start = op_(*chunk_start, chunk_sums_[i - 1]); - } - } - }, fixed_strategy{1}); - - wait_for_all(); - this->~scan_task(); - } -}; - -} - -template -void scan(InIter in_start, const InIter in_end, OutIter out, BinaryOp op, Type neutral_elem) { - using namespace pls::internal::scheduling; - - using scan_task_type = internal::scan_task; - scheduler::spawn_child_and_wait(in_start, in_end, out, op, neutral_elem); -} - -} -} - -#endif //PLS_PARALLEL_SCAN_IMPL_H_ diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h index 55eec55..7ec2637 100644 --- a/lib/pls/include/pls/internal/scheduling/task_manager.h +++ b/lib/pls/include/pls/internal/scheduling/task_manager.h @@ -26,7 +26,7 @@ class task_manager { explicit task_manager(unsigned thread_id, size_t num_tasks, size_t stack_size, - std::shared_ptr stack_allocator); + std::shared_ptr &stack_allocator); ~task_manager(); void push_resource_on_task(task *target_task, task *spare_task_chain); diff --git a/lib/pls/include/pls/pls.h b/lib/pls/include/pls/pls.h index 95c1ac1..b766dfe 100644 --- a/lib/pls/include/pls/pls.h +++ b/lib/pls/include/pls/pls.h @@ -1,32 +1,33 @@ #ifndef PLS_LIBRARY_H #define PLS_LIBRARY_H +#include + #include "pls/algorithms/invoke.h" #include "pls/algorithms/for_each.h" -#include "pls/algorithms/scan.h" -#include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/scheduler.h" -#include "pls/internal/helpers/unique_id.h" #include "pls/internal/helpers/member_function.h" namespace pls { -using internal::scheduling::static_scheduler_memory; +// 'basic' for-join APIs using internal::scheduling::scheduler; +template +void spawn(Function &&function) { + scheduler::spawn(std::forward(function)); +} +void sync() { + scheduler::sync(); +} -using unique_id = internal::helpers::unique_id; +// general helpers that can be handy when using PLS template using member_function = internal::helpers::member_function; -using internal::scheduling::task; -using internal::scheduling::lambda_task_by_reference; -using internal::scheduling::lambda_task_by_value; -using internal::scheduling::task; - +// parallel patterns API using algorithm::invoke; using algorithm::for_each; using algorithm::for_each_range; -using algorithm::scan; } #endif diff --git a/lib/pls/src/internal/scheduling/task_manager.cpp b/lib/pls/src/internal/scheduling/task_manager.cpp index b6c40c5..8a2aa7d 100644 --- a/lib/pls/src/internal/scheduling/task_manager.cpp +++ b/lib/pls/src/internal/scheduling/task_manager.cpp @@ -9,7 +9,7 @@ namespace pls::internal::scheduling { task_manager::task_manager(unsigned thread_id, size_t num_tasks, size_t stack_size, - std::shared_ptr stack_allocator) : stack_allocator_{stack_allocator}, + std::shared_ptr &stack_allocator) : stack_allocator_{stack_allocator}, tasks_{}, deque_{thread_id, num_tasks} { tasks_.reserve(num_tasks); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index ff36afe..9f4e844 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -3,5 +3,6 @@ add_executable(tests data_structures_test.cpp base_tests.cpp scheduling_tests.cpp + patterns_test.cpp test_helpers.h) target_link_libraries(tests catch2 pls) diff --git a/test/base_tests.cpp b/test/base_tests.cpp index 99feac7..e7e3593 100644 --- a/test/base_tests.cpp +++ b/test/base_tests.cpp @@ -2,6 +2,7 @@ #include "pls/internal/base/spin_lock.h" #include "pls/internal/base/system_details.h" +#include "pls/internal/base/alignment.h" #include "pls/internal/base/stack_allocator.h" #include "test_helpers.h" @@ -14,6 +15,38 @@ using namespace pls::internal::base; int base_tests_shared_counter; +TEST_CASE("align helpers", "[internal/base/alignment.h") { + system_details::pointer_t aligned_64 = 64; + system_details::pointer_t aligned_32 = 32; + system_details::pointer_t not_aligned_64 = 70; + system_details::pointer_t not_aligned_32 = 60; + + REQUIRE(alignment::next_alignment(aligned_64, 64) == 64); + REQUIRE(alignment::next_alignment(aligned_32, 32) == 32); + REQUIRE(alignment::next_alignment(aligned_32, 64) == 64); + + REQUIRE(alignment::previous_alignment(not_aligned_64, 64) == 64); + REQUIRE(alignment::next_alignment(not_aligned_64, 64) == 128); + REQUIRE(alignment::previous_alignment(not_aligned_32, 32) == 32); + REQUIRE(alignment::next_alignment(not_aligned_32, 32) == 64); +} + +TEST_CASE("alignment wrapper", "[internal/base/alignment.h") { + char filler1 = '\0'; + alignment::alignment_wrapper int_256{256}; + int filler2 = 0; + alignment::alignment_wrapper int_1024{1024}; + + (void) filler1; + (void) filler2; + + REQUIRE(int_256.object() == 256); + REQUIRE((system_details::pointer_t) int_256.pointer() % 256 == 0); + + REQUIRE(int_1024.object() == 1024); + REQUIRE((system_details::pointer_t) int_1024.pointer() % 1024 == 0); +} + TEST_CASE("mmap stack allocator", "[internal/base/stack_allocator.h") { mmap_stack_allocator stack_allocator; @@ -57,7 +90,7 @@ TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spi }}; std::thread t2{[&]() { barrier--; - while(barrier != 0); + while (barrier != 0); for (int i = 0; i < num_iterations; i++) { lock.lock(); diff --git a/test/patterns_test.cpp b/test/patterns_test.cpp new file mode 100644 index 0000000..3726626 --- /dev/null +++ b/test/patterns_test.cpp @@ -0,0 +1,28 @@ +#include + +#include + +#include "pls/pls.h" + +constexpr int MAX_NUM_TASKS = 32; +constexpr int MAX_STACK_SIZE = 1024 * 8; + +TEST_CASE("parallel invoke calls correctly", "[algorithms/invoke.h]") { + pls::scheduler scheduler{3, MAX_NUM_TASKS, MAX_STACK_SIZE}; + + std::atomic num_run{0}; + scheduler.perform_work([&] { + pls::invoke([&] { + num_run++; + while (num_run < 3); + }, [&] { + while (num_run < 1); + num_run++; + while (num_run < 3); + }, [&] { + while (num_run < 2); + num_run++; + }); + REQUIRE(num_run == 3); + }); +} diff --git a/test/scheduling_tests.cpp b/test/scheduling_tests.cpp index 3f03cc9..442fa17 100644 --- a/test/scheduling_tests.cpp +++ b/test/scheduling_tests.cpp @@ -33,7 +33,7 @@ TEST_CASE("tasks distributed over workers (do not block)", "[internal/scheduling }); scheduler::sync(); }); - REQUIRE(true); + REQUIRE(num_run == 3); } TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/traded_cas_field.h]") { -- libgit2 0.26.0