diff --git a/linearizability_tester/main.cc b/linearizability_tester/main.cc index e7d8ec1..828c386 100755 --- a/linearizability_tester/main.cc +++ b/linearizability_tester/main.cc @@ -1,4 +1,4 @@ -/* +/* * Main script that applies the linearizability tester on embb data structures. */ @@ -17,7 +17,24 @@ #include #include -// Each thread executes quasi randomly operations (TryEqneueu, TryDequeue) +// Class that provides unique ids for threads. +class id_creator { + public: + id_creator() : id(0) {}; + + void next_id(size_t &cur_id){ + std::lock_guard lock(mut); + cur_id = id; + id ++; + } + private: + size_t id; + + std::mutex mut; + +}; + +// Each thread executes quasi randomly operations (TryEnqueue, TryDequeue) // on the concurrent data structure and construct the history. template static void embb_worker_stack( @@ -64,39 +81,83 @@ template static void embb_worker_queue( const WorkerConfiguration& worker_configuration, ConcurrentLog>& concurrent_log, - S& concurrent_queue) + S& concurrent_queue, + id_creator* creator, + bool mpmc) { + std::random_device rd; std::mt19937 gen(rd()); std::uniform_int_distribution<> value_dist('\0', worker_configuration.max_value); std::uniform_int_distribution<> percentage_dist(0, 100); - bool ret; + // my_id is used only in the case of single producer, single consumer. + // my_id == 0 : producer thread, my_id == 1 : consumer thread + size_t my_id; + creator->next_id(my_id); + bool ret; char value; unsigned percentage; EntryPtr> call_entry_ptr; - for (unsigned number_of_ops{ 0U }; - number_of_ops < worker_configuration.number_of_ops; - ++number_of_ops) - { - value = value_dist(rd); - percentage = percentage_dist(rd); - // Note: this threshold affects considerably the running time of the test - // increasing threshold -> increasing running time - if (percentage < 20) + if (mpmc){ + for (unsigned number_of_ops{ 0U }; + number_of_ops < worker_configuration.number_of_ops; + ++number_of_ops) { - call_entry_ptr = concurrent_log.push_back(state::Queue::make_try_enqueue_call(value)); - ret = concurrent_queue.TryEnqueue(value); - concurrent_log.push_back(call_entry_ptr, state::Queue::make_try_enqueue_ret(ret)); + value = value_dist(rd); + percentage = percentage_dist(rd); + // Note: this threshold affects considerably the running time of the test + // increasing threshold -> increasing running time + if (percentage < 20) + { + call_entry_ptr = concurrent_log.push_back(state::Queue::make_try_enqueue_call(value)); + ret = concurrent_queue.TryEnqueue(value); + concurrent_log.push_back(call_entry_ptr, state::Queue::make_try_enqueue_ret(ret)); + } + else + { + call_entry_ptr = concurrent_log.push_back(state::Queue::make_try_dequeue_call()); + ret = concurrent_queue.TryDequeue(value); + concurrent_log.push_back(call_entry_ptr, state::Queue::make_try_dequeue_ret(ret, value)); + } } - else - { - call_entry_ptr = concurrent_log.push_back(state::Queue::make_try_dequeue_call()); - ret = concurrent_queue.TryDequeue(value); - concurrent_log.push_back(call_entry_ptr, state::Queue::make_try_dequeue_ret(ret, value)); + } + // single produced - single consumer case. Note that the final number of operations + // is not known, because threads do not enqueue/dequeue at each iteration + else { + if(my_id == 0){ + // multiplying by 2 the number of operations because + // the thread will not make an operation at teach iteration + for (unsigned number_of_ops{ 0U }; + number_of_ops < worker_configuration.number_of_ops * 2; + ++number_of_ops) + { + percentage = percentage_dist(rd); + value = value_dist(rd); + if (percentage<20){ + call_entry_ptr = concurrent_log.push_back(state::Queue::make_try_enqueue_call(value)); + ret = concurrent_queue.TryEnqueue(value); + concurrent_log.push_back(call_entry_ptr, state::Queue::make_try_enqueue_ret(ret)); + } + + } + } + else if(my_id == 1){ + for (unsigned number_of_ops{ 0U }; + number_of_ops < worker_configuration.number_of_ops * 2; + ++number_of_ops) + { + percentage = percentage_dist(rd); + if (percentage>80){ + call_entry_ptr = concurrent_log.push_back(state::Queue::make_try_dequeue_call()); + ret = concurrent_queue.TryDequeue(value); + concurrent_log.push_back(call_entry_ptr, state::Queue::make_try_dequeue_ret(ret, value)); + } + } } } + } // Creates the history and apply the tester on it @@ -117,7 +178,7 @@ static void embb_experiment_stack() char value; assert(concurrent_stack.TryPush(5)); assert(concurrent_stack.TryPop(value)); - + // create history start_threads(number_of_threads, embb_worker_stack, std::cref(worker_configuration), std::ref(concurrent_log), std::ref(concurrent_stack)); @@ -148,7 +209,7 @@ static void embb_experiment_stack() // Creates the history and apply the tester on it template -static void embb_experiment_queue() +static void embb_experiment_queue(bool mpmc) { constexpr std::chrono::hours max_duration{ 1 }; constexpr std::size_t N = 560000U; @@ -165,12 +226,14 @@ static void embb_experiment_queue() char value; assert(concurrent_queue.TryEnqueue(5)); assert(concurrent_queue.TryDequeue(value)); + id_creator* creator = new id_creator(); // create history - start_threads(number_of_threads, embb_worker_queue, std::cref(worker_configuration), - std::ref(concurrent_log), std::ref(concurrent_queue)); + start_threads(number_of_threads, embb_worker_queue, std::cref(worker_configuration), + std::ref(concurrent_log), std::ref(concurrent_queue), std::cref(creator),std::cref(mpmc)); + delete creator; const std::size_t number_of_entries{ concurrent_log.number_of_entries() }; - const LogInfo> log_info{ concurrent_log.info() }; + const LogInfo> log_info{ concurrent_log.info()}; auto start = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now(); @@ -194,20 +257,19 @@ static void embb_experiment_queue() int main() -{ +{ // Test functions and structures in linearizability_tester.h and sequential_data_structures.h run_tests(); embb::base::Thread::SetThreadsMaxCount(255); - + std::cout << "Linearizability test on LockFreeMPMCQueue" << std::endl; - embb_experiment_queue>(); + embb_experiment_queue>(true); std::cout << "Linearizability test on WaitFreeSPSCQueue" << std::endl; - embb_experiment_queue>(); + embb_experiment_queue>(false); std::cout << "Linearizability test on LockFreeStack" << std::endl; embb_experiment_stack>(); return 0; } -