Commit 91d2d4f6 by Tobias Fuchs

containers_cpp: extended unit tests for MP/MC queue by checks for relative order

parent 72dd5ed2
......@@ -55,11 +55,12 @@ PT_MAIN("Data Structures C++") {
PT_RUN(embb::containers::test::HazardPointerTest);
PT_RUN(embb::containers::test::QueueTest<
embb::containers::WaitFreeSPSCQueue<int> >);
PT_RUN(embb::containers::test::QueueTest<
embb::containers::WaitFreeSPSCQueue< ::std::pair<size_t COMMA int> > >);
PT_RUN(embb::containers::test::QueueTest<
embb::containers::LockFreeMPMCQueue<int> COMMA true COMMA true >);
PT_RUN(embb::containers::test::QueueTest<
embb::containers::LockFreeMPMCQueue< ::std::pair<size_t COMMA int> >
COMMA true COMMA true >);
PT_RUN(embb::containers::test::StackTest<
embb::containers::LockFreeStack<int> >);
......
......@@ -30,132 +30,190 @@
#include <algorithm>
#include <vector>
#include <embb/base/internal/config.h>
namespace embb {
namespace containers {
namespace test {
template<typename Queue_t, bool MultipleProducers, bool MultipleConsumers>
QueueTest<Queue_t, MultipleProducers, MultipleConsumers>::QueueTest() :
n_threads(static_cast<int>
(partest::TestSuite::GetDefaultNumThreads())),
n_iterations(200),
n_queue_elements_per_thread(100),
n_queue_elements(n_queue_elements_per_thread*n_threads),
queueSize(0) {
n_threads(static_cast<int>(partest::TestSuite::GetDefaultNumThreads())),
n_queue_size(
static_cast<int>(partest::TestSuite::GetDefaultNumIterations()) *
MIN_TOTAL_PRODUCE_CONSUME_COUNT),
n_total_produce_consume_count(n_queue_size),
n_producers(1),
n_consumers(1),
next_producer_id(0),
next_consumer_id(0),
n_producer_elements(
static_cast<int>(partest::TestSuite::GetDefaultNumIterations() *
MIN_ENQ_ELEMENTS)) {
CreateUnit("QueueTestSingleThreadEnqueueDequeue").
Pre(&QueueTest::QueueTestSingleThreadEnqueueDequeue_Pre, this).
Add(&QueueTest::QueueTestSingleThreadEnqueueDequeue_ThreadMethod, this).
Post(&QueueTest::QueueTestSingleThreadEnqueueDequeue_Post, this);
CreateUnit("QueueTestTwoThreadsSingleProducerSingleConsumer").
Pre(&QueueTest::QueueTestSingleProducedSingleConsumer_Pre, this).
Add(&QueueTest::QueueTestSingleProducedSingleConsumer_ThreadMethod,
this,
2,
TOTAL_PRODUCE_CONSUME_COUNT).
Post(&QueueTest::QueueTestSingleProducedSingleConsumer_Post, this);
#ifdef EMBB_COMPILER_MSVC
Pre(&QueueTest::QueueTestSingleProducerSingleConsumer_Pre, this).
Add(&QueueTest::QueueTestSingleProducerSingleConsumer_ThreadMethod,
this,
2,
static_cast<size_t>(n_total_produce_consume_count)).
Post(&QueueTest::QueueTestSingleProducerSingleConsumer_Post, this);
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable:4127)
#endif
if (MultipleProducers == true &&
MultipleConsumers == true) {
#ifdef EMBB_COMPILER_MSVC
if (MultipleProducers == true && MultipleConsumers == true) {
// MP/MC
n_producers = n_threads / 2;
n_consumers = n_threads / 2;
#ifdef _MSC_VER
#pragma warning(pop)
#endif
CreateUnit("QueueTestMultipleThreadsMultipleProducerMultipleConsumer").
Pre(&QueueTest::QueueTestMultipleProducerMultipleConsumer_Pre, this).
Add(&QueueTest::QueueTestMultipleProducerMultipleConsumer_ThreadMethod,
this,
static_cast<size_t>(n_threads),
static_cast<size_t>(n_iterations)).
Post(&QueueTest::QueueTestMultipleProducerMultipleConsumer_Post, this);
CreateUnit("QueueTestOrderMultipleProducerMultipleConsumer").
Pre(&QueueTest::QueueTestOrderMPMC_Pre, this).
Add(&QueueTest::QueueTestOrderMPMC_ConsumerThreadMethod,
this,
static_cast<size_t>(n_consumers),
static_cast<size_t>(1)).
Add(&QueueTest::QueueTestOrderMPMC_ProducerThreadMethod,
this,
static_cast<size_t>(n_producers),
static_cast<size_t>(1)).
Post(&QueueTest::QueueTestOrderMPMC_Post, this);
}
}
template<typename Queue_t, bool MultipleProducers, bool MultipleConsumers>
void QueueTest<Queue_t, MultipleProducers, MultipleConsumers>::
QueueTestMultipleProducerMultipleConsumer_Pre() {
QueueTestOrderMPMC_Pre() {
queue = new Queue_t(static_cast<size_t>(n_producer_elements));
embb_internal_thread_index_reset();
queue = new Queue_t(static_cast<size_t>(n_queue_elements));
thread_local_vectors =
new std::vector<int>[static_cast<unsigned int>(n_threads)];
for (int i = 0; i != n_threads; ++i) {
int offset = n_queue_elements_per_thread * 2;
for (int i2 = 0; i2 != n_queue_elements_per_thread; ++i2) {
int push_element = i2 + (offset*i);
thread_local_vectors[i].push_back(push_element);
expected_queue_elements.push_back(push_element);
}
next_producer_id = 0;
next_consumer_id = 0;
consumers.clear();
producers.clear();
for (size_t p = 0; p < static_cast<size_t>(n_producers); ++p) {
producers.push_back(Producer(queue, p, n_producer_elements));
}
for (size_t c = 0; c < static_cast<size_t>(n_consumers); ++c) {
consumers.push_back(Consumer(queue, n_producers, n_producer_elements));
}
}
template<typename Queue_t, bool MultipleProducers, bool MultipleConsumers>
void QueueTest<Queue_t, MultipleProducers, MultipleConsumers>::
QueueTestMultipleProducerMultipleConsumer_Post() {
std::vector<int> produced;
for (int i = 0; i != n_threads; ++i) {
std::vector<int>& loc_elements = thread_local_vectors[i];
for (std::vector<int>::iterator it = loc_elements.begin();
it != loc_elements.end();
++it) {
produced.push_back(*it);
QueueTestOrderMPMC_Post() {
delete queue;
// Tally for all elements enqueued by all producers,
// initialized with all 0:
::std::vector<unsigned char> total_tally;
size_t n_elements_total = static_cast<size_t>(n_producers * n_producer_elements);
for (size_t i = 0; i < n_elements_total / 8; ++i) {
total_tally.push_back(0);
}
// Collect all dequeued element flags from consumers:
for (size_t c = 0; c < static_cast<size_t>(n_consumers); ++c) {
for (size_t e = 0; e < n_elements_total / 8; ++e) {
total_tally[e] |= consumers[c].Tally()[e];
}
}
PT_ASSERT(produced.size() == expected_queue_elements.size());
std::sort(expected_queue_elements.begin(), expected_queue_elements.end());
std::sort(produced.begin(), produced.end());
for (unsigned int i = 0;
i != static_cast<unsigned int>(produced.size()); ++i) {
PT_ASSERT(expected_queue_elements[i] == produced[i]);
// Test if all elements have been dequeued by any
// consumer.
// To avoid static cast warning:
for (size_t t = 0;
t < static_cast<size_t>(n_producers * n_producer_elements / 8);
++t) {
PT_ASSERT_EQ_MSG(total_tally[t], 0xff,
"missing dequeued elements");
}
delete[] thread_local_vectors;
delete queue;
}
template<typename Queue_t, bool MultipleProducers, bool MultipleConsumers>
void QueueTest<Queue_t, MultipleProducers, MultipleConsumers>::
QueueTestMultipleProducerMultipleConsumer_ThreadMethod() {
unsigned int thread_index;
int return_val = embb_internal_thread_index(&thread_index);
PT_ASSERT(EMBB_SUCCESS == return_val);
QueueTestOrderMPMC_ProducerThreadMethod() {
size_t p_id = next_producer_id.FetchAndAdd(1);
producers[p_id].Run();
}
std::vector<int>& my_elements = thread_local_vectors[thread_index];
template<typename Queue_t, bool MultipleProducers, bool MultipleConsumers>
void QueueTest<Queue_t, MultipleProducers, MultipleConsumers>::
QueueTestOrderMPMC_ConsumerThreadMethod() {
size_t c_id = next_consumer_id.FetchAndAdd(1);
consumers[c_id].Run();
}
for (std::vector<int>::iterator it = my_elements.begin();
it != my_elements.end();
++it) {
int enq = *it;
bool success = queue->TryEnqueue(enq);
PT_ASSERT(success == true);
template<typename Queue_t, bool MultipleProducers, bool MultipleConsumers>
void QueueTest<Queue_t, MultipleProducers, MultipleConsumers>::Producer::
Run() {
// Enqueue pairs of (producer id, counter):
for (int i = 0; i < n_producer_elements; ++i) {
while (!q->TryEnqueue(element_t(producer_id, i))) {
embb::base::Thread::CurrentYield();
}
}
// Enqueue -1 as terminator element of this producer:
while (!q->TryEnqueue(element_t(producer_id, -1))) {
embb::base::Thread::CurrentYield();
}
}
my_elements.clear();
template<typename Queue_t, bool MultipleProducers, bool MultipleConsumers>
QueueTest<Queue_t, MultipleProducers, MultipleConsumers>::Consumer::
Consumer(Queue_t * const queue, int numProducers, int numProducerElements) :
q(queue),
n_producers(numProducers),
n_producer_elements(numProducerElements) {
for (int p_id = 0; p_id < n_producers; ++p_id) {
// Initialize last value dequeued from producers with
// below-minimum value:
sequence_number.push_back(-1);
// Initialize element tally for producer with all 0,
// 8 flags / char:
for (int i = 0; i < n_producer_elements / 8; ++i) {
consumer_tally.push_back(0);
}
}
}
for (int i = 0; i != n_queue_elements_per_thread; ++i) {
int dequ;
bool success = queue->TryDequeue(dequ);
PT_ASSERT(success == true);
my_elements.push_back(dequ);
template<typename Queue_t, bool MultipleProducers, bool MultipleConsumers>
void QueueTest<Queue_t, MultipleProducers, MultipleConsumers>::Consumer::
Run() {
element_t element;
size_t producerId;
// To avoid compiler warning
bool forever = true;
while (forever) {
if (!q->TryDequeue(element)) {
continue;
}
if (element.second < 0) {
break;
}
producerId = element.first;
// Assert on dequeued element:
PT_ASSERT_LT_MSG(producerId, static_cast<size_t>(n_producers),
"Invalid producer id in dequeue");
PT_ASSERT_LT_MSG(sequence_number[producerId], element.second,
"Invalid element sequence");
// Store last value received from the element's producer:
sequence_number[producerId] = element.second;
const size_t pos((producerId * n_producer_elements) +
static_cast<size_t>(element.second));
// Test dequeued element's position flag: tally[pos] == 1
PT_ASSERT_EQ_MSG(consumer_tally[pos / 8] & (0x80 >> (pos % 8)), 0,
"Element dequeued twice");
// Set flag at dequeued element's position:
// tally[pos] = 1
consumer_tally[pos / 8] |= (0x80 >> (pos % 8));
}
}
template<typename Queue_t, bool MultipleProducers, bool MultipleConsumers>
void QueueTest<Queue_t, MultipleProducers, MultipleConsumers>::
QueueTestSingleProducedSingleConsumer_Pre() {
QueueTestSingleProducerSingleConsumer_Pre() {
embb_internal_thread_index_reset();
queue = new Queue_t(QUEUE_SIZE);
queue = new Queue_t(static_cast<size_t>(n_queue_size));
thread_selector_producer = -1;
produce_count = 0;
consume_count = 0;
......@@ -165,53 +223,47 @@ QueueTestSingleProducedSingleConsumer_Pre() {
template<typename Queue_t, bool MultipleProducers, bool MultipleConsumers>
void QueueTest<Queue_t, MultipleProducers, MultipleConsumers>::
QueueTestSingleProducedSingleConsumer_Post() {
QueueTestSingleProducerSingleConsumer_Post() {
embb_atomic_memory_barrier();
::std::sort(consumed_elements.begin(), consumed_elements.end());
::std::sort(produced_elements.begin(), produced_elements.end());
PT_ASSERT(consumed_elements.size() == produced_elements.size());
for (unsigned int i = 0;
i != static_cast<unsigned int>(consumed_elements.size()); i++) {
PT_ASSERT(consumed_elements[i] == produced_elements[i]);
}
delete queue;
}
template<typename Queue_t, bool MultipleProducers, bool MultipleConsumers>
void QueueTest<Queue_t, MultipleProducers, MultipleConsumers>::
QueueTestSingleProducedSingleConsumer_ThreadMethod() {
QueueTestSingleProducerSingleConsumer_ThreadMethod() {
unsigned int thread_index;
int return_val = embb_internal_thread_index(&thread_index);
PT_ASSERT(return_val == EMBB_SUCCESS);
if (thread_selector_producer == -1) {
int expected = -1;
thread_selector_producer.CompareAndSwap(expected,
static_cast<int>(thread_index));
while (thread_selector_producer == -1) {}
}
// we are the producer
if (static_cast<unsigned int>(thread_selector_producer.Load()) ==
thread_index) {
while (produce_count >= QUEUE_SIZE) {}
// we are the producer
while (produce_count >= n_queue_size) { }
int random_var = rand() % 10000;
element_t random_var(0, rand() % 10000);
bool success = queue->TryEnqueue(random_var);
PT_ASSERT(success == true);
produce_count++;
produced_elements.push_back(random_var);
// we are the consumer
} else {
while (consume_count < TOTAL_PRODUCE_CONSUME_COUNT) {
// we are the consumer
while (consume_count < n_total_produce_consume_count) {
consume_count++;
while (produce_count == 0) {}
int consumed;
element_t consumed;
bool success = queue->TryDequeue(consumed);
PT_ASSERT(success == true);
produce_count--;
......@@ -223,23 +275,24 @@ QueueTestSingleProducedSingleConsumer_ThreadMethod() {
template<typename Queue_t, bool MultipleProducers, bool MultipleConsumers>
void QueueTest<Queue_t, MultipleProducers, MultipleConsumers>::
QueueTestSingleThreadEnqueueDequeue_ThreadMethod() {
for (int i = 0; i != QUEUE_SIZE; ++i) {
bool success = queue->TryEnqueue(i * 133);
for (int i = 0; i != n_queue_size; ++i) {
bool success = queue->TryEnqueue(element_t(0, i * 133));
PT_ASSERT(success == true);
}
for (int i = 0; i != QUEUE_SIZE; ++i) {
int dequ = -1;
for (int i = 0; i != n_queue_size; ++i) {
element_t dequ(0, -1);
bool success = queue->TryDequeue(dequ);
PT_ASSERT(success == true);
PT_ASSERT(dequ == i * 133);
PT_ASSERT(dequ.second == i * 133);
}
}
template<typename Queue_t, bool MultipleProducers, bool MultipleConsumers>
void QueueTest<Queue_t, MultipleProducers, MultipleConsumers>::
QueueTestSingleThreadEnqueueDequeue_Pre() {
queue = new Queue_t(QUEUE_SIZE);
queue = new Queue_t(static_cast<size_t>(n_queue_size));
}
template<typename Queue_t, bool MultipleProducers, bool MultipleConsumers>
void QueueTest<Queue_t, MultipleProducers, MultipleConsumers>::
QueueTestSingleThreadEnqueueDequeue_Post() {
......
......@@ -28,9 +28,9 @@
#define CONTAINERS_CPP_TEST_QUEUE_TEST_H_
#include <vector>
#include <utility>
#include <partest/partest.h>
#include <embb/base/duration.h>
#include <embb/containers/wait_free_spsc_queue.h>
namespace embb {
namespace containers {
......@@ -39,36 +39,76 @@ template<typename Queue_t,
bool MultipleProducers = false,
bool MultipleConsumers = false>
class QueueTest : public partest::TestCase {
public:
typedef ::std::pair<size_t, int> element_t;
private:
/// Minimum number of elements enqueued by every producer
/// in MP/MC unit test. Must be a multiple of 8.
static const int MIN_ENQ_ELEMENTS = 120;
static const int MIN_TOTAL_PRODUCE_CONSUME_COUNT = 1000;
private:
class Consumer {
private:
Queue_t * q;
int n_producers;
int n_producer_elements;
::std::vector<unsigned char> consumer_tally;
::std::vector<int> sequence_number;
public:
Consumer(Queue_t * const queue, int numProducers, int numProducerElements);
void Run();
const ::std::vector<unsigned char> & Tally() const {
return consumer_tally;
}
};
class Producer {
private:
Queue_t * q;
size_t producer_id;
int n_producer_elements;
public:
Producer(Queue_t * const queue, size_t id, int numProducerElements) :
q(queue),
producer_id(id),
n_producer_elements(numProducerElements) {}
void Run();
};
private:
static const int QUEUE_SIZE = 100;
static const int TOTAL_PRODUCE_CONSUME_COUNT = 10000;
int n_threads;
int n_queue_size;
int n_total_produce_consume_count;
embb::base::Atomic<int> thread_selector_producer;
embb::base::Atomic<int> produce_count;
std::vector<int> consumed_elements;
std::vector<int> produced_elements;
::std::vector<element_t> consumed_elements;
::std::vector<element_t> produced_elements;
::std::vector<Consumer> consumers;
::std::vector<Producer> producers;
//for multiple p/c
int n_iterations;
int n_queue_elements_per_thread;
int n_queue_elements;
std::vector<int> expected_queue_elements;
std::vector<int>* thread_local_vectors;
embb::base::Atomic<int> queueSize;
// for multiple p/c
int n_producers;
int n_consumers;
embb::base::Atomic<size_t> next_producer_id;
embb::base::Atomic<size_t> next_consumer_id;
/// Number of elements enqueued by every producer, depending
/// on number of iterations for regression tests.
int n_producer_elements;
int consume_count;
Queue_t* queue;
void QueueTestMultipleProducerMultipleConsumer_Pre();
void QueueTestMultipleProducerMultipleConsumer_Post();
void QueueTestMultipleProducerMultipleConsumer_ThreadMethod();
void QueueTestSingleProducedSingleConsumer_Pre();
void QueueTestSingleProducedSingleConsumer_Post();
void QueueTestSingleProducedSingleConsumer_ThreadMethod();
void QueueTestSingleThreadEnqueueDequeue_ThreadMethod();
void QueueTestOrderMPMC_Pre();
void QueueTestOrderMPMC_Post();
void QueueTestOrderMPMC_ProducerThreadMethod();
void QueueTestOrderMPMC_ConsumerThreadMethod();
void QueueTestSingleProducerSingleConsumer_Pre();
void QueueTestSingleProducerSingleConsumer_Post();
void QueueTestSingleProducerSingleConsumer_ThreadMethod();
void QueueTestSingleThreadEnqueueDequeue_Pre();
void QueueTestSingleThreadEnqueueDequeue_Post();
void QueueTestSingleThreadEnqueueDequeue_ThreadMethod();
public:
QueueTest();
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment