Commit afd0331b by FritzFlorian

First Version of parallel iterator task.

parent 1b4c5880
Pipeline #1192 passed with stages in 3 minutes 49 seconds
@@ -80,7 +80,7 @@ int main() {
pls::internal::helpers::run_mini_benchmark([&] {
complex_vector input = initial_input;
fft(input.begin(), input.size());
- }, 8, 4000);
+ }, 8, 1000);
PROFILE_SAVE("test_profile.prof")
}
@@ -8,6 +8,9 @@ namespace algorithm {
template<typename RandomIt, typename Function>
void parallel_for(RandomIt first, RandomIt last, const Function &function);
template<typename RandomIt, typename Function>
void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &function);
}
}
#include "parallel_for_impl.h"
......
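For orientation, a minimal usage sketch of the two entry points declared above. The container, element type, and the umbrella include are made up for illustration, and it assumes a pls scheduler is already set up (as run_mini_benchmark does above):

#include <vector>
#include "pls/pls.h"  // assumed umbrella header; the declarations live in pls::algorithm

void example() {
  std::vector<int> data(1024, 1);
  // New variant backed by the parallel iterator task:
  pls::algorithm::parallel_for(data.begin(), data.end(), [](int &x) { x *= 2; });
  // The previous fork-join implementation, now under its own name:
  pls::algorithm::parallel_for_fork_join(data.begin(), data.end(), [](int &x) { x *= 2; });
}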
@@ -3,6 +3,7 @@
#define PLS_PARALLEL_FOR_IMPL_H
#include "pls/internal/scheduling/fork_join_task.h"
#include "pls/internal/scheduling/parallel_iterator_task.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/scheduler.h"
@@ -45,6 +46,17 @@ void parallel_for(RandomIt first, RandomIt last, const Function &function) {
using namespace ::pls::internal::base;
static abstract_task::id id = unique_id::create<RandomIt, Function>();
parallel_iterator_task<RandomIt, Function> iterator_task{first, last, function, id};
scheduler::execute_task(iterator_task);
}
template<typename RandomIt, typename Function>
void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &function) {
using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
using namespace ::pls::internal::base;
static abstract_task::id id = unique_id::create<RandomIt, Function>();
auto body = [=] { internal::parallel_for(first, last, function); };
fork_join_lambda_by_reference<decltype(body)> root_body(body);
fork_join_task root_task{&root_body, id};
......
@@ -2,20 +2,31 @@
#ifndef PLS_PARALLEL_ITERATOR_TASK_H
#define PLS_PARALLEL_ITERATOR_TASK_H
#include "pls/internal/data_structures/stamped_integer.h"
#include "abstract_task.h"
namespace pls {
namespace internal {
namespace scheduling {
using data_structures::stamped_integer;
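// The stamp on the shared last index lets compare_exchange detect ABA races between front and back claims.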
template<typename RandomIt, typename Function>
class parallel_iterator_task : public abstract_task {
- RandomIt first_, last_;
- Function function_;
+ alignas(64) const int step = 8;
+ alignas(64) RandomIt first_, last_;
+ alignas(64) Function function_;
- // External stealing
- size_t first_index_ava, last_index_ava;
- // My internal state
- size_t current_index, max_index;
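// Claiming state shared between executors: the owner advances first_index_ from
// the front, thieves CAS the stamped last_index_ down from the back, and
// to_be_processed_ counts elements not yet finished by this task or its children.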
+ alignas(64) std::atomic<size_t> first_index_;
+ alignas(64) std::atomic<size_t> to_be_processed_;
+ alignas(64) std::atomic<stamped_integer> last_index_;
+ alignas(64) parallel_iterator_task *parent_;
bool steal_front(size_t &stolen_max_index);
bool steal_back(size_t &stolen_first_index, size_t &stolen_last_index);
protected:
bool internal_stealing(abstract_task *other_task) override;
@@ -23,6 +34,7 @@ class parallel_iterator_task : public abstract_task {
public:
explicit parallel_iterator_task(RandomIt first, RandomIt last, Function function, const abstract_task::id &id);
parallel_iterator_task(const parallel_iterator_task &other);
void execute() override;
};
}
......
@@ -2,6 +2,7 @@
#ifndef PLS_PARALLEL_ITERATOR_TASK_IMPL_H
#define PLS_PARALLEL_ITERATOR_TASK_IMPL_H
#include "scheduler.h"
namespace pls {
namespace internal {
namespace scheduling {
@@ -11,20 +12,129 @@ parallel_iterator_task<RandomIt, Function>::parallel_iterator_task
abstract_task(0, id),
first_{first},
last_{last},
- function_{function} {}
+ function_{function},
+ first_index_{0},
+ to_be_processed_{std::distance(first, last)},
+ last_index_{stamped_integer{0, std::distance(first, last)}},
+ parent_{nullptr} {}
template<typename RandomIt, typename Function>
parallel_iterator_task<RandomIt, Function>::parallel_iterator_task(
    const parallel_iterator_task<RandomIt, Function> &other):
abstract_task{other.depth(), other.unique_id()},
first_{other.first_},
last_{other.last_},
function_{other.function_},
first_index_{other.first_index_.load()},
to_be_processed_{other.to_be_processed_.load()},
last_index_{other.last_index_.load()},
parent_{other.parent_} {}
template<typename RandomIt, typename Function>
void parallel_iterator_task<RandomIt, Function>::execute() {
// Start processing at the beginning of our data
size_t current_index = 0;
auto current_iterator = first_;
// Keep going as long as we have data
while (true) {
// Claim the next chunk of data for ourselves
size_t local_max_index;
if (!steal_front(local_max_index)) {
break;
}
// Process the claimed chunk
for (; current_index != local_max_index; current_index++) {
function_(*(current_iterator++));
}
}
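// Credit the elements processed by this executor against the shared counter,
// then keep stealing other work until the whole range has been processed.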
to_be_processed_ -= current_index;
while (to_be_processed_.load() > 0)
steal_work();
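// Tasks split off via split_task() credit their whole range back to their parent once done.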
if (parent_ != nullptr) {
parent_->to_be_processed_ -= std::distance(first_, last_);
}
}
template<typename RandomIt, typename Function>
bool parallel_iterator_task<RandomIt, Function>::steal_front(size_t &stolen_max) {
auto local_first_index = first_index_.load();
auto local_last_index = last_index_.load();
if (local_first_index >= local_last_index.value) {
return false;
}
// Advance the first index, i.e. claim part of the work for ourselves
auto new_first_index = std::min(local_first_index + step, local_last_index.value);
first_index_ = new_first_index;
// Reload the last index to detect concurrent back-steals
local_last_index = last_index_.load();
// Enough distance to the stealing end: our claim is uncontended
if (new_first_index < local_last_index.value) {
stolen_max = new_first_index;
return true;
}
// Fight over the last element by bumping the stamp
if (new_first_index == local_last_index.value) {
auto new_last_index = stamped_integer{local_last_index.stamp + 1, local_last_index.value};
if (last_index_.compare_exchange_strong(local_last_index, new_last_index)) {
stolen_max = new_first_index;
return true;
}
}
// All iterator elements are assigned to some executor
return false;
}
template<typename RandomIt, typename Function>
- bool parallel_iterator_task<RandomIt, Function>::split_task(base::swmr_spin_lock * /*lock*/) {
+ bool parallel_iterator_task<RandomIt, Function>::steal_back(size_t &stolen_first_index, size_t &stolen_last_index) {
auto local_first_index = first_index_.load();
auto local_last_index = last_index_.load();
if (local_first_index >= local_last_index.value) {
return false;
}
// Try to steal using a CAS on the stamped last index;
// guard the subtraction so it cannot underflow when fewer than step elements remain
auto target_last_index = local_last_index.value - local_first_index > step
                         ? local_last_index.value - step
                         : local_first_index;
auto new_last_index = stamped_integer{local_last_index.stamp + 1, target_last_index};
if (last_index_.compare_exchange_strong(local_last_index, new_last_index)) {
stolen_first_index = new_last_index.value;
stolen_last_index = local_last_index.value;
return true;
}
return false;
}
template<typename RandomIt, typename Function>
- bool parallel_iterator_task<RandomIt, Function>::internal_stealing(abstract_task *other_task) {
+ bool parallel_iterator_task<RandomIt, Function>::split_task(base::swmr_spin_lock *lock) {
auto depth = this->depth();
auto id = this->unique_id();
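// Try to claim a chunk at the back of this task's range; the reader lock is released on both paths.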
size_t stolen_first_index, stolen_last_index;
if (!steal_back(stolen_first_index, stolen_last_index)) {
lock->reader_unlock();
return false;
}
lock->reader_unlock();
parallel_iterator_task new_task{first_ + stolen_first_index, first_ + stolen_last_index, function_, id};
new_task.parent_ = this;
scheduler::execute_task(new_task, depth);
return true;
}
template<typename RandomIt, typename Function>
bool parallel_iterator_task<RandomIt, Function>::internal_stealing(abstract_task */*other_task*/) {
// Not allowed for now; this eases the ABA problem
return false;
}
}
......
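In summary, the claiming protocol above works as follows: the owning executor takes chunks of step elements from the front by advancing first_index_, thieves shrink the stamped last_index_ from the back via CAS, and the version stamp lets the two sides detect each other when they meet. Below is a self-contained sketch of that scheme under simplified assumptions (standalone names and fixed-width types, not the pls ones; a single-threaded driver for illustration):

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <iostream>

// Stand-in for pls' stamped_integer: an index value tagged with a version stamp.
struct stamped {
  std::uint32_t stamp;
  std::uint32_t value;
};

constexpr std::uint32_t step = 8;
std::atomic<std::uint32_t> first_index{0};
std::atomic<stamped> last_index{stamped{0, 100}};  // the range [0, 100) is up for grabs

// Owner side: claim the next chunk from the front (mirrors steal_front above).
bool claim_front(std::uint32_t &max) {
  auto first = first_index.load();
  auto last = last_index.load();
  if (first >= last.value) return false;
  auto new_first = std::min(first + step, last.value);
  first_index = new_first;
  last = last_index.load();  // re-read to detect concurrent back-steals
  if (new_first < last.value) {
    max = new_first;
    return true;
  }
  if (new_first == last.value) {
    // Fight over the final chunk by bumping the stamp.
    if (last_index.compare_exchange_strong(last, stamped{last.stamp + 1, last.value})) {
      max = new_first;
      return true;
    }
  }
  return false;
}

// Thief side: claim a chunk from the back with a stamped CAS (mirrors steal_back above).
bool claim_back(std::uint32_t &low, std::uint32_t &high) {
  auto first = first_index.load();
  auto last = last_index.load();
  if (first >= last.value) return false;
  // Take at most step elements, never more than what is left.
  auto target = last.value - first > step ? last.value - step : first;
  if (last_index.compare_exchange_strong(last, stamped{last.stamp + 1, target})) {
    low = target;
    high = last.value;
    return true;
  }
  return false;
}

int main() {
  std::uint32_t max = 0, low = 0, high = 0;
  claim_front(max);       // the owner claims [0, 8)
  claim_back(low, high);  // a thief claims [92, 100)
  std::cout << "owner claimed up to " << max
            << ", thief claimed [" << low << ", " << high << ")\n";
}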
@@ -24,7 +24,7 @@ using internal::scheduling::fork_join_lambda_by_value;
using internal::scheduling::fork_join_task;
using algorithm::invoke_parallel;
using algorithm::parallel_for;
using algorithm::parallel_for_fork_join;
}
......