#include "pls/internal/scheduling/lock_free/external_trading_deque.h"
#include "pls/internal/scheduling/lock_free/traded_cas_field.h"
#include "pls/internal/scheduling/lock_free/task.h"

namespace pls::internal::scheduling::lock_free {

/**
 * Reads the current value of the trading CAS field stored in target_task.
 *
 * The field is only loaded (relaxed ordering), never modified.
 *
 * @param target_task The task whose CAS field is inspected.
 * @return A copy of the CAS field as it was observed by the load.
 */
traded_cas_field external_trading_deque::peek_traded_object(task *target_task) {
  traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(std::memory_order_relaxed);
  return current_cas;
}

/**
 * Tries to take the object that was previously observed in the CAS field of target_task.
 *
 * If peeked_cas is filled with an object, the CAS field of target_task is swapped from the
 * peeked value to an emptied copy of it. Only when this exchange succeeds the stored task id
 * is resolved using task::find_task on the depth of target_task.
 *
 * @param target_task The task whose CAS field holds the potential trade object.
 * @param peeked_cas The value of the CAS field as returned by an earlier peek_traded_object call.
 * @return The task resolved from the stored id, or nullptr if peeked_cas holds no object
 *         or the field no longer matches peeked_cas.
 */
task *external_trading_deque::get_trade_object(task *target_task, traded_cas_field peeked_cas) {
  traded_cas_field current_cas = peeked_cas;
  if (current_cas.is_filled_with_object()) {
    auto result_id = current_cas.get_task_id();

    traded_cas_field empty_cas = peeked_cas;
    empty_cas.make_empty();
    if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas,
                                                                         empty_cas,
                                                                         std::memory_order_acq_rel)) {
      task *result = task::find_task(result_id, target_task->depth_);
      return result;
    }
  }

  return nullptr;
}

/**
 * Publishes published_task at the current bottom position of the deque.
 *
 * The entry at the bottom index records the task and the current stamp. The task's CAS field
 * is then filled with a trade request made of that stamp and this deque's thread_id_. Finally
 * the internal stamp and bottom index are advanced and the new bottom index is stored to bot_.
 *
 * @param published_task The task to make available in the deque.
 */
void external_trading_deque::push_bot(task *published_task) {
  auto expected_stamp = bot_internal_.stamp_;
  auto &current_entry = entries_[bot_internal_.value_];

  // Publish the prepared task in the deque.
  current_entry.forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed);
  current_entry.traded_task_.store(published_task, std::memory_order_relaxed);

  // Field that all threads synchronize on.
  // This happens not in the deque itself, but in the published task.
  traded_cas_field sync_cas_field;
  sync_cas_field.fill_with_trade_request(expected_stamp, thread_id_);
  published_task->external_trading_deque_cas_.store(sync_cas_field, std::memory_order_release);

  // Advance the bot pointer. Linearization point for making the task public.
  bot_internal_.stamp_++;
  bot_internal_.value_++;
  bot_.store(bot_internal_.value_, std::memory_order_release);
}

/**
 * Resets the deque to index 0 for both bottom and top.
 *
 * The internal stamp is incremented before top_ is stored, so the stored top carries
 * the new stamp together with the index 0.
 */
void external_trading_deque::reset_bot_and_top() {
  bot_internal_.value_ = 0;
  bot_internal_.stamp_++;

  bot_.store(0, std::memory_order_release);
  top_.store({bot_internal_.stamp_, 0}, std::memory_order_release);
}

/**
 * Tries to take back the task at the bottom of the deque.
 *
 * The bottom index is decremented and the task recorded at that entry is claimed by swapping
 * its CAS field from the expected trade request (the entry's stamp and this deque's thread_id_)
 * to the emptied version of that request.
 *
 * @return The popped task on success. nullptr if the bottom index is already 0, or if the
 *         CAS field did not hold the expected trade request; in the latter case the deque
 *         is reset using reset_bot_and_top().
 */
task *external_trading_deque::pop_bot() {
  if (bot_internal_.value_ == 0) {
    return nullptr;
  }

  bot_internal_.value_--;
  bot_.store(bot_internal_.value_, std::memory_order_relaxed);

  auto &current_entry = entries_[bot_internal_.value_];
  auto *popped_task = current_entry.traded_task_.load(std::memory_order_relaxed);
  auto expected_stamp = current_entry.forwarding_stamp_.load(std::memory_order_relaxed);

  // We know what value must be in the cas field if no other thread stole it.
  traded_cas_field expected_sync_cas_field;
  expected_sync_cas_field.fill_with_trade_request(expected_stamp, thread_id_);
  traded_cas_field empty_cas_field = expected_sync_cas_field;
  empty_cas_field.make_empty();

  if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field,
                                                                       empty_cas_field,
                                                                       std::memory_order_acq_rel)) {
    return popped_task;
  } else {
    reset_bot_and_top();
    return nullptr;
  }
}

/**
 * Looks at the top of the deque without modifying it.
 *
 * @return A peek_result holding the observed top pointer and, if the top index is below the
 *         observed bottom index, the task recorded at the top entry; otherwise nullptr as task.
 */
external_trading_deque::peek_result external_trading_deque::peek_top() {
  auto local_top = top_.load(std::memory_order_acquire);
  auto local_bot = bot_.load(std::memory_order_acquire);

  if (local_top.value_ < local_bot) {
    return peek_result{entries_[local_top.value_].traded_task_.load(std::memory_order_relaxed), local_top};
  } else {
    return peek_result{nullptr, local_top};
  }
}

/**
 * Tries to take the task at the previously peeked top position, offering offered_task in exchange.
 *
 * The task's CAS field is swapped from the expected trade request (peeked stamp and this deque's
 * thread_id_) to a field filled with offered_task->thread_id_. The top pointer is moved forward
 * with a CAS after a successful trade.
 *
 * @param offered_task The task offered in exchange for the one at the top.
 * @param peek_result The result of an earlier peek_top() call; only its top pointer is used.
 * @return The task taken from the top, or nullptr if the peeked position is not below the
 *         bottom index, the entry holds no task, the entry's stamp differs from the peeked
 *         stamp, or the exchange on the task's CAS field failed.
 */
task *external_trading_deque::pop_top(task *offered_task, peek_result peek_result) {
  stamped_integer expected_top = peek_result.top_pointer_;
  auto local_bot = bot_.load(std::memory_order_acquire);
  if (expected_top.value_ >= local_bot) {
    return nullptr;
  }

  auto &target_entry = entries_[expected_top.value_];

  // Read our potential result
  task *result = target_entry.traded_task_.load(std::memory_order_relaxed);
  unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load(std::memory_order_relaxed);

  if (result == nullptr) {
    return nullptr;
  }
  if (forwarding_stamp != expected_top.stamp_) {
    // ...we failed because the top tag lags behind...try to fix it.
    // This means only updating the tag, as this location can still hold data we need.
    data_structures::stamped_integer forwarded_top{forwarding_stamp, expected_top.value_};
    top_.compare_exchange_strong(expected_top,
                                 forwarded_top,
                                 std::memory_order_relaxed);
    // We might be in a consistent state again, however, the peeked task is no longer valid!
    return nullptr;
  }

  // Try to get it by CAS with the expected field entry, giving up our offered_task for it
  traded_cas_field expected_sync_cas_field;
  expected_sync_cas_field.fill_with_trade_request(expected_top.stamp_, thread_id_);

  traded_cas_field offered_field = expected_sync_cas_field;
  offered_field.fill_with_task(offered_task->thread_id_);

  if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field,
                                                                  offered_field,
                                                                  std::memory_order_acq_rel)) {
    // We got it, for sure move the top pointer forward.
    top_.compare_exchange_strong(expected_top,
                                 {expected_top.stamp_ + 1, expected_top.value_ + 1},
                                 std::memory_order_acq_rel);
    return result;
  } else {
    // On failure expected_sync_cas_field holds the value actually found in the task's CAS field.
    // If that value is an object carrying the same stamp and trade request thread id, the top
    // pointer is still forwarded past this entry.
    if (expected_sync_cas_field.is_filled_with_object() && expected_sync_cas_field.get_stamp() == expected_top.stamp_
        && expected_sync_cas_field.get_trade_request_thread_id() == thread_id_) {
      top_.compare_exchange_strong(expected_top,
                                   {expected_top.stamp_ + 1, expected_top.value_ + 1},
                                   std::memory_order_relaxed);
    }
    return nullptr;
  }
}

}