From fba551da172656bcdd4843f8584008b13e07f78f Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Tue, 26 Mar 2019 15:17:09 +0100 Subject: [PATCH] Add first test version of high level task stealing. This means that high level tasks can be stolen and lays the groundwork for implementing different tasks like classic work stealing. --- lib/pls/CMakeLists.txt | 3 ++- lib/pls/include/pls/internal/base/spin_lock.h | 4 ++++ lib/pls/include/pls/internal/scheduling/abstract_task.h | 29 ++++++++++------------------- lib/pls/include/pls/internal/scheduling/root_master_task.h | 6 +++++- lib/pls/include/pls/internal/scheduling/root_worker_task.h | 6 +++++- lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/scheduling/scheduler.h | 44 +++++++++++++++++++++++++++----------------- lib/pls/include/pls/internal/scheduling/thread_state.h | 41 +++++++++++++++++++++++++++++++++++------ lib/pls/src/internal/scheduling/abstract_task.cpp | 47 +++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp | 9 +++++++++ lib/pls/src/internal/scheduling/scheduler.cpp | 2 +- 11 files changed, 256 insertions(+), 46 deletions(-) create mode 100644 lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h create mode 100644 lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 8b56cd0..661d055 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -10,7 +10,8 @@ add_library(pls STATIC src/internal/base/barrier.cpp include/pls/internal/base/barrier.h src/internal/scheduling/root_master_task.cpp include/pls/internal/scheduling/root_master_task.h src/internal/base/aligned_stack.cpp include/pls/internal/base/aligned_stack.h - include/pls/internal/base/system_details.h) + include/pls/internal/base/system_details.h + src/internal/scheduling/run_on_n_threads_task.cpp include/pls/internal/scheduling/run_on_n_threads_task.h) # Add everything in `./include` to be in the include path of this project target_include_directories(pls diff --git a/lib/pls/include/pls/internal/base/spin_lock.h b/lib/pls/include/pls/internal/base/spin_lock.h index 5236e9f..cfd3143 100644 --- a/lib/pls/include/pls/internal/base/spin_lock.h +++ b/lib/pls/include/pls/internal/base/spin_lock.h @@ -3,6 +3,7 @@ #define PLS_SPINLOCK_H #include +#include #include "pls/internal/base/thread.h" @@ -16,6 +17,9 @@ namespace pls { public: spin_lock(): flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{1024} {}; + spin_lock(const spin_lock& other): flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{other.yield_at_tries_} { + std::cout << "Spinlock Moved!" << std::endl; + } void lock(); void unlock(); diff --git a/lib/pls/include/pls/internal/scheduling/abstract_task.h b/lib/pls/include/pls/internal/scheduling/abstract_task.h index ceb4943..d736127 100644 --- a/lib/pls/include/pls/internal/scheduling/abstract_task.h +++ b/lib/pls/include/pls/internal/scheduling/abstract_task.h @@ -13,33 +13,24 @@ namespace pls { int depth_; int unique_id_; abstract_task* child_task_; - base::spin_lock spin_lock_; public: - explicit abstract_task(int depth, int unique_id): + abstract_task(int depth, int unique_id): depth_{depth}, unique_id_{unique_id}, - child_task_{nullptr}, - spin_lock_{} {}; + child_task_{nullptr} {} virtual void execute() = 0; - const base::spin_lock& spin_lock() { return spin_lock_; } + void set_child(abstract_task* child_task) { child_task_ = child_task; } + abstract_task* child() { return child_task_; } + void set_depth(int depth) { depth_ = depth; } + int depth() { return depth_; } protected: - virtual bool my_stealing(abstract_task *other_task) = 0; - - bool steal_work() { - // get scheduler - // select victim - // try steal - // |-- see if same depth is available - // |-- see if equals depth + id - // |-- try user steal if matches (will return itself if it could steal) - // |-- try internal steal if deeper tasks are available - // |-- if internal steal worked, execute it - // return if the user steal was a success - return false; - }; + virtual bool internal_stealing(abstract_task* other_task) = 0; + virtual bool split_task() = 0; + + bool steal_work(); }; } } diff --git a/lib/pls/include/pls/internal/scheduling/root_master_task.h b/lib/pls/include/pls/internal/scheduling/root_master_task.h index 01282ce..ba46408 100644 --- a/lib/pls/include/pls/internal/scheduling/root_master_task.h +++ b/lib/pls/include/pls/internal/scheduling/root_master_task.h @@ -36,7 +36,11 @@ namespace pls { } } - bool my_stealing(abstract_task* /*other_task*/) override { + bool internal_stealing(abstract_task* /*other_task*/) override { + return false; + } + + bool split_task() override { return false; } }; diff --git a/lib/pls/include/pls/internal/scheduling/root_worker_task.h b/lib/pls/include/pls/internal/scheduling/root_worker_task.h index 459895e..ff67cbd 100644 --- a/lib/pls/include/pls/internal/scheduling/root_worker_task.h +++ b/lib/pls/include/pls/internal/scheduling/root_worker_task.h @@ -22,7 +22,11 @@ namespace pls { } while (!master_task_->finished()); } - bool my_stealing(abstract_task* /*other_task*/) override { + bool internal_stealing(abstract_task* /*other_task*/) override { + return false; + } + + bool split_task() override { return false; } }; diff --git a/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h b/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h new file mode 100644 index 0000000..d7e2a24 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h @@ -0,0 +1,111 @@ + +#ifndef PLS_RUN_ON_N_THREADS_TASK_H +#define PLS_RUN_ON_N_THREADS_TASK_H + +#include + +#include "pls/internal/base/spin_lock.h" +#include "pls/internal/base/thread.h" + +#include "abstract_task.h" +#include "thread_state.h" +#include "scheduler.h" + +namespace pls { + namespace internal { + namespace scheduling { + template + class run_on_n_threads_task : public abstract_task { + template + friend class run_on_n_threads_task_worker; + + Function function_; + + // Improvement: Remove lock and replace by atomic variable (performance) + int counter; + base::spin_lock counter_lock_; + + int decrement_counter() { + std::lock_guard lock{counter_lock_}; + counter--; + return counter; + } + + int get_counter() { + std::lock_guard lock{counter_lock_}; + return counter; + } + public: + run_on_n_threads_task(Function function, int num_threads): + abstract_task{PLS_UNIQUE_ID, 0}, + function_{function}, + counter{num_threads - 1} {} + + void execute() override { + // Execute our function ONCE + function_(); + + // Steal until we are finished (other threads executed) + do { + steal_work(); + } while (get_counter() > 0); + + std::cout << "Finished Master!" << std::endl; + } + + bool internal_stealing(abstract_task* /*other_task*/) override { + return false; + } + + bool split_task() override; + }; + + template + class run_on_n_threads_task_worker : public abstract_task { + Function function_; + run_on_n_threads_task* root_; + public: + run_on_n_threads_task_worker(Function function, run_on_n_threads_task* root): + abstract_task{PLS_UNIQUE_ID, 0}, + function_{function}, + root_{root} {} + + void execute() override { + if (root_->decrement_counter() >= 0) { + function_(); + std::cout << "Finished Worker!" << std::endl; + } else { + std::cout << "Abandoned Worker!" << std::endl; + } + } + + bool internal_stealing(abstract_task* /*other_task*/) override { + return false; + } + + bool split_task() override { + return false; + } + }; + + template + bool run_on_n_threads_task::split_task() { + if (get_counter() <= 0) { + return false; + } + + auto scheduler = base::this_thread::state()->scheduler_; + auto task = run_on_n_threads_task_worker{function_, this}; + scheduler->execute_task(task, depth()); + return true; + } + + template + run_on_n_threads_task create_run_on_n_threads_task(Function function, int num_threads) { + return run_on_n_threads_task{function, num_threads}; + } + } + } +} + +#endif //PLS_RUN_ON_N_THREADS_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h index 7505d77..b8f1f39 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler.h @@ -8,8 +8,8 @@ #include "pls/internal/base/aligned_stack.h" #include "pls/internal/base/thread.h" #include "pls/internal/base/barrier.h" -#include "pls/internal/scheduling/thread_state.h" +#include "thread_state.h" #include "root_master_task.h" #include "root_worker_task.h" @@ -54,7 +54,7 @@ namespace pls { class scheduler { friend void worker_routine(); - unsigned int num_threads_; + const unsigned int num_threads_; scheduler_memory* memory_; base::barrier sync_barrier_; @@ -69,38 +69,48 @@ namespace pls { root_worker_task worker{&master}; // Push root task on stacks - memory_->thread_state_for(0)->root_task_ = memory_->task_stack_for(0)->push(master); + memory_->thread_state_for(0)->root_task_ = &master; + memory_->thread_state_for(0)->current_task_ = &master; for (unsigned int i = 1; i < num_threads_; i++) { - memory_->thread_state_for(i)->root_task_ = memory_->task_stack_for(i)->push(worker); + memory_->thread_state_for(i)->root_task_ = &worker; + memory_->thread_state_for(i)->current_task_ = &worker; } // Perform and wait for work sync_barrier_.wait(); // Trigger threads to wake up sync_barrier_.wait(); // Wait for threads to finish - - // Remove root task from stacks - memory_->task_stack_for(0)->pop(); - for (unsigned int i = 1; i < num_threads_; i++) { - memory_->task_stack_for(i)->pop(); - } } // TODO: See if we should place this differently (only for performance reasons) template - void execute_task(Task& task) { + static void execute_task(Task task, int depth=-1) { static_assert(std::is_base_of::value, "Only pass abstract_task subclasses!"); auto my_state = base::this_thread::state(); - auto task_stack = my_state->task_stack_; + auto current_task = my_state->current_task_; + + // Init Task + { + std::lock_guard lock{my_state->lock_}; + task.set_depth(depth >= 0 ? depth : current_task->depth() + 1); + my_state->current_task_ = &task; + current_task->set_child(&task); + } + + // Run Task + task.execute(); - // TODO: Assert if 'top level' task even have to go somewhere or if - // we can simply keep the on the call stack. - auto my_task = task_stack->push(task); - my_task.execute(); - task_stack->pop(); + // Teardown state back to before the task was executed + { + std::lock_guard lock{my_state->lock_}; + current_task->set_child(nullptr); + my_state->current_task_ = current_task; + } } void terminate(bool wait_for_workers=true); + unsigned int num_threads() const { return num_threads_; } + thread_state* thread_state_for(size_t id) { return memory_->thread_state_for(id); } }; } } diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h index b39ab09..58dcc9b 100644 --- a/lib/pls/include/pls/internal/scheduling/thread_state.h +++ b/lib/pls/include/pls/internal/scheduling/thread_state.h @@ -3,6 +3,7 @@ #define PLS_THREAD_STATE_H #include "abstract_task.h" +#include "pls/internal/base/aligned_stack.h" namespace pls { namespace internal { @@ -11,15 +12,43 @@ namespace pls { class scheduler; struct thread_state { - thread_state(): scheduler_{nullptr}, root_task_{nullptr}, task_stack_{nullptr} {}; - explicit thread_state(scheduler* scheduler, base::aligned_stack* task_stack): - scheduler_{scheduler}, - root_task_{nullptr}, - task_stack_{task_stack} {} - scheduler* scheduler_; abstract_task* root_task_; + abstract_task* current_task_; base::aligned_stack* task_stack_; + unsigned int id_; + base::spin_lock lock_; + + thread_state(): + scheduler_{nullptr}, + root_task_{nullptr}, + current_task_{nullptr}, + task_stack_{nullptr}, + id_{0} {}; + + thread_state(scheduler* scheduler, base::aligned_stack* task_stack, unsigned int id): + scheduler_{scheduler}, + root_task_{nullptr}, + current_task_{nullptr}, + task_stack_{task_stack}, + id_{id} {} + + thread_state(const thread_state& other): + scheduler_{other.scheduler_}, + root_task_{other.root_task_}, + current_task_{other.current_task_}, + task_stack_{other.task_stack_}, + id_{other.id_} {} + + thread_state& operator=(const thread_state& other) { + scheduler_ = other.scheduler_; + root_task_ = other.root_task_; + current_task_ = other.current_task_; + task_stack_ = other.task_stack_; + id_ = other.id_; + + return *this; + } }; } } diff --git a/lib/pls/src/internal/scheduling/abstract_task.cpp b/lib/pls/src/internal/scheduling/abstract_task.cpp index 314dc68..ba69bcb 100644 --- a/lib/pls/src/internal/scheduling/abstract_task.cpp +++ b/lib/pls/src/internal/scheduling/abstract_task.cpp @@ -1,9 +1,56 @@ +#include "pls/internal/scheduling/thread_state.h" #include "pls/internal/scheduling/abstract_task.h" +#include "pls/internal/scheduling/scheduler.h" namespace pls { namespace internal { namespace scheduling { + bool abstract_task::steal_work() { + auto my_state = base::this_thread::state(); + auto my_scheduler = my_state->scheduler_; + int my_id = my_state->id_; + for (size_t i = 1; i < my_scheduler->num_threads(); i++) { + size_t target = (my_id + i) % my_scheduler->num_threads(); + auto target_state = my_scheduler->thread_state_for(target); + std::lock_guard lock{target_state->lock_}; + + // Dig down to our level + abstract_task* current_task = target_state->root_task_; + while (current_task != nullptr && current_task->depth() < depth()) { + current_task = current_task->child_task_; + } + + if (current_task != nullptr) { + // See if it equals our type and depth of task + if (current_task->unique_id_ == unique_id_ && + current_task->depth_ == depth_) { + if (internal_stealing(current_task)) { + // internal steal was a success, hand it back to the internal scheduler + return true; + } + + // No success, we need to steal work from a deeper level using 'top level task stealing' + current_task = current_task->child_task_; + } + } + + + // Execute 'top level task steal' if possible + // (only try deeper tasks to keep depth restricted stealing) + while (current_task != nullptr) { + if (current_task->split_task()) { + // internal steal was no success (we did a top level task steal) + return false; + } + + current_task = current_task->child_task_; + } + } + + // internal steal was no success + return false; + }; } } } diff --git a/lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp b/lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp new file mode 100644 index 0000000..e41571f --- /dev/null +++ b/lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp @@ -0,0 +1,9 @@ +#include "pls/internal/scheduling/run_on_n_threads_task.h" + +namespace pls { + namespace internal { + namespace scheduling { + + } + } +} diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp index 1960fd2..8e55147 100644 --- a/lib/pls/src/internal/scheduling/scheduler.cpp +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -13,7 +13,7 @@ namespace pls { } for (unsigned int i = 0; i < num_threads; i++) { - *memory_->thread_state_for(i) = thread_state{this, memory_->task_stack_for(i)}; + *memory_->thread_state_for(i) = thread_state{this, memory_->task_stack_for(i), i}; *memory_->thread_for(i) = base::start_thread(&worker_routine, memory_->thread_state_for(i)); } } -- libgit2 0.26.0