Commit 25d18c11 by FritzFlorian

Rework lock-free-work-stealing-deque to be simpler and fit 'classic' approach.

parent bd826491
Pipeline #1160 passed with stages
in 3 minutes 33 seconds
......@@ -6,7 +6,6 @@
#include <vector>
static constexpr int CUTOFF = 16;
static constexpr int NUM_ITERATIONS = 1000;
static constexpr int INPUT_SIZE = 2064;
typedef std::vector<std::complex<double>> complex_vector;
......
......@@ -12,24 +12,26 @@
#include "aligned_stack.h"
//#define LOCK_FREE_DEBUG_PRINT
namespace pls {
namespace internal {
namespace data_structures {
using cas_integer = base::system_details::cas_integer;
using pointer_t = base::system_details::pointer_t;
static cas_integer get_jump_wish(cas_integer n) {
static cas_integer get_stamp(cas_integer n) {
return (n & base::system_details::FIRST_HALF_CAS_INTEGER) >> ((sizeof(cas_integer) / 2) * 8);
}
static cas_integer get_offset(cas_integer n) {
return n & base::system_details::SECOND_HALF_CAS_INTEGER;
}
static cas_integer set_jump_wish(cas_integer n, cas_integer new_value) {
static cas_integer set_stamp(cas_integer n, cas_integer new_value) {
return (new_value << ((sizeof(cas_integer) / 2) * 8)) | (n & base::system_details::SECOND_HALF_CAS_INTEGER);
}
static cas_integer set_offset(cas_integer n, cas_integer new_value) {
return new_value | (n & base::system_details::FIRST_HALF_CAS_INTEGER);
}
//static cas_integer set_offset(cas_integer n, cas_integer new_value) {
// return new_value | (n & base::system_details::FIRST_HALF_CAS_INTEGER);
//}
class work_stealing_deque_item {
// Pointer to the actual data
......@@ -84,7 +86,11 @@ class work_stealing_deque {
public:
using state = aligned_stack::state;
explicit work_stealing_deque(aligned_stack *stack) : stack_{stack}, head_{0}, tail_{0}, previous_tail_{0} {
explicit work_stealing_deque(aligned_stack *stack) : stack_{stack},
head_{0},
tail_{0},
previous_tail_{0},
base_pointer_{0} {
reset_base_pointer();
}
work_stealing_deque(const work_stealing_deque &other) : stack_{other.stack_},
......@@ -121,12 +127,12 @@ class work_stealing_deque {
template<typename T>
Item *push_tail(const T &new_item) {
// std::lock_guard<base::spin_lock> lock{lock_};
cas_integer local_tail = tail_;
cas_integer local_head = head_;
// PLS_ASSERT((local_tail >= get_offset(local_head)), "Tail MUST be in front of head!")
auto new_pair = allocate_item(new_item);
PLS_ASSERT((local_tail >= get_offset(local_head)), "Tail MUST be in front of head!")
auto new_pair = allocate_item(new_item);
// Prepare current tail to point to correct next items
auto tail_deque_item = item_at(local_tail);
tail_deque_item->set_data(&(new_pair->second));
......@@ -137,15 +143,18 @@ class work_stealing_deque {
// Linearization point, item appears after this write
cas_integer new_tail = current_stack_offset();
tail_ = new_tail;
// {
// std::lock_guard<base::spin_lock> lock{lock_};
// std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
// << "Pushed Tail " << local_tail << "->" << new_tail << std::endl;
// }
#ifdef LOCK_FREE_DEBUG_PRINT
{
std::lock_guard<base::spin_lock> lock{lock_};
std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
<< "Pushed Tail " << local_tail << "->" << new_tail << std::endl;
}
#endif
return &(new_pair->second);
}
Item *pop_tail() {
// std::lock_guard<base::spin_lock> lock{lock_};
cas_integer local_tail = tail_;
cas_integer local_head = head_;
......@@ -163,124 +172,112 @@ class work_stealing_deque {
local_head = head_; // Linearization point, outside knows list is empty
if (get_offset(local_head) < new_tail) {
// {
// std::lock_guard<base::spin_lock> lock{lock_};
// std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
// << "Poped Tail (distance) " << local_tail << "->" << new_tail << std::endl;
// }
return previous_tail_item->data<Item>(); // Enough distance, return item
#ifdef LOCK_FREE_DEBUG_PRINT
{
std::lock_guard<base::spin_lock> lock{lock_};
std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
<< "Poped Tail (distance) " << local_tail << "->" << new_tail << std::endl;
}
#endif
return previous_tail_item->data<Item>(); // Success, enough distance to other threads
}
cas_integer new_head = set_jump_wish(new_tail, 999999);
if (get_offset(local_head) == new_tail) {
// Try competing with consumers...
cas_integer new_head = set_stamp(new_tail, get_stamp(local_head) + 1);
// Try competing with consumers by updating the head's stamp value
if (head_.compare_exchange_strong(local_head, new_head)) {
// {
// std::lock_guard<base::spin_lock> lock{lock_};
// std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
// << "Poped Tail (won competition 1) " << local_tail << "->" << new_tail << std::endl;
// }
return previous_tail_item->data<Item>(); // We won the competition, linearization on whom got the item
}
// Cosumer either registered jump wish or has gotten the item.
// Local_Head has the new value of the head, see if the other thread got to advance it
// and if not (only jump wish) try to win the competition.
if (get_offset(local_head) == new_tail && head_.compare_exchange_strong(local_head, new_head)) {
// {
// std::lock_guard<base::spin_lock> lock{lock_};
// std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
// << "Poped Tail (won competition 2) " << local_tail << "->" << new_tail << std::endl;
// }
return previous_tail_item->data<Item>(); // We won the competition, linearization on whom got the item
#ifdef LOCK_FREE_DEBUG_PRINT
{
std::lock_guard<base::spin_lock> lock{lock_};
std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
<< "Poped Tail (won competition 1) " << local_tail << "->" << new_tail << std::endl;
}
#endif
return previous_tail_item->data<Item>(); // SUCCESS, we won the competition with other threads
}
}
// {
// std::lock_guard<base::spin_lock> lock{lock_};
// std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
// << "FAILED to pop tail (lost competition) " << get_offset(local_head) << "; " << local_tail << "->"
// << new_tail << std::endl;
// }
#ifdef LOCK_FREE_DEBUG_PRINT
{
std::lock_guard<base::spin_lock> lock{lock_};
std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
<< "FAILED to pop tail (lost competition) " << get_offset(local_head) << "; " << local_tail << "->"
<< new_tail << std::endl;
}
#endif
// Some other thread either won the competition or it already set the head further than we are
// before we even tried to compete with it.
// Reset the queue into an empty state => head_ = tail_
// We can not set it to 0, as the memory is still in use.
tail_ = get_offset(local_head); // Set tail to match the head value the other thread won the battle of
tail_ = get_offset(local_head); // ...we give up to the other winning thread
return nullptr;
return nullptr; // EMPTY, we lost the competition with other threads
}
Item *pop_head() {
// std::lock_guard<base::spin_lock> lock{lock_};
cas_integer local_tail = tail_;
cas_integer local_head = head_;
cas_integer local_head_offset = get_offset(local_head);
cas_integer local_tail = tail_;
if (local_head_offset >= local_tail) {
if (local_tail <= get_offset(local_head)) {
return nullptr; // EMPTY
}
work_stealing_deque_item *head_deque_item = item_at(local_head_offset);
// Load info on current deque item.
// In case we have a race with a new (aba) overwritten item at this position,
// there has to be a competition over the tail -> the stamp increased and our next
// operation will fail anyways!
work_stealing_deque_item *head_deque_item = item_at(get_offset(local_head));
cas_integer next_item_offset = head_deque_item->next_item();
Item *head_data_item = head_deque_item->data<Item>();
cas_integer jump_wish_head = set_jump_wish(local_head_offset, head_deque_item->next_item());
if (!head_.compare_exchange_strong(local_head, jump_wish_head)) {
// {
// std::lock_guard<base::spin_lock> lock{lock_};
// std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
// << "Failed to pop head (first cas) " << local_head_offset << "->" << next_item_offset << std::endl;
// }
return nullptr; // Someone interrupted us
// We try to set the head to this new position.
// Possible outcomes:
// 1) no one interrupted us, we win this competition
// 2) other thread took the head, we lose to this
// 3) owning thread removed tail, we lose to this
cas_integer new_head = set_stamp(next_item_offset, get_stamp(local_head) + 1);
if (head_.compare_exchange_strong(local_head, new_head)) {
#ifdef LOCK_FREE_DEBUG_PRINT
{
std::lock_guard<base::spin_lock> lock{lock_};
std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
<< "Popped Head " << get_offset(local_head) << "->" << next_item_offset << std::endl;
}
#endif
return head_data_item; // SUCCESS, we won the competition
}
local_tail = tail_;
if (local_head_offset >= local_tail) {
// std::cout << "Failed to pop head (second tail test) " << get_offset(local_head) << std::endl;
return nullptr; // EMPTY, tail was removed while we registered our jump wish
#ifdef LOCK_FREE_DEBUG_PRINT
{
std::lock_guard<base::spin_lock> lock{lock_};
std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
<< "Failed to pop head " << get_offset(local_head) << "->" << next_item_offset << std::endl;
}
#endif
return nullptr; // EMPTY, we lost the competition
cas_integer new_head = next_item_offset;
if (!head_.compare_exchange_strong(jump_wish_head, new_head)) {
// {
// std::lock_guard<base::spin_lock> lock{lock_};
// std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
// << "Failed to pop head (second cas) " << local_head_offset << "->" << next_item_offset << std::endl;
// }
return nullptr; // we lost the 'fight' on the item...
}
// {
// std::lock_guard<base::spin_lock> lock{lock_};
// std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
// << "Popped Head " << local_head_offset << "->" << next_item_offset << std::endl;
// }
return head_deque_item->data<Item>(); // We won the 'fight' on the item, it is now save to access it!
}
void release_memory_until(state state) {
// std::lock_guard<base::spin_lock> lock{lock_};
cas_integer
item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
cas_integer item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
cas_integer local_head = head_;
cas_integer local_tail = tail_;
// if (local_tail != item_offset) {
// std::cout << "...";
// } else {
// std::cout << "...";
// }
stack_->reset_state(state);
if (item_offset < local_tail) {
tail_ = item_offset;
if (get_offset(local_head) >= local_tail) {
head_ = item_offset;
head_ = set_stamp(item_offset, get_stamp(local_head) + 1);
}
}
// std::cout << "Release Memory " << item_offset << std::endl;
#ifdef LOCK_FREE_DEBUG_PRINT
{
std::lock_guard<base::spin_lock> lock{lock_};
std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
<< "Release Memory " << item_offset << std::endl;
}
#endif
}
void release_memory_until(Item *item) {
......@@ -290,14 +287,6 @@ class work_stealing_deque {
state save_state() {
return stack_->save_state();
}
// PUSH item onto stack (allocate + insert into stack) - CHECK
// POP item from bottom of stack (remove from stack, memory still used) - CHECK
// POP item from top of stack (remove from stack, memory still used) - CHECK
// RELEASE memory from all items allocated after this one (including this one) - CHECK
// -> Tell the data structure that it is safe to reuse the stack space
// Note: Item that is released must not be part of the queue at this point (it is already removed!)
};
}
......
......@@ -23,7 +23,6 @@ void fork_join_sub_task::execute() {
PROFILE_WORK_BLOCK("execute sub_task")
tbb_task_->currently_executing_ = this;
if (executed) {
int my_id = base::this_thread::state<thread_state>()->id_;
PLS_ERROR("Double Execution!")
}
executed = true;
......@@ -64,9 +63,7 @@ fork_join_sub_task *fork_join_task::get_local_sub_task() {
}
fork_join_sub_task *fork_join_task::get_stolen_sub_task() {
auto tmp = deque_.save_state();
auto result = deque_.pop_head();
return result;
return deque_.pop_head();
}
bool fork_join_task::internal_stealing(abstract_task *other_task) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment