Commit 0dc56d42 by FritzFlorian

Rework cas_integer interface to be OO instead of functional.

parent d553ed51
Pipeline #1189 passed with stages
in 3 minutes 53 seconds
...@@ -27,7 +27,6 @@ add_library(pls STATIC ...@@ -27,7 +27,6 @@ add_library(pls STATIC
include/pls/internal/helpers/profiler.h include/pls/internal/helpers/profiler.h
include/pls/internal/helpers/mini_benchmark.h include/pls/internal/helpers/mini_benchmark.h
include/pls/internal/helpers/unique_id.h include/pls/internal/helpers/unique_id.h
include/pls/internal/helpers/split_integer.h
include/pls/internal/scheduling/root_task.h src/internal/scheduling/root_task.cpp include/pls/internal/scheduling/root_task.h src/internal/scheduling/root_task.cpp
include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp
......
...@@ -7,7 +7,6 @@ ...@@ -7,7 +7,6 @@
#include "pls/internal/scheduling/thread_state.h" #include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/base/system_details.h" #include "pls/internal/base/system_details.h"
#include "pls/internal/base/error_handling.h" #include "pls/internal/base/error_handling.h"
#include "pls/internal/helpers/split_integer.h"
#include "aligned_stack.h" #include "aligned_stack.h"
...@@ -19,21 +18,23 @@ using base::system_details::pointer_t; ...@@ -19,21 +18,23 @@ using base::system_details::pointer_t;
// Integer split into two halfs, can be used in CAS operations // Integer split into two halfs, can be used in CAS operations
constexpr unsigned long HALF_CACHE_LINE = base::system_details::CACHE_LINE_SIZE / 2; constexpr unsigned long HALF_CACHE_LINE = base::system_details::CACHE_LINE_SIZE / 2;
using cas_integer = helpers::split_integer<HALF_CACHE_LINE, HALF_CACHE_LINE>; using offset_t = base::system_details::cas_integer;
static unsigned long get_stamp(cas_integer n) { return n.left; } struct stamped_integer {
static unsigned long get_offset(cas_integer n) { return n.right; } offset_t stamp:HALF_CACHE_LINE;
static cas_integer set_stamp(cas_integer n, unsigned long new_value) { offset_t offset:HALF_CACHE_LINE;
n.left = new_value;
return n; stamped_integer() : stamp{0}, offset{0} {};
} stamped_integer(offset_t new_offset) : stamp{0}, offset{new_offset} {};
stamped_integer(offset_t new_stamp, offset_t new_offset) : stamp{new_stamp}, offset{new_offset} {};
};
// Single Item in the deque // Single Item in the deque
class work_stealing_deque_item { class work_stealing_deque_item {
// Pointer to the actual data // Pointer to the actual data
pointer_t data_; pointer_t data_;
// Index (relative to stack base) to the next and previous element // Index (relative to stack base) to the next and previous element
unsigned long next_item_; offset_t next_item_;
unsigned long previous_item_; offset_t previous_item_;
public: public:
work_stealing_deque_item() : data_{0}, next_item_{}, previous_item_{} {} work_stealing_deque_item() : data_{0}, next_item_{}, previous_item_{} {}
...@@ -48,11 +49,11 @@ class work_stealing_deque_item { ...@@ -48,11 +49,11 @@ class work_stealing_deque_item {
data_ = reinterpret_cast<pointer_t >(data); data_ = reinterpret_cast<pointer_t >(data);
} }
unsigned long next_item() const { return next_item_; } offset_t next_item() const { return next_item_; }
void set_next_item(unsigned long next_item) { next_item_ = next_item; } void set_next_item(offset_t next_item) { next_item_ = next_item; }
unsigned long previous_item() const { return previous_item_; } offset_t previous_item() const { return previous_item_; }
void set_previous_item(unsigned long previous_item) { previous_item_ = previous_item; } void set_previous_item(offset_t previous_item) { previous_item_ = previous_item; }
}; };
static_assert(sizeof(work_stealing_deque_item) < base::system_details::CACHE_LINE_SIZE, static_assert(sizeof(work_stealing_deque_item) < base::system_details::CACHE_LINE_SIZE,
"Work stealing deque relies on memory layout and requires cache lines to be longer than one 'work_stealing_deque_item' instance!"); "Work stealing deque relies on memory layout and requires cache lines to be longer than one 'work_stealing_deque_item' instance!");
...@@ -64,18 +65,18 @@ class work_stealing_deque { ...@@ -64,18 +65,18 @@ class work_stealing_deque {
aligned_stack *stack_; aligned_stack *stack_;
pointer_t base_pointer_; pointer_t base_pointer_;
std::atomic<cas_integer> head_; std::atomic<stamped_integer> head_;
std::atomic<cas_integer> tail_; std::atomic<offset_t> tail_;
cas_integer previous_tail_; offset_t previous_tail_;
public: public:
using state = aligned_stack::state; using state = aligned_stack::state;
explicit work_stealing_deque(aligned_stack *stack) : stack_{stack}, explicit work_stealing_deque(aligned_stack *stack) : stack_{stack},
base_pointer_{0}, base_pointer_{0},
head_{cas_integer{}}, head_{stamped_integer{0, 0}},
tail_{cas_integer{}}, tail_{0},
previous_tail_{cas_integer{}} { previous_tail_{0} {
reset_base_pointer(); reset_base_pointer();
} }
work_stealing_deque(const work_stealing_deque &other) : stack_{other.stack_}, work_stealing_deque(const work_stealing_deque &other) : stack_{other.stack_},
...@@ -85,8 +86,8 @@ class work_stealing_deque { ...@@ -85,8 +86,8 @@ class work_stealing_deque {
previous_tail_{other.previous_tail_} {} previous_tail_{other.previous_tail_} {}
void reset_base_pointer(); void reset_base_pointer();
work_stealing_deque_item *item_at(unsigned long offset); work_stealing_deque_item *item_at(offset_t offset);
unsigned long current_stack_offset(); offset_t current_stack_offset();
template<typename T> template<typename T>
std::pair<work_stealing_deque_item, T> *allocate_item(const T &new_item); std::pair<work_stealing_deque_item, T> *allocate_item(const T &new_item);
......
...@@ -12,13 +12,13 @@ void work_stealing_deque<Item>::reset_base_pointer() { ...@@ -12,13 +12,13 @@ void work_stealing_deque<Item>::reset_base_pointer() {
} }
template<typename Item> template<typename Item>
work_stealing_deque_item *work_stealing_deque<Item>::item_at(unsigned long offset) { work_stealing_deque_item *work_stealing_deque<Item>::item_at(offset_t offset) {
return reinterpret_cast<work_stealing_deque_item *>(base_pointer_ return reinterpret_cast<work_stealing_deque_item *>(base_pointer_
+ (base::system_details::CACHE_LINE_SIZE * offset)); + (base::system_details::CACHE_LINE_SIZE * offset));
} }
template<typename Item> template<typename Item>
unsigned long work_stealing_deque<Item>::current_stack_offset() { offset_t work_stealing_deque<Item>::current_stack_offset() {
return (stack_->save_state() - base_pointer_) / base::system_details::CACHE_LINE_SIZE; return (stack_->save_state() - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
} }
...@@ -39,18 +39,18 @@ std::pair<work_stealing_deque_item, T> *work_stealing_deque<Item>::allocate_item ...@@ -39,18 +39,18 @@ std::pair<work_stealing_deque_item, T> *work_stealing_deque<Item>::allocate_item
template<typename Item> template<typename Item>
template<typename T> template<typename T>
Item *work_stealing_deque<Item>::push_tail(const T &new_item) { Item *work_stealing_deque<Item>::push_tail(const T &new_item) {
cas_integer local_tail = tail_; offset_t local_tail = tail_;
auto new_pair = allocate_item(new_item); auto new_pair = allocate_item(new_item);
// Prepare current tail to point to correct next items // Prepare current tail to point to correct next items
auto tail_deque_item = item_at(get_offset(local_tail)); auto tail_deque_item = item_at(local_tail);
tail_deque_item->set_data(&(new_pair->second)); tail_deque_item->set_data(&(new_pair->second));
tail_deque_item->set_next_item(current_stack_offset()); tail_deque_item->set_next_item(current_stack_offset());
tail_deque_item->set_previous_item(get_offset(previous_tail_)); tail_deque_item->set_previous_item(previous_tail_);
previous_tail_ = local_tail; previous_tail_ = local_tail;
// Linearization point, item appears after this write // Linearization point, item appears after this write
cas_integer new_tail = cas_integer{0, current_stack_offset()}; offset_t new_tail = current_stack_offset();
tail_ = new_tail; tail_ = new_tail;
return &(new_pair->second); return &(new_pair->second);
...@@ -58,28 +58,28 @@ Item *work_stealing_deque<Item>::push_tail(const T &new_item) { ...@@ -58,28 +58,28 @@ Item *work_stealing_deque<Item>::push_tail(const T &new_item) {
template<typename Item> template<typename Item>
Item *work_stealing_deque<Item>::pop_tail() { Item *work_stealing_deque<Item>::pop_tail() {
cas_integer local_tail = tail_; offset_t local_tail = tail_;
cas_integer local_head = head_; stamped_integer local_head = head_;
if (get_offset(local_tail) <= get_offset(local_head)) { if (local_tail <= local_head.offset) {
return nullptr; // EMPTY return nullptr; // EMPTY
} }
work_stealing_deque_item *previous_tail_item = item_at(get_offset(previous_tail_)); work_stealing_deque_item *previous_tail_item = item_at(previous_tail_);
cas_integer new_tail = previous_tail_; offset_t new_tail = previous_tail_;
previous_tail_ = cas_integer{0, previous_tail_item->previous_item()}; previous_tail_ = previous_tail_item->previous_item();
// Publish our wish to set the tail back // Publish our wish to set the tail back
tail_ = new_tail; tail_ = new_tail;
// Get the state of local head AFTER we published our wish // Get the state of local head AFTER we published our wish
local_head = head_; // Linearization point, outside knows list is empty local_head = head_; // Linearization point, outside knows list is empty
if (get_offset(local_head) < get_offset(new_tail)) { if (local_head.offset < new_tail) {
return previous_tail_item->data<Item>(); // Success, enough distance to other threads return previous_tail_item->data<Item>(); // Success, enough distance to other threads
} }
if (get_offset(local_head) == get_offset(new_tail)) { if (local_head.offset == new_tail) {
cas_integer new_head = set_stamp(new_tail, get_stamp(local_head) + 1); stamped_integer new_head = stamped_integer{local_head.stamp + 1, new_tail};
// Try competing with consumers by updating the head's stamp value // Try competing with consumers by updating the head's stamp value
if (head_.compare_exchange_strong(local_head, new_head)) { if (head_.compare_exchange_strong(local_head, new_head)) {
return previous_tail_item->data<Item>(); // SUCCESS, we won the competition with other threads return previous_tail_item->data<Item>(); // SUCCESS, we won the competition with other threads
...@@ -89,25 +89,25 @@ Item *work_stealing_deque<Item>::pop_tail() { ...@@ -89,25 +89,25 @@ Item *work_stealing_deque<Item>::pop_tail() {
// Some other thread either won the competition or it already set the head further than we are // Some other thread either won the competition or it already set the head further than we are
// before we even tried to compete with it. // before we even tried to compete with it.
// Reset the queue into an empty state => head_ = tail_ // Reset the queue into an empty state => head_ = tail_
tail_ = cas_integer{0, get_offset(local_head)}; // ...we give up to the other winning thread tail_ = local_head.offset; // ...we give up to the other winning thread
return nullptr; // EMPTY, we lost the competition with other threads return nullptr; // EMPTY, we lost the competition with other threads
} }
template<typename Item> template<typename Item>
Item *work_stealing_deque<Item>::pop_head() { Item *work_stealing_deque<Item>::pop_head() {
cas_integer local_head = head_; stamped_integer local_head = head_;
cas_integer local_tail = tail_; offset_t local_tail = tail_;
if (get_offset(local_tail) <= get_offset(local_head)) { if (local_tail <= local_head.offset) {
return nullptr; // EMPTY return nullptr; // EMPTY
} }
// Load info on current deque item. // Load info on current deque item.
// In case we have a race with a new (aba) overwritten item at this position, // In case we have a race with a new (aba) overwritten item at this position,
// there has to be a competition over the tail -> the stamp increased and our next // there has to be a competition over the tail -> the stamp increased and our next
// operation will fail anyways! // operation will fail anyways!
work_stealing_deque_item *head_deque_item = item_at(get_offset(local_head)); work_stealing_deque_item *head_deque_item = item_at(local_head.offset);
unsigned long next_item_offset = head_deque_item->next_item(); offset_t next_item_offset = head_deque_item->next_item();
Item *head_data_item = head_deque_item->data<Item>(); Item *head_data_item = head_deque_item->data<Item>();
// We try to set the head to this new position. // We try to set the head to this new position.
...@@ -115,7 +115,7 @@ Item *work_stealing_deque<Item>::pop_head() { ...@@ -115,7 +115,7 @@ Item *work_stealing_deque<Item>::pop_head() {
// 1) no one interrupted us, we win this competition // 1) no one interrupted us, we win this competition
// 2) other thread took the head, we lose to this // 2) other thread took the head, we lose to this
// 3) owning thread removed tail, we lose to this // 3) owning thread removed tail, we lose to this
cas_integer new_head = cas_integer{get_stamp(local_head) + 1, next_item_offset}; stamped_integer new_head = stamped_integer{local_head.stamp + 1, next_item_offset};
if (head_.compare_exchange_strong(local_head, new_head)) { if (head_.compare_exchange_strong(local_head, new_head)) {
return head_data_item; // SUCCESS, we won the competition return head_data_item; // SUCCESS, we won the competition
} }
...@@ -127,15 +127,15 @@ template<typename Item> ...@@ -127,15 +127,15 @@ template<typename Item>
void work_stealing_deque<Item>::release_memory_until(state state) { void work_stealing_deque<Item>::release_memory_until(state state) {
unsigned long item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE; unsigned long item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
cas_integer local_head = head_; stamped_integer local_head = head_;
cas_integer local_tail = tail_; offset_t local_tail = tail_;
stack_->reset_state(state); stack_->reset_state(state);
if (item_offset < get_offset(local_tail)) { if (item_offset < local_tail) {
tail_ = cas_integer{0, item_offset}; tail_ = item_offset;
if (get_offset(local_head) >= get_offset(local_tail)) { if (local_head.offset >= local_tail) {
head_ = cas_integer{get_stamp(local_head) + 1, item_offset}; head_ = stamped_integer{local_head.stamp + 1, item_offset};
} }
} }
} }
......
#ifndef PLS_SPLIT_INTEGER_H_
#define PLS_SPLIT_INTEGER_H_
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace helpers {
template<int L, int R>
struct split_integer {
unsigned long left:L;
unsigned long right:R;
split_integer() : left{0}, right{0} {};
split_integer(unsigned long new_left, unsigned long new_right) : left{new_left}, right{new_right} {};
};
}
}
}
#endif //PLS_SPLIT_CAS_INTEGER_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment