#include "pls/internal/scheduling/external_trading_deque.h"

namespace pls::internal::scheduling {

/**
 * Looks at the trade object stored in a task's CAS field without removing it.
 *
 * @param target_task Task whose external_trading_deque_cas_ field is read.
 * @return The stored trade object if the field currently holds one,
 *         otherwise an empty optional.
 */
optional<task *> external_trading_deque::peek_traded_object(task *target_task) {
  traded_cas_field current_cas = target_task->external_trading_deque_cas_.load();
  if (current_cas.is_filled_with_object()) {
    return optional<task *>{current_cas.get_trade_object()};
  } else {
    return optional<task *>{};
  }
}

/**
 * Tries to take the trade object out of a task's CAS field.
 *
 * The field is swapped to a default-constructed traded_cas_field with a single
 * compare-exchange, so the object is only returned if the field did not change
 * between the initial load and the exchange.
 *
 * @param target_task Task whose external_trading_deque_cas_ field is read and cleared.
 * @return The trade object if it was present and the exchange succeeded,
 *         otherwise an empty optional.
 */
optional<task *> external_trading_deque::get_trade_object(task *target_task) {
  traded_cas_field current_cas = target_task->external_trading_deque_cas_.load();
  if (current_cas.is_filled_with_object()) {
    task *result = current_cas.get_trade_object();
    traded_cas_field empty_cas;
    if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas, empty_cas)) {
      return optional<task *>{result};
    }
  }

  return optional<task *>{};
}

/**
 * Publishes a task at the bottom of the deque.
 *
 * @param published_task Task to make visible; its external_trading_deque_cas_
 *                       field is overwritten with the current stamp and this
 *                       deque's thread_id_.
 */
void external_trading_deque::push_bot(task *published_task) {
  auto expected_stamp = bot_internal_.stamp;
  auto &current_entry = entries_[bot_internal_.value];

  // Publish the prepared task in the deque.
  current_entry.forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed);
  current_entry.traded_task_.store(published_task, std::memory_order_relaxed);

  // Field that all threads synchronize on.
  // This happens not in the deque itself, but in the published task.
  traded_cas_field sync_cas_field;
  sync_cas_field.fill_with_stamp(expected_stamp, thread_id_);
  published_task->external_trading_deque_cas_.store(sync_cas_field, std::memory_order_release);

  // Advance the bot pointer. Linearization point for making the task public.
  bot_internal_.stamp++;
  bot_internal_.value++;
  bot_.store(bot_internal_.value, std::memory_order_release);
}

/**
 * Resets the deque to index 0 under a fresh stamp.
 *
 * The internal bottom stamp is incremented first, and the new stamp is then
 * written into top_ together with value 0.
 */
void external_trading_deque::reset_bot_and_top() {
  bot_internal_.value = 0;
  bot_internal_.stamp++;

  bot_.store(0);
  top_.store({bot_internal_.stamp, 0});
}

/**
 * Moves the bottom index down by one and mirrors the new value into bot_.
 *
 * Does not check for underflow; pop_bot() only calls it after verifying that
 * bot_internal_.value is non-zero.
 */
void external_trading_deque::decrease_bot() {
  bot_internal_.value--;
  bot_.store(bot_internal_.value, std::memory_order_relaxed);
}

/**
 * Tries to take the most recently pushed task back from the bottom of the deque.
 *
 * @return The popped task if its CAS field still held the stamp written by
 *         push_bot(); an empty optional if the deque was empty or the CAS
 *         field had been changed. In both empty cases the deque is reset via
 *         reset_bot_and_top().
 */
optional<task *> external_trading_deque::pop_bot() {
  if (bot_internal_.value == 0) {
    reset_bot_and_top();
    return optional<task *>{};
  }
  decrease_bot();

  auto &current_entry = entries_[bot_internal_.value];
  auto *popped_task = current_entry.traded_task_.load(std::memory_order_relaxed);
  auto expected_stamp = current_entry.forwarding_stamp_.load(std::memory_order_relaxed);

  // We know what value must be in the cas field if no other thread stole it.
  traded_cas_field expected_sync_cas_field;
  expected_sync_cas_field.fill_with_stamp(expected_stamp, thread_id_);
  traded_cas_field empty_cas_field;

  if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field,
                                                                       empty_cas_field,
                                                                       std::memory_order_acq_rel)) {
    return optional<task *>{popped_task};
  } else {
    // The CAS field no longer matches what push_bot() stored; give up on this
    // entry and start over with an empty deque.
    reset_bot_and_top();
    return optional<task *>{};
  }
}

/**
 * Takes a snapshot of the top of the deque without modifying it.
 *
 * @return A peek_result holding the loaded top pointer and, if top is below
 *         bot, the task stored in that entry; otherwise an empty optional.
 *         The result is meant to be handed to pop_top().
 */
external_trading_deque::peek_result external_trading_deque::peek_top() {
  auto local_top = top_.load();
  auto local_bot = bot_.load();

  if (local_top.value < local_bot) {
    return peek_result{optional<task *>{entries_[local_top.value].traded_task_}, local_top};
  } else {
    return peek_result{optional<task *>{}, local_top};
  }
}

/**
 * Tries to take the task at the top of the deque in exchange for offered_task.
 *
 * @param offered_task Task written into the taken task's CAS field as the
 *                     trade object when the exchange succeeds.
 * @param peek_result  Snapshot from peek_top(); its top_pointer_ supplies the
 *                     expected stamp and index.
 * @return The task from the top entry on success. An empty optional if the
 *         snapshot's index is no longer below bot, or if the CAS on the task's
 *         field failed (in which case top_ is still nudged forward, see below).
 */
optional<task *> external_trading_deque::pop_top(task *offered_task, peek_result peek_result) {
  stamped_integer expected_top = peek_result.top_pointer_;
  auto local_bot = bot_.load();
  if (expected_top.value >= local_bot) {
    return data_structures::optional<task *>{};
  }

  auto &target_entry = entries_[expected_top.value];

  // Read our potential result
  task *result = target_entry.traded_task_.load();
  unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load();

  // Try to get it by CAS with the expected field entry, giving up our offered_task for it
  traded_cas_field expected_sync_cas_field;
  expected_sync_cas_field.fill_with_stamp(expected_top.stamp, thread_id_);

  traded_cas_field offered_field;
  offered_field.fill_with_trade_object(offered_task);

  if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, offered_field)) {
    // We got it, for sure move the top pointer forward.
    top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1});
    // Return the stolen task
    return data_structures::optional<task *>{result};
  } else {
    // We did not get it...help forwarding the top pointer anyway.
    if (expected_top.stamp == forwarding_stamp) {
      // ...move the pointer forward if someone else put a valid trade object in there.
      top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1});
    } else {
      // ...we failed because the top tag lags behind...try to fix it.
      // This means only updating the tag, as this location can still hold data we need.
      top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value});
    }
    return data_structures::optional<task *>{};
  }
}

}