diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index b300c31..e2f59f2 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -1,5 +1,10 @@ # List all required files here (cmake best practice to NOT automate this step!) add_library(pls STATIC + include/pls/algorithms/for_each.h include/pls/algorithms/for_each_impl.h + include/pls/algorithms/invoke.h include/pls/algorithms/invoke_impl.h + include/pls/algorithms/loop_partition_strategy.h + include/pls/algorithms/reduce.h include/pls/algorithms/reduce_impl.h + include/pls/internal/base/spin_lock.h include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp include/pls/internal/base/ttas_spin_lock.h src/internal/base/ttas_spin_lock.cpp @@ -31,7 +36,7 @@ add_library(pls STATIC include/pls/internal/scheduling/task_manager.h include/pls/internal/scheduling/task_manager_impl.h src/internal/scheduling/task_manager.cpp include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/external_trading_deque.h src/internal/scheduling/external_trading_deque.cpp - include/pls/internal/scheduling/traded_cas_field.h) + include/pls/internal/scheduling/traded_cas_field.h include/pls/algorithms/loop_partition_strategy.h) # Dependencies for pls target_link_libraries(pls Threads::Threads) diff --git a/lib/pls/include/pls/algorithms/for_each.h b/lib/pls/include/pls/algorithms/for_each.h index 44c6541..7430ea5 100644 --- a/lib/pls/include/pls/algorithms/for_each.h +++ b/lib/pls/include/pls/algorithms/for_each.h @@ -2,35 +2,31 @@ #ifndef PLS_PARALLEL_FOR_H #define PLS_PARALLEL_FOR_H -namespace pls { -namespace algorithm { +#include "loop_partition_strategy.h" -class fixed_strategy; -class dynamic_strategy; +namespace pls::algorithm { template -void for_each_range(unsigned long first, - unsigned long last, - const Function &function, - ExecutionStrategy &execution_strategy); +static void for_each_range(unsigned long first, + unsigned long last, + const 
Function &function, + ExecutionStrategy &execution_strategy); template -void for_each_range(unsigned long first, - unsigned long last, - const Function &function); +static void for_each_range(unsigned long first, + unsigned long last, + const Function &function); template -void for_each(RandomIt first, - RandomIt last, - const Function &function, - ExecutionStrategy execution_strategy); +static void for_each(RandomIt first, + RandomIt last, + const Function &function, + ExecutionStrategy execution_strategy); template -void for_each(RandomIt first, - RandomIt last, - const Function &function); - -} +static void for_each(RandomIt first, + RandomIt last, + const Function &function); } #include "for_each_impl.h" diff --git a/lib/pls/include/pls/algorithms/for_each_impl.h b/lib/pls/include/pls/algorithms/for_each_impl.h index 73e42bf..9d2bb8a 100644 --- a/lib/pls/include/pls/algorithms/for_each_impl.h +++ b/lib/pls/include/pls/algorithms/for_each_impl.h @@ -3,18 +3,17 @@ #define PLS_PARALLEL_FOR_IMPL_H #include "pls/internal/scheduling/scheduler.h" -#include "pls/internal/scheduling/thread_state.h" #include "pls/internal/helpers/range.h" -namespace pls { -namespace algorithm { +namespace pls::algorithm { + namespace internal { template -void for_each(const RandomIt first, - const RandomIt last, - const Function function, - const long min_elements) { +static void for_each(const RandomIt first, + const RandomIt last, + const Function &function, + const long min_elements) { using namespace ::pls::internal::scheduling; const long num_elements = std::distance(first, last); @@ -28,16 +27,16 @@ void for_each(const RandomIt first, const long middle_index = num_elements / 2; scheduler::spawn([first, middle_index, last, &function, min_elements] { - return internal::for_each(first, - first + middle_index, - function, - min_elements); + internal::for_each(first, + first + middle_index, + function, + min_elements); }); scheduler::spawn([first, middle_index, last, &function, 
min_elements] { - return internal::for_each(first + middle_index, - last, - function, - min_elements); + internal::for_each(first + middle_index, + last, + function, + min_elements); }); scheduler::sync(); } @@ -45,64 +44,40 @@ void for_each(const RandomIt first, } -class dynamic_strategy { - public: - explicit dynamic_strategy(const unsigned int tasks_per_thread = 4) : tasks_per_thread_{tasks_per_thread} {}; - - long calculate_min_elements(long num_elements) const { - const long num_threads = pls::internal::scheduling::thread_state::get().get_scheduler().num_threads(); - return num_elements / (num_threads * tasks_per_thread_); - } - private: - unsigned const int tasks_per_thread_; -}; - -class fixed_strategy { - public: - explicit fixed_strategy(const long min_elements_per_task) : min_elements_per_task_{min_elements_per_task} {}; - - long calculate_min_elements(long /*num_elements*/) const { - return min_elements_per_task_; - } - private: - const long min_elements_per_task_; -}; - template -void for_each(RandomIt - first, - RandomIt last, - const Function &function, - ExecutionStrategy - execution_strategy) { +static void for_each(RandomIt + first, + RandomIt last, + const Function &function, + ExecutionStrategy + execution_strategy) { long num_elements = std::distance(first, last); return internal::for_each(first, last, function, execution_strategy.calculate_min_elements(num_elements)); } template -void for_each(RandomIt first, RandomIt last, const Function &function) { +static void for_each(RandomIt first, RandomIt last, const Function &function) { return for_each(first, last, function, dynamic_strategy{4}); } template -void for_each_range(unsigned long first, - unsigned long last, - const Function &function, - ExecutionStrategy execution_strategy) { +static void for_each_range(unsigned long first, + unsigned long last, + const Function &function, + ExecutionStrategy execution_strategy) { auto range = pls::internal::helpers::range(first, last); return 
for_each(range.begin(), range.end(), function, execution_strategy); } template -void for_each_range(unsigned long first, - unsigned long last, - const Function &function) { +static void for_each_range(unsigned long first, + unsigned long last, + const Function &function) { auto range = pls::internal::helpers::range(first, last); return for_each(range.begin(), range.end(), function); } } -} #endif //PLS_INVOKE_PARALLEL_IMPL_H diff --git a/lib/pls/include/pls/algorithms/loop_partition_strategy.h b/lib/pls/include/pls/algorithms/loop_partition_strategy.h new file mode 100644 index 0000000..315a5cd --- /dev/null +++ b/lib/pls/include/pls/algorithms/loop_partition_strategy.h @@ -0,0 +1,33 @@ + +#ifndef PLS_ALGO_LOOP_PARTITION_STRATEGY_H_ +#define PLS_ALGO_LOOP_PARTITION_STRATEGY_H_ + +#include "pls/internal/scheduling/scheduler.h" +#include "pls/internal/scheduling/thread_state.h" + +namespace pls::algorithm { +class dynamic_strategy { + public: + explicit dynamic_strategy(const unsigned int tasks_per_thread = 4) : tasks_per_thread_{tasks_per_thread} {}; + + [[nodiscard]] long calculate_min_elements(long num_elements) const { + const long num_threads = pls::internal::scheduling::thread_state::get().get_scheduler().num_threads(); + return num_elements / (num_threads * tasks_per_thread_); + } + private: + unsigned const int tasks_per_thread_; +}; + +class fixed_strategy { + public: + explicit fixed_strategy(const long min_elements_per_task) : min_elements_per_task_{min_elements_per_task} {}; + + [[nodiscard]] long calculate_min_elements(long /*num_elements*/) const { + return min_elements_per_task_; + } + private: + const long min_elements_per_task_; +}; +} + +#endif //PLS_ALGO_LOOP_PARTITION_STRATEGY_H_ diff --git a/lib/pls/include/pls/algorithms/reduce.h b/lib/pls/include/pls/algorithms/reduce.h new file mode 100644 index 0000000..8dd18ee --- /dev/null +++ b/lib/pls/include/pls/algorithms/reduce.h @@ -0,0 +1,24 @@ + +#ifndef PLS_PARALLEL_REDUCE_H +#define 
PLS_PARALLEL_REDUCE_H + +#include "loop_partition_strategy.h" + +namespace pls::algorithm { + +template +static auto reduce(RandomIt first, + RandomIt last, + decltype(*first) neutral, + const Function &reducer, + ExecutionStrategy execution_strategy); + +template +static auto reduce(RandomIt first, + RandomIt last, + decltype(*first) neutral, + const Function &reducer); +} +#include "reduce_impl.h" + +#endif //PLS_PARALLEL_REDUCE_H diff --git a/lib/pls/include/pls/algorithms/reduce_impl.h b/lib/pls/include/pls/algorithms/reduce_impl.h new file mode 100644 index 0000000..5022c9c --- /dev/null +++ b/lib/pls/include/pls/algorithms/reduce_impl.h @@ -0,0 +1,79 @@ + +#ifndef PLS_PARALLEL_REDUCE_IMPL_H +#define PLS_PARALLEL_REDUCE_IMPL_H + +#include "pls/internal/scheduling/scheduler.h" +#include "pls/algorithms/loop_partition_strategy.h" + +namespace pls::algorithm { + +namespace internal { + +template +static Element reduce(const RandomIt first, + const RandomIt last, + Element neutral, + const Function &reducer, + const long min_elements) { + using namespace ::pls::internal::scheduling; + + const long num_elements = std::distance(first, last); + if (num_elements <= min_elements) { + // calculate last elements in loop to avoid overhead + Element acc = neutral; + for (auto current = first; current != last; current++) { + acc = reducer(acc, *current); + } + return acc; + } else { + // Cut in half recursively + const long middle_index = num_elements / 2; + + Element left, right; + scheduler::spawn([first, middle_index, last, neutral, &reducer, min_elements, &left] { + left = internal::reduce(first, + first + middle_index, + neutral, + reducer, + min_elements); + }); + scheduler::spawn([first, middle_index, last, neutral, &reducer, min_elements, &right] { + right = internal::reduce(first + middle_index, + last, + neutral, + reducer, + min_elements); + }); + scheduler::sync(); + return reducer(left, right); + } +} + +} + +template +static auto reduce(RandomIt first, + 
RandomIt last, + decltype(*first) neutral, + const Function &reducer, + ExecutionStrategy execution_strategy) { + long num_elements = std::distance(first, last); + return internal::reduce(first, + last, + neutral, + reducer, + execution_strategy.calculate_min_elements(num_elements)); +} + +template +static auto reduce(RandomIt first, + RandomIt last, + decltype(*first) neutral, + const Function &reducer) { + return reduce(first, last, neutral, reducer, dynamic_strategy{4}); +} + +} + +#endif //PLS_PARALLEL_REDUCE_IMPL_H diff --git a/lib/pls/include/pls/pls.h b/lib/pls/include/pls/pls.h index c38f5d6..daff79d 100644 --- a/lib/pls/include/pls/pls.h +++ b/lib/pls/include/pls/pls.h @@ -5,7 +5,11 @@ #include "pls/algorithms/invoke.h" #include "pls/algorithms/for_each.h" +#include "pls/algorithms/reduce.h" + #include "pls/internal/scheduling/scheduler.h" + +#include "pls/internal/helpers/range.h" #include "pls/internal/helpers/member_function.h" namespace pls { @@ -23,11 +27,13 @@ static void sync() { // general helpers that can be handy when using PLS template using member_function = internal::helpers::member_function; +using internal::helpers::range; // parallel patterns API using algorithm::invoke; using algorithm::for_each; using algorithm::for_each_range; +using algorithm::reduce; } #endif diff --git a/test/patterns_test.cpp b/test/patterns_test.cpp index 0fddc82..bc92ffd 100644 --- a/test/patterns_test.cpp +++ b/test/patterns_test.cpp @@ -1,6 +1,7 @@ #include #include +#include #include "pls/pls.h" @@ -49,3 +50,47 @@ TEST_CASE("parallel invoke calls correctly", "[algorithms/invoke.h]") { REQUIRE(num_run == 3); }); } + +TEST_CASE("parallel for calls correctly (might fail, timing based)", "[algorithms/for_each.h]") { + pls::scheduler scheduler{8, MAX_NUM_TASKS, MAX_STACK_SIZE}; + + auto start = std::chrono::steady_clock::now(); + std::atomic work_done{0}; + scheduler.perform_work([&] { + pls::for_each_range(0, 100, [&](const int) { + work_done++; + 
std::this_thread::sleep_for(std::chrono::milliseconds(1)); + }); + }); + auto end = std::chrono::steady_clock::now(); + auto elapsed = + std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count(); + + REQUIRE(work_done == 100); + // It makes sense that 100 iterations on at least 4 threads take less than half the serial time. + // We want to make sure that at least some work is distributed on multiple cores. + REQUIRE(elapsed <= 50); +} + +TEST_CASE("reduce calls correctly (might fail, timing based)", "[algorithms/reduce.h]") { + pls::scheduler scheduler{8, MAX_NUM_TASKS, MAX_STACK_SIZE}; + + auto start = std::chrono::steady_clock::now(); + int num_elements = 100; + pls::range range{1, num_elements + 1}; + int result; + scheduler.perform_work([&] { + result = pls::reduce(range.begin(), range.end(), 0, [&](const int a, const int b) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + return a + b; + }); + }); + auto end = std::chrono::steady_clock::now(); + auto elapsed = + std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count(); + + REQUIRE(result == (num_elements * (num_elements + 1)) / 2); + // It makes sense that 100 iterations on at least 4 threads take less than half the serial time. + // We want to make sure that at least some work is distributed on multiple cores. + REQUIRE(elapsed <= 50); +}