Commit a7073632 by FritzFlorian

WIP: refactor to only use fork-join tasks.

parent 64e2238c
@@ -19,7 +19,7 @@ int count_child_nodes(uts::node &node) {
     return child_count;
   }
-  auto current_task = pls::fork_join_sub_task::current();
+  auto current_task = pls::task::current();
   std::vector<int> results(children.size());
   for (size_t i = 0; i < children.size(); i++) {
     size_t index = i;
@@ -45,7 +45,7 @@ int unbalanced_tree_search(int seed, int root_children, double q, int normal_chi
   };
-  pls::fork_join_lambda_by_reference<typeof(lambda)> task(lambda);
-  pls::fork_join_task root_task{&task, id};
+  pls::fork_join_lambda_by_reference<typeof(lambda)> sub_task(lambda);
+  pls::task root_task{&sub_task, id};
   pls::scheduler::execute_task(root_task);
   return result;
......
@@ -29,15 +29,14 @@ add_library(pls STATIC
         include/pls/internal/helpers/mini_benchmark.h
         include/pls/internal/helpers/unique_id.h
-        include/pls/internal/scheduling/root_task.h src/internal/scheduling/root_task.cpp
-        include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp
-        include/pls/internal/scheduling/abstract_task.h src/internal/scheduling/abstract_task.cpp
+        src/internal/scheduling/root_task.cpp
+        include/pls/internal/scheduling/thread_state.h
+        src/internal/scheduling/abstract_task.cpp
         include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
         include/pls/internal/scheduling/scheduler_impl.h
-        include/pls/internal/scheduling/run_on_n_threads_task.h src/internal/scheduling/run_on_n_threads_task.cpp
-        include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp
+        src/internal/scheduling/run_on_n_threads_task.cpp
+        include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
         include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
-        include/pls/internal/scheduling/parallel_iterator_task.h include/pls/internal/scheduling/parallel_iterator_task_impl.h
         src/internal/scheduling/parallel_iterator_task.cpp)
 # Add everything in `./include` to be in the include path of this project
......
@@ -2,7 +2,7 @@
 #ifndef PLS_PARALLEL_INVOKE_H
 #define PLS_PARALLEL_INVOKE_H

-#include "pls/internal/scheduling/fork_join_task.h"
+#include "pls/internal/scheduling/task.h"
 #include "pls/internal/scheduling/scheduler.h"

 namespace pls {
......
@@ -2,8 +2,8 @@
 #ifndef PLS_INVOKE_PARALLEL_IMPL_H
 #define PLS_INVOKE_PARALLEL_IMPL_H

-#include <pls/internal/scheduling/fork_join_task.h>
-#include "pls/internal/scheduling/fork_join_task.h"
+#include <pls/internal/scheduling/task.h>
+#include "pls/internal/scheduling/task.h"
 #include "pls/internal/scheduling/scheduler.h"
 #include "pls/internal/helpers/unique_id.h"
 #include "pls/internal/base/alignment.h"
@@ -23,7 +23,7 @@ inline void run_body(const Body &internal_body, const abstract_task::id &id) {
     internal_body();
   } else {
     fork_join_lambda_by_reference<Body> root_body(internal_body);
-    fork_join_task root_task{&root_body, id};
+    task root_task{&root_body, id};
     scheduler::execute_task(root_task);
   }
 }
@@ -37,7 +37,7 @@ void invoke_parallel(const Function1 &function1, const Function2 &function2) {
   static abstract_task::id id = unique_id::create<Function1, Function2>();
   auto internal_body = [&]() {
-    auto current_task = fork_join_sub_task::current();
+    auto current_task = task::current();
     auto sub_task_2 = fork_join_lambda_by_reference<Function2>(function2);
     current_task->spawn_child(sub_task_2);
@@ -55,7 +55,7 @@ void invoke_parallel(const Function1 &function1, const Function2 &function2, con
   static abstract_task::id id = unique_id::create<Function1, Function2, Function3>();
   auto internal_body = [&]() {
-    auto current_task = fork_join_sub_task::current();
+    auto current_task = task::current();
     auto sub_task_2 = fork_join_lambda_by_reference<Function2>(function2);
     auto sub_task_3 = fork_join_lambda_by_reference<Function3>(function3);
......
@@ -2,7 +2,7 @@
 #ifndef PLS_PARALLEL_FOR_IMPL_H
 #define PLS_PARALLEL_FOR_IMPL_H

-#include "pls/internal/scheduling/fork_join_task.h"
+#include "pls/internal/scheduling/task.h"
 #include "pls/internal/scheduling/parallel_iterator_task.h"
 #include "pls/internal/scheduling/scheduler.h"
 #include "pls/internal/scheduling/scheduler.h"
@@ -31,10 +31,10 @@ void parallel_for(RandomIt first, RandomIt last, const Function &function) {
       auto body = [=] { internal::parallel_for(first + middle_index, last, function); };
       fork_join_lambda_by_reference<decltype(body)> second_half_task(body);
-      fork_join_sub_task::current()->spawn_child(second_half_task);
+      task::current()->spawn_child(second_half_task);
       parallel_for(first, first + middle_index, function);
-      fork_join_sub_task::current()->wait_for_all();
+      task::current()->wait_for_all();
     }
   }
 }
@@ -59,7 +59,7 @@ void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &funct
   auto body = [=] { internal::parallel_for(first, last, function); };
   fork_join_lambda_by_reference<decltype(body)> root_body(body);
-  fork_join_task root_task{&root_body, id};
+  task root_task{&root_body, id};
   scheduler::execute_task(root_task);
 }
......
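For orientation, this is how the fork-join loop above is driven from user code. A minimal sketch, assuming pls::malloc_scheduler_memory exists as in the project's examples (its constructor argument and the thread count here are illustrative only):

#include <vector>
#include "pls/pls.h"

// Assumed helper from the project's examples; sizes are illustrative only.
static pls::malloc_scheduler_memory my_memory{8};

int main() {
  pls::scheduler scheduler{&my_memory, 8};

  std::vector<int> data(1024, 1);
  scheduler.perform_work([&] {
    // Recursively splits the range; right halves are spawned as stealable tasks.
    pls::parallel_for_fork_join(data.begin(), data.end(), [](int &element) {
      element *= 2;
    });
  });
  // The scheduler terminates implicitly when it goes out of scope.
  return 0;
}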
#ifndef PLS_ABSTRACT_TASK_H
#define PLS_ABSTRACT_TASK_H
#include "pls/internal/base/swmr_spin_lock.h"
#include "pls/internal/helpers/unique_id.h"
namespace pls {
namespace internal {
namespace scheduling {
class abstract_task {
public:
using id = helpers::unique_id;
private:
unsigned int depth_;
abstract_task::id unique_id_;
abstract_task *volatile child_task_;
public:
abstract_task(const unsigned int depth, const abstract_task::id &unique_id) :
depth_{depth},
unique_id_{unique_id},
child_task_{nullptr} {}
virtual void execute() = 0;
void set_child(abstract_task *child_task) { child_task_ = child_task; }
abstract_task *child() const { return child_task_; }
void set_depth(unsigned int depth) { depth_ = depth; }
unsigned int depth() const { return depth_; }
id unique_id() const { return unique_id_; }
protected:
virtual bool internal_stealing(abstract_task *other_task) = 0;
virtual bool split_task(base::swmr_spin_lock *lock) = 0;
bool steal_work();
};
}
}
}
#endif //PLS_ABSTRACT_TASK_H
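To make the parent/child chain concrete: a thief descends a victim's chain of running tasks to its own depth before it attempts internal_stealing() or split_task(). A hypothetical helper, not part of this commit, mirroring the walk in abstract_task::steal_work() further below:

#include "pls/internal/scheduling/abstract_task.h"

namespace pls {
namespace internal {
namespace scheduling {

// Hypothetical: walk the victim's chain of nested tasks down to our own
// depth, since depth-restricted stealing only allows targets at or below it.
inline abstract_task *find_task_at_depth(abstract_task *victim_root, unsigned int my_depth) {
  abstract_task *current = victim_root;
  while (current != nullptr && current->depth() < my_depth) {
    current = current->child();
  }
  return current;  // nullptr if the victim runs nothing that deep
}

}
}
}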
#ifndef PLS_PARALLEL_ITERATOR_TASK_H
#define PLS_PARALLEL_ITERATOR_TASK_H
#include "pls/internal/data_structures/stamped_integer.h"
#include "abstract_task.h"
namespace pls {
namespace internal {
namespace scheduling {
using data_structures::stamped_integer;
template<typename RandomIt, typename Function>
class parallel_iterator_task : public abstract_task {
alignas(64) const int step = 8;
alignas(64) RandomIt first_, last_;
alignas(64) Function function_;
// External stealing
alignas(64) std::atomic<size_t> first_index_;
alignas(64) std::atomic<size_t> to_be_processed_;
alignas(64) std::atomic<stamped_integer> last_index_;
alignas(64) parallel_iterator_task *parent_;
bool steal_front(size_t &stolen_max_index);
bool steal_back(size_t &stolen_first_index, size_t &stolen_last_index);
protected:
bool internal_stealing(abstract_task *other_task) override;
bool split_task(base::swmr_spin_lock * /*lock*/) override;
public:
explicit parallel_iterator_task(RandomIt first, RandomIt last, Function function, const abstract_task::id &id);
parallel_iterator_task(const parallel_iterator_task &other);
void execute() override;
};
}
}
}
#include "parallel_iterator_task_impl.h"
#endif //PLS_PARALLEL_ITERATOR_TASK_H
#ifndef PLS_PARALLEL_ITERATOR_TASK_IMPL_H
#define PLS_PARALLEL_ITERATOR_TASK_IMPL_H
#include "scheduler.h"
namespace pls {
namespace internal {
namespace scheduling {
template<typename RandomIt, typename Function>
parallel_iterator_task<RandomIt, Function>::parallel_iterator_task
(RandomIt first, RandomIt last, Function function, const abstract_task::id &id):
abstract_task(0, id),
first_{first},
last_{last},
function_{function},
first_index_{0},
to_be_processed_{std::distance(first, last)},
last_index_{stamped_integer{0, std::distance(first, last)}},
parent_{nullptr} {}
template<typename RandomIt, typename Function>
parallel_iterator_task<RandomIt,
Function>::parallel_iterator_task(const pls::internal::scheduling::parallel_iterator_task<
RandomIt,
Function> &other):
abstract_task{other.depth(), other.unique_id()},
first_{other.first_},
last_{other.last_},
function_{other.function_},
first_index_{other.first_index_.load()},
to_be_processed_{other.to_be_processed_.load()},
last_index_{other.last_index_.load()},
parent_{other.parent_} {}
template<typename RandomIt, typename Function>
void parallel_iterator_task<RandomIt, Function>::execute() {
// Start processing at beginning of our data
size_t current_index = 0;
auto current_iterator = first_;
// Keep going as long as we have data
while (true) {
// Claim next chunk of data for us
size_t local_max_index;
if (!steal_front(local_max_index)) {
break;
}
// Process Chunk
for (; current_index != local_max_index; current_index++) {
function_(*(current_iterator++));
}
}
to_be_processed_ -= current_index;
while (to_be_processed_.load() > 0)
steal_work();
if (parent_ != nullptr) {
parent_->to_be_processed_ -= std::distance(first_, last_);
}
}
template<typename RandomIt, typename Function>
bool parallel_iterator_task<RandomIt, Function>::steal_front(size_t &stolen_max) {
auto local_first_index = first_index_.load();
auto local_last_index = last_index_.load();
if (local_first_index >= local_last_index.value) {
return false;
}
// Advance the first index, i.e. claim a chunk of the work for ourselves
auto new_first_index = std::min(local_first_index + step, local_last_index.value);
first_index_ = new_first_index;
// Reload last index
local_last_index = last_index_.load();
// Enough distance left between the indices, our claim is safe
if (new_first_index < local_last_index.value) {
stolen_max = new_first_index;
return true;
}
// Fight over last element
if (new_first_index == local_last_index.value) {
auto new_last_index = stamped_integer{local_last_index.stamp + 1, local_last_index.value};
if (last_index_.compare_exchange_strong(local_last_index, new_last_index)) {
stolen_max = new_first_index;
return true;
}
}
// All iterator elements are assigned to some executor
return false;
}
template<typename RandomIt, typename Function>
bool parallel_iterator_task<RandomIt, Function>::steal_back(size_t &stolen_first_index, size_t &stolen_last_index) {
auto local_first_index = first_index_.load();
auto local_last_index = last_index_.load();
if (local_first_index >= local_last_index.value) {
return false;
}
// Try to steal using cas
auto target_last_index = std::max(local_last_index.value - step, local_first_index);
auto new_last_index = stamped_integer{local_last_index.stamp + 1, target_last_index};
if (last_index_.compare_exchange_strong(local_last_index, new_last_index)) {
stolen_first_index = new_last_index.value;
stolen_last_index = local_last_index.value;
return true;
}
return false;
}
template<typename RandomIt, typename Function>
bool parallel_iterator_task<RandomIt, Function>::split_task(base::swmr_spin_lock *lock) {
auto depth = this->depth();
auto id = this->unique_id();
size_t stolen_first_index, stolen_last_index;
if (!steal_back(stolen_first_index, stolen_last_index)) {
lock->reader_unlock();
return false;
}
lock->reader_unlock();
parallel_iterator_task new_task{first_ + stolen_first_index, first_ + stolen_last_index, function_, id};
new_task.parent_ = this;
scheduler::execute_task(new_task, depth);
return true;
}
template<typename RandomIt, typename Function>
bool parallel_iterator_task<RandomIt, Function>::internal_stealing(abstract_task */*other_task*/) {
// Do not allow for now, eases up on ABA problem
return false;
}
}
}
}
#endif //PLS_PARALLEL_ITERATOR_TASK_IMPL_H
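The front/back stealing above relies on last_index_ being a stamped integer: every successful CAS bumps the stamp, so a competing thief that read a stale value cannot accidentally succeed (the ABA problem). A standalone sketch of the same back-steal protocol, independent of the pls headers:

#include <atomic>
#include <algorithm>
#include <cstddef>

// Simplified stand-in for pls' stamped_integer: a version stamp plus a value.
struct stamped {
  unsigned stamp;
  std::size_t value;
};

std::atomic<stamped> last_index{stamped{0, 100}};  // 100 elements left overall
std::size_t first_index = 0;                       // owner's front index (simplified)

// Claim up to 'step' elements from the back, mirroring steal_back() above.
bool steal_back_chunk(std::size_t step, std::size_t &stolen_first, std::size_t &stolen_last) {
  stamped local = last_index.load();
  if (first_index >= local.value) {
    return false;  // no work left between the two indices
  }
  // Never steal past the owner's front index (and guard against underflow).
  std::size_t target = (local.value > step) ? std::max(local.value - step, first_index)
                                            : first_index;
  stamped desired{local.stamp + 1, target};  // bump the stamp to rule out ABA
  if (last_index.compare_exchange_strong(local, desired)) {
    stolen_first = target;  // we now own the range [target, local.value)
    stolen_last = local.value;
    return true;
  }
  return false;  // lost the race; the caller may simply retry
}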
#ifndef PLS_ROOT_MASTER_TASK_H
#define PLS_ROOT_MASTER_TASK_H
#include <atomic>
#include <mutex>
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/swmr_spin_lock.h"
#include "abstract_task.h"
namespace pls {
namespace internal {
namespace scheduling {
template<typename Function>
class root_task : public abstract_task {
Function function_;
std::atomic_uint8_t finished_;
public:
static constexpr auto create_id = helpers::unique_id::create<root_task<Function>>;
explicit root_task(Function function) :
abstract_task{0, create_id()},
function_{function},
finished_{0} {}
root_task(const root_task &other) :
abstract_task{0, create_id()},
function_{other.function_},
finished_{0} {}
bool finished() {
return finished_;
}
void execute() override {
PROFILE_WORK_BLOCK("execute root_task");
function_();
finished_ = 1;
}
bool internal_stealing(abstract_task * /*other_task*/) override {
return false;
}
bool split_task(base::swmr_spin_lock * /*lock*/) override {
return false;
}
};
template<typename Function>
class root_worker_task : public abstract_task {
root_task<Function> *master_task_;
public:
static constexpr auto create_id = root_task<Function>::create_id;
explicit root_worker_task(root_task<Function> *master_task) :
abstract_task{0, create_id()},
master_task_{master_task} {}
void execute() override {
PROFILE_WORK_BLOCK("execute root_task");
do {
steal_work();
} while (!master_task_->finished());
}
bool internal_stealing(abstract_task * /*other_task*/) override {
return false;
}
bool split_task(base::swmr_spin_lock * /*lock*/) override {
return false;
}
};
}
}
}
#endif //PLS_ROOT_MASTER_TASK_H
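The two classes cooperate during a parallel section: one thread executes the user's function inside a root_task while every other worker spins in a root_worker_task, stealing until the master flags completion. A hypothetical sketch of that pairing (the real wiring lives in scheduler::perform_work()):

#include "pls/internal/scheduling/root_task.h"

namespace pls {
namespace internal {
namespace scheduling {

// Hypothetical illustration only; in the real scheduler the master task is
// shared with the workers through thread_state.
template<typename Function>
void run_section_as_master(Function function) {
  root_task<Function> master{function};
  master.execute();  // runs function() once, then flips finished_
}

template<typename Function>
void run_section_as_worker(root_task<Function> *master) {
  root_worker_task<Function> worker{master};
  worker.execute();  // steal_work() in a loop until master->finished()
}

}
}
}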
#ifndef PLS_RUN_ON_N_THREADS_TASK_H
#define PLS_RUN_ON_N_THREADS_TASK_H
#include <iostream>
#include <mutex>
#include "pls/internal/base/spin_lock.h"
#include "pls/internal/base/thread.h"
#include "abstract_task.h"
#include "thread_state.h"
#include "scheduler.h"
namespace pls {
namespace internal {
namespace scheduling {
template<typename Function>
class run_on_n_threads_task : public abstract_task {
template<typename F>
friend
class run_on_n_threads_task_worker;
Function function_;
// Improvement: Remove lock and replace by atomic variable (performance)
int counter;
base::spin_lock counter_lock_;
int decrement_counter() {
std::lock_guard<base::spin_lock> lock{counter_lock_};
counter--;
return counter;
}
int get_counter() {
std::lock_guard<base::spin_lock> lock{counter_lock_};
return counter;
}
public:
static constexpr auto create_id = helpers::unique_id::create<run_on_n_threads_task<Function>>;
run_on_n_threads_task(Function function, int num_threads) :
abstract_task{0, create_id()},
function_{function},
counter{num_threads - 1} {}
void execute() override {
// Execute our function ONCE
function_();
// Steal until we are finished (other threads executed)
do {
steal_work();
} while (get_counter() > 0);
std::cout << "Finished Master!" << std::endl;
}
bool internal_stealing(abstract_task * /*other_task*/) override {
return false;
}
bool split_task(base::swmr_spin_lock *lock) override;
};
template<typename Function>
class run_on_n_threads_task_worker : public abstract_task {
Function function_;
run_on_n_threads_task<Function> *root_;
public:
static constexpr auto create_id = helpers::unique_id::create<run_on_n_threads_task_worker<Function>>;
run_on_n_threads_task_worker(Function function, run_on_n_threads_task<Function> *root) :
abstract_task{0, create_id()},
function_{function},
root_{root} {}
void execute() override {
if (root_->decrement_counter() >= 0) {
function_();
std::cout << "Finished Worker!" << std::endl;
} else {
std::cout << "Abandoned Worker!" << std::endl;
}
}
bool internal_stealing(abstract_task * /*other_task*/) override {
return false;
}
bool split_task(base::swmr_spin_lock * /*lock*/) override {
return false;
}
};
template<typename Function>
bool run_on_n_threads_task<Function>::split_task(base::swmr_spin_lock *lock) {
if (get_counter() <= 0) {
return false;
}
// In success case, unlock.
lock->reader_unlock();
auto scheduler = base::this_thread::state<thread_state>()->scheduler_;
auto task = run_on_n_threads_task_worker<Function>{function_, this};
scheduler->execute_task(task, depth());
return true;
}
template<typename Function>
run_on_n_threads_task<Function> create_run_on_n_threads_task(Function function, int num_threads) {
return run_on_n_threads_task<Function>{function, num_threads};
}
}
}
}
#endif //PLS_RUN_ON_N_THREADS_TASK_H
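A short usage sketch for the factory above; it is hypothetical and relies on the pre-refactor scheduler::execute_task() entry point that this commit removes elsewhere:

#include <iostream>

#include "pls/internal/scheduling/run_on_n_threads_task.h"

namespace pls {
namespace internal {
namespace scheduling {

// Hypothetical usage, illustrating the pre-refactor flow only.
inline void print_on_four_threads() {
  auto body = [] { std::cout << "Hello from one of 4 threads" << std::endl; };
  auto task = create_run_on_n_threads_task(body, 4);
  scheduler::execute_task(task);  // up to 3 further threads join via split_task()
}

}
}
}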
@@ -12,9 +12,8 @@
 #include "pls/internal/base/thread.h"
 #include "pls/internal/base/barrier.h"

-#include "thread_state.h"
-#include "root_task.h"
-#include "scheduler_memory.h"
+#include "pls/internal/scheduling/thread_state.h"
+#include "pls/internal/scheduling/scheduler_memory.h"

 namespace pls {
 namespace internal {
@@ -24,6 +23,7 @@ void worker_routine();
 using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;

 class scheduler {
+  friend class task;
   friend void worker_routine();

   const unsigned int num_threads_;
@@ -32,12 +32,26 @@ class scheduler {
   base::barrier sync_barrier_;
   bool terminated_;
 public:
+  /**
+   * Initializes a scheduler instance with the given number of threads.
+   * This will spawn the threads and put them to sleep, ready to process an
+   * upcoming parallel section.
+   *
+   * @param memory All memory is allocated statically, thus the user is required to provide the memory instance.
+   * @param num_threads The number of worker threads to be created.
+   */
   explicit scheduler(scheduler_memory *memory, unsigned int num_threads);

+  /**
+   * The scheduler is implicitly terminated as soon as it leaves the scope.
+   */
   ~scheduler();

   /**
    * Wakes up the thread pool.
    * Code inside the Function lambda can invoke all parallel APIs.
+   * This is meant to cleanly sleep and wake up the scheduler during an application run,
+   * e.g. to run parallel code on a timer loop/after interrupts.
    *
    * @param work_section generic function or lambda to be executed in the scheduler's context.
    */
@@ -45,20 +59,38 @@ class scheduler {
   void perform_work(Function work_section);

   /**
-   * Executes a top-level-task (children of abstract_task) on this thread.
+   * Explicitly terminate the worker threads. Scheduler must not be used after this.
    *
-   * @param task The task to be executed.
-   * @param depth Optional: depth of the new task, otherwise set implicitly.
+   * @param wait_for_workers Set to true if you wish to return from this method only after the workers are shut down.
    */
-  template<typename Task>
-  static void execute_task(Task &task, int depth = -1);
-
-  static abstract_task *current_task() { return base::this_thread::state<thread_state>()->current_task_; }
-
-  void terminate(bool wait_for_workers = true);
+  void terminate(bool wait_for_workers = true);
+
+  /**
+   * Helper to spawn a child on the currently running task.
+   *
+   * @tparam T type of the new task
+   * @param sub_task the new task to be spawned
+   */
+  template<typename T>
+  void spawn_child(T &sub_task) {
+    this_thread_state()->current_task_->spawn_child(sub_task);
+  }
+
+  /**
+   * Helper to wait for all children of the currently executing task.
+   */
+  void wait_for_all() {
+    this_thread_state()->current_task_->wait_for_all();
+  }

   unsigned int num_threads() const { return num_threads_; }

+ private:
+  // Helpers for accessing thread states
   thread_state *thread_state_for(size_t id) { return memory_->thread_state_for(id); }
+  task *get_local_task();
+  task *steal_task();
 };
 }
......
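The new spawn_child()/wait_for_all() helpers move the fork-join entry points from the task object onto the scheduler. A hypothetical sketch of the intended call pattern (the commit is WIP, so the final API may differ):

#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/task.h"

namespace pls {
namespace internal {
namespace scheduling {

// Hypothetical call pattern for the new helpers, inside a perform_work() section.
inline void fork_join_example(scheduler &s) {
  auto right = [] { /* ... second half of the work ... */ };
  fork_join_lambda_by_reference<decltype(right)> right_task(right);

  s.spawn_child(right_task);  // push the forked half onto this thread's deque
  /* ... compute the first half inline ... */
  s.wait_for_all();           // run local or stolen tasks until all children finished
}

}
}
}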
@@ -34,41 +34,6 @@ void scheduler::perform_work(Function work_section) {
   }
 }

-template<typename Task>
-void scheduler::execute_task(Task &task, int depth) {
-  static_assert(std::is_base_of<abstract_task, Task>::value, "Only pass abstract_task subclasses!");
-
-  auto my_state = base::this_thread::state<thread_state>();
-  abstract_task *old_task;
-  abstract_task *new_task;
-
-  // Init Task
-  old_task = my_state->current_task_;
-  new_task = my_state->task_stack_->push(task);
-  new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1);
-
-  {
-    my_state->lock_.writer_lock();
-    my_state->current_task_ = new_task;
-    old_task->set_child(new_task);
-    my_state->lock_.writer_unlock();
-  }
-
-  // Run Task
-  new_task->execute();
-
-  // Teardown state back to before the task was executed
-  my_state->task_stack_->pop<Task>();
-
-  {
-    my_state->lock_.writer_lock();
-    old_task->set_child(nullptr);
-    my_state->current_task_ = old_task;
-    my_state->lock_.writer_unlock();
-  }
-}
 }
 }
 }
......
@@ -14,46 +14,39 @@ namespace pls {
 namespace internal {
 namespace scheduling {

-class fork_join_task;
-class fork_join_sub_task {
-  friend class fork_join_task;
+class task {
+  friend class scheduler;

   // Coordinate finishing of sub_tasks
-  std::atomic_uint32_t ref_count_;
-  fork_join_sub_task *parent_;
-
-  // Access to TBB scheduling environment
-  fork_join_task *tbb_task_;
+  std::atomic<unsigned int> ref_count_;
+  task *parent_;
+
+  bool executed = false;
+  int executed_at = -1;

   // Stack Management (reset stack pointer after wait_for_all() calls)
-  data_structures::work_stealing_deque<fork_join_sub_task>::state deque_state_;
+  data_structures::work_stealing_deque<task>::state deque_state_;
 protected:
-  explicit fork_join_sub_task();
-  fork_join_sub_task(const fork_join_sub_task &other);
+  // TODO: Double Check with copy and move constructors, try to minimize overhead while keeping a clean API.
+  explicit task();
+  task(const task &other);

-  // Overwritten with behaviour of child tasks
+  /**
+   * Overwrite this with the actual behaviour of concrete tasks.
+   */
   virtual void execute_internal() = 0;

- public:
-  // Only use them when actually executing this sub_task (only public for simpler API design)
   template<typename T>
   void spawn_child(T &sub_task);
   void wait_for_all();

-  static fork_join_sub_task *current();
 private:
   void execute();
 };

 template<typename Function>
-class fork_join_lambda_by_reference : public fork_join_sub_task {
+class fork_join_lambda_by_reference : public task {
   const Function &function_;

 public:
-  explicit fork_join_lambda_by_reference(const Function &function) : fork_join_sub_task{}, function_{function} {};
+  explicit fork_join_lambda_by_reference(const Function &function) : task{}, function_{function} {};

 protected:
   void execute_internal() override {
@@ -62,11 +55,11 @@ class fork_join_lambda_by_reference : public fork_join_sub_task {
 };

 template<typename Function>
-class fork_join_lambda_by_value : public fork_join_sub_task {
+class fork_join_lambda_by_value : public task {
   const Function function_;

 public:
-  explicit fork_join_lambda_by_value(const Function &function) : fork_join_sub_task{}, function_{function} {};
+  explicit fork_join_lambda_by_value(const Function &function) : task{}, function_{function} {};

 protected:
   void execute_internal() override {
@@ -74,46 +67,21 @@ class fork_join_lambda_by_value : public fork_join_sub_task {
   }
 };

-class fork_join_task : public abstract_task {
-  friend class fork_join_sub_task;
-
-  fork_join_sub_task *root_task_;
-  fork_join_sub_task *currently_executing_;
-
-  // Double-Ended Queue management
-  data_structures::work_stealing_deque<fork_join_sub_task> deque_;
-
-  // Steal Management
-  fork_join_sub_task *last_stolen_;
-
-  fork_join_sub_task *get_local_sub_task();
-  fork_join_sub_task *get_stolen_sub_task();
-
-  bool internal_stealing(abstract_task *other_task) override;
-  bool split_task(base::swmr_spin_lock * /*lock*/) override;
-
- public:
-  explicit fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id);
-  void execute() override;
-  fork_join_sub_task *currently_executing() const;
-};
-
 template<typename T>
-void fork_join_sub_task::spawn_child(T &task) {
+void task::spawn_child(T &sub_task) {
   PROFILE_FORK_JOIN_STEALING("spawn_child")
-  static_assert(std::is_base_of<fork_join_sub_task, T>::value, "Only pass fork_join_sub_task subclasses!");
+  static_assert(std::is_base_of<sub_task, T>::value, "Only pass task subclasses!");

   // Keep our refcount up to date
   ref_count_++;

   // Assign forced values
-  task.parent_ = this;
-  task.tbb_task_ = tbb_task_;
-  task.deque_state_ = tbb_task_->deque_.save_state();
+  sub_task.parent_ = this;
+  sub_task.deque_state_ = scheduler::this_thread_state()->deque_.save_state();

   // Push on our deque
-  const T const_task = task;
-  tbb_task_->deque_.push_tail(const_task);
+  const T const_task = sub_task;
+  scheduler::this_thread_state()->deque_.push_tail(const_task);
 }
 }
......
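With the unified task class, user code keeps going through the re-exported entry points such as pls::invoke_parallel (see pls.h below). A classic fork-join sketch against that public API, illustrative only:

#include "pls/pls.h"

// Classic fork-join recursion against the public API; must be called from
// inside a scheduler::perform_work() section.
long fib(long n) {
  if (n <= 1) {
    return n;
  }
  long left, right;
  pls::invoke_parallel(
      [&] { left = fib(n - 1); },    // first functor: executed inline
      [&] { right = fib(n - 2); });  // second functor: spawned, may be stolen
  return left + right;
}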
@@ -5,8 +5,8 @@
 #include <random>

 #include "pls/internal/data_structures/aligned_stack.h"
-#include "pls/internal/base/swmr_spin_lock.h"
-#include "abstract_task.h"
+#include "pls/internal/data_structures/deque.h"
+#include "pls/internal/scheduling/task.h"

 namespace pls {
 namespace internal {
@@ -17,11 +17,11 @@ class scheduler;

 struct thread_state {
   alignas(base::system_details::CACHE_LINE_SIZE) scheduler *scheduler_;
-  alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *root_task_;
-  alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *current_task_;
+  alignas(base::system_details::CACHE_LINE_SIZE) task *root_task_;
+  alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_;
   alignas(base::system_details::CACHE_LINE_SIZE) data_structures::aligned_stack *task_stack_;
+  alignas(base::system_details::CACHE_LINE_SIZE) data_structures::deque<task> deque_;
   alignas(base::system_details::CACHE_LINE_SIZE) size_t id_;
-  alignas(base::system_details::CACHE_LINE_SIZE) base::swmr_spin_lock lock_;
   alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;

   thread_state() :
@@ -29,8 +29,8 @@ struct thread_state {
       root_task_{nullptr},
       current_task_{nullptr},
       task_stack_{nullptr},
+      deque_{task_stack_},
       id_{0},
-      lock_{},
       random_{id_} {};

   thread_state(scheduler *scheduler, data_structures::aligned_stack *task_stack, unsigned int id) :
@@ -38,9 +38,23 @@ struct thread_state {
       root_task_{nullptr},
       current_task_{nullptr},
       task_stack_{task_stack},
+      deque_{task_stack_},
       id_{id},
-      lock_{},
       random_{id_} {}

+  /**
+   * Convenience helper to get the thread_state instance associated with this thread.
+   * Must only be called on threads that are associated with a thread_state,
+   * this will most likely be threads created by the scheduler.
+   *
+   * @return The thread_state of this thread.
+   */
+  static thread_state *get() { return base::this_thread::state<thread_state>(); }
+
+  static scheduler *scheduler() { return get()->scheduler_; }
+  static task *current_task() { return get()->current_task_; }
+  static void set_current_task(task *task) { get()->current_task_ = task; };
+  static data_structures::aligned_stack *task_stack() { return get()->task_stack_; }
+  static data_structures::deque<task> *deque() { return get()->deque_; }
 };
 }
......
@@ -4,7 +4,7 @@
 #include "pls/algorithms/invoke_parallel.h"
 #include "pls/algorithms/parallel_for.h"
 #include "pls/internal/scheduling/abstract_task.h"
-#include "pls/internal/scheduling/fork_join_task.h"
+#include "pls/internal/scheduling/task.h"
 #include "pls/internal/scheduling/scheduler.h"
 #include "pls/internal/helpers/unique_id.h"
@@ -18,10 +18,10 @@ using task_id = internal::scheduling::abstract_task::id;
 using unique_id = internal::helpers::unique_id;

-using internal::scheduling::fork_join_sub_task;
+using internal::scheduling::task;
 using internal::scheduling::fork_join_lambda_by_reference;
 using internal::scheduling::fork_join_lambda_by_value;
-using internal::scheduling::fork_join_task;
+using internal::scheduling::task;

 using algorithm::invoke_parallel;
 using algorithm::parallel_for_fork_join;
......
#include <pls/internal/base/backoff.h>
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/abstract_task.h"
#include "pls/internal/scheduling/scheduler.h"
namespace pls {
namespace internal {
namespace scheduling {
bool abstract_task::steal_work() {
// thread_local static base::backoff backoff{};
PROFILE_STEALING("abstract_task::steal_work")
const auto my_state = base::this_thread::state<thread_state>();
const auto my_scheduler = my_state->scheduler_;
const size_t my_id = my_state->id_;
const size_t offset = my_state->random_() % my_scheduler->num_threads();
const size_t max_tries = my_scheduler->num_threads(); // TODO: Tune this value
for (size_t i = 0; i < max_tries; i++) {
size_t target = (offset + i) % my_scheduler->num_threads();
if (target == my_id) {
continue;
}
auto target_state = my_scheduler->thread_state_for(target);
if (!target_state->lock_.reader_try_lock()) {
continue;
}
// Dig down to our level
PROFILE_STEALING("Go to our level")
abstract_task *current_task = target_state->root_task_;
while (current_task != nullptr && current_task->depth() < depth()) {
current_task = current_task->child();
}
PROFILE_END_BLOCK
// Try to steal 'internally', e.g. fork_join_sub_tasks within a fork_join_task constellation
PROFILE_STEALING("Internal Steal")
if (current_task != nullptr) {
// See if it equals our type and depth of task
if (current_task->unique_id_ == unique_id_ &&
current_task->depth_ == depth_) {
if (internal_stealing(current_task)) {
// internal steal was a success, hand it back to the internal scheduler
target_state->lock_.reader_unlock();
// backoff.reset();
return true;
}
// No success, we need to steal work from a deeper level using 'top level task stealing'
current_task = current_task->child();
}
}
PROFILE_END_BLOCK;
// Execute 'top level task steal' if possible
// (only try deeper tasks to keep depth restricted stealing).
PROFILE_STEALING("Top Level Steal")
while (current_task != nullptr) {
auto lock = &target_state->lock_;
if (current_task->split_task(lock)) {
// top level steal was a success (we did a top level task steal)
// backoff.reset();
return false;
}
current_task = current_task->child_task_;
}
PROFILE_END_BLOCK;
target_state->lock_.reader_unlock();
}
// no steal attempt succeeded
// backoff.do_backoff();
// base::this_thread::sleep(5);
return false;
}
}
}
}
#include "pls/internal/scheduling/parallel_iterator_task.h"
#include "pls/internal/scheduling/root_task.h"
namespace pls {
namespace internal {
namespace scheduling {
}
}
}
#include "pls/internal/scheduling/run_on_n_threads_task.h"
namespace pls {
namespace internal {
namespace scheduling {
}
}
}
#include "pls/internal/helpers/profiler.h" #include "pls/internal/helpers/profiler.h"
#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/fork_join_task.h" #include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/thread_state.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
namespace scheduling { namespace scheduling {
fork_join_sub_task::fork_join_sub_task() : task::task() :
ref_count_{0}, ref_count_{0},
parent_{nullptr}, parent_{nullptr},
tbb_task_{nullptr},
deque_state_{0} {} deque_state_{0} {}
fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task &other) : task::task(const task &other) :
ref_count_{0}, ref_count_{0},
parent_{other.parent_}, parent_{other.parent_},
tbb_task_{other.tbb_task_},
deque_state_{other.deque_state_} {} deque_state_{other.deque_state_} {}
void fork_join_sub_task::execute() { void task::execute() {
PROFILE_WORK_BLOCK("execute sub_task") {
auto last_executing = tbb_task_->currently_executing_; PROFILE_WORK_BLOCK("execute task")
tbb_task_->currently_executing_ = this; auto last_executing = thread_state::get()->current_task_;
execute_internal(); thread_state::get()->current_task_ = this;
tbb_task_->currently_executing_ = last_executing;
PROFILE_END_BLOCK execute_internal();
thread_state::get()->current_task_ = last_executing;
}
wait_for_all(); wait_for_all();
if (parent_ != nullptr) { if (parent_ != nullptr) {
...@@ -33,10 +36,10 @@ void fork_join_sub_task::execute() { ...@@ -33,10 +36,10 @@ void fork_join_sub_task::execute() {
} }
} }
void fork_join_sub_task::wait_for_all() { void task::wait_for_all() {
while (ref_count_ > 0) { while (ref_count_ > 0) {
PROFILE_STEALING("get local sub task") PROFILE_STEALING("get local sub task")
fork_join_sub_task *local_task = tbb_task_->get_local_sub_task(); task *local_task = thread_state::get()->scheduler_->get_local_task();
PROFILE_END_BLOCK PROFILE_END_BLOCK
if (local_task != nullptr) { if (local_task != nullptr) {
local_task->execute(); local_task->execute();
...@@ -44,37 +47,25 @@ void fork_join_sub_task::wait_for_all() { ...@@ -44,37 +47,25 @@ void fork_join_sub_task::wait_for_all() {
// Try to steal work. // Try to steal work.
// External steal will be executed implicitly if success // External steal will be executed implicitly if success
PROFILE_STEALING("steal work") PROFILE_STEALING("steal work")
bool internal_steal_success = tbb_task_->steal_work(); task *stolen_task = thread_state::get()->scheduler_->steal_task();
PROFILE_END_BLOCK PROFILE_END_BLOCK
if (internal_steal_success) { if (stolen_task != nullptr) {
tbb_task_->last_stolen_->execute(); stolen_task->execute();
} }
} }
} }
tbb_task_->deque_.release_memory_until(deque_state_); tbb_task_->deque_.release_memory_until(deque_state_);
} }
fork_join_sub_task *fork_join_task::get_local_sub_task() { bool task::internal_stealing(abstract_task *other_task) {
return deque_.pop_tail(); PROFILE_STEALING("task::internal_stealin")
} auto cast_other_task = reinterpret_cast<task *>(other_task);
fork_join_sub_task *fork_join_task::get_stolen_sub_task() {
return deque_.pop_head();
}
fork_join_sub_task *fork_join_sub_task::current() {
return dynamic_cast<fork_join_task *>(scheduler::current_task())->currently_executing();
}
bool fork_join_task::internal_stealing(abstract_task *other_task) {
PROFILE_STEALING("fork_join_task::internal_stealin")
auto cast_other_task = reinterpret_cast<fork_join_task *>(other_task);
auto stolen_sub_task = cast_other_task->get_stolen_sub_task(); auto stolen_sub_task = cast_other_task->get_stolen_sub_task();
if (stolen_sub_task == nullptr) { if (stolen_sub_task == nullptr) {
return false; return false;
} else { } else {
// Make sub-task belong to our fork_join_task instance // Make sub-task belong to our task instance
stolen_sub_task->tbb_task_ = this; stolen_sub_task->tbb_task_ = this;
stolen_sub_task->deque_state_ = deque_.save_state(); stolen_sub_task->deque_state_ = deque_.save_state();
// We will execute this next without explicitly moving it onto our stack storage // We will execute this next without explicitly moving it onto our stack storage
...@@ -84,13 +75,13 @@ bool fork_join_task::internal_stealing(abstract_task *other_task) { ...@@ -84,13 +75,13 @@ bool fork_join_task::internal_stealing(abstract_task *other_task) {
} }
} }
bool fork_join_task::split_task(base::swmr_spin_lock *lock) { bool task::split_task(base::swmr_spin_lock *lock) {
PROFILE_STEALING("fork_join_task::split_task") PROFILE_STEALING("task::split_task")
fork_join_sub_task *stolen_sub_task = get_stolen_sub_task(); fork_join_sub_task *stolen_sub_task = get_stolen_sub_task();
if (stolen_sub_task == nullptr) { if (stolen_sub_task == nullptr) {
return false; return false;
} }
fork_join_task task{stolen_sub_task, this->unique_id()}; task task{stolen_sub_task, this->unique_id()};
// In success case, unlock. // In success case, unlock.
lock->reader_unlock(); lock->reader_unlock();
...@@ -99,8 +90,8 @@ bool fork_join_task::split_task(base::swmr_spin_lock *lock) { ...@@ -99,8 +90,8 @@ bool fork_join_task::split_task(base::swmr_spin_lock *lock) {
return true; return true;
} }
void fork_join_task::execute() { void task::execute() {
PROFILE_WORK_BLOCK("execute fork_join_task"); PROFILE_WORK_BLOCK("execute task");
// Bind this instance to our OS thread // Bind this instance to our OS thread
// TODO: See if we did this right // TODO: See if we did this right
...@@ -114,16 +105,6 @@ void fork_join_task::execute() { ...@@ -114,16 +105,6 @@ void fork_join_task::execute() {
root_task_->execute(); root_task_->execute();
} }
fork_join_sub_task *fork_join_task::currently_executing() const { return currently_executing_; }
fork_join_task::fork_join_task(fork_join_sub_task *root_task,
const abstract_task::id &id) :
abstract_task{0, id},
root_task_{root_task},
currently_executing_{nullptr},
deque_{base::this_thread::state<thread_state>()->task_stack_},
last_stolen_{nullptr} {}
} }
} }
} }
#include "pls/internal/scheduling/thread_state.h"
namespace pls {
namespace internal {
namespace scheduling {
}
}
}