diff --git a/containers_cpp/include/embb/containers/internal/wait_free_mpmc_queue-inl.h b/containers_cpp/include/embb/containers/internal/wait_free_mpmc_queue-inl.h index dfae2c5..e8707d1 100644 --- a/containers_cpp/include/embb/containers/internal/wait_free_mpmc_queue-inl.h +++ b/containers_cpp/include/embb/containers/internal/wait_free_mpmc_queue-inl.h @@ -104,7 +104,7 @@ embb::base::Atomic & WaitFreeMPMCQueueNode::DequeueAID() { return deq_aid; } -/// Using maximum value of OperationDesc::NodeIndex (15 bit) to +/// Using maximum value of OperationDesc::NodeIndex (30 bit) to /// represent 'undefined'. template const uint32_t WaitFreeMPMCQueueNode::UndefinedIndex = 0x3fffffff; @@ -176,6 +176,7 @@ WaitFreeMPMCQueue(size_t capacity) } // Allocate sentinel node: Node_t sentinelNode; + assert(sentinelNode.NextPoolIdx() == Node_t::UndefinedIndex); int sentinelNodePoolIndex = nodePool.Allocate(sentinelNode); if (sentinelNodePoolIndex < 0) { EMBB_THROW(embb::base::NoMemoryException, @@ -297,22 +298,25 @@ TryDequeue(Type & retElement) { } Help(); HelpFinishDequeue(); - // Accessor's operation Idx might have changed in between. + // Reload the operation description announced by this thread curOp = OperationDesc(operationDescriptions[accessorId].Load()); // Operation announced by this thread must not be pending any more: assert(!curOp.Pending); // Check element - index_t nodeIdx = curOp.NodeIndex; - Node_t & node = nodePool[nodeIdx]; + index_t nodeIdx = curOp.NodeIndex; if (nodeIdx == Node_t::UndefinedIndex) { // Allow dequeueing from empty queue, but // return false: retElement = Type(); return false; } - index_t deqNodeIdx = node.NextPoolIdx(); - retElement = nodePool[deqNodeIdx].Value(); - // Mark node as non-pending by setting node index to UndefinedIndex: + Node_t & node = nodePool[nodeIdx]; + assert(node.DequeueAID().Load() == accessorId); + // Return value of node next to node dequeued in this operation: + index_t nextNodeIdx = node.NextPoolIdx(); + retElement = nodePool[nextNodeIdx].Value(); + // Value is safe. Mark node as non-pending and available for reclamation by + // setting this operation's node index to UndefinedIndex: OperationDesc noOp( false, // non-pending false, // any @@ -325,6 +329,7 @@ TryDequeue(Type & retElement) { // should not happen. assert(false); } + // Release node hp.EnqueuePointerForDeletion(nodeIdx); return true; } @@ -337,13 +342,12 @@ HelpEnqueue(unsigned int accessorId) { index_t lastIdx = tailIdx.Load(); // Guard tail: hp.GuardPointer(0, lastIdx); - // Last node still is tail: + Node_t & lastNode = nodePool[lastIdx]; + index_t nextIdx = lastNode.NextPoolIdx(); if (lastIdx == tailIdx.Load()) { - Node_t & lastNode = nodePool[lastIdx]; - index_t nextIdx = lastNode.NextPoolIdx(); - // tail.next is null (no pending enqueue on tail): - if (lastNode.NextPoolIdx() == Node_t::UndefinedIndex) { - // Apply enqueue. + // Last node still is tail + if (nextIdx == Node_t::UndefinedIndex) { + // tail.next is null (no pending enqueue on tail), apply enqueue. // No other accessor helped this enqueue operation yet: if (IsPending(accessorId)) { // Set next-pointer of last element in list @@ -369,25 +373,25 @@ HelpFinishEnqueue() { index_t lastIdx = tailIdx.Load(); // Guard tail: hp.GuardPointer(0, lastIdx); - + // Load tail->next: Node_t & lastNode = nodePool[lastIdx]; - index_t nextIdx = lastNode.NextPoolIdx(); - // This node is NEXT of tail, but not tail => unfinished ENQ - hp.GuardPointer(1, nextIdx); + index_t nextIdx = lastNode.NextPoolIdx(); + Node_t & nextNode = nodePool[nextIdx]; + // tail->next not undefined => unfinished ENQ if (nextIdx != Node_t::UndefinedIndex) { - Node_t & nextNode = nodePool[nextIdx]; // Load accessor id from last (non-tail) element in list: index_t helpAID = nextNode.EnqueueAID(); // Load operation for accessor that started the unfinished enqueue: OperationDesc helpOp(operationDescriptions[helpAID].Load()); - // tail index still points at last node: // (last == tail && state[aid].node == next) if (lastIdx == tailIdx.Load() && - helpOp.NodeIndex == nextIdx) { + // Reload operation description here, do not use helpOp: + (OperationDesc(operationDescriptions[helpAID].Load()).NodeIndex == + nextIdx)) { OperationDesc newOp( - false, // set to nonpending - true, // enqueue + false, // set to nonpending + true, // enqueue nextIdx // node index == helpOp.NodeIndex ); index_t helpOpRaw = helpOp.Raw; @@ -408,12 +412,9 @@ HelpDequeue(index_t accessorId) { index_t firstIdx = headIdx.Load(); // Guard head: hp.GuardPointer(0, firstIdx); - if (firstIdx != headIdx.Load()) { - // Head pointer changed by concurrent enqueue - continue; - } + // Order matters for these assignments: + Node_t & first = nodePool[firstIdx]; index_t lastIdx = tailIdx.Load(); - Node_t & first = nodePool[firstIdx]; index_t nextIdx = first.NextPoolIdx(); // Guard head->next: hp.GuardPointer(1, nextIdx); @@ -421,12 +422,20 @@ HelpDequeue(index_t accessorId) { // Head->next pointer changed by concurrent enqueue continue; } + if (firstIdx != headIdx.Load()) { + // Head pointer changed by concurrent enqueue + continue; + } if (firstIdx == lastIdx) { // Queue might be empty if (nextIdx == Node_t::UndefinedIndex) { // Queue is empty OperationDesc curOp(operationDescriptions[accessorId].Load()); if (lastIdx == tailIdx.Load() && IsPending(accessorId)) { + // The CAS on the operation description is not ABA-prone as the + // pending state will only change from true to false and cannot + // change back to true unless the operation has been completed + // and a new operation has been announced by the same thread. OperationDesc newOp( false, // Set nonpending state false, @@ -438,8 +447,8 @@ HelpDequeue(index_t accessorId) { operationDescriptions[accessorId].CompareAndSwap(curOpRaw, newOp.Raw); } } else { - // Head is advanced because of unfinished enqueue, so - // help other enqueue and retry: + // head == tail, but tail->next is not undefined, so head has been + // advanced because of unfinished enqueue; help other enqueue and retry: HelpFinishEnqueue(); } } else { @@ -458,13 +467,13 @@ HelpDequeue(index_t accessorId) { firstIdx // Set node index ); index_t curOpRaw = curOp.Raw; + // The CAS on the operation description is not ABA-prone as the + // node index to be set is guarded and cannot be used in a concurrent + // operation. if (!operationDescriptions[accessorId].CompareAndSwap( curOpRaw, newOp.Raw)) { - // This loop is wait-free as every accessor's operation - // will be pending for a limited time only, so continue - // and retry is okay. // This CAS can only have failed because another - // thread completed this dequeue operation in the + // thread helped completing this dequeue operation in the // meantime. continue; // Retry } @@ -476,17 +485,14 @@ HelpDequeue(index_t accessorId) { // In this case, HelpFinishDequeue will complete the dequeue. index_t curDeqAID = Node_t::UndefinedIndex; // Register this accessor as dequeuer of this node. - // If this CAS fails, another accessor is already (perhaps - // helping) dequeueing this node winning this CAS. In this - // case this dequeue operation is just ignored. - if (!first.DequeueAID().CompareAndSwap(curDeqAID, accessorId)) { - continue; - // Lost CAS to helping accessor. In this case, this operation - // is not pending anymore at this point. - } + first.DequeueAID().CompareAndSwap(curDeqAID, accessorId); + // If this CAS failed, the element has been dequeued by another thread + // (an operation can only fail if another operation succeeded). + // This operation is still pending at this point, and will be set to + // non-pending state in HelpFinishDequeue: HelpFinishDequeue(); } - } // while pending + } // while pending } template< @@ -502,8 +508,13 @@ HelpFinishDequeue() { hp.GuardPointer(1, nextIdx); index_t accessorId = first.DequeueAID().Load(); if (accessorId != Node_t::UndefinedIndex) { + // head.DeqeueueAID is set to the accessor id that won the last CAS + // in HelpDequeue OperationDesc curOp(operationDescriptions[accessorId].Load()); if (firstIdx == headIdx.Load() && + // This check is missing in the original publication but required + // to validate head->next: + nextIdx == first.NextPoolIdx() && nextIdx != Node_t::UndefinedIndex) { // Set state of helped operation to NONPENDING: OperationDesc newOp( @@ -514,7 +525,12 @@ HelpFinishDequeue() { // CAS without check as possibly another accessor // already helped this dequeue operation. index_t curOpRaw = curOp.Raw; + // The CAS on the operation description is not ABA-prone as the + // pending state will only change from true to false and cannot + // change back to true unless the operation has been completed + // and a new operation has been announced by the same thread. operationDescriptions[accessorId].CompareAndSwap(curOpRaw, newOp.Raw); + // Update head: headIdx.CompareAndSwap(firstIdx, nextIdx); } } @@ -554,14 +570,6 @@ template< typename Type, class NodeAllocator, class OpAllocator, class ValuePool > void WaitFreeMPMCQueue:: DeleteNodeCallback(index_t releasedNodeIndex) { - if (sentinelNodeIndex == releasedNodeIndex) { - // Prevent sentinel node from reuse, it will be returned to pool - // in destructor only. - return; - } - if (NodeIsPending(releasedNodeIndex)) { - return; - } nodePool.Free(static_cast(releasedNodeIndex)); } @@ -580,23 +588,6 @@ IsPending(unsigned int accessorId) { return opDesc.Pending; } -template< - typename Type, class NodeAllocator, class OpAllocator, class ValuePool > -bool WaitFreeMPMCQueue:: -NodeIsPending(index_t nodeId) { - // Returns true if given node index is involved in any announced operation, - // including non-pending operations, as dequeue-operations are set to - // non-pending before returning the node's value. Dequeue operations are - // finalized by setting their node index to UndefinedIndex. - for (unsigned int accessorId = 0; accessorId < num_states; ++accessorId) { - OperationDesc desc(operationDescriptions[accessorId].Load()); - if (desc.NodeIndex == nodeId) { - return true; - } - } - return false; -} - } // namespace containers } // namespace embb diff --git a/containers_cpp/include/embb/containers/wait_free_mpmc_queue.h b/containers_cpp/include/embb/containers/wait_free_mpmc_queue.h index d5f73cc..b789d84 100644 --- a/containers_cpp/include/embb/containers/wait_free_mpmc_queue.h +++ b/containers_cpp/include/embb/containers/wait_free_mpmc_queue.h @@ -176,7 +176,7 @@ class WaitFreeMPMCQueue { * OperationDesc::NodeIndex (15 bit) minus one element * required for sentinel node. */ - static const index_t QUEUE_SIZE_MAX = static_cast(32767 - 1); + static const index_t QUEUE_SIZE_MAX = static_cast(0x3FFFFFFF - 1); /** * Number of guards per thread @@ -186,7 +186,7 @@ class WaitFreeMPMCQueue { /** * Null-pointer for hazard pointers */ - static const index_t UndefinedGuard = 0; + static const index_t UndefinedGuard = 0x3fffffff; /** * Helper class for operation descriptions. diff --git a/containers_cpp/test/main.cc b/containers_cpp/test/main.cc index a0fd1b4..8bcad85 100644 --- a/containers_cpp/test/main.cc +++ b/containers_cpp/test/main.cc @@ -62,20 +62,18 @@ PT_MAIN("Data Structures C++") { unsigned int max_threads = static_cast( 2 * partest::TestSuite::GetDefaultNumThreads()); embb_thread_set_max_count(max_threads); -#if 0 + PT_RUN(PoolTest< WaitFreeArrayValuePool >); PT_RUN(PoolTest< LockFreeTreeValuePool >); PT_RUN(HazardPointerTest); PT_RUN(QueueTest< WaitFreeSPSCQueue< ::std::pair > >); PT_RUN(QueueTest< LockFreeMPMCQueue< ::std::pair > COMMA true COMMA true >); -#endif PT_RUN(QueueTest< WaitFreeMPMCQueue< ::std::pair > COMMA true COMMA true >); -#if 0 PT_RUN(StackTest< LockFreeStack >); PT_RUN(ObjectPoolTest< LockFreeTreeValuePool >); PT_RUN(ObjectPoolTest< WaitFreeArrayValuePool >); -#endif + PT_EXPECT_EQ(embb_get_bytes_allocated(), static_cast(0)); } diff --git a/containers_cpp/test/queue_test-inl.h b/containers_cpp/test/queue_test-inl.h index 3e63930..c0eadbd 100644 --- a/containers_cpp/test/queue_test-inl.h +++ b/containers_cpp/test/queue_test-inl.h @@ -125,7 +125,7 @@ QueueTestOrderMPMC_Post() { for (size_t t = 0; t < static_cast(n_producers * n_producer_elements / 8); ++t) { - PT_ASSERT_EQ_MSG(total_tally[t], 0xff, + PT_ASSERT_EQ_MSG(static_cast(total_tally[t]), 0xff, "missing dequeued elements"); } }