From 4bb8c2e5567190fc6219fa40d0ce16613a802ac6 Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Tue, 11 Jun 2019 14:39:35 +0200 Subject: [PATCH] Working version of parallel scan --- CMakeLists.txt | 1 + app/benchmark_matrix/main.cpp | 5 +---- app/benchmark_prefix/CMakeLists.txt | 5 +++++ app/benchmark_prefix/main.cpp | 25 +++++++++++++++++++++++++ lib/pls/CMakeLists.txt | 4 ++-- lib/pls/include/pls/algorithms/parallel_for.h | 9 +++++++++ lib/pls/include/pls/algorithms/parallel_for_impl.h | 6 ++++-- lib/pls/include/pls/algorithms/parallel_scan.h | 15 +++++++++++++++ lib/pls/include/pls/algorithms/parallel_scan_impl.h | 81 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/data_structures/aligned_stack.h | 3 ++- lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h | 14 +++----------- lib/pls/include/pls/internal/data_structures/work_stealing_deque_impl.h | 2 +- lib/pls/include/pls/internal/helpers/mini_benchmark.h | 2 +- lib/pls/include/pls/internal/scheduling/task.h | 5 ++--- lib/pls/include/pls/pls.h | 3 ++- lib/pls/src/internal/data_structures/aligned_stack.cpp | 12 ++++++++++++ 16 files changed, 166 insertions(+), 26 deletions(-) create mode 100644 app/benchmark_prefix/CMakeLists.txt create mode 100644 app/benchmark_prefix/main.cpp create mode 100644 lib/pls/include/pls/algorithms/parallel_scan.h create mode 100644 lib/pls/include/pls/algorithms/parallel_scan_impl.h diff --git a/CMakeLists.txt b/CMakeLists.txt index bc061e3..2b4d1da 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,6 +36,7 @@ add_subdirectory(app/invoke_parallel) add_subdirectory(app/benchmark_fft) add_subdirectory(app/benchmark_unbalanced) add_subdirectory(app/benchmark_matrix) +add_subdirectory(app/benchmark_prefix) # Add optional tests option(PACKAGE_TESTS "Build the tests" ON) diff --git a/app/benchmark_matrix/main.cpp b/app/benchmark_matrix/main.cpp index bb21de6..d536e6c 100644 --- a/app/benchmark_matrix/main.cpp +++ b/app/benchmark_matrix/main.cpp @@ -2,8 +2,6 @@ #include #include -#include - const int MATRIX_SIZE = 128; template @@ -16,8 +14,7 @@ class matrix { } void multiply(const matrix &a, const matrix &b) { - auto range = boost::irange(0, SIZE); - pls::algorithm::parallel_for(range.begin(), range.end(), [&](int i) { + pls::algorithm::parallel_for(0, SIZE, [&](int i) { this->multiply_column(i, a, b); }); } diff --git a/app/benchmark_prefix/CMakeLists.txt b/app/benchmark_prefix/CMakeLists.txt new file mode 100644 index 0000000..f4f705b --- /dev/null +++ b/app/benchmark_prefix/CMakeLists.txt @@ -0,0 +1,5 @@ +add_executable(benchmark_prefix main.cpp) +target_link_libraries(benchmark_prefix pls) +if (EASY_PROFILER) + target_link_libraries(benchmark_prefix easy_profiler) +endif () diff --git a/app/benchmark_prefix/main.cpp b/app/benchmark_prefix/main.cpp new file mode 100644 index 0000000..3c9c8a0 --- /dev/null +++ b/app/benchmark_prefix/main.cpp @@ -0,0 +1,25 @@ +#include +#include +#include + +#include +#include +#include + +static constexpr int INPUT_SIZE = 1000000; + +int main() { + PROFILE_ENABLE + std::vector vec(INPUT_SIZE, 1); + std::vector out(INPUT_SIZE); + + for (int i = 0; i < INPUT_SIZE; i++) { + vec[i] = i; + } + + pls::internal::helpers::run_mini_benchmark([&] { + pls::parallel_scan(vec.begin(), vec.end(), out.begin(), std::plus(), 0.0); + }, 8, 1000); + + PROFILE_SAVE("test_profile.prof") +} diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 4b3201d..d242258 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -6,6 +6,8 @@ add_library(pls STATIC include/pls/algorithms/invoke_parallel_impl.h include/pls/algorithms/parallel_for.h include/pls/algorithms/parallel_for_impl.h + include/pls/algorithms/parallel_scan.h + include/pls/algorithms/parallel_scan_impl.h include/pls/internal/base/spin_lock.h include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp @@ -35,8 +37,6 @@ add_library(pls STATIC include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp include/pls/internal/scheduling/lambda_task.h include/pls/internal/data_structures/deque.h - - # include/pls/algorithms/parallel_scan.h include/pls/algorithms/parallel_scan_impl.h) ) # Add everything in `./include` to be in the include path of this project target_include_directories(pls diff --git a/lib/pls/include/pls/algorithms/parallel_for.h b/lib/pls/include/pls/algorithms/parallel_for.h index 4df1104..6860863 100644 --- a/lib/pls/include/pls/algorithms/parallel_for.h +++ b/lib/pls/include/pls/algorithms/parallel_for.h @@ -2,12 +2,21 @@ #ifndef PLS_PARALLEL_FOR_H #define PLS_PARALLEL_FOR_H +// TODO: Replace with own integer iterator to remove dependency +#include + namespace pls { namespace algorithm { template void parallel_for(RandomIt first, RandomIt last, const Function &function); +template +void parallel_for(size_t first, size_t last, const Function &function) { + auto range = boost::irange(first, last); + parallel_for(range.begin(), range.end(), function); +} + } } #include "parallel_for_impl.h" diff --git a/lib/pls/include/pls/algorithms/parallel_for_impl.h b/lib/pls/include/pls/algorithms/parallel_for_impl.h index 32f9aa5..215f597 100644 --- a/lib/pls/include/pls/algorithms/parallel_for_impl.h +++ b/lib/pls/include/pls/algorithms/parallel_for_impl.h @@ -25,11 +25,13 @@ void parallel_for(RandomIt first, RandomIt last, const Function &function) { // Cut in half recursively long middle_index = num_elements / 2; - auto second_half_body = [=] { parallel_for(first + middle_index, last, function); }; + auto second_half_body = + [first, middle_index, last, &function] { parallel_for(first + middle_index, last, function); }; using second_half_t = lambda_task_by_reference; scheduler::spawn_child(std::move(second_half_body)); - auto first_half_body = [=] { parallel_for(first, first + middle_index, function); }; + auto first_half_body = + [first, middle_index, last, &function] { parallel_for(first, first + middle_index, function); }; using first_half_t = lambda_task_by_reference; scheduler::spawn_child_and_wait(std::move(first_half_body)); } diff --git a/lib/pls/include/pls/algorithms/parallel_scan.h b/lib/pls/include/pls/algorithms/parallel_scan.h new file mode 100644 index 0000000..fd1a621 --- /dev/null +++ b/lib/pls/include/pls/algorithms/parallel_scan.h @@ -0,0 +1,15 @@ + +#ifndef PLS_PARALLEL_SCAN_H_ +#define PLS_PARALLEL_SCAN_H_ + +namespace pls { +namespace algorithm { + +template +void parallel_scan(InIter input_start, const InIter input_end, OutIter output, BinaryOp op, Type neutral_element); + +} +} +#include "parallel_scan_impl.h" + +#endif //PLS_PARALLEL_SCAN_H_ diff --git a/lib/pls/include/pls/algorithms/parallel_scan_impl.h b/lib/pls/include/pls/algorithms/parallel_scan_impl.h new file mode 100644 index 0000000..1de1840 --- /dev/null +++ b/lib/pls/include/pls/algorithms/parallel_scan_impl.h @@ -0,0 +1,81 @@ + +#ifndef PLS_PARALLEL_SCAN_IMPL_H_ +#define PLS_PARALLEL_SCAN_IMPL_H_ + +#include +#include + +#include "pls/pls.h" +#include "pls/internal/scheduling/thread_state.h" + +namespace pls { +namespace algorithm { +namespace internal { +template +void serial_scan(InIter input_start, const InIter input_end, OutIter output, BinaryOp op, Type neutral_element) { + auto current_input = input_start; + auto current_output = output; + auto last_value = neutral_element; + while (current_input != input_end) { + last_value = op(last_value, *current_input); + *current_output = last_value; + + current_input++; + current_output++; + } +} +} + +template +void parallel_scan(InIter input_start, const InIter input_end, OutIter output, BinaryOp op, Type neutral_element) { + constexpr auto chunks_per_thread = 4; + using namespace pls::internal::scheduling; + + auto size = std::distance(input_start, input_end); + auto my_state = thread_state::get(); + auto num_threads = thread_state::get()->scheduler_->num_threads(); + auto chunks = num_threads * chunks_per_thread; + auto items_per_chunk = std::max(1l, size / chunks); + + // First Pass -> each chunk isolated + // TODO: Put this onto the stack, not into heap allocated memory + Type *tmp = new Type[chunks]; //reinterpret_cast(my_state->task_stack_->push_bytes(sizeof(Type) * (chunks))); + parallel_for(0, chunks, + [tmp, input_start, input_end, output, items_per_chunk, &op, &neutral_element](int i) { + auto local_start = input_start + items_per_chunk * i; + auto local_end = local_start + items_per_chunk; + if (local_end > input_end) { + local_end = input_end; + } + + // TODO: This MUST be dynamic to make sense, as it has a far bigger influence than any other cutoff + // + internal::serial_scan(local_start, local_end, output + items_per_chunk * i, op, neutral_element); + tmp[i] = *(output + std::distance(local_start, local_end) - 1); + }); + internal::serial_scan(tmp, tmp + chunks, tmp, std::plus(), 0); + + // Sum up pivot elements... + auto output_start = output; + auto output_end = output + size; + parallel_for(1, chunks, + [tmp, output_start, output_end, items_per_chunk, &op, &neutral_element](int i) { + auto local_start = output_start + items_per_chunk * i; + auto local_end = local_start + items_per_chunk; + if (local_end > output_end) { + local_end = output_end; + } + + for (; local_start != local_end; local_start++) { + *local_start = op(*local_start, tmp[i - 1]); + } + }); + +// scheduler::wait_for_all(); + delete[] tmp; +} + +} +} + +#endif //PLS_PARALLEL_SCAN_IMPL_H_ diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack.h b/lib/pls/include/pls/internal/data_structures/aligned_stack.h index f1ab8eb..bd49830 100644 --- a/lib/pls/include/pls/internal/data_structures/aligned_stack.h +++ b/lib/pls/include/pls/internal/data_structures/aligned_stack.h @@ -43,7 +43,8 @@ class aligned_stack { template T *push(ARGS &&... args); template - void *push(); + void *push_bytes(); + void *push_bytes(size_t size); template T pop(); diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h index ed7d25c..0952968 100644 --- a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h +++ b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h @@ -9,20 +9,12 @@ namespace data_structures { template T *aligned_stack::push(ARGS &&... args) { // Perfect-Forward construct - return new(push < T > ())T(std::forward(args)...); + return new(push_bytes())T(std::forward(args)...); } template -void *aligned_stack::push() { - void *result = reinterpret_cast(head_); - - // Move head to next aligned position after new object - head_ = base::alignment::next_alignment(head_ + sizeof(T)); - if (head_ >= memory_end_) { - PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!"); - } - - return result; +void *aligned_stack::push_bytes() { + return push_bytes(sizeof(T)); } template diff --git a/lib/pls/include/pls/internal/data_structures/work_stealing_deque_impl.h b/lib/pls/include/pls/internal/data_structures/work_stealing_deque_impl.h index 45cebdf..e1f15e2 100644 --- a/lib/pls/include/pls/internal/data_structures/work_stealing_deque_impl.h +++ b/lib/pls/include/pls/internal/data_structures/work_stealing_deque_impl.h @@ -28,7 +28,7 @@ std::pair *work_stealing_deque::allocate_item // 'Union' type to push both on stack using pair_t = std::pair; // Allocate space on stack - auto new_pair = reinterpret_cast(stack_->push()); + auto new_pair = reinterpret_cast(stack_->push_bytes()); // Initialize memory on stack new((void *) &(new_pair->first)) work_stealing_deque_item(); new((void *) &(new_pair->second)) T(std::forward(args)...); diff --git a/lib/pls/include/pls/internal/helpers/mini_benchmark.h b/lib/pls/include/pls/internal/helpers/mini_benchmark.h index d5bcce4..59409cb 100644 --- a/lib/pls/include/pls/internal/helpers/mini_benchmark.h +++ b/lib/pls/include/pls/internal/helpers/mini_benchmark.h @@ -18,7 +18,7 @@ void run_mini_benchmark(const Function &lambda, size_t max_threads, unsigned lon using namespace std; using namespace pls::internal::scheduling; - malloc_scheduler_memory scheduler_memory{max_threads}; + malloc_scheduler_memory scheduler_memory{max_threads, 2 << 12}; for (unsigned int num_threads = 1; num_threads <= max_threads; num_threads++) { scheduler local_scheduler{&scheduler_memory, num_threads}; diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h index c29cd3d..6af295e 100644 --- a/lib/pls/include/pls/internal/scheduling/task.h +++ b/lib/pls/include/pls/internal/scheduling/task.h @@ -50,11 +50,10 @@ void task::spawn_child(ARGS &&... args) { ref_count_++; // Push on our deque - auto deque_state = thread_state::get()->deque_.save_state(); - thread_state::get()->deque_.push_tail([this, deque_state](T *item) { + thread_state::get()->deque_.push_tail([this](T *item) { // Assign forced values (for stack and parent management) item->parent_ = this; - item->deque_state_ = deque_state; + item->deque_state_ = thread_state::get()->deque_.save_state(); }, std::forward(args)...); } diff --git a/lib/pls/include/pls/pls.h b/lib/pls/include/pls/pls.h index 0602610..c6aa33f 100644 --- a/lib/pls/include/pls/pls.h +++ b/lib/pls/include/pls/pls.h @@ -3,6 +3,7 @@ #include "pls/algorithms/invoke_parallel.h" #include "pls/algorithms/parallel_for.h" +#include "pls/algorithms/parallel_scan.h" #include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/scheduler.h" #include "pls/internal/helpers/unique_id.h" @@ -23,7 +24,7 @@ using internal::scheduling::task; using algorithm::invoke_parallel; using algorithm::parallel_for; - +using algorithm::parallel_scan; } #endif diff --git a/lib/pls/src/internal/data_structures/aligned_stack.cpp b/lib/pls/src/internal/data_structures/aligned_stack.cpp index 4564338..269032c 100644 --- a/lib/pls/src/internal/data_structures/aligned_stack.cpp +++ b/lib/pls/src/internal/data_structures/aligned_stack.cpp @@ -15,6 +15,18 @@ aligned_stack::aligned_stack(char *memory_region, const std::size_t size) : memory_end_{(pointer_t) memory_region + size}, head_{base::alignment::next_alignment(memory_start_)} {} +void *aligned_stack::push_bytes(size_t size) { + void *result = reinterpret_cast(head_); + + // Move head to next aligned position after new object + head_ = base::alignment::next_alignment(head_ + size); + if (head_ >= memory_end_) { + PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!"); + } + + return result; +} + } } } -- libgit2 0.26.0