diff --git a/app/benchmark_fft/main.cpp b/app/benchmark_fft/main.cpp index d10012b..54abdd1 100644 --- a/app/benchmark_fft/main.cpp +++ b/app/benchmark_fft/main.cpp @@ -6,7 +6,7 @@ #include #include -static constexpr int CUTOFF = 10; +static constexpr int CUTOFF = 16; static constexpr int NUM_ITERATIONS = 1000; static constexpr int INPUT_SIZE = 2064; typedef std::vector> complex_vector; diff --git a/app/invoke_parallel/main.cpp b/app/invoke_parallel/main.cpp index 371f90f..c9dd5a7 100644 --- a/app/invoke_parallel/main.cpp +++ b/app/invoke_parallel/main.cpp @@ -2,48 +2,100 @@ #include #include +#include +#include -static pls::static_scheduler_memory<8, 2 << 14> my_scheduler_memory; +static constexpr int CUTOFF = 16; +static constexpr int NUM_ITERATIONS = 1000; +static constexpr int INPUT_SIZE = 2064; +typedef std::vector> complex_vector; -static constexpr int CUTOFF = 10; +void divide(complex_vector::iterator data, int n) { + complex_vector tmp_odd_elements(n / 2); + for (int i = 0; i < n / 2; i++) { + tmp_odd_elements[i] = data[i * 2 + 1]; + } + for (int i = 0; i < n / 2; i++) { + data[i] = data[i * 2]; + } + for (int i = 0; i < n / 2; i++) { + data[i + n / 2] = tmp_odd_elements[i]; + } +} + +void combine(complex_vector::iterator data, int n) { + for (int i = 0; i < n / 2; i++) { + std::complex even = data[i]; + std::complex odd = data[i + n / 2]; + + // w is the "twiddle-factor". + // this could be cached, but we run the same 'data_structures' algorithm parallel/serial, + // so it won't impact the performance comparison. + std::complex w = exp(std::complex(0, -2. * M_PI * i / n)); -long fib_serial(long n) { - if (n == 0) { - return 0; + data[i] = even + w * odd; + data[i + n / 2] = even - w * odd; } - if (n == 1) { - return 1; +} + +void fft(complex_vector::iterator data, int n) { + if (n < 2) { + return; } - return fib_serial(n - 1) + fib_serial(n - 2); + PROFILE_WORK_BLOCK("Divide") + divide(data, n); + PROFILE_END_BLOCK + PROFILE_WORK_BLOCK("Invoke Parallel") + if (n == CUTOFF) { + PROFILE_WORK_BLOCK("FFT Serial") + fft(data, n / 2); + fft(data + n / 2, n / 2); + } else if (n <= CUTOFF) { + fft(data, n / 2); + fft(data + n / 2, n / 2); + } else { + pls::invoke_parallel( + [&] { fft(data, n / 2); }, + [&] { fft(data + n / 2, n / 2); } + ); + } + PROFILE_END_BLOCK + PROFILE_WORK_BLOCK("Combine") + combine(data, n); + PROFILE_END_BLOCK } -long fib(long n) { - if (n <= CUTOFF) { - return fib_serial(n); +complex_vector prepare_input(int input_size) { + std::vector known_frequencies{2, 11, 52, 88, 256}; + complex_vector data(input_size); + + // Set our input data to match a time series of the known_frequencies. + // When applying fft to this time-series we should find these frequencies. + for (int i = 0; i < input_size; i++) { + data[i] = std::complex(0.0, 0.0); + for (auto frequencie : known_frequencies) { + data[i] += sin(2 * M_PI * frequencie * i / input_size); + } } - // Actual 'invoke_parallel' logic/code - int left, right; - pls::invoke_parallel( - [&] { left = fib(n - 1); }, - [&] { right = fib(n - 2); } - ); - return left + right; + return data; } int main() { PROFILE_ENABLE + pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 14}; pls::scheduler scheduler{&my_scheduler_memory, 8}; - long result; + complex_vector initial_input = prepare_input(INPUT_SIZE); scheduler.perform_work([&] { PROFILE_MAIN_THREAD // Call looks just the same, only requirement is // the enclosure in the perform_work lambda. 
for (int i = 0; i < 10; i++) { - result = fib(30); - std::cout << "Fib(30)=" << result << std::endl; + PROFILE_WORK_BLOCK("Top Level FFT") + complex_vector input = initial_input; + fft(input.begin(), input.size()); } }); diff --git a/app/playground/main.cpp b/app/playground/main.cpp index 442d5c6..d3a7a50 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -11,8 +11,5 @@ #include int main() { - std::cout << pls::internal::scheduling::root_task::create_id().type_.hash_code() << std::endl; - std::cout - << pls::internal::helpers::unique_id::create>().type_.hash_code() - << std::endl; + } diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 5d4999e..52c5710 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -20,6 +20,7 @@ add_library(pls STATIC include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp include/pls/internal/data_structures/aligned_stack_impl.h include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp + include/pls/internal/data_structures/work_stealing_deque.h include/pls/internal/helpers/prohibit_new.h include/pls/internal/helpers/profiler.h diff --git a/lib/pls/include/pls/internal/base/alignment.h b/lib/pls/include/pls/internal/base/alignment.h index a531c58..5797932 100644 --- a/lib/pls/include/pls/internal/base/alignment.h +++ b/lib/pls/include/pls/internal/base/alignment.h @@ -19,10 +19,32 @@ struct aligned_wrapper { }; void *allocate_aligned(size_t size); -std::uintptr_t next_alignment(std::uintptr_t size); +system_details::pointer_t next_alignment(system_details::pointer_t size); +system_details::pointer_t previous_alignment(system_details::pointer_t size); char *next_alignment(char *pointer); } + +template +struct aligned_aba_pointer { + const system_details::pointer_t pointer_; + + explicit aligned_aba_pointer(T *pointer, unsigned int aba = 0) : pointer_{ + reinterpret_cast(pointer) + aba} {} + + T *pointer() const { + return reinterpret_cast(pointer_ & system_details::CACHE_LINE_ADDRESS_USED_BITS); + } + + unsigned int aba() const { + return pointer_ & system_details::CACHE_LINE_ADDRESS_UNUSED_BITS; + } + + aligned_aba_pointer set_aba(unsigned int aba) const { + return aligned_aba_pointer(pointer(), aba); + } +}; + } } } diff --git a/lib/pls/include/pls/internal/base/backoff.h b/lib/pls/include/pls/internal/base/backoff.h index 7d2650f..c4d3827 100644 --- a/lib/pls/include/pls/internal/base/backoff.h +++ b/lib/pls/include/pls/internal/base/backoff.h @@ -14,8 +14,8 @@ namespace internal { namespace base { class backoff { - static constexpr unsigned long INITIAL_SPIN_ITERS = 2u << 2u; - static constexpr unsigned long MAX_SPIN_ITERS = 2u << 6u; + const unsigned long INITIAL_SPIN_ITERS = 2u << 2u; + const unsigned long MAX_SPIN_ITERS = 2u << 6u; unsigned long current_ = INITIAL_SPIN_ITERS; std::minstd_rand random_; diff --git a/lib/pls/include/pls/internal/base/error_handling.h b/lib/pls/include/pls/internal/base/error_handling.h index 235964e..381758a 100644 --- a/lib/pls/include/pls/internal/base/error_handling.h +++ b/lib/pls/include/pls/internal/base/error_handling.h @@ -11,5 +11,6 @@ * (or its inclusion adds too much overhead). 
*/ #define PLS_ERROR(msg) std::cout << msg << std::endl; exit(1); +#define PLS_ASSERT(cond, msg) if (!cond) { PLS_ERROR(msg) } #endif //PLS_ERROR_HANDLING_H diff --git a/lib/pls/include/pls/internal/base/system_details.h b/lib/pls/include/pls/internal/base/system_details.h index 82f435c..92e4c65 100644 --- a/lib/pls/include/pls/internal/base/system_details.h +++ b/lib/pls/include/pls/internal/base/system_details.h @@ -18,10 +18,32 @@ namespace base { * Currently sane default values for x86. */ namespace system_details { + +/** + * Pointer Types needed for ABA protection mixed into addresses. + * pointer_t should be an integer type capable of holding ANY pointer value. + */ +using pointer_t = std::uintptr_t; +constexpr pointer_t ZERO_POINTER = 0; +constexpr pointer_t MAX_POINTER = ~ZERO_POINTER; + +/** + * Biggest type that supports atomic CAS operations. + * Usually it is sane to assume a pointer can be swapped in a single CAS operation. + */ +using cas_integer = pointer_t; +constexpr cas_integer MIN_CAS_INTEGER = 0; +constexpr cas_integer MAX_CAS_INTEGER = ~MIN_CAS_INTEGER; +constexpr cas_integer FIRST_HALF_CAS_INTEGER = MAX_CAS_INTEGER << ((sizeof(cas_integer) / 2) * 8); +constexpr cas_integer SECOND_HALF_CAS_INTEGER = ~FIRST_HALF_CAS_INTEGER; + /** - * Most processors have 64 byte cache lines + * Most processors have 64 byte cache lines (last 6 bit of the address are zero at line beginnings). */ -constexpr std::uintptr_t CACHE_LINE_SIZE = 64; +constexpr unsigned int CACHE_LINE_ADDRESS_BITS = 6; +constexpr pointer_t CACHE_LINE_SIZE = 2u << (CACHE_LINE_ADDRESS_BITS - 1); +constexpr pointer_t CACHE_LINE_ADDRESS_USED_BITS = MAX_POINTER << CACHE_LINE_ADDRESS_BITS; +constexpr pointer_t CACHE_LINE_ADDRESS_UNUSED_BITS = ~CACHE_LINE_ADDRESS_USED_BITS; /** * Choose one of the following ways to store thread specific data. diff --git a/lib/pls/include/pls/internal/base/ttas_spin_lock.h b/lib/pls/include/pls/internal/base/ttas_spin_lock.h index 3e839f7..787f772 100644 --- a/lib/pls/include/pls/internal/base/ttas_spin_lock.h +++ b/lib/pls/include/pls/internal/base/ttas_spin_lock.h @@ -19,11 +19,10 @@ namespace base { */ class ttas_spin_lock { std::atomic flag_; - backoff backoff_; public: - ttas_spin_lock() : flag_{0}, backoff_{} {}; - ttas_spin_lock(const ttas_spin_lock &/*other*/) : flag_{0}, backoff_{} {} + ttas_spin_lock() : flag_{0} {}; + ttas_spin_lock(const ttas_spin_lock &/*other*/) : flag_{0} {} void lock(); bool try_lock(unsigned int num_tries = 1); diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack.h b/lib/pls/include/pls/internal/data_structures/aligned_stack.h index dc46812..2e04702 100644 --- a/lib/pls/include/pls/internal/data_structures/aligned_stack.h +++ b/lib/pls/include/pls/internal/data_structures/aligned_stack.h @@ -12,6 +12,8 @@ namespace pls { namespace internal { namespace data_structures { +using base::system_details::pointer_t; + /** * Generic stack-like data structure that allows to allocate arbitrary objects in a given memory region. 
* The objects will be stored aligned in the stack, making the storage cache friendly and very fast @@ -26,15 +28,16 @@ namespace data_structures { */ class aligned_stack { // Keep bounds of our memory block - char *memory_start_; - char *memory_end_; + pointer_t memory_start_; + pointer_t memory_end_; // Current head will always be aligned to cache lines - char *head_; + pointer_t head_; public: - typedef char *state; + typedef pointer_t state; - aligned_stack() : memory_start_{nullptr}, memory_end_{nullptr}, head_{nullptr} {}; + aligned_stack() : memory_start_{0}, memory_end_{0}, head_{0} {}; + aligned_stack(pointer_t memory_region, std::size_t size); aligned_stack(char *memory_region, std::size_t size); template @@ -48,7 +51,6 @@ class aligned_stack { void reset_state(state new_state) { head_ = new_state; } }; - } } } diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h index f034a4e..849971a 100644 --- a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h +++ b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h @@ -9,7 +9,7 @@ namespace data_structures { template T *aligned_stack::push(const T &object) { // Copy-Construct - return new((void *) push < T > ())T(object); + return new(push < T > ())T(object); } template diff --git a/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h new file mode 100644 index 0000000..ec0a743 --- /dev/null +++ b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h @@ -0,0 +1,307 @@ + +#ifndef PLS_WORK_STEALING_DEQUE_H_ +#define PLS_WORK_STEALING_DEQUE_H_ + +#include +#include +#include + +#include "pls/internal/base/system_details.h" +#include "pls/internal/base/spin_lock.h" +#include "pls/internal/base/error_handling.h" + +#include "aligned_stack.h" + +namespace pls { +namespace internal { +namespace data_structures { + +using cas_integer = base::system_details::cas_integer; +using pointer_t = base::system_details::pointer_t; +static cas_integer get_jump_wish(cas_integer n) { + return (n & base::system_details::FIRST_HALF_CAS_INTEGER) >> ((sizeof(cas_integer) / 2) * 8); +} +static cas_integer get_offset(cas_integer n) { + return n & base::system_details::SECOND_HALF_CAS_INTEGER; +} +static cas_integer set_jump_wish(cas_integer n, cas_integer new_value) { + return (new_value << ((sizeof(cas_integer) / 2) * 8)) | (n & base::system_details::SECOND_HALF_CAS_INTEGER); +} +static cas_integer set_offset(cas_integer n, cas_integer new_value) { + return new_value | (n & base::system_details::FIRST_HALF_CAS_INTEGER); +} + +class work_stealing_deque_item { + // Pointer to the actual data + pointer_t data_; + // Index (relative to stack base) to the next and previous element + cas_integer next_item_; + cas_integer previous_item_; + + public: + work_stealing_deque_item() : data_{0}, next_item_{0}, previous_item_{0} {} + + template + Item *data() { + return reinterpret_cast(data_); + } + + template + void set_data(Item *data) { + data_ = reinterpret_cast(data); + } + + cas_integer next_item() { + return next_item_; + } + void set_next_item(cas_integer next_item) { + next_item_ = next_item; + } + cas_integer previous_item() { + return previous_item_; + } + void set_previous_item(cas_integer previous_item) { + previous_item_ = previous_item; + } +}; +static_assert(sizeof(work_stealing_deque_item) < base::system_details::CACHE_LINE_SIZE, + "Work stealing 
deque relies on memory layout and requires cache lines to be longer than one 'work_stealing_deque_item' instance!"); + +template +class work_stealing_deque { + // Deque 'takes over' stack and handles memory management while in use. + // At any point in time the deque can stop using more memory and the stack can be used by other entities. + aligned_stack *stack_; + pointer_t base_pointer_; + + std::atomic head_; + std::atomic tail_; + cas_integer previous_tail_; + + base::spin_lock lock_{}; // TODO: Remove after debugging + + + public: + using state = aligned_stack::state; + + explicit work_stealing_deque(aligned_stack *stack) : stack_{stack}, head_{0}, tail_{0}, previous_tail_{0} { + reset_base_pointer(); + } + work_stealing_deque(const work_stealing_deque &other) : stack_{other.stack_}, + base_pointer_{other.base_pointer_}, + head_{other.head_.load()}, + tail_{other.tail_.load()}, + previous_tail_{other.previous_tail_} {} + + void reset_base_pointer() { + base_pointer_ = reinterpret_cast(stack_->save_state()); // Keep the base of our region in the stack + } + + work_stealing_deque_item *item_at(cas_integer position) { + return reinterpret_cast(base_pointer_ + + (base::system_details::CACHE_LINE_SIZE * position)); + } + + cas_integer current_stack_offset() { + return (stack_->save_state() - base_pointer_) / base::system_details::CACHE_LINE_SIZE; + } + + template + std::pair *allocate_item(const T &new_item) { + // 'Union' type to push both on stack + using pair_t = std::pair; + // Allocate space on stack + auto new_pair = reinterpret_cast(stack_->push()); + // Initialize memory on stack + new((void *) &(new_pair->first)) work_stealing_deque_item(); + new((void *) &(new_pair->second)) T(new_item); + + return new_pair; + } + + template + Item *push_tail(const T &new_item) { +// std::lock_guard lock{lock_}; + cas_integer local_tail = tail_; + cas_integer local_head = head_; +// PLS_ASSERT((local_tail >= get_offset(local_head)), "Tail MUST be in front of head!") + auto new_pair = allocate_item(new_item); + + // Prepare current tail to point to correct next items + auto tail_deque_item = item_at(local_tail); + tail_deque_item->set_data(&(new_pair->second)); + tail_deque_item->set_next_item(current_stack_offset()); + tail_deque_item->set_previous_item(previous_tail_); + previous_tail_ = local_tail; + + // Linearization point, item appears after this write + cas_integer new_tail = current_stack_offset(); + tail_ = new_tail; +// { +// std::lock_guard lock{lock_}; +// std::cout << base::this_thread::state()->id_ << " - " +// << "Pushed Tail " << local_tail << "->" << new_tail << std::endl; +// } + } + + Item *pop_tail() { +// std::lock_guard lock{lock_}; + cas_integer local_tail = tail_; + cas_integer local_head = head_; + + if (local_tail <= get_offset(local_head)) { + return nullptr; // EMPTY + } + + work_stealing_deque_item *previous_tail_item = item_at(previous_tail_); + cas_integer new_tail = previous_tail_; + previous_tail_ = previous_tail_item->previous_item(); + + // Publish our wish to set the tail back + tail_ = new_tail; + // Get the state of local head AFTER we published our wish + local_head = head_; // Linearization point, outside knows list is empty + + if (get_offset(local_head) < new_tail) { +// { +// std::lock_guard lock{lock_}; +// std::cout << base::this_thread::state()->id_ << " - " +// << "Poped Tail (distance) " << local_tail << "->" << new_tail << std::endl; +// } + return previous_tail_item->data(); // Enough distance, return item + } + + cas_integer new_head = 
set_jump_wish(new_tail, 999999); + if (get_offset(local_head) == new_tail) { + // Try competing with consumers... + if (head_.compare_exchange_strong(local_head, new_head)) { +// { +// std::lock_guard lock{lock_}; +// std::cout << base::this_thread::state()->id_ << " - " +// << "Poped Tail (won competition 1) " << local_tail << "->" << new_tail << std::endl; +// } + return previous_tail_item->data(); // We won the competition, linearization on whom got the item + } + // Cosumer either registered jump wish or has gotten the item. + // Local_Head has the new value of the head, see if the other thread got to advance it + // and if not (only jump wish) try to win the competition. + if (get_offset(local_head) == new_tail && head_.compare_exchange_strong(local_head, new_head)) { +// { +// std::lock_guard lock{lock_}; +// std::cout << base::this_thread::state()->id_ << " - " +// << "Poped Tail (won competition 2) " << local_tail << "->" << new_tail << std::endl; +// } + return previous_tail_item->data(); // We won the competition, linearization on whom got the item + } + } +// { +// std::lock_guard lock{lock_}; +// std::cout << base::this_thread::state()->id_ << " - " +// << "FAILED to pop tail (lost competition) " << get_offset(local_head) << "; " << local_tail << "->" +// << new_tail << std::endl; +// } + + // Some other thread either won the competition or it already set the head further than we are + // before we even tried to compete with it. + // Reset the queue into an empty state => head_ = tail_ + // We can not set it to 0, as the memory is still in use. + tail_ = get_offset(local_head); // Set tail to match the head value the other thread won the battle of + + return nullptr; + } + + Item *pop_head() { +// std::lock_guard lock{lock_}; + cas_integer local_tail = tail_; + cas_integer local_head = head_; + cas_integer local_head_offset = get_offset(local_head); + + if (local_head_offset >= local_tail) { + return nullptr; // EMPTY + } + work_stealing_deque_item *head_deque_item = item_at(local_head_offset); + cas_integer next_item_offset = head_deque_item->next_item(); + Item *head_data_item = head_deque_item->data(); + + cas_integer jump_wish_head = set_jump_wish(local_head_offset, head_deque_item->next_item()); + if (!head_.compare_exchange_strong(local_head, jump_wish_head)) { +// { +// std::lock_guard lock{lock_}; +// std::cout << base::this_thread::state()->id_ << " - " +// << "Failed to pop head (first cas) " << local_head_offset << "->" << next_item_offset << std::endl; +// } + return nullptr; // Someone interrupted us + } + + local_tail = tail_; + if (local_head_offset >= local_tail) { +// std::cout << "Failed to pop head (second tail test) " << get_offset(local_head) << std::endl; + return nullptr; // EMPTY, tail was removed while we registered our jump wish + } + + cas_integer new_head = next_item_offset; + if (!head_.compare_exchange_strong(jump_wish_head, new_head)) { +// { +// std::lock_guard lock{lock_}; +// std::cout << base::this_thread::state()->id_ << " - " +// << "Failed to pop head (second cas) " << local_head_offset << "->" << next_item_offset << std::endl; +// } + return nullptr; // we lost the 'fight' on the item... + } + +// { +// std::lock_guard lock{lock_}; +// std::cout << base::this_thread::state()->id_ << " - " +// << "Popped Head " << local_head_offset << "->" << next_item_offset << std::endl; +// } + return head_deque_item->data(); // We won the 'fight' on the item, it is now save to access it! 
+ } + + void release_memory_until(state state) { +// std::lock_guard lock{lock_}; + cas_integer + item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE; + + cas_integer local_head = head_; + cas_integer local_tail = tail_; + +// if (local_tail != item_offset) { +// std::cout << "..."; +// } else { +// std::cout << "..."; +// } + + stack_->reset_state(state); + + if (item_offset < local_tail) { + tail_ = item_offset; + if (get_offset(local_head) >= local_tail) { + head_ = item_offset; + } + } + +// std::cout << "Release Memory " << item_offset << std::endl; + } + + void release_memory_until(Item *item) { + release_memory_until(reinterpret_cast(item)); + } + + state save_state() { + return stack_->save_state(); + } + + // PUSH item onto stack (allocate + insert into stack) - CHECK + // POP item from bottom of stack (remove from stack, memory still used) - CHECK + // POP item from top of stack (remove from stack, memory still used) - CHECK + + // RELEASE memory from all items allocated after this one (including this one) - CHECK + // -> Tell the data structure that it is safe to reuse the stack space + // Note: Item that is released must not be part of the queue at this point (it is already removed!) +}; + +} +} +} + +#endif //PLS_WORK_STEALING_DEQUE_H_ diff --git a/lib/pls/include/pls/internal/scheduling/fork_join_task.h b/lib/pls/include/pls/internal/scheduling/fork_join_task.h index ffa28b6..51f5c09 100644 --- a/lib/pls/include/pls/internal/scheduling/fork_join_task.h +++ b/lib/pls/include/pls/internal/scheduling/fork_join_task.h @@ -5,7 +5,7 @@ #include "pls/internal/helpers/profiler.h" #include "pls/internal/data_structures/aligned_stack.h" -#include "pls/internal/data_structures/deque.h" +#include "pls/internal/data_structures/work_stealing_deque.h" #include "abstract_task.h" #include "thread_state.h" @@ -15,7 +15,7 @@ namespace internal { namespace scheduling { class fork_join_task; -class fork_join_sub_task : public data_structures::deque_item { +class fork_join_sub_task { friend class fork_join_task; // Coordinate finishing of sub_tasks @@ -25,8 +25,11 @@ class fork_join_sub_task : public data_structures::deque_item { // Access to TBB scheduling environment fork_join_task *tbb_task_; + bool executed = false; + int executed_at = -1; + // Stack Management (reset stack pointer after wait_for_all() calls) - data_structures::aligned_stack::state stack_state_; + data_structures::work_stealing_deque::state deque_state_; protected: explicit fork_join_sub_task(); fork_join_sub_task(const fork_join_sub_task &other); @@ -37,11 +40,10 @@ class fork_join_sub_task : public data_structures::deque_item { public: // Only use them when actually executing this sub_task (only public for simpler API design) template - void spawn_child(const T &sub_task); + void spawn_child(T &sub_task); void wait_for_all(); private: - void spawn_child_internal(fork_join_sub_task *sub_task); void execute(); }; @@ -50,7 +52,7 @@ class fork_join_lambda_by_reference : public fork_join_sub_task { const Function *function_; public: - explicit fork_join_lambda_by_reference(const Function *function) : function_{function} {}; + explicit fork_join_lambda_by_reference(const Function *function) : fork_join_sub_task{}, function_{function} {}; protected: void execute_internal() override { @@ -63,7 +65,7 @@ class fork_join_lambda_by_value : public fork_join_sub_task { const Function function_; public: - explicit fork_join_lambda_by_value(const Function &function) : function_{function} {}; + explicit 
fork_join_lambda_by_value(const Function &function) : fork_join_sub_task{}, function_{function} {}; protected: void execute_internal() override { @@ -76,10 +78,9 @@ class fork_join_task : public abstract_task { fork_join_sub_task *root_task_; fork_join_sub_task *currently_executing_; - data_structures::aligned_stack *my_stack_; // Double-Ended Queue management - data_structures::deque deque_; + data_structures::work_stealing_deque deque_; // Steal Management fork_join_sub_task *last_stolen_; @@ -97,12 +98,21 @@ class fork_join_task : public abstract_task { }; template -void fork_join_sub_task::spawn_child(const T &task) { +void fork_join_sub_task::spawn_child(T &task) { PROFILE_FORK_JOIN_STEALING("spawn_child") static_assert(std::is_base_of::value, "Only pass fork_join_sub_task subclasses!"); - T *new_task = tbb_task_->my_stack_->push(task); - spawn_child_internal(new_task); + // Keep our refcount up to date + ref_count_++; + + // Assign forced values + task.parent_ = this; + task.tbb_task_ = tbb_task_; + task.deque_state_ = tbb_task_->deque_.save_state(); + + // Push on our deque + const T const_task = task; + tbb_task_->deque_.push_tail(const_task); } } diff --git a/lib/pls/src/internal/base/alignment.cpp b/lib/pls/src/internal/base/alignment.cpp index d41364b..79b7a44 100644 --- a/lib/pls/src/internal/base/alignment.cpp +++ b/lib/pls/src/internal/base/alignment.cpp @@ -10,8 +10,8 @@ void *allocate_aligned(size_t size) { return aligned_alloc(system_details::CACHE_LINE_SIZE, size); } -std::uintptr_t next_alignment(std::uintptr_t size) { - std::uintptr_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE; +system_details::pointer_t next_alignment(system_details::pointer_t size) { + system_details::pointer_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE; if (miss_alignment == 0) { return size; } else { @@ -19,8 +19,17 @@ std::uintptr_t next_alignment(std::uintptr_t size) { } } +system_details::pointer_t previous_alignment(system_details::pointer_t size) { + system_details::pointer_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE; + if (miss_alignment == 0) { + return size; + } else { + return size - miss_alignment; + } +} + char *next_alignment(char *pointer) { - return reinterpret_cast(next_alignment(reinterpret_cast(pointer))); + return reinterpret_cast(next_alignment(reinterpret_cast(pointer))); } } diff --git a/lib/pls/src/internal/base/swmr_spin_lock.cpp b/lib/pls/src/internal/base/swmr_spin_lock.cpp index 459faa4..ed26ac0 100644 --- a/lib/pls/src/internal/base/swmr_spin_lock.cpp +++ b/lib/pls/src/internal/base/swmr_spin_lock.cpp @@ -23,22 +23,22 @@ bool swmr_spin_lock::reader_try_lock() { void swmr_spin_lock::reader_unlock() { PROFILE_LOCK("Release Read Lock") - readers_.fetch_add(-1, std::memory_order_release); + readers_--; } void swmr_spin_lock::writer_lock() { PROFILE_LOCK("Acquire Write Lock") // Tell the readers that we would like to write - write_request_.store(1, std::memory_order_acquire); + write_request_ = 1; // Wait for all of them to exit the critical section - while (readers_.load(std::memory_order_acquire) > 0) + while (readers_ > 0) system_details::relax_cpu(); // Spin, not expensive as relaxed load } void swmr_spin_lock::writer_unlock() { PROFILE_LOCK("Release Write Lock") - write_request_.store(0, std::memory_order_release); + write_request_ = 0; } } diff --git a/lib/pls/src/internal/base/ttas_spin_lock.cpp b/lib/pls/src/internal/base/ttas_spin_lock.cpp index 6add7cc..7573d07 100644 --- 
a/lib/pls/src/internal/base/ttas_spin_lock.cpp +++ b/lib/pls/src/internal/base/ttas_spin_lock.cpp @@ -9,7 +9,7 @@ namespace base { void ttas_spin_lock::lock() { PROFILE_LOCK("Acquire Lock") int expected = 0; - backoff_.reset(); + backoff backoff_; while (true) { while (flag_.load(std::memory_order_relaxed) == 1) @@ -26,7 +26,7 @@ void ttas_spin_lock::lock() { bool ttas_spin_lock::try_lock(unsigned int num_tries) { PROFILE_LOCK("Try Acquire Lock") int expected = 0; - backoff_.reset(); + backoff backoff_; while (true) { while (flag_.load() == 1) { diff --git a/lib/pls/src/internal/data_structures/aligned_stack.cpp b/lib/pls/src/internal/data_structures/aligned_stack.cpp index c91c3ec..4564338 100644 --- a/lib/pls/src/internal/data_structures/aligned_stack.cpp +++ b/lib/pls/src/internal/data_structures/aligned_stack.cpp @@ -5,11 +5,16 @@ namespace pls { namespace internal { namespace data_structures { -aligned_stack::aligned_stack(char *memory_region, const std::size_t size) : +aligned_stack::aligned_stack(pointer_t memory_region, const std::size_t size) : memory_start_{memory_region}, memory_end_{memory_region + size}, head_{base::alignment::next_alignment(memory_start_)} {} +aligned_stack::aligned_stack(char *memory_region, const std::size_t size) : + memory_start_{(pointer_t) memory_region}, + memory_end_{(pointer_t) memory_region + size}, + head_{base::alignment::next_alignment(memory_start_)} {} + } } } diff --git a/lib/pls/src/internal/data_structures/deque.cpp b/lib/pls/src/internal/data_structures/deque.cpp index 017a590..9f13b0d 100644 --- a/lib/pls/src/internal/data_structures/deque.cpp +++ b/lib/pls/src/internal/data_structures/deque.cpp @@ -14,11 +14,11 @@ deque_item *deque_internal::pop_head_internal() { } deque_item *result = head_; - head_ = head_->prev_; + head_ = head_->next_; if (head_ == nullptr) { tail_ = nullptr; } else { - head_->next_ = nullptr; + head_->prev_ = nullptr; } return result; @@ -32,11 +32,11 @@ deque_item *deque_internal::pop_tail_internal() { } deque_item *result = tail_; - tail_ = tail_->next_; + tail_ = tail_->prev_; if (tail_ == nullptr) { head_ = nullptr; } else { - tail_->prev_ = nullptr; + tail_->next_ = nullptr; } return result; @@ -46,12 +46,12 @@ void deque_internal::push_tail_internal(deque_item *new_item) { std::lock_guard lock{lock_}; if (tail_ != nullptr) { - tail_->prev_ = new_item; + tail_->next_ = new_item; } else { head_ = new_item; } - new_item->next_ = tail_; - new_item->prev_ = nullptr; + new_item->prev_ = tail_; + new_item->next_ = nullptr; tail_ = new_item; } diff --git a/lib/pls/src/internal/scheduling/fork_join_task.cpp b/lib/pls/src/internal/scheduling/fork_join_task.cpp index 492475d..7e879ec 100644 --- a/lib/pls/src/internal/scheduling/fork_join_task.cpp +++ b/lib/pls/src/internal/scheduling/fork_join_task.cpp @@ -8,22 +8,26 @@ namespace internal { namespace scheduling { fork_join_sub_task::fork_join_sub_task() : - data_structures::deque_item{}, ref_count_{0}, parent_{nullptr}, tbb_task_{nullptr}, - stack_state_{nullptr} {} + deque_state_{0} {} fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task &other) : - data_structures::deque_item(other), ref_count_{0}, - parent_{nullptr}, - tbb_task_{nullptr}, - stack_state_{nullptr} {} + parent_{other.parent_}, + tbb_task_{other.tbb_task_}, + deque_state_{other.deque_state_} {} void fork_join_sub_task::execute() { PROFILE_WORK_BLOCK("execute sub_task") tbb_task_->currently_executing_ = this; + if (executed) { + int my_id = base::this_thread::state()->id_; + PLS_ERROR("Double 
Execution!") + } + executed = true; + executed_at = base::this_thread::state()->id_; execute_internal(); tbb_task_->currently_executing_ = nullptr; PROFILE_END_BLOCK @@ -34,18 +38,6 @@ void fork_join_sub_task::execute() { } } -void fork_join_sub_task::spawn_child_internal(fork_join_sub_task *sub_task) { - // Keep our refcount up to date - ref_count_++; - - // Assign forced values - sub_task->parent_ = this; - sub_task->tbb_task_ = tbb_task_; - sub_task->stack_state_ = tbb_task_->my_stack_->save_state(); - - tbb_task_->deque_.push_tail(sub_task); -} - void fork_join_sub_task::wait_for_all() { while (ref_count_ > 0) { PROFILE_STEALING("get local sub task") @@ -54,19 +46,17 @@ void fork_join_sub_task::wait_for_all() { if (local_task != nullptr) { local_task->execute(); } else { - while (ref_count_ > 0) { - // Try to steal work. - // External steal will be executed implicitly if success - PROFILE_STEALING("steal work") - bool internal_steal_success = tbb_task_->steal_work(); - PROFILE_END_BLOCK - if (internal_steal_success) { - tbb_task_->last_stolen_->execute(); - } + // Try to steal work. + // External steal will be executed implicitly if success + PROFILE_STEALING("steal work") + bool internal_steal_success = tbb_task_->steal_work(); + PROFILE_END_BLOCK + if (internal_steal_success) { + tbb_task_->last_stolen_->execute(); } } } - tbb_task_->my_stack_->reset_state(stack_state_); + tbb_task_->deque_.release_memory_until(deque_state_); } fork_join_sub_task *fork_join_task::get_local_sub_task() { @@ -74,7 +64,9 @@ fork_join_sub_task *fork_join_task::get_local_sub_task() { } fork_join_sub_task *fork_join_task::get_stolen_sub_task() { - return deque_.pop_head(); + auto tmp = deque_.save_state(); + auto result = deque_.pop_head(); + return result; } bool fork_join_task::internal_stealing(abstract_task *other_task) { @@ -87,7 +79,7 @@ bool fork_join_task::internal_stealing(abstract_task *other_task) { } else { // Make sub-task belong to our fork_join_task instance stolen_sub_task->tbb_task_ = this; - stolen_sub_task->stack_state_ = my_stack_->save_state(); + stolen_sub_task->deque_state_ = deque_.save_state(); // We will execute this next without explicitly moving it onto our stack storage last_stolen_ = stolen_sub_task; @@ -114,9 +106,12 @@ void fork_join_task::execute() { PROFILE_WORK_BLOCK("execute fork_join_task"); // Bind this instance to our OS thread - my_stack_ = base::this_thread::state()->task_stack_; + // TODO: See if we did this right + // my_stack_ = base::this_thread::state()->task_stack_; + deque_.reset_base_pointer(); + root_task_->tbb_task_ = this; - root_task_->stack_state_ = my_stack_->save_state(); + root_task_->deque_state_ = deque_.save_state(); // Execute it on our OS thread until its finished root_task_->execute(); @@ -124,12 +119,12 @@ void fork_join_task::execute() { fork_join_sub_task *fork_join_task::currently_executing() const { return currently_executing_; } -fork_join_task::fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id) : +fork_join_task::fork_join_task(fork_join_sub_task *root_task, + const abstract_task::id &id) : abstract_task{0, id}, root_task_{root_task}, currently_executing_{nullptr}, - my_stack_{nullptr}, - deque_{}, + deque_{base::this_thread::state()->task_stack_}, last_stolen_{nullptr} {} } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 8e7850d..7f224a1 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,4 +1,4 @@ add_executable(tests main.cpp - base_tests.cpp scheduling_tests.cpp 
data_structures_test.cpp) + data_structures_test.cpp) target_link_libraries(tests catch2 pls) diff --git a/test/data_structures_test.cpp b/test/data_structures_test.cpp index 34ec1f9..56e4524 100644 --- a/test/data_structures_test.cpp +++ b/test/data_structures_test.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include @@ -130,3 +131,90 @@ TEST_CASE("deque stores objects correctly", "[internal/data_structures/deque.h]" REQUIRE(deque.pop_tail() == &three); } } + +TEST_CASE("work stealing deque stores objects correctly", "[internal/data_structures/aligned_stack.h]") { + constexpr long data_size = 2 << 14; + char data[data_size]; + aligned_stack stack{data, data_size}; + work_stealing_deque deque{&stack}; + + int one = 1, two = 2, three = 3, four = 4; + + SECTION("add and remove items form the tail") { + deque.push_tail(one); + deque.push_tail(two); + deque.push_tail(three); + + REQUIRE(*deque.pop_tail() == three); + REQUIRE(*deque.pop_tail() == two); + REQUIRE(*deque.pop_tail() == one); + } + + SECTION("handles getting empty by popping the tail correctly") { + deque.push_tail(one); + REQUIRE(*deque.pop_tail() == one); + + deque.push_tail(two); + REQUIRE(*deque.pop_tail() == two); + } + + SECTION("remove items form the head") { + deque.push_tail(one); + deque.push_tail(two); + deque.push_tail(three); + + REQUIRE(*deque.pop_head() == one); + REQUIRE(*deque.pop_head() == two); + REQUIRE(*deque.pop_head() == three); + } + + SECTION("handles getting empty by popping the head correctly") { + deque.push_tail(one); + REQUIRE(*deque.pop_head() == one); + + deque.push_tail(two); + REQUIRE(*deque.pop_head() == two); + } + + SECTION("handles getting empty by popping the head and tail correctly") { + deque.push_tail(one); + REQUIRE(*deque.pop_tail() == one); + + deque.push_tail(two); + REQUIRE(*deque.pop_head() == two); + + deque.push_tail(three); + REQUIRE(*deque.pop_tail() == three); + } + + SECTION("handles jumps bigger 1 correctly") { + deque.push_tail(one); + deque.push_tail(two); + REQUIRE(*deque.pop_tail() == two); + + deque.push_tail(three); + deque.push_tail(four); + REQUIRE(*deque.pop_head() == one); + REQUIRE(*deque.pop_head() == three); + REQUIRE(*deque.pop_head() == four); + } + + SECTION("handles stack reset 1 correctly when emptied by tail") { + deque.push_tail(one); + deque.push_tail(two); + auto tmp_result = deque.pop_tail(); + REQUIRE(*tmp_result == two); + + deque.release_memory_until(tmp_result); + REQUIRE(*deque.pop_tail() == one); + + deque.push_tail(three); + deque.push_tail(four); + REQUIRE(*deque.pop_head() == three); + REQUIRE(*deque.pop_tail() == four); + } + + SECTION("synces correctly") { + + } +}
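
Note on the task-queue discipline this patch introduces (a minimal, single-threaded sketch, not the library code): the owner of a work_stealing_deque pushes newly spawned sub-tasks at the tail and pops them back LIFO in wait_for_all(), thieves take the oldest entries FIFO through pop_head(), and release_memory_until() rolls the structure back to a previously saved state once a sub-task tree has finished. The toy_deque type below, its fixed-size array, and the std::optional return values are illustrative stand-ins only; the real class places work_stealing_deque_item records on an aligned_stack and hands out raw pointers.

#include <array>
#include <cstddef>
#include <iostream>
#include <optional>

// Toy, single-threaded model of the owner/thief discipline used by the
// work_stealing_deque in this patch: owner works at the tail (LIFO),
// thieves pop at the head (FIFO), memory is released by rolling back
// to a saved state.
class toy_deque {
  std::array<int, 64> slots_{};
  std::size_t head_ = 0;  // next item a thief would take
  std::size_t tail_ = 0;  // next free slot for the owner

 public:
  using state = std::size_t;

  state save_state() const { return tail_; }

  void push_tail(int value) { slots_[tail_++] = value; }

  std::optional<int> pop_tail() {
    if (tail_ <= head_) return std::nullopt;  // empty
    return slots_[--tail_];
  }

  std::optional<int> pop_head() {
    if (head_ >= tail_) return std::nullopt;  // empty
    return slots_[head_++];
  }

  // Roll back to a previously saved state, similar to what wait_for_all()
  // does once all children of a sub-task have completed.
  void release_memory_until(state s) {
    tail_ = s;
    if (head_ > tail_) head_ = tail_;
  }
};

int main() {
  toy_deque deque;
  auto before_children = deque.save_state();

  deque.push_tail(1);                       // spawn_child(...)
  deque.push_tail(2);
  std::cout << *deque.pop_tail() << "\n";   // owner runs newest child first: 2
  std::cout << *deque.pop_head() << "\n";   // a thief would take the oldest: 1

  deque.release_memory_until(before_children);  // wait_for_all() cleanup
  return 0;
}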
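A second sketch, for the cas_integer packing that the new head word relies on. Following the layout added to system_details.h, one CAS-able integer is split in half: the lower half (SECOND_HALF_CAS_INTEGER) stores the head offset, the upper half (FIRST_HALF_CAS_INTEGER) stores the "jump wish" a stealer announces before advancing the head, which also acts as an ABA-style tag so a racing pop_tail() with a stale copy of the word fails its CAS. The helper names pack, HALF_BITS, FIRST_HALF, and SECOND_HALF below are simplified stand-ins for the patch's get_offset/get_jump_wish/set_jump_wish helpers, and the two-step CAS in main() only mimics the shape of pop_head(), not its full logic.

#include <atomic>
#include <cstdint>
#include <iostream>

using cas_integer = std::uintptr_t;
constexpr unsigned HALF_BITS = (sizeof(cas_integer) / 2) * 8;
constexpr cas_integer FIRST_HALF = ~cas_integer{0} << HALF_BITS;  // upper half mask
constexpr cas_integer SECOND_HALF = ~FIRST_HALF;                  // lower half mask

constexpr cas_integer get_offset(cas_integer n) { return n & SECOND_HALF; }
constexpr cas_integer get_jump_wish(cas_integer n) { return (n & FIRST_HALF) >> HALF_BITS; }
constexpr cas_integer pack(cas_integer offset, cas_integer jump_wish) {
  return (jump_wish << HALF_BITS) | (offset & SECOND_HALF);
}

int main() {
  std::atomic<cas_integer> head{pack(/*offset=*/3, /*jump_wish=*/0)};

  // A stealer first announces where it wants to move the head. Because the
  // upper half of the word changes, an owner racing with a stale copy of the
  // head word will fail its own CAS and has to re-examine the state.
  cas_integer expected = head.load();
  cas_integer announced = pack(get_offset(expected), /*jump_wish=*/4);
  if (head.compare_exchange_strong(expected, announced)) {
    // Second step: actually advance the head to the announced offset.
    head.compare_exchange_strong(announced, pack(4, 0));
  }

  std::cout << "offset=" << get_offset(head.load())
            << " jump_wish=" << get_jump_wish(head.load()) << "\n";
  return 0;
}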