Commit a0d83966 by Tobias Fuchs

containers_cpp: added assertions and comments in WaitFreeMPMCQueue

parent 3fe8c475
......@@ -104,7 +104,7 @@ embb::base::Atomic<uint32_t> & WaitFreeMPMCQueueNode<Type>::DequeueAID() {
return deq_aid;
}
/// Using maximum value of OperationDesc::NodeIndex (15 bit) to
/// Using maximum value of OperationDesc::NodeIndex (30 bit) to
/// represent 'undefined'.
template<typename Type>
const uint32_t WaitFreeMPMCQueueNode<Type>::UndefinedIndex = 0x3fffffff;
......@@ -176,6 +176,7 @@ WaitFreeMPMCQueue(size_t capacity)
}
// Allocate sentinel node:
Node_t sentinelNode;
assert(sentinelNode.NextPoolIdx() == Node_t::UndefinedIndex);
int sentinelNodePoolIndex = nodePool.Allocate(sentinelNode);
if (sentinelNodePoolIndex < 0) {
EMBB_THROW(embb::base::NoMemoryException,
......@@ -297,22 +298,25 @@ TryDequeue(Type & retElement) {
}
Help();
HelpFinishDequeue();
// Accessor's operation Idx might have changed in between.
// Reload the operation description announced by this thread
curOp = OperationDesc(operationDescriptions[accessorId].Load());
// Operation announced by this thread must not be pending any more:
assert(!curOp.Pending);
// Check element
index_t nodeIdx = curOp.NodeIndex;
Node_t & node = nodePool[nodeIdx];
index_t nodeIdx = curOp.NodeIndex;
if (nodeIdx == Node_t::UndefinedIndex) {
// Allow dequeueing from empty queue, but
// return false:
retElement = Type();
return false;
}
index_t deqNodeIdx = node.NextPoolIdx();
retElement = nodePool[deqNodeIdx].Value();
// Mark node as non-pending by setting node index to UndefinedIndex:
Node_t & node = nodePool[nodeIdx];
assert(node.DequeueAID().Load() == accessorId);
// Return value of node next to node dequeued in this operation:
index_t nextNodeIdx = node.NextPoolIdx();
retElement = nodePool[nextNodeIdx].Value();
// Value is safe. Mark node as non-pending and available for reclamation by
// setting this operation's node index to UndefinedIndex:
OperationDesc noOp(
false, // non-pending
false, // any
......@@ -325,6 +329,7 @@ TryDequeue(Type & retElement) {
// should not happen.
assert(false);
}
// Release node
hp.EnqueuePointerForDeletion(nodeIdx);
return true;
}
......@@ -337,13 +342,12 @@ HelpEnqueue(unsigned int accessorId) {
index_t lastIdx = tailIdx.Load();
// Guard tail:
hp.GuardPointer(0, lastIdx);
// Last node still is tail:
Node_t & lastNode = nodePool[lastIdx];
index_t nextIdx = lastNode.NextPoolIdx();
if (lastIdx == tailIdx.Load()) {
Node_t & lastNode = nodePool[lastIdx];
index_t nextIdx = lastNode.NextPoolIdx();
// tail.next is null (no pending enqueue on tail):
if (lastNode.NextPoolIdx() == Node_t::UndefinedIndex) {
// Apply enqueue.
// Last node still is tail
if (nextIdx == Node_t::UndefinedIndex) {
// tail.next is null (no pending enqueue on tail), apply enqueue.
// No other accessor helped this enqueue operation yet:
if (IsPending(accessorId)) {
// Set next-pointer of last element in list
......@@ -369,25 +373,25 @@ HelpFinishEnqueue() {
index_t lastIdx = tailIdx.Load();
// Guard tail:
hp.GuardPointer(0, lastIdx);
// Load tail->next:
Node_t & lastNode = nodePool[lastIdx];
index_t nextIdx = lastNode.NextPoolIdx();
// This node is NEXT of tail, but not tail => unfinished ENQ
hp.GuardPointer(1, nextIdx);
index_t nextIdx = lastNode.NextPoolIdx();
Node_t & nextNode = nodePool[nextIdx];
// tail->next not undefined => unfinished ENQ
if (nextIdx != Node_t::UndefinedIndex) {
Node_t & nextNode = nodePool[nextIdx];
// Load accessor id from last (non-tail) element in list:
index_t helpAID = nextNode.EnqueueAID();
// Load operation for accessor that started the unfinished enqueue:
OperationDesc helpOp(operationDescriptions[helpAID].Load());
// tail index still points at last node:
// (last == tail && state[aid].node == next)
if (lastIdx == tailIdx.Load() &&
helpOp.NodeIndex == nextIdx) {
// Reload operation description here, do not use helpOp:
(OperationDesc(operationDescriptions[helpAID].Load()).NodeIndex ==
nextIdx)) {
OperationDesc newOp(
false, // set to nonpending
true, // enqueue
false, // set to nonpending
true, // enqueue
nextIdx // node index == helpOp.NodeIndex
);
index_t helpOpRaw = helpOp.Raw;
......@@ -408,12 +412,9 @@ HelpDequeue(index_t accessorId) {
index_t firstIdx = headIdx.Load();
// Guard head:
hp.GuardPointer(0, firstIdx);
if (firstIdx != headIdx.Load()) {
// Head pointer changed by concurrent enqueue
continue;
}
// Order matters for these assignments:
Node_t & first = nodePool[firstIdx];
index_t lastIdx = tailIdx.Load();
Node_t & first = nodePool[firstIdx];
index_t nextIdx = first.NextPoolIdx();
// Guard head->next:
hp.GuardPointer(1, nextIdx);
......@@ -421,12 +422,20 @@ HelpDequeue(index_t accessorId) {
// Head->next pointer changed by concurrent enqueue
continue;
}
if (firstIdx != headIdx.Load()) {
// Head pointer changed by concurrent enqueue
continue;
}
if (firstIdx == lastIdx) {
// Queue might be empty
if (nextIdx == Node_t::UndefinedIndex) {
// Queue is empty
OperationDesc curOp(operationDescriptions[accessorId].Load());
if (lastIdx == tailIdx.Load() && IsPending(accessorId)) {
// The CAS on the operation description is not ABA-prone as the
// pending state will only change from true to false and cannot
// change back to true unless the operation has been completed
// and a new operation has been announced by the same thread.
OperationDesc newOp(
false, // Set nonpending state
false,
......@@ -438,8 +447,8 @@ HelpDequeue(index_t accessorId) {
operationDescriptions[accessorId].CompareAndSwap(curOpRaw, newOp.Raw);
}
} else {
// Head is advanced because of unfinished enqueue, so
// help other enqueue and retry:
// head == tail, but tail->next is not undefined, so head has been
// advanced because of unfinished enqueue; help other enqueue and retry:
HelpFinishEnqueue();
}
} else {
......@@ -458,13 +467,13 @@ HelpDequeue(index_t accessorId) {
firstIdx // Set node index
);
index_t curOpRaw = curOp.Raw;
// The CAS on the operation description is not ABA-prone as the
// node index to be set is guarded and cannot be used in a concurrent
// operation.
if (!operationDescriptions[accessorId].CompareAndSwap(
curOpRaw, newOp.Raw)) {
// This loop is wait-free as every accessor's operation
// will be pending for a limited time only, so continue
// and retry is okay.
// This CAS can only have failed because another
// thread completed this dequeue operation in the
// thread helped completing this dequeue operation in the
// meantime.
continue; // Retry
}
......@@ -476,17 +485,14 @@ HelpDequeue(index_t accessorId) {
// In this case, HelpFinishDequeue will complete the dequeue.
index_t curDeqAID = Node_t::UndefinedIndex;
// Register this accessor as dequeuer of this node.
// If this CAS fails, another accessor is already (perhaps
// helping) dequeueing this node winning this CAS. In this
// case this dequeue operation is just ignored.
if (!first.DequeueAID().CompareAndSwap(curDeqAID, accessorId)) {
continue;
// Lost CAS to helping accessor. In this case, this operation
// is not pending anymore at this point.
}
first.DequeueAID().CompareAndSwap(curDeqAID, accessorId);
// If this CAS failed, the element has been dequeued by another thread
// (an operation can only fail if another operation succeeded).
// This operation is still pending at this point, and will be set to
// non-pending state in HelpFinishDequeue:
HelpFinishDequeue();
}
} // while pending
} // while pending
}
template<
......@@ -502,8 +508,13 @@ HelpFinishDequeue() {
hp.GuardPointer(1, nextIdx);
index_t accessorId = first.DequeueAID().Load();
if (accessorId != Node_t::UndefinedIndex) {
// head.DeqeueueAID is set to the accessor id that won the last CAS
// in HelpDequeue
OperationDesc curOp(operationDescriptions[accessorId].Load());
if (firstIdx == headIdx.Load() &&
// This check is missing in the original publication but required
// to validate head->next:
nextIdx == first.NextPoolIdx() &&
nextIdx != Node_t::UndefinedIndex) {
// Set state of helped operation to NONPENDING:
OperationDesc newOp(
......@@ -514,7 +525,12 @@ HelpFinishDequeue() {
// CAS without check as possibly another accessor
// already helped this dequeue operation.
index_t curOpRaw = curOp.Raw;
// The CAS on the operation description is not ABA-prone as the
// pending state will only change from true to false and cannot
// change back to true unless the operation has been completed
// and a new operation has been announced by the same thread.
operationDescriptions[accessorId].CompareAndSwap(curOpRaw, newOp.Raw);
// Update head:
headIdx.CompareAndSwap(firstIdx, nextIdx);
}
}
......@@ -554,14 +570,6 @@ template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
void WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::
DeleteNodeCallback(index_t releasedNodeIndex) {
if (sentinelNodeIndex == releasedNodeIndex) {
// Prevent sentinel node from reuse, it will be returned to pool
// in destructor only.
return;
}
if (NodeIsPending(releasedNodeIndex)) {
return;
}
nodePool.Free(static_cast<int>(releasedNodeIndex));
}
......@@ -580,23 +588,6 @@ IsPending(unsigned int accessorId) {
return opDesc.Pending;
}
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
bool WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::
NodeIsPending(index_t nodeId) {
// Returns true if given node index is involved in any announced operation,
// including non-pending operations, as dequeue-operations are set to
// non-pending before returning the node's value. Dequeue operations are
// finalized by setting their node index to UndefinedIndex.
for (unsigned int accessorId = 0; accessorId < num_states; ++accessorId) {
OperationDesc desc(operationDescriptions[accessorId].Load());
if (desc.NodeIndex == nodeId) {
return true;
}
}
return false;
}
} // namespace containers
} // namespace embb
......
......@@ -176,7 +176,7 @@ class WaitFreeMPMCQueue {
* OperationDesc::NodeIndex (15 bit) minus one element
* required for sentinel node.
*/
static const index_t QUEUE_SIZE_MAX = static_cast<index_t>(32767 - 1);
static const index_t QUEUE_SIZE_MAX = static_cast<index_t>(0x3FFFFFFF - 1);
/**
* Number of guards per thread
......@@ -186,7 +186,7 @@ class WaitFreeMPMCQueue {
/**
* Null-pointer for hazard pointers
*/
static const index_t UndefinedGuard = 0;
static const index_t UndefinedGuard = 0x3fffffff;
/**
* Helper class for operation descriptions.
......
......@@ -62,20 +62,18 @@ PT_MAIN("Data Structures C++") {
unsigned int max_threads = static_cast<unsigned int>(
2 * partest::TestSuite::GetDefaultNumThreads());
embb_thread_set_max_count(max_threads);
#if 0
PT_RUN(PoolTest< WaitFreeArrayValuePool<int COMMA -1> >);
PT_RUN(PoolTest< LockFreeTreeValuePool<int COMMA -1> >);
PT_RUN(HazardPointerTest);
PT_RUN(QueueTest< WaitFreeSPSCQueue< ::std::pair<size_t COMMA int> > >);
PT_RUN(QueueTest< LockFreeMPMCQueue< ::std::pair<size_t COMMA int> >
COMMA true COMMA true >);
#endif
PT_RUN(QueueTest< WaitFreeMPMCQueue< ::std::pair<size_t COMMA int> >
COMMA true COMMA true >);
#if 0
PT_RUN(StackTest< LockFreeStack<int> >);
PT_RUN(ObjectPoolTest< LockFreeTreeValuePool<bool COMMA false > >);
PT_RUN(ObjectPoolTest< WaitFreeArrayValuePool<bool COMMA false> >);
#endif
PT_EXPECT_EQ(embb_get_bytes_allocated(), static_cast<size_t>(0));
}
......@@ -125,7 +125,7 @@ QueueTestOrderMPMC_Post() {
for (size_t t = 0;
t < static_cast<size_t>(n_producers * n_producer_elements / 8);
++t) {
PT_ASSERT_EQ_MSG(total_tally[t], 0xff,
PT_ASSERT_EQ_MSG(static_cast<int>(total_tally[t]), 0xff,
"missing dequeued elements");
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment