diff --git a/app/benchmark_fft/main.cpp b/app/benchmark_fft/main.cpp index f6ed20e..cbb8445 100644 --- a/app/benchmark_fft/main.cpp +++ b/app/benchmark_fft/main.cpp @@ -80,7 +80,7 @@ int main() { pls::internal::helpers::run_mini_benchmark([&] { complex_vector input = initial_input; fft(input.begin(), input.size()); - }, 8, 4000); + }, 8, 1000); PROFILE_SAVE("test_profile.prof") } diff --git a/lib/pls/include/pls/algorithms/parallel_for.h b/lib/pls/include/pls/algorithms/parallel_for.h index 4df1104..58334fe 100644 --- a/lib/pls/include/pls/algorithms/parallel_for.h +++ b/lib/pls/include/pls/algorithms/parallel_for.h @@ -8,6 +8,9 @@ namespace algorithm { template void parallel_for(RandomIt first, RandomIt last, const Function &function); +template +void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &function); + } } #include "parallel_for_impl.h" diff --git a/lib/pls/include/pls/algorithms/parallel_for_impl.h b/lib/pls/include/pls/algorithms/parallel_for_impl.h index afd7baa..5b79468 100644 --- a/lib/pls/include/pls/algorithms/parallel_for_impl.h +++ b/lib/pls/include/pls/algorithms/parallel_for_impl.h @@ -3,6 +3,7 @@ #define PLS_PARALLEL_FOR_IMPL_H #include "pls/internal/scheduling/fork_join_task.h" +#include "pls/internal/scheduling/parallel_iterator_task.h" #include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h" @@ -45,6 +46,17 @@ void parallel_for(RandomIt first, RandomIt last, const Function &function) { using namespace ::pls::internal::base; static abstract_task::id id = unique_id::create(); + parallel_iterator_task iterator_task{first, last, function, id}; + scheduler::execute_task(iterator_task); +} + +template +void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &function) { + using namespace ::pls::internal::scheduling; + using namespace ::pls::internal::helpers; + using namespace ::pls::internal::base; + static abstract_task::id id = unique_id::create(); + auto body = [=] { internal::parallel_for(first, last, function); }; fork_join_lambda_by_reference root_body(body); fork_join_task root_task{&root_body, id}; diff --git a/lib/pls/include/pls/internal/scheduling/parallel_iterator_task.h b/lib/pls/include/pls/internal/scheduling/parallel_iterator_task.h index 2013110..304df79 100644 --- a/lib/pls/include/pls/internal/scheduling/parallel_iterator_task.h +++ b/lib/pls/include/pls/internal/scheduling/parallel_iterator_task.h @@ -2,20 +2,31 @@ #ifndef PLS_PARALLEL_ITERATOR_TASK_H #define PLS_PARALLEL_ITERATOR_TASK_H +#include "pls/internal/data_structures/stamped_integer.h" #include "abstract_task.h" namespace pls { namespace internal { namespace scheduling { + +using data_structures::stamped_integer; + template class parallel_iterator_task : public abstract_task { - RandomIt first_, last_; - Function function_; + alignas(64) const int step = 8; + + alignas(64) RandomIt first_, last_; + alignas(64) Function function_; // External stealing - size_t first_index_ava, last_index_ava; - // My internal state - size_t current_index, max_index; + alignas(64) std::atomic first_index_; + alignas(64) std::atomic to_be_processed_; + alignas(64) std::atomic last_index_; + + alignas(64) parallel_iterator_task *parent_; + + bool steal_front(size_t &stolen_max_index); + bool steal_back(size_t &stolen_first_index, size_t &stolen_last_index); protected: bool internal_stealing(abstract_task *other_task) override; @@ -23,6 +34,7 @@ class parallel_iterator_task : public abstract_task { public: explicit parallel_iterator_task(RandomIt first, RandomIt last, Function function, const abstract_task::id &id); + parallel_iterator_task(const parallel_iterator_task &other); void execute() override; }; } diff --git a/lib/pls/include/pls/internal/scheduling/parallel_iterator_task_impl.h b/lib/pls/include/pls/internal/scheduling/parallel_iterator_task_impl.h index 1883cde..f3a2026 100644 --- a/lib/pls/include/pls/internal/scheduling/parallel_iterator_task_impl.h +++ b/lib/pls/include/pls/internal/scheduling/parallel_iterator_task_impl.h @@ -2,6 +2,7 @@ #ifndef PLS_PARALLEL_ITERATOR_TASK_IMPL_H #define PLS_PARALLEL_ITERATOR_TASK_IMPL_H +#include "scheduler.h" namespace pls { namespace internal { namespace scheduling { @@ -11,20 +12,129 @@ parallel_iterator_task::parallel_iterator_task abstract_task(0, id), first_{first}, last_{last}, - function_{function} {} + function_{function}, + first_index_{0}, + to_be_processed_{std::distance(first, last)}, + last_index_{stamped_integer{0, std::distance(first, last)}}, + parent_{nullptr} {} + +template +parallel_iterator_task::parallel_iterator_task(const pls::internal::scheduling::parallel_iterator_task< + RandomIt, + Function> &other): + abstract_task{other.depth(), other.unique_id()}, + first_{other.first_}, + last_{other.last_}, + function_{other.function_}, + first_index_{other.first_index_.load()}, + to_be_processed_{other.to_be_processed_.load()}, + last_index_{other.last_index_.load()}, + parent_{other.parent_} {} template void parallel_iterator_task::execute() { + // Start processing at beginning of our data + size_t current_index = 0; + auto current_iterator = first_; + + // Keep going as long as we have data + while (true) { + // Claim next chunk of data for us + size_t local_max_index; + if (!steal_front(local_max_index)) { + break; + } + + // Process Chunk + for (; current_index != local_max_index; current_index++) { + function_(*(current_iterator++)); + } + } + + to_be_processed_ -= current_index; + while (to_be_processed_.load() > 0) + steal_work(); + if (parent_ != nullptr) { + parent_->to_be_processed_ -= std::distance(first_, last_); + } +} + +template +bool parallel_iterator_task::steal_front(size_t &stolen_max) { + auto local_first_index = first_index_.load(); + auto local_last_index = last_index_.load(); + + if (local_first_index >= local_last_index.value) { + return false; + } + + // Proceed the first index == take part of the work for us + auto new_first_index = std::min(local_first_index + step, local_last_index.value); + first_index_ = new_first_index; + // Reload last index + local_last_index = last_index_.load(); + // Enough distance + if (new_first_index < local_last_index.value) { + stolen_max = new_first_index; + return true; + } + // Fight over last element + if (new_first_index == local_last_index.value) { + auto new_last_index = stamped_integer{local_last_index.stamp + 1, local_last_index.value}; + if (last_index_.compare_exchange_strong(local_last_index, new_last_index)) { + stolen_max = new_first_index; + return true; + } + } + + // All iterator elements are assigned to some executor + return false; } template -bool parallel_iterator_task::split_task(base::swmr_spin_lock * /*lock*/) { +bool parallel_iterator_task::steal_back(size_t &stolen_first_index, size_t &stolen_last_index) { + auto local_first_index = first_index_.load(); + auto local_last_index = last_index_.load(); + + if (local_first_index >= local_last_index.value) { + return false; + } + + // Try to steal using cas + auto target_last_index = std::max(local_last_index.value - step, local_first_index); + auto new_last_index = stamped_integer{local_last_index.stamp + 1, target_last_index}; + if (last_index_.compare_exchange_strong(local_last_index, new_last_index)) { + stolen_first_index = new_last_index.value; + stolen_last_index = local_last_index.value; + return true; + } + return false; } template -bool parallel_iterator_task::internal_stealing(abstract_task *other_task) { +bool parallel_iterator_task::split_task(base::swmr_spin_lock *lock) { + auto depth = this->depth(); + auto id = this->unique_id(); + size_t stolen_first_index, stolen_last_index; + if (!steal_back(stolen_first_index, stolen_last_index)) { + lock->reader_unlock(); + return false; + } + + lock->reader_unlock(); + parallel_iterator_task new_task{first_ + stolen_first_index, first_ + stolen_last_index, function_, id}; + new_task.parent_ = this; + scheduler::execute_task(new_task, depth); + + return true; +} + +template +bool parallel_iterator_task::internal_stealing(abstract_task */*other_task*/) { + // Do not allow for now, eases up on ABA problem return false; } } diff --git a/lib/pls/include/pls/pls.h b/lib/pls/include/pls/pls.h index 0b0e366..2cc0757 100644 --- a/lib/pls/include/pls/pls.h +++ b/lib/pls/include/pls/pls.h @@ -24,7 +24,7 @@ using internal::scheduling::fork_join_lambda_by_value; using internal::scheduling::fork_join_task; using algorithm::invoke_parallel; -using algorithm::parallel_for; +using algorithm::parallel_for_fork_join; }