From 3fe8c4753e3f71851b3d93c3a93aacad97f5a41f Mon Sep 17 00:00:00 2001 From: Tobias Fuchs Date: Sun, 12 Apr 2015 18:25:22 +0200 Subject: [PATCH] containers_cpp: added WaitFreeMPMCQueue --- containers_cpp/include/embb/containers/internal/indexed_object_pool-inl.h | 109 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ containers_cpp/include/embb/containers/internal/indexed_object_pool.h | 130 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ containers_cpp/include/embb/containers/internal/object_pool-inl.h | 55 ++++--------------------------------------------------- containers_cpp/include/embb/containers/internal/returning_true_iterator.h | 90 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ containers_cpp/include/embb/containers/internal/wait_free_mpmc_queue-inl.h | 603 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ containers_cpp/include/embb/containers/lock_free_mpmc_queue.h | 4 +--- containers_cpp/include/embb/containers/object_pool.h | 26 -------------------------- containers_cpp/include/embb/containers/wait_free_mpmc_queue.h | 425 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ containers_cpp/include/embb/containers/wait_free_spsc_queue.h | 1 + containers_cpp/test/main.cc | 14 ++++++++++---- 10 files changed, 1373 insertions(+), 84 deletions(-) create mode 100644 containers_cpp/include/embb/containers/internal/indexed_object_pool-inl.h create mode 100644 containers_cpp/include/embb/containers/internal/indexed_object_pool.h create mode 100644 containers_cpp/include/embb/containers/internal/returning_true_iterator.h create mode 100644 containers_cpp/include/embb/containers/internal/wait_free_mpmc_queue-inl.h create mode 100644 containers_cpp/include/embb/containers/wait_free_mpmc_queue.h diff --git a/containers_cpp/include/embb/containers/internal/indexed_object_pool-inl.h b/containers_cpp/include/embb/containers/internal/indexed_object_pool-inl.h new file mode 100644 index 0000000..9399129 --- /dev/null +++ b/containers_cpp/include/embb/containers/internal/indexed_object_pool-inl.h @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2014, Siemens AG. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef EMBB_CONTAINERS_INTERNAL_INDEXED_OBJECT_POOL_INL_H_ +#define EMBB_CONTAINERS_INTERNAL_INDEXED_OBJECT_POOL_INL_H_ + +#include + +namespace embb { +namespace containers { +namespace internal { + +template +template +IndexedObjectPool:: +IndexedObjectPool(RAI first, RAI last) : + size_(static_cast(std::distance(first, last))), + indexPool(internal::ReturningTrueIterator(0), + internal::ReturningTrueIterator(size_)) { + // use the allocator to allocate array of size dist + elements = allocator.allocate(size_); + // fill element pool with elements from the iteration + int i = 0; + for (RAI curIter(first); curIter != last; ++curIter, ++i) { + // assign element from iteration + elements[i] = *curIter; + } +} + +template +IndexedObjectPool:: +IndexedObjectPool(size_t size, const T & defaultInstance) : + size_(size), + indexPool(internal::ReturningTrueIterator(0), + internal::ReturningTrueIterator(size_)) { + // use the allocator to allocate array of size dist + elements = allocator.allocate(size); + // fill element pool with elements from the iteration + for (size_t i = 0; i < size_; ++i) { + // initialize element from default constructor and + // assignment operator + elements[i] = defaultInstance; + } +} + +template +IndexedObjectPool:: +~IndexedObjectPool() { + allocator.deallocate(elements, size_); +} + +template +int IndexedObjectPool:: +Allocate(T & element) { + // Reserve a pool index: + bool reservedFlag; + int index = indexPool.Allocate(reservedFlag); + // Assign element to be allocated at pool index. + // Index returned from index pool is -1 if no index + // is available. + if (index >= 0) { + element = elements[index]; + } + return index; +} + +template +void IndexedObjectPool:: +Free(int elementIndex) { + // Call the referenced element's destructor: + elements[elementIndex].~T(); + // Release index of the element for reuse: + indexPool.Free(true, elementIndex); +} + +template +T & IndexedObjectPool:: +operator[](size_t elementIndex) { + return elements[elementIndex]; +} + +} // namespace internal +} // namespace containers +} // namespace embb + +#endif // EMBB_CONTAINERS_INTERNAL_INDEXED_OBJECT_POOL_INL_H_ diff --git a/containers_cpp/include/embb/containers/internal/indexed_object_pool.h b/containers_cpp/include/embb/containers/internal/indexed_object_pool.h new file mode 100644 index 0000000..0a0f57c --- /dev/null +++ b/containers_cpp/include/embb/containers/internal/indexed_object_pool.h @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2014, Siemens AG. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef EMBB_CONTAINERS_INTERNAL_INDEXED_OBJECT_POOL_H_ +#define EMBB_CONTAINERS_INTERNAL_INDEXED_OBJECT_POOL_H_ + +#include +#include + +#include +#include + +namespace embb { +namespace containers { +namespace internal { + +template< + typename T, + class IndexPool = LockFreeTreeValuePool, + class Allocator = embb::base::Allocator +> +class IndexedObjectPool { + private: + const size_t size_; + T * elements; + Allocator allocator; + IndexPool indexPool; + IndexedObjectPool(); + // Prevent copy-construction + IndexedObjectPool(const IndexedObjectPool&); + // Prevent assignment + IndexedObjectPool& operator=(const IndexedObjectPool&); + + public: + /** + * \see value_pool_concept + * + * \notthreadsafe + * + * \memory dynamically allocates + * \c n * (sizeof(T) + sizeof(embb::base::Atomic)) + * bytes, where \c n is the number of elements in the pool. + */ + template + IndexedObjectPool( + RAI first, + /**< [IN] first iterator to elements the pool is filled with */ + RAI last + /**< [IN] last iterator to elements the pool is filled with */ + ); + + /** + * \see value_pool_concept + * + * \notthreadsafe + * + * \memory dynamically allocates + * \c n * (sizeof(T) + sizeof(embb::base::Atomic)) + * bytes, where \c n is the number of elements in the pool. + */ + IndexedObjectPool( + size_t size, + /**< [IN] Number of elements the pool is filled with */ + const T & defaultInstance + /**< [IN] Default instance to initialize pool elements with */ + ); + + /** + * Destructor, deallocating memory + */ + ~IndexedObjectPool(); + + /** + * Request element and index from pool. + * + * \return index of element + * + * \see object_pool_concept + * + */ + int Allocate(T & element); + + /** + * Return element and index to the pool. + * + * \see value_pool_concept + * + */ + void Free(int elementIndex); + + /** + * Return element from the pool at given index. + * + * \see object_pool_concept + * + */ + T & operator[](size_t elementIndex); +}; + +} // namespace internal +} // namespace containers +} // namespace embb + +#include + +#endif // EMBB_CONTAINERS_INTERNAL_INDEXED_OBJECT_POOL_H_ + diff --git a/containers_cpp/include/embb/containers/internal/object_pool-inl.h b/containers_cpp/include/embb/containers/internal/object_pool-inl.h index 61711d5..9a8032e 100644 --- a/containers_cpp/include/embb/containers/internal/object_pool-inl.h +++ b/containers_cpp/include/embb/containers/internal/object_pool-inl.h @@ -27,58 +27,10 @@ #ifndef EMBB_CONTAINERS_INTERNAL_OBJECT_POOL_INL_H_ #define EMBB_CONTAINERS_INTERNAL_OBJECT_POOL_INL_H_ +#include + namespace embb { namespace containers { -template -ObjectPool:: -ReturningTrueIterator::ReturningTrueIterator(size_t count_value) : -count_value(count_value), - ret_value(true) -{} - -template -typename ObjectPool:: -ReturningTrueIterator::self_type -ObjectPool:: -ReturningTrueIterator::operator++() { - self_type i = *this; - count_value++; - return i; -} - -template -typename ObjectPool:: -ReturningTrueIterator::self_type ObjectPool:: -ReturningTrueIterator::operator++(int) { - count_value++; - return *this; -} - -template -typename ObjectPool:: -ReturningTrueIterator::reference ObjectPool:: -ReturningTrueIterator::operator*() { - return ret_value; -} - -template -typename ObjectPool:: -ReturningTrueIterator::pointer ObjectPool:: -ReturningTrueIterator::operator->() { - return &ret_value; -} - -template -bool ObjectPool:: -ReturningTrueIterator::operator==(const self_type& rhs) { - return count_value == rhs.count_value; -} - -template -bool ObjectPool:: -ReturningTrueIterator::operator!=(const self_type& rhs) { - return count_value != rhs.count_value; -} template bool ObjectPool:: @@ -118,7 +70,8 @@ size_t ObjectPool::GetCapacity() { template ObjectPool::ObjectPool(size_t capacity) : capacity(capacity), - p(ReturningTrueIterator(0), ReturningTrueIterator(capacity)) { + p(internal::ReturningTrueIterator(0), + internal::ReturningTrueIterator(capacity)) { // Allocate the objects (without construction, just get the memory) objects = objectAllocator.allocate(capacity); } diff --git a/containers_cpp/include/embb/containers/internal/returning_true_iterator.h b/containers_cpp/include/embb/containers/internal/returning_true_iterator.h new file mode 100644 index 0000000..2108970 --- /dev/null +++ b/containers_cpp/include/embb/containers/internal/returning_true_iterator.h @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2014, Siemens AG. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef EMBB_CONTAINERS_INTERNAL_RETURNING_TRUE_ITERATOR_H_ +#define EMBB_CONTAINERS_INTERNAL_RETURNING_TRUE_ITERATOR_H_ + +#include + +namespace embb { +namespace containers { +namespace internal { + +/** + * Helper providing a virtual iterator that just returns true in each + * iteration step. Used for filling the value pool. Implements the normal + * C++ iterator concept. Not further documented here. + */ +class ReturningTrueIterator { + public: + typedef ReturningTrueIterator self_type; + typedef bool value_type; + typedef bool & reference; + typedef bool * pointer; + typedef ::std::forward_iterator_tag iterator_category; + typedef int difference_type; + + public: + inline explicit ReturningTrueIterator(size_t count_value) : + count_value(count_value), + ret_value(true) + { } + inline self_type operator++() { + self_type i = *this; + count_value++; + return i; + } + inline self_type operator++(int) { + count_value++; + return *this; + } + inline reference operator*() { + return ret_value; + } + inline pointer operator->() { + return &ret_value; + } + inline bool operator==(const self_type & rhs) { + return count_value == rhs.count_value; + } + inline bool operator!=(const self_type & rhs) { + return count_value != rhs.count_value; + } + inline difference_type operator-(const self_type & rhs) { + return static_cast(count_value) - + static_cast(rhs.count_value); + } + + private: + size_t count_value; + bool ret_value; +}; +} // namespace internal +} // namespace containers +} // namespace embb + +#endif // EMBB_CONTAINERS_INTERNAL_RETURNING_TRUE_ITERATOR_H_ + diff --git a/containers_cpp/include/embb/containers/internal/wait_free_mpmc_queue-inl.h b/containers_cpp/include/embb/containers/internal/wait_free_mpmc_queue-inl.h new file mode 100644 index 0000000..dfae2c5 --- /dev/null +++ b/containers_cpp/include/embb/containers/internal/wait_free_mpmc_queue-inl.h @@ -0,0 +1,603 @@ +/* + * Copyright (c) 2014, Siemens AG. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef EMBB_CONTAINERS_INTERNAL_WAIT_FREE_MPMC_QUEUE_INL_H_ +#define EMBB_CONTAINERS_INTERNAL_WAIT_FREE_MPMC_QUEUE_INL_H_ + +#include +#include +#include + +namespace embb { +namespace containers { + +namespace internal { + +template +inline WaitFreeMPMCQueueNode::WaitFreeMPMCQueueNode() +: enq_aid(UndefinedIndex) { + next_idx.Store(UndefinedIndex); + deq_aid.Store(UndefinedIndex); +} + +template +inline WaitFreeMPMCQueueNode::WaitFreeMPMCQueueNode( + const self_t & other) +: value(other.value), + enq_aid(other.enq_aid) { + next_idx.Store(other.next_idx.Load()); + deq_aid.Store(other.deq_aid.Load()); +} + +template +inline WaitFreeMPMCQueueNode::WaitFreeMPMCQueueNode( + Type const val, uint32_t enqAid) +: value(val), enq_aid(enqAid) { + next_idx.Store(UndefinedIndex); + deq_aid.Store(UndefinedIndex); +} + +template +inline Type WaitFreeMPMCQueueNode::Value() const { + return value; +} + +template +inline uint32_t WaitFreeMPMCQueueNode::NextPoolIdx() const { + return next_idx.Load(); +} + +template +inline WaitFreeMPMCQueueNode & +WaitFreeMPMCQueueNode::operator=( + const self_t & other) { + if (this != &other) { + next_idx.Store(other.next_idx.Load()); + deq_aid.Store(other.deq_aid.Load()); + value = other.value; + enq_aid = other.enq_aid; + } + return *this; +} + +template +bool WaitFreeMPMCQueueNode::CASNext( + uint32_t expNextIdx, uint32_t newNextIdx) { + return next_idx.CompareAndSwap(expNextIdx, newNextIdx); +} + +template +bool WaitFreeMPMCQueueNode::Next_IsNull() const { + return next_idx.Load() == UndefinedIndex; +} + +template +inline uint32_t WaitFreeMPMCQueueNode::EnqueueAID() const { + return enq_aid; +} + +template +embb::base::Atomic & WaitFreeMPMCQueueNode::DequeueAID() { + return deq_aid; +} + +/// Using maximum value of OperationDesc::NodeIndex (15 bit) to +/// represent 'undefined'. +template +const uint32_t WaitFreeMPMCQueueNode::UndefinedIndex = 0x3fffffff; + +} // namespace internal + +template< + typename Type, class NodeAllocator, class OpAllocator, class ValuePool > +WaitFreeMPMCQueue::OperationDesc:: +OperationDesc( + bool pending, + bool enqueue, + index_t nodeIndex) : + Pending(pending), + Enqueue(enqueue), + NodeIndex(nodeIndex), + Raw(0) { + index_t nodeIndexMask = NodeIndex & NODE_INDEX_MASK; + if (Pending) { + Raw |= PENDING_FLAG_MASK; + } + if (Enqueue) { + Raw |= ENQUEUE_FLAG_MASK; + } + Raw |= nodeIndexMask; +} + +template< + typename Type, class NodeAllocator, class OpAllocator, class ValuePool > +WaitFreeMPMCQueue::OperationDesc:: +OperationDesc(index_t raw) : Raw(raw) { + Pending = (raw & PENDING_FLAG_MASK) ? true : false; + Enqueue = (raw & ENQUEUE_FLAG_MASK) ? true : false; + NodeIndex = (raw & NODE_INDEX_MASK); +} + +template< + typename Type, class NodeAllocator, class OpAllocator, class ValuePool > +WaitFreeMPMCQueue:: +WaitFreeMPMCQueue(size_t capacity) +: max_size_(capacity), +// Disable "this is used in base member initializer" warning. +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable:4355) +#endif + delete_pointer_callback(*this, &self_t::DeleteNodeCallback), +#ifdef _MSC_VER +#pragma warning(pop) +#endif + hp(delete_pointer_callback, UndefinedGuard, 2), + // Using int for numThreads so compiler warning is + // raised when size and numThreads are switched by + // mistake. + num_states(embb::base::Thread::GetThreadsMaxCount()), + // Node pool size, with respect to the maximum number of + // retired nodes not eligible for reuse due to hazard pointers: + node_pool_size( + // numThreads caused trouble here + (hp.GetRetiredListMaxSize() * + embb::base::Thread::GetThreadsMaxCount()) + + max_size_ + 1), + nodePool(node_pool_size, nullNode) { + // Assert width of binary representation of operation description + assert(sizeof(index_t) == 4); + if (max_size_ > QUEUE_SIZE_MAX) { + EMBB_THROW(embb::base::NoMemoryException, + "Maximum size of queue exceeded"); + } + // Allocate sentinel node: + Node_t sentinelNode; + int sentinelNodePoolIndex = nodePool.Allocate(sentinelNode); + if (sentinelNodePoolIndex < 0) { + EMBB_THROW(embb::base::NoMemoryException, + "Allocation of sentinel node failed"); + } + // No need to guard sentinel node, it is prevented from reuse + // in the hazard pointers' delete callback (see DeleteNodeCallback). + sentinelNodeIndex = static_cast(sentinelNodePoolIndex); + headIdx.Store(sentinelNodeIndex); + tailIdx.Store(sentinelNodeIndex); + // State of the queue is one operation description per queue accessor. + // Initialize clear state: Null-operarion for every accessor. + operationDescriptions = operationDescriptionAllocator.allocate(num_states); + for (size_t accessorId = 0; accessorId < num_states; ++accessorId) { + OperationDesc op( + false, // nonpending + true, // enqueue, should not matter + Node_t::UndefinedIndex // node index + ); + // As all operation descriptions are the same, + // we do not have to map accessor ids to operation + // pool indices. Every accessor will randomly grab + // an operation pool element and stick to it, as + // a threads accessor id will not change. + operationDescriptions[accessorId].Store(op.Raw); + } +} + +template< + typename Type, class NodeAllocator, class OpAllocator, class ValuePool > +WaitFreeMPMCQueue:: +~WaitFreeMPMCQueue() { + // Dequeue until queue is empty: + Type val; + // Delete internally managed memory regions: + operationDescriptionAllocator.deallocate( + operationDescriptions, + num_states); +} + +template< + typename Type, class NodeAllocator, class OpAllocator, class ValuePool > +inline bool WaitFreeMPMCQueue:: +LoadAccessorThreadIndex(index_t & retIndexValue) { + unsigned int tmpIndexValue; // For conversion size32_t <-> unsigned int + if (embb_internal_thread_index(&tmpIndexValue) == EMBB_SUCCESS) { + // Fail if thread index is not in range of number of accessors: + if (tmpIndexValue < num_states) { + retIndexValue = tmpIndexValue; + return true; + } + return false; + } + retIndexValue = Node_t::UndefinedIndex; + return false; +} + +template< + typename Type, class NodeAllocator, class OpAllocator, class ValuePool > +inline size_t WaitFreeMPMCQueue:: +RetiredListMaxSize(size_t nThreads) { + return static_cast( + 1.25 * + static_cast(nThreads) * static_cast(num_guards)) + 1; +} + +template< + typename Type, class NodeAllocator, class OpAllocator, class ValuePool > +bool WaitFreeMPMCQueue:: +TryEnqueue(Type const & element) { + index_t accessorId = Node_t::UndefinedIndex; + if (!LoadAccessorThreadIndex(accessorId)) { + EMBB_THROW(embb::base::ErrorException, + "Invalid thread ID."); + } + // Register new node in pool: + Node_t poolNode; + int nodeIndex = nodePool.Allocate(poolNode); + if (nodeIndex < 0) { + return false; // Queue is at capacity + } + // Initialize node in pool: + Node_t newNode(element, accessorId); + nodePool[static_cast(nodeIndex)] = newNode; + OperationDesc enqOp( + true, // pending + true, // enqueue + static_cast(nodeIndex) + ); + operationDescriptions[accessorId].Store(enqOp.Raw); + Help(); + HelpFinishEnqueue(); + return true; +} + +template< + typename Type, class NodeAllocator, class OpAllocator, class ValuePool > +bool WaitFreeMPMCQueue:: +TryDequeue(Type & retElement) { + index_t accessorId = static_cast(-1); + if (!LoadAccessorThreadIndex(accessorId)) { + EMBB_THROW(embb::base::ErrorException, + "Invalid thread ID. Verify embb::base::Thread::thread_max_count."); + } + OperationDesc curOp(operationDescriptions[accessorId].Load()); + // Assert that current operation of this accessor is completed: + assert(!curOp.Pending); + // Register new operation description for CAS: + OperationDesc newOp( + true, // pending + false, // dequeue + Node_t::UndefinedIndex // node index + ); + index_t curOpRaw = curOp.Raw; + if (!operationDescriptions[accessorId].CompareAndSwap(curOpRaw, newOp.Raw)) { + // The threads own operation has changed, + // should not happen. + assert(false); + } + Help(); + HelpFinishDequeue(); + // Accessor's operation Idx might have changed in between. + curOp = OperationDesc(operationDescriptions[accessorId].Load()); + // Operation announced by this thread must not be pending any more: + assert(!curOp.Pending); + // Check element + index_t nodeIdx = curOp.NodeIndex; + Node_t & node = nodePool[nodeIdx]; + if (nodeIdx == Node_t::UndefinedIndex) { + // Allow dequeueing from empty queue, but + // return false: + retElement = Type(); + return false; + } + index_t deqNodeIdx = node.NextPoolIdx(); + retElement = nodePool[deqNodeIdx].Value(); + // Mark node as non-pending by setting node index to UndefinedIndex: + OperationDesc noOp( + false, // non-pending + false, // any + Node_t::UndefinedIndex // no node index + ); + curOp = OperationDesc(operationDescriptions[accessorId].Load()); + curOpRaw = curOp.Raw; + if (!operationDescriptions[accessorId].CompareAndSwap(curOpRaw, noOp.Raw)) { + // The threads own operation has changed, + // should not happen. + assert(false); + } + hp.EnqueuePointerForDeletion(nodeIdx); + return true; +} + +template< + typename Type, class NodeAllocator, class OpAllocator, class ValuePool > +void WaitFreeMPMCQueue:: +HelpEnqueue(unsigned int accessorId) { + while (IsPending(accessorId)) { + index_t lastIdx = tailIdx.Load(); + // Guard tail: + hp.GuardPointer(0, lastIdx); + // Last node still is tail: + if (lastIdx == tailIdx.Load()) { + Node_t & lastNode = nodePool[lastIdx]; + index_t nextIdx = lastNode.NextPoolIdx(); + // tail.next is null (no pending enqueue on tail): + if (lastNode.NextPoolIdx() == Node_t::UndefinedIndex) { + // Apply enqueue. + // No other accessor helped this enqueue operation yet: + if (IsPending(accessorId)) { + // Set next-pointer of last element in list + OperationDesc opDesc(operationDescriptions[accessorId].Load()); + if (lastNode.CASNext(nextIdx, opDesc.NodeIndex)) { + HelpFinishEnqueue(); + return; + } + } + } else { + // Some enqueue operation in progress + HelpFinishEnqueue(); + } + } + } +} + +template< + typename Type, class NodeAllocator, class OpAllocator, class ValuePool > +void WaitFreeMPMCQueue:: +HelpFinishEnqueue() { + // Load node pointed at by tail: + index_t lastIdx = tailIdx.Load(); + // Guard tail: + hp.GuardPointer(0, lastIdx); + + Node_t & lastNode = nodePool[lastIdx]; + index_t nextIdx = lastNode.NextPoolIdx(); + // This node is NEXT of tail, but not tail => unfinished ENQ + hp.GuardPointer(1, nextIdx); + if (nextIdx != Node_t::UndefinedIndex) { + Node_t & nextNode = nodePool[nextIdx]; + // Load accessor id from last (non-tail) element in list: + index_t helpAID = nextNode.EnqueueAID(); + // Load operation for accessor that started the unfinished enqueue: + OperationDesc helpOp(operationDescriptions[helpAID].Load()); + + // tail index still points at last node: + // (last == tail && state[aid].node == next) + if (lastIdx == tailIdx.Load() && + helpOp.NodeIndex == nextIdx) { + OperationDesc newOp( + false, // set to nonpending + true, // enqueue + nextIdx // node index == helpOp.NodeIndex + ); + index_t helpOpRaw = helpOp.Raw; + operationDescriptions[helpAID].CompareAndSwap( + helpOpRaw, + newOp.Raw); + // Update tail pointer: + tailIdx.CompareAndSwap(lastIdx, nextIdx); + } + } +} + +template< + typename Type, class NodeAllocator, class OpAllocator, class ValuePool > +void WaitFreeMPMCQueue:: +HelpDequeue(index_t accessorId) { + while (IsPending(accessorId)) { + index_t firstIdx = headIdx.Load(); + // Guard head: + hp.GuardPointer(0, firstIdx); + if (firstIdx != headIdx.Load()) { + // Head pointer changed by concurrent enqueue + continue; + } + index_t lastIdx = tailIdx.Load(); + Node_t & first = nodePool[firstIdx]; + index_t nextIdx = first.NextPoolIdx(); + // Guard head->next: + hp.GuardPointer(1, nextIdx); + if (nextIdx != first.NextPoolIdx()) { + // Head->next pointer changed by concurrent enqueue + continue; + } + if (firstIdx == lastIdx) { + // Queue might be empty + if (nextIdx == Node_t::UndefinedIndex) { + // Queue is empty + OperationDesc curOp(operationDescriptions[accessorId].Load()); + if (lastIdx == tailIdx.Load() && IsPending(accessorId)) { + OperationDesc newOp( + false, // Set nonpending state + false, + Node_t::UndefinedIndex // Leave undefined, to signal failed dequeue + ); + // CAS without check as possibly another accessor + // already helped this dequeue operation. + index_t curOpRaw = curOp.Raw; + operationDescriptions[accessorId].CompareAndSwap(curOpRaw, newOp.Raw); + } + } else { + // Head is advanced because of unfinished enqueue, so + // help other enqueue and retry: + HelpFinishEnqueue(); + } + } else { + // Queue is not empty + OperationDesc curOp(operationDescriptions[accessorId].Load()); + index_t nodeIdx = curOp.NodeIndex; + if (!IsPending(accessorId)) { + // Accessor not pending because another thread completed this + // operation already, done. + break; + } + if (firstIdx == headIdx.Load() && nodeIdx != firstIdx) { + OperationDesc newOp( + true, + false, + firstIdx // Set node index + ); + index_t curOpRaw = curOp.Raw; + if (!operationDescriptions[accessorId].CompareAndSwap( + curOpRaw, newOp.Raw)) { + // This loop is wait-free as every accessor's operation + // will be pending for a limited time only, so continue + // and retry is okay. + // This CAS can only have failed because another + // thread completed this dequeue operation in the + // meantime. + continue; // Retry + } + } + // The following CAS also happens if + // + // firstIdx != headIdx.Load() || nodeIdx == firstIdx + // + // In this case, HelpFinishDequeue will complete the dequeue. + index_t curDeqAID = Node_t::UndefinedIndex; + // Register this accessor as dequeuer of this node. + // If this CAS fails, another accessor is already (perhaps + // helping) dequeueing this node winning this CAS. In this + // case this dequeue operation is just ignored. + if (!first.DequeueAID().CompareAndSwap(curDeqAID, accessorId)) { + continue; + // Lost CAS to helping accessor. In this case, this operation + // is not pending anymore at this point. + } + HelpFinishDequeue(); + } + } // while pending +} + +template< + typename Type, class NodeAllocator, class OpAllocator, class ValuePool > +void WaitFreeMPMCQueue:: +HelpFinishDequeue() { + index_t firstIdx = headIdx.Load(); + // Guard head: + hp.GuardPointer(0, firstIdx); + Node_t & first = nodePool[firstIdx]; + index_t nextIdx = first.NextPoolIdx(); + // Guard and head->next: + hp.GuardPointer(1, nextIdx); + index_t accessorId = first.DequeueAID().Load(); + if (accessorId != Node_t::UndefinedIndex) { + OperationDesc curOp(operationDescriptions[accessorId].Load()); + if (firstIdx == headIdx.Load() && + nextIdx != Node_t::UndefinedIndex) { + // Set state of helped operation to NONPENDING: + OperationDesc newOp( + false, // nonpending + false, // dequeue + curOp.NodeIndex + ); + // CAS without check as possibly another accessor + // already helped this dequeue operation. + index_t curOpRaw = curOp.Raw; + operationDescriptions[accessorId].CompareAndSwap(curOpRaw, newOp.Raw); + headIdx.CompareAndSwap(firstIdx, nextIdx); + } + } +} + +template< + typename Type, class NodeAllocator, class OpAllocator, class ValuePool > +void WaitFreeMPMCQueue:: +Help() { + // Fairness guarantee in every thread: + // "Every other thread will help my operation before helping its + // own operation" + // Number of total operations in operation description buffer to be + // helped before engaging in own announced operation + index_t numHelpOps = static_cast(num_states); + index_t ownAccessorId = Node_t::UndefinedIndex; + LoadAccessorThreadIndex(ownAccessorId); + // Start helping accessor with next id, ensuring that own accessor id + // will be used in last iteration of help loop: + index_t startAccessorId = (ownAccessorId + 1) % num_states; + for (unsigned int accessorId = startAccessorId; + numHelpOps > 0; + ++accessorId, --numHelpOps) { + OperationDesc desc( + operationDescriptions[accessorId % num_states].Load()); + if (desc.Pending) { + if (desc.Enqueue) { + HelpEnqueue(accessorId % num_states); + } else { + HelpDequeue(accessorId % num_states); + } + } + } +} + +template< + typename Type, class NodeAllocator, class OpAllocator, class ValuePool > +void WaitFreeMPMCQueue:: +DeleteNodeCallback(index_t releasedNodeIndex) { + if (sentinelNodeIndex == releasedNodeIndex) { + // Prevent sentinel node from reuse, it will be returned to pool + // in destructor only. + return; + } + if (NodeIsPending(releasedNodeIndex)) { + return; + } + nodePool.Free(static_cast(releasedNodeIndex)); +} + +template< + typename Type, class NodeAllocator, class OpAllocator, class ValuePool > +inline size_t WaitFreeMPMCQueue:: +GetCapacity() { + return max_size_; +} + +template< + typename Type, class NodeAllocator, class OpAllocator, class ValuePool > +inline bool WaitFreeMPMCQueue:: +IsPending(unsigned int accessorId) { + OperationDesc opDesc(operationDescriptions[accessorId].Load()); + return opDesc.Pending; +} + +template< + typename Type, class NodeAllocator, class OpAllocator, class ValuePool > +bool WaitFreeMPMCQueue:: +NodeIsPending(index_t nodeId) { + // Returns true if given node index is involved in any announced operation, + // including non-pending operations, as dequeue-operations are set to + // non-pending before returning the node's value. Dequeue operations are + // finalized by setting their node index to UndefinedIndex. + for (unsigned int accessorId = 0; accessorId < num_states; ++accessorId) { + OperationDesc desc(operationDescriptions[accessorId].Load()); + if (desc.NodeIndex == nodeId) { + return true; + } + } + return false; +} + +} // namespace containers +} // namespace embb + +#endif // EMBB_CONTAINERS_INTERNAL_WAIT_FREE_MPMC_QUEUE_INL_H_ diff --git a/containers_cpp/include/embb/containers/lock_free_mpmc_queue.h b/containers_cpp/include/embb/containers/lock_free_mpmc_queue.h index 37c5439..b8b28cb 100644 --- a/containers_cpp/include/embb/containers/lock_free_mpmc_queue.h +++ b/containers_cpp/include/embb/containers/lock_free_mpmc_queue.h @@ -29,13 +29,10 @@ #include #include - #include #include #include - #include -#include namespace embb { namespace containers { @@ -98,6 +95,7 @@ class LockFreeMPMCQueueNode { * \ingroup CPP_CONTAINERS_QUEUES * * \see WaitFreeSPSCQueue + * \see WaitFreeMPMCQueue * * \tparam Type Type of the queue elements * \tparam ValuePool Type of the value pool used as basis for the ObjectPool diff --git a/containers_cpp/include/embb/containers/object_pool.h b/containers_cpp/include/embb/containers/object_pool.h index 0a94708..458371e 100644 --- a/containers_cpp/include/embb/containers/object_pool.h +++ b/containers_cpp/include/embb/containers/object_pool.h @@ -79,32 +79,6 @@ class ObjectPool { */ ValuePool p; - /** - * Helper providing a virtual iterator that just returns true in each - * iteration step. Used for filling the value pool. Implements the normal - * C++ iterator concept. Not further documented here. - */ - class ReturningTrueIterator { - public: - typedef ReturningTrueIterator self_type; - typedef bool value_type; - typedef bool& reference; - typedef bool* pointer; - typedef std::forward_iterator_tag iterator_category; - typedef int difference_type; - explicit ReturningTrueIterator(size_t count_value); - self_type operator++(); - self_type operator++(int); - reference operator*(); - pointer operator->(); - bool operator==(const self_type& rhs); - bool operator!=(const self_type& rhs); - - private: - size_t count_value; - bool ret_value; - }; - bool IsContained(const Type &obj) const; int GetIndexOfObject(const Type &obj) const; Type* AllocateRaw(); diff --git a/containers_cpp/include/embb/containers/wait_free_mpmc_queue.h b/containers_cpp/include/embb/containers/wait_free_mpmc_queue.h new file mode 100644 index 0000000..d5f73cc --- /dev/null +++ b/containers_cpp/include/embb/containers/wait_free_mpmc_queue.h @@ -0,0 +1,425 @@ +/* + * Copyright (c) 2014, Siemens AG. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef EMBB_CONTAINERS_WAIT_FREE_MPMC_QUEUE_H_ +#define EMBB_CONTAINERS_WAIT_FREE_MPMC_QUEUE_H_ + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace embb { +namespace containers { + +namespace internal { + +/** + * Queue node + * + * Single linked list, contains the element (\c value) and a pool index + * referencing the next node (\c next_idx). + * + * \tparam ElementT Element type + */ +template +class WaitFreeMPMCQueueNode { + private: + typedef WaitFreeMPMCQueueNode self_t; + + public: + typedef uint32_t index_t; + static const index_t UndefinedIndex; + + private: + /** + * Node value + */ + Type value; + + /** + * Enqeue accessor id + */ + uint32_t enq_aid; + + /** + * Pool-index of next Node in list (atomic), -1 for none + */ + embb::base::Atomic next_idx; + + /** + * Dequeue accessor id (atomic), -1 for none + */ + embb::base::Atomic deq_aid; + + public: + /** + * Default constructor + */ + inline WaitFreeMPMCQueueNode(); + + /** + * Copy constructor + */ + inline WaitFreeMPMCQueueNode( + const self_t & other + /**< [IN] Instance to copy */ + ); + + /** + * Assignment operator + */ + inline self_t & operator=( + const self_t & other + /**< [IN] Instance to assign */ + ); + + /** + * Constructs a new instance of Node. + */ + inline WaitFreeMPMCQueueNode( + Type const val, + /**< [IN] Value to be contained in the Node instance */ + uint32_t enqAid + /**< [IN] Enqueuer accessor id */ + ); + + inline Type Value() const; + + inline uint32_t NextPoolIdx() const; + + /** + * Set pointer to next Node element via CAS. + * + * \return true if new pointer value could be set. + */ + inline bool CASNext( + uint32_t expNextIdx, + /**< [IN] Expected current pointer value */ + uint32_t newNextIdx + /**< [IN] New pointer value to set */ + ); + + inline bool Next_IsNull() const; + + inline uint32_t EnqueueAID() const; + + inline embb::base::Atomic & DequeueAID(); +}; + +} // namespace internal + +/** + * Wait-free queue for multiple enqueuers and dequeuers + * + * \concept{CPP_CONCEPTS_QUEUE} + * + * \ingroup CPP_CONTAINERS_QUEUES + * + * \see LockFreeMPMCQueue + * \see WaitFreeSPSCQueue + * + * \tparam Type Type of the queue elements + * \tparam NodeAllocator Allocator type for allocating queue nodes + * \tparam OpAllocator Allocator type for allocating operation description + * objects + * \tparam ValuePool Type of the value pool used as basis for the ObjectPool + * which stores queue nodes + */ +template< + typename Type, + class NodeAllocator = + embb::base::AllocatorCacheAligned >, + class OpAllocator = + embb::base::AllocatorCacheAligned >, + class ValuePool = + WaitFreeArrayValuePool +> +class WaitFreeMPMCQueue { + private: + typedef internal::WaitFreeMPMCQueueNode Node_t; + typedef typename internal::WaitFreeMPMCQueueNode::index_t index_t; + typedef WaitFreeMPMCQueue self_t; + typedef internal::IndexedObjectPool< + internal::WaitFreeMPMCQueueNode, ValuePool, NodeAllocator> NodePool_t; + + private: + /** + * Maximum size of queue. Using maximum value of + * OperationDesc::NodeIndex (15 bit) minus one element + * required for sentinel node. + */ + static const index_t QUEUE_SIZE_MAX = static_cast(32767 - 1); + + /** + * Number of guards per thread + */ + static const size_t num_guards = 2; + + /** + * Null-pointer for hazard pointers + */ + static const index_t UndefinedGuard = 0; + + /** + * Helper class for operation descriptions. + * Used instead of bit-field struct for portability. + */ + class OperationDesc { + private: + static const index_t PENDING_FLAG_MASK = 0x80000000; ///< Bit 32 + static const index_t ENQUEUE_FLAG_MASK = 0x40000000; ///< Bit 31 + static const index_t NODE_INDEX_MASK = 0x3FFFFFFF; ///< Bit 30..1 + + private: + OperationDesc(); + + public: + bool Pending; + bool Enqueue; + index_t NodeIndex; + index_t Raw; + + public: + /** + * Converts state to binary value { pending:1, enqueue:1, nodeIndex:30 } + */ + OperationDesc( + bool pending, + bool enqueue, + index_t nodeIndex); + + /** + * Expects binary value { pending:1, enqueue:1, nodeIndex:30 } + */ + explicit OperationDesc(index_t raw); + }; + + /** Index of head node in node pool, should be aligned */ + embb::base::Atomic headIdx; + /** Index of tail node in node pool, should be aligned */ + embb::base::Atomic tailIdx; + /** Maximum element capacity of the queue */ + size_t max_size_; + /** Callback instance for release of guarded node indices */ + embb::base::Function< void, index_t > delete_pointer_callback; + /** Hazard pointer for node index (guards stack top pointer) */ + embb::containers::internal::HazardPointer< index_t > hp; + /** Instance of empty node used as sentinel */ + Node_t nullNode; + /** Maximum number of threads accessing this queue instance */ + size_t num_states; + /** Capacity of the node pool, includes overhead due to hazard pointers */ + size_t node_pool_size; + /** Array containing two states for every concurrent accessor on the + queue (needed for swapping) */ + embb::base::Atomic * operationDescriptions; + /** Pool for element nodes in the queue */ + NodePool_t nodePool; + /** Allocator for memory used for operation descriptions */ + OpAllocator operationDescriptionAllocator; + /** Index of sentinel node, stored for release in destructor */ + index_t sentinelNodeIndex; + + private: + /** + * Resolves thread index usable as accessor id. + * + * \returns True if thread index could be resolved, false otherwise. + * A call could fail, e.g. if there have been created more + * threads than initially configured in the embb runtime. + */ + inline bool LoadAccessorThreadIndex( + index_t & retIndexValue + /**< [OUT] Value of thread index */ + ); + + /** + * Resolves maximum required capacity for retired lists in hazard + * pointer implementation + */ + inline size_t RetiredListMaxSize( + size_t nThreads + /**< [IN] maximum number of threads operating on the queue */ + ); + + public: + /** + * Creates a queue with the specified capacity. + * + * \memory + * Let \c t be the maximum number of threads and \c x be 2.5*t+1. + * Then, x*(3*t+1) elements of size sizeof(void*), \c x + * elements of size sizeof(Type), and \c capacity+1 elements of size + * sizeof(Type) are allocated. + * + * \notthreadsafe + * + * \see CPP_CONCEPTS_QUEUE + */ + WaitFreeMPMCQueue( + size_t capacity + /**< [IN] Element capacity of the queue */ + ); + + /** + * Destroys the queue. + * + * \notthreadsafe + */ + ~WaitFreeMPMCQueue(); + + /** + * Tries to enqueue an element into the queue. + * + * \return \c true if the element could be enqueued, \c false if the queue is + * full. + * + * \waitfree + * + * \note It might be possible to enqueue more elements into the queue than its + * capacity permits. + * + * \see CPP_CONCEPTS_QUEUE + */ + bool TryEnqueue( + Type const & element + /**< [IN] Const reference to the element that shall be enqueued */ + ); + + /** + * Tries to dequeue an element from the queue. + * + * \return \c true if an element could be dequeued, \c false if the queue is + * empty. + * + * \waitfree + * + * \see CPP_CONCEPTS_QUEUE + */ + bool TryDequeue( + Type & retElement + /**< [IN,OUT] Reference to the dequeued element. Unchanged, if the + operation was not successful. */ + ); + + /** + * Returns the maximum size for instances of this queue. + * + * \waitfree + */ + inline size_t GetCapacity(); + + private: + /** + * Help progressing pending enqueue operations of given accessors + */ + void HelpEnqueue( + unsigned int accessorId + /**< [IN] Accessor id of operations to help */ + ); + + /** + * Help finishing pending enqueue operations of arbitrary accessors, + * including own pending enqueue operations. + * + * \waitfree + */ + void HelpFinishEnqueue(); + + /** + * Help progressing pending dequeue operations of given accessor, + * including own pending dequeue operations. + * + * \waitfree + */ + void HelpDequeue( + index_t accessorId + /**< [IN] Accessor id of operations to help */ + ); + + /** + * Help finishing pending dequeue operations of arbitrary accessors, + * including own pending dequeue operations. + * + * \waitfree + */ + void HelpFinishDequeue(); + + /** + * Help finishing pending operations of arbitrary accessors, including + * own pending operations. + * One operation of every other thread is completed before engaging in the + * calling thread's own announced operation. + * + * \waitfree + */ + void Help(); + + /** + * Whether the accessor with given id is pending. + * + * \returns True if the given accessor has a pending operation. + * + * \waitfree + */ + inline bool IsPending( + unsigned int accessorId + /**< [IN] Accessor id of operations to help */ + ); + + /** + * Whether the node with given index is in the process of being + * dequeued. Prevents reclamation of node id when it cannot be + * guarded by hazard pointers, between HelpDequeue and HelpFinishDequeue. + */ + bool NodeIsPending( + index_t nodeId + /**< [IN] Pool index of the node instance to test */ + ); + + /** + * Callback used for deallocating a node index from hazard + * pointers. Frees the associated node in the pool. + * + * \waitfree + */ + void DeleteNodeCallback( + index_t releasedNodeIndex + /**< [IN] Pool index of the node instance to release */ + ); +}; + +} // namespace containers +} // namespace embb + +#include + +#endif // EMBB_CONTAINERS_WAIT_FREE_MPMC_QUEUE_H_ diff --git a/containers_cpp/include/embb/containers/wait_free_spsc_queue.h b/containers_cpp/include/embb/containers/wait_free_spsc_queue.h index 30f7235..dddaf5d 100644 --- a/containers_cpp/include/embb/containers/wait_free_spsc_queue.h +++ b/containers_cpp/include/embb/containers/wait_free_spsc_queue.h @@ -113,6 +113,7 @@ namespace containers { * \ingroup CPP_CONTAINERS_QUEUES * * \see LockFreeMPMCQueue + * \see WaitFreeMPMCQueue * * \tparam Type Type of the queue elements * \tparam Allocator Allocator type for allocating queue elements. diff --git a/containers_cpp/test/main.cc b/containers_cpp/test/main.cc index 0e26fee..a0fd1b4 100644 --- a/containers_cpp/test/main.cc +++ b/containers_cpp/test/main.cc @@ -26,10 +26,11 @@ #include #include -#include #include #include #include +#include +#include #include #include @@ -47,6 +48,7 @@ using embb::containers::WaitFreeArrayValuePool; using embb::containers::LockFreeTreeValuePool; using embb::containers::WaitFreeSPSCQueue; using embb::containers::LockFreeMPMCQueue; +using embb::containers::WaitFreeMPMCQueue; using embb::containers::LockFreeStack; using embb::containers::LockFreeTreeValuePool; using embb::containers::WaitFreeArrayValuePool; @@ -60,16 +62,20 @@ PT_MAIN("Data Structures C++") { unsigned int max_threads = static_cast( 2 * partest::TestSuite::GetDefaultNumThreads()); embb_thread_set_max_count(max_threads); - +#if 0 PT_RUN(PoolTest< WaitFreeArrayValuePool >); PT_RUN(PoolTest< LockFreeTreeValuePool >); PT_RUN(HazardPointerTest); PT_RUN(QueueTest< WaitFreeSPSCQueue< ::std::pair > >); PT_RUN(QueueTest< LockFreeMPMCQueue< ::std::pair > COMMA true COMMA true >); +#endif + PT_RUN(QueueTest< WaitFreeMPMCQueue< ::std::pair > + COMMA true COMMA true >); +#if 0 PT_RUN(StackTest< LockFreeStack >); PT_RUN(ObjectPoolTest< LockFreeTreeValuePool >); PT_RUN(ObjectPoolTest< WaitFreeArrayValuePool >); - - PT_EXPECT(embb_get_bytes_allocated() == 0); +#endif + PT_EXPECT_EQ(embb_get_bytes_allocated(), static_cast(0)); } -- libgit2 0.26.0