Commit 693d4e9b by FritzFlorian

WIP: re-work static memory allocation for scheduler.

We changed how the memory is allocated: instead of passing char* buffers that objects are then placed into, we now build 'fat objects' that hold all scheduler state directly. This eases development for us, as we can change data structures without much effort (e.g. add a second array to manage tasks if required).
parent 5bc35f9e
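The 'fat object' idea described above can be sketched roughly as follows; the type and member names here are purely illustrative and not part of this change (the actual types live in scheduler_memory.h and thread_state.h further down).

// Minimal illustrative sketch of the 'fat object' layout; all names are hypothetical.
#include <array>
#include <cstddef>

template<size_t NUM_TASKS, size_t STACK_SIZE>
struct fat_thread_state {                         // hypothetical example type
  std::array<char, STACK_SIZE> task_stack_memory; // memory owned directly by the object
  std::array<int, NUM_TASKS> task_slots;          // adding a second array is a purely local change
};

template<size_t MAX_THREADS, size_t NUM_TASKS, size_t STACK_SIZE>
struct fat_scheduler_memory {                     // hypothetical example type
  std::array<fat_thread_state<NUM_TASKS, STACK_SIZE>, MAX_THREADS> thread_states;
};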
......@@ -12,6 +12,7 @@ set(LIBRARY_OUTPUT_PATH ${CMAKE_BINARY_DIR}/lib)
# specific setup code is located in individual files.
include(cmake/DisabelInSource.cmake)
include(cmake/SetupOptimizationLevel.cmake)
include(cmake/SetupAssemblyOutput.cmake)
include(cmake/SetupThreadingSupport.cmake)
include(cmake/SetupThreadSanitizer.cmake)
include(cmake/SetupAddressSanitizer.cmake)
......
......@@ -23,6 +23,7 @@ add_library(pls STATIC
include/pls/dataflow/internal/graph_impl.h
include/pls/dataflow/internal/switch_node.h
include/pls/dataflow/internal/merge_node.h
include/pls/dataflow/internal/split_node.h
include/pls/internal/base/spin_lock.h
include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp
......@@ -33,15 +34,11 @@ add_library(pls STATIC
include/pls/internal/base/barrier.h src/internal/base/barrier.cpp
include/pls/internal/base/system_details.h
include/pls/internal/base/error_handling.h
include/pls/internal/base/alignment.h src/internal/base/alignment.cpp
include/pls/internal/base/alignment.h include/pls/internal/base/alignment_impl.h
include/pls/internal/scheduling/data_structures/aligned_stack.h src/internal/scheduling/data_structures/aligned_stack.cpp
include/pls/internal/scheduling/data_structures/aligned_stack_impl.h
include/pls/internal/scheduling/data_structures/deque.h
include/pls/internal/scheduling/data_structures/locking_deque.h
include/pls/internal/scheduling/data_structures/locking_deque_impl.h
include/pls/internal/scheduling/data_structures/work_stealing_deque.h include/pls/internal/scheduling/data_structures/work_stealing_deque_impl.h
include/pls/internal/scheduling/data_structures/stamped_integer.h
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h
include/pls/internal/data_structures/stamped_integer.h
include/pls/internal/helpers/prohibit_new.h
include/pls/internal/helpers/profiler.h
......@@ -49,14 +46,17 @@ add_library(pls STATIC
include/pls/internal/helpers/unique_id.h
include/pls/internal/helpers/range.h
include/pls/internal/helpers/seqence.h
include/pls/internal/helpers/member_function.h
include/pls/internal/scheduling/thread_state.h
include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
include/pls/internal/scheduling/scheduler_impl.h
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
include/pls/internal/scheduling/scheduler_memory.h
include/pls/internal/scheduling/lambda_task.h
include/pls/dataflow/internal/split_node.h include/pls/internal/helpers/member_function.h)
include/pls/internal/scheduling/task_manager.h
include/pls/internal/scheduling/cont_manager.h)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
PUBLIC
......
......@@ -4,6 +4,7 @@
#include <cstdint>
#include <cstdlib>
#include <array>
#include "system_details.h"
......@@ -12,18 +13,40 @@ namespace internal {
namespace base {
namespace alignment {
system_details::pointer_t next_alignment(system_details::pointer_t size);
char *next_alignment(char *pointer);
system_details::pointer_t previous_alignment(system_details::pointer_t size);
/**
* Forces a cache-line alignment requirement onto a type.
* This can be useful to store elements aligned in an array or to allocate them using new.
*
* The object is constructed using perfect forwarding, so initialization looks identical to the
* wrapped object (it can be used in containers); access to the element has to go through
* the pointer returned by pointer() or the reference returned by object().
*
* (This is required because C++11 does not allow 'over_aligned' types, meaning alignments higher
* than max_align_t (usually 16 bytes) won't be respected properly.)
*/
template<typename T>
struct aligned_wrapper {
alignas(system_details::CACHE_LINE_SIZE) unsigned char data[sizeof(T)];
T *pointer() { return reinterpret_cast<T *>(data); }
struct alignas(system_details::CACHE_LINE_SIZE) cache_alignment_wrapper {
private:
std::array<char, sizeof(T) + system_details::CACHE_LINE_SIZE> memory_;
char *data_;
public:
template<typename ...ARGS>
explicit cache_alignment_wrapper(ARGS &&...args): memory_{}, data_{next_alignment(memory_.data())} {
new(data_) T(std::forward<ARGS>(args)...);
}
T &object() { return *reinterpret_cast<T *>(data_); }
T *pointer() { return reinterpret_cast<T *>(data_); }
};
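A possible use of the wrapper above could look like the following; the wrapped atomic counter is only an example and not taken from the library itself.

// Hedged usage sketch; assumes the alignment namespace is in scope and <atomic> is included.
void wrapper_example() {
  cache_alignment_wrapper<std::atomic<int>> counter{0};  // constructed like the wrapped type
  counter.object().fetch_add(1);                         // access goes through object() ...
  std::atomic<int> *raw = counter.pointer();             // ... or through pointer()
  (void) raw;
}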
void *allocate_aligned(size_t size);
system_details::pointer_t next_alignment(system_details::pointer_t size);
system_details::pointer_t previous_alignment(system_details::pointer_t size);
void *allocate_aligned(size_t size);
#include "alignment_impl.h"
}
}
}
}
......
#ifndef PLS_INTERNAL_BASE_ALIGNMENT_IMPL_H_
#define PLS_INTERNAL_BASE_ALIGNMENT_IMPL_H_
namespace pls {
namespace internal {
namespace base {
namespace alignment {
inline void *allocate_aligned(size_t size) {
return aligned_alloc(system_details::CACHE_LINE_SIZE, size);
}
inline system_details::pointer_t next_alignment(system_details::pointer_t size) {
return (size % system_details::CACHE_LINE_SIZE) == 0 ?
size :
size + (system_details::CACHE_LINE_SIZE - (size % system_details::CACHE_LINE_SIZE));
}
inline system_details::pointer_t previous_alignment(system_details::pointer_t size) {
return (size % system_details::CACHE_LINE_SIZE) == 0 ?
size :
size - (size % system_details::CACHE_LINE_SIZE);
}
inline char *next_alignment(char *pointer) {
return reinterpret_cast<char *>(next_alignment(reinterpret_cast<system_details::pointer_t >(pointer)));
}
}
}
}
}
#endif //PLS_INTERNAL_BASE_ALIGNMENT_IMPL_H_
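To make the arithmetic above concrete, here is a worked example assuming a cache line size of 64 bytes (the usual value; the actual constant is system_details::CACHE_LINE_SIZE):

// Worked example, assuming system_details::CACHE_LINE_SIZE == 64:
//   next_alignment(100)     == 100 + (64 - 100 % 64) == 128
//   next_alignment(128)     == 128                       (already aligned)
//   previous_alignment(100) == 100 - (100 % 64)       == 64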
......@@ -83,6 +83,7 @@ class thread {
friend class this_thread;
// Keep handle to native implementation
pthread_t pthread_thread_;
bool running_;
template<typename Function, typename State>
static void *start_pthread_internal(void *thread_pointer);
......@@ -94,12 +95,14 @@ class thread {
template<typename Function>
explicit thread(const Function &function);
public:
explicit thread();
~thread();
void join();
// make object move only
thread(thread &&) noexcept = default;
thread &operator=(thread &&) noexcept = default;
thread(thread &&) noexcept;
thread &operator=(thread &&) noexcept;
thread(const thread &) = delete;
thread &operator=(const thread &) = delete;
......
......@@ -53,7 +53,8 @@ void *thread::start_pthread_internal(void *thread_pointer) {
template<typename Function, typename State>
thread::thread(const Function &function, State *state_pointer):
pthread_thread_{} {
pthread_thread_{},
running_{true} {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
if (!this_thread::local_storage_key_initialized_) {
......
......@@ -4,13 +4,15 @@
#include <cstdint>
#include <cstdlib>
#include <array>
#include <memory>
#include "pls/internal/base/error_handling.h"
#include "pls/internal/base/alignment.h"
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace scheduling {
namespace data_structures {
using base::system_details::pointer_t;
......@@ -20,44 +22,62 @@ using base::system_details::pointer_t;
* The objects will be stored aligned in the stack, making the storage cache friendly and very fast
* (as long as one can live with the stack restrictions).
*
* IMPORTANT: Does not call destructors on stored objects! Do not allocate resources in the objects!
* IMPORTANT: The destructor is only called when explicitly using pop<T>().
* In this case you have to make sure that push<T, ...>() and pop<T>() calls
* match up throughout your program.
*
* Usage:
* aligned_stack stack{pointer_to_memory, size_of_memory};
* static_aligned_stack<SIZE> stack; or heap_aligned_stack stack(size);
* T* pointer = stack.push<T>(constructor_arguments); // Perfect-Forward-Construct the object on top of stack
* stack.pop<T>(); // Remove the top object of type T
* stack.pop<T>(); // Remove the top object of type T and call its destructor
*/
class aligned_stack {
public:
typedef size_t stack_offset;
aligned_stack() : aligned_memory_start_{0}, aligned_memory_end_{0}, max_offset_{0}, current_offset_{0} {};
aligned_stack(pointer_t memory_region, std::size_t size);
aligned_stack(char *memory_region, std::size_t size);
protected:
aligned_stack(char *memory_pointer, size_t size);
aligned_stack(char *memory_pointer, size_t size, size_t original_size);
public:
template<typename T, typename ...ARGS>
T *push(ARGS &&... args);
template<typename T>
void *push_bytes();
void *push_bytes(size_t size);
template<typename T>
T pop();
void pop();
void *memory_at_offset(stack_offset offset) const;
stack_offset save_offset() const { return current_offset_; }
void reset_offset(stack_offset new_offset) { current_offset_ = new_offset; }
private:
protected:
// Keep bounds of our memory block
pointer_t aligned_memory_start_;
pointer_t aligned_memory_end_;
char *unaligned_memory_pointer_;
char *memory_pointer_;
stack_offset max_offset_;
stack_offset current_offset_;
};
}
template<size_t SIZE>
class static_aligned_stack : public aligned_stack {
public:
static_aligned_stack();
private:
alignas(base::system_details::CACHE_LINE_SIZE) std::array<char, SIZE> memory_;
};
class heap_aligned_stack : public aligned_stack {
public:
explicit heap_aligned_stack(size_t size);
~heap_aligned_stack();
};
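A small usage sketch of the two stack variants above; the element type and sizes are placeholders, not values used by the scheduler.

// Hedged usage sketch; sizes and the element type are placeholders.
void stack_example() {
  static_aligned_stack<4096> stack;        // or: heap_aligned_stack stack{4096};
  auto checkpoint = stack.save_offset();   // remember the current top
  int *value = stack.push<int>(42);        // perfect-forward construct on top of the stack
  stack.pop<int>();                        // matching pop runs the destructor
  stack.reset_offset(checkpoint);          // or roll the stack back wholesale
  (void) value;
}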
}
}
}
......
......@@ -6,12 +6,10 @@
namespace pls {
namespace internal {
namespace scheduling {
namespace data_structures {
template<typename T, typename ...ARGS>
T *aligned_stack::push(ARGS &&... args) {
// Perfect-Forward construct
return new(push_bytes<T>())T(std::forward<ARGS>(args)...);
}
......@@ -21,14 +19,24 @@ void *aligned_stack::push_bytes() {
}
template<typename T>
T aligned_stack::pop() {
void aligned_stack::pop() {
auto num_cache_lines = base::alignment::next_alignment(sizeof(T)) / base::system_details::CACHE_LINE_SIZE;
current_offset_ -= num_cache_lines;
return *reinterpret_cast<T *>(memory_at_offset(current_offset_));
reinterpret_cast<T *>(memory_at_offset(current_offset_))->~T();
}
template<size_t SIZE>
static_aligned_stack<SIZE>::static_aligned_stack(): aligned_stack{memory_.data(), SIZE} {}
heap_aligned_stack::heap_aligned_stack(size_t size) :
aligned_stack{new char[base::alignment::next_alignment(size)], size, base::alignment::next_alignment(size)} {}
heap_aligned_stack::~heap_aligned_stack() {
delete[] unaligned_memory_pointer_;
}
}
}
}
......
#ifndef PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_DATA_STRUCTURES_STAMPED_INTEGER_H_
#define PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_DATA_STRUCTURES_STAMPED_INTEGER_H_
#ifndef PLS_STAMPED_INTEGER_H_
#define PLS_STAMPED_INTEGER_H_
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace scheduling {
namespace data_structures {
constexpr unsigned long HALF_CACHE_LINE = base::system_details::CACHE_LINE_SIZE / 2;
......@@ -24,6 +23,5 @@ struct stamped_integer {
}
}
}
}
#endif //PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_DATA_STRUCTURES_STAMPED_INTEGER_H_
#endif //PLS_STAMPED_INTEGER_H_
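The stamped integer above is what the work-stealing deque further down uses to guard its compare-and-swap operations against ABA: every update bumps the stamp half, so a CAS against a stale snapshot fails even if the value half happens to match again. A minimal sketch of that pattern (the member names stamp and value follow their usages in work_stealing_deque_impl.h; the rest is illustrative, not the library's actual deque code):

// Hedged sketch of the stamp-on-update pattern; needs <atomic>.
void stamped_cas_example(std::atomic<stamped_integer> &head) {
  stamped_integer local = head.load();
  stamped_integer next{local.stamp + 1, local.value + 1};  // always bump the stamp
  if (!head.compare_exchange_strong(local, next)) {
    // another thread updated head in the meantime; a stale snapshot can never win
  }
}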
#ifndef PLS_CONT_MANAGER_H_
#define PLS_CONT_MANAGER_H_
#include <memory>
#include "pls/internal/data_structures/aligned_stack.h"
namespace pls {
namespace internal {
namespace scheduling {
class cont_manager {
public:
explicit cont_manager() = default;
private:
// TODO: Add attributes
};
template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
class static_cont_manager : public cont_manager {
};
}
}
}
#endif //PLS_CONT_MANAGER_H_
#ifndef PLS_DEQUE_H_
#define PLS_DEQUE_H_
#include "work_stealing_deque.h"
#include "locking_deque.h"
namespace pls {
namespace internal {
namespace scheduling {
namespace data_structures {
template<typename Task>
using deque = work_stealing_deque<Task>;
}
}
}
}
#endif //PLS_DEQUE_H_
#ifndef PLS_LOCKING_DEQUE_H
#define PLS_LOCKING_DEQUE_H
#include <mutex>
#include "pls/internal/base/spin_lock.h"
#include "aligned_stack.h"
namespace pls {
namespace internal {
namespace scheduling {
namespace data_structures {
using deque_offset = aligned_stack::stack_offset;
/**
* Wraps any object into a deque Task.
*/
template<typename Task>
struct locking_deque_task {
Task *item_;
locking_deque_task *prev_;
locking_deque_task *next_;
};
template<typename Task, typename Content>
struct locking_deque_container : public locking_deque_task<Task> {
Content content_;
public:
template<typename ...ARGS>
explicit locking_deque_container(ARGS &&... args) : content_{std::forward<ARGS>(args)...} {}
};
/**
* A double linked list based deque.
* Storage is therefore only needed for the individual items.
*
* @tparam Task The type of Tasks stored in this deque
*/
template<typename Task>
class locking_deque {
aligned_stack *stack_;
locking_deque_task<Task> *head_;
locking_deque_task<Task> *tail_;
locking_deque_task<Task> *last_inserted_;
base::spin_lock lock_;
public:
explicit locking_deque(aligned_stack *stack)
: stack_{stack}, head_{nullptr}, tail_{nullptr}, lock_{} {}
template<typename T, typename ...ARGS>
T *push_task(ARGS &&... args);
template<typename T, typename ...ARGS>
T *push_object(ARGS &&... args);
void *push_bytes(size_t size);
void publish_last_task();
Task *pop_local_task(bool &cas_fail_out);
Task *pop_local_task();
Task *pop_external_task(bool &cas_fail_out);
Task *pop_external_task();
void reset_offset(deque_offset state);
deque_offset save_offset();
};
}
}
}
}
#include "locking_deque_impl.h"
#endif //PLS_LOCKING_DEQUE_H
#ifndef PLS_LOCKING_DEQUE_IMPL_H_
#define PLS_LOCKING_DEQUE_IMPL_H_
namespace pls {
namespace internal {
namespace scheduling {
namespace data_structures {
template<typename Task>
template<typename T, typename ...ARGS>
T *locking_deque<Task>::push_task(ARGS &&...args) {
static_assert(std::is_same<Task, T>::value || std::is_base_of<Task, T>::value,
"Must only push types of <Item> onto work_stealing_deque<Item>");
// Allocate object
auto deque_item = stack_->push<locking_deque_container<Task, T>>(std::forward<ARGS>(args)...);
deque_item->item_ = &deque_item->content_;
// Keep for later publishing
last_inserted_ = deque_item;
// ...actual data reference
return &deque_item->content_;
}
template<typename Task>
template<typename T, typename ...ARGS>
T *locking_deque<Task>::push_object(ARGS &&... args) {
// Simply add data to the stack, do not publish it in any way
return stack_->push<T>(std::forward<ARGS>(args)...);
}
template<typename Task>
void *locking_deque<Task>::push_bytes(size_t size) {
// Simply add data to the stack, do not publish it in any way
return stack_->push_bytes(size);
}
template<typename Task>
void locking_deque<Task>::publish_last_task() {
std::lock_guard<base::spin_lock> lock{lock_};
if (tail_ != nullptr) {
tail_->next_ = last_inserted_;
} else {
head_ = last_inserted_;
}
last_inserted_->prev_ = tail_;
last_inserted_->next_ = nullptr;
tail_ = last_inserted_;
}
template<typename Task>
Task *locking_deque<Task>::pop_local_task() {
bool cas_fail_out;
return pop_local_task(cas_fail_out);
}
template<typename Task>
Task *locking_deque<Task>::pop_local_task(bool &cas_fail_out) {
std::lock_guard<base::spin_lock> lock{lock_};
cas_fail_out = false; // Can not fail CAS in locking implementation
if (tail_ == nullptr) {
return nullptr;
}
auto result = tail_;
tail_ = tail_->prev_;
if (tail_ == nullptr) {
head_ = nullptr;
} else {
tail_->next_ = nullptr;
}
return result->item_;
}
template<typename Task>
Task *locking_deque<Task>::pop_external_task() {
bool cas_fail_out;
return pop_external_task(cas_fail_out);
}
template<typename Task>
Task *locking_deque<Task>::pop_external_task(bool &cas_fail_out) {
std::lock_guard<base::spin_lock> lock{lock_};
cas_fail_out = false; // Can not fail CAS in locking implementation
if (head_ == nullptr) {
return nullptr;
}
auto result = head_;
head_ = head_->next_;
if (head_ == nullptr) {
tail_ = nullptr;
} else {
head_->prev_ = nullptr;
}
return result->item_;
}
template<typename Task>
void locking_deque<Task>::reset_offset(deque_offset state) {
stack_->reset_offset(state);
}
template<typename Task>
deque_offset locking_deque<Task>::save_offset() {
return stack_->save_offset();
}
}
}
}
}
#endif //PLS_LOCKING_DEQUE_IMPL_H_
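Both deque implementations share the same two-phase protocol: a task is first constructed on the owning thread's stack via push_task() and only becomes visible to other threads once publish_last_task() is called. A hedged usage sketch follows; my_task and the stack size are placeholders and the relevant namespaces are assumed to be in scope.

// Hedged usage sketch of the push/publish protocol; my_task and the size are placeholders.
struct my_task { int payload; };

void deque_example() {
  heap_aligned_stack stack{4096};
  locking_deque<my_task> deque{&stack};
  my_task *task = deque.push_task<my_task>();   // constructed on the stack, not yet visible
  deque.publish_last_task();                    // from here on other threads can see the task
  my_task *own = deque.pop_local_task();        // the owner pops from the tail
  my_task *stolen = deque.pop_external_task();  // other threads steal from the head
  (void) task; (void) own; (void) stolen;
}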
#ifndef PLS_WORK_STEALING_DEQUE_H_
#define PLS_WORK_STEALING_DEQUE_H_
#include <atomic>
#include "pls/internal/base/error_handling.h"
#include "stamped_integer.h"
#include "aligned_stack.h"
namespace pls {
namespace internal {
namespace scheduling {
namespace data_structures {
using base::system_details::pointer_t;
// Integer split into two halves, can be used in CAS operations
using data_structures::stamped_integer;
using deque_offset = stamped_integer::member_t;
// Single Item in the deque
class work_stealing_deque_item {
// TODO: In our opinion these atomics are a pure formality to make the thread sanitizer happy,
// as the race occurs in 'pop_head', where every case that reads a corrupt/old value is a case
// where the next CAS fails anyway, so the corrupted values have no influence on
// the overall program execution.
// ==> If we find performance problems in this queue, try removing the atomics again.
// Pointer to the actual data
std::atomic<pointer_t> data_;
// Index (relative to stack base) to the next and previous element
std::atomic<deque_offset> next_item_;
deque_offset previous_item_;
public:
work_stealing_deque_item() : data_{0}, next_item_{}, previous_item_{} {}
template<typename Item>
Item *data() {
return reinterpret_cast<Item *>(data_.load());
}
template<typename Item>
void set_data(Item *data) {
data_ = reinterpret_cast<pointer_t >(data);
}
deque_offset next_item() const { return next_item_.load(); }
void set_next_item(deque_offset next_item) { next_item_ = next_item; }
deque_offset previous_item() const { return previous_item_; }
void set_previous_item(deque_offset previous_item) { previous_item_ = previous_item; }
};
template<typename Task>
class work_stealing_deque {
// Deque 'takes over' stack and handles memory management while in use.
// At any point in time the deque can stop using more memory and the stack can be used by other entities.
aligned_stack *stack_;
std::atomic<stamped_integer> head_;
std::atomic<deque_offset> tail_;
deque_offset previous_tail_;
Task *last_pushed_task_;
public:
explicit work_stealing_deque(aligned_stack *stack) : stack_{stack},
head_{stamped_integer{0, 0}},
tail_{0},
previous_tail_{0},
last_pushed_task_{0} {}
template<typename T, typename ...ARGS>
T *push_task(ARGS &&... args);
template<typename T, typename ...ARGS>
T *push_object(ARGS &&... args);
void *push_bytes(size_t size);
void publish_last_task();
Task *pop_local_task(bool &cas_fail_out);
Task *pop_local_task();
Task *pop_external_task(bool &cas_fail_out);
Task *pop_external_task();
void reset_offset(deque_offset offset);
deque_offset save_offset();
private:
work_stealing_deque_item *item_at(deque_offset offset);
deque_offset current_stack_offset();
};
}
}
}
}
#include "work_stealing_deque_impl.h"
#endif //PLS_WORK_STEALING_DEQUE_H_
#ifndef PLS_WORK_STEALING_DEQUE_IMPL_H_
#define PLS_WORK_STEALING_DEQUE_IMPL_H_
#include <utility>
#include <new>
namespace pls {
namespace internal {
namespace scheduling {
namespace data_structures {
template<typename Task>
work_stealing_deque_item *work_stealing_deque<Task>::item_at(deque_offset offset) {
return reinterpret_cast<work_stealing_deque_item *>(stack_->memory_at_offset(offset));
}
template<typename Task>
deque_offset work_stealing_deque<Task>::current_stack_offset() {
return stack_->save_offset();
}
template<typename Task>
template<typename T, typename ...ARGS>
T *work_stealing_deque<Task>::push_task(ARGS &&... args) {
static_assert(std::is_same<Task, T>::value || std::is_base_of<Task, T>::value,
"Must only push types of <Task> onto work_stealing_deque<Task>");
// 'Union' type to push both the task and the deque entry as one part onto the stack
using pair_t = std::pair<work_stealing_deque_item, T>;
// Allocate space on stack
auto new_pair = reinterpret_cast<pair_t *>(stack_->push_bytes<pair_t>());
// Initialize memory on stack
new((void *) &(new_pair->first)) work_stealing_deque_item();
new((void *) &(new_pair->second)) T(std::forward<ARGS>(args)...);
// Keep reference for later publishing
last_pushed_task_ = &new_pair->second;
// Item is not publicly visible until it is published
return &(new_pair->second);
}
template<typename Task>
template<typename T, typename ...ARGS>
T *work_stealing_deque<Task>::push_object(ARGS &&... args) {
// Simply add data to the stack, do not publish it in any way
return stack_->push<T>(std::forward<ARGS>(args)...);
}
template<typename Task>
void *work_stealing_deque<Task>::push_bytes(size_t size) {
// Simply add data to the stack, do not publish it in any way
return stack_->push_bytes(size);
}
template<typename Task>
void work_stealing_deque<Task>::publish_last_task() {
deque_offset local_tail = tail_;
// Prepare current tail to point to correct next task
auto tail_deque_item = item_at(local_tail);
tail_deque_item->set_data(last_pushed_task_);
tail_deque_item->set_next_item(current_stack_offset());
tail_deque_item->set_previous_item(previous_tail_);
previous_tail_ = local_tail;
// Linearization point, task appears after this write
deque_offset new_tail = current_stack_offset();
tail_ = new_tail;
}
template<typename Task>
Task *work_stealing_deque<Task>::pop_local_task() {
bool cas_fail_out;
return pop_local_task(cas_fail_out);
}
template<typename Task>
Task *work_stealing_deque<Task>::pop_local_task(bool &cas_fail_out) {
deque_offset local_tail = tail_;
stamped_integer local_head = head_;
if (local_tail <= local_head.value) {
cas_fail_out = false;
return nullptr; // EMPTY
}
work_stealing_deque_item *previous_tail_item = item_at(previous_tail_);
deque_offset new_tail = previous_tail_;
previous_tail_ = previous_tail_item->previous_item();
// Publish our wish to set the tail back
tail_ = new_tail;
// Get the state of local head AFTER we published our wish
local_head = head_; // Linearization point, outside knows list is empty
if (local_head.value < new_tail) {
cas_fail_out = false;
return previous_tail_item->data<Task>(); // Success, enough distance to other threads
}
if (local_head.value == new_tail) {
stamped_integer new_head = stamped_integer{local_head.stamp + 1, new_tail};
// Try competing with consumers by updating the head's stamp value
if (head_.compare_exchange_strong(local_head, new_head)) {
cas_fail_out = false;
return previous_tail_item->data<Task>(); // SUCCESS, we won the competition with other threads
}
}
// Some other thread either won the competition or it already set the head further than we are
// before we even tried to compete with it.
// Reset the queue into an empty state => head_ = tail_
tail_ = local_head.value; // ...we give up to the other winning thread
cas_fail_out = false; // We failed the CAS race, but the queue is also empty for sure!
return nullptr; // EMPTY, we lost the competition with other threads
}
template<typename Task>
Task *work_stealing_deque<Task>::pop_external_task() {
bool cas_fail_out;
return pop_external_task(cas_fail_out);
}
template<typename Task>
Task *work_stealing_deque<Task>::pop_external_task(bool &cas_fail_out) {
stamped_integer local_head = head_;
deque_offset local_tail = tail_;
if (local_tail <= local_head.value) {
cas_fail_out = false;
return nullptr; // EMPTY
}
// Load info on current deque item.
// In case we have a race with a new (aba) overwritten item at this position,
// there has to be a competition over the tail -> the stamp increased and our next
// operation will fail anyways!
work_stealing_deque_item *head_deque_item = item_at(local_head.value);
deque_offset next_item_offset = head_deque_item->next_item();
Task *head_data_item = head_deque_item->data<Task>();
// We try to set the head to this new position.
// Possible outcomes:
// 1) no one interrupted us, we win this competition
// 2) other thread took the head, we lose to this
// 3) owning thread removed tail, we lose to this
stamped_integer new_head = stamped_integer{local_head.stamp + 1, next_item_offset};
if (head_.compare_exchange_strong(local_head, new_head)) {
cas_fail_out = false;
return head_data_item; // SUCCESS, we won the competition
}
cas_fail_out = true;
return nullptr; // EMPTY, we lost the competition
}
template<typename Task>
void work_stealing_deque<Task>::reset_offset(deque_offset offset) {
stack_->reset_offset(offset);
stamped_integer local_head = head_;
deque_offset local_tail = tail_;
if (offset < local_tail) {
tail_ = offset;
if (local_head.value >= local_tail) {
head_ = stamped_integer{local_head.stamp + 1, offset};
}
}
}
template<typename Task>
deque_offset work_stealing_deque<Task>::save_offset() {
return current_stack_offset();
}
}
}
}
}
#endif //PLS_WORK_STEALING_DEQUE_IMPL_H_
......@@ -7,7 +7,7 @@
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/scheduling/data_structures/aligned_stack.h"
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/base/thread.h"
#include "pls/internal/base/barrier.h"
......
#ifndef PLS_SCHEDULER_MEMORY_H
#define PLS_SCHEDULER_MEMORY_H
#include "pls/internal/scheduling/data_structures/aligned_stack.h"
#include "pls/internal/base/thread.h"
#include <vector>
#include "pls/internal/base/thread.h"
#include "pls/internal/scheduling/thread_state.h"
namespace pls {
......@@ -13,98 +13,76 @@ namespace scheduling {
void worker_routine();
class scheduler_memory {
private:
size_t max_threads_;
thread_state **thread_states_;
base::thread **threads_;
data_structures::aligned_stack **task_stacks_;
protected:
void init(size_t max_threads,
thread_state **thread_states,
base::thread **threads,
data_structures::aligned_stack **task_stacks) {
max_threads_ = max_threads;
thread_states_ = thread_states;
threads_ = threads;
task_stacks_ = task_stacks;
}
// We were first worried about the performance of these virtual calls.
// However, only thread_state_for is used during the runtime, and only when stealing.
// As stealing is expensive anyway, this should not add too much overhead.
public:
virtual size_t max_threads() const = 0;
virtual base::thread &thread_for(size_t id) const = 0;
virtual thread_state &thread_state_for(size_t id) const = 0;
};
template<size_t MAX_THREADS, size_t NUM_TASKS, size_t MAX_TASK_STACK_SIZE, size_t NUM_CONTS, size_t MAX_CONT_SIZE>
class static_scheduler_memory : public scheduler_memory {
public:
size_t max_threads() const {
return max_threads_;
}
thread_state *thread_state_for(size_t id) const {
return thread_states_[id];
size_t max_threads() const override {
return MAX_THREADS;
}
base::thread *thread_for(size_t id) const {
base::thread &thread_for(size_t id) const override {
return threads_[id];
}
data_structures::aligned_stack *task_stack_for(size_t id) const {
return task_stacks_[id];
thread_state &thread_state_for(size_t id) const override {
return thread_states_[id];
}
};
template<size_t MAX_THREADS, size_t TASK_STACK_SIZE>
class static_scheduler_memory : public scheduler_memory {
// Every one of these types has to live on its own cache line,
// as each thread uses one of them independently.
// Therefore it would be a major performance hit if we shared cache lines on these.
using aligned_thread = base::alignment::aligned_wrapper<base::thread>;
using aligned_thread_state = base::alignment::aligned_wrapper<thread_state>;
using aligned_thread_stack = base::alignment::aligned_wrapper<std::array<char, TASK_STACK_SIZE>>;
using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>;
// Actual Memory
std::array<aligned_thread, MAX_THREADS> threads_;
std::array<aligned_thread_state, MAX_THREADS> thread_states_;
std::array<aligned_thread_stack, MAX_THREADS> task_stacks_memory_;
std::array<aligned_aligned_stack, MAX_THREADS> task_stacks_;
// References for parent
std::array<base::thread *, MAX_THREADS> thread_refs_;
std::array<thread_state *, MAX_THREADS> thread_state_refs_;
std::array<data_structures::aligned_stack *, MAX_THREADS> task_stack_refs_;
private:
using thread_state_type = static_thread_state<NUM_TASKS, MAX_TASK_STACK_SIZE, NUM_CONTS, MAX_CONT_SIZE>;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<base::thread, MAX_THREADS> threads_;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<thread_state_type, MAX_THREADS> thread_states_;
};
template<size_t NUM_TASKS, size_t MAX_TASK_STACK_SIZE, size_t NUM_CONTS, size_t MAX_CONT_SIZE>
class heap_scheduler_memory : public scheduler_memory {
public:
static_scheduler_memory() : scheduler_memory() {
for (size_t i = 0; i < MAX_THREADS; i++) {
new((void *) task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i].pointer()->data(),
TASK_STACK_SIZE);
thread_refs_[i] = threads_[i].pointer();
thread_state_refs_[i] = thread_states_[i].pointer();
task_stack_refs_[i] = task_stacks_[i].pointer();
explicit heap_scheduler_memory(size_t max_threads) : max_threads_{max_threads},
thread_vector_{},
thread_state_vector_{} {
thread_vector_.reserve(max_threads);
thread_state_vector_.reserve(max_threads);
for (size_t i = 0; i < max_threads; i++) {
thread_vector_.emplace_back();
thread_state_vector_.emplace_back();
}
init(MAX_THREADS, thread_state_refs_.data(), thread_refs_.data(), task_stack_refs_.data());
}
};
class malloc_scheduler_memory : public scheduler_memory {
// Every one of these types has to live on its own cache line,
// as each thread uses one of them independently.
// Therefore it would be a major performance hit if we shared cache lines on these.
using aligned_thread = base::alignment::aligned_wrapper<base::thread>;
using aligned_thread_state = base::alignment::aligned_wrapper<thread_state>;
using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>;
size_t max_threads() const override {
return max_threads_;
}
const size_t num_threads_;
base::thread &thread_for(size_t id) const override {
return thread_vector_[id];
}
// Actual Memory
aligned_thread *threads_;
aligned_thread_state *thread_states_;
char **task_stacks_memory_;
aligned_aligned_stack *task_stacks_;
thread_state &thread_state_for(size_t id) const override {
return thread_state_vector_[id].object();
}
// References for parent
base::thread **thread_refs_;
thread_state **thread_state_refs_;
data_structures::aligned_stack **task_stack_refs_;
private:
using thread_state_type = static_thread_state<NUM_TASKS, MAX_TASK_STACK_SIZE, NUM_CONTS, MAX_CONT_SIZE>;
// thread_state_type is aligned to the cache line and therefore over-aligned (C++11 does not require
// the new operator to obey alignments bigger than 16, while cache lines are usually 64 bytes).
// To allow this object to be allocated using 'new' (which the vector does internally),
// we need to wrap it in a non-over-aligned object.
using thread_state_wrapper = base::alignment::cache_alignment_wrapper<thread_state_type>;
public:
explicit malloc_scheduler_memory(size_t num_threads, size_t memory_per_stack = 2 << 16);
~malloc_scheduler_memory();
size_t max_threads_;
std::vector<base::thread> thread_vector_;
std::vector<thread_state_wrapper> thread_state_vector_;
};
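The memory variant is meant to be chosen by the user: static_scheduler_memory when thread count and sizes are known at compile time, heap_scheduler_memory otherwise. A hedged sketch of that choice; all numeric arguments are example values, not defaults.

// Hedged sketch; the numeric template/constructor arguments are example values only.
static_scheduler_memory<8, 64, 4096, 64, 256> static_memory;  // MAX_THREADS, NUM_TASKS, MAX_TASK_STACK_SIZE, NUM_CONTS, MAX_CONT_SIZE
heap_scheduler_memory<64, 4096, 64, 256> heap_memory{8};      // same sizes, thread count chosen at runtime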
}
......
......@@ -4,9 +4,7 @@
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/scheduling/data_structures/aligned_stack.h"
#include "pls/internal/scheduling/data_structures/deque.h"
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/scheduling/thread_state.h"
namespace pls {
......
#ifndef PLS_TASK_MANAGER_H_
#define PLS_TASK_MANAGER_H_
#include <memory>
#include "pls/internal/data_structures/aligned_stack.h"
namespace pls {
namespace internal {
namespace scheduling {
class task_manager {
protected:
explicit task_manager(data_structures::aligned_stack &task_stack) : task_stack_{task_stack} {}
private:
data_structures::aligned_stack &task_stack_;
};
template<size_t NUM_TASKS, size_t MAX_STACK_SIZE>
class static_task_manager : public task_manager {
public:
static_task_manager() : task_manager{static_task_stack_} {};
private:
data_structures::static_aligned_stack<MAX_STACK_SIZE> static_task_stack_;
};
}
}
}
#endif //PLS_TASK_MANAGER_H_
......@@ -3,11 +3,13 @@
#define PLS_THREAD_STATE_H
#include <random>
#include <memory>
#include <array>
#include <chrono>
#include "pls/internal/base/thread.h"
#include "pls/internal/scheduling/data_structures/aligned_stack.h"
#include "pls/internal/scheduling/data_structures/deque.h"
#include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/cont_manager.h"
namespace pls {
namespace internal {
......@@ -17,22 +19,27 @@ namespace scheduling {
class scheduler;
class task;
struct thread_state {
alignas(base::system_details::CACHE_LINE_SIZE) scheduler *scheduler_;
struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
scheduler *scheduler_;
size_t id_;
task_manager &task_manager_;
cont_manager &cont_manager_;
alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_;
alignas(base::system_details::CACHE_LINE_SIZE) data_structures::aligned_stack *task_stack_;
alignas(base::system_details::CACHE_LINE_SIZE) data_structures::deque<task> deque_;
alignas(base::system_details::CACHE_LINE_SIZE) size_t id_;
alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
thread_state(scheduler *scheduler, data_structures::aligned_stack *task_stack, unsigned int id) :
scheduler_{scheduler},
protected:
thread_state(task_manager &task_manager,
cont_manager &cont_manager) :
scheduler_{nullptr},
id_{0},
task_manager_{task_manager},
cont_manager_{cont_manager},
current_task_{nullptr},
task_stack_{task_stack},
deque_{task_stack_},
id_{id},
random_{id_} {}
random_{static_cast<unsigned long>(std::chrono::steady_clock::now().time_since_epoch().count())} {}
public:
/**
* Convenience helper to get the thread_state instance associated with this thread.
* Must only be called on threads that are associated with a thread_state,
......@@ -41,6 +48,25 @@ struct thread_state {
* @return The thread_state of this thread.
*/
static thread_state *get() { return base::this_thread::state<thread_state>(); }
// Do not allow move/copy operations.
// State is a pure memory container with references/pointers into it from all over the code.
// It should be allocated, used and de-allocated, nothing more.
thread_state(thread_state &&) = delete;
thread_state &operator=(thread_state &&) = delete;
thread_state(const thread_state &) = delete;
thread_state &operator=(const thread_state &) = delete;
};
template<size_t NUM_TASKS, size_t MAX_TASK_STACK_SIZE, size_t NUM_CONTS, size_t MAX_CONT_SIZE>
struct static_thread_state : public thread_state {
public:
static_thread_state() : thread_state{static_task_manager_, static_cont_manager_} {}
private:
static_task_manager<NUM_TASKS, MAX_TASK_STACK_SIZE> static_task_manager_;
static_cont_manager<NUM_CONTS, MAX_CONT_SIZE> static_cont_manager_;
};
}
......
......@@ -12,8 +12,6 @@
namespace pls {
using internal::scheduling::static_scheduler_memory;
using internal::scheduling::malloc_scheduler_memory;
using internal::scheduling::scheduler;
using unique_id = internal::helpers::unique_id;
......
#include "pls/internal/base/alignment.h"
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace base {
namespace alignment {
void *allocate_aligned(size_t size) {
return aligned_alloc(system_details::CACHE_LINE_SIZE, size);
}
system_details::pointer_t next_alignment(system_details::pointer_t size) {
system_details::pointer_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE;
if (miss_alignment == 0) {
return size;
} else {
return size + (base::system_details::CACHE_LINE_SIZE - miss_alignment);
}
}
system_details::pointer_t previous_alignment(system_details::pointer_t size) {
system_details::pointer_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE;
if (miss_alignment == 0) {
return size;
} else {
return size - miss_alignment;
}
}
char *next_alignment(char *pointer) {
return reinterpret_cast<char *>(next_alignment(reinterpret_cast<system_details::pointer_t >(pointer)));
}
}
}
}
}
......@@ -4,6 +4,31 @@ namespace pls {
namespace internal {
namespace base {
thread::thread() :
pthread_thread_{},
running_{false} {}
thread::~thread() {
if (running_) {
join();
}
}
thread::thread(thread &&other) noexcept :
pthread_thread_{other.pthread_thread_},
running_{other.running_} {
other.running_ = false;
}
thread &thread::operator=(thread &&other) noexcept {
this->pthread_thread_ = other.pthread_thread_;
this->running_ = other.running_;
other.running_ = false;
return *this;
}
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
pthread_key_t this_thread::local_storage_key_ = false;
bool this_thread::local_storage_key_initialized_;
......@@ -14,6 +39,7 @@ __thread void *this_thread::local_state_;
void thread::join() {
pthread_join(pthread_thread_, nullptr);
running_ = false;
}
}
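Making base::thread default constructible and movable is what allows heap_scheduler_memory above to keep its threads in a std::vector. A small hedged sketch of that pattern:

// Hedged sketch; shows only that the move-only handle can now live in standard containers.
std::vector<pls::internal::base::thread> threads;
threads.reserve(4);
for (int i = 0; i < 4; i++) {
  threads.emplace_back();  // default constructed, not yet running
}
// a thread that is still marked running_ is joined automatically in ~thread()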
......
#include "pls/internal/scheduling/data_structures/aligned_stack.h"
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace scheduling {
namespace data_structures {
aligned_stack::aligned_stack(pointer_t memory_region, const std::size_t size) :
aligned_memory_start_{base::alignment::next_alignment(memory_region)},
aligned_memory_end_{base::alignment::previous_alignment(memory_region + size)},
max_offset_{(aligned_memory_end_ - aligned_memory_start_) / base::system_details::CACHE_LINE_SIZE},
current_offset_{0} {}
aligned_stack::aligned_stack(char *memory_pointer, size_t size) :
memory_pointer_{memory_pointer}, // MUST be aligned
max_offset_{size / base::system_details::CACHE_LINE_SIZE},
current_offset_{0} {
PLS_ASSERT((pointer_t) memory_pointer_ % base::system_details::CACHE_LINE_SIZE == 0,
"Must initialize an aligned_stack with a properly aligned memory region!")
}
aligned_stack::aligned_stack(char *memory_region, const std::size_t size) :
aligned_stack((pointer_t) memory_region, size) {}
aligned_stack::aligned_stack(char *unaligned_memory_pointer, size_t size, size_t unaligned_size) :
unaligned_memory_pointer_{unaligned_memory_pointer},
memory_pointer_{base::alignment::next_alignment(unaligned_memory_pointer)},
max_offset_{unaligned_size / base::system_details::CACHE_LINE_SIZE},
current_offset_{0} {
PLS_ASSERT(size == base::alignment::previous_alignment(unaligned_size),
"Initialized aligned stack with invalid memory configuration!")
}
void *aligned_stack::memory_at_offset(stack_offset offset) const {
const auto byte_offset = offset * base::system_details::CACHE_LINE_SIZE;
return reinterpret_cast<void *>(aligned_memory_start_ + byte_offset);
return reinterpret_cast<void *>(memory_pointer_ + byte_offset);
}
void *aligned_stack::push_bytes(size_t size) {
......@@ -28,9 +34,8 @@ void *aligned_stack::push_bytes(size_t size) {
// Move head to next aligned position after new object
current_offset_ += num_cache_lines;
if (current_offset_ > max_offset_) {
PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!");
}
PLS_ASSERT(current_offset_ <= max_offset_,
"Tried to allocate object on aligned_stack without sufficient memory!");
return result;
}
......@@ -38,4 +43,3 @@ void *aligned_stack::push_bytes(size_t size) {
}
}
}
}
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/data_structures/deque.h"
#include "pls/internal/base/error_handling.h"
......@@ -21,7 +20,8 @@ scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads, b
for (unsigned int i = 0; i < num_threads_; i++) {
// Placement new is required, as the memory of `memory_` is not required to be initialized.
new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i};
new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i),
memory_->cont_stack_for(i), i};
if (reuse_thread && i == 0) {
continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one.
......
#include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/scheduling/data_structures/aligned_stack.h"
namespace pls {
namespace internal {
namespace scheduling {
malloc_scheduler_memory::malloc_scheduler_memory(const size_t num_threads, const size_t memory_per_stack) :
num_threads_{num_threads} {
threads_ =
reinterpret_cast<aligned_thread *>(base::alignment::allocate_aligned(num_threads * sizeof(aligned_thread)));
thread_states_ = reinterpret_cast<aligned_thread_state *>(base::alignment::allocate_aligned(
num_threads * sizeof(aligned_thread_state)));
task_stacks_ = reinterpret_cast<aligned_aligned_stack *>(base::alignment::allocate_aligned(
num_threads * sizeof(aligned_aligned_stack)));
task_stacks_memory_ = reinterpret_cast<char **>(base::alignment::allocate_aligned(num_threads * sizeof(char *)));
thread_refs_ = static_cast<base::thread **>(malloc(num_threads * sizeof(base::thread *)));
thread_state_refs_ = static_cast<thread_state **>(malloc(num_threads * sizeof(thread_state *)));
task_stack_refs_ =
static_cast<data_structures::aligned_stack **>(malloc(num_threads * sizeof(data_structures::aligned_stack *)));
for (size_t i = 0; i < num_threads_; i++) {
task_stacks_memory_[i] = reinterpret_cast<char *>(base::alignment::allocate_aligned(memory_per_stack));
new((void *) task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i], memory_per_stack);
thread_refs_[i] = threads_[i].pointer();
thread_state_refs_[i] = thread_states_[i].pointer();
task_stack_refs_[i] = task_stacks_[i].pointer();
}
init(num_threads, thread_state_refs_, thread_refs_, task_stack_refs_);
}
malloc_scheduler_memory::~malloc_scheduler_memory() {
free(threads_);
free(thread_states_);
for (size_t i = 0; i < num_threads_; i++) {
free(task_stacks_memory_[i]);
}
free(task_stacks_);
free(task_stacks_memory_);
free(thread_refs_);
free(thread_state_refs_);
free(task_stack_refs_);
}
}
}
}