Commit 3fe8c475 by Tobias Fuchs

containers_cpp: added WaitFreeMPMCQueue

parent 5abeb065
/*
* Copyright (c) 2014, Siemens AG. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef EMBB_CONTAINERS_INTERNAL_INDEXED_OBJECT_POOL_INL_H_
#define EMBB_CONTAINERS_INTERNAL_INDEXED_OBJECT_POOL_INL_H_
#include <embb/containers/internal/returning_true_iterator.h>
namespace embb {
namespace containers {
namespace internal {
template<typename T, class IndexPool, class Allocator>
template<typename RAI>
IndexedObjectPool<T, IndexPool, Allocator>::
IndexedObjectPool(RAI first, RAI last) :
size_(static_cast<size_t>(std::distance(first, last))),
indexPool(internal::ReturningTrueIterator(0),
internal::ReturningTrueIterator(size_)) {
// use the allocator to allocate array of size dist
elements = allocator.allocate(size_);
// fill element pool with elements from the iteration
int i = 0;
for (RAI curIter(first); curIter != last; ++curIter, ++i) {
// assign element from iteration
elements[i] = *curIter;
}
}
template<typename T, class IndexPool, class Allocator>
IndexedObjectPool<T, IndexPool, Allocator >::
IndexedObjectPool(size_t size, const T & defaultInstance) :
size_(size),
indexPool(internal::ReturningTrueIterator(0),
internal::ReturningTrueIterator(size_)) {
// use the allocator to allocate array of size dist
elements = allocator.allocate(size);
// fill element pool with elements from the iteration
for (size_t i = 0; i < size_; ++i) {
// initialize element from default constructor and
// assignment operator
elements[i] = defaultInstance;
}
}
template<typename T, class IndexPool, class Allocator>
IndexedObjectPool<T, IndexPool, Allocator >::
~IndexedObjectPool() {
allocator.deallocate(elements, size_);
}
template<typename T, class IndexPool, class Allocator>
int IndexedObjectPool<T, IndexPool, Allocator >::
Allocate(T & element) {
// Reserve a pool index:
bool reservedFlag;
int index = indexPool.Allocate(reservedFlag);
// Assign element to be allocated at pool index.
// Index returned from index pool is -1 if no index
// is available.
if (index >= 0) {
element = elements[index];
}
return index;
}
template<typename T, class IndexPool, class Allocator>
void IndexedObjectPool<T, IndexPool, Allocator >::
Free(int elementIndex) {
// Call the referenced element's destructor:
elements[elementIndex].~T();
// Release index of the element for reuse:
indexPool.Free(true, elementIndex);
}
template<typename T, class IndexPool, class Allocator>
T & IndexedObjectPool<T, IndexPool, Allocator >::
operator[](size_t elementIndex) {
return elements[elementIndex];
}
} // namespace internal
} // namespace containers
} // namespace embb
#endif // EMBB_CONTAINERS_INTERNAL_INDEXED_OBJECT_POOL_INL_H_
/*
* Copyright (c) 2014, Siemens AG. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef EMBB_CONTAINERS_INTERNAL_INDEXED_OBJECT_POOL_H_
#define EMBB_CONTAINERS_INTERNAL_INDEXED_OBJECT_POOL_H_
#include <embb/base/atomic.h>
#include <embb/base/memory_allocation.h>
#include <embb/containers/lock_free_tree_value_pool.h>
#include <embb/containers/wait_free_array_value_pool.h>
namespace embb {
namespace containers {
namespace internal {
template<
typename T,
class IndexPool = LockFreeTreeValuePool<bool, false>,
class Allocator = embb::base::Allocator<T>
>
class IndexedObjectPool {
private:
const size_t size_;
T * elements;
Allocator allocator;
IndexPool indexPool;
IndexedObjectPool();
// Prevent copy-construction
IndexedObjectPool(const IndexedObjectPool&);
// Prevent assignment
IndexedObjectPool& operator=(const IndexedObjectPool&);
public:
/**
* \see value_pool_concept
*
* \notthreadsafe
*
* \memory dynamically allocates
* \c n * (sizeof(T) + sizeof(embb::base::Atomic<bool>))
* bytes, where \c n is the number of elements in the pool.
*/
template<typename RAI>
IndexedObjectPool(
RAI first,
/**< [IN] first iterator to elements the pool is filled with */
RAI last
/**< [IN] last iterator to elements the pool is filled with */
);
/**
* \see value_pool_concept
*
* \notthreadsafe
*
* \memory dynamically allocates
* \c n * (sizeof(T) + sizeof(embb::base::Atomic<bool>))
* bytes, where \c n is the number of elements in the pool.
*/
IndexedObjectPool(
size_t size,
/**< [IN] Number of elements the pool is filled with */
const T & defaultInstance
/**< [IN] Default instance to initialize pool elements with */
);
/**
* Destructor, deallocating memory
*/
~IndexedObjectPool();
/**
* Request element and index from pool.
*
* \return index of element
*
* \see object_pool_concept
*
*/
int Allocate(T & element);
/**
* Return element and index to the pool.
*
* \see value_pool_concept
*
*/
void Free(int elementIndex);
/**
* Return element from the pool at given index.
*
* \see object_pool_concept
*
*/
T & operator[](size_t elementIndex);
};
} // namespace internal
} // namespace containers
} // namespace embb
#include <embb/containers/internal/indexed_object_pool-inl.h>
#endif // EMBB_CONTAINERS_INTERNAL_INDEXED_OBJECT_POOL_H_
...@@ -27,58 +27,10 @@ ...@@ -27,58 +27,10 @@
#ifndef EMBB_CONTAINERS_INTERNAL_OBJECT_POOL_INL_H_ #ifndef EMBB_CONTAINERS_INTERNAL_OBJECT_POOL_INL_H_
#define EMBB_CONTAINERS_INTERNAL_OBJECT_POOL_INL_H_ #define EMBB_CONTAINERS_INTERNAL_OBJECT_POOL_INL_H_
#include <embb/containers/internal/returning_true_iterator.h>
namespace embb { namespace embb {
namespace containers { namespace containers {
template<class Type, typename ValuePool, class ObjectAllocator>
ObjectPool<Type, ValuePool, ObjectAllocator>::
ReturningTrueIterator::ReturningTrueIterator(size_t count_value) :
count_value(count_value),
ret_value(true)
{}
template<class Type, typename ValuePool, class ObjectAllocator>
typename ObjectPool<Type, ValuePool, ObjectAllocator>::
ReturningTrueIterator::self_type
ObjectPool<Type, ValuePool, ObjectAllocator>::
ReturningTrueIterator::operator++() {
self_type i = *this;
count_value++;
return i;
}
template<class Type, typename ValuePool, class ObjectAllocator>
typename ObjectPool<Type, ValuePool, ObjectAllocator>::
ReturningTrueIterator::self_type ObjectPool<Type, ValuePool, ObjectAllocator>::
ReturningTrueIterator::operator++(int) {
count_value++;
return *this;
}
template<class Type, typename ValuePool, class ObjectAllocator>
typename ObjectPool<Type, ValuePool, ObjectAllocator>::
ReturningTrueIterator::reference ObjectPool<Type, ValuePool, ObjectAllocator>::
ReturningTrueIterator::operator*() {
return ret_value;
}
template<class Type, typename ValuePool, class ObjectAllocator>
typename ObjectPool<Type, ValuePool, ObjectAllocator>::
ReturningTrueIterator::pointer ObjectPool<Type, ValuePool, ObjectAllocator>::
ReturningTrueIterator::operator->() {
return &ret_value;
}
template<class Type, typename ValuePool, class ObjectAllocator>
bool ObjectPool<Type, ValuePool, ObjectAllocator>::
ReturningTrueIterator::operator==(const self_type& rhs) {
return count_value == rhs.count_value;
}
template<class Type, typename ValuePool, class ObjectAllocator>
bool ObjectPool<Type, ValuePool, ObjectAllocator>::
ReturningTrueIterator::operator!=(const self_type& rhs) {
return count_value != rhs.count_value;
}
template<class Type, typename ValuePool, class ObjectAllocator> template<class Type, typename ValuePool, class ObjectAllocator>
bool ObjectPool<Type, ValuePool, ObjectAllocator>:: bool ObjectPool<Type, ValuePool, ObjectAllocator>::
...@@ -118,7 +70,8 @@ size_t ObjectPool<Type, ValuePool, ObjectAllocator>::GetCapacity() { ...@@ -118,7 +70,8 @@ size_t ObjectPool<Type, ValuePool, ObjectAllocator>::GetCapacity() {
template<class Type, typename ValuePool, class ObjectAllocator> template<class Type, typename ValuePool, class ObjectAllocator>
ObjectPool<Type, ValuePool, ObjectAllocator>::ObjectPool(size_t capacity) : ObjectPool<Type, ValuePool, ObjectAllocator>::ObjectPool(size_t capacity) :
capacity(capacity), capacity(capacity),
p(ReturningTrueIterator(0), ReturningTrueIterator(capacity)) { p(internal::ReturningTrueIterator(0),
internal::ReturningTrueIterator(capacity)) {
// Allocate the objects (without construction, just get the memory) // Allocate the objects (without construction, just get the memory)
objects = objectAllocator.allocate(capacity); objects = objectAllocator.allocate(capacity);
} }
......
/*
* Copyright (c) 2014, Siemens AG. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef EMBB_CONTAINERS_INTERNAL_RETURNING_TRUE_ITERATOR_H_
#define EMBB_CONTAINERS_INTERNAL_RETURNING_TRUE_ITERATOR_H_
#include <iterator>
namespace embb {
namespace containers {
namespace internal {
/**
* Helper providing a virtual iterator that just returns true in each
* iteration step. Used for filling the value pool. Implements the normal
* C++ iterator concept. Not further documented here.
*/
class ReturningTrueIterator {
public:
typedef ReturningTrueIterator self_type;
typedef bool value_type;
typedef bool & reference;
typedef bool * pointer;
typedef ::std::forward_iterator_tag iterator_category;
typedef int difference_type;
public:
inline explicit ReturningTrueIterator(size_t count_value) :
count_value(count_value),
ret_value(true)
{ }
inline self_type operator++() {
self_type i = *this;
count_value++;
return i;
}
inline self_type operator++(int) {
count_value++;
return *this;
}
inline reference operator*() {
return ret_value;
}
inline pointer operator->() {
return &ret_value;
}
inline bool operator==(const self_type & rhs) {
return count_value == rhs.count_value;
}
inline bool operator!=(const self_type & rhs) {
return count_value != rhs.count_value;
}
inline difference_type operator-(const self_type & rhs) {
return static_cast<difference_type>(count_value) -
static_cast<difference_type>(rhs.count_value);
}
private:
size_t count_value;
bool ret_value;
};
} // namespace internal
} // namespace containers
} // namespace embb
#endif // EMBB_CONTAINERS_INTERNAL_RETURNING_TRUE_ITERATOR_H_
/*
* Copyright (c) 2014, Siemens AG. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef EMBB_CONTAINERS_INTERNAL_WAIT_FREE_MPMC_QUEUE_INL_H_
#define EMBB_CONTAINERS_INTERNAL_WAIT_FREE_MPMC_QUEUE_INL_H_
#include <embb/base/atomic.h>
#include <embb/base/function.h>
#include <embb/base/c/internal/thread_index.h>
namespace embb {
namespace containers {
namespace internal {
template<typename Type>
inline WaitFreeMPMCQueueNode<Type>::WaitFreeMPMCQueueNode()
: enq_aid(UndefinedIndex) {
next_idx.Store(UndefinedIndex);
deq_aid.Store(UndefinedIndex);
}
template<typename Type>
inline WaitFreeMPMCQueueNode<Type>::WaitFreeMPMCQueueNode(
const self_t & other)
: value(other.value),
enq_aid(other.enq_aid) {
next_idx.Store(other.next_idx.Load());
deq_aid.Store(other.deq_aid.Load());
}
template<typename Type>
inline WaitFreeMPMCQueueNode<Type>::WaitFreeMPMCQueueNode(
Type const val, uint32_t enqAid)
: value(val), enq_aid(enqAid) {
next_idx.Store(UndefinedIndex);
deq_aid.Store(UndefinedIndex);
}
template<typename Type>
inline Type WaitFreeMPMCQueueNode<Type>::Value() const {
return value;
}
template<typename Type>
inline uint32_t WaitFreeMPMCQueueNode<Type>::NextPoolIdx() const {
return next_idx.Load();
}
template<typename Type>
inline WaitFreeMPMCQueueNode<Type> &
WaitFreeMPMCQueueNode<Type>::operator=(
const self_t & other) {
if (this != &other) {
next_idx.Store(other.next_idx.Load());
deq_aid.Store(other.deq_aid.Load());
value = other.value;
enq_aid = other.enq_aid;
}
return *this;
}
template<typename Type>
bool WaitFreeMPMCQueueNode<Type>::CASNext(
uint32_t expNextIdx, uint32_t newNextIdx) {
return next_idx.CompareAndSwap(expNextIdx, newNextIdx);
}
template<typename Type>
bool WaitFreeMPMCQueueNode<Type>::Next_IsNull() const {
return next_idx.Load() == UndefinedIndex;
}
template<typename Type>
inline uint32_t WaitFreeMPMCQueueNode<Type>::EnqueueAID() const {
return enq_aid;
}
template<typename Type>
embb::base::Atomic<uint32_t> & WaitFreeMPMCQueueNode<Type>::DequeueAID() {
return deq_aid;
}
/// Using maximum value of OperationDesc::NodeIndex (15 bit) to
/// represent 'undefined'.
template<typename Type>
const uint32_t WaitFreeMPMCQueueNode<Type>::UndefinedIndex = 0x3fffffff;
} // namespace internal
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::OperationDesc::
OperationDesc(
bool pending,
bool enqueue,
index_t nodeIndex) :
Pending(pending),
Enqueue(enqueue),
NodeIndex(nodeIndex),
Raw(0) {
index_t nodeIndexMask = NodeIndex & NODE_INDEX_MASK;
if (Pending) {
Raw |= PENDING_FLAG_MASK;
}
if (Enqueue) {
Raw |= ENQUEUE_FLAG_MASK;
}
Raw |= nodeIndexMask;
}
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::OperationDesc::
OperationDesc(index_t raw) : Raw(raw) {
Pending = (raw & PENDING_FLAG_MASK) ? true : false;
Enqueue = (raw & ENQUEUE_FLAG_MASK) ? true : false;
NodeIndex = (raw & NODE_INDEX_MASK);
}
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::
WaitFreeMPMCQueue(size_t capacity)
: max_size_(capacity),
// Disable "this is used in base member initializer" warning.
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable:4355)
#endif
delete_pointer_callback(*this, &self_t::DeleteNodeCallback),
#ifdef _MSC_VER
#pragma warning(pop)
#endif
hp(delete_pointer_callback, UndefinedGuard, 2),
// Using int for numThreads so compiler warning is
// raised when size and numThreads are switched by
// mistake.
num_states(embb::base::Thread::GetThreadsMaxCount()),
// Node pool size, with respect to the maximum number of
// retired nodes not eligible for reuse due to hazard pointers:
node_pool_size(
// numThreads caused trouble here
(hp.GetRetiredListMaxSize() *
embb::base::Thread::GetThreadsMaxCount()) +
max_size_ + 1),
nodePool(node_pool_size, nullNode) {
// Assert width of binary representation of operation description
assert(sizeof(index_t) == 4);
if (max_size_ > QUEUE_SIZE_MAX) {
EMBB_THROW(embb::base::NoMemoryException,
"Maximum size of queue exceeded");
}
// Allocate sentinel node:
Node_t sentinelNode;
int sentinelNodePoolIndex = nodePool.Allocate(sentinelNode);
if (sentinelNodePoolIndex < 0) {
EMBB_THROW(embb::base::NoMemoryException,
"Allocation of sentinel node failed");
}
// No need to guard sentinel node, it is prevented from reuse
// in the hazard pointers' delete callback (see DeleteNodeCallback).
sentinelNodeIndex = static_cast<index_t>(sentinelNodePoolIndex);
headIdx.Store(sentinelNodeIndex);
tailIdx.Store(sentinelNodeIndex);
// State of the queue is one operation description per queue accessor.
// Initialize clear state: Null-operarion for every accessor.
operationDescriptions = operationDescriptionAllocator.allocate(num_states);
for (size_t accessorId = 0; accessorId < num_states; ++accessorId) {
OperationDesc op(
false, // nonpending
true, // enqueue, should not matter
Node_t::UndefinedIndex // node index
);
// As all operation descriptions are the same,
// we do not have to map accessor ids to operation
// pool indices. Every accessor will randomly grab
// an operation pool element and stick to it, as
// a threads accessor id will not change.
operationDescriptions[accessorId].Store(op.Raw);
}
}
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::
~WaitFreeMPMCQueue() {
// Dequeue until queue is empty:
Type val;
// Delete internally managed memory regions:
operationDescriptionAllocator.deallocate(
operationDescriptions,
num_states);
}
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
inline bool WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::
LoadAccessorThreadIndex(index_t & retIndexValue) {
unsigned int tmpIndexValue; // For conversion size32_t <-> unsigned int
if (embb_internal_thread_index(&tmpIndexValue) == EMBB_SUCCESS) {
// Fail if thread index is not in range of number of accessors:
if (tmpIndexValue < num_states) {
retIndexValue = tmpIndexValue;
return true;
}
return false;
}
retIndexValue = Node_t::UndefinedIndex;
return false;
}
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
inline size_t WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::
RetiredListMaxSize(size_t nThreads) {
return static_cast<size_t>(
1.25 *
static_cast<double>(nThreads) * static_cast<double>(num_guards)) + 1;
}
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
bool WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::
TryEnqueue(Type const & element) {
index_t accessorId = Node_t::UndefinedIndex;
if (!LoadAccessorThreadIndex(accessorId)) {
EMBB_THROW(embb::base::ErrorException,
"Invalid thread ID.");
}
// Register new node in pool:
Node_t poolNode;
int nodeIndex = nodePool.Allocate(poolNode);
if (nodeIndex < 0) {
return false; // Queue is at capacity
}
// Initialize node in pool:
Node_t newNode(element, accessorId);
nodePool[static_cast<index_t>(nodeIndex)] = newNode;
OperationDesc enqOp(
true, // pending
true, // enqueue
static_cast<index_t>(nodeIndex)
);
operationDescriptions[accessorId].Store(enqOp.Raw);
Help();
HelpFinishEnqueue();
return true;
}
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
bool WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::
TryDequeue(Type & retElement) {
index_t accessorId = static_cast<index_t>(-1);
if (!LoadAccessorThreadIndex(accessorId)) {
EMBB_THROW(embb::base::ErrorException,
"Invalid thread ID. Verify embb::base::Thread::thread_max_count.");
}
OperationDesc curOp(operationDescriptions[accessorId].Load());
// Assert that current operation of this accessor is completed:
assert(!curOp.Pending);
// Register new operation description for CAS:
OperationDesc newOp(
true, // pending
false, // dequeue
Node_t::UndefinedIndex // node index
);
index_t curOpRaw = curOp.Raw;
if (!operationDescriptions[accessorId].CompareAndSwap(curOpRaw, newOp.Raw)) {
// The threads own operation has changed,
// should not happen.
assert(false);
}
Help();
HelpFinishDequeue();
// Accessor's operation Idx might have changed in between.
curOp = OperationDesc(operationDescriptions[accessorId].Load());
// Operation announced by this thread must not be pending any more:
assert(!curOp.Pending);
// Check element
index_t nodeIdx = curOp.NodeIndex;
Node_t & node = nodePool[nodeIdx];
if (nodeIdx == Node_t::UndefinedIndex) {
// Allow dequeueing from empty queue, but
// return false:
retElement = Type();
return false;
}
index_t deqNodeIdx = node.NextPoolIdx();
retElement = nodePool[deqNodeIdx].Value();
// Mark node as non-pending by setting node index to UndefinedIndex:
OperationDesc noOp(
false, // non-pending
false, // any
Node_t::UndefinedIndex // no node index
);
curOp = OperationDesc(operationDescriptions[accessorId].Load());
curOpRaw = curOp.Raw;
if (!operationDescriptions[accessorId].CompareAndSwap(curOpRaw, noOp.Raw)) {
// The threads own operation has changed,
// should not happen.
assert(false);
}
hp.EnqueuePointerForDeletion(nodeIdx);
return true;
}
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
void WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::
HelpEnqueue(unsigned int accessorId) {
while (IsPending(accessorId)) {
index_t lastIdx = tailIdx.Load();
// Guard tail:
hp.GuardPointer(0, lastIdx);
// Last node still is tail:
if (lastIdx == tailIdx.Load()) {
Node_t & lastNode = nodePool[lastIdx];
index_t nextIdx = lastNode.NextPoolIdx();
// tail.next is null (no pending enqueue on tail):
if (lastNode.NextPoolIdx() == Node_t::UndefinedIndex) {
// Apply enqueue.
// No other accessor helped this enqueue operation yet:
if (IsPending(accessorId)) {
// Set next-pointer of last element in list
OperationDesc opDesc(operationDescriptions[accessorId].Load());
if (lastNode.CASNext(nextIdx, opDesc.NodeIndex)) {
HelpFinishEnqueue();
return;
}
}
} else {
// Some enqueue operation in progress
HelpFinishEnqueue();
}
}
}
}
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
void WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::
HelpFinishEnqueue() {
// Load node pointed at by tail:
index_t lastIdx = tailIdx.Load();
// Guard tail:
hp.GuardPointer(0, lastIdx);
Node_t & lastNode = nodePool[lastIdx];
index_t nextIdx = lastNode.NextPoolIdx();
// This node is NEXT of tail, but not tail => unfinished ENQ
hp.GuardPointer(1, nextIdx);
if (nextIdx != Node_t::UndefinedIndex) {
Node_t & nextNode = nodePool[nextIdx];
// Load accessor id from last (non-tail) element in list:
index_t helpAID = nextNode.EnqueueAID();
// Load operation for accessor that started the unfinished enqueue:
OperationDesc helpOp(operationDescriptions[helpAID].Load());
// tail index still points at last node:
// (last == tail && state[aid].node == next)
if (lastIdx == tailIdx.Load() &&
helpOp.NodeIndex == nextIdx) {
OperationDesc newOp(
false, // set to nonpending
true, // enqueue
nextIdx // node index == helpOp.NodeIndex
);
index_t helpOpRaw = helpOp.Raw;
operationDescriptions[helpAID].CompareAndSwap(
helpOpRaw,
newOp.Raw);
// Update tail pointer:
tailIdx.CompareAndSwap(lastIdx, nextIdx);
}
}
}
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
void WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::
HelpDequeue(index_t accessorId) {
while (IsPending(accessorId)) {
index_t firstIdx = headIdx.Load();
// Guard head:
hp.GuardPointer(0, firstIdx);
if (firstIdx != headIdx.Load()) {
// Head pointer changed by concurrent enqueue
continue;
}
index_t lastIdx = tailIdx.Load();
Node_t & first = nodePool[firstIdx];
index_t nextIdx = first.NextPoolIdx();
// Guard head->next:
hp.GuardPointer(1, nextIdx);
if (nextIdx != first.NextPoolIdx()) {
// Head->next pointer changed by concurrent enqueue
continue;
}
if (firstIdx == lastIdx) {
// Queue might be empty
if (nextIdx == Node_t::UndefinedIndex) {
// Queue is empty
OperationDesc curOp(operationDescriptions[accessorId].Load());
if (lastIdx == tailIdx.Load() && IsPending(accessorId)) {
OperationDesc newOp(
false, // Set nonpending state
false,
Node_t::UndefinedIndex // Leave undefined, to signal failed dequeue
);
// CAS without check as possibly another accessor
// already helped this dequeue operation.
index_t curOpRaw = curOp.Raw;
operationDescriptions[accessorId].CompareAndSwap(curOpRaw, newOp.Raw);
}
} else {
// Head is advanced because of unfinished enqueue, so
// help other enqueue and retry:
HelpFinishEnqueue();
}
} else {
// Queue is not empty
OperationDesc curOp(operationDescriptions[accessorId].Load());
index_t nodeIdx = curOp.NodeIndex;
if (!IsPending(accessorId)) {
// Accessor not pending because another thread completed this
// operation already, done.
break;
}
if (firstIdx == headIdx.Load() && nodeIdx != firstIdx) {
OperationDesc newOp(
true,
false,
firstIdx // Set node index
);
index_t curOpRaw = curOp.Raw;
if (!operationDescriptions[accessorId].CompareAndSwap(
curOpRaw, newOp.Raw)) {
// This loop is wait-free as every accessor's operation
// will be pending for a limited time only, so continue
// and retry is okay.
// This CAS can only have failed because another
// thread completed this dequeue operation in the
// meantime.
continue; // Retry
}
}
// The following CAS also happens if
//
// firstIdx != headIdx.Load() || nodeIdx == firstIdx
//
// In this case, HelpFinishDequeue will complete the dequeue.
index_t curDeqAID = Node_t::UndefinedIndex;
// Register this accessor as dequeuer of this node.
// If this CAS fails, another accessor is already (perhaps
// helping) dequeueing this node winning this CAS. In this
// case this dequeue operation is just ignored.
if (!first.DequeueAID().CompareAndSwap(curDeqAID, accessorId)) {
continue;
// Lost CAS to helping accessor. In this case, this operation
// is not pending anymore at this point.
}
HelpFinishDequeue();
}
} // while pending
}
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
void WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::
HelpFinishDequeue() {
index_t firstIdx = headIdx.Load();
// Guard head:
hp.GuardPointer(0, firstIdx);
Node_t & first = nodePool[firstIdx];
index_t nextIdx = first.NextPoolIdx();
// Guard and head->next:
hp.GuardPointer(1, nextIdx);
index_t accessorId = first.DequeueAID().Load();
if (accessorId != Node_t::UndefinedIndex) {
OperationDesc curOp(operationDescriptions[accessorId].Load());
if (firstIdx == headIdx.Load() &&
nextIdx != Node_t::UndefinedIndex) {
// Set state of helped operation to NONPENDING:
OperationDesc newOp(
false, // nonpending
false, // dequeue
curOp.NodeIndex
);
// CAS without check as possibly another accessor
// already helped this dequeue operation.
index_t curOpRaw = curOp.Raw;
operationDescriptions[accessorId].CompareAndSwap(curOpRaw, newOp.Raw);
headIdx.CompareAndSwap(firstIdx, nextIdx);
}
}
}
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
void WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::
Help() {
// Fairness guarantee in every thread:
// "Every other thread will help my operation before helping its
// own operation"
// Number of total operations in operation description buffer to be
// helped before engaging in own announced operation
index_t numHelpOps = static_cast<index_t>(num_states);
index_t ownAccessorId = Node_t::UndefinedIndex;
LoadAccessorThreadIndex(ownAccessorId);
// Start helping accessor with next id, ensuring that own accessor id
// will be used in last iteration of help loop:
index_t startAccessorId = (ownAccessorId + 1) % num_states;
for (unsigned int accessorId = startAccessorId;
numHelpOps > 0;
++accessorId, --numHelpOps) {
OperationDesc desc(
operationDescriptions[accessorId % num_states].Load());
if (desc.Pending) {
if (desc.Enqueue) {
HelpEnqueue(accessorId % num_states);
} else {
HelpDequeue(accessorId % num_states);
}
}
}
}
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
void WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::
DeleteNodeCallback(index_t releasedNodeIndex) {
if (sentinelNodeIndex == releasedNodeIndex) {
// Prevent sentinel node from reuse, it will be returned to pool
// in destructor only.
return;
}
if (NodeIsPending(releasedNodeIndex)) {
return;
}
nodePool.Free(static_cast<int>(releasedNodeIndex));
}
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
inline size_t WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::
GetCapacity() {
return max_size_;
}
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
inline bool WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::
IsPending(unsigned int accessorId) {
OperationDesc opDesc(operationDescriptions[accessorId].Load());
return opDesc.Pending;
}
template<
typename Type, class NodeAllocator, class OpAllocator, class ValuePool >
bool WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool>::
NodeIsPending(index_t nodeId) {
// Returns true if given node index is involved in any announced operation,
// including non-pending operations, as dequeue-operations are set to
// non-pending before returning the node's value. Dequeue operations are
// finalized by setting their node index to UndefinedIndex.
for (unsigned int accessorId = 0; accessorId < num_states; ++accessorId) {
OperationDesc desc(operationDescriptions[accessorId].Load());
if (desc.NodeIndex == nodeId) {
return true;
}
}
return false;
}
} // namespace containers
} // namespace embb
#endif // EMBB_CONTAINERS_INTERNAL_WAIT_FREE_MPMC_QUEUE_INL_H_
...@@ -29,13 +29,10 @@ ...@@ -29,13 +29,10 @@
#include <embb/base/atomic.h> #include <embb/base/atomic.h>
#include <embb/base/function.h> #include <embb/base/function.h>
#include <embb/containers/lock_free_tree_value_pool.h> #include <embb/containers/lock_free_tree_value_pool.h>
#include <embb/containers/object_pool.h> #include <embb/containers/object_pool.h>
#include <embb/containers/internal/hazard_pointer.h> #include <embb/containers/internal/hazard_pointer.h>
#include <limits> #include <limits>
#include <stdexcept>
namespace embb { namespace embb {
namespace containers { namespace containers {
...@@ -98,6 +95,7 @@ class LockFreeMPMCQueueNode { ...@@ -98,6 +95,7 @@ class LockFreeMPMCQueueNode {
* \ingroup CPP_CONTAINERS_QUEUES * \ingroup CPP_CONTAINERS_QUEUES
* *
* \see WaitFreeSPSCQueue * \see WaitFreeSPSCQueue
* \see WaitFreeMPMCQueue
* *
* \tparam Type Type of the queue elements * \tparam Type Type of the queue elements
* \tparam ValuePool Type of the value pool used as basis for the ObjectPool * \tparam ValuePool Type of the value pool used as basis for the ObjectPool
......
...@@ -79,32 +79,6 @@ class ObjectPool { ...@@ -79,32 +79,6 @@ class ObjectPool {
*/ */
ValuePool p; ValuePool p;
/**
* Helper providing a virtual iterator that just returns true in each
* iteration step. Used for filling the value pool. Implements the normal
* C++ iterator concept. Not further documented here.
*/
class ReturningTrueIterator {
public:
typedef ReturningTrueIterator self_type;
typedef bool value_type;
typedef bool& reference;
typedef bool* pointer;
typedef std::forward_iterator_tag iterator_category;
typedef int difference_type;
explicit ReturningTrueIterator(size_t count_value);
self_type operator++();
self_type operator++(int);
reference operator*();
pointer operator->();
bool operator==(const self_type& rhs);
bool operator!=(const self_type& rhs);
private:
size_t count_value;
bool ret_value;
};
bool IsContained(const Type &obj) const; bool IsContained(const Type &obj) const;
int GetIndexOfObject(const Type &obj) const; int GetIndexOfObject(const Type &obj) const;
Type* AllocateRaw(); Type* AllocateRaw();
......
/*
* Copyright (c) 2014, Siemens AG. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef EMBB_CONTAINERS_WAIT_FREE_MPMC_QUEUE_H_
#define EMBB_CONTAINERS_WAIT_FREE_MPMC_QUEUE_H_
#include <embb/containers/wait_free_array_value_pool.h>
#include <embb/containers/internal/indexed_object_pool.h>
#include <embb/containers/internal/hazard_pointer.h>
#include <embb/containers/internal/returning_true_iterator.h>
#include <embb/base/atomic.h>
#include <embb/base/function.h>
#include <embb/base/c/internal/thread_index.h>
#include <stdlib.h>
namespace embb {
namespace containers {
namespace internal {
/**
* Queue node
*
* Single linked list, contains the element (\c value) and a pool index
* referencing the next node (\c next_idx).
*
* \tparam ElementT Element type
*/
template<typename Type>
class WaitFreeMPMCQueueNode {
private:
typedef WaitFreeMPMCQueueNode<Type> self_t;
public:
typedef uint32_t index_t;
static const index_t UndefinedIndex;
private:
/**
* Node value
*/
Type value;
/**
* Enqeue accessor id
*/
uint32_t enq_aid;
/**
* Pool-index of next Node in list (atomic), -1 for none
*/
embb::base::Atomic<uint32_t> next_idx;
/**
* Dequeue accessor id (atomic), -1 for none
*/
embb::base::Atomic<uint32_t> deq_aid;
public:
/**
* Default constructor
*/
inline WaitFreeMPMCQueueNode();
/**
* Copy constructor
*/
inline WaitFreeMPMCQueueNode(
const self_t & other
/**< [IN] Instance to copy */
);
/**
* Assignment operator
*/
inline self_t & operator=(
const self_t & other
/**< [IN] Instance to assign */
);
/**
* Constructs a new instance of Node.
*/
inline WaitFreeMPMCQueueNode(
Type const val,
/**< [IN] Value to be contained in the Node instance */
uint32_t enqAid
/**< [IN] Enqueuer accessor id */
);
inline Type Value() const;
inline uint32_t NextPoolIdx() const;
/**
* Set pointer to next Node element via CAS.
*
* \return true if new pointer value could be set.
*/
inline bool CASNext(
uint32_t expNextIdx,
/**< [IN] Expected current pointer value */
uint32_t newNextIdx
/**< [IN] New pointer value to set */
);
inline bool Next_IsNull() const;
inline uint32_t EnqueueAID() const;
inline embb::base::Atomic<index_t> & DequeueAID();
};
} // namespace internal
/**
* Wait-free queue for multiple enqueuers and dequeuers
*
* \concept{CPP_CONCEPTS_QUEUE}
*
* \ingroup CPP_CONTAINERS_QUEUES
*
* \see LockFreeMPMCQueue
* \see WaitFreeSPSCQueue
*
* \tparam Type Type of the queue elements
* \tparam NodeAllocator Allocator type for allocating queue nodes
* \tparam OpAllocator Allocator type for allocating operation description
* objects
* \tparam ValuePool Type of the value pool used as basis for the ObjectPool
* which stores queue nodes
*/
template<
typename Type,
class NodeAllocator =
embb::base::AllocatorCacheAligned<internal::WaitFreeMPMCQueueNode<Type> >,
class OpAllocator =
embb::base::AllocatorCacheAligned<embb::base::Atomic<uint32_t> >,
class ValuePool =
WaitFreeArrayValuePool<bool, false>
>
class WaitFreeMPMCQueue {
private:
typedef internal::WaitFreeMPMCQueueNode<Type> Node_t;
typedef typename internal::WaitFreeMPMCQueueNode<Type>::index_t index_t;
typedef WaitFreeMPMCQueue<Type, NodeAllocator, OpAllocator, ValuePool> self_t;
typedef internal::IndexedObjectPool<
internal::WaitFreeMPMCQueueNode<Type>, ValuePool, NodeAllocator> NodePool_t;
private:
/**
* Maximum size of queue. Using maximum value of
* OperationDesc::NodeIndex (15 bit) minus one element
* required for sentinel node.
*/
static const index_t QUEUE_SIZE_MAX = static_cast<index_t>(32767 - 1);
/**
* Number of guards per thread
*/
static const size_t num_guards = 2;
/**
* Null-pointer for hazard pointers
*/
static const index_t UndefinedGuard = 0;
/**
* Helper class for operation descriptions.
* Used instead of bit-field struct for portability.
*/
class OperationDesc {
private:
static const index_t PENDING_FLAG_MASK = 0x80000000; ///< Bit 32
static const index_t ENQUEUE_FLAG_MASK = 0x40000000; ///< Bit 31
static const index_t NODE_INDEX_MASK = 0x3FFFFFFF; ///< Bit 30..1
private:
OperationDesc();
public:
bool Pending;
bool Enqueue;
index_t NodeIndex;
index_t Raw;
public:
/**
* Converts state to binary value { pending:1, enqueue:1, nodeIndex:30 }
*/
OperationDesc(
bool pending,
bool enqueue,
index_t nodeIndex);
/**
* Expects binary value { pending:1, enqueue:1, nodeIndex:30 }
*/
explicit OperationDesc(index_t raw);
};
/** Index of head node in node pool, should be aligned */
embb::base::Atomic<index_t> headIdx;
/** Index of tail node in node pool, should be aligned */
embb::base::Atomic<index_t> tailIdx;
/** Maximum element capacity of the queue */
size_t max_size_;
/** Callback instance for release of guarded node indices */
embb::base::Function< void, index_t > delete_pointer_callback;
/** Hazard pointer for node index (guards stack top pointer) */
embb::containers::internal::HazardPointer< index_t > hp;
/** Instance of empty node used as sentinel */
Node_t nullNode;
/** Maximum number of threads accessing this queue instance */
size_t num_states;
/** Capacity of the node pool, includes overhead due to hazard pointers */
size_t node_pool_size;
/** Array containing two states for every concurrent accessor on the
queue (needed for swapping) */
embb::base::Atomic<index_t> * operationDescriptions;
/** Pool for element nodes in the queue */
NodePool_t nodePool;
/** Allocator for memory used for operation descriptions */
OpAllocator operationDescriptionAllocator;
/** Index of sentinel node, stored for release in destructor */
index_t sentinelNodeIndex;
private:
/**
* Resolves thread index usable as accessor id.
*
* \returns True if thread index could be resolved, false otherwise.
* A call could fail, e.g. if there have been created more
* threads than initially configured in the embb runtime.
*/
inline bool LoadAccessorThreadIndex(
index_t & retIndexValue
/**< [OUT] Value of thread index */
);
/**
* Resolves maximum required capacity for retired lists in hazard
* pointer implementation
*/
inline size_t RetiredListMaxSize(
size_t nThreads
/**< [IN] maximum number of threads operating on the queue */
);
public:
/**
* Creates a queue with the specified capacity.
*
* \memory
* Let \c t be the maximum number of threads and \c x be <tt>2.5*t+1</tt>.
* Then, <tt>x*(3*t+1)</tt> elements of size <tt>sizeof(void*)</tt>, \c x
* elements of size <tt>sizeof(Type)</tt>, and \c capacity+1 elements of size
* <tt>sizeof(Type)</tt> are allocated.
*
* \notthreadsafe
*
* \see CPP_CONCEPTS_QUEUE
*/
WaitFreeMPMCQueue(
size_t capacity
/**< [IN] Element capacity of the queue */
);
/**
* Destroys the queue.
*
* \notthreadsafe
*/
~WaitFreeMPMCQueue();
/**
* Tries to enqueue an element into the queue.
*
* \return \c true if the element could be enqueued, \c false if the queue is
* full.
*
* \waitfree
*
* \note It might be possible to enqueue more elements into the queue than its
* capacity permits.
*
* \see CPP_CONCEPTS_QUEUE
*/
bool TryEnqueue(
Type const & element
/**< [IN] Const reference to the element that shall be enqueued */
);
/**
* Tries to dequeue an element from the queue.
*
* \return \c true if an element could be dequeued, \c false if the queue is
* empty.
*
* \waitfree
*
* \see CPP_CONCEPTS_QUEUE
*/
bool TryDequeue(
Type & retElement
/**< [IN,OUT] Reference to the dequeued element. Unchanged, if the
operation was not successful. */
);
/**
* Returns the maximum size for instances of this queue.
*
* \waitfree
*/
inline size_t GetCapacity();
private:
/**
* Help progressing pending enqueue operations of given accessors
*/
void HelpEnqueue(
unsigned int accessorId
/**< [IN] Accessor id of operations to help */
);
/**
* Help finishing pending enqueue operations of arbitrary accessors,
* including own pending enqueue operations.
*
* \waitfree
*/
void HelpFinishEnqueue();
/**
* Help progressing pending dequeue operations of given accessor,
* including own pending dequeue operations.
*
* \waitfree
*/
void HelpDequeue(
index_t accessorId
/**< [IN] Accessor id of operations to help */
);
/**
* Help finishing pending dequeue operations of arbitrary accessors,
* including own pending dequeue operations.
*
* \waitfree
*/
void HelpFinishDequeue();
/**
* Help finishing pending operations of arbitrary accessors, including
* own pending operations.
* One operation of every other thread is completed before engaging in the
* calling thread's own announced operation.
*
* \waitfree
*/
void Help();
/**
* Whether the accessor with given id is pending.
*
* \returns True if the given accessor has a pending operation.
*
* \waitfree
*/
inline bool IsPending(
unsigned int accessorId
/**< [IN] Accessor id of operations to help */
);
/**
* Whether the node with given index is in the process of being
* dequeued. Prevents reclamation of node id when it cannot be
* guarded by hazard pointers, between HelpDequeue and HelpFinishDequeue.
*/
bool NodeIsPending(
index_t nodeId
/**< [IN] Pool index of the node instance to test */
);
/**
* Callback used for deallocating a node index from hazard
* pointers. Frees the associated node in the pool.
*
* \waitfree
*/
void DeleteNodeCallback(
index_t releasedNodeIndex
/**< [IN] Pool index of the node instance to release */
);
};
} // namespace containers
} // namespace embb
#include <embb/containers/internal/wait_free_mpmc_queue-inl.h>
#endif // EMBB_CONTAINERS_WAIT_FREE_MPMC_QUEUE_H_
...@@ -113,6 +113,7 @@ namespace containers { ...@@ -113,6 +113,7 @@ namespace containers {
* \ingroup CPP_CONTAINERS_QUEUES * \ingroup CPP_CONTAINERS_QUEUES
* *
* \see LockFreeMPMCQueue * \see LockFreeMPMCQueue
* \see WaitFreeMPMCQueue
* *
* \tparam Type Type of the queue elements * \tparam Type Type of the queue elements
* \tparam Allocator Allocator type for allocating queue elements. * \tparam Allocator Allocator type for allocating queue elements.
......
...@@ -26,10 +26,11 @@ ...@@ -26,10 +26,11 @@
#include <embb/containers/lock_free_tree_value_pool.h> #include <embb/containers/lock_free_tree_value_pool.h>
#include <embb/containers/wait_free_array_value_pool.h> #include <embb/containers/wait_free_array_value_pool.h>
#include <embb/containers/wait_free_spsc_queue.h>
#include <embb/containers/object_pool.h> #include <embb/containers/object_pool.h>
#include <embb/containers/lock_free_stack.h> #include <embb/containers/lock_free_stack.h>
#include <embb/containers/lock_free_mpmc_queue.h> #include <embb/containers/lock_free_mpmc_queue.h>
#include <embb/containers/wait_free_spsc_queue.h>
#include <embb/containers/wait_free_mpmc_queue.h>
#include <embb/base/c/memory_allocation.h> #include <embb/base/c/memory_allocation.h>
#include <partest/partest.h> #include <partest/partest.h>
...@@ -47,6 +48,7 @@ using embb::containers::WaitFreeArrayValuePool; ...@@ -47,6 +48,7 @@ using embb::containers::WaitFreeArrayValuePool;
using embb::containers::LockFreeTreeValuePool; using embb::containers::LockFreeTreeValuePool;
using embb::containers::WaitFreeSPSCQueue; using embb::containers::WaitFreeSPSCQueue;
using embb::containers::LockFreeMPMCQueue; using embb::containers::LockFreeMPMCQueue;
using embb::containers::WaitFreeMPMCQueue;
using embb::containers::LockFreeStack; using embb::containers::LockFreeStack;
using embb::containers::LockFreeTreeValuePool; using embb::containers::LockFreeTreeValuePool;
using embb::containers::WaitFreeArrayValuePool; using embb::containers::WaitFreeArrayValuePool;
...@@ -60,16 +62,20 @@ PT_MAIN("Data Structures C++") { ...@@ -60,16 +62,20 @@ PT_MAIN("Data Structures C++") {
unsigned int max_threads = static_cast<unsigned int>( unsigned int max_threads = static_cast<unsigned int>(
2 * partest::TestSuite::GetDefaultNumThreads()); 2 * partest::TestSuite::GetDefaultNumThreads());
embb_thread_set_max_count(max_threads); embb_thread_set_max_count(max_threads);
#if 0
PT_RUN(PoolTest< WaitFreeArrayValuePool<int COMMA -1> >); PT_RUN(PoolTest< WaitFreeArrayValuePool<int COMMA -1> >);
PT_RUN(PoolTest< LockFreeTreeValuePool<int COMMA -1> >); PT_RUN(PoolTest< LockFreeTreeValuePool<int COMMA -1> >);
PT_RUN(HazardPointerTest); PT_RUN(HazardPointerTest);
PT_RUN(QueueTest< WaitFreeSPSCQueue< ::std::pair<size_t COMMA int> > >); PT_RUN(QueueTest< WaitFreeSPSCQueue< ::std::pair<size_t COMMA int> > >);
PT_RUN(QueueTest< LockFreeMPMCQueue< ::std::pair<size_t COMMA int> > PT_RUN(QueueTest< LockFreeMPMCQueue< ::std::pair<size_t COMMA int> >
COMMA true COMMA true >); COMMA true COMMA true >);
#endif
PT_RUN(QueueTest< WaitFreeMPMCQueue< ::std::pair<size_t COMMA int> >
COMMA true COMMA true >);
#if 0
PT_RUN(StackTest< LockFreeStack<int> >); PT_RUN(StackTest< LockFreeStack<int> >);
PT_RUN(ObjectPoolTest< LockFreeTreeValuePool<bool COMMA false > >); PT_RUN(ObjectPoolTest< LockFreeTreeValuePool<bool COMMA false > >);
PT_RUN(ObjectPoolTest< WaitFreeArrayValuePool<bool COMMA false> >); PT_RUN(ObjectPoolTest< WaitFreeArrayValuePool<bool COMMA false> >);
#endif
PT_EXPECT(embb_get_bytes_allocated() == 0); PT_EXPECT_EQ(embb_get_bytes_allocated(), static_cast<size_t>(0));
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment