Commit bd826491 by FritzFlorian

WIP: lock-free work stealing deque based on our stack.

parent d16ad3eb
Pipeline #1159 passed with stages
in 3 minutes 37 seconds
......@@ -6,7 +6,7 @@
#include <complex>
#include <vector>
static constexpr int CUTOFF = 10;
static constexpr int CUTOFF = 16;
static constexpr int NUM_ITERATIONS = 1000;
static constexpr int INPUT_SIZE = 2064;
typedef std::vector<std::complex<double>> complex_vector;
......
......@@ -2,48 +2,100 @@
#include <pls/internal/helpers/profiler.h>
#include <iostream>
#include <complex>
#include <vector>
static pls::static_scheduler_memory<8, 2 << 14> my_scheduler_memory;
static constexpr int CUTOFF = 16;
static constexpr int NUM_ITERATIONS = 1000;
static constexpr int INPUT_SIZE = 2064;
typedef std::vector<std::complex<double>> complex_vector;
static constexpr int CUTOFF = 10;
void divide(complex_vector::iterator data, int n) {
complex_vector tmp_odd_elements(n / 2);
for (int i = 0; i < n / 2; i++) {
tmp_odd_elements[i] = data[i * 2 + 1];
}
for (int i = 0; i < n / 2; i++) {
data[i] = data[i * 2];
}
for (int i = 0; i < n / 2; i++) {
data[i + n / 2] = tmp_odd_elements[i];
}
}
void combine(complex_vector::iterator data, int n) {
for (int i = 0; i < n / 2; i++) {
std::complex<double> even = data[i];
std::complex<double> odd = data[i + n / 2];
// w is the "twiddle-factor".
// this could be cached, but we run the same 'data_structures' algorithm parallel/serial,
// so it won't impact the performance comparison.
std::complex<double> w = exp(std::complex<double>(0, -2. * M_PI * i / n));
long fib_serial(long n) {
if (n == 0) {
return 0;
data[i] = even + w * odd;
data[i + n / 2] = even - w * odd;
}
if (n == 1) {
return 1;
}
void fft(complex_vector::iterator data, int n) {
if (n < 2) {
return;
}
return fib_serial(n - 1) + fib_serial(n - 2);
PROFILE_WORK_BLOCK("Divide")
divide(data, n);
PROFILE_END_BLOCK
PROFILE_WORK_BLOCK("Invoke Parallel")
if (n == CUTOFF) {
PROFILE_WORK_BLOCK("FFT Serial")
fft(data, n / 2);
fft(data + n / 2, n / 2);
} else if (n <= CUTOFF) {
fft(data, n / 2);
fft(data + n / 2, n / 2);
} else {
pls::invoke_parallel(
[&] { fft(data, n / 2); },
[&] { fft(data + n / 2, n / 2); }
);
}
PROFILE_END_BLOCK
PROFILE_WORK_BLOCK("Combine")
combine(data, n);
PROFILE_END_BLOCK
}
long fib(long n) {
if (n <= CUTOFF) {
return fib_serial(n);
complex_vector prepare_input(int input_size) {
std::vector<double> known_frequencies{2, 11, 52, 88, 256};
complex_vector data(input_size);
// Set our input data to match a time series of the known_frequencies.
// When applying fft to this time-series we should find these frequencies.
for (int i = 0; i < input_size; i++) {
data[i] = std::complex<double>(0.0, 0.0);
for (auto frequencie : known_frequencies) {
data[i] += sin(2 * M_PI * frequencie * i / input_size);
}
}
// Actual 'invoke_parallel' logic/code
int left, right;
pls::invoke_parallel(
[&] { left = fib(n - 1); },
[&] { right = fib(n - 2); }
);
return left + right;
return data;
}
int main() {
PROFILE_ENABLE
pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 14};
pls::scheduler scheduler{&my_scheduler_memory, 8};
long result;
complex_vector initial_input = prepare_input(INPUT_SIZE);
scheduler.perform_work([&] {
PROFILE_MAIN_THREAD
// Call looks just the same, only requirement is
// the enclosure in the perform_work lambda.
for (int i = 0; i < 10; i++) {
result = fib(30);
std::cout << "Fib(30)=" << result << std::endl;
PROFILE_WORK_BLOCK("Top Level FFT")
complex_vector input = initial_input;
fft(input.begin(), input.size());
}
});
......
......@@ -11,8 +11,5 @@
#include <pls/internal/helpers/unique_id.h>
int main() {
std::cout << pls::internal::scheduling::root_task<void (*)>::create_id().type_.hash_code() << std::endl;
std::cout
<< pls::internal::helpers::unique_id::create<pls::internal::scheduling::root_task<void (*)>>().type_.hash_code()
<< std::endl;
}
......@@ -20,6 +20,7 @@ add_library(pls STATIC
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h
include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp
include/pls/internal/data_structures/work_stealing_deque.h
include/pls/internal/helpers/prohibit_new.h
include/pls/internal/helpers/profiler.h
......
......@@ -19,10 +19,32 @@ struct aligned_wrapper {
};
void *allocate_aligned(size_t size);
std::uintptr_t next_alignment(std::uintptr_t size);
system_details::pointer_t next_alignment(system_details::pointer_t size);
system_details::pointer_t previous_alignment(system_details::pointer_t size);
char *next_alignment(char *pointer);
}
template<typename T>
struct aligned_aba_pointer {
const system_details::pointer_t pointer_;
explicit aligned_aba_pointer(T *pointer, unsigned int aba = 0) : pointer_{
reinterpret_cast<system_details::pointer_t >(pointer) + aba} {}
T *pointer() const {
return reinterpret_cast<T *>(pointer_ & system_details::CACHE_LINE_ADDRESS_USED_BITS);
}
unsigned int aba() const {
return pointer_ & system_details::CACHE_LINE_ADDRESS_UNUSED_BITS;
}
aligned_aba_pointer set_aba(unsigned int aba) const {
return aligned_aba_pointer(pointer(), aba);
}
};
}
}
}
......
......@@ -14,8 +14,8 @@ namespace internal {
namespace base {
class backoff {
static constexpr unsigned long INITIAL_SPIN_ITERS = 2u << 2u;
static constexpr unsigned long MAX_SPIN_ITERS = 2u << 6u;
const unsigned long INITIAL_SPIN_ITERS = 2u << 2u;
const unsigned long MAX_SPIN_ITERS = 2u << 6u;
unsigned long current_ = INITIAL_SPIN_ITERS;
std::minstd_rand random_;
......
......@@ -11,5 +11,6 @@
* (or its inclusion adds too much overhead).
*/
#define PLS_ERROR(msg) std::cout << msg << std::endl; exit(1);
#define PLS_ASSERT(cond, msg) if (!cond) { PLS_ERROR(msg) }
#endif //PLS_ERROR_HANDLING_H
......@@ -18,10 +18,32 @@ namespace base {
* Currently sane default values for x86.
*/
namespace system_details {
/**
* Pointer Types needed for ABA protection mixed into addresses.
* pointer_t should be an integer type capable of holding ANY pointer value.
*/
using pointer_t = std::uintptr_t;
constexpr pointer_t ZERO_POINTER = 0;
constexpr pointer_t MAX_POINTER = ~ZERO_POINTER;
/**
* Biggest type that supports atomic CAS operations.
* Usually it is sane to assume a pointer can be swapped in a single CAS operation.
*/
using cas_integer = pointer_t;
constexpr cas_integer MIN_CAS_INTEGER = 0;
constexpr cas_integer MAX_CAS_INTEGER = ~MIN_CAS_INTEGER;
constexpr cas_integer FIRST_HALF_CAS_INTEGER = MAX_CAS_INTEGER << ((sizeof(cas_integer) / 2) * 8);
constexpr cas_integer SECOND_HALF_CAS_INTEGER = ~FIRST_HALF_CAS_INTEGER;
/**
* Most processors have 64 byte cache lines
* Most processors have 64 byte cache lines (last 6 bit of the address are zero at line beginnings).
*/
constexpr std::uintptr_t CACHE_LINE_SIZE = 64;
constexpr unsigned int CACHE_LINE_ADDRESS_BITS = 6;
constexpr pointer_t CACHE_LINE_SIZE = 2u << (CACHE_LINE_ADDRESS_BITS - 1);
constexpr pointer_t CACHE_LINE_ADDRESS_USED_BITS = MAX_POINTER << CACHE_LINE_ADDRESS_BITS;
constexpr pointer_t CACHE_LINE_ADDRESS_UNUSED_BITS = ~CACHE_LINE_ADDRESS_USED_BITS;
/**
* Choose one of the following ways to store thread specific data.
......
......@@ -19,11 +19,10 @@ namespace base {
*/
class ttas_spin_lock {
std::atomic<int> flag_;
backoff backoff_;
public:
ttas_spin_lock() : flag_{0}, backoff_{} {};
ttas_spin_lock(const ttas_spin_lock &/*other*/) : flag_{0}, backoff_{} {}
ttas_spin_lock() : flag_{0} {};
ttas_spin_lock(const ttas_spin_lock &/*other*/) : flag_{0} {}
void lock();
bool try_lock(unsigned int num_tries = 1);
......
......@@ -12,6 +12,8 @@ namespace pls {
namespace internal {
namespace data_structures {
using base::system_details::pointer_t;
/**
* Generic stack-like data structure that allows to allocate arbitrary objects in a given memory region.
* The objects will be stored aligned in the stack, making the storage cache friendly and very fast
......@@ -26,15 +28,16 @@ namespace data_structures {
*/
class aligned_stack {
// Keep bounds of our memory block
char *memory_start_;
char *memory_end_;
pointer_t memory_start_;
pointer_t memory_end_;
// Current head will always be aligned to cache lines
char *head_;
pointer_t head_;
public:
typedef char *state;
typedef pointer_t state;
aligned_stack() : memory_start_{nullptr}, memory_end_{nullptr}, head_{nullptr} {};
aligned_stack() : memory_start_{0}, memory_end_{0}, head_{0} {};
aligned_stack(pointer_t memory_region, std::size_t size);
aligned_stack(char *memory_region, std::size_t size);
template<typename T>
......@@ -48,7 +51,6 @@ class aligned_stack {
void reset_state(state new_state) { head_ = new_state; }
};
}
}
}
......
......@@ -9,7 +9,7 @@ namespace data_structures {
template<typename T>
T *aligned_stack::push(const T &object) {
// Copy-Construct
return new((void *) push < T > ())T(object);
return new(push < T > ())T(object);
}
template<typename T>
......
#ifndef PLS_WORK_STEALING_DEQUE_H_
#define PLS_WORK_STEALING_DEQUE_H_
#include <atomic>
#include <mutex>
#include <pls/internal/scheduling/thread_state.h>
#include "pls/internal/base/system_details.h"
#include "pls/internal/base/spin_lock.h"
#include "pls/internal/base/error_handling.h"
#include "aligned_stack.h"
namespace pls {
namespace internal {
namespace data_structures {
using cas_integer = base::system_details::cas_integer;
using pointer_t = base::system_details::pointer_t;
static cas_integer get_jump_wish(cas_integer n) {
return (n & base::system_details::FIRST_HALF_CAS_INTEGER) >> ((sizeof(cas_integer) / 2) * 8);
}
static cas_integer get_offset(cas_integer n) {
return n & base::system_details::SECOND_HALF_CAS_INTEGER;
}
static cas_integer set_jump_wish(cas_integer n, cas_integer new_value) {
return (new_value << ((sizeof(cas_integer) / 2) * 8)) | (n & base::system_details::SECOND_HALF_CAS_INTEGER);
}
static cas_integer set_offset(cas_integer n, cas_integer new_value) {
return new_value | (n & base::system_details::FIRST_HALF_CAS_INTEGER);
}
class work_stealing_deque_item {
// Pointer to the actual data
pointer_t data_;
// Index (relative to stack base) to the next and previous element
cas_integer next_item_;
cas_integer previous_item_;
public:
work_stealing_deque_item() : data_{0}, next_item_{0}, previous_item_{0} {}
template<typename Item>
Item *data() {
return reinterpret_cast<Item *>(data_);
}
template<typename Item>
void set_data(Item *data) {
data_ = reinterpret_cast<pointer_t >(data);
}
cas_integer next_item() {
return next_item_;
}
void set_next_item(cas_integer next_item) {
next_item_ = next_item;
}
cas_integer previous_item() {
return previous_item_;
}
void set_previous_item(cas_integer previous_item) {
previous_item_ = previous_item;
}
};
static_assert(sizeof(work_stealing_deque_item) < base::system_details::CACHE_LINE_SIZE,
"Work stealing deque relies on memory layout and requires cache lines to be longer than one 'work_stealing_deque_item' instance!");
template<typename Item>
class work_stealing_deque {
// Deque 'takes over' stack and handles memory management while in use.
// At any point in time the deque can stop using more memory and the stack can be used by other entities.
aligned_stack *stack_;
pointer_t base_pointer_;
std::atomic<cas_integer> head_;
std::atomic<cas_integer> tail_;
cas_integer previous_tail_;
base::spin_lock lock_{}; // TODO: Remove after debugging
public:
using state = aligned_stack::state;
explicit work_stealing_deque(aligned_stack *stack) : stack_{stack}, head_{0}, tail_{0}, previous_tail_{0} {
reset_base_pointer();
}
work_stealing_deque(const work_stealing_deque &other) : stack_{other.stack_},
base_pointer_{other.base_pointer_},
head_{other.head_.load()},
tail_{other.tail_.load()},
previous_tail_{other.previous_tail_} {}
void reset_base_pointer() {
base_pointer_ = reinterpret_cast<pointer_t >(stack_->save_state()); // Keep the base of our region in the stack
}
work_stealing_deque_item *item_at(cas_integer position) {
return reinterpret_cast<work_stealing_deque_item *>(base_pointer_
+ (base::system_details::CACHE_LINE_SIZE * position));
}
cas_integer current_stack_offset() {
return (stack_->save_state() - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
}
template<typename T>
std::pair<work_stealing_deque_item, T> *allocate_item(const T &new_item) {
// 'Union' type to push both on stack
using pair_t = std::pair<work_stealing_deque_item, T>;
// Allocate space on stack
auto new_pair = reinterpret_cast<pair_t *>(stack_->push<pair_t>());
// Initialize memory on stack
new((void *) &(new_pair->first)) work_stealing_deque_item();
new((void *) &(new_pair->second)) T(new_item);
return new_pair;
}
template<typename T>
Item *push_tail(const T &new_item) {
// std::lock_guard<base::spin_lock> lock{lock_};
cas_integer local_tail = tail_;
cas_integer local_head = head_;
// PLS_ASSERT((local_tail >= get_offset(local_head)), "Tail MUST be in front of head!")
auto new_pair = allocate_item(new_item);
// Prepare current tail to point to correct next items
auto tail_deque_item = item_at(local_tail);
tail_deque_item->set_data(&(new_pair->second));
tail_deque_item->set_next_item(current_stack_offset());
tail_deque_item->set_previous_item(previous_tail_);
previous_tail_ = local_tail;
// Linearization point, item appears after this write
cas_integer new_tail = current_stack_offset();
tail_ = new_tail;
// {
// std::lock_guard<base::spin_lock> lock{lock_};
// std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
// << "Pushed Tail " << local_tail << "->" << new_tail << std::endl;
// }
}
Item *pop_tail() {
// std::lock_guard<base::spin_lock> lock{lock_};
cas_integer local_tail = tail_;
cas_integer local_head = head_;
if (local_tail <= get_offset(local_head)) {
return nullptr; // EMPTY
}
work_stealing_deque_item *previous_tail_item = item_at(previous_tail_);
cas_integer new_tail = previous_tail_;
previous_tail_ = previous_tail_item->previous_item();
// Publish our wish to set the tail back
tail_ = new_tail;
// Get the state of local head AFTER we published our wish
local_head = head_; // Linearization point, outside knows list is empty
if (get_offset(local_head) < new_tail) {
// {
// std::lock_guard<base::spin_lock> lock{lock_};
// std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
// << "Poped Tail (distance) " << local_tail << "->" << new_tail << std::endl;
// }
return previous_tail_item->data<Item>(); // Enough distance, return item
}
cas_integer new_head = set_jump_wish(new_tail, 999999);
if (get_offset(local_head) == new_tail) {
// Try competing with consumers...
if (head_.compare_exchange_strong(local_head, new_head)) {
// {
// std::lock_guard<base::spin_lock> lock{lock_};
// std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
// << "Poped Tail (won competition 1) " << local_tail << "->" << new_tail << std::endl;
// }
return previous_tail_item->data<Item>(); // We won the competition, linearization on whom got the item
}
// Cosumer either registered jump wish or has gotten the item.
// Local_Head has the new value of the head, see if the other thread got to advance it
// and if not (only jump wish) try to win the competition.
if (get_offset(local_head) == new_tail && head_.compare_exchange_strong(local_head, new_head)) {
// {
// std::lock_guard<base::spin_lock> lock{lock_};
// std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
// << "Poped Tail (won competition 2) " << local_tail << "->" << new_tail << std::endl;
// }
return previous_tail_item->data<Item>(); // We won the competition, linearization on whom got the item
}
}
// {
// std::lock_guard<base::spin_lock> lock{lock_};
// std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
// << "FAILED to pop tail (lost competition) " << get_offset(local_head) << "; " << local_tail << "->"
// << new_tail << std::endl;
// }
// Some other thread either won the competition or it already set the head further than we are
// before we even tried to compete with it.
// Reset the queue into an empty state => head_ = tail_
// We can not set it to 0, as the memory is still in use.
tail_ = get_offset(local_head); // Set tail to match the head value the other thread won the battle of
return nullptr;
}
Item *pop_head() {
// std::lock_guard<base::spin_lock> lock{lock_};
cas_integer local_tail = tail_;
cas_integer local_head = head_;
cas_integer local_head_offset = get_offset(local_head);
if (local_head_offset >= local_tail) {
return nullptr; // EMPTY
}
work_stealing_deque_item *head_deque_item = item_at(local_head_offset);
cas_integer next_item_offset = head_deque_item->next_item();
Item *head_data_item = head_deque_item->data<Item>();
cas_integer jump_wish_head = set_jump_wish(local_head_offset, head_deque_item->next_item());
if (!head_.compare_exchange_strong(local_head, jump_wish_head)) {
// {
// std::lock_guard<base::spin_lock> lock{lock_};
// std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
// << "Failed to pop head (first cas) " << local_head_offset << "->" << next_item_offset << std::endl;
// }
return nullptr; // Someone interrupted us
}
local_tail = tail_;
if (local_head_offset >= local_tail) {
// std::cout << "Failed to pop head (second tail test) " << get_offset(local_head) << std::endl;
return nullptr; // EMPTY, tail was removed while we registered our jump wish
}
cas_integer new_head = next_item_offset;
if (!head_.compare_exchange_strong(jump_wish_head, new_head)) {
// {
// std::lock_guard<base::spin_lock> lock{lock_};
// std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
// << "Failed to pop head (second cas) " << local_head_offset << "->" << next_item_offset << std::endl;
// }
return nullptr; // we lost the 'fight' on the item...
}
// {
// std::lock_guard<base::spin_lock> lock{lock_};
// std::cout << base::this_thread::state<scheduling::thread_state>()->id_ << " - "
// << "Popped Head " << local_head_offset << "->" << next_item_offset << std::endl;
// }
return head_deque_item->data<Item>(); // We won the 'fight' on the item, it is now save to access it!
}
void release_memory_until(state state) {
// std::lock_guard<base::spin_lock> lock{lock_};
cas_integer
item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
cas_integer local_head = head_;
cas_integer local_tail = tail_;
// if (local_tail != item_offset) {
// std::cout << "...";
// } else {
// std::cout << "...";
// }
stack_->reset_state(state);
if (item_offset < local_tail) {
tail_ = item_offset;
if (get_offset(local_head) >= local_tail) {
head_ = item_offset;
}
}
// std::cout << "Release Memory " << item_offset << std::endl;
}
void release_memory_until(Item *item) {
release_memory_until(reinterpret_cast<pointer_t >(item));
}
state save_state() {
return stack_->save_state();
}
// PUSH item onto stack (allocate + insert into stack) - CHECK
// POP item from bottom of stack (remove from stack, memory still used) - CHECK
// POP item from top of stack (remove from stack, memory still used) - CHECK
// RELEASE memory from all items allocated after this one (including this one) - CHECK
// -> Tell the data structure that it is safe to reuse the stack space
// Note: Item that is released must not be part of the queue at this point (it is already removed!)
};
}
}
}
#endif //PLS_WORK_STEALING_DEQUE_H_
......@@ -5,7 +5,7 @@
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/data_structures/deque.h"
#include "pls/internal/data_structures/work_stealing_deque.h"
#include "abstract_task.h"
#include "thread_state.h"
......@@ -15,7 +15,7 @@ namespace internal {
namespace scheduling {
class fork_join_task;
class fork_join_sub_task : public data_structures::deque_item {
class fork_join_sub_task {
friend class fork_join_task;
// Coordinate finishing of sub_tasks
......@@ -25,8 +25,11 @@ class fork_join_sub_task : public data_structures::deque_item {
// Access to TBB scheduling environment
fork_join_task *tbb_task_;
bool executed = false;
int executed_at = -1;
// Stack Management (reset stack pointer after wait_for_all() calls)
data_structures::aligned_stack::state stack_state_;
data_structures::work_stealing_deque<fork_join_sub_task>::state deque_state_;
protected:
explicit fork_join_sub_task();
fork_join_sub_task(const fork_join_sub_task &other);
......@@ -37,11 +40,10 @@ class fork_join_sub_task : public data_structures::deque_item {
public:
// Only use them when actually executing this sub_task (only public for simpler API design)
template<typename T>
void spawn_child(const T &sub_task);
void spawn_child(T &sub_task);
void wait_for_all();
private:
void spawn_child_internal(fork_join_sub_task *sub_task);
void execute();
};
......@@ -50,7 +52,7 @@ class fork_join_lambda_by_reference : public fork_join_sub_task {
const Function *function_;
public:
explicit fork_join_lambda_by_reference(const Function *function) : function_{function} {};
explicit fork_join_lambda_by_reference(const Function *function) : fork_join_sub_task{}, function_{function} {};
protected:
void execute_internal() override {
......@@ -63,7 +65,7 @@ class fork_join_lambda_by_value : public fork_join_sub_task {
const Function function_;
public:
explicit fork_join_lambda_by_value(const Function &function) : function_{function} {};
explicit fork_join_lambda_by_value(const Function &function) : fork_join_sub_task{}, function_{function} {};
protected:
void execute_internal() override {
......@@ -76,10 +78,9 @@ class fork_join_task : public abstract_task {
fork_join_sub_task *root_task_;
fork_join_sub_task *currently_executing_;
data_structures::aligned_stack *my_stack_;
// Double-Ended Queue management
data_structures::deque<fork_join_sub_task> deque_;
data_structures::work_stealing_deque<fork_join_sub_task> deque_;
// Steal Management
fork_join_sub_task *last_stolen_;
......@@ -97,12 +98,21 @@ class fork_join_task : public abstract_task {
};
template<typename T>
void fork_join_sub_task::spawn_child(const T &task) {
void fork_join_sub_task::spawn_child(T &task) {
PROFILE_FORK_JOIN_STEALING("spawn_child")
static_assert(std::is_base_of<fork_join_sub_task, T>::value, "Only pass fork_join_sub_task subclasses!");
T *new_task = tbb_task_->my_stack_->push(task);
spawn_child_internal(new_task);
// Keep our refcount up to date
ref_count_++;
// Assign forced values
task.parent_ = this;
task.tbb_task_ = tbb_task_;
task.deque_state_ = tbb_task_->deque_.save_state();
// Push on our deque
const T const_task = task;
tbb_task_->deque_.push_tail(const_task);
}
}
......
......@@ -10,8 +10,8 @@ void *allocate_aligned(size_t size) {
return aligned_alloc(system_details::CACHE_LINE_SIZE, size);
}
std::uintptr_t next_alignment(std::uintptr_t size) {
std::uintptr_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE;
system_details::pointer_t next_alignment(system_details::pointer_t size) {
system_details::pointer_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE;
if (miss_alignment == 0) {
return size;
} else {
......@@ -19,8 +19,17 @@ std::uintptr_t next_alignment(std::uintptr_t size) {
}
}
system_details::pointer_t previous_alignment(system_details::pointer_t size) {
system_details::pointer_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE;
if (miss_alignment == 0) {
return size;
} else {
return size - miss_alignment;
}
}
char *next_alignment(char *pointer) {
return reinterpret_cast<char *>(next_alignment(reinterpret_cast<std::uintptr_t >(pointer)));
return reinterpret_cast<char *>(next_alignment(reinterpret_cast<system_details::pointer_t >(pointer)));
}
}
......
......@@ -23,22 +23,22 @@ bool swmr_spin_lock::reader_try_lock() {
void swmr_spin_lock::reader_unlock() {
PROFILE_LOCK("Release Read Lock")
readers_.fetch_add(-1, std::memory_order_release);
readers_--;
}
void swmr_spin_lock::writer_lock() {
PROFILE_LOCK("Acquire Write Lock")
// Tell the readers that we would like to write
write_request_.store(1, std::memory_order_acquire);
write_request_ = 1;
// Wait for all of them to exit the critical section
while (readers_.load(std::memory_order_acquire) > 0)
while (readers_ > 0)
system_details::relax_cpu(); // Spin, not expensive as relaxed load
}
void swmr_spin_lock::writer_unlock() {
PROFILE_LOCK("Release Write Lock")
write_request_.store(0, std::memory_order_release);
write_request_ = 0;
}
}
......
......@@ -9,7 +9,7 @@ namespace base {
void ttas_spin_lock::lock() {
PROFILE_LOCK("Acquire Lock")
int expected = 0;
backoff_.reset();
backoff backoff_;
while (true) {
while (flag_.load(std::memory_order_relaxed) == 1)
......@@ -26,7 +26,7 @@ void ttas_spin_lock::lock() {
bool ttas_spin_lock::try_lock(unsigned int num_tries) {
PROFILE_LOCK("Try Acquire Lock")
int expected = 0;
backoff_.reset();
backoff backoff_;
while (true) {
while (flag_.load() == 1) {
......
......@@ -5,11 +5,16 @@ namespace pls {
namespace internal {
namespace data_structures {
aligned_stack::aligned_stack(char *memory_region, const std::size_t size) :
aligned_stack::aligned_stack(pointer_t memory_region, const std::size_t size) :
memory_start_{memory_region},
memory_end_{memory_region + size},
head_{base::alignment::next_alignment(memory_start_)} {}
aligned_stack::aligned_stack(char *memory_region, const std::size_t size) :
memory_start_{(pointer_t) memory_region},
memory_end_{(pointer_t) memory_region + size},
head_{base::alignment::next_alignment(memory_start_)} {}
}
}
}
......@@ -14,11 +14,11 @@ deque_item *deque_internal::pop_head_internal() {
}
deque_item *result = head_;
head_ = head_->prev_;
head_ = head_->next_;
if (head_ == nullptr) {
tail_ = nullptr;
} else {
head_->next_ = nullptr;
head_->prev_ = nullptr;
}
return result;
......@@ -32,11 +32,11 @@ deque_item *deque_internal::pop_tail_internal() {
}
deque_item *result = tail_;
tail_ = tail_->next_;
tail_ = tail_->prev_;
if (tail_ == nullptr) {
head_ = nullptr;
} else {
tail_->prev_ = nullptr;
tail_->next_ = nullptr;
}
return result;
......@@ -46,12 +46,12 @@ void deque_internal::push_tail_internal(deque_item *new_item) {
std::lock_guard<base::spin_lock> lock{lock_};
if (tail_ != nullptr) {
tail_->prev_ = new_item;
tail_->next_ = new_item;
} else {
head_ = new_item;
}
new_item->next_ = tail_;
new_item->prev_ = nullptr;
new_item->prev_ = tail_;
new_item->next_ = nullptr;
tail_ = new_item;
}
......
......@@ -8,22 +8,26 @@ namespace internal {
namespace scheduling {
fork_join_sub_task::fork_join_sub_task() :
data_structures::deque_item{},
ref_count_{0},
parent_{nullptr},
tbb_task_{nullptr},
stack_state_{nullptr} {}
deque_state_{0} {}
fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task &other) :
data_structures::deque_item(other),
ref_count_{0},
parent_{nullptr},
tbb_task_{nullptr},
stack_state_{nullptr} {}
parent_{other.parent_},
tbb_task_{other.tbb_task_},
deque_state_{other.deque_state_} {}
void fork_join_sub_task::execute() {
PROFILE_WORK_BLOCK("execute sub_task")
tbb_task_->currently_executing_ = this;
if (executed) {
int my_id = base::this_thread::state<thread_state>()->id_;
PLS_ERROR("Double Execution!")
}
executed = true;
executed_at = base::this_thread::state<thread_state>()->id_;
execute_internal();
tbb_task_->currently_executing_ = nullptr;
PROFILE_END_BLOCK
......@@ -34,18 +38,6 @@ void fork_join_sub_task::execute() {
}
}
void fork_join_sub_task::spawn_child_internal(fork_join_sub_task *sub_task) {
// Keep our refcount up to date
ref_count_++;
// Assign forced values
sub_task->parent_ = this;
sub_task->tbb_task_ = tbb_task_;
sub_task->stack_state_ = tbb_task_->my_stack_->save_state();
tbb_task_->deque_.push_tail(sub_task);
}
void fork_join_sub_task::wait_for_all() {
while (ref_count_ > 0) {
PROFILE_STEALING("get local sub task")
......@@ -54,19 +46,17 @@ void fork_join_sub_task::wait_for_all() {
if (local_task != nullptr) {
local_task->execute();
} else {
while (ref_count_ > 0) {
// Try to steal work.
// External steal will be executed implicitly if success
PROFILE_STEALING("steal work")
bool internal_steal_success = tbb_task_->steal_work();
PROFILE_END_BLOCK
if (internal_steal_success) {
tbb_task_->last_stolen_->execute();
}
// Try to steal work.
// External steal will be executed implicitly if success
PROFILE_STEALING("steal work")
bool internal_steal_success = tbb_task_->steal_work();
PROFILE_END_BLOCK
if (internal_steal_success) {
tbb_task_->last_stolen_->execute();
}
}
}
tbb_task_->my_stack_->reset_state(stack_state_);
tbb_task_->deque_.release_memory_until(deque_state_);
}
fork_join_sub_task *fork_join_task::get_local_sub_task() {
......@@ -74,7 +64,9 @@ fork_join_sub_task *fork_join_task::get_local_sub_task() {
}
fork_join_sub_task *fork_join_task::get_stolen_sub_task() {
return deque_.pop_head();
auto tmp = deque_.save_state();
auto result = deque_.pop_head();
return result;
}
bool fork_join_task::internal_stealing(abstract_task *other_task) {
......@@ -87,7 +79,7 @@ bool fork_join_task::internal_stealing(abstract_task *other_task) {
} else {
// Make sub-task belong to our fork_join_task instance
stolen_sub_task->tbb_task_ = this;
stolen_sub_task->stack_state_ = my_stack_->save_state();
stolen_sub_task->deque_state_ = deque_.save_state();
// We will execute this next without explicitly moving it onto our stack storage
last_stolen_ = stolen_sub_task;
......@@ -114,9 +106,12 @@ void fork_join_task::execute() {
PROFILE_WORK_BLOCK("execute fork_join_task");
// Bind this instance to our OS thread
my_stack_ = base::this_thread::state<thread_state>()->task_stack_;
// TODO: See if we did this right
// my_stack_ = base::this_thread::state<thread_state>()->task_stack_;
deque_.reset_base_pointer();
root_task_->tbb_task_ = this;
root_task_->stack_state_ = my_stack_->save_state();
root_task_->deque_state_ = deque_.save_state();
// Execute it on our OS thread until its finished
root_task_->execute();
......@@ -124,12 +119,12 @@ void fork_join_task::execute() {
fork_join_sub_task *fork_join_task::currently_executing() const { return currently_executing_; }
fork_join_task::fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id) :
fork_join_task::fork_join_task(fork_join_sub_task *root_task,
const abstract_task::id &id) :
abstract_task{0, id},
root_task_{root_task},
currently_executing_{nullptr},
my_stack_{nullptr},
deque_{},
deque_{base::this_thread::state<thread_state>()->task_stack_},
last_stolen_{nullptr} {}
}
......
add_executable(tests
main.cpp
base_tests.cpp scheduling_tests.cpp data_structures_test.cpp)
data_structures_test.cpp)
target_link_libraries(tests catch2 pls)
......@@ -4,6 +4,7 @@
#include <pls/internal/data_structures/aligned_stack.h>
#include <pls/internal/data_structures/deque.h>
#include <pls/internal/data_structures/work_stealing_deque.h>
#include <vector>
#include <mutex>
......@@ -130,3 +131,90 @@ TEST_CASE("deque stores objects correctly", "[internal/data_structures/deque.h]"
REQUIRE(deque.pop_tail() == &three);
}
}
TEST_CASE("work stealing deque stores objects correctly", "[internal/data_structures/aligned_stack.h]") {
constexpr long data_size = 2 << 14;
char data[data_size];
aligned_stack stack{data, data_size};
work_stealing_deque<int> deque{&stack};
int one = 1, two = 2, three = 3, four = 4;
SECTION("add and remove items form the tail") {
deque.push_tail(one);
deque.push_tail(two);
deque.push_tail(three);
REQUIRE(*deque.pop_tail() == three);
REQUIRE(*deque.pop_tail() == two);
REQUIRE(*deque.pop_tail() == one);
}
SECTION("handles getting empty by popping the tail correctly") {
deque.push_tail(one);
REQUIRE(*deque.pop_tail() == one);
deque.push_tail(two);
REQUIRE(*deque.pop_tail() == two);
}
SECTION("remove items form the head") {
deque.push_tail(one);
deque.push_tail(two);
deque.push_tail(three);
REQUIRE(*deque.pop_head() == one);
REQUIRE(*deque.pop_head() == two);
REQUIRE(*deque.pop_head() == three);
}
SECTION("handles getting empty by popping the head correctly") {
deque.push_tail(one);
REQUIRE(*deque.pop_head() == one);
deque.push_tail(two);
REQUIRE(*deque.pop_head() == two);
}
SECTION("handles getting empty by popping the head and tail correctly") {
deque.push_tail(one);
REQUIRE(*deque.pop_tail() == one);
deque.push_tail(two);
REQUIRE(*deque.pop_head() == two);
deque.push_tail(three);
REQUIRE(*deque.pop_tail() == three);
}
SECTION("handles jumps bigger 1 correctly") {
deque.push_tail(one);
deque.push_tail(two);
REQUIRE(*deque.pop_tail() == two);
deque.push_tail(three);
deque.push_tail(four);
REQUIRE(*deque.pop_head() == one);
REQUIRE(*deque.pop_head() == three);
REQUIRE(*deque.pop_head() == four);
}
SECTION("handles stack reset 1 correctly when emptied by tail") {
deque.push_tail(one);
deque.push_tail(two);
auto tmp_result = deque.pop_tail();
REQUIRE(*tmp_result == two);
deque.release_memory_until(tmp_result);
REQUIRE(*deque.pop_tail() == one);
deque.push_tail(three);
deque.push_tail(four);
REQUIRE(*deque.pop_head() == three);
REQUIRE(*deque.pop_tail() == four);
}
SECTION("synces correctly") {
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment