Commit 18b2d744 by Florian Fritz

Merge branch 'lock_free_task_stack' into 'master'

Merge: Lock Free Deque

See merge request !9
parents aa270645 237588ee
Pipeline #1162 passed with stages
in 3 minutes 39 seconds
......@@ -6,7 +6,7 @@
#include <complex>
#include <vector>
static constexpr int CUTOFF = 10;
static constexpr int CUTOFF = 16;
static constexpr int NUM_ITERATIONS = 1000;
static constexpr int INPUT_SIZE = 2064;
typedef std::vector<std::complex<double>> complex_vector;
......
......@@ -2,48 +2,99 @@
#include <pls/internal/helpers/profiler.h>
#include <iostream>
#include <complex>
#include <vector>
static pls::static_scheduler_memory<8, 2 << 14> my_scheduler_memory;
static constexpr int CUTOFF = 16;
static constexpr int INPUT_SIZE = 2064;
typedef std::vector<std::complex<double>> complex_vector;
static constexpr int CUTOFF = 10;
void divide(complex_vector::iterator data, int n) {
complex_vector tmp_odd_elements(n / 2);
for (int i = 0; i < n / 2; i++) {
tmp_odd_elements[i] = data[i * 2 + 1];
}
for (int i = 0; i < n / 2; i++) {
data[i] = data[i * 2];
}
for (int i = 0; i < n / 2; i++) {
data[i + n / 2] = tmp_odd_elements[i];
}
}
void combine(complex_vector::iterator data, int n) {
for (int i = 0; i < n / 2; i++) {
std::complex<double> even = data[i];
std::complex<double> odd = data[i + n / 2];
// w is the "twiddle-factor".
// this could be cached, but we run the same 'data_structures' algorithm parallel/serial,
// so it won't impact the performance comparison.
std::complex<double> w = exp(std::complex<double>(0, -2. * M_PI * i / n));
long fib_serial(long n) {
if (n == 0) {
return 0;
data[i] = even + w * odd;
data[i + n / 2] = even - w * odd;
}
if (n == 1) {
return 1;
}
void fft(complex_vector::iterator data, int n) {
if (n < 2) {
return;
}
return fib_serial(n - 1) + fib_serial(n - 2);
PROFILE_WORK_BLOCK("Divide")
divide(data, n);
PROFILE_END_BLOCK
PROFILE_WORK_BLOCK("Invoke Parallel")
if (n == CUTOFF) {
PROFILE_WORK_BLOCK("FFT Serial")
fft(data, n / 2);
fft(data + n / 2, n / 2);
} else if (n <= CUTOFF) {
fft(data, n / 2);
fft(data + n / 2, n / 2);
} else {
pls::invoke_parallel(
[&] { fft(data, n / 2); },
[&] { fft(data + n / 2, n / 2); }
);
}
PROFILE_END_BLOCK
PROFILE_WORK_BLOCK("Combine")
combine(data, n);
PROFILE_END_BLOCK
}
long fib(long n) {
if (n <= CUTOFF) {
return fib_serial(n);
complex_vector prepare_input(int input_size) {
std::vector<double> known_frequencies{2, 11, 52, 88, 256};
complex_vector data(input_size);
// Set our input data to match a time series of the known_frequencies.
// When applying fft to this time-series we should find these frequencies.
for (int i = 0; i < input_size; i++) {
data[i] = std::complex<double>(0.0, 0.0);
for (auto frequencie : known_frequencies) {
data[i] += sin(2 * M_PI * frequencie * i / input_size);
}
}
// Actual 'invoke_parallel' logic/code
int left, right;
pls::invoke_parallel(
[&] { left = fib(n - 1); },
[&] { right = fib(n - 2); }
);
return left + right;
return data;
}
int main() {
PROFILE_ENABLE
pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 14};
pls::scheduler scheduler{&my_scheduler_memory, 8};
long result;
complex_vector initial_input = prepare_input(INPUT_SIZE);
scheduler.perform_work([&] {
PROFILE_MAIN_THREAD
// Call looks just the same, only requirement is
// the enclosure in the perform_work lambda.
for (int i = 0; i < 10; i++) {
result = fib(30);
std::cout << "Fib(30)=" << result << std::endl;
PROFILE_WORK_BLOCK("Top Level FFT")
complex_vector input = initial_input;
fft(input.begin(), input.size());
}
});
......
......@@ -11,8 +11,5 @@
#include <pls/internal/helpers/unique_id.h>
int main() {
std::cout << pls::internal::scheduling::root_task<void (*)>::create_id().type_.hash_code() << std::endl;
std::cout
<< pls::internal::helpers::unique_id::create<pls::internal::scheduling::root_task<void (*)>>().type_.hash_code()
<< std::endl;
}
# List all required files here (cmake best practice to NOT automate this step!)
add_library(pls STATIC
include/pls/pls.h src/pls.cpp
include/pls/pls.h src/pls.cpp
include/pls/algorithms/invoke_parallel.h
include/pls/algorithms/invoke_parallel_impl.h
include/pls/internal/base/spin_lock.h
include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp
include/pls/internal/base/ttas_spin_lock.h src/internal/base/ttas_spin_lock.cpp
include/pls/internal/base/thread.h src/internal/base/thread.cpp
include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp
include/pls/internal/base/ttas_spin_lock.h src/internal/base/ttas_spin_lock.cpp
include/pls/internal/base/swmr_spin_lock.h src/internal/base/swmr_spin_lock.cpp
include/pls/internal/base/thread.h src/internal/base/thread.cpp
include/pls/internal/base/thread_impl.h
include/pls/internal/base/barrier.h src/internal/base/barrier.cpp
include/pls/internal/base/barrier.h src/internal/base/barrier.cpp
include/pls/internal/base/system_details.h
include/pls/internal/base/error_handling.h
include/pls/internal/base/alignment.h src/internal/base/alignment.cpp
include/pls/internal/base/alignment.h src/internal/base/alignment.cpp
include/pls/internal/base/backoff.h
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h
include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp
include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp
include/pls/internal/data_structures/work_stealing_deque.h
include/pls/internal/helpers/prohibit_new.h
include/pls/internal/helpers/profiler.h
include/pls/internal/helpers/mini_benchmark.h
include/pls/internal/helpers/unique_id.h
include/pls/internal/scheduling/root_task.h src/internal/scheduling/root_task.cpp
include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp
include/pls/internal/scheduling/abstract_task.h src/internal/scheduling/abstract_task.cpp
include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
include/pls/internal/scheduling/root_task.h src/internal/scheduling/root_task.cpp
include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp
include/pls/internal/scheduling/abstract_task.h src/internal/scheduling/abstract_task.cpp
include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
include/pls/internal/scheduling/scheduler_impl.h
include/pls/internal/scheduling/run_on_n_threads_task.h src/internal/scheduling/run_on_n_threads_task.cpp
include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
)
include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
PUBLIC
$<INSTALL_INTERFACE:include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/src # TODO: Set this up when we require private headers
)
${CMAKE_CURRENT_SOURCE_DIR}/src # TODO: Set this up when we require private headers
)
# Add cmake dependencies here if needed
target_link_libraries(pls
Threads::Threads # pthread support
)
if(EASY_PROFILER)
)
if (EASY_PROFILER)
target_link_libraries(pls easy_profiler)
endif()
endif ()
# Rules for istalling the library on a system
# ...binaries
INSTALL(TARGETS pls
EXPORT pls-targets
LIBRARY
DESTINATION lib/pls
DESTINATION lib/pls
ARCHIVE
DESTINATION lib/pls
)
DESTINATION lib/pls
)
# ...all headers in `include`
INSTALL(
DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include/pls
......
......@@ -5,6 +5,7 @@
#include "pls/internal/scheduling/fork_join_task.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/helpers/unique_id.h"
#include "pls/internal/base/alignment.h"
namespace pls {
namespace algorithm {
......@@ -21,7 +22,7 @@ inline void run_body(const Body &internal_body, const abstract_task::id &id) {
auto current_sub_task = reinterpret_cast<fork_join_task *>(current_task)->currently_executing();
internal_body(current_sub_task);
} else {
fork_join_lambda<Body> root_body(&internal_body);
fork_join_lambda_by_reference<Body> root_body(&internal_body);
fork_join_task root_task{&root_body, id};
scheduler::execute_task(root_task);
}
......@@ -32,14 +33,15 @@ template<typename Function1, typename Function2>
void invoke_parallel(const Function1 &function1, const Function2 &function2) {
using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
using namespace ::pls::internal::base;
static abstract_task::id id = unique_id::create<Function1, Function2>();
auto internal_body = [&](fork_join_sub_task *this_task) {
auto sub_task_body_1 = [&](fork_join_sub_task *) { function1(); };
auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1);
auto sub_task_body_2 = [&](fork_join_sub_task *) { function2(); };
auto sub_task_2 = fork_join_lambda_by_reference<decltype(sub_task_body_2)>(&sub_task_body_2);
this_task->spawn_child(sub_task_1);
function2(); // Execute last function 'inline' without spawning a sub_task object
this_task->spawn_child(sub_task_2);
function1(); // Execute first function 'inline' without spawning a sub_task object
this_task->wait_for_all();
};
......@@ -53,14 +55,14 @@ void invoke_parallel(const Function1 &function1, const Function2 &function2, con
static abstract_task::id id = unique_id::create<Function1, Function2, Function3>();
auto internal_body = [&](fork_join_sub_task *this_task) {
auto sub_task_body_1 = [&](fork_join_sub_task *) { function1(); };
auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1);
auto sub_task_body_2 = [&](fork_join_sub_task *) { function2(); };
auto sub_task_2 = fork_join_lambda<decltype(sub_task_body_2)>(&sub_task_body_2);
auto sub_task_2 = fork_join_lambda_by_reference<decltype(sub_task_body_2)>(&sub_task_body_2);
auto sub_task_body_3 = [&](fork_join_sub_task *) { function3(); };
auto sub_task_3 = fork_join_lambda_by_reference<decltype(sub_task_body_3)>(&sub_task_body_3);
this_task->spawn_child(sub_task_1);
this_task->spawn_child(sub_task_2);
function3(); // Execute last function 'inline' without spawning a sub_task object
this_task->spawn_child(sub_task_3);
function1(); // Execute first function 'inline' without spawning a sub_task object
this_task->wait_for_all();
};
......
......@@ -19,10 +19,32 @@ struct aligned_wrapper {
};
void *allocate_aligned(size_t size);
std::uintptr_t next_alignment(std::uintptr_t size);
system_details::pointer_t next_alignment(system_details::pointer_t size);
system_details::pointer_t previous_alignment(system_details::pointer_t size);
char *next_alignment(char *pointer);
}
template<typename T>
struct aligned_aba_pointer {
const system_details::pointer_t pointer_;
explicit aligned_aba_pointer(T *pointer, unsigned int aba = 0) : pointer_{
reinterpret_cast<system_details::pointer_t >(pointer) + aba} {}
T *pointer() const {
return reinterpret_cast<T *>(pointer_ & system_details::CACHE_LINE_ADDRESS_USED_BITS);
}
unsigned int aba() const {
return pointer_ & system_details::CACHE_LINE_ADDRESS_UNUSED_BITS;
}
aligned_aba_pointer set_aba(unsigned int aba) const {
return aligned_aba_pointer(pointer(), aba);
}
};
}
}
}
......
#ifndef PLS_BACKOFF_H_
#define PLS_BACKOFF_H_
#include "pls/internal/base/system_details.h"
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/thread.h"
#include <random>
#include <math.h>
namespace pls {
namespace internal {
namespace base {
class backoff {
const unsigned long INITIAL_SPIN_ITERS = 2u << 4u;
const unsigned long MAX_SPIN_ITERS = 2u << 8u;
const unsigned long MAX_ITERS = 2u << 10u;
const unsigned long YELD_ITERS = 2u << 10u;
unsigned long current_ = INITIAL_SPIN_ITERS;
std::minstd_rand random_;
static void spin(unsigned long iterations) {
for (volatile unsigned long i = 0; i < iterations; i++)
system_details::relax_cpu(); // Spin
}
public:
backoff() : current_{INITIAL_SPIN_ITERS}, random_{std::random_device{}()} {}
void do_backoff() {
PROFILE_LOCK("Backoff")
spin(random_() % std::min(current_, MAX_SPIN_ITERS));
if (current_ >= YELD_ITERS) {
PROFILE_LOCK("Yield")
this_thread::yield();
}
current_ = std::min(current_ * 2, MAX_ITERS);
}
void reset() {
current_ = INITIAL_SPIN_ITERS;
}
};
}
}
}
#endif //PLS_BACKOFF_H_
......@@ -11,5 +11,6 @@
* (or its inclusion adds too much overhead).
*/
#define PLS_ERROR(msg) std::cout << msg << std::endl; exit(1);
#define PLS_ASSERT(cond, msg) if (!cond) { PLS_ERROR(msg) }
#endif //PLS_ERROR_HANDLING_H
......@@ -10,7 +10,7 @@ namespace internal {
namespace base {
// Default Spin-Lock implementation for this project.
using spin_lock = tas_spin_lock;
using spin_lock = ttas_spin_lock;
}
}
......
#ifndef PLS_SWMR_SPIN_LOCK_LOCK_H_
#define PLS_SWMR_SPIN_LOCK_LOCK_H_
#include <atomic>
#include "pls/internal/helpers/profiler.h"
namespace pls {
namespace internal {
namespace base {
/**
* Single writer, multiple reader spin lock.
* The writer is required to be the same thread all the time (single writer),
* while multiple threads can read.
* Readers fail to lock when the writer requests the lock,
* the acquires the lock after all remaining readers left the critical section.
*/
class swmr_spin_lock {
std::atomic<int> readers_;
std::atomic<int> write_request_;
public:
explicit swmr_spin_lock() : readers_{0}, write_request_{0} {}
bool reader_try_lock();
void reader_unlock();
void writer_lock();
void writer_unlock();
};
}
}
}
#endif //PLS_SWMR_SPIN_LOCK_LOCK_H_
......@@ -3,6 +3,9 @@
#define PLS_SYSTEM_DETAILS_H
#include <cstdint>
#if (COMPILER == MVCC)
#include <emmintrin.h>
#endif
namespace pls {
namespace internal {
......@@ -15,18 +18,58 @@ namespace base {
* Currently sane default values for x86.
*/
namespace system_details {
/**
* Pointer Types needed for ABA protection mixed into addresses.
* pointer_t should be an integer type capable of holding ANY pointer value.
*/
using pointer_t = std::uintptr_t;
constexpr pointer_t ZERO_POINTER = 0;
constexpr pointer_t MAX_POINTER = ~ZERO_POINTER;
/**
* Biggest type that supports atomic CAS operations.
* Usually it is sane to assume a pointer can be swapped in a single CAS operation.
*/
using cas_integer = pointer_t;
constexpr cas_integer MIN_CAS_INTEGER = 0;
constexpr cas_integer MAX_CAS_INTEGER = ~MIN_CAS_INTEGER;
constexpr cas_integer FIRST_HALF_CAS_INTEGER = MAX_CAS_INTEGER << ((sizeof(cas_integer) / 2) * 8);
constexpr cas_integer SECOND_HALF_CAS_INTEGER = ~FIRST_HALF_CAS_INTEGER;
/**
* Most processors have 64 byte cache lines
* Most processors have 64 byte cache lines (last 6 bit of the address are zero at line beginnings).
*/
constexpr std::uintptr_t CACHE_LINE_SIZE = 64;
constexpr unsigned int CACHE_LINE_ADDRESS_BITS = 6;
constexpr pointer_t CACHE_LINE_SIZE = 2u << (CACHE_LINE_ADDRESS_BITS - 1);
constexpr pointer_t CACHE_LINE_ADDRESS_USED_BITS = MAX_POINTER << CACHE_LINE_ADDRESS_BITS;
constexpr pointer_t CACHE_LINE_ADDRESS_UNUSED_BITS = ~CACHE_LINE_ADDRESS_USED_BITS;
/**
* Choose one of the following ways to store thread specific data.
* Try to choose the fastest available on this processor/system.
*/
// #define PLS_THREAD_SPECIFIC_PTHREAD
//#define PLS_THREAD_SPECIFIC_PTHREAD
#define PLS_THREAD_SPECIFIC_COMPILER
/**
* When spinning one wants to 'relax' the CPU from some task,
* e.g. disabling speculative execution/branch prediction
* or reducing its clock speed.
* This is both good for power draw, as well as for hyperthreading.
*
* Choose the implementation appropriate for your compiler-cpu combination.
*/
#if (COMPILER == MVCC)
inline void relax_cpu() {
_mm_pause();
}
#elif (COMPILER == GCC || COMPILER == LLVM)
inline void relax_cpu() {
asm("pause");
}
#endif
}
}
}
......
......@@ -21,11 +21,10 @@ namespace base {
*/
class tas_spin_lock {
std::atomic_flag flag_;
unsigned int yield_at_tries_;
public:
tas_spin_lock() : flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{1024} {};
tas_spin_lock(const tas_spin_lock &other) : flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{other.yield_at_tries_} {}
tas_spin_lock() : flag_{ATOMIC_FLAG_INIT} {};
tas_spin_lock(const tas_spin_lock &/*other*/) : flag_{ATOMIC_FLAG_INIT} {}
void lock();
bool try_lock(unsigned int num_tries = 1);
......
......@@ -9,6 +9,7 @@
#include <functional>
#include <pthread.h>
#include <atomic>
#include <time.h>
#include "system_details.h"
......@@ -44,6 +45,11 @@ class this_thread {
pthread_yield();
}
static void sleep(long microseconds) {
timespec time{0, 1000 * microseconds};
nanosleep(&time, nullptr);
}
/**
* Retrieves the local state pointer.
*
......
......@@ -6,6 +6,7 @@
#include <iostream>
#include "pls/internal/base/thread.h"
#include "pls/internal/base/backoff.h"
namespace pls {
namespace internal {
......@@ -18,11 +19,10 @@ namespace base {
*/
class ttas_spin_lock {
std::atomic<int> flag_;
const unsigned int yield_at_tries_;
public:
ttas_spin_lock() : flag_{0}, yield_at_tries_{1024} {};
ttas_spin_lock(const ttas_spin_lock &other) : flag_{0}, yield_at_tries_{other.yield_at_tries_} {}
ttas_spin_lock() : flag_{0} {};
ttas_spin_lock(const ttas_spin_lock &/*other*/) : flag_{0} {}
void lock();
bool try_lock(unsigned int num_tries = 1);
......
......@@ -12,6 +12,8 @@ namespace pls {
namespace internal {
namespace data_structures {
using base::system_details::pointer_t;
/**
* Generic stack-like data structure that allows to allocate arbitrary objects in a given memory region.
* The objects will be stored aligned in the stack, making the storage cache friendly and very fast
......@@ -26,15 +28,16 @@ namespace data_structures {
*/
class aligned_stack {
// Keep bounds of our memory block
char *memory_start_;
char *memory_end_;
pointer_t memory_start_;
pointer_t memory_end_;
// Current head will always be aligned to cache lines
char *head_;
pointer_t head_;
public:
typedef char *state;
typedef pointer_t state;
aligned_stack() : memory_start_{nullptr}, memory_end_{nullptr}, head_{nullptr} {};
aligned_stack() : memory_start_{0}, memory_end_{0}, head_{0} {};
aligned_stack(pointer_t memory_region, std::size_t size);
aligned_stack(char *memory_region, std::size_t size);
template<typename T>
......@@ -48,7 +51,6 @@ class aligned_stack {
void reset_state(state new_state) { head_ = new_state; }
};
}
}
}
......
......@@ -9,7 +9,7 @@ namespace data_structures {
template<typename T>
T *aligned_stack::push(const T &object) {
// Copy-Construct
return new((void *) push < T > ())T(object);
return new(push < T > ())T(object);
}
template<typename T>
......
#ifndef PLS_WORK_STEALING_DEQUE_H_
#define PLS_WORK_STEALING_DEQUE_H_
#include <atomic>
#include <mutex>
#include <pls/internal/scheduling/thread_state.h>
#include "pls/internal/base/system_details.h"
#include "pls/internal/base/spin_lock.h"
#include "pls/internal/base/error_handling.h"
#include "aligned_stack.h"
namespace pls {
namespace internal {
namespace data_structures {
using cas_integer = base::system_details::cas_integer;
using pointer_t = base::system_details::pointer_t;
static cas_integer get_stamp(cas_integer n) {
return (n & base::system_details::FIRST_HALF_CAS_INTEGER) >> ((sizeof(cas_integer) / 2) * 8);
}
static cas_integer get_offset(cas_integer n) {
return n & base::system_details::SECOND_HALF_CAS_INTEGER;
}
static cas_integer set_stamp(cas_integer n, cas_integer new_value) {
return (new_value << ((sizeof(cas_integer) / 2) * 8)) | (n & base::system_details::SECOND_HALF_CAS_INTEGER);
}
//static cas_integer set_offset(cas_integer n, cas_integer new_value) {
// return new_value | (n & base::system_details::FIRST_HALF_CAS_INTEGER);
//}
class work_stealing_deque_item {
// Pointer to the actual data
pointer_t data_;
// Index (relative to stack base) to the next and previous element
cas_integer next_item_;
cas_integer previous_item_;
public:
work_stealing_deque_item() : data_{0}, next_item_{0}, previous_item_{0} {}
template<typename Item>
Item *data() {
return reinterpret_cast<Item *>(data_);
}
template<typename Item>
void set_data(Item *data) {
data_ = reinterpret_cast<pointer_t >(data);
}
cas_integer next_item() {
return next_item_;
}
void set_next_item(cas_integer next_item) {
next_item_ = next_item;
}
cas_integer previous_item() {
return previous_item_;
}
void set_previous_item(cas_integer previous_item) {
previous_item_ = previous_item;
}
};
static_assert(sizeof(work_stealing_deque_item) < base::system_details::CACHE_LINE_SIZE,
"Work stealing deque relies on memory layout and requires cache lines to be longer than one 'work_stealing_deque_item' instance!");
template<typename Item>
class work_stealing_deque {
// Deque 'takes over' stack and handles memory management while in use.
// At any point in time the deque can stop using more memory and the stack can be used by other entities.
aligned_stack *stack_;
pointer_t base_pointer_;
std::atomic<cas_integer> head_;
std::atomic<cas_integer> tail_;
cas_integer previous_tail_;
base::spin_lock lock_{}; // TODO: Remove after debugging
public:
using state = aligned_stack::state;
explicit work_stealing_deque(aligned_stack *stack) : stack_{stack},
base_pointer_{0},
head_{0},
tail_{0},
previous_tail_{0} {
reset_base_pointer();
}
work_stealing_deque(const work_stealing_deque &other) : stack_{other.stack_},
base_pointer_{other.base_pointer_},
head_{other.head_.load()},
tail_{other.tail_.load()},
previous_tail_{other.previous_tail_} {}
void reset_base_pointer() {
base_pointer_ = reinterpret_cast<pointer_t >(stack_->save_state()); // Keep the base of our region in the stack
}
work_stealing_deque_item *item_at(cas_integer position) {
return reinterpret_cast<work_stealing_deque_item *>(base_pointer_
+ (base::system_details::CACHE_LINE_SIZE * position));
}
cas_integer current_stack_offset() {
return (stack_->save_state() - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
}
template<typename T>
std::pair<work_stealing_deque_item, T> *allocate_item(const T &new_item) {
// 'Union' type to push both on stack
using pair_t = std::pair<work_stealing_deque_item, T>;
// Allocate space on stack
auto new_pair = reinterpret_cast<pair_t *>(stack_->push<pair_t>());
// Initialize memory on stack
new((void *) &(new_pair->first)) work_stealing_deque_item();
new((void *) &(new_pair->second)) T(new_item);
return new_pair;
}
template<typename T>
Item *push_tail(const T &new_item) {
cas_integer local_tail = tail_;
auto new_pair = allocate_item(new_item);
// Prepare current tail to point to correct next items
auto tail_deque_item = item_at(local_tail);
tail_deque_item->set_data(&(new_pair->second));
tail_deque_item->set_next_item(current_stack_offset());
tail_deque_item->set_previous_item(previous_tail_);
previous_tail_ = local_tail;
// Linearization point, item appears after this write
cas_integer new_tail = current_stack_offset();
tail_ = new_tail;
return &(new_pair->second);
}
Item *pop_tail() {
cas_integer local_tail = tail_;
cas_integer local_head = head_;
if (local_tail <= get_offset(local_head)) {
return nullptr; // EMPTY
}
work_stealing_deque_item *previous_tail_item = item_at(previous_tail_);
cas_integer new_tail = previous_tail_;
previous_tail_ = previous_tail_item->previous_item();
// Publish our wish to set the tail back
tail_ = new_tail;
// Get the state of local head AFTER we published our wish
local_head = head_; // Linearization point, outside knows list is empty
if (get_offset(local_head) < new_tail) {
return previous_tail_item->data<Item>(); // Success, enough distance to other threads
}
if (get_offset(local_head) == new_tail) {
cas_integer new_head = set_stamp(new_tail, get_stamp(local_head) + 1);
// Try competing with consumers by updating the head's stamp value
if (head_.compare_exchange_strong(local_head, new_head)) {
return previous_tail_item->data<Item>(); // SUCCESS, we won the competition with other threads
}
}
// Some other thread either won the competition or it already set the head further than we are
// before we even tried to compete with it.
// Reset the queue into an empty state => head_ = tail_
tail_ = get_offset(local_head); // ...we give up to the other winning thread
return nullptr; // EMPTY, we lost the competition with other threads
}
Item *pop_head() {
cas_integer local_head = head_;
cas_integer local_tail = tail_;
if (local_tail <= get_offset(local_head)) {
return nullptr; // EMPTY
}
// Load info on current deque item.
// In case we have a race with a new (aba) overwritten item at this position,
// there has to be a competition over the tail -> the stamp increased and our next
// operation will fail anyways!
work_stealing_deque_item *head_deque_item = item_at(get_offset(local_head));
cas_integer next_item_offset = head_deque_item->next_item();
Item *head_data_item = head_deque_item->data<Item>();
// We try to set the head to this new position.
// Possible outcomes:
// 1) no one interrupted us, we win this competition
// 2) other thread took the head, we lose to this
// 3) owning thread removed tail, we lose to this
cas_integer new_head = set_stamp(next_item_offset, get_stamp(local_head) + 1);
if (head_.compare_exchange_strong(local_head, new_head)) {
return head_data_item; // SUCCESS, we won the competition
}
return nullptr; // EMPTY, we lost the competition
}
void release_memory_until(state state) {
cas_integer item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
cas_integer local_head = head_;
cas_integer local_tail = tail_;
stack_->reset_state(state);
if (item_offset < local_tail) {
tail_ = item_offset;
if (get_offset(local_head) >= local_tail) {
head_ = set_stamp(item_offset, get_stamp(local_head) + 1);
}
}
}
void release_memory_until(Item *item) {
release_memory_until(reinterpret_cast<pointer_t >(item));
}
state save_state() {
return stack_->save_state();
}
};
}
}
}
#endif //PLS_WORK_STEALING_DEQUE_H_
......@@ -2,7 +2,7 @@
#ifndef PLS_ABSTRACT_TASK_H
#define PLS_ABSTRACT_TASK_H
#include "pls/internal/base/spin_lock.h"
#include "pls/internal/base/swmr_spin_lock.h"
#include "pls/internal/helpers/unique_id.h"
namespace pls {
......@@ -16,7 +16,7 @@ class abstract_task {
private:
unsigned int depth_;
abstract_task::id unique_id_;
abstract_task *child_task_;
abstract_task *volatile child_task_;
public:
abstract_task(const unsigned int depth, const abstract_task::id &unique_id) :
......@@ -26,14 +26,14 @@ class abstract_task {
virtual void execute() = 0;
void set_child(abstract_task *child_task) { child_task_ = child_task; }
abstract_task *child() { return child_task_; }
abstract_task *child() const { return child_task_; }
void set_depth(unsigned int depth) { depth_ = depth; }
unsigned int depth() const { return depth_; }
id unique_id() const { return unique_id_; }
protected:
virtual bool internal_stealing(abstract_task *other_task) = 0;
virtual bool split_task(base::spin_lock *lock) = 0;
virtual bool split_task(base::swmr_spin_lock *lock) = 0;
bool steal_work();
};
......
......@@ -5,7 +5,7 @@
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/data_structures/deque.h"
#include "pls/internal/data_structures/work_stealing_deque.h"
#include "abstract_task.h"
#include "thread_state.h"
......@@ -15,7 +15,7 @@ namespace internal {
namespace scheduling {
class fork_join_task;
class fork_join_sub_task : public data_structures::deque_item {
class fork_join_sub_task {
friend class fork_join_task;
// Coordinate finishing of sub_tasks
......@@ -25,8 +25,11 @@ class fork_join_sub_task : public data_structures::deque_item {
// Access to TBB scheduling environment
fork_join_task *tbb_task_;
bool executed = false;
int executed_at = -1;
// Stack Management (reset stack pointer after wait_for_all() calls)
data_structures::aligned_stack::state stack_state_;
data_structures::work_stealing_deque<fork_join_sub_task>::state deque_state_;
protected:
explicit fork_join_sub_task();
fork_join_sub_task(const fork_join_sub_task &other);
......@@ -37,20 +40,19 @@ class fork_join_sub_task : public data_structures::deque_item {
public:
// Only use them when actually executing this sub_task (only public for simpler API design)
template<typename T>
void spawn_child(const T &sub_task);
void spawn_child(T &sub_task);
void wait_for_all();
private:
void spawn_child_internal(fork_join_sub_task *sub_task);
void execute();
};
template<typename Function>
class fork_join_lambda : public fork_join_sub_task {
class fork_join_lambda_by_reference : public fork_join_sub_task {
const Function *function_;
public:
explicit fork_join_lambda(const Function *function) : function_{function} {};
explicit fork_join_lambda_by_reference(const Function *function) : fork_join_sub_task{}, function_{function} {};
protected:
void execute_internal() override {
......@@ -58,15 +60,27 @@ class fork_join_lambda : public fork_join_sub_task {
}
};
template<typename Function>
class fork_join_lambda_by_value : public fork_join_sub_task {
const Function function_;
public:
explicit fork_join_lambda_by_value(const Function &function) : fork_join_sub_task{}, function_{function} {};
protected:
void execute_internal() override {
function_(this);
}
};
class fork_join_task : public abstract_task {
friend class fork_join_sub_task;
fork_join_sub_task *root_task_;
fork_join_sub_task *currently_executing_;
data_structures::aligned_stack *my_stack_;
// Double-Ended Queue management
data_structures::deque<fork_join_sub_task> deque_;
data_structures::work_stealing_deque<fork_join_sub_task> deque_;
// Steal Management
fork_join_sub_task *last_stolen_;
......@@ -75,7 +89,7 @@ class fork_join_task : public abstract_task {
fork_join_sub_task *get_stolen_sub_task();
bool internal_stealing(abstract_task *other_task) override;
bool split_task(base::spin_lock * /*lock*/) override;
bool split_task(base::swmr_spin_lock * /*lock*/) override;
public:
explicit fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id);
......@@ -84,12 +98,21 @@ class fork_join_task : public abstract_task {
};
template<typename T>
void fork_join_sub_task::spawn_child(const T &task) {
void fork_join_sub_task::spawn_child(T &task) {
PROFILE_FORK_JOIN_STEALING("spawn_child")
static_assert(std::is_base_of<fork_join_sub_task, T>::value, "Only pass fork_join_sub_task subclasses!");
T *new_task = tbb_task_->my_stack_->push(task);
spawn_child_internal(new_task);
// Keep our refcount up to date
ref_count_++;
// Assign forced values
task.parent_ = this;
task.tbb_task_ = tbb_task_;
task.deque_state_ = tbb_task_->deque_.save_state();
// Push on our deque
const T const_task = task;
tbb_task_->deque_.push_tail(const_task);
}
}
......
......@@ -5,7 +5,7 @@
#include <mutex>
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/spin_lock.h"
#include "pls/internal/base/swmr_spin_lock.h"
#include "abstract_task.h"
......@@ -43,7 +43,7 @@ class root_task : public abstract_task {
return false;
}
bool split_task(base::spin_lock * /*lock*/) override {
bool split_task(base::swmr_spin_lock * /*lock*/) override {
return false;
}
};
......@@ -70,7 +70,7 @@ class root_worker_task : public abstract_task {
return false;
}
bool split_task(base::spin_lock * /*lock*/) override {
bool split_task(base::swmr_spin_lock * /*lock*/) override {
return false;
}
};
......
......@@ -61,7 +61,7 @@ class run_on_n_threads_task : public abstract_task {
return false;
}
bool split_task(base::spin_lock *lock) override;
bool split_task(base::swmr_spin_lock *lock) override;
};
template<typename Function>
......@@ -89,19 +89,18 @@ class run_on_n_threads_task_worker : public abstract_task {
return false;
}
bool split_task(base::spin_lock * /*lock*/) override {
bool split_task(base::swmr_spin_lock * /*lock*/) override {
return false;
}
};
template<typename Function>
bool run_on_n_threads_task<Function>::split_task(base::spin_lock *lock) {
bool run_on_n_threads_task<Function>::split_task(base::swmr_spin_lock *lock) {
if (get_counter() <= 0) {
return false;
}
// In success case, unlock.
// TODO: this locking is complicated and error prone.
lock->unlock();
lock->reader_unlock();
auto scheduler = base::this_thread::state<thread_state>()->scheduler_;
auto task = run_on_n_threads_task_worker<Function>{function_, this};
......
......@@ -43,27 +43,29 @@ void scheduler::execute_task(Task &task, int depth) {
abstract_task *new_task;
// Init Task
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
old_task = my_state->current_task_;
new_task = my_state->task_stack_->push(task);
old_task = my_state->current_task_;
new_task = my_state->task_stack_->push(task);
new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1);
new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1);
{
my_state->lock_.writer_lock();
my_state->current_task_ = new_task;
old_task->set_child(new_task);
my_state->lock_.writer_unlock();
}
// Run Task
new_task->execute();
// Teardown state back to before the task was executed
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
my_state->task_stack_->pop<Task>();
{
my_state->lock_.writer_lock();
old_task->set_child(nullptr);
my_state->current_task_ = old_task;
my_state->task_stack_->pop<Task>();
my_state->lock_.writer_unlock();
}
}
......
......@@ -5,6 +5,7 @@
#include <random>
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/base/swmr_spin_lock.h"
#include "abstract_task.h"
namespace pls {
......@@ -15,13 +16,13 @@ namespace scheduling {
class scheduler;
struct thread_state {
scheduler *scheduler_;
abstract_task *root_task_;
abstract_task *current_task_;
data_structures::aligned_stack *task_stack_;
size_t id_;
base::spin_lock lock_;
std::minstd_rand random_;
alignas(base::system_details::CACHE_LINE_SIZE) scheduler *scheduler_;
alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *root_task_;
alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *current_task_;
alignas(base::system_details::CACHE_LINE_SIZE) data_structures::aligned_stack *task_stack_;
alignas(base::system_details::CACHE_LINE_SIZE) size_t id_;
alignas(base::system_details::CACHE_LINE_SIZE) base::swmr_spin_lock lock_;
alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
thread_state() :
scheduler_{nullptr},
......@@ -29,6 +30,7 @@ struct thread_state {
current_task_{nullptr},
task_stack_{nullptr},
id_{0},
lock_{},
random_{id_} {};
thread_state(scheduler *scheduler, data_structures::aligned_stack *task_stack, unsigned int id) :
......@@ -37,6 +39,7 @@ struct thread_state {
current_task_{nullptr},
task_stack_{task_stack},
id_{id},
lock_{},
random_{id_} {}
};
......
......@@ -10,8 +10,8 @@ void *allocate_aligned(size_t size) {
return aligned_alloc(system_details::CACHE_LINE_SIZE, size);
}
std::uintptr_t next_alignment(std::uintptr_t size) {
std::uintptr_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE;
system_details::pointer_t next_alignment(system_details::pointer_t size) {
system_details::pointer_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE;
if (miss_alignment == 0) {
return size;
} else {
......@@ -19,8 +19,17 @@ std::uintptr_t next_alignment(std::uintptr_t size) {
}
}
system_details::pointer_t previous_alignment(system_details::pointer_t size) {
system_details::pointer_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE;
if (miss_alignment == 0) {
return size;
} else {
return size - miss_alignment;
}
}
char *next_alignment(char *pointer) {
return reinterpret_cast<char *>(next_alignment(reinterpret_cast<std::uintptr_t >(pointer)));
return reinterpret_cast<char *>(next_alignment(reinterpret_cast<system_details::pointer_t >(pointer)));
}
}
......
#include "pls/internal/base/swmr_spin_lock.h"
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace base {
bool swmr_spin_lock::reader_try_lock() {
PROFILE_LOCK("Try Acquire Read Lock")
if (write_request_.load(std::memory_order_acquire) == 1) {
return false;
}
// We think we can enter the region
readers_.fetch_add(1, std::memory_order_acquire);
if (write_request_.load(std::memory_order_acquire) == 1) {
// Whoops, the writer acquires the lock, so we back off again
readers_.fetch_add(-1, std::memory_order_release);
return false;
}
return true;
}
void swmr_spin_lock::reader_unlock() {
PROFILE_LOCK("Release Read Lock")
readers_--;
}
void swmr_spin_lock::writer_lock() {
PROFILE_LOCK("Acquire Write Lock")
// Tell the readers that we would like to write
write_request_ = 1;
// Wait for all of them to exit the critical section
while (readers_ > 0)
system_details::relax_cpu(); // Spin, not expensive as relaxed load
}
void swmr_spin_lock::writer_unlock() {
PROFILE_LOCK("Release Write Lock")
write_request_ = 0;
}
}
}
}
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/tas_spin_lock.h"
#include "pls/internal/base/backoff.h"
namespace pls {
namespace internal {
......@@ -7,27 +8,36 @@ namespace base {
void tas_spin_lock::lock() {
PROFILE_LOCK("Acquire Lock")
int tries = 0;
while (flag_.test_and_set(std::memory_order_acquire)) {
tries++;
if (tries % yield_at_tries_ == 0) {
this_thread::yield();
backoff backoff_strategy;
while (true) {
if (flag_.test_and_set(std::memory_order_acquire) == 0) {
return;
}
backoff_strategy.do_backoff();
}
}
bool tas_spin_lock::try_lock(unsigned int num_tries) {
PROFILE_LOCK("Try Acquire Lock")
while (flag_.test_and_set(std::memory_order_acquire)) {
backoff backoff_strategy;
while (true) {
if (flag_.test_and_set(std::memory_order_acquire) == 0) {
return true;
}
num_tries--;
if (num_tries <= 0) {
return false;
}
backoff_strategy.do_backoff();
}
return true;
}
void tas_spin_lock::unlock() {
PROFILE_LOCK("Unlock")
flag_.clear(std::memory_order_release);
}
......
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/ttas_spin_lock.h"
#include "pls/internal/base/backoff.h"
namespace pls {
namespace internal {
......@@ -7,40 +8,49 @@ namespace base {
void ttas_spin_lock::lock() {
PROFILE_LOCK("Acquire Lock")
int tries = 0;
int expected = 0;
backoff backoff_;
do {
while (flag_.load(std::memory_order_relaxed) == 1) {
tries++;
if (tries % yield_at_tries_ == 0) {
this_thread::yield();
}
}
while (true) {
while (flag_.load(std::memory_order_relaxed) == 1)
system_details::relax_cpu(); // Spin
expected = 0;
} while (!flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire));
if (flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire)) {
return;
}
backoff_.do_backoff();
}
}
bool ttas_spin_lock::try_lock(unsigned int num_tries) {
PROFILE_LOCK("Try Acquire Lock")
int expected = 0;
backoff backoff_;
do {
while (flag_.load(std::memory_order_relaxed) == 1) {
while (true) {
while (flag_.load() == 1) {
num_tries--;
if (num_tries <= 0) {
return false;
}
system_details::relax_cpu();
}
expected = 0;
} while (!flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire));
return true;
if (flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire)) {
return true;
}
num_tries--;
if (num_tries <= 0) {
return false;
}
backoff_.do_backoff();
}
}
void ttas_spin_lock::unlock() {
PROFILE_LOCK("Unlock")
flag_.store(0, std::memory_order_release);
}
......
......@@ -5,11 +5,16 @@ namespace pls {
namespace internal {
namespace data_structures {
aligned_stack::aligned_stack(char *memory_region, const std::size_t size) :
aligned_stack::aligned_stack(pointer_t memory_region, const std::size_t size) :
memory_start_{memory_region},
memory_end_{memory_region + size},
head_{base::alignment::next_alignment(memory_start_)} {}
aligned_stack::aligned_stack(char *memory_region, const std::size_t size) :
memory_start_{(pointer_t) memory_region},
memory_end_{(pointer_t) memory_region + size},
head_{base::alignment::next_alignment(memory_start_)} {}
}
}
}
......@@ -14,11 +14,11 @@ deque_item *deque_internal::pop_head_internal() {
}
deque_item *result = head_;
head_ = head_->prev_;
head_ = head_->next_;
if (head_ == nullptr) {
tail_ = nullptr;
} else {
head_->next_ = nullptr;
head_->prev_ = nullptr;
}
return result;
......@@ -32,11 +32,11 @@ deque_item *deque_internal::pop_tail_internal() {
}
deque_item *result = tail_;
tail_ = tail_->next_;
tail_ = tail_->prev_;
if (tail_ == nullptr) {
head_ = nullptr;
} else {
tail_->prev_ = nullptr;
tail_->next_ = nullptr;
}
return result;
......@@ -46,12 +46,12 @@ void deque_internal::push_tail_internal(deque_item *new_item) {
std::lock_guard<base::spin_lock> lock{lock_};
if (tail_ != nullptr) {
tail_->prev_ = new_item;
tail_->next_ = new_item;
} else {
head_ = new_item;
}
new_item->next_ = tail_;
new_item->prev_ = nullptr;
new_item->prev_ = tail_;
new_item->next_ = nullptr;
tail_ = new_item;
}
......
#include <pls/internal/base/backoff.h>
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/scheduling/thread_state.h"
......@@ -9,28 +10,31 @@ namespace internal {
namespace scheduling {
bool abstract_task::steal_work() {
thread_local static base::backoff backoff{};
PROFILE_STEALING("abstract_task::steal_work")
const auto my_state = base::this_thread::state<thread_state>();
const auto my_scheduler = my_state->scheduler_;
const size_t my_id = my_state->id_;
const size_t offset = my_state->random_() % my_scheduler->num_threads();
const size_t max_tries = 1; // my_scheduler->num_threads(); TODO: Tune this value
const size_t max_tries = my_scheduler->num_threads() - 1; // TODO: Tune this value
for (size_t i = 0; i < max_tries; i++) {
size_t target = (offset + i) % my_scheduler->num_threads();
if (target == my_id) {
continue;
target = (target + 1) % my_scheduler->num_threads();
}
auto target_state = my_scheduler->thread_state_for(target);
// TODO: Cleaner Locking Using std::guarded_lock
target_state->lock_.lock();
if (!target_state->lock_.reader_try_lock()) {
continue;
}
// Dig down to our level
PROFILE_STEALING("Go to our level")
abstract_task *current_task = target_state->root_task_;
while (current_task != nullptr && current_task->depth() < depth()) {
current_task = current_task->child_task_;
current_task = current_task->child();
}
PROFILE_END_BLOCK
......@@ -42,12 +46,13 @@ bool abstract_task::steal_work() {
current_task->depth_ == depth_) {
if (internal_stealing(current_task)) {
// internal steal was a success, hand it back to the internal scheduler
target_state->lock_.unlock();
target_state->lock_.reader_unlock();
backoff.reset();
return true;
}
// No success, we need to steal work from a deeper level using 'top level task stealing'
current_task = current_task->child_task_;
current_task = current_task->child();
}
}
PROFILE_END_BLOCK;
......@@ -59,17 +64,19 @@ bool abstract_task::steal_work() {
while (current_task != nullptr) {
auto lock = &target_state->lock_;
if (current_task->split_task(lock)) {
// internal steal was no success (we did a top level task steal)
// top level steal was a success (we did a top level task steal)
backoff.reset();
return false;
}
current_task = current_task->child_task_;
}
PROFILE_END_BLOCK;
target_state->lock_.unlock();
target_state->lock_.reader_unlock();
}
// internal steal was no success
backoff.do_backoff();
return false;
}
......
......@@ -8,22 +8,25 @@ namespace internal {
namespace scheduling {
fork_join_sub_task::fork_join_sub_task() :
data_structures::deque_item{},
ref_count_{0},
parent_{nullptr},
tbb_task_{nullptr},
stack_state_{nullptr} {}
deque_state_{0} {}
fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task &other) :
data_structures::deque_item(other),
ref_count_{0},
parent_{nullptr},
tbb_task_{nullptr},
stack_state_{nullptr} {}
parent_{other.parent_},
tbb_task_{other.tbb_task_},
deque_state_{other.deque_state_} {}
void fork_join_sub_task::execute() {
PROFILE_WORK_BLOCK("execute sub_task")
tbb_task_->currently_executing_ = this;
if (executed) {
PLS_ERROR("Double Execution!")
}
executed = true;
executed_at = base::this_thread::state<thread_state>()->id_;
execute_internal();
tbb_task_->currently_executing_ = nullptr;
PROFILE_END_BLOCK
......@@ -34,18 +37,6 @@ void fork_join_sub_task::execute() {
}
}
void fork_join_sub_task::spawn_child_internal(fork_join_sub_task *sub_task) {
// Keep our refcount up to date
ref_count_++;
// Assign forced values
sub_task->parent_ = this;
sub_task->tbb_task_ = tbb_task_;
sub_task->stack_state_ = tbb_task_->my_stack_->save_state();
tbb_task_->deque_.push_tail(sub_task);
}
void fork_join_sub_task::wait_for_all() {
while (ref_count_ > 0) {
PROFILE_STEALING("get local sub task")
......@@ -64,7 +55,7 @@ void fork_join_sub_task::wait_for_all() {
}
}
}
tbb_task_->my_stack_->reset_state(stack_state_);
tbb_task_->deque_.release_memory_until(deque_state_);
}
fork_join_sub_task *fork_join_task::get_local_sub_task() {
......@@ -85,7 +76,7 @@ bool fork_join_task::internal_stealing(abstract_task *other_task) {
} else {
// Make sub-task belong to our fork_join_task instance
stolen_sub_task->tbb_task_ = this;
stolen_sub_task->stack_state_ = my_stack_->save_state();
stolen_sub_task->deque_state_ = deque_.save_state();
// We will execute this next without explicitly moving it onto our stack storage
last_stolen_ = stolen_sub_task;
......@@ -93,7 +84,7 @@ bool fork_join_task::internal_stealing(abstract_task *other_task) {
}
}
bool fork_join_task::split_task(base::spin_lock *lock) {
bool fork_join_task::split_task(base::swmr_spin_lock *lock) {
PROFILE_STEALING("fork_join_task::split_task")
fork_join_sub_task *stolen_sub_task = get_stolen_sub_task();
if (stolen_sub_task == nullptr) {
......@@ -102,8 +93,7 @@ bool fork_join_task::split_task(base::spin_lock *lock) {
fork_join_task task{stolen_sub_task, this->unique_id()};
// In success case, unlock.
// TODO: this locking is complicated and error prone.
lock->unlock();
lock->reader_unlock();
scheduler::execute_task(task, depth());
return true;
......@@ -113,9 +103,12 @@ void fork_join_task::execute() {
PROFILE_WORK_BLOCK("execute fork_join_task");
// Bind this instance to our OS thread
my_stack_ = base::this_thread::state<thread_state>()->task_stack_;
// TODO: See if we did this right
// my_stack_ = base::this_thread::state<thread_state>()->task_stack_;
deque_.reset_base_pointer();
root_task_->tbb_task_ = this;
root_task_->stack_state_ = my_stack_->save_state();
root_task_->deque_state_ = deque_.save_state();
// Execute it on our OS thread until its finished
root_task_->execute();
......@@ -123,12 +116,12 @@ void fork_join_task::execute() {
fork_join_sub_task *fork_join_task::currently_executing() const { return currently_executing_; }
fork_join_task::fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id) :
fork_join_task::fork_join_task(fork_join_sub_task *root_task,
const abstract_task::id &id) :
abstract_task{0, id},
root_task_{root_task},
currently_executing_{nullptr},
my_stack_{nullptr},
deque_{},
deque_{base::this_thread::state<thread_state>()->task_stack_},
last_stolen_{nullptr} {}
}
......
add_executable(tests
main.cpp
base_tests.cpp scheduling_tests.cpp data_structures_test.cpp)
data_structures_test.cpp)
target_link_libraries(tests catch2 pls)
......@@ -4,6 +4,7 @@
#include <pls/internal/data_structures/aligned_stack.h>
#include <pls/internal/data_structures/deque.h>
#include <pls/internal/data_structures/work_stealing_deque.h>
#include <vector>
#include <mutex>
......@@ -130,3 +131,90 @@ TEST_CASE("deque stores objects correctly", "[internal/data_structures/deque.h]"
REQUIRE(deque.pop_tail() == &three);
}
}
TEST_CASE("work stealing deque stores objects correctly", "[internal/data_structures/aligned_stack.h]") {
constexpr long data_size = 2 << 14;
char data[data_size];
aligned_stack stack{data, data_size};
work_stealing_deque<int> deque{&stack};
int one = 1, two = 2, three = 3, four = 4;
SECTION("add and remove items form the tail") {
deque.push_tail(one);
deque.push_tail(two);
deque.push_tail(three);
REQUIRE(*deque.pop_tail() == three);
REQUIRE(*deque.pop_tail() == two);
REQUIRE(*deque.pop_tail() == one);
}
SECTION("handles getting empty by popping the tail correctly") {
deque.push_tail(one);
REQUIRE(*deque.pop_tail() == one);
deque.push_tail(two);
REQUIRE(*deque.pop_tail() == two);
}
SECTION("remove items form the head") {
deque.push_tail(one);
deque.push_tail(two);
deque.push_tail(three);
REQUIRE(*deque.pop_head() == one);
REQUIRE(*deque.pop_head() == two);
REQUIRE(*deque.pop_head() == three);
}
SECTION("handles getting empty by popping the head correctly") {
deque.push_tail(one);
REQUIRE(*deque.pop_head() == one);
deque.push_tail(two);
REQUIRE(*deque.pop_head() == two);
}
SECTION("handles getting empty by popping the head and tail correctly") {
deque.push_tail(one);
REQUIRE(*deque.pop_tail() == one);
deque.push_tail(two);
REQUIRE(*deque.pop_head() == two);
deque.push_tail(three);
REQUIRE(*deque.pop_tail() == three);
}
SECTION("handles jumps bigger 1 correctly") {
deque.push_tail(one);
deque.push_tail(two);
REQUIRE(*deque.pop_tail() == two);
deque.push_tail(three);
deque.push_tail(four);
REQUIRE(*deque.pop_head() == one);
REQUIRE(*deque.pop_head() == three);
REQUIRE(*deque.pop_head() == four);
}
SECTION("handles stack reset 1 correctly when emptied by tail") {
deque.push_tail(one);
deque.push_tail(two);
auto tmp_result = deque.pop_tail();
REQUIRE(*tmp_result == two);
deque.release_memory_until(tmp_result);
REQUIRE(*deque.pop_tail() == one);
deque.push_tail(three);
deque.push_tail(four);
REQUIRE(*deque.pop_head() == three);
REQUIRE(*deque.pop_tail() == four);
}
SECTION("synces correctly") {
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment