external_trading_deque.cpp 6.22 KB
Newer Older
1 2
#include "pls/internal/scheduling/lock_free/external_trading_deque.h"
#include "pls/internal/scheduling/lock_free/traded_cas_field.h"
3
#include "pls/internal/scheduling/lock_free/task.h"
4

5
namespace pls::internal::scheduling::lock_free {
6

7
// Returns a snapshot of the trading CAS field stored inside `target_task`.
//
// The field is read with relaxed ordering, so the snapshot by itself gives no
// synchronization. get_trade_object() accepts such a snapshot as its
// `peeked_cas` argument and re-validates it with a CAS.
traded_cas_field external_trading_deque::peek_traded_object(task *target_task) {
  return target_task->external_trading_deque_cas_.load(std::memory_order_relaxed);
}

12
// Tries to take the task that was traded into `target_task`'s CAS field.
//
// `peeked_cas` is a previously read snapshot of that field. If the snapshot
// holds a traded object, the field is swapped to its emptied form with a CAS.
// Only when that CAS succeeds is the task id stored in the snapshot resolved
// through task::find_task (using target_task's depth) and returned.
//
// Returns nullptr when the snapshot holds no object or when the CAS fails,
// i.e. the field no longer equals the snapshot.
task *external_trading_deque::get_trade_object(task *target_task,
                                               traded_cas_field peeked_cas) {
  if (!peeked_cas.is_filled_with_object()) {
    return nullptr;
  }

  auto traded_task_id = peeked_cas.get_task_id();

  // The CAS overwrites its 'expected' argument on failure, so work on a copy.
  traded_cas_field expected_field = peeked_cas;
  traded_cas_field cleared_field = peeked_cas;
  cleared_field.make_empty();

  const bool claimed =
      target_task->external_trading_deque_cas_.compare_exchange_strong(expected_field,
                                                                       cleared_field,
                                                                       std::memory_order_acq_rel);
  if (!claimed) {
    return nullptr;
  }

  return task::find_task(traded_task_id, target_task->depth_);
}

// Publishes `published_task` at the bottom of the deque.
//
// Steps, in order:
//   1. Record the current bottom stamp and the task pointer in the entry the
//      internal bottom index points at (relaxed stores).
//   2. Store a trade request carrying that stamp and this deque's thread id
//      into the task's own CAS field (release). All threads synchronize on
//      this field, which lives in the published task, not in the deque.
//   3. Advance the internal stamp and index and store the new bottom index
//      (release). This is the linearization point that makes the task public.
void external_trading_deque::push_bot(task *published_task) {
  auto slot_stamp = bot_internal_.stamp_;

  // Purely local preparation of the value every thread will synchronize on.
  traded_cas_field trade_request;
  trade_request.fill_with_trade_request(slot_stamp, thread_id_);

  // Fill the deque entry for the task.
  auto &slot = entries_[bot_internal_.value_];
  slot.forwarding_stamp_.store(slot_stamp, std::memory_order_relaxed);
  slot.traded_task_.store(published_task, std::memory_order_relaxed);

  // Arm the task's CAS field with the trade request.
  published_task->external_trading_deque_cas_.store(trade_request, std::memory_order_release);

  // Move the bottom forward; the release store makes the task visible.
  ++bot_internal_.stamp_;
  ++bot_internal_.value_;
  bot_.store(bot_internal_.value_, std::memory_order_release);
}

void external_trading_deque::reset_bot_and_top() {
52 53
  bot_internal_.value_ = 0;
  bot_internal_.stamp_++;
54

55 56
  bot_.store(0, std::memory_order_release);
  top_.store({bot_internal_.stamp_, 0}, std::memory_order_release);
57 58
}

59
// Removes and returns the task at the bottom of the deque.
//
// Returns nullptr when the internal bottom index is already 0. Otherwise the
// bottom is decremented and the task recorded in that entry is reclaimed by
// swapping its CAS field from the trade request written for this entry (entry
// stamp plus this deque's thread id) to the emptied form.
//
// If the CAS fails, the field no longer holds that trade request. In that case
// reset_bot_and_top() is called and nullptr is returned.
task *external_trading_deque::pop_bot() {
  if (bot_internal_.value_ == 0) {
    return nullptr;
  }

  --bot_internal_.value_;
  bot_.store(bot_internal_.value_, std::memory_order_relaxed);

  auto &slot = entries_[bot_internal_.value_];
  auto *candidate = slot.traded_task_.load(std::memory_order_relaxed);
  auto slot_stamp = slot.forwarding_stamp_.load(std::memory_order_relaxed);

  // The value the CAS field must still hold if nobody else took the task.
  traded_cas_field untouched_field;
  untouched_field.fill_with_trade_request(slot_stamp, thread_id_);

  traded_cas_field cleared_field = untouched_field;
  cleared_field.make_empty();

  const bool reclaimed =
      candidate->external_trading_deque_cas_.compare_exchange_strong(untouched_field,
                                                                     cleared_field,
                                                                     std::memory_order_acq_rel);
  if (!reclaimed) {
    reset_bot_and_top();
    return nullptr;
  }

  return candidate;
}

// Takes a snapshot of the top of the deque without modifying it.
//
// Loads top and then bottom (both acquire). If the top index lies below the
// bottom index, the result carries the task pointer stored in the top entry
// (relaxed load); otherwise it carries nullptr. The loaded top value is always
// included in the result.
external_trading_deque::peek_result external_trading_deque::peek_top() {
  auto top_snapshot = top_.load(std::memory_order_acquire);
  auto bot_snapshot = bot_.load(std::memory_order_acquire);

  task *candidate = nullptr;
  if (top_snapshot.value_ < bot_snapshot) {
    candidate = entries_[top_snapshot.value_].traded_task_.load(std::memory_order_relaxed);
  }

  return peek_result{candidate, top_snapshot};
}

98
// Tries to take the task at the top of the deque, offering `offered_task` in
// exchange for it.
//
// `peek_result` supplies the top pointer (stamp + index) the attempt is based
// on. The attempt gives up and returns nullptr when:
//   - the expected top index is not below the current bottom index,
//   - the targeted entry holds no task,
//   - the entry's forwarding stamp differs from the expected top stamp and the
//     top could not be updated to that stamp, or
//   - the CAS on the task's trading field fails.
//
// On success the task's CAS field has been swapped from the expected trade
// request to a field filled with offered_task->thread_id_, the top pointer is
// advanced (best effort, the CAS result is ignored) and the task is returned.
task *external_trading_deque::pop_top(task *offered_task, peek_result peek_result) {
  stamped_integer expected_top = peek_result.top_pointer_;

  if (expected_top.value_ >= bot_.load(std::memory_order_acquire)) {
    return nullptr;
  }

  // Read the potential result together with the stamp it was published under.
  auto &target_entry = entries_[expected_top.value_];
  task *candidate = target_entry.traded_task_.load(std::memory_order_relaxed);
  unsigned long entry_stamp = target_entry.forwarding_stamp_.load(std::memory_order_relaxed);

  if (candidate == nullptr) {
    return nullptr;
  }

  if (entry_stamp != expected_top.stamp_) {
    // The top tag lags behind the entry. Only the tag is forwarded, because
    // this location can still hold data we need.
    data_structures::stamped_integer forwarded_top{entry_stamp, expected_top.value_};
    if (!top_.compare_exchange_strong(expected_top,
                                      forwarded_top,
                                      std::memory_order_relaxed)) {
      // The top tag could not be updated; back off.
      return nullptr;
    }
    // The top tag now matches the entry; continue the pop request.
    expected_top = forwarded_top;
  }

  // The trade request we expect in the task's field, and the offer replacing it.
  traded_cas_field observed_field;
  observed_field.fill_with_trade_request(expected_top.stamp_, thread_id_);

  traded_cas_field offered_field = observed_field;
  offered_field.fill_with_task(offered_task->thread_id_);

  const bool traded =
      candidate->external_trading_deque_cas_.compare_exchange_strong(observed_field,
                                                                     offered_field,
                                                                     std::memory_order_acq_rel);
  if (traded) {
    // We own the task now; move the top pointer forward.
    top_.compare_exchange_strong(expected_top,
                                 {expected_top.stamp_ + 1, expected_top.value_ + 1},
                                 std::memory_order_acq_rel);
    return candidate;
  }

  // The failed CAS left the field's actual content in observed_field
  // (assuming std::atomic-style compare_exchange semantics; confirm). If it
  // holds a traded object with the same stamp and thread id, the trade request
  // has already been answered, so the top pointer is advanced here as well.
  const bool request_already_answered =
      observed_field.is_filled_with_object()
          && observed_field.get_stamp() == expected_top.stamp_
          && observed_field.get_trade_request_thread_id() == thread_id_;
  if (request_already_answered) {
    top_.compare_exchange_strong(expected_top,
                                 {expected_top.stamp_ + 1, expected_top.value_ + 1},
                                 std::memory_order_relaxed);
  }

  return nullptr;
}

}