diff --git a/app/benchmark_unbalanced/main.cpp b/app/benchmark_unbalanced/main.cpp index 1860877..603bd78 100644 --- a/app/benchmark_unbalanced/main.cpp +++ b/app/benchmark_unbalanced/main.cpp @@ -19,7 +19,7 @@ int count_child_nodes(uts::node &node) { return child_count; } - auto current_task = pls::fork_join_sub_task::current(); + auto current_task = pls::task::current(); std::vector results(children.size()); for (size_t i = 0; i < children.size(); i++) { size_t index = i; @@ -45,7 +45,7 @@ int unbalanced_tree_search(int seed, int root_children, double q, int normal_chi }; pls::fork_join_lambda_by_reference task(lambda); pls::fork_join_lambda_by_reference sub_task(lambda); - pls::fork_join_task root_task{&sub_task, id}; + pls::task root_task{&sub_task, id}; pls::scheduler::execute_task(root_task); return result; diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index d834402..42a1c06 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -29,15 +29,14 @@ add_library(pls STATIC include/pls/internal/helpers/mini_benchmark.h include/pls/internal/helpers/unique_id.h - include/pls/internal/scheduling/root_task.h src/internal/scheduling/root_task.cpp - include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp - include/pls/internal/scheduling/abstract_task.h src/internal/scheduling/abstract_task.cpp + src/internal/scheduling/root_task.cpp + include/pls/internal/scheduling/thread_state.h + src/internal/scheduling/abstract_task.cpp include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler_impl.h - include/pls/internal/scheduling/run_on_n_threads_task.h src/internal/scheduling/run_on_n_threads_task.cpp - include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp + src/internal/scheduling/run_on_n_threads_task.cpp + include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp - include/pls/internal/scheduling/parallel_iterator_task.h include/pls/internal/scheduling/parallel_iterator_task_impl.h src/internal/scheduling/parallel_iterator_task.cpp) # Add everything in `./include` to be in the include path of this project diff --git a/lib/pls/include/pls/algorithms/invoke_parallel.h b/lib/pls/include/pls/algorithms/invoke_parallel.h index 17b439e..e311a71 100644 --- a/lib/pls/include/pls/algorithms/invoke_parallel.h +++ b/lib/pls/include/pls/algorithms/invoke_parallel.h @@ -2,7 +2,7 @@ #ifndef PLS_PARALLEL_INVOKE_H #define PLS_PARALLEL_INVOKE_H -#include "pls/internal/scheduling/fork_join_task.h" +#include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/scheduler.h" namespace pls { diff --git a/lib/pls/include/pls/algorithms/invoke_parallel_impl.h b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h index 9bfa185..ec89a0f 100644 --- a/lib/pls/include/pls/algorithms/invoke_parallel_impl.h +++ b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h @@ -2,8 +2,8 @@ #ifndef PLS_INVOKE_PARALLEL_IMPL_H #define PLS_INVOKE_PARALLEL_IMPL_H -#include -#include "pls/internal/scheduling/fork_join_task.h" +#include +#include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/scheduler.h" #include "pls/internal/helpers/unique_id.h" #include "pls/internal/base/alignment.h" @@ -23,7 +23,7 @@ inline void run_body(const Body &internal_body, const abstract_task::id &id) { internal_body(); } else { fork_join_lambda_by_reference 
root_body(internal_body); - fork_join_task root_task{&root_body, id}; + task root_task{&root_body, id}; scheduler::execute_task(root_task); } } @@ -37,7 +37,7 @@ void invoke_parallel(const Function1 &function1, const Function2 &function2) { static abstract_task::id id = unique_id::create(); auto internal_body = [&]() { - auto current_task = fork_join_sub_task::current(); + auto current_task = task::current(); auto sub_task_2 = fork_join_lambda_by_reference(function2); current_task->spawn_child(sub_task_2); @@ -55,7 +55,7 @@ void invoke_parallel(const Function1 &function1, const Function2 &function2, con static abstract_task::id id = unique_id::create(); auto internal_body = [&]() { - auto current_task = fork_join_sub_task::current(); + auto current_task = task::current(); auto sub_task_2 = fork_join_lambda_by_reference(function2); auto sub_task_3 = fork_join_lambda_by_reference(function3); diff --git a/lib/pls/include/pls/algorithms/parallel_for_impl.h b/lib/pls/include/pls/algorithms/parallel_for_impl.h index 5b79468..f6caa97 100644 --- a/lib/pls/include/pls/algorithms/parallel_for_impl.h +++ b/lib/pls/include/pls/algorithms/parallel_for_impl.h @@ -2,7 +2,7 @@ #ifndef PLS_PARALLEL_FOR_IMPL_H #define PLS_PARALLEL_FOR_IMPL_H -#include "pls/internal/scheduling/fork_join_task.h" +#include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/parallel_iterator_task.h" #include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h" @@ -31,10 +31,10 @@ void parallel_for(RandomIt first, RandomIt last, const Function &function) { auto body = [=] { internal::parallel_for(first + middle_index, last, function); }; fork_join_lambda_by_reference second_half_task(body); - fork_join_sub_task::current()->spawn_child(second_half_task); + task::current()->spawn_child(second_half_task); parallel_for(first, first + middle_index, function); - fork_join_sub_task::current()->wait_for_all(); + task::current()->wait_for_all(); } } } @@ -59,7 +59,7 @@ void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &funct auto body = [=] { internal::parallel_for(first, last, function); }; fork_join_lambda_by_reference root_body(body); - fork_join_task root_task{&root_body, id}; + task root_task{&root_body, id}; scheduler::execute_task(root_task); } diff --git a/lib/pls/include/pls/internal/scheduling/abstract_task.h b/lib/pls/include/pls/internal/scheduling/abstract_task.h deleted file mode 100644 index 21d7357..0000000 --- a/lib/pls/include/pls/internal/scheduling/abstract_task.h +++ /dev/null @@ -1,45 +0,0 @@ - -#ifndef PLS_ABSTRACT_TASK_H -#define PLS_ABSTRACT_TASK_H - -#include "pls/internal/base/swmr_spin_lock.h" -#include "pls/internal/helpers/unique_id.h" - -namespace pls { -namespace internal { -namespace scheduling { - -class abstract_task { - public: - using id = helpers::unique_id; - - private: - unsigned int depth_; - abstract_task::id unique_id_; - abstract_task *volatile child_task_; - - public: - abstract_task(const unsigned int depth, const abstract_task::id &unique_id) : - depth_{depth}, - unique_id_{unique_id}, - child_task_{nullptr} {} - - virtual void execute() = 0; - void set_child(abstract_task *child_task) { child_task_ = child_task; } - abstract_task *child() const { return child_task_; } - - void set_depth(unsigned int depth) { depth_ = depth; } - unsigned int depth() const { return depth_; } - id unique_id() const { return unique_id_; } - protected: - virtual bool internal_stealing(abstract_task *other_task) = 0; - virtual bool 
split_task(base::swmr_spin_lock *lock) = 0; - - bool steal_work(); -}; - -} -} -} - -#endif //PLS_ABSTRACT_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/parallel_iterator_task.h b/lib/pls/include/pls/internal/scheduling/parallel_iterator_task.h deleted file mode 100644 index 304df79..0000000 --- a/lib/pls/include/pls/internal/scheduling/parallel_iterator_task.h +++ /dev/null @@ -1,45 +0,0 @@ - -#ifndef PLS_PARALLEL_ITERATOR_TASK_H -#define PLS_PARALLEL_ITERATOR_TASK_H - -#include "pls/internal/data_structures/stamped_integer.h" -#include "abstract_task.h" - -namespace pls { -namespace internal { -namespace scheduling { - -using data_structures::stamped_integer; - -template -class parallel_iterator_task : public abstract_task { - alignas(64) const int step = 8; - - alignas(64) RandomIt first_, last_; - alignas(64) Function function_; - - // External stealing - alignas(64) std::atomic first_index_; - alignas(64) std::atomic to_be_processed_; - alignas(64) std::atomic last_index_; - - alignas(64) parallel_iterator_task *parent_; - - bool steal_front(size_t &stolen_max_index); - bool steal_back(size_t &stolen_first_index, size_t &stolen_last_index); - - protected: - bool internal_stealing(abstract_task *other_task) override; - bool split_task(base::swmr_spin_lock * /*lock*/) override; - - public: - explicit parallel_iterator_task(RandomIt first, RandomIt last, Function function, const abstract_task::id &id); - parallel_iterator_task(const parallel_iterator_task &other); - void execute() override; -}; -} -} -} -#include "parallel_iterator_task_impl.h" - -#endif //PLS_PARALLEL_ITERATOR_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/parallel_iterator_task_impl.h b/lib/pls/include/pls/internal/scheduling/parallel_iterator_task_impl.h deleted file mode 100644 index f3a2026..0000000 --- a/lib/pls/include/pls/internal/scheduling/parallel_iterator_task_impl.h +++ /dev/null @@ -1,144 +0,0 @@ - -#ifndef PLS_PARALLEL_ITERATOR_TASK_IMPL_H -#define PLS_PARALLEL_ITERATOR_TASK_IMPL_H - -#include "scheduler.h" -namespace pls { -namespace internal { -namespace scheduling { -template -parallel_iterator_task::parallel_iterator_task - (RandomIt first, RandomIt last, Function function, const abstract_task::id &id): - abstract_task(0, id), - first_{first}, - last_{last}, - function_{function}, - first_index_{0}, - to_be_processed_{std::distance(first, last)}, - last_index_{stamped_integer{0, std::distance(first, last)}}, - parent_{nullptr} {} - -template -parallel_iterator_task::parallel_iterator_task(const pls::internal::scheduling::parallel_iterator_task< - RandomIt, - Function> &other): - abstract_task{other.depth(), other.unique_id()}, - first_{other.first_}, - last_{other.last_}, - function_{other.function_}, - first_index_{other.first_index_.load()}, - to_be_processed_{other.to_be_processed_.load()}, - last_index_{other.last_index_.load()}, - parent_{other.parent_} {} - -template -void parallel_iterator_task::execute() { - // Start processing at beginning of our data - size_t current_index = 0; - auto current_iterator = first_; - - // Keep going as long as we have data - while (true) { - // Claim next chunk of data for us - size_t local_max_index; - if (!steal_front(local_max_index)) { - break; - } - - // Process Chunk - for (; current_index != local_max_index; current_index++) { - function_(*(current_iterator++)); - } - } - - to_be_processed_ -= current_index; - while (to_be_processed_.load() > 0) - steal_work(); - if (parent_ != nullptr) { - parent_->to_be_processed_ -= 
std::distance(first_, last_); - } -} - -template -bool parallel_iterator_task::steal_front(size_t &stolen_max) { - auto local_first_index = first_index_.load(); - auto local_last_index = last_index_.load(); - - if (local_first_index >= local_last_index.value) { - return false; - } - - // Proceed the first index == take part of the work for us - auto new_first_index = std::min(local_first_index + step, local_last_index.value); - first_index_ = new_first_index; - // Reload last index - local_last_index = last_index_.load(); - // Enough distance - if (new_first_index < local_last_index.value) { - stolen_max = new_first_index; - return true; - } - - // Fight over last element - if (new_first_index == local_last_index.value) { - auto new_last_index = stamped_integer{local_last_index.stamp + 1, local_last_index.value}; - if (last_index_.compare_exchange_strong(local_last_index, new_last_index)) { - stolen_max = new_first_index; - return true; - } - } - - // All iterator elements are assigned to some executor - return false; -} - -template -bool parallel_iterator_task::steal_back(size_t &stolen_first_index, size_t &stolen_last_index) { - auto local_first_index = first_index_.load(); - auto local_last_index = last_index_.load(); - - if (local_first_index >= local_last_index.value) { - return false; - } - - // Try to steal using cas - auto target_last_index = std::max(local_last_index.value - step, local_first_index); - auto new_last_index = stamped_integer{local_last_index.stamp + 1, target_last_index}; - if (last_index_.compare_exchange_strong(local_last_index, new_last_index)) { - stolen_first_index = new_last_index.value; - stolen_last_index = local_last_index.value; - return true; - } - - return false; -} - -template -bool parallel_iterator_task::split_task(base::swmr_spin_lock *lock) { - auto depth = this->depth(); - auto id = this->unique_id(); - size_t stolen_first_index, stolen_last_index; - if (!steal_back(stolen_first_index, stolen_last_index)) { - lock->reader_unlock(); - return false; - } - - lock->reader_unlock(); - parallel_iterator_task new_task{first_ + stolen_first_index, first_ + stolen_last_index, function_, id}; - new_task.parent_ = this; - scheduler::execute_task(new_task, depth); - - return true; -} - -template -bool parallel_iterator_task::internal_stealing(abstract_task */*other_task*/) { - // Do not allow for now, eases up on ABA problem - return false; -} -} -} -} - -#endif //PLS_PARALLEL_ITERATOR_TASK_IMPL_H diff --git a/lib/pls/include/pls/internal/scheduling/root_task.h b/lib/pls/include/pls/internal/scheduling/root_task.h deleted file mode 100644 index 32a6ea2..0000000 --- a/lib/pls/include/pls/internal/scheduling/root_task.h +++ /dev/null @@ -1,82 +0,0 @@ - -#ifndef PLS_ROOT_MASTER_TASK_H -#define PLS_ROOT_MASTER_TASK_H - -#include - -#include "pls/internal/helpers/profiler.h" -#include "pls/internal/base/swmr_spin_lock.h" - -#include "abstract_task.h" - -namespace pls { -namespace internal { -namespace scheduling { - -template -class root_task : public abstract_task { - Function function_; - std::atomic_uint8_t finished_; - public: - static constexpr auto create_id = helpers::unique_id::create>; - - explicit root_task(Function function) : - abstract_task{0, create_id()}, - function_{function}, - finished_{0} {} - root_task(const root_task &other) : - abstract_task{0, create_id()}, - function_{other.function_}, - finished_{0} {} - - bool finished() { - return finished_; - } - - void execute() override { - PROFILE_WORK_BLOCK("execute root_task"); - function_(); - 
finished_ = 1; - } - - bool internal_stealing(abstract_task * /*other_task*/) override { - return false; - } - - bool split_task(base::swmr_spin_lock * /*lock*/) override { - return false; - } -}; - -template -class root_worker_task : public abstract_task { - root_task *master_task_; - - public: - static constexpr auto create_id = root_task::create_id; - - explicit root_worker_task(root_task *master_task) : - abstract_task{0, create_id()}, - master_task_{master_task} {} - - void execute() override { - PROFILE_WORK_BLOCK("execute root_task"); - do { - steal_work(); - } while (!master_task_->finished()); - } - - bool internal_stealing(abstract_task * /*other_task*/) override { - return false; - } - - bool split_task(base::swmr_spin_lock * /*lock*/) override { - return false; - } -}; - -} -} -} - -#endif //PLS_ROOT_MASTER_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h b/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h deleted file mode 100644 index f412cbd..0000000 --- a/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h +++ /dev/null @@ -1,120 +0,0 @@ - -#ifndef PLS_RUN_ON_N_THREADS_TASK_H -#define PLS_RUN_ON_N_THREADS_TASK_H - -#include - -#include "pls/internal/base/spin_lock.h" -#include "pls/internal/base/thread.h" - -#include "abstract_task.h" -#include "thread_state.h" -#include "scheduler.h" - -namespace pls { -namespace internal { -namespace scheduling { - -template -class run_on_n_threads_task : public abstract_task { - template - friend - class run_on_n_threads_task_worker; - - Function function_; - - // Improvement: Remove lock and replace by atomic variable (performance) - int counter; - base::spin_lock counter_lock_; - - int decrement_counter() { - std::lock_guard lock{counter_lock_}; - counter--; - return counter; - } - - int get_counter() { - std::lock_guard lock{counter_lock_}; - return counter; - } - public: - static constexpr auto create_id = helpers::unique_id::create>; - - run_on_n_threads_task(Function function, int num_threads) : - abstract_task{0, create_id()}, - function_{function}, - counter{num_threads - 1} {} - - void execute() override { - // Execute our function ONCE - function_(); - - // Steal until we are finished (other threads executed) - do { - steal_work(); - } while (get_counter() > 0); - - std::cout << "Finished Master!" << std::endl; - } - - bool internal_stealing(abstract_task * /*other_task*/) override { - return false; - } - - bool split_task(base::swmr_spin_lock *lock) override; -}; - -template -class run_on_n_threads_task_worker : public abstract_task { - Function function_; - run_on_n_threads_task *root_; - public: - static constexpr auto create_id = helpers::unique_id::create>; - - run_on_n_threads_task_worker(Function function, run_on_n_threads_task *root) : - abstract_task{0, create_id()}, - function_{function}, - root_{root} {} - - void execute() override { - if (root_->decrement_counter() >= 0) { - function_(); - std::cout << "Finished Worker!" << std::endl; - } else { - std::cout << "Abandoned Worker!" << std::endl; - } - } - - bool internal_stealing(abstract_task * /*other_task*/) override { - return false; - } - - bool split_task(base::swmr_spin_lock * /*lock*/) override { - return false; - } -}; - -template -bool run_on_n_threads_task::split_task(base::swmr_spin_lock *lock) { - if (get_counter() <= 0) { - return false; - } - // In success case, unlock. 
- lock->reader_unlock(); - - auto scheduler = base::this_thread::state()->scheduler_; - auto task = run_on_n_threads_task_worker{function_, this}; - scheduler->execute_task(task, depth()); - return true; -} - -template -run_on_n_threads_task create_run_on_n_threads_task(Function function, int num_threads) { - return run_on_n_threads_task{function, num_threads}; -} - -} -} -} - -#endif //PLS_RUN_ON_N_THREADS_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h index 583262a..28a9f09 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler.h @@ -12,9 +12,8 @@ #include "pls/internal/base/thread.h" #include "pls/internal/base/barrier.h" -#include "thread_state.h" -#include "root_task.h" -#include "scheduler_memory.h" +#include "pls/internal/scheduling/thread_state.h" +#include "pls/internal/scheduling/scheduler_memory.h" namespace pls { namespace internal { @@ -24,6 +23,7 @@ void worker_routine(); using scheduler_thread = base::thread; class scheduler { + friend class task; friend void worker_routine(); const unsigned int num_threads_; @@ -32,12 +32,26 @@ class scheduler { base::barrier sync_barrier_; bool terminated_; public: + /** + * Initializes a scheduler instance with the given number of threads. + * This will spawn the threads and put them to sleep, ready to process an + * upcoming parallel section. + * + * @param memory All memory is allocated statically, thus the user is required to provide the memory instance. + * @param num_threads The number of worker threads to be created. + */ explicit scheduler(scheduler_memory *memory, unsigned int num_threads); + + /** + * The scheduler is implicitly terminated as soon as it leaves the scope. + */ ~scheduler(); /** * Wakes up the thread pool. * Code inside the Function lambda can invoke all parallel APIs. + * This is meant to cleanly sleep and wake up the scheduler during an application run, + * e.g. to run parallel code on a timer loop/after interrupts. * * @param work_section generic function or lambda to be executed in the scheduler's context. */ @@ -45,20 +59,38 @@ class scheduler { void perform_work(Function work_section); /** - * Executes a top-level-task (children of abstract_task) on this thread. + * Explicitly terminate the worker threads. Scheduler must not be used after this. * - * @param task The task to be executed. - * @param depth Optional: depth of the new task, otherwise set implicitly. + * @param wait_for_workers Set to true if you wish to return from this method only after the workers are shut down. */ - template - static void execute_task(Task &task, int depth = -1); + void terminate(bool wait_for_workers = true); - static abstract_task *current_task() { return base::this_thread::state()->current_task_; } + /** + * Helper to spawn a child on the currently running task. + * + * @tparam T type of the new task + * @param sub_task the new task to be spawned + */ + template + void spawn_child(T &sub_task) { + this_thread_state()->current_task_->spawn_child(sub_task); + } - void terminate(bool wait_for_workers = true); + /** + * Helper to wait for all children of the currently executing task. 
+ */ + void wait_for_all() { + this_thread_state()->current_task_->wait_for_all(); + } unsigned int num_threads() const { return num_threads_; } + + private: + // Helpers for accessing thread states thread_state *thread_state_for(size_t id) { return memory_->thread_state_for(id); } + + task *get_local_task(); + task *steal_task(); }; } diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index 0af46c7..f345d49 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -34,41 +34,6 @@ void scheduler::perform_work(Function work_section) { } } -template -void scheduler::execute_task(Task &task, int depth) { - static_assert(std::is_base_of::value, "Only pass abstract_task subclasses!"); - - auto my_state = base::this_thread::state(); - abstract_task *old_task; - abstract_task *new_task; - - // Init Task - old_task = my_state->current_task_; - new_task = my_state->task_stack_->push(task); - - new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1); - - { - my_state->lock_.writer_lock(); - my_state->current_task_ = new_task; - old_task->set_child(new_task); - my_state->lock_.writer_unlock(); - } - - // Run Task - new_task->execute(); - - // Teardown state back to before the task was executed - my_state->task_stack_->pop(); - - { - my_state->lock_.writer_lock(); - old_task->set_child(nullptr); - my_state->current_task_ = old_task; - my_state->lock_.writer_unlock(); - } -} - } } } diff --git a/lib/pls/include/pls/internal/scheduling/fork_join_task.h b/lib/pls/include/pls/internal/scheduling/task.h similarity index 55% rename from lib/pls/include/pls/internal/scheduling/fork_join_task.h rename to lib/pls/include/pls/internal/scheduling/task.h index 33a6c2f..68c856e 100644 --- a/lib/pls/include/pls/internal/scheduling/fork_join_task.h +++ b/lib/pls/include/pls/internal/scheduling/task.h @@ -14,46 +14,39 @@ namespace pls { namespace internal { namespace scheduling { -class fork_join_task; -class fork_join_sub_task { - friend class fork_join_task; +class task { + friend class scheduler; // Coordinate finishing of sub_tasks - std::atomic_uint32_t ref_count_; - fork_join_sub_task *parent_; - - // Access to TBB scheduling environment - fork_join_task *tbb_task_; - - bool executed = false; - int executed_at = -1; + std::atomic ref_count_; + task *parent_; // Stack Management (reset stack pointer after wait_for_all() calls) - data_structures::work_stealing_deque::state deque_state_; + data_structures::work_stealing_deque::state deque_state_; + protected: - explicit fork_join_sub_task(); - fork_join_sub_task(const fork_join_sub_task &other); + // TODO: Double Check with copy and move constructors, try to minimize overhead while keeping a clean API. + explicit task(); + task(const task &other); - // Overwritten with behaviour of child tasks + /** + * Overwrite this with the actual behaviour of concrete tasks. 
+ */ virtual void execute_internal() = 0; - public: - // Only use them when actually executing this sub_task (only public for simpler API design) template void spawn_child(T &sub_task); void wait_for_all(); - - static fork_join_sub_task *current(); private: void execute(); }; template -class fork_join_lambda_by_reference : public fork_join_sub_task { +class fork_join_lambda_by_reference : public task { const Function &function_; public: - explicit fork_join_lambda_by_reference(const Function &function) : fork_join_sub_task{}, function_{function} {}; + explicit fork_join_lambda_by_reference(const Function &function) : task{}, function_{function} {}; protected: void execute_internal() override { @@ -62,11 +55,11 @@ class fork_join_lambda_by_reference : public fork_join_sub_task { }; template -class fork_join_lambda_by_value : public fork_join_sub_task { +class fork_join_lambda_by_value : public task { const Function function_; public: - explicit fork_join_lambda_by_value(const Function &function) : fork_join_sub_task{}, function_{function} {}; + explicit fork_join_lambda_by_value(const Function &function) : task{}, function_{function} {}; protected: void execute_internal() override { @@ -74,46 +67,21 @@ class fork_join_lambda_by_value : public fork_join_sub_task { } }; -class fork_join_task : public abstract_task { - friend class fork_join_sub_task; - - fork_join_sub_task *root_task_; - fork_join_sub_task *currently_executing_; - - // Double-Ended Queue management - data_structures::work_stealing_deque deque_; - - // Steal Management - fork_join_sub_task *last_stolen_; - - fork_join_sub_task *get_local_sub_task(); - fork_join_sub_task *get_stolen_sub_task(); - - bool internal_stealing(abstract_task *other_task) override; - bool split_task(base::swmr_spin_lock * /*lock*/) override; - - public: - explicit fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id); - void execute() override; - fork_join_sub_task *currently_executing() const; -}; - template -void fork_join_sub_task::spawn_child(T &task) { +void task::spawn_child(T &sub_task) { PROFILE_FORK_JOIN_STEALING("spawn_child") - static_assert(std::is_base_of::value, "Only pass fork_join_sub_task subclasses!"); + static_assert(std::is_base_of::value, "Only pass task subclasses!"); // Keep our refcount up to date ref_count_++; // Assign forced values - task.parent_ = this; - task.tbb_task_ = tbb_task_; - task.deque_state_ = tbb_task_->deque_.save_state(); + sub_task.parent_ = this; + sub_task.deque_state_ = scheduler::this_thread_state()->deque_.save_state(); // Push on our deque - const T const_task = task; - tbb_task_->deque_.push_tail(const_task); + const T const_task = sub_task; + scheduler::this_thread_state()->deque_.push_tail(const_task); } } diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h index 0efbf78..b7fb030 100644 --- a/lib/pls/include/pls/internal/scheduling/thread_state.h +++ b/lib/pls/include/pls/internal/scheduling/thread_state.h @@ -5,8 +5,8 @@ #include #include "pls/internal/data_structures/aligned_stack.h" -#include "pls/internal/base/swmr_spin_lock.h" -#include "abstract_task.h" +#include "pls/internal/data_structures/deque.h" +#include "pls/internal/scheduling/task.h" namespace pls { namespace internal { @@ -17,11 +17,11 @@ class scheduler; struct thread_state { alignas(base::system_details::CACHE_LINE_SIZE) scheduler *scheduler_; - alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *root_task_; - 
alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *current_task_; + alignas(base::system_details::CACHE_LINE_SIZE) task *root_task_; + alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_; alignas(base::system_details::CACHE_LINE_SIZE) data_structures::aligned_stack *task_stack_; + alignas(base::system_details::CACHE_LINE_SIZE) data_structures::deque deque_; alignas(base::system_details::CACHE_LINE_SIZE) size_t id_; - alignas(base::system_details::CACHE_LINE_SIZE) base::swmr_spin_lock lock_; alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_; thread_state() : @@ -29,8 +29,8 @@ struct thread_state { root_task_{nullptr}, current_task_{nullptr}, task_stack_{nullptr}, + deque_{task_stack_}, id_{0}, - lock_{}, random_{id_} {}; thread_state(scheduler *scheduler, data_structures::aligned_stack *task_stack, unsigned int id) : @@ -38,9 +38,23 @@ struct thread_state { root_task_{nullptr}, current_task_{nullptr}, task_stack_{task_stack}, + deque_{task_stack_}, id_{id}, - lock_{}, random_{id_} {} + + /** + * Convenience helper to get the thread_state instance associated with this thread. + * Must only be called on threads that are associated with a thread_state, + * this will most likely be threads created by the scheduler. + * + * @return The thread_state of this thread. + */ + static thread_state *get() { return base::this_thread::state(); } + static scheduler *scheduler() { return get()->scheduler_; } + static task *current_task() { return get()->current_task_; } + static void set_current_task(task *task) { get()->current_task_ = task; }; + static data_structures::aligned_stack *task_stack() { return get()->task_stack_; } + static data_structures::deque *deque() { return get()->deque_; } }; } diff --git a/lib/pls/include/pls/pls.h b/lib/pls/include/pls/pls.h index accbcc3..36c83d0 100644 --- a/lib/pls/include/pls/pls.h +++ b/lib/pls/include/pls/pls.h @@ -4,7 +4,7 @@ #include "pls/algorithms/invoke_parallel.h" #include "pls/algorithms/parallel_for.h" #include "pls/internal/scheduling/abstract_task.h" -#include "pls/internal/scheduling/fork_join_task.h" +#include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/scheduler.h" #include "pls/internal/helpers/unique_id.h" @@ -18,10 +18,10 @@ using task_id = internal::scheduling::abstract_task::id; using unique_id = internal::helpers::unique_id; -using internal::scheduling::fork_join_sub_task; +using internal::scheduling::task; using internal::scheduling::fork_join_lambda_by_reference; using internal::scheduling::fork_join_lambda_by_value; -using internal::scheduling::fork_join_task; +using internal::scheduling::task; using algorithm::invoke_parallel; using algorithm::parallel_for_fork_join; diff --git a/lib/pls/src/internal/scheduling/abstract_task.cpp b/lib/pls/src/internal/scheduling/abstract_task.cpp deleted file mode 100644 index 0fb15d2..0000000 --- a/lib/pls/src/internal/scheduling/abstract_task.cpp +++ /dev/null @@ -1,86 +0,0 @@ -#include -#include "pls/internal/helpers/profiler.h" - -#include "pls/internal/scheduling/thread_state.h" -#include "pls/internal/scheduling/abstract_task.h" -#include "pls/internal/scheduling/scheduler.h" - -namespace pls { -namespace internal { -namespace scheduling { - -bool abstract_task::steal_work() { -// thread_local static base::backoff backoff{}; - - PROFILE_STEALING("abstract_task::steal_work") - const auto my_state = base::this_thread::state(); - const auto my_scheduler = my_state->scheduler_; - - const size_t my_id = my_state->id_; - const size_t 
offset = my_state->random_() % my_scheduler->num_threads(); - const size_t max_tries = my_scheduler->num_threads(); // TODO: Tune this value - for (size_t i = 0; i < max_tries; i++) { - size_t target = (offset + i) % my_scheduler->num_threads(); - if (target == my_id) { - continue; - } - auto target_state = my_scheduler->thread_state_for(target); - - if (!target_state->lock_.reader_try_lock()) { - continue; - } - - // Dig down to our level - PROFILE_STEALING("Go to our level") - abstract_task *current_task = target_state->root_task_; - while (current_task != nullptr && current_task->depth() < depth()) { - current_task = current_task->child(); - } - PROFILE_END_BLOCK - - // Try to steal 'internal', e.g. for_join_sub_tasks in a fork_join_task constellation - PROFILE_STEALING("Internal Steal") - if (current_task != nullptr) { - // See if it equals our type and depth of task - if (current_task->unique_id_ == unique_id_ && - current_task->depth_ == depth_) { - if (internal_stealing(current_task)) { - // internal steal was a success, hand it back to the internal scheduler - target_state->lock_.reader_unlock(); -// backoff.reset(); - return true; - } - - // No success, we need to steal work from a deeper level using 'top level task stealing' - current_task = current_task->child(); - } - } - PROFILE_END_BLOCK; - - - // Execute 'top level task steal' if possible - // (only try deeper tasks to keep depth restricted stealing). - PROFILE_STEALING("Top Level Steal") - while (current_task != nullptr) { - auto lock = &target_state->lock_; - if (current_task->split_task(lock)) { - // top level steal was a success (we did a top level task steal) -// backoff.reset(); - return false; - } - - current_task = current_task->child_task_; - } - PROFILE_END_BLOCK; - target_state->lock_.reader_unlock(); - } - - // internal steal was no success -// backoff.do_backoff(); -// base::this_thread::sleep(5); - return false; -} - -} -} -} diff --git a/lib/pls/src/internal/scheduling/parallel_iterator_task.cpp b/lib/pls/src/internal/scheduling/parallel_iterator_task.cpp deleted file mode 100644 index 4b22133..0000000 --- a/lib/pls/src/internal/scheduling/parallel_iterator_task.cpp +++ /dev/null @@ -1 +0,0 @@ -#include "pls/internal/scheduling/parallel_iterator_task.h" diff --git a/lib/pls/src/internal/scheduling/root_task.cpp b/lib/pls/src/internal/scheduling/root_task.cpp deleted file mode 100644 index 7e3ef89..0000000 --- a/lib/pls/src/internal/scheduling/root_task.cpp +++ /dev/null @@ -1,9 +0,0 @@ -#include "pls/internal/scheduling/root_task.h" - -namespace pls { -namespace internal { -namespace scheduling { - -} -} -} diff --git a/lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp b/lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp deleted file mode 100644 index 178afbb..0000000 --- a/lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp +++ /dev/null @@ -1,9 +0,0 @@ -#include "pls/internal/scheduling/run_on_n_threads_task.h" - -namespace pls { -namespace internal { -namespace scheduling { - -} -} -} diff --git a/lib/pls/src/internal/scheduling/fork_join_task.cpp b/lib/pls/src/internal/scheduling/task.cpp similarity index 65% rename from lib/pls/src/internal/scheduling/fork_join_task.cpp rename to lib/pls/src/internal/scheduling/task.cpp index 115de70..276a407 100644 --- a/lib/pls/src/internal/scheduling/fork_join_task.cpp +++ b/lib/pls/src/internal/scheduling/task.cpp @@ -1,31 +1,34 @@ #include "pls/internal/helpers/profiler.h" #include "pls/internal/scheduling/scheduler.h" -#include 
"pls/internal/scheduling/fork_join_task.h" +#include "pls/internal/scheduling/task.h" +#include "pls/internal/scheduling/thread_state.h" namespace pls { namespace internal { namespace scheduling { -fork_join_sub_task::fork_join_sub_task() : +task::task() : ref_count_{0}, parent_{nullptr}, - tbb_task_{nullptr}, deque_state_{0} {} -fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task &other) : +task::task(const task &other) : ref_count_{0}, parent_{other.parent_}, - tbb_task_{other.tbb_task_}, deque_state_{other.deque_state_} {} -void fork_join_sub_task::execute() { - PROFILE_WORK_BLOCK("execute sub_task") - auto last_executing = tbb_task_->currently_executing_; - tbb_task_->currently_executing_ = this; - execute_internal(); - tbb_task_->currently_executing_ = last_executing; - PROFILE_END_BLOCK +void task::execute() { + { + PROFILE_WORK_BLOCK("execute task") + auto last_executing = thread_state::get()->current_task_; + thread_state::get()->current_task_ = this; + + execute_internal(); + + thread_state::get()->current_task_ = last_executing; + } + wait_for_all(); if (parent_ != nullptr) { @@ -33,10 +36,10 @@ void fork_join_sub_task::execute() { } } -void fork_join_sub_task::wait_for_all() { +void task::wait_for_all() { while (ref_count_ > 0) { PROFILE_STEALING("get local sub task") - fork_join_sub_task *local_task = tbb_task_->get_local_sub_task(); + task *local_task = thread_state::get()->scheduler_->get_local_task(); PROFILE_END_BLOCK if (local_task != nullptr) { local_task->execute(); @@ -44,37 +47,25 @@ void fork_join_sub_task::wait_for_all() { // Try to steal work. // External steal will be executed implicitly if success PROFILE_STEALING("steal work") - bool internal_steal_success = tbb_task_->steal_work(); + task *stolen_task = thread_state::get()->scheduler_->steal_task(); PROFILE_END_BLOCK - if (internal_steal_success) { - tbb_task_->last_stolen_->execute(); + if (stolen_task != nullptr) { + stolen_task->execute(); } } } tbb_task_->deque_.release_memory_until(deque_state_); } -fork_join_sub_task *fork_join_task::get_local_sub_task() { - return deque_.pop_tail(); -} - -fork_join_sub_task *fork_join_task::get_stolen_sub_task() { - return deque_.pop_head(); -} - -fork_join_sub_task *fork_join_sub_task::current() { - return dynamic_cast(scheduler::current_task())->currently_executing(); -} - -bool fork_join_task::internal_stealing(abstract_task *other_task) { - PROFILE_STEALING("fork_join_task::internal_stealin") - auto cast_other_task = reinterpret_cast(other_task); +bool task::internal_stealing(abstract_task *other_task) { + PROFILE_STEALING("task::internal_stealin") + auto cast_other_task = reinterpret_cast(other_task); auto stolen_sub_task = cast_other_task->get_stolen_sub_task(); if (stolen_sub_task == nullptr) { return false; } else { - // Make sub-task belong to our fork_join_task instance + // Make sub-task belong to our task instance stolen_sub_task->tbb_task_ = this; stolen_sub_task->deque_state_ = deque_.save_state(); // We will execute this next without explicitly moving it onto our stack storage @@ -84,13 +75,13 @@ bool fork_join_task::internal_stealing(abstract_task *other_task) { } } -bool fork_join_task::split_task(base::swmr_spin_lock *lock) { - PROFILE_STEALING("fork_join_task::split_task") +bool task::split_task(base::swmr_spin_lock *lock) { + PROFILE_STEALING("task::split_task") fork_join_sub_task *stolen_sub_task = get_stolen_sub_task(); if (stolen_sub_task == nullptr) { return false; } - fork_join_task task{stolen_sub_task, this->unique_id()}; + task 
task{stolen_sub_task, this->unique_id()}; // In success case, unlock. lock->reader_unlock(); @@ -99,8 +90,8 @@ bool fork_join_task::split_task(base::swmr_spin_lock *lock) { return true; } -void fork_join_task::execute() { - PROFILE_WORK_BLOCK("execute fork_join_task"); +void task::execute() { + PROFILE_WORK_BLOCK("execute task"); // Bind this instance to our OS thread // TODO: See if we did this right @@ -114,16 +105,6 @@ void fork_join_task::execute() { root_task_->execute(); } -fork_join_sub_task *fork_join_task::currently_executing() const { return currently_executing_; } - -fork_join_task::fork_join_task(fork_join_sub_task *root_task, - const abstract_task::id &id) : - abstract_task{0, id}, - root_task_{root_task}, - currently_executing_{nullptr}, - deque_{base::this_thread::state()->task_stack_}, - last_stolen_{nullptr} {} - } } } diff --git a/lib/pls/src/internal/scheduling/thread_state.cpp b/lib/pls/src/internal/scheduling/thread_state.cpp deleted file mode 100644 index f503b6a..0000000 --- a/lib/pls/src/internal/scheduling/thread_state.cpp +++ /dev/null @@ -1,9 +0,0 @@ -#include "pls/internal/scheduling/thread_state.h" - -namespace pls { -namespace internal { -namespace scheduling { - -} -} -}
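
Usage sketch (not part of the commit): the snippet below mirrors the renamed fork/join surface exactly as it is called from the code this diff touches (app/benchmark_unbalanced/main.cpp, invoke_parallel_impl.h, parallel_for_impl.h). The <decltype(...)> template arguments (stripped in this patch text), the example_tag type, the id creation, and the work()/more_work() helpers are placeholder assumptions for illustration only.

// Sketch of the renamed API: pls::task replaces pls::fork_join_sub_task / pls::fork_join_task.
#include "pls/pls.h"

void work();       // placeholder: the caller's own half of the work
void more_work();  // placeholder: the half handed to the spawned child

void fork_join_example() {
  // Id creation mirrors invoke_parallel_impl.h; the exact template arguments are
  // stripped in this diff, so example_tag is a stand-in tag type.
  struct example_tag {};
  static auto id = pls::unique_id::create<example_tag>();

  auto lambda = [&] {
    // Inside a running task: spawn a child, do our own part, then join.
    auto child_body = [&] { more_work(); };
    pls::fork_join_lambda_by_reference<decltype(child_body)> child(child_body);

    pls::task::current()->spawn_child(child);   // was pls::fork_join_sub_task::current()

    work();
    pls::task::current()->wait_for_all();
  };

  pls::fork_join_lambda_by_reference<decltype(lambda)> root_body(lambda);
  pls::task root_task{&root_body, id};          // was pls::fork_join_task
  pls::scheduler::execute_task(root_task);
}

With the scheduler helpers introduced in scheduler.h (spawn_child/wait_for_all forwarding to thread_state's current task), the two pls::task::current() calls above could presumably also be routed through the scheduler instance; this diff keeps the call sites on the task-based form.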