Commit 66fabda7 by lucapegolotti

Fix problem with single producer - single consumer queue: previous test was not…

Fix problem with single producer - single consumer queue: previous test was not suitable for this case, modified in order to account for spsc
parent 6c07daa1
/* /*
* Main script that applies the linearizability tester on embb data structures. * Main script that applies the linearizability tester on embb data structures.
*/ */
...@@ -17,7 +17,24 @@ ...@@ -17,7 +17,24 @@
#include <embb/containers/lock_free_mpmc_queue.h> #include <embb/containers/lock_free_mpmc_queue.h>
#include <embb/containers/wait_free_spsc_queue.h> #include <embb/containers/wait_free_spsc_queue.h>
// Each thread executes quasi randomly operations (TryEqneueu, TryDequeue) // Class that provides unique ids for threads.
class id_creator {
public:
id_creator() : id(0) {};
void next_id(size_t &cur_id){
std::lock_guard<std::mutex> lock(mut);
cur_id = id;
id ++;
}
private:
size_t id;
std::mutex mut;
};
// Each thread executes quasi randomly operations (TryEnqueue, TryDequeue)
// on the concurrent data structure and construct the history. // on the concurrent data structure and construct the history.
template<std::size_t N, class S> template<std::size_t N, class S>
static void embb_worker_stack( static void embb_worker_stack(
...@@ -64,39 +81,83 @@ template<std::size_t N, class S> ...@@ -64,39 +81,83 @@ template<std::size_t N, class S>
static void embb_worker_queue( static void embb_worker_queue(
const WorkerConfiguration& worker_configuration, const WorkerConfiguration& worker_configuration,
ConcurrentLog<state::Queue<N>>& concurrent_log, ConcurrentLog<state::Queue<N>>& concurrent_log,
S& concurrent_queue) S& concurrent_queue,
id_creator* creator,
bool mpmc)
{ {
std::random_device rd; std::random_device rd;
std::mt19937 gen(rd()); std::mt19937 gen(rd());
std::uniform_int_distribution<> value_dist('\0', worker_configuration.max_value); std::uniform_int_distribution<> value_dist('\0', worker_configuration.max_value);
std::uniform_int_distribution<> percentage_dist(0, 100); std::uniform_int_distribution<> percentage_dist(0, 100);
bool ret; // my_id is used only in the case of single producer, single consumer.
// my_id == 0 : producer thread, my_id == 1 : consumer thread
size_t my_id;
creator->next_id(my_id);
bool ret;
char value; char value;
unsigned percentage; unsigned percentage;
EntryPtr<state::Queue<N>> call_entry_ptr; EntryPtr<state::Queue<N>> call_entry_ptr;
for (unsigned number_of_ops{ 0U }; if (mpmc){
number_of_ops < worker_configuration.number_of_ops; for (unsigned number_of_ops{ 0U };
++number_of_ops) number_of_ops < worker_configuration.number_of_ops;
{ ++number_of_ops)
value = value_dist(rd);
percentage = percentage_dist(rd);
// Note: this threshold affects considerably the running time of the test
// increasing threshold -> increasing running time
if (percentage < 20)
{ {
call_entry_ptr = concurrent_log.push_back(state::Queue<N>::make_try_enqueue_call(value)); value = value_dist(rd);
ret = concurrent_queue.TryEnqueue(value); percentage = percentage_dist(rd);
concurrent_log.push_back(call_entry_ptr, state::Queue<N>::make_try_enqueue_ret(ret)); // Note: this threshold affects considerably the running time of the test
// increasing threshold -> increasing running time
if (percentage < 20)
{
call_entry_ptr = concurrent_log.push_back(state::Queue<N>::make_try_enqueue_call(value));
ret = concurrent_queue.TryEnqueue(value);
concurrent_log.push_back(call_entry_ptr, state::Queue<N>::make_try_enqueue_ret(ret));
}
else
{
call_entry_ptr = concurrent_log.push_back(state::Queue<N>::make_try_dequeue_call());
ret = concurrent_queue.TryDequeue(value);
concurrent_log.push_back(call_entry_ptr, state::Queue<N>::make_try_dequeue_ret(ret, value));
}
} }
else }
{ // single produced - single consumer case. Note that the final number of operations
call_entry_ptr = concurrent_log.push_back(state::Queue<N>::make_try_dequeue_call()); // is not known, because threads do not enqueue/dequeue at each iteration
ret = concurrent_queue.TryDequeue(value); else {
concurrent_log.push_back(call_entry_ptr, state::Queue<N>::make_try_dequeue_ret(ret, value)); if(my_id == 0){
// multiplying by 2 the number of operations because
// the thread will not make an operation at teach iteration
for (unsigned number_of_ops{ 0U };
number_of_ops < worker_configuration.number_of_ops * 2;
++number_of_ops)
{
percentage = percentage_dist(rd);
value = value_dist(rd);
if (percentage<20){
call_entry_ptr = concurrent_log.push_back(state::Queue<N>::make_try_enqueue_call(value));
ret = concurrent_queue.TryEnqueue(value);
concurrent_log.push_back(call_entry_ptr, state::Queue<N>::make_try_enqueue_ret(ret));
}
}
}
else if(my_id == 1){
for (unsigned number_of_ops{ 0U };
number_of_ops < worker_configuration.number_of_ops * 2;
++number_of_ops)
{
percentage = percentage_dist(rd);
if (percentage>80){
call_entry_ptr = concurrent_log.push_back(state::Queue<N>::make_try_dequeue_call());
ret = concurrent_queue.TryDequeue(value);
concurrent_log.push_back(call_entry_ptr, state::Queue<N>::make_try_dequeue_ret(ret, value));
}
}
} }
} }
} }
// Creates the history and apply the tester on it // Creates the history and apply the tester on it
...@@ -117,7 +178,7 @@ static void embb_experiment_stack() ...@@ -117,7 +178,7 @@ static void embb_experiment_stack()
char value; char value;
assert(concurrent_stack.TryPush(5)); assert(concurrent_stack.TryPush(5));
assert(concurrent_stack.TryPop(value)); assert(concurrent_stack.TryPop(value));
// create history // create history
start_threads(number_of_threads, embb_worker_stack<N, S>, std::cref(worker_configuration), start_threads(number_of_threads, embb_worker_stack<N, S>, std::cref(worker_configuration),
std::ref(concurrent_log), std::ref(concurrent_stack)); std::ref(concurrent_log), std::ref(concurrent_stack));
...@@ -148,7 +209,7 @@ static void embb_experiment_stack() ...@@ -148,7 +209,7 @@ static void embb_experiment_stack()
// Creates the history and apply the tester on it // Creates the history and apply the tester on it
template <class S> template <class S>
static void embb_experiment_queue() static void embb_experiment_queue(bool mpmc)
{ {
constexpr std::chrono::hours max_duration{ 1 }; constexpr std::chrono::hours max_duration{ 1 };
constexpr std::size_t N = 560000U; constexpr std::size_t N = 560000U;
...@@ -165,12 +226,14 @@ static void embb_experiment_queue() ...@@ -165,12 +226,14 @@ static void embb_experiment_queue()
char value; char value;
assert(concurrent_queue.TryEnqueue(5)); assert(concurrent_queue.TryEnqueue(5));
assert(concurrent_queue.TryDequeue(value)); assert(concurrent_queue.TryDequeue(value));
id_creator* creator = new id_creator();
// create history // create history
start_threads(number_of_threads, embb_worker_queue<N, S>, std::cref(worker_configuration), start_threads(number_of_threads, embb_worker_queue<N, S>, std::cref(worker_configuration),
std::ref(concurrent_log), std::ref(concurrent_queue)); std::ref(concurrent_log), std::ref(concurrent_queue), std::cref(creator),std::cref(mpmc));
delete creator;
const std::size_t number_of_entries{ concurrent_log.number_of_entries() }; const std::size_t number_of_entries{ concurrent_log.number_of_entries() };
const LogInfo<state::Queue<N>> log_info{ concurrent_log.info() }; const LogInfo<state::Queue<N>> log_info{ concurrent_log.info()};
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
auto end = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now();
...@@ -194,20 +257,19 @@ static void embb_experiment_queue() ...@@ -194,20 +257,19 @@ static void embb_experiment_queue()
int main() int main()
{ {
// Test functions and structures in linearizability_tester.h and sequential_data_structures.h // Test functions and structures in linearizability_tester.h and sequential_data_structures.h
run_tests(); run_tests();
embb::base::Thread::SetThreadsMaxCount(255); embb::base::Thread::SetThreadsMaxCount(255);
std::cout << "Linearizability test on LockFreeMPMCQueue" << std::endl; std::cout << "Linearizability test on LockFreeMPMCQueue" << std::endl;
embb_experiment_queue<embb::containers::LockFreeMPMCQueue<char>>(); embb_experiment_queue<embb::containers::LockFreeMPMCQueue<char>>(true);
std::cout << "Linearizability test on WaitFreeSPSCQueue" << std::endl; std::cout << "Linearizability test on WaitFreeSPSCQueue" << std::endl;
embb_experiment_queue<embb::containers::WaitFreeSPSCQueue<char>>(); embb_experiment_queue<embb::containers::WaitFreeSPSCQueue<char>>(false);
std::cout << "Linearizability test on LockFreeStack" << std::endl; std::cout << "Linearizability test on LockFreeStack" << std::endl;
embb_experiment_stack<embb::containers::LockFreeStack<char>>(); embb_experiment_stack<embb::containers::LockFreeStack<char>>();
return 0; return 0;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment