diff --git a/NOTES.md b/NOTES.md index 04b1705..076b2ca 100644 --- a/NOTES.md +++ b/NOTES.md @@ -4,7 +4,67 @@ A collection of stuff that we noticed during development. Useful later on two write a project report and to go back in time to find out why certain decisions where made. -## 09.02.2019 - Cache Alignment +## 12.04.2019 - Unique IDs + +Assigning unique IDs to logically different tasks is key to the +current model of separating timing/memory guarantees for certain +parallel patterns. + +We do want to assign these IDs automatically in most cases (a call to +some parallel API should not require the end user to specify a +unique ID for each different call, as this would be error-prone). +Instead we want to make sure that each DIFFERENT API call is separated +automatically, while leaving the option for manual ID assignment for later +implementations of GPU offloading (tasks need to have identifiers for +this to work properly). + +Our first approach was using the `__COUNTER__` macro, +but this only works within ONE COMPILATION UNIT and is not +portable to all compilers. + +As all our unique instances are coupled to a class/type, +we decided to implement the unique IDs using +typeid(tuple) in automatic cases (bound to a specific type) +and fully manual IDs in all other cases. All this is wrapped in a +helpers::unique_id class. + + +## 11.04.2019 - Lambda Pointer Abstraction + +The question is whether we can use a pointer to a lambda without +needing templating (for the type of the lambda) at the place where +we use it. + +We looked into different techniques to achieve this: +- Using std::function<...> +- Using custom wrappers + +std::function uses dynamic memory, which rules it out. +All methods that we can think of involve storing a pointer +to the lambda and then calling it later on. +This works well enough with a simple wrapper, but has one +major downside: It involves virtual function calls, +making it impossible to inline the lambda. 
+This property ruled out the technique for us in most places, +as inlining is crucial, especially in small functions like loop +iterations. See `invoke_parallel_impl.h` for an example where we +did this (wrapper with virtual function call), but we only did it there, +as the generic `fork_join_sub_task` requires the virtual call anyway, +so we do not lose ANY performance with this technique. + +## 11.04.2019 - Notes on C++ Templating + +After working more with templating and talking to Mike, +it seems like the common way to go is the following: +- If possible, add template arguments to + data containers only (separate from logic). +- If logic and data are coupled (as is often the case with lambdas), + add the declaration of the interface into the normal header + some_class.h and add its implementation into an extra implementation + file some_class_impl.h that is included at the end of that header. + + +## 09.04.2019 - Cache Alignment Aligning the cache needs all parts (both data types with correct alignment and base memory with correct alignment). 
diff --git a/app/playground/CMakeLists.txt b/app/playground/CMakeLists.txt index bdcbebe..c1b57ee 100644 --- a/app/playground/CMakeLists.txt +++ b/app/playground/CMakeLists.txt @@ -1,4 +1,4 @@ add_executable(playground main.cpp) # Example for adding the library to your app (as a cmake project dependency) -target_link_libraries(playground pls) \ No newline at end of file +target_link_libraries(playground pls) diff --git a/app/playground/main.cpp b/app/playground/main.cpp index 4a33c29..cc7e784 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -4,16 +4,14 @@ #include #include #include +#include +#include -#include -#include -#include +#include +#include -using namespace pls; int main() { - malloc_scheduler_memory sched_memory{8}; - std::cout << (std::uintptr_t)sched_memory.thread_for(0) % 64 << ", " << (std::uintptr_t)sched_memory.thread_for(1) % 64 << ", " << (std::uintptr_t)sched_memory.thread_for(2) % 64 << ", " << std::endl; - std::cout << (std::uintptr_t)sched_memory.thread_state_for(0) % 64 << ", " << (std::uintptr_t)sched_memory.thread_state_for(1) % 64 << ", " << (std::uintptr_t)sched_memory.thread_state_for(2) % 64 << ", " << std::endl; - std::cout << (std::uintptr_t)sched_memory.task_stack_for(0) % 64 << ", " << (std::uintptr_t)sched_memory.task_stack_for(1) % 64 << ", " << (std::uintptr_t)sched_memory.task_stack_for(2) % 64 << ", " << std::endl; + std::cout << pls::internal::scheduling::root_task::create_id().type_.hash_code() << std::endl; + std::cout << pls::internal::helpers::unique_id::create>().type_.hash_code() << std::endl; } diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 346ad9f..4f8652a 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -2,16 +2,19 @@ add_library(pls STATIC include/pls/pls.h src/pls.cpp - include/pls/algorithms/invoke_parallel.h src/algorithms/invoke_parallel.cpp + include/pls/algorithms/invoke_parallel.h + include/pls/algorithms/invoke_parallel_impl.h 
include/pls/internal/base/spin_lock.h src/internal/base/spin_lock.cpp include/pls/internal/base/thread.h src/internal/base/thread.cpp + include/pls/internal/base/thread_impl.h include/pls/internal/base/barrier.h src/internal/base/barrier.cpp include/pls/internal/base/system_details.h include/pls/internal/base/error_handling.h include/pls/internal/base/alignment.h src/internal/base/alignment.cpp include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp + include/pls/internal/data_structures/aligned_stack_impl.h include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp include/pls/internal/helpers/prohibit_new.h @@ -22,10 +25,11 @@ add_library(pls STATIC include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp include/pls/internal/scheduling/abstract_task.h src/internal/scheduling/abstract_task.cpp include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp + include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/run_on_n_threads_task.h src/internal/scheduling/run_on_n_threads_task.cpp include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp -) + include/pls/internal/helpers/unique_id.h) # Add everything in `./include` to be in the include path of this project target_include_directories(pls diff --git a/lib/pls/include/pls/algorithms/invoke_parallel.h b/lib/pls/include/pls/algorithms/invoke_parallel.h index 21dec7e..dc44469 100644 --- a/lib/pls/include/pls/algorithms/invoke_parallel.h +++ b/lib/pls/include/pls/algorithms/invoke_parallel.h @@ -7,44 +7,15 @@ namespace pls { namespace algorithm { - namespace internal { - using namespace ::pls::internal::scheduling; - - template - inline void run_body(const Body& internal_body, const abstract_task::id& id) { - // Make sure we are in the context of 
this invoke_parallel instance, - // if not we will spawn it as a new 'fork-join-style' task. - auto current_task = scheduler::current_task(); - if (current_task->unique_id() == id) { - auto current_sub_task = reinterpret_cast(current_task)->currently_executing(); - internal_body(current_sub_task); - } else { - fork_join_lambda root_body(&internal_body); - fork_join_task root_task{&root_body, id}; - scheduler::execute_task(root_task); - } - } - } - template - void invoke_parallel(const Function1& function1, const Function2& function2) { - using namespace ::pls::internal::scheduling; - static abstract_task::id id{PLS_UNIQUE_ID, true}; - - auto internal_body = [&] (fork_join_sub_task* this_task){ - auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); }; - auto sub_task_1 = fork_join_lambda(&sub_task_body_1); - - this_task->spawn_child(sub_task_1); - function2(); // Execute last function 'inline' without spawning a sub_task object - this_task->wait_for_all(); - }; + void invoke_parallel(const Function1& function1, const Function2& function2); - internal::run_body(internal_body, id); - } + template + void invoke_parallel(const Function1& function1, const Function2& function2, const Function3& function3); // ...and so on, add more if we decide to keep this design } } +#include "invoke_parallel_impl.h" #endif //PLS_PARALLEL_INVOKE_H diff --git a/lib/pls/include/pls/algorithms/invoke_parallel_impl.h b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h new file mode 100644 index 0000000..7dfbef8 --- /dev/null +++ b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h @@ -0,0 +1,71 @@ + +#ifndef PLS_INVOKE_PARALLEL_IMPL_H +#define PLS_INVOKE_PARALLEL_IMPL_H + +#include "pls/internal/scheduling/fork_join_task.h" +#include "pls/internal/scheduling/scheduler.h" +#include "pls/internal/helpers/unique_id.h" + +namespace pls { + namespace algorithm { + namespace internal { + using namespace ::pls::internal::scheduling; + + template + inline void run_body(const Body& 
internal_body, const abstract_task::id& id) { + // Make sure we are in the context of this invoke_parallel instance, + // if not we will spawn it as a new 'fork-join-style' task. + auto current_task = scheduler::current_task(); + if (current_task->unique_id() == id) { + auto current_sub_task = reinterpret_cast(current_task)->currently_executing(); + internal_body(current_sub_task); + } else { + fork_join_lambda root_body(&internal_body); + fork_join_task root_task{&root_body, id}; + scheduler::execute_task(root_task); + } + } + } + + template + void invoke_parallel(const Function1& function1, const Function2& function2) { + using namespace ::pls::internal::scheduling; + using namespace ::pls::internal::helpers; + static abstract_task::id id = unique_id::create(); + + auto internal_body = [&] (fork_join_sub_task* this_task){ + auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); }; + auto sub_task_1 = fork_join_lambda(&sub_task_body_1); + + this_task->spawn_child(sub_task_1); + function2(); // Execute last function 'inline' without spawning a sub_task object + this_task->wait_for_all(); + }; + + internal::run_body(internal_body, id); + } + + template + void invoke_parallel(const Function1& function1, const Function2& function2, const Function3& function3) { + using namespace ::pls::internal::scheduling; + using namespace ::pls::internal::helpers; + static abstract_task::id id = unique_id::create(); + + auto internal_body = [&] (fork_join_sub_task* this_task){ + auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); }; + auto sub_task_1 = fork_join_lambda(&sub_task_body_1); + auto sub_task_body_2 = [&] (fork_join_sub_task*){ function2(); }; + auto sub_task_2 = fork_join_lambda(&sub_task_body_2); + + this_task->spawn_child(sub_task_1); + this_task->spawn_child(sub_task_2); + function3(); // Execute last function 'inline' without spawning a sub_task object + this_task->wait_for_all(); + }; + + internal::run_body(internal_body, id); + } + } +} + 
+#endif //PLS_INVOKE_PARALLEL_IMPL_H diff --git a/lib/pls/include/pls/internal/base/alignment.h b/lib/pls/include/pls/internal/base/alignment.h index 4dc4752..2dfb474 100644 --- a/lib/pls/include/pls/internal/base/alignment.h +++ b/lib/pls/include/pls/internal/base/alignment.h @@ -14,7 +14,6 @@ namespace pls { template struct aligned_wrapper { alignas(system_details::CACHE_LINE_SIZE) unsigned char data[sizeof(T)]; - T* pointer() { return reinterpret_cast(data); } }; void* allocate_aligned(size_t size); diff --git a/lib/pls/include/pls/internal/base/spin_lock.h b/lib/pls/include/pls/internal/base/spin_lock.h index 5acaf0a..cb87811 100644 --- a/lib/pls/include/pls/internal/base/spin_lock.h +++ b/lib/pls/include/pls/internal/base/spin_lock.h @@ -18,7 +18,7 @@ namespace pls { */ class spin_lock { std::atomic_flag flag_; - int yield_at_tries_; + unsigned int yield_at_tries_; public: diff --git a/lib/pls/include/pls/internal/base/thread.h b/lib/pls/include/pls/internal/base/thread.h index fd0fe33..c06b10b 100644 --- a/lib/pls/include/pls/internal/base/thread.h +++ b/lib/pls/include/pls/internal/base/thread.h @@ -49,14 +49,7 @@ namespace pls { * @return The state pointer hold for this thread. */ template - static T* state() { -#ifdef PLS_THREAD_SPECIFIC_PTHREAD - return reinterpret_cast(pthread_getspecific(local_storage_key_)); -#endif -#ifdef PLS_THREAD_SPECIFIC_COMPILER - return reinterpret_cast(local_state_); -#endif - } + static T* state(); /** * Stores a pointer to the thread local state object. @@ -67,18 +60,11 @@ namespace pls { * @param state_pointer A pointer to the threads state object. */ template - static void set_state(T* state_pointer) { -#ifdef PLS_THREAD_SPECIFIC_PTHREAD - pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer); -#endif -#ifdef PLS_THREAD_SPECIFIC_COMPILER - local_state_ = state_pointer; -#endif - } + static void set_state(T* state_pointer); }; /** - * Abstraction for starting a function in a sparate thread. 
+ * Abstraction for starting a function in a separate thread. * * @tparam Function Lambda being started on the new thread. * @tparam State State type held for this thread. @@ -108,52 +94,13 @@ namespace pls { // Keep handle to native implementation pthread_t pthread_thread_; - static void* start_pthread_internal(void* thread_pointer) { - auto my_thread = reinterpret_cast(thread_pointer); - Function my_function_copy = my_thread->function_; - State* my_state_pointer_copy = my_thread->state_pointer_; - - // Now we have copies of everything we need on the stack. - // The original thread object can be moved freely (no more - // references to its memory location). - my_thread->startup_flag_->clear(); - - this_thread::set_state(my_state_pointer_copy); - my_function_copy(); - - // Finished executing the user function - pthread_exit(nullptr); - } + static void* start_pthread_internal(void* thread_pointer); public: - thread(): function_{}, state_pointer_{nullptr}, startup_flag_{nullptr}, pthread_thread_{} {} - - explicit thread(const Function& function, State* state_pointer): - function_{function}, - state_pointer_{state_pointer}, - startup_flag_{nullptr}, - pthread_thread_{} { - -#ifdef PLS_THREAD_SPECIFIC_PTHREAD - if (!this_thread::local_storage_key_initialized_) { - pthread_key_create(&this_thread::local_storage_key_, nullptr); - this_thread::local_storage_key_initialized_ = true; - } -#endif + explicit thread(const Function& function, State* state_pointer); - // We only need this during startup, will be destroyed when out of scope - std::atomic_flag startup_flag{ATOMIC_FLAG_INIT}; - startup_flag_ = &startup_flag; - - startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return - pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *)(this)); - while (startup_flag.test_and_set()) - ; // Busy waiting for the starting flag to clear - } public: - void join() { - pthread_join(pthread_thread_, nullptr); - } + void join(); // 
make object move only thread(thread&&) noexcept = default; @@ -164,17 +111,12 @@ namespace pls { }; template - thread start_thread(const Function& function, State* state_pointer) { - return thread(function, state_pointer); - } - + thread start_thread(const Function& function, State* state_pointer); template - thread start_thread(const Function& function) { - return thread(function, nullptr); - } + thread start_thread(const Function& function); } } - } +#include "thread_impl.h" #endif //PLS_THREAD_H diff --git a/lib/pls/include/pls/internal/base/thread_impl.h b/lib/pls/include/pls/internal/base/thread_impl.h new file mode 100644 index 0000000..1ac356a --- /dev/null +++ b/lib/pls/include/pls/internal/base/thread_impl.h @@ -0,0 +1,88 @@ + +#ifndef PLS_THREAD_IMPL_H +#define PLS_THREAD_IMPL_H + +namespace pls { + namespace internal { + namespace base { + template + T* this_thread::state() { +#ifdef PLS_THREAD_SPECIFIC_PTHREAD + return reinterpret_cast(pthread_getspecific(local_storage_key_)); +#endif +#ifdef PLS_THREAD_SPECIFIC_COMPILER + return reinterpret_cast(local_state_); +#endif + } + + template + void this_thread::set_state(T* state_pointer) { +#ifdef PLS_THREAD_SPECIFIC_PTHREAD + pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer); +#endif +#ifdef PLS_THREAD_SPECIFIC_COMPILER + local_state_ = state_pointer; +#endif + } + + template + void* thread::start_pthread_internal(void* thread_pointer) { + auto my_thread = reinterpret_cast(thread_pointer); + Function my_function_copy = my_thread->function_; + State* my_state_pointer_copy = my_thread->state_pointer_; + + // Now we have copies of everything we need on the stack. + // The original thread object can be moved freely (no more + // references to its memory location). 
+ my_thread->startup_flag_->clear(); + + this_thread::set_state(my_state_pointer_copy); + my_function_copy(); + + // Finished executing the user function + pthread_exit(nullptr); + } + + template + thread::thread(const Function& function, State* state_pointer): + function_{function}, + state_pointer_{state_pointer}, + startup_flag_{nullptr}, + pthread_thread_{} { + +#ifdef PLS_THREAD_SPECIFIC_PTHREAD + if (!this_thread::local_storage_key_initialized_) { + pthread_key_create(&this_thread::local_storage_key_, nullptr); + this_thread::local_storage_key_initialized_ = true; + } +#endif + + // We only need this during startup, will be destroyed when out of scope + std::atomic_flag startup_flag{ATOMIC_FLAG_INIT}; + startup_flag_ = &startup_flag; + + startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return + pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *)(this)); + while (startup_flag.test_and_set()) + ; // Busy waiting for the starting flag to clear + } + + template + void thread::join() { + pthread_join(pthread_thread_, nullptr); + } + + template + thread start_thread(const Function& function, State* state_pointer) { + return thread(function, state_pointer); + } + + template + thread start_thread(const Function& function) { + return thread(function, nullptr); + } + } + } +} + +#endif //PLS_THREAD_IMPL_H diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack.h b/lib/pls/include/pls/internal/data_structures/aligned_stack.h index 743ab56..3bb3f72 100644 --- a/lib/pls/include/pls/internal/data_structures/aligned_stack.h +++ b/lib/pls/include/pls/internal/data_structures/aligned_stack.h @@ -37,40 +37,18 @@ namespace pls { aligned_stack(char* memory_region, std::size_t size); template - T* push(const T& object) { - // Copy-Construct - return new ((void*)push())T(object); - } - + T* push(const T& object); template - void* push() { - void* result = reinterpret_cast(head_); - - // Move head to next 
aligned position after new object - head_ = base::alignment::next_alignment(head_ + sizeof(T)); - if (head_ >= memory_end_) { - PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!"); - } - - return result; - } - + void* push(); template - T pop() { - head_ = head_ - base::alignment::next_alignment(sizeof(T)); - return *reinterpret_cast(head_); - } - - state save_state() { - return head_; - } + T pop(); - void reset_state(state new_state) { - head_ = new_state; - } + state save_state() const { return head_; } + void reset_state(state new_state) { head_ = new_state; } }; } } } +#include "aligned_stack_impl.h" #endif //PLS_ALIGNED_STACK_H diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h new file mode 100644 index 0000000..8a3a759 --- /dev/null +++ b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h @@ -0,0 +1,36 @@ + +#ifndef PLS_ALIGNED_STACK_IMPL_H +#define PLS_ALIGNED_STACK_IMPL_H + +namespace pls { + namespace internal { + namespace data_structures { + template + T* aligned_stack::push(const T& object) { + // Copy-Construct + return new ((void*)push())T(object); + } + + template + void* aligned_stack::push() { + void* result = reinterpret_cast(head_); + + // Move head to next aligned position after new object + head_ = base::alignment::next_alignment(head_ + sizeof(T)); + if (head_ >= memory_end_) { + PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!"); + } + + return result; + } + + template + T aligned_stack::pop() { + head_ = head_ - base::alignment::next_alignment(sizeof(T)); + return *reinterpret_cast(head_); + } + } + } +} + +#endif //PLS_ALIGNED_STACK_IMPL_H diff --git a/lib/pls/include/pls/internal/helpers/mini_benchmark.h b/lib/pls/include/pls/internal/helpers/mini_benchmark.h index d153a32..dac7237 100644 --- a/lib/pls/include/pls/internal/helpers/mini_benchmark.h +++ 
b/lib/pls/include/pls/internal/helpers/mini_benchmark.h @@ -13,7 +13,7 @@ namespace pls { namespace helpers { // TODO: Clean up (separate into small functions and .cpp file) template - void run_mini_benchmark(const Function& lambda, size_t max_threads, long max_runtime_ms=1000) { + void run_mini_benchmark(const Function& lambda, size_t max_threads, unsigned long max_runtime_ms=1000) { using namespace std; using namespace pls::internal::scheduling; diff --git a/lib/pls/include/pls/internal/helpers/unique_id.h b/lib/pls/include/pls/internal/helpers/unique_id.h new file mode 100644 index 0000000..918021c --- /dev/null +++ b/lib/pls/include/pls/internal/helpers/unique_id.h @@ -0,0 +1,31 @@ + +#ifndef PLS_UNIQUE_ID_H +#define PLS_UNIQUE_ID_H + +#include +#include +#include + +namespace pls { + namespace internal { + namespace helpers { + struct unique_id { + const uint32_t id_; + const std::type_info& type_; + bool operator==(const unique_id& other) const { return id_ == other.id_ && type_ == other.type_; } + + static constexpr unique_id create(const uint32_t id) { + return unique_id(id, typeid(void)); + } + template + static constexpr unique_id create() { + return unique_id(UINT32_MAX, typeid(std::tuple)); + } + private: + explicit constexpr unique_id(const uint32_t id, const std::type_info& type): id_{id}, type_{type} {}; + }; + } + } +} + +#endif //PLS_UNIQUE_ID_H diff --git a/lib/pls/include/pls/internal/scheduling/abstract_task.h b/lib/pls/include/pls/internal/scheduling/abstract_task.h index 952a530..c239811 100644 --- a/lib/pls/include/pls/internal/scheduling/abstract_task.h +++ b/lib/pls/include/pls/internal/scheduling/abstract_task.h @@ -2,33 +2,23 @@ #ifndef PLS_ABSTRACT_TASK_H #define PLS_ABSTRACT_TASK_H -#define PLS_UNIQUE_ID __COUNTER__ - #include "pls/internal/base/spin_lock.h" +#include "pls/internal/helpers/unique_id.h" namespace pls { namespace internal { namespace scheduling { class abstract_task { public: - struct id { - uint32_t id_; - bool 
auto_generated_; - - explicit id(uint32_t id, bool auto_generated=false): id_{id}, auto_generated_{auto_generated} {}; - - bool operator==(const abstract_task::id& other) const { - return id_ == other.id_ && auto_generated_ == other.auto_generated_; - } - }; + using id = helpers::unique_id; private: - int depth_; + unsigned int depth_; abstract_task::id unique_id_; abstract_task* child_task_; public: - abstract_task(const int depth, const abstract_task::id& unique_id): + abstract_task(const unsigned int depth, const abstract_task::id& unique_id): depth_{depth}, unique_id_{unique_id}, child_task_{nullptr} {} @@ -37,8 +27,8 @@ namespace pls { void set_child(abstract_task* child_task) { child_task_ = child_task; } abstract_task* child() { return child_task_; } - void set_depth(int depth) { depth_ = depth; } - int depth() const { return depth_; } + void set_depth(unsigned int depth) { depth_ = depth; } + unsigned int depth() const { return depth_; } id unique_id() const { return unique_id_; } protected: virtual bool internal_stealing(abstract_task* other_task) = 0; diff --git a/lib/pls/include/pls/internal/scheduling/fork_join_task.h b/lib/pls/include/pls/internal/scheduling/fork_join_task.h index efcd395..b7b0da3 100644 --- a/lib/pls/include/pls/internal/scheduling/fork_join_task.h +++ b/lib/pls/include/pls/internal/scheduling/fork_join_task.h @@ -77,27 +77,9 @@ namespace pls { bool split_task(base::spin_lock* /*lock*/) override; public: - explicit fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id): - abstract_task{0, id}, - root_task_{root_task}, - currently_executing_{nullptr}, - my_stack_{nullptr}, - deque_{}, - last_stolen_{nullptr} {}; - - void execute() override { - PROFILE_WORK_BLOCK("execute fork_join_task"); - - // Bind this instance to our OS thread - my_stack_ = base::this_thread::state()->task_stack_; - root_task_->tbb_task_ = this; - root_task_->stack_state_ = my_stack_->save_state(); - - // Execute it on our OS thread until its 
finished - root_task_->execute(); - } - - fork_join_sub_task* currently_executing() const { return currently_executing_; } + explicit fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id); + void execute() override; + fork_join_sub_task* currently_executing() const; }; template diff --git a/lib/pls/include/pls/internal/scheduling/root_task.h b/lib/pls/include/pls/internal/scheduling/root_task.h index 6834b6b..5b2c3cb 100644 --- a/lib/pls/include/pls/internal/scheduling/root_task.h +++ b/lib/pls/include/pls/internal/scheduling/root_task.h @@ -17,12 +17,14 @@ namespace pls { Function function_; std::atomic_uint8_t finished_; public: + static constexpr auto create_id = helpers::unique_id::create>; + explicit root_task(Function function): - abstract_task{0, id{0}}, + abstract_task{0, create_id()}, function_{function}, finished_{0} {} root_task(const root_task& other): - abstract_task{0, id{0}}, + abstract_task{0, create_id()}, function_{other.function_}, finished_{0} {} @@ -50,8 +52,10 @@ namespace pls { root_task* master_task_; public: + static constexpr auto create_id = root_task::create_id; + explicit root_worker_task(root_task* master_task): - abstract_task{0, id{0}}, + abstract_task{0, create_id()}, master_task_{master_task} {} void execute() override { diff --git a/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h b/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h index f8fd9ef..06539e1 100644 --- a/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h +++ b/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h @@ -36,8 +36,10 @@ namespace pls { return counter; } public: + static constexpr auto create_id = helpers::unique_id::create>; + run_on_n_threads_task(Function function, int num_threads): - abstract_task{0, id{PLS_UNIQUE_ID, true}}, + abstract_task{0, create_id()}, function_{function}, counter{num_threads - 1} {} @@ -65,8 +67,10 @@ namespace pls { Function function_; 
run_on_n_threads_task* root_; public: + static constexpr auto create_id = helpers::unique_id::create>; + run_on_n_threads_task_worker(Function function, run_on_n_threads_task* root): - abstract_task{0, id{PLS_UNIQUE_ID, true}}, + abstract_task{0, create_id()}, function_{function}, root_{root} {} diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h index a9e2da5..4f01e89 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler.h @@ -34,76 +34,34 @@ namespace pls { explicit scheduler(scheduler_memory* memory, unsigned int num_threads); ~scheduler(); + /** + * Wakes up the thread pool. + * Code inside the Function lambda can invoke all parallel APIs. + * + * @param work_section generic function or lambda to be executed in the scheduler's context. + */ template - void perform_work(Function work_section) { - PROFILE_WORK_BLOCK("scheduler::perform_work") - root_task master{work_section}; - - // Push root task on stacks - auto new_master = memory_->task_stack_for(0)->push(master); - memory_->thread_state_for(0)->root_task_ = new_master; - memory_->thread_state_for(0)->current_task_ = new_master; - for (unsigned int i = 1; i < num_threads_; i++) { - root_worker_task worker{new_master}; - auto new_worker = memory_->task_stack_for(0)->push(worker); - memory_->thread_state_for(i)->root_task_ = new_worker; - memory_->thread_state_for(i)->current_task_ = new_worker; - } - - // Perform and wait for work - sync_barrier_.wait(); // Trigger threads to wake up - sync_barrier_.wait(); // Wait for threads to finish - - // Clean up stack - memory_->task_stack_for(0)->pop(); - for (unsigned int i = 1; i < num_threads_; i++) { - root_worker_task worker{new_master}; - memory_->task_stack_for(0)->pop(); - } - } - - // TODO: See if we should place this differently (only for performance reasons) + void perform_work(Function work_section); + + /** + * Executes a 
top-level-task (children of abstract_task) on this thread. + * + * @param task The task to be executed. + * @param depth Optional: depth of the new task, otherwise set implicitly. + */ template - static void execute_task(Task& task, int depth=-1) { - static_assert(std::is_base_of::value, "Only pass abstract_task subclasses!"); - - auto my_state = base::this_thread::state(); - abstract_task* old_task; - abstract_task* new_task; - - // Init Task - { - std::lock_guard lock{my_state->lock_}; - old_task = my_state->current_task_; - new_task = my_state->task_stack_->push(task); - - new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1); - my_state->current_task_ = new_task; - old_task->set_child(new_task); - } - - // Run Task - new_task->execute(); - - // Teardown state back to before the task was executed - { - std::lock_guard lock{my_state->lock_}; - - old_task->set_child(nullptr); - my_state->current_task_ = old_task; - - my_state->task_stack_->pop(); - } - } + static void execute_task(Task& task, int depth=-1); static abstract_task* current_task() { return base::this_thread::state()->current_task_; } void terminate(bool wait_for_workers=true); + unsigned int num_threads() const { return num_threads_; } thread_state* thread_state_for(size_t id) { return memory_->thread_state_for(id); } }; } } } +#include "scheduler_impl.h" #endif //PLS_SCHEDULER_H diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h new file mode 100644 index 0000000..869a5e3 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -0,0 +1,72 @@ + +#ifndef PLS_SCHEDULER_IMPL_H +#define PLS_SCHEDULER_IMPL_H + +namespace pls { + namespace internal { + namespace scheduling { + template + void scheduler::perform_work(Function work_section) { + PROFILE_WORK_BLOCK("scheduler::perform_work") + root_task master{work_section}; + + // Push root task on stacks + auto new_master = 
memory_->task_stack_for(0)->push(master); + memory_->thread_state_for(0)->root_task_ = new_master; + memory_->thread_state_for(0)->current_task_ = new_master; + for (unsigned int i = 1; i < num_threads_; i++) { + root_worker_task worker{new_master}; + auto new_worker = memory_->task_stack_for(0)->push(worker); + memory_->thread_state_for(i)->root_task_ = new_worker; + memory_->thread_state_for(i)->current_task_ = new_worker; + } + + // Perform and wait for work + sync_barrier_.wait(); // Trigger threads to wake up + sync_barrier_.wait(); // Wait for threads to finish + + // Clean up stack + memory_->task_stack_for(0)->pop(); + for (unsigned int i = 1; i < num_threads_; i++) { + root_worker_task worker{new_master}; + memory_->task_stack_for(0)->pop(); + } + } + + template + void scheduler::execute_task(Task& task, int depth) { + static_assert(std::is_base_of::value, "Only pass abstract_task subclasses!"); + + auto my_state = base::this_thread::state(); + abstract_task* old_task; + abstract_task* new_task; + + // Init Task + { + std::lock_guard lock{my_state->lock_}; + old_task = my_state->current_task_; + new_task = my_state->task_stack_->push(task); + + new_task->set_depth(depth >= 0 ? 
depth : old_task->depth() + 1); + my_state->current_task_ = new_task; + old_task->set_child(new_task); + } + + // Run Task + new_task->execute(); + + // Teardown state back to before the task was executed + { + std::lock_guard lock{my_state->lock_}; + + old_task->set_child(nullptr); + my_state->current_task_ = old_task; + + my_state->task_stack_->pop(); + } + } + } + } +} + +#endif //PLS_SCHEDULER_IMPL_H diff --git a/lib/pls/include/pls/pls.h b/lib/pls/include/pls/pls.h index ac5c4ec..35cb545 100644 --- a/lib/pls/include/pls/pls.h +++ b/lib/pls/include/pls/pls.h @@ -5,6 +5,7 @@ #include "pls/internal/scheduling/abstract_task.h" #include "pls/internal/scheduling/fork_join_task.h" #include "pls/internal/scheduling/scheduler.h" +#include "pls/internal/helpers/unique_id.h" namespace pls { using internal::scheduling::static_scheduler_memory; @@ -13,6 +14,8 @@ namespace pls { using internal::scheduling::scheduler; using task_id = internal::scheduling::abstract_task::id; + using unique_id = internal::helpers::unique_id; + using internal::scheduling::fork_join_sub_task; using internal::scheduling::fork_join_task; diff --git a/lib/pls/src/algorithms/.gitkeep b/lib/pls/src/algorithms/.gitkeep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/lib/pls/src/algorithms/.gitkeep diff --git a/lib/pls/src/algorithms/invoke_parallel.cpp b/lib/pls/src/algorithms/invoke_parallel.cpp deleted file mode 100644 index 598d59f..0000000 --- a/lib/pls/src/algorithms/invoke_parallel.cpp +++ /dev/null @@ -1 +0,0 @@ -#include "pls/algorithms/invoke_parallel.h" diff --git a/lib/pls/src/internal/helpers/.gitkeep b/lib/pls/src/internal/helpers/.gitkeep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/lib/pls/src/internal/helpers/.gitkeep diff --git a/lib/pls/src/internal/scheduling/abstract_task.cpp b/lib/pls/src/internal/scheduling/abstract_task.cpp index bfca31f..1b6a21f 100644 --- a/lib/pls/src/internal/scheduling/abstract_task.cpp +++ 
b/lib/pls/src/internal/scheduling/abstract_task.cpp @@ -33,6 +33,7 @@ namespace pls { } PROFILE_END_BLOCK + // Try to steal 'internal', e.g. fork_join_sub_tasks in a fork_join_task constellation PROFILE_STEALING("Internal Steal") if (current_task != nullptr) { // See if it equals our type and depth of task @@ -52,7 +53,7 @@ namespace pls { // Execute 'top level task steal' if possible - // (only try deeper tasks to keep depth restricted stealing) + // (only try deeper tasks to keep depth restricted stealing). PROFILE_STEALING("Top Level Steal") while (current_task != nullptr) { auto lock = &target_state->lock_; diff --git a/lib/pls/src/internal/scheduling/fork_join_task.cpp b/lib/pls/src/internal/scheduling/fork_join_task.cpp index 164f804..0c5f51f 100644 --- a/lib/pls/src/internal/scheduling/fork_join_task.cpp +++ b/lib/pls/src/internal/scheduling/fork_join_task.cpp @@ -107,6 +107,28 @@ namespace pls { scheduler::execute_task(task, depth()); return true; } + + void fork_join_task::execute() { + PROFILE_WORK_BLOCK("execute fork_join_task"); + + // Bind this instance to our OS thread: remember the task stack of the current thread state, point the root sub task back at this task and store the saved stack state in it + my_stack_ = base::this_thread::state()->task_stack_; + root_task_->tbb_task_ = this; + root_task_->stack_state_ = my_stack_->save_state(); + + // Execute it on our OS thread until it is finished + root_task_->execute(); + } + + fork_join_sub_task* fork_join_task::currently_executing() const { return currently_executing_; } + + fork_join_task::fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id): + abstract_task{0, id}, + root_task_{root_task}, + currently_executing_{nullptr}, + my_stack_{nullptr}, + deque_{}, + last_stolen_{nullptr} {}; } } } diff --git a/test/scheduling_tests.cpp b/test/scheduling_tests.cpp index f116f1b..d3a340d 100644 --- a/test/scheduling_tests.cpp +++ b/test/scheduling_tests.cpp @@ -58,7 +58,7 @@ TEST_CASE( "tbb task are scheduled correctly", "[internal/scheduling/fork_join_t my_scheduler.perform_work([&] (){ once_sub_task sub_task{&counter,
start_counter}; - fork_join_task task{&sub_task, task_id{42}}; + fork_join_task task{&sub_task, unique_id::create(42)}; scheduler::execute_task(task); }); @@ -71,7 +71,7 @@ TEST_CASE( "tbb task are scheduled correctly", "[internal/scheduling/fork_join_t my_scheduler.perform_work([&] (){ std::atomic dummy_parent{1}, overall_counter{8}; force_steal_sub_task sub_task{&dummy_parent, &overall_counter}; - fork_join_task task{&sub_task, task_id{42}}; + fork_join_task task{&sub_task, unique_id::create(42)}; scheduler::execute_task(task); }); my_scheduler.terminate(true);