diff --git a/app/benchmark_fib/main.cpp b/app/benchmark_fib/main.cpp
index 8180fa3..47ce0a1 100644
--- a/app/benchmark_fib/main.cpp
+++ b/app/benchmark_fib/main.cpp
@@ -1,5 +1,5 @@
 #include "pls/internal/scheduling/scheduler.h"
-#include "pls/internal/scheduling/scheduler_memory.h"
+#include "pls/internal/scheduling/static_scheduler_memory.h"
 
 using namespace pls::internal::scheduling;
 
diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt
index 81e80df..a77eb72 100644
--- a/lib/pls/CMakeLists.txt
+++ b/lib/pls/CMakeLists.txt
@@ -61,7 +61,11 @@ add_library(pls STATIC src/internal/base/error_handling.cpp
         include/pls/internal/data_structures/bounded_trading_deque.h
         include/pls/internal/scheduling/external_trading_deque.h
-        include/pls/internal/scheduling/traded_cas_field.h)
+        include/pls/internal/scheduling/traded_cas_field.h
+        include/pls/internal/scheduling/task_manager_impl.h
+        include/pls/internal/scheduling/static_scheduler_memory.h
+        include/pls/internal/scheduling/heap_scheduler_memory.h
+        src/internal/scheduling/task_manager.cpp)
 
 # Add everything in `./include` to be in the include path of this project
 target_include_directories(pls
diff --git a/lib/pls/include/pls/internal/scheduling/heap_scheduler_memory.h b/lib/pls/include/pls/internal/scheduling/heap_scheduler_memory.h
new file mode 100644
index 0000000..07c44c2
--- /dev/null
+++ b/lib/pls/include/pls/internal/scheduling/heap_scheduler_memory.h
@@ -0,0 +1,59 @@
+#ifndef PLS_HEAP_SCHEDULER_MEMORY_H
+#define PLS_HEAP_SCHEDULER_MEMORY_H
+
+#include
+
+#include "pls/internal/base/thread.h"
+
+#include "pls/internal/scheduling/scheduler_memory.h"
+#include "pls/internal/scheduling/thread_state.h"
+#include "pls/internal/scheduling/thread_state_static.h"
+
+namespace pls {
+namespace internal {
+namespace scheduling {
+
+template
+class heap_scheduler_memory : public scheduler_memory {
+ public:
+  explicit heap_scheduler_memory(size_t max_threads) : max_threads_{max_threads},
+                                                       thread_vector_{},
+                                                       thread_state_vector_{},
+                                                       thread_state_pointers_{} {
+    thread_vector_.reserve(max_threads);
+    thread_state_vector_.reserve(max_threads);
+
+    for (size_t i = 0; i < max_threads; i++) {
+      thread_vector_.emplace_back();
+      thread_state_vector_.emplace_back();
+      thread_state_pointers_.emplace_back(&thread_state_vector_[i].get_thread_state());
+    }
+    thread_states_array_ = thread_state_pointers_.data();
+  }
+
+  size_t max_threads() const override {
+    return max_threads_;
+  }
+
+  base::thread &thread_for(size_t id) override {
+    return thread_vector_[id];
+  }
+ private:
+  using thread_state_type = thread_state_static;
+  // thread_state_type is aligned at the cache line and therefore overaligned (C++ 11 does not require
+  // the new operator to obey alignments bigger than 16, cache lines are usually 64).
+  // To allow this object to be allocated using 'new' (which the vector does internally),
+  // we need to wrap it in a non-aligned object.
+  using thread_state_wrapper = base::alignment::cache_alignment_wrapper;
+
+  size_t max_threads_;
+  std::vector thread_vector_;
+  std::vector thread_state_vector_;
+  std::vector thread_state_pointers_;
+};
+
+}
+}
+}
+
+#endif // PLS_HEAP_SCHEDULER_MEMORY_H
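The comment closing heap_scheduler_memory.h is the crux of this file: C++11 only guarantees that plain operator new (which std::vector uses through its default allocator) honours fundamental alignment, usually 16 bytes, so a cache-line aligned thread_state cannot be stored in a vector directly. Below is a self-contained sketch of the wrapping technique the comment describes. manual_alignment_wrapper and cache_aligned_state are made-up names for illustration; this is not the actual pls cache_alignment_wrapper.

#include <cstddef>
#include <cstdint>
#include <new>
#include <vector>

constexpr std::size_t kCacheLineSize = 64;

// Stands in for thread_state_static: a type that requests cache-line alignment.
struct alignas(kCacheLineSize) cache_aligned_state {
  int value_{0};
};

// Stores an over-aligned T inside a default-aligned byte buffer, so that the
// wrapper itself can be allocated by 'new'/std::vector under C++11 rules.
template <typename T>
class manual_alignment_wrapper {
 public:
  manual_alignment_wrapper() { new (aligned_address()) T(); }
  ~manual_alignment_wrapper() { object().~T(); }
  // The payload lives inside this exact buffer, so copying/moving makes no sense.
  manual_alignment_wrapper(const manual_alignment_wrapper &) = delete;
  manual_alignment_wrapper &operator=(const manual_alignment_wrapper &) = delete;

  T &object() { return *static_cast<T *>(aligned_address()); }

 private:
  void *aligned_address() {
    // First address inside buffer_ that satisfies T's alignment.
    auto raw = reinterpret_cast<std::uintptr_t>(&buffer_[0]);
    auto aligned = (raw + alignof(T) - 1) & ~(static_cast<std::uintptr_t>(alignof(T)) - 1);
    return reinterpret_cast<void *>(aligned);
  }

  char buffer_[sizeof(T) + alignof(T)];  // only default alignment required here
};

int main() {
  // The vector only ever sees the plain wrapper, never the over-aligned type.
  std::vector<manual_alignment_wrapper<cache_aligned_state>> states(4);
  states[0].object().value_ = 42;
  return states[0].object().value_ == 42 ? 0 : 1;
}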
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h
index f165e91..1fb55ac 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler.h
@@ -2,8 +2,7 @@
 #ifndef PLS_SCHEDULER_H
 #define PLS_SCHEDULER_H
 
-#include
-#include
+#include
 
 #include "pls/internal/helpers/profiler.h"
 
@@ -12,18 +11,19 @@
 #include "pls/internal/scheduling/scheduler_memory.h"
 #include "pls/internal/scheduling/thread_state.h"
-#include "pls/internal/scheduling/task.h"
 
 namespace pls {
 namespace internal {
 namespace scheduling {
 
+struct task;
+
 /**
  * The scheduler is the central part of the dispatching-framework.
  * It manages a pool of worker threads (creates, sleeps/wakes up, destroys)
  * and allows to execute parallel sections.
  *
- * It works in close rellation with the 'task' class for scheduling.
+ * It works in close relation with the 'task' class for scheduling.
  */
 class scheduler {
  public:
@@ -54,9 +54,9 @@ class scheduler {
   void perform_work(Function work_section);
 
   template
-  static void spawn(Function &&lambda) {
-    thread_state::get().get_task_manager().spawn_child(std::forward(lambda));
-  }
+  static void spawn(Function &&lambda);
+
+  thread_state &thread_state_for(size_t id);
 
   /**
    * Explicitly terminate the worker threads. Scheduler must not be used after this.
@@ -68,7 +68,6 @@ class scheduler {
  private:
   static void work_thread_main_loop();
   void work_thread_work_section();
-  thread_state &thread_state_for(size_t id);
 
   const unsigned int num_threads_;
   const bool reuse_thread_;
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index 30414ee..feb6ca4 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -7,7 +7,9 @@
 #include "context_switcher/context_switcher.h"
 #include "context_switcher/continuation.h"
 
+#include "pls/internal/scheduling/task_manager.h"
 #include "pls/internal/scheduling/task.h"
+
 #include "pls/internal/helpers/profiler.h"
 
 namespace pls {
@@ -25,10 +27,12 @@ class scheduler::init_function_impl : public init_function {
   void run() override {
     auto &thread_state = thread_state::get();
     thread_state.get_task_manager().get_active_task().run_as_task([&](context_switcher::continuation cont) {
+      thread_state.set_main_continuation(std::move(cont));
       function_();
-      return std::move(cont);
+      thread_state.get_scheduler().work_section_done_.store(true);
+      return std::move(thread_state.get_main_continuation());
     });
-    thread_state.get_scheduler().work_section_done_.store(true);
+
   }
  private:
   F &function_;
@@ -55,6 +59,11 @@ void scheduler::perform_work(Function work_section) {
   }
 }
 
+template
+void scheduler::spawn(Function &&lambda) {
+  thread_state::get().get_task_manager().spawn_child(std::forward(lambda));
+}
+
 }
 }
 }
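scheduler.h now forward-declares struct task and pushes the spawn template down into scheduler_impl.h, the same cycle-breaking layout that task_manager.h/task_manager_impl.h adopt further below: the main header keeps only declarations, and template definitions that need the full task/task_manager types live in an *_impl.h included at the bottom of the header. A minimal, generic illustration of the idiom follows; widget, gadget and the file names in the comments are invented for the example and are not pls types.

#include <cstdio>

// --- widget.h: only a forward declaration of gadget is needed here ---
struct gadget;

class widget {
 public:
  template <typename Function>
  void run(Function &&callback);  // declared here, defined in 'widget_impl.h' below

  void attach(gadget *g) { gadget_ = g; }

 private:
  gadget *gadget_{nullptr};  // pointer to an incomplete type is fine
};

// --- gadget.h: the full definition, never included by widget.h ---
struct gadget {
  int value{21};
};

// --- widget_impl.h: included at the end of widget.h in the real layout ---
template <typename Function>
void widget::run(Function &&callback) {
  // The complete gadget type is visible here, so the template may use it.
  callback(gadget_->value * 2);
}

// --- usage ---
int main() {
  gadget g;
  widget w;
  w.attach(&g);
  w.run([](int answer) { std::printf("%d\n", answer); });  // prints 42
  return 0;
}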
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
index 6db97d3..f07ffca 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
@@ -1,16 +1,15 @@
 #ifndef PLS_SCHEDULER_MEMORY_H
 #define PLS_SCHEDULER_MEMORY_H
 
-#include
-
 #include "pls/internal/base/thread.h"
 
 #include "pls/internal/scheduling/thread_state.h"
-#include "pls/internal/scheduling/thread_state_static.h"
 
 namespace pls {
 namespace internal {
 namespace scheduling {
 
+// TODO: This way to handle memory is kind of a mess. We reworked it once...maybe it needs a second visit,
+// especially with the new 'staggered stack' (mmap allocs with page faults would be nice).
 class scheduler_memory {
   // Note: scheduler_memory is a pure interface and has no data.
   // By not having an initialization routine we can do our 'static and heap specialization'
@@ -27,70 +26,6 @@ class scheduler_memory {
   }
 };
 
-template
-class static_scheduler_memory : public scheduler_memory {
- public:
-  static_scheduler_memory() : scheduler_memory{} {
-    for (size_t i = 0; i < MAX_THREADS; i++) {
-      thread_state_pointers_[i] = &thread_states_[i].get_thread_state();
-    }
-    thread_states_array_ = thread_state_pointers_.data();
-  }
-
-  size_t max_threads() const override {
-    return MAX_THREADS;
-  }
-
-  base::thread &thread_for(size_t id) override {
-    return threads_[id];
-  }
- private:
-  using thread_state_type = thread_state_static;
-
-  alignas(base::system_details::CACHE_LINE_SIZE) std::array threads_;
-  alignas(base::system_details::CACHE_LINE_SIZE) std::array thread_states_;
-  alignas(base::system_details::CACHE_LINE_SIZE) std::array thread_state_pointers_;
-};
-
-template
-class heap_scheduler_memory : public scheduler_memory {
- public:
-  explicit heap_scheduler_memory(size_t max_threads) : max_threads_{max_threads},
-                                                       thread_vector_{},
-                                                       thread_state_vector_{},
-                                                       thread_state_pointers_{} {
-    thread_vector_.reserve(max_threads);
-    thread_state_vector_.reserve(max_threads);
-
-    for (size_t i = 0; i < max_threads; i++) {
-      thread_vector_.emplace_back();
-      thread_state_vector_.emplace_back();
-      thread_state_pointers_.emplace_back(&thread_state_vector_[i].get_thread_state());
-    }
-    thread_states_array_ = thread_state_pointers_.data();
-  }
-
-  size_t max_threads() const override {
-    return max_threads_;
-  }
-
-  base::thread &thread_for(size_t id) override {
-    return thread_vector_[id];
-  }
- private:
-  using thread_state_type = thread_state_static;
-  // thread_state_type is aligned at the cache line and therefore overaligned (C++ 11 does not require
-  // the new operator to obey alignments bigger than 16, cache lines are usually 64).
-  // To allow this object to be allocated using 'new' (which the vector does internally),
-  // we need to wrap it in an non aligned object.
-  using thread_state_wrapper = base::alignment::cache_alignment_wrapper;
-
-  size_t max_threads_;
-  std::vector thread_vector_;
-  std::vector thread_state_vector_;
-  std::vector thread_state_pointers_;
-};
-
 }
 }
 }
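The new TODO alludes to a 'staggered stack' where large stack regions are only reserved as address space and physical pages are committed lazily by page faults. pls does not do this yet; the snippet below merely demonstrates the underlying mmap behaviour the comment has in mind, assuming a Linux/POSIX system, and is not part of the library.

#include <sys/mman.h>

#include <cstddef>
#include <cstdio>

int main() {
  // Reserve a large stack region up front. With MAP_NORESERVE and demand paging,
  // physical memory is only committed when a page is first touched.
  constexpr std::size_t kStackBytes = 8 * 1024 * 1024;
  void *stack = mmap(nullptr, kStackBytes, PROT_READ | PROT_WRITE,
                     MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0);
  if (stack == MAP_FAILED) {
    std::perror("mmap");
    return 1;
  }

  // Touch only the first page: roughly one page of physical memory is used,
  // even though 8 MiB of address space are reserved for the stack.
  static_cast<char *>(stack)[0] = 1;

  munmap(stack, kStackBytes);
  return 0;
}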
diff --git a/lib/pls/include/pls/internal/scheduling/static_scheduler_memory.h b/lib/pls/include/pls/internal/scheduling/static_scheduler_memory.h
new file mode 100644
index 0000000..f19adb4
--- /dev/null
+++ b/lib/pls/include/pls/internal/scheduling/static_scheduler_memory.h
@@ -0,0 +1,43 @@
+#ifndef PLS_STATIC_SCHEDULER_MEMORY_H
+#define PLS_STATIC_SCHEDULER_MEMORY_H
+
+#include "pls/internal/base/thread.h"
+
+#include "pls/internal/scheduling/scheduler_memory.h"
+#include "pls/internal/scheduling/thread_state.h"
+#include "pls/internal/scheduling/thread_state_static.h"
+
+namespace pls {
+namespace internal {
+namespace scheduling {
+
+template
+class static_scheduler_memory : public scheduler_memory {
+ public:
+  static_scheduler_memory() : scheduler_memory{} {
+    for (size_t i = 0; i < MAX_THREADS; i++) {
+      thread_state_pointers_[i] = &thread_states_[i].get_thread_state();
+    }
+    thread_states_array_ = thread_state_pointers_.data();
+  }
+
+  size_t max_threads() const override {
+    return MAX_THREADS;
+  }
+
+  base::thread &thread_for(size_t id) override {
+    return threads_[id];
+  }
+ private:
+  using thread_state_type = thread_state_static;
+
+  alignas(base::system_details::CACHE_LINE_SIZE) std::array threads_;
+  alignas(base::system_details::CACHE_LINE_SIZE) std::array thread_states_;
+  alignas(base::system_details::CACHE_LINE_SIZE) std::array thread_state_pointers_;
+};
+
+}
+}
+}
+
+#endif // PLS_STATIC_SCHEDULER_MEMORY_H
diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h
index 8be6867..5664046 100644
--- a/lib/pls/include/pls/internal/scheduling/task.h
+++ b/lib/pls/include/pls/internal/scheduling/task.h
@@ -8,7 +8,7 @@
 #include "context_switcher/context_switcher.h"
 
 #include "pls/internal/base/system_details.h"
-
+#include "pls/internal/data_structures/stamped_integer.h"
 #include "pls/internal/scheduling/traded_cas_field.h"
 
 namespace pls {
@@ -36,34 +36,6 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task {
     thread_id_ = thread_id;
   }
 
-  unsigned get_thread_id() const {
-    return thread_id_;
-  }
-  void set_thread_id(unsigned thread_id) {
-    thread_id_ = thread_id;
-  }
-
-  task *get_prev() const {
-    return prev_;
-  }
-  void set_prev(task *prev) {
-    prev_ = prev;
-  }
-
-  task *get_next() const {
-    return next_;
-  }
-  void set_next(task *next) {
-    next_ = next;
-  }
-
-  task *get_parent_task() const {
-    return parent_task_;
-  }
-  void set_parent_task(task *parent_task) {
-    parent_task_ = parent_task;
-  }
-
   context_switcher::continuation get_continuation() {
     return std::move(continuation_);
   }
@@ -76,7 +48,7 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task {
     return context_switcher::enter_context(stack_memory_, stack_size_, std::forward(lambda));
   }
 
-  // TODO: Proper access control
+  // TODO: Proper access control and split it up into responsibilities
   // Stack/Continuation Management
   char *stack_memory_;
   size_t stack_size_;
@@ -84,6 +56,8 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task {
 
   // Work-Stealing
   std::atomic traded_field_{};
+  task *resource_stack_next_{};
+  std::atomic resource_stack_root_{{0, 0}};
 
   // Task Tree (we have a parent that we want to continue when we finish)
   task *parent_task_;
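task itself stays alignas(base::system_details::CACHE_LINE_SIZE), so the hot per-task fields (the traded CAS field and the new resource stack links) of neighbouring tasks never share a cache line. A tiny stand-alone demonstration of what such an alignment request does to object size and spacing, independent of the pls types:

#include <cstddef>
#include <cstdio>

constexpr std::size_t kCacheLineSize = 64;

struct alignas(kCacheLineSize) padded_counter {
  long value_{0};  // padded up to a full cache line by the alignas request
};

int main() {
  padded_counter counters[2];
  std::printf("sizeof=%zu alignof=%zu\n", sizeof(padded_counter), alignof(padded_counter));
  // Adjacent array elements start at least one cache line apart, so two threads
  // hammering counters[0] and counters[1] do not falsely share a line.
  std::printf("distance=%td\n",
              reinterpret_cast<const char *>(&counters[1]) - reinterpret_cast<const char *>(&counters[0]));
  return 0;
}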
diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h
index 85c1558..d1462b2 100644
--- a/lib/pls/include/pls/internal/scheduling/task_manager.h
+++ b/lib/pls/include/pls/internal/scheduling/task_manager.h
@@ -6,8 +6,6 @@
 #include
 #include
 
-#include "context_switcher/continuation.h"
-
 #include "pls/internal/scheduling/task.h"
 #include "pls/internal/scheduling/external_trading_deque.h"
@@ -27,28 +25,18 @@ class task_manager {
                data_structures::aligned_stack static_stack_space,
                size_t num_tasks,
                size_t stack_size,
-               external_trading_deque &deque) : num_tasks_{num_tasks},
-                                                this_thread_tasks_{tasks},
-                                                active_task_{&tasks[0]},
-                                                deque_{deque} {
-    for (size_t i = 0; i < num_tasks - 1; i++) {
-      tasks[i].init(static_stack_space.push_bytes(stack_size), stack_size, i, 0);
-      if (i > 0) {
-        tasks[i].set_prev(&tasks[i - 1]);
-      }
-      if (i < num_tasks - 2) {
-        tasks[i].set_next(&tasks[i + 1]);
-      }
-    }
-  }
+               external_trading_deque &deque);
 
-  task &get_this_thread_task(size_t depth) {
-    return this_thread_tasks_[depth];
+  void push_resource_on_task(task *target_task, task *spare_task_chain);
+  task* pop_resource_from_task(task *target_task);
+
+  task *get_this_thread_task(size_t depth) {
+    return &this_thread_tasks_[depth];
   }
 
   void set_thread_id(unsigned id) {
     for (size_t i = 0; i < num_tasks_; i++) {
-      this_thread_tasks_[i].set_thread_id(id);
+      this_thread_tasks_[i].thread_id_ = id;
     }
   }
@@ -57,30 +45,7 @@ class task_manager {
   }
 
   template
-  void spawn_child(F &&lambda) {
-    // TODO: Here is some potential for optimization. We could try placing everything manually on the stack.
-    active_task_->get_next()->run_as_task([lambda, this](context_switcher::continuation cont) {
-      auto *last_task = active_task_;
-      auto *this_task = active_task_->get_next();
-
-      last_task->set_continuation(std::move(cont));
-      active_task_ = this_task;
-
-      traded_cas_field expected_cas_value = deque_.push_bot(active_task_);
-      traded_cas_field empty_cas;
-
-      lambda();
-
-      if (active_task_->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) {
-        active_task_ = last_task;
-        deque_.popped_bot();
-        return std::move(last_task->get_continuation());
-      } else {
-        deque_.empty_deque();
-        PLS_ERROR("Slow Path/Stealing not implemented!");
-      }
-    });
-  }
+  void spawn_child(F &&lambda);
 
  private:
  size_t num_tasks_;
@@ -113,4 +78,6 @@ class static_task_manager {
 }
 }
 }
+#include "task_manager_impl.h"
+
 #endif //PLS_TASK_MANAGER_H_
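push_resource_on_task and pop_resource_from_task are only declared here; their definitions live in the new src/internal/scheduling/task_manager.cpp, which this diff does not show. Together with task::resource_stack_next_ and the stamped resource_stack_root_ added above, they suggest an intrusive stack whose head carries an ABA stamp. The sketch below shows that general technique with invented types (node, stamped_head, intrusive_stack); it is an assumption about the shape of the implementation, not the pls code.

#include <atomic>
#include <cstdint>
#include <cstdio>
#include <vector>

struct node {
  std::uint32_t next_{0};  // index of the next node plus one, 0 means 'none'
  int payload_{0};
};

// Head of the stack: a version stamp plus the top index, updated in one CAS.
struct stamped_head {
  std::uint32_t stamp_;
  std::uint32_t top_;  // index plus one, 0 means empty
};

class intrusive_stack {
 public:
  explicit intrusive_stack(std::vector<node> &nodes) : nodes_{nodes} {}

  void push(std::uint32_t index) {
    stamped_head expected = head_.load();
    stamped_head desired;
    do {
      nodes_[index].next_ = expected.top_;   // link below the current top
      desired.stamp_ = expected.stamp_ + 1;  // bump the stamp on every update
      desired.top_ = index + 1;
    } while (!head_.compare_exchange_weak(expected, desired));
  }

  // Returns index plus one, or 0 if the stack is empty.
  std::uint32_t pop() {
    stamped_head expected = head_.load();
    stamped_head desired;
    do {
      if (expected.top_ == 0) return 0;
      desired.stamp_ = expected.stamp_ + 1;
      desired.top_ = nodes_[expected.top_ - 1].next_;
    } while (!head_.compare_exchange_weak(expected, desired));
    return expected.top_;
  }

 private:
  std::vector<node> &nodes_;
  std::atomic<stamped_head> head_{{0, 0}};
};

int main() {
  std::vector<node> nodes(4);
  intrusive_stack stack{nodes};
  nodes[2].payload_ = 7;
  stack.push(2);
  std::uint32_t popped = stack.pop();
  std::printf("%d\n", popped ? nodes[popped - 1].payload_ : -1);  // prints 7
  return 0;
}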
diff --git a/lib/pls/include/pls/internal/scheduling/task_manager_impl.h b/lib/pls/include/pls/internal/scheduling/task_manager_impl.h
new file mode 100644
index 0000000..8439bc1
--- /dev/null
+++ b/lib/pls/include/pls/internal/scheduling/task_manager_impl.h
@@ -0,0 +1,93 @@
+
+#ifndef PLS_TASK_MANAGER_IMPL_H_
+#define PLS_TASK_MANAGER_IMPL_H_
+
+#include
+#include
+#include
+
+#include "context_switcher/continuation.h"
+
+#include "pls/internal/scheduling/scheduler.h"
+#include "pls/internal/scheduling/task.h"
+#include "pls/internal/scheduling/thread_state.h"
+
+namespace pls {
+namespace internal {
+namespace scheduling {
+
+template
+void task_manager::spawn_child(F &&lambda) {
+  auto continuation = active_task_->next_->run_as_task([lambda, this](context_switcher::continuation cont) {
+    auto *last_task = active_task_;
+    auto *this_task = active_task_->next_;
+
+    last_task->continuation_ = std::move(cont);
+    active_task_ = this_task;
+
+    traded_cas_field expected_cas_value = deque_.push_bot(active_task_);
+    traded_cas_field empty_cas;
+
+    lambda();
+
+    if (active_task_->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) {
+      // Fast path, simply continue execution where we left off before the spawn.
+      // This requires no coordination with the resource stack.
+      active_task_ = last_task;
+      deque_.popped_bot();
+      return std::move(last_task->continuation_);
+    } else {
+      // Slow path, the continuation was stolen.
+      // First empty our own deque (everything below must have been stolen already).
+      deque_.empty_deque();
+
+      // TODO: This whole process can be taken out into a function, no need to copy it in every lambda.
+      // Also split smaller functions out, this is a mess right now...
+      // Try to get a clean resource chain to go back to the main stealing loop
+      task *clean_chain = pop_resource_from_task(last_task);
+      if (clean_chain == nullptr) {
+        // Double-check if we are really the last one, or if we only have unlucky timing.
+        auto cas_field = last_task->traded_field_.load();
+        if (cas_field.is_filled_with_object()) {
+          traded_cas_field empty_target;
+          if (last_task->traded_field_.compare_exchange_strong(cas_field, empty_target)) {
+            clean_chain = cas_field.get_trade_object();
+          } else {
+            clean_chain = pop_resource_from_task(last_task);
+          }
+        }
+      }
+
+      if (clean_chain != nullptr) {
+        // We got a clean chain to continue working on.
+        PLS_ASSERT(active_task_->prev_->depth_ == clean_chain->depth_,
+                   "Resources must only reside in the correct depth!");
+        active_task_->prev_ = clean_chain;
+        // Walk back chain to make first task active
+        while (active_task_->prev_ != nullptr) {
+          active_task_ = active_task_->prev_;
+        }
+
+        // jump back to continuation in main scheduling loop, time to steal some work
+        return std::move(thread_state::get().get_main_continuation());
+      } else {
+        // We are the last one working on this task. Thus the sync must be finished, continue working.
+        active_task_ = last_task;
+        return std::move(last_task->continuation_);
+      }
+    }
+
+    PLS_ERROR("Slow Path/Stealing not implemented!");
+  });
+
+  if (continuation.valid()) {
+    // We jumped in here from the main loop, keep track!
+    thread_state::get().set_main_continuation(std::move(continuation));
+  }
+}
+
+}
+}
+}
+
+#endif //PLS_TASK_MANAGER_IMPL_H_
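The slow path above resolves a race between the owner of last_task and the thread that stole its continuation: first try the resource stack, then double-check the traded field and attempt to empty it with one compare_exchange_strong, and if that CAS is lost, the object is guaranteed by the trading protocol to show up on the resource stack instead. The reduced model below captures only that claim-exactly-once pattern with plain atomic pointers; it is a simplification, not the pls traded_cas_field protocol itself.

#include <atomic>
#include <cstdio>

struct traded_object { int id_; };

std::atomic<traded_object *> traded_slot{nullptr};    // stands in for task::traded_field_
std::atomic<traded_object *> fallback_slot{nullptr};  // stands in for the resource stack

traded_object *claim() {
  // 1) Try the fallback location first (mirrors pop_resource_from_task above).
  if (traded_object *from_fallback = fallback_slot.exchange(nullptr)) {
    return from_fallback;
  }
  // 2) Double-check the traded slot and try to empty it atomically.
  traded_object *seen = traded_slot.load();
  if (seen != nullptr) {
    if (traded_slot.compare_exchange_strong(seen, nullptr)) {
      return seen;  // we won the race and now own the object
    }
    // 3) We lost the race: by protocol the other side has already moved the
    //    object to the fallback location, so a second look there succeeds.
    return fallback_slot.exchange(nullptr);
  }
  return nullptr;  // nothing was handed to us at all
}

int main() {
  traded_object traded{7};
  traded_slot.store(&traded);
  traded_object *claimed = claim();
  std::printf("%d\n", claimed ? claimed->id_ : -1);  // prints 7
  return 0;
}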
diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h
index 32eda14..82f425e 100644
--- a/lib/pls/include/pls/internal/scheduling/thread_state.h
+++ b/lib/pls/include/pls/internal/scheduling/thread_state.h
@@ -4,14 +4,16 @@
 #include
 #include
+#include
 
-#include "pls/internal/scheduling/task_manager.h"
+#include "context_switcher/continuation.h"
 
 namespace pls {
 namespace internal {
 namespace scheduling {
 
 class scheduler;
+class task_manager;
 struct task;
 
 struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
@@ -21,6 +23,7 @@
   task_manager &task_manager_;
 
   alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_;
+  alignas(base::system_details::CACHE_LINE_SIZE) context_switcher::continuation main_loop_continuation_;
   alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
 
  public:
@@ -43,7 +46,6 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
   unsigned get_id() { return id_; }
   void set_id(unsigned id) {
     id_ = id;
-    task_manager_.set_thread_id(id);
   }
   task_manager &get_task_manager() { return task_manager_; }
   scheduler &get_scheduler() { return *scheduler_; }
@@ -54,6 +56,13 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
     return random_();
   }
 
+  void set_main_continuation(context_switcher::continuation &&continuation) {
+    main_loop_continuation_ = std::move(continuation);
+  }
+  context_switcher::continuation get_main_continuation() {
+    return std::move(main_loop_continuation_);
+  }
+
   // Do not allow move/copy operations.
   // State is a pure memory container with references/pointers into it from all over the code.
   // It should be allocated, used and de-allocated, nothing more.
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index ae2036c..479c724 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -1,9 +1,10 @@
 #include "pls/internal/scheduling/scheduler.h"
+
+#include "pls/internal/scheduling/task_manager.h"
 #include "pls/internal/scheduling/thread_state.h"
 
 #include "pls/internal/base/thread.h"
 #include "pls/internal/base/error_handling.h"
-#include "pls/internal/helpers/profiler.h"
 
 namespace pls {
 namespace internal {
@@ -23,6 +24,7 @@ scheduler::scheduler(scheduler_memory &memory, const unsigned int num_threads, b
     // Placement new is required, as the memory of `memory_` is not required to be initialized.
     memory.thread_state_for(i).set_scheduler(this);
     memory.thread_state_for(i).set_id(i);
+    memory.thread_state_for(i).get_task_manager().set_thread_id(i);
 
     if (reuse_thread && i == 0) {
       continue;  // Skip over first/main thread when re-using the users thread, as this one will replace the first one.
diff --git a/lib/pls/src/internal/scheduling/task.cpp b/lib/pls/src/internal/scheduling/task.cpp
index 5e78b37..4b9b0e3 100644
--- a/lib/pls/src/internal/scheduling/task.cpp
+++ b/lib/pls/src/internal/scheduling/task.cpp
@@ -1,8 +1,4 @@
-#include "pls/internal/helpers/profiler.h"
-
-#include "pls/internal/scheduling/scheduler.h"
 #include "pls/internal/scheduling/task.h"
-#include "pls/internal/scheduling/thread_state.h"
 
 namespace pls {
 namespace internal {
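For orientation, this is roughly how the reworked pieces are meant to be used together, mirroring app/benchmark_fib/main.cpp, which now includes static_scheduler_memory.h. The template argument list of static_scheduler_memory and the third scheduler constructor parameter are not visible in this diff, so both are assumptions; treat this as a sketch rather than a verbatim example.

#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/static_scheduler_memory.h"

using namespace pls::internal::scheduling;

// Assumed parameter list: the real static_scheduler_memory may take further
// compile-time limits (e.g. task counts and stack sizes) besides the thread count.
static static_scheduler_memory<8> global_scheduler_memory;

int main() {
  // Only the first two constructor parameters (memory and num_threads) are visible
  // in scheduler.cpp above; anything after them is assumed to be defaulted.
  scheduler my_scheduler{global_scheduler_memory, 8};

  my_scheduler.perform_work([]() {
    scheduler::spawn([]() {
      // first child task
    });
    scheduler::spawn([]() {
      // second child task
    });
  });

  return 0;
}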