Commit d553ed51 by FritzFlorian

Replace cas_integer with class/bitmask based approach.

parent 2b269f96
...@@ -21,11 +21,13 @@ add_library(pls STATIC ...@@ -21,11 +21,13 @@ add_library(pls STATIC
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h include/pls/internal/data_structures/aligned_stack_impl.h
include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp
include/pls/internal/data_structures/work_stealing_deque.h include/pls/internal/data_structures/work_stealing_deque_impl.h
include/pls/internal/helpers/prohibit_new.h include/pls/internal/helpers/prohibit_new.h
include/pls/internal/helpers/profiler.h include/pls/internal/helpers/profiler.h
include/pls/internal/helpers/mini_benchmark.h include/pls/internal/helpers/mini_benchmark.h
include/pls/internal/helpers/unique_id.h include/pls/internal/helpers/unique_id.h
include/pls/internal/helpers/split_integer.h
include/pls/internal/scheduling/root_task.h src/internal/scheduling/root_task.cpp include/pls/internal/scheduling/root_task.h src/internal/scheduling/root_task.cpp
include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp
......
...@@ -25,26 +25,6 @@ char *next_alignment(char *pointer); ...@@ -25,26 +25,6 @@ char *next_alignment(char *pointer);
} }
template<typename T>
struct aligned_aba_pointer {
const system_details::pointer_t pointer_;
explicit aligned_aba_pointer(T *pointer, unsigned int aba = 0) : pointer_{
reinterpret_cast<system_details::pointer_t >(pointer) + aba} {}
T *pointer() const {
return reinterpret_cast<T *>(pointer_ & system_details::CACHE_LINE_ADDRESS_USED_BITS);
}
unsigned int aba() const {
return pointer_ & system_details::CACHE_LINE_ADDRESS_UNUSED_BITS;
}
aligned_aba_pointer set_aba(unsigned int aba) const {
return aligned_aba_pointer(pointer(), aba);
}
};
} }
} }
} }
......
...@@ -24,26 +24,18 @@ namespace system_details { ...@@ -24,26 +24,18 @@ namespace system_details {
* pointer_t should be an integer type capable of holding ANY pointer value. * pointer_t should be an integer type capable of holding ANY pointer value.
*/ */
using pointer_t = std::uintptr_t; using pointer_t = std::uintptr_t;
constexpr pointer_t ZERO_POINTER = 0;
constexpr pointer_t MAX_POINTER = ~ZERO_POINTER;
/** /**
* Biggest type that supports atomic CAS operations. * Biggest type that supports atomic CAS operations.
* Usually it is sane to assume a pointer can be swapped in a single CAS operation. * Usually it is sane to assume a pointer can be swapped in a single CAS operation.
*/ */
using cas_integer = pointer_t; using cas_integer = std::uintptr_t;
constexpr cas_integer MIN_CAS_INTEGER = 0; constexpr unsigned long CAS_SIZE = sizeof(cas_integer);
constexpr cas_integer MAX_CAS_INTEGER = ~MIN_CAS_INTEGER;
constexpr cas_integer FIRST_HALF_CAS_INTEGER = MAX_CAS_INTEGER << ((sizeof(cas_integer) / 2) * 8);
constexpr cas_integer SECOND_HALF_CAS_INTEGER = ~FIRST_HALF_CAS_INTEGER;
/** /**
* Most processors have 64 byte cache lines (last 6 bit of the address are zero at line beginnings). * Most processors have 64 byte cache lines (last 6 bit of the address are zero at line beginnings).
*/ */
constexpr unsigned int CACHE_LINE_ADDRESS_BITS = 6; constexpr pointer_t CACHE_LINE_SIZE = 64;
constexpr pointer_t CACHE_LINE_SIZE = 2u << (CACHE_LINE_ADDRESS_BITS - 1);
constexpr pointer_t CACHE_LINE_ADDRESS_USED_BITS = MAX_POINTER << CACHE_LINE_ADDRESS_BITS;
constexpr pointer_t CACHE_LINE_ADDRESS_UNUSED_BITS = ~CACHE_LINE_ADDRESS_USED_BITS;
/** /**
* Choose one of the following ways to store thread specific data. * Choose one of the following ways to store thread specific data.
......
...@@ -3,12 +3,11 @@ ...@@ -3,12 +3,11 @@
#define PLS_WORK_STEALING_DEQUE_H_ #define PLS_WORK_STEALING_DEQUE_H_
#include <atomic> #include <atomic>
#include <mutex>
#include <pls/internal/scheduling/thread_state.h>
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/base/system_details.h" #include "pls/internal/base/system_details.h"
#include "pls/internal/base/spin_lock.h"
#include "pls/internal/base/error_handling.h" #include "pls/internal/base/error_handling.h"
#include "pls/internal/helpers/split_integer.h"
#include "aligned_stack.h" #include "aligned_stack.h"
...@@ -16,30 +15,28 @@ namespace pls { ...@@ -16,30 +15,28 @@ namespace pls {
namespace internal { namespace internal {
namespace data_structures { namespace data_structures {
using cas_integer = base::system_details::cas_integer; using base::system_details::pointer_t;
using pointer_t = base::system_details::pointer_t;
static cas_integer get_stamp(cas_integer n) { // Integer split into two halfs, can be used in CAS operations
return (n & base::system_details::FIRST_HALF_CAS_INTEGER) >> ((sizeof(cas_integer) / 2) * 8); constexpr unsigned long HALF_CACHE_LINE = base::system_details::CACHE_LINE_SIZE / 2;
} using cas_integer = helpers::split_integer<HALF_CACHE_LINE, HALF_CACHE_LINE>;
static cas_integer get_offset(cas_integer n) { static unsigned long get_stamp(cas_integer n) { return n.left; }
return n & base::system_details::SECOND_HALF_CAS_INTEGER; static unsigned long get_offset(cas_integer n) { return n.right; }
} static cas_integer set_stamp(cas_integer n, unsigned long new_value) {
static cas_integer set_stamp(cas_integer n, cas_integer new_value) { n.left = new_value;
return (new_value << ((sizeof(cas_integer) / 2) * 8)) | (n & base::system_details::SECOND_HALF_CAS_INTEGER); return n;
} }
//static cas_integer set_offset(cas_integer n, cas_integer new_value) {
// return new_value | (n & base::system_details::FIRST_HALF_CAS_INTEGER);
//}
// Single Item in the deque
class work_stealing_deque_item { class work_stealing_deque_item {
// Pointer to the actual data // Pointer to the actual data
pointer_t data_; pointer_t data_;
// Index (relative to stack base) to the next and previous element // Index (relative to stack base) to the next and previous element
cas_integer next_item_; unsigned long next_item_;
cas_integer previous_item_; unsigned long previous_item_;
public: public:
work_stealing_deque_item() : data_{0}, next_item_{0}, previous_item_{0} {} work_stealing_deque_item() : data_{0}, next_item_{}, previous_item_{} {}
template<typename Item> template<typename Item>
Item *data() { Item *data() {
...@@ -51,18 +48,11 @@ class work_stealing_deque_item { ...@@ -51,18 +48,11 @@ class work_stealing_deque_item {
data_ = reinterpret_cast<pointer_t >(data); data_ = reinterpret_cast<pointer_t >(data);
} }
cas_integer next_item() { unsigned long next_item() const { return next_item_; }
return next_item_; void set_next_item(unsigned long next_item) { next_item_ = next_item; }
}
void set_next_item(cas_integer next_item) { unsigned long previous_item() const { return previous_item_; }
next_item_ = next_item; void set_previous_item(unsigned long previous_item) { previous_item_ = previous_item; }
}
cas_integer previous_item() {
return previous_item_;
}
void set_previous_item(cas_integer previous_item) {
previous_item_ = previous_item;
}
}; };
static_assert(sizeof(work_stealing_deque_item) < base::system_details::CACHE_LINE_SIZE, static_assert(sizeof(work_stealing_deque_item) < base::system_details::CACHE_LINE_SIZE,
"Work stealing deque relies on memory layout and requires cache lines to be longer than one 'work_stealing_deque_item' instance!"); "Work stealing deque relies on memory layout and requires cache lines to be longer than one 'work_stealing_deque_item' instance!");
...@@ -83,9 +73,9 @@ class work_stealing_deque { ...@@ -83,9 +73,9 @@ class work_stealing_deque {
explicit work_stealing_deque(aligned_stack *stack) : stack_{stack}, explicit work_stealing_deque(aligned_stack *stack) : stack_{stack},
base_pointer_{0}, base_pointer_{0},
head_{0}, head_{cas_integer{}},
tail_{0}, tail_{cas_integer{}},
previous_tail_{0} { previous_tail_{cas_integer{}} {
reset_base_pointer(); reset_base_pointer();
} }
work_stealing_deque(const work_stealing_deque &other) : stack_{other.stack_}, work_stealing_deque(const work_stealing_deque &other) : stack_{other.stack_},
...@@ -94,144 +84,25 @@ class work_stealing_deque { ...@@ -94,144 +84,25 @@ class work_stealing_deque {
tail_{other.tail_.load()}, tail_{other.tail_.load()},
previous_tail_{other.previous_tail_} {} previous_tail_{other.previous_tail_} {}
void reset_base_pointer() { void reset_base_pointer();
base_pointer_ = reinterpret_cast<pointer_t >(stack_->save_state()); // Keep the base of our region in the stack work_stealing_deque_item *item_at(unsigned long offset);
} unsigned long current_stack_offset();
work_stealing_deque_item *item_at(cas_integer position) {
return reinterpret_cast<work_stealing_deque_item *>(base_pointer_
+ (base::system_details::CACHE_LINE_SIZE * position));
}
cas_integer current_stack_offset() {
return (stack_->save_state() - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
}
template<typename T> template<typename T>
std::pair<work_stealing_deque_item, T> *allocate_item(const T &new_item) { std::pair<work_stealing_deque_item, T> *allocate_item(const T &new_item);
// 'Union' type to push both on stack
using pair_t = std::pair<work_stealing_deque_item, T>;
// Allocate space on stack
auto new_pair = reinterpret_cast<pair_t *>(stack_->push<pair_t>());
// Initialize memory on stack
new((void *) &(new_pair->first)) work_stealing_deque_item();
new((void *) &(new_pair->second)) T(new_item);
return new_pair;
}
template<typename T> template<typename T>
Item *push_tail(const T &new_item) { Item *push_tail(const T &new_item);
cas_integer local_tail = tail_; Item *pop_tail();
Item *pop_head();
auto new_pair = allocate_item(new_item);
// Prepare current tail to point to correct next items
auto tail_deque_item = item_at(local_tail);
tail_deque_item->set_data(&(new_pair->second));
tail_deque_item->set_next_item(current_stack_offset());
tail_deque_item->set_previous_item(previous_tail_);
previous_tail_ = local_tail;
// Linearization point, item appears after this write
cas_integer new_tail = current_stack_offset();
tail_ = new_tail;
return &(new_pair->second);
}
Item *pop_tail() {
cas_integer local_tail = tail_;
cas_integer local_head = head_;
if (local_tail <= get_offset(local_head)) {
return nullptr; // EMPTY
}
work_stealing_deque_item *previous_tail_item = item_at(previous_tail_);
cas_integer new_tail = previous_tail_;
previous_tail_ = previous_tail_item->previous_item();
// Publish our wish to set the tail back
tail_ = new_tail;
// Get the state of local head AFTER we published our wish
local_head = head_; // Linearization point, outside knows list is empty
if (get_offset(local_head) < new_tail) {
return previous_tail_item->data<Item>(); // Success, enough distance to other threads
}
if (get_offset(local_head) == new_tail) {
cas_integer new_head = set_stamp(new_tail, get_stamp(local_head) + 1);
// Try competing with consumers by updating the head's stamp value
if (head_.compare_exchange_strong(local_head, new_head)) {
return previous_tail_item->data<Item>(); // SUCCESS, we won the competition with other threads
}
}
// Some other thread either won the competition or it already set the head further than we are
// before we even tried to compete with it.
// Reset the queue into an empty state => head_ = tail_
tail_ = get_offset(local_head); // ...we give up to the other winning thread
return nullptr; // EMPTY, we lost the competition with other threads
}
Item *pop_head() {
cas_integer local_head = head_;
cas_integer local_tail = tail_;
if (local_tail <= get_offset(local_head)) {
return nullptr; // EMPTY
}
// Load info on current deque item.
// In case we have a race with a new (aba) overwritten item at this position,
// there has to be a competition over the tail -> the stamp increased and our next
// operation will fail anyways!
work_stealing_deque_item *head_deque_item = item_at(get_offset(local_head));
cas_integer next_item_offset = head_deque_item->next_item();
Item *head_data_item = head_deque_item->data<Item>();
// We try to set the head to this new position.
// Possible outcomes:
// 1) no one interrupted us, we win this competition
// 2) other thread took the head, we lose to this
// 3) owning thread removed tail, we lose to this
cas_integer new_head = set_stamp(next_item_offset, get_stamp(local_head) + 1);
if (head_.compare_exchange_strong(local_head, new_head)) {
return head_data_item; // SUCCESS, we won the competition
}
return nullptr; // EMPTY, we lost the competition
}
void release_memory_until(state state) {
cas_integer item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
cas_integer local_head = head_; void release_memory_until(state state);
cas_integer local_tail = tail_; state save_state();
stack_->reset_state(state);
if (item_offset < local_tail) {
tail_ = item_offset;
if (get_offset(local_head) >= local_tail) {
head_ = set_stamp(item_offset, get_stamp(local_head) + 1);
}
}
}
void release_memory_until(Item *item) {
release_memory_until(reinterpret_cast<pointer_t >(item));
}
state save_state() {
return stack_->save_state();
}
}; };
} }
} }
} }
#include "work_stealing_deque_impl.h"
#endif //PLS_WORK_STEALING_DEQUE_H_ #endif //PLS_WORK_STEALING_DEQUE_H_
#ifndef PLS_WORK_STEALING_DEQUE_IMPL_H_
#define PLS_WORK_STEALING_DEQUE_IMPL_H_
namespace pls {
namespace internal {
namespace data_structures {
// Anchors this deque's memory region at the current top of the aligned stack.
// All item positions are later expressed as offsets relative to this base.
template<typename Item>
void work_stealing_deque<Item>::reset_base_pointer() {
  const auto stack_top = stack_->save_state();
  base_pointer_ = reinterpret_cast<pointer_t >(stack_top);
}
// Translates a cache-line offset (relative to base_pointer_) into the address
// of the deque item stored there. Items are laid out one per cache line.
template<typename Item>
work_stealing_deque_item *work_stealing_deque<Item>::item_at(unsigned long offset) {
  const auto byte_offset = base::system_details::CACHE_LINE_SIZE * offset;
  return reinterpret_cast<work_stealing_deque_item *>(base_pointer_ + byte_offset);
}
// Distance from the region base to the current stack top,
// measured in whole cache lines (i.e. in item slots).
template<typename Item>
unsigned long work_stealing_deque<Item>::current_stack_offset() {
  const auto distance_in_bytes = stack_->save_state() - base_pointer_;
  return distance_in_bytes / base::system_details::CACHE_LINE_SIZE;
}
// Reserves one combined slot on the aligned stack holding both the deque's
// bookkeeping record and a copy of the user payload, then constructs both
// halves in that raw memory via placement new.
// Returns a pointer to the fully constructed (bookkeeping, payload) pair.
template<typename Item>
template<typename T>
std::pair<work_stealing_deque_item, T> *work_stealing_deque<Item>::allocate_item(const T &new_item) {
  // 'Union' type: bookkeeping entry and payload travel together on the stack.
  using pair_t = std::pair<work_stealing_deque_item, T>;
  // Claim raw space for the whole pair in one stack push.
  auto *slot = reinterpret_cast<pair_t *>(stack_->push<pair_t>());
  // Construct the bookkeeping half, then copy-construct the payload half.
  new (static_cast<void *>(&(slot->first))) work_stealing_deque_item();
  new (static_cast<void *>(&(slot->second))) T(new_item);
  return slot;
}
// Appends a new item at the tail end of the deque.
// Appears to be owner-side only: tail_ is written with a plain store, not a CAS,
// which is the usual single-producer contract of a work stealing deque —
// NOTE(review): confirm only the owning thread calls push_tail.
// Returns a pointer to the copy of new_item now stored on the deque's stack.
template<typename Item>
template<typename T>
Item *work_stealing_deque<Item>::push_tail(const T &new_item) {
cas_integer local_tail = tail_;
// Reserve and construct the (bookkeeping, payload) pair on the aligned stack.
// NOTE(review): this assumes tail_'s offset equals the current stack offset,
// so the new pair lands exactly at item_at(get_offset(local_tail)) — verify.
auto new_pair = allocate_item(new_item);
// Prepare current tail to point to correct next items
auto tail_deque_item = item_at(get_offset(local_tail));
tail_deque_item->set_data(&(new_pair->second));
// next_item = stack offset just past the new pair (the upcoming tail position).
tail_deque_item->set_next_item(current_stack_offset());
tail_deque_item->set_previous_item(get_offset(previous_tail_));
previous_tail_ = local_tail;
// Linearization point, item appears after this write
// (tail_ carries no stamp — only head_ is stamped, hence the {0, ...}).
cas_integer new_tail = cas_integer{0, current_stack_offset()};
tail_ = new_tail;
return &(new_pair->second);
}
// Removes and returns the item at the tail end (owner side) of the deque,
// or nullptr if the deque is empty or a concurrent stealer won the last item.
// Races with pop_head() are resolved via a stamped CAS on head_ when both
// ends meet on the same element.
template<typename Item>
Item *work_stealing_deque<Item>::pop_tail() {
cas_integer local_tail = tail_;
cas_integer local_head = head_;
// Offsets are compared (stamps ignored): head past-or-at tail means empty.
if (get_offset(local_tail) <= get_offset(local_head)) {
return nullptr; // EMPTY
}
work_stealing_deque_item *previous_tail_item = item_at(get_offset(previous_tail_));
cas_integer new_tail = previous_tail_;
// Step previous_tail_ back one element using the item's stored back-link.
previous_tail_ = cas_integer{0, previous_tail_item->previous_item()};
// Publish our wish to set the tail back
tail_ = new_tail;
// Get the state of local head AFTER we published our wish
local_head = head_; // Linearization point, outside knows list is empty
if (get_offset(local_head) < get_offset(new_tail)) {
return previous_tail_item->data<Item>(); // Success, enough distance to other threads
}
// Head and the rolled-back tail point at the same element: exactly one of
// us (owner vs. stealers) may take it. Bumping the stamp invalidates any
// stealer still holding the old head value.
if (get_offset(local_head) == get_offset(new_tail)) {
cas_integer new_head = set_stamp(new_tail, get_stamp(local_head) + 1);
// Try competing with consumers by updating the head's stamp value
if (head_.compare_exchange_strong(local_head, new_head)) {
return previous_tail_item->data<Item>(); // SUCCESS, we won the competition with other threads
}
}
// Some other thread either won the competition or it already set the head further than we are
// before we even tried to compete with it.
// Reset the queue into an empty state => head_ = tail_
tail_ = cas_integer{0, get_offset(local_head)}; // ...we give up to the other winning thread
return nullptr; // EMPTY, we lost the competition with other threads
}
// Steals and returns the item at the head end of the deque, or nullptr if the
// deque is empty or another thread won the race. Safe to call from stealer
// threads: the only mutation is a single stamped CAS on head_.
template<typename Item>
Item *work_stealing_deque<Item>::pop_head() {
cas_integer local_head = head_;
cas_integer local_tail = tail_;
// Offsets compared (stamps ignored): tail at-or-before head means empty.
if (get_offset(local_tail) <= get_offset(local_head)) {
return nullptr; // EMPTY
}
// Load info on current deque item.
// In case we have a race with a new (aba) overwritten item at this position,
// there has to be a competition over the tail -> the stamp increased and our next
// operation will fail anyways!
work_stealing_deque_item *head_deque_item = item_at(get_offset(local_head));
unsigned long next_item_offset = head_deque_item->next_item();
Item *head_data_item = head_deque_item->data<Item>();
// We try to set the head to this new position.
// Possible outcomes:
// 1) no one interrupted us, we win this competition
// 2) other thread took the head, we lose to this
// 3) owning thread removed tail, we lose to this
// New head: advance the offset to the next item AND bump the stamp, so any
// concurrently observed stale head value can no longer CAS successfully.
cas_integer new_head = cas_integer{get_stamp(local_head) + 1, next_item_offset};
if (head_.compare_exchange_strong(local_head, new_head)) {
return head_data_item; // SUCCESS, we won the competition
}
return nullptr; // EMPTY, we lost the competition
}
// Rolls the deque (and the underlying aligned stack) back to a previously
// saved stack state, discarding every item that lives above that marker.
// NOTE(review): presumably owner-side only — tail_/head_ are plain stores,
// not CAS — confirm no stealer calls this concurrently with the rollback.
template<typename Item>
void work_stealing_deque<Item>::release_memory_until(state state) {
// Convert the raw stack marker into an item offset relative to our base.
unsigned long item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
cas_integer local_head = head_;
cas_integer local_tail = tail_;
stack_->reset_state(state);
if (item_offset < get_offset(local_tail)) {
// Tail moves back to the rollback point (tail carries no stamp).
tail_ = cas_integer{0, item_offset};
// If the deque looked empty (head >= tail), drag head back too and bump
// its stamp so in-flight stealers holding the old head fail their CAS.
// NOTE(review): head between item_offset and tail is left untouched here
// (head > tail then reads as empty) — confirm this is the intended state.
if (get_offset(local_head) >= get_offset(local_tail)) {
head_ = cas_integer{get_stamp(local_head) + 1, item_offset};
}
}
}
// Captures the current position of the underlying aligned stack so the deque
// can later be rolled back to it via release_memory_until().
template<typename Item>
typename work_stealing_deque<Item>::state work_stealing_deque<Item>::save_state() {
  const auto marker = stack_->save_state();
  return marker;
}
}
}
}
#endif //PLS_WORK_STEALING_DEQUE_IMPL_H_
#ifndef PLS_SPLIT_INTEGER_H_
#define PLS_SPLIT_INTEGER_H_
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace helpers {
// An integer split into two adjacent bit-fields of L and R bits.
// The deque stores such a value inside std::atomic<> and updates it with
// compare_exchange_strong, so both halves together must fit into a single
// machine word that the hardware can swap with one CAS instruction —
// enforce that at compile time instead of failing silently at runtime.
// (The deque uses 'left' as an ABA stamp and 'right' as an element offset.)
template<int L, int R>
struct split_integer {
  static_assert(L > 0 && R > 0 && L + R <= 64,
                "split_integer: both halves must fit into a single 64 bit CAS word!");

  unsigned long left:L;   // first half (e.g. the ABA stamp)
  unsigned long right:R;  // second half (e.g. the item offset)

  // Zero-initialize both halves so atomic compare/exchange never sees garbage.
  split_integer() : left{0}, right{0} {};
  split_integer(unsigned long new_left, unsigned long new_right) : left{new_left}, right{new_right} {};
};
}
}
}
#endif //PLS_SPLIT_INTEGER_H_
...@@ -201,11 +201,11 @@ TEST_CASE("work stealing deque stores objects correctly", "[internal/data_struct ...@@ -201,11 +201,11 @@ TEST_CASE("work stealing deque stores objects correctly", "[internal/data_struct
SECTION("handles stack reset 1 correctly when emptied by tail") { SECTION("handles stack reset 1 correctly when emptied by tail") {
deque.push_tail(one); deque.push_tail(one);
auto state = deque.save_state();
deque.push_tail(two); deque.push_tail(two);
auto tmp_result = deque.pop_tail(); REQUIRE(*deque.pop_tail() == two);
REQUIRE(*tmp_result == two);
deque.release_memory_until(tmp_result); deque.release_memory_until(state);
REQUIRE(*deque.pop_tail() == one); REQUIRE(*deque.pop_tail() == one);
deque.push_tail(three); deque.push_tail(three);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment