From 927d8ac59a7e8de9a94ef273ade322c7506be513 Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Wed, 18 Mar 2020 17:23:55 +0100 Subject: [PATCH] Rework memory allocation to heap based RAII objects. Remove the strict static memory allocation scheme in favour of placing objects on the heap at startup. This still keeps the requirements posed for modern, high performance embedded systems, but makes APIs a lot cleaner. --- NOTES.md | 10 ++++++++++ app/playground/CMakeLists.txt | 2 +- app/playground/main.cpp | 69 +++++++++++++++++++++++++++++---------------------------------------- app/playground/tsan_support.h | 21 +++++++++++++++++++++ lib/pls/CMakeLists.txt | 24 ++++++++---------------- lib/pls/include/pls/internal/base/system_details.h | 11 ++++++----- lib/pls/include/pls/internal/base/thread.h | 116 -------------------------------------------------------------------------------------------------------------------- lib/pls/include/pls/internal/base/thread_impl.h | 85 ------------------------------------------------------------------------------------- lib/pls/include/pls/internal/scheduling/external_trading_deque.h | 182 +++++++++++++++++++++++++++----------------------------------------------------------------------------------------------------------------------------------------------------------- lib/pls/include/pls/internal/scheduling/scheduler.h | 52 +++++++++++++++++++++++++++++++++++----------------- lib/pls/include/pls/internal/scheduling/scheduler_impl.h | 10 +++------- lib/pls/include/pls/internal/scheduling/scheduler_memory.h | 33 --------------------------------- lib/pls/include/pls/internal/scheduling/static_scheduler_memory.h | 43 ------------------------------------------- lib/pls/include/pls/internal/scheduling/task.h | 34 ++++++++++++++++++---------------- lib/pls/include/pls/internal/scheduling/task_manager.h | 59 +++++++++++++++++++++-------------------------------------- lib/pls/include/pls/internal/scheduling/task_manager_impl.h | 7 +------ 
lib/pls/include/pls/internal/scheduling/thread_state.h | 74 +++++++++++++++++++++++++++++++++++--------------------------------------- lib/pls/include/pls/internal/scheduling/thread_state_static.h | 31 ------------------------------- lib/pls/include/pls/internal/scheduling/traded_cas_field.h | 6 +----- lib/pls/src/internal/base/thread.cpp | 47 ----------------------------------------------- lib/pls/src/internal/scheduling/external_trading_deque.cpp | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/src/internal/scheduling/scheduler.cpp | 45 ++++++++++++++++++++++----------------------- lib/pls/src/internal/scheduling/task_manager.cpp | 51 ++++++++++++++++++++++++++++----------------------- lib/pls/src/internal/scheduling/thread_state.cpp | 12 +++++------- test/base_tests.cpp | 35 +++++++---------------------------- test/scheduling_tests.cpp | 59 +++++++++++++++++++++++++++++++++++++++++++---------------- 26 files changed, 457 insertions(+), 797 deletions(-) create mode 100644 app/playground/tsan_support.h delete mode 100644 lib/pls/include/pls/internal/base/thread.h delete mode 100644 lib/pls/include/pls/internal/base/thread_impl.h delete mode 100644 lib/pls/include/pls/internal/scheduling/scheduler_memory.h delete mode 100644 lib/pls/include/pls/internal/scheduling/static_scheduler_memory.h delete mode 100644 lib/pls/include/pls/internal/scheduling/thread_state_static.h delete mode 100644 lib/pls/src/internal/base/thread.cpp create mode 100644 lib/pls/src/internal/scheduling/external_trading_deque.cpp diff --git a/NOTES.md b/NOTES.md index f7793f2..e761d94 100644 --- a/NOTES.md +++ b/NOTES.md @@ -4,6 +4,16 @@ The new version of pls uses a more complicated/less user friendly API in favor of performance and memory guarantees. For the old version refer to the second half of this document. 
+# 18.03.2020 - C++17 benefit, overaligned new + +We have many cache line aligned types. Before C++17 they can +only be allocated with correct alignment using alignas(...) +when they are stored in static storage (e.g. global memory, stack memory). + +As we move towards more modern RAII type resource management, the fact that +[C++ 17 supports alignment of arbitrary length, even in vectors](https://www.bfilipek.com/2019/08/newnew-align.html) +is extremely helpful. Before we needed wrappers to do aligned heap allocations. + # 18.03.2020 - Coding Standard/C++ Standard We previously stuck to strict 'static' allocation in global static diff --git a/app/playground/CMakeLists.txt b/app/playground/CMakeLists.txt index 018b19b..34b4ede 100644 --- a/app/playground/CMakeLists.txt +++ b/app/playground/CMakeLists.txt @@ -3,4 +3,4 @@ add_executable(playground main.cpp) # Example for adding the library to your app (as a cmake project dependency) -target_link_libraries(playground context_switcher Threads::Threads) +target_link_libraries(playground pls context_switcher Threads::Threads) diff --git a/app/playground/main.cpp b/app/playground/main.cpp index 93bf83f..2ca61ee 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -1,52 +1,41 @@ -#include -#include -#include +#include +#include +#include +#include +#include -#include "barrier.h" +#include +#include "tsan_support.h" -#include "context_switcher/context_switcher.h" - -using namespace context_switcher; using namespace std; -// Memory for custom stack and continuation semantics -const size_t STACK_SIZE = 512 * 32; -const size_t NUM_STACKS = 4; -char custom_stacks[NUM_STACKS][STACK_SIZE]; +long count_memory_mappings() { + pid_t my_pid = getpid(); + ifstream proc_file{"/proc/" + to_string(my_pid) + "/maps"}; + + string line; + long line_count{0}; + while (getline(proc_file, line)) { + line_count++; + } + + return line_count; +} int main() { - context_switcher::continuation cont_t1, cont_main; - barrier bar{2}; - 
int error = 0; - - auto t1 = std::thread([&]() { - while (true) { - bar.wait(); - auto cont = enter_context(custom_stacks[0], STACK_SIZE, [&](continuation &&cont) { - error++; - cont_t1 = std::move(cont); - bar.wait(); - error++; - return std::move(cont_main); - }); - } - }); + mutex mut; int count = 0; - while (true) { - count++; - if (count % 100 == 0) { - printf("%d\n", count); - } - bar.wait(); - auto cont = enter_context(custom_stacks[1], STACK_SIZE, [&](continuation &&cont) { - error++; - cont_main = std::move(cont); - bar.wait(); - error++; - return std::move(cont_t1); - }); + printf("iteration: %d, mappings: %ld\n", count++, count_memory_mappings()); + void *main_fiber = __tsan_get_current_fiber(); + void *other_fiber = __tsan_create_fiber(0); + __tsan_switch_to_fiber(other_fiber, 0); + mut.lock(); + mut.unlock(); + __tsan_switch_to_fiber(main_fiber, 0); + __tsan_destroy_fiber(other_fiber); + } return 0; diff --git a/app/playground/tsan_support.h b/app/playground/tsan_support.h new file mode 100644 index 0000000..a5f60a0 --- /dev/null +++ b/app/playground/tsan_support.h @@ -0,0 +1,21 @@ + +#ifndef CONTEXT_SWITCHER_TSAN_SUPPORT +#define CONTEXT_SWITCHER_TSAN_SUPPORT + +extern "C" { +// Fiber switching API. +// - TSAN context for fiber can be created by __tsan_create_fiber +// and freed by __tsan_destroy_fiber. +// - TSAN context of current fiber or thread can be obtained +// by calling __tsan_get_current_fiber. +// - __tsan_switch_to_fiber should be called immediately before switch +// to fiber, such as call of swapcontext. +// - Fiber name can be set by __tsan_set_fiber_name.
+void *__tsan_get_current_fiber(void); +void *__tsan_create_fiber(unsigned flags); +void __tsan_destroy_fiber(void *fiber); +void __tsan_switch_to_fiber(void *fiber, unsigned flags); +void __tsan_set_fiber_name(void *fiber, const char *name); +}; + +#endif //CONTEXT_SWITCHER_TSAN_SUPPORT diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index f7f8232..4d53e8e 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -4,17 +4,18 @@ add_library(pls STATIC include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp include/pls/internal/base/ttas_spin_lock.h src/internal/base/ttas_spin_lock.cpp include/pls/internal/base/swmr_spin_lock.h src/internal/base/swmr_spin_lock.cpp - include/pls/internal/base/thread.h src/internal/base/thread.cpp - include/pls/internal/base/thread_impl.h include/pls/internal/base/barrier.h src/internal/base/barrier.cpp include/pls/internal/base/system_details.h - include/pls/internal/base/error_handling.h + include/pls/internal/base/error_handling.h src/internal/base/error_handling.cpp include/pls/internal/base/alignment.h src/internal/base/alignment.cpp include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp include/pls/internal/data_structures/aligned_stack_impl.h include/pls/internal/data_structures/stamped_integer.h include/pls/internal/data_structures/delayed_initialization.h + include/pls/internal/data_structures/bounded_trading_deque.h + include/pls/internal/data_structures/bounded_ws_deque.h + include/pls/internal/data_structures/optional.h include/pls/internal/helpers/prohibit_new.h include/pls/internal/helpers/profiler.h @@ -24,23 +25,14 @@ add_library(pls STATIC include/pls/internal/helpers/seqence.h include/pls/internal/helpers/member_function.h - include/pls/internal/scheduling/thread_state.h + include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp include/pls/internal/scheduling/scheduler.h 
src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler_impl.h - include/pls/internal/scheduling/scheduler_memory.h - include/pls/internal/scheduling/task_manager.h + include/pls/internal/scheduling/task_manager.h src/internal/scheduling/task_manager.cpp include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp - include/pls/internal/data_structures/bounded_ws_deque.h - include/pls/internal/data_structures/optional.h - include/pls/internal/scheduling/thread_state_static.h - src/internal/base/error_handling.cpp - include/pls/internal/data_structures/bounded_trading_deque.h - include/pls/internal/scheduling/external_trading_deque.h + include/pls/internal/scheduling/external_trading_deque.h src/internal/scheduling/external_trading_deque.cpp include/pls/internal/scheduling/traded_cas_field.h - include/pls/internal/scheduling/task_manager_impl.h - include/pls/internal/scheduling/static_scheduler_memory.h - include/pls/internal/scheduling/heap_scheduler_memory.h - src/internal/scheduling/task_manager.cpp src/internal/scheduling/thread_state.cpp) + include/pls/internal/scheduling/task_manager_impl.h) # Dependencies for pls target_link_libraries(pls Threads::Threads) diff --git a/lib/pls/include/pls/internal/base/system_details.h b/lib/pls/include/pls/internal/base/system_details.h index 222d127..df7b45b 100644 --- a/lib/pls/include/pls/internal/base/system_details.h +++ b/lib/pls/include/pls/internal/base/system_details.h @@ -16,9 +16,7 @@ #include -namespace pls { -namespace internal { -namespace base { +namespace pls::internal::base { /** * Collection of system details, e.g. hardware cache line size. @@ -47,6 +45,11 @@ constexpr unsigned long CAS_SIZE = sizeof(cas_integer) * 8; constexpr pointer_t CACHE_LINE_SIZE = 64; /** + * Helper to align types/values on cache lines. + */ +#define PLS_CACHE_ALIGN alignas(base::system_details::CACHE_LINE_SIZE) + +/** * Choose one of the following ways to store thread specific data. 
* Try to choose the fastest available on this processor/system. */ @@ -87,7 +90,5 @@ inline void relax_cpu() { } } -} -} #endif //PLS_SYSTEM_DETAILS_H diff --git a/lib/pls/include/pls/internal/base/thread.h b/lib/pls/include/pls/internal/base/thread.h deleted file mode 100644 index 5bbdf6d..0000000 --- a/lib/pls/include/pls/internal/base/thread.h +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Abstraction for threading to allow porting. - * Currently using either pthread or C++ 11 threads. - */ - -#ifndef PLS_THREAD_H -#define PLS_THREAD_H - -#include -#include -#include -#include - -#include "system_details.h" - -namespace pls { -namespace internal { -namespace base { - -using thread_entrypoint = void(); - -/** - * Static methods than can be performed on the current thread. - * - * usage: - * this_thread::yield(); - * T* state = this_thread::state(); - * - * PORTABILITY: - * Current implementation is based on pthreads. - */ -class this_thread { - friend - class thread; -#ifdef PLS_THREAD_SPECIFIC_PTHREAD - static pthread_key_t local_storage_key_; - static bool local_storage_key_initialized_; -#endif -#ifdef PLS_THREAD_SPECIFIC_COMPILER - static __thread void *local_state_; -#endif - public: - static void yield() { - pthread_yield(); - } - - static void sleep(long microseconds) { - timespec time{0, 1000 * microseconds}; - nanosleep(&time, nullptr); - } - - /** - * Retrieves the local state pointer. - * - * @tparam T The type of the state that is stored. - * @return The state pointer hold for this thread. - */ - template - static T *state(); - - /** - * Stores a pointer to the thread local state object. - * The memory management for this has to be done by the user, - * we only keep the pointer. - * - * @tparam T The type of the state that is stored. - * @param state_pointer A pointer to the threads state object. - */ - template - static void set_state(T *state_pointer); -}; - -/** - * Abstraction for starting a function in a separate thread. 
- * Offers only threading functionality needed in this project, - * underlying implementation can be changed. - * Uses NO heap memory allocation. - * - * PORTABILITY: - * Current implementation is based on pthreads. - */ -class thread { - friend class this_thread; - // Keep handle to native implementation - pthread_t pthread_thread_; - bool running_; - - template - static void *start_pthread_internal(void *thread_pointer); - - public: - template - explicit thread(const Function &function, State *state_pointer); - - template - explicit thread(const Function &function); - - explicit thread(); - ~thread(); - - void join(); - - // make object move only - thread(thread &&) noexcept; - thread &operator=(thread &&) noexcept; - - thread(const thread &) = delete; - thread &operator=(const thread &) = delete; -}; - -} -} -} -#include "thread_impl.h" - -#endif //PLS_THREAD_H diff --git a/lib/pls/include/pls/internal/base/thread_impl.h b/lib/pls/include/pls/internal/base/thread_impl.h deleted file mode 100644 index ccde5a8..0000000 --- a/lib/pls/include/pls/internal/base/thread_impl.h +++ /dev/null @@ -1,85 +0,0 @@ - -#ifndef PLS_THREAD_IMPL_H -#define PLS_THREAD_IMPL_H - -namespace pls { -namespace internal { -namespace base { - -template -T *this_thread::state() { -#ifdef PLS_THREAD_SPECIFIC_PTHREAD - return reinterpret_cast(pthread_getspecific(local_storage_key_)); -#endif -#ifdef PLS_THREAD_SPECIFIC_COMPILER - return reinterpret_cast(local_state_); -#endif -} - -template -void this_thread::set_state(T *state_pointer) { -#ifdef PLS_THREAD_SPECIFIC_PTHREAD - pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer); -#endif -#ifdef PLS_THREAD_SPECIFIC_COMPILER - local_state_ = state_pointer; -#endif -} - -template -struct thread_arguments { - Function function_; - State *state_; - std::atomic_flag *startup_flag_; -}; - -template -void *thread::start_pthread_internal(void *thread_pointer) { - // Actively copy all arguments into stack memory. 
- thread_arguments - arguments_copy = *reinterpret_cast *>(thread_pointer); - - // Now we have copies of everything we need on the stack. - // The original thread object can be moved freely (no more - // references to its memory location). - arguments_copy.startup_flag_->clear(); - - this_thread::set_state(arguments_copy.state_); - arguments_copy.function_(); - - // Finished executing the user function - pthread_exit(nullptr); -} - -template -thread::thread(const Function &function, State *state_pointer): - pthread_thread_{}, - running_{true} { - -#ifdef PLS_THREAD_SPECIFIC_PTHREAD - if (!this_thread::local_storage_key_initialized_) { - pthread_key_create(&this_thread::local_storage_key_, nullptr); - this_thread::local_storage_key_initialized_ = true; - } -#endif - - // Wee need to wait for the started function to read - // the function_ and state_pointer_ property before returning - // from the constructor, as the object might be moved after this. - std::atomic_flag startup_flag{ATOMIC_FLAG_INIT}; - - thread_arguments arguments{function, state_pointer, &startup_flag}; - - startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return - pthread_create(&pthread_thread_, nullptr, start_pthread_internal < Function, State > , (void *) (&arguments)); - while (startup_flag.test_and_set()); // Busy waiting for the starting flag to clear -} - -template -thread::thread(const Function &function): thread{function, (void *) nullptr} {} - -} -} -} - -#endif //PLS_THREAD_IMPL_H diff --git a/lib/pls/include/pls/internal/scheduling/external_trading_deque.h b/lib/pls/include/pls/internal/scheduling/external_trading_deque.h index 4fc6b1e..e873fe1 100644 --- a/lib/pls/include/pls/internal/scheduling/external_trading_deque.h +++ b/lib/pls/include/pls/internal/scheduling/external_trading_deque.h @@ -4,7 +4,7 @@ #include #include -#include +#include #include "pls/internal/base/error_handling.h" #include "pls/internal/base/system_details.h" @@ -15,9 +15,7 
@@ #include "pls/internal/scheduling/traded_cas_field.h" #include "pls/internal/scheduling/task.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { using namespace data_structures; @@ -38,35 +36,11 @@ struct trading_deque_entry { * As each task is associated with memory this suffices to exchange memory blocks needed for execution. */ class external_trading_deque { - public: - external_trading_deque(trading_deque_entry *entries, size_t num_entries) : - entries_{entries}, num_entries_{num_entries} {}; - void set_thread_id(unsigned id) { - thread_id_ = id; - } - - static optional peek_traded_object(task *target_task) { - traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(); - if (current_cas.is_filled_with_object()) { - return optional{current_cas.get_trade_object()}; - } else { - return optional{}; - } - } - - static optional get_trade_object(task *target_task) { - traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(); - if (current_cas.is_filled_with_object()) { - task *result = current_cas.get_trade_object(); - traded_cas_field empty_cas; - if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas, empty_cas)) { - return optional{result}; - } - } - - return optional{}; - } + external_trading_deque(unsigned thread_id, size_t num_entries) : thread_id_(thread_id), entries_(num_entries) {} + + static optional peek_traded_object(task *target_task); + static optional get_trade_object(task *target_task); /** * Pushes a task on the bottom of the deque. @@ -74,69 +48,14 @@ class external_trading_deque { * * @param published_task The task to publish on the bottom of the deque. */ - void push_bot(task *published_task) { - auto expected_stamp = bot_internal_.stamp; - auto &current_entry = entries_[bot_internal_.value]; - - // Publish the prepared task in the deque.
- current_entry.forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed); - current_entry.traded_task_.store(published_task, std::memory_order_relaxed); - - // Field that all threads synchronize on. - // This happens not in the deque itself, but in the published task. - traded_cas_field sync_cas_field; - sync_cas_field.fill_with_stamp(expected_stamp, thread_id_); - published_task->external_trading_deque_cas_.store(sync_cas_field, std::memory_order_release); - - // Advance the bot pointer. Linearization point for making the task public. - bot_internal_.stamp++; - bot_internal_.value++; - bot_.store(bot_internal_.value, std::memory_order_release); - } - - void reset_bot_and_top() { - bot_internal_.value = 0; - bot_internal_.stamp++; - - bot_.store(0); - top_.store({bot_internal_.stamp, 0}); - } - - void decrease_bot() { - bot_internal_.value--; - bot_.store(bot_internal_.value, std::memory_order_relaxed); - } + void push_bot(task *published_task); /** * Tries to pop the last task on the deque. * * @return optional holding the popped task if successful. */ - optional pop_bot() { - if (bot_internal_.value == 0) { - reset_bot_and_top(); - return optional{}; - } - decrease_bot(); - - auto &current_entry = entries_[bot_internal_.value]; - auto *popped_task = current_entry.traded_task_.load(std::memory_order_relaxed); - auto expected_stamp = current_entry.forwarding_stamp_.load(std::memory_order_relaxed); - - // We know what value must be in the cas field if no other thread stole it.
- traded_cas_field expected_sync_cas_field; - expected_sync_cas_field.fill_with_stamp(expected_stamp, thread_id_); - traded_cas_field empty_cas_field; - - if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, - empty_cas_field, - std::memory_order_acq_rel)) { - return optional{popped_task}; - } else { - reset_bot_and_top(); - return optional{}; - } - } + optional pop_bot(); struct peek_result { peek_result(optional top_task, stamped_integer top_pointer) : top_task_{std::move(top_task)}, @@ -153,82 +72,35 @@ class external_trading_deque { * * @return a peek result containing the optional top task (if present) and the current head pointer. */ - peek_result peek_top() { - auto local_top = top_.load(); - auto local_bot = bot_.load(); - - if (local_top.value < local_bot) { - return peek_result{optional{entries_[local_top.value].traded_task_}, local_top}; - } else { - return peek_result{optional{}, local_top}; - } - } - - optional pop_top(task *offered_task, stamped_integer expected_top) { - auto local_bot = bot_.load(); - if (expected_top.value >= local_bot) { - return data_structures::optional{}; - } - - auto &target_entry = entries_[expected_top.value]; - - // Read our potential result - task *result = target_entry.traded_task_.load(); - unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load(); - - // Try to get it by CAS with the expected field entry, giving up our offered_task for it - traded_cas_field expected_sync_cas_field; - expected_sync_cas_field.fill_with_stamp(expected_top.stamp, thread_id_); - - traded_cas_field offered_field; - offered_field.fill_with_trade_object(offered_task); - - if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, offered_field)) { - // We got it, for sure move the top pointer forward. 
- top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1}); - // Return the stolen task - return data_structures::optional{result}; - } else { - // We did not get it...help forwarding the top pointer anyway. - if (expected_top.stamp == forwarding_stamp) { - // ...move the pointer forward if someone else put a valid trade object in there. - top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1}); - } else { - // ...we failed because the top tag lags behind...try to fix it. - // This means only updating the tag, as this location can still hold data we need. - top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value}); - } - return data_structures::optional{}; - } - } + peek_result peek_top(); + + /** + * Tries to pop the task on top of the deque that was + * previously observed by 'peeking' at the deque. + * + * Returns the task if successful, returns nothing if + * either the peeked task is no longer at the top of the deque + * or another thread interfered and 'won' the task. + * + * @return optional holding the popped task if successful. 
+ */ + optional pop_top(task *offered_task, peek_result peek_result); private: + void reset_bot_and_top(); + void decrease_bot(); + // info on this deque - trading_deque_entry *const entries_; - const size_t num_entries_; unsigned thread_id_; + std::vector entries_; // fields for stealing/interacting stamped_integer bot_internal_{0, 0}; - alignas(base::system_details::CACHE_LINE_SIZE) std::atomic top_{{0, 0}}; - alignas(base::system_details::CACHE_LINE_SIZE) std::atomic bot_{0}; - -}; - -template -class static_external_trading_deque { - public: - static_external_trading_deque() : items_{}, deque_{items_.data(), SIZE} {} - - external_trading_deque &get_deque() { return deque_; } - private: - std::array items_; - external_trading_deque deque_; + PLS_CACHE_ALIGN std::atomic top_{{0, 0}}; + PLS_CACHE_ALIGN std::atomic bot_{0}; }; } -} -} #endif //PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_ diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h index 5cb8f81..76ebc6d 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler.h @@ -3,18 +3,15 @@ #define PLS_SCHEDULER_H #include +#include +#include #include "pls/internal/helpers/profiler.h" - -#include "pls/internal/base/thread.h" #include "pls/internal/base/barrier.h" - -#include "pls/internal/scheduling/scheduler_memory.h" #include "pls/internal/scheduling/thread_state.h" +#include "pls/internal/scheduling/task_manager.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { struct task; @@ -29,16 +26,20 @@ class scheduler { public: /** * Initializes a scheduler instance with the given number of threads. - * This will spawn the threads and put them to sleep, ready to process an - * upcoming parallel section. 
+ * This will allocate ALL runtime resources, spawn the worker threads + * and put them to sleep, ready to process an upcoming parallel section. + * + * The initialization should be seen as a heavy and not very predictable operation. + * After it is done the scheduler must (if configured correctly) never run out of resources + * and deliver tight time bounds of randomized work-stealing. * - * @param memory All memory is allocated statically, thus the user is required to provide the memory instance. * @param num_threads The number of worker threads to be created. */ - explicit scheduler(scheduler_memory &memory, unsigned int num_threads, bool reuse_thread = true); + explicit scheduler(unsigned int num_threads, size_t computation_depth, size_t stack_size, bool reuse_thread = true); /** * The scheduler is implicitly terminated as soon as it leaves the scope. + * Resources follow a clean RAII style. */ ~scheduler(); @@ -53,18 +54,32 @@ class scheduler { template void perform_work(Function work_section); + /** + * Main parallelism construct, spawns a function for potential parallel execution. + * + * The result of the spawned function must not be relied on until sync() is called. + * Best see the lambda as if executed on a thread, e.g. it can cause race conditions + * and it is only finished after you join it back into the parent thread using sync(). + * + * @param lambda the lambda to be executed in parallel. + */ template static void spawn(Function &&lambda); + + /** + * Waits for all potentially parallel child tasks created with spawn(...). + */ static void sync(); - thread_state &thread_state_for(size_t id); + thread_state &thread_state_for(unsigned int thread_id) { return *thread_states_[thread_id]; } + task_manager &task_manager_for(unsigned int thread_id) { return *task_managers_[thread_id]; } /** * Explicitly terminate the worker threads. Scheduler must not be used after this. 
*/ void terminate(); - unsigned int num_threads() const { return num_threads_; } + [[nodiscard]] unsigned int num_threads() const { return num_threads_; } private: static void work_thread_main_loop(); @@ -72,10 +87,12 @@ class scheduler { const unsigned int num_threads_; const bool reuse_thread_; - scheduler_memory &memory_; - base::barrier sync_barrier_; + std::vector worker_threads_; + std::vector> task_managers_; + std::vector> thread_states_; + class init_function; template class init_function_impl; @@ -84,11 +101,12 @@ class scheduler { std::atomic work_section_done_; bool terminated_; + + // TODO: remove this into a public wrapper class with templating + heap_stack_allocator stack_allocator_{}; }; } -} -} #include "scheduler_impl.h" #endif //PLS_SCHEDULER_H diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index 9510505..158c0c7 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -12,9 +12,7 @@ #include "pls/internal/helpers/profiler.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { class scheduler::init_function { public: @@ -47,8 +45,8 @@ void scheduler::perform_work(Function work_section) { work_section_done_ = false; if (reuse_thread_) { - auto &my_state = memory_.thread_state_for(0); - base::this_thread::set_state(&my_state); // Make THIS THREAD become the main worker + auto &my_state = thread_state_for(0); + thread_state::set(&my_state); // Make THIS THREAD become the main worker sync_barrier_.wait(); // Trigger threads to wake up work_thread_work_section(); // Simply also perform the work section on the main loop @@ -66,7 +64,5 @@ void scheduler::spawn(Function &&lambda) { } } -} -} #endif //PLS_SCHEDULER_IMPL_H diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h 
b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h deleted file mode 100644 index f07ffca..0000000 --- a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef PLS_SCHEDULER_MEMORY_H -#define PLS_SCHEDULER_MEMORY_H - -#include "pls/internal/base/thread.h" -#include "pls/internal/scheduling/thread_state.h" - -namespace pls { -namespace internal { -namespace scheduling { - -// TODO: This way to handle memory is kind of a mess. We reworked it once...maybe it needs a second visit -// especially with the new 'stagered stack' (mmap allocs with page faults would be nice). -class scheduler_memory { - // Note: scheduler_memory is a pure interface and has no data. - // By not having an initialization routine we can do our 'static and heap specialization' - // without running into any ordering problems in the initialization sequence. - - protected: - thread_state **thread_states_array_{nullptr}; - - public: - virtual size_t max_threads() const = 0; - virtual base::thread &thread_for(size_t id) = 0; - thread_state &thread_state_for(size_t id) { - return *thread_states_array_[id]; - } -}; - -} -} -} - -#endif //PLS_SCHEDULER_MEMORY_H diff --git a/lib/pls/include/pls/internal/scheduling/static_scheduler_memory.h b/lib/pls/include/pls/internal/scheduling/static_scheduler_memory.h deleted file mode 100644 index f19adb4..0000000 --- a/lib/pls/include/pls/internal/scheduling/static_scheduler_memory.h +++ /dev/null @@ -1,43 +0,0 @@ -#ifndef PLS_STATIC_SCHEDULER_MEMORY_H -#define PLS_STATIC_SCHEDULER_MEMORY_H - -#include "pls/internal/base/thread.h" - -#include "pls/internal/scheduling/scheduler_memory.h" -#include "pls/internal/scheduling/thread_state.h" -#include "pls/internal/scheduling/thread_state_static.h" - -namespace pls { -namespace internal { -namespace scheduling { - -template -class static_scheduler_memory : public scheduler_memory { - public: - static_scheduler_memory() : scheduler_memory{} { - for (size_t i = 0; i < 
MAX_THREADS; i++) { - thread_state_pointers_[i] = &thread_states_[i].get_thread_state(); - } - thread_states_array_ = thread_state_pointers_.data(); - } - - size_t max_threads() const override { - return MAX_THREADS; - } - - base::thread &thread_for(size_t id) override { - return threads_[id]; - } - private: - using thread_state_type = thread_state_static; - - alignas(base::system_details::CACHE_LINE_SIZE) std::array threads_; - alignas(base::system_details::CACHE_LINE_SIZE) std::array thread_states_; - alignas(base::system_details::CACHE_LINE_SIZE) std::array thread_state_pointers_; -}; - -} -} -} - -#endif // PLS_STATIC_SCHEDULER_MEMORY_H diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h index c8696a8..32888fd 100644 --- a/lib/pls/include/pls/internal/scheduling/task.h +++ b/lib/pls/include/pls/internal/scheduling/task.h @@ -11,10 +11,7 @@ #include "pls/internal/data_structures/stamped_integer.h" #include "pls/internal/scheduling/traded_cas_field.h" -namespace pls { -namespace internal { -namespace scheduling { - +namespace pls::internal::scheduling { /** * A task is the smallest unit of execution seen by the runtime system. * @@ -27,16 +24,23 @@ namespace scheduling { * - running (currently executing user code) * - suspended (suspended by switching to a different task). */ -struct alignas(base::system_details::CACHE_LINE_SIZE) task { - void init(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) { - stack_memory_ = stack_memory; - stack_size_ = stack_size; - - depth_ = depth; - thread_id_ = thread_id; +struct PLS_CACHE_ALIGN task { + task(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) : + stack_memory_{stack_memory}, + stack_size_{stack_size}, + is_synchronized_{false}, + depth_{depth}, + thread_id_{thread_id}, + prev_{nullptr}, + next_{nullptr} {} - is_synchronized_ = false; - } + // Do not allow accidental copy/move operations. 
+ // The whole runtime relies on tasks never changing memory positions during execution. + // Create tasks ONCE and use them until the runtime is shut down. + task(const task &other) = delete; + task(task &&other) = delete; + task &operator=(const task &other) = delete; + task &operator=(task &&other) = delete; template context_switcher::continuation run_as_task(F &&lambda) { @@ -46,7 +50,7 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task { // TODO: Proper access control and split it up into responsibilities // Stack/Continuation Management char *stack_memory_; - size_t stack_size_; // TODO: maybe remove it, not needed in here + size_t stack_size_; context_switcher::continuation continuation_; bool is_synchronized_; @@ -66,7 +70,5 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task { }; } -} -} #endif //PLS_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h index 8b473a6..e02942d 100644 --- a/lib/pls/include/pls/internal/scheduling/task_manager.h +++ b/lib/pls/include/pls/internal/scheduling/task_manager.h @@ -11,9 +11,19 @@ #include "pls/internal/data_structures/aligned_stack.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { + +class stack_allocator { + public: + virtual char *allocate_stack(size_t size) = 0; + virtual void free_stack(size_t size, char *stack) = 0; +}; + +class heap_stack_allocator : public stack_allocator { + public: + char *allocate_stack(size_t size) override { return new char[size]; } + void free_stack(size_t, char *stack) override { delete[] stack; } +}; /** * Handles management of tasks in the system. 
Each thread has a local task manager, @@ -21,24 +31,17 @@ namespace scheduling { */ class task_manager { public: - explicit task_manager(task *tasks, - data_structures::aligned_stack static_stack_space, + explicit task_manager(unsigned thread_id, size_t num_tasks, size_t stack_size, - external_trading_deque &deque); + stack_allocator &stack_allocator); + ~task_manager(); void push_resource_on_task(task *target_task, task *spare_task_chain); task *pop_resource_from_task(task *target_task); - task *get_this_thread_task(size_t depth) { - return &this_thread_tasks_[depth]; - } - - void set_thread_id(unsigned id) { - for (size_t i = 0; i < num_tasks_; i++) { - this_thread_tasks_[i].thread_id_ = id; - } - deque_.set_thread_id(id); + task &get_this_thread_task(size_t depth) { + return *tasks_[depth]; } task &get_active_task() { @@ -63,33 +66,13 @@ class task_manager { private: size_t num_tasks_; - task *this_thread_tasks_; + stack_allocator &stack_allocator_; + std::vector> tasks_; task *active_task_; - external_trading_deque &deque_; + external_trading_deque deque_; }; -template -class static_task_manager { - public: - static_task_manager() - : tasks_{}, - static_stack_storage_{}, - static_external_trading_deque_{}, - task_manager_{tasks_.data(), static_stack_storage_.get_stack(), NUM_TASKS, STACK_SIZE, - static_external_trading_deque_.get_deque()} {}; - task_manager &get_task_manager() { return task_manager_; } - - private: - std::array tasks_; - data_structures::static_aligned_stack static_stack_storage_; - static_external_trading_deque static_external_trading_deque_; - - task_manager task_manager_; -}; - -} -} } #include "task_manager_impl.h" diff --git a/lib/pls/include/pls/internal/scheduling/task_manager_impl.h b/lib/pls/include/pls/internal/scheduling/task_manager_impl.h index ba11377..3fda72c 100644 --- a/lib/pls/include/pls/internal/scheduling/task_manager_impl.h +++ b/lib/pls/include/pls/internal/scheduling/task_manager_impl.h @@ -8,13 +8,10 @@ #include 
"context_switcher/continuation.h" -#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/thread_state.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { template void task_manager::spawn_child(F &&lambda) { @@ -77,7 +74,5 @@ void task_manager::spawn_child(F &&lambda) { } } -} -} #endif //PLS_TASK_MANAGER_IMPL_H_ diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h index 329ff91..7f717fd 100644 --- a/lib/pls/include/pls/internal/scheduling/thread_state.h +++ b/lib/pls/include/pls/internal/scheduling/thread_state.h @@ -6,75 +6,71 @@ #include #include -#include "pls/internal/base/system_details.h" - #include "context_switcher/continuation.h" -namespace pls { -namespace internal { -namespace scheduling { +#include "pls/internal/base/system_details.h" + +namespace pls::internal::scheduling { class scheduler; class task_manager; -struct task; -struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state { +/** + * Proxy-Object for thread local state needed during scheduling. + * The main use is to perform thread_state::get() as a thread local + * memory to identify the current worker thread state. + * + * Holds only minimal data by itself and points to the appropriate scheduler + * and task manager objects associated with this thread. 
+ */ +struct PLS_CACHE_ALIGN thread_state { private: - scheduler *scheduler_; - unsigned id_; + const unsigned thread_id_; + scheduler &scheduler_; task_manager &task_manager_; - alignas(base::system_details::CACHE_LINE_SIZE) context_switcher::continuation main_loop_continuation_; - alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_; + PLS_CACHE_ALIGN context_switcher::continuation main_loop_continuation_; + PLS_CACHE_ALIGN std::minstd_rand random_; public: - explicit thread_state(task_manager &task_manager) : - scheduler_{nullptr}, - id_{0}, + explicit thread_state(scheduler &scheduler, + unsigned thread_id, + task_manager &task_manager) : + thread_id_{thread_id}, + scheduler_{scheduler}, task_manager_{task_manager}, random_{static_cast(std::chrono::steady_clock::now().time_since_epoch().count())} {}; + // Do not allow accidental copy/move operations. + thread_state(const thread_state &) = delete; + thread_state(thread_state &&) = delete; + thread_state &operator=(const thread_state &) = delete; + thread_state &operator=(thread_state &&) = delete; + /** * Convenience helper to get the thread_state instance associated with this thread. * Must only be called on threads that are associated with a thread_state, * this will most likely be threads created by the scheduler. * - * Each call is guaranteed to be a new lockup, i.e. it is not cached after fiber context switches. + * Each call is guaranteed to be a new lookup, i.e. it is not cached after fiber context switches. * * @return The thread_state of this thread. 
*/ - static thread_state &PLS_NOINLINE get(); + [[nodiscard]] static thread_state &PLS_NOINLINE get(); + static void set(thread_state *); - unsigned get_id() { return id_; } - void set_id(unsigned id) { - id_ = id; - } - task_manager &get_task_manager() { return task_manager_; } - scheduler &get_scheduler() { return *scheduler_; } - void set_scheduler(scheduler *scheduler) { - scheduler_ = scheduler; - } - long get_rand() { + [[nodiscard]] unsigned get_thread_id() const { return thread_id_; } + [[nodiscard]] task_manager &get_task_manager() { return task_manager_; } + [[nodiscard]] scheduler &get_scheduler() { return scheduler_; } + [[nodiscard]] long get_rand() { return random_(); } - context_switcher::continuation &main_continuation() { + [[nodiscard]] context_switcher::continuation &main_continuation() { return main_loop_continuation_; } - - // Do not allow move/copy operations. - // State is a pure memory container with references/pointers into it from all over the code. - // It should be allocated, used and de-allocated, nothing more. 
- thread_state(thread_state &&) = delete; - thread_state &operator=(thread_state &&) = delete; - - thread_state(const thread_state &) = delete; - thread_state &operator=(const thread_state &) = delete; - }; } -} -} #endif //PLS_THREAD_STATE_H diff --git a/lib/pls/include/pls/internal/scheduling/thread_state_static.h b/lib/pls/include/pls/internal/scheduling/thread_state_static.h deleted file mode 100644 index 93539fb..0000000 --- a/lib/pls/include/pls/internal/scheduling/thread_state_static.h +++ /dev/null @@ -1,31 +0,0 @@ - -#ifndef PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_ -#define PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_ - -#include "pls/internal/scheduling/task_manager.h" -#include "pls/internal/base/system_details.h" - -#include "thread_state.h" - -namespace pls { -namespace internal { -namespace scheduling { - -template -struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state_static { - public: - thread_state_static() - : static_task_manager_{}, - thread_state_{static_task_manager_.get_task_manager()} {} - thread_state &get_thread_state() { return thread_state_; } - - private: - alignas(base::system_details::CACHE_LINE_SIZE) static_task_manager static_task_manager_; - alignas(base::system_details::CACHE_LINE_SIZE) thread_state thread_state_; -}; - -} -} -} - -#endif //PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_ diff --git a/lib/pls/include/pls/internal/scheduling/traded_cas_field.h b/lib/pls/include/pls/internal/scheduling/traded_cas_field.h index 6a468b6..3d31528 100644 --- a/lib/pls/include/pls/internal/scheduling/traded_cas_field.h +++ b/lib/pls/include/pls/internal/scheduling/traded_cas_field.h @@ -7,9 +7,7 @@ #include "pls/internal/base/error_handling.h" #include "pls/internal/base/system_details.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { struct task; struct traded_cas_field { @@ -82,7 +80,5 @@ struct traded_cas_field { }; } -} -} #endif 
//PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_ diff --git a/lib/pls/src/internal/base/thread.cpp b/lib/pls/src/internal/base/thread.cpp deleted file mode 100644 index 04b564c..0000000 --- a/lib/pls/src/internal/base/thread.cpp +++ /dev/null @@ -1,47 +0,0 @@ -#include "pls/internal/base/thread.h" - -namespace pls { -namespace internal { -namespace base { - -thread::thread() : - pthread_thread_{}, - running_{false} {} - -thread::~thread() { - if (running_) { - join(); - } -} - -thread::thread(thread &&other) noexcept : - pthread_thread_{other.pthread_thread_}, - running_{other.running_} { - other.running_ = false; -} - -thread &thread::operator=(thread &&other) noexcept { - this->pthread_thread_ = other.pthread_thread_; - this->running_ = other.running_; - - other.running_ = false; - - return *this; -} - -#ifdef PLS_THREAD_SPECIFIC_PTHREAD -pthread_key_t this_thread::local_storage_key_ = false; -bool this_thread::local_storage_key_initialized_; -#endif -#ifdef PLS_THREAD_SPECIFIC_COMPILER -__thread void *this_thread::local_state_; -#endif - -void thread::join() { - pthread_join(pthread_thread_, nullptr); - running_ = false; -} - -} -} -} diff --git a/lib/pls/src/internal/scheduling/external_trading_deque.cpp b/lib/pls/src/internal/scheduling/external_trading_deque.cpp new file mode 100644 index 0000000..927528e --- /dev/null +++ b/lib/pls/src/internal/scheduling/external_trading_deque.cpp @@ -0,0 +1,136 @@ +#include "pls/internal/scheduling/external_trading_deque.h" + +namespace pls::internal::scheduling { + +optional external_trading_deque::peek_traded_object(task *target_task) { + traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(); + if (current_cas.is_filled_with_object()) { + return optional{current_cas.get_trade_object()}; + } else { + return optional{}; + } +} + +optional external_trading_deque::get_trade_object(task *target_task) { + traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(); + if 
(current_cas.is_filled_with_object()) { + task *result = current_cas.get_trade_object(); + traded_cas_field empty_cas; + if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas, empty_cas)) { + return optional{result}; + } + } + + return optional{}; +} + +void external_trading_deque::push_bot(task *published_task) { + auto expected_stamp = bot_internal_.stamp; + auto ¤t_entry = entries_[bot_internal_.value]; + + // Publish the prepared task in the deque. + current_entry.forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed); + current_entry.traded_task_.store(published_task, std::memory_order_relaxed); + + // Field that all threads synchronize on. + // This happens not in the deque itself, but in the published task. + traded_cas_field sync_cas_field; + sync_cas_field.fill_with_stamp(expected_stamp, thread_id_); + published_task->external_trading_deque_cas_.store(sync_cas_field, std::memory_order_release); + + // Advance the bot pointer. Linearization point for making the task public. + bot_internal_.stamp++; + bot_internal_.value++; + bot_.store(bot_internal_.value, std::memory_order_release); +} + +void external_trading_deque::reset_bot_and_top() { + bot_internal_.value = 0; + bot_internal_.stamp++; + + bot_.store(0); + top_.store({bot_internal_.stamp, 0}); +} + +void external_trading_deque::decrease_bot() { + bot_internal_.value--; + bot_.store(bot_internal_.value, std::memory_order_relaxed); +} + +optional external_trading_deque::pop_bot() { + if (bot_internal_.value == 0) { + reset_bot_and_top(); + return optional{}; + } + decrease_bot(); + + auto ¤t_entry = entries_[bot_internal_.value]; + auto *popped_task = current_entry.traded_task_.load(std::memory_order_relaxed); + auto expected_stamp = current_entry.forwarding_stamp_.load(std::memory_order_relaxed); + + // We know what value must be in the cas field if no other thread stole it. 
+ traded_cas_field expected_sync_cas_field; + expected_sync_cas_field.fill_with_stamp(expected_stamp, thread_id_); + traded_cas_field empty_cas_field; + + if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, + empty_cas_field, + std::memory_order_acq_rel)) { + return optional{popped_task}; + } else { + reset_bot_and_top(); + return optional{}; + } +} + +external_trading_deque::peek_result external_trading_deque::peek_top() { + auto local_top = top_.load(); + auto local_bot = bot_.load(); + + if (local_top.value < local_bot) { + return peek_result{optional{entries_[local_top.value].traded_task_}, local_top}; + } else { + return peek_result{optional{}, local_top}; + } +} + +optional external_trading_deque::pop_top(task *offered_task, peek_result peek_result) { + stamped_integer expected_top = peek_result.top_pointer_; + auto local_bot = bot_.load(); + if (expected_top.value >= local_bot) { + return data_structures::optional{}; + } + + auto &target_entry = entries_[expected_top.value]; + + // Read our potential result + task *result = target_entry.traded_task_.load(); + unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load(); + + // Try to get it by CAS with the expected field entry, giving up our offered_task for it + traded_cas_field expected_sync_cas_field; + expected_sync_cas_field.fill_with_stamp(expected_top.stamp, thread_id_); + + traded_cas_field offered_field; + offered_field.fill_with_trade_object(offered_task); + + if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, offered_field)) { + // We got it, for sure move the top pointer forward. + top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1}); + // Return the stolen task + return data_structures::optional{result}; + } else { + // We did not get it...help forwarding the top pointer anyway. 
+ if (expected_top.stamp == forwarding_stamp) { + // ...move the pointer forward if someone else put a valid trade object in there. + top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1}); + } else { + // ...we failed because the top tag lags behind...try to fix it. + // This means only updating the tag, as this location can still hold data we need. + top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value}); + } + return data_structures::optional{}; + } +} + +} diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp index f3d9773..318de19 100644 --- a/lib/pls/src/internal/scheduling/scheduler.cpp +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -2,36 +2,39 @@ #include "context_switcher/context_switcher.h" -#include "pls/internal/scheduling/task_manager.h" -#include "pls/internal/scheduling/thread_state.h" - #include "pls/internal/base/thread.h" #include "pls/internal/base/error_handling.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { -scheduler::scheduler(scheduler_memory &memory, const unsigned int num_threads, bool reuse_thread) : +scheduler::scheduler(unsigned int num_threads, size_t computation_depth, size_t stack_size, bool reuse_thread) : num_threads_{num_threads}, reuse_thread_{reuse_thread}, - memory_{memory}, sync_barrier_{num_threads + 1 - reuse_thread}, + worker_threads_{}, + thread_states_{}, + main_thread_starter_function_{nullptr}, + work_section_done_{false}, terminated_{false} { - if (num_threads_ > memory.max_threads()) { - PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory."); - } + worker_threads_.reserve(num_threads); + task_managers_.reserve(num_threads); + thread_states_.reserve(num_threads); for (unsigned int i = 0; i < num_threads_; i++) { - // Placement new is required, as the memory of `memory_` is not required to be initialized. 
- memory.thread_state_for(i).set_scheduler(this); - memory.thread_state_for(i).set_id(i); - memory.thread_state_for(i).get_task_manager().set_thread_id(i); + auto &this_task_manager = + task_managers_.emplace_back(std::make_unique(i, computation_depth, stack_size, stack_allocator_)); + auto &this_thread_state = thread_states_.emplace_back(std::make_unique(*this, i, *this_task_manager)); if (reuse_thread && i == 0) { + worker_threads_.emplace_back(); continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one. } - memory.thread_for(i) = base::thread(&scheduler::work_thread_main_loop, &memory_.thread_state_for(i)); + + auto *this_thread_state_pointer = this_thread_state.get(); + worker_threads_.emplace_back([this_thread_state_pointer] { + thread_state::set(this_thread_state_pointer); + work_thread_main_loop(); + }); } } @@ -63,7 +66,7 @@ void scheduler::work_thread_work_section() { auto const num_threads = my_state.get_scheduler().num_threads(); - if (my_state.get_id() == 0) { + if (my_state.get_thread_id() == 0) { // Main Thread, kick off by executing the user's main code block. 
main_thread_starter_function_->run(); } @@ -74,7 +77,7 @@ void scheduler::work_thread_work_section() { // TODO: move steal routine into separate function const size_t target = my_state.get_rand() % num_threads; - if (target == my_state.get_id()) { + if (target == my_state.get_thread_id()) { continue; } @@ -134,16 +137,12 @@ void scheduler::terminate() { if (reuse_thread_ && i == 0) { continue; } - memory_.thread_for(i).join(); + worker_threads_[i].join(); } } -thread_state &scheduler::thread_state_for(size_t id) { return memory_.thread_state_for(id); } - void scheduler::sync() { thread_state::get().get_task_manager().sync(); } } -} -} diff --git a/lib/pls/src/internal/scheduling/task_manager.cpp b/lib/pls/src/internal/scheduling/task_manager.cpp index c0c7e12..245b41f 100644 --- a/lib/pls/src/internal/scheduling/task_manager.cpp +++ b/lib/pls/src/internal/scheduling/task_manager.cpp @@ -2,31 +2,38 @@ #include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/thread_state.h" +#include "pls/internal/scheduling/scheduler.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { -task_manager::task_manager(task *tasks, - data_structures::aligned_stack static_stack_space, +task_manager::task_manager(unsigned thread_id, size_t num_tasks, size_t stack_size, - external_trading_deque &deque) : num_tasks_{num_tasks}, - this_thread_tasks_{tasks}, - active_task_{&tasks[0]}, - deque_{deque} { + stack_allocator &stack_allocator) : num_tasks_{num_tasks}, + stack_allocator_{stack_allocator}, + tasks_{}, + deque_{thread_id, num_tasks_} { + tasks_.reserve(num_tasks); + for (size_t i = 0; i < num_tasks - 1; i++) { - tasks[i].init(static_stack_space.push_bytes(stack_size), stack_size, i, 0); + char *stack_memory = stack_allocator.allocate_stack(stack_size); + tasks_.emplace_back(std::make_unique(stack_memory, stack_size, i, thread_id)); + if (i > 0) { - tasks[i].prev_ = &tasks[i - 1]; - } - if (i < num_tasks - 2) { - 
tasks[i].next_ = &tasks[i + 1]; + tasks_[i - 1]->next_ = tasks_[i].get(); + tasks_[i]->prev_ = tasks_[i - 1].get(); } } + active_task_ = tasks_[0].get(); +} + +task_manager::~task_manager() { + for (auto &task : tasks_) { + stack_allocator_.free_stack(task->stack_size_, task->stack_memory_); + } } -static task *find_task(unsigned id, unsigned depth) { +static task &find_task(unsigned id, unsigned depth) { return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_this_thread_task(depth); } @@ -49,7 +56,7 @@ task *task_manager::steal_task(task_manager &stealing_task_manager) { traded_task->next_ = nullptr; // perform the actual pop operation - auto pop_result_task = deque_.pop_top(traded_task, peek.top_pointer_); + auto pop_result_task = deque_.pop_top(traded_task, peek); if (pop_result_task) { PLS_ASSERT(stolen_task->thread_id_ != traded_task->thread_id_, "It is impossible to steal an task we already own!"); @@ -90,8 +97,8 @@ void task_manager::push_resource_on_task(task *target_task, task *spare_task_cha spare_task_chain->resource_stack_next_.store(nullptr); } else { // Already an entry. Find it's corresponding task and set it as our successor. 
- auto *current_root_task = find_task(current_root.value - 1, target_task->depth_); - spare_task_chain->resource_stack_next_.store(current_root_task); + auto ¤t_root_task = find_task(current_root.value - 1, target_task->depth_); + spare_task_chain->resource_stack_next_.store(¤t_root_task); } } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); @@ -108,13 +115,13 @@ task *task_manager::pop_resource_from_task(task *target_task) { return nullptr; } else { // Found something, try to pop it - auto *current_root_task = find_task(current_root.value - 1, target_task->depth_); - auto *next_stack_task = current_root_task->resource_stack_next_.load(); + auto ¤t_root_task = find_task(current_root.value - 1, target_task->depth_); + auto *next_stack_task = current_root_task.resource_stack_next_.load(); target_root.stamp = current_root.stamp + 1; target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0; - output_task = current_root_task; + output_task = ¤t_root_task; } } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); @@ -233,5 +240,3 @@ bool task_manager::check_task_chain() { } } -} -} diff --git a/lib/pls/src/internal/scheduling/thread_state.cpp b/lib/pls/src/internal/scheduling/thread_state.cpp index fe70562..85d1c94 100644 --- a/lib/pls/src/internal/scheduling/thread_state.cpp +++ b/lib/pls/src/internal/scheduling/thread_state.cpp @@ -1,12 +1,10 @@ #include "pls/internal/scheduling/thread_state.h" -#include "pls/internal/base/thread.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { -thread_state &thread_state::get() { return *base::this_thread::state(); } +thread_local thread_state *my_thread_state{nullptr}; + +thread_state &thread_state::get() { return *my_thread_state; } +void thread_state::set(thread_state *new_state) { my_thread_state = new_state; } -} -} } diff --git a/test/base_tests.cpp 
b/test/base_tests.cpp index dafdfca..03a5d33 100644 --- a/test/base_tests.cpp +++ b/test/base_tests.cpp @@ -1,38 +1,17 @@ #include -#include "pls/internal/base/thread.h" + #include "pls/internal/base/spin_lock.h" #include "pls/internal/base/system_details.h" #include #include +#include using namespace pls::internal::base; -using namespace std; static bool base_tests_visited; static int base_tests_local_value_one; -static vector base_tests_local_value_two; - -TEST_CASE("thread creation and joining", "[internal/data_structures/thread.h]") { - base_tests_visited = false; - thread t1{[]() { base_tests_visited = true; }}; - t1.join(); - - REQUIRE(base_tests_visited); -} - -TEST_CASE("thread state", "[internal/data_structures/thread.h]") { - int state_one = 1; - vector state_two{1, 2}; - - thread t1{[]() { base_tests_local_value_one = *this_thread::state(); }, &state_one}; - thread t2{[]() { base_tests_local_value_two = *this_thread::state>(); }, &state_two}; - t1.join(); - t2.join(); - - REQUIRE(base_tests_local_value_one == 1); - REQUIRE(base_tests_local_value_two == vector{1, 2}); -} +static std::vector base_tests_local_value_two; int base_tests_shared_counter; @@ -42,14 +21,14 @@ TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spi spin_lock lock{}; SECTION("lock can be used by itself") { - thread t1{[&]() { + std::thread t1{[&]() { for (int i = 0; i < num_iterations; i++) { lock.lock(); base_tests_shared_counter++; lock.unlock(); } }}; - thread t2{[&]() { + std::thread t2{[&]() { for (int i = 0; i < num_iterations; i++) { lock.lock(); base_tests_shared_counter--; @@ -64,13 +43,13 @@ TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spi } SECTION("lock can be used with std::lock_guard") { - thread t1{[&]() { + std::thread t1{[&]() { for (int i = 0; i < num_iterations; i++) { std::lock_guard my_lock{lock}; base_tests_shared_counter++; } }}; - thread t2{[&]() { + std::thread t2{[&]() { for (int i = 0; i < 
num_iterations; i++) { std::lock_guard my_lock{lock}; base_tests_shared_counter--; diff --git a/test/scheduling_tests.cpp b/test/scheduling_tests.cpp index ce82c9d..c192d6f 100644 --- a/test/scheduling_tests.cpp +++ b/test/scheduling_tests.cpp @@ -1,13 +1,42 @@ #include #include +#include #include "pls/internal/scheduling/traded_cas_field.h" #include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/external_trading_deque.h" +#include "pls/internal/scheduling/scheduler.h" using namespace pls::internal::scheduling; +constexpr int MAX_NUM_THREADS = 8; +constexpr int MAX_NUM_TASKS = 32; +constexpr int MAX_STACK_SIZE = 1024 * 8; + +TEST_CASE("tasks distributed over workers (do not block)", "[internal/scheduling/scheduler.h]") { + scheduler scheduler{3, MAX_NUM_TASKS, MAX_STACK_SIZE}; + + std::atomic num_run{0}; + scheduler.perform_work([&] { + scheduler::spawn([&] { + num_run++; + while (num_run < 3); + }); + scheduler::spawn([&] { + while (num_run < 1); + num_run++; + while (num_run < 3); + }); + scheduler::spawn([&] { + while (num_run < 2); + num_run++; + }); + scheduler::sync(); + }); + REQUIRE(true); +} + TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/traded_cas_field.h]") { traded_cas_field empty_field; REQUIRE(empty_field.is_empty()); @@ -24,7 +53,7 @@ TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/traded_cas REQUIRE(tag_field.get_stamp() == stamp); REQUIRE(tag_field.get_deque_id() == ID); - alignas(64) task obj; + alignas(64) task obj{nullptr, 0, 0, 0}; traded_cas_field obj_field; obj_field.fill_with_trade_object(&obj); REQUIRE(obj_field.is_filled_with_object()); @@ -33,15 +62,13 @@ TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/traded_cas } TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque]") { - static_external_trading_deque<16> static_external_trading_deque_1; - external_trading_deque &deque_1 = static_external_trading_deque_1.get_deque(); - 
deque_1.set_thread_id(1); - - static_external_trading_deque<16> static_external_trading_deque_2; - external_trading_deque &deque_2 = static_external_trading_deque_2.get_deque(); - deque_2.set_thread_id(2); + external_trading_deque deque_1{1, 16}; + external_trading_deque deque_2{2, 16}; - std::vector tasks(16); + task tasks[4] = {{nullptr, 0, 0, 0}, + {nullptr, 0, 1, 0}, + {nullptr, 0, 2, 0}, + {nullptr, 0, 3, 0}}; SECTION("basic operations") { // Must start empty @@ -56,9 +83,9 @@ TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque // Local push, external pop deque_1.push_bot(&tasks[0]); auto peek = deque_1.peek_top(); - REQUIRE(*deque_1.pop_top(&tasks[1], peek.top_pointer_) == &tasks[0]); + REQUIRE(*deque_1.pop_top(&tasks[1], peek) == &tasks[0]); REQUIRE(*external_trading_deque::get_trade_object(&tasks[0]) == &tasks[1]); - REQUIRE(!deque_1.pop_top(&tasks[1], peek.top_pointer_)); + REQUIRE(!deque_1.pop_top(&tasks[1], peek)); REQUIRE(!deque_1.pop_bot()); // Keeps push/pop order @@ -71,9 +98,9 @@ TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque deque_1.push_bot(&tasks[0]); deque_1.push_bot(&tasks[1]); auto peek1 = deque_1.peek_top(); - REQUIRE(*deque_1.pop_top(&tasks[2], peek1.top_pointer_) == &tasks[0]); + REQUIRE(*deque_1.pop_top(&tasks[2], peek1) == &tasks[0]); auto peek2 = deque_1.peek_top(); - REQUIRE(*deque_1.pop_top(&tasks[3], peek2.top_pointer_) == &tasks[1]); + REQUIRE(*deque_1.pop_top(&tasks[3], peek2) == &tasks[1]); } SECTION("Interwined execution #1") { @@ -81,8 +108,8 @@ TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque deque_1.push_bot(&tasks[0]); auto peek1 = deque_1.peek_top(); auto peek2 = deque_1.peek_top(); - REQUIRE(*deque_1.pop_top(&tasks[1], peek1.top_pointer_) == &tasks[0]); - REQUIRE(!deque_1.pop_top(&tasks[2], peek2.top_pointer_)); + REQUIRE(*deque_1.pop_top(&tasks[1], peek1) == &tasks[0]); + REQUIRE(!deque_1.pop_top(&tasks[2], peek2)); } 
SECTION("Interwined execution #2") { @@ -90,6 +117,6 @@ TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque deque_1.push_bot(&tasks[0]); auto peek1 = deque_1.peek_top(); REQUIRE(*deque_1.pop_bot() == &tasks[0]); - REQUIRE(!deque_1.pop_top(&tasks[2], peek1.top_pointer_)); + REQUIRE(!deque_1.pop_top(&tasks[2], peek1)); } } -- libgit2 0.26.0