diff --git a/app/benchmark_fft/main.cpp b/app/benchmark_fft/main.cpp index d10012b..54abdd1 100644 --- a/app/benchmark_fft/main.cpp +++ b/app/benchmark_fft/main.cpp @@ -6,7 +6,7 @@ #include #include -static constexpr int CUTOFF = 10; +static constexpr int CUTOFF = 16; static constexpr int NUM_ITERATIONS = 1000; static constexpr int INPUT_SIZE = 2064; typedef std::vector> complex_vector; diff --git a/app/invoke_parallel/main.cpp b/app/invoke_parallel/main.cpp index 371f90f..1d5e32e 100644 --- a/app/invoke_parallel/main.cpp +++ b/app/invoke_parallel/main.cpp @@ -2,48 +2,99 @@ #include #include +#include +#include -static pls::static_scheduler_memory<8, 2 << 14> my_scheduler_memory; +static constexpr int CUTOFF = 16; +static constexpr int INPUT_SIZE = 2064; +typedef std::vector> complex_vector; -static constexpr int CUTOFF = 10; +void divide(complex_vector::iterator data, int n) { + complex_vector tmp_odd_elements(n / 2); + for (int i = 0; i < n / 2; i++) { + tmp_odd_elements[i] = data[i * 2 + 1]; + } + for (int i = 0; i < n / 2; i++) { + data[i] = data[i * 2]; + } + for (int i = 0; i < n / 2; i++) { + data[i + n / 2] = tmp_odd_elements[i]; + } +} + +void combine(complex_vector::iterator data, int n) { + for (int i = 0; i < n / 2; i++) { + std::complex even = data[i]; + std::complex odd = data[i + n / 2]; + + // w is the "twiddle-factor". + // this could be cached, but we run the same 'data_structures' algorithm parallel/serial, + // so it won't impact the performance comparison. + std::complex w = exp(std::complex(0, -2. * M_PI * i / n)); -long fib_serial(long n) { - if (n == 0) { - return 0; + data[i] = even + w * odd; + data[i + n / 2] = even - w * odd; } - if (n == 1) { - return 1; +} + +void fft(complex_vector::iterator data, int n) { + if (n < 2) { + return; } - return fib_serial(n - 1) + fib_serial(n - 2); + PROFILE_WORK_BLOCK("Divide") + divide(data, n); + PROFILE_END_BLOCK + PROFILE_WORK_BLOCK("Invoke Parallel") + if (n == CUTOFF) { + PROFILE_WORK_BLOCK("FFT Serial") + fft(data, n / 2); + fft(data + n / 2, n / 2); + } else if (n <= CUTOFF) { + fft(data, n / 2); + fft(data + n / 2, n / 2); + } else { + pls::invoke_parallel( + [&] { fft(data, n / 2); }, + [&] { fft(data + n / 2, n / 2); } + ); + } + PROFILE_END_BLOCK + PROFILE_WORK_BLOCK("Combine") + combine(data, n); + PROFILE_END_BLOCK } -long fib(long n) { - if (n <= CUTOFF) { - return fib_serial(n); +complex_vector prepare_input(int input_size) { + std::vector known_frequencies{2, 11, 52, 88, 256}; + complex_vector data(input_size); + + // Set our input data to match a time series of the known_frequencies. + // When applying fft to this time-series we should find these frequencies. + for (int i = 0; i < input_size; i++) { + data[i] = std::complex(0.0, 0.0); + for (auto frequencie : known_frequencies) { + data[i] += sin(2 * M_PI * frequencie * i / input_size); + } } - // Actual 'invoke_parallel' logic/code - int left, right; - pls::invoke_parallel( - [&] { left = fib(n - 1); }, - [&] { right = fib(n - 2); } - ); - return left + right; + return data; } int main() { PROFILE_ENABLE + pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 14}; pls::scheduler scheduler{&my_scheduler_memory, 8}; - long result; + complex_vector initial_input = prepare_input(INPUT_SIZE); scheduler.perform_work([&] { PROFILE_MAIN_THREAD // Call looks just the same, only requirement is // the enclosure in the perform_work lambda. 
for (int i = 0; i < 10; i++) { - result = fib(30); - std::cout << "Fib(30)=" << result << std::endl; + PROFILE_WORK_BLOCK("Top Level FFT") + complex_vector input = initial_input; + fft(input.begin(), input.size()); } }); diff --git a/app/playground/main.cpp b/app/playground/main.cpp index 442d5c6..d3a7a50 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -11,8 +11,5 @@ #include int main() { - std::cout << pls::internal::scheduling::root_task::create_id().type_.hash_code() << std::endl; - std::cout - << pls::internal::helpers::unique_id::create>().type_.hash_code() - << std::endl; + } diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 66ed5b1..52c5710 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -1,65 +1,68 @@ # List all required files here (cmake best practice to NOT automate this step!) add_library(pls STATIC - include/pls/pls.h src/pls.cpp + include/pls/pls.h src/pls.cpp include/pls/algorithms/invoke_parallel.h include/pls/algorithms/invoke_parallel_impl.h include/pls/internal/base/spin_lock.h - include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp - include/pls/internal/base/ttas_spin_lock.h src/internal/base/ttas_spin_lock.cpp - include/pls/internal/base/thread.h src/internal/base/thread.cpp + include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp + include/pls/internal/base/ttas_spin_lock.h src/internal/base/ttas_spin_lock.cpp + include/pls/internal/base/swmr_spin_lock.h src/internal/base/swmr_spin_lock.cpp + include/pls/internal/base/thread.h src/internal/base/thread.cpp include/pls/internal/base/thread_impl.h - include/pls/internal/base/barrier.h src/internal/base/barrier.cpp + include/pls/internal/base/barrier.h src/internal/base/barrier.cpp include/pls/internal/base/system_details.h include/pls/internal/base/error_handling.h - include/pls/internal/base/alignment.h src/internal/base/alignment.cpp + include/pls/internal/base/alignment.h src/internal/base/alignment.cpp + include/pls/internal/base/backoff.h - include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp + include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp include/pls/internal/data_structures/aligned_stack_impl.h - include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp + include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp + include/pls/internal/data_structures/work_stealing_deque.h include/pls/internal/helpers/prohibit_new.h include/pls/internal/helpers/profiler.h include/pls/internal/helpers/mini_benchmark.h include/pls/internal/helpers/unique_id.h - include/pls/internal/scheduling/root_task.h src/internal/scheduling/root_task.cpp - include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp - include/pls/internal/scheduling/abstract_task.h src/internal/scheduling/abstract_task.cpp - include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp + include/pls/internal/scheduling/root_task.h src/internal/scheduling/root_task.cpp + include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp + include/pls/internal/scheduling/abstract_task.h src/internal/scheduling/abstract_task.cpp + include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/run_on_n_threads_task.h 
src/internal/scheduling/run_on_n_threads_task.cpp - include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp - include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp -) + include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp + include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp + ) # Add everything in `./include` to be in the include path of this project target_include_directories(pls PUBLIC - $ - $ + $ + $ PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR}/src # TODO: Set this up when we require private headers -) + ${CMAKE_CURRENT_SOURCE_DIR}/src # TODO: Set this up when we require private headers + ) # Add cmake dependencies here if needed target_link_libraries(pls Threads::Threads # pthread support -) -if(EASY_PROFILER) + ) +if (EASY_PROFILER) target_link_libraries(pls easy_profiler) -endif() +endif () # Rules for istalling the library on a system # ...binaries INSTALL(TARGETS pls EXPORT pls-targets LIBRARY - DESTINATION lib/pls + DESTINATION lib/pls ARCHIVE - DESTINATION lib/pls -) + DESTINATION lib/pls + ) # ...all headers in `include` INSTALL( DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include/pls diff --git a/lib/pls/include/pls/algorithms/invoke_parallel_impl.h b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h index 44cbc74..2fd05ce 100644 --- a/lib/pls/include/pls/algorithms/invoke_parallel_impl.h +++ b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h @@ -5,6 +5,7 @@ #include "pls/internal/scheduling/fork_join_task.h" #include "pls/internal/scheduling/scheduler.h" #include "pls/internal/helpers/unique_id.h" +#include "pls/internal/base/alignment.h" namespace pls { namespace algorithm { @@ -21,7 +22,7 @@ inline void run_body(const Body &internal_body, const abstract_task::id &id) { auto current_sub_task = reinterpret_cast(current_task)->currently_executing(); internal_body(current_sub_task); } else { - fork_join_lambda root_body(&internal_body); + fork_join_lambda_by_reference root_body(&internal_body); fork_join_task root_task{&root_body, id}; scheduler::execute_task(root_task); } @@ -32,14 +33,15 @@ template void invoke_parallel(const Function1 &function1, const Function2 &function2) { using namespace ::pls::internal::scheduling; using namespace ::pls::internal::helpers; + using namespace ::pls::internal::base; static abstract_task::id id = unique_id::create(); auto internal_body = [&](fork_join_sub_task *this_task) { - auto sub_task_body_1 = [&](fork_join_sub_task *) { function1(); }; - auto sub_task_1 = fork_join_lambda(&sub_task_body_1); + auto sub_task_body_2 = [&](fork_join_sub_task *) { function2(); }; + auto sub_task_2 = fork_join_lambda_by_reference(&sub_task_body_2); - this_task->spawn_child(sub_task_1); - function2(); // Execute last function 'inline' without spawning a sub_task object + this_task->spawn_child(sub_task_2); + function1(); // Execute first function 'inline' without spawning a sub_task object this_task->wait_for_all(); }; @@ -53,14 +55,14 @@ void invoke_parallel(const Function1 &function1, const Function2 &function2, con static abstract_task::id id = unique_id::create(); auto internal_body = [&](fork_join_sub_task *this_task) { - auto sub_task_body_1 = [&](fork_join_sub_task *) { function1(); }; - auto sub_task_1 = fork_join_lambda(&sub_task_body_1); auto sub_task_body_2 = [&](fork_join_sub_task *) { function2(); }; - auto sub_task_2 = fork_join_lambda(&sub_task_body_2); + auto sub_task_2 = 
fork_join_lambda_by_reference(&sub_task_body_2); + auto sub_task_body_3 = [&](fork_join_sub_task *) { function3(); }; + auto sub_task_3 = fork_join_lambda_by_reference(&sub_task_body_3); - this_task->spawn_child(sub_task_1); this_task->spawn_child(sub_task_2); - function3(); // Execute last function 'inline' without spawning a sub_task object + this_task->spawn_child(sub_task_3); + function1(); // Execute first function 'inline' without spawning a sub_task object this_task->wait_for_all(); }; diff --git a/lib/pls/include/pls/internal/base/alignment.h b/lib/pls/include/pls/internal/base/alignment.h index a531c58..5797932 100644 --- a/lib/pls/include/pls/internal/base/alignment.h +++ b/lib/pls/include/pls/internal/base/alignment.h @@ -19,10 +19,32 @@ struct aligned_wrapper { }; void *allocate_aligned(size_t size); -std::uintptr_t next_alignment(std::uintptr_t size); +system_details::pointer_t next_alignment(system_details::pointer_t size); +system_details::pointer_t previous_alignment(system_details::pointer_t size); char *next_alignment(char *pointer); } + +template +struct aligned_aba_pointer { + const system_details::pointer_t pointer_; + + explicit aligned_aba_pointer(T *pointer, unsigned int aba = 0) : pointer_{ + reinterpret_cast(pointer) + aba} {} + + T *pointer() const { + return reinterpret_cast(pointer_ & system_details::CACHE_LINE_ADDRESS_USED_BITS); + } + + unsigned int aba() const { + return pointer_ & system_details::CACHE_LINE_ADDRESS_UNUSED_BITS; + } + + aligned_aba_pointer set_aba(unsigned int aba) const { + return aligned_aba_pointer(pointer(), aba); + } +}; + } } } diff --git a/lib/pls/include/pls/internal/base/backoff.h b/lib/pls/include/pls/internal/base/backoff.h new file mode 100644 index 0000000..9e7cbf5 --- /dev/null +++ b/lib/pls/include/pls/internal/base/backoff.h @@ -0,0 +1,54 @@ + +#ifndef PLS_BACKOFF_H_ +#define PLS_BACKOFF_H_ + +#include "pls/internal/base/system_details.h" +#include "pls/internal/helpers/profiler.h" +#include "pls/internal/base/thread.h" + +#include +#include + +namespace pls { +namespace internal { +namespace base { + +class backoff { + const unsigned long INITIAL_SPIN_ITERS = 2u << 4u; + const unsigned long MAX_SPIN_ITERS = 2u << 8u; + const unsigned long MAX_ITERS = 2u << 10u; + const unsigned long YELD_ITERS = 2u << 10u; + + unsigned long current_ = INITIAL_SPIN_ITERS; + std::minstd_rand random_; + + static void spin(unsigned long iterations) { + for (volatile unsigned long i = 0; i < iterations; i++) + system_details::relax_cpu(); // Spin + } + + public: + backoff() : current_{INITIAL_SPIN_ITERS}, random_{std::random_device{}()} {} + + void do_backoff() { + PROFILE_LOCK("Backoff") + spin(random_() % std::min(current_, MAX_SPIN_ITERS)); + + if (current_ >= YELD_ITERS) { + PROFILE_LOCK("Yield") + this_thread::yield(); + } + + current_ = std::min(current_ * 2, MAX_ITERS); + } + + void reset() { + current_ = INITIAL_SPIN_ITERS; + } +}; + +} +} +} + +#endif //PLS_BACKOFF_H_ diff --git a/lib/pls/include/pls/internal/base/error_handling.h b/lib/pls/include/pls/internal/base/error_handling.h index 235964e..381758a 100644 --- a/lib/pls/include/pls/internal/base/error_handling.h +++ b/lib/pls/include/pls/internal/base/error_handling.h @@ -11,5 +11,6 @@ * (or its inclusion adds too much overhead). 
*/ #define PLS_ERROR(msg) std::cout << msg << std::endl; exit(1); +#define PLS_ASSERT(cond, msg) if (!cond) { PLS_ERROR(msg) } #endif //PLS_ERROR_HANDLING_H diff --git a/lib/pls/include/pls/internal/base/spin_lock.h b/lib/pls/include/pls/internal/base/spin_lock.h index 2506145..497dcb1 100644 --- a/lib/pls/include/pls/internal/base/spin_lock.h +++ b/lib/pls/include/pls/internal/base/spin_lock.h @@ -10,7 +10,7 @@ namespace internal { namespace base { // Default Spin-Lock implementation for this project. -using spin_lock = tas_spin_lock; +using spin_lock = ttas_spin_lock; } } diff --git a/lib/pls/include/pls/internal/base/swmr_spin_lock.h b/lib/pls/include/pls/internal/base/swmr_spin_lock.h new file mode 100644 index 0000000..bfe9284 --- /dev/null +++ b/lib/pls/include/pls/internal/base/swmr_spin_lock.h @@ -0,0 +1,38 @@ + +#ifndef PLS_SWMR_SPIN_LOCK_LOCK_H_ +#define PLS_SWMR_SPIN_LOCK_LOCK_H_ + +#include + +#include "pls/internal/helpers/profiler.h" + +namespace pls { +namespace internal { +namespace base { + +/** + * Single writer, multiple reader spin lock. + * The writer is required to be the same thread all the time (single writer), + * while multiple threads can read. + * Readers fail to lock when the writer requests the lock, + * the acquires the lock after all remaining readers left the critical section. + */ +class swmr_spin_lock { + std::atomic readers_; + std::atomic write_request_; + + public: + explicit swmr_spin_lock() : readers_{0}, write_request_{0} {} + + bool reader_try_lock(); + void reader_unlock(); + + void writer_lock(); + void writer_unlock(); +}; + +} +} +} + +#endif //PLS_SWMR_SPIN_LOCK_LOCK_H_ diff --git a/lib/pls/include/pls/internal/base/system_details.h b/lib/pls/include/pls/internal/base/system_details.h index af2e71f..92e4c65 100644 --- a/lib/pls/include/pls/internal/base/system_details.h +++ b/lib/pls/include/pls/internal/base/system_details.h @@ -3,6 +3,9 @@ #define PLS_SYSTEM_DETAILS_H #include +#if (COMPILER == MVCC) +#include +#endif namespace pls { namespace internal { @@ -15,18 +18,58 @@ namespace base { * Currently sane default values for x86. */ namespace system_details { + +/** + * Pointer Types needed for ABA protection mixed into addresses. + * pointer_t should be an integer type capable of holding ANY pointer value. + */ +using pointer_t = std::uintptr_t; +constexpr pointer_t ZERO_POINTER = 0; +constexpr pointer_t MAX_POINTER = ~ZERO_POINTER; + +/** + * Biggest type that supports atomic CAS operations. + * Usually it is sane to assume a pointer can be swapped in a single CAS operation. + */ +using cas_integer = pointer_t; +constexpr cas_integer MIN_CAS_INTEGER = 0; +constexpr cas_integer MAX_CAS_INTEGER = ~MIN_CAS_INTEGER; +constexpr cas_integer FIRST_HALF_CAS_INTEGER = MAX_CAS_INTEGER << ((sizeof(cas_integer) / 2) * 8); +constexpr cas_integer SECOND_HALF_CAS_INTEGER = ~FIRST_HALF_CAS_INTEGER; + /** - * Most processors have 64 byte cache lines + * Most processors have 64 byte cache lines (last 6 bit of the address are zero at line beginnings). */ -constexpr std::uintptr_t CACHE_LINE_SIZE = 64; +constexpr unsigned int CACHE_LINE_ADDRESS_BITS = 6; +constexpr pointer_t CACHE_LINE_SIZE = 2u << (CACHE_LINE_ADDRESS_BITS - 1); +constexpr pointer_t CACHE_LINE_ADDRESS_USED_BITS = MAX_POINTER << CACHE_LINE_ADDRESS_BITS; +constexpr pointer_t CACHE_LINE_ADDRESS_UNUSED_BITS = ~CACHE_LINE_ADDRESS_USED_BITS; /** * Choose one of the following ways to store thread specific data. * Try to choose the fastest available on this processor/system. 
*/ -// #define PLS_THREAD_SPECIFIC_PTHREAD +//#define PLS_THREAD_SPECIFIC_PTHREAD #define PLS_THREAD_SPECIFIC_COMPILER +/** + * When spinning one wants to 'relax' the CPU from some task, + * e.g. disabling speculative execution/branch prediction + * or reducing its clock speed. + * This is both good for power draw, as well as for hyperthreading. + * + * Choose the implementation appropriate for your compiler-cpu combination. + */ +#if (COMPILER == MVCC) +inline void relax_cpu() { + _mm_pause(); +} +#elif (COMPILER == GCC || COMPILER == LLVM) +inline void relax_cpu() { + asm("pause"); +} +#endif + } } } diff --git a/lib/pls/include/pls/internal/base/tas_spin_lock.h b/lib/pls/include/pls/internal/base/tas_spin_lock.h index d2c1465..74a11a5 100644 --- a/lib/pls/include/pls/internal/base/tas_spin_lock.h +++ b/lib/pls/include/pls/internal/base/tas_spin_lock.h @@ -21,11 +21,10 @@ namespace base { */ class tas_spin_lock { std::atomic_flag flag_; - unsigned int yield_at_tries_; public: - tas_spin_lock() : flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{1024} {}; - tas_spin_lock(const tas_spin_lock &other) : flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{other.yield_at_tries_} {} + tas_spin_lock() : flag_{ATOMIC_FLAG_INIT} {}; + tas_spin_lock(const tas_spin_lock &/*other*/) : flag_{ATOMIC_FLAG_INIT} {} void lock(); bool try_lock(unsigned int num_tries = 1); diff --git a/lib/pls/include/pls/internal/base/thread.h b/lib/pls/include/pls/internal/base/thread.h index 48dc59c..3ffdf88 100644 --- a/lib/pls/include/pls/internal/base/thread.h +++ b/lib/pls/include/pls/internal/base/thread.h @@ -9,6 +9,7 @@ #include #include #include +#include #include "system_details.h" @@ -44,6 +45,11 @@ class this_thread { pthread_yield(); } + static void sleep(long microseconds) { + timespec time{0, 1000 * microseconds}; + nanosleep(&time, nullptr); + } + /** * Retrieves the local state pointer. * diff --git a/lib/pls/include/pls/internal/base/ttas_spin_lock.h b/lib/pls/include/pls/internal/base/ttas_spin_lock.h index 73160ba..787f772 100644 --- a/lib/pls/include/pls/internal/base/ttas_spin_lock.h +++ b/lib/pls/include/pls/internal/base/ttas_spin_lock.h @@ -6,6 +6,7 @@ #include #include "pls/internal/base/thread.h" +#include "pls/internal/base/backoff.h" namespace pls { namespace internal { @@ -18,11 +19,10 @@ namespace base { */ class ttas_spin_lock { std::atomic flag_; - const unsigned int yield_at_tries_; public: - ttas_spin_lock() : flag_{0}, yield_at_tries_{1024} {}; - ttas_spin_lock(const ttas_spin_lock &other) : flag_{0}, yield_at_tries_{other.yield_at_tries_} {} + ttas_spin_lock() : flag_{0} {}; + ttas_spin_lock(const ttas_spin_lock &/*other*/) : flag_{0} {} void lock(); bool try_lock(unsigned int num_tries = 1); diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack.h b/lib/pls/include/pls/internal/data_structures/aligned_stack.h index dc46812..2e04702 100644 --- a/lib/pls/include/pls/internal/data_structures/aligned_stack.h +++ b/lib/pls/include/pls/internal/data_structures/aligned_stack.h @@ -12,6 +12,8 @@ namespace pls { namespace internal { namespace data_structures { +using base::system_details::pointer_t; + /** * Generic stack-like data structure that allows to allocate arbitrary objects in a given memory region. 
* The objects will be stored aligned in the stack, making the storage cache friendly and very fast @@ -26,15 +28,16 @@ namespace data_structures { */ class aligned_stack { // Keep bounds of our memory block - char *memory_start_; - char *memory_end_; + pointer_t memory_start_; + pointer_t memory_end_; // Current head will always be aligned to cache lines - char *head_; + pointer_t head_; public: - typedef char *state; + typedef pointer_t state; - aligned_stack() : memory_start_{nullptr}, memory_end_{nullptr}, head_{nullptr} {}; + aligned_stack() : memory_start_{0}, memory_end_{0}, head_{0} {}; + aligned_stack(pointer_t memory_region, std::size_t size); aligned_stack(char *memory_region, std::size_t size); template @@ -48,7 +51,6 @@ class aligned_stack { void reset_state(state new_state) { head_ = new_state; } }; - } } } diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h index f034a4e..849971a 100644 --- a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h +++ b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h @@ -9,7 +9,7 @@ namespace data_structures { template T *aligned_stack::push(const T &object) { // Copy-Construct - return new((void *) push < T > ())T(object); + return new(push < T > ())T(object); } template diff --git a/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h new file mode 100644 index 0000000..23f734d --- /dev/null +++ b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h @@ -0,0 +1,240 @@ + +#ifndef PLS_WORK_STEALING_DEQUE_H_ +#define PLS_WORK_STEALING_DEQUE_H_ + +#include +#include +#include + +#include "pls/internal/base/system_details.h" +#include "pls/internal/base/spin_lock.h" +#include "pls/internal/base/error_handling.h" + +#include "aligned_stack.h" + +namespace pls { +namespace internal { +namespace data_structures { + +using cas_integer = base::system_details::cas_integer; +using pointer_t = base::system_details::pointer_t; +static cas_integer get_stamp(cas_integer n) { + return (n & base::system_details::FIRST_HALF_CAS_INTEGER) >> ((sizeof(cas_integer) / 2) * 8); +} +static cas_integer get_offset(cas_integer n) { + return n & base::system_details::SECOND_HALF_CAS_INTEGER; +} +static cas_integer set_stamp(cas_integer n, cas_integer new_value) { + return (new_value << ((sizeof(cas_integer) / 2) * 8)) | (n & base::system_details::SECOND_HALF_CAS_INTEGER); +} +//static cas_integer set_offset(cas_integer n, cas_integer new_value) { +// return new_value | (n & base::system_details::FIRST_HALF_CAS_INTEGER); +//} + +class work_stealing_deque_item { + // Pointer to the actual data + pointer_t data_; + // Index (relative to stack base) to the next and previous element + cas_integer next_item_; + cas_integer previous_item_; + + public: + work_stealing_deque_item() : data_{0}, next_item_{0}, previous_item_{0} {} + + template + Item *data() { + return reinterpret_cast(data_); + } + + template + void set_data(Item *data) { + data_ = reinterpret_cast(data); + } + + cas_integer next_item() { + return next_item_; + } + void set_next_item(cas_integer next_item) { + next_item_ = next_item; + } + cas_integer previous_item() { + return previous_item_; + } + void set_previous_item(cas_integer previous_item) { + previous_item_ = previous_item; + } +}; +static_assert(sizeof(work_stealing_deque_item) < base::system_details::CACHE_LINE_SIZE, + "Work stealing 
deque relies on memory layout and requires cache lines to be longer than one 'work_stealing_deque_item' instance!"); + +template +class work_stealing_deque { + // Deque 'takes over' stack and handles memory management while in use. + // At any point in time the deque can stop using more memory and the stack can be used by other entities. + aligned_stack *stack_; + pointer_t base_pointer_; + + std::atomic head_; + std::atomic tail_; + cas_integer previous_tail_; + + base::spin_lock lock_{}; // TODO: Remove after debugging + + + public: + using state = aligned_stack::state; + + explicit work_stealing_deque(aligned_stack *stack) : stack_{stack}, + base_pointer_{0}, + head_{0}, + tail_{0}, + previous_tail_{0} { + reset_base_pointer(); + } + work_stealing_deque(const work_stealing_deque &other) : stack_{other.stack_}, + base_pointer_{other.base_pointer_}, + head_{other.head_.load()}, + tail_{other.tail_.load()}, + previous_tail_{other.previous_tail_} {} + + void reset_base_pointer() { + base_pointer_ = reinterpret_cast(stack_->save_state()); // Keep the base of our region in the stack + } + + work_stealing_deque_item *item_at(cas_integer position) { + return reinterpret_cast(base_pointer_ + + (base::system_details::CACHE_LINE_SIZE * position)); + } + + cas_integer current_stack_offset() { + return (stack_->save_state() - base_pointer_) / base::system_details::CACHE_LINE_SIZE; + } + + template + std::pair *allocate_item(const T &new_item) { + // 'Union' type to push both on stack + using pair_t = std::pair; + // Allocate space on stack + auto new_pair = reinterpret_cast(stack_->push()); + // Initialize memory on stack + new((void *) &(new_pair->first)) work_stealing_deque_item(); + new((void *) &(new_pair->second)) T(new_item); + + return new_pair; + } + + template + Item *push_tail(const T &new_item) { + cas_integer local_tail = tail_; + + auto new_pair = allocate_item(new_item); + // Prepare current tail to point to correct next items + auto tail_deque_item = item_at(local_tail); + tail_deque_item->set_data(&(new_pair->second)); + tail_deque_item->set_next_item(current_stack_offset()); + tail_deque_item->set_previous_item(previous_tail_); + previous_tail_ = local_tail; + + // Linearization point, item appears after this write + cas_integer new_tail = current_stack_offset(); + tail_ = new_tail; + + return &(new_pair->second); + } + + Item *pop_tail() { + cas_integer local_tail = tail_; + cas_integer local_head = head_; + + if (local_tail <= get_offset(local_head)) { + return nullptr; // EMPTY + } + + work_stealing_deque_item *previous_tail_item = item_at(previous_tail_); + cas_integer new_tail = previous_tail_; + previous_tail_ = previous_tail_item->previous_item(); + + // Publish our wish to set the tail back + tail_ = new_tail; + // Get the state of local head AFTER we published our wish + local_head = head_; // Linearization point, outside knows list is empty + + if (get_offset(local_head) < new_tail) { + return previous_tail_item->data(); // Success, enough distance to other threads + } + + if (get_offset(local_head) == new_tail) { + cas_integer new_head = set_stamp(new_tail, get_stamp(local_head) + 1); + // Try competing with consumers by updating the head's stamp value + if (head_.compare_exchange_strong(local_head, new_head)) { + return previous_tail_item->data(); // SUCCESS, we won the competition with other threads + } + } + + // Some other thread either won the competition or it already set the head further than we are + // before we even tried to compete with it. 
+ // Reset the queue into an empty state => head_ = tail_ + tail_ = get_offset(local_head); // ...we give up to the other winning thread + + return nullptr; // EMPTY, we lost the competition with other threads + } + + Item *pop_head() { + cas_integer local_head = head_; + cas_integer local_tail = tail_; + + if (local_tail <= get_offset(local_head)) { + return nullptr; // EMPTY + } + // Load info on current deque item. + // In case we have a race with a new (aba) overwritten item at this position, + // there has to be a competition over the tail -> the stamp increased and our next + // operation will fail anyways! + work_stealing_deque_item *head_deque_item = item_at(get_offset(local_head)); + cas_integer next_item_offset = head_deque_item->next_item(); + Item *head_data_item = head_deque_item->data(); + + // We try to set the head to this new position. + // Possible outcomes: + // 1) no one interrupted us, we win this competition + // 2) other thread took the head, we lose to this + // 3) owning thread removed tail, we lose to this + cas_integer new_head = set_stamp(next_item_offset, get_stamp(local_head) + 1); + if (head_.compare_exchange_strong(local_head, new_head)) { + return head_data_item; // SUCCESS, we won the competition + } + + return nullptr; // EMPTY, we lost the competition + + } + + void release_memory_until(state state) { + cas_integer item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE; + + cas_integer local_head = head_; + cas_integer local_tail = tail_; + + stack_->reset_state(state); + + if (item_offset < local_tail) { + tail_ = item_offset; + if (get_offset(local_head) >= local_tail) { + head_ = set_stamp(item_offset, get_stamp(local_head) + 1); + } + } + } + + void release_memory_until(Item *item) { + release_memory_until(reinterpret_cast(item)); + } + + state save_state() { + return stack_->save_state(); + } +}; + +} +} +} + +#endif //PLS_WORK_STEALING_DEQUE_H_ diff --git a/lib/pls/include/pls/internal/scheduling/abstract_task.h b/lib/pls/include/pls/internal/scheduling/abstract_task.h index 45e5490..21d7357 100644 --- a/lib/pls/include/pls/internal/scheduling/abstract_task.h +++ b/lib/pls/include/pls/internal/scheduling/abstract_task.h @@ -2,7 +2,7 @@ #ifndef PLS_ABSTRACT_TASK_H #define PLS_ABSTRACT_TASK_H -#include "pls/internal/base/spin_lock.h" +#include "pls/internal/base/swmr_spin_lock.h" #include "pls/internal/helpers/unique_id.h" namespace pls { @@ -16,7 +16,7 @@ class abstract_task { private: unsigned int depth_; abstract_task::id unique_id_; - abstract_task *child_task_; + abstract_task *volatile child_task_; public: abstract_task(const unsigned int depth, const abstract_task::id &unique_id) : @@ -26,14 +26,14 @@ class abstract_task { virtual void execute() = 0; void set_child(abstract_task *child_task) { child_task_ = child_task; } - abstract_task *child() { return child_task_; } + abstract_task *child() const { return child_task_; } void set_depth(unsigned int depth) { depth_ = depth; } unsigned int depth() const { return depth_; } id unique_id() const { return unique_id_; } protected: virtual bool internal_stealing(abstract_task *other_task) = 0; - virtual bool split_task(base::spin_lock *lock) = 0; + virtual bool split_task(base::swmr_spin_lock *lock) = 0; bool steal_work(); }; diff --git a/lib/pls/include/pls/internal/scheduling/fork_join_task.h b/lib/pls/include/pls/internal/scheduling/fork_join_task.h index ffa98a5..51f5c09 100644 --- a/lib/pls/include/pls/internal/scheduling/fork_join_task.h +++ 
b/lib/pls/include/pls/internal/scheduling/fork_join_task.h @@ -5,7 +5,7 @@ #include "pls/internal/helpers/profiler.h" #include "pls/internal/data_structures/aligned_stack.h" -#include "pls/internal/data_structures/deque.h" +#include "pls/internal/data_structures/work_stealing_deque.h" #include "abstract_task.h" #include "thread_state.h" @@ -15,7 +15,7 @@ namespace internal { namespace scheduling { class fork_join_task; -class fork_join_sub_task : public data_structures::deque_item { +class fork_join_sub_task { friend class fork_join_task; // Coordinate finishing of sub_tasks @@ -25,8 +25,11 @@ class fork_join_sub_task : public data_structures::deque_item { // Access to TBB scheduling environment fork_join_task *tbb_task_; + bool executed = false; + int executed_at = -1; + // Stack Management (reset stack pointer after wait_for_all() calls) - data_structures::aligned_stack::state stack_state_; + data_structures::work_stealing_deque::state deque_state_; protected: explicit fork_join_sub_task(); fork_join_sub_task(const fork_join_sub_task &other); @@ -37,20 +40,19 @@ class fork_join_sub_task : public data_structures::deque_item { public: // Only use them when actually executing this sub_task (only public for simpler API design) template - void spawn_child(const T &sub_task); + void spawn_child(T &sub_task); void wait_for_all(); private: - void spawn_child_internal(fork_join_sub_task *sub_task); void execute(); }; template -class fork_join_lambda : public fork_join_sub_task { +class fork_join_lambda_by_reference : public fork_join_sub_task { const Function *function_; public: - explicit fork_join_lambda(const Function *function) : function_{function} {}; + explicit fork_join_lambda_by_reference(const Function *function) : fork_join_sub_task{}, function_{function} {}; protected: void execute_internal() override { @@ -58,15 +60,27 @@ class fork_join_lambda : public fork_join_sub_task { } }; +template +class fork_join_lambda_by_value : public fork_join_sub_task { + const Function function_; + + public: + explicit fork_join_lambda_by_value(const Function &function) : fork_join_sub_task{}, function_{function} {}; + + protected: + void execute_internal() override { + function_(this); + } +}; + class fork_join_task : public abstract_task { friend class fork_join_sub_task; fork_join_sub_task *root_task_; fork_join_sub_task *currently_executing_; - data_structures::aligned_stack *my_stack_; // Double-Ended Queue management - data_structures::deque deque_; + data_structures::work_stealing_deque deque_; // Steal Management fork_join_sub_task *last_stolen_; @@ -75,7 +89,7 @@ class fork_join_task : public abstract_task { fork_join_sub_task *get_stolen_sub_task(); bool internal_stealing(abstract_task *other_task) override; - bool split_task(base::spin_lock * /*lock*/) override; + bool split_task(base::swmr_spin_lock * /*lock*/) override; public: explicit fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id); @@ -84,12 +98,21 @@ class fork_join_task : public abstract_task { }; template -void fork_join_sub_task::spawn_child(const T &task) { +void fork_join_sub_task::spawn_child(T &task) { PROFILE_FORK_JOIN_STEALING("spawn_child") static_assert(std::is_base_of::value, "Only pass fork_join_sub_task subclasses!"); - T *new_task = tbb_task_->my_stack_->push(task); - spawn_child_internal(new_task); + // Keep our refcount up to date + ref_count_++; + + // Assign forced values + task.parent_ = this; + task.tbb_task_ = tbb_task_; + task.deque_state_ = tbb_task_->deque_.save_state(); + + // Push 
on our deque + const T const_task = task; + tbb_task_->deque_.push_tail(const_task); } } diff --git a/lib/pls/include/pls/internal/scheduling/root_task.h b/lib/pls/include/pls/internal/scheduling/root_task.h index eff93e5..32a6ea2 100644 --- a/lib/pls/include/pls/internal/scheduling/root_task.h +++ b/lib/pls/include/pls/internal/scheduling/root_task.h @@ -5,7 +5,7 @@ #include #include "pls/internal/helpers/profiler.h" -#include "pls/internal/base/spin_lock.h" +#include "pls/internal/base/swmr_spin_lock.h" #include "abstract_task.h" @@ -43,7 +43,7 @@ class root_task : public abstract_task { return false; } - bool split_task(base::spin_lock * /*lock*/) override { + bool split_task(base::swmr_spin_lock * /*lock*/) override { return false; } }; @@ -70,7 +70,7 @@ class root_worker_task : public abstract_task { return false; } - bool split_task(base::spin_lock * /*lock*/) override { + bool split_task(base::swmr_spin_lock * /*lock*/) override { return false; } }; diff --git a/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h b/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h index 0a708ff..f412cbd 100644 --- a/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h +++ b/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h @@ -61,7 +61,7 @@ class run_on_n_threads_task : public abstract_task { return false; } - bool split_task(base::spin_lock *lock) override; + bool split_task(base::swmr_spin_lock *lock) override; }; template @@ -89,19 +89,18 @@ class run_on_n_threads_task_worker : public abstract_task { return false; } - bool split_task(base::spin_lock * /*lock*/) override { + bool split_task(base::swmr_spin_lock * /*lock*/) override { return false; } }; template -bool run_on_n_threads_task::split_task(base::spin_lock *lock) { +bool run_on_n_threads_task::split_task(base::swmr_spin_lock *lock) { if (get_counter() <= 0) { return false; } // In success case, unlock. - // TODO: this locking is complicated and error prone. - lock->unlock(); + lock->reader_unlock(); auto scheduler = base::this_thread::state()->scheduler_; auto task = run_on_n_threads_task_worker{function_, this}; diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index fcfc1dd..0af46c7 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -43,27 +43,29 @@ void scheduler::execute_task(Task &task, int depth) { abstract_task *new_task; // Init Task - { - std::lock_guard lock{my_state->lock_}; - old_task = my_state->current_task_; - new_task = my_state->task_stack_->push(task); + old_task = my_state->current_task_; + new_task = my_state->task_stack_->push(task); + + new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1); - new_task->set_depth(depth >= 0 ? 
depth : old_task->depth() + 1); + { + my_state->lock_.writer_lock(); my_state->current_task_ = new_task; old_task->set_child(new_task); + my_state->lock_.writer_unlock(); } // Run Task new_task->execute(); // Teardown state back to before the task was executed - { - std::lock_guard lock{my_state->lock_}; + my_state->task_stack_->pop(); + { + my_state->lock_.writer_lock(); old_task->set_child(nullptr); my_state->current_task_ = old_task; - - my_state->task_stack_->pop(); + my_state->lock_.writer_unlock(); } } diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h index 22154f8..0efbf78 100644 --- a/lib/pls/include/pls/internal/scheduling/thread_state.h +++ b/lib/pls/include/pls/internal/scheduling/thread_state.h @@ -5,6 +5,7 @@ #include #include "pls/internal/data_structures/aligned_stack.h" +#include "pls/internal/base/swmr_spin_lock.h" #include "abstract_task.h" namespace pls { @@ -15,13 +16,13 @@ namespace scheduling { class scheduler; struct thread_state { - scheduler *scheduler_; - abstract_task *root_task_; - abstract_task *current_task_; - data_structures::aligned_stack *task_stack_; - size_t id_; - base::spin_lock lock_; - std::minstd_rand random_; + alignas(base::system_details::CACHE_LINE_SIZE) scheduler *scheduler_; + alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *root_task_; + alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *current_task_; + alignas(base::system_details::CACHE_LINE_SIZE) data_structures::aligned_stack *task_stack_; + alignas(base::system_details::CACHE_LINE_SIZE) size_t id_; + alignas(base::system_details::CACHE_LINE_SIZE) base::swmr_spin_lock lock_; + alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_; thread_state() : scheduler_{nullptr}, @@ -29,6 +30,7 @@ struct thread_state { current_task_{nullptr}, task_stack_{nullptr}, id_{0}, + lock_{}, random_{id_} {}; thread_state(scheduler *scheduler, data_structures::aligned_stack *task_stack, unsigned int id) : @@ -37,6 +39,7 @@ struct thread_state { current_task_{nullptr}, task_stack_{task_stack}, id_{id}, + lock_{}, random_{id_} {} }; diff --git a/lib/pls/src/internal/base/alignment.cpp b/lib/pls/src/internal/base/alignment.cpp index d41364b..79b7a44 100644 --- a/lib/pls/src/internal/base/alignment.cpp +++ b/lib/pls/src/internal/base/alignment.cpp @@ -10,8 +10,8 @@ void *allocate_aligned(size_t size) { return aligned_alloc(system_details::CACHE_LINE_SIZE, size); } -std::uintptr_t next_alignment(std::uintptr_t size) { - std::uintptr_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE; +system_details::pointer_t next_alignment(system_details::pointer_t size) { + system_details::pointer_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE; if (miss_alignment == 0) { return size; } else { @@ -19,8 +19,17 @@ std::uintptr_t next_alignment(std::uintptr_t size) { } } +system_details::pointer_t previous_alignment(system_details::pointer_t size) { + system_details::pointer_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE; + if (miss_alignment == 0) { + return size; + } else { + return size - miss_alignment; + } +} + char *next_alignment(char *pointer) { - return reinterpret_cast(next_alignment(reinterpret_cast(pointer))); + return reinterpret_cast(next_alignment(reinterpret_cast(pointer))); } } diff --git a/lib/pls/src/internal/base/swmr_spin_lock.cpp b/lib/pls/src/internal/base/swmr_spin_lock.cpp new file mode 100644 index 0000000..ed26ac0 --- /dev/null +++ 
b/lib/pls/src/internal/base/swmr_spin_lock.cpp @@ -0,0 +1,46 @@ +#include "pls/internal/base/swmr_spin_lock.h" +#include "pls/internal/base/system_details.h" + +namespace pls { +namespace internal { +namespace base { + +bool swmr_spin_lock::reader_try_lock() { + PROFILE_LOCK("Try Acquire Read Lock") + if (write_request_.load(std::memory_order_acquire) == 1) { + return false; + } + // We think we can enter the region + readers_.fetch_add(1, std::memory_order_acquire); + if (write_request_.load(std::memory_order_acquire) == 1) { + // Whoops, the writer acquires the lock, so we back off again + readers_.fetch_add(-1, std::memory_order_release); + return false; + } + + return true; +} + +void swmr_spin_lock::reader_unlock() { + PROFILE_LOCK("Release Read Lock") + readers_--; +} + +void swmr_spin_lock::writer_lock() { + PROFILE_LOCK("Acquire Write Lock") + // Tell the readers that we would like to write + write_request_ = 1; + + // Wait for all of them to exit the critical section + while (readers_ > 0) + system_details::relax_cpu(); // Spin, not expensive as relaxed load +} + +void swmr_spin_lock::writer_unlock() { + PROFILE_LOCK("Release Write Lock") + write_request_ = 0; +} + +} +} +} diff --git a/lib/pls/src/internal/base/tas_spin_lock.cpp b/lib/pls/src/internal/base/tas_spin_lock.cpp index b35d805..ed233bf 100644 --- a/lib/pls/src/internal/base/tas_spin_lock.cpp +++ b/lib/pls/src/internal/base/tas_spin_lock.cpp @@ -1,5 +1,6 @@ #include "pls/internal/helpers/profiler.h" #include "pls/internal/base/tas_spin_lock.h" +#include "pls/internal/base/backoff.h" namespace pls { namespace internal { @@ -7,27 +8,36 @@ namespace base { void tas_spin_lock::lock() { PROFILE_LOCK("Acquire Lock") - int tries = 0; - while (flag_.test_and_set(std::memory_order_acquire)) { - tries++; - if (tries % yield_at_tries_ == 0) { - this_thread::yield(); + backoff backoff_strategy; + + while (true) { + if (flag_.test_and_set(std::memory_order_acquire) == 0) { + return; } + backoff_strategy.do_backoff(); } } bool tas_spin_lock::try_lock(unsigned int num_tries) { PROFILE_LOCK("Try Acquire Lock") - while (flag_.test_and_set(std::memory_order_acquire)) { + backoff backoff_strategy; + + while (true) { + if (flag_.test_and_set(std::memory_order_acquire) == 0) { + return true; + } + num_tries--; if (num_tries <= 0) { return false; } + + backoff_strategy.do_backoff(); } - return true; } void tas_spin_lock::unlock() { + PROFILE_LOCK("Unlock") flag_.clear(std::memory_order_release); } diff --git a/lib/pls/src/internal/base/ttas_spin_lock.cpp b/lib/pls/src/internal/base/ttas_spin_lock.cpp index ba1d3ac..7573d07 100644 --- a/lib/pls/src/internal/base/ttas_spin_lock.cpp +++ b/lib/pls/src/internal/base/ttas_spin_lock.cpp @@ -1,5 +1,6 @@ #include "pls/internal/helpers/profiler.h" #include "pls/internal/base/ttas_spin_lock.h" +#include "pls/internal/base/backoff.h" namespace pls { namespace internal { @@ -7,40 +8,49 @@ namespace base { void ttas_spin_lock::lock() { PROFILE_LOCK("Acquire Lock") - int tries = 0; int expected = 0; + backoff backoff_; - do { - while (flag_.load(std::memory_order_relaxed) == 1) { - tries++; - if (tries % yield_at_tries_ == 0) { - this_thread::yield(); - } - } + while (true) { + while (flag_.load(std::memory_order_relaxed) == 1) + system_details::relax_cpu(); // Spin expected = 0; - } while (!flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire)); + if (flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire)) { + return; + } + backoff_.do_backoff(); + } } bool 
ttas_spin_lock::try_lock(unsigned int num_tries) { PROFILE_LOCK("Try Acquire Lock") int expected = 0; + backoff backoff_; - do { - while (flag_.load(std::memory_order_relaxed) == 1) { + while (true) { + while (flag_.load() == 1) { num_tries--; if (num_tries <= 0) { return false; } + system_details::relax_cpu(); } expected = 0; - } while (!flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire)); - - return true; + if (flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire)) { + return true; + } + num_tries--; + if (num_tries <= 0) { + return false; + } + backoff_.do_backoff(); + } } void ttas_spin_lock::unlock() { + PROFILE_LOCK("Unlock") flag_.store(0, std::memory_order_release); } diff --git a/lib/pls/src/internal/data_structures/aligned_stack.cpp b/lib/pls/src/internal/data_structures/aligned_stack.cpp index c91c3ec..4564338 100644 --- a/lib/pls/src/internal/data_structures/aligned_stack.cpp +++ b/lib/pls/src/internal/data_structures/aligned_stack.cpp @@ -5,11 +5,16 @@ namespace pls { namespace internal { namespace data_structures { -aligned_stack::aligned_stack(char *memory_region, const std::size_t size) : +aligned_stack::aligned_stack(pointer_t memory_region, const std::size_t size) : memory_start_{memory_region}, memory_end_{memory_region + size}, head_{base::alignment::next_alignment(memory_start_)} {} +aligned_stack::aligned_stack(char *memory_region, const std::size_t size) : + memory_start_{(pointer_t) memory_region}, + memory_end_{(pointer_t) memory_region + size}, + head_{base::alignment::next_alignment(memory_start_)} {} + } } } diff --git a/lib/pls/src/internal/data_structures/deque.cpp b/lib/pls/src/internal/data_structures/deque.cpp index 017a590..9f13b0d 100644 --- a/lib/pls/src/internal/data_structures/deque.cpp +++ b/lib/pls/src/internal/data_structures/deque.cpp @@ -14,11 +14,11 @@ deque_item *deque_internal::pop_head_internal() { } deque_item *result = head_; - head_ = head_->prev_; + head_ = head_->next_; if (head_ == nullptr) { tail_ = nullptr; } else { - head_->next_ = nullptr; + head_->prev_ = nullptr; } return result; @@ -32,11 +32,11 @@ deque_item *deque_internal::pop_tail_internal() { } deque_item *result = tail_; - tail_ = tail_->next_; + tail_ = tail_->prev_; if (tail_ == nullptr) { head_ = nullptr; } else { - tail_->prev_ = nullptr; + tail_->next_ = nullptr; } return result; @@ -46,12 +46,12 @@ void deque_internal::push_tail_internal(deque_item *new_item) { std::lock_guard lock{lock_}; if (tail_ != nullptr) { - tail_->prev_ = new_item; + tail_->next_ = new_item; } else { head_ = new_item; } - new_item->next_ = tail_; - new_item->prev_ = nullptr; + new_item->prev_ = tail_; + new_item->next_ = nullptr; tail_ = new_item; } diff --git a/lib/pls/src/internal/scheduling/abstract_task.cpp b/lib/pls/src/internal/scheduling/abstract_task.cpp index 50a7003..aeccb1d 100644 --- a/lib/pls/src/internal/scheduling/abstract_task.cpp +++ b/lib/pls/src/internal/scheduling/abstract_task.cpp @@ -1,3 +1,4 @@ +#include #include "pls/internal/helpers/profiler.h" #include "pls/internal/scheduling/thread_state.h" @@ -9,28 +10,31 @@ namespace internal { namespace scheduling { bool abstract_task::steal_work() { + thread_local static base::backoff backoff{}; + PROFILE_STEALING("abstract_task::steal_work") const auto my_state = base::this_thread::state(); const auto my_scheduler = my_state->scheduler_; const size_t my_id = my_state->id_; const size_t offset = my_state->random_() % my_scheduler->num_threads(); - const size_t max_tries = 1; // 
my_scheduler->num_threads(); TODO: Tune this value + const size_t max_tries = my_scheduler->num_threads() - 1; // TODO: Tune this value for (size_t i = 0; i < max_tries; i++) { size_t target = (offset + i) % my_scheduler->num_threads(); if (target == my_id) { - continue; + target = (target + 1) % my_scheduler->num_threads(); } auto target_state = my_scheduler->thread_state_for(target); - // TODO: Cleaner Locking Using std::guarded_lock - target_state->lock_.lock(); + if (!target_state->lock_.reader_try_lock()) { + continue; + } // Dig down to our level PROFILE_STEALING("Go to our level") abstract_task *current_task = target_state->root_task_; while (current_task != nullptr && current_task->depth() < depth()) { - current_task = current_task->child_task_; + current_task = current_task->child(); } PROFILE_END_BLOCK @@ -42,12 +46,13 @@ bool abstract_task::steal_work() { current_task->depth_ == depth_) { if (internal_stealing(current_task)) { // internal steal was a success, hand it back to the internal scheduler - target_state->lock_.unlock(); + target_state->lock_.reader_unlock(); + backoff.reset(); return true; } // No success, we need to steal work from a deeper level using 'top level task stealing' - current_task = current_task->child_task_; + current_task = current_task->child(); } } PROFILE_END_BLOCK; @@ -59,17 +64,19 @@ bool abstract_task::steal_work() { while (current_task != nullptr) { auto lock = &target_state->lock_; if (current_task->split_task(lock)) { - // internal steal was no success (we did a top level task steal) + // top level steal was a success (we did a top level task steal) + backoff.reset(); return false; } current_task = current_task->child_task_; } PROFILE_END_BLOCK; - target_state->lock_.unlock(); + target_state->lock_.reader_unlock(); } // internal steal was no success + backoff.do_backoff(); return false; } diff --git a/lib/pls/src/internal/scheduling/fork_join_task.cpp b/lib/pls/src/internal/scheduling/fork_join_task.cpp index ed30ee9..c7c46d2 100644 --- a/lib/pls/src/internal/scheduling/fork_join_task.cpp +++ b/lib/pls/src/internal/scheduling/fork_join_task.cpp @@ -8,22 +8,25 @@ namespace internal { namespace scheduling { fork_join_sub_task::fork_join_sub_task() : - data_structures::deque_item{}, ref_count_{0}, parent_{nullptr}, tbb_task_{nullptr}, - stack_state_{nullptr} {} + deque_state_{0} {} fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task &other) : - data_structures::deque_item(other), ref_count_{0}, - parent_{nullptr}, - tbb_task_{nullptr}, - stack_state_{nullptr} {} + parent_{other.parent_}, + tbb_task_{other.tbb_task_}, + deque_state_{other.deque_state_} {} void fork_join_sub_task::execute() { PROFILE_WORK_BLOCK("execute sub_task") tbb_task_->currently_executing_ = this; + if (executed) { + PLS_ERROR("Double Execution!") + } + executed = true; + executed_at = base::this_thread::state()->id_; execute_internal(); tbb_task_->currently_executing_ = nullptr; PROFILE_END_BLOCK @@ -34,18 +37,6 @@ void fork_join_sub_task::execute() { } } -void fork_join_sub_task::spawn_child_internal(fork_join_sub_task *sub_task) { - // Keep our refcount up to date - ref_count_++; - - // Assign forced values - sub_task->parent_ = this; - sub_task->tbb_task_ = tbb_task_; - sub_task->stack_state_ = tbb_task_->my_stack_->save_state(); - - tbb_task_->deque_.push_tail(sub_task); -} - void fork_join_sub_task::wait_for_all() { while (ref_count_ > 0) { PROFILE_STEALING("get local sub task") @@ -64,7 +55,7 @@ void fork_join_sub_task::wait_for_all() { } } } - 
tbb_task_->my_stack_->reset_state(stack_state_); + tbb_task_->deque_.release_memory_until(deque_state_); } fork_join_sub_task *fork_join_task::get_local_sub_task() { @@ -85,7 +76,7 @@ bool fork_join_task::internal_stealing(abstract_task *other_task) { } else { // Make sub-task belong to our fork_join_task instance stolen_sub_task->tbb_task_ = this; - stolen_sub_task->stack_state_ = my_stack_->save_state(); + stolen_sub_task->deque_state_ = deque_.save_state(); // We will execute this next without explicitly moving it onto our stack storage last_stolen_ = stolen_sub_task; @@ -93,7 +84,7 @@ bool fork_join_task::internal_stealing(abstract_task *other_task) { } } -bool fork_join_task::split_task(base::spin_lock *lock) { +bool fork_join_task::split_task(base::swmr_spin_lock *lock) { PROFILE_STEALING("fork_join_task::split_task") fork_join_sub_task *stolen_sub_task = get_stolen_sub_task(); if (stolen_sub_task == nullptr) { @@ -102,8 +93,7 @@ bool fork_join_task::split_task(base::spin_lock *lock) { fork_join_task task{stolen_sub_task, this->unique_id()}; // In success case, unlock. - // TODO: this locking is complicated and error prone. - lock->unlock(); + lock->reader_unlock(); scheduler::execute_task(task, depth()); return true; @@ -113,9 +103,12 @@ void fork_join_task::execute() { PROFILE_WORK_BLOCK("execute fork_join_task"); // Bind this instance to our OS thread - my_stack_ = base::this_thread::state()->task_stack_; + // TODO: See if we did this right + // my_stack_ = base::this_thread::state()->task_stack_; + deque_.reset_base_pointer(); + root_task_->tbb_task_ = this; - root_task_->stack_state_ = my_stack_->save_state(); + root_task_->deque_state_ = deque_.save_state(); // Execute it on our OS thread until its finished root_task_->execute(); @@ -123,12 +116,12 @@ void fork_join_task::execute() { fork_join_sub_task *fork_join_task::currently_executing() const { return currently_executing_; } -fork_join_task::fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id) : +fork_join_task::fork_join_task(fork_join_sub_task *root_task, + const abstract_task::id &id) : abstract_task{0, id}, root_task_{root_task}, currently_executing_{nullptr}, - my_stack_{nullptr}, - deque_{}, + deque_{base::this_thread::state()->task_stack_}, last_stolen_{nullptr} {} } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 8e7850d..7f224a1 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,4 +1,4 @@ add_executable(tests main.cpp - base_tests.cpp scheduling_tests.cpp data_structures_test.cpp) + data_structures_test.cpp) target_link_libraries(tests catch2 pls) diff --git a/test/data_structures_test.cpp b/test/data_structures_test.cpp index 34ec1f9..56e4524 100644 --- a/test/data_structures_test.cpp +++ b/test/data_structures_test.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include @@ -130,3 +131,90 @@ TEST_CASE("deque stores objects correctly", "[internal/data_structures/deque.h]" REQUIRE(deque.pop_tail() == &three); } } + +TEST_CASE("work stealing deque stores objects correctly", "[internal/data_structures/aligned_stack.h]") { + constexpr long data_size = 2 << 14; + char data[data_size]; + aligned_stack stack{data, data_size}; + work_stealing_deque deque{&stack}; + + int one = 1, two = 2, three = 3, four = 4; + + SECTION("add and remove items form the tail") { + deque.push_tail(one); + deque.push_tail(two); + deque.push_tail(three); + + REQUIRE(*deque.pop_tail() == three); + REQUIRE(*deque.pop_tail() == two); + REQUIRE(*deque.pop_tail() == one); + } + + 
SECTION("handles getting empty by popping the tail correctly") { + deque.push_tail(one); + REQUIRE(*deque.pop_tail() == one); + + deque.push_tail(two); + REQUIRE(*deque.pop_tail() == two); + } + + SECTION("remove items form the head") { + deque.push_tail(one); + deque.push_tail(two); + deque.push_tail(three); + + REQUIRE(*deque.pop_head() == one); + REQUIRE(*deque.pop_head() == two); + REQUIRE(*deque.pop_head() == three); + } + + SECTION("handles getting empty by popping the head correctly") { + deque.push_tail(one); + REQUIRE(*deque.pop_head() == one); + + deque.push_tail(two); + REQUIRE(*deque.pop_head() == two); + } + + SECTION("handles getting empty by popping the head and tail correctly") { + deque.push_tail(one); + REQUIRE(*deque.pop_tail() == one); + + deque.push_tail(two); + REQUIRE(*deque.pop_head() == two); + + deque.push_tail(three); + REQUIRE(*deque.pop_tail() == three); + } + + SECTION("handles jumps bigger 1 correctly") { + deque.push_tail(one); + deque.push_tail(two); + REQUIRE(*deque.pop_tail() == two); + + deque.push_tail(three); + deque.push_tail(four); + REQUIRE(*deque.pop_head() == one); + REQUIRE(*deque.pop_head() == three); + REQUIRE(*deque.pop_head() == four); + } + + SECTION("handles stack reset 1 correctly when emptied by tail") { + deque.push_tail(one); + deque.push_tail(two); + auto tmp_result = deque.pop_tail(); + REQUIRE(*tmp_result == two); + + deque.release_memory_until(tmp_result); + REQUIRE(*deque.pop_tail() == one); + + deque.push_tail(three); + deque.push_tail(four); + REQUIRE(*deque.pop_head() == three); + REQUIRE(*deque.pop_tail() == four); + } + + SECTION("synces correctly") { + + } +}