Commit 927d8ac5 by FritzFlorian

Rework memory allocation to heap based RAII objects.

Remove the strict static memory allocation scheme in favour of placing objects on the heap at startup. This still meets the requirements posed by modern, high performance embedded systems, but makes the APIs a lot cleaner.
parent ce2632a1
Pipeline #1420 failed in 29 seconds
@@ -4,6 +4,16 @@ The new version of pls uses a more complicated/less user friendly
API in favor of performance and memory guarantees.
For the old version refer to the second half of this document.
# 18.03.2020 - C++17 benefit, overaligned new
We have many cache line aligned types. Before C++17, alignas(...) was
only honored when objects were stored in static storage (e.g. global memory, stack memory);
heap allocations could come back under-aligned.
As we move towards more modern RAII style resource management, the fact that
[C++17 supports over-aligned new of arbitrary alignment, even in vectors](https://www.bfilipek.com/2019/08/newnew-align.html)
is extremely helpful. Previously we needed wrappers to do aligned heap allocations.
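A small, self-contained sketch of the difference (a 64-byte cache line is assumed and the type names are illustrative, not from our codebase):

```cpp
#include <cstdint>
#include <memory>
#include <vector>

// A type that must start on its own cache line.
struct alignas(64) cache_aligned_counter {
  std::uint64_t value_;
};

int main() {
  // Pre-C++17, plain new and std::vector could return memory aligned only
  // to alignof(std::max_align_t) (typically 16 bytes). Since C++17 these
  // calls route through the aligned operator new(size_t, align_val_t).
  auto on_heap = std::make_unique<cache_aligned_counter>();
  std::vector<cache_aligned_counter> many(8);

  return (reinterpret_cast<std::uintptr_t>(many.data()) % 64 == 0) ? 0 : 1;
}
```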
# 18.03.2020 - Coding Standard/C++ Standard
We previously stuck to strict 'static' allocation in global static
...
@@ -3,4 +3,4 @@ add_executable(playground
        main.cpp)

# Example for adding the library to your app (as a cmake project dependency)
-target_link_libraries(playground context_switcher Threads::Threads)
+target_link_libraries(playground pls context_switcher Threads::Threads)
-#include <utility>
-#include <cstdio>
-#include <thread>
-
-#include "barrier.h"
-#include "context_switcher/context_switcher.h"
-
-using namespace context_switcher;
-using namespace std;
-
-// Memory for custom stack and continuation semantics
-const size_t STACK_SIZE = 512 * 32;
-const size_t NUM_STACKS = 4;
-char custom_stacks[NUM_STACKS][STACK_SIZE];
-
-int main() {
-  context_switcher::continuation cont_t1, cont_main;
-  barrier bar{2};
-  int error = 0;
-
-  auto t1 = std::thread([&]() {
-    while (true) {
-      bar.wait();
-      auto cont = enter_context(custom_stacks[0], STACK_SIZE, [&](continuation &&cont) {
-        error++;
-        cont_t1 = std::move(cont);
-        bar.wait();
-        error++;
-        return std::move(cont_main);
-      });
-    }
-  });
-
-  int count = 0;
-  while (true) {
-    count++;
-    if (count % 100 == 0) {
-      printf("%d\n", count);
-    }
-    bar.wait();
-    auto cont = enter_context(custom_stacks[1], STACK_SIZE, [&](continuation &&cont) {
-      error++;
-      cont_main = std::move(cont);
-      bar.wait();
-      error++;
-      return std::move(cont_t1);
-    });
-  }
-  return 0;
+#include <sys/types.h>
+#include <unistd.h>
+#include <fstream>
+#include <sstream>
+#include <string>
+#include <mutex>
+
+#include "tsan_support.h"
+
+using namespace std;
+
+long count_memory_mappings() {
+  pid_t my_pid = getpid();
+  ifstream proc_file{"/proc/" + to_string(my_pid) + "/maps"};
+
+  string line;
+  long line_count{0};
+  while (getline(proc_file, line)) {
+    line_count++;
+  }
+  return line_count;
+}
+
+int main() {
+  mutex mut;
+  int count = 0;
+  while (true) {
+    printf("iteration: %d, mappings: %ld\n", count++, count_memory_mappings());
+
+    void *main_fiber = __tsan_get_current_fiber();
+    void *other_fiber = __tsan_create_fiber(0);
+
+    __tsan_switch_to_fiber(other_fiber, 0);
+    mut.lock();
+    mut.unlock();
+    __tsan_switch_to_fiber(main_fiber, 0);
+    __tsan_destroy_fiber(other_fiber);
+  }
+  return 0;
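(The new playground repeatedly creates, switches to and destroys a TSAN fiber while watching /proc/<pid>/maps, making it easy to spot whether the sanitizer's fiber bookkeeping leaks memory mappings over time.)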
...
#ifndef CONTEXT_SWITCHER_TSAN_SUPPORT
#define CONTEXT_SWITCHER_TSAN_SUPPORT
extern "C" {
// Fiber switching API.
// - TSAN context for fiber can be created by __tsan_create_fiber
// and freed by __tsan_destroy_fiber.
// - TSAN context of current fiber or thread can be obtained
// by calling __tsan_get_current_fiber.
// - __tsan_switch_to_fiber should be called immediately before switching
//   to a fiber, such as a call of swapcontext.
// - Fiber name can be set by __tsan_set_fiber_name.
void *__tsan_get_current_fiber(void);
void *__tsan_create_fiber(unsigned flags);
void __tsan_destroy_fiber(void *fiber);
void __tsan_switch_to_fiber(void *fiber, unsigned flags);
void __tsan_set_fiber_name(void *fiber, const char *name);
};
#endif //CONTEXT_SWITCHER_TSAN_SUPPORT
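Note that these __tsan_* entry points are implemented by the ThreadSanitizer runtime itself, so translation units using this header (like the playground above) only link when the project is compiled and linked with -fsanitize=thread.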
@@ -4,17 +4,18 @@ add_library(pls STATIC
        include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp
        include/pls/internal/base/ttas_spin_lock.h src/internal/base/ttas_spin_lock.cpp
        include/pls/internal/base/swmr_spin_lock.h src/internal/base/swmr_spin_lock.cpp
-       include/pls/internal/base/thread.h src/internal/base/thread.cpp
-       include/pls/internal/base/thread_impl.h
        include/pls/internal/base/barrier.h src/internal/base/barrier.cpp
        include/pls/internal/base/system_details.h
-       include/pls/internal/base/error_handling.h
+       include/pls/internal/base/error_handling.h src/internal/base/error_handling.cpp
        include/pls/internal/base/alignment.h src/internal/base/alignment.cpp

        include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
        include/pls/internal/data_structures/aligned_stack_impl.h
        include/pls/internal/data_structures/stamped_integer.h
        include/pls/internal/data_structures/delayed_initialization.h
+       include/pls/internal/data_structures/bounded_trading_deque.h
+       include/pls/internal/data_structures/bounded_ws_deque.h
+       include/pls/internal/data_structures/optional.h

        include/pls/internal/helpers/prohibit_new.h
        include/pls/internal/helpers/profiler.h
@@ -24,23 +25,14 @@ add_library(pls STATIC
        include/pls/internal/helpers/seqence.h
        include/pls/internal/helpers/member_function.h

-       include/pls/internal/scheduling/thread_state.h
+       include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp
        include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
        include/pls/internal/scheduling/scheduler_impl.h
-       include/pls/internal/scheduling/scheduler_memory.h
-       include/pls/internal/scheduling/task_manager.h
+       include/pls/internal/scheduling/task_manager.h src/internal/scheduling/task_manager.cpp
        include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
-       include/pls/internal/data_structures/bounded_ws_deque.h
-       include/pls/internal/data_structures/optional.h
-       include/pls/internal/scheduling/thread_state_static.h
-       src/internal/base/error_handling.cpp
-       include/pls/internal/data_structures/bounded_trading_deque.h
-       include/pls/internal/scheduling/external_trading_deque.h
+       include/pls/internal/scheduling/external_trading_deque.h src/internal/scheduling/external_trading_deque.cpp
        include/pls/internal/scheduling/traded_cas_field.h
-       include/pls/internal/scheduling/task_manager_impl.h
-       include/pls/internal/scheduling/static_scheduler_memory.h
-       include/pls/internal/scheduling/heap_scheduler_memory.h
-       src/internal/scheduling/task_manager.cpp src/internal/scheduling/thread_state.cpp)
+       include/pls/internal/scheduling/task_manager_impl.h)

# Dependencies for pls
target_link_libraries(pls Threads::Threads)
...
@@ -16,9 +16,7 @@
#include <cstdint>

-namespace pls {
-namespace internal {
-namespace base {
+namespace pls::internal::base {

/**
 * Collection of system details, e.g. hardware cache line size.
@@ -47,6 +45,11 @@ constexpr unsigned long CAS_SIZE = sizeof(cas_integer) * 8;
constexpr pointer_t CACHE_LINE_SIZE = 64;

+/**
+ * Helper to align types/values on cache lines.
+ */
+#define PLS_CACHE_ALIGN alignas(base::system_details::CACHE_LINE_SIZE)
+
/**
 * Choose one of the following ways to store thread specific data.
 * Try to choose the fastest available on this processor/system.
 */
@@ -87,7 +90,5 @@ inline void relax_cpu() {
}

}
-}
-}

#endif //PLS_SYSTEM_DETAILS_H
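A quick sketch of what the new macro replaces at call sites (the struct is made up for illustration; CACHE_LINE_SIZE is 64 as defined above):

```cpp
#include <atomic>

// Before: spell the alignment out at every use site.
struct alignas(base::system_details::CACHE_LINE_SIZE) counter_slot_old {
  std::atomic<long> value_;
};

// After: same effect, and heap allocations of such types are correctly
// aligned thanks to C++17 overaligned operator new.
struct PLS_CACHE_ALIGN counter_slot {
  std::atomic<long> value_;
};

static_assert(alignof(counter_slot) == 64);
```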
/**
* Abstraction for threading to allow porting.
* Currently using either pthread or C++ 11 threads.
*/
#ifndef PLS_THREAD_H
#define PLS_THREAD_H
#include <functional>
#include <pthread.h>
#include <atomic>
#include <ctime>
#include "system_details.h"
namespace pls {
namespace internal {
namespace base {
using thread_entrypoint = void();
/**
 * Static methods that can be performed on the current thread.
*
* usage:
* this_thread::yield();
* T* state = this_thread::state<T>();
*
* PORTABILITY:
* Current implementation is based on pthreads.
*/
class this_thread {
friend
class thread;
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
static pthread_key_t local_storage_key_;
static bool local_storage_key_initialized_;
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
static __thread void *local_state_;
#endif
public:
static void yield() {
pthread_yield();
}
static void sleep(long microseconds) {
timespec time{0, 1000 * microseconds};
nanosleep(&time, nullptr);
}
/**
* Retrieves the local state pointer.
*
* @tparam T The type of the state that is stored.
 * @return The state pointer held for this thread.
*/
template<typename T>
static T *state();
/**
* Stores a pointer to the thread local state object.
* The memory management for this has to be done by the user,
* we only keep the pointer.
*
* @tparam T The type of the state that is stored.
* @param state_pointer A pointer to the threads state object.
*/
template<typename T>
static void set_state(T *state_pointer);
};
/**
* Abstraction for starting a function in a separate thread.
* Offers only threading functionality needed in this project,
* underlying implementation can be changed.
* Uses NO heap memory allocation.
*
* PORTABILITY:
* Current implementation is based on pthreads.
*/
class thread {
friend class this_thread;
// Keep handle to native implementation
pthread_t pthread_thread_;
bool running_;
template<typename Function, typename State>
static void *start_pthread_internal(void *thread_pointer);
public:
template<typename Function, typename State>
explicit thread(const Function &function, State *state_pointer);
template<typename Function>
explicit thread(const Function &function);
explicit thread();
~thread();
void join();
// make object move only
thread(thread &&) noexcept;
thread &operator=(thread &&) noexcept;
thread(const thread &) = delete;
thread &operator=(const thread &) = delete;
};
}
}
}
#include "thread_impl.h"
#endif //PLS_THREAD_H
#ifndef PLS_THREAD_IMPL_H
#define PLS_THREAD_IMPL_H
namespace pls {
namespace internal {
namespace base {
template<typename T>
T *this_thread::state() {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
return reinterpret_cast<T*>(pthread_getspecific(local_storage_key_));
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
return reinterpret_cast<T *>(local_state_);
#endif
}
template<typename T>
void this_thread::set_state(T *state_pointer) {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer);
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
local_state_ = state_pointer;
#endif
}
template<typename Function, typename State>
struct thread_arguments {
Function function_;
State *state_;
std::atomic_flag *startup_flag_;
};
template<typename Function, typename State>
void *thread::start_pthread_internal(void *thread_pointer) {
// Actively copy all arguments into stack memory.
thread_arguments<Function, State>
arguments_copy = *reinterpret_cast<thread_arguments<Function, State> *>(thread_pointer);
// Now we have copies of everything we need on the stack.
// The original thread object can be moved freely (no more
// references to its memory location).
arguments_copy.startup_flag_->clear();
this_thread::set_state(arguments_copy.state_);
arguments_copy.function_();
// Finished executing the user function
pthread_exit(nullptr);
}
template<typename Function, typename State>
thread::thread(const Function &function, State *state_pointer):
pthread_thread_{},
running_{true} {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
if (!this_thread::local_storage_key_initialized_) {
pthread_key_create(&this_thread::local_storage_key_, nullptr);
this_thread::local_storage_key_initialized_ = true;
}
#endif
// We need to wait for the started function to read
// the function_ and state_pointer_ property before returning
// from the constructor, as the object might be moved after this.
std::atomic_flag startup_flag{ATOMIC_FLAG_INIT};
thread_arguments<Function, State> arguments{function, state_pointer, &startup_flag};
startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return
pthread_create(&pthread_thread_, nullptr, start_pthread_internal < Function, State > , (void *) (&arguments));
while (startup_flag.test_and_set()); // Busy waiting for the starting flag to clear
}
template<typename Function>
thread::thread(const Function &function): thread{function, (void *) nullptr} {}
}
}
}
#endif //PLS_THREAD_IMPL_H
@@ -4,7 +4,7 @@
#include <atomic>
#include <utility>
-#include <tuple>
+#include <vector>

#include "pls/internal/base/error_handling.h"
#include "pls/internal/base/system_details.h"
@@ -15,9 +15,7 @@
#include "pls/internal/scheduling/traded_cas_field.h"
#include "pls/internal/scheduling/task.h"

-namespace pls {
-namespace internal {
-namespace scheduling {
+namespace pls::internal::scheduling {

using namespace data_structures;
@@ -38,35 +36,11 @@ struct trading_deque_entry {
 * As each task is associated with memory this suffices to exchange memory blocks needed for execution.
 */
class external_trading_deque {
 public:
-  external_trading_deque(trading_deque_entry *entries, size_t num_entries) :
-      entries_{entries}, num_entries_{num_entries} {};
-
-  void set_thread_id(unsigned id) {
-    thread_id_ = id;
-  }
-
-  static optional<task *> peek_traded_object(task *target_task) {
-    traded_cas_field current_cas = target_task->external_trading_deque_cas_.load();
-    if (current_cas.is_filled_with_object()) {
-      return optional<task *>{current_cas.get_trade_object()};
-    } else {
-      return optional<task *>{};
-    }
-  }
-
-  static optional<task *> get_trade_object(task *target_task) {
-    traded_cas_field current_cas = target_task->external_trading_deque_cas_.load();
-    if (current_cas.is_filled_with_object()) {
-      task *result = current_cas.get_trade_object();
-      traded_cas_field empty_cas;
-      if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas, empty_cas)) {
-        return optional<task *>{result};
-      }
-    }
-    return optional<task *>{};
-  }
+  external_trading_deque(unsigned thread_id, size_t num_entries) : thread_id_(thread_id), entries_(num_entries) {}
+
+  static optional<task *> peek_traded_object(task *target_task);
+  static optional<task *> get_trade_object(task *target_task);

  /**
   * Pushes a task on the bottom of the deque.
@@ -74,69 +48,14 @@ class external_trading_deque {
   *
   * @param published_task The task to publish on the bottom of the deque.
   */
-  void push_bot(task *published_task) {
-    auto expected_stamp = bot_internal_.stamp;
-    auto &current_entry = entries_[bot_internal_.value];
-
-    // Publish the prepared task in the deque.
-    current_entry.forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed);
-    current_entry.traded_task_.store(published_task, std::memory_order_relaxed);
-
-    // Field that all threads synchronize on.
-    // This happens not in the deque itself, but in the published task.
-    traded_cas_field sync_cas_field;
-    sync_cas_field.fill_with_stamp(expected_stamp, thread_id_);
-    published_task->external_trading_deque_cas_.store(sync_cas_field, std::memory_order_release);
-
-    // Advance the bot pointer. Linearization point for making the task public.
-    bot_internal_.stamp++;
-    bot_internal_.value++;
-    bot_.store(bot_internal_.value, std::memory_order_release);
-  }
-
-  void reset_bot_and_top() {
-    bot_internal_.value = 0;
-    bot_internal_.stamp++;
-    bot_.store(0);
-    top_.store({bot_internal_.stamp, 0});
-  }
-
-  void decrease_bot() {
-    bot_internal_.value--;
-    bot_.store(bot_internal_.value, std::memory_order_relaxed);
-  }
+  void push_bot(task *published_task);

  /**
   * Tries to pop the last task on the deque.
   *
   * @return optional<task*> holding the popped task if successful.
   */
-  optional<task *> pop_bot() {
-    if (bot_internal_.value == 0) {
-      reset_bot_and_top();
-      return optional<task *>{};
-    }
-    decrease_bot();
-
-    auto &current_entry = entries_[bot_internal_.value];
-    auto *popped_task = current_entry.traded_task_.load(std::memory_order_relaxed);
-    auto expected_stamp = current_entry.forwarding_stamp_.load(std::memory_order_relaxed);
-
-    // We know what value must be in the cas field if no other thread stole it.
-    traded_cas_field expected_sync_cas_field;
-    expected_sync_cas_field.fill_with_stamp(expected_stamp, thread_id_);
-    traded_cas_field empty_cas_field;
-
-    if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field,
-                                                                         empty_cas_field,
-                                                                         std::memory_order_acq_rel)) {
-      return optional<task *>{popped_task};
-    } else {
-      reset_bot_and_top();
-      return optional<task *>{};
-    }
-  }
+  optional<task *> pop_bot();

  struct peek_result {
    peek_result(optional<task *> top_task, stamped_integer top_pointer) : top_task_{std::move(top_task)},
@@ -153,82 +72,35 @@ class external_trading_deque {
   *
   * @return a peek result containing the optional top task (if present) and the current head pointer.
   */
-  peek_result peek_top() {
-    auto local_top = top_.load();
-    auto local_bot = bot_.load();
-
-    if (local_top.value < local_bot) {
-      return peek_result{optional<task *>{entries_[local_top.value].traded_task_}, local_top};
-    } else {
-      return peek_result{optional<task *>{}, local_top};
-    }
-  }
-
-  optional<task *> pop_top(task *offered_task, stamped_integer expected_top) {
-    auto local_bot = bot_.load();
-    if (expected_top.value >= local_bot) {
-      return data_structures::optional<task *>{};
-    }
-
-    auto &target_entry = entries_[expected_top.value];
-
-    // Read our potential result
-    task *result = target_entry.traded_task_.load();
-    unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load();
-
-    // Try to get it by CAS with the expected field entry, giving up our offered_task for it
-    traded_cas_field expected_sync_cas_field;
-    expected_sync_cas_field.fill_with_stamp(expected_top.stamp, thread_id_);
-    traded_cas_field offered_field;
-    offered_field.fill_with_trade_object(offered_task);
-
-    if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, offered_field)) {
-      // We got it, for sure move the top pointer forward.
-      top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1});
-      // Return the stolen task
-      return data_structures::optional<task *>{result};
-    } else {
-      // We did not get it...help forwarding the top pointer anyway.
-      if (expected_top.stamp == forwarding_stamp) {
-        // ...move the pointer forward if someone else put a valid trade object in there.
-        top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1});
-      } else {
-        // ...we failed because the top tag lags behind...try to fix it.
-        // This means only updating the tag, as this location can still hold data we need.
-        top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value});
-      }
-      return data_structures::optional<task *>{};
-    }
-  }
+  peek_result peek_top();
+
+  /**
+   * Tries to pop the task on top of the deque that was
+   * previously observed by 'peeking' at the deque.
+   *
+   * Returns the task if successful, returns nothing if
+   * either the peeked task is no longer at the top of the deque
+   * or another thread interfered and 'won' the task.
+   *
+   * @return optional<task*> holding the popped task if successful.
+   */
+  optional<task *> pop_top(task *offered_task, peek_result peek_result);

 private:
+  void reset_bot_and_top();
+  void decrease_bot();
+
  // info on this deque
-  trading_deque_entry *const entries_;
-  const size_t num_entries_;
  unsigned thread_id_;
+  std::vector<trading_deque_entry> entries_;

  // fields for stealing/interacting
  stamped_integer bot_internal_{0, 0};
-  alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<stamped_integer> top_{{0, 0}};
-  alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<size_t> bot_{0};
+  PLS_CACHE_ALIGN std::atomic<stamped_integer> top_{{0, 0}};
+  PLS_CACHE_ALIGN std::atomic<size_t> bot_{0};
};

-template<size_t SIZE>
-class static_external_trading_deque {
- public:
-  static_external_trading_deque() : items_{}, deque_{items_.data(), SIZE} {}
-
-  external_trading_deque &get_deque() { return deque_; }
-
- private:
-  std::array<trading_deque_entry, SIZE> items_;
-  external_trading_deque deque_;
-};

}
-}
-}

#endif //PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_
@@ -3,18 +3,15 @@
#define PLS_SCHEDULER_H

#include <atomic>
+#include <thread>
+#include <vector>

#include "pls/internal/helpers/profiler.h"

-#include "pls/internal/base/thread.h"
#include "pls/internal/base/barrier.h"
-#include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/scheduling/thread_state.h"
+#include "pls/internal/scheduling/task_manager.h"

-namespace pls {
-namespace internal {
-namespace scheduling {
+namespace pls::internal::scheduling {

struct task;
@@ -29,16 +26,20 @@ class scheduler {
 public:
  /**
   * Initializes a scheduler instance with the given number of threads.
-   * This will spawn the threads and put them to sleep, ready to process an
-   * upcoming parallel section.
+   * This will allocate ALL runtime resources, spawn the worker threads
+   * and put them to sleep, ready to process an upcoming parallel section.
+   *
+   * The initialization should be seen as a heavy and not very predictable operation.
+   * After it is done the scheduler must (if configured correctly) never run out of resources
+   * and deliver tight time bounds of randomized work-stealing.
   *
-   * @param memory All memory is allocated statically, thus the user is required to provide the memory instance.
   * @param num_threads The number of worker threads to be created.
   */
-  explicit scheduler(scheduler_memory &memory, unsigned int num_threads, bool reuse_thread = true);
+  explicit scheduler(unsigned int num_threads, size_t computation_depth, size_t stack_size, bool reuse_thread = true);

  /**
   * The scheduler is implicitly terminated as soon as it leaves the scope.
+   * Resources follow a clean RAII style.
   */
  ~scheduler();
@@ -53,18 +54,32 @@ class scheduler {
  template<typename Function>
  void perform_work(Function work_section);

+  /**
+   * Main parallelism construct, spawns a function for potential parallel execution.
+   *
+   * The result of the spawned function must not be relied on until sync() is called.
+   * Best see the lambda as if executed on a thread, e.g. it can cause race conditions
+   * and it is only finished after you join it back into the parent thread using sync().
+   *
+   * @param lambda the lambda to be executed in parallel.
+   */
  template<typename Function>
  static void spawn(Function &&lambda);

+  /**
+   * Waits for all potentially parallel child tasks created with spawn(...).
+   */
  static void sync();

-  thread_state &thread_state_for(size_t id);
+  thread_state &thread_state_for(unsigned int thread_id) { return *thread_states_[thread_id]; }
+  task_manager &task_manager_for(unsigned int thread_id) { return *task_managers_[thread_id]; }

  /**
   * Explicitly terminate the worker threads. Scheduler must not be used after this.
   */
  void terminate();

-  unsigned int num_threads() const { return num_threads_; }
+  [[nodiscard]] unsigned int num_threads() const { return num_threads_; }

 private:
  static void work_thread_main_loop();
@@ -72,10 +87,12 @@ class scheduler {
  const unsigned int num_threads_;
  const bool reuse_thread_;
-  scheduler_memory &memory_;
  base::barrier sync_barrier_;

+  std::vector<std::thread> worker_threads_;
+  std::vector<std::unique_ptr<task_manager>> task_managers_;
+  std::vector<std::unique_ptr<thread_state>> thread_states_;

  class init_function;
  template<typename F>
  class init_function_impl;
@@ -84,11 +101,12 @@ class scheduler {
  std::atomic<bool> work_section_done_;

  bool terminated_;
+
+  // TODO: remove this into a public wrapper class with templating
+  heap_stack_allocator stack_allocator_{};
};

}
-}
-}

#include "scheduler_impl.h"

#endif //PLS_SCHEDULER_H
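To make the reworked RAII API concrete, a minimal usage sketch against the constructor and the spawn/sync constructs declared above (the concrete numbers are illustrative, not recommendations):

```cpp
#include "pls/internal/scheduling/scheduler.h"

using pls::internal::scheduling::scheduler;

int main() {
  // One up-front allocation: 8 workers, spawn depth up to 64,
  // 4 KiB of coroutine stack per task.
  scheduler sched{8, 64, 4096};

  int a = 0, b = 0;
  sched.perform_work([&] {
    scheduler::spawn([&] { a = 1; });  // potentially runs on another worker
    b = 2;                             // continuation, races with the spawn
    scheduler::sync();                 // after this, reading a and b is safe
  });
  return 0;
}  // ~scheduler() joins the workers and frees all resources (RAII)
```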
@@ -12,9 +12,7 @@
#include "pls/internal/helpers/profiler.h"

-namespace pls {
-namespace internal {
-namespace scheduling {
+namespace pls::internal::scheduling {

class scheduler::init_function {
 public:
@@ -47,8 +45,8 @@ void scheduler::perform_work(Function work_section) {
  work_section_done_ = false;
  if (reuse_thread_) {
-    auto &my_state = memory_.thread_state_for(0);
+    auto &my_state = thread_state_for(0);

-    base::this_thread::set_state(&my_state); // Make THIS THREAD become the main worker
+    thread_state::set(&my_state); // Make THIS THREAD become the main worker
    sync_barrier_.wait(); // Trigger threads to wake up
    work_thread_work_section(); // Simply also perform the work section on the main loop
@@ -66,7 +64,5 @@ void scheduler::spawn(Function &&lambda) {
}

}
-}
-}

#endif //PLS_SCHEDULER_IMPL_H
#ifndef PLS_SCHEDULER_MEMORY_H
#define PLS_SCHEDULER_MEMORY_H
#include "pls/internal/base/thread.h"
#include "pls/internal/scheduling/thread_state.h"
namespace pls {
namespace internal {
namespace scheduling {
// TODO: This way to handle memory is kind of a mess. We reworked it once...maybe it needs a second visit
// especially with the new 'staggered stack' (mmap allocs with page faults would be nice).
class scheduler_memory {
// Note: scheduler_memory is a pure interface and has no data.
// By not having an initialization routine we can do our 'static and heap specialization'
// without running into any ordering problems in the initialization sequence.
protected:
thread_state **thread_states_array_{nullptr};
public:
virtual size_t max_threads() const = 0;
virtual base::thread &thread_for(size_t id) = 0;
thread_state &thread_state_for(size_t id) {
return *thread_states_array_[id];
}
};
}
}
}
#endif //PLS_SCHEDULER_MEMORY_H
#ifndef PLS_STATIC_SCHEDULER_MEMORY_H
#define PLS_STATIC_SCHEDULER_MEMORY_H
#include "pls/internal/base/thread.h"
#include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/thread_state_static.h"
namespace pls {
namespace internal {
namespace scheduling {
template<size_t MAX_THREADS, size_t NUM_TASKS, size_t STACK_SIZE>
class static_scheduler_memory : public scheduler_memory {
public:
static_scheduler_memory() : scheduler_memory{} {
for (size_t i = 0; i < MAX_THREADS; i++) {
thread_state_pointers_[i] = &thread_states_[i].get_thread_state();
}
thread_states_array_ = thread_state_pointers_.data();
}
size_t max_threads() const override {
return MAX_THREADS;
}
base::thread &thread_for(size_t id) override {
return threads_[id];
}
private:
using thread_state_type = thread_state_static<NUM_TASKS, STACK_SIZE>;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<base::thread, MAX_THREADS> threads_;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<thread_state_type, MAX_THREADS> thread_states_;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<thread_state *, MAX_THREADS> thread_state_pointers_;
};
}
}
}
#endif // PLS_STATIC_SCHEDULER_MEMORY_H
@@ -11,10 +11,7 @@
#include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/scheduling/traded_cas_field.h"

-namespace pls {
-namespace internal {
-namespace scheduling {
+namespace pls::internal::scheduling {

/**
 * A task is the smallest unit of execution seen by the runtime system.
 *
@@ -27,16 +24,23 @@ namespace scheduling {
 * - running (currently executing user code)
 * - suspended (suspended by switching to a different task).
 */
-struct alignas(base::system_details::CACHE_LINE_SIZE) task {
-  void init(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) {
-    stack_memory_ = stack_memory;
-    stack_size_ = stack_size;
-    depth_ = depth;
-    thread_id_ = thread_id;
-
-    is_synchronized_ = false;
-  }
+struct PLS_CACHE_ALIGN task {
+  task(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) :
+      stack_memory_{stack_memory},
+      stack_size_{stack_size},
+      is_synchronized_{false},
+      depth_{depth},
+      thread_id_{thread_id},
+      prev_{nullptr},
+      next_{nullptr} {}
+
+  // Do not allow accidental copy/move operations.
+  // The whole runtime relies on tasks never changing memory positions during execution.
+  // Create tasks ONCE and use them until the runtime is shut down.
+  task(const task &other) = delete;
+  task(task &&other) = delete;
+  task &operator=(const task &other) = delete;
+  task &operator=(task &&other) = delete;

  template<typename F>
  context_switcher::continuation run_as_task(F &&lambda) {
@@ -46,7 +50,7 @@ struct PLS_CACHE_ALIGN task {
  // TODO: Proper access control and split it up into responsibilities
  // Stack/Continuation Management
  char *stack_memory_;
-  size_t stack_size_; // TODO: maybe remove it, not needed in here
+  size_t stack_size_;
  context_switcher::continuation continuation_;
  bool is_synchronized_;
@@ -66,7 +70,5 @@ struct PLS_CACHE_ALIGN task {
};

}
-}
-}

#endif //PLS_TASK_H
@@ -11,9 +11,19 @@
#include "pls/internal/data_structures/aligned_stack.h"

-namespace pls {
-namespace internal {
-namespace scheduling {
+namespace pls::internal::scheduling {
+
+class stack_allocator {
+ public:
+  virtual char *allocate_stack(size_t size) = 0;
+  virtual void free_stack(size_t size, char *stack) = 0;
+};
+
+class heap_stack_allocator : public stack_allocator {
+ public:
+  char *allocate_stack(size_t size) override { return new char[size]; }
+  void free_stack(size_t, char *stack) override { delete[] stack; }
+};
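Because stacks are now obtained through this small virtual interface, other strategies can be slotted in without touching the rest of the runtime. A purely hypothetical sketch (not part of this commit) of an mmap-based allocator, in the spirit of the 'mmap allocs with page faults' idea from the changelog:

```cpp
#include <sys/mman.h>

// Hypothetical example only: anonymous mappings are physically backed
// on first touch, so unused stack space costs no RAM.
class mmap_stack_allocator : public stack_allocator {
 public:
  char *allocate_stack(size_t size) override {
    void *memory = mmap(nullptr, size, PROT_READ | PROT_WRITE,
                        MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    return memory == MAP_FAILED ? nullptr : static_cast<char *>(memory);
  }
  void free_stack(size_t size, char *stack) override {
    munmap(stack, size);
  }
};
```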
/**
 * Handles management of tasks in the system. Each thread has a local task manager,
@@ -21,24 +31,17 @@ namespace scheduling {
 */
class task_manager {
 public:
-  explicit task_manager(task *tasks,
-                        data_structures::aligned_stack static_stack_space,
+  explicit task_manager(unsigned thread_id,
                        size_t num_tasks,
                        size_t stack_size,
-                        external_trading_deque &deque);
+                        stack_allocator &stack_allocator);
+  ~task_manager();

  void push_resource_on_task(task *target_task, task *spare_task_chain);
  task *pop_resource_from_task(task *target_task);

-  task *get_this_thread_task(size_t depth) {
-    return &this_thread_tasks_[depth];
-  }
-
-  void set_thread_id(unsigned id) {
-    for (size_t i = 0; i < num_tasks_; i++) {
-      this_thread_tasks_[i].thread_id_ = id;
-    }
-    deque_.set_thread_id(id);
-  }
+  task &get_this_thread_task(size_t depth) {
+    return *tasks_[depth];
+  }

  task &get_active_task() {
@@ -63,33 +66,13 @@ class task_manager {
 private:
  size_t num_tasks_;

-  task *this_thread_tasks_;
+  stack_allocator &stack_allocator_;
+  std::vector<std::unique_ptr<task>> tasks_;

  task *active_task_;

-  external_trading_deque &deque_;
+  external_trading_deque deque_;
};

-template<size_t NUM_TASKS, size_t STACK_SIZE>
-class static_task_manager {
- public:
-  static_task_manager()
-      : tasks_{},
-        static_stack_storage_{},
-        static_external_trading_deque_{},
-        task_manager_{tasks_.data(), static_stack_storage_.get_stack(), NUM_TASKS, STACK_SIZE,
-                      static_external_trading_deque_.get_deque()} {};
-
-  task_manager &get_task_manager() { return task_manager_; }
-
- private:
-  std::array<task, NUM_TASKS> tasks_;
-  data_structures::static_aligned_stack<NUM_TASKS * STACK_SIZE> static_stack_storage_;
-  static_external_trading_deque<NUM_TASKS> static_external_trading_deque_;
-
-  task_manager task_manager_;
-};

-}
-}
}

#include "task_manager_impl.h"
...
@@ -8,13 +8,10 @@
#include "context_switcher/continuation.h"

-#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/thread_state.h"

-namespace pls {
-namespace internal {
-namespace scheduling {
+namespace pls::internal::scheduling {

template<typename F>
void task_manager::spawn_child(F &&lambda) {
@@ -77,7 +74,5 @@ void task_manager::spawn_child(F &&lambda) {
}

}
-}
-}

#endif //PLS_TASK_MANAGER_IMPL_H_
@@ -6,75 +6,71 @@
#include <chrono>
#include <utility>

-#include "pls/internal/base/system_details.h"
#include "context_switcher/continuation.h"

-namespace pls {
-namespace internal {
-namespace scheduling {
+#include "pls/internal/base/system_details.h"
+
+namespace pls::internal::scheduling {

class scheduler;
class task_manager;
+struct task;

-struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
+/**
+ * Proxy-Object for thread local state needed during scheduling.
+ * The main use is to perform thread_state::get() as a thread local
+ * memory to identify the current worker thread state.
+ *
+ * Holds only minimal data by itself and points to the appropriate scheduler
+ * and task manager objects associated with this thread.
+ */
+struct PLS_CACHE_ALIGN thread_state {
 private:
-  scheduler *scheduler_;
-  unsigned id_;
+  const unsigned thread_id_;
+  scheduler &scheduler_;
  task_manager &task_manager_;

-  alignas(base::system_details::CACHE_LINE_SIZE) context_switcher::continuation main_loop_continuation_;
-  alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
+  PLS_CACHE_ALIGN context_switcher::continuation main_loop_continuation_;
+  PLS_CACHE_ALIGN std::minstd_rand random_;

 public:
-  explicit thread_state(task_manager &task_manager) :
-      scheduler_{nullptr},
-      id_{0},
+  explicit thread_state(scheduler &scheduler,
+                        unsigned thread_id,
+                        task_manager &task_manager) :
+      thread_id_{thread_id},
+      scheduler_{scheduler},
      task_manager_{task_manager},
      random_{static_cast<unsigned long>(std::chrono::steady_clock::now().time_since_epoch().count())} {};

+  // Do not allow accidental copy/move operations.
+  thread_state(const thread_state &) = delete;
+  thread_state(thread_state &&) = delete;
+  thread_state &operator=(const thread_state &) = delete;
+  thread_state &operator=(thread_state &&) = delete;

  /**
   * Convenience helper to get the thread_state instance associated with this thread.
   * Must only be called on threads that are associated with a thread_state,
   * this will most likely be threads created by the scheduler.
   *
-   * Each call is guaranteed to be a new lockup, i.e. it is not cached after fiber context switches.
+   * Each call is guaranteed to be a new lookup, i.e. it is not cached after fiber context switches.
   *
   * @return The thread_state of this thread.
   */
-  static thread_state &PLS_NOINLINE get();
+  [[nodiscard]] static thread_state &PLS_NOINLINE get();
+  static void set(thread_state *);

-  unsigned get_id() { return id_; }
-  void set_id(unsigned id) {
-    id_ = id;
-  }
-  task_manager &get_task_manager() { return task_manager_; }
-  scheduler &get_scheduler() { return *scheduler_; }
-  void set_scheduler(scheduler *scheduler) {
-    scheduler_ = scheduler;
-  }
-  long get_rand() {
+  [[nodiscard]] unsigned get_thread_id() const { return thread_id_; }
+  [[nodiscard]] task_manager &get_task_manager() { return task_manager_; }
+  [[nodiscard]] scheduler &get_scheduler() { return scheduler_; }
+  [[nodiscard]] long get_rand() {
    return random_();
  }

-  context_switcher::continuation &main_continuation() {
+  [[nodiscard]] context_switcher::continuation &main_continuation() {
    return main_loop_continuation_;
  }
-
-  // Do not allow move/copy operations.
-  // State is a pure memory container with references/pointers into it from all over the code.
-  // It should be allocated, used and de-allocated, nothing more.
-  thread_state(thread_state &&) = delete;
-  thread_state &operator=(thread_state &&) = delete;
-  thread_state(const thread_state &) = delete;
-  thread_state &operator=(const thread_state &) = delete;
};

}
-}
-}

#endif //PLS_THREAD_STATE_H
#ifndef PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_
#define PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_
#include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/base/system_details.h"
#include "thread_state.h"
namespace pls {
namespace internal {
namespace scheduling {
template<size_t NUM_TASKS, size_t STACK_SIZE>
struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state_static {
public:
thread_state_static()
: static_task_manager_{},
thread_state_{static_task_manager_.get_task_manager()} {}
thread_state &get_thread_state() { return thread_state_; }
private:
alignas(base::system_details::CACHE_LINE_SIZE) static_task_manager<NUM_TASKS, STACK_SIZE> static_task_manager_;
alignas(base::system_details::CACHE_LINE_SIZE) thread_state thread_state_;
};
}
}
}
#endif //PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_
@@ -7,9 +7,7 @@
#include "pls/internal/base/error_handling.h"
#include "pls/internal/base/system_details.h"

-namespace pls {
-namespace internal {
-namespace scheduling {
+namespace pls::internal::scheduling {

struct task;
struct traded_cas_field {
@@ -82,7 +80,5 @@ struct traded_cas_field {
};

}
-}
-}

#endif //PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_
#include "pls/internal/base/thread.h"
namespace pls {
namespace internal {
namespace base {
thread::thread() :
pthread_thread_{},
running_{false} {}
thread::~thread() {
if (running_) {
join();
}
}
thread::thread(thread &&other) noexcept :
pthread_thread_{other.pthread_thread_},
running_{other.running_} {
other.running_ = false;
}
thread &thread::operator=(thread &&other) noexcept {
this->pthread_thread_ = other.pthread_thread_;
this->running_ = other.running_;
other.running_ = false;
return *this;
}
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
pthread_key_t this_thread::local_storage_key_ = false;
bool this_thread::local_storage_key_initialized_;
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
__thread void *this_thread::local_state_;
#endif
void thread::join() {
pthread_join(pthread_thread_, nullptr);
running_ = false;
}
}
}
}
#include "pls/internal/scheduling/external_trading_deque.h"
namespace pls::internal::scheduling {
optional<task *> external_trading_deque::peek_traded_object(task *target_task) {
traded_cas_field current_cas = target_task->external_trading_deque_cas_.load();
if (current_cas.is_filled_with_object()) {
return optional<task *>{current_cas.get_trade_object()};
} else {
return optional<task *>{};
}
}
optional<task *> external_trading_deque::get_trade_object(task *target_task) {
traded_cas_field current_cas = target_task->external_trading_deque_cas_.load();
if (current_cas.is_filled_with_object()) {
task *result = current_cas.get_trade_object();
traded_cas_field empty_cas;
if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas, empty_cas)) {
return optional<task *>{result};
}
}
return optional<task *>{};
}
void external_trading_deque::push_bot(task *published_task) {
auto expected_stamp = bot_internal_.stamp;
auto &current_entry = entries_[bot_internal_.value];
// Publish the prepared task in the deque.
current_entry.forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed);
current_entry.traded_task_.store(published_task, std::memory_order_relaxed);
// Field that all threads synchronize on.
// This happens not in the deque itself, but in the published task.
traded_cas_field sync_cas_field;
sync_cas_field.fill_with_stamp(expected_stamp, thread_id_);
published_task->external_trading_deque_cas_.store(sync_cas_field, std::memory_order_release);
// Advance the bot pointer. Linearization point for making the task public.
bot_internal_.stamp++;
bot_internal_.value++;
bot_.store(bot_internal_.value, std::memory_order_release);
}
void external_trading_deque::reset_bot_and_top() {
bot_internal_.value = 0;
bot_internal_.stamp++;
bot_.store(0);
top_.store({bot_internal_.stamp, 0});
}
void external_trading_deque::decrease_bot() {
bot_internal_.value--;
bot_.store(bot_internal_.value, std::memory_order_relaxed);
}
optional<task *> external_trading_deque::pop_bot() {
if (bot_internal_.value == 0) {
reset_bot_and_top();
return optional<task *>{};
}
decrease_bot();
auto &current_entry = entries_[bot_internal_.value];
auto *popped_task = current_entry.traded_task_.load(std::memory_order_relaxed);
auto expected_stamp = current_entry.forwarding_stamp_.load(std::memory_order_relaxed);
// We know what value must be in the cas field if no other thread stole it.
traded_cas_field expected_sync_cas_field;
expected_sync_cas_field.fill_with_stamp(expected_stamp, thread_id_);
traded_cas_field empty_cas_field;
if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field,
empty_cas_field,
std::memory_order_acq_rel)) {
return optional<task *>{popped_task};
} else {
reset_bot_and_top();
return optional<task *>{};
}
}
external_trading_deque::peek_result external_trading_deque::peek_top() {
auto local_top = top_.load();
auto local_bot = bot_.load();
if (local_top.value < local_bot) {
return peek_result{optional<task *>{entries_[local_top.value].traded_task_}, local_top};
} else {
return peek_result{optional<task *>{}, local_top};
}
}
optional<task *> external_trading_deque::pop_top(task *offered_task, peek_result peek_result) {
stamped_integer expected_top = peek_result.top_pointer_;
auto local_bot = bot_.load();
if (expected_top.value >= local_bot) {
return data_structures::optional<task *>{};
}
auto &target_entry = entries_[expected_top.value];
// Read our potential result
task *result = target_entry.traded_task_.load();
unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load();
// Try to get it by CAS with the expected field entry, giving up our offered_task for it
traded_cas_field expected_sync_cas_field;
expected_sync_cas_field.fill_with_stamp(expected_top.stamp, thread_id_);
traded_cas_field offered_field;
offered_field.fill_with_trade_object(offered_task);
if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, offered_field)) {
// We got it, for sure move the top pointer forward.
top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1});
// Return the stolen task
return data_structures::optional<task *>{result};
} else {
// We did not get it...help forwarding the top pointer anyway.
if (expected_top.stamp == forwarding_stamp) {
// ...move the pointer forward if someone else put a valid trade object in there.
top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1});
} else {
// ...we failed because the top tag lags behind...try to fix it.
// This means only updating the tag, as this location can still hold data we need.
top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value});
}
return data_structures::optional<task *>{};
}
}
}
@@ -2,36 +2,39 @@
#include "context_switcher/context_switcher.h"

+#include "pls/internal/scheduling/task_manager.h"
+#include "pls/internal/scheduling/thread_state.h"
+
-#include "pls/internal/base/thread.h"
#include "pls/internal/base/error_handling.h"

-namespace pls {
-namespace internal {
-namespace scheduling {
+namespace pls::internal::scheduling {

-scheduler::scheduler(scheduler_memory &memory, const unsigned int num_threads, bool reuse_thread) :
+scheduler::scheduler(unsigned int num_threads, size_t computation_depth, size_t stack_size, bool reuse_thread) :
    num_threads_{num_threads},
    reuse_thread_{reuse_thread},
-    memory_{memory},
    sync_barrier_{num_threads + 1 - reuse_thread},
+    worker_threads_{},
+    thread_states_{},
+    main_thread_starter_function_{nullptr},
+    work_section_done_{false},
    terminated_{false} {
-  if (num_threads_ > memory.max_threads()) {
-    PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory.");
-  }
+  worker_threads_.reserve(num_threads);
+  task_managers_.reserve(num_threads);
+  thread_states_.reserve(num_threads);

  for (unsigned int i = 0; i < num_threads_; i++) {
-    // Placement new is required, as the memory of `memory_` is not required to be initialized.
-    memory.thread_state_for(i).set_scheduler(this);
-    memory.thread_state_for(i).set_id(i);
-    memory.thread_state_for(i).get_task_manager().set_thread_id(i);
+    auto &this_task_manager =
+        task_managers_.emplace_back(std::make_unique<task_manager>(i, computation_depth, stack_size, stack_allocator_));
+    auto &this_thread_state = thread_states_.emplace_back(std::make_unique<thread_state>(*this, i, *this_task_manager));

    if (reuse_thread && i == 0) {
+      worker_threads_.emplace_back();
      continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one.
    }

-    memory.thread_for(i) = base::thread(&scheduler::work_thread_main_loop, &memory_.thread_state_for(i));
+    auto *this_thread_state_pointer = this_thread_state.get();
+    worker_threads_.emplace_back([this_thread_state_pointer] {
+      thread_state::set(this_thread_state_pointer);
+      work_thread_main_loop();
+    });
  }
}
@@ -63,7 +66,7 @@ void scheduler::work_thread_work_section() {
  auto const num_threads = my_state.get_scheduler().num_threads();

-  if (my_state.get_id() == 0) {
+  if (my_state.get_thread_id() == 0) {
    // Main Thread, kick off by executing the user's main code block.
    main_thread_starter_function_->run();
  }
@@ -74,7 +77,7 @@ void scheduler::work_thread_work_section() {
      // TODO: move steal routine into separate function
      const size_t target = my_state.get_rand() % num_threads;
-      if (target == my_state.get_id()) {
+      if (target == my_state.get_thread_id()) {
        continue;
      }
@@ -134,16 +137,12 @@ void scheduler::terminate() {
    if (reuse_thread_ && i == 0) {
      continue;
    }
-    memory_.thread_for(i).join();
+    worker_threads_[i].join();
  }
}

-thread_state &scheduler::thread_state_for(size_t id) { return memory_.thread_state_for(id); }

void scheduler::sync() {
  thread_state::get().get_task_manager().sync();
}

}
-}
-}
...@@ -2,31 +2,38 @@ ...@@ -2,31 +2,38 @@
#include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/thread_state.h" #include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/scheduler.h"
namespace pls { namespace pls::internal::scheduling {
namespace internal {
namespace scheduling {
task_manager::task_manager(task *tasks, task_manager::task_manager(unsigned thread_id,
data_structures::aligned_stack static_stack_space,
size_t num_tasks, size_t num_tasks,
size_t stack_size, size_t stack_size,
external_trading_deque &deque) : num_tasks_{num_tasks}, stack_allocator &stack_allocator) : num_tasks_{num_tasks},
this_thread_tasks_{tasks}, stack_allocator_{stack_allocator},
active_task_{&tasks[0]}, tasks_{},
deque_{deque} { deque_{thread_id, num_tasks_} {
tasks_.reserve(num_tasks);
for (size_t i = 0; i < num_tasks - 1; i++) { for (size_t i = 0; i < num_tasks - 1; i++) {
tasks[i].init(static_stack_space.push_bytes(stack_size), stack_size, i, 0); char *stack_memory = stack_allocator.allocate_stack(stack_size);
tasks_.emplace_back(std::make_unique<task>(stack_memory, stack_size, i, thread_id));
if (i > 0) { if (i > 0) {
tasks[i].prev_ = &tasks[i - 1]; tasks_[i - 1]->next_ = tasks_[i].get();
tasks_[i]->prev_ = tasks_[i - 1].get();
} }
if (i < num_tasks - 2) {
tasks[i].next_ = &tasks[i + 1];
} }
active_task_ = tasks_[0].get();
}
task_manager::~task_manager() {
for (auto &task : tasks_) {
stack_allocator_.free_stack(task->stack_size_, task->stack_memory_);
} }
} }
static task *find_task(unsigned id, unsigned depth) { static task &find_task(unsigned id, unsigned depth) {
return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_this_thread_task(depth); return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_this_thread_task(depth);
} }
...@@ -49,7 +56,7 @@ task *task_manager::steal_task(task_manager &stealing_task_manager) { ...@@ -49,7 +56,7 @@ task *task_manager::steal_task(task_manager &stealing_task_manager) {
traded_task->next_ = nullptr; traded_task->next_ = nullptr;
// perform the actual pop operation // perform the actual pop operation
auto pop_result_task = deque_.pop_top(traded_task, peek.top_pointer_); auto pop_result_task = deque_.pop_top(traded_task, peek);
if (pop_result_task) { if (pop_result_task) {
PLS_ASSERT(stolen_task->thread_id_ != traded_task->thread_id_, PLS_ASSERT(stolen_task->thread_id_ != traded_task->thread_id_,
"It is impossible to steal an task we already own!"); "It is impossible to steal an task we already own!");
...@@ -90,8 +97,8 @@ void task_manager::push_resource_on_task(task *target_task, task *spare_task_cha ...@@ -90,8 +97,8 @@ void task_manager::push_resource_on_task(task *target_task, task *spare_task_cha
spare_task_chain->resource_stack_next_.store(nullptr); spare_task_chain->resource_stack_next_.store(nullptr);
} else { } else {
// Already an entry. Find it's corresponding task and set it as our successor. // Already an entry. Find it's corresponding task and set it as our successor.
auto *current_root_task = find_task(current_root.value - 1, target_task->depth_); auto &current_root_task = find_task(current_root.value - 1, target_task->depth_);
spare_task_chain->resource_stack_next_.store(current_root_task); spare_task_chain->resource_stack_next_.store(&current_root_task);
} }
} while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
...@@ -108,13 +115,13 @@ task *task_manager::pop_resource_from_task(task *target_task) { ...@@ -108,13 +115,13 @@ task *task_manager::pop_resource_from_task(task *target_task) {
return nullptr; return nullptr;
} else { } else {
// Found something, try to pop it // Found something, try to pop it
auto *current_root_task = find_task(current_root.value - 1, target_task->depth_); auto &current_root_task = find_task(current_root.value - 1, target_task->depth_);
auto *next_stack_task = current_root_task->resource_stack_next_.load(); auto *next_stack_task = current_root_task.resource_stack_next_.load();
target_root.stamp = current_root.stamp + 1; target_root.stamp = current_root.stamp + 1;
target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0; target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0;
output_task = current_root_task; output_task = &current_root_task;
} }
} while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
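Both CAS loops above encode the resource-stack root as a stamped {stamp, value} pair: value is thread_id + 1 (0 meaning empty) and the stamp is bumped on every update, so a reused id cannot cause an ABA problem. A stripped-down sketch of that encoding follows (illustrative only; the real code resolves ids back to tasks via find_task):

#include <atomic>
#include <cstdint>

struct stamped_root {
  std::uint32_t stamp;
  std::uint32_t value;  // thread_id + 1; 0 means the stack is empty
};

std::atomic<stamped_root> root{stamped_root{0, 0}};

void push_thread_id(std::uint32_t thread_id) {
  stamped_root current = root.load();
  stamped_root target{};
  do {
    target.stamp = current.stamp + 1;  // fresh stamp per update guards against ABA
    target.value = thread_id + 1;
    // the real implementation also links the previous root task here
  } while (!root.compare_exchange_strong(current, target));
}

int main() {
  push_thread_id(3);
  return root.load().value == 4 ? 0 : 1;  // id 3 is stored as 3 + 1
}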
...@@ -233,5 +240,3 @@ bool task_manager::check_task_chain() { ...@@ -233,5 +240,3 @@ bool task_manager::check_task_chain() {
} }
} }
}
}
#include "pls/internal/scheduling/thread_state.h" #include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/base/thread.h"
namespace pls { namespace pls::internal::scheduling {
namespace internal {
namespace scheduling {
thread_state &thread_state::get() { return *base::this_thread::state<thread_state>(); } thread_local thread_state *my_thread_state{nullptr};
thread_state &thread_state::get() { return *my_thread_state; }
void thread_state::set(thread_state *new_state) { my_thread_state = new_state; }
}
}
} }
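The new thread_state.cpp drops the custom base::thread state mechanism in favour of a plain C++11 thread_local pointer: set once per worker, read cheaply from anywhere on that thread. A hypothetical standalone demo of the same pattern:

#include <cassert>
#include <thread>

struct worker_state { int id; };
thread_local worker_state *my_worker_state{nullptr};

int main() {
  std::thread worker{[] {
    worker_state state{42};
    my_worker_state = &state;           // set(...) equivalent: once at worker startup
    assert(my_worker_state->id == 42);  // get() equivalent: plain thread-local read
  }};
  worker.join();
  return 0;
}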
#include <catch.hpp> #include <catch.hpp>
#include "pls/internal/base/thread.h"
#include "pls/internal/base/spin_lock.h" #include "pls/internal/base/spin_lock.h"
#include "pls/internal/base/system_details.h" #include "pls/internal/base/system_details.h"
#include <vector> #include <vector>
#include <mutex> #include <mutex>
#include <thread>
using namespace pls::internal::base; using namespace pls::internal::base;
using namespace std;
static bool base_tests_visited; static bool base_tests_visited;
static int base_tests_local_value_one; static int base_tests_local_value_one;
static vector<int> base_tests_local_value_two; static std::vector<int> base_tests_local_value_two;
TEST_CASE("thread creation and joining", "[internal/data_structures/thread.h]") {
base_tests_visited = false;
thread t1{[]() { base_tests_visited = true; }};
t1.join();
REQUIRE(base_tests_visited);
}
TEST_CASE("thread state", "[internal/data_structures/thread.h]") {
int state_one = 1;
vector<int> state_two{1, 2};
thread t1{[]() { base_tests_local_value_one = *this_thread::state<int>(); }, &state_one};
thread t2{[]() { base_tests_local_value_two = *this_thread::state<vector<int>>(); }, &state_two};
t1.join();
t2.join();
REQUIRE(base_tests_local_value_one == 1);
REQUIRE(base_tests_local_value_two == vector<int>{1, 2});
}
int base_tests_shared_counter; int base_tests_shared_counter;
...@@ -42,14 +21,14 @@ TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spi ...@@ -42,14 +21,14 @@ TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spi
spin_lock lock{}; spin_lock lock{};
SECTION("lock can be used by itself") { SECTION("lock can be used by itself") {
thread t1{[&]() { std::thread t1{[&]() {
for (int i = 0; i < num_iterations; i++) { for (int i = 0; i < num_iterations; i++) {
lock.lock(); lock.lock();
base_tests_shared_counter++; base_tests_shared_counter++;
lock.unlock(); lock.unlock();
} }
}}; }};
thread t2{[&]() { std::thread t2{[&]() {
for (int i = 0; i < num_iterations; i++) { for (int i = 0; i < num_iterations; i++) {
lock.lock(); lock.lock();
base_tests_shared_counter--; base_tests_shared_counter--;
...@@ -64,13 +43,13 @@ TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spi ...@@ -64,13 +43,13 @@ TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spi
} }
SECTION("lock can be used with std::lock_guard") { SECTION("lock can be used with std::lock_guard") {
thread t1{[&]() { std::thread t1{[&]() {
for (int i = 0; i < num_iterations; i++) { for (int i = 0; i < num_iterations; i++) {
std::lock_guard<spin_lock> my_lock{lock}; std::lock_guard<spin_lock> my_lock{lock};
base_tests_shared_counter++; base_tests_shared_counter++;
} }
}}; }};
thread t2{[&]() { std::thread t2{[&]() {
for (int i = 0; i < num_iterations; i++) { for (int i = 0; i < num_iterations; i++) {
std::lock_guard<spin_lock> my_lock{lock}; std::lock_guard<spin_lock> my_lock{lock};
base_tests_shared_counter--; base_tests_shared_counter--;
......
#include <catch.hpp> #include <catch.hpp>
#include <vector> #include <vector>
#include <atomic>
#include "pls/internal/scheduling/traded_cas_field.h" #include "pls/internal/scheduling/traded_cas_field.h"
#include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/external_trading_deque.h" #include "pls/internal/scheduling/external_trading_deque.h"
#include "pls/internal/scheduling/scheduler.h"
using namespace pls::internal::scheduling; using namespace pls::internal::scheduling;
constexpr int MAX_NUM_THREADS = 8;
constexpr int MAX_NUM_TASKS = 32;
constexpr int MAX_STACK_SIZE = 1024 * 8;
TEST_CASE("tasks distributed over workers (do not block)", "[internal/scheduling/scheduler.h]") {
scheduler scheduler{3, MAX_NUM_TASKS, MAX_STACK_SIZE};
std::atomic<int> num_run{0};
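// Each task below spins until the others have made progress, so the test
// only terminates if the scheduler really runs all three on distinct workers.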
scheduler.perform_work([&] {
scheduler::spawn([&] {
num_run++;
while (num_run < 3);
});
scheduler::spawn([&] {
while (num_run < 1);
num_run++;
while (num_run < 3);
});
scheduler::spawn([&] {
while (num_run < 2);
num_run++;
});
scheduler::sync();
});
REQUIRE(true);
}
TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/traded_cas_field.h]") { TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/traded_cas_field.h]") {
traded_cas_field empty_field; traded_cas_field empty_field;
REQUIRE(empty_field.is_empty()); REQUIRE(empty_field.is_empty());
...@@ -24,7 +53,7 @@ TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/traded_cas ...@@ -24,7 +53,7 @@ TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/traded_cas
REQUIRE(tag_field.get_stamp() == stamp); REQUIRE(tag_field.get_stamp() == stamp);
REQUIRE(tag_field.get_deque_id() == ID); REQUIRE(tag_field.get_deque_id() == ID);
alignas(64) task obj; alignas(64) task obj{nullptr, 0, 0, 0};
traded_cas_field obj_field; traded_cas_field obj_field;
obj_field.fill_with_trade_object(&obj); obj_field.fill_with_trade_object(&obj);
REQUIRE(obj_field.is_filled_with_object()); REQUIRE(obj_field.is_filled_with_object());
...@@ -33,15 +62,13 @@ TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/traded_cas ...@@ -33,15 +62,13 @@ TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/traded_cas
} }
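The bit-packing checked above keeps a stamp and a deque id (or a traded task pointer) inside a single word so it can be exchanged with one CAS. A conceptual sketch with made-up field widths and method names modeled on the test (the authoritative layout lives in traded_cas_field.h):

#include <cstdint>

struct packed_field {
  static constexpr std::uint64_t ID_BITS = 8;  // assumed width, illustration only
  std::uint64_t raw{0};

  void fill_with_stamp_and_deque(std::uint64_t stamp, std::uint64_t id) {
    raw = (stamp << ID_BITS) | id;
  }
  std::uint64_t get_stamp() const { return raw >> ID_BITS; }
  std::uint64_t get_deque_id() const { return raw & ((std::uint64_t{1} << ID_BITS) - 1); }
};

int main() {
  packed_field field;
  field.fill_with_stamp_and_deque(42, 7);
  return (field.get_stamp() == 42 && field.get_deque_id() == 7) ? 0 : 1;
}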
TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque]") { TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque]") {
static_external_trading_deque<16> static_external_trading_deque_1; external_trading_deque deque_1{1, 16};
external_trading_deque &deque_1 = static_external_trading_deque_1.get_deque(); external_trading_deque deque_2{2, 16};
deque_1.set_thread_id(1);
static_external_trading_deque<16> static_external_trading_deque_2;
external_trading_deque &deque_2 = static_external_trading_deque_2.get_deque();
deque_2.set_thread_id(2);
std::vector<task> tasks(16); task tasks[4] = {{nullptr, 0, 0, 0},
{nullptr, 0, 1, 0},
{nullptr, 0, 2, 0},
{nullptr, 0, 3, 0}};
SECTION("basic operations") { SECTION("basic operations") {
// Must start empty // Must start empty
...@@ -56,9 +83,9 @@ TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque ...@@ -56,9 +83,9 @@ TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque
// Local push, external pop // Local push, external pop
deque_1.push_bot(&tasks[0]); deque_1.push_bot(&tasks[0]);
auto peek = deque_1.peek_top(); auto peek = deque_1.peek_top();
REQUIRE(*deque_1.pop_top(&tasks[1], peek.top_pointer_) == &tasks[0]); REQUIRE(*deque_1.pop_top(&tasks[1], peek) == &tasks[0]);
REQUIRE(*external_trading_deque::get_trade_object(&tasks[0]) == &tasks[1]); REQUIRE(*external_trading_deque::get_trade_object(&tasks[0]) == &tasks[1]);
REQUIRE(!deque_1.pop_top(&tasks[1], peek.top_pointer_)); REQUIRE(!deque_1.pop_top(&tasks[1], peek));
REQUIRE(!deque_1.pop_bot()); REQUIRE(!deque_1.pop_bot());
// Keeps push/pop order // Keeps push/pop order
...@@ -71,9 +98,9 @@ TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque ...@@ -71,9 +98,9 @@ TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque
deque_1.push_bot(&tasks[0]); deque_1.push_bot(&tasks[0]);
deque_1.push_bot(&tasks[1]); deque_1.push_bot(&tasks[1]);
auto peek1 = deque_1.peek_top(); auto peek1 = deque_1.peek_top();
REQUIRE(*deque_1.pop_top(&tasks[2], peek1.top_pointer_) == &tasks[0]); REQUIRE(*deque_1.pop_top(&tasks[2], peek1) == &tasks[0]);
auto peek2 = deque_1.peek_top(); auto peek2 = deque_1.peek_top();
REQUIRE(*deque_1.pop_top(&tasks[3], peek2.top_pointer_) == &tasks[1]); REQUIRE(*deque_1.pop_top(&tasks[3], peek2) == &tasks[1]);
} }
SECTION("Interwined execution #1") { SECTION("Interwined execution #1") {
...@@ -81,8 +108,8 @@ TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque ...@@ -81,8 +108,8 @@ TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque
deque_1.push_bot(&tasks[0]); deque_1.push_bot(&tasks[0]);
auto peek1 = deque_1.peek_top(); auto peek1 = deque_1.peek_top();
auto peek2 = deque_1.peek_top(); auto peek2 = deque_1.peek_top();
REQUIRE(*deque_1.pop_top(&tasks[1], peek1.top_pointer_) == &tasks[0]); REQUIRE(*deque_1.pop_top(&tasks[1], peek1) == &tasks[0]);
REQUIRE(!deque_1.pop_top(&tasks[2], peek2.top_pointer_)); REQUIRE(!deque_1.pop_top(&tasks[2], peek2));
} }
SECTION("Interwined execution #2") { SECTION("Interwined execution #2") {
...@@ -90,6 +117,6 @@ TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque ...@@ -90,6 +117,6 @@ TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque
deque_1.push_bot(&tasks[0]); deque_1.push_bot(&tasks[0]);
auto peek1 = deque_1.peek_top(); auto peek1 = deque_1.peek_top();
REQUIRE(*deque_1.pop_bot() == &tasks[0]); REQUIRE(*deque_1.pop_bot() == &tasks[0]);
REQUIRE(!deque_1.pop_top(&tasks[2], peek1.top_pointer_)); REQUIRE(!deque_1.pop_top(&tasks[2], peek1));
} }
} }
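For reference, the steal-side idiom these sections exercise, written as a hypothetical helper: the thief peeks, offers one of its own tasks in trade, and the pop fails whenever the top moved in between. Only calls already shown in this test file are used:

#include "pls/internal/scheduling/external_trading_deque.h"
#include "pls/internal/scheduling/task.h"

using namespace pls::internal::scheduling;

// Hypothetical helper: returns the stolen task, or nullptr if the
// peek went stale or the deque was empty.
task *try_steal(external_trading_deque &victim, task &offered_in_trade) {
  auto peek = victim.peek_top();
  auto stolen = victim.pop_top(&offered_in_trade, peek);
  return stolen ? *stolen : nullptr;
}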