diff --git a/app/benchmark_fft/main.cpp b/app/benchmark_fft/main.cpp index 0b0f505..d10012b 100644 --- a/app/benchmark_fft/main.cpp +++ b/app/benchmark_fft/main.cpp @@ -12,76 +12,75 @@ static constexpr int INPUT_SIZE = 2064; typedef std::vector> complex_vector; void divide(complex_vector::iterator data, int n) { - complex_vector tmp_odd_elements(n / 2); - for (int i = 0; i < n / 2; i++) { - tmp_odd_elements[i] = data[i * 2 + 1]; - } - for (int i = 0; i < n / 2; i++) { - data[i] = data[i * 2]; - } - for (int i = 0; i < n / 2; i++) { - data[i + n / 2] = tmp_odd_elements[i]; - } + complex_vector tmp_odd_elements(n / 2); + for (int i = 0; i < n / 2; i++) { + tmp_odd_elements[i] = data[i * 2 + 1]; + } + for (int i = 0; i < n / 2; i++) { + data[i] = data[i * 2]; + } + for (int i = 0; i < n / 2; i++) { + data[i + n / 2] = tmp_odd_elements[i]; + } } void combine(complex_vector::iterator data, int n) { - for (int i = 0; i < n / 2; i++) { - std::complex even = data[i]; - std::complex odd = data[i + n / 2]; + for (int i = 0; i < n / 2; i++) { + std::complex even = data[i]; + std::complex odd = data[i + n / 2]; - // w is the "twiddle-factor". - // this could be cached, but we run the same 'data_structures' algorithm parallel/serial, - // so it won't impact the performance comparison. - std::complex w = exp(std::complex(0, -2. * M_PI * i / n)); + // w is the "twiddle-factor". + // this could be cached, but we run the same 'data_structures' algorithm parallel/serial, + // so it won't impact the performance comparison. + std::complex w = exp(std::complex(0, -2. * M_PI * i / n)); - data[i] = even + w * odd; - data[i + n / 2] = even - w * odd; - } + data[i] = even + w * odd; + data[i + n / 2] = even - w * odd; + } } void fft(complex_vector::iterator data, int n) { - if (n < 2) { - return; - } + if (n < 2) { + return; + } - divide(data, n); - if (n <= CUTOFF) { - fft(data, n / 2); - fft(data + n / 2, n / 2); - } else { - pls::invoke_parallel( - [&] { fft(data, n / 2); }, - [&] { fft(data + n / 2, n / 2); } - ); - } - combine(data, n); + divide(data, n); + if (n <= CUTOFF) { + fft(data, n / 2); + fft(data + n / 2, n / 2); + } else { + pls::invoke_parallel( + [&] { fft(data, n / 2); }, + [&] { fft(data + n / 2, n / 2); } + ); + } + combine(data, n); } complex_vector prepare_input(int input_size) { - std::vector known_frequencies{2, 11, 52, 88, 256}; - complex_vector data(input_size); + std::vector known_frequencies{2, 11, 52, 88, 256}; + complex_vector data(input_size); - // Set our input data to match a time series of the known_frequencies. - // When applying fft to this time-series we should find these frequencies. - for (int i = 0; i < input_size; i++) { - data[i] = std::complex(0.0, 0.0); - for (auto frequencie : known_frequencies) { - data[i] += sin(2 * M_PI * frequencie * i / input_size); - } + // Set our input data to match a time series of the known_frequencies. + // When applying fft to this time-series we should find these frequencies. 
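A minimal sketch of how that expectation could be checked once fft() has run (for example at the end of the benchmark lambda further down); the magnitude threshold and the std::abs test are illustrative assumptions, not code from this patch:

    complex_vector spectrum = initial_input;
    fft(spectrum.begin(), spectrum.size());
    for (int i = 0; i < INPUT_SIZE / 2; i++) {
      // Every entry of known_frequencies should show up as a clear magnitude peak.
      if (std::abs(spectrum[i]) > INPUT_SIZE / 4.0) {
        std::cout << "peak near bin " << i << std::endl;
      }
    }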
+ for (int i = 0; i < input_size; i++) { + data[i] = std::complex(0.0, 0.0); + for (auto frequencie : known_frequencies) { + data[i] += sin(2 * M_PI * frequencie * i / input_size); } + } - return data; + return data; } - int main() { - PROFILE_ENABLE - complex_vector initial_input = prepare_input(INPUT_SIZE); + PROFILE_ENABLE + complex_vector initial_input = prepare_input(INPUT_SIZE); - pls::internal::helpers::run_mini_benchmark([&] { - complex_vector input = initial_input; - fft(input.begin(), input.size()); - }, 8, 4000); + pls::internal::helpers::run_mini_benchmark([&] { + complex_vector input = initial_input; + fft(input.begin(), input.size()); + }, 8, 4000); - PROFILE_SAVE("test_profile.prof") + PROFILE_SAVE("test_profile.prof") } diff --git a/app/invoke_parallel/main.cpp b/app/invoke_parallel/main.cpp index 4ae48ef..371f90f 100644 --- a/app/invoke_parallel/main.cpp +++ b/app/invoke_parallel/main.cpp @@ -8,44 +8,44 @@ static pls::static_scheduler_memory<8, 2 << 14> my_scheduler_memory; static constexpr int CUTOFF = 10; long fib_serial(long n) { - if (n == 0) { - return 0; - } - if (n == 1) { - return 1; - } - - return fib_serial(n - 1) + fib_serial(n - 2); + if (n == 0) { + return 0; + } + if (n == 1) { + return 1; + } + + return fib_serial(n - 1) + fib_serial(n - 2); } long fib(long n) { - if (n <= CUTOFF) { - return fib_serial(n); - } - - // Actual 'invoke_parallel' logic/code - int left, right; - pls::invoke_parallel( - [&] { left = fib(n - 1); }, - [&] { right = fib(n - 2); } - ); - return left + right; + if (n <= CUTOFF) { + return fib_serial(n); + } + + // Actual 'invoke_parallel' logic/code + int left, right; + pls::invoke_parallel( + [&] { left = fib(n - 1); }, + [&] { right = fib(n - 2); } + ); + return left + right; } int main() { - PROFILE_ENABLE - pls::scheduler scheduler{&my_scheduler_memory, 8}; - - long result; - scheduler.perform_work([&] { - PROFILE_MAIN_THREAD - // Call looks just the same, only requirement is - // the enclosure in the perform_work lambda. - for (int i = 0; i < 10; i++) { - result = fib(30); - std::cout << "Fib(30)=" << result << std::endl; - } - }); - - PROFILE_SAVE("test_profile.prof") + PROFILE_ENABLE + pls::scheduler scheduler{&my_scheduler_memory, 8}; + + long result; + scheduler.perform_work([&] { + PROFILE_MAIN_THREAD + // Call looks just the same, only requirement is + // the enclosure in the perform_work lambda. 
+ for (int i = 0; i < 10; i++) { + result = fib(30); + std::cout << "Fib(30)=" << result << std::endl; + } + }); + + PROFILE_SAVE("test_profile.prof") } diff --git a/app/playground/main.cpp b/app/playground/main.cpp index cc7e784..442d5c6 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -10,8 +10,9 @@ #include #include - int main() { - std::cout << pls::internal::scheduling::root_task::create_id().type_.hash_code() << std::endl; - std::cout << pls::internal::helpers::unique_id::create>().type_.hash_code() << std::endl; + std::cout << pls::internal::scheduling::root_task::create_id().type_.hash_code() << std::endl; + std::cout + << pls::internal::helpers::unique_id::create>().type_.hash_code() + << std::endl; } diff --git a/app/test_for_new/main.cpp b/app/test_for_new/main.cpp index 2e74529..51f454c 100644 --- a/app/test_for_new/main.cpp +++ b/app/test_for_new/main.cpp @@ -5,9 +5,8 @@ using namespace pls::internal::base; int global = 0; - int main() { - // Try to use every feature, to trigger the prohibited use of new if found somewhere - auto t1 = start_thread([] (){}); - t1.join(); + // Try to use every feature, to trigger the prohibited use of new if found somewhere + auto t1 = start_thread([]() {}); + t1.join(); } diff --git a/lib/pls/include/pls/algorithms/invoke_parallel.h b/lib/pls/include/pls/algorithms/invoke_parallel.h index dc44469..17b439e 100644 --- a/lib/pls/include/pls/algorithms/invoke_parallel.h +++ b/lib/pls/include/pls/algorithms/invoke_parallel.h @@ -6,15 +6,17 @@ #include "pls/internal/scheduling/scheduler.h" namespace pls { - namespace algorithm { - template - void invoke_parallel(const Function1& function1, const Function2& function2); +namespace algorithm { - template - void invoke_parallel(const Function1& function1, const Function2& function2, const Function3& function3); +template +void invoke_parallel(const Function1 &function1, const Function2 &function2); - // ...and so on, add more if we decide to keep this design - } +template +void invoke_parallel(const Function1 &function1, const Function2 &function2, const Function3 &function3); + +// ...and so on, add more if we decide to keep this design + +} } #include "invoke_parallel_impl.h" diff --git a/lib/pls/include/pls/algorithms/invoke_parallel_impl.h b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h index 7dfbef8..44cbc74 100644 --- a/lib/pls/include/pls/algorithms/invoke_parallel_impl.h +++ b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h @@ -7,65 +7,67 @@ #include "pls/internal/helpers/unique_id.h" namespace pls { - namespace algorithm { - namespace internal { - using namespace ::pls::internal::scheduling; +namespace algorithm { +namespace internal { - template - inline void run_body(const Body& internal_body, const abstract_task::id& id) { - // Make sure we are in the context of this invoke_parallel instance, - // if not we will spawn it as a new 'fork-join-style' task. 
- auto current_task = scheduler::current_task(); - if (current_task->unique_id() == id) { - auto current_sub_task = reinterpret_cast(current_task)->currently_executing(); - internal_body(current_sub_task); - } else { - fork_join_lambda root_body(&internal_body); - fork_join_task root_task{&root_body, id}; - scheduler::execute_task(root_task); - } - } - } +using namespace ::pls::internal::scheduling; - template - void invoke_parallel(const Function1& function1, const Function2& function2) { - using namespace ::pls::internal::scheduling; - using namespace ::pls::internal::helpers; - static abstract_task::id id = unique_id::create(); +template +inline void run_body(const Body &internal_body, const abstract_task::id &id) { + // Make sure we are in the context of this invoke_parallel instance, + // if not we will spawn it as a new 'fork-join-style' task. + auto current_task = scheduler::current_task(); + if (current_task->unique_id() == id) { + auto current_sub_task = reinterpret_cast(current_task)->currently_executing(); + internal_body(current_sub_task); + } else { + fork_join_lambda root_body(&internal_body); + fork_join_task root_task{&root_body, id}; + scheduler::execute_task(root_task); + } +} +} - auto internal_body = [&] (fork_join_sub_task* this_task){ - auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); }; - auto sub_task_1 = fork_join_lambda(&sub_task_body_1); +template +void invoke_parallel(const Function1 &function1, const Function2 &function2) { + using namespace ::pls::internal::scheduling; + using namespace ::pls::internal::helpers; + static abstract_task::id id = unique_id::create(); - this_task->spawn_child(sub_task_1); - function2(); // Execute last function 'inline' without spawning a sub_task object - this_task->wait_for_all(); - }; + auto internal_body = [&](fork_join_sub_task *this_task) { + auto sub_task_body_1 = [&](fork_join_sub_task *) { function1(); }; + auto sub_task_1 = fork_join_lambda(&sub_task_body_1); - internal::run_body(internal_body, id); - } + this_task->spawn_child(sub_task_1); + function2(); // Execute last function 'inline' without spawning a sub_task object + this_task->wait_for_all(); + }; + + internal::run_body(internal_body, id); +} - template - void invoke_parallel(const Function1& function1, const Function2& function2, const Function3& function3) { - using namespace ::pls::internal::scheduling; - using namespace ::pls::internal::helpers; - static abstract_task::id id = unique_id::create(); +template +void invoke_parallel(const Function1 &function1, const Function2 &function2, const Function3 &function3) { + using namespace ::pls::internal::scheduling; + using namespace ::pls::internal::helpers; + static abstract_task::id id = unique_id::create(); - auto internal_body = [&] (fork_join_sub_task* this_task){ - auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); }; - auto sub_task_1 = fork_join_lambda(&sub_task_body_1); - auto sub_task_body_2 = [&] (fork_join_sub_task*){ function2(); }; - auto sub_task_2 = fork_join_lambda(&sub_task_body_2); + auto internal_body = [&](fork_join_sub_task *this_task) { + auto sub_task_body_1 = [&](fork_join_sub_task *) { function1(); }; + auto sub_task_1 = fork_join_lambda(&sub_task_body_1); + auto sub_task_body_2 = [&](fork_join_sub_task *) { function2(); }; + auto sub_task_2 = fork_join_lambda(&sub_task_body_2); - this_task->spawn_child(sub_task_1); - this_task->spawn_child(sub_task_2); - function3(); // Execute last function 'inline' without spawning a sub_task object - 
this_task->wait_for_all(); - }; + this_task->spawn_child(sub_task_1); + this_task->spawn_child(sub_task_2); + function3(); // Execute last function 'inline' without spawning a sub_task object + this_task->wait_for_all(); + }; - internal::run_body(internal_body, id); - } - } + internal::run_body(internal_body, id); +} + +} } #endif //PLS_INVOKE_PARALLEL_IMPL_H diff --git a/lib/pls/include/pls/internal/base/alignment.h b/lib/pls/include/pls/internal/base/alignment.h index 2dfb474..a531c58 100644 --- a/lib/pls/include/pls/internal/base/alignment.h +++ b/lib/pls/include/pls/internal/base/alignment.h @@ -8,21 +8,23 @@ #include "system_details.h" namespace pls { - namespace internal { - namespace base { - namespace alignment { - template - struct aligned_wrapper { - alignas(system_details::CACHE_LINE_SIZE) unsigned char data[sizeof(T)]; - T* pointer() { return reinterpret_cast(data); } - }; - void* allocate_aligned(size_t size); - - std::uintptr_t next_alignment(std::uintptr_t size); - char* next_alignment(char* pointer); - } - } - } +namespace internal { +namespace base { +namespace alignment { + +template +struct aligned_wrapper { + alignas(system_details::CACHE_LINE_SIZE) unsigned char data[sizeof(T)]; + T *pointer() { return reinterpret_cast(data); } +}; +void *allocate_aligned(size_t size); + +std::uintptr_t next_alignment(std::uintptr_t size); +char *next_alignment(char *pointer); + +} +} +} } #endif //PLS_ALIGNMENT_H diff --git a/lib/pls/include/pls/internal/base/barrier.h b/lib/pls/include/pls/internal/base/barrier.h index 996f0e0..7f7653e 100644 --- a/lib/pls/include/pls/internal/base/barrier.h +++ b/lib/pls/include/pls/internal/base/barrier.h @@ -5,27 +5,29 @@ #include namespace pls { - namespace internal { - namespace base { - /** - * Provides standard barrier behaviour. - * `count` threads have to call `wait()` before any of the `wait()` calls returns, - * thus blocking all threads until everyone reached the barrier. - * - * PORTABILITY: - * Current implementation is based on pthreads. - */ - class barrier { - pthread_barrier_t barrier_; - - public: - explicit barrier(unsigned int count); - ~barrier(); - - void wait(); - }; - } - } +namespace internal { +namespace base { + +/** + * Provides standard barrier behaviour. + * `count` threads have to call `wait()` before any of the `wait()` calls returns, + * thus blocking all threads until everyone reached the barrier. + * + * PORTABILITY: + * Current implementation is based on pthreads. + */ +class barrier { + pthread_barrier_t barrier_; + + public: + explicit barrier(unsigned int count); + ~barrier(); + + void wait(); +}; + +} +} } #endif //PLS_BARRIER_H diff --git a/lib/pls/include/pls/internal/base/spin_lock.h b/lib/pls/include/pls/internal/base/spin_lock.h index 36f4bc3..2506145 100644 --- a/lib/pls/include/pls/internal/base/spin_lock.h +++ b/lib/pls/include/pls/internal/base/spin_lock.h @@ -6,12 +6,14 @@ #include "ttas_spin_lock.h" namespace pls { - namespace internal { - namespace base { - // Default Spin-Lock implementation for this project. - using spin_lock = tas_spin_lock; - } - } +namespace internal { +namespace base { + +// Default Spin-Lock implementation for this project. 
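The alias declared on the next line satisfies BasicLockable (lock()/unlock()), so it works with the standard RAII guard; a minimal usage sketch, with the guarded counter purely illustrative:

    #include <mutex>

    static pls::internal::base::spin_lock counter_lock;
    static int counter = 0;

    void increment_counter() {
      std::lock_guard<pls::internal::base::spin_lock> guard{counter_lock};  // unlocks on scope exit
      ++counter;
    }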
+using spin_lock = tas_spin_lock; + +} +} } #endif //PLS_SPINLOCK_H diff --git a/lib/pls/include/pls/internal/base/system_details.h b/lib/pls/include/pls/internal/base/system_details.h index 28f7dff..af2e71f 100644 --- a/lib/pls/include/pls/internal/base/system_details.h +++ b/lib/pls/include/pls/internal/base/system_details.h @@ -5,29 +5,31 @@ #include namespace pls { - namespace internal { - namespace base { - /** - * Collection of system details, e.g. hardware cache line size. - * - * PORTABILITY: - * Currently sane default values for x86. - */ - namespace system_details { - /** - * Most processors have 64 byte cache lines - */ - constexpr std::uintptr_t CACHE_LINE_SIZE = 64; +namespace internal { +namespace base { - /** - * Choose one of the following ways to store thread specific data. - * Try to choose the fastest available on this processor/system. - */ +/** + * Collection of system details, e.g. hardware cache line size. + * + * PORTABILITY: + * Currently sane default values for x86. + */ +namespace system_details { +/** + * Most processors have 64 byte cache lines + */ +constexpr std::uintptr_t CACHE_LINE_SIZE = 64; + +/** + * Choose one of the following ways to store thread specific data. + * Try to choose the fastest available on this processor/system. + */ // #define PLS_THREAD_SPECIFIC_PTHREAD - #define PLS_THREAD_SPECIFIC_COMPILER - } - } - } +#define PLS_THREAD_SPECIFIC_COMPILER + +} +} +} } #endif //PLS_SYSTEM_DETAILS_H diff --git a/lib/pls/include/pls/internal/base/tas_spin_lock.h b/lib/pls/include/pls/internal/base/tas_spin_lock.h index 17db757..d2c1465 100644 --- a/lib/pls/include/pls/internal/base/tas_spin_lock.h +++ b/lib/pls/include/pls/internal/base/tas_spin_lock.h @@ -10,30 +10,30 @@ #include "pls/internal/base/thread.h" namespace pls { - namespace internal { - namespace base { - /** - * A simple set and test_and_set based spin lock implementation. - * - * PORTABILITY: - * Current implementation is based on C++ 11 atomic_flag. - */ - class tas_spin_lock { - std::atomic_flag flag_; - unsigned int yield_at_tries_; - - - public: - tas_spin_lock(): flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{1024} {}; - tas_spin_lock(const tas_spin_lock& other): flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{other.yield_at_tries_} {} - - void lock(); - bool try_lock(unsigned int num_tries=1); - void unlock(); - }; - } - } -} +namespace internal { +namespace base { + +/** + * A simple set and test_and_set based spin lock implementation. + * + * PORTABILITY: + * Current implementation is based on C++ 11 atomic_flag. + */ +class tas_spin_lock { + std::atomic_flag flag_; + unsigned int yield_at_tries_; + + public: + tas_spin_lock() : flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{1024} {}; + tas_spin_lock(const tas_spin_lock &other) : flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{other.yield_at_tries_} {} + + void lock(); + bool try_lock(unsigned int num_tries = 1); + void unlock(); +}; +} +} +} #endif //PLS_TAS_SPIN_LOCK_H diff --git a/lib/pls/include/pls/internal/base/thread.h b/lib/pls/include/pls/internal/base/thread.h index c06b10b..48dc59c 100644 --- a/lib/pls/include/pls/internal/base/thread.h +++ b/lib/pls/include/pls/internal/base/thread.h @@ -13,109 +13,112 @@ #include "system_details.h" namespace pls { - namespace internal { - namespace base { - using thread_entrypoint = void(); - - /** - * Static methods than can be performed on the current thread. - * - * usage: - * this_thread::yield(); - * T* state = this_thread::state(); - * - * PORTABILITY: - * Current implementation is based on pthreads. 
- */ - class this_thread { - template - friend class thread; +namespace internal { +namespace base { + +using thread_entrypoint = void(); + +/** + * Static methods than can be performed on the current thread. + * + * usage: + * this_thread::yield(); + * T* state = this_thread::state(); + * + * PORTABILITY: + * Current implementation is based on pthreads. + */ +class this_thread { + template + friend + class thread; #ifdef PLS_THREAD_SPECIFIC_PTHREAD - static pthread_key_t local_storage_key_; - static bool local_storage_key_initialized_; + static pthread_key_t local_storage_key_; + static bool local_storage_key_initialized_; #endif #ifdef PLS_THREAD_SPECIFIC_COMPILER - static __thread void* local_state_; + static __thread void *local_state_; #endif - public: - static void yield() { - pthread_yield(); - } - - /** - * Retrieves the local state pointer. - * - * @tparam T The type of the state that is stored. - * @return The state pointer hold for this thread. - */ - template - static T* state(); - - /** - * Stores a pointer to the thread local state object. - * The memory management for this has to be done by the user, - * we only keep the pointer. - * - * @tparam T The type of the state that is stored. - * @param state_pointer A pointer to the threads state object. - */ - template - static void set_state(T* state_pointer); - }; - - /** - * Abstraction for starting a function in a separate thread. - * - * @tparam Function Lambda being started on the new thread. - * @tparam State State type held for this thread. - * - * usage: - * T* state; - * auto thread = start_thread([] { - * // Run on new thread - * }, state); - * thread.join(); // Wait for it to finish - * - * PORTABILITY: - * Current implementation is based on pthreads. - */ - template - class thread { - friend class this_thread; - // Keep a copy of the function (lambda) in this object to make sure it is valid when called! - Function function_; - State* state_pointer_; - - // Wee need to wait for the started function to read - // the function_ and state_pointer_ property before returning - // from the constructor, as the object might be moved after this. - std::atomic_flag* startup_flag_; - - // Keep handle to native implementation - pthread_t pthread_thread_; - - static void* start_pthread_internal(void* thread_pointer); - - public: - explicit thread(const Function& function, State* state_pointer); - - public: - void join(); - - // make object move only - thread(thread&&) noexcept = default; - thread& operator=(thread&&) noexcept = default; - - thread(const thread&) = delete; - thread& operator=(const thread&) = delete; - }; - - template - thread start_thread(const Function& function, State* state_pointer); - template - thread start_thread(const Function& function); - } - } + public: + static void yield() { + pthread_yield(); + } + + /** + * Retrieves the local state pointer. + * + * @tparam T The type of the state that is stored. + * @return The state pointer hold for this thread. + */ + template + static T *state(); + + /** + * Stores a pointer to the thread local state object. + * The memory management for this has to be done by the user, + * we only keep the pointer. + * + * @tparam T The type of the state that is stored. + * @param state_pointer A pointer to the threads state object. + */ + template + static void set_state(T *state_pointer); +}; + +/** + * Abstraction for starting a function in a separate thread. + * + * @tparam Function Lambda being started on the new thread. + * @tparam State State type held for this thread. 
+ * + * usage: + * T* state; + * auto thread = start_thread([] { + * // Run on new thread + * }, state); + * thread.join(); // Wait for it to finish + * + * PORTABILITY: + * Current implementation is based on pthreads. + */ +template +class thread { + friend class this_thread; + // Keep a copy of the function (lambda) in this object to make sure it is valid when called! + Function function_; + State *state_pointer_; + + // Wee need to wait for the started function to read + // the function_ and state_pointer_ property before returning + // from the constructor, as the object might be moved after this. + std::atomic_flag *startup_flag_; + + // Keep handle to native implementation + pthread_t pthread_thread_; + + static void *start_pthread_internal(void *thread_pointer); + + public: + explicit thread(const Function &function, State *state_pointer); + + public: + void join(); + + // make object move only + thread(thread &&) noexcept = default; + thread &operator=(thread &&) noexcept = default; + + thread(const thread &) = delete; + thread &operator=(const thread &) = delete; +}; + +template +thread start_thread(const Function &function, State *state_pointer); +template +thread start_thread(const Function &function); + +} +} } #include "thread_impl.h" diff --git a/lib/pls/include/pls/internal/base/thread_impl.h b/lib/pls/include/pls/internal/base/thread_impl.h index 1ac356a..64320b6 100644 --- a/lib/pls/include/pls/internal/base/thread_impl.h +++ b/lib/pls/include/pls/internal/base/thread_impl.h @@ -3,86 +3,87 @@ #define PLS_THREAD_IMPL_H namespace pls { - namespace internal { - namespace base { - template - T* this_thread::state() { +namespace internal { +namespace base { + +template +T *this_thread::state() { #ifdef PLS_THREAD_SPECIFIC_PTHREAD - return reinterpret_cast(pthread_getspecific(local_storage_key_)); + return reinterpret_cast(pthread_getspecific(local_storage_key_)); #endif #ifdef PLS_THREAD_SPECIFIC_COMPILER - return reinterpret_cast(local_state_); + return reinterpret_cast(local_state_); #endif - } +} - template - void this_thread::set_state(T* state_pointer) { +template +void this_thread::set_state(T *state_pointer) { #ifdef PLS_THREAD_SPECIFIC_PTHREAD - pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer); + pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer); #endif #ifdef PLS_THREAD_SPECIFIC_COMPILER - local_state_ = state_pointer; + local_state_ = state_pointer; #endif - } +} - template - void* thread::start_pthread_internal(void* thread_pointer) { - auto my_thread = reinterpret_cast(thread_pointer); - Function my_function_copy = my_thread->function_; - State* my_state_pointer_copy = my_thread->state_pointer_; +template +void *thread::start_pthread_internal(void *thread_pointer) { + auto my_thread = reinterpret_cast(thread_pointer); + Function my_function_copy = my_thread->function_; + State *my_state_pointer_copy = my_thread->state_pointer_; - // Now we have copies of everything we need on the stack. - // The original thread object can be moved freely (no more - // references to its memory location). - my_thread->startup_flag_->clear(); + // Now we have copies of everything we need on the stack. + // The original thread object can be moved freely (no more + // references to its memory location). 
+ my_thread->startup_flag_->clear(); - this_thread::set_state(my_state_pointer_copy); - my_function_copy(); + this_thread::set_state(my_state_pointer_copy); + my_function_copy(); - // Finished executing the user function - pthread_exit(nullptr); - } + // Finished executing the user function + pthread_exit(nullptr); +} - template - thread::thread(const Function& function, State* state_pointer): - function_{function}, - state_pointer_{state_pointer}, - startup_flag_{nullptr}, - pthread_thread_{} { +template +thread::thread(const Function &function, State *state_pointer): + function_{function}, + state_pointer_{state_pointer}, + startup_flag_{nullptr}, + pthread_thread_{} { #ifdef PLS_THREAD_SPECIFIC_PTHREAD - if (!this_thread::local_storage_key_initialized_) { - pthread_key_create(&this_thread::local_storage_key_, nullptr); - this_thread::local_storage_key_initialized_ = true; - } + if (!this_thread::local_storage_key_initialized_) { + pthread_key_create(&this_thread::local_storage_key_, nullptr); + this_thread::local_storage_key_initialized_ = true; + } #endif - // We only need this during startup, will be destroyed when out of scope - std::atomic_flag startup_flag{ATOMIC_FLAG_INIT}; - startup_flag_ = &startup_flag; - - startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return - pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *)(this)); - while (startup_flag.test_and_set()) - ; // Busy waiting for the starting flag to clear - } - - template - void thread::join() { - pthread_join(pthread_thread_, nullptr); - } - - template - thread start_thread(const Function& function, State* state_pointer) { - return thread(function, state_pointer); - } - - template - thread start_thread(const Function& function) { - return thread(function, nullptr); - } - } - } + // We only need this during startup, will be destroyed when out of scope + std::atomic_flag startup_flag{ATOMIC_FLAG_INIT}; + startup_flag_ = &startup_flag; + + startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return + pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *) (this)); + while (startup_flag.test_and_set()); // Busy waiting for the starting flag to clear +} + +template +void thread::join() { + pthread_join(pthread_thread_, nullptr); +} + +template +thread start_thread(const Function &function, State *state_pointer) { + return thread(function, state_pointer); +} + +template +thread start_thread(const Function &function) { + return thread(function, nullptr); +} + +} +} } #endif //PLS_THREAD_IMPL_H diff --git a/lib/pls/include/pls/internal/base/ttas_spin_lock.h b/lib/pls/include/pls/internal/base/ttas_spin_lock.h index 592c847..73160ba 100644 --- a/lib/pls/include/pls/internal/base/ttas_spin_lock.h +++ b/lib/pls/include/pls/internal/base/ttas_spin_lock.h @@ -8,30 +8,28 @@ #include "pls/internal/base/thread.h" namespace pls { - namespace internal { - namespace base { - /** - * A simple set and test_and_set based spin lock implementation. - * - * PORTABILITY: - * Current implementation is based on C++ 11 atomic_flag. 
- */ - class ttas_spin_lock { - std::atomic flag_; - const unsigned int yield_at_tries_; - - - public: - ttas_spin_lock(): flag_{0}, yield_at_tries_{1024} {}; - ttas_spin_lock(const ttas_spin_lock& other): flag_{0}, yield_at_tries_{other.yield_at_tries_} {} - - void lock(); - bool try_lock(unsigned int num_tries=1); - void unlock(); - }; - } - } +namespace internal { +namespace base { +/** + * A simple set and test_and_set based spin lock implementation. + * + * PORTABILITY: + * Current implementation is based on C++ 11 atomic_flag. + */ +class ttas_spin_lock { + std::atomic flag_; + const unsigned int yield_at_tries_; + + public: + ttas_spin_lock() : flag_{0}, yield_at_tries_{1024} {}; + ttas_spin_lock(const ttas_spin_lock &other) : flag_{0}, yield_at_tries_{other.yield_at_tries_} {} + + void lock(); + bool try_lock(unsigned int num_tries = 1); + void unlock(); +}; +} +} } - #endif //PLS_TTAS_SPIN_LOCK_H diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack.h b/lib/pls/include/pls/internal/data_structures/aligned_stack.h index 3bb3f72..dc46812 100644 --- a/lib/pls/include/pls/internal/data_structures/aligned_stack.h +++ b/lib/pls/include/pls/internal/data_structures/aligned_stack.h @@ -9,45 +9,48 @@ #include "pls/internal/base/alignment.h" namespace pls { - namespace internal { - namespace data_structures { - /** - * Generic stack-like data structure that allows to allocate arbitrary objects in a given memory region. - * The objects will be stored aligned in the stack, making the storage cache friendly and very fast - * (as long as one can live with the stack restrictions). - * - * IMPORTANT: Does not call destructors on stored objects! Do not allocate resources in the objects! - * - * Usage: - * aligned_stack stack{pointer_to_memory, size_of_memory}; - * T* pointer = stack.push(some_object); // Copy-Constrict the object on top of stack - * stack.pop(); // Deconstruct the top object of type T - */ - class aligned_stack { - // Keep bounds of our memory block - char* memory_start_; - char* memory_end_; - - // Current head will always be aligned to cache lines - char* head_; - public: - typedef char* state; - - aligned_stack(): memory_start_{nullptr}, memory_end_{nullptr}, head_{nullptr} {}; - aligned_stack(char* memory_region, std::size_t size); - - template - T* push(const T& object); - template - void* push(); - template - T pop(); - - state save_state() const { return head_; } - void reset_state(state new_state) { head_ = new_state; } - }; - } - } +namespace internal { +namespace data_structures { + +/** + * Generic stack-like data structure that allows to allocate arbitrary objects in a given memory region. + * The objects will be stored aligned in the stack, making the storage cache friendly and very fast + * (as long as one can live with the stack restrictions). + * + * IMPORTANT: Does not call destructors on stored objects! Do not allocate resources in the objects! 
+ * + * Usage: + * aligned_stack stack{pointer_to_memory, size_of_memory}; + * T* pointer = stack.push(some_object); // Copy-Constrict the object on top of stack + * stack.pop(); // Deconstruct the top object of type T + */ +class aligned_stack { + // Keep bounds of our memory block + char *memory_start_; + char *memory_end_; + + // Current head will always be aligned to cache lines + char *head_; + public: + typedef char *state; + + aligned_stack() : memory_start_{nullptr}, memory_end_{nullptr}, head_{nullptr} {}; + aligned_stack(char *memory_region, std::size_t size); + + template + T *push(const T &object); + template + void *push(); + template + T pop(); + + state save_state() const { return head_; } + void reset_state(state new_state) { head_ = new_state; } +}; + + +} +} } #include "aligned_stack_impl.h" diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h index 8a3a759..f034a4e 100644 --- a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h +++ b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h @@ -3,34 +3,36 @@ #define PLS_ALIGNED_STACK_IMPL_H namespace pls { - namespace internal { - namespace data_structures { - template - T* aligned_stack::push(const T& object) { - // Copy-Construct - return new ((void*)push())T(object); - } - - template - void* aligned_stack::push() { - void* result = reinterpret_cast(head_); - - // Move head to next aligned position after new object - head_ = base::alignment::next_alignment(head_ + sizeof(T)); - if (head_ >= memory_end_) { - PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!"); - } - - return result; - } - - template - T aligned_stack::pop() { - head_ = head_ - base::alignment::next_alignment(sizeof(T)); - return *reinterpret_cast(head_); - } - } - } +namespace internal { +namespace data_structures { + +template +T *aligned_stack::push(const T &object) { + // Copy-Construct + return new((void *) push < T > ())T(object); +} + +template +void *aligned_stack::push() { + void *result = reinterpret_cast(head_); + + // Move head to next aligned position after new object + head_ = base::alignment::next_alignment(head_ + sizeof(T)); + if (head_ >= memory_end_) { + PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!"); + } + + return result; +} + +template +T aligned_stack::pop() { + head_ = head_ - base::alignment::next_alignment(sizeof(T)); + return *reinterpret_cast(head_); +} + +} +} } #endif //PLS_ALIGNED_STACK_IMPL_H diff --git a/lib/pls/include/pls/internal/data_structures/deque.h b/lib/pls/include/pls/internal/data_structures/deque.h index 8652cc3..8f555da 100644 --- a/lib/pls/include/pls/internal/data_structures/deque.h +++ b/lib/pls/include/pls/internal/data_structures/deque.h @@ -5,56 +5,58 @@ #include "pls/internal/base/spin_lock.h" namespace pls { - namespace internal { - namespace data_structures { - /** - * Turns any object into deque item when inheriting from this. - */ - class deque_item { - friend class deque_internal; - - deque_item* prev_; - deque_item* next_; - - }; - - class deque_internal { - protected: - deque_item* head_; - deque_item* tail_; - - base::spin_lock lock_; - - deque_item* pop_head_internal(); - deque_item* pop_tail_internal(); - void push_tail_internal(deque_item *new_item); - }; - - /** - * A double linked list based deque. - * Storage is therefore only needed for the individual items. 
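A minimal sketch of what that means for callers: the element type inherits from deque_item, and the deque merely links objects whose storage the caller already owns (the struct and variable names below are illustrative only):

    struct my_item : public pls::internal::data_structures::deque_item {
      int payload;
    };

    pls::internal::data_structures::deque<my_item> work_items;
    my_item item{};                          // storage stays with the caller, e.g. on an aligned_stack
    work_items.push_tail(&item);
    my_item* taken = work_items.pop_tail();  // pop_head() takes from the opposite end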
- * - * @tparam Item The type of items stored in this deque - */ - template - class deque: deque_internal { - public: - explicit deque(): deque_internal{} {} - - inline Item* pop_head() { - return static_cast(pop_head_internal()); - } - - inline Item* pop_tail() { - return static_cast(pop_tail_internal()); - } - - inline void push_tail(Item* new_item) { - push_tail_internal(new_item); - } - }; - } - } +namespace internal { +namespace data_structures { + +/** + * Turns any object into deque item when inheriting from this. + */ +class deque_item { + friend class deque_internal; + + deque_item *prev_; + deque_item *next_; + +}; + +class deque_internal { + protected: + deque_item *head_; + deque_item *tail_; + + base::spin_lock lock_; + + deque_item *pop_head_internal(); + deque_item *pop_tail_internal(); + void push_tail_internal(deque_item *new_item); +}; + +/** + * A double linked list based deque. + * Storage is therefore only needed for the individual items. + * + * @tparam Item The type of items stored in this deque + */ +template +class deque : deque_internal { + public: + explicit deque() : deque_internal{} {} + + inline Item *pop_head() { + return static_cast(pop_head_internal()); + } + + inline Item *pop_tail() { + return static_cast(pop_tail_internal()); + } + + inline void push_tail(Item *new_item) { + push_tail_internal(new_item); + } +}; + +} +} } #endif //PLS_DEQUE_H diff --git a/lib/pls/include/pls/internal/helpers/mini_benchmark.h b/lib/pls/include/pls/internal/helpers/mini_benchmark.h index dac7237..d5bcce4 100644 --- a/lib/pls/include/pls/internal/helpers/mini_benchmark.h +++ b/lib/pls/include/pls/internal/helpers/mini_benchmark.h @@ -9,45 +9,47 @@ #include namespace pls { - namespace internal { - namespace helpers { - // TODO: Clean up (separate into small functions and .cpp file) - template - void run_mini_benchmark(const Function& lambda, size_t max_threads, unsigned long max_runtime_ms=1000) { - using namespace std; - using namespace pls::internal::scheduling; - - malloc_scheduler_memory scheduler_memory{max_threads}; - for (unsigned int num_threads = 1; num_threads <= max_threads; num_threads++) { - scheduler local_scheduler{&scheduler_memory, num_threads}; - - chrono::high_resolution_clock::time_point start_time; - chrono::high_resolution_clock::time_point end_time; - unsigned long iterations = 0; - local_scheduler.perform_work([&] { - start_time = chrono::high_resolution_clock::now(); - end_time = start_time; - chrono::high_resolution_clock::time_point planned_end_time = start_time + chrono::milliseconds(max_runtime_ms); - - while (end_time < planned_end_time) { - lambda(); - end_time = chrono::high_resolution_clock::now(); - iterations++; - } - }); - - long time = chrono::duration_cast(end_time - start_time).count(); - double time_per_iteration = (double)time / iterations; - - std::cout << time_per_iteration; - if (num_threads < max_threads) { - std::cout << ","; - } - } - std::cout << std::endl; - } - } +namespace internal { +namespace helpers { + +// TODO: Clean up (separate into small functions and .cpp file) +template +void run_mini_benchmark(const Function &lambda, size_t max_threads, unsigned long max_runtime_ms = 1000) { + using namespace std; + using namespace pls::internal::scheduling; + + malloc_scheduler_memory scheduler_memory{max_threads}; + for (unsigned int num_threads = 1; num_threads <= max_threads; num_threads++) { + scheduler local_scheduler{&scheduler_memory, num_threads}; + + chrono::high_resolution_clock::time_point start_time; + 
chrono::high_resolution_clock::time_point end_time; + unsigned long iterations = 0; + local_scheduler.perform_work([&] { + start_time = chrono::high_resolution_clock::now(); + end_time = start_time; + chrono::high_resolution_clock::time_point planned_end_time = start_time + chrono::milliseconds(max_runtime_ms); + + while (end_time < planned_end_time) { + lambda(); + end_time = chrono::high_resolution_clock::now(); + iterations++; + } + }); + + long time = chrono::duration_cast(end_time - start_time).count(); + double time_per_iteration = (double) time / iterations; + + std::cout << time_per_iteration; + if (num_threads < max_threads) { + std::cout << ","; } + } + std::cout << std::endl; +} + +} +} } #endif //PLS_MINI_BENCHMARK_H diff --git a/lib/pls/include/pls/internal/helpers/prohibit_new.h b/lib/pls/include/pls/internal/helpers/prohibit_new.h index 4ea59cc..d55b34e 100644 --- a/lib/pls/include/pls/internal/helpers/prohibit_new.h +++ b/lib/pls/include/pls/internal/helpers/prohibit_new.h @@ -15,9 +15,9 @@ #ifdef NEW_LINK_ERROR // This will cause a linker error if new is used in the code. // We also exit if it is somehow still called. -inline void * operator new (std::size_t) { - extern int bare_new_erroneously_called(); - exit(bare_new_erroneously_called() | 1); +inline void *operator new(std::size_t) { + extern int bare_new_erroneously_called(); + exit(bare_new_erroneously_called() | 1); } #else // Use this + debugging point to find out where we use a new diff --git a/lib/pls/include/pls/internal/helpers/unique_id.h b/lib/pls/include/pls/internal/helpers/unique_id.h index 918021c..358d17e 100644 --- a/lib/pls/include/pls/internal/helpers/unique_id.h +++ b/lib/pls/include/pls/internal/helpers/unique_id.h @@ -7,25 +7,27 @@ #include namespace pls { - namespace internal { - namespace helpers { - struct unique_id { - const uint32_t id_; - const std::type_info& type_; - bool operator==(const unique_id& other) const { return id_ == other.id_ && type_ == other.type_; } +namespace internal { +namespace helpers { - static constexpr unique_id create(const uint32_t id) { - return unique_id(id, typeid(void)); - } - template - static constexpr unique_id create() { - return unique_id(UINT32_MAX, typeid(std::tuple)); - } - private: - explicit constexpr unique_id(const uint32_t id, const std::type_info& type): id_{id}, type_{type} {}; - }; - } - } +struct unique_id { + const uint32_t id_; + const std::type_info &type_; + bool operator==(const unique_id &other) const { return id_ == other.id_ && type_ == other.type_; } + + static constexpr unique_id create(const uint32_t id) { + return unique_id(id, typeid(void)); + } + template + static constexpr unique_id create() { + return unique_id(UINT32_MAX, typeid(std::tuple)); + } + private: + explicit constexpr unique_id(const uint32_t id, const std::type_info &type) : id_{id}, type_{type} {}; +}; + +} +} } #endif //PLS_UNIQUE_ID_H diff --git a/lib/pls/include/pls/internal/scheduling/abstract_task.h b/lib/pls/include/pls/internal/scheduling/abstract_task.h index c239811..45e5490 100644 --- a/lib/pls/include/pls/internal/scheduling/abstract_task.h +++ b/lib/pls/include/pls/internal/scheduling/abstract_task.h @@ -6,38 +6,40 @@ #include "pls/internal/helpers/unique_id.h" namespace pls { - namespace internal { - namespace scheduling { - class abstract_task { - public: - using id = helpers::unique_id; - - private: - unsigned int depth_; - abstract_task::id unique_id_; - abstract_task* child_task_; - - public: - abstract_task(const unsigned int depth, const 
abstract_task::id& unique_id): - depth_{depth}, - unique_id_{unique_id}, - child_task_{nullptr} {} - - virtual void execute() = 0; - void set_child(abstract_task* child_task) { child_task_ = child_task; } - abstract_task* child() { return child_task_; } - - void set_depth(unsigned int depth) { depth_ = depth; } - unsigned int depth() const { return depth_; } - id unique_id() const { return unique_id_; } - protected: - virtual bool internal_stealing(abstract_task* other_task) = 0; - virtual bool split_task(base::spin_lock* lock) = 0; - - bool steal_work(); - }; - } - } +namespace internal { +namespace scheduling { + +class abstract_task { + public: + using id = helpers::unique_id; + + private: + unsigned int depth_; + abstract_task::id unique_id_; + abstract_task *child_task_; + + public: + abstract_task(const unsigned int depth, const abstract_task::id &unique_id) : + depth_{depth}, + unique_id_{unique_id}, + child_task_{nullptr} {} + + virtual void execute() = 0; + void set_child(abstract_task *child_task) { child_task_ = child_task; } + abstract_task *child() { return child_task_; } + + void set_depth(unsigned int depth) { depth_ = depth; } + unsigned int depth() const { return depth_; } + id unique_id() const { return unique_id_; } + protected: + virtual bool internal_stealing(abstract_task *other_task) = 0; + virtual bool split_task(base::spin_lock *lock) = 0; + + bool steal_work(); +}; + +} +} } #endif //PLS_ABSTRACT_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/fork_join_task.h b/lib/pls/include/pls/internal/scheduling/fork_join_task.h index b7b0da3..ffa98a5 100644 --- a/lib/pls/include/pls/internal/scheduling/fork_join_task.h +++ b/lib/pls/include/pls/internal/scheduling/fork_join_task.h @@ -11,87 +11,89 @@ #include "thread_state.h" namespace pls { - namespace internal { - namespace scheduling { - class fork_join_task; - class fork_join_sub_task: public data_structures::deque_item { - friend class fork_join_task; - - // Coordinate finishing of sub_tasks - std::atomic_uint32_t ref_count_; - fork_join_sub_task* parent_; - - // Access to TBB scheduling environment - fork_join_task* tbb_task_; - - // Stack Management (reset stack pointer after wait_for_all() calls) - data_structures::aligned_stack::state stack_state_; - protected: - explicit fork_join_sub_task(); - fork_join_sub_task(const fork_join_sub_task& other); - - // Overwritten with behaviour of child tasks - virtual void execute_internal() = 0; - - public: - // Only use them when actually executing this sub_task (only public for simpler API design) - template - void spawn_child(const T& sub_task); - void wait_for_all(); - - private: - void spawn_child_internal(fork_join_sub_task* sub_task); - void execute(); - }; - - template - class fork_join_lambda: public fork_join_sub_task { - const Function* function_; - - public: - explicit fork_join_lambda(const Function* function): function_{function} {}; - - protected: - void execute_internal() override { - (*function_)(this); - } - }; - - class fork_join_task: public abstract_task { - friend class fork_join_sub_task; - - fork_join_sub_task* root_task_; - fork_join_sub_task* currently_executing_; - data_structures::aligned_stack* my_stack_; - - // Double-Ended Queue management - data_structures::deque deque_; - - // Steal Management - fork_join_sub_task* last_stolen_; - - fork_join_sub_task* get_local_sub_task(); - fork_join_sub_task* get_stolen_sub_task(); - - bool internal_stealing(abstract_task* other_task) override; - bool split_task(base::spin_lock* /*lock*/) 
override; - - public: - explicit fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id); - void execute() override; - fork_join_sub_task* currently_executing() const; - }; - - template - void fork_join_sub_task::spawn_child(const T& task) { - PROFILE_FORK_JOIN_STEALING("spawn_child") - static_assert(std::is_base_of::value, "Only pass fork_join_sub_task subclasses!"); - - T* new_task = tbb_task_->my_stack_->push(task); - spawn_child_internal(new_task); - } - } - } +namespace internal { +namespace scheduling { + +class fork_join_task; +class fork_join_sub_task : public data_structures::deque_item { + friend class fork_join_task; + + // Coordinate finishing of sub_tasks + std::atomic_uint32_t ref_count_; + fork_join_sub_task *parent_; + + // Access to TBB scheduling environment + fork_join_task *tbb_task_; + + // Stack Management (reset stack pointer after wait_for_all() calls) + data_structures::aligned_stack::state stack_state_; + protected: + explicit fork_join_sub_task(); + fork_join_sub_task(const fork_join_sub_task &other); + + // Overwritten with behaviour of child tasks + virtual void execute_internal() = 0; + + public: + // Only use them when actually executing this sub_task (only public for simpler API design) + template + void spawn_child(const T &sub_task); + void wait_for_all(); + + private: + void spawn_child_internal(fork_join_sub_task *sub_task); + void execute(); +}; + +template +class fork_join_lambda : public fork_join_sub_task { + const Function *function_; + + public: + explicit fork_join_lambda(const Function *function) : function_{function} {}; + + protected: + void execute_internal() override { + (*function_)(this); + } +}; + +class fork_join_task : public abstract_task { + friend class fork_join_sub_task; + + fork_join_sub_task *root_task_; + fork_join_sub_task *currently_executing_; + data_structures::aligned_stack *my_stack_; + + // Double-Ended Queue management + data_structures::deque deque_; + + // Steal Management + fork_join_sub_task *last_stolen_; + + fork_join_sub_task *get_local_sub_task(); + fork_join_sub_task *get_stolen_sub_task(); + + bool internal_stealing(abstract_task *other_task) override; + bool split_task(base::spin_lock * /*lock*/) override; + + public: + explicit fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id); + void execute() override; + fork_join_sub_task *currently_executing() const; +}; + +template +void fork_join_sub_task::spawn_child(const T &task) { + PROFILE_FORK_JOIN_STEALING("spawn_child") + static_assert(std::is_base_of::value, "Only pass fork_join_sub_task subclasses!"); + + T *new_task = tbb_task_->my_stack_->push(task); + spawn_child_internal(new_task); +} + +} +} } #endif //PLS_TBB_LIKE_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/root_task.h b/lib/pls/include/pls/internal/scheduling/root_task.h index 5b2c3cb..eff93e5 100644 --- a/lib/pls/include/pls/internal/scheduling/root_task.h +++ b/lib/pls/include/pls/internal/scheduling/root_task.h @@ -10,71 +10,73 @@ #include "abstract_task.h" namespace pls { - namespace internal { - namespace scheduling { - template - class root_task : public abstract_task { - Function function_; - std::atomic_uint8_t finished_; - public: - static constexpr auto create_id = helpers::unique_id::create>; - - explicit root_task(Function function): - abstract_task{0, create_id()}, - function_{function}, - finished_{0} {} - root_task(const root_task& other): - abstract_task{0, create_id()}, - function_{other.function_}, - finished_{0} {} - - bool 
finished() { - return finished_; - } - - void execute() override { - PROFILE_WORK_BLOCK("execute root_task"); - function_(); - finished_ = 1; - } - - bool internal_stealing(abstract_task* /*other_task*/) override { - return false; - } - - bool split_task(base::spin_lock* /*lock*/) override { - return false; - } - }; - - template - class root_worker_task : public abstract_task { - root_task* master_task_; - - public: - static constexpr auto create_id = root_task::create_id; - - explicit root_worker_task(root_task* master_task): - abstract_task{0, create_id()}, - master_task_{master_task} {} - - void execute() override { - PROFILE_WORK_BLOCK("execute root_task"); - do { - steal_work(); - } while (!master_task_->finished()); - } - - bool internal_stealing(abstract_task* /*other_task*/) override { - return false; - } - - bool split_task(base::spin_lock* /*lock*/) override { - return false; - } - }; - } - } +namespace internal { +namespace scheduling { + +template +class root_task : public abstract_task { + Function function_; + std::atomic_uint8_t finished_; + public: + static constexpr auto create_id = helpers::unique_id::create>; + + explicit root_task(Function function) : + abstract_task{0, create_id()}, + function_{function}, + finished_{0} {} + root_task(const root_task &other) : + abstract_task{0, create_id()}, + function_{other.function_}, + finished_{0} {} + + bool finished() { + return finished_; + } + + void execute() override { + PROFILE_WORK_BLOCK("execute root_task"); + function_(); + finished_ = 1; + } + + bool internal_stealing(abstract_task * /*other_task*/) override { + return false; + } + + bool split_task(base::spin_lock * /*lock*/) override { + return false; + } +}; + +template +class root_worker_task : public abstract_task { + root_task *master_task_; + + public: + static constexpr auto create_id = root_task::create_id; + + explicit root_worker_task(root_task *master_task) : + abstract_task{0, create_id()}, + master_task_{master_task} {} + + void execute() override { + PROFILE_WORK_BLOCK("execute root_task"); + do { + steal_work(); + } while (!master_task_->finished()); + } + + bool internal_stealing(abstract_task * /*other_task*/) override { + return false; + } + + bool split_task(base::spin_lock * /*lock*/) override { + return false; + } +}; + +} +} } #endif //PLS_ROOT_MASTER_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h b/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h index 06539e1..0a708ff 100644 --- a/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h +++ b/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h @@ -12,107 +12,110 @@ #include "scheduler.h" namespace pls { - namespace internal { - namespace scheduling { - template - class run_on_n_threads_task : public abstract_task { - template - friend class run_on_n_threads_task_worker; - - Function function_; - - // Improvement: Remove lock and replace by atomic variable (performance) - int counter; - base::spin_lock counter_lock_; - - int decrement_counter() { - std::lock_guard lock{counter_lock_}; - counter--; - return counter; - } - - int get_counter() { - std::lock_guard lock{counter_lock_}; - return counter; - } - public: - static constexpr auto create_id = helpers::unique_id::create>; - - run_on_n_threads_task(Function function, int num_threads): - abstract_task{0, create_id()}, - function_{function}, - counter{num_threads - 1} {} - - void execute() override { - // Execute our function ONCE - function_(); - - // Steal until we are 
finished (other threads executed) - do { - steal_work(); - } while (get_counter() > 0); - - std::cout << "Finished Master!" << std::endl; - } - - bool internal_stealing(abstract_task* /*other_task*/) override { - return false; - } - - bool split_task(base::spin_lock* lock) override; - }; - - template - class run_on_n_threads_task_worker : public abstract_task { - Function function_; - run_on_n_threads_task* root_; - public: - static constexpr auto create_id = helpers::unique_id::create>; - - run_on_n_threads_task_worker(Function function, run_on_n_threads_task* root): - abstract_task{0, create_id()}, - function_{function}, - root_{root} {} - - void execute() override { - if (root_->decrement_counter() >= 0) { - function_(); - std::cout << "Finished Worker!" << std::endl; - } else { - std::cout << "Abandoned Worker!" << std::endl; - } - } - - bool internal_stealing(abstract_task* /*other_task*/) override { - return false; - } - - bool split_task(base::spin_lock* /*lock*/) override { - return false; - } - }; - - template - bool run_on_n_threads_task::split_task(base::spin_lock* lock) { - if (get_counter() <= 0) { - return false; - } - // In success case, unlock. - // TODO: this locking is complicated and error prone. - lock->unlock(); - - auto scheduler = base::this_thread::state()->scheduler_; - auto task = run_on_n_threads_task_worker{function_, this}; - scheduler->execute_task(task, depth()); - return true; - } - - template - run_on_n_threads_task create_run_on_n_threads_task(Function function, int num_threads) { - return run_on_n_threads_task{function, num_threads}; - } - } +namespace internal { +namespace scheduling { + +template +class run_on_n_threads_task : public abstract_task { + template + friend + class run_on_n_threads_task_worker; + + Function function_; + + // Improvement: Remove lock and replace by atomic variable (performance) + int counter; + base::spin_lock counter_lock_; + + int decrement_counter() { + std::lock_guard lock{counter_lock_}; + counter--; + return counter; + } + + int get_counter() { + std::lock_guard lock{counter_lock_}; + return counter; + } + public: + static constexpr auto create_id = helpers::unique_id::create>; + + run_on_n_threads_task(Function function, int num_threads) : + abstract_task{0, create_id()}, + function_{function}, + counter{num_threads - 1} {} + + void execute() override { + // Execute our function ONCE + function_(); + + // Steal until we are finished (other threads executed) + do { + steal_work(); + } while (get_counter() > 0); + + std::cout << "Finished Master!" << std::endl; + } + + bool internal_stealing(abstract_task * /*other_task*/) override { + return false; + } + + bool split_task(base::spin_lock *lock) override; +}; + +template +class run_on_n_threads_task_worker : public abstract_task { + Function function_; + run_on_n_threads_task *root_; + public: + static constexpr auto create_id = helpers::unique_id::create>; + + run_on_n_threads_task_worker(Function function, run_on_n_threads_task *root) : + abstract_task{0, create_id()}, + function_{function}, + root_{root} {} + + void execute() override { + if (root_->decrement_counter() >= 0) { + function_(); + std::cout << "Finished Worker!" << std::endl; + } else { + std::cout << "Abandoned Worker!" 
<< std::endl; } + } + + bool internal_stealing(abstract_task * /*other_task*/) override { + return false; + } + + bool split_task(base::spin_lock * /*lock*/) override { + return false; + } +}; + +template +bool run_on_n_threads_task::split_task(base::spin_lock *lock) { + if (get_counter() <= 0) { + return false; + } + // In success case, unlock. + // TODO: this locking is complicated and error prone. + lock->unlock(); + + auto scheduler = base::this_thread::state()->scheduler_; + auto task = run_on_n_threads_task_worker{function_, this}; + scheduler->execute_task(task, depth()); + return true; +} + +template +run_on_n_threads_task create_run_on_n_threads_task(Function function, int num_threads) { + return run_on_n_threads_task{function, num_threads}; +} + +} +} } #endif //PLS_RUN_ON_N_THREADS_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h index 4f01e89..583262a 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler.h @@ -17,50 +17,52 @@ #include "scheduler_memory.h" namespace pls { - namespace internal { - namespace scheduling { - void worker_routine(); - using scheduler_thread = base::thread; - - class scheduler { - friend void worker_routine(); - - const unsigned int num_threads_; - scheduler_memory* memory_; - - base::barrier sync_barrier_; - bool terminated_; - public: - explicit scheduler(scheduler_memory* memory, unsigned int num_threads); - ~scheduler(); - - /** - * Wakes up the thread pool. - * Code inside the Function lambda can invoke all parallel APIs. - * - * @param work_section generic function or lambda to be executed in the scheduler's context. - */ - template - void perform_work(Function work_section); - - /** - * Executes a top-level-task (children of abstract_task) on this thread. - * - * @param task The task to be executed. - * @param depth Optional: depth of the new task, otherwise set implicitly. - */ - template - static void execute_task(Task& task, int depth=-1); - - static abstract_task* current_task() { return base::this_thread::state()->current_task_; } - - void terminate(bool wait_for_workers=true); - - unsigned int num_threads() const { return num_threads_; } - thread_state* thread_state_for(size_t id) { return memory_->thread_state_for(id); } - }; - } - } +namespace internal { +namespace scheduling { + +void worker_routine(); +using scheduler_thread = base::thread; + +class scheduler { + friend void worker_routine(); + + const unsigned int num_threads_; + scheduler_memory *memory_; + + base::barrier sync_barrier_; + bool terminated_; + public: + explicit scheduler(scheduler_memory *memory, unsigned int num_threads); + ~scheduler(); + + /** + * Wakes up the thread pool. + * Code inside the Function lambda can invoke all parallel APIs. + * + * @param work_section generic function or lambda to be executed in the scheduler's context. + */ + template + void perform_work(Function work_section); + + /** + * Executes a top-level-task (children of abstract_task) on this thread. + * + * @param task The task to be executed. + * @param depth Optional: depth of the new task, otherwise set implicitly. 
+ */ + template + static void execute_task(Task &task, int depth = -1); + + static abstract_task *current_task() { return base::this_thread::state()->current_task_; } + + void terminate(bool wait_for_workers = true); + + unsigned int num_threads() const { return num_threads_; } + thread_state *thread_state_for(size_t id) { return memory_->thread_state_for(id); } +}; + +} +} } #include "scheduler_impl.h" diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index 869a5e3..fcfc1dd 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -3,70 +3,72 @@ #define PLS_SCHEDULER_IMPL_H namespace pls { - namespace internal { - namespace scheduling { - template - void scheduler::perform_work(Function work_section) { - PROFILE_WORK_BLOCK("scheduler::perform_work") - root_task master{work_section}; - - // Push root task on stacks - auto new_master = memory_->task_stack_for(0)->push(master); - memory_->thread_state_for(0)->root_task_ = new_master; - memory_->thread_state_for(0)->current_task_ = new_master; - for (unsigned int i = 1; i < num_threads_; i++) { - root_worker_task worker{new_master}; - auto new_worker = memory_->task_stack_for(0)->push(worker); - memory_->thread_state_for(i)->root_task_ = new_worker; - memory_->thread_state_for(i)->current_task_ = new_worker; - } - - // Perform and wait for work - sync_barrier_.wait(); // Trigger threads to wake up - sync_barrier_.wait(); // Wait for threads to finish - - // Clean up stack - memory_->task_stack_for(0)->pop(); - for (unsigned int i = 1; i < num_threads_; i++) { - root_worker_task worker{new_master}; - memory_->task_stack_for(0)->pop(); - } - } - - template - void scheduler::execute_task(Task& task, int depth) { - static_assert(std::is_base_of::value, "Only pass abstract_task subclasses!"); - - auto my_state = base::this_thread::state(); - abstract_task* old_task; - abstract_task* new_task; - - // Init Task - { - std::lock_guard lock{my_state->lock_}; - old_task = my_state->current_task_; - new_task = my_state->task_stack_->push(task); - - new_task->set_depth(depth >= 0 ? 
depth : old_task->depth() + 1); - my_state->current_task_ = new_task; - old_task->set_child(new_task); - } - - // Run Task - new_task->execute(); - - // Teardown state back to before the task was executed - { - std::lock_guard lock{my_state->lock_}; - - old_task->set_child(nullptr); - my_state->current_task_ = old_task; - - my_state->task_stack_->pop(); - } - } - } - } +namespace internal { +namespace scheduling { + +template +void scheduler::perform_work(Function work_section) { + PROFILE_WORK_BLOCK("scheduler::perform_work") + root_task master{work_section}; + + // Push root task on stacks + auto new_master = memory_->task_stack_for(0)->push(master); + memory_->thread_state_for(0)->root_task_ = new_master; + memory_->thread_state_for(0)->current_task_ = new_master; + for (unsigned int i = 1; i < num_threads_; i++) { + root_worker_task worker{new_master}; + auto new_worker = memory_->task_stack_for(0)->push(worker); + memory_->thread_state_for(i)->root_task_ = new_worker; + memory_->thread_state_for(i)->current_task_ = new_worker; + } + + // Perform and wait for work + sync_barrier_.wait(); // Trigger threads to wake up + sync_barrier_.wait(); // Wait for threads to finish + + // Clean up stack + memory_->task_stack_for(0)->pop(); + for (unsigned int i = 1; i < num_threads_; i++) { + root_worker_task worker{new_master}; + memory_->task_stack_for(0)->pop(); + } +} + +template +void scheduler::execute_task(Task &task, int depth) { + static_assert(std::is_base_of::value, "Only pass abstract_task subclasses!"); + + auto my_state = base::this_thread::state(); + abstract_task *old_task; + abstract_task *new_task; + + // Init Task + { + std::lock_guard lock{my_state->lock_}; + old_task = my_state->current_task_; + new_task = my_state->task_stack_->push(task); + + new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1); + my_state->current_task_ = new_task; + old_task->set_child(new_task); + } + + // Run Task + new_task->execute(); + + // Teardown state back to before the task was executed + { + std::lock_guard lock{my_state->lock_}; + + old_task->set_child(nullptr); + my_state->current_task_ = old_task; + + my_state->task_stack_->pop(); + } +} + +} +} } #endif //PLS_SCHEDULER_IMPL_H diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h index 25d898f..602736a 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h @@ -7,72 +7,75 @@ #define PLS_SCHEDULER_MEMORY_H namespace pls { - namespace internal { - namespace scheduling { - void worker_routine(); - using scheduler_thread = base::thread; +namespace internal { +namespace scheduling { - class scheduler_memory { - public: - virtual size_t max_threads() const = 0; - virtual thread_state* thread_state_for(size_t id) = 0; - virtual scheduler_thread* thread_for(size_t id) = 0; - virtual data_structures::aligned_stack* task_stack_for(size_t id) = 0; - }; +void worker_routine(); +using scheduler_thread = base::thread; - template - class static_scheduler_memory: public scheduler_memory { - // Everyone of these types has to live on its own cache line, - // as each thread uses one of them independently. - // Therefore it would be a major performance hit if we shared cache lines on these. 
- using aligned_thread = base::alignment::aligned_wrapper; - using aligned_thread_state = base::alignment::aligned_wrapper; - using aligned_thread_stack = base::alignment::aligned_wrapper>; - using aligned_aligned_stack = base::alignment::aligned_wrapper; +class scheduler_memory { + public: + virtual size_t max_threads() const = 0; + virtual thread_state *thread_state_for(size_t id) = 0; + virtual scheduler_thread *thread_for(size_t id) = 0; + virtual data_structures::aligned_stack *task_stack_for(size_t id) = 0; +}; - std::array threads_; - std::array thread_states_; - std::array task_stacks_memory_; - std::array task_stacks_; +template +class static_scheduler_memory : public scheduler_memory { + // Everyone of these types has to live on its own cache line, + // as each thread uses one of them independently. + // Therefore it would be a major performance hit if we shared cache lines on these. + using aligned_thread = base::alignment::aligned_wrapper; + using aligned_thread_state = base::alignment::aligned_wrapper; + using aligned_thread_stack = base::alignment::aligned_wrapper>; + using aligned_aligned_stack = base::alignment::aligned_wrapper; - public: - static_scheduler_memory() { - for (size_t i = 0; i < MAX_THREADS; i++) { - new ((void*)task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i].pointer()->data(), TASK_STACK_SIZE); - } - } + std::array threads_; + std::array thread_states_; + std::array task_stacks_memory_; + std::array task_stacks_; - size_t max_threads() const override { return MAX_THREADS; } - thread_state* thread_state_for(size_t id) override { return thread_states_[id].pointer(); } - scheduler_thread* thread_for(size_t id) override { return threads_[id].pointer(); } - data_structures::aligned_stack* task_stack_for(size_t id) override { return task_stacks_[id].pointer(); } - }; + public: + static_scheduler_memory() { + for (size_t i = 0; i < MAX_THREADS; i++) { + new((void *) task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i].pointer()->data(), + TASK_STACK_SIZE); + } + } - class malloc_scheduler_memory: public scheduler_memory { - // Everyone of these types has to live on its own cache line, - // as each thread uses one of them independently. - // Therefore it would be a major performance hit if we shared cache lines on these. - using aligned_thread = base::alignment::aligned_wrapper; - using aligned_thread_state = base::alignment::aligned_wrapper; - using aligned_aligned_stack = base::alignment::aligned_wrapper; + size_t max_threads() const override { return MAX_THREADS; } + thread_state *thread_state_for(size_t id) override { return thread_states_[id].pointer(); } + scheduler_thread *thread_for(size_t id) override { return threads_[id].pointer(); } + data_structures::aligned_stack *task_stack_for(size_t id) override { return task_stacks_[id].pointer(); } +}; - const size_t num_threads_; +class malloc_scheduler_memory : public scheduler_memory { + // Everyone of these types has to live on its own cache line, + // as each thread uses one of them independently. + // Therefore it would be a major performance hit if we shared cache lines on these. 
+ using aligned_thread = base::alignment::aligned_wrapper; + using aligned_thread_state = base::alignment::aligned_wrapper; + using aligned_aligned_stack = base::alignment::aligned_wrapper; - aligned_thread* threads_; - aligned_thread_state * thread_states_; - char** task_stacks_memory_; - aligned_aligned_stack * task_stacks_; - public: - explicit malloc_scheduler_memory(size_t num_threads, size_t memory_per_stack = 2 << 16); - ~malloc_scheduler_memory(); + const size_t num_threads_; - size_t max_threads() const override { return num_threads_; } - thread_state* thread_state_for(size_t id) override { return thread_states_[id].pointer(); } - scheduler_thread* thread_for(size_t id) override { return threads_[id].pointer(); } - data_structures::aligned_stack* task_stack_for(size_t id) override { return task_stacks_[id].pointer(); } - }; - } - } + aligned_thread *threads_; + aligned_thread_state *thread_states_; + char **task_stacks_memory_; + aligned_aligned_stack *task_stacks_; + public: + explicit malloc_scheduler_memory(size_t num_threads, size_t memory_per_stack = 2 << 16); + ~malloc_scheduler_memory(); + + size_t max_threads() const override { return num_threads_; } + thread_state *thread_state_for(size_t id) override { return thread_states_[id].pointer(); } + scheduler_thread *thread_for(size_t id) override { return threads_[id].pointer(); } + data_structures::aligned_stack *task_stack_for(size_t id) override { return task_stacks_[id].pointer(); } +}; + +} +} } #endif //PLS_SCHEDULER_MEMORY_H diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h index ee864db..22154f8 100644 --- a/lib/pls/include/pls/internal/scheduling/thread_state.h +++ b/lib/pls/include/pls/internal/scheduling/thread_state.h @@ -8,38 +8,40 @@ #include "abstract_task.h" namespace pls { - namespace internal { - namespace scheduling { - // forward declaration - class scheduler; - - struct thread_state { - scheduler* scheduler_; - abstract_task* root_task_; - abstract_task* current_task_; - data_structures::aligned_stack* task_stack_; - size_t id_; - base::spin_lock lock_; - std::minstd_rand random_; - - thread_state(): - scheduler_{nullptr}, - root_task_{nullptr}, - current_task_{nullptr}, - task_stack_{nullptr}, - id_{0}, - random_{id_} {}; - - thread_state(scheduler* scheduler, data_structures::aligned_stack* task_stack, unsigned int id): - scheduler_{scheduler}, - root_task_{nullptr}, - current_task_{nullptr}, - task_stack_{task_stack}, - id_{id}, - random_{id_} {} - }; - } - } +namespace internal { +namespace scheduling { + +// forward declaration +class scheduler; + +struct thread_state { + scheduler *scheduler_; + abstract_task *root_task_; + abstract_task *current_task_; + data_structures::aligned_stack *task_stack_; + size_t id_; + base::spin_lock lock_; + std::minstd_rand random_; + + thread_state() : + scheduler_{nullptr}, + root_task_{nullptr}, + current_task_{nullptr}, + task_stack_{nullptr}, + id_{0}, + random_{id_} {}; + + thread_state(scheduler *scheduler, data_structures::aligned_stack *task_stack, unsigned int id) : + scheduler_{scheduler}, + root_task_{nullptr}, + current_task_{nullptr}, + task_stack_{task_stack}, + id_{id}, + random_{id_} {} +}; + +} +} } #endif //PLS_THREAD_STATE_H diff --git a/lib/pls/include/pls/pls.h b/lib/pls/include/pls/pls.h index 35cb545..b75317b 100644 --- a/lib/pls/include/pls/pls.h +++ b/lib/pls/include/pls/pls.h @@ -8,18 +8,20 @@ #include "pls/internal/helpers/unique_id.h" namespace pls { - using 
internal::scheduling::static_scheduler_memory; - using internal::scheduling::malloc_scheduler_memory; - using internal::scheduling::scheduler; - using task_id = internal::scheduling::abstract_task::id; +using internal::scheduling::static_scheduler_memory; +using internal::scheduling::malloc_scheduler_memory; - using unique_id = internal::helpers::unique_id; +using internal::scheduling::scheduler; +using task_id = internal::scheduling::abstract_task::id; - using internal::scheduling::fork_join_sub_task; - using internal::scheduling::fork_join_task; +using unique_id = internal::helpers::unique_id; + +using internal::scheduling::fork_join_sub_task; +using internal::scheduling::fork_join_task; + +using algorithm::invoke_parallel; - using algorithm::invoke_parallel; } #endif diff --git a/lib/pls/src/internal/base/alignment.cpp b/lib/pls/src/internal/base/alignment.cpp index af95adb..d41364b 100644 --- a/lib/pls/src/internal/base/alignment.cpp +++ b/lib/pls/src/internal/base/alignment.cpp @@ -2,26 +2,28 @@ #include "pls/internal/base/system_details.h" namespace pls { - namespace internal { - namespace base { - namespace alignment { - void* allocate_aligned(size_t size) { - return aligned_alloc(system_details::CACHE_LINE_SIZE, size); - } +namespace internal { +namespace base { +namespace alignment { - std::uintptr_t next_alignment(std::uintptr_t size) { - std::uintptr_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE; - if (miss_alignment == 0) { - return size; - } else { - return size + (base::system_details::CACHE_LINE_SIZE - miss_alignment); - } - } +void *allocate_aligned(size_t size) { + return aligned_alloc(system_details::CACHE_LINE_SIZE, size); +} + +std::uintptr_t next_alignment(std::uintptr_t size) { + std::uintptr_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE; + if (miss_alignment == 0) { + return size; + } else { + return size + (base::system_details::CACHE_LINE_SIZE - miss_alignment); + } +} + +char *next_alignment(char *pointer) { + return reinterpret_cast(next_alignment(reinterpret_cast(pointer))); +} - char* next_alignment(char* pointer) { - return reinterpret_cast(next_alignment(reinterpret_cast(pointer))); - } - } - } - } +} +} +} } diff --git a/lib/pls/src/internal/base/barrier.cpp b/lib/pls/src/internal/base/barrier.cpp index a2893be..0d78ad5 100644 --- a/lib/pls/src/internal/base/barrier.cpp +++ b/lib/pls/src/internal/base/barrier.cpp @@ -1,19 +1,21 @@ #include "pls/internal/base/barrier.h" namespace pls { - namespace internal { - namespace base { - barrier::barrier(const unsigned int count): barrier_{} { - pthread_barrier_init(&barrier_, nullptr, count); - } +namespace internal { +namespace base { - barrier::~barrier() { - pthread_barrier_destroy(&barrier_); - } +barrier::barrier(const unsigned int count) : barrier_{} { + pthread_barrier_init(&barrier_, nullptr, count); +} + +barrier::~barrier() { + pthread_barrier_destroy(&barrier_); +} - void barrier::wait() { - pthread_barrier_wait(&barrier_); - } - } - } +void barrier::wait() { + pthread_barrier_wait(&barrier_); +} + +} +} } diff --git a/lib/pls/src/internal/base/tas_spin_lock.cpp b/lib/pls/src/internal/base/tas_spin_lock.cpp index e6f9d1e..b35d805 100644 --- a/lib/pls/src/internal/base/tas_spin_lock.cpp +++ b/lib/pls/src/internal/base/tas_spin_lock.cpp @@ -2,33 +2,35 @@ #include "pls/internal/base/tas_spin_lock.h" namespace pls { - namespace internal { - namespace base { - void tas_spin_lock::lock() { - PROFILE_LOCK("Acquire Lock") - int tries = 0; - while 
(flag_.test_and_set(std::memory_order_acquire)) { - tries++; - if (tries % yield_at_tries_ == 0) { - this_thread::yield(); - } - } - } +namespace internal { +namespace base { - bool tas_spin_lock::try_lock(unsigned int num_tries) { - PROFILE_LOCK("Try Acquire Lock") - while (flag_.test_and_set(std::memory_order_acquire)) { - num_tries--; - if (num_tries <= 0) { - return false; - } - } - return true; - } +void tas_spin_lock::lock() { + PROFILE_LOCK("Acquire Lock") + int tries = 0; + while (flag_.test_and_set(std::memory_order_acquire)) { + tries++; + if (tries % yield_at_tries_ == 0) { + this_thread::yield(); + } + } +} - void tas_spin_lock::unlock() { - flag_.clear(std::memory_order_release); - } - } +bool tas_spin_lock::try_lock(unsigned int num_tries) { + PROFILE_LOCK("Try Acquire Lock") + while (flag_.test_and_set(std::memory_order_acquire)) { + num_tries--; + if (num_tries <= 0) { + return false; } + } + return true; +} + +void tas_spin_lock::unlock() { + flag_.clear(std::memory_order_release); +} + +} +} } diff --git a/lib/pls/src/internal/base/thread.cpp b/lib/pls/src/internal/base/thread.cpp index 57ebd21..8c47060 100644 --- a/lib/pls/src/internal/base/thread.cpp +++ b/lib/pls/src/internal/base/thread.cpp @@ -1,16 +1,18 @@ #include "pls/internal/base/thread.h" namespace pls { - namespace internal { - namespace base { +namespace internal { +namespace base { + #ifdef PLS_THREAD_SPECIFIC_PTHREAD - pthread_key_t this_thread::local_storage_key_ = false; - bool this_thread::local_storage_key_initialized_; +pthread_key_t this_thread::local_storage_key_ = false; +bool this_thread::local_storage_key_initialized_; #endif #ifdef PLS_THREAD_SPECIFIC_COMPILER - __thread void* this_thread::local_state_; +__thread void *this_thread::local_state_; #endif - // implementation in header (C++ templating) - } - } +// implementation in header (C++ templating) + +} +} } diff --git a/lib/pls/src/internal/base/ttas_spin_lock.cpp b/lib/pls/src/internal/base/ttas_spin_lock.cpp index e2c8bff..ba1d3ac 100644 --- a/lib/pls/src/internal/base/ttas_spin_lock.cpp +++ b/lib/pls/src/internal/base/ttas_spin_lock.cpp @@ -2,46 +2,48 @@ #include "pls/internal/base/ttas_spin_lock.h" namespace pls { - namespace internal { - namespace base { - void ttas_spin_lock::lock() { - PROFILE_LOCK("Acquire Lock") - int tries = 0; - int expected = 0; - - do { - while (flag_.load(std::memory_order_relaxed) == 1) { - tries++; - if (tries % yield_at_tries_ == 0) { - this_thread::yield(); - } - } - - expected = 0; - } while (!flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire)); - } - - bool ttas_spin_lock::try_lock(unsigned int num_tries) { - PROFILE_LOCK("Try Acquire Lock") - int expected = 0; - - do { - while (flag_.load(std::memory_order_relaxed) == 1) { - num_tries--; - if (num_tries <= 0) { - return false; - } - } - - expected = 0; - } while (!flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire)); - - return true; - } - - void ttas_spin_lock::unlock() { - flag_.store(0, std::memory_order_release); - } - } +namespace internal { +namespace base { + +void ttas_spin_lock::lock() { + PROFILE_LOCK("Acquire Lock") + int tries = 0; + int expected = 0; + + do { + while (flag_.load(std::memory_order_relaxed) == 1) { + tries++; + if (tries % yield_at_tries_ == 0) { + this_thread::yield(); + } } + + expected = 0; + } while (!flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire)); +} + +bool ttas_spin_lock::try_lock(unsigned int num_tries) { + PROFILE_LOCK("Try Acquire Lock") + int expected = 0; + + do 
{ + while (flag_.load(std::memory_order_relaxed) == 1) { + num_tries--; + if (num_tries <= 0) { + return false; + } + } + + expected = 0; + } while (!flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire)); + + return true; +} + +void ttas_spin_lock::unlock() { + flag_.store(0, std::memory_order_release); +} + +} +} } diff --git a/lib/pls/src/internal/data_structures/aligned_stack.cpp b/lib/pls/src/internal/data_structures/aligned_stack.cpp index 2a4d6d9..c91c3ec 100644 --- a/lib/pls/src/internal/data_structures/aligned_stack.cpp +++ b/lib/pls/src/internal/data_structures/aligned_stack.cpp @@ -2,12 +2,14 @@ #include "pls/internal/base/system_details.h" namespace pls { - namespace internal { - namespace data_structures { - aligned_stack::aligned_stack(char* memory_region, const std::size_t size): - memory_start_{memory_region}, - memory_end_{memory_region + size}, - head_{base::alignment::next_alignment(memory_start_)} {} - } - } +namespace internal { +namespace data_structures { + +aligned_stack::aligned_stack(char *memory_region, const std::size_t size) : + memory_start_{memory_region}, + memory_end_{memory_region + size}, + head_{base::alignment::next_alignment(memory_start_)} {} + +} +} } diff --git a/lib/pls/src/internal/data_structures/deque.cpp b/lib/pls/src/internal/data_structures/deque.cpp index 786e04b..017a590 100644 --- a/lib/pls/src/internal/data_structures/deque.cpp +++ b/lib/pls/src/internal/data_structures/deque.cpp @@ -3,56 +3,58 @@ #include "pls/internal/data_structures/deque.h" namespace pls { - namespace internal { - namespace data_structures { - deque_item* deque_internal::pop_head_internal() { - std::lock_guard lock{lock_}; - - if (head_ == nullptr) { - return nullptr; - } - - deque_item* result = head_; - head_ = head_->prev_; - if (head_ == nullptr) { - tail_ = nullptr; - } else { - head_->next_ = nullptr; - } - - return result; - } - - deque_item* deque_internal::pop_tail_internal() { - std::lock_guard lock{lock_}; - - if (tail_ == nullptr) { - return nullptr; - } - - deque_item* result = tail_; - tail_ = tail_->next_; - if (tail_ == nullptr) { - head_ = nullptr; - } else { - tail_->prev_ = nullptr; - } - - return result; - } - - void deque_internal::push_tail_internal(deque_item *new_item) { - std::lock_guard lock{lock_}; - - if (tail_ != nullptr) { - tail_->prev_ = new_item; - } else { - head_ = new_item; - } - new_item->next_ = tail_; - new_item->prev_ = nullptr; - tail_ = new_item; - } - } - } +namespace internal { +namespace data_structures { + +deque_item *deque_internal::pop_head_internal() { + std::lock_guard lock{lock_}; + + if (head_ == nullptr) { + return nullptr; + } + + deque_item *result = head_; + head_ = head_->prev_; + if (head_ == nullptr) { + tail_ = nullptr; + } else { + head_->next_ = nullptr; + } + + return result; +} + +deque_item *deque_internal::pop_tail_internal() { + std::lock_guard lock{lock_}; + + if (tail_ == nullptr) { + return nullptr; + } + + deque_item *result = tail_; + tail_ = tail_->next_; + if (tail_ == nullptr) { + head_ = nullptr; + } else { + tail_->prev_ = nullptr; + } + + return result; +} + +void deque_internal::push_tail_internal(deque_item *new_item) { + std::lock_guard lock{lock_}; + + if (tail_ != nullptr) { + tail_->prev_ = new_item; + } else { + head_ = new_item; + } + new_item->next_ = tail_; + new_item->prev_ = nullptr; + tail_ = new_item; +} + +} +} } diff --git a/lib/pls/src/internal/scheduling/abstract_task.cpp b/lib/pls/src/internal/scheduling/abstract_task.cpp index 1b6a21f..50a7003 100644 --- 
a/lib/pls/src/internal/scheduling/abstract_task.cpp +++ b/lib/pls/src/internal/scheduling/abstract_task.cpp @@ -5,72 +5,74 @@ #include "pls/internal/scheduling/scheduler.h" namespace pls { - namespace internal { - namespace scheduling { - bool abstract_task::steal_work() { - PROFILE_STEALING("abstract_task::steal_work") - const auto my_state = base::this_thread::state(); - const auto my_scheduler = my_state->scheduler_; +namespace internal { +namespace scheduling { - const size_t my_id = my_state->id_; - const size_t offset = my_state->random_() % my_scheduler->num_threads(); - const size_t max_tries = 1; // my_scheduler->num_threads(); TODO: Tune this value - for (size_t i = 0; i < max_tries; i++) { - size_t target = (offset + i) % my_scheduler->num_threads(); - if (target == my_id) { - continue; - } - auto target_state = my_scheduler->thread_state_for(target); +bool abstract_task::steal_work() { + PROFILE_STEALING("abstract_task::steal_work") + const auto my_state = base::this_thread::state(); + const auto my_scheduler = my_state->scheduler_; - // TODO: Cleaner Locking Using std::guarded_lock - target_state->lock_.lock(); + const size_t my_id = my_state->id_; + const size_t offset = my_state->random_() % my_scheduler->num_threads(); + const size_t max_tries = 1; // my_scheduler->num_threads(); TODO: Tune this value + for (size_t i = 0; i < max_tries; i++) { + size_t target = (offset + i) % my_scheduler->num_threads(); + if (target == my_id) { + continue; + } + auto target_state = my_scheduler->thread_state_for(target); - // Dig down to our level - PROFILE_STEALING("Go to our level") - abstract_task* current_task = target_state->root_task_; - while (current_task != nullptr && current_task->depth() < depth()) { - current_task = current_task->child_task_; - } - PROFILE_END_BLOCK + // TODO: Cleaner Locking Using std::guarded_lock + target_state->lock_.lock(); - // Try to steal 'internal', e.g. for_join_sub_tasks in a fork_join_task constellation - PROFILE_STEALING("Internal Steal") - if (current_task != nullptr) { - // See if it equals our type and depth of task - if (current_task->unique_id_ == unique_id_ && - current_task->depth_ == depth_) { - if (internal_stealing(current_task)) { - // internal steal was a success, hand it back to the internal scheduler - target_state->lock_.unlock(); - return true; - } + // Dig down to our level + PROFILE_STEALING("Go to our level") + abstract_task *current_task = target_state->root_task_; + while (current_task != nullptr && current_task->depth() < depth()) { + current_task = current_task->child_task_; + } + PROFILE_END_BLOCK - // No success, we need to steal work from a deeper level using 'top level task stealing' - current_task = current_task->child_task_; - } - } - PROFILE_END_BLOCK; + // Try to steal 'internal', e.g. for_join_sub_tasks in a fork_join_task constellation + PROFILE_STEALING("Internal Steal") + if (current_task != nullptr) { + // See if it equals our type and depth of task + if (current_task->unique_id_ == unique_id_ && + current_task->depth_ == depth_) { + if (internal_stealing(current_task)) { + // internal steal was a success, hand it back to the internal scheduler + target_state->lock_.unlock(); + return true; + } + // No success, we need to steal work from a deeper level using 'top level task stealing' + current_task = current_task->child_task_; + } + } + PROFILE_END_BLOCK; - // Execute 'top level task steal' if possible - // (only try deeper tasks to keep depth restricted stealing). 
- PROFILE_STEALING("Top Level Steal") - while (current_task != nullptr) { - auto lock = &target_state->lock_; - if (current_task->split_task(lock)) { - // internal steal was no success (we did a top level task steal) - return false; - } - current_task = current_task->child_task_; - } - PROFILE_END_BLOCK; - target_state->lock_.unlock(); - } + // Execute 'top level task steal' if possible + // (only try deeper tasks to keep depth restricted stealing). + PROFILE_STEALING("Top Level Steal") + while (current_task != nullptr) { + auto lock = &target_state->lock_; + if (current_task->split_task(lock)) { + // internal steal was no success (we did a top level task steal) + return false; + } - // internal steal was no success - return false; - }; - } + current_task = current_task->child_task_; } + PROFILE_END_BLOCK; + target_state->lock_.unlock(); + } + + // internal steal was no success + return false; +} + +} +} } diff --git a/lib/pls/src/internal/scheduling/fork_join_task.cpp b/lib/pls/src/internal/scheduling/fork_join_task.cpp index 0c5f51f..ed30ee9 100644 --- a/lib/pls/src/internal/scheduling/fork_join_task.cpp +++ b/lib/pls/src/internal/scheduling/fork_join_task.cpp @@ -4,131 +4,133 @@ #include "pls/internal/scheduling/fork_join_task.h" namespace pls { - namespace internal { - namespace scheduling { - fork_join_sub_task::fork_join_sub_task(): - data_structures::deque_item{}, - ref_count_{0}, - parent_{nullptr}, - tbb_task_{nullptr}, - stack_state_{nullptr} {} - - fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task& other): - data_structures::deque_item(other), - ref_count_{0}, - parent_{nullptr}, - tbb_task_{nullptr}, - stack_state_{nullptr} {} - - void fork_join_sub_task::execute() { - PROFILE_WORK_BLOCK("execute sub_task") - tbb_task_->currently_executing_ = this; - execute_internal(); - tbb_task_->currently_executing_ = nullptr; - PROFILE_END_BLOCK - wait_for_all(); - - if (parent_ != nullptr) { - parent_->ref_count_--; - } - } - - void fork_join_sub_task::spawn_child_internal(fork_join_sub_task* sub_task) { - // Keep our refcount up to date - ref_count_++; - - // Assign forced values - sub_task->parent_ = this; - sub_task->tbb_task_ = tbb_task_; - sub_task->stack_state_ = tbb_task_->my_stack_->save_state(); - - tbb_task_->deque_.push_tail(sub_task); - } - - void fork_join_sub_task::wait_for_all() { - while (ref_count_ > 0) { - PROFILE_STEALING("get local sub task") - fork_join_sub_task* local_task = tbb_task_->get_local_sub_task(); - PROFILE_END_BLOCK - if (local_task != nullptr) { - local_task->execute(); - } else { - // Try to steal work. 
- // External steal will be executed implicitly if success - PROFILE_STEALING("steal work") - bool internal_steal_success = tbb_task_->steal_work(); - PROFILE_END_BLOCK - if (internal_steal_success) { - tbb_task_->last_stolen_->execute(); - } - } - } - tbb_task_->my_stack_->reset_state(stack_state_); - } - - fork_join_sub_task* fork_join_task::get_local_sub_task() { - return deque_.pop_tail(); - } - - fork_join_sub_task* fork_join_task::get_stolen_sub_task() { - return deque_.pop_head(); - } - - bool fork_join_task::internal_stealing(abstract_task* other_task) { - PROFILE_STEALING("fork_join_task::internal_stealin") - auto cast_other_task = reinterpret_cast(other_task); - - auto stolen_sub_task = cast_other_task->get_stolen_sub_task(); - if (stolen_sub_task == nullptr) { - return false; - } else { - // Make sub-task belong to our fork_join_task instance - stolen_sub_task->tbb_task_ = this; - stolen_sub_task->stack_state_ = my_stack_->save_state(); - // We will execute this next without explicitly moving it onto our stack storage - last_stolen_ = stolen_sub_task; - - return true; - } - } - - bool fork_join_task::split_task(base::spin_lock* lock) { - PROFILE_STEALING("fork_join_task::split_task") - fork_join_sub_task* stolen_sub_task = get_stolen_sub_task(); - if (stolen_sub_task == nullptr) { - return false; - } - fork_join_task task{stolen_sub_task, this->unique_id()}; - - // In success case, unlock. - // TODO: this locking is complicated and error prone. - lock->unlock(); - - scheduler::execute_task(task, depth()); - return true; - } - - void fork_join_task::execute() { - PROFILE_WORK_BLOCK("execute fork_join_task"); - - // Bind this instance to our OS thread - my_stack_ = base::this_thread::state()->task_stack_; - root_task_->tbb_task_ = this; - root_task_->stack_state_ = my_stack_->save_state(); - - // Execute it on our OS thread until its finished - root_task_->execute(); - } - - fork_join_sub_task* fork_join_task::currently_executing() const { return currently_executing_; } - - fork_join_task::fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id): - abstract_task{0, id}, - root_task_{root_task}, - currently_executing_{nullptr}, - my_stack_{nullptr}, - deque_{}, - last_stolen_{nullptr} {}; - } +namespace internal { +namespace scheduling { + +fork_join_sub_task::fork_join_sub_task() : + data_structures::deque_item{}, + ref_count_{0}, + parent_{nullptr}, + tbb_task_{nullptr}, + stack_state_{nullptr} {} + +fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task &other) : + data_structures::deque_item(other), + ref_count_{0}, + parent_{nullptr}, + tbb_task_{nullptr}, + stack_state_{nullptr} {} + +void fork_join_sub_task::execute() { + PROFILE_WORK_BLOCK("execute sub_task") + tbb_task_->currently_executing_ = this; + execute_internal(); + tbb_task_->currently_executing_ = nullptr; + PROFILE_END_BLOCK + wait_for_all(); + + if (parent_ != nullptr) { + parent_->ref_count_--; + } +} + +void fork_join_sub_task::spawn_child_internal(fork_join_sub_task *sub_task) { + // Keep our refcount up to date + ref_count_++; + + // Assign forced values + sub_task->parent_ = this; + sub_task->tbb_task_ = tbb_task_; + sub_task->stack_state_ = tbb_task_->my_stack_->save_state(); + + tbb_task_->deque_.push_tail(sub_task); +} + +void fork_join_sub_task::wait_for_all() { + while (ref_count_ > 0) { + PROFILE_STEALING("get local sub task") + fork_join_sub_task *local_task = tbb_task_->get_local_sub_task(); + PROFILE_END_BLOCK + if (local_task != nullptr) { + local_task->execute(); + } 
else { + // Try to steal work. + // External steal will be executed implicitly if success + PROFILE_STEALING("steal work") + bool internal_steal_success = tbb_task_->steal_work(); + PROFILE_END_BLOCK + if (internal_steal_success) { + tbb_task_->last_stolen_->execute(); + } } + } + tbb_task_->my_stack_->reset_state(stack_state_); +} + +fork_join_sub_task *fork_join_task::get_local_sub_task() { + return deque_.pop_tail(); +} + +fork_join_sub_task *fork_join_task::get_stolen_sub_task() { + return deque_.pop_head(); +} + +bool fork_join_task::internal_stealing(abstract_task *other_task) { + PROFILE_STEALING("fork_join_task::internal_stealin") + auto cast_other_task = reinterpret_cast(other_task); + + auto stolen_sub_task = cast_other_task->get_stolen_sub_task(); + if (stolen_sub_task == nullptr) { + return false; + } else { + // Make sub-task belong to our fork_join_task instance + stolen_sub_task->tbb_task_ = this; + stolen_sub_task->stack_state_ = my_stack_->save_state(); + // We will execute this next without explicitly moving it onto our stack storage + last_stolen_ = stolen_sub_task; + + return true; + } +} + +bool fork_join_task::split_task(base::spin_lock *lock) { + PROFILE_STEALING("fork_join_task::split_task") + fork_join_sub_task *stolen_sub_task = get_stolen_sub_task(); + if (stolen_sub_task == nullptr) { + return false; + } + fork_join_task task{stolen_sub_task, this->unique_id()}; + + // In success case, unlock. + // TODO: this locking is complicated and error prone. + lock->unlock(); + + scheduler::execute_task(task, depth()); + return true; +} + +void fork_join_task::execute() { + PROFILE_WORK_BLOCK("execute fork_join_task"); + + // Bind this instance to our OS thread + my_stack_ = base::this_thread::state()->task_stack_; + root_task_->tbb_task_ = this; + root_task_->stack_state_ = my_stack_->save_state(); + + // Execute it on our OS thread until its finished + root_task_->execute(); +} + +fork_join_sub_task *fork_join_task::currently_executing() const { return currently_executing_; } + +fork_join_task::fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id) : + abstract_task{0, id}, + root_task_{root_task}, + currently_executing_{nullptr}, + my_stack_{nullptr}, + deque_{}, + last_stolen_{nullptr} {} + +} +} } diff --git a/lib/pls/src/internal/scheduling/root_task.cpp b/lib/pls/src/internal/scheduling/root_task.cpp index ec1b68b..7e3ef89 100644 --- a/lib/pls/src/internal/scheduling/root_task.cpp +++ b/lib/pls/src/internal/scheduling/root_task.cpp @@ -1,9 +1,9 @@ #include "pls/internal/scheduling/root_task.h" namespace pls { - namespace internal { - namespace scheduling { +namespace internal { +namespace scheduling { - } - } +} +} } diff --git a/lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp b/lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp index e41571f..178afbb 100644 --- a/lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp +++ b/lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp @@ -1,9 +1,9 @@ #include "pls/internal/scheduling/run_on_n_threads_task.h" namespace pls { - namespace internal { - namespace scheduling { +namespace internal { +namespace scheduling { - } - } +} +} } diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp index dd06768..9491a06 100644 --- a/lib/pls/src/internal/scheduling/scheduler.cpp +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -2,60 +2,63 @@ #include "pls/internal/base/error_handling.h" namespace pls { - namespace internal { - 
namespace scheduling { - scheduler::scheduler(scheduler_memory* memory, const unsigned int num_threads): - num_threads_{num_threads}, - memory_{memory}, - sync_barrier_{num_threads + 1}, - terminated_{false} { - if (num_threads_ > memory_->max_threads()) { - PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory."); - } - - for (unsigned int i = 0; i < num_threads_; i++) { - // Placement new is required, as the memory of `memory_` is not required to be initialized. - new((void*)memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i}; - new ((void*)memory_->thread_for(i))base::thread(&worker_routine, memory_->thread_state_for(i)); - } - } - - scheduler::~scheduler() { - terminate(); - } - - void worker_routine() { - auto my_state = base::this_thread::state(); - - while (true) { - my_state->scheduler_->sync_barrier_.wait(); - if (my_state->scheduler_->terminated_) { - return; - } - - // The root task must only return when all work is done, - // because of this a simple call is enough to ensure the - // fork-join-section is done (logically joined back into our main thread). - my_state->root_task_->execute(); - - my_state->scheduler_->sync_barrier_.wait(); - } - } - - void scheduler::terminate(bool wait_for_workers) { - if (terminated_) { - return; - } - - terminated_ = true; - sync_barrier_.wait(); - - if (wait_for_workers) { - for (unsigned int i = 0; i < num_threads_; i++) { - memory_->thread_for(i)->join(); - } - } - } - } +namespace internal { +namespace scheduling { + +scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads) : + num_threads_{num_threads}, + memory_{memory}, + sync_barrier_{num_threads + 1}, + terminated_{false} { + if (num_threads_ > memory_->max_threads()) { + PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory."); + } + + for (unsigned int i = 0; i < num_threads_; i++) { + // Placement new is required, as the memory of `memory_` is not required to be initialized. + new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i}; + new((void *) memory_->thread_for(i))base::thread(&worker_routine, + memory_->thread_state_for(i)); + } +} + +scheduler::~scheduler() { + terminate(); +} + +void worker_routine() { + auto my_state = base::this_thread::state(); + + while (true) { + my_state->scheduler_->sync_barrier_.wait(); + if (my_state->scheduler_->terminated_) { + return; + } + + // The root task must only return when all work is done, + // because of this a simple call is enough to ensure the + // fork-join-section is done (logically joined back into our main thread). 
+ my_state->root_task_->execute(); + + my_state->scheduler_->sync_barrier_.wait(); + } +} + +void scheduler::terminate(bool wait_for_workers) { + if (terminated_) { + return; + } + + terminated_ = true; + sync_barrier_.wait(); + + if (wait_for_workers) { + for (unsigned int i = 0; i < num_threads_; i++) { + memory_->thread_for(i)->join(); } + } +} + +} +} } diff --git a/lib/pls/src/internal/scheduling/scheduler_memory.cpp b/lib/pls/src/internal/scheduling/scheduler_memory.cpp index 8a65002..7d46744 100644 --- a/lib/pls/src/internal/scheduling/scheduler_memory.cpp +++ b/lib/pls/src/internal/scheduling/scheduler_memory.cpp @@ -1,31 +1,36 @@ #include "pls/internal/scheduling/scheduler_memory.h" namespace pls { - namespace internal { - namespace scheduling { - malloc_scheduler_memory::malloc_scheduler_memory(const size_t num_threads, const size_t memory_per_stack): - num_threads_{num_threads} { - threads_ = reinterpret_cast(base::alignment::allocate_aligned(num_threads * sizeof(aligned_thread))); - thread_states_ = reinterpret_cast(base::alignment::allocate_aligned(num_threads * sizeof(aligned_thread_state))); +namespace internal { +namespace scheduling { - task_stacks_ = reinterpret_cast(base::alignment::allocate_aligned(num_threads * sizeof(aligned_aligned_stack))); - task_stacks_memory_ = reinterpret_cast(base::alignment::allocate_aligned(num_threads * sizeof(char*))); - for (size_t i = 0; i < num_threads_; i++) { - task_stacks_memory_[i] = reinterpret_cast(base::alignment::allocate_aligned(memory_per_stack)); - new ((void*)task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i], memory_per_stack); - } - } +malloc_scheduler_memory::malloc_scheduler_memory(const size_t num_threads, const size_t memory_per_stack) : + num_threads_{num_threads} { + threads_ = + reinterpret_cast(base::alignment::allocate_aligned(num_threads * sizeof(aligned_thread))); + thread_states_ = reinterpret_cast(base::alignment::allocate_aligned( + num_threads * sizeof(aligned_thread_state))); - malloc_scheduler_memory::~malloc_scheduler_memory() { - free(threads_); - free(thread_states_); + task_stacks_ = reinterpret_cast(base::alignment::allocate_aligned( + num_threads * sizeof(aligned_aligned_stack))); + task_stacks_memory_ = reinterpret_cast(base::alignment::allocate_aligned(num_threads * sizeof(char *))); + for (size_t i = 0; i < num_threads_; i++) { + task_stacks_memory_[i] = reinterpret_cast(base::alignment::allocate_aligned(memory_per_stack)); + new((void *) task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i], memory_per_stack); + } +} + +malloc_scheduler_memory::~malloc_scheduler_memory() { + free(threads_); + free(thread_states_); + + for (size_t i = 0; i < num_threads_; i++) { + free(task_stacks_memory_[i]); + } + free(task_stacks_); + free(task_stacks_memory_); +} - for (size_t i = 0; i < num_threads_; i++) { - free(task_stacks_memory_[i]); - } - free(task_stacks_); - free(task_stacks_memory_); - } - } - } +} +} } diff --git a/lib/pls/src/internal/scheduling/thread_state.cpp b/lib/pls/src/internal/scheduling/thread_state.cpp index 8d467ed..f503b6a 100644 --- a/lib/pls/src/internal/scheduling/thread_state.cpp +++ b/lib/pls/src/internal/scheduling/thread_state.cpp @@ -1,9 +1,9 @@ #include "pls/internal/scheduling/thread_state.h" namespace pls { - namespace internal { - namespace scheduling { +namespace internal { +namespace scheduling { - } - } +} +} } diff --git a/test/base_tests.cpp b/test/base_tests.cpp index b22cfd4..61435d5 100644 --- 
a/test/base_tests.cpp +++ b/test/base_tests.cpp @@ -13,73 +13,73 @@ static bool base_tests_visited; static int base_tests_local_value_one; static vector base_tests_local_value_two; -TEST_CASE( "thread creation and joining", "[internal/data_structures/thread.h]") { - base_tests_visited = false; - auto t1 = start_thread([]() { base_tests_visited = true; }); - t1.join(); +TEST_CASE("thread creation and joining", "[internal/data_structures/thread.h]") { + base_tests_visited = false; + auto t1 = start_thread([]() { base_tests_visited = true; }); + t1.join(); - REQUIRE(base_tests_visited); + REQUIRE(base_tests_visited); } -TEST_CASE( "thread state", "[internal/data_structures/thread.h]") { - int state_one = 1; - vector state_two{1, 2}; +TEST_CASE("thread state", "[internal/data_structures/thread.h]") { + int state_one = 1; + vector state_two{1, 2}; - auto t1 = start_thread([]() { base_tests_local_value_one = *this_thread::state(); }, &state_one); - auto t2 = start_thread([]() { base_tests_local_value_two = *this_thread::state>(); }, &state_two); - t1.join(); - t2.join(); + auto t1 = start_thread([]() { base_tests_local_value_one = *this_thread::state(); }, &state_one); + auto t2 = start_thread([]() { base_tests_local_value_two = *this_thread::state>(); }, &state_two); + t1.join(); + t2.join(); - REQUIRE(base_tests_local_value_one == 1); - REQUIRE(base_tests_local_value_two == vector{1, 2}); + REQUIRE(base_tests_local_value_one == 1); + REQUIRE(base_tests_local_value_two == vector{1, 2}); } int base_tests_shared_counter; -TEST_CASE( "spinlock protects concurrent counter", "[internal/data_structures/spinlock.h]") { - constexpr int num_iterations = 1000000; - base_tests_shared_counter = 0; - spin_lock lock{}; - - SECTION( "lock can be used by itself" ) { - auto t1 = start_thread([&]() { - for (int i = 0; i < num_iterations; i++) { - lock.lock(); - base_tests_shared_counter++; - lock.unlock(); - } - }); - auto t2 = start_thread([&]() { - for (int i = 0; i < num_iterations; i++) { - lock.lock(); - base_tests_shared_counter--; - lock.unlock(); - } - }); - - t1.join(); - t2.join(); - - REQUIRE(base_tests_shared_counter == 0); - } - - SECTION( "lock can be used with std::lock_guard" ) { - auto t1 = start_thread([&]() { - for (int i = 0; i < num_iterations; i++) { - std::lock_guard my_lock{lock}; - base_tests_shared_counter++; - } - }); - auto t2 = start_thread([&]() { - for (int i = 0; i < num_iterations; i++) { - std::lock_guard my_lock{lock}; - base_tests_shared_counter--; - } - }); - - t1.join(); - t2.join(); - - REQUIRE(base_tests_shared_counter == 0); - } +TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spinlock.h]") { + constexpr int num_iterations = 1000000; + base_tests_shared_counter = 0; + spin_lock lock{}; + + SECTION("lock can be used by itself") { + auto t1 = start_thread([&]() { + for (int i = 0; i < num_iterations; i++) { + lock.lock(); + base_tests_shared_counter++; + lock.unlock(); + } + }); + auto t2 = start_thread([&]() { + for (int i = 0; i < num_iterations; i++) { + lock.lock(); + base_tests_shared_counter--; + lock.unlock(); + } + }); + + t1.join(); + t2.join(); + + REQUIRE(base_tests_shared_counter == 0); + } + + SECTION("lock can be used with std::lock_guard") { + auto t1 = start_thread([&]() { + for (int i = 0; i < num_iterations; i++) { + std::lock_guard my_lock{lock}; + base_tests_shared_counter++; + } + }); + auto t2 = start_thread([&]() { + for (int i = 0; i < num_iterations; i++) { + std::lock_guard my_lock{lock}; + base_tests_shared_counter--; 
+ } + }); + + t1.join(); + t2.join(); + + REQUIRE(base_tests_shared_counter == 0); + } } diff --git a/test/data_structures_test.cpp b/test/data_structures_test.cpp index a878d72..34ec1f9 100644 --- a/test/data_structures_test.cpp +++ b/test/data_structures_test.cpp @@ -12,122 +12,121 @@ using namespace pls::internal::data_structures; using namespace pls::internal::base; using namespace std; - -TEST_CASE( "aligned stack stores objects correctly", "[internal/data_structures/aligned_stack.h]") { - constexpr long data_size = 1024; - char data[data_size]; - aligned_stack stack{data, data_size}; - - SECTION( "stack correctly pushes sub linesize objects" ) { - std::array small_data_one{'a', 'b', 'c', 'd', 'e'}; - std::array small_data_two{}; - std::array small_data_three{'A'}; - - auto pointer_one = stack.push(small_data_one); - auto pointer_two = stack.push(small_data_two); - auto pointer_three = stack.push(small_data_three); - - REQUIRE(reinterpret_cast(pointer_one) % system_details::CACHE_LINE_SIZE == 0); - REQUIRE(reinterpret_cast(pointer_two) % system_details::CACHE_LINE_SIZE == 0); - REQUIRE(reinterpret_cast(pointer_three) % system_details::CACHE_LINE_SIZE == 0); - } - - SECTION( "stack correctly pushes above linesize objects" ) { - std::array small_data_one{'a', 'b', 'c', 'd', 'e'}; - std::array big_data_one{}; - - auto big_pointer_one = stack.push(big_data_one); - auto small_pointer_one = stack.push(small_data_one); - - REQUIRE(reinterpret_cast(big_pointer_one) % system_details::CACHE_LINE_SIZE == 0); - REQUIRE(reinterpret_cast(small_pointer_one) % system_details::CACHE_LINE_SIZE == 0); - } - - SECTION( "stack correctly stores and retrieves objects" ) { - std::array data_one{'a', 'b', 'c', 'd', 'e'}; - - stack.push(data_one); - auto retrieved_data = stack.pop>(); - - REQUIRE(retrieved_data == std::array{'a', 'b', 'c', 'd', 'e'}); - } - - SECTION( "stack can push and pop multiple times with correct alignment" ) { - std::array small_data_one{'a', 'b', 'c', 'd', 'e'}; - std::array small_data_two{}; - std::array small_data_three{'A'}; - - auto pointer_one = stack.push(small_data_one); - auto pointer_two = stack.push(small_data_two); - auto pointer_three = stack.push(small_data_three); - stack.pop(); - stack.pop(); - auto pointer_four = stack.push(small_data_two); - auto pointer_five = stack.push(small_data_three); - - REQUIRE(reinterpret_cast(pointer_one) % system_details::CACHE_LINE_SIZE == 0); - REQUIRE(reinterpret_cast(pointer_two) % system_details::CACHE_LINE_SIZE == 0); - REQUIRE(reinterpret_cast(pointer_three) % system_details::CACHE_LINE_SIZE == 0); - REQUIRE(reinterpret_cast(pointer_four) % system_details::CACHE_LINE_SIZE == 0); - REQUIRE(reinterpret_cast(pointer_five) % system_details::CACHE_LINE_SIZE == 0); - - REQUIRE(pointer_four == pointer_two); - REQUIRE(pointer_five == pointer_three); - } +TEST_CASE("aligned stack stores objects correctly", "[internal/data_structures/aligned_stack.h]") { + constexpr long data_size = 1024; + char data[data_size]; + aligned_stack stack{data, data_size}; + + SECTION("stack correctly pushes sub linesize objects") { + std::array small_data_one{'a', 'b', 'c', 'd', 'e'}; + std::array small_data_two{}; + std::array small_data_three{'A'}; + + auto pointer_one = stack.push(small_data_one); + auto pointer_two = stack.push(small_data_two); + auto pointer_three = stack.push(small_data_three); + + REQUIRE(reinterpret_cast(pointer_one) % system_details::CACHE_LINE_SIZE == 0); + REQUIRE(reinterpret_cast(pointer_two) % system_details::CACHE_LINE_SIZE == 0); + 
REQUIRE(reinterpret_cast(pointer_three) % system_details::CACHE_LINE_SIZE == 0); + } + + SECTION("stack correctly pushes above linesize objects") { + std::array small_data_one{'a', 'b', 'c', 'd', 'e'}; + std::array big_data_one{}; + + auto big_pointer_one = stack.push(big_data_one); + auto small_pointer_one = stack.push(small_data_one); + + REQUIRE(reinterpret_cast(big_pointer_one) % system_details::CACHE_LINE_SIZE == 0); + REQUIRE(reinterpret_cast(small_pointer_one) % system_details::CACHE_LINE_SIZE == 0); + } + + SECTION("stack correctly stores and retrieves objects") { + std::array data_one{'a', 'b', 'c', 'd', 'e'}; + + stack.push(data_one); + auto retrieved_data = stack.pop>(); + + REQUIRE(retrieved_data == std::array{'a', 'b', 'c', 'd', 'e'}); + } + + SECTION("stack can push and pop multiple times with correct alignment") { + std::array small_data_one{'a', 'b', 'c', 'd', 'e'}; + std::array small_data_two{}; + std::array small_data_three{'A'}; + + auto pointer_one = stack.push(small_data_one); + auto pointer_two = stack.push(small_data_two); + auto pointer_three = stack.push(small_data_three); + stack.pop(); + stack.pop(); + auto pointer_four = stack.push(small_data_two); + auto pointer_five = stack.push(small_data_three); + + REQUIRE(reinterpret_cast(pointer_one) % system_details::CACHE_LINE_SIZE == 0); + REQUIRE(reinterpret_cast(pointer_two) % system_details::CACHE_LINE_SIZE == 0); + REQUIRE(reinterpret_cast(pointer_three) % system_details::CACHE_LINE_SIZE == 0); + REQUIRE(reinterpret_cast(pointer_four) % system_details::CACHE_LINE_SIZE == 0); + REQUIRE(reinterpret_cast(pointer_five) % system_details::CACHE_LINE_SIZE == 0); + + REQUIRE(pointer_four == pointer_two); + REQUIRE(pointer_five == pointer_three); + } } -TEST_CASE( "deque stores objects correctly", "[internal/data_structures/deque.h]") { - class my_item: public deque_item { +TEST_CASE("deque stores objects correctly", "[internal/data_structures/deque.h]") { + class my_item : public deque_item { - }; + }; - deque deque; - my_item one, two, three; + deque deque; + my_item one, two, three; - SECTION( "add and remove items form the tail" ) { - deque.push_tail(&one); - deque.push_tail(&two); - deque.push_tail(&three); + SECTION("add and remove items form the tail") { + deque.push_tail(&one); + deque.push_tail(&two); + deque.push_tail(&three); - REQUIRE(deque.pop_tail() == &three); - REQUIRE(deque.pop_tail() == &two); - REQUIRE(deque.pop_tail() == &one); - } + REQUIRE(deque.pop_tail() == &three); + REQUIRE(deque.pop_tail() == &two); + REQUIRE(deque.pop_tail() == &one); + } - SECTION( "handles getting empty by popping the tail correctly" ) { - deque.push_tail(&one); - REQUIRE(deque.pop_tail() == &one); + SECTION("handles getting empty by popping the tail correctly") { + deque.push_tail(&one); + REQUIRE(deque.pop_tail() == &one); - deque.push_tail(&two); - REQUIRE(deque.pop_tail() == &two); - } + deque.push_tail(&two); + REQUIRE(deque.pop_tail() == &two); + } - SECTION( "remove items form the head" ) { - deque.push_tail(&one); - deque.push_tail(&two); - deque.push_tail(&three); + SECTION("remove items form the head") { + deque.push_tail(&one); + deque.push_tail(&two); + deque.push_tail(&three); - REQUIRE(deque.pop_head() == &one); - REQUIRE(deque.pop_head() == &two); - REQUIRE(deque.pop_head() == &three); - } + REQUIRE(deque.pop_head() == &one); + REQUIRE(deque.pop_head() == &two); + REQUIRE(deque.pop_head() == &three); + } - SECTION( "handles getting empty by popping the head correctly" ) { - deque.push_tail(&one); - 
REQUIRE(deque.pop_head() == &one); + SECTION("handles getting empty by popping the head correctly") { + deque.push_tail(&one); + REQUIRE(deque.pop_head() == &one); - deque.push_tail(&two); - REQUIRE(deque.pop_head() == &two); - } + deque.push_tail(&two); + REQUIRE(deque.pop_head() == &two); + } - SECTION( "handles getting empty by popping the head and tail correctly" ) { - deque.push_tail(&one); - REQUIRE(deque.pop_tail() == &one); + SECTION("handles getting empty by popping the head and tail correctly") { + deque.push_tail(&one); + REQUIRE(deque.pop_tail() == &one); - deque.push_tail(&two); - REQUIRE(deque.pop_head() == &two); + deque.push_tail(&two); + REQUIRE(deque.pop_head() == &two); - deque.push_tail(&three); - REQUIRE(deque.pop_tail() == &three); - } + deque.push_tail(&three); + REQUIRE(deque.pop_tail() == &three); + } } diff --git a/test/scheduling_tests.cpp b/test/scheduling_tests.cpp index d3a340d..a442abd 100644 --- a/test/scheduling_tests.cpp +++ b/test/scheduling_tests.cpp @@ -4,76 +4,75 @@ using namespace pls; -class once_sub_task: public fork_join_sub_task { - std::atomic* counter_; - int children_; +class once_sub_task : public fork_join_sub_task { + std::atomic *counter_; + int children_; -protected: - void execute_internal() override { - (*counter_)++; - for (int i = 0; i < children_; i++) { - spawn_child(once_sub_task(counter_, children_ - 1)); - } + protected: + void execute_internal() override { + (*counter_)++; + for (int i = 0; i < children_; i++) { + spawn_child(once_sub_task(counter_, children_ - 1)); } + } -public: - explicit once_sub_task(std::atomic* counter, int children): - fork_join_sub_task(), - counter_{counter}, - children_{children} {} + public: + explicit once_sub_task(std::atomic *counter, int children) : + fork_join_sub_task(), + counter_{counter}, + children_{children} {} }; -class force_steal_sub_task: public fork_join_sub_task { - std::atomic* parent_counter_; - std::atomic* overall_counter_; +class force_steal_sub_task : public fork_join_sub_task { + std::atomic *parent_counter_; + std::atomic *overall_counter_; -protected: - void execute_internal() override { - (*overall_counter_)--; - if (overall_counter_->load() > 0) { - std::atomic counter{1}; - spawn_child(force_steal_sub_task(&counter, overall_counter_)); - while (counter.load() > 0) - ; // Spin... - } - - (*parent_counter_)--; + protected: + void execute_internal() override { + (*overall_counter_)--; + if (overall_counter_->load() > 0) { + std::atomic counter{1}; + spawn_child(force_steal_sub_task(&counter, overall_counter_)); + while (counter.load() > 0); // Spin... 
} -public: - explicit force_steal_sub_task(std::atomic* parent_counter, std::atomic* overall_counter): - fork_join_sub_task(), - parent_counter_{parent_counter}, - overall_counter_{overall_counter} {} + (*parent_counter_)--; + } + + public: + explicit force_steal_sub_task(std::atomic *parent_counter, std::atomic *overall_counter) : + fork_join_sub_task(), + parent_counter_{parent_counter}, + overall_counter_{overall_counter} {} }; -TEST_CASE( "tbb task are scheduled correctly", "[internal/scheduling/fork_join_task.h]") { - malloc_scheduler_memory my_scheduler_memory{8, 2 << 12}; +TEST_CASE("tbb task are scheduled correctly", "[internal/scheduling/fork_join_task.h]") { + malloc_scheduler_memory my_scheduler_memory{8, 2 << 12}; - SECTION("tasks are executed exactly once") { - scheduler my_scheduler{&my_scheduler_memory, 2}; - int start_counter = 4; - int total_tasks = 1 + 4 + 4 * 3 + 4 * 3 * 2 + 4 * 3 * 2 * 1; - std::atomic counter{0}; + SECTION("tasks are executed exactly once") { + scheduler my_scheduler{&my_scheduler_memory, 2}; + int start_counter = 4; + int total_tasks = 1 + 4 + 4 * 3 + 4 * 3 * 2 + 4 * 3 * 2 * 1; + std::atomic counter{0}; - my_scheduler.perform_work([&] (){ - once_sub_task sub_task{&counter, start_counter}; - fork_join_task task{&sub_task, unique_id::create(42)}; - scheduler::execute_task(task); - }); + my_scheduler.perform_work([&]() { + once_sub_task sub_task{&counter, start_counter}; + fork_join_task task{&sub_task, unique_id::create(42)}; + scheduler::execute_task(task); + }); - REQUIRE(counter.load() == total_tasks); - my_scheduler.terminate(true); - } + REQUIRE(counter.load() == total_tasks); + my_scheduler.terminate(true); + } - SECTION("tasks can be stolen") { - scheduler my_scheduler{&my_scheduler_memory, 8}; - my_scheduler.perform_work([&] (){ - std::atomic dummy_parent{1}, overall_counter{8}; - force_steal_sub_task sub_task{&dummy_parent, &overall_counter}; - fork_join_task task{&sub_task, unique_id::create(42)}; - scheduler::execute_task(task); - }); - my_scheduler.terminate(true); - } + SECTION("tasks can be stolen") { + scheduler my_scheduler{&my_scheduler_memory, 8}; + my_scheduler.perform_work([&]() { + std::atomic dummy_parent{1}, overall_counter{8}; + force_steal_sub_task sub_task{&dummy_parent, &overall_counter}; + fork_join_task task{&sub_task, unique_id::create(42)}; + scheduler::execute_task(task); + }); + my_scheduler.terminate(true); + } }
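
For orientation, a minimal usage sketch of the fork-join API touched by this patch, modeled on scheduling_tests.cpp above. The include path and the sub-task class name (count_sub_task) are assumptions; malloc_scheduler_memory, scheduler, fork_join_sub_task, fork_join_task, unique_id and scheduler::execute_task are the names from the patch itself, so treat this as an illustration rather than the library's documented entry point.

// Sketch only: mirrors once_sub_task from scheduling_tests.cpp.
#include <atomic>
#include "pls/pls.h"   // assumed include path

class count_sub_task : public pls::fork_join_sub_task {    // hypothetical example class
  std::atomic<int> *counter_;
  int children_;

 protected:
  void execute_internal() override {
    (*counter_)++;                                           // this node's share of the work
    for (int i = 0; i < children_; i++) {
      spawn_child(count_sub_task(counter_, children_ - 1));  // fork children onto the work-stealing deque
    }
  }

 public:
  count_sub_task(std::atomic<int> *counter, int children) :
      fork_join_sub_task(),
      counter_{counter},
      children_{children} {}
};

int main() {
  pls::malloc_scheduler_memory memory{8, 2 << 12};           // room for 8 threads, 8 KiB task stack each
  pls::scheduler my_scheduler{&memory, 8};

  std::atomic<int> counter{0};
  my_scheduler.perform_work([&] {
    count_sub_task root{&counter, 3};
    pls::fork_join_task task{&root, pls::unique_id::create(42)};  // id creation written as in the tests
    pls::scheduler::execute_task(task);                           // returns once the whole task tree is joined
  });

  my_scheduler.terminate(true);
}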
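
The run_on_n_threads_task reformatted above can be driven the same way. The following sketch assumes it is dispatched through scheduler::execute_task like any other abstract_task subclass, and that the internal factory create_run_on_n_threads_task (which lives in pls::internal::scheduling and is not re-exported by pls.h) is called directly:

// Sketch only, building on my_scheduler from the previous example.
my_scheduler.perform_work([&] {
  auto task = pls::internal::scheduling::create_run_on_n_threads_task([] {
    // Runs once on the master and once per worker that still gets a slot,
    // i.e. on at most four threads in total (the counter starts at num_threads - 1).
  }, 4);
  pls::scheduler::execute_task(task);
});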