diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 7653def..1888079 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -21,11 +21,13 @@ add_library(pls STATIC include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp include/pls/internal/data_structures/aligned_stack_impl.h include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp + include/pls/internal/data_structures/work_stealing_deque.h include/pls/internal/data_structures/work_stealing_deque_impl.h include/pls/internal/helpers/prohibit_new.h include/pls/internal/helpers/profiler.h include/pls/internal/helpers/mini_benchmark.h include/pls/internal/helpers/unique_id.h + include/pls/internal/helpers/split_integer.h include/pls/internal/scheduling/root_task.h src/internal/scheduling/root_task.cpp include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp diff --git a/lib/pls/include/pls/internal/base/alignment.h b/lib/pls/include/pls/internal/base/alignment.h index 5797932..2d05e07 100644 --- a/lib/pls/include/pls/internal/base/alignment.h +++ b/lib/pls/include/pls/internal/base/alignment.h @@ -25,26 +25,6 @@ char *next_alignment(char *pointer); } -template -struct aligned_aba_pointer { - const system_details::pointer_t pointer_; - - explicit aligned_aba_pointer(T *pointer, unsigned int aba = 0) : pointer_{ - reinterpret_cast(pointer) + aba} {} - - T *pointer() const { - return reinterpret_cast(pointer_ & system_details::CACHE_LINE_ADDRESS_USED_BITS); - } - - unsigned int aba() const { - return pointer_ & system_details::CACHE_LINE_ADDRESS_UNUSED_BITS; - } - - aligned_aba_pointer set_aba(unsigned int aba) const { - return aligned_aba_pointer(pointer(), aba); - } -}; - } } } diff --git a/lib/pls/include/pls/internal/base/system_details.h b/lib/pls/include/pls/internal/base/system_details.h index 92e4c65..0f0d5e7 100644 --- a/lib/pls/include/pls/internal/base/system_details.h +++ 
b/lib/pls/include/pls/internal/base/system_details.h @@ -24,26 +24,18 @@ namespace system_details { * pointer_t should be an integer type capable of holding ANY pointer value. */ using pointer_t = std::uintptr_t; -constexpr pointer_t ZERO_POINTER = 0; -constexpr pointer_t MAX_POINTER = ~ZERO_POINTER; /** * Biggest type that supports atomic CAS operations. * Usually it is sane to assume a pointer can be swapped in a single CAS operation. */ -using cas_integer = pointer_t; -constexpr cas_integer MIN_CAS_INTEGER = 0; -constexpr cas_integer MAX_CAS_INTEGER = ~MIN_CAS_INTEGER; -constexpr cas_integer FIRST_HALF_CAS_INTEGER = MAX_CAS_INTEGER << ((sizeof(cas_integer) / 2) * 8); -constexpr cas_integer SECOND_HALF_CAS_INTEGER = ~FIRST_HALF_CAS_INTEGER; +using cas_integer = std::uintptr_t; +constexpr unsigned long CAS_SIZE = sizeof(cas_integer); /** * Most processors have 64 byte cache lines (last 6 bit of the address are zero at line beginnings). */ -constexpr unsigned int CACHE_LINE_ADDRESS_BITS = 6; -constexpr pointer_t CACHE_LINE_SIZE = 2u << (CACHE_LINE_ADDRESS_BITS - 1); -constexpr pointer_t CACHE_LINE_ADDRESS_USED_BITS = MAX_POINTER << CACHE_LINE_ADDRESS_BITS; -constexpr pointer_t CACHE_LINE_ADDRESS_UNUSED_BITS = ~CACHE_LINE_ADDRESS_USED_BITS; +constexpr pointer_t CACHE_LINE_SIZE = 64; /** * Choose one of the following ways to store thread specific data. 
diff --git a/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h index e74d376..9afec8d 100644 --- a/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h +++ b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h @@ -3,12 +3,11 @@ #define PLS_WORK_STEALING_DEQUE_H_ #include -#include -#include +#include "pls/internal/scheduling/thread_state.h" #include "pls/internal/base/system_details.h" -#include "pls/internal/base/spin_lock.h" #include "pls/internal/base/error_handling.h" +#include "pls/internal/helpers/split_integer.h" #include "aligned_stack.h" @@ -16,30 +15,28 @@ namespace pls { namespace internal { namespace data_structures { -using cas_integer = base::system_details::cas_integer; -using pointer_t = base::system_details::pointer_t; -static cas_integer get_stamp(cas_integer n) { - return (n & base::system_details::FIRST_HALF_CAS_INTEGER) >> ((sizeof(cas_integer) / 2) * 8); -} -static cas_integer get_offset(cas_integer n) { - return n & base::system_details::SECOND_HALF_CAS_INTEGER; -} -static cas_integer set_stamp(cas_integer n, cas_integer new_value) { - return (new_value << ((sizeof(cas_integer) / 2) * 8)) | (n & base::system_details::SECOND_HALF_CAS_INTEGER); +using base::system_details::pointer_t; + +// Integer split into two halves, can be used in CAS operations +constexpr unsigned long HALF_CACHE_LINE = base::system_details::CACHE_LINE_SIZE / 2; +using cas_integer = helpers::split_integer; +static unsigned long get_stamp(cas_integer n) { return n.left; } +static unsigned long get_offset(cas_integer n) { return n.right; } +static cas_integer set_stamp(cas_integer n, unsigned long new_value) { + n.left = new_value; + return n; } -//static cas_integer set_offset(cas_integer n, cas_integer new_value) { -// return new_value | (n & base::system_details::FIRST_HALF_CAS_INTEGER); -//} +// Single Item in the deque class work_stealing_deque_item { // 
Pointer to the actual data pointer_t data_; // Index (relative to stack base) to the next and previous element - cas_integer next_item_; - cas_integer previous_item_; + unsigned long next_item_; + unsigned long previous_item_; public: - work_stealing_deque_item() : data_{0}, next_item_{0}, previous_item_{0} {} + work_stealing_deque_item() : data_{0}, next_item_{}, previous_item_{} {} template Item *data() { @@ -51,18 +48,11 @@ class work_stealing_deque_item { data_ = reinterpret_cast(data); } - cas_integer next_item() { - return next_item_; - } - void set_next_item(cas_integer next_item) { - next_item_ = next_item; - } - cas_integer previous_item() { - return previous_item_; - } - void set_previous_item(cas_integer previous_item) { - previous_item_ = previous_item; - } + unsigned long next_item() const { return next_item_; } + void set_next_item(unsigned long next_item) { next_item_ = next_item; } + + unsigned long previous_item() const { return previous_item_; } + void set_previous_item(unsigned long previous_item) { previous_item_ = previous_item; } }; static_assert(sizeof(work_stealing_deque_item) < base::system_details::CACHE_LINE_SIZE, "Work stealing deque relies on memory layout and requires cache lines to be longer than one 'work_stealing_deque_item' instance!"); @@ -83,9 +73,9 @@ class work_stealing_deque { explicit work_stealing_deque(aligned_stack *stack) : stack_{stack}, base_pointer_{0}, - head_{0}, - tail_{0}, - previous_tail_{0} { + head_{cas_integer{}}, + tail_{cas_integer{}}, + previous_tail_{cas_integer{}} { reset_base_pointer(); } work_stealing_deque(const work_stealing_deque &other) : stack_{other.stack_}, @@ -94,144 +84,25 @@ class work_stealing_deque { tail_{other.tail_.load()}, previous_tail_{other.previous_tail_} {} - void reset_base_pointer() { - base_pointer_ = reinterpret_cast(stack_->save_state()); // Keep the base of our region in the stack - } - - work_stealing_deque_item *item_at(cas_integer position) { - return 
reinterpret_cast(base_pointer_ - + (base::system_details::CACHE_LINE_SIZE * position)); - } - - cas_integer current_stack_offset() { - return (stack_->save_state() - base_pointer_) / base::system_details::CACHE_LINE_SIZE; - } + void reset_base_pointer(); + work_stealing_deque_item *item_at(unsigned long offset); + unsigned long current_stack_offset(); template - std::pair *allocate_item(const T &new_item) { - // 'Union' type to push both on stack - using pair_t = std::pair; - // Allocate space on stack - auto new_pair = reinterpret_cast(stack_->push()); - // Initialize memory on stack - new((void *) &(new_pair->first)) work_stealing_deque_item(); - new((void *) &(new_pair->second)) T(new_item); - - return new_pair; - } + std::pair *allocate_item(const T &new_item); template - Item *push_tail(const T &new_item) { - cas_integer local_tail = tail_; - - auto new_pair = allocate_item(new_item); - // Prepare current tail to point to correct next items - auto tail_deque_item = item_at(local_tail); - tail_deque_item->set_data(&(new_pair->second)); - tail_deque_item->set_next_item(current_stack_offset()); - tail_deque_item->set_previous_item(previous_tail_); - previous_tail_ = local_tail; - - // Linearization point, item appears after this write - cas_integer new_tail = current_stack_offset(); - tail_ = new_tail; - - return &(new_pair->second); - } - - Item *pop_tail() { - cas_integer local_tail = tail_; - cas_integer local_head = head_; - - if (local_tail <= get_offset(local_head)) { - return nullptr; // EMPTY - } - - work_stealing_deque_item *previous_tail_item = item_at(previous_tail_); - cas_integer new_tail = previous_tail_; - previous_tail_ = previous_tail_item->previous_item(); - - // Publish our wish to set the tail back - tail_ = new_tail; - // Get the state of local head AFTER we published our wish - local_head = head_; // Linearization point, outside knows list is empty - - if (get_offset(local_head) < new_tail) { - return previous_tail_item->data(); // Success, 
enough distance to other threads - } - - if (get_offset(local_head) == new_tail) { - cas_integer new_head = set_stamp(new_tail, get_stamp(local_head) + 1); - // Try competing with consumers by updating the head's stamp value - if (head_.compare_exchange_strong(local_head, new_head)) { - return previous_tail_item->data(); // SUCCESS, we won the competition with other threads - } - } - - // Some other thread either won the competition or it already set the head further than we are - // before we even tried to compete with it. - // Reset the queue into an empty state => head_ = tail_ - tail_ = get_offset(local_head); // ...we give up to the other winning thread - - return nullptr; // EMPTY, we lost the competition with other threads - } - - Item *pop_head() { - cas_integer local_head = head_; - cas_integer local_tail = tail_; - - if (local_tail <= get_offset(local_head)) { - return nullptr; // EMPTY - } - // Load info on current deque item. - // In case we have a race with a new (aba) overwritten item at this position, - // there has to be a competition over the tail -> the stamp increased and our next - // operation will fail anyways! - work_stealing_deque_item *head_deque_item = item_at(get_offset(local_head)); - cas_integer next_item_offset = head_deque_item->next_item(); - Item *head_data_item = head_deque_item->data(); - - // We try to set the head to this new position. 
- // Possible outcomes: - // 1) no one interrupted us, we win this competition - // 2) other thread took the head, we lose to this - // 3) owning thread removed tail, we lose to this - cas_integer new_head = set_stamp(next_item_offset, get_stamp(local_head) + 1); - if (head_.compare_exchange_strong(local_head, new_head)) { - return head_data_item; // SUCCESS, we won the competition - } - - return nullptr; // EMPTY, we lost the competition - - } - - void release_memory_until(state state) { - cas_integer item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE; + Item *push_tail(const T &new_item); + Item *pop_tail(); + Item *pop_head(); - cas_integer local_head = head_; - cas_integer local_tail = tail_; - - stack_->reset_state(state); - - if (item_offset < local_tail) { - tail_ = item_offset; - if (get_offset(local_head) >= local_tail) { - head_ = set_stamp(item_offset, get_stamp(local_head) + 1); - } - } - } - - void release_memory_until(Item *item) { - release_memory_until(reinterpret_cast(item)); - } - - state save_state() { - return stack_->save_state(); - } + void release_memory_until(state state); + state save_state(); }; } } } +#include "work_stealing_deque_impl.h" #endif //PLS_WORK_STEALING_DEQUE_H_ diff --git a/lib/pls/include/pls/internal/data_structures/work_stealing_deque_impl.h b/lib/pls/include/pls/internal/data_structures/work_stealing_deque_impl.h new file mode 100644 index 0000000..516e1c5 --- /dev/null +++ b/lib/pls/include/pls/internal/data_structures/work_stealing_deque_impl.h @@ -0,0 +1,152 @@ + +#ifndef PLS_WORK_STEALING_DEQUE_IMPL_H_ +#define PLS_WORK_STEALING_DEQUE_IMPL_H_ + +namespace pls { +namespace internal { +namespace data_structures { + +template +void work_stealing_deque::reset_base_pointer() { + base_pointer_ = reinterpret_cast(stack_->save_state()); // Keep the base of our region in the stack +} + +template +work_stealing_deque_item *work_stealing_deque::item_at(unsigned long offset) { + return 
reinterpret_cast(base_pointer_ + + (base::system_details::CACHE_LINE_SIZE * offset)); +} + +template +unsigned long work_stealing_deque::current_stack_offset() { + return (stack_->save_state() - base_pointer_) / base::system_details::CACHE_LINE_SIZE; +} + +template +template +std::pair *work_stealing_deque::allocate_item(const T &new_item) { + // 'Union' type to push both on stack + using pair_t = std::pair; + // Allocate space on stack + auto new_pair = reinterpret_cast(stack_->push()); + // Initialize memory on stack + new((void *) &(new_pair->first)) work_stealing_deque_item(); + new((void *) &(new_pair->second)) T(new_item); + + return new_pair; +} + +template +template +Item *work_stealing_deque::push_tail(const T &new_item) { + cas_integer local_tail = tail_; + + auto new_pair = allocate_item(new_item); + // Prepare current tail to point to correct next items + auto tail_deque_item = item_at(get_offset(local_tail)); + tail_deque_item->set_data(&(new_pair->second)); + tail_deque_item->set_next_item(current_stack_offset()); + tail_deque_item->set_previous_item(get_offset(previous_tail_)); + previous_tail_ = local_tail; + + // Linearization point, item appears after this write + cas_integer new_tail = cas_integer{0, current_stack_offset()}; + tail_ = new_tail; + + return &(new_pair->second); +} + +template +Item *work_stealing_deque::pop_tail() { + cas_integer local_tail = tail_; + cas_integer local_head = head_; + + if (get_offset(local_tail) <= get_offset(local_head)) { + return nullptr; // EMPTY + } + + work_stealing_deque_item *previous_tail_item = item_at(get_offset(previous_tail_)); + cas_integer new_tail = previous_tail_; + previous_tail_ = cas_integer{0, previous_tail_item->previous_item()}; + + // Publish our wish to set the tail back + tail_ = new_tail; + // Get the state of local head AFTER we published our wish + local_head = head_; // Linearization point, outside knows list is empty + + if (get_offset(local_head) < get_offset(new_tail)) { + return 
previous_tail_item->data(); // Success, enough distance to other threads + } + + if (get_offset(local_head) == get_offset(new_tail)) { + cas_integer new_head = set_stamp(new_tail, get_stamp(local_head) + 1); + // Try competing with consumers by updating the head's stamp value + if (head_.compare_exchange_strong(local_head, new_head)) { + return previous_tail_item->data(); // SUCCESS, we won the competition with other threads + } + } + + // Some other thread either won the competition or it already set the head further than we are + // before we even tried to compete with it. + // Reset the queue into an empty state => head_ = tail_ + tail_ = cas_integer{0, get_offset(local_head)}; // ...we give up to the other winning thread + + return nullptr; // EMPTY, we lost the competition with other threads +} + +template +Item *work_stealing_deque::pop_head() { + cas_integer local_head = head_; + cas_integer local_tail = tail_; + + if (get_offset(local_tail) <= get_offset(local_head)) { + return nullptr; // EMPTY + } + // Load info on current deque item. + // In case we have a race with a new (aba) overwritten item at this position, + // there has to be a competition over the tail -> the stamp increased and our next + // operation will fail anyways! + work_stealing_deque_item *head_deque_item = item_at(get_offset(local_head)); + unsigned long next_item_offset = head_deque_item->next_item(); + Item *head_data_item = head_deque_item->data(); + + // We try to set the head to this new position. 
+ // Possible outcomes: + // 1) no one interrupted us, we win this competition + // 2) other thread took the head, we lose to this + // 3) owning thread removed tail, we lose to this + cas_integer new_head = cas_integer{get_stamp(local_head) + 1, next_item_offset}; + if (head_.compare_exchange_strong(local_head, new_head)) { + return head_data_item; // SUCCESS, we won the competition + } + + return nullptr; // EMPTY, we lost the competition +} + +template +void work_stealing_deque::release_memory_until(state state) { + unsigned long item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE; + + cas_integer local_head = head_; + cas_integer local_tail = tail_; + + stack_->reset_state(state); + + if (item_offset < get_offset(local_tail)) { + tail_ = cas_integer{0, item_offset}; + if (get_offset(local_head) >= get_offset(local_tail)) { + head_ = cas_integer{get_stamp(local_head) + 1, item_offset}; + } + } +} + +template +typename work_stealing_deque::state work_stealing_deque::save_state() { + return stack_->save_state(); +} + +} +} +} + +#endif //PLS_WORK_STEALING_DEQUE_IMPL_H_ diff --git a/lib/pls/include/pls/internal/helpers/split_integer.h b/lib/pls/include/pls/internal/helpers/split_integer.h new file mode 100644 index 0000000..83d973e --- /dev/null +++ b/lib/pls/include/pls/internal/helpers/split_integer.h @@ -0,0 +1,24 @@ + +#ifndef PLS_SPLIT_INTEGER_H_ +#define PLS_SPLIT_INTEGER_H_ + +#include "pls/internal/base/system_details.h" + +namespace pls { +namespace internal { +namespace helpers { + +template +struct split_integer { + unsigned long left:L; + unsigned long right:R; + + split_integer() : left{0}, right{0} {}; + split_integer(unsigned long new_left, unsigned long new_right) : left{new_left}, right{new_right} {}; +}; + +} +} +} + +#endif //PLS_SPLIT_INTEGER_H_ diff --git a/test/data_structures_test.cpp b/test/data_structures_test.cpp index 56e4524..97f91ca 100644 --- a/test/data_structures_test.cpp +++ 
b/test/data_structures_test.cpp @@ -201,11 +201,11 @@ TEST_CASE("work stealing deque stores objects correctly", "[internal/data_struct SECTION("handles stack reset 1 correctly when emptied by tail") { deque.push_tail(one); + auto state = deque.save_state(); deque.push_tail(two); - auto tmp_result = deque.pop_tail(); - REQUIRE(*tmp_result == two); + REQUIRE(*deque.pop_tail() == two); - deque.release_memory_until(tmp_result); + deque.release_memory_until(state); REQUIRE(*deque.pop_tail() == one); deque.push_tail(three);