diff --git a/lib/pls/include/pls/internal/scheduling/lock_free/external_trading_deque.h b/lib/pls/include/pls/internal/scheduling/lock_free/external_trading_deque.h index 6ed238c..bd4eb03 100644 --- a/lib/pls/include/pls/internal/scheduling/lock_free/external_trading_deque.h +++ b/lib/pls/include/pls/internal/scheduling/lock_free/external_trading_deque.h @@ -38,7 +38,7 @@ class external_trading_deque { external_trading_deque(unsigned thread_id, size_t num_entries) : thread_id_(thread_id), entries_(num_entries) {} static traded_cas_field peek_traded_object(task *target_task); - static task *get_trade_object(task *target_task, traded_cas_field peeked_cas, external_trading_deque &other_deque); + static task *get_trade_object(task *target_task, traded_cas_field peeked_cas); /** * Pushes a task on the bottom of the deque. diff --git a/lib/pls/include/pls/internal/scheduling/lock_free/task.h b/lib/pls/include/pls/internal/scheduling/lock_free/task.h index 3f7c7ac..2d0e3a9 100644 --- a/lib/pls/include/pls/internal/scheduling/lock_free/task.h +++ b/lib/pls/include/pls/internal/scheduling/lock_free/task.h @@ -3,7 +3,7 @@ #define PLS_LOCK_FREE_TASK_H_ #include "pls/internal/scheduling/base_task.h" -#include "pls/internal/data_structures/stamped_split_integer.h" +#include "pls/internal/data_structures/stamped_integer.h" #include "pls/internal/scheduling/lock_free/traded_cas_field.h" namespace pls::internal::scheduling::lock_free { @@ -19,20 +19,21 @@ struct task : public base_task { base_task(stack_memory, stack_size, depth, thread_id) {} // Additional info for lock-free stealing and resource trading. - std::atomic external_trading_deque_cas_{}; + std::atomic external_trading_deque_cas_{{}}; void push_task_chain(task *spare_task_chain); - void propose_push_task_chain(task *spare_task_chain); - bool accept_proposed(); - bool decline_proposed(); task *pop_task_chain(); + void reset_task_chain(task *expected_content); + + static task *find_task(unsigned id, unsigned depth); private: + std::atomic num_resources_{}; + std::atomic resource_stack_next_{}; // STAMP = CAS stamp, half CAS length (16 or 32 Bit) - // VALUE_1 = Root of the actual stack, indicated by thread ID (8 or 16 Bit) - // VALUE_2 = Proposed element in queue, indicated by thread ID (8 or 16 Bit) - std::atomic resource_stack_root_{{0, 0, 0}}; + // VALUE = Root of the actual stack, indicated by thread ID (16 or 32 Bit) + std::atomic resource_stack_root_{{0, 0}}; }; } diff --git a/lib/pls/include/pls/internal/scheduling/lock_free/traded_cas_field.h b/lib/pls/include/pls/internal/scheduling/lock_free/traded_cas_field.h index da17b94..4318b1a 100644 --- a/lib/pls/include/pls/internal/scheduling/lock_free/traded_cas_field.h +++ b/lib/pls/include/pls/internal/scheduling/lock_free/traded_cas_field.h @@ -4,92 +4,84 @@ #include -#include "pls/internal/base/error_handling.h" +#include "pls/internal/data_structures/stamped_split_integer.h" #include "pls/internal/base/system_details.h" namespace pls::internal::scheduling::lock_free { -struct task; struct traded_cas_field { - static_assert(base::system_details::CACHE_LINE_SIZE >= 4, - "Traded objects must not use their last address bits, as we use them for status flags." - "As traded objects are usually cache aligned, we need big enough cache lines."); - - // Base size of our CAS integer/pointer - static constexpr base::system_details::cas_integer CAS_SIZE = base::system_details::CAS_SIZE; - - // States of the integer (tag indicating current content) - static constexpr base::system_details::cas_integer EMPTY_TAG = 0x0lu; - static constexpr base::system_details::cas_integer STAMP_TAG = 0x1lu; - static constexpr base::system_details::cas_integer TRADE_TAG = 0x2lu; - - // Bitmasks and shifts for cas_integer_, two variants: - // cas_integer_ = traded object | tag - // cas_integer_ = stamp | id | tag - static constexpr base::system_details::cas_integer TAG_SIZE = 2ul; - static constexpr base::system_details::cas_integer TAG_BITS = ~((~0x0ul) << TAG_SIZE); - - static constexpr base::system_details::cas_integer TRADED_OBJECT_SIZE = CAS_SIZE - TAG_SIZE; - static constexpr base::system_details::cas_integer TRADED_OBJECT_SHIFT = TAG_SIZE; - static constexpr base::system_details::cas_integer - TRADE_OBJECT_BITS = ~((~0x0ul) << TRADED_OBJECT_SIZE) << TRADED_OBJECT_SHIFT; - - static constexpr base::system_details::cas_integer ID_SIZE = (CAS_SIZE / 2) - TAG_SIZE; // Half the CAS for the ID - static constexpr base::system_details::cas_integer ID_SHIFT = TAG_SIZE; - static constexpr base::system_details::cas_integer ID_BITS = ~((~0x0ul) << ID_SIZE) << ID_SHIFT; - - static constexpr base::system_details::cas_integer STAMP_SIZE = (CAS_SIZE / 2); // Half the CAS for the STAMP - static constexpr base::system_details::cas_integer STAMP_SHIFT = TAG_SIZE + ID_SIZE; - static constexpr base::system_details::cas_integer STAMP_BITS = ~((~0x0ul) << STAMP_SIZE) << STAMP_SHIFT; - public: - void fill_with_stamp_and_deque(base::system_details::cas_integer stamp, base::system_details::cas_integer deque_id) { - cas_integer_ = (((stamp << STAMP_SHIFT) & STAMP_BITS) | ((deque_id << ID_SHIFT) & ID_BITS) | STAMP_TAG); + // A traded cas field should always go through a specific state space. + // The field is initially filled with a request to trade the task. + // If this request is not met, the field is emptied out, keeping the stamp. + // If this request is met, the field is filled with an traded in task. + // Next, the traded in task is taken out of the field, leaving it empty, keeping the stamp. + // + // After this, the field can be re-used for trading at a different point. + traded_cas_field() : stamp_{0}, trade_request_thread_id_{0}, traded_task_id_{0}, non_empty_flag_{0} {}; + + void fill_with_trade_request(unsigned long stamp, unsigned long trading_thread_id) { + PLS_ASSERT(is_empty(), + "traded_cas_field must follow state transitions (to trade request)"); + + non_empty_flag_ = 1; + stamp_ = stamp; + trade_request_thread_id_ = trading_thread_id + 1; + traded_task_id_ = 0; } - void fill_with_stamp_and_empty(base::system_details::cas_integer stamp, base::system_details::cas_integer deque_id) { - cas_integer_ = (((stamp << STAMP_SHIFT) & STAMP_BITS) | ((deque_id << ID_SHIFT) & ID_BITS) | EMPTY_TAG); - PLS_ASSERT(is_empty(), "Must be empty after filling it empty..."); + void make_empty() { + PLS_ASSERT(is_filled_with_object() || is_filled_with_trade_request(), + "traded_cas_field must follow state transitions (to empty)"); + + non_empty_flag_ = 0; } + void fill_with_task(unsigned long task_id) { + PLS_ASSERT(is_filled_with_trade_request(), + "traded_cas_field must follow state transitions (to task)"); - [[nodiscard]] base::system_details::cas_integer get_stamp() const { - PLS_ASSERT(is_filled_with_stamp(), "Must only read out the tag when the traded field contains one."); - return (((base::system_details::cas_integer) cas_integer_) & STAMP_BITS) >> STAMP_SHIFT; + traded_task_id_ = task_id + 1; } - [[nodiscard]] base::system_details::cas_integer get_deque_id() const { - PLS_ASSERT(is_filled_with_stamp(), "Must only read out the tag when the traded field contains one."); - return (((base::system_details::cas_integer) cas_integer_) & ID_BITS) >> ID_SHIFT; + + [[nodiscard]] unsigned long get_stamp() const { + return stamp_; } - [[nodiscard]] bool is_filled_with_stamp() const { - return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == STAMP_TAG; + [[nodiscard]] unsigned long get_trade_request_thread_id() const { + PLS_ASSERT(is_filled_with_trade_request() || is_filled_with_object(), + "Must only read out the tag when the traded field contains one."); + return trade_request_thread_id_ - 1; } - void fill_with_trade_object(task *new_task) { - PLS_ASSERT((((base::system_details::cas_integer) new_task) & TAG_BITS) == 0, - "Must only store aligned objects in this data structure (last bits are needed for tag bit)"); - cas_integer_ = (((base::system_details::cas_integer) new_task) | TRADE_TAG); - } - [[nodiscard]] task *get_trade_object() const { + [[nodiscard]] unsigned long get_task_id() const { PLS_ASSERT(is_filled_with_object(), "Must only read out the object when the traded field contains one."); - return reinterpret_cast(((base::system_details::cas_integer) cas_integer_) & TRADE_OBJECT_BITS); + return traded_task_id_ - 1; + } + + [[nodiscard]] bool is_filled_with_trade_request() const { + return non_empty_flag_ && trade_request_thread_id_ && !traded_task_id_; } [[nodiscard]] bool is_filled_with_object() const { - return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == TRADE_TAG; + return non_empty_flag_ && traded_task_id_; } - [[nodiscard]] bool is_empty() const { - return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == EMPTY_TAG; + return !non_empty_flag_; } bool operator==(const traded_cas_field &other) const { - return this->cas_integer_ == other.cas_integer_; + return this->stamp_ == other.stamp_ && this->traded_task_id_ == other.traded_task_id_ + && this->trade_request_thread_id_ == other.trade_request_thread_id_ + && this->non_empty_flag_ == other.non_empty_flag_; } bool operator!=(const traded_cas_field &other) const { return !((*this) == other); } private: - base::system_details::cas_integer cas_integer_{}; + base::system_details::cas_integer stamp_: base::system_details::CAS_SIZE / 2; + base::system_details::cas_integer trade_request_thread_id_: base::system_details::CAS_SIZE / 4; + base::system_details::cas_integer traded_task_id_: base::system_details::CAS_SIZE / 4 - 1; + base::system_details::cas_integer non_empty_flag_: 1; }; +static_assert(sizeof(traded_cas_field) * 8 == base::system_details::CAS_SIZE, "Must only have lock free CAS objects"); } diff --git a/lib/pls/src/internal/base/error_handling.cpp b/lib/pls/src/internal/base/error_handling.cpp index 4243a13..cad8c27 100644 --- a/lib/pls/src/internal/base/error_handling.cpp +++ b/lib/pls/src/internal/base/error_handling.cpp @@ -1,5 +1,9 @@ #include "pls/internal/base/error_handling.h" +#include +#include void pls_error(const char *msg) { + using namespace std::chrono_literals; + std::this_thread::sleep_for(5s); PLS_ERROR(msg); } diff --git a/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp b/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp index f5a9a3d..baba64c 100644 --- a/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp +++ b/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp @@ -1,5 +1,6 @@ #include "pls/internal/scheduling/lock_free/external_trading_deque.h" #include "pls/internal/scheduling/lock_free/traded_cas_field.h" +#include "pls/internal/scheduling/lock_free/task.h" namespace pls::internal::scheduling::lock_free { @@ -9,14 +10,15 @@ traded_cas_field external_trading_deque::peek_traded_object(task *target_task) { } task *external_trading_deque::get_trade_object(task *target_task, - traded_cas_field peeked_cas, - external_trading_deque &other_deque) { + traded_cas_field peeked_cas) { traded_cas_field current_cas = peeked_cas; if (current_cas.is_filled_with_object()) { - task *result = current_cas.get_trade_object(); - traded_cas_field empty_cas; - empty_cas.fill_with_stamp_and_empty(other_deque.bot_internal_.stamp_, other_deque.thread_id_); + auto result_id = current_cas.get_task_id(); + + traded_cas_field empty_cas = peeked_cas; + empty_cas.make_empty(); if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas, empty_cas)) { + task *result = task::find_task(result_id, target_task->depth_); return result; } } @@ -35,7 +37,7 @@ void external_trading_deque::push_bot(task *published_task) { // Field that all threads synchronize on. // This happens not in the deque itself, but in the published task. traded_cas_field sync_cas_field; - sync_cas_field.fill_with_stamp_and_deque(expected_stamp, thread_id_); + sync_cas_field.fill_with_trade_request(expected_stamp, thread_id_); published_task->external_trading_deque_cas_.store(sync_cas_field, std::memory_order_release); // Advance the bot pointer. Linearization point for making the task public. @@ -45,6 +47,19 @@ void external_trading_deque::push_bot(task *published_task) { } void external_trading_deque::reset_bot_and_top() { + for (int i = bot_internal_.value_; i >= 0; i--) { + auto ¤t_entry = entries_[i]; + + auto *task = current_entry.traded_task_.load(std::memory_order_relaxed); + auto task_cas = task->external_trading_deque_cas_.load(); + if (task_cas.is_filled_with_trade_request() && task_cas.get_trade_request_thread_id() == thread_id_) { + PLS_ASSERT(false, "Must not have 'non stolen' tasks left in own task chain!"); + } + + current_entry.traded_task_.store(nullptr, std::memory_order_relaxed); + current_entry.forwarding_stamp_.store(0, std::memory_order_relaxed); + } + bot_internal_.value_ = 0; bot_internal_.stamp_++; @@ -53,29 +68,34 @@ void external_trading_deque::reset_bot_and_top() { } task *external_trading_deque::pop_bot() { - if (bot_internal_.value_ > 0) { - bot_internal_.value_--; - bot_.store(bot_internal_.value_, std::memory_order_relaxed); - - auto ¤t_entry = entries_[bot_internal_.value_]; - auto *popped_task = current_entry.traded_task_.load(std::memory_order_relaxed); - auto expected_stamp = current_entry.forwarding_stamp_.load(std::memory_order_relaxed); - - // We know what value must be in the cas field if no other thread stole it. - traded_cas_field expected_sync_cas_field; - expected_sync_cas_field.fill_with_stamp_and_deque(expected_stamp, thread_id_); - traded_cas_field empty_cas_field; - empty_cas_field.fill_with_stamp_and_empty(expected_stamp, thread_id_); - - if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, - empty_cas_field, - std::memory_order_acq_rel)) { - return popped_task; - } + if (bot_internal_.value_ == 0) { + return nullptr; } - reset_bot_and_top(); - return nullptr; + bot_internal_.value_--; + bot_.store(bot_internal_.value_, std::memory_order_relaxed); + + auto ¤t_entry = entries_[bot_internal_.value_]; + auto *popped_task = current_entry.traded_task_.load(std::memory_order_relaxed); + auto expected_stamp = current_entry.forwarding_stamp_.load(std::memory_order_relaxed); + + // We know what value must be in the cas field if no other thread stole it. + traded_cas_field expected_sync_cas_field; + expected_sync_cas_field.fill_with_trade_request(expected_stamp, thread_id_); + traded_cas_field empty_cas_field = expected_sync_cas_field; + empty_cas_field.make_empty(); + + if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, + empty_cas_field, + std::memory_order_acq_rel)) { + current_entry.traded_task_.store(nullptr, std::memory_order_relaxed); + current_entry.forwarding_stamp_.store(0, std::memory_order_relaxed); + + return popped_task; + } else { + reset_bot_and_top(); + return nullptr; + } } external_trading_deque::peek_result external_trading_deque::peek_top() { @@ -102,28 +122,34 @@ task *external_trading_deque::pop_top(task *offered_task, peek_result peek_resul task *result = target_entry.traded_task_.load(); unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load(); + if (result == nullptr) { + return nullptr; + } + if (forwarding_stamp != expected_top.stamp_) { + // ...we failed because the top tag lags behind...try to fix it. + // This means only updating the tag, as this location can still hold data we need. + top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value_}); + return nullptr; + } // Try to get it by CAS with the expected field entry, giving up our offered_task for it traded_cas_field expected_sync_cas_field; - expected_sync_cas_field.fill_with_stamp_and_deque(expected_top.stamp_, thread_id_); + expected_sync_cas_field.fill_with_trade_request(expected_top.stamp_, thread_id_); - traded_cas_field offered_field; - offered_field.fill_with_trade_object(offered_task); + traded_cas_field offered_field = expected_sync_cas_field; + offered_field.fill_with_task(offered_task->thread_id_); if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, offered_field)) { // We got it, for sure move the top pointer forward. top_.compare_exchange_strong(expected_top, {expected_top.stamp_ + 1, expected_top.value_ + 1}); return result; } else { - if (forwarding_stamp != expected_top.stamp_) { - // ...we failed because the top tag lags behind...try to fix it. - // This means only updating the tag, as this location can still hold data we need. - top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value_}); + // TODO: Re-Check this condition for forwading the stamp! Should only happen if another top-stealer took the + // slot that we where interested in! + if (expected_sync_cas_field.is_filled_with_object() && expected_sync_cas_field.get_stamp() == expected_top.stamp_ + && expected_sync_cas_field.get_trade_request_thread_id() == thread_id_) { + top_.compare_exchange_strong(expected_top, {expected_top.stamp_ + 1, expected_top.value_ + 1}); } - // TODO: Figure out how other tasks can put the stamp forward without race conditions. - // This must be here to ensure the lock-free property. - // For now first leave it out, as it makes fixing the other non-blocking interaction - // on the resource stack harder. return nullptr; } } diff --git a/lib/pls/src/internal/scheduling/lock_free/task.cpp b/lib/pls/src/internal/scheduling/lock_free/task.cpp index 883b18e..c3bf52b 100644 --- a/lib/pls/src/internal/scheduling/lock_free/task.cpp +++ b/lib/pls/src/internal/scheduling/lock_free/task.cpp @@ -3,118 +3,88 @@ namespace pls::internal::scheduling::lock_free { -// TODO: this 'global' lookup hardly bound to the full scheduler to be setup could be reworked for better testing. -static task *find_task(unsigned id, unsigned depth) { +task *task::find_task(unsigned id, unsigned depth) { return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_task(depth); } -void task::propose_push_task_chain(task *spare_task_chain) { +void task::push_task_chain(task *spare_task_chain) { + num_resources_++; + PLS_ASSERT(this->thread_id_ != spare_task_chain->thread_id_, "Makes no sense to push task onto itself, as it is not clean by definition."); PLS_ASSERT(this->depth_ == spare_task_chain->depth_, "Must only push tasks with correct depth."); - data_structures::stamped_split_integer current_root; - data_structures::stamped_split_integer target_root; + data_structures::stamped_integer current_root; + data_structures::stamped_integer target_root; + int iteration = 0; do { + iteration++; current_root = this->resource_stack_root_.load(); - PLS_ASSERT(current_root.value_2_ == 0, "Must only propose one push at a time!"); - - // Add it to the current stack state by chaining its next stack task to the current base. - // Popping threads will see this proposed task as if it is the first item in the stack and pop it first, - // making sure that the chain is valid without any further modification when accepting the proposed task. - auto *current_root_task = current_root.value_1_ == 0 ? nullptr : find_task(current_root.value_1_ - 1, this->depth_); - spare_task_chain->resource_stack_next_.store(current_root_task); - target_root.stamp_ = current_root.stamp_ + 1; - target_root.value_1_ = current_root.value_1_; - target_root.value_2_ = spare_task_chain->thread_id_ + 1; - } while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root)); -} - -bool task::accept_proposed() { - data_structures::stamped_split_integer current_root; - data_structures::stamped_split_integer target_root; - do { - current_root = this->resource_stack_root_.load(); - if (current_root.value_2_ == 0) { - return false; // We are done, nothing to accept! + target_root.value_ = spare_task_chain->thread_id_ + 1; + + // TODO: Setting the resource stack next AFTER publishing the task to the CAS field + // is a race, as the resource stack next field can be tampered with. + if (current_root.value_ == 0) { + // Empty, simply push in with no successor. + // We are sure that a spare_task_chain is not in any stack when pushing it. + // Thus, its resource_stack_next_ field must be nullptr. + // TODO: this should be our race? + // TODO: add more checks (see if this is violated AND cas succeeds) + auto *old_value = spare_task_chain->resource_stack_next_.exchange(nullptr); + if (old_value != nullptr) { + printf("Why would this invariant happen?\n"); + } + } else { + // Already an entry. Find it's corresponding task and set it as our successor. + auto *current_root_task = find_task(current_root.value_ - 1, this->depth_); + spare_task_chain->resource_stack_next_.store(current_root_task); } - target_root.stamp_ = current_root.stamp_ + 1; - target_root.value_1_ = current_root.value_2_; - target_root.value_2_ = 0; } while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root)); - - return true; } -bool task::decline_proposed() { - bool proposed_still_there; +void task::reset_task_chain(task *expected_content) { + num_resources_--; - data_structures::stamped_split_integer current_root; - data_structures::stamped_split_integer target_root; - do { - current_root = this->resource_stack_root_.load(); - proposed_still_there = current_root.value_2_ != 0; + data_structures::stamped_integer current_root = this->resource_stack_root_.load(); + PLS_ASSERT(current_root.value_ == expected_content->thread_id_ + 1, + "Must only reset the task chain if we exactly know its state! (current_root.value_)"); - // No need to fetch anything, just delete the proposed item. - target_root.stamp_ = current_root.stamp_ + 1; - target_root.value_1_ = current_root.value_1_; - target_root.value_2_ = 0; - } while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root)); + auto *current_root_task = find_task(current_root.value_ - 1, this->depth_); + if (current_root_task->resource_stack_next_.load(std::memory_order_relaxed) != nullptr) { + printf("This could have been the bug...\n"); + } - return proposed_still_there; -} - -void task::push_task_chain(task *spare_task_chain) { - PLS_ASSERT(this->thread_id_ != spare_task_chain->thread_id_, - "Makes no sense to push task onto itself, as it is not clean by definition."); - PLS_ASSERT(this->depth_ == spare_task_chain->depth_, - "Must only push tasks with correct depth."); - propose_push_task_chain(spare_task_chain); - accept_proposed(); + data_structures::stamped_integer target_root; + target_root.stamp_ = current_root.stamp_ + 1; + bool success = this->resource_stack_root_.compare_exchange_strong(current_root, target_root); + PLS_ASSERT(success, "Must always succeed in resetting the chain, as we must be the sole one operating on it!"); } task *task::pop_task_chain() { - data_structures::stamped_split_integer current_root; - data_structures::stamped_split_integer target_root; - + data_structures::stamped_integer current_root; + data_structures::stamped_integer target_root; task *output_task; do { current_root = this->resource_stack_root_.load(); - - if (current_root.value_2_ == 0) { - if (current_root.value_1_ == 0) { - // No main chain and no proposed element. - return nullptr; - } else { - // No entries in the proposed slot, but fond - // elements in primary stack, try to pop from there. - auto *current_root_task = find_task(current_root.value_1_ - 1, this->depth_); - auto *next_stack_task = current_root_task->resource_stack_next_.load(); - - target_root.stamp_ = current_root.stamp_ + 1; - target_root.value_1_ = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0; - target_root.value_2_ = 0; - - output_task = current_root_task; - } + if (current_root.value_ == 0) { + // Empty... + return nullptr; } else { - // We got a proposed element. Treat it as beginning of resource stack by - // popping it instead of the main element chain. - auto *proposed_task = find_task(current_root.value_2_ - 1, this->depth_); + // Found something, try to pop it + auto *current_root_task = find_task(current_root.value_ - 1, this->depth_); + auto *next_stack_task = current_root_task->resource_stack_next_.load(); - // Empty out the proposed slot target_root.stamp_ = current_root.stamp_ + 1; - target_root.value_1_ = current_root.value_1_; - target_root.value_2_ = 0; + target_root.value_ = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0; - output_task = proposed_task; + output_task = current_root_task; } } while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root)); - PLS_ASSERT(scheduler::check_task_chain_backward(*output_task), "Must only pop proper task chains."); + PLS_ASSERT(num_resources_.fetch_add(-1) > 0, "Must only return an task from the chain if there are items!"); output_task->resource_stack_next_.store(nullptr); return output_task; diff --git a/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp b/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp index cffba80..e95d817 100644 --- a/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp +++ b/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp @@ -53,6 +53,8 @@ std::tuple task_manager::steal_task(thread_state task *traded_task = static_cast(scheduler::get_trade_task(stolen_task, stealing_state)); base_task *chain_after_stolen_task = traded_task->next_; + // TODO: traded task resource_stack_next_ field is now marked as mine + // perform the actual pop operation task *pop_result_task = deque_.pop_top(traded_task, peek); if (pop_result_task) { @@ -62,31 +64,27 @@ std::tuple task_manager::steal_task(thread_state "We must only steal the task that we peeked at!"); // Update the resource stack associated with the stolen task. - // We first propose the traded task, in case someone took our traded in field directly. -// stolen_task->propose_push_task_chain(traded_task); + // TODO: push only onto task chain if the resource_stack_next_ was still mine + // (otherwise the CAS could have been stolen). + // This makes sure, that we never 'destroy' a task that we do not own by our stack push routine. stolen_task->push_task_chain(traded_task); auto peeked_traded_object = external_trading_deque::peek_traded_object(stolen_task); - task *optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task, - peeked_traded_object, - stealing_state.get_task_manager().deque_); + task *optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task, peeked_traded_object); if (optional_exchanged_task) { PLS_ASSERT(optional_exchanged_task == traded_task, "We are currently executing this, no one else can put another task in this field!"); - - // No one took the traded task, push it finally into the stack. -// stolen_task->accept_proposed(); + // TODO: we should also assert that the push worked in this case! } else { // Someone explicitly took the traded task from us, remove it from the stack. -// bool proposed_still_in_stack = stolen_task->decline_proposed(); -// PLS_ASSERT(proposed_still_in_stack, -// "The proposed task was stolen over the CAS field. It is not possible for another consumer to take the proposed task!"); - base_task *popped_task = stolen_task->pop_task_chain(); - PLS_ASSERT(popped_task == traded_task, "Other task must only steal CAS task if deque was empty!"); + // TODO: if the push failed, we do not need to reset anything. + // Otherwise the normal invariant that we seek holds. + stolen_task->reset_task_chain(traded_task); } return std::tuple{stolen_task, chain_after_stolen_task, true}; } else { + // TODO: traded task resource_stack_next_ field is de-marked from being mine return std::tuple{nullptr, nullptr, false}; } } else { @@ -102,6 +100,7 @@ base_task *task_manager::pop_clean_task_chain(base_task *base_task) { auto peeked_task_cas_before = external_trading_deque::peek_traded_object(target_task); task *pop_result = target_task->pop_task_chain(); if (pop_result) { + PLS_ASSERT(scheduler::check_task_chain_backward(*pop_result), "Must only pop proper task chains."); return pop_result; // Got something, so we are simply done here } auto peeked_task_cas_after = external_trading_deque::peek_traded_object(target_task); @@ -110,11 +109,12 @@ base_task *task_manager::pop_clean_task_chain(base_task *base_task) { continue; } - if (peeked_task_cas_after.is_empty() || peeked_task_cas_after.is_filled_with_stamp()) { - if (peeked_task_cas_after.is_filled_with_stamp()) { - printf("what happened!\n"); // TODO: issue to 80% because the stealing threads skip parts of the deque. + if (peeked_task_cas_after.is_empty() || peeked_task_cas_after.is_filled_with_trade_request()) { + if (peeked_task_cas_after.is_filled_with_trade_request()) { + printf("what happened! (%d)\n", base_task->thread_id_); continue; } + // The task was 'stable' during our pop from the stack. // Or in other words: no other thread operated on the task. // We are therefore the last child and do not get a clean task chain. @@ -123,8 +123,7 @@ base_task *task_manager::pop_clean_task_chain(base_task *base_task) { // The task was stable, but has a potential resource attached in its cas field. // Try to get it to not be blocked by the other preempted task. -// task *optional_cas_task = -// external_trading_deque::get_trade_object(target_task, peeked_task_cas_after, this->deque_); +// task *optional_cas_task = external_trading_deque::get_trade_object(target_task, peeked_task_cas_after); // if (optional_cas_task) { // // We got it, thus the other thread has not got it and will remove it from the queue. // return optional_cas_task; diff --git a/test/scheduling_lock_free_tests.cpp b/test/scheduling_lock_free_tests.cpp index b91e0ed..ad42a31 100644 --- a/test/scheduling_lock_free_tests.cpp +++ b/test/scheduling_lock_free_tests.cpp @@ -15,25 +15,25 @@ using namespace pls::internal::scheduling::lock_free; TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/lock_free/traded_cas_field]") { traded_cas_field empty_field; REQUIRE(empty_field.is_empty()); - REQUIRE(!empty_field.is_filled_with_stamp()); + REQUIRE(!empty_field.is_filled_with_trade_request()); REQUIRE(!empty_field.is_filled_with_object()); const int stamp = 42; const int ID = 10; traded_cas_field tag_field; - tag_field.fill_with_stamp_and_deque(stamp, ID); - REQUIRE(tag_field.is_filled_with_stamp()); + tag_field.fill_with_trade_request(stamp, ID); + REQUIRE(tag_field.is_filled_with_trade_request()); REQUIRE(!tag_field.is_empty()); REQUIRE(!tag_field.is_filled_with_object()); REQUIRE(tag_field.get_stamp() == stamp); - REQUIRE(tag_field.get_deque_id() == ID); + REQUIRE(tag_field.get_trade_request_thread_id() == ID); alignas(64) task obj{nullptr, 0, 0, 0}; - traded_cas_field obj_field; - obj_field.fill_with_trade_object(&obj); + traded_cas_field obj_field = tag_field; + obj_field.fill_with_task(obj.thread_id_); REQUIRE(obj_field.is_filled_with_object()); REQUIRE(!obj_field.is_empty()); - REQUIRE(!obj_field.is_filled_with_stamp()); + REQUIRE(!obj_field.is_filled_with_trade_request()); } TEST_CASE("task resource stack", "[internal/scheduling/lock_free/task]") { @@ -53,53 +53,28 @@ TEST_CASE("task resource stack", "[internal/scheduling/lock_free/task]") { REQUIRE(tasks[0]->pop_task_chain() == nullptr); } - SECTION("propose/pop") { - tasks[0]->propose_push_task_chain(tasks[1]); - REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); - REQUIRE(tasks[0]->pop_task_chain() == nullptr); - } - - SECTION("propose/accept/pop") { - tasks[0]->propose_push_task_chain(tasks[1]); - tasks[0]->accept_proposed(); - - REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); - REQUIRE(tasks[0]->pop_task_chain() == nullptr); - } - - SECTION("propose/decline/pop") { - tasks[0]->propose_push_task_chain(tasks[1]); - tasks[0]->decline_proposed(); - - REQUIRE(tasks[0]->pop_task_chain() == nullptr); - REQUIRE(tasks[0]->pop_task_chain() == nullptr); - } SECTION("propose intertwined normal ops") { tasks[0]->push_task_chain(tasks[1]); - tasks[0]->propose_push_task_chain(tasks[2]); + tasks[0]->push_task_chain(tasks[2]); REQUIRE(tasks[0]->pop_task_chain() == tasks[2]); REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); tasks[0]->push_task_chain(tasks[1]); - tasks[0]->propose_push_task_chain(tasks[2]); + tasks[0]->push_task_chain(tasks[2]); REQUIRE(tasks[0]->pop_task_chain() == tasks[2]); - tasks[0]->accept_proposed(); REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); tasks[0]->push_task_chain(tasks[1]); - tasks[0]->propose_push_task_chain(tasks[2]); - tasks[0]->decline_proposed(); REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); REQUIRE(tasks[0]->pop_task_chain() == nullptr); tasks[0]->push_task_chain(tasks[1]); - tasks[0]->propose_push_task_chain(tasks[2]); - tasks[0]->accept_proposed(); + tasks[0]->push_task_chain(tasks[2]); REQUIRE(tasks[0]->pop_task_chain() == tasks[2]); REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); @@ -118,63 +93,69 @@ TEST_CASE("task resource stack", "[internal/scheduling/lock_free/task]") { } TEST_CASE("external trading deque", "[internal/scheduling/lock_free/external_trading_deque]") { - external_trading_deque deque_1{1, 16}; - external_trading_deque deque_2{2, 16}; - - task tasks[4] = {{nullptr, 0, 0, 0}, - {nullptr, 0, 1, 0}, - {nullptr, 0, 2, 0}, - {nullptr, 0, 3, 0}}; - - SECTION("basic operations") { - // Must start empty - REQUIRE(!deque_1.pop_bot()); - REQUIRE(!deque_2.pop_bot()); - - // Local push/pop - deque_1.push_bot(&tasks[0]); - REQUIRE(deque_1.pop_bot() == &tasks[0]); - REQUIRE(!deque_1.pop_bot()); - - // Local push, external pop - deque_1.push_bot(&tasks[0]); - auto peek = deque_1.peek_top(); - REQUIRE(deque_1.pop_top(&tasks[1], peek) == &tasks[0]); - auto trade_peek = deque_1.peek_traded_object(&tasks[0]); - REQUIRE(external_trading_deque::get_trade_object(&tasks[0], trade_peek, deque_1) == &tasks[1]); - REQUIRE(!deque_1.pop_top(&tasks[1], peek)); - REQUIRE(!deque_1.pop_bot()); - - // Keeps push/pop order - deque_1.push_bot(&tasks[0]); - deque_1.push_bot(&tasks[1]); - REQUIRE(deque_1.pop_bot() == &tasks[1]); - REQUIRE(deque_1.pop_bot() == &tasks[0]); - REQUIRE(!deque_1.pop_bot()); - - deque_1.push_bot(&tasks[0]); - deque_1.push_bot(&tasks[1]); - auto peek1 = deque_1.peek_top(); - REQUIRE(deque_1.pop_top(&tasks[2], peek1) == &tasks[0]); - auto peek2 = deque_1.peek_top(); - REQUIRE(deque_1.pop_top(&tasks[3], peek2) == &tasks[1]); + // simulate scheduler with four threads and depth 1. We are thread 0. + pls::scheduler scheduler{4, 4, 4096, false}; + pls::internal::scheduling::thread_state::set(&scheduler.thread_state_for(0)); + + task *tasks_1[] = {scheduler.task_manager_for(0).get_task(0), + scheduler.task_manager_for(0).get_task(1), + scheduler.task_manager_for(0).get_task(2), + scheduler.task_manager_for(0).get_task(3)}; + task *tasks_2[] = {scheduler.task_manager_for(1).get_task(0), + scheduler.task_manager_for(1).get_task(1), + scheduler.task_manager_for(1).get_task(2), + scheduler.task_manager_for(1).get_task(3)}; + + auto &thread_state_1 = scheduler.thread_state_for(0); + auto &task_manager_1 = scheduler.thread_state_for(0).get_task_manager(); + auto &thread_state_2 = scheduler.thread_state_for(1); + auto &task_manager_2 = scheduler.thread_state_for(1).get_task_manager(); + + SECTION("Must start empty") { + REQUIRE(!task_manager_1.pop_local_task()); + REQUIRE(!task_manager_1.pop_local_task()); + } + + SECTION("Local push/pop") { + task_manager_1.push_local_task(tasks_1[0]); + REQUIRE(task_manager_1.pop_local_task() == tasks_1[0]); + REQUIRE(!task_manager_1.pop_local_task()); + } + + SECTION("Local push, external pop") { + task_manager_1.push_local_task(tasks_1[0]); + REQUIRE(std::get<0>(task_manager_1.steal_task(thread_state_2)) == tasks_1[0]); + REQUIRE(task_manager_2.pop_clean_task_chain(tasks_1[0]) == tasks_2[0]); + REQUIRE(task_manager_1.pop_local_task() == nullptr); + } + + SECTION("Keeps push/pop order #1") { + task_manager_1.push_local_task(tasks_1[0]); + task_manager_1.push_local_task(tasks_1[1]); + REQUIRE(task_manager_1.pop_local_task() == tasks_1[1]); + REQUIRE(task_manager_1.pop_local_task() == tasks_1[0]); + REQUIRE(!task_manager_1.pop_local_task()); + } + + SECTION("Keeps push/pop order #2") { + task_manager_1.push_local_task(tasks_1[0]); + task_manager_1.push_local_task(tasks_1[1]); + REQUIRE(std::get<0>(task_manager_1.steal_task(thread_state_2)) == tasks_1[0]); + REQUIRE(std::get<0>(task_manager_1.steal_task(thread_state_2)) == tasks_1[1]); } SECTION("Interwined execution #1") { // Two top poppers - deque_1.push_bot(&tasks[0]); - auto peek1 = deque_1.peek_top(); - auto peek2 = deque_1.peek_top(); - REQUIRE(deque_1.pop_top(&tasks[1], peek1) == &tasks[0]); - REQUIRE(!deque_1.pop_top(&tasks[2], peek2)); + task_manager_1.push_local_task(tasks_1[0]); + REQUIRE(std::get<0>(task_manager_1.steal_task(thread_state_2)) == tasks_1[0]); + REQUIRE(std::get<0>(task_manager_1.steal_task(thread_state_2)) == nullptr); } SECTION("Interwined execution #2") { // Top and bottom access - deque_1.push_bot(&tasks[0]); - auto peek1 = deque_1.peek_top(); - REQUIRE(deque_1.pop_bot() == &tasks[0]); - REQUIRE(!deque_1.pop_top(&tasks[2], peek1)); + task_manager_1.push_local_task(tasks_1[0]); + REQUIRE(task_manager_1.pop_local_task() == tasks_1[0]); + REQUIRE(std::get<0>(task_manager_1.steal_task(thread_state_2)) == nullptr); } } #endif // PLS_DEQUE_VARIANT == PLS_DEQUE_LOCK_FREE