diff --git a/containers_cpp/test/main.cc b/containers_cpp/test/main.cc index fe51456..21dbe61 100644 --- a/containers_cpp/test/main.cc +++ b/containers_cpp/test/main.cc @@ -55,11 +55,12 @@ PT_MAIN("Data Structures C++") { PT_RUN(embb::containers::test::HazardPointerTest); - PT_RUN(embb::containers::test::QueueTest< - embb::containers::WaitFreeSPSCQueue >); + PT_RUN(embb::containers::test::QueueTest< + embb::containers::WaitFreeSPSCQueue< ::std::pair > >); - PT_RUN(embb::containers::test::QueueTest< - embb::containers::LockFreeMPMCQueue COMMA true COMMA true >); + PT_RUN(embb::containers::test::QueueTest< + embb::containers::LockFreeMPMCQueue< ::std::pair > + COMMA true COMMA true >); PT_RUN(embb::containers::test::StackTest< embb::containers::LockFreeStack >); diff --git a/containers_cpp/test/queue_test-inl.h b/containers_cpp/test/queue_test-inl.h index 4cc7a95..3b29355 100644 --- a/containers_cpp/test/queue_test-inl.h +++ b/containers_cpp/test/queue_test-inl.h @@ -30,132 +30,190 @@ #include #include -#include - namespace embb { namespace containers { namespace test { template QueueTest::QueueTest() : -n_threads(static_cast - (partest::TestSuite::GetDefaultNumThreads())), - n_iterations(200), - n_queue_elements_per_thread(100), - n_queue_elements(n_queue_elements_per_thread*n_threads), - queueSize(0) { + n_threads(static_cast(partest::TestSuite::GetDefaultNumThreads())), + n_queue_size( + static_cast(partest::TestSuite::GetDefaultNumIterations()) * + MIN_TOTAL_PRODUCE_CONSUME_COUNT), + n_total_produce_consume_count(n_queue_size), + n_producers(1), + n_consumers(1), + next_producer_id(0), + next_consumer_id(0), + n_producer_elements( + static_cast(partest::TestSuite::GetDefaultNumIterations() * + MIN_ENQ_ELEMENTS)) { CreateUnit("QueueTestSingleThreadEnqueueDequeue"). Pre(&QueueTest::QueueTestSingleThreadEnqueueDequeue_Pre, this). Add(&QueueTest::QueueTestSingleThreadEnqueueDequeue_ThreadMethod, this). Post(&QueueTest::QueueTestSingleThreadEnqueueDequeue_Post, this); - CreateUnit("QueueTestTwoThreadsSingleProducerSingleConsumer"). - Pre(&QueueTest::QueueTestSingleProducedSingleConsumer_Pre, this). - Add(&QueueTest::QueueTestSingleProducedSingleConsumer_ThreadMethod, - this, - 2, - TOTAL_PRODUCE_CONSUME_COUNT). - Post(&QueueTest::QueueTestSingleProducedSingleConsumer_Post, this); - -#ifdef EMBB_COMPILER_MSVC + Pre(&QueueTest::QueueTestSingleProducerSingleConsumer_Pre, this). + Add(&QueueTest::QueueTestSingleProducerSingleConsumer_ThreadMethod, + this, + 2, + static_cast(n_total_produce_consume_count)). + Post(&QueueTest::QueueTestSingleProducerSingleConsumer_Post, this); + +#ifdef _MSC_VER #pragma warning(push) #pragma warning(disable:4127) #endif - if (MultipleProducers == true && - MultipleConsumers == true) { -#ifdef EMBB_COMPILER_MSVC + if (MultipleProducers == true && MultipleConsumers == true) { + // MP/MC + n_producers = n_threads / 2; + n_consumers = n_threads / 2; +#ifdef _MSC_VER #pragma warning(pop) #endif - CreateUnit("QueueTestMultipleThreadsMultipleProducerMultipleConsumer"). - Pre(&QueueTest::QueueTestMultipleProducerMultipleConsumer_Pre, this). - Add(&QueueTest::QueueTestMultipleProducerMultipleConsumer_ThreadMethod, - this, - static_cast(n_threads), - static_cast(n_iterations)). - Post(&QueueTest::QueueTestMultipleProducerMultipleConsumer_Post, this); + CreateUnit("QueueTestOrderMultipleProducerMultipleConsumer"). + Pre(&QueueTest::QueueTestOrderMPMC_Pre, this). + Add(&QueueTest::QueueTestOrderMPMC_ConsumerThreadMethod, + this, + static_cast(n_consumers), + static_cast(1)). + Add(&QueueTest::QueueTestOrderMPMC_ProducerThreadMethod, + this, + static_cast(n_producers), + static_cast(1)). + Post(&QueueTest::QueueTestOrderMPMC_Post, this); } } template void QueueTest:: -QueueTestMultipleProducerMultipleConsumer_Pre() { +QueueTestOrderMPMC_Pre() { + queue = new Queue_t(static_cast(n_producer_elements)); embb_internal_thread_index_reset(); - queue = new Queue_t(static_cast(n_queue_elements)); - - thread_local_vectors = - new std::vector[static_cast(n_threads)]; - - for (int i = 0; i != n_threads; ++i) { - int offset = n_queue_elements_per_thread * 2; - - for (int i2 = 0; i2 != n_queue_elements_per_thread; ++i2) { - int push_element = i2 + (offset*i); - thread_local_vectors[i].push_back(push_element); - expected_queue_elements.push_back(push_element); - } + next_producer_id = 0; + next_consumer_id = 0; + consumers.clear(); + producers.clear(); + for (size_t p = 0; p < static_cast(n_producers); ++p) { + producers.push_back(Producer(queue, p, n_producer_elements)); + } + for (size_t c = 0; c < static_cast(n_consumers); ++c) { + consumers.push_back(Consumer(queue, n_producers, n_producer_elements)); } } template void QueueTest:: -QueueTestMultipleProducerMultipleConsumer_Post() { - std::vector produced; - for (int i = 0; i != n_threads; ++i) { - std::vector& loc_elements = thread_local_vectors[i]; - for (std::vector::iterator it = loc_elements.begin(); - it != loc_elements.end(); - ++it) { - produced.push_back(*it); +QueueTestOrderMPMC_Post() { + delete queue; + // Tally for all elements enqueued by all producers, + // initialized with all 0: + ::std::vector total_tally; + size_t n_elements_total = static_cast(n_producers * n_producer_elements); + for (size_t i = 0; i < n_elements_total / 8; ++i) { + total_tally.push_back(0); + } + // Collect all dequeued element flags from consumers: + for (size_t c = 0; c < static_cast(n_consumers); ++c) { + for (size_t e = 0; e < n_elements_total / 8; ++e) { + total_tally[e] |= consumers[c].Tally()[e]; } } - - PT_ASSERT(produced.size() == expected_queue_elements.size()); - - std::sort(expected_queue_elements.begin(), expected_queue_elements.end()); - std::sort(produced.begin(), produced.end()); - - for (unsigned int i = 0; - i != static_cast(produced.size()); ++i) { - PT_ASSERT(expected_queue_elements[i] == produced[i]); + // Test if all elements have been dequeued by any + // consumer. + // To avoid static cast warning: + for (size_t t = 0; + t < static_cast(n_producers * n_producer_elements / 8); + ++t) { + PT_ASSERT_EQ_MSG(total_tally[t], 0xff, + "missing dequeued elements"); } - - delete[] thread_local_vectors; - delete queue; } template void QueueTest:: -QueueTestMultipleProducerMultipleConsumer_ThreadMethod() { - unsigned int thread_index; - int return_val = embb_internal_thread_index(&thread_index); - - PT_ASSERT(EMBB_SUCCESS == return_val); +QueueTestOrderMPMC_ProducerThreadMethod() { + size_t p_id = next_producer_id.FetchAndAdd(1); + producers[p_id].Run(); +} - std::vector& my_elements = thread_local_vectors[thread_index]; +template +void QueueTest:: +QueueTestOrderMPMC_ConsumerThreadMethod() { + size_t c_id = next_consumer_id.FetchAndAdd(1); + consumers[c_id].Run(); +} - for (std::vector::iterator it = my_elements.begin(); - it != my_elements.end(); - ++it) { - int enq = *it; - bool success = queue->TryEnqueue(enq); - PT_ASSERT(success == true); +template +void QueueTest::Producer:: +Run() { + // Enqueue pairs of (producer id, counter): + for (int i = 0; i < n_producer_elements; ++i) { + while (!q->TryEnqueue(element_t(producer_id, i))) { + embb::base::Thread::CurrentYield(); + } + } + // Enqueue -1 as terminator element of this producer: + while (!q->TryEnqueue(element_t(producer_id, -1))) { + embb::base::Thread::CurrentYield(); } +} - my_elements.clear(); +template +QueueTest::Consumer:: +Consumer(Queue_t * const queue, int numProducers, int numProducerElements) : + q(queue), + n_producers(numProducers), + n_producer_elements(numProducerElements) { + for (int p_id = 0; p_id < n_producers; ++p_id) { + // Initialize last value dequeued from producers with + // below-minimum value: + sequence_number.push_back(-1); + // Initialize element tally for producer with all 0, + // 8 flags / char: + for (int i = 0; i < n_producer_elements / 8; ++i) { + consumer_tally.push_back(0); + } + } +} - for (int i = 0; i != n_queue_elements_per_thread; ++i) { - int dequ; - bool success = queue->TryDequeue(dequ); - PT_ASSERT(success == true); - my_elements.push_back(dequ); +template +void QueueTest::Consumer:: +Run() { + element_t element; + size_t producerId; + // To avoid compiler warning + bool forever = true; + while (forever) { + if (!q->TryDequeue(element)) { + continue; + } + if (element.second < 0) { + break; + } + producerId = element.first; + // Assert on dequeued element: + PT_ASSERT_LT_MSG(producerId, static_cast(n_producers), + "Invalid producer id in dequeue"); + PT_ASSERT_LT_MSG(sequence_number[producerId], element.second, + "Invalid element sequence"); + // Store last value received from the element's producer: + sequence_number[producerId] = element.second; + const size_t pos((producerId * n_producer_elements) + + static_cast(element.second)); + // Test dequeued element's position flag: tally[pos] == 1 + PT_ASSERT_EQ_MSG(consumer_tally[pos / 8] & (0x80 >> (pos % 8)), 0, + "Element dequeued twice"); + // Set flag at dequeued element's position: + // tally[pos] = 1 + consumer_tally[pos / 8] |= (0x80 >> (pos % 8)); } } template void QueueTest:: -QueueTestSingleProducedSingleConsumer_Pre() { +QueueTestSingleProducerSingleConsumer_Pre() { embb_internal_thread_index_reset(); - - queue = new Queue_t(QUEUE_SIZE); + queue = new Queue_t(static_cast(n_queue_size)); thread_selector_producer = -1; produce_count = 0; consume_count = 0; @@ -165,53 +223,47 @@ QueueTestSingleProducedSingleConsumer_Pre() { template void QueueTest:: -QueueTestSingleProducedSingleConsumer_Post() { +QueueTestSingleProducerSingleConsumer_Post() { embb_atomic_memory_barrier(); ::std::sort(consumed_elements.begin(), consumed_elements.end()); ::std::sort(produced_elements.begin(), produced_elements.end()); - PT_ASSERT(consumed_elements.size() == produced_elements.size()); - for (unsigned int i = 0; i != static_cast(consumed_elements.size()); i++) { PT_ASSERT(consumed_elements[i] == produced_elements[i]); } - delete queue; } template void QueueTest:: -QueueTestSingleProducedSingleConsumer_ThreadMethod() { +QueueTestSingleProducerSingleConsumer_ThreadMethod() { unsigned int thread_index; int return_val = embb_internal_thread_index(&thread_index); PT_ASSERT(return_val == EMBB_SUCCESS); - if (thread_selector_producer == -1) { int expected = -1; thread_selector_producer.CompareAndSwap(expected, static_cast(thread_index)); while (thread_selector_producer == -1) {} } - - // we are the producer if (static_cast(thread_selector_producer.Load()) == thread_index) { - while (produce_count >= QUEUE_SIZE) {} + // we are the producer + while (produce_count >= n_queue_size) { } - int random_var = rand() % 10000; + element_t random_var(0, rand() % 10000); bool success = queue->TryEnqueue(random_var); PT_ASSERT(success == true); produce_count++; produced_elements.push_back(random_var); - // we are the consumer } else { - while (consume_count < TOTAL_PRODUCE_CONSUME_COUNT) { + // we are the consumer + while (consume_count < n_total_produce_consume_count) { consume_count++; - while (produce_count == 0) {} - int consumed; + element_t consumed; bool success = queue->TryDequeue(consumed); PT_ASSERT(success == true); produce_count--; @@ -223,23 +275,24 @@ QueueTestSingleProducedSingleConsumer_ThreadMethod() { template void QueueTest:: QueueTestSingleThreadEnqueueDequeue_ThreadMethod() { - for (int i = 0; i != QUEUE_SIZE; ++i) { - bool success = queue->TryEnqueue(i * 133); + for (int i = 0; i != n_queue_size; ++i) { + bool success = queue->TryEnqueue(element_t(0, i * 133)); PT_ASSERT(success == true); } - for (int i = 0; i != QUEUE_SIZE; ++i) { - int dequ = -1; + for (int i = 0; i != n_queue_size; ++i) { + element_t dequ(0, -1); bool success = queue->TryDequeue(dequ); PT_ASSERT(success == true); - PT_ASSERT(dequ == i * 133); + PT_ASSERT(dequ.second == i * 133); } } template void QueueTest:: QueueTestSingleThreadEnqueueDequeue_Pre() { - queue = new Queue_t(QUEUE_SIZE); + queue = new Queue_t(static_cast(n_queue_size)); } + template void QueueTest:: QueueTestSingleThreadEnqueueDequeue_Post() { diff --git a/containers_cpp/test/queue_test.h b/containers_cpp/test/queue_test.h index c405b05..136e4a7 100644 --- a/containers_cpp/test/queue_test.h +++ b/containers_cpp/test/queue_test.h @@ -28,9 +28,9 @@ #define CONTAINERS_CPP_TEST_QUEUE_TEST_H_ #include +#include #include #include -#include namespace embb { namespace containers { @@ -39,36 +39,76 @@ template class QueueTest : public partest::TestCase { + public: + typedef ::std::pair element_t; + private: + /// Minimum number of elements enqueued by every producer + /// in MP/MC unit test. Must be a multiple of 8. + static const int MIN_ENQ_ELEMENTS = 120; + static const int MIN_TOTAL_PRODUCE_CONSUME_COUNT = 1000; + + private: + class Consumer { + private: + Queue_t * q; + int n_producers; + int n_producer_elements; + ::std::vector consumer_tally; + ::std::vector sequence_number; + public: + Consumer(Queue_t * const queue, int numProducers, int numProducerElements); + void Run(); + const ::std::vector & Tally() const { + return consumer_tally; + } + }; + class Producer { + private: + Queue_t * q; + size_t producer_id; + int n_producer_elements; + public: + Producer(Queue_t * const queue, size_t id, int numProducerElements) : + q(queue), + producer_id(id), + n_producer_elements(numProducerElements) {} + void Run(); + }; + private: - static const int QUEUE_SIZE = 100; - static const int TOTAL_PRODUCE_CONSUME_COUNT = 10000; int n_threads; + int n_queue_size; + int n_total_produce_consume_count; embb::base::Atomic thread_selector_producer; embb::base::Atomic produce_count; - std::vector consumed_elements; - std::vector produced_elements; + ::std::vector consumed_elements; + ::std::vector produced_elements; + ::std::vector consumers; + ::std::vector producers; - //for multiple p/c - int n_iterations; - int n_queue_elements_per_thread; - int n_queue_elements; - std::vector expected_queue_elements; - std::vector* thread_local_vectors; - embb::base::Atomic queueSize; + // for multiple p/c + int n_producers; + int n_consumers; + embb::base::Atomic next_producer_id; + embb::base::Atomic next_consumer_id; + /// Number of elements enqueued by every producer, depending + /// on number of iterations for regression tests. + int n_producer_elements; int consume_count; Queue_t* queue; - void QueueTestMultipleProducerMultipleConsumer_Pre(); - void QueueTestMultipleProducerMultipleConsumer_Post(); - void QueueTestMultipleProducerMultipleConsumer_ThreadMethod(); - void QueueTestSingleProducedSingleConsumer_Pre(); - void QueueTestSingleProducedSingleConsumer_Post(); - void QueueTestSingleProducedSingleConsumer_ThreadMethod(); - void QueueTestSingleThreadEnqueueDequeue_ThreadMethod(); + void QueueTestOrderMPMC_Pre(); + void QueueTestOrderMPMC_Post(); + void QueueTestOrderMPMC_ProducerThreadMethod(); + void QueueTestOrderMPMC_ConsumerThreadMethod(); + void QueueTestSingleProducerSingleConsumer_Pre(); + void QueueTestSingleProducerSingleConsumer_Post(); + void QueueTestSingleProducerSingleConsumer_ThreadMethod(); void QueueTestSingleThreadEnqueueDequeue_Pre(); void QueueTestSingleThreadEnqueueDequeue_Post(); - + void QueueTestSingleThreadEnqueueDequeue_ThreadMethod(); + public: QueueTest(); };