diff --git a/app/invoke_parallel/main.cpp b/app/invoke_parallel/main.cpp index c9dd5a7..1d5e32e 100644 --- a/app/invoke_parallel/main.cpp +++ b/app/invoke_parallel/main.cpp @@ -6,7 +6,6 @@ #include static constexpr int CUTOFF = 16; -static constexpr int NUM_ITERATIONS = 1000; static constexpr int INPUT_SIZE = 2064; typedef std::vector> complex_vector; diff --git a/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h index ec0a743..29fa12a 100644 --- a/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h +++ b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h @@ -12,24 +12,26 @@ #include "aligned_stack.h" +//#define LOCK_FREE_DEBUG_PRINT + namespace pls { namespace internal { namespace data_structures { using cas_integer = base::system_details::cas_integer; using pointer_t = base::system_details::pointer_t; -static cas_integer get_jump_wish(cas_integer n) { +static cas_integer get_stamp(cas_integer n) { return (n & base::system_details::FIRST_HALF_CAS_INTEGER) >> ((sizeof(cas_integer) / 2) * 8); } static cas_integer get_offset(cas_integer n) { return n & base::system_details::SECOND_HALF_CAS_INTEGER; } -static cas_integer set_jump_wish(cas_integer n, cas_integer new_value) { +static cas_integer set_stamp(cas_integer n, cas_integer new_value) { return (new_value << ((sizeof(cas_integer) / 2) * 8)) | (n & base::system_details::SECOND_HALF_CAS_INTEGER); } -static cas_integer set_offset(cas_integer n, cas_integer new_value) { - return new_value | (n & base::system_details::FIRST_HALF_CAS_INTEGER); -} +//static cas_integer set_offset(cas_integer n, cas_integer new_value) { +// return new_value | (n & base::system_details::FIRST_HALF_CAS_INTEGER); +//} class work_stealing_deque_item { // Pointer to the actual data @@ -84,7 +86,11 @@ class work_stealing_deque { public: using state = aligned_stack::state; - explicit work_stealing_deque(aligned_stack *stack) : stack_{stack}, head_{0}, tail_{0}, previous_tail_{0} { + explicit work_stealing_deque(aligned_stack *stack) : stack_{stack}, + head_{0}, + tail_{0}, + previous_tail_{0}, + base_pointer_{0} { reset_base_pointer(); } work_stealing_deque(const work_stealing_deque &other) : stack_{other.stack_}, @@ -121,12 +127,12 @@ class work_stealing_deque { template Item *push_tail(const T &new_item) { -// std::lock_guard lock{lock_}; cas_integer local_tail = tail_; cas_integer local_head = head_; -// PLS_ASSERT((local_tail >= get_offset(local_head)), "Tail MUST be in front of head!") - auto new_pair = allocate_item(new_item); + PLS_ASSERT((local_tail >= get_offset(local_head)), "Tail MUST be in front of head!") + + auto new_pair = allocate_item(new_item); // Prepare current tail to point to correct next items auto tail_deque_item = item_at(local_tail); tail_deque_item->set_data(&(new_pair->second)); @@ -137,15 +143,18 @@ class work_stealing_deque { // Linearization point, item appears after this write cas_integer new_tail = current_stack_offset(); tail_ = new_tail; -// { -// std::lock_guard lock{lock_}; -// std::cout << base::this_thread::state()->id_ << " - " -// << "Pushed Tail " << local_tail << "->" << new_tail << std::endl; -// } +#ifdef LOCK_FREE_DEBUG_PRINT + { + std::lock_guard lock{lock_}; + std::cout << base::this_thread::state()->id_ << " - " + << "Pushed Tail " << local_tail << "->" << new_tail << std::endl; + } +#endif + + return &(new_pair->second); } Item *pop_tail() { -// std::lock_guard lock{lock_}; cas_integer local_tail = tail_; cas_integer local_head = head_; @@ -163,124 +172,112 @@ class work_stealing_deque { local_head = head_; // Linearization point, outside knows list is empty if (get_offset(local_head) < new_tail) { -// { -// std::lock_guard lock{lock_}; -// std::cout << base::this_thread::state()->id_ << " - " -// << "Poped Tail (distance) " << local_tail << "->" << new_tail << std::endl; -// } - return previous_tail_item->data(); // Enough distance, return item +#ifdef LOCK_FREE_DEBUG_PRINT + { + std::lock_guard lock{lock_}; + std::cout << base::this_thread::state()->id_ << " - " + << "Poped Tail (distance) " << local_tail << "->" << new_tail << std::endl; + } +#endif + return previous_tail_item->data(); // Success, enough distance to other threads } - cas_integer new_head = set_jump_wish(new_tail, 999999); if (get_offset(local_head) == new_tail) { - // Try competing with consumers... + cas_integer new_head = set_stamp(new_tail, get_stamp(local_head) + 1); + // Try competing with consumers by updating the head's stamp value if (head_.compare_exchange_strong(local_head, new_head)) { -// { -// std::lock_guard lock{lock_}; -// std::cout << base::this_thread::state()->id_ << " - " -// << "Poped Tail (won competition 1) " << local_tail << "->" << new_tail << std::endl; -// } - return previous_tail_item->data(); // We won the competition, linearization on whom got the item - } - // Cosumer either registered jump wish or has gotten the item. - // Local_Head has the new value of the head, see if the other thread got to advance it - // and if not (only jump wish) try to win the competition. - if (get_offset(local_head) == new_tail && head_.compare_exchange_strong(local_head, new_head)) { -// { -// std::lock_guard lock{lock_}; -// std::cout << base::this_thread::state()->id_ << " - " -// << "Poped Tail (won competition 2) " << local_tail << "->" << new_tail << std::endl; -// } - return previous_tail_item->data(); // We won the competition, linearization on whom got the item +#ifdef LOCK_FREE_DEBUG_PRINT + { + std::lock_guard lock{lock_}; + std::cout << base::this_thread::state()->id_ << " - " + << "Poped Tail (won competition 1) " << local_tail << "->" << new_tail << std::endl; + } +#endif + return previous_tail_item->data(); // SUCCESS, we won the competition with other threads } } -// { -// std::lock_guard lock{lock_}; -// std::cout << base::this_thread::state()->id_ << " - " -// << "FAILED to pop tail (lost competition) " << get_offset(local_head) << "; " << local_tail << "->" -// << new_tail << std::endl; -// } + +#ifdef LOCK_FREE_DEBUG_PRINT + { + std::lock_guard lock{lock_}; + std::cout << base::this_thread::state()->id_ << " - " + << "FAILED to pop tail (lost competition) " << get_offset(local_head) << "; " << local_tail << "->" + << new_tail << std::endl; + } +#endif // Some other thread either won the competition or it already set the head further than we are // before we even tried to compete with it. // Reset the queue into an empty state => head_ = tail_ - // We can not set it to 0, as the memory is still in use. - tail_ = get_offset(local_head); // Set tail to match the head value the other thread won the battle of + tail_ = get_offset(local_head); // ...we give up to the other winning thread - return nullptr; + return nullptr; // EMPTY, we lost the competition with other threads } Item *pop_head() { -// std::lock_guard lock{lock_}; - cas_integer local_tail = tail_; cas_integer local_head = head_; - cas_integer local_head_offset = get_offset(local_head); + cas_integer local_tail = tail_; - if (local_head_offset >= local_tail) { + if (local_tail <= get_offset(local_head)) { return nullptr; // EMPTY } - work_stealing_deque_item *head_deque_item = item_at(local_head_offset); + // Load info on current deque item. + // In case we have a race with a new (aba) overwritten item at this position, + // there has to be a competition over the tail -> the stamp increased and our next + // operation will fail anyways! + work_stealing_deque_item *head_deque_item = item_at(get_offset(local_head)); cas_integer next_item_offset = head_deque_item->next_item(); Item *head_data_item = head_deque_item->data(); - cas_integer jump_wish_head = set_jump_wish(local_head_offset, head_deque_item->next_item()); - if (!head_.compare_exchange_strong(local_head, jump_wish_head)) { -// { -// std::lock_guard lock{lock_}; -// std::cout << base::this_thread::state()->id_ << " - " -// << "Failed to pop head (first cas) " << local_head_offset << "->" << next_item_offset << std::endl; -// } - return nullptr; // Someone interrupted us + // We try to set the head to this new position. + // Possible outcomes: + // 1) no one interrupted us, we win this competition + // 2) other thread took the head, we lose to this + // 3) owning thread removed tail, we lose to this + cas_integer new_head = set_stamp(next_item_offset, get_stamp(local_head) + 1); + if (head_.compare_exchange_strong(local_head, new_head)) { +#ifdef LOCK_FREE_DEBUG_PRINT + { + std::lock_guard lock{lock_}; + std::cout << base::this_thread::state()->id_ << " - " + << "Popped Head " << get_offset(local_head) << "->" << next_item_offset << std::endl; + } +#endif + return head_data_item; // SUCCESS, we won the competition } - - local_tail = tail_; - if (local_head_offset >= local_tail) { -// std::cout << "Failed to pop head (second tail test) " << get_offset(local_head) << std::endl; - return nullptr; // EMPTY, tail was removed while we registered our jump wish +#ifdef LOCK_FREE_DEBUG_PRINT + { + std::lock_guard lock{lock_}; + std::cout << base::this_thread::state()->id_ << " - " + << "Failed to pop head " << get_offset(local_head) << "->" << next_item_offset << std::endl; } +#endif + return nullptr; // EMPTY, we lost the competition - cas_integer new_head = next_item_offset; - if (!head_.compare_exchange_strong(jump_wish_head, new_head)) { -// { -// std::lock_guard lock{lock_}; -// std::cout << base::this_thread::state()->id_ << " - " -// << "Failed to pop head (second cas) " << local_head_offset << "->" << next_item_offset << std::endl; -// } - return nullptr; // we lost the 'fight' on the item... - } - -// { -// std::lock_guard lock{lock_}; -// std::cout << base::this_thread::state()->id_ << " - " -// << "Popped Head " << local_head_offset << "->" << next_item_offset << std::endl; -// } - return head_deque_item->data(); // We won the 'fight' on the item, it is now save to access it! } void release_memory_until(state state) { -// std::lock_guard lock{lock_}; - cas_integer - item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE; + cas_integer item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE; cas_integer local_head = head_; cas_integer local_tail = tail_; -// if (local_tail != item_offset) { -// std::cout << "..."; -// } else { -// std::cout << "..."; -// } - stack_->reset_state(state); if (item_offset < local_tail) { tail_ = item_offset; if (get_offset(local_head) >= local_tail) { - head_ = item_offset; + head_ = set_stamp(item_offset, get_stamp(local_head) + 1); } } -// std::cout << "Release Memory " << item_offset << std::endl; +#ifdef LOCK_FREE_DEBUG_PRINT + { + std::lock_guard lock{lock_}; + std::cout << base::this_thread::state()->id_ << " - " + << "Release Memory " << item_offset << std::endl; + } +#endif } void release_memory_until(Item *item) { @@ -290,14 +287,6 @@ class work_stealing_deque { state save_state() { return stack_->save_state(); } - - // PUSH item onto stack (allocate + insert into stack) - CHECK - // POP item from bottom of stack (remove from stack, memory still used) - CHECK - // POP item from top of stack (remove from stack, memory still used) - CHECK - - // RELEASE memory from all items allocated after this one (including this one) - CHECK - // -> Tell the data structure that it is safe to reuse the stack space - // Note: Item that is released must not be part of the queue at this point (it is already removed!) }; } diff --git a/lib/pls/src/internal/scheduling/fork_join_task.cpp b/lib/pls/src/internal/scheduling/fork_join_task.cpp index 7e879ec..c7c46d2 100644 --- a/lib/pls/src/internal/scheduling/fork_join_task.cpp +++ b/lib/pls/src/internal/scheduling/fork_join_task.cpp @@ -23,7 +23,6 @@ void fork_join_sub_task::execute() { PROFILE_WORK_BLOCK("execute sub_task") tbb_task_->currently_executing_ = this; if (executed) { - int my_id = base::this_thread::state()->id_; PLS_ERROR("Double Execution!") } executed = true; @@ -64,9 +63,7 @@ fork_join_sub_task *fork_join_task::get_local_sub_task() { } fork_join_sub_task *fork_join_task::get_stolen_sub_task() { - auto tmp = deque_.save_state(); - auto result = deque_.pop_head(); - return result; + return deque_.pop_head(); } bool fork_join_task::internal_stealing(abstract_task *other_task) {