Commit 4e865e0e by FritzFlorian

Re-add parallel loop patterns.

parent 4c626a86
Pipeline #1426 failed with stages in 34 seconds.
# List all required files here (cmake best practice to NOT automate this step!)
add_library(pls STATIC
include/pls/algorithms/for_each.h include/pls/algorithms/for_each_impl.h
include/pls/algorithms/invoke.h include/pls/algorithms/invoke_impl.h
include/pls/algorithms/loop_partition_strategy.h
include/pls/algorithms/reduce.h include/pls/algorithms/reduce_impl.h
include/pls/internal/base/spin_lock.h
include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp
include/pls/internal/base/ttas_spin_lock.h src/internal/base/ttas_spin_lock.cpp
......@@ -31,7 +36,7 @@ add_library(pls STATIC
include/pls/internal/scheduling/task_manager.h include/pls/internal/scheduling/task_manager_impl.h src/internal/scheduling/task_manager.cpp
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/external_trading_deque.h src/internal/scheduling/external_trading_deque.cpp
include/pls/internal/scheduling/traded_cas_field.h)
include/pls/internal/scheduling/traded_cas_field.h include/pls/algorithms/loop_partition_strategy.h)
# Dependencies for pls
target_link_libraries(pls Threads::Threads)
......
......@@ -2,35 +2,31 @@
#ifndef PLS_PARALLEL_FOR_H
#define PLS_PARALLEL_FOR_H
namespace pls {
namespace algorithm {
#include "loop_partition_strategy.h"
class fixed_strategy;
class dynamic_strategy;
namespace pls::algorithm {
template<typename Function, typename ExecutionStrategy>
void for_each_range(unsigned long first,
static void for_each_range(unsigned long first,
unsigned long last,
const Function &function,
ExecutionStrategy &execution_strategy);
template<typename Function>
void for_each_range(unsigned long first,
static void for_each_range(unsigned long first,
unsigned long last,
const Function &function);
template<typename RandomIt, typename Function, typename ExecutionStrategy>
void for_each(RandomIt first,
static void for_each(RandomIt first,
RandomIt last,
const Function &function,
ExecutionStrategy execution_strategy);
template<typename RandomIt, typename Function>
void for_each(RandomIt first,
static void for_each(RandomIt first,
RandomIt last,
const Function &function);
}
}
#include "for_each_impl.h"
......
......@@ -3,17 +3,16 @@
#define PLS_PARALLEL_FOR_IMPL_H
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/helpers/range.h"
namespace pls {
namespace algorithm {
namespace pls::algorithm {
namespace internal {
template<typename RandomIt, typename Function>
void for_each(const RandomIt first,
static void for_each(const RandomIt first,
const RandomIt last,
const Function function,
const Function &function,
const long min_elements) {
using namespace ::pls::internal::scheduling;
......@@ -28,13 +27,13 @@ void for_each(const RandomIt first,
const long middle_index = num_elements / 2;
scheduler::spawn([first, middle_index, last, &function, min_elements] {
return internal::for_each(first,
internal::for_each(first,
first + middle_index,
function,
min_elements);
});
scheduler::spawn([first, middle_index, last, &function, min_elements] {
return internal::for_each(first + middle_index,
internal::for_each(first + middle_index,
last,
function,
min_elements);
......@@ -45,31 +44,8 @@ void for_each(const RandomIt first,
}
// Chooses the sequential-cutoff ("grain") size dynamically from the number of
// worker threads, aiming for roughly `tasks_per_thread` tasks per thread.
class dynamic_strategy {
 public:
  explicit dynamic_strategy(const unsigned int tasks_per_thread = 4) : tasks_per_thread_{tasks_per_thread} {}

  long calculate_min_elements(long num_elements) const {
    const long num_threads = pls::internal::scheduling::thread_state::get().get_scheduler().num_threads();
    const long min_elements = num_elements / (num_threads * tasks_per_thread_);
    // Never return 0: with a zero grain size the recursive splitters would keep
    // splitting a one-element range into (0, 1) sub-ranges forever.
    return min_elements > 0 ? min_elements : 1;
  }

 private:
  const unsigned int tasks_per_thread_;
};
// Execution strategy with a caller-chosen, fixed sequential-cutoff size:
// every task smaller than `min_elements_per_task` is processed sequentially.
class fixed_strategy {
 public:
  explicit fixed_strategy(const long min_elements_per_task) : min_elements_per_task_{min_elements_per_task} {}

  // The configured cutoff is independent of the total range size.
  long calculate_min_elements(long /*num_elements*/) const { return min_elements_per_task_; }

 private:
  const long min_elements_per_task_;
};
template<typename RandomIt, typename Function, typename ExecutionStrategy>
void for_each(RandomIt
static void for_each(RandomIt
first,
RandomIt last,
const Function &function,
......@@ -81,12 +57,12 @@ void for_each(RandomIt
}
template<typename RandomIt, typename Function>
void for_each(RandomIt first, RandomIt last, const Function &function) {
static void for_each(RandomIt first, RandomIt last, const Function &function) {
return for_each(first, last, function, dynamic_strategy{4});
}
template<typename Function, typename ExecutionStrategy>
void for_each_range(unsigned long first,
static void for_each_range(unsigned long first,
unsigned long last,
const Function &function,
ExecutionStrategy execution_strategy) {
......@@ -95,7 +71,7 @@ void for_each_range(unsigned long first,
}
template<typename Function>
void for_each_range(unsigned long first,
static void for_each_range(unsigned long first,
unsigned long last,
const Function &function) {
auto range = pls::internal::helpers::range(first, last);
......@@ -103,6 +79,5 @@ void for_each_range(unsigned long first,
}
}
}
#endif //PLS_INVOKE_PARALLEL_IMPL_H
#ifndef PLS_ALGO_LOOP_PARTITION_STRATEGY_H_
#define PLS_ALGO_LOOP_PARTITION_STRATEGY_H_
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/thread_state.h"
namespace pls::algorithm {
// Chooses the sequential-cutoff ("grain") size dynamically from the number of
// worker threads, aiming for roughly `tasks_per_thread` tasks per thread.
class dynamic_strategy {
 public:
  explicit dynamic_strategy(const unsigned int tasks_per_thread = 4) : tasks_per_thread_{tasks_per_thread} {}

  [[nodiscard]] long calculate_min_elements(long num_elements) const {
    const long num_threads = pls::internal::scheduling::thread_state::get().get_scheduler().num_threads();
    const long min_elements = num_elements / (num_threads * tasks_per_thread_);
    // Never return 0: with a zero grain size the recursive splitters would keep
    // splitting a one-element range into (0, 1) sub-ranges forever.
    return min_elements > 0 ? min_elements : 1;
  }

 private:
  const unsigned int tasks_per_thread_;
};
// Execution strategy with a caller-chosen, fixed sequential-cutoff size:
// every task smaller than `min_elements_per_task` is processed sequentially.
class fixed_strategy {
 public:
  explicit fixed_strategy(const long min_elements_per_task) : min_elements_per_task_{min_elements_per_task} {}

  // The configured cutoff is independent of the total range size.
  [[nodiscard]] long calculate_min_elements(long /*num_elements*/) const { return min_elements_per_task_; }

 private:
  const long min_elements_per_task_;
};
}
#endif //PLS_ALGO_LOOP_PARTITION_STRATEGY_H_
#ifndef PLS_PARALLEL_REDUCE_H
#define PLS_PARALLEL_REDUCE_H
#include "loop_partition_strategy.h"
namespace pls::algorithm {
// Parallel reduction of [first, last): folds all elements with `reducer`,
// starting from `neutral`. Sub-ranges are reduced independently and then
// combined, so `reducer` should be associative and `neutral` its identity.
// `execution_strategy` (see loop_partition_strategy.h) decides the grain size.
template<typename RandomIt, typename Function, typename ExecutionStrategy>
static auto reduce(RandomIt first,
RandomIt last,
decltype(*first) neutral,
const Function &reducer,
ExecutionStrategy execution_strategy);
// Convenience overload using the default dynamic_strategy
// (about four tasks per worker thread).
template<typename RandomIt, typename Function>
static auto reduce(RandomIt first,
RandomIt last,
decltype(*first) neutral,
const Function &reducer);
}
#include "reduce_impl.h"
#endif //PLS_PARALLEL_REDUCE_H
#ifndef PLS_PARALLEL_REDUCE_IMPL_H
#define PLS_PARALLEL_REDUCE_IMPL_H
#include "pls/internal/scheduling/scheduler.h"
#include "pls/algorithms/loop_partition_strategy.h"
namespace pls::algorithm {
namespace internal {
// Recursive work-horse of the parallel reduction: ranges of at most
// `min_elements` are folded sequentially; larger ranges are halved and the
// two halves reduced as spawned tasks, then combined with `reducer`.
// `neutral` seeds every sequential leaf, so it must be the reducer's identity.
template<typename RandomIt, typename Function, typename Element>
static Element reduce(const RandomIt first,
                      const RandomIt last,
                      Element neutral,
                      const Function &reducer,
                      const long min_elements) {
  using namespace ::pls::internal::scheduling;

  const long num_elements = std::distance(first, last);
  if (num_elements <= min_elements) {
    // Small enough: fold sequentially to avoid scheduling overhead.
    Element acc = neutral;
    for (auto current = first; current != last; current++) {
      acc = reducer(acc, *current);
    }
    return acc;
  } else {
    // Cut in half recursively.
    const long middle_index = num_elements / 2;

    // Each spawned task writes exactly one of these before sync() returns.
    Element left, right;
    // Note: `last` is not needed by the left half, so it is not captured there.
    scheduler::spawn([first, middle_index, neutral, &reducer, min_elements, &left] {
      left = internal::reduce<RandomIt, Function, Element>(first,
                                                           first + middle_index,
                                                           neutral,
                                                           reducer,
                                                           min_elements);
    });
    scheduler::spawn([first, middle_index, last, neutral, &reducer, min_elements, &right] {
      right = internal::reduce<RandomIt, Function, Element>(first + middle_index,
                                                            last,
                                                            neutral,
                                                            reducer,
                                                            min_elements);
    });
    scheduler::sync();

    return reducer(left, right);
  }
}
}
// Public entry point: computes the grain size from `execution_strategy` and
// delegates to the recursive internal::reduce.
// NOTE(review): `decltype(*first)` is typically a reference type (e.g. int&
// for a raw int* iterator), so `neutral` and the internal Element type would
// be references there; a prvalue argument such as the literal 0 could not
// bind. This works with iterators that dereference by value (like the helper
// range used in the tests) — TODO confirm the intended iterator contract.
template<typename RandomIt, typename Function, typename ExecutionStrategy>
static auto reduce(RandomIt first,
RandomIt last,
decltype(*first) neutral,
const Function &reducer,
ExecutionStrategy execution_strategy) {
long num_elements = std::distance(first, last);
return internal::reduce<RandomIt, Function,
decltype(*first)>(first,
last,
neutral,
reducer,
execution_strategy.calculate_min_elements(num_elements));
}
template<typename RandomIt, typename Function>
static auto reduce(RandomIt first,
RandomIt last,
decltype(*first) neutral,
const Function &reducer) {
return reduce(first, last, neutral, reducer, dynamic_strategy{4});
}
}
#endif //PLS_PARALLEL_REDUCE_IMPL_H
......@@ -5,7 +5,11 @@
#include "pls/algorithms/invoke.h"
#include "pls/algorithms/for_each.h"
#include "pls/algorithms/reduce.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/helpers/range.h"
#include "pls/internal/helpers/member_function.h"
namespace pls {
......@@ -23,11 +27,13 @@ static void sync() {
// general helpers that can be handy when using PLS
template<class C, typename R, typename ...ARGS>
using member_function = internal::helpers::member_function<C, R, ARGS...>;
using internal::helpers::range;
// parallel patterns API
using algorithm::invoke;
using algorithm::for_each;
using algorithm::for_each_range;
using algorithm::reduce;
}
#endif
#include <catch.hpp>
#include <atomic>
#include <thread>
#include "pls/pls.h"
......@@ -49,3 +50,47 @@ TEST_CASE("parallel invoke calls correctly", "[algorithms/invoke.h]") {
REQUIRE(num_run == 3);
});
}
TEST_CASE("parallel for calls correctly (might fail, timing based)", "[algorithms/for_each.h]") {
  pls::scheduler test_scheduler{8, MAX_NUM_TASKS, MAX_STACK_SIZE};

  const auto begin_time = std::chrono::steady_clock::now();
  std::atomic<int> work_done{0};
  test_scheduler.perform_work([&] {
    pls::for_each_range(0, 100, [&](const int) {
      work_done++;
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    });
  });
  const auto end_time = std::chrono::steady_clock::now();
  const auto elapsed_ms =
      std::chrono::duration_cast<std::chrono::milliseconds>(end_time - begin_time).count();

  // Every iteration must have run exactly once.
  REQUIRE(work_done == 100);

  // It makes sense that 100 iterations on at least 4 threads take less than half the serial time.
  // We want to make sure that at least some work is distributed on multiple cores.
  REQUIRE(elapsed_ms <= 50);
}
// Tag fixed: this test exercises reduce, not for_each (copy-paste of the tag).
TEST_CASE("reduce calls correctly (might fail, timing based)", "[algorithms/reduce.h]") {
  pls::scheduler scheduler{8, MAX_NUM_TASKS, MAX_STACK_SIZE};
  auto start = std::chrono::steady_clock::now();

  int num_elements = 100;
  pls::range range{1, num_elements + 1};

  int result = 0;  // written inside perform_work; initialized defensively
  scheduler.perform_work([&] {
    result = pls::reduce(range.begin(), range.end(), 0, [&](const int a, const int b) {
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
      return a + b;
    });
  });

  auto end = std::chrono::steady_clock::now();
  auto elapsed =
      std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();

  // Gauss sum of 1..num_elements.
  REQUIRE(result == (num_elements * (num_elements + 1)) / 2);

  // It makes sense that 100 iterations on at least 4 threads take less than half the serial time.
  // We want to make sure that at least some work is distributed on multiple cores.
  REQUIRE(elapsed <= 50);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment