Commit e2de954a by FritzFlorian

Re-add parallel invoke pattern.

parent 84ec2d6b
Pipeline #1424 failed with stages
in 34 seconds
......@@ -3,7 +3,6 @@
using namespace pls::internal::scheduling;
#include <iostream>
#include <cstdio>
#include "benchmark_runner.h"
#include "benchmark_base/fib.h"
......@@ -31,7 +30,7 @@ int pls_fib(int n) {
}
// Per-scheduler limits used by the fib benchmark: how many tasks may be live
// at once and how much stack each task gets. The stack was raised from 4 KiB
// to 32 KiB by this commit; the old definition is removed here because a
// second `constexpr` definition of the same name is a redefinition error.
constexpr int MAX_NUM_TASKS = 32;
constexpr int MAX_STACK_SIZE = 1024 * 32;
int main(int argc, char **argv) {
int num_threads;
......
......@@ -2,22 +2,19 @@
#ifndef PLS_PARALLEL_INVOKE_H
#define PLS_PARALLEL_INVOKE_H

#include "pls/internal/scheduling/scheduler.h"

namespace pls::algorithm {

/**
 * Runs the given callables as potentially parallel tasks and returns only
 * after all of them have finished (fork-join).
 *
 * The parameters are forwarding references (`Function &&`, not
 * `const Function &&`): a const rvalue reference is NOT a forwarding
 * reference and would not match the definitions in invoke_impl.h, nor
 * allow `std::forward` to pass lvalues through.
 */
template<typename Function1, typename Function2>
void invoke(Function1 &&function1, Function2 &&function2);

template<typename Function1, typename Function2, typename Function3>
void invoke(Function1 &&function1, Function2 &&function2, Function3 &&function3);

// ...and so on, add more if we decide to keep this design

}

#include "invoke_impl.h"
#endif //PLS_PARALLEL_INVOKE_H
......@@ -2,39 +2,29 @@
#ifndef PLS_INVOKE_PARALLEL_IMPL_H
#define PLS_INVOKE_PARALLEL_IMPL_H

#include <utility>

#include "pls/internal/scheduling/scheduler.h"

namespace pls::algorithm {

/**
 * Spawns both callables as tasks on the current scheduler and blocks until
 * both have completed (fork-join). The old lambda_task/spawn_child based
 * implementation that the diff left merged in here is removed; this is the
 * plain spawn/sync version introduced by the commit.
 */
template<typename Function1, typename Function2>
void invoke(Function1 &&function1, Function2 &&function2) {
  using namespace ::pls::internal::scheduling;

  scheduler::spawn(std::forward<Function1>(function1));
  scheduler::spawn(std::forward<Function2>(function2));
  scheduler::sync();
}

/// Three-callable overload; see the two-callable version above.
template<typename Function1, typename Function2, typename Function3>
void invoke(Function1 &&function1, Function2 &&function2, Function3 &&function3) {
  using namespace ::pls::internal::scheduling;

  scheduler::spawn(std::forward<Function1>(function1));
  scheduler::spawn(std::forward<Function2>(function2));
  scheduler::spawn(std::forward<Function3>(function3));
  scheduler::sync();
}

}
#endif //PLS_INVOKE_PARALLEL_IMPL_H
#ifndef PLS_PARALLEL_SCAN_H_
#define PLS_PARALLEL_SCAN_H_

namespace pls::algorithm {

/**
 * Inclusive prefix scan over the range [in_start, in_end).
 * Writes the running op-combination of the input elements, seeded with
 * neutral_elem, to the output iterator out. Implemented in scan_impl.h.
 */
template<typename InIter, typename OutIter, typename BinaryOp, typename Type>
void scan(InIter in_start, const InIter in_end, OutIter out, BinaryOp op, Type neutral_elem);

}

#include "scan_impl.h"
#endif //PLS_PARALLEL_SCAN_H_
#ifndef PLS_PARALLEL_SCAN_IMPL_H_
#define PLS_PARALLEL_SCAN_IMPL_H_
#include <memory>
#include <functional>
#include "pls/pls.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/task.h"
namespace pls {
namespace algorithm {
namespace internal {
using namespace pls::internal::scheduling;
// Sequential inclusive scan: out[i] = op(out[i - 1], in[i]), with the
// accumulator seeded by neutral_element. Output must have room for
// distance(input_start, input_end) elements; in-place operation
// (output == input_start) is fine since each input is read before its
// output slot is written.
template<typename InIter, typename OutIter, typename BinaryOp, typename Type>
void serial_scan(InIter input_start, const InIter input_end, OutIter output, BinaryOp op, Type neutral_element) {
  Type accumulator = neutral_element;

  auto out_it = output;
  for (auto in_it = input_start; in_it != input_end; ++in_it, ++out_it) {
    accumulator = op(accumulator, *in_it);
    *out_it = accumulator;
  }
}
// Parallel inclusive prefix scan over [in_start_, in_end_) into out_.
//
// Three phases:
//   1. Split the input into chunks and scan each chunk independently
//      (the last chunk is deliberately skipped; see phase 3).
//   2. Serially scan the chunk totals so chunk_sums_[i] holds the combined
//      prefix of chunks 0..i.
//   3. Fold the preceding prefix into every element of each later chunk;
//      the last chunk gets its first (and only) scan here, seeded with its
//      prefix.
template<typename InIter, typename OutIter, typename BinaryOp, typename Type>
class scan_task : public pls::internal::scheduling::task {
  const InIter in_start_;
  const InIter in_end_;
  const OutIter out_;
  const BinaryOp op_;
  const Type neutral_elem_;  // identity element used to seed each chunk scan

  long size_, chunks_;
  long items_per_chunk_;
  Type *chunk_sums_;  // chunks_ - 1 entries; after phase 2 holds chunk prefixes

 public:
  scan_task(const InIter in_start, const InIter in_end, const OutIter out, const BinaryOp op, const Type neutral_elem) :
      in_start_{in_start},
      in_end_{in_end},
      out_{out},
      op_{op},
      neutral_elem_{neutral_elem} {
    constexpr auto chunks_per_thread = 1;

    size_ = std::distance(in_start, in_end);
    auto num_threads = thread_state::get()->scheduler_->num_threads();
    chunks_ = num_threads * chunks_per_thread + 1;
    items_per_chunk_ = size_ / chunks_ + 1;
    // Only chunks_ - 1 sums are ever read (the last chunk's total is unused).
    // Parenthesization fixed: 'sizeof(Type) * chunks_ - 1' allocated
    // (sizeof(Type) * chunks_) - 1 BYTES due to operator precedence, not
    // chunks_ - 1 elements.
    chunk_sums_ = reinterpret_cast<Type *>(allocate_memory(sizeof(Type) * (chunks_ - 1)));
  };

  void execute_internal() override {
    // First pass: calculate each chunk's individual prefix scan and record
    // its total. Skips the last chunk (index chunks_ - 1) — it is scanned in
    // the second pass once its starting prefix is known.
    // NOTE(review): for very small inputs in_start_ + items_per_chunk_ * i
    // can advance past in_end_ before std::min clamps it — fine for raw
    // pointers in practice but formally out-of-range for general iterators;
    // confirm callers only use contiguous ranges.
    for_each_range(0, chunks_ - 1, [&](int i) {
      auto chunk_start = in_start_ + items_per_chunk_ * i;
      auto chunk_end = std::min(in_end_, chunk_start + items_per_chunk_);
      auto chunk_size = std::distance(chunk_start, chunk_end);
      auto chunk_output = out_ + items_per_chunk_ * i;

      internal::serial_scan(chunk_start, chunk_end, chunk_output, op_, neutral_elem_);
      chunk_sums_[i] = *(chunk_output + chunk_size - 1);  // chunk total
    }, fixed_strategy{1});

    // Phase 2: scan the chunk totals (effectively the prefix at the end of
    // each chunk, used below to correct the following chunk).
    internal::serial_scan(chunk_sums_, chunk_sums_ + chunks_ - 1, chunk_sums_, op_, neutral_elem_);

    // Second pass: use the phase-2 results to correct each chunk.
    auto output_start = out_;
    auto output_end = out_ + size_;
    for_each_range(1, chunks_, [&](int i) {
      if (i == chunks_ - 1) {
        // Last chunk: scan it directly into the output, seeding the
        // accumulator with the preceding chunks' prefix. (The old code did
        // '*chunk_start += chunk_sums_[i - 1]' before scanning, which both
        // mutated the INPUT range and hard-coded '+' instead of op_.)
        auto chunk_start = in_start_ + items_per_chunk_ * i;
        auto chunk_end = std::min(in_end_, chunk_start + items_per_chunk_);
        auto chunk_output = output_start + items_per_chunk_ * i;
        internal::serial_scan(chunk_start, chunk_end, chunk_output, op_, chunk_sums_[i - 1]);
      } else {
        // NOTE(review): argument order op_(element, prefix) differs from the
        // accumulation order op_(prefix, element) used elsewhere — only
        // equivalent for commutative op_; confirm intended.
        auto chunk_start = output_start + items_per_chunk_ * i;
        auto chunk_end = std::min(output_end, chunk_start + items_per_chunk_);
        for (; chunk_start != chunk_end; chunk_start++) {
          *chunk_start = op_(*chunk_start, chunk_sums_[i - 1]);
        }
      }
    }, fixed_strategy{1});
    wait_for_all();

    // NOTE(review): explicit destructor call — presumably the task framework
    // reclaims task memory without running destructors itself; confirm.
    this->~scan_task();
  }
};
}
template<typename InIter, typename OutIter, typename BinaryOp, typename Type>
void scan(InIter in_start, const InIter in_end, OutIter out, BinaryOp op, Type neutral_elem) {
using namespace pls::internal::scheduling;
using scan_task_type = internal::scan_task<InIter, OutIter, BinaryOp, Type>;
scheduler::spawn_child_and_wait<scan_task_type>(in_start, in_end, out, op, neutral_elem);
}
}
}
#endif //PLS_PARALLEL_SCAN_IMPL_H_
......@@ -26,7 +26,7 @@ class task_manager {
explicit task_manager(unsigned thread_id,
size_t num_tasks,
size_t stack_size,
std::shared_ptr<stack_allocator> stack_allocator);
std::shared_ptr<stack_allocator> &stack_allocator);
~task_manager();
void push_resource_on_task(task *target_task, task *spare_task_chain);
......
#ifndef PLS_LIBRARY_H
#define PLS_LIBRARY_H
#include <utility>
#include "pls/algorithms/invoke.h"
#include "pls/algorithms/for_each.h"
#include "pls/algorithms/scan.h"
#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/helpers/unique_id.h"
#include "pls/internal/helpers/member_function.h"
namespace pls {

using internal::scheduling::static_scheduler_memory;

// 'basic' fork-join APIs
using internal::scheduling::scheduler;

/// Spawns the given callable as a task on the current scheduler.
template<typename Function>
void spawn(Function &&function) {
  scheduler::spawn(std::forward<Function>(function));
}
/// Waits for all tasks spawned by the current task to finish.
/// Must be 'inline': a non-template function defined in a header is
/// otherwise defined once per including TU, violating the ODR at link time.
inline void sync() {
  scheduler::sync();
}

using unique_id = internal::helpers::unique_id;

// general helpers that can be handy when using PLS
template<class C, typename R, typename ...ARGS>
using member_function = internal::helpers::member_function<C, R, ARGS...>;
// (duplicate 'using internal::scheduling::task;' removed — it appeared twice)
using internal::scheduling::task;
using internal::scheduling::lambda_task_by_reference;
using internal::scheduling::lambda_task_by_value;

// parallel patterns API
using algorithm::invoke;
using algorithm::for_each;
using algorithm::for_each_range;
using algorithm::scan;

}
#endif
......@@ -9,7 +9,7 @@ namespace pls::internal::scheduling {
task_manager::task_manager(unsigned thread_id,
size_t num_tasks,
size_t stack_size,
std::shared_ptr<stack_allocator> stack_allocator) : stack_allocator_{stack_allocator},
std::shared_ptr<stack_allocator> &stack_allocator) : stack_allocator_{stack_allocator},
tasks_{},
deque_{thread_id, num_tasks} {
tasks_.reserve(num_tasks);
......
......@@ -3,5 +3,6 @@ add_executable(tests
data_structures_test.cpp
base_tests.cpp
scheduling_tests.cpp
patterns_test.cpp
test_helpers.h)
target_link_libraries(tests catch2 pls)
......@@ -2,6 +2,7 @@
#include "pls/internal/base/spin_lock.h"
#include "pls/internal/base/system_details.h"
#include "pls/internal/base/alignment.h"
#include "pls/internal/base/stack_allocator.h"
#include "test_helpers.h"
......@@ -14,6 +15,38 @@ using namespace pls::internal::base;
int base_tests_shared_counter;
// Tag string fixed: it was "[internal/base/alignment.h" — the missing
// closing ']' makes it a malformed Catch2 tag, so '-# [tag]' selection
// would not match it.
TEST_CASE("align helpers", "[internal/base/alignment.h]") {
  system_details::pointer_t aligned_64 = 64;
  system_details::pointer_t aligned_32 = 32;
  system_details::pointer_t not_aligned_64 = 70;
  system_details::pointer_t not_aligned_32 = 60;

  // Already-aligned values come back unchanged...
  REQUIRE(alignment::next_alignment(aligned_64, 64) == 64);
  REQUIRE(alignment::next_alignment(aligned_32, 32) == 32);
  // ...unless a coarser alignment than they satisfy is requested.
  REQUIRE(alignment::next_alignment(aligned_32, 64) == 64);

  // Unaligned values round down/up to the surrounding boundaries.
  REQUIRE(alignment::previous_alignment(not_aligned_64, 64) == 64);
  REQUIRE(alignment::next_alignment(not_aligned_64, 64) == 128);
  REQUIRE(alignment::previous_alignment(not_aligned_32, 32) == 32);
  REQUIRE(alignment::next_alignment(not_aligned_32, 32) == 64);
}
// Tag string fixed: closing ']' was missing, leaving a malformed Catch2 tag.
TEST_CASE("alignment wrapper", "[internal/base/alignment.h]") {
  // Interleave oddly-sized fillers so the wrappers' raw storage is unlikely
  // to land on the requested alignment by accident.
  char filler1 = '\0';
  alignment::alignment_wrapper<int, 256> int_256{256};
  int filler2 = 0;
  alignment::alignment_wrapper<int, 1024> int_1024{1024};
  (void) filler1;
  (void) filler2;

  // The wrapped value is intact and its address honors the alignment.
  REQUIRE(int_256.object() == 256);
  REQUIRE((system_details::pointer_t) int_256.pointer() % 256 == 0);
  REQUIRE(int_1024.object() == 1024);
  REQUIRE((system_details::pointer_t) int_1024.pointer() % 1024 == 0);
}
TEST_CASE("mmap stack allocator", "[internal/base/stack_allocator.h") {
mmap_stack_allocator stack_allocator;
......@@ -57,7 +90,7 @@ TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spi
}};
std::thread t2{[&]() {
barrier--;
while(barrier != 0);
while (barrier != 0);
for (int i = 0; i < num_iterations; i++) {
lock.lock();
......
#include <catch.hpp>
#include <atomic>
#include "pls/pls.h"
// Scheduler limits for this test: maximum live tasks and per-task stack size.
constexpr int MAX_NUM_TASKS = 32;
constexpr int MAX_STACK_SIZE = 1024 * 8;

TEST_CASE("parallel invoke calls correctly", "[algorithms/invoke.h]") {
  pls::scheduler scheduler{3, MAX_NUM_TASKS, MAX_STACK_SIZE};

  std::atomic<int> num_run{0};
  scheduler.perform_work([&] {
    // The three lambdas form a handshake on num_run: the second only
    // increments after the first has, the third only after the second,
    // and the first two also spin until the final count of 3 is reached.
    // The REQUIRE below therefore passes only if invoke() ran all three
    // callables, each exactly once.
    // NOTE(review): the busy-waits assume all three tasks can execute
    // concurrently (the scheduler is created with 3 threads); if they were
    // serialized on fewer workers this test would deadlock — confirm the
    // scheduler guarantees the needed parallelism.
    pls::invoke([&] {
      num_run++;
      while (num_run < 3);
    }, [&] {
      while (num_run < 1);
      num_run++;
      while (num_run < 3);
    }, [&] {
      while (num_run < 2);
      num_run++;
    });
    REQUIRE(num_run == 3);
  });
}
......@@ -33,7 +33,7 @@ TEST_CASE("tasks distributed over workers (do not block)", "[internal/scheduling
});
scheduler::sync();
});
REQUIRE(true);
REQUIRE(num_run == 3);
}
TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/traded_cas_field.h]") {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment