/*
 * Copyright (c) 2014, Siemens AG. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 * this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 * this list of conditions and the following disclaimer in the documentation
 * and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef EMBB_CONTAINERS_INTERNAL_WAIT_FREE_MPMC_QUEUE_INL_H_
#define EMBB_CONTAINERS_INTERNAL_WAIT_FREE_MPMC_QUEUE_INL_H_

#include <embb/base/atomic.h>
#include <embb/base/function.h>
#include <embb/base/c/internal/thread_index.h>

namespace embb {
namespace containers {

namespace internal {

template<typename Type>
inline WaitFreeMPMCQueueNode<Type>::WaitFreeMPMCQueueNode()
: enq_aid(UndefinedIndex) {
  // A freshly constructed node has no successor and no registered dequeuer.
  deq_aid.Store(UndefinedIndex);
  next_idx.Store(UndefinedIndex);
}

template<typename Type>
inline WaitFreeMPMCQueueNode<Type>::WaitFreeMPMCQueueNode(
  const self_t & other)
: value(other.value),
  enq_aid(other.enq_aid) {
  // Atomic members are not copyable; transfer their current values manually.
  deq_aid.Store(other.deq_aid.Load());
  next_idx.Store(other.next_idx.Load());
}

template<typename Type>
inline WaitFreeMPMCQueueNode<Type>::WaitFreeMPMCQueueNode(
  Type const val, uint32_t enqAid)
: value(val), enq_aid(enqAid) {
  // The node starts unlinked and not yet claimed by any dequeuer.
  deq_aid.Store(UndefinedIndex);
  next_idx.Store(UndefinedIndex);
}

template<typename Type>
inline Type WaitFreeMPMCQueueNode<Type>::Value() const {
  // Return a copy of the payload carried by this node.
  return value;
}

template<typename Type>
inline uint32_t WaitFreeMPMCQueueNode<Type>::NextPoolIdx() const {
  // Pool index of the successor node, or UndefinedIndex if none.
  return next_idx.Load();
}

template<typename Type>
inline WaitFreeMPMCQueueNode<Type> &
WaitFreeMPMCQueueNode<Type>::operator=(
  const self_t & other) {
  // Self-assignment is a no-op.
  if (this == &other) {
    return *this;
  }
  value   = other.value;
  enq_aid = other.enq_aid;
  // Atomic members are not assignable; transfer their values manually.
  next_idx.Store(other.next_idx.Load());
  deq_aid.Store(other.deq_aid.Load());
  return *this;
}

template<typename Type>
bool WaitFreeMPMCQueueNode<Type>::CASNext(
  uint32_t expNextIdx, uint32_t newNextIdx) {
  // Atomically replace the successor index iff it still equals expNextIdx.
  uint32_t expected = expNextIdx;
  return next_idx.CompareAndSwap(expected, newNextIdx);
}

template<typename Type>
93
bool WaitFreeMPMCQueueNode<Type>::NextIsNull() const {
94 95 96 97 98 99 100 101 102 103 104 105 106
  return next_idx.Load() == UndefinedIndex;
}

template<typename Type>
inline uint32_t WaitFreeMPMCQueueNode<Type>::EnqueueAID() const {
  // Accessor id of the thread that enqueued this node.
  return enq_aid;
}

template<typename Type>
embb::base::Atomic<uint32_t> & WaitFreeMPMCQueueNode<Type>::DequeueAID() {
  // Mutable reference so callers can CAS the dequeuer registration.
  return deq_aid;
}

/// Using maximum value of OperationDesc::NodeIndex (30 bit) to
/// represent 'undefined'.
template<typename Type>
const uint32_t WaitFreeMPMCQueueNode<Type>::UndefinedIndex = 0x3fffffff;

}  // namespace internal

template<
  typename Type, class ValuePool, class NodeAllocator, class OpAllocator >
WaitFreeMPMCQueue<Type, ValuePool, NodeAllocator, OpAllocator>::OperationDesc::
OperationDesc(
  bool pending,
  bool enqueue,
  index_t nodeIndex) :
  Pending(pending),
  Enqueue(enqueue),
  NodeIndex(nodeIndex),
  Raw(0) {
  // Pack the three fields into one 32-bit word: pending flag, enqueue flag
  // and the (30 bit) node index.
  if (Pending) {
    Raw |= PENDING_FLAG_MASK;
  }
  if (Enqueue) {
    Raw |= ENQUEUE_FLAG_MASK;
  }
  Raw |= (NodeIndex & NODE_INDEX_MASK);
}

template<
  typename Type, class ValuePool, class NodeAllocator, class OpAllocator >
WaitFreeMPMCQueue<Type, ValuePool, NodeAllocator, OpAllocator>::OperationDesc::
OperationDesc(index_t raw) : Raw(raw) {
  // Unpack flags and node index from the raw 32-bit representation.
  Pending   = ((raw & PENDING_FLAG_MASK) != 0);
  Enqueue   = ((raw & ENQUEUE_FLAG_MASK) != 0);
  NodeIndex = (raw & NODE_INDEX_MASK);
}

template<
  typename Type, class ValuePool, class NodeAllocator, class OpAllocator >
WaitFreeMPMCQueue<Type, ValuePool, NodeAllocator, OpAllocator>::
WaitFreeMPMCQueue(size_t capacity)
: max_size_(capacity),
//  Disable "this is used in base member initializer" warning.
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable:4355)
#endif
  delete_pointer_callback(*this, &self_t::DeleteNodeCallback),
#ifdef _MSC_VER
#pragma warning(pop)
#endif
  hp(delete_pointer_callback, UndefinedGuard, 2),
  // Using int for numThreads so compiler warning is
  // raised when size and numThreads are switched by
  // mistake.
  num_states(embb::base::Thread::GetThreadsMaxCount()),
  // Node pool size, with respect to the maximum number of
  // retired nodes not eligible for reuse due to hazard pointers:
  node_pool_size(
    // Nodes in hazard pointers' retired lists
    (hp.GetRetiredListMaxSize() *
     embb::base::Thread::GetThreadsMaxCount()) +
    // Nodes guarded in operation descriptions
    embb::base::Thread::GetThreadsMaxCount() +
    // Actual capacity + 1 sentinel node
    max_size_ + 1),
  nodePool(node_pool_size, nullNode) {
  // Assert width of binary representation of operation description
  assert(sizeof(index_t) == 4);
  if (max_size_ > QUEUE_SIZE_MAX) {
    EMBB_THROW(embb::base::NoMemoryException,
      "Maximum size of queue exceeded");
  }
  // Allocate sentinel node:
  int sentinelNodePoolIndex = nodePool.Allocate();
  // Check for allocation failure BEFORE using the index; indexing the pool
  // with a negative value would be undefined behavior.
  if (sentinelNodePoolIndex < 0) {
    EMBB_THROW(embb::base::NoMemoryException,
      "Allocation of sentinel node failed");
  }
  assert(
    nodePool[sentinelNodePoolIndex].NextPoolIdx() ==
    Node_t::UndefinedIndex);
  // No need to guard sentinel node, it is prevented from reuse
  // in the hazard pointers' delete callback (see DeleteNodeCallback).
  sentinelNodeIndex = static_cast<index_t>(sentinelNodePoolIndex);
  headIdx.Store(sentinelNodeIndex);
  tailIdx.Store(sentinelNodeIndex);
  // State of the queue is one operation description per queue accessor.
  // Initialize clear state: Null-operation for every accessor.
  operationDescriptions = operationDescriptionAllocator.allocate(num_states);
  for (size_t accessorId = 0; accessorId < num_states; ++accessorId) {
    OperationDesc op(
      false,                 // nonpending
      true,                  // enqueue, should not matter
      Node_t::UndefinedIndex // node index
    );
    // As all operation descriptions are the same,
    // we do not have to map accessor ids to operation
    // pool indices. Every accessor will randomly grab
    // an operation pool element and stick to it, as
    // a threads accessor id will not change.
    operationDescriptions[accessorId].Store(op.Raw);
  }
}

template<
  typename Type, class ValuePool, class NodeAllocator, class OpAllocator >
WaitFreeMPMCQueue<Type, ValuePool, NodeAllocator, OpAllocator>::
~WaitFreeMPMCQueue() {
  // Nodes are owned by the node pool member and are released with it.
  // Only the per-accessor operation description array was allocated
  // manually in the constructor and must be returned here.
  // (The former unused local 'Type val;' has been removed; it needlessly
  // required Type to be default-constructible.)
  operationDescriptionAllocator.deallocate(
    operationDescriptions,
    num_states);
}

template<
  typename Type, class ValuePool, class NodeAllocator, class OpAllocator >
inline bool WaitFreeMPMCQueue<Type, ValuePool, NodeAllocator, OpAllocator>::
LoadAccessorThreadIndex(index_t & retIndexValue) {
  // Resolve the calling thread's internal EMBB thread index and validate
  // that it is usable as an accessor id for this queue.
  unsigned int threadIndex; // For conversion internal index <-> unsigned int
  if (embb_internal_thread_index(&threadIndex) != EMBB_SUCCESS) {
    retIndexValue = Node_t::UndefinedIndex;
    return false;
  }
  if (threadIndex >= num_states) {
    // Thread index is out of range of the number of accessors;
    // leave retIndexValue untouched.
    return false;
  }
  retIndexValue = threadIndex;
  return true;
}

template<
  typename Type, class ValuePool, class NodeAllocator, class OpAllocator >
inline size_t WaitFreeMPMCQueue<Type, ValuePool, NodeAllocator, OpAllocator>::
RetiredListMaxSize(size_t nThreads) {
  // Upper bound on a hazard-pointer retired list:
  // floor(1.25 * nThreads * num_guards) + 1.
  double bound = 1.25 *
    static_cast<double>(nThreads) * static_cast<double>(num_guards);
  return static_cast<size_t>(bound) + 1;
}

template<
  typename Type, class ValuePool, class NodeAllocator, class OpAllocator >
bool WaitFreeMPMCQueue<Type, ValuePool, NodeAllocator, OpAllocator>::
TryEnqueue(Type const & element) {
  // Resolve the calling thread's accessor id:
  index_t accessorId = Node_t::UndefinedIndex;
  if (!LoadAccessorThreadIndex(accessorId)) {
    EMBB_THROW(embb::base::ErrorException,
      "Invalid thread ID.");
  }
  // Claim a node from the pool and store the element in it:
  int poolIndex = nodePool.Allocate(element, accessorId);
  if (poolIndex < 0) {
    return false; // Queue is at capacity
  }
  // Announce the pending enqueue so that any thread can help complete it:
  OperationDesc announcement(
    true,    // pending
    true,    // enqueue
    static_cast<index_t>(poolIndex)
    );
  operationDescriptions[accessorId].Store(announcement.Raw);
  // Help other pending operations first, then finish this enqueue:
  Help();
  HelpFinishEnqueue();
  return true;
}

template<
  typename Type, class ValuePool, class NodeAllocator, class OpAllocator >
bool WaitFreeMPMCQueue<Type, ValuePool, NodeAllocator, OpAllocator>::
TryDequeue(Type & retElement) {
  // Resolve the calling thread's accessor id:
  index_t accessorId = static_cast<index_t>(-1);
  if (!LoadAccessorThreadIndex(accessorId)) {
    EMBB_THROW(embb::base::ErrorException,
      "Invalid thread ID. Verify embb::base::Thread::thread_max_count.");
  }
  OperationDesc prevOp(operationDescriptions[accessorId].Load());
  // This accessor must not have an operation in flight:
  assert(!prevOp.Pending);
  // Announce a pending dequeue, without a node index yet:
  OperationDesc deqOp(
    true,                  // pending
    false,                 // dequeue
    Node_t::UndefinedIndex // node index
    );
  index_t prevOpRaw = prevOp.Raw;
  if (!operationDescriptions[accessorId].CompareAndSwap(prevOpRaw, deqOp.Raw)) {
    // The thread's own operation has changed, should not happen.
    assert(false);
  }
  Help();
  HelpFinishDequeue();
  // Reload the operation description announced by this thread;
  // it must have been completed by now:
  OperationDesc doneOp(operationDescriptions[accessorId].Load());
  assert(!doneOp.Pending);
  // Check which node (if any) has been assigned to this dequeue:
  index_t dequeuedNodeIdx = doneOp.NodeIndex;
  if (dequeuedNodeIdx == Node_t::UndefinedIndex) {
    // Allow dequeueing from empty queue, but return false:
    retElement = Type();
    return false;
  }
  Node_t & dequeuedNode = nodePool[static_cast<int>(dequeuedNodeIdx)];
  assert(dequeuedNode.DequeueAID().Load() == accessorId);
  // The dequeued node becomes the new sentinel; the returned value lives
  // in its successor:
  index_t valueNodeIdx = dequeuedNode.NextPoolIdx();
  retElement = nodePool[static_cast<int>(valueNodeIdx)].Value();
  // Value is safe. Mark node as non-pending and available for reclamation
  // by setting this operation's node index to UndefinedIndex:
  OperationDesc noOp(
    false,                 // non-pending
    false,                 // any
    Node_t::UndefinedIndex // no node index
    );
  doneOp = OperationDesc(operationDescriptions[accessorId].Load());
  index_t doneOpRaw = doneOp.Raw;
  if (!operationDescriptions[accessorId].CompareAndSwap(doneOpRaw, noOp.Raw)) {
    // The thread's own operation has changed, should not happen.
    assert(false);
  }
  // Hand the node over to hazard-pointer managed reclamation:
  hp.EnqueuePointerForDeletion(dequeuedNodeIdx);
  return true;
}

template<
  typename Type, class ValuePool, class NodeAllocator, class OpAllocator >
void WaitFreeMPMCQueue<Type, ValuePool, NodeAllocator, OpAllocator>::
HelpEnqueue(unsigned int accessorId) {
  // Repeat until the enqueue announced by 'accessorId' has been applied:
  while (IsPending(accessorId)) {
    index_t tailNodeIdx = tailIdx.Load();
    // Guard tail:
    hp.GuardPointer(0, tailNodeIdx);
    Node_t & tailNode   = nodePool[static_cast<int>(tailNodeIdx)];
    index_t tailNextIdx = tailNode.NextPoolIdx();
    if (tailNodeIdx != tailIdx.Load()) {
      // Tail moved since it was loaded, retry:
      continue;
    }
    if (tailNextIdx != Node_t::UndefinedIndex) {
      // Some enqueue operation in progress; finish it, then retry:
      HelpFinishEnqueue();
      continue;
    }
    // tail->next is null (no pending enqueue on tail), apply enqueue,
    // unless another accessor helped this operation already:
    if (IsPending(accessorId)) {
      // Link the announced node behind the last element in the list:
      OperationDesc opDesc(operationDescriptions[accessorId].Load());
      if (tailNode.CASNext(tailNextIdx, opDesc.NodeIndex)) {
        HelpFinishEnqueue();
        return;
      }
    }
  }
}

template<
  typename Type, class ValuePool, class NodeAllocator, class OpAllocator >
void WaitFreeMPMCQueue<Type, ValuePool, NodeAllocator, OpAllocator>::
HelpFinishEnqueue() {
  // Load node pointed at by tail:
  index_t lastIdx = tailIdx.Load();
  // Guard tail:
  hp.GuardPointer(0, lastIdx);
  // Load tail->next:
  Node_t & lastNode = nodePool[static_cast<int>(lastIdx)];
  index_t nextIdx   = lastNode.NextPoolIdx();
  // tail->next not undefined => unfinished ENQ
  if (nextIdx != Node_t::UndefinedIndex) {
    // Resolve the next node only after nextIdx is known to be a valid pool
    // index; previously this lookup happened unconditionally, indexing the
    // pool with UndefinedIndex (out of bounds) when no enqueue was pending.
    Node_t & nextNode = nodePool[static_cast<int>(nextIdx)];
    // Load accessor id from last (non-tail) element in list:
    index_t helpAID = nextNode.EnqueueAID();
    // Load operation for accessor that started the unfinished enqueue:
    OperationDesc helpOp(operationDescriptions[helpAID].Load());
    // tail index still points at last node:
    // (last == tail && state[aid].node == next)
    if (lastIdx == tailIdx.Load() &&
      // Reload operation description here, do not use helpOp:
      (OperationDesc(operationDescriptions[helpAID].Load()).NodeIndex ==
        nextIdx)) {
      OperationDesc newOp(
        false,  // set to nonpending
        true,   // enqueue
        nextIdx // node index == helpOp.NodeIndex
        );
      index_t helpOpRaw = helpOp.Raw;
      operationDescriptions[helpAID].CompareAndSwap(
        helpOpRaw,
        newOp.Raw);
      // Update tail pointer:
      tailIdx.CompareAndSwap(lastIdx, nextIdx);
    }
  }
}

// Helps completing the dequeue operation announced by 'accessorId'.
// Any thread may execute this on behalf of any accessor; the operation
// description CASes ensure each step is applied exactly once.
template<
  typename Type, class ValuePool, class NodeAllocator, class OpAllocator >
void WaitFreeMPMCQueue<Type, ValuePool, NodeAllocator, OpAllocator>::
HelpDequeue(index_t accessorId) {
  while (IsPending(accessorId)) {
    index_t firstIdx = headIdx.Load();
    // Guard head:
    hp.GuardPointer(0, firstIdx);
    // Order matters for these assignments:
    Node_t & first  = nodePool[static_cast<int>(firstIdx)];
    index_t lastIdx = tailIdx.Load();
    index_t nextIdx = first.NextPoolIdx();
    // Guard head->next:
    hp.GuardPointer(1, nextIdx);
    if (nextIdx != first.NextPoolIdx()) {
      // Head->next pointer changed by concurrent enqueue
      continue;
    }
    if (firstIdx != headIdx.Load()) {
      // Head pointer changed by concurrent enqueue
      continue;
    }
    if (firstIdx == lastIdx) {
      // Queue might be empty
      if (nextIdx == Node_t::UndefinedIndex) {
        // Queue is empty
        OperationDesc curOp(operationDescriptions[accessorId].Load());
        if (lastIdx == tailIdx.Load() && IsPending(accessorId)) {
          // The CAS on the operation description is not ABA-prone as the
          // pending state will only change from true to false and cannot
          // change back to true unless the operation has been completed
          // and a new operation has been announced by the same thread.
          OperationDesc newOp(
            false,                 // Set nonpending state
            false,
            Node_t::UndefinedIndex // Leave undefined, to signal failed dequeue
            );
          // CAS without check as possibly another accessor
          // already helped this dequeue operation.
          index_t curOpRaw = curOp.Raw;
          operationDescriptions[accessorId].CompareAndSwap(curOpRaw, newOp.Raw);
        }
      } else {
        // head == tail, but tail->next is not undefined, so head has been
        // advanced because of unfinished enqueue; help other enqueue and retry:
        HelpFinishEnqueue();
      }
    } else {
      // Queue is not empty
      OperationDesc curOp(operationDescriptions[accessorId].Load());
      index_t nodeIdx = curOp.NodeIndex;
      if (!IsPending(accessorId)) {
        // Accessor not pending because another thread completed this
        // operation already, done.
        break;
      }
      if (firstIdx == headIdx.Load() && nodeIdx != firstIdx) {
        // Record the head node as the one this dequeue will take:
        OperationDesc newOp(
          true,
          false,
          firstIdx // Set node index
          );
        index_t curOpRaw = curOp.Raw;
        // The CAS on the operation description is not ABA-prone as the
        // node index to be set is guarded and cannot be used in a concurrent
        // operation.
        if (!operationDescriptions[accessorId].CompareAndSwap(
              curOpRaw, newOp.Raw)) {
          // This CAS can only have failed because another
          // thread helped completing this dequeue operation in the
          // meantime.
          continue;  // Retry
        }
      }
      // The following CAS also happens if
      //
      //   firstIdx != headIdx.Load() || nodeIdx == firstIdx
      //
      // In this case, HelpFinishDequeue will complete the dequeue.
      index_t curDeqAID = Node_t::UndefinedIndex;
      // Register this accessor as dequeuer of this node.
      first.DequeueAID().CompareAndSwap(curDeqAID, accessorId);
      // If this CAS failed, the element has been dequeued by another thread
      // (an operation can only fail if another operation succeeded).
      // This operation is still pending at this point, and will be set to
      // non-pending state in HelpFinishDequeue:
      HelpFinishDequeue();
    }
  } // while pending
}

// Completes a dequeue whose winner has already been registered in the head
// node's DequeueAID field: marks the winning operation non-pending and
// advances the head pointer.
template<
  typename Type, class ValuePool, class NodeAllocator, class OpAllocator >
void WaitFreeMPMCQueue<Type, ValuePool, NodeAllocator, OpAllocator>::
HelpFinishDequeue() {
  index_t firstIdx = headIdx.Load();
  // Guard head:
  hp.GuardPointer(0, firstIdx);
  Node_t & first = nodePool[static_cast<int>(firstIdx)];
  index_t nextIdx = first.NextPoolIdx();
  // Guard head->next.
  // Actually not necessary, as head->next will only change from Undefined
  // to a node index value, but not back to Undefined.
  hp.GuardPointer(1, nextIdx);
  if (nextIdx != nodePool[static_cast<int>(firstIdx)].NextPoolIdx()) {
    // head->next changed concurrently, another helper made progress; done.
    return;
  }
  index_t accessorId = first.DequeueAID().Load();
  if (accessorId != Node_t::UndefinedIndex) {
    // head.DequeueAID is set to the accessor id that won the last CAS
    // in HelpDequeue
    OperationDesc curOp(operationDescriptions[accessorId].Load());
    if (firstIdx == headIdx.Load() &&
        nextIdx != Node_t::UndefinedIndex) {
      // Set state of helped operation to NONPENDING:
      OperationDesc newOp(
        false, // nonpending
        false, // dequeue
        curOp.NodeIndex
        );
      // CAS without check as possibly another accessor
      // already helped this dequeue operation.
      index_t curOpRaw = curOp.Raw;
      // The CAS on the operation description is not ABA-prone as the
      // pending state will only change from true to false and cannot
      // change back to true unless the operation has been completed
      // and a new operation has been announced by the same thread.
      operationDescriptions[accessorId].CompareAndSwap(curOpRaw, newOp.Raw);
      // Update head:
      headIdx.CompareAndSwap(firstIdx, nextIdx);
    }
  }
}

template<
  typename Type, class ValuePool, class NodeAllocator, class OpAllocator >
void WaitFreeMPMCQueue<Type, ValuePool, NodeAllocator, OpAllocator>::
Help() {
  // Fairness guarantee in every thread:
  // "Every other thread will help my operation before helping its
  // own operation".
  // Scan every accessor's announced operation exactly once, starting at
  // the accessor after this thread's own id so the own operation is
  // visited in the last iteration of the loop.
  index_t ownAccessorId = Node_t::UndefinedIndex;
  LoadAccessorThreadIndex(ownAccessorId);
  index_t startAccessorId = (ownAccessorId + 1) % num_states;
  index_t opsLeft = static_cast<index_t>(num_states);
  unsigned int accessorId = startAccessorId;
  while (opsLeft > 0) {
    unsigned int helpId = accessorId % num_states;
    OperationDesc desc(operationDescriptions[helpId].Load());
    if (desc.Pending) {
      if (desc.Enqueue) {
        HelpEnqueue(helpId);
      } else {
        HelpDequeue(helpId);
      }
    }
    ++accessorId;
    --opsLeft;
  }
}

template<
  typename Type, class ValuePool, class NodeAllocator, class OpAllocator >
void WaitFreeMPMCQueue<Type, ValuePool, NodeAllocator, OpAllocator>::
DeleteNodeCallback(index_t releasedNodeIndex) {
  // Called by the hazard-pointer implementation when a retired node is no
  // longer guarded. Keep the node allocated while any announced operation
  // still references its index (this also keeps the sentinel alive).
  if (NodeIsPending(releasedNodeIndex)) {
    return;
  }
  nodePool.Free(static_cast<int>(releasedNodeIndex));
}

template<
  typename Type, class ValuePool, class NodeAllocator, class OpAllocator >
inline size_t WaitFreeMPMCQueue<Type, ValuePool, NodeAllocator, OpAllocator>::
GetCapacity() {
  // Maximum number of elements as requested at construction time.
  return max_size_;
}


template<
  typename Type, class ValuePool, class NodeAllocator, class OpAllocator >
inline bool WaitFreeMPMCQueue<Type, ValuePool, NodeAllocator, OpAllocator>::
NodeIsPending(index_t nodeIdx) {
  // The node is still in use if any accessor's announced operation
  // references its pool index.
  for (unsigned int accessorId = 0; accessorId < num_states; ++accessorId) {
    OperationDesc desc(operationDescriptions[accessorId].Load());
    if (desc.NodeIndex == nodeIdx) {
      return true;
    }
  }
  return false;
}

template<
  typename Type, class ValuePool, class NodeAllocator, class OpAllocator >
inline bool WaitFreeMPMCQueue<Type, ValuePool, NodeAllocator, OpAllocator>::
IsPending(unsigned int accessorId) {
  // An operation is pending while its descriptor's pending flag is set.
  return OperationDesc(operationDescriptions[accessorId].Load()).Pending;
}

} // namespace containers
} // namespace embb

#endif  // EMBB_CONTAINERS_INTERNAL_WAIT_FREE_MPMC_QUEUE_INL_H_