Commit 4bb8c2e5 by FritzFlorian

Working version of parallel scan

parent a5bb074c
Pipeline #1261 failed with stages in 50 seconds
@@ -36,6 +36,7 @@ add_subdirectory(app/invoke_parallel)
 add_subdirectory(app/benchmark_fft)
 add_subdirectory(app/benchmark_unbalanced)
 add_subdirectory(app/benchmark_matrix)
+add_subdirectory(app/benchmark_prefix)
 # Add optional tests
 option(PACKAGE_TESTS "Build the tests" ON)
......
@@ -2,8 +2,6 @@
 #include <pls/internal/helpers/profiler.h>
 #include <pls/internal/helpers/mini_benchmark.h>
-#include <boost/range/irange.hpp>
 const int MATRIX_SIZE = 128;
 template<typename T, int SIZE>
@@ -16,8 +14,7 @@ class matrix {
   }
   void multiply(const matrix<T, SIZE> &a, const matrix<T, SIZE> &b) {
-    auto range = boost::irange(0, SIZE);
-    pls::algorithm::parallel_for(range.begin(), range.end(), [&](int i) {
+    pls::algorithm::parallel_for(0, SIZE, [&](int i) {
       this->multiply_column(i, a, b);
     });
   }
......
add_executable(benchmark_prefix main.cpp)
target_link_libraries(benchmark_prefix pls)
if (EASY_PROFILER)
target_link_libraries(benchmark_prefix easy_profiler)
endif ()
#include <pls/pls.h>
#include <pls/internal/helpers/profiler.h>
#include <pls/internal/helpers/mini_benchmark.h>

#include <iostream>
#include <vector>
#include <functional>

static constexpr int INPUT_SIZE = 1000000;

int main() {
  PROFILE_ENABLE
  std::vector<double> vec(INPUT_SIZE, 1);
  std::vector<double> out(INPUT_SIZE);
  for (int i = 0; i < INPUT_SIZE; i++) {
    vec[i] = i;
  }

  pls::internal::helpers::run_mini_benchmark([&] {
    pls::parallel_scan(vec.begin(), vec.end(), out.begin(), std::plus<double>(), 0.0);
  }, 8, 1000);

  PROFILE_SAVE("test_profile.prof")
}
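For reference (not part of this commit): the benchmark fills vec with vec[i] = i and runs an inclusive plus-scan, so the expected output is out[i] = i * (i + 1) / 2. The standalone check below verifies that expectation on a smaller input, using std::inclusive_scan (C++17) as a stand-in for pls::parallel_scan.

#include <cassert>
#include <cstdio>
#include <numeric>
#include <vector>

int main() {
  constexpr int N = 1000;  // small stand-in for INPUT_SIZE
  std::vector<double> vec(N), out(N);
  for (int i = 0; i < N; i++) vec[i] = i;

  // std::inclusive_scan mirrors the semantics expected from pls::parallel_scan here.
  std::inclusive_scan(vec.begin(), vec.end(), out.begin());

  for (int i = 0; i < N; i++) {
    // The sum 0 + 1 + ... + i has the closed form i * (i + 1) / 2.
    assert(out[i] == static_cast<double>(i) * (i + 1) / 2);
  }
  std::puts("scan results match the closed form");
}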
@@ -6,6 +6,8 @@ add_library(pls STATIC
   include/pls/algorithms/invoke_parallel_impl.h
   include/pls/algorithms/parallel_for.h
   include/pls/algorithms/parallel_for_impl.h
+  include/pls/algorithms/parallel_scan.h
+  include/pls/algorithms/parallel_scan_impl.h
   include/pls/internal/base/spin_lock.h
   include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp
@@ -35,8 +37,6 @@ add_library(pls STATIC
   include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
   include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
   include/pls/internal/scheduling/lambda_task.h include/pls/internal/data_structures/deque.h
-#  include/pls/algorithms/parallel_scan.h include/pls/algorithms/parallel_scan_impl.h)
 )
 # Add everything in `./include` to be in the include path of this project
 target_include_directories(pls
......
@@ -2,12 +2,21 @@
 #ifndef PLS_PARALLEL_FOR_H
 #define PLS_PARALLEL_FOR_H
+// TODO: Replace with own integer iterator to remove dependency
+#include <boost/range/irange.hpp>
 namespace pls {
 namespace algorithm {
 template<typename RandomIt, typename Function>
 void parallel_for(RandomIt first, RandomIt last, const Function &function);
+template<typename Function>
+void parallel_for(size_t first, size_t last, const Function &function) {
+  auto range = boost::irange(first, last);
+  parallel_for(range.begin(), range.end(), function);
+}
 }
 }
 #include "parallel_for_impl.h"
......
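The TODO above asks for an own integer iterator so the boost::irange dependency can be dropped. A possible minimal sketch (names hypothetical, not part of this commit): a random access iterator over the integers [first, last) that supports the operations a range-based parallel_for typically needs (distance via operator-, operator+, increment, dereference, comparison).

#include <cstddef>
#include <cstdio>
#include <iterator>

// Hypothetical replacement for boost::irange's iterator: walks the integers
// [first, last) and provides the iterator_traits needed by std::distance.
class int_iterator {
 public:
  using iterator_category = std::random_access_iterator_tag;
  using value_type = std::size_t;
  using difference_type = std::ptrdiff_t;
  using pointer = const std::size_t *;
  using reference = std::size_t;

  explicit int_iterator(std::size_t value) : value_{value} {}

  std::size_t operator*() const { return value_; }
  int_iterator &operator++() { ++value_; return *this; }
  int_iterator operator+(difference_type n) const { return int_iterator{value_ + n}; }
  difference_type operator-(const int_iterator &other) const {
    return static_cast<difference_type>(value_) - static_cast<difference_type>(other.value_);
  }
  bool operator==(const int_iterator &other) const { return value_ == other.value_; }
  bool operator!=(const int_iterator &other) const { return value_ != other.value_; }

 private:
  std::size_t value_;
};

int main() {
  int_iterator first{0}, last{10};
  std::printf("distance: %td\n", std::distance(first, last));
  for (auto it = first; it != last; ++it) std::printf("%zu ", *it);
  std::printf("\n");
}

With such a type, the size_t overload above could call parallel_for(int_iterator{first}, int_iterator{last}, function) directly instead of going through boost::irange.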
@@ -25,11 +25,13 @@ void parallel_for(RandomIt first, RandomIt last, const Function &function) {
   // Cut in half recursively
   long middle_index = num_elements / 2;
-  auto second_half_body = [=] { parallel_for(first + middle_index, last, function); };
+  auto second_half_body =
+      [first, middle_index, last, &function] { parallel_for(first + middle_index, last, function); };
   using second_half_t = lambda_task_by_reference<decltype(second_half_body)>;
   scheduler::spawn_child<second_half_t>(std::move(second_half_body));
-  auto first_half_body = [=] { parallel_for(first, first + middle_index, function); };
+  auto first_half_body =
+      [first, middle_index, last, &function] { parallel_for(first, first + middle_index, function); };
   using first_half_t = lambda_task_by_reference<decltype(first_half_body)>;
   scheduler::spawn_child_and_wait<first_half_t>(std::move(first_half_body));
 }
......
#ifndef PLS_PARALLEL_SCAN_H_
#define PLS_PARALLEL_SCAN_H_
namespace pls {
namespace algorithm {
template<typename InIter, typename OutIter, typename BinaryOp, typename Type>
void parallel_scan(InIter input_start, const InIter input_end, OutIter output, BinaryOp op, Type neutral_element);
}
}
#include "parallel_scan_impl.h"
#endif //PLS_PARALLEL_SCAN_H_
#ifndef PLS_PARALLEL_SCAN_IMPL_H_
#define PLS_PARALLEL_SCAN_IMPL_H_

#include <memory>
#include <functional>

#include "pls/pls.h"
#include "pls/internal/scheduling/thread_state.h"

namespace pls {
namespace algorithm {
namespace internal {

template<typename InIter, typename OutIter, typename BinaryOp, typename Type>
void serial_scan(InIter input_start, const InIter input_end, OutIter output, BinaryOp op, Type neutral_element) {
  auto current_input = input_start;
  auto current_output = output;
  auto last_value = neutral_element;

  while (current_input != input_end) {
    last_value = op(last_value, *current_input);
    *current_output = last_value;
    current_input++;
    current_output++;
  }
}

}

template<typename InIter, typename OutIter, typename BinaryOp, typename Type>
void parallel_scan(InIter input_start, const InIter input_end, OutIter output, BinaryOp op, Type neutral_element) {
  constexpr auto chunks_per_thread = 4;

  using namespace pls::internal::scheduling;
  auto size = std::distance(input_start, input_end);
  auto my_state = thread_state::get();
  auto num_threads = thread_state::get()->scheduler_->num_threads();
  auto chunks = num_threads * chunks_per_thread;
  auto items_per_chunk = std::max(1l, size / chunks);

  // First Pass -> each chunk isolated
  // TODO: Put this onto the stack, not into heap allocated memory
  Type *tmp = new Type[chunks]; //reinterpret_cast<Type *>(my_state->task_stack_->push_bytes(sizeof(Type) * (chunks)));
  parallel_for(0, chunks,
               [tmp, input_start, input_end, output, items_per_chunk, &op, &neutral_element](int i) {
                 auto local_start = input_start + items_per_chunk * i;
                 auto local_end = local_start + items_per_chunk;
                 if (local_end > input_end) {
                   local_end = input_end;
                 }
                 // TODO: This MUST be dynamic to make sense, as it has a far bigger influence than any other cutoff
                 internal::serial_scan(local_start, local_end, output + items_per_chunk * i, op, neutral_element);
                 tmp[i] = *(output + std::distance(local_start, local_end) - 1);
               });
  internal::serial_scan(tmp, tmp + chunks, tmp, std::plus<int>(), 0);

  // Sum up pivot elements...
  auto output_start = output;
  auto output_end = output + size;
  parallel_for(1, chunks,
               [tmp, output_start, output_end, items_per_chunk, &op, &neutral_element](int i) {
                 auto local_start = output_start + items_per_chunk * i;
                 auto local_end = local_start + items_per_chunk;
                 if (local_end > output_end) {
                   local_end = output_end;
                 }
                 for (; local_start != local_end; local_start++) {
                   *local_start = op(*local_start, tmp[i - 1]);
                 }
               });
  // scheduler::wait_for_all();
  delete[] tmp;
}

}
}
#endif //PLS_PARALLEL_SCAN_IMPL_H_
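To make the data flow of parallel_scan above concrete, here is a purely serial illustration of the same three-phase scheme (an editorial sketch, not the library code): phase one scans each chunk in isolation and records its total in tmp (the first parallel_for), phase two serially scans the chunk totals, and phase three adds the combined total of all preceding chunks to every chunk after the first (the second parallel_for).

#include <cstdio>
#include <vector>

// Serial walk-through of the chunked scan: 8 elements, 4 chunks of 2.
int main() {
  std::vector<int> in{1, 2, 3, 4, 5, 6, 7, 8};
  std::vector<int> out(in.size());
  const int chunks = 4, items_per_chunk = 2;

  // Phase 1: scan each chunk in isolation and remember its total.
  std::vector<int> tmp(chunks);
  for (int c = 0; c < chunks; c++) {
    int sum = 0;
    for (int j = 0; j < items_per_chunk; j++) {
      sum += in[c * items_per_chunk + j];
      out[c * items_per_chunk + j] = sum;
    }
    tmp[c] = sum;  // chunk totals: 3, 7, 11, 15
  }

  // Phase 2: scan the chunk totals -> running offsets 3, 10, 21, 36.
  for (int c = 1; c < chunks; c++) tmp[c] += tmp[c - 1];

  // Phase 3: every chunk except the first adds the total of all chunks before it.
  for (int c = 1; c < chunks; c++) {
    for (int j = 0; j < items_per_chunk; j++) {
      out[c * items_per_chunk + j] += tmp[c - 1];
    }
  }

  for (int v : out) std::printf("%d ", v);  // 1 3 6 10 15 21 28 36
  std::printf("\n");
}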
@@ -43,7 +43,8 @@ class aligned_stack {
   template<typename T, typename ...ARGS>
   T *push(ARGS &&... args);
   template<typename T>
-  void *push();
+  void *push_bytes();
+  void *push_bytes(size_t size);
   template<typename T>
   T pop();
......
@@ -9,20 +9,12 @@ namespace data_structures {
 template<typename T, typename ...ARGS>
 T *aligned_stack::push(ARGS &&... args) {
   // Perfect-Forward construct
-  return new(push < T > ())T(std::forward<ARGS>(args)...);
+  return new(push_bytes<T>())T(std::forward<ARGS>(args)...);
 }
 template<typename T>
-void *aligned_stack::push() {
-  void *result = reinterpret_cast<T *>(head_);
-  // Move head to next aligned position after new object
-  head_ = base::alignment::next_alignment(head_ + sizeof(T));
-  if (head_ >= memory_end_) {
-    PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!");
-  }
-  return result;
-}
+void *aligned_stack::push_bytes() {
+  return push_bytes(sizeof(T));
+}
 template<typename T>
......
@@ -28,7 +28,7 @@ std::pair<work_stealing_deque_item, T> *work_stealing_deque<Item>::allocate_item
   // 'Union' type to push both on stack
   using pair_t = std::pair<work_stealing_deque_item, T>;
   // Allocate space on stack
-  auto new_pair = reinterpret_cast<pair_t *>(stack_->push<pair_t>());
+  auto new_pair = reinterpret_cast<pair_t *>(stack_->push_bytes<pair_t>());
   // Initialize memory on stack
   new((void *) &(new_pair->first)) work_stealing_deque_item();
   new((void *) &(new_pair->second)) T(std::forward<ARGS>(args)...);
......
@@ -18,7 +18,7 @@ void run_mini_benchmark(const Function &lambda, size_t max_threads, unsigned lon
   using namespace std;
   using namespace pls::internal::scheduling;
-  malloc_scheduler_memory scheduler_memory{max_threads};
+  malloc_scheduler_memory scheduler_memory{max_threads, 2 << 12};
   for (unsigned int num_threads = 1; num_threads <= max_threads; num_threads++) {
     scheduler local_scheduler{&scheduler_memory, num_threads};
......
@@ -50,11 +50,10 @@ void task::spawn_child(ARGS &&... args) {
   ref_count_++;
   // Push on our deque
-  auto deque_state = thread_state::get()->deque_.save_state();
-  thread_state::get()->deque_.push_tail<T>([this, deque_state](T *item) {
+  thread_state::get()->deque_.push_tail<T>([this](T *item) {
     // Assign forced values (for stack and parent management)
     item->parent_ = this;
-    item->deque_state_ = deque_state;
+    item->deque_state_ = thread_state::get()->deque_.save_state();
   }, std::forward<ARGS>(args)...);
 }
......
@@ -3,6 +3,7 @@
 #include "pls/algorithms/invoke_parallel.h"
 #include "pls/algorithms/parallel_for.h"
+#include "pls/algorithms/parallel_scan.h"
 #include "pls/internal/scheduling/task.h"
 #include "pls/internal/scheduling/scheduler.h"
 #include "pls/internal/helpers/unique_id.h"
@@ -23,7 +24,7 @@ using internal::scheduling::task;
 using algorithm::invoke_parallel;
 using algorithm::parallel_for;
+using algorithm::parallel_scan;
 }
 #endif
@@ -15,6 +15,18 @@ aligned_stack::aligned_stack(char *memory_region, const std::size_t size) :
     memory_end_{(pointer_t) memory_region + size},
     head_{base::alignment::next_alignment(memory_start_)} {}
+void *aligned_stack::push_bytes(size_t size) {
+  void *result = reinterpret_cast<void *>(head_);
+  // Move head to next aligned position after new object
+  head_ = base::alignment::next_alignment(head_ + size);
+  if (head_ >= memory_end_) {
+    PLS_ERROR("Tried to allocate object on aligned_stack without sufficient memory!");
+  }
+  return result;
+}
 }
 }
 }
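push_bytes(size_t) turns aligned_stack into a raw-byte bump allocator: callers such as work_stealing_deque reserve bytes first and then construct objects in place with placement new. The sketch below shows that pattern on a self-contained toy allocator (the type name and the simplified alignment rule are illustrative assumptions, not the pls implementation).

#include <cstddef>
#include <cstdio>
#include <new>
#include <utility>

// Toy bump allocator illustrating the push_bytes + placement-new pattern.
struct bump_stack {
  alignas(std::max_align_t) char buffer[1024];
  std::size_t head = 0;

  void *push_bytes(std::size_t size) {
    // Round each allocation up to max_align_t so the next one stays aligned
    // (a simplified stand-in for base::alignment::next_alignment).
    std::size_t aligned = (size + alignof(std::max_align_t) - 1) & ~(alignof(std::max_align_t) - 1);
    void *result = buffer + head;
    head += aligned;
    return result;
  }
};

int main() {
  bump_stack stack;

  // Reserve raw bytes, then construct the object in place, as allocate_item does.
  using pair_t = std::pair<int, double>;
  auto *p = static_cast<pair_t *>(stack.push_bytes(sizeof(pair_t)));
  new (p) pair_t{42, 3.5};

  std::printf("%d %.1f\n", p->first, p->second);
  p->~pair_t();  // trivially destructible here, shown for completeness
}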