Commit d16ad3eb by FritzFlorian

Add basic exponential backoff to our code.

parent aad1db1f
Pipeline #1158 failed with stages in 4 minutes 8 seconds
@@ -15,6 +15,7 @@ add_library(pls STATIC
include/pls/internal/base/system_details.h
include/pls/internal/base/error_handling.h
include/pls/internal/base/alignment.h src/internal/base/alignment.cpp
include/pls/internal/base/backoff.h
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h
......
#ifndef PLS_BACKOFF_H_
#define PLS_BACKOFF_H_
#include "pls/internal/base/system_details.h"
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/thread.h"
#include <random>
#include <math.h>
namespace pls {
namespace internal {
namespace base {
class backoff {
static constexpr unsigned long INITIAL_SPIN_ITERS = 2u << 2u;
static constexpr unsigned long MAX_SPIN_ITERS = 2u << 6u;
unsigned long current_ = INITIAL_SPIN_ITERS;
std::minstd_rand random_;
static void spin(unsigned long iterations) {
for (volatile unsigned long i = 0; i < iterations; i++)
system_details::relax_cpu(); // Spin
}
public:
backoff() : current_{INITIAL_SPIN_ITERS}, random_{std::random_device{}()} {}
void do_backoff() {
PROFILE_LOCK("Backoff")
spin(random_() % std::min(current_, MAX_SPIN_ITERS));
current_ = current_ * 2;
if (current_ > MAX_SPIN_ITERS) {
current_ = MAX_SPIN_ITERS;
}
}
void reset() {
current_ = INITIAL_SPIN_ITERS;
}
};
}
}
}
#endif //PLS_BACKOFF_H_
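For orientation (not part of the commit): the spin bound starts at 2u << 2u = 8 relax_cpu() iterations, doubles after every failed attempt, and is capped at 2u << 6u = 128; each do_backoff() call spins for a random number of iterations below the current bound, so contending threads are unlikely to retry in lockstep. A minimal usage sketch of the intended acquire loop, mirroring how the lock implementations further down drive the class (try_acquire_once is an illustrative stand-in for the lock-specific test-and-set or compare-exchange step):

#include <functional>
#include "pls/internal/base/backoff.h"

// Sketch only: a generic acquire loop driven by the backoff class above.
// try_acquire_once is a hypothetical callable, e.g. wrapping
// flag_.test_and_set(std::memory_order_acquire) == 0.
void acquire_with_backoff(pls::internal::base::backoff &strategy,
                          const std::function<bool()> &try_acquire_once) {
  strategy.reset();              // start again from the small initial bound
  while (!try_acquire_once()) {
    strategy.do_backoff();       // spin a random, exponentially growing amount
  }
}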
@@ -10,7 +10,7 @@ namespace internal {
namespace base {
// Default Spin-Lock implementation for this project.
using spin_lock = tas_spin_lock;
using spin_lock = ttas_spin_lock;
}
}
......
@@ -21,11 +21,10 @@ namespace base {
*/
class tas_spin_lock {
std::atomic_flag flag_;
unsigned int yield_at_tries_;
public:
tas_spin_lock() : flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{1024} {};
tas_spin_lock(const tas_spin_lock &other) : flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{other.yield_at_tries_} {}
tas_spin_lock() : flag_{ATOMIC_FLAG_INIT} {};
tas_spin_lock(const tas_spin_lock &/*other*/) : flag_{ATOMIC_FLAG_INIT} {}
void lock();
bool try_lock(unsigned int num_tries = 1);
......
@@ -6,6 +6,7 @@
#include <iostream>
#include "pls/internal/base/thread.h"
#include "pls/internal/base/backoff.h"
namespace pls {
namespace internal {
@@ -18,11 +19,11 @@ namespace base {
*/
class ttas_spin_lock {
std::atomic<int> flag_;
const unsigned int yield_at_tries_;
backoff backoff_;
public:
ttas_spin_lock() : flag_{0}, yield_at_tries_{1024} {};
ttas_spin_lock(const ttas_spin_lock &other) : flag_{0}, yield_at_tries_{other.yield_at_tries_} {}
ttas_spin_lock() : flag_{0}, backoff_{} {};
ttas_spin_lock(const ttas_spin_lock &/*other*/) : flag_{0}, backoff_{} {}
void lock();
bool try_lock(unsigned int num_tries = 1);
......
@@ -7,14 +7,14 @@ namespace base {
bool swmr_spin_lock::reader_try_lock() {
PROFILE_LOCK("Try Acquire Read Lock")
if (write_request_.load(std::memory_order_relaxed) == 1) {
if (write_request_.load(std::memory_order_acquire) == 1) {
return false;
}
// We think we can enter the region
readers_.fetch_add(1, std::memory_order_acquire);
if (write_request_.load() == 1) {
if (write_request_.load(std::memory_order_acquire) == 1) {
// Whoops, the writer acquires the lock, so we back off again
readers_--;
readers_.fetch_add(-1, std::memory_order_release);
return false;
}
@@ -29,17 +29,10 @@ void swmr_spin_lock::reader_unlock() {
void swmr_spin_lock::writer_lock() {
PROFILE_LOCK("Acquire Write Lock")
// Tell the readers that we would like to write
int expected;
while (true) {
expected = 0;
if (write_request_.compare_exchange_weak(expected, 1, std::memory_order_acquire)) {
break;
}
system_details::relax_cpu(); // Spin until WE set the write lock flag
}
write_request_.store(1, std::memory_order_acquire);
// Wait for all of them to exit the critical section
while (readers_.load() > 0)
while (readers_.load(std::memory_order_acquire) > 0)
system_details::relax_cpu(); // Spin, not expensive as relaxed load
}
......
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/tas_spin_lock.h"
#include "pls/internal/base/backoff.h"
namespace pls {
namespace internal {
@@ -7,30 +8,31 @@ namespace base {
void tas_spin_lock::lock() {
PROFILE_LOCK("Acquire Lock")
int tries = 0;
while (true) {
tries++;
if (tries % yield_at_tries_ == 0) {
this_thread::yield();
}
backoff backoff_strategy;
while (true) {
if (flag_.test_and_set(std::memory_order_acquire) == 0) {
return;
}
backoff_strategy.do_backoff();
}
}
bool tas_spin_lock::try_lock(unsigned int num_tries) {
PROFILE_LOCK("Try Acquire Lock")
backoff backoff_strategy;
while (true) {
if (flag_.test_and_set(std::memory_order_acquire) == 0) {
return true;
}
num_tries--;
if (num_tries <= 0) {
return false;
}
if (flag_.test_and_set(std::memory_order_acquire) == 0) {
return true;
}
backoff_strategy.do_backoff();
}
}
......
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/ttas_spin_lock.h"
#include "pls/internal/base/backoff.h"
namespace pls {
namespace internal {
@@ -7,40 +8,44 @@ namespace base {
void ttas_spin_lock::lock() {
PROFILE_LOCK("Acquire Lock")
int tries = 0;
int expected = 0;
backoff_.reset();
while (true) {
while (flag_.load(std::memory_order_relaxed) == 1) {
tries++;
if (tries % yield_at_tries_ == 0) {
this_thread::yield();
}
}
while (flag_.load(std::memory_order_relaxed) == 1)
system_details::relax_cpu(); // Spin
expected = 0;
if (flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire)) {
return;
}
backoff_.do_backoff();
}
}
bool ttas_spin_lock::try_lock(unsigned int num_tries) {
PROFILE_LOCK("Try Acquire Lock")
int expected = 0;
backoff_.reset();
while (true) {
while (flag_.load(std::memory_order_relaxed) == 1) {
while (flag_.load() == 1) {
num_tries--;
if (num_tries <= 0) {
return false;
}
system_details::relax_cpu();
}
expected = 0;
if (flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire)) {
return true;
}
num_tries--;
if (num_tries <= 0) {
return false;
}
backoff_.do_backoff();
}
}
......
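The acquire loop above illustrates the test-and-test-and-set pattern that presumably motivates switching the project-wide spin_lock default to ttas_spin_lock: waiting threads spin on a cheap relaxed load of the flag, which stays in the local cache and causes no coherence traffic, and only attempt the expensive atomic compare-exchange once the lock has been observed free; a failed exchange then triggers the randomized backoff. A condensed, self-contained sketch of that loop (not part of the commit; relax_cpu() is assumed to be a pause-style hint as in system_details.h):

#include <atomic>

// Stand-in for system_details::relax_cpu(): a pause-style spin hint on x86,
// a no-op elsewhere.
inline void relax_cpu_sketch() {
#if defined(__x86_64__) || defined(__i386__)
  asm volatile("pause");
#endif
}

// Sketch only: the core test-and-test-and-set acquire/release pair.
inline void ttas_acquire(std::atomic<int> &flag) {
  while (true) {
    // 1. Test: wait on a cheap relaxed load while the lock is held.
    while (flag.load(std::memory_order_relaxed) == 1)
      relax_cpu_sketch();
    // 2. Test-and-set: only now try the atomic read-modify-write.
    int expected = 0;
    if (flag.compare_exchange_weak(expected, 1, std::memory_order_acquire))
      return;  // exactly one waiter wins; the others would back off and retry
  }
}

inline void ttas_release(std::atomic<int> &flag) {
  flag.store(0, std::memory_order_release);
}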
@@ -15,11 +15,11 @@ bool abstract_task::steal_work() {
const size_t my_id = my_state->id_;
const size_t offset = my_state->random_() % my_scheduler->num_threads();
const size_t max_tries = my_scheduler->num_threads(); // TODO: Tune this value
const size_t max_tries = my_scheduler->num_threads() - 1; // TODO: Tune this value
for (size_t i = 0; i < max_tries; i++) {
size_t target = (offset + i) % my_scheduler->num_threads();
if (target == my_id) {
continue;
target = (target + 1) % my_scheduler->num_threads();
}
auto target_state = my_scheduler->thread_state_for(target);
......
@@ -54,13 +54,15 @@ void fork_join_sub_task::wait_for_all() {
if (local_task != nullptr) {
local_task->execute();
} else {
// Try to steal work.
// External steal will be executed implicitly if success
PROFILE_STEALING("steal work")
bool internal_steal_success = tbb_task_->steal_work();
PROFILE_END_BLOCK
if (internal_steal_success) {
tbb_task_->last_stolen_->execute();
while (ref_count_ > 0) {
// Try to steal work.
// External steal will be executed implicitly if success
PROFILE_STEALING("steal work")
bool internal_steal_success = tbb_task_->steal_work();
PROFILE_END_BLOCK
if (internal_steal_success) {
tbb_task_->last_stolen_->execute();
}
}
}
}
......
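The wait_for_all() change above turns a single opportunistic steal attempt into a loop: as long as the sub-task still has outstanding children (ref_count_ > 0), the waiting thread keeps trying to steal and execute other work rather than returning after one try. A rough, self-contained sketch of that help-while-waiting pattern (not the pls API; try_steal_work is a hypothetical helper standing in for abstract_task::steal_work() plus executing last_stolen_):

#include <atomic>
#include <functional>
#include <optional>

// Hypothetical helper: returns a stolen unit of work if a steal succeeded.
std::optional<std::function<void()>> try_steal_work();

// Sketch only: keep the worker busy with stolen tasks until all children
// have decremented the reference count.
void wait_for_all_sketch(std::atomic<int> &ref_count) {
  while (ref_count.load() > 0) {
    if (auto stolen = try_steal_work()) {
      (*stolen)();  // execute the stolen task; it may spawn further work
    }
    // On a failed steal, simply re-check the count and try again.
  }
}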