// bounded_ws_deque.h

#ifndef PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_WS_DEQUE_H_
#define PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_WS_DEQUE_H_

#include <cstdio>
#include <array>
#include <atomic>

#include "pls/internal/base/system_details.h"
#include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/data_structures/optional.h"

namespace pls {
namespace internal {
namespace data_structures {

/**
 * Classic, textbook work-stealing bounded deque based on arrays.
 * Stores a fixed number of fixed-size objects in an array,
 * allowing for local push/pop on the bottom and remote
 * pop on the top.
 *
 * The local operations are cheap as long as head and tail are
 * far enough apart, making it ideal to avoid cache problems.
 *
 * Depends on overaligned datatypes to be cache line friendly.
 * This does not concern C++14 and upwards, but prevents you from properly
 * allocating it on the heap in C++11 (see base::alignment::alignment_wrapper for a solution).
 */
// TODO: Relax memory orderings in here...
template<typename T>
class bounded_ws_deque {
 public:
  bounded_ws_deque(T *item_array, size_t size) : size_{size}, item_array_{item_array} {}

  void push_bottom(T item) {
37
    item_array_[local_bottom_] = item;
38
    local_bottom_++;
39
    bottom_.store(local_bottom_, std::memory_order_release);
40 41 42
  }

  bool is_empty() {
43
    return top_.load().value_ < bottom_.load();
44 45 46 47
  }

  optional<T> pop_top() {
    stamped_integer old_top = top_.load();
48 49
    unsigned int new_stamp = old_top.stamp_ + 1;
    unsigned int new_value = old_top.value_ + 1;
50

51
    if (bottom_.load() <= old_top.value_) {
52 53 54
      return optional<T>();
    }

55
    optional<T> result(item_array_[old_top.value_]);
56 57 58 59 60 61 62 63 64 65 66 67 68
    if (top_.compare_exchange_strong(old_top, {new_stamp, new_value})) {
      return result;
    }

    return optional<T>();
  }

  optional<T> pop_bottom() {
    if (local_bottom_ == 0) {
      return optional<T>();
    }

    local_bottom_--;
69
    bottom_.store(local_bottom_, std::memory_order_seq_cst);
70 71 72

    optional<T> result(item_array_[local_bottom_]);

73
    stamped_integer old_top = top_.load(std::memory_order_acquire);
74
    if (local_bottom_ > old_top.value_) {
75 76 77
      // Enough distance to just return the value
      return result;
    }
78
    if (local_bottom_ == old_top.value_) {
79 80
      local_bottom_ = 0;
      bottom_.store(local_bottom_);
81
      if (top_.compare_exchange_strong(old_top, {old_top.stamp_ + 1, 0})) {
82 83 84 85 86 87
        // We won the competition and the queue is empty
        return result;
      }
    }

    // The queue is empty and we lost the competition
88 89
    local_bottom_ = 0;
    bottom_.store(local_bottom_);
90
    top_.store({old_top.stamp_ + 1, 0});
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
    return optional<T>();
  }

 private:
  alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<stamped_integer> top_{stamped_integer{0, 0}};
  alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<unsigned int> bottom_{0};
  unsigned int local_bottom_{0};
  size_t size_;
  T *item_array_;
};

/**
 * Bundles a bounded_ws_deque with its own, statically sized item storage.
 *
 * @tparam T Type of the stored items.
 * @tparam SIZE Number of item slots held in the embedded array.
 */
template<typename T, size_t SIZE>
class static_bounded_ws_deque {
 public:
  /**
   * Value-initializes the embedded array and wires the deque to it.
   */
  static_bounded_ws_deque() : items_{}, deque_(items_.data(), items_.size()) {}

  /**
   * @return Reference to the deque operating on the embedded array.
   */
  bounded_ws_deque<T> &get_deque() {
    return deque_;
  }

 private:
  // Declared before deque_ so the storage is initialized before the deque
  // receives a pointer to it.
  std::array<T, SIZE> items_;
  bounded_ws_deque<T> deque_;
};

}
}
}

#endif //PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_WS_DEQUE_H_