external_trading_deque.cpp 5.17 KB
Newer Older
1 2
#include "pls/internal/scheduling/lock_free/external_trading_deque.h"
#include "pls/internal/scheduling/lock_free/traded_cas_field.h"
3

4
namespace pls::internal::scheduling::lock_free {
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137

optional<task *> external_trading_deque::peek_traded_object(task *target_task) {
  traded_cas_field current_cas = target_task->external_trading_deque_cas_.load();
  if (current_cas.is_filled_with_object()) {
    return optional<task *>{current_cas.get_trade_object()};
  } else {
    return optional<task *>{};
  }
}

optional<task *> external_trading_deque::get_trade_object(task *target_task) {
  traded_cas_field current_cas = target_task->external_trading_deque_cas_.load();
  if (current_cas.is_filled_with_object()) {
    task *result = current_cas.get_trade_object();
    traded_cas_field empty_cas;
    if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas, empty_cas)) {
      return optional<task *>{result};
    }
  }

  return optional<task *>{};
}

void external_trading_deque::push_bot(task *published_task) {
  auto expected_stamp = bot_internal_.stamp;
  auto &current_entry = entries_[bot_internal_.value];

  // Publish the prepared task in the deque.
  current_entry.forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed);
  current_entry.traded_task_.store(published_task, std::memory_order_relaxed);

  // Field that all threads synchronize on.
  // This happens not in the deque itself, but in the published task.
  traded_cas_field sync_cas_field;
  sync_cas_field.fill_with_stamp(expected_stamp, thread_id_);
  published_task->external_trading_deque_cas_.store(sync_cas_field, std::memory_order_release);

  // Advance the bot pointer. Linearization point for making the task public.
  bot_internal_.stamp++;
  bot_internal_.value++;
  bot_.store(bot_internal_.value, std::memory_order_release);
}

void external_trading_deque::reset_bot_and_top() {
  bot_internal_.value = 0;
  bot_internal_.stamp++;

  bot_.store(0);
  top_.store({bot_internal_.stamp, 0});
}

void external_trading_deque::decrease_bot() {
  bot_internal_.value--;
  bot_.store(bot_internal_.value, std::memory_order_relaxed);
}

optional<task *> external_trading_deque::pop_bot() {
  if (bot_internal_.value == 0) {
    reset_bot_and_top();
    return optional<task *>{};
  }
  decrease_bot();

  auto &current_entry = entries_[bot_internal_.value];
  auto *popped_task = current_entry.traded_task_.load(std::memory_order_relaxed);
  auto expected_stamp = current_entry.forwarding_stamp_.load(std::memory_order_relaxed);

  // We know what value must be in the cas field if no other thread stole it.
  traded_cas_field expected_sync_cas_field;
  expected_sync_cas_field.fill_with_stamp(expected_stamp, thread_id_);
  traded_cas_field empty_cas_field;

  if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field,
                                                                       empty_cas_field,
                                                                       std::memory_order_acq_rel)) {
    return optional<task *>{popped_task};
  } else {
    reset_bot_and_top();
    return optional<task *>{};
  }
}

external_trading_deque::peek_result external_trading_deque::peek_top() {
  auto local_top = top_.load();
  auto local_bot = bot_.load();

  if (local_top.value < local_bot) {
    return peek_result{optional<task *>{entries_[local_top.value].traded_task_}, local_top};
  } else {
    return peek_result{optional<task *>{}, local_top};
  }
}

optional<task *> external_trading_deque::pop_top(task *offered_task, peek_result peek_result) {
  stamped_integer expected_top = peek_result.top_pointer_;
  auto local_bot = bot_.load();
  if (expected_top.value >= local_bot) {
    return data_structures::optional<task *>{};
  }

  auto &target_entry = entries_[expected_top.value];

  // Read our potential result
  task *result = target_entry.traded_task_.load();
  unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load();

  // Try to get it by CAS with the expected field entry, giving up our offered_task for it
  traded_cas_field expected_sync_cas_field;
  expected_sync_cas_field.fill_with_stamp(expected_top.stamp, thread_id_);

  traded_cas_field offered_field;
  offered_field.fill_with_trade_object(offered_task);

  if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, offered_field)) {
    // We got it, for sure move the top pointer forward.
    top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1});
    // Return the stolen task
    return data_structures::optional<task *>{result};
  } else {
    // We did not get it...help forwarding the top pointer anyway.
    if (expected_top.stamp == forwarding_stamp) {
      // ...move the pointer forward if someone else put a valid trade object in there.
      top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1});
    } else {
      // ...we failed because the top tag lags behind...try to fix it.
      // This means only updating the tag, as this location can still hold data we need.
      top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value});
    }
    return data_structures::optional<task *>{};
  }
}

}