Commit 98ae70bb by FritzFlorian

Refactor: Pull stamped integer into own file.

parent 0dc56d42
Pipeline #1190 passed with stages
in 3 minutes 47 seconds
...@@ -22,6 +22,7 @@ add_library(pls STATIC ...@@ -22,6 +22,7 @@ add_library(pls STATIC
include/pls/internal/data_structures/aligned_stack_impl.h include/pls/internal/data_structures/aligned_stack_impl.h
include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp
include/pls/internal/data_structures/work_stealing_deque.h include/pls/internal/data_structures/work_stealing_deque_impl.h include/pls/internal/data_structures/work_stealing_deque.h include/pls/internal/data_structures/work_stealing_deque_impl.h
include/pls/internal/data_structures/stamped_integer.h
include/pls/internal/helpers/prohibit_new.h include/pls/internal/helpers/prohibit_new.h
include/pls/internal/helpers/profiler.h include/pls/internal/helpers/profiler.h
......
#ifndef PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_DATA_STRUCTURES_STAMPED_INTEGER_H_
#define PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_DATA_STRUCTURES_STAMPED_INTEGER_H_
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace data_structures {
constexpr unsigned long HALF_CACHE_LINE = base::system_details::CACHE_LINE_SIZE / 2;
struct stamped_integer {
using member_t = base::system_details::cas_integer;
member_t stamp:HALF_CACHE_LINE;
member_t value:HALF_CACHE_LINE;
stamped_integer() : stamp{0}, value{0} {};
stamped_integer(member_t new_value) : stamp{0}, value{new_value} {};
stamped_integer(member_t new_stamp, member_t new_value) : stamp{new_stamp}, value{new_value} {};
};
}
}
}
#endif //PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_DATA_STRUCTURES_STAMPED_INTEGER_H_
...@@ -4,9 +4,9 @@ ...@@ -4,9 +4,9 @@
#include <atomic> #include <atomic>
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/base/system_details.h"
#include "pls/internal/base/error_handling.h" #include "pls/internal/base/error_handling.h"
#include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/scheduling/thread_state.h"
#include "aligned_stack.h" #include "aligned_stack.h"
...@@ -17,16 +17,8 @@ namespace data_structures { ...@@ -17,16 +17,8 @@ namespace data_structures {
using base::system_details::pointer_t; using base::system_details::pointer_t;
// Integer split into two halfs, can be used in CAS operations // Integer split into two halfs, can be used in CAS operations
constexpr unsigned long HALF_CACHE_LINE = base::system_details::CACHE_LINE_SIZE / 2; using data_structures::stamped_integer;
using offset_t = base::system_details::cas_integer; using offset_t = stamped_integer::member_t;
struct stamped_integer {
offset_t stamp:HALF_CACHE_LINE;
offset_t offset:HALF_CACHE_LINE;
stamped_integer() : stamp{0}, offset{0} {};
stamped_integer(offset_t new_offset) : stamp{0}, offset{new_offset} {};
stamped_integer(offset_t new_stamp, offset_t new_offset) : stamp{new_stamp}, offset{new_offset} {};
};
// Single Item in the deque // Single Item in the deque
class work_stealing_deque_item { class work_stealing_deque_item {
......
...@@ -61,7 +61,7 @@ Item *work_stealing_deque<Item>::pop_tail() { ...@@ -61,7 +61,7 @@ Item *work_stealing_deque<Item>::pop_tail() {
offset_t local_tail = tail_; offset_t local_tail = tail_;
stamped_integer local_head = head_; stamped_integer local_head = head_;
if (local_tail <= local_head.offset) { if (local_tail <= local_head.value) {
return nullptr; // EMPTY return nullptr; // EMPTY
} }
...@@ -74,11 +74,11 @@ Item *work_stealing_deque<Item>::pop_tail() { ...@@ -74,11 +74,11 @@ Item *work_stealing_deque<Item>::pop_tail() {
// Get the state of local head AFTER we published our wish // Get the state of local head AFTER we published our wish
local_head = head_; // Linearization point, outside knows list is empty local_head = head_; // Linearization point, outside knows list is empty
if (local_head.offset < new_tail) { if (local_head.value < new_tail) {
return previous_tail_item->data<Item>(); // Success, enough distance to other threads return previous_tail_item->data<Item>(); // Success, enough distance to other threads
} }
if (local_head.offset == new_tail) { if (local_head.value == new_tail) {
stamped_integer new_head = stamped_integer{local_head.stamp + 1, new_tail}; stamped_integer new_head = stamped_integer{local_head.stamp + 1, new_tail};
// Try competing with consumers by updating the head's stamp value // Try competing with consumers by updating the head's stamp value
if (head_.compare_exchange_strong(local_head, new_head)) { if (head_.compare_exchange_strong(local_head, new_head)) {
...@@ -89,7 +89,7 @@ Item *work_stealing_deque<Item>::pop_tail() { ...@@ -89,7 +89,7 @@ Item *work_stealing_deque<Item>::pop_tail() {
// Some other thread either won the competition or it already set the head further than we are // Some other thread either won the competition or it already set the head further than we are
// before we even tried to compete with it. // before we even tried to compete with it.
// Reset the queue into an empty state => head_ = tail_ // Reset the queue into an empty state => head_ = tail_
tail_ = local_head.offset; // ...we give up to the other winning thread tail_ = local_head.value; // ...we give up to the other winning thread
return nullptr; // EMPTY, we lost the competition with other threads return nullptr; // EMPTY, we lost the competition with other threads
} }
...@@ -99,14 +99,14 @@ Item *work_stealing_deque<Item>::pop_head() { ...@@ -99,14 +99,14 @@ Item *work_stealing_deque<Item>::pop_head() {
stamped_integer local_head = head_; stamped_integer local_head = head_;
offset_t local_tail = tail_; offset_t local_tail = tail_;
if (local_tail <= local_head.offset) { if (local_tail <= local_head.value) {
return nullptr; // EMPTY return nullptr; // EMPTY
} }
// Load info on current deque item. // Load info on current deque item.
// In case we have a race with a new (aba) overwritten item at this position, // In case we have a race with a new (aba) overwritten item at this position,
// there has to be a competition over the tail -> the stamp increased and our next // there has to be a competition over the tail -> the stamp increased and our next
// operation will fail anyways! // operation will fail anyways!
work_stealing_deque_item *head_deque_item = item_at(local_head.offset); work_stealing_deque_item *head_deque_item = item_at(local_head.value);
offset_t next_item_offset = head_deque_item->next_item(); offset_t next_item_offset = head_deque_item->next_item();
Item *head_data_item = head_deque_item->data<Item>(); Item *head_data_item = head_deque_item->data<Item>();
...@@ -134,7 +134,7 @@ void work_stealing_deque<Item>::release_memory_until(state state) { ...@@ -134,7 +134,7 @@ void work_stealing_deque<Item>::release_memory_until(state state) {
if (item_offset < local_tail) { if (item_offset < local_tail) {
tail_ = item_offset; tail_ = item_offset;
if (local_head.offset >= local_tail) { if (local_head.value >= local_tail) {
head_ = stamped_integer{local_head.stamp + 1, item_offset}; head_ = stamped_integer{local_head.stamp + 1, item_offset};
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment