main.cc 9.4 KB
Newer Older
1
/*
2 3 4
 * Main script that applies the linearizability tester on embb data structures.
 */

5 6 7 8 9 10
// Enable assertions even in Release mode
#ifdef NDEBUG
#undef NDEBUG
#include <assert.h>
#endif

11
#include <linearizability_tester.h>
12
#include <sequential_data_structures.h>
13 14 15 16 17
#include <tests.h>

#include <embb/base/thread.h>
#include <embb/containers/lock_free_stack.h>
#include <embb/containers/lock_free_mpmc_queue.h>
18
#include <embb/containers/wait_free_spsc_queue.h>
19

20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
// Class that provides unique ids for threads.
class id_creator {
 public:
   id_creator() : id(0) {};

   void next_id(size_t &cur_id){
     std::lock_guard<std::mutex> lock(mut);
     cur_id = id;
     id ++;
   }
 private:
   size_t id;

   std::mutex  mut;

};

// Each thread executes quasi randomly operations (TryEnqueue, TryDequeue)
38
// on the concurrent data structure and construct the history.
39 40
template<std::size_t N, class S>
static void embb_worker_stack(
lucapegolotti committed
41 42 43
  const WorkerConfiguration& worker_configuration,
  ConcurrentLog<state::Stack<N>>& concurrent_log,
  S& concurrent_stack)
44
{
lucapegolotti committed
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
  std::random_device rd;
  std::mt19937 gen(rd());
  std::uniform_int_distribution<> value_dist('\0', worker_configuration.max_value);
  std::uniform_int_distribution<> percentage_dist(0, 100);

  bool ret;

  char value;
  unsigned percentage;
  EntryPtr<state::Stack<N>> call_entry_ptr;
  for (unsigned number_of_ops{ 0U };
  number_of_ops < worker_configuration.number_of_ops;
    ++number_of_ops)
  {
    value = value_dist(rd);
    percentage = percentage_dist(rd);
61 62
    // Note: this threshold affects considerably the running time of the test
    // increasing threshold -> increasing running time
lucapegolotti committed
63 64 65 66 67 68 69 70 71 72 73 74 75
    if (percentage < 30)
    {
      call_entry_ptr = concurrent_log.push_back(state::Stack<N>::make_try_push_call(value));
      ret = concurrent_stack.TryPush(value);
      concurrent_log.push_back(call_entry_ptr, state::Stack<N>::make_try_push_ret(ret));
    }
    else
    {
      call_entry_ptr = concurrent_log.push_back(state::Stack<N>::make_try_pop_call());
      ret = concurrent_stack.TryPop(value);
      concurrent_log.push_back(call_entry_ptr, state::Stack<N>::make_try_pop_ret(ret, value));
    }
  }
76 77
}

78
// Each thread executes quasi randomly operations (TryEnqueue, TryDequeue)
79
// on the concurrent data structure and construct the history.
80 81
template<std::size_t N, class S>
static void embb_worker_queue(
lucapegolotti committed
82 83
  const WorkerConfiguration& worker_configuration,
  ConcurrentLog<state::Queue<N>>& concurrent_log,
84 85 86
  S& concurrent_queue,
  id_creator* creator,
  bool mpmc)
87
{
88

lucapegolotti committed
89 90 91 92 93
  std::random_device rd;
  std::mt19937 gen(rd());
  std::uniform_int_distribution<> value_dist('\0', worker_configuration.max_value);
  std::uniform_int_distribution<> percentage_dist(0, 100);

94 95 96 97
  // my_id is used only in the case of single producer, single consumer.
  // my_id == 0 : producer thread, my_id == 1 : consumer thread
  size_t my_id;
  creator->next_id(my_id);
lucapegolotti committed
98

99
  bool ret;
lucapegolotti committed
100 101 102
  char value;
  unsigned percentage;
  EntryPtr<state::Queue<N>> call_entry_ptr;
103 104 105 106
  if (mpmc){
    for (unsigned number_of_ops{ 0U };
    number_of_ops < worker_configuration.number_of_ops;
      ++number_of_ops)
lucapegolotti committed
107
    {
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
      value = value_dist(rd);
      percentage = percentage_dist(rd);
      // Note: this threshold affects considerably the running time of the test
      // increasing threshold -> increasing running time
      if (percentage < 20)
      {
        call_entry_ptr = concurrent_log.push_back(state::Queue<N>::make_try_enqueue_call(value));
        ret = concurrent_queue.TryEnqueue(value);
        concurrent_log.push_back(call_entry_ptr, state::Queue<N>::make_try_enqueue_ret(ret));
      }
      else
      {
        call_entry_ptr = concurrent_log.push_back(state::Queue<N>::make_try_dequeue_call());
        ret = concurrent_queue.TryDequeue(value);
        concurrent_log.push_back(call_entry_ptr, state::Queue<N>::make_try_dequeue_ret(ret, value));
      }
lucapegolotti committed
124
    }
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
  }
  // single produced - single consumer case. Note that the final number of operations
  // is not known, because threads do not enqueue/dequeue at each iteration
  else {
    if(my_id == 0){
        // multiplying by 2 the number of operations because
        // the thread will not make an operation at teach iteration
        for (unsigned number_of_ops{ 0U };
        number_of_ops < worker_configuration.number_of_ops * 2;
          ++number_of_ops)
        {
          percentage = percentage_dist(rd);
          value = value_dist(rd);
          if (percentage<20){
            call_entry_ptr = concurrent_log.push_back(state::Queue<N>::make_try_enqueue_call(value));
            ret = concurrent_queue.TryEnqueue(value);
            concurrent_log.push_back(call_entry_ptr, state::Queue<N>::make_try_enqueue_ret(ret));
          }

        }
    }
    else if(my_id == 1){
        for (unsigned number_of_ops{ 0U };
        number_of_ops < worker_configuration.number_of_ops * 2;
          ++number_of_ops)
        {
          percentage = percentage_dist(rd);
          if (percentage>80){
            call_entry_ptr = concurrent_log.push_back(state::Queue<N>::make_try_dequeue_call());
            ret = concurrent_queue.TryDequeue(value);
            concurrent_log.push_back(call_entry_ptr, state::Queue<N>::make_try_dequeue_ret(ret, value));
          }
        }
lucapegolotti committed
158 159
    }
  }
160

161 162
}

163
// Creates the history and apply the tester on it
164
template <class S>
165
static void embb_experiment_stack()
166
{
lucapegolotti committed
167 168 169 170 171 172 173 174 175 176
  constexpr std::chrono::hours max_duration{ 1 };
  constexpr std::size_t N = 560000U;
  constexpr unsigned number_of_threads = 4U;
  constexpr WorkerConfiguration worker_configuration = { '\24', 70000U };
  constexpr unsigned log_size = number_of_threads * worker_configuration.number_of_ops;

  Result<state::Stack<N>> result;
  ConcurrentLog<state::Stack<N>> concurrent_log{ 2U * log_size };
  S concurrent_stack(N);

177 178 179 180
  // Check if push and pop operations are possible
  char value;
  assert(concurrent_stack.TryPush(5));
  assert(concurrent_stack.TryPop(value));
181

lucapegolotti committed
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
  // create history
  start_threads(number_of_threads, embb_worker_stack<N, S>, std::cref(worker_configuration),
    std::ref(concurrent_log), std::ref(concurrent_stack));

  const std::size_t number_of_entries{ concurrent_log.number_of_entries() };
  const LogInfo<state::Stack<N>> log_info{ concurrent_log.info() };

  auto start = std::chrono::system_clock::now();
  auto end = std::chrono::system_clock::now();
  std::chrono::seconds seconds;

  start = std::chrono::system_clock::now();
  {
    Log<state::Stack<N>> log_copy{ log_info };
    assert(log_copy.number_of_entries() == number_of_entries);

    LinearizabilityTester<state::Stack<N>, Option::LRU_CACHE> tester{ log_copy.info(), max_duration };
    tester.check(result);
200 201
    // If structure is not linearizabile break run using assertion
    assert(result.is_timeout() || result.is_linearizable());
lucapegolotti committed
202 203 204 205 206 207
  }
  end = std::chrono::system_clock::now();
  seconds = std::chrono::duration_cast<std::chrono::seconds>(end - start);
  std::cout << "History length: " << number_of_entries
    << ", elapsed time: "
    << seconds.count() << " s " << std::endl;
208 209
}

210
// Creates the history and apply the tester on it
211
template <class S>
212
static void embb_experiment_queue(bool mpmc)
213
{
lucapegolotti committed
214 215 216 217 218 219 220 221 222 223 224
  constexpr std::chrono::hours max_duration{ 1 };
  constexpr std::size_t N = 560000U;
  constexpr unsigned number_of_threads = 4U;
  constexpr WorkerConfiguration worker_configuration = { '\24', 70000U };
  constexpr unsigned log_size = number_of_threads * worker_configuration.number_of_ops;


  Result<state::Queue<N>> result;
  ConcurrentLog<state::Queue<N>> concurrent_log{ 2U * log_size };
  S concurrent_queue(N);

225 226 227 228
  // check if enqueue and dequeue operations are possible
  char value;
  assert(concurrent_queue.TryEnqueue(5));
  assert(concurrent_queue.TryDequeue(value));
229
  id_creator* creator = new id_creator();
lucapegolotti committed
230 231

  // create history
232 233 234
  start_threads(number_of_threads,  embb_worker_queue<N, S>,  std::cref(worker_configuration),
    std::ref(concurrent_log), std::ref(concurrent_queue), std::cref(creator),std::cref(mpmc));
  delete creator;
lucapegolotti committed
235
  const std::size_t number_of_entries{ concurrent_log.number_of_entries() };
236
  const LogInfo<state::Queue<N>> log_info{ concurrent_log.info()};
lucapegolotti committed
237 238 239 240 241 242 243 244 245 246 247

  auto start = std::chrono::system_clock::now();
  auto end = std::chrono::system_clock::now();
  std::chrono::seconds seconds;

  start = std::chrono::system_clock::now();
  {
    Log<state::Queue<N>> log_copy{ log_info };
    assert(log_copy.number_of_entries() == number_of_entries);
    LinearizabilityTester<state::Queue<N>, Option::LRU_CACHE> tester{ log_copy.info(), max_duration };
    tester.check(result);
248 249
    // If structure is not linearizabile break run using assertion
    assert(result.is_timeout() || result.is_linearizable());
lucapegolotti committed
250 251 252 253 254 255
  }
  end = std::chrono::system_clock::now();
  seconds = std::chrono::duration_cast<std::chrono::seconds>(end - start);
  std::cout << "History length: " << number_of_entries
    << ", elapsed time:  "
    << seconds.count() << " s " << std::endl;
256 257 258 259
}


int main()
260
{
261

262
  // Test functions and structures in linearizability_tester.h and sequential_data_structures.h
263 264
  run_tests();
  embb::base::Thread::SetThreadsMaxCount(255);
265

266
  std::cout << "Linearizability test on LockFreeMPMCQueue" << std::endl;
267
  embb_experiment_queue<embb::containers::LockFreeMPMCQueue<char>>(true);
268

269
  std::cout << "Linearizability test on WaitFreeSPSCQueue" << std::endl;
270
  embb_experiment_queue<embb::containers::WaitFreeSPSCQueue<char>>(false);
271

272
  std::cout << "Linearizability test on LockFreeStack" << std::endl;
273
  embb_experiment_stack<embb::containers::LockFreeStack<char>>();
274 275
  return 0;
}