Commit a023d6e4 by Christian Kern

This resolves ticket #523 - rework hazard pointer:

   - Do not use lists
   - Remove HelpScan
   - Release memory to memory manager when destructed (before that, memory had to be released by the memory manager in its destructor).
   - Avoid stdlib algorithms
parent d36f1bdb
...@@ -129,5 +129,13 @@ void embb_internal_thread_index_set_max(unsigned int max) { ...@@ -129,5 +129,13 @@ void embb_internal_thread_index_set_max(unsigned int max) {
} }
void embb_internal_thread_index_reset() { void embb_internal_thread_index_reset() {
// This function is only called in tests, usually when all other threads
// except the main thread have terminated. However, the main thread still has
// potentially still stored its old index value in its thread local storage,
// which might be assigned additionally to another thread (as the counter is
// reset), which may lead to hard to detect bugs. Therefore, reset the thread
// local thread id here.
embb_internal_thread_index_var = UINT_MAX;
embb_counter_init(embb_thread_index_counter()); embb_counter_init(embb_thread_index_counter());
} }
...@@ -77,7 +77,12 @@ LockFreeMPMCQueue<Type, ValuePool>::~LockFreeMPMCQueue() { ...@@ -77,7 +77,12 @@ LockFreeMPMCQueue<Type, ValuePool>::~LockFreeMPMCQueue() {
template< typename Type, typename ValuePool > template< typename Type, typename ValuePool >
LockFreeMPMCQueue<Type, ValuePool>::LockFreeMPMCQueue(size_t capacity) : LockFreeMPMCQueue<Type, ValuePool>::LockFreeMPMCQueue(size_t capacity) :
capacity(capacity), capacity(capacity),
// Object pool, size with respect to the maximum number of retired nodes not
// eligible for reuse. +1 for dummy node.
objectPool(
MPMCQueueNodeHazardPointer_t::ComputeMaximumRetiredObjectCount(2) +
capacity + 1),
// Disable "this is used in base member initializer" warning. // Disable "this is used in base member initializer" warning.
// We explicitly want this. // We explicitly want this.
#ifdef EMBB_PLATFORM_COMPILER_MSVC #ifdef EMBB_PLATFORM_COMPILER_MSVC
...@@ -89,13 +94,7 @@ delete_pointer_callback(*this, ...@@ -89,13 +94,7 @@ delete_pointer_callback(*this,
#ifdef EMBB_PLATFORM_COMPILER_MSVC #ifdef EMBB_PLATFORM_COMPILER_MSVC
#pragma warning(pop) #pragma warning(pop)
#endif #endif
hazardPointer(delete_pointer_callback, NULL, 2), hazardPointer(delete_pointer_callback, NULL, 2) {
// Object pool, size with respect to the maximum number of retired nodes not
// eligible for reuse. +1 for dummy node.
objectPool(
hazardPointer.GetRetiredListMaxSize()*
embb::base::Thread::GetThreadsMaxCount() +
capacity + 1) {
// Allocate dummy node to reduce the number of special cases to consider. // Allocate dummy node to reduce the number of special cases to consider.
internal::LockFreeMPMCQueueNode<Type>* dummyNode = objectPool.Allocate(); internal::LockFreeMPMCQueueNode<Type>* dummyNode = objectPool.Allocate();
// Initially, head and tail point to the dummy node. // Initially, head and tail point to the dummy node.
...@@ -120,7 +119,7 @@ bool LockFreeMPMCQueue<Type, ValuePool>::TryEnqueue(Type const& element) { ...@@ -120,7 +119,7 @@ bool LockFreeMPMCQueue<Type, ValuePool>::TryEnqueue(Type const& element) {
for (;;) { for (;;) {
my_tail = tail; my_tail = tail;
hazardPointer.GuardPointer(0, my_tail); hazardPointer.Guard(0, my_tail);
// Check if pointer is still valid after guarding. // Check if pointer is still valid after guarding.
if (my_tail != tail) { if (my_tail != tail) {
...@@ -163,12 +162,12 @@ bool LockFreeMPMCQueue<Type, ValuePool>::TryDequeue(Type & element) { ...@@ -163,12 +162,12 @@ bool LockFreeMPMCQueue<Type, ValuePool>::TryDequeue(Type & element) {
Type data; Type data;
for (;;) { for (;;) {
my_head = head; my_head = head;
hazardPointer.GuardPointer(0, my_head); hazardPointer.Guard(0, my_head);
if (my_head != head) continue; if (my_head != head) continue;
my_tail = tail; my_tail = tail;
my_next = my_head->GetNext(); my_next = my_head->GetNext();
hazardPointer.GuardPointer(1, my_next); hazardPointer.Guard(1, my_next);
if (head != my_head) continue; if (head != my_head) continue;
if (my_next == NULL) if (my_next == NULL)
...@@ -187,7 +186,7 @@ bool LockFreeMPMCQueue<Type, ValuePool>::TryDequeue(Type & element) { ...@@ -187,7 +186,7 @@ bool LockFreeMPMCQueue<Type, ValuePool>::TryDequeue(Type & element) {
break; break;
} }
hazardPointer.EnqueuePointerForDeletion(my_head); hazardPointer.EnqueueForDeletion(my_head);
element = data; element = data;
return true; return true;
} }
......
...@@ -81,13 +81,13 @@ capacity(capacity), ...@@ -81,13 +81,13 @@ capacity(capacity),
#ifdef EMBB_PLATFORM_COMPILER_MSVC #ifdef EMBB_PLATFORM_COMPILER_MSVC
#pragma warning(pop) #pragma warning(pop)
#endif #endif
hazardPointer(delete_pointer_callback, NULL, 1),
// Object pool, size with respect to the maximum number of retired nodes not // Object pool, size with respect to the maximum number of retired nodes not
// eligible for reuse: // eligible for reuse:
objectPool( objectPool(
hazardPointer.GetRetiredListMaxSize()* StackNodeHazardPointer_t::ComputeMaximumRetiredObjectCount(1) +
embb::base::Thread::GetThreadsMaxCount() + capacity),
capacity) { hazardPointer(delete_pointer_callback, NULL, 1)
{
} }
template< typename Type, typename ValuePool > template< typename Type, typename ValuePool >
...@@ -128,7 +128,7 @@ bool LockFreeStack< Type, ValuePool >::TryPop(Type & element) { ...@@ -128,7 +128,7 @@ bool LockFreeStack< Type, ValuePool >::TryPop(Type & element) {
return false; return false;
// Guard top_cached // Guard top_cached
hazardPointer.GuardPointer(0, top_cached); hazardPointer.Guard(0, top_cached);
// Check if top is still top. If this is the case, it has not been // Check if top is still top. If this is the case, it has not been
// retired yet (because before retiring that thing, the retiring thread // retired yet (because before retiring that thing, the retiring thread
...@@ -144,16 +144,16 @@ bool LockFreeStack< Type, ValuePool >::TryPop(Type & element) { ...@@ -144,16 +144,16 @@ bool LockFreeStack< Type, ValuePool >::TryPop(Type & element) {
break; break;
} else { } else {
// We continue with the next and can unguard top_cached // We continue with the next and can unguard top_cached
hazardPointer.GuardPointer(0, NULL); hazardPointer.Guard(0, NULL);
} }
} }
Type data = top_cached->GetElement(); Type data = top_cached->GetElement();
// We don't need to read from this reference anymore, unguard it // We don't need to read from this reference anymore, unguard it
hazardPointer.GuardPointer(0, NULL); hazardPointer.Guard(0, NULL);
hazardPointer.EnqueuePointerForDeletion(top_cached); hazardPointer.EnqueueForDeletion(top_cached);
element = data; element = data;
return true; return true;
......
...@@ -113,8 +113,17 @@ class LockFreeMPMCQueue { ...@@ -113,8 +113,17 @@ class LockFreeMPMCQueue {
* least as many elements, maybe more. * least as many elements, maybe more.
*/ */
size_t capacity; size_t capacity;
// Do not change the ordering of class local variables.
// Important for initialization. /**
* The object pool, used for lock-free memory allocation.
*
* Warning: the objectPool has to be initialized before the hazardPointer
* object, to be sure that the hazardPointer object is destructed before the
* Pool as the hazardPointer object might return elements to the pool in its
* destructor. So the ordering of the members objectPool and hazardPointer is
* important here!
*/
ObjectPool< internal::LockFreeMPMCQueueNode<Type>, ValuePool > objectPool;
/** /**
* Callback to the method that is called by hazard pointers if a pointer is * Callback to the method that is called by hazard pointers if a pointer is
...@@ -124,15 +133,17 @@ class LockFreeMPMCQueue { ...@@ -124,15 +133,17 @@ class LockFreeMPMCQueue {
delete_pointer_callback; delete_pointer_callback;
/** /**
* The hazard pointer object, used for memory management. * Definition of the used hazard pointer type
*/ */
embb::containers::internal::HazardPointer typedef embb::containers::internal::HazardPointer
< internal::LockFreeMPMCQueueNode<Type>* > hazardPointer; < internal::LockFreeMPMCQueueNode<Type>* >
MPMCQueueNodeHazardPointer_t;
/** /**
* The object pool, used for lock-free memory allocation. * The hazard pointer object, used for memory management.
*/ */
ObjectPool< internal::LockFreeMPMCQueueNode<Type>, ValuePool > objectPool; MPMCQueueNodeHazardPointer_t hazardPointer;
/** /**
* Atomic pointer to the head node of the queue * Atomic pointer to the head node of the queue
......
...@@ -187,11 +187,6 @@ class LockFreeStack { ...@@ -187,11 +187,6 @@ class LockFreeStack {
delete_pointer_callback; delete_pointer_callback;
/** /**
* The hazard pointer object, used for memory management.
*/
internal::HazardPointer<internal::LockFreeStackNode<Type>*> hazardPointer;
/**
* The callback function, used to cleanup non-hazardous pointers. * The callback function, used to cleanup non-hazardous pointers.
* \see delete_pointer_callback * \see delete_pointer_callback
*/ */
...@@ -199,10 +194,27 @@ class LockFreeStack { ...@@ -199,10 +194,27 @@ class LockFreeStack {
/** /**
* The object pool, used for lock-free memory allocation. * The object pool, used for lock-free memory allocation.
*
* Warning: the objectPool has to be initialized before the hazardPointer
* object, to be sure that the hazardPointer object is destructed before the
* Pool as the hazardPointer object might return elements to the pool in its
* destructor. So the ordering of the members objectPool and hazardPointer is
* important here!
*/ */
ObjectPool< internal::LockFreeStackNode<Type>, ValuePool > objectPool; ObjectPool< internal::LockFreeStackNode<Type>, ValuePool > objectPool;
/** /**
* Definition of the used hazard pointer type
*/
typedef internal::HazardPointer < internal::LockFreeStackNode<Type>* >
StackNodeHazardPointer_t;
/**
* The hazard pointer object, used for memory management.
*/
StackNodeHazardPointer_t hazardPointer;
/**
* Atomic pointer to the top node of the stack (element that is popped next) * Atomic pointer to the top node of the stack (element that is popped next)
*/ */
embb::base::Atomic<internal::LockFreeStackNode<Type>*> top; embb::base::Atomic<internal::LockFreeStackNode<Type>*> top;
......
...@@ -36,33 +36,116 @@ ...@@ -36,33 +36,116 @@
namespace embb { namespace embb {
namespace containers { namespace containers {
namespace test { namespace test {
/**
* @brief a very simple wait-free object pool implementation to have tests
* being independent of the EMBB object pool implementation.
*/
class IntObjectTestPool {
private:
int* simplePoolObjects;
embb::base::Atomic<int>* simplePool;
public:
static const int ALLOCATED_MARKER = 1;
static const int FREE_MARKER = 0;
unsigned int poolSize;
IntObjectTestPool(unsigned int poolSize);
~IntObjectTestPool();
/**
* Allocate object from the pool
*
* @return the allocated object
*/
int* Allocate();
/**
* Return an element to the pool
*
* @param objectPointer the object to be freed
*/
void Release(int* objectPointer);
};
class HazardPointerTest : public partest::TestCase { class HazardPointerTest : public partest::TestCase {
private: private:
embb::base::Function<void, embb::base::Atomic<int>*> delete_pointer_callback; embb::base::Function<void, embb::base::Atomic<int>*> deletePointerCallback;
//used to allocate random stuff, we will just use the pointers, not the //used to allocate random stuff, we will just use the pointers, not the
//contents //contents
embb::containers::ObjectPool< embb::base::Atomic<int> >* object_pool; embb::containers::ObjectPool< embb::base::Atomic<int> >* objectPool;
//used to move pointer between threads //used to move pointer between threads
embb::containers::LockFreeStack< embb::base::Atomic<int>* >* stack; embb::containers::LockFreeStack< embb::base::Atomic<int>* >* stack;
embb::base::Mutex vector_mutex; embb::base::Mutex vectorMutex;
embb::containers::internal::HazardPointer<embb::base::Atomic<int>*>* hp; embb::containers::internal::HazardPointer<embb::base::Atomic<int>*>*
std::vector< embb::base::Atomic<int>* > deleted_vector; hazardPointer;
int n_threads; std::vector< embb::base::Atomic<int>* > deletedVector;
int n_elements_per_thread; int nThreads;
int n_elements; int nElementsPerThread;
int nElements;
public: public:
/** /**
* Adds test methods. * Adds test methods.
*/ */
HazardPointerTest(); HazardPointerTest();
void HazardPointerTest1_Pre(); void HazardPointerTest1Pre();
void HazardPointerTest1_Post(); void HazardPointerTest1Post();
void HazardPointerTest1_ThreadMethod(); void HazardPointerTest1ThreadMethod();
void DeletePointerCallback(embb::base::Atomic<int>* to_delete); void DeletePointerCallback(embb::base::Atomic<int>* toDelete);
}; };
class HazardPointerTest2 : public partest::TestCase {
private:
// number of threads, participating in that test
int nThreads;
embb::base::Function<void, int*> deletePointerCallback;
// the thread id of the master
embb::base::Atomic<unsigned int> currentMaster;
// variables, to synchronize threads. At each point in time, one master,
// the master changes each round until each thread was assigned master once.
embb::base::Atomic<int> sync1;
embb::base::Atomic<unsigned int> sync2;
unsigned int guardsPerThreadCount;
unsigned int guaranteedCapacityPool;
unsigned int poolSizeUsingHazardPointer;
// The threads write here, if they guarded an object successfully. Used to
// determine when all allocated objects were guarded successfully.
embb::base::Atomic<int*>* sharedGuarded;
// This array is used by the master, to communicate and share what he has
// allocated with the slaves.
embb::base::Atomic<int*>* sharedAllocated;
// Reference to the object pool
IntObjectTestPool* testPool;
embb::containers::internal::HazardPointer<int*>* hazardPointer;
static const int finishMarker = -1;
public:
void DeletePointerCallback(int* toDelete);
bool SetRelativeGuards();
void HazardPointerTest2Master();
void HazardPointerTest2Slave();
void HazardPointerTest2Pre();
void HazardPointerTest2Post();
void HazardPointerTest2ThreadMethod();
HazardPointerTest2();
};
} // namespace test } // namespace test
} // namespace containers } // namespace containers
} // namespace embb } // namespace embb
......
...@@ -55,6 +55,7 @@ using embb::containers::test::HazardPointerTest; ...@@ -55,6 +55,7 @@ using embb::containers::test::HazardPointerTest;
using embb::containers::test::QueueTest; using embb::containers::test::QueueTest;
using embb::containers::test::StackTest; using embb::containers::test::StackTest;
using embb::containers::test::ObjectPoolTest; using embb::containers::test::ObjectPoolTest;
using embb::containers::test::HazardPointerTest2;
PT_MAIN("Data Structures C++") { PT_MAIN("Data Structures C++") {
unsigned int max_threads = static_cast<unsigned int>( unsigned int max_threads = static_cast<unsigned int>(
...@@ -64,6 +65,7 @@ PT_MAIN("Data Structures C++") { ...@@ -64,6 +65,7 @@ PT_MAIN("Data Structures C++") {
PT_RUN(PoolTest< WaitFreeArrayValuePool<int COMMA -1> >); PT_RUN(PoolTest< WaitFreeArrayValuePool<int COMMA -1> >);
PT_RUN(PoolTest< LockFreeTreeValuePool<int COMMA -1> >); PT_RUN(PoolTest< LockFreeTreeValuePool<int COMMA -1> >);
PT_RUN(HazardPointerTest); PT_RUN(HazardPointerTest);
PT_RUN(HazardPointerTest2);
PT_RUN(QueueTest< WaitFreeSPSCQueue< ::std::pair<size_t COMMA int> > >); PT_RUN(QueueTest< WaitFreeSPSCQueue< ::std::pair<size_t COMMA int> > >);
PT_RUN(QueueTest< LockFreeMPMCQueue< ::std::pair<size_t COMMA int> > PT_RUN(QueueTest< LockFreeMPMCQueue< ::std::pair<size_t COMMA int> >
COMMA true COMMA true >); COMMA true COMMA true >);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment