From 6802681d86eabda8ad5fc325734fec5926bdfd0b Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Thu, 11 Apr 2019 13:09:46 +0200 Subject: [PATCH] Move functions using templates to *_impl.h header files. --- NOTES.md | 14 +++++++++++++- lib/pls/CMakeLists.txt | 6 +++++- lib/pls/include/pls/algorithms/invoke_parallel.h | 37 ++++--------------------------------- lib/pls/include/pls/algorithms/invoke_parallel_impl.h | 68 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/base/alignment.h | 1 - lib/pls/include/pls/internal/base/thread.h | 76 +++++++++------------------------------------------------------------------- lib/pls/include/pls/internal/base/thread_impl.h | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/data_structures/aligned_stack.h | 34 ++++++---------------------------- lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h | 36 ++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/scheduling/fork_join_task.h | 24 +++--------------------- lib/pls/include/pls/internal/scheduling/scheduler.h | 76 +++++++++++++++++----------------------------------------------------------- lib/pls/include/pls/internal/scheduling/scheduler_impl.h | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/src/algorithms/.gitkeep | 0 lib/pls/src/algorithms/invoke_parallel.cpp | 1 - lib/pls/src/internal/helpers/.gitkeep | 0 lib/pls/src/internal/scheduling/fork_join_task.cpp | 22 ++++++++++++++++++++++ 16 files changed, 343 insertions(+), 212 deletions(-) create mode 100644 lib/pls/include/pls/algorithms/invoke_parallel_impl.h create mode 100644 lib/pls/include/pls/internal/base/thread_impl.h create mode 100644 lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h create mode 100644 lib/pls/include/pls/internal/scheduling/scheduler_impl.h create mode 100644 lib/pls/src/algorithms/.gitkeep delete mode 
100644 lib/pls/src/algorithms/invoke_parallel.cpp create mode 100644 lib/pls/src/internal/helpers/.gitkeep diff --git a/NOTES.md b/NOTES.md index 04b1705..5a07756 100644 --- a/NOTES.md +++ b/NOTES.md @@ -4,7 +4,19 @@ A collection of stuff that we noticed during development. Useful later on two write a project report and to go back in time to find out why certain decisions where made. -## 09.02.2019 - Cache Alignment +## 11.04.2019 - Notes on C++ Templating + +After working more with templating and talking to Mike, +it seems like the common way to go is the following: +- If possible, add template arguments to + data containers only (separate from logic). +- If logic and data are coupled (as is often the case with lambdas), + add the declaration of the interface into the normal header + some_class.h and add its implementation into an extra implementation + file some_class_impl.h that is included at the end of the file. + + +## 09.04.2019 - Cache Alignment Aligning the cache needs all parts (both data types with correct alignment and base memory with correct alignment). 
diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 346ad9f..970adf0 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -2,16 +2,19 @@ add_library(pls STATIC include/pls/pls.h src/pls.cpp - include/pls/algorithms/invoke_parallel.h src/algorithms/invoke_parallel.cpp + include/pls/algorithms/invoke_parallel.h + include/pls/algorithms/invoke_parallel_impl.h include/pls/internal/base/spin_lock.h src/internal/base/spin_lock.cpp include/pls/internal/base/thread.h src/internal/base/thread.cpp + include/pls/internal/base/thread_impl.h include/pls/internal/base/barrier.h src/internal/base/barrier.cpp include/pls/internal/base/system_details.h include/pls/internal/base/error_handling.h include/pls/internal/base/alignment.h src/internal/base/alignment.cpp include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp + include/pls/internal/data_structures/aligned_stack_impl.h include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp include/pls/internal/helpers/prohibit_new.h @@ -22,6 +25,7 @@ add_library(pls STATIC include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp include/pls/internal/scheduling/abstract_task.h src/internal/scheduling/abstract_task.cpp include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp + include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/run_on_n_threads_task.h src/internal/scheduling/run_on_n_threads_task.cpp include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp diff --git a/lib/pls/include/pls/algorithms/invoke_parallel.h b/lib/pls/include/pls/algorithms/invoke_parallel.h index 21dec7e..dc44469 100644 --- a/lib/pls/include/pls/algorithms/invoke_parallel.h +++ b/lib/pls/include/pls/algorithms/invoke_parallel.h @@ -7,44 +7,15 @@ 
namespace pls { namespace algorithm { - namespace internal { - using namespace ::pls::internal::scheduling; - - template - inline void run_body(const Body& internal_body, const abstract_task::id& id) { - // Make sure we are in the context of this invoke_parallel instance, - // if not we will spawn it as a new 'fork-join-style' task. - auto current_task = scheduler::current_task(); - if (current_task->unique_id() == id) { - auto current_sub_task = reinterpret_cast(current_task)->currently_executing(); - internal_body(current_sub_task); - } else { - fork_join_lambda root_body(&internal_body); - fork_join_task root_task{&root_body, id}; - scheduler::execute_task(root_task); - } - } - } - template - void invoke_parallel(const Function1& function1, const Function2& function2) { - using namespace ::pls::internal::scheduling; - static abstract_task::id id{PLS_UNIQUE_ID, true}; - - auto internal_body = [&] (fork_join_sub_task* this_task){ - auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); }; - auto sub_task_1 = fork_join_lambda(&sub_task_body_1); - - this_task->spawn_child(sub_task_1); - function2(); // Execute last function 'inline' without spawning a sub_task object - this_task->wait_for_all(); - }; + void invoke_parallel(const Function1& function1, const Function2& function2); - internal::run_body(internal_body, id); - } + template + void invoke_parallel(const Function1& function1, const Function2& function2, const Function3& function3); // ...and so on, add more if we decide to keep this design } } +#include "invoke_parallel_impl.h" #endif //PLS_PARALLEL_INVOKE_H diff --git a/lib/pls/include/pls/algorithms/invoke_parallel_impl.h b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h new file mode 100644 index 0000000..4a7746f --- /dev/null +++ b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h @@ -0,0 +1,68 @@ + +#ifndef PLS_INVOKE_PARALLEL_IMPL_H +#define PLS_INVOKE_PARALLEL_IMPL_H + +#include "pls/internal/scheduling/fork_join_task.h" +#include 
"pls/internal/scheduling/scheduler.h" + +namespace pls { + namespace algorithm { + namespace internal { + using namespace ::pls::internal::scheduling; + + template + inline void run_body(const Body& internal_body, const abstract_task::id& id) { + // Make sure we are in the context of this invoke_parallel instance, + // if not we will spawn it as a new 'fork-join-style' task. + auto current_task = scheduler::current_task(); + if (current_task->unique_id() == id) { + auto current_sub_task = reinterpret_cast(current_task)->currently_executing(); + internal_body(current_sub_task); + } else { + fork_join_lambda root_body(&internal_body); + fork_join_task root_task{&root_body, id}; + scheduler::execute_task(root_task); + } + } + } + + template + void invoke_parallel(const Function1& function1, const Function2& function2) { + using namespace ::pls::internal::scheduling; + static abstract_task::id id{PLS_UNIQUE_ID, true}; + + auto internal_body = [&] (fork_join_sub_task* this_task){ + auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); }; + auto sub_task_1 = fork_join_lambda(&sub_task_body_1); + + this_task->spawn_child(sub_task_1); + function2(); // Execute last function 'inline' without spawning a sub_task object + this_task->wait_for_all(); + }; + + internal::run_body(internal_body, id); + } + + template + void invoke_parallel(const Function1& function1, const Function2& function2, const Function3& function3) { + using namespace ::pls::internal::scheduling; + static abstract_task::id id{PLS_UNIQUE_ID, true}; + + auto internal_body = [&] (fork_join_sub_task* this_task){ + auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); }; + auto sub_task_1 = fork_join_lambda(&sub_task_body_1); + auto sub_task_body_2 = [&] (fork_join_sub_task*){ function2(); }; + auto sub_task_2 = fork_join_lambda(&sub_task_body_2); + + this_task->spawn_child(sub_task_1); + this_task->spawn_child(sub_task_2); + function3(); // Execute last function 'inline' without spawning a 
sub_task object + this_task->wait_for_all(); + }; + + internal::run_body(internal_body, id); + } + } +} + +#endif //PLS_INVOKE_PARALLEL_IMPL_H diff --git a/lib/pls/include/pls/internal/base/alignment.h b/lib/pls/include/pls/internal/base/alignment.h index 4dc4752..2dfb474 100644 --- a/lib/pls/include/pls/internal/base/alignment.h +++ b/lib/pls/include/pls/internal/base/alignment.h @@ -14,7 +14,6 @@ namespace pls { template struct aligned_wrapper { alignas(system_details::CACHE_LINE_SIZE) unsigned char data[sizeof(T)]; - T* pointer() { return reinterpret_cast(data); } }; void* allocate_aligned(size_t size); diff --git a/lib/pls/include/pls/internal/base/thread.h b/lib/pls/include/pls/internal/base/thread.h index fd0fe33..c06b10b 100644 --- a/lib/pls/include/pls/internal/base/thread.h +++ b/lib/pls/include/pls/internal/base/thread.h @@ -49,14 +49,7 @@ namespace pls { * @return The state pointer hold for this thread. */ template - static T* state() { -#ifdef PLS_THREAD_SPECIFIC_PTHREAD - return reinterpret_cast(pthread_getspecific(local_storage_key_)); -#endif -#ifdef PLS_THREAD_SPECIFIC_COMPILER - return reinterpret_cast(local_state_); -#endif - } + static T* state(); /** * Stores a pointer to the thread local state object. @@ -67,18 +60,11 @@ namespace pls { * @param state_pointer A pointer to the threads state object. */ template - static void set_state(T* state_pointer) { -#ifdef PLS_THREAD_SPECIFIC_PTHREAD - pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer); -#endif -#ifdef PLS_THREAD_SPECIFIC_COMPILER - local_state_ = state_pointer; -#endif - } + static void set_state(T* state_pointer); }; /** - * Abstraction for starting a function in a sparate thread. + * Abstraction for starting a function in a separate thread. * * @tparam Function Lambda being started on the new thread. * @tparam State State type held for this thread. 
@@ -108,52 +94,13 @@ namespace pls { // Keep handle to native implementation pthread_t pthread_thread_; - static void* start_pthread_internal(void* thread_pointer) { - auto my_thread = reinterpret_cast(thread_pointer); - Function my_function_copy = my_thread->function_; - State* my_state_pointer_copy = my_thread->state_pointer_; - - // Now we have copies of everything we need on the stack. - // The original thread object can be moved freely (no more - // references to its memory location). - my_thread->startup_flag_->clear(); - - this_thread::set_state(my_state_pointer_copy); - my_function_copy(); - - // Finished executing the user function - pthread_exit(nullptr); - } + static void* start_pthread_internal(void* thread_pointer); public: - thread(): function_{}, state_pointer_{nullptr}, startup_flag_{nullptr}, pthread_thread_{} {} - - explicit thread(const Function& function, State* state_pointer): - function_{function}, - state_pointer_{state_pointer}, - startup_flag_{nullptr}, - pthread_thread_{} { - -#ifdef PLS_THREAD_SPECIFIC_PTHREAD - if (!this_thread::local_storage_key_initialized_) { - pthread_key_create(&this_thread::local_storage_key_, nullptr); - this_thread::local_storage_key_initialized_ = true; - } -#endif + explicit thread(const Function& function, State* state_pointer); - // We only need this during startup, will be destroyed when out of scope - std::atomic_flag startup_flag{ATOMIC_FLAG_INIT}; - startup_flag_ = &startup_flag; - - startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return - pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *)(this)); - while (startup_flag.test_and_set()) - ; // Busy waiting for the starting flag to clear - } public: - void join() { - pthread_join(pthread_thread_, nullptr); - } + void join(); // make object move only thread(thread&&) noexcept = default; @@ -164,17 +111,12 @@ namespace pls { }; template - thread start_thread(const Function& function, State* 
state_pointer) { - return thread(function, state_pointer); - } - + thread start_thread(const Function& function, State* state_pointer); template - thread start_thread(const Function& function) { - return thread(function, nullptr); - } + thread start_thread(const Function& function); } } - } +#include "thread_impl.h" #endif //PLS_THREAD_H diff --git a/lib/pls/include/pls/internal/base/thread_impl.h b/lib/pls/include/pls/internal/base/thread_impl.h new file mode 100644 index 0000000..1ac356a --- /dev/null +++ b/lib/pls/include/pls/internal/base/thread_impl.h @@ -0,0 +1,88 @@ + +#ifndef PLS_THREAD_IMPL_H +#define PLS_THREAD_IMPL_H + +namespace pls { + namespace internal { + namespace base { + template + T* this_thread::state() { +#ifdef PLS_THREAD_SPECIFIC_PTHREAD + return reinterpret_cast(pthread_getspecific(local_storage_key_)); +#endif +#ifdef PLS_THREAD_SPECIFIC_COMPILER + return reinterpret_cast(local_state_); +#endif + } + + template + void this_thread::set_state(T* state_pointer) { +#ifdef PLS_THREAD_SPECIFIC_PTHREAD + pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer); +#endif +#ifdef PLS_THREAD_SPECIFIC_COMPILER + local_state_ = state_pointer; +#endif + } + + template + void* thread::start_pthread_internal(void* thread_pointer) { + auto my_thread = reinterpret_cast(thread_pointer); + Function my_function_copy = my_thread->function_; + State* my_state_pointer_copy = my_thread->state_pointer_; + + // Now we have copies of everything we need on the stack. + // The original thread object can be moved freely (no more + // references to its memory location). 
+ my_thread->startup_flag_->clear(); + + this_thread::set_state(my_state_pointer_copy); + my_function_copy(); + + // Finished executing the user function + pthread_exit(nullptr); + } + + template + thread::thread(const Function& function, State* state_pointer): + function_{function}, + state_pointer_{state_pointer}, + startup_flag_{nullptr}, + pthread_thread_{} { + +#ifdef PLS_THREAD_SPECIFIC_PTHREAD + if (!this_thread::local_storage_key_initialized_) { + pthread_key_create(&this_thread::local_storage_key_, nullptr); + this_thread::local_storage_key_initialized_ = true; + } +#endif + + // We only need this during startup, will be destroyed when out of scope + std::atomic_flag startup_flag{ATOMIC_FLAG_INIT}; + startup_flag_ = &startup_flag; + + startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return + pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *)(this)); + while (startup_flag.test_and_set()) + ; // Busy waiting for the starting flag to clear + } + + template + void thread::join() { + pthread_join(pthread_thread_, nullptr); + } + + template + thread start_thread(const Function& function, State* state_pointer) { + return thread(function, state_pointer); + } + + template + thread start_thread(const Function& function) { + return thread(function, nullptr); + } + } + } +} + +#endif //PLS_THREAD_IMPL_H diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack.h b/lib/pls/include/pls/internal/data_structures/aligned_stack.h index 743ab56..3bb3f72 100644 --- a/lib/pls/include/pls/internal/data_structures/aligned_stack.h +++ b/lib/pls/include/pls/internal/data_structures/aligned_stack.h @@ -37,40 +37,18 @@ namespace pls { aligned_stack(char* memory_region, std::size_t size); template - T* push(const T& object) { - // Copy-Construct - return new ((void*)push())T(object); - } - + T* push(const T& object); template - void* push() { - void* result = reinterpret_cast(head_); - - // Move head to next 
aligned position after new object - head_ = base::alignment::next_alignment(head_ + sizeof(T)); - if (head_ >= memory_end_) { - PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!"); - } - - return result; - } - + void* push(); template - T pop() { - head_ = head_ - base::alignment::next_alignment(sizeof(T)); - return *reinterpret_cast(head_); - } - - state save_state() { - return head_; - } + T pop(); - void reset_state(state new_state) { - head_ = new_state; - } + state save_state() const { return head_; } + void reset_state(state new_state) { head_ = new_state; } }; } } } +#include "aligned_stack_impl.h" #endif //PLS_ALIGNED_STACK_H diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h new file mode 100644 index 0000000..8a3a759 --- /dev/null +++ b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h @@ -0,0 +1,36 @@ + +#ifndef PLS_ALIGNED_STACK_IMPL_H +#define PLS_ALIGNED_STACK_IMPL_H + +namespace pls { + namespace internal { + namespace data_structures { + template + T* aligned_stack::push(const T& object) { + // Copy-Construct + return new ((void*)push())T(object); + } + + template + void* aligned_stack::push() { + void* result = reinterpret_cast(head_); + + // Move head to next aligned position after new object + head_ = base::alignment::next_alignment(head_ + sizeof(T)); + if (head_ >= memory_end_) { + PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!"); + } + + return result; + } + + template + T aligned_stack::pop() { + head_ = head_ - base::alignment::next_alignment(sizeof(T)); + return *reinterpret_cast(head_); + } + } + } +} + +#endif //PLS_ALIGNED_STACK_IMPL_H diff --git a/lib/pls/include/pls/internal/scheduling/fork_join_task.h b/lib/pls/include/pls/internal/scheduling/fork_join_task.h index efcd395..b7b0da3 100644 --- a/lib/pls/include/pls/internal/scheduling/fork_join_task.h +++ 
b/lib/pls/include/pls/internal/scheduling/fork_join_task.h @@ -77,27 +77,9 @@ namespace pls { bool split_task(base::spin_lock* /*lock*/) override; public: - explicit fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id): - abstract_task{0, id}, - root_task_{root_task}, - currently_executing_{nullptr}, - my_stack_{nullptr}, - deque_{}, - last_stolen_{nullptr} {}; - - void execute() override { - PROFILE_WORK_BLOCK("execute fork_join_task"); - - // Bind this instance to our OS thread - my_stack_ = base::this_thread::state()->task_stack_; - root_task_->tbb_task_ = this; - root_task_->stack_state_ = my_stack_->save_state(); - - // Execute it on our OS thread until its finished - root_task_->execute(); - } - - fork_join_sub_task* currently_executing() const { return currently_executing_; } + explicit fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id); + void execute() override; + fork_join_sub_task* currently_executing() const; }; template diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h index a9e2da5..4f01e89 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler.h @@ -34,76 +34,34 @@ namespace pls { explicit scheduler(scheduler_memory* memory, unsigned int num_threads); ~scheduler(); + /** + * Wakes up the thread pool. + * Code inside the Function lambda can invoke all parallel APIs. + * + * @param work_section generic function or lambda to be executed in the scheduler's context. 
+ */ template - void perform_work(Function work_section) { - PROFILE_WORK_BLOCK("scheduler::perform_work") - root_task master{work_section}; - - // Push root task on stacks - auto new_master = memory_->task_stack_for(0)->push(master); - memory_->thread_state_for(0)->root_task_ = new_master; - memory_->thread_state_for(0)->current_task_ = new_master; - for (unsigned int i = 1; i < num_threads_; i++) { - root_worker_task worker{new_master}; - auto new_worker = memory_->task_stack_for(0)->push(worker); - memory_->thread_state_for(i)->root_task_ = new_worker; - memory_->thread_state_for(i)->current_task_ = new_worker; - } - - // Perform and wait for work - sync_barrier_.wait(); // Trigger threads to wake up - sync_barrier_.wait(); // Wait for threads to finish - - // Clean up stack - memory_->task_stack_for(0)->pop(); - for (unsigned int i = 1; i < num_threads_; i++) { - root_worker_task worker{new_master}; - memory_->task_stack_for(0)->pop(); - } - } - - // TODO: See if we should place this differently (only for performance reasons) + void perform_work(Function work_section); + + /** + * Executes a top-level-task (children of abstract_task) on this thread. + * + * @param task The task to be executed. + * @param depth Optional: depth of the new task, otherwise set implicitly. + */ template - static void execute_task(Task& task, int depth=-1) { - static_assert(std::is_base_of::value, "Only pass abstract_task subclasses!"); - - auto my_state = base::this_thread::state(); - abstract_task* old_task; - abstract_task* new_task; - - // Init Task - { - std::lock_guard lock{my_state->lock_}; - old_task = my_state->current_task_; - new_task = my_state->task_stack_->push(task); - - new_task->set_depth(depth >= 0 ? 
depth : old_task->depth() + 1); - my_state->current_task_ = new_task; - old_task->set_child(new_task); - } - - // Run Task - new_task->execute(); - - // Teardown state back to before the task was executed - { - std::lock_guard lock{my_state->lock_}; - - old_task->set_child(nullptr); - my_state->current_task_ = old_task; - - my_state->task_stack_->pop(); - } - } + static void execute_task(Task& task, int depth=-1); static abstract_task* current_task() { return base::this_thread::state()->current_task_; } void terminate(bool wait_for_workers=true); + unsigned int num_threads() const { return num_threads_; } thread_state* thread_state_for(size_t id) { return memory_->thread_state_for(id); } }; } } } +#include "scheduler_impl.h" #endif //PLS_SCHEDULER_H diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h new file mode 100644 index 0000000..869a5e3 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -0,0 +1,72 @@ + +#ifndef PLS_SCHEDULER_IMPL_H +#define PLS_SCHEDULER_IMPL_H + +namespace pls { + namespace internal { + namespace scheduling { + template + void scheduler::perform_work(Function work_section) { + PROFILE_WORK_BLOCK("scheduler::perform_work") + root_task master{work_section}; + + // Push root task on stacks + auto new_master = memory_->task_stack_for(0)->push(master); + memory_->thread_state_for(0)->root_task_ = new_master; + memory_->thread_state_for(0)->current_task_ = new_master; + for (unsigned int i = 1; i < num_threads_; i++) { + root_worker_task worker{new_master}; + auto new_worker = memory_->task_stack_for(0)->push(worker); + memory_->thread_state_for(i)->root_task_ = new_worker; + memory_->thread_state_for(i)->current_task_ = new_worker; + } + + // Perform and wait for work + sync_barrier_.wait(); // Trigger threads to wake up + sync_barrier_.wait(); // Wait for threads to finish + + // Clean up stack + memory_->task_stack_for(0)->pop(); + for 
(unsigned int i = 1; i < num_threads_; i++) { + root_worker_task worker{new_master}; + memory_->task_stack_for(0)->pop(); + } + } + + template + void scheduler::execute_task(Task& task, int depth) { + static_assert(std::is_base_of::value, "Only pass abstract_task subclasses!"); + + auto my_state = base::this_thread::state(); + abstract_task* old_task; + abstract_task* new_task; + + // Init Task + { + std::lock_guard lock{my_state->lock_}; + old_task = my_state->current_task_; + new_task = my_state->task_stack_->push(task); + + new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1); + my_state->current_task_ = new_task; + old_task->set_child(new_task); + } + + // Run Task + new_task->execute(); + + // Teardown state back to before the task was executed + { + std::lock_guard lock{my_state->lock_}; + + old_task->set_child(nullptr); + my_state->current_task_ = old_task; + + my_state->task_stack_->pop(); + } + } + } + } +} + +#endif //PLS_SCHEDULER_IMPL_H diff --git a/lib/pls/src/algorithms/.gitkeep b/lib/pls/src/algorithms/.gitkeep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/lib/pls/src/algorithms/.gitkeep diff --git a/lib/pls/src/algorithms/invoke_parallel.cpp b/lib/pls/src/algorithms/invoke_parallel.cpp deleted file mode 100644 index 598d59f..0000000 --- a/lib/pls/src/algorithms/invoke_parallel.cpp +++ /dev/null @@ -1 +0,0 @@ -#include "pls/algorithms/invoke_parallel.h" diff --git a/lib/pls/src/internal/helpers/.gitkeep b/lib/pls/src/internal/helpers/.gitkeep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/lib/pls/src/internal/helpers/.gitkeep diff --git a/lib/pls/src/internal/scheduling/fork_join_task.cpp b/lib/pls/src/internal/scheduling/fork_join_task.cpp index 164f804..0c5f51f 100644 --- a/lib/pls/src/internal/scheduling/fork_join_task.cpp +++ b/lib/pls/src/internal/scheduling/fork_join_task.cpp @@ -107,6 +107,28 @@ namespace pls { scheduler::execute_task(task, depth()); return true; } + + void 
fork_join_task::execute() { + PROFILE_WORK_BLOCK("execute fork_join_task"); + + // Bind this instance to our OS thread + my_stack_ = base::this_thread::state()->task_stack_; + root_task_->tbb_task_ = this; + root_task_->stack_state_ = my_stack_->save_state(); + + // Execute it on our OS thread until its finished + root_task_->execute(); + } + + fork_join_sub_task* fork_join_task::currently_executing() const { return currently_executing_; } + + fork_join_task::fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id): + abstract_task{0, id}, + root_task_{root_task}, + currently_executing_{nullptr}, + my_stack_{nullptr}, + deque_{}, + last_stolen_{nullptr} {}; } } } -- libgit2 0.26.0