// external_trading_deque.cpp
#include "pls/internal/scheduling/lock_free/external_trading_deque.h"
#include "pls/internal/scheduling/lock_free/traded_cas_field.h"
3

4
namespace pls::internal::scheduling::lock_free {
5

6
// Returns a snapshot of the trading CAS field stored inside target_task.
// The field is read with load() using its default arguments; nothing is modified.
traded_cas_field external_trading_deque::peek_traded_object(task *target_task) {
  return target_task->external_trading_deque_cas_.load();
}

11 12 13 14
// Tries to claim the trade object described by a previously peeked CAS field.
//
// target_task: task whose external_trading_deque_cas_ field is updated.
// peeked_cas:  field value observed earlier; used as the expected value of the CAS.
// other_deque: deque whose current bottom stamp and thread id are written into
//              the replacement ("empty") field.
//
// Returns the trade object stored in peeked_cas if the field still held that value
// and was successfully swapped for the empty field; returns nullptr if peeked_cas
// holds no object or if the compare-exchange fails.
task *external_trading_deque::get_trade_object(task *target_task,
                                               traded_cas_field peeked_cas,
                                               external_trading_deque &other_deque) {
  // Nothing to claim when the peeked field does not carry an object.
  if (!peeked_cas.is_filled_with_object()) {
    return nullptr;
  }
  task *traded_object = peeked_cas.get_trade_object();

  // Replacement value that is installed when the claim succeeds.
  traded_cas_field cleared_field;
  cleared_field.fill_with_stamp_and_empty(other_deque.bot_internal_.stamp_, other_deque.thread_id_);

  // peeked_cas is a by-value copy, so the CAS may freely overwrite it on failure.
  const bool claimed =
      target_task->external_trading_deque_cas_.compare_exchange_strong(peeked_cas, cleared_field);
  return claimed ? traded_object : nullptr;
}

// Publishes published_task at the current bottom position of the deque.
//
// Steps, in order:
//  1. Record the current bottom stamp and the task pointer in the bottom entry (relaxed stores).
//  2. Store a CAS field holding that stamp and this deque's thread id into the task
//     itself (release store). This field, not the deque entry, is what all threads
//     synchronize on.
//  3. Advance the internal stamp and bottom index and publish the new bottom index
//     (release store). This is the linearization point for making the task public.
void external_trading_deque::push_bot(task *published_task) {
  auto publish_stamp = bot_internal_.stamp_;
  auto &slot = entries_[bot_internal_.value_];

  // Fill the deque entry with the prepared task.
  slot.forwarding_stamp_.store(publish_stamp, std::memory_order_relaxed);
  slot.traded_task_.store(published_task, std::memory_order_relaxed);

  // Mark the task as owned by this deque under the recorded stamp.
  traded_cas_field owner_field;
  owner_field.fill_with_stamp_and_deque(publish_stamp, thread_id_);
  published_task->external_trading_deque_cas_.store(owner_field, std::memory_order_release);

  // Move the bottom forward; the store to bot_ makes the task visible.
  ++bot_internal_.stamp_;
  ++bot_internal_.value_;
  bot_.store(bot_internal_.value_, std::memory_order_release);
}

void external_trading_deque::reset_bot_and_top() {
48 49
  bot_internal_.value_ = 0;
  bot_internal_.stamp_++;
50 51

  bot_.store(0);
52
  top_.store({bot_internal_.stamp_, 0});
53 54
}

55
// Tries to take the most recently pushed task back from the bottom of the deque.
//
// Returns the task if its CAS field still holds the value written when it was
// pushed (stamp + this deque's thread id) and could be swapped for the matching
// "empty" field. Otherwise (no entries, or the compare-exchange failed) the deque
// is reset via reset_bot_and_top() and nullptr is returned.
task *external_trading_deque::pop_bot() {
  // No local entries: reset and report failure.
  if (!(bot_internal_.value_ > 0)) {
    reset_bot_and_top();
    return nullptr;
  }

  // Step the bottom back by one and publish the new index.
  --bot_internal_.value_;
  bot_.store(bot_internal_.value_, std::memory_order_relaxed);

  auto &slot = entries_[bot_internal_.value_];
  auto *candidate = slot.traded_task_.load(std::memory_order_relaxed);
  auto slot_stamp = slot.forwarding_stamp_.load(std::memory_order_relaxed);

  // Value to install when we win the task back.
  traded_cas_field taken_field;
  taken_field.fill_with_stamp_and_empty(slot_stamp, thread_id_);

  // Value the task's field must still hold if no other thread stole it.
  traded_cas_field untouched_field;
  untouched_field.fill_with_stamp_and_deque(slot_stamp, thread_id_);

  const bool won = candidate->external_trading_deque_cas_.compare_exchange_strong(untouched_field,
                                                                                  taken_field,
                                                                                  std::memory_order_acq_rel);
  if (won) {
    return candidate;
  }

  // The field no longer matched: give up on this entry and reset the deque.
  reset_bot_and_top();
  return nullptr;
}

// Takes a snapshot of the top of the deque without modifying it.
//
// Loads top_ and then bot_. If the top index lies below the bottom index, the
// result carries the task pointer stored in the top entry; otherwise it carries
// nullptr. In both cases the loaded top value is returned alongside.
external_trading_deque::peek_result external_trading_deque::peek_top() {
  auto top_snapshot = top_.load();
  auto bot_snapshot = bot_.load();

  task *visible_task = nullptr;
  if (top_snapshot.value_ < bot_snapshot) {
    visible_task = entries_[top_snapshot.value_].traded_task_.load();
  }
  return peek_result{visible_task, top_snapshot};
}

92
// Tries to take the task at the top position recorded in peek_result, handing over
// offered_task in exchange.
//
// offered_task: task written into the taken task's CAS field as the trade object.
// peek_result:  earlier snapshot; its top_pointer_ is the expected top value.
//
// Returns the task from the top entry if its CAS field still held
// {expected top stamp, this deque's thread id} and was swapped for the offered
// field. Returns nullptr if the expected top index is not below the current bottom
// index, or if the compare-exchange on the task's field fails.
task *external_trading_deque::pop_top(task *offered_task, peek_result peek_result) {
  stamped_integer seen_top = peek_result.top_pointer_;
  if (seen_top.value_ >= bot_.load()) {
    return nullptr;
  }

  // Read our potential result together with the stamp recorded in its entry.
  auto &slot = entries_[seen_top.value_];
  task *candidate = slot.traded_task_.load();
  unsigned long slot_stamp = slot.forwarding_stamp_.load();

  // What we put into the task's field: our offered task.
  traded_cas_field offer;
  offer.fill_with_trade_object(offered_task);

  // What the task's field must currently hold for the trade to go through.
  traded_cas_field owned_by_deque;
  owned_by_deque.fill_with_stamp_and_deque(seen_top.stamp_, thread_id_);

  if (!candidate->external_trading_deque_cas_.compare_exchange_strong(owned_by_deque, offer)) {
    if (slot_stamp != seen_top.stamp_) {
      // ...we failed because the top tag lags behind...try to fix it.
      // Only the tag is updated, as this location can still hold data we need.
      top_.compare_exchange_strong(seen_top, {slot_stamp, seen_top.value_});
    }
    // TODO: Figure out how other tasks can put the stamp forward without race conditions.
    //       This must be here to ensure the lock-free property.
    //       For now it is left out, as it makes fixing the other non-blocking interaction
    //       on the resource stack harder.
    return nullptr;
  }

  // We got the task; try to move the top pointer forward (stamp and index by one).
  top_.compare_exchange_strong(seen_top, {seen_top.stamp_ + 1, seen_top.value_ + 1});
  return candidate;
}

}