diff --git a/NOTES.md b/NOTES.md index f7793f2..e761d94 100644 --- a/NOTES.md +++ b/NOTES.md @@ -4,6 +4,16 @@ The new version of pls uses a more complicated/less user friendly API in favor of performance and memory guarantees. For the old version refer to the second half of this document. +# 18.03.2020 - C++17 benefit, overaligned new + +We have many cache line aligned types. Before C++17 they can +only be allocated with correct alignment using alignas(...) +when they are stored in static storage (e.g. global memory, stack memory). + +As we move towards more modern RAII type resource management, the fact that +[C++ 17 supports alignment of arbitrary length, even in vectors](https://www.bfilipek.com/2019/08/newnew-align.html) +is extremely helpful. Before we needed wrappers to do aligned heap allocations. + # 18.03.2020 - Coding Standard/C++ Standard We previously stuck to strict 'static' allocation in global static diff --git a/app/playground/CMakeLists.txt b/app/playground/CMakeLists.txt index 018b19b..34b4ede 100644 --- a/app/playground/CMakeLists.txt +++ b/app/playground/CMakeLists.txt @@ -3,4 +3,4 @@ add_executable(playground main.cpp) # Example for adding the library to your app (as a cmake project dependency) -target_link_libraries(playground context_switcher Threads::Threads) +target_link_libraries(playground pls context_switcher Threads::Threads) diff --git a/app/playground/main.cpp b/app/playground/main.cpp index 93bf83f..2ca61ee 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -1,52 +1,41 @@ -#include -#include -#include +#include +#include +#include +#include +#include -#include "barrier.h" +#include +#include "tsan_support.h" -#include "context_switcher/context_switcher.h" - -using namespace context_switcher; using namespace std; -// Memory for custom stack and continuation semantics -const size_t STACK_SIZE = 512 * 32; -const size_t NUM_STACKS = 4; -char custom_stacks[NUM_STACKS][STACK_SIZE]; +long count_memory_mappings() { + pid_t my_pid = getpid(); + ifstream proc_file{"/proc/" + to_string(my_pid) + "/maps"}; + + string line; + long line_count{0}; + while (getline(proc_file, line)) { + line_count++; + } + + return line_count; +} int main() { - context_switcher::continuation cont_t1, cont_main; - barrier bar{2}; - int error = 0; - - auto t1 = std::thread([&]() { - while (true) { - bar.wait(); - auto cont = enter_context(custom_stacks[0], STACK_SIZE, [&](continuation &&cont) { - error++; - cont_t1 = std::move(cont); - bar.wait(); - error++; - return std::move(cont_main); - }); - } - }); + mutex mut; int count = 0; - while (true) { - count++; - if (count % 100 == 0) { - printf("%d\n", count); - } - bar.wait(); - auto cont = enter_context(custom_stacks[1], STACK_SIZE, [&](continuation &&cont) { - error++; - cont_main = std::move(cont); - bar.wait(); - error++; - return std::move(cont_t1); - }); + printf("iteration: %d, mappings: %ld\n", count++, count_memory_mappings()); + void *main_fiber = __tsan_get_current_fiber(); + void *other_fiber = __tsan_create_fiber(0); + __tsan_switch_to_fiber(other_fiber, 0); + mut.lock(); + mut.unlock(); + __tsan_switch_to_fiber(main_fiber, 0); + __tsan_destroy_fiber(other_fiber); + } return 0; diff --git a/app/playground/tsan_support.h b/app/playground/tsan_support.h new file mode 100644 index 0000000..a5f60a0 --- /dev/null +++ b/app/playground/tsan_support.h @@ -0,0 +1,21 @@ + +#ifndef CONTEXT_SWITCHER_TSAN_SUPPORT +#define CONTEXT_SWITCHER_TSAN_SUPPORT + +extern "C" { +// Fiber switching API. +// - TSAN context for fiber can be created by __tsan_create_fiber +// and freed by __tsan_destroy_fiber. +// - TSAN context of current fiber or thread can be obtained +// by calling __tsan_get_current_fiber. +// - __tsan_switch_to_fiber should be called immediatly before switch +// to fiber, such as call of swapcontext. +// - Fiber name can be set by __tsan_set_fiber_name. +void *__tsan_get_current_fiber(void); +void *__tsan_create_fiber(unsigned flags); +void __tsan_destroy_fiber(void *fiber); +void __tsan_switch_to_fiber(void *fiber, unsigned flags); +void __tsan_set_fiber_name(void *fiber, const char *name); +}; + +#endif //CONTEXT_SWITCHER_TSAN_SUPPORT diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index f7f8232..4d53e8e 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -4,17 +4,18 @@ add_library(pls STATIC include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp include/pls/internal/base/ttas_spin_lock.h src/internal/base/ttas_spin_lock.cpp include/pls/internal/base/swmr_spin_lock.h src/internal/base/swmr_spin_lock.cpp - include/pls/internal/base/thread.h src/internal/base/thread.cpp - include/pls/internal/base/thread_impl.h include/pls/internal/base/barrier.h src/internal/base/barrier.cpp include/pls/internal/base/system_details.h - include/pls/internal/base/error_handling.h + include/pls/internal/base/error_handling.h src/internal/base/error_handling.cpp include/pls/internal/base/alignment.h src/internal/base/alignment.cpp include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp include/pls/internal/data_structures/aligned_stack_impl.h include/pls/internal/data_structures/stamped_integer.h include/pls/internal/data_structures/delayed_initialization.h + include/pls/internal/data_structures/bounded_trading_deque.h + include/pls/internal/data_structures/bounded_ws_deque.h + include/pls/internal/data_structures/optional.h include/pls/internal/helpers/prohibit_new.h include/pls/internal/helpers/profiler.h @@ -24,23 +25,14 @@ add_library(pls STATIC include/pls/internal/helpers/seqence.h include/pls/internal/helpers/member_function.h - include/pls/internal/scheduling/thread_state.h + include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler_impl.h - include/pls/internal/scheduling/scheduler_memory.h - include/pls/internal/scheduling/task_manager.h + include/pls/internal/scheduling/task_manager.h src/internal/scheduling/task_manager.cpp include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp - include/pls/internal/data_structures/bounded_ws_deque.h - include/pls/internal/data_structures/optional.h - include/pls/internal/scheduling/thread_state_static.h - src/internal/base/error_handling.cpp - include/pls/internal/data_structures/bounded_trading_deque.h - include/pls/internal/scheduling/external_trading_deque.h + include/pls/internal/scheduling/external_trading_deque.h src/internal/scheduling/external_trading_deque.cpp include/pls/internal/scheduling/traded_cas_field.h - include/pls/internal/scheduling/task_manager_impl.h - include/pls/internal/scheduling/static_scheduler_memory.h - include/pls/internal/scheduling/heap_scheduler_memory.h - src/internal/scheduling/task_manager.cpp src/internal/scheduling/thread_state.cpp) + include/pls/internal/scheduling/task_manager_impl.h) # Dependencies for pls target_link_libraries(pls Threads::Threads) diff --git a/lib/pls/include/pls/internal/base/system_details.h b/lib/pls/include/pls/internal/base/system_details.h index 222d127..df7b45b 100644 --- a/lib/pls/include/pls/internal/base/system_details.h +++ b/lib/pls/include/pls/internal/base/system_details.h @@ -16,9 +16,7 @@ #include -namespace pls { -namespace internal { -namespace base { +namespace pls::internal::base { /** * Collection of system details, e.g. hardware cache line size. @@ -47,6 +45,11 @@ constexpr unsigned long CAS_SIZE = sizeof(cas_integer) * 8; constexpr pointer_t CACHE_LINE_SIZE = 64; /** + * Helper to align types/values on cache lines. + */ +#define PLS_CACHE_ALIGN alignas(base::system_details::CACHE_LINE_SIZE) + +/** * Choose one of the following ways to store thread specific data. * Try to choose the fastest available on this processor/system. */ @@ -87,7 +90,5 @@ inline void relax_cpu() { } } -} -} #endif //PLS_SYSTEM_DETAILS_H diff --git a/lib/pls/include/pls/internal/base/thread.h b/lib/pls/include/pls/internal/base/thread.h deleted file mode 100644 index 5bbdf6d..0000000 --- a/lib/pls/include/pls/internal/base/thread.h +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Abstraction for threading to allow porting. - * Currently using either pthread or C++ 11 threads. - */ - -#ifndef PLS_THREAD_H -#define PLS_THREAD_H - -#include -#include -#include -#include - -#include "system_details.h" - -namespace pls { -namespace internal { -namespace base { - -using thread_entrypoint = void(); - -/** - * Static methods than can be performed on the current thread. - * - * usage: - * this_thread::yield(); - * T* state = this_thread::state(); - * - * PORTABILITY: - * Current implementation is based on pthreads. - */ -class this_thread { - friend - class thread; -#ifdef PLS_THREAD_SPECIFIC_PTHREAD - static pthread_key_t local_storage_key_; - static bool local_storage_key_initialized_; -#endif -#ifdef PLS_THREAD_SPECIFIC_COMPILER - static __thread void *local_state_; -#endif - public: - static void yield() { - pthread_yield(); - } - - static void sleep(long microseconds) { - timespec time{0, 1000 * microseconds}; - nanosleep(&time, nullptr); - } - - /** - * Retrieves the local state pointer. - * - * @tparam T The type of the state that is stored. - * @return The state pointer hold for this thread. - */ - template - static T *state(); - - /** - * Stores a pointer to the thread local state object. - * The memory management for this has to be done by the user, - * we only keep the pointer. - * - * @tparam T The type of the state that is stored. - * @param state_pointer A pointer to the threads state object. - */ - template - static void set_state(T *state_pointer); -}; - -/** - * Abstraction for starting a function in a separate thread. - * Offers only threading functionality needed in this project, - * underlying implementation can be changed. - * Uses NO heap memory allocation. - * - * PORTABILITY: - * Current implementation is based on pthreads. - */ -class thread { - friend class this_thread; - // Keep handle to native implementation - pthread_t pthread_thread_; - bool running_; - - template - static void *start_pthread_internal(void *thread_pointer); - - public: - template - explicit thread(const Function &function, State *state_pointer); - - template - explicit thread(const Function &function); - - explicit thread(); - ~thread(); - - void join(); - - // make object move only - thread(thread &&) noexcept; - thread &operator=(thread &&) noexcept; - - thread(const thread &) = delete; - thread &operator=(const thread &) = delete; -}; - -} -} -} -#include "thread_impl.h" - -#endif //PLS_THREAD_H diff --git a/lib/pls/include/pls/internal/base/thread_impl.h b/lib/pls/include/pls/internal/base/thread_impl.h deleted file mode 100644 index ccde5a8..0000000 --- a/lib/pls/include/pls/internal/base/thread_impl.h +++ /dev/null @@ -1,85 +0,0 @@ - -#ifndef PLS_THREAD_IMPL_H -#define PLS_THREAD_IMPL_H - -namespace pls { -namespace internal { -namespace base { - -template -T *this_thread::state() { -#ifdef PLS_THREAD_SPECIFIC_PTHREAD - return reinterpret_cast(pthread_getspecific(local_storage_key_)); -#endif -#ifdef PLS_THREAD_SPECIFIC_COMPILER - return reinterpret_cast(local_state_); -#endif -} - -template -void this_thread::set_state(T *state_pointer) { -#ifdef PLS_THREAD_SPECIFIC_PTHREAD - pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer); -#endif -#ifdef PLS_THREAD_SPECIFIC_COMPILER - local_state_ = state_pointer; -#endif -} - -template -struct thread_arguments { - Function function_; - State *state_; - std::atomic_flag *startup_flag_; -}; - -template -void *thread::start_pthread_internal(void *thread_pointer) { - // Actively copy all arguments into stack memory. - thread_arguments - arguments_copy = *reinterpret_cast *>(thread_pointer); - - // Now we have copies of everything we need on the stack. - // The original thread object can be moved freely (no more - // references to its memory location). - arguments_copy.startup_flag_->clear(); - - this_thread::set_state(arguments_copy.state_); - arguments_copy.function_(); - - // Finished executing the user function - pthread_exit(nullptr); -} - -template -thread::thread(const Function &function, State *state_pointer): - pthread_thread_{}, - running_{true} { - -#ifdef PLS_THREAD_SPECIFIC_PTHREAD - if (!this_thread::local_storage_key_initialized_) { - pthread_key_create(&this_thread::local_storage_key_, nullptr); - this_thread::local_storage_key_initialized_ = true; - } -#endif - - // Wee need to wait for the started function to read - // the function_ and state_pointer_ property before returning - // from the constructor, as the object might be moved after this. - std::atomic_flag startup_flag{ATOMIC_FLAG_INIT}; - - thread_arguments arguments{function, state_pointer, &startup_flag}; - - startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return - pthread_create(&pthread_thread_, nullptr, start_pthread_internal < Function, State > , (void *) (&arguments)); - while (startup_flag.test_and_set()); // Busy waiting for the starting flag to clear -} - -template -thread::thread(const Function &function): thread{function, (void *) nullptr} {} - -} -} -} - -#endif //PLS_THREAD_IMPL_H diff --git a/lib/pls/include/pls/internal/scheduling/external_trading_deque.h b/lib/pls/include/pls/internal/scheduling/external_trading_deque.h index 4fc6b1e..e873fe1 100644 --- a/lib/pls/include/pls/internal/scheduling/external_trading_deque.h +++ b/lib/pls/include/pls/internal/scheduling/external_trading_deque.h @@ -4,7 +4,7 @@ #include #include -#include +#include #include "pls/internal/base/error_handling.h" #include "pls/internal/base/system_details.h" @@ -15,9 +15,7 @@ #include "pls/internal/scheduling/traded_cas_field.h" #include "pls/internal/scheduling/task.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { using namespace data_structures; @@ -38,35 +36,11 @@ struct trading_deque_entry { * As each task is associated with memory this suffices to exchange memory blocks needed for execution. */ class external_trading_deque { - public: - external_trading_deque(trading_deque_entry *entries, size_t num_entries) : - entries_{entries}, num_entries_{num_entries} {}; - void set_thread_id(unsigned id) { - thread_id_ = id; - } - - static optional peek_traded_object(task *target_task) { - traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(); - if (current_cas.is_filled_with_object()) { - return optional{current_cas.get_trade_object()}; - } else { - return optional{}; - } - } - - static optional get_trade_object(task *target_task) { - traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(); - if (current_cas.is_filled_with_object()) { - task *result = current_cas.get_trade_object(); - traded_cas_field empty_cas; - if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas, empty_cas)) { - return optional{result}; - } - } - - return optional{}; - } + external_trading_deque(unsigned thread_id, size_t num_entries) : thread_id_(thread_id), entries_(num_entries) {} + + static optional peek_traded_object(task *target_task); + static optional get_trade_object(task *target_task); /** * Pushes a task on the bottom of the deque. @@ -74,69 +48,14 @@ class external_trading_deque { * * @param published_task The task to publish on the bottom of the deque. */ - void push_bot(task *published_task) { - auto expected_stamp = bot_internal_.stamp; - auto ¤t_entry = entries_[bot_internal_.value]; - - // Publish the prepared task in the deque. - current_entry.forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed); - current_entry.traded_task_.store(published_task, std::memory_order_relaxed); - - // Field that all threads synchronize on. - // This happens not in the deque itself, but in the published task. - traded_cas_field sync_cas_field; - sync_cas_field.fill_with_stamp(expected_stamp, thread_id_); - published_task->external_trading_deque_cas_.store(sync_cas_field, std::memory_order_release); - - // Advance the bot pointer. Linearization point for making the task public. - bot_internal_.stamp++; - bot_internal_.value++; - bot_.store(bot_internal_.value, std::memory_order_release); - } - - void reset_bot_and_top() { - bot_internal_.value = 0; - bot_internal_.stamp++; - - bot_.store(0); - top_.store({bot_internal_.stamp, 0}); - } - - void decrease_bot() { - bot_internal_.value--; - bot_.store(bot_internal_.value, std::memory_order_relaxed); - } + void push_bot(task *published_task); /** * Tries to pop the last task on the deque. * * @return optional holding the popped task if successful. */ - optional pop_bot() { - if (bot_internal_.value == 0) { - reset_bot_and_top(); - return optional{}; - } - decrease_bot(); - - auto ¤t_entry = entries_[bot_internal_.value]; - auto *popped_task = current_entry.traded_task_.load(std::memory_order_relaxed); - auto expected_stamp = current_entry.forwarding_stamp_.load(std::memory_order_relaxed); - - // We know what value must be in the cas field if no other thread stole it. - traded_cas_field expected_sync_cas_field; - expected_sync_cas_field.fill_with_stamp(expected_stamp, thread_id_); - traded_cas_field empty_cas_field; - - if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, - empty_cas_field, - std::memory_order_acq_rel)) { - return optional{popped_task}; - } else { - reset_bot_and_top(); - return optional{}; - } - } + optional pop_bot(); struct peek_result { peek_result(optional top_task, stamped_integer top_pointer) : top_task_{std::move(top_task)}, @@ -153,82 +72,35 @@ class external_trading_deque { * * @return a peek result containing the optional top task (if present) and the current head pointer. */ - peek_result peek_top() { - auto local_top = top_.load(); - auto local_bot = bot_.load(); - - if (local_top.value < local_bot) { - return peek_result{optional{entries_[local_top.value].traded_task_}, local_top}; - } else { - return peek_result{optional{}, local_top}; - } - } - - optional pop_top(task *offered_task, stamped_integer expected_top) { - auto local_bot = bot_.load(); - if (expected_top.value >= local_bot) { - return data_structures::optional{}; - } - - auto &target_entry = entries_[expected_top.value]; - - // Read our potential result - task *result = target_entry.traded_task_.load(); - unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load(); - - // Try to get it by CAS with the expected field entry, giving up our offered_task for it - traded_cas_field expected_sync_cas_field; - expected_sync_cas_field.fill_with_stamp(expected_top.stamp, thread_id_); - - traded_cas_field offered_field; - offered_field.fill_with_trade_object(offered_task); - - if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, offered_field)) { - // We got it, for sure move the top pointer forward. - top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1}); - // Return the stolen task - return data_structures::optional{result}; - } else { - // We did not get it...help forwarding the top pointer anyway. - if (expected_top.stamp == forwarding_stamp) { - // ...move the pointer forward if someone else put a valid trade object in there. - top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1}); - } else { - // ...we failed because the top tag lags behind...try to fix it. - // This means only updating the tag, as this location can still hold data we need. - top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value}); - } - return data_structures::optional{}; - } - } + peek_result peek_top(); + + /** + * Tries to pop the task on top of the deque that was + * previously observed by 'peeking' at the deque. + * + * Returns the task if successful, returns nothing if + * either the peeked task is no longer at the top of the deque + * or another thread interfered and 'won' the task. + * + * @return optional holding the popped task if successful. + */ + optional pop_top(task *offered_task, peek_result peek_result); private: + void reset_bot_and_top(); + void decrease_bot(); + // info on this deque - trading_deque_entry *const entries_; - const size_t num_entries_; unsigned thread_id_; + std::vector entries_; // fields for stealing/interacting stamped_integer bot_internal_{0, 0}; - alignas(base::system_details::CACHE_LINE_SIZE) std::atomic top_{{0, 0}}; - alignas(base::system_details::CACHE_LINE_SIZE) std::atomic bot_{0}; - -}; - -template -class static_external_trading_deque { - public: - static_external_trading_deque() : items_{}, deque_{items_.data(), SIZE} {} - - external_trading_deque &get_deque() { return deque_; } - private: - std::array items_; - external_trading_deque deque_; + PLS_CACHE_ALIGN std::atomic top_{{0, 0}}; + PLS_CACHE_ALIGN std::atomic bot_{0}; }; } -} -} #endif //PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_ diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h index 5cb8f81..76ebc6d 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler.h @@ -3,18 +3,15 @@ #define PLS_SCHEDULER_H #include +#include +#include #include "pls/internal/helpers/profiler.h" - -#include "pls/internal/base/thread.h" #include "pls/internal/base/barrier.h" - -#include "pls/internal/scheduling/scheduler_memory.h" #include "pls/internal/scheduling/thread_state.h" +#include "pls/internal/scheduling/task_manager.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { struct task; @@ -29,16 +26,20 @@ class scheduler { public: /** * Initializes a scheduler instance with the given number of threads. - * This will spawn the threads and put them to sleep, ready to process an - * upcoming parallel section. + * This will allocate ALL runtime resources, spawn the worker threads + * and put them to sleep, ready to process an upcoming parallel section. + * + * The initialization should be seen as a heavy and not very predictable operation. + * After it is done the scheduler must (if configured correctly) never run out of resources + * and deliver tight time bounds of randomized work-stealing. * - * @param memory All memory is allocated statically, thus the user is required to provide the memory instance. * @param num_threads The number of worker threads to be created. */ - explicit scheduler(scheduler_memory &memory, unsigned int num_threads, bool reuse_thread = true); + explicit scheduler(unsigned int num_threads, size_t computation_depth, size_t stack_size, bool reuse_thread = true); /** * The scheduler is implicitly terminated as soon as it leaves the scope. + * Resources follow a clean RAII style. */ ~scheduler(); @@ -53,18 +54,32 @@ class scheduler { template void perform_work(Function work_section); + /** + * Main parallelism construct, spawns a function for potential parallel execution. + * + * The result of the spawned function must not be relied on until sync() is called. + * Best see the lambda as if executed on a thread, e.g. it can cause race conditions + * and it is only finished after you join it back into the parent thread using sync(). + * + * @param lambda the lambda to be executed in parallel. + */ template static void spawn(Function &&lambda); + + /** + * Waits for all potentially parallel child tasks created with spawn(...). + */ static void sync(); - thread_state &thread_state_for(size_t id); + thread_state &thread_state_for(unsigned int thread_id) { return *thread_states_[thread_id]; } + task_manager &task_manager_for(unsigned int thread_id) { return *task_managers_[thread_id]; } /** * Explicitly terminate the worker threads. Scheduler must not be used after this. */ void terminate(); - unsigned int num_threads() const { return num_threads_; } + [[nodiscard]] unsigned int num_threads() const { return num_threads_; } private: static void work_thread_main_loop(); @@ -72,10 +87,12 @@ class scheduler { const unsigned int num_threads_; const bool reuse_thread_; - scheduler_memory &memory_; - base::barrier sync_barrier_; + std::vector worker_threads_; + std::vector> task_managers_; + std::vector> thread_states_; + class init_function; template class init_function_impl; @@ -84,11 +101,12 @@ class scheduler { std::atomic work_section_done_; bool terminated_; + + // TODO: remove this into a public wrapper class with templating + heap_stack_allocator stack_allocator_{}; }; } -} -} #include "scheduler_impl.h" #endif //PLS_SCHEDULER_H diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index 9510505..158c0c7 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -12,9 +12,7 @@ #include "pls/internal/helpers/profiler.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { class scheduler::init_function { public: @@ -47,8 +45,8 @@ void scheduler::perform_work(Function work_section) { work_section_done_ = false; if (reuse_thread_) { - auto &my_state = memory_.thread_state_for(0); - base::this_thread::set_state(&my_state); // Make THIS THREAD become the main worker + auto &my_state = thread_state_for(0); + thread_state::set(&my_state); // Make THIS THREAD become the main worker sync_barrier_.wait(); // Trigger threads to wake up work_thread_work_section(); // Simply also perform the work section on the main loop @@ -66,7 +64,5 @@ void scheduler::spawn(Function &&lambda) { } } -} -} #endif //PLS_SCHEDULER_IMPL_H diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h deleted file mode 100644 index f07ffca..0000000 --- a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef PLS_SCHEDULER_MEMORY_H -#define PLS_SCHEDULER_MEMORY_H - -#include "pls/internal/base/thread.h" -#include "pls/internal/scheduling/thread_state.h" - -namespace pls { -namespace internal { -namespace scheduling { - -// TODO: This way to handle memory is kind of a mess. We reworked it once...maybe it needs a second visit -// especially with the new 'stagered stack' (mmap allocs with page faults would be nice). -class scheduler_memory { - // Note: scheduler_memory is a pure interface and has no data. - // By not having an initialization routine we can do our 'static and heap specialization' - // without running into any ordering problems in the initialization sequence. - - protected: - thread_state **thread_states_array_{nullptr}; - - public: - virtual size_t max_threads() const = 0; - virtual base::thread &thread_for(size_t id) = 0; - thread_state &thread_state_for(size_t id) { - return *thread_states_array_[id]; - } -}; - -} -} -} - -#endif //PLS_SCHEDULER_MEMORY_H diff --git a/lib/pls/include/pls/internal/scheduling/static_scheduler_memory.h b/lib/pls/include/pls/internal/scheduling/static_scheduler_memory.h deleted file mode 100644 index f19adb4..0000000 --- a/lib/pls/include/pls/internal/scheduling/static_scheduler_memory.h +++ /dev/null @@ -1,43 +0,0 @@ -#ifndef PLS_STATIC_SCHEDULER_MEMORY_H -#define PLS_STATIC_SCHEDULER_MEMORY_H - -#include "pls/internal/base/thread.h" - -#include "pls/internal/scheduling/scheduler_memory.h" -#include "pls/internal/scheduling/thread_state.h" -#include "pls/internal/scheduling/thread_state_static.h" - -namespace pls { -namespace internal { -namespace scheduling { - -template -class static_scheduler_memory : public scheduler_memory { - public: - static_scheduler_memory() : scheduler_memory{} { - for (size_t i = 0; i < MAX_THREADS; i++) { - thread_state_pointers_[i] = &thread_states_[i].get_thread_state(); - } - thread_states_array_ = thread_state_pointers_.data(); - } - - size_t max_threads() const override { - return MAX_THREADS; - } - - base::thread &thread_for(size_t id) override { - return threads_[id]; - } - private: - using thread_state_type = thread_state_static; - - alignas(base::system_details::CACHE_LINE_SIZE) std::array threads_; - alignas(base::system_details::CACHE_LINE_SIZE) std::array thread_states_; - alignas(base::system_details::CACHE_LINE_SIZE) std::array thread_state_pointers_; -}; - -} -} -} - -#endif // PLS_STATIC_SCHEDULER_MEMORY_H diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h index c8696a8..32888fd 100644 --- a/lib/pls/include/pls/internal/scheduling/task.h +++ b/lib/pls/include/pls/internal/scheduling/task.h @@ -11,10 +11,7 @@ #include "pls/internal/data_structures/stamped_integer.h" #include "pls/internal/scheduling/traded_cas_field.h" -namespace pls { -namespace internal { -namespace scheduling { - +namespace pls::internal::scheduling { /** * A task is the smallest unit of execution seen by the runtime system. * @@ -27,16 +24,23 @@ namespace scheduling { * - running (currently executing user code) * - suspended (suspended by switching to a different task). */ -struct alignas(base::system_details::CACHE_LINE_SIZE) task { - void init(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) { - stack_memory_ = stack_memory; - stack_size_ = stack_size; - - depth_ = depth; - thread_id_ = thread_id; +struct PLS_CACHE_ALIGN task { + task(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) : + stack_memory_{stack_memory}, + stack_size_{stack_size}, + is_synchronized_{false}, + depth_{depth}, + thread_id_{thread_id}, + prev_{nullptr}, + next_{nullptr} {} - is_synchronized_ = false; - } + // Do not allow accidental copy/move operations. + // The whole runtime relies on tasks never changing memory positions during execution. + // Create tasks ONCE and use them until the runtime is shut down. + task(const task &other) = delete; + task(task &&other) = delete; + task &operator=(const task &other) = delete; + task &operator=(task &&other) = delete; template context_switcher::continuation run_as_task(F &&lambda) { @@ -46,7 +50,7 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task { // TODO: Proper access control and split it up into responsibilities // Stack/Continuation Management char *stack_memory_; - size_t stack_size_; // TODO: maybe remove it, not needed in here + size_t stack_size_; context_switcher::continuation continuation_; bool is_synchronized_; @@ -66,7 +70,5 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task { }; } -} -} #endif //PLS_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h index 8b473a6..e02942d 100644 --- a/lib/pls/include/pls/internal/scheduling/task_manager.h +++ b/lib/pls/include/pls/internal/scheduling/task_manager.h @@ -11,9 +11,19 @@ #include "pls/internal/data_structures/aligned_stack.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { + +class stack_allocator { + public: + virtual char *allocate_stack(size_t size) = 0; + virtual void free_stack(size_t size, char *stack) = 0; +}; + +class heap_stack_allocator : public stack_allocator { + public: + char *allocate_stack(size_t size) override { return new char[size]; } + void free_stack(size_t, char *stack) override { delete[] stack; } +}; /** * Handles management of tasks in the system. Each thread has a local task manager, @@ -21,24 +31,17 @@ namespace scheduling { */ class task_manager { public: - explicit task_manager(task *tasks, - data_structures::aligned_stack static_stack_space, + explicit task_manager(unsigned thread_id, size_t num_tasks, size_t stack_size, - external_trading_deque &deque); + stack_allocator &stack_allocator); + ~task_manager(); void push_resource_on_task(task *target_task, task *spare_task_chain); task *pop_resource_from_task(task *target_task); - task *get_this_thread_task(size_t depth) { - return &this_thread_tasks_[depth]; - } - - void set_thread_id(unsigned id) { - for (size_t i = 0; i < num_tasks_; i++) { - this_thread_tasks_[i].thread_id_ = id; - } - deque_.set_thread_id(id); + task &get_this_thread_task(size_t depth) { + return *tasks_[depth]; } task &get_active_task() { @@ -63,33 +66,13 @@ class task_manager { private: size_t num_tasks_; - task *this_thread_tasks_; + stack_allocator &stack_allocator_; + std::vector> tasks_; task *active_task_; - external_trading_deque &deque_; + external_trading_deque deque_; }; -template -class static_task_manager { - public: - static_task_manager() - : tasks_{}, - static_stack_storage_{}, - static_external_trading_deque_{}, - task_manager_{tasks_.data(), static_stack_storage_.get_stack(), NUM_TASKS, STACK_SIZE, - static_external_trading_deque_.get_deque()} {}; - task_manager &get_task_manager() { return task_manager_; } - - private: - std::array tasks_; - data_structures::static_aligned_stack static_stack_storage_; - static_external_trading_deque static_external_trading_deque_; - - task_manager task_manager_; -}; - -} -} } #include "task_manager_impl.h" diff --git a/lib/pls/include/pls/internal/scheduling/task_manager_impl.h b/lib/pls/include/pls/internal/scheduling/task_manager_impl.h index ba11377..3fda72c 100644 --- a/lib/pls/include/pls/internal/scheduling/task_manager_impl.h +++ b/lib/pls/include/pls/internal/scheduling/task_manager_impl.h @@ -8,13 +8,10 @@ #include "context_switcher/continuation.h" -#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/thread_state.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { template void task_manager::spawn_child(F &&lambda) { @@ -77,7 +74,5 @@ void task_manager::spawn_child(F &&lambda) { } } -} -} #endif //PLS_TASK_MANAGER_IMPL_H_ diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h index 329ff91..7f717fd 100644 --- a/lib/pls/include/pls/internal/scheduling/thread_state.h +++ b/lib/pls/include/pls/internal/scheduling/thread_state.h @@ -6,75 +6,71 @@ #include #include -#include "pls/internal/base/system_details.h" - #include "context_switcher/continuation.h" -namespace pls { -namespace internal { -namespace scheduling { +#include "pls/internal/base/system_details.h" + +namespace pls::internal::scheduling { class scheduler; class task_manager; -struct task; -struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state { +/** + * Proxy-Object for thread local state needed during scheduling. + * The main use is to perform thread_state::get() as a thread local + * memory to identify the current worker thread state. + * + * Holds only minimal data by itself and points to the appropriate scheduler + * and task manager objects associated with this thread. + */ +struct PLS_CACHE_ALIGN thread_state { private: - scheduler *scheduler_; - unsigned id_; + const unsigned thread_id_; + scheduler &scheduler_; task_manager &task_manager_; - alignas(base::system_details::CACHE_LINE_SIZE) context_switcher::continuation main_loop_continuation_; - alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_; + PLS_CACHE_ALIGN context_switcher::continuation main_loop_continuation_; + PLS_CACHE_ALIGN std::minstd_rand random_; public: - explicit thread_state(task_manager &task_manager) : - scheduler_{nullptr}, - id_{0}, + explicit thread_state(scheduler &scheduler, + unsigned thread_id, + task_manager &task_manager) : + thread_id_{thread_id}, + scheduler_{scheduler}, task_manager_{task_manager}, random_{static_cast(std::chrono::steady_clock::now().time_since_epoch().count())} {}; + // Do not allow accidental copy/move operations. + thread_state(const thread_state &) = delete; + thread_state(thread_state &&) = delete; + thread_state &operator=(const thread_state &) = delete; + thread_state &operator=(thread_state &&) = delete; + /** * Convenience helper to get the thread_state instance associated with this thread. * Must only be called on threads that are associated with a thread_state, * this will most likely be threads created by the scheduler. * - * Each call is guaranteed to be a new lockup, i.e. it is not cached after fiber context switches. + * Each call is guaranteed to be a new lookup, i.e. it is not cached after fiber context switches. * * @return The thread_state of this thread. */ - static thread_state &PLS_NOINLINE get(); + [[nodiscard]] static thread_state &PLS_NOINLINE get(); + static void set(thread_state *); - unsigned get_id() { return id_; } - void set_id(unsigned id) { - id_ = id; - } - task_manager &get_task_manager() { return task_manager_; } - scheduler &get_scheduler() { return *scheduler_; } - void set_scheduler(scheduler *scheduler) { - scheduler_ = scheduler; - } - long get_rand() { + [[nodiscard]] unsigned get_thread_id() const { return thread_id_; } + [[nodiscard]] task_manager &get_task_manager() { return task_manager_; } + [[nodiscard]] scheduler &get_scheduler() { return scheduler_; } + [[nodiscard]] long get_rand() { return random_(); } - context_switcher::continuation &main_continuation() { + [[nodiscard]] context_switcher::continuation &main_continuation() { return main_loop_continuation_; } - - // Do not allow move/copy operations. - // State is a pure memory container with references/pointers into it from all over the code. - // It should be allocated, used and de-allocated, nothing more. - thread_state(thread_state &&) = delete; - thread_state &operator=(thread_state &&) = delete; - - thread_state(const thread_state &) = delete; - thread_state &operator=(const thread_state &) = delete; - }; } -} -} #endif //PLS_THREAD_STATE_H diff --git a/lib/pls/include/pls/internal/scheduling/thread_state_static.h b/lib/pls/include/pls/internal/scheduling/thread_state_static.h deleted file mode 100644 index 93539fb..0000000 --- a/lib/pls/include/pls/internal/scheduling/thread_state_static.h +++ /dev/null @@ -1,31 +0,0 @@ - -#ifndef PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_ -#define PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_ - -#include "pls/internal/scheduling/task_manager.h" -#include "pls/internal/base/system_details.h" - -#include "thread_state.h" - -namespace pls { -namespace internal { -namespace scheduling { - -template -struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state_static { - public: - thread_state_static() - : static_task_manager_{}, - thread_state_{static_task_manager_.get_task_manager()} {} - thread_state &get_thread_state() { return thread_state_; } - - private: - alignas(base::system_details::CACHE_LINE_SIZE) static_task_manager static_task_manager_; - alignas(base::system_details::CACHE_LINE_SIZE) thread_state thread_state_; -}; - -} -} -} - -#endif //PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_ diff --git a/lib/pls/include/pls/internal/scheduling/traded_cas_field.h b/lib/pls/include/pls/internal/scheduling/traded_cas_field.h index 6a468b6..3d31528 100644 --- a/lib/pls/include/pls/internal/scheduling/traded_cas_field.h +++ b/lib/pls/include/pls/internal/scheduling/traded_cas_field.h @@ -7,9 +7,7 @@ #include "pls/internal/base/error_handling.h" #include "pls/internal/base/system_details.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { struct task; struct traded_cas_field { @@ -82,7 +80,5 @@ struct traded_cas_field { }; } -} -} #endif //PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_ diff --git a/lib/pls/src/internal/base/thread.cpp b/lib/pls/src/internal/base/thread.cpp deleted file mode 100644 index 04b564c..0000000 --- a/lib/pls/src/internal/base/thread.cpp +++ /dev/null @@ -1,47 +0,0 @@ -#include "pls/internal/base/thread.h" - -namespace pls { -namespace internal { -namespace base { - -thread::thread() : - pthread_thread_{}, - running_{false} {} - -thread::~thread() { - if (running_) { - join(); - } -} - -thread::thread(thread &&other) noexcept : - pthread_thread_{other.pthread_thread_}, - running_{other.running_} { - other.running_ = false; -} - -thread &thread::operator=(thread &&other) noexcept { - this->pthread_thread_ = other.pthread_thread_; - this->running_ = other.running_; - - other.running_ = false; - - return *this; -} - -#ifdef PLS_THREAD_SPECIFIC_PTHREAD -pthread_key_t this_thread::local_storage_key_ = false; -bool this_thread::local_storage_key_initialized_; -#endif -#ifdef PLS_THREAD_SPECIFIC_COMPILER -__thread void *this_thread::local_state_; -#endif - -void thread::join() { - pthread_join(pthread_thread_, nullptr); - running_ = false; -} - -} -} -} diff --git a/lib/pls/src/internal/scheduling/external_trading_deque.cpp b/lib/pls/src/internal/scheduling/external_trading_deque.cpp new file mode 100644 index 0000000..927528e --- /dev/null +++ b/lib/pls/src/internal/scheduling/external_trading_deque.cpp @@ -0,0 +1,136 @@ +#include "pls/internal/scheduling/external_trading_deque.h" + +namespace pls::internal::scheduling { + +optional external_trading_deque::peek_traded_object(task *target_task) { + traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(); + if (current_cas.is_filled_with_object()) { + return optional{current_cas.get_trade_object()}; + } else { + return optional{}; + } +} + +optional external_trading_deque::get_trade_object(task *target_task) { + traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(); + if (current_cas.is_filled_with_object()) { + task *result = current_cas.get_trade_object(); + traded_cas_field empty_cas; + if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas, empty_cas)) { + return optional{result}; + } + } + + return optional{}; +} + +void external_trading_deque::push_bot(task *published_task) { + auto expected_stamp = bot_internal_.stamp; + auto ¤t_entry = entries_[bot_internal_.value]; + + // Publish the prepared task in the deque. + current_entry.forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed); + current_entry.traded_task_.store(published_task, std::memory_order_relaxed); + + // Field that all threads synchronize on. + // This happens not in the deque itself, but in the published task. + traded_cas_field sync_cas_field; + sync_cas_field.fill_with_stamp(expected_stamp, thread_id_); + published_task->external_trading_deque_cas_.store(sync_cas_field, std::memory_order_release); + + // Advance the bot pointer. Linearization point for making the task public. + bot_internal_.stamp++; + bot_internal_.value++; + bot_.store(bot_internal_.value, std::memory_order_release); +} + +void external_trading_deque::reset_bot_and_top() { + bot_internal_.value = 0; + bot_internal_.stamp++; + + bot_.store(0); + top_.store({bot_internal_.stamp, 0}); +} + +void external_trading_deque::decrease_bot() { + bot_internal_.value--; + bot_.store(bot_internal_.value, std::memory_order_relaxed); +} + +optional external_trading_deque::pop_bot() { + if (bot_internal_.value == 0) { + reset_bot_and_top(); + return optional{}; + } + decrease_bot(); + + auto ¤t_entry = entries_[bot_internal_.value]; + auto *popped_task = current_entry.traded_task_.load(std::memory_order_relaxed); + auto expected_stamp = current_entry.forwarding_stamp_.load(std::memory_order_relaxed); + + // We know what value must be in the cas field if no other thread stole it. + traded_cas_field expected_sync_cas_field; + expected_sync_cas_field.fill_with_stamp(expected_stamp, thread_id_); + traded_cas_field empty_cas_field; + + if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, + empty_cas_field, + std::memory_order_acq_rel)) { + return optional{popped_task}; + } else { + reset_bot_and_top(); + return optional{}; + } +} + +external_trading_deque::peek_result external_trading_deque::peek_top() { + auto local_top = top_.load(); + auto local_bot = bot_.load(); + + if (local_top.value < local_bot) { + return peek_result{optional{entries_[local_top.value].traded_task_}, local_top}; + } else { + return peek_result{optional{}, local_top}; + } +} + +optional external_trading_deque::pop_top(task *offered_task, peek_result peek_result) { + stamped_integer expected_top = peek_result.top_pointer_; + auto local_bot = bot_.load(); + if (expected_top.value >= local_bot) { + return data_structures::optional{}; + } + + auto &target_entry = entries_[expected_top.value]; + + // Read our potential result + task *result = target_entry.traded_task_.load(); + unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load(); + + // Try to get it by CAS with the expected field entry, giving up our offered_task for it + traded_cas_field expected_sync_cas_field; + expected_sync_cas_field.fill_with_stamp(expected_top.stamp, thread_id_); + + traded_cas_field offered_field; + offered_field.fill_with_trade_object(offered_task); + + if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, offered_field)) { + // We got it, for sure move the top pointer forward. + top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1}); + // Return the stolen task + return data_structures::optional{result}; + } else { + // We did not get it...help forwarding the top pointer anyway. + if (expected_top.stamp == forwarding_stamp) { + // ...move the pointer forward if someone else put a valid trade object in there. + top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1}); + } else { + // ...we failed because the top tag lags behind...try to fix it. + // This means only updating the tag, as this location can still hold data we need. + top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value}); + } + return data_structures::optional{}; + } +} + +} diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp index f3d9773..318de19 100644 --- a/lib/pls/src/internal/scheduling/scheduler.cpp +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -2,36 +2,39 @@ #include "context_switcher/context_switcher.h" -#include "pls/internal/scheduling/task_manager.h" -#include "pls/internal/scheduling/thread_state.h" - #include "pls/internal/base/thread.h" #include "pls/internal/base/error_handling.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { -scheduler::scheduler(scheduler_memory &memory, const unsigned int num_threads, bool reuse_thread) : +scheduler::scheduler(unsigned int num_threads, size_t computation_depth, size_t stack_size, bool reuse_thread) : num_threads_{num_threads}, reuse_thread_{reuse_thread}, - memory_{memory}, sync_barrier_{num_threads + 1 - reuse_thread}, + worker_threads_{}, + thread_states_{}, + main_thread_starter_function_{nullptr}, + work_section_done_{false}, terminated_{false} { - if (num_threads_ > memory.max_threads()) { - PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory."); - } + worker_threads_.reserve(num_threads); + task_managers_.reserve(num_threads); + thread_states_.reserve(num_threads); for (unsigned int i = 0; i < num_threads_; i++) { - // Placement new is required, as the memory of `memory_` is not required to be initialized. - memory.thread_state_for(i).set_scheduler(this); - memory.thread_state_for(i).set_id(i); - memory.thread_state_for(i).get_task_manager().set_thread_id(i); + auto &this_task_manager = + task_managers_.emplace_back(std::make_unique(i, computation_depth, stack_size, stack_allocator_)); + auto &this_thread_state = thread_states_.emplace_back(std::make_unique(*this, i, *this_task_manager)); if (reuse_thread && i == 0) { + worker_threads_.emplace_back(); continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one. } - memory.thread_for(i) = base::thread(&scheduler::work_thread_main_loop, &memory_.thread_state_for(i)); + + auto *this_thread_state_pointer = this_thread_state.get(); + worker_threads_.emplace_back([this_thread_state_pointer] { + thread_state::set(this_thread_state_pointer); + work_thread_main_loop(); + }); } } @@ -63,7 +66,7 @@ void scheduler::work_thread_work_section() { auto const num_threads = my_state.get_scheduler().num_threads(); - if (my_state.get_id() == 0) { + if (my_state.get_thread_id() == 0) { // Main Thread, kick off by executing the user's main code block. main_thread_starter_function_->run(); } @@ -74,7 +77,7 @@ void scheduler::work_thread_work_section() { // TODO: move steal routine into separate function const size_t target = my_state.get_rand() % num_threads; - if (target == my_state.get_id()) { + if (target == my_state.get_thread_id()) { continue; } @@ -134,16 +137,12 @@ void scheduler::terminate() { if (reuse_thread_ && i == 0) { continue; } - memory_.thread_for(i).join(); + worker_threads_[i].join(); } } -thread_state &scheduler::thread_state_for(size_t id) { return memory_.thread_state_for(id); } - void scheduler::sync() { thread_state::get().get_task_manager().sync(); } } -} -} diff --git a/lib/pls/src/internal/scheduling/task_manager.cpp b/lib/pls/src/internal/scheduling/task_manager.cpp index c0c7e12..245b41f 100644 --- a/lib/pls/src/internal/scheduling/task_manager.cpp +++ b/lib/pls/src/internal/scheduling/task_manager.cpp @@ -2,31 +2,38 @@ #include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/thread_state.h" +#include "pls/internal/scheduling/scheduler.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { -task_manager::task_manager(task *tasks, - data_structures::aligned_stack static_stack_space, +task_manager::task_manager(unsigned thread_id, size_t num_tasks, size_t stack_size, - external_trading_deque &deque) : num_tasks_{num_tasks}, - this_thread_tasks_{tasks}, - active_task_{&tasks[0]}, - deque_{deque} { + stack_allocator &stack_allocator) : num_tasks_{num_tasks}, + stack_allocator_{stack_allocator}, + tasks_{}, + deque_{thread_id, num_tasks_} { + tasks_.reserve(num_tasks); + for (size_t i = 0; i < num_tasks - 1; i++) { - tasks[i].init(static_stack_space.push_bytes(stack_size), stack_size, i, 0); + char *stack_memory = stack_allocator.allocate_stack(stack_size); + tasks_.emplace_back(std::make_unique(stack_memory, stack_size, i, thread_id)); + if (i > 0) { - tasks[i].prev_ = &tasks[i - 1]; - } - if (i < num_tasks - 2) { - tasks[i].next_ = &tasks[i + 1]; + tasks_[i - 1]->next_ = tasks_[i].get(); + tasks_[i]->prev_ = tasks_[i - 1].get(); } } + active_task_ = tasks_[0].get(); +} + +task_manager::~task_manager() { + for (auto &task : tasks_) { + stack_allocator_.free_stack(task->stack_size_, task->stack_memory_); + } } -static task *find_task(unsigned id, unsigned depth) { +static task &find_task(unsigned id, unsigned depth) { return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_this_thread_task(depth); } @@ -49,7 +56,7 @@ task *task_manager::steal_task(task_manager &stealing_task_manager) { traded_task->next_ = nullptr; // perform the actual pop operation - auto pop_result_task = deque_.pop_top(traded_task, peek.top_pointer_); + auto pop_result_task = deque_.pop_top(traded_task, peek); if (pop_result_task) { PLS_ASSERT(stolen_task->thread_id_ != traded_task->thread_id_, "It is impossible to steal an task we already own!"); @@ -90,8 +97,8 @@ void task_manager::push_resource_on_task(task *target_task, task *spare_task_cha spare_task_chain->resource_stack_next_.store(nullptr); } else { // Already an entry. Find it's corresponding task and set it as our successor. - auto *current_root_task = find_task(current_root.value - 1, target_task->depth_); - spare_task_chain->resource_stack_next_.store(current_root_task); + auto ¤t_root_task = find_task(current_root.value - 1, target_task->depth_); + spare_task_chain->resource_stack_next_.store(¤t_root_task); } } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); @@ -108,13 +115,13 @@ task *task_manager::pop_resource_from_task(task *target_task) { return nullptr; } else { // Found something, try to pop it - auto *current_root_task = find_task(current_root.value - 1, target_task->depth_); - auto *next_stack_task = current_root_task->resource_stack_next_.load(); + auto ¤t_root_task = find_task(current_root.value - 1, target_task->depth_); + auto *next_stack_task = current_root_task.resource_stack_next_.load(); target_root.stamp = current_root.stamp + 1; target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0; - output_task = current_root_task; + output_task = ¤t_root_task; } } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); @@ -233,5 +240,3 @@ bool task_manager::check_task_chain() { } } -} -} diff --git a/lib/pls/src/internal/scheduling/thread_state.cpp b/lib/pls/src/internal/scheduling/thread_state.cpp index fe70562..85d1c94 100644 --- a/lib/pls/src/internal/scheduling/thread_state.cpp +++ b/lib/pls/src/internal/scheduling/thread_state.cpp @@ -1,12 +1,10 @@ #include "pls/internal/scheduling/thread_state.h" -#include "pls/internal/base/thread.h" -namespace pls { -namespace internal { -namespace scheduling { +namespace pls::internal::scheduling { -thread_state &thread_state::get() { return *base::this_thread::state(); } +thread_local thread_state *my_thread_state{nullptr}; + +thread_state &thread_state::get() { return *my_thread_state; } +void thread_state::set(thread_state *new_state) { my_thread_state = new_state; } -} -} } diff --git a/test/base_tests.cpp b/test/base_tests.cpp index dafdfca..03a5d33 100644 --- a/test/base_tests.cpp +++ b/test/base_tests.cpp @@ -1,38 +1,17 @@ #include -#include "pls/internal/base/thread.h" + #include "pls/internal/base/spin_lock.h" #include "pls/internal/base/system_details.h" #include #include +#include using namespace pls::internal::base; -using namespace std; static bool base_tests_visited; static int base_tests_local_value_one; -static vector base_tests_local_value_two; - -TEST_CASE("thread creation and joining", "[internal/data_structures/thread.h]") { - base_tests_visited = false; - thread t1{[]() { base_tests_visited = true; }}; - t1.join(); - - REQUIRE(base_tests_visited); -} - -TEST_CASE("thread state", "[internal/data_structures/thread.h]") { - int state_one = 1; - vector state_two{1, 2}; - - thread t1{[]() { base_tests_local_value_one = *this_thread::state(); }, &state_one}; - thread t2{[]() { base_tests_local_value_two = *this_thread::state>(); }, &state_two}; - t1.join(); - t2.join(); - - REQUIRE(base_tests_local_value_one == 1); - REQUIRE(base_tests_local_value_two == vector{1, 2}); -} +static std::vector base_tests_local_value_two; int base_tests_shared_counter; @@ -42,14 +21,14 @@ TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spi spin_lock lock{}; SECTION("lock can be used by itself") { - thread t1{[&]() { + std::thread t1{[&]() { for (int i = 0; i < num_iterations; i++) { lock.lock(); base_tests_shared_counter++; lock.unlock(); } }}; - thread t2{[&]() { + std::thread t2{[&]() { for (int i = 0; i < num_iterations; i++) { lock.lock(); base_tests_shared_counter--; @@ -64,13 +43,13 @@ TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spi } SECTION("lock can be used with std::lock_guard") { - thread t1{[&]() { + std::thread t1{[&]() { for (int i = 0; i < num_iterations; i++) { std::lock_guard my_lock{lock}; base_tests_shared_counter++; } }}; - thread t2{[&]() { + std::thread t2{[&]() { for (int i = 0; i < num_iterations; i++) { std::lock_guard my_lock{lock}; base_tests_shared_counter--; diff --git a/test/scheduling_tests.cpp b/test/scheduling_tests.cpp index ce82c9d..c192d6f 100644 --- a/test/scheduling_tests.cpp +++ b/test/scheduling_tests.cpp @@ -1,13 +1,42 @@ #include #include +#include #include "pls/internal/scheduling/traded_cas_field.h" #include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/external_trading_deque.h" +#include "pls/internal/scheduling/scheduler.h" using namespace pls::internal::scheduling; +constexpr int MAX_NUM_THREADS = 8; +constexpr int MAX_NUM_TASKS = 32; +constexpr int MAX_STACK_SIZE = 1024 * 8; + +TEST_CASE("tasks distributed over workers (do not block)", "[internal/scheduling/scheduler.h]") { + scheduler scheduler{3, MAX_NUM_TASKS, MAX_STACK_SIZE}; + + std::atomic num_run{0}; + scheduler.perform_work([&] { + scheduler::spawn([&] { + num_run++; + while (num_run < 3); + }); + scheduler::spawn([&] { + while (num_run < 1); + num_run++; + while (num_run < 3); + }); + scheduler::spawn([&] { + while (num_run < 2); + num_run++; + }); + scheduler::sync(); + }); + REQUIRE(true); +} + TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/traded_cas_field.h]") { traded_cas_field empty_field; REQUIRE(empty_field.is_empty()); @@ -24,7 +53,7 @@ TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/traded_cas REQUIRE(tag_field.get_stamp() == stamp); REQUIRE(tag_field.get_deque_id() == ID); - alignas(64) task obj; + alignas(64) task obj{nullptr, 0, 0, 0}; traded_cas_field obj_field; obj_field.fill_with_trade_object(&obj); REQUIRE(obj_field.is_filled_with_object()); @@ -33,15 +62,13 @@ TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/traded_cas } TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque]") { - static_external_trading_deque<16> static_external_trading_deque_1; - external_trading_deque &deque_1 = static_external_trading_deque_1.get_deque(); - deque_1.set_thread_id(1); - - static_external_trading_deque<16> static_external_trading_deque_2; - external_trading_deque &deque_2 = static_external_trading_deque_2.get_deque(); - deque_2.set_thread_id(2); + external_trading_deque deque_1{1, 16}; + external_trading_deque deque_2{2, 16}; - std::vector tasks(16); + task tasks[4] = {{nullptr, 0, 0, 0}, + {nullptr, 0, 1, 0}, + {nullptr, 0, 2, 0}, + {nullptr, 0, 3, 0}}; SECTION("basic operations") { // Must start empty @@ -56,9 +83,9 @@ TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque // Local push, external pop deque_1.push_bot(&tasks[0]); auto peek = deque_1.peek_top(); - REQUIRE(*deque_1.pop_top(&tasks[1], peek.top_pointer_) == &tasks[0]); + REQUIRE(*deque_1.pop_top(&tasks[1], peek) == &tasks[0]); REQUIRE(*external_trading_deque::get_trade_object(&tasks[0]) == &tasks[1]); - REQUIRE(!deque_1.pop_top(&tasks[1], peek.top_pointer_)); + REQUIRE(!deque_1.pop_top(&tasks[1], peek)); REQUIRE(!deque_1.pop_bot()); // Keeps push/pop order @@ -71,9 +98,9 @@ TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque deque_1.push_bot(&tasks[0]); deque_1.push_bot(&tasks[1]); auto peek1 = deque_1.peek_top(); - REQUIRE(*deque_1.pop_top(&tasks[2], peek1.top_pointer_) == &tasks[0]); + REQUIRE(*deque_1.pop_top(&tasks[2], peek1) == &tasks[0]); auto peek2 = deque_1.peek_top(); - REQUIRE(*deque_1.pop_top(&tasks[3], peek2.top_pointer_) == &tasks[1]); + REQUIRE(*deque_1.pop_top(&tasks[3], peek2) == &tasks[1]); } SECTION("Interwined execution #1") { @@ -81,8 +108,8 @@ TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque deque_1.push_bot(&tasks[0]); auto peek1 = deque_1.peek_top(); auto peek2 = deque_1.peek_top(); - REQUIRE(*deque_1.pop_top(&tasks[1], peek1.top_pointer_) == &tasks[0]); - REQUIRE(!deque_1.pop_top(&tasks[2], peek2.top_pointer_)); + REQUIRE(*deque_1.pop_top(&tasks[1], peek1) == &tasks[0]); + REQUIRE(!deque_1.pop_top(&tasks[2], peek2)); } SECTION("Interwined execution #2") { @@ -90,6 +117,6 @@ TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque deque_1.push_bot(&tasks[0]); auto peek1 = deque_1.peek_top(); REQUIRE(*deque_1.pop_bot() == &tasks[0]); - REQUIRE(!deque_1.pop_top(&tasks[2], peek1.top_pointer_)); + REQUIRE(!deque_1.pop_top(&tasks[2], peek1)); } }