From f9e6fc51a542bc0f82e02e3254d14eaabb45ccca Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Thu, 23 Apr 2020 16:23:00 +0200 Subject: [PATCH] WIP: separate deque/stealing/trading logic from general scheduler logic. --- app/benchmark_matrix/main.cpp | 2 +- extern/benchmark_runner/benchmark_runner.h | 8 +++++--- lib/pls/CMakeLists.txt | 14 +++++++++----- lib/pls/include/pls/internal/helpers/tsan_fiber_api.h | 21 --------------------- lib/pls/include/pls/internal/scheduling/base_task.h | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/scheduling/external_trading_deque.h | 106 ---------------------------------------------------------------------------------------------------------- lib/pls/include/pls/internal/scheduling/heap_scheduler_memory.h | 59 ----------------------------------------------------------- lib/pls/include/pls/internal/scheduling/lock_free/external_trading_deque.h | 105 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/scheduling/lock_free/task.h | 23 +++++++++++++++++++++++ lib/pls/include/pls/internal/scheduling/lock_free/task_manager.h | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/scheduling/lock_free/traded_cas_field.h | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/scheduling/scheduler.h | 20 +++++++++++++------- lib/pls/include/pls/internal/scheduling/scheduler_impl.h | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++------ lib/pls/include/pls/internal/scheduling/task.h | 74 -------------------------------------------------------------------------- lib/pls/include/pls/internal/scheduling/task_manager.h | 95 ++++++++++++++++++++--------------------------------------------------------------------------- lib/pls/include/pls/internal/scheduling/task_manager_impl.h | 78 
------------------------------------------------------------------------------ lib/pls/include/pls/internal/scheduling/thread_state.h | 16 +++++++++++----- lib/pls/include/pls/internal/scheduling/traded_cas_field.h | 84 ------------------------------------------------------------------------------------ lib/pls/src/internal/scheduling/base_task.cpp | 9 +++++++++ lib/pls/src/internal/scheduling/external_trading_deque.cpp | 136 ---------------------------------------------------------------------------------------------------------------------------------------- lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp | 137 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/src/internal/scheduling/lock_free/task_manager.cpp | 178 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/src/internal/scheduling/scheduler.cpp | 170 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------------------- lib/pls/src/internal/scheduling/task.cpp | 9 --------- lib/pls/src/internal/scheduling/task_manager.cpp | 241 ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- test/scheduling_tests.cpp | 5 +++-- 26 files changed, 906 insertions(+), 952 deletions(-) delete mode 100644 lib/pls/include/pls/internal/helpers/tsan_fiber_api.h create mode 100644 lib/pls/include/pls/internal/scheduling/base_task.h delete mode 100644 lib/pls/include/pls/internal/scheduling/external_trading_deque.h delete mode 100644 lib/pls/include/pls/internal/scheduling/heap_scheduler_memory.h 
create mode 100644 lib/pls/include/pls/internal/scheduling/lock_free/external_trading_deque.h create mode 100644 lib/pls/include/pls/internal/scheduling/lock_free/task.h create mode 100644 lib/pls/include/pls/internal/scheduling/lock_free/task_manager.h create mode 100644 lib/pls/include/pls/internal/scheduling/lock_free/traded_cas_field.h delete mode 100644 lib/pls/include/pls/internal/scheduling/task.h delete mode 100644 lib/pls/include/pls/internal/scheduling/task_manager_impl.h delete mode 100644 lib/pls/include/pls/internal/scheduling/traded_cas_field.h create mode 100644 lib/pls/src/internal/scheduling/base_task.cpp delete mode 100644 lib/pls/src/internal/scheduling/external_trading_deque.cpp create mode 100644 lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp create mode 100644 lib/pls/src/internal/scheduling/lock_free/task_manager.cpp delete mode 100644 lib/pls/src/internal/scheduling/task.cpp delete mode 100644 lib/pls/src/internal/scheduling/task_manager.cpp diff --git a/app/benchmark_matrix/main.cpp b/app/benchmark_matrix/main.cpp index 6bc24c8..5d69b36 100644 --- a/app/benchmark_matrix/main.cpp +++ b/app/benchmark_matrix/main.cpp @@ -21,7 +21,7 @@ class pls_matrix : public matrix::matrix { }; constexpr int MAX_NUM_TASKS = 32; -constexpr int MAX_STACK_SIZE = 1024 * 1; +constexpr int MAX_STACK_SIZE = 4096 * 1; int main(int argc, char **argv) { int num_threads; diff --git a/extern/benchmark_runner/benchmark_runner.h b/extern/benchmark_runner/benchmark_runner.h index 7a9ef2b..37a2ac6 100644 --- a/extern/benchmark_runner/benchmark_runner.h +++ b/extern/benchmark_runner/benchmark_runner.h @@ -32,15 +32,17 @@ class benchmark_runner { } public: - benchmark_runner(string csv_path, string csv_name) : csv_path_{std::move(csv_path)}, - csv_name_{std::move(csv_name)}, - times_{} { + benchmark_runner(string csv_path, string csv_name, int num_measurements = 10000) : csv_path_{std::move(csv_path)}, + csv_name_{std::move(csv_name)}, + times_{} { 
string command = "mkdir -p " + csv_path_; int res = system(command.c_str()); if (res) { cout << "Error while creating directory!" << endl; exit(1); } + + times_.reserve(num_measurements); } static void read_args(int argc, char **argv, int &num_threads, string &path) { diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 7c967fc..b0fec82 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -1,5 +1,6 @@ # List all required files here (cmake best practice to NOT automate this step!) add_library(pls STATIC + include/pls/algorithms/loop_partition_strategy.h include/pls/algorithms/for_each.h include/pls/algorithms/for_each_impl.h include/pls/algorithms/invoke.h include/pls/algorithms/invoke_impl.h include/pls/algorithms/loop_partition_strategy.h @@ -30,12 +31,15 @@ add_library(pls STATIC include/pls/internal/helpers/seqence.h include/pls/internal/helpers/member_function.h - include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp include/pls/internal/scheduling/scheduler.h include/pls/internal/scheduling/scheduler_impl.h src/internal/scheduling/scheduler.cpp - include/pls/internal/scheduling/task_manager.h include/pls/internal/scheduling/task_manager_impl.h src/internal/scheduling/task_manager.cpp - include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp - include/pls/internal/scheduling/external_trading_deque.h src/internal/scheduling/external_trading_deque.cpp - include/pls/internal/scheduling/traded_cas_field.h include/pls/algorithms/loop_partition_strategy.h) + include/pls/internal/scheduling/base_task.h src/internal/scheduling/base_task.cpp + include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp + include/pls/internal/scheduling/task_manager.h + + include/pls/internal/scheduling/lock_free/task.h + include/pls/internal/scheduling/lock_free/task_manager.h src/internal/scheduling/lock_free/task_manager.cpp + 
include/pls/internal/scheduling/lock_free/external_trading_deque.h src/internal/scheduling/lock_free/external_trading_deque.cpp + include/pls/internal/scheduling/lock_free/traded_cas_field.h) # Dependencies for pls target_link_libraries(pls Threads::Threads) diff --git a/lib/pls/include/pls/internal/helpers/tsan_fiber_api.h b/lib/pls/include/pls/internal/helpers/tsan_fiber_api.h deleted file mode 100644 index 04df6f0..0000000 --- a/lib/pls/include/pls/internal/helpers/tsan_fiber_api.h +++ /dev/null @@ -1,21 +0,0 @@ - -#ifndef PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_HELPERS_TSAN_FIBER_API_H_ -#define PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_HELPERS_TSAN_FIBER_API_H_ - -extern "C" { -// Fiber switching API. -// - TSAN context for fiber can be created by __tsan_create_fiber -// and freed by __tsan_destroy_fiber. -// - TSAN context of current fiber or thread can be obtained -// by calling __tsan_get_current_fiber. -// - __tsan_switch_to_fiber should be called immediatly before switch -// to fiber, such as call of swapcontext. -// - Fiber name can be set by __tsan_set_fiber_name. 
-void *__tsan_get_current_fiber(void); -void *__tsan_create_fiber(unsigned flags); -void __tsan_destroy_fiber(void *fiber); -void __tsan_switch_to_fiber(void *fiber, unsigned flags); -void __tsan_set_fiber_name(void *fiber, const char *name); -}; - -#endif //PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_HELPERS_TSAN_FIBER_API_H_ diff --git a/lib/pls/include/pls/internal/scheduling/base_task.h b/lib/pls/include/pls/internal/scheduling/base_task.h new file mode 100644 index 0000000..5cbe16e --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/base_task.h @@ -0,0 +1,66 @@ +#ifndef PLS_BASE_TASK_H +#define PLS_BASE_TASK_H + +#include +#include + +#include "context_switcher/continuation.h" +#include "context_switcher/context_switcher.h" + +namespace pls::internal::scheduling { +/** + * A task is the smallest unit of execution seen by the runtime system. + * + * Tasks represent a action dispatched by a potentially parallel call. + * Tasks have their own execution context (stack and register state), making them stackefull coroutines. + * Tasks can be suspended and resumed (stealing happens by resuming a task). + * + * Being coroutines tasks go through a very deliberate state machine: + * - initialized (no execution state) + * - running (currently executing user code) + * - suspended (suspended by switching to a different task). + * + * This base_task can be extended by different trading/stealing implementations, + * to add for example additional flags. The scheduler itself always works solely with this base version. + */ +struct base_task { + base_task(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) : + depth_{depth}, + thread_id_{thread_id}, + stack_memory_{stack_memory}, + stack_size_{stack_size}, + is_synchronized_{false}, + prev_{nullptr}, + next_{nullptr} {} + + // Do not allow accidental copy/move operations. + // The whole runtime relies on tasks never changing memory positions during execution. 
+ // Create tasks ONCE and use them until the runtime is shut down. + base_task(const base_task &other) = delete; + base_task(base_task &&other) = delete; + base_task &operator=(const base_task &other) = delete; + base_task &operator=(base_task &&other) = delete; + + template + context_switcher::continuation run_as_task(F &&lambda) { + return context_switcher::enter_context(stack_memory_, stack_size_, std::forward(lambda)); + } + + // General task information + unsigned depth_; + unsigned thread_id_; + + // Stack/continuation management + char *stack_memory_; + size_t stack_size_; + context_switcher::continuation continuation_; + bool is_synchronized_; + + // Linked list for trading/memory management + base_task *prev_; + base_task *next_; +}; + +} + +#endif //PLS_BASE_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/external_trading_deque.h b/lib/pls/include/pls/internal/scheduling/external_trading_deque.h deleted file mode 100644 index e873fe1..0000000 --- a/lib/pls/include/pls/internal/scheduling/external_trading_deque.h +++ /dev/null @@ -1,106 +0,0 @@ - -#ifndef PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_ -#define PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_ - -#include -#include -#include - -#include "pls/internal/base/error_handling.h" -#include "pls/internal/base/system_details.h" - -#include "pls/internal/data_structures/optional.h" -#include "pls/internal/data_structures/stamped_integer.h" - -#include "pls/internal/scheduling/traded_cas_field.h" -#include "pls/internal/scheduling/task.h" - -namespace pls::internal::scheduling { - -using namespace data_structures; - -struct trading_deque_entry { - std::atomic traded_task_{nullptr}; - std::atomic forwarding_stamp_{}; -}; - -/** - * A work stealing deque (single produces/consumer at the end, multiple consumers at the start). - * A task object can only be acquired by stealing consumers (from the start), - * when they also offer a task to trade in for it. 
- * - * The exchange of 'goods' (here tasks) happens atomically at a linearization point. - * This means that the owning thread always gets a tasks for each and every task that was - * successfully stolen. - * - * As each task is associated with memory this suffices to exchange memory blocks needed for execution. - */ -class external_trading_deque { - public: - external_trading_deque(unsigned thread_id, size_t num_entries) : thread_id_(thread_id), entries_(num_entries) {} - - static optional peek_traded_object(task *target_task); - static optional get_trade_object(task *target_task); - - /** - * Pushes a task on the bottom of the deque. - * The task itself wil be filled with the unique, synchronizing cas word. - * - * @param published_task The task to publish on the bottom of the deque. - */ - void push_bot(task *published_task); - - /** - * Tries to pop the last task on the deque. - * - * @return optional holding the popped task if successful. - */ - optional pop_bot(); - - struct peek_result { - peek_result(optional top_task, stamped_integer top_pointer) : top_task_{std::move(top_task)}, - top_pointer_{top_pointer} {}; - optional top_task_; - stamped_integer top_pointer_; - }; - - /** - * Peek at the current task on top of the deque. - * This is required, as we need to look at the task to figure out what we trade in for it. - * (Note: we could go without this by doing some tricks with top/bot pointers, but this - * is simpler and also more flexible if the traded objects are not as trivial as currently). - * - * @return a peek result containing the optional top task (if present) and the current head pointer. - */ - peek_result peek_top(); - - /** - * Tries to pop the task on top of the deque that was - * previously observed by 'peeking' at the deque. - * - * Returns the task if successful, returns nothing if - * either the peeked task is no longer at the top of the deque - * or another thread interfered and 'won' the task. 
- * - * @return optional holding the popped task if successful. - */ - optional pop_top(task *offered_task, peek_result peek_result); - - private: - void reset_bot_and_top(); - void decrease_bot(); - - // info on this deque - unsigned thread_id_; - std::vector entries_; - - // fields for stealing/interacting - stamped_integer bot_internal_{0, 0}; - - PLS_CACHE_ALIGN std::atomic top_{{0, 0}}; - PLS_CACHE_ALIGN std::atomic bot_{0}; -}; - -} - -#endif //PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_ diff --git a/lib/pls/include/pls/internal/scheduling/heap_scheduler_memory.h b/lib/pls/include/pls/internal/scheduling/heap_scheduler_memory.h deleted file mode 100644 index 07c44c2..0000000 --- a/lib/pls/include/pls/internal/scheduling/heap_scheduler_memory.h +++ /dev/null @@ -1,59 +0,0 @@ -#ifndef PLS_HEAP_SCHEDULER_MEMORY_H -#define PLS_HEAP_SCHEDULER_MEMORY_H - -#include - -#include "pls/internal/base/thread.h" - -#include "pls/internal/scheduling/scheduler_memory.h" -#include "pls/internal/scheduling/thread_state.h" -#include "pls/internal/scheduling/thread_state_static.h" - -namespace pls { -namespace internal { -namespace scheduling { - -template -class heap_scheduler_memory : public scheduler_memory { - public: - explicit heap_scheduler_memory(size_t max_threads) : max_threads_{max_threads}, - thread_vector_{}, - thread_state_vector_{}, - thread_state_pointers_{} { - thread_vector_.reserve(max_threads); - thread_state_vector_.reserve(max_threads); - - for (size_t i = 0; i < max_threads; i++) { - thread_vector_.emplace_back(); - thread_state_vector_.emplace_back(); - thread_state_pointers_.emplace_back(&thread_state_vector_[i].get_thread_state()); - } - thread_states_array_ = thread_state_pointers_.data(); - } - - size_t max_threads() const override { - return max_threads_; - } - - base::thread &thread_for(size_t id) override { - return thread_vector_[id]; - } - private: - using thread_state_type = thread_state_static; - // thread_state_type is aligned at the cache 
line and therefore overaligned (C++ 11 does not require - // the new operator to obey alignments bigger than 16, cache lines are usually 64). - // To allow this object to be allocated using 'new' (which the vector does internally), - // we need to wrap it in an non aligned object. - using thread_state_wrapper = base::alignment::cache_alignment_wrapper; - - size_t max_threads_; - std::vector thread_vector_; - std::vector thread_state_vector_; - std::vector thread_state_pointers_; -}; - -} -} -} - -#endif // PLS_HEOP_SCHEDULER_MEMORY_H diff --git a/lib/pls/include/pls/internal/scheduling/lock_free/external_trading_deque.h b/lib/pls/include/pls/internal/scheduling/lock_free/external_trading_deque.h new file mode 100644 index 0000000..75b95e2 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/lock_free/external_trading_deque.h @@ -0,0 +1,105 @@ + +#ifndef PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_ +#define PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_ + +#include +#include +#include + +#include "pls/internal/base/error_handling.h" +#include "pls/internal/base/system_details.h" + +#include "pls/internal/data_structures/optional.h" +#include "pls/internal/data_structures/stamped_integer.h" + +#include "pls/internal/scheduling/lock_free/task.h" + +namespace pls::internal::scheduling::lock_free { + +using namespace data_structures; + +struct trading_deque_entry { + std::atomic traded_task_{nullptr}; + std::atomic forwarding_stamp_{}; +}; + +/** + * A work stealing deque (single produces/consumer at the end, multiple consumers at the start). + * A task object can only be acquired by stealing consumers (from the start), + * when they also offer a task to trade in for it. + * + * The exchange of 'goods' (here tasks) happens atomically at a linearization point. + * This means that the owning thread always gets a tasks for each and every task that was + * successfully stolen. 
+ * + * As each task is associated with memory this suffices to exchange memory blocks needed for execution. + */ +class external_trading_deque { + public: + external_trading_deque(unsigned thread_id, size_t num_entries) : thread_id_(thread_id), entries_(num_entries) {} + + static optional peek_traded_object(task *target_task); + static optional get_trade_object(task *target_task); + + /** + * Pushes a task on the bottom of the deque. + * The task itself wil be filled with the unique, synchronizing cas word. + * + * @param published_task The task to publish on the bottom of the deque. + */ + void push_bot(task *published_task); + + /** + * Tries to pop the last task on the deque. + * + * @return optional holding the popped task if successful. + */ + optional pop_bot(); + + struct peek_result { + peek_result(optional top_task, stamped_integer top_pointer) : top_task_{std::move(top_task)}, + top_pointer_{top_pointer} {}; + optional top_task_; + stamped_integer top_pointer_; + }; + + /** + * Peek at the current task on top of the deque. + * This is required, as we need to look at the task to figure out what we trade in for it. + * (Note: we could go without this by doing some tricks with top/bot pointers, but this + * is simpler and also more flexible if the traded objects are not as trivial as currently). + * + * @return a peek result containing the optional top task (if present) and the current head pointer. + */ + peek_result peek_top(); + + /** + * Tries to pop the task on top of the deque that was + * previously observed by 'peeking' at the deque. + * + * Returns the task if successful, returns nothing if + * either the peeked task is no longer at the top of the deque + * or another thread interfered and 'won' the task. + * + * @return optional holding the popped task if successful. 
+ */ + optional pop_top(task *offered_task, peek_result peek_result); + + private: + void reset_bot_and_top(); + void decrease_bot(); + + // info on this deque + unsigned thread_id_; + std::vector entries_; + + // fields for stealing/interacting + stamped_integer bot_internal_{0, 0}; + + PLS_CACHE_ALIGN std::atomic top_{{0, 0}}; + PLS_CACHE_ALIGN std::atomic bot_{0}; +}; + +} + +#endif //PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_ diff --git a/lib/pls/include/pls/internal/scheduling/lock_free/task.h b/lib/pls/include/pls/internal/scheduling/lock_free/task.h new file mode 100644 index 0000000..50d4b41 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/lock_free/task.h @@ -0,0 +1,23 @@ + +#ifndef PLS_LOCK_FREE_TASK_H_ +#define PLS_LOCK_FREE_TASK_H_ + +#include "pls/internal/scheduling/base_task.h" +#include "pls/internal/data_structures/stamped_integer.h" +#include "pls/internal/scheduling/lock_free/traded_cas_field.h" + +namespace pls::internal::scheduling::lock_free { + +struct task : public base_task { + task(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) : + base_task(stack_memory, stack_size, depth, thread_id) {} + + // Additional info for lock-free stealing and resource trading. 
+ std::atomic external_trading_deque_cas_{}; + std::atomic resource_stack_next_{}; + std::atomic resource_stack_root_{{0, 0}}; +}; + +} + +#endif //PLS_LOCK_FREE_TASK_H_ diff --git a/lib/pls/include/pls/internal/scheduling/lock_free/task_manager.h b/lib/pls/include/pls/internal/scheduling/lock_free/task_manager.h new file mode 100644 index 0000000..013a2d2 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/lock_free/task_manager.h @@ -0,0 +1,59 @@ + +#ifndef PLS_LOCK_FREE_TASK_MANAGER_H_ +#define PLS_LOCK_FREE_TASK_MANAGER_H_ + +#include +#include +#include + +#include "pls/internal/base/stack_allocator.h" + +#include "pls/internal/scheduling/lock_free/task.h" +#include "pls/internal/scheduling/lock_free/external_trading_deque.h" + +namespace pls::internal::scheduling { +struct thread_state; +} + +namespace pls::internal::scheduling::lock_free { + +/** + * Handles management of tasks in the system. Each thread has a local task manager, + * responsible for allocating, freeing and publishing tasks for stealing. + * + * All interaction for spawning, stealing and task trading are managed through this class. 
+ */ +class task_manager { + using stack_allocator = pls::internal::base::stack_allocator; + + public: + explicit task_manager(unsigned thread_id, + size_t num_tasks, + size_t stack_size, + std::shared_ptr &stack_allocator); + ~task_manager(); + + task *get_task(size_t index) { return tasks_[index].get(); } + + // Local scheduling + void push_local_task(base_task *pushed_task); + base_task *pop_local_task(); + + // Stealing work, automatically trades in another task + base_task *steal_task(thread_state &stealing_state); + + // Sync/memory management + base_task *pop_clean_task_chain(base_task *task); + private: + // Internal helpers for resource stack on tasks + void push_resource_on_task(task *target_task, task *spare_task_chain); + task *pop_resource_from_task(task *target_task); + + std::shared_ptr stack_allocator_; + std::vector> tasks_; + + external_trading_deque deque_; +}; +} + +#endif //PLS_LOCK_FREE_TASK_MANAGER_H_ diff --git a/lib/pls/include/pls/internal/scheduling/lock_free/traded_cas_field.h b/lib/pls/include/pls/internal/scheduling/lock_free/traded_cas_field.h new file mode 100644 index 0000000..d61a28d --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/lock_free/traded_cas_field.h @@ -0,0 +1,84 @@ + +#ifndef PLS_LOCK_FREE_TRADED_CAS_FIELD_H_ +#define PLS_LOCK_FREE_TRADED_CAS_FIELD_H_ + +#include + +#include "pls/internal/base/error_handling.h" +#include "pls/internal/base/system_details.h" + +namespace pls::internal::scheduling::lock_free { + +struct task; +struct traded_cas_field { + static_assert(base::system_details::CACHE_LINE_SIZE >= 4, + "Traded objects must not use their last address bits, as we use them for status flags." 
+ "As traded objects are usually cache aligned, we need big enough cache lines."); + + // Base size of our CAS integer/pointer + static constexpr base::system_details::cas_integer CAS_SIZE = base::system_details::CAS_SIZE; + + // States of the integer (tag indicating current content) + static constexpr base::system_details::cas_integer EMPTY_TAG = 0x0lu; + static constexpr base::system_details::cas_integer STAMP_TAG = 0x1lu; + static constexpr base::system_details::cas_integer TRADE_TAG = 0x2lu; + + // Bitmasks and shifts for cas_integer_, two variants: + // cas_integer_ = traded object | tag + // cas_integer_ = stamp | id | tag + static constexpr base::system_details::cas_integer TAG_SIZE = 2ul; + static constexpr base::system_details::cas_integer TAG_BITS = ~((~0x0ul) << TAG_SIZE); + + static constexpr base::system_details::cas_integer TRADED_OBJECT_SIZE = CAS_SIZE - TAG_SIZE; + static constexpr base::system_details::cas_integer TRADED_OBJECT_SHIFT = TAG_SIZE; + static constexpr base::system_details::cas_integer + TRADE_OBJECT_BITS = ~((~0x0ul) << TRADED_OBJECT_SIZE) << TRADED_OBJECT_SHIFT; + + static constexpr base::system_details::cas_integer ID_SIZE = 10ul; // Up to 1024 cores + static constexpr base::system_details::cas_integer ID_SHIFT = TAG_SIZE; + static constexpr base::system_details::cas_integer ID_BITS = ~((~0x0ul) << ID_SIZE) << ID_SHIFT; + + static constexpr base::system_details::cas_integer STAMP_SIZE = CAS_SIZE - TAG_SIZE - ID_SIZE; + static constexpr base::system_details::cas_integer STAMP_SHIFT = TAG_SIZE + ID_SIZE; + static constexpr base::system_details::cas_integer STAMP_BITS = ~((~0x0ul) << STAMP_SIZE) << STAMP_SHIFT; + + public: + void fill_with_stamp(base::system_details::cas_integer stamp, base::system_details::cas_integer deque_id) { + cas_integer_ = (((stamp << STAMP_SHIFT) & STAMP_BITS) | ((deque_id << ID_SHIFT) & ID_BITS) | STAMP_TAG); + } + base::system_details::cas_integer get_stamp() { + PLS_ASSERT(is_filled_with_stamp(), "Must only 
read out the tag when the traded field contains one."); + return (((base::system_details::cas_integer) cas_integer_) & STAMP_BITS) >> STAMP_SHIFT; + } + base::system_details::cas_integer get_deque_id() { + PLS_ASSERT(is_filled_with_stamp(), "Must only read out the tag when the traded field contains one."); + return (((base::system_details::cas_integer) cas_integer_) & ID_BITS) >> ID_SHIFT; + } + bool is_filled_with_stamp() { + return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == STAMP_TAG; + } + + void fill_with_trade_object(task *new_task) { + PLS_ASSERT((((base::system_details::cas_integer) new_task) & TAG_BITS) == 0, + "Must only store aligned objects in this data structure (last bits are needed for tag bit)"); + cas_integer_ = (((base::system_details::cas_integer) new_task) | TRADE_TAG); + } + task *get_trade_object() { + PLS_ASSERT(is_filled_with_object(), "Must only read out the object when the traded field contains one."); + return reinterpret_cast(((base::system_details::cas_integer) cas_integer_) & TRADE_OBJECT_BITS); + } + bool is_filled_with_object() { + return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == TRADE_TAG; + } + + bool is_empty() { + return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == EMPTY_TAG; + } + + private: + base::system_details::cas_integer cas_integer_{}; +}; + +} + +#endif //PLS_LOCK_FREE_TRADED_CAS_FIELD_H_ diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h index ec0e4fb..0162b82 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler.h @@ -16,15 +16,14 @@ #include "pls/internal/scheduling/task_manager.h" namespace pls::internal::scheduling { - -struct task; - /** * The scheduler is the central part of the dispatching-framework. 
* It manages a pool of worker threads (creates, sleeps/wakes up, destroys) * and allows to execute parallel sections. * - * It works in close relation with the 'task' class for scheduling. + * It works in close relation with the 'task' and 'task_manager' class for scheduling. + * The task_manager handles the data structure for stealing/resource trading, + * the scheduler handles the high level execution flow (allowing the stealing implementation to be exchanged). */ class scheduler { public: @@ -85,17 +84,24 @@ class scheduler { */ static void sync(); - thread_state &thread_state_for(unsigned int thread_id) { return *thread_states_[thread_id]; } - task_manager &task_manager_for(unsigned int thread_id) { return *task_managers_[thread_id]; } - /** * Explicitly terminate the worker threads. Scheduler must not be used after this. */ void terminate(); [[nodiscard]] unsigned int num_threads() const { return num_threads_; } + [[nodiscard]] static base_task &task_chain_at(unsigned int depth, thread_state &calling_state); + + static bool check_task_chain_forward(base_task &start_task); + static bool check_task_chain_backward(base_task &start_task); + static bool check_task_chain(base_task &start_task); + + thread_state &thread_state_for(unsigned int thread_id) { return *thread_states_[thread_id]; } + task_manager &task_manager_for(unsigned int thread_id) { return *task_managers_[thread_id]; } private: + static context_switcher::continuation slow_return(thread_state &calling_state); + static void work_thread_main_loop(); void work_thread_work_section(); diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index 3c3a73b..bdcc241 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -7,11 +7,12 @@ #include "context_switcher/context_switcher.h" #include "context_switcher/continuation.h" -#include 
"pls/internal/scheduling/task_manager.h" -#include "pls/internal/scheduling/task.h" - #include "pls/internal/helpers/profiler.h" +#include "pls/internal/scheduling/task_manager.h" +#include "pls/internal/scheduling/base_task.h" +#include "base_task.h" + namespace pls::internal::scheduling { template @@ -63,8 +64,8 @@ class scheduler::init_function_impl : public init_function { public: explicit init_function_impl(F &function) : function_{function} {} void run() override { - auto &root_task = thread_state::get().get_task_manager().get_active_task(); - root_task.run_as_task([&](context_switcher::continuation cont) { + base_task *root_task = thread_state::get().get_active_task(); + root_task->run_as_task([root_task, this](::context_switcher::continuation cont) { thread_state::get().main_continuation() = std::move(cont); function_(); thread_state::get().get_scheduler().work_section_done_.store(true); @@ -100,7 +101,53 @@ void scheduler::perform_work(Function work_section) { template void scheduler::spawn(Function &&lambda) { - thread_state::get().get_task_manager().spawn_child(std::forward(lambda)); + thread_state &spawning_state = thread_state::get(); + + base_task *last_task = spawning_state.get_active_task(); + base_task *spawned_task = last_task->next_; + + auto continuation = spawned_task->run_as_task([last_task, spawned_task, lambda, &spawning_state](auto cont) { + // allow stealing threads to continue the last task. + last_task->continuation_ = std::move(cont); + + // we are now executing the new task, allow others to steal the last task continuation. + spawned_task->is_synchronized_ = true; + spawning_state.set_active_task(spawned_task); + spawning_state.get_task_manager().push_local_task(last_task); + + // execute the lambda itself, which could lead to a different thread returning. 
+ lambda(); + thread_state &syncing_state = thread_state::get(); + PLS_ASSERT(syncing_state.get_active_task() == spawned_task, + "Task manager must always point its active task onto whats executing."); + + // try to pop a task of the syncing task manager. + // possible outcomes: + // - this is a different task manager, it must have an empty deque and fail + // - this is the same task manager and someone stole last tasks, thus this will fail + // - this is the same task manager and no one stole the last task, this this will succeed + base_task *popped_task = syncing_state.get_task_manager().pop_local_task(); + if (popped_task) { + // Fast path, simply continue execution where we left of before spawn. + PLS_ASSERT(popped_task == last_task, + "Fast path, nothing can have changed until here."); + PLS_ASSERT(&spawning_state == &syncing_state, + "Fast path, we must only return if the task has not been stolen/moved to other thread."); + PLS_ASSERT(last_task->continuation_.valid(), + "Fast path, no one can have continued working on the last task."); + + syncing_state.set_active_task(last_task); + return std::move(last_task->continuation_); + } else { + // Slow path, the last task was stolen. This path is common to sync() events. + return slow_return(syncing_state); + } + }); + + if (continuation.valid()) { + // We jumped in here from the main loop, keep track! 
+ thread_state::get().main_continuation() = std::move(continuation); + } } } diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h deleted file mode 100644 index 32888fd..0000000 --- a/lib/pls/include/pls/internal/scheduling/task.h +++ /dev/null @@ -1,74 +0,0 @@ -#ifndef PLS_TASK_H -#define PLS_TASK_H - -#include -#include - -#include "context_switcher/continuation.h" -#include "context_switcher/context_switcher.h" - -#include "pls/internal/base/system_details.h" -#include "pls/internal/data_structures/stamped_integer.h" -#include "pls/internal/scheduling/traded_cas_field.h" - -namespace pls::internal::scheduling { -/** - * A task is the smallest unit of execution seen by the runtime system. - * - * Tasks represent a action dispatched by a potentially parallel call. - * Tasks have their own execution context (stack and register state), making them stackefull coroutines. - * Tasks can be suspended and resumed (stealing happens by resuming a task). - * - * Being coroutines tasks go through a very deliberate state machine: - * - initialized (no execution state) - * - running (currently executing user code) - * - suspended (suspended by switching to a different task). - */ -struct PLS_CACHE_ALIGN task { - task(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) : - stack_memory_{stack_memory}, - stack_size_{stack_size}, - is_synchronized_{false}, - depth_{depth}, - thread_id_{thread_id}, - prev_{nullptr}, - next_{nullptr} {} - - // Do not allow accidental copy/move operations. - // The whole runtime relies on tasks never changing memory positions during execution. - // Create tasks ONCE and use them until the runtime is shut down. 
- task(const task &other) = delete; - task(task &&other) = delete; - task &operator=(const task &other) = delete; - task &operator=(task &&other) = delete; - - template - context_switcher::continuation run_as_task(F &&lambda) { - return context_switcher::enter_context(stack_memory_, stack_size_, std::forward(lambda)); - } - - // TODO: Proper access control and split it up into responsibilities - // Stack/Continuation Management - char *stack_memory_; - size_t stack_size_; - context_switcher::continuation continuation_; - bool is_synchronized_; - - // TODO: Clean up responsibilities - // Work-Stealing - std::atomic external_trading_deque_cas_{}; - std::atomic resource_stack_next_{}; - std::atomic resource_stack_root_{{0, 0}}; - - // Task Tree (we have a parent that we want to continue when we finish) - unsigned depth_; - unsigned thread_id_; - - // Memory Linked List - task *prev_; - task *next_; -}; - -} - -#endif //PLS_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h index 7ec2637..92a0899 100644 --- a/lib/pls/include/pls/internal/scheduling/task_manager.h +++ b/lib/pls/include/pls/internal/scheduling/task_manager.h @@ -1,86 +1,31 @@ #ifndef PLS_TASK_MANAGER_H_ #define PLS_TASK_MANAGER_H_ - -#include -#include -#include - -#include "pls/internal/scheduling/task.h" -#include "pls/internal/scheduling/external_trading_deque.h" - -#include "pls/internal/base/stack_allocator.h" - -namespace pls::internal::scheduling { - /** - * Handles management of tasks in the system. Each thread has a local task manager, - * responsible for allocating, freeing and publishing tasks for stealing. - * - * All interaction for spawning, stealing and task trading are managed through this class. 
+ * Decision point for different task managing variants: + * - custom, lock-free variant (implemented) + * - basic, locking variant (planned) + * - transactional variant (planned) */ -class task_manager { - using stack_allocator = pls::internal::base::stack_allocator; - - public: - explicit task_manager(unsigned thread_id, - size_t num_tasks, - size_t stack_size, - std::shared_ptr &stack_allocator); - ~task_manager(); - - void push_resource_on_task(task *target_task, task *spare_task_chain); - task *pop_resource_from_task(task *target_task); +#include "lock_free/task_manager.h" - task &get_this_thread_task(size_t depth) { - return *tasks_[depth]; - } - - task &get_active_task() { - return *active_task_; - } - void set_active_task(task *active_task) { - active_task_ = active_task; - } - - template - void spawn_child(F &&lambda); - void sync(); - - task *steal_task(task_manager &stealing_task_manager); - - bool try_clean_return(context_switcher::continuation &result_cont); - - /** - * Helper to check if a task chain is correctly chained forward form the given starting task. - * - * @param start_task The start of the 'to be clean' chain - * @return true if the chain is clean/consistent. - */ - bool check_task_chain_forward(task *start_task); - /** - * Helper to check if a task chain is correctly chained backward form the given starting task. - * - * @param start_task The end of the 'to be clean' chain - * @return true if the chain was is clean/consistent. - */ - bool check_task_chain_backward(task *start_task); - /** - * Check the task chain maintained by this task manager. - * - * @return true if the chain is in a clean/consistent state. 
- */ - bool check_task_chain(); - - private: - std::shared_ptr stack_allocator_; - std::vector> tasks_; - task *active_task_; +namespace pls::internal::scheduling { - external_trading_deque deque_; -}; +#define PLS_DEQUE_LOCK_FREE 0 +#define PLS_DEQUE_LOCKING 1 +#define PLS_DEQUE_TRANSACTIONAL 2 +#define PLS_DEQUE_VARIANT PLS_DEQUE_LOCK_FREE + +#if PLS_DEQUE_VARIANT == PLS_DEQUE_LOCK_FREE +using pls::internal::scheduling::lock_free::task_manager; +#endif +#if PLS_DEQUE_VARIANT == PLS_DEQUE_LOCKING +#error "Not Implemented!" +#endif +#if PLS_DEQUE_VARIANT == PLS_DEQUE_TRANSACTIONAL +#error "Not Implemented!" +#endif } -#include "task_manager_impl.h" #endif //PLS_TASK_MANAGER_H_ diff --git a/lib/pls/include/pls/internal/scheduling/task_manager_impl.h b/lib/pls/include/pls/internal/scheduling/task_manager_impl.h deleted file mode 100644 index 07d1d78..0000000 --- a/lib/pls/include/pls/internal/scheduling/task_manager_impl.h +++ /dev/null @@ -1,78 +0,0 @@ - -#ifndef PLS_TASK_MANAGER_IMPL_H_ -#define PLS_TASK_MANAGER_IMPL_H_ - -#include -#include -#include - -#include "context_switcher/continuation.h" - -#include "pls/internal/scheduling/task.h" -#include "pls/internal/scheduling/thread_state.h" - -namespace pls::internal::scheduling { - -template -void task_manager::spawn_child(F &&lambda) { - auto *spawning_task_manager = this; - auto *last_task = spawning_task_manager->active_task_; - auto *spawned_task = spawning_task_manager->active_task_->next_; - - auto continuation = - spawned_task->run_as_task([=](context_switcher::continuation cont) { - // allow stealing threads to continue the last task. - last_task->continuation_ = std::move(cont); - - // we are now executing the new task, allow others to steal the last task continuation. - spawned_task->is_synchronized_ = true; - spawning_task_manager->active_task_ = spawned_task; - spawning_task_manager->deque_.push_bot(last_task); - - // execute the lambda itself, which could lead to a different thread returning. 
- lambda(); - auto *syncing_task_manager = &thread_state::get().get_task_manager(); - PLS_ASSERT(syncing_task_manager->active_task_ == spawned_task, - "Task manager must always point its active task onto whats executing."); - - // try to pop a task of the syncing task manager. - // possible outcomes: - // - this is a different task manager, it must have an empty deque and fail - // - this is the same task manager and someone stole last tasks, thus this will fail - // - this is the same task manager and no one stole the last task, this this will succeed - auto pop_result = syncing_task_manager->deque_.pop_bot(); - if (pop_result) { - // Fast path, simply continue execution where we left of before spawn. - PLS_ASSERT(*pop_result == last_task, - "Fast path, nothing can have changed until here."); - PLS_ASSERT(spawning_task_manager == syncing_task_manager, - "Fast path, nothing can have changed here."); - PLS_ASSERT(last_task->continuation_.valid(), - "Fast path, no one can have continued working on the last task."); - - syncing_task_manager->active_task_ = last_task; - return std::move(last_task->continuation_); - } else { - // Slow path, the last task was stolen. Sync using the resource stack. - context_switcher::continuation result_cont; - if (syncing_task_manager->try_clean_return(result_cont)) { - // We return back to the main scheduling loop - PLS_ASSERT(result_cont.valid(), "Must only return valid continuations..."); - return result_cont; - } else { - // We finish up the last task and are the sole owner again - PLS_ASSERT(result_cont.valid(), "Must only return valid continuations..."); - return result_cont; - } - } - }); - - if (continuation.valid()) { - // We jumped in here from the main loop, keep track! 
- thread_state::get().main_continuation() = std::move(continuation); - } -} - -} - -#endif //PLS_TASK_MANAGER_IMPL_H_ diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h index 7f717fd..5df607c 100644 --- a/lib/pls/include/pls/internal/scheduling/thread_state.h +++ b/lib/pls/include/pls/internal/scheduling/thread_state.h @@ -10,11 +10,12 @@ #include "pls/internal/base/system_details.h" +#include "pls/internal/scheduling/base_task.h" +#include "pls/internal/scheduling/task_manager.h" + namespace pls::internal::scheduling { class scheduler; -class task_manager; - /** * Proxy-Object for thread local state needed during scheduling. * The main use is to perform thread_state::get() as a thread local @@ -29,8 +30,9 @@ struct PLS_CACHE_ALIGN thread_state { scheduler &scheduler_; task_manager &task_manager_; - PLS_CACHE_ALIGN context_switcher::continuation main_loop_continuation_; - PLS_CACHE_ALIGN std::minstd_rand random_; + context_switcher::continuation main_loop_continuation_; + std::minstd_rand random_; + base_task *active_task_; public: explicit thread_state(scheduler &scheduler, @@ -39,7 +41,8 @@ struct PLS_CACHE_ALIGN thread_state { thread_id_{thread_id}, scheduler_{scheduler}, task_manager_{task_manager}, - random_{static_cast(std::chrono::steady_clock::now().time_since_epoch().count())} {}; + random_{static_cast(std::chrono::steady_clock::now().time_since_epoch().count()) + thread_id}, + active_task_{task_manager.get_task(0)} {}; // Do not allow accidental copy/move operations. 
thread_state(const thread_state &) = delete; @@ -69,6 +72,9 @@ struct PLS_CACHE_ALIGN thread_state { [[nodiscard]] context_switcher::continuation &main_continuation() { return main_loop_continuation_; } + + void set_active_task(base_task *active_task) { active_task_ = active_task; } + base_task *get_active_task() const { return active_task_; } }; } diff --git a/lib/pls/include/pls/internal/scheduling/traded_cas_field.h b/lib/pls/include/pls/internal/scheduling/traded_cas_field.h deleted file mode 100644 index 3d31528..0000000 --- a/lib/pls/include/pls/internal/scheduling/traded_cas_field.h +++ /dev/null @@ -1,84 +0,0 @@ - -#ifndef PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_ -#define PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_ - -#include - -#include "pls/internal/base/error_handling.h" -#include "pls/internal/base/system_details.h" - -namespace pls::internal::scheduling { - -struct task; -struct traded_cas_field { - static_assert(base::system_details::CACHE_LINE_SIZE >= 4, - "Traded objects must not use their last address bits, as we use them for status flags." 
- "As traded objects are usually cache aligned, we need big enough cache lines."); - - // Base size of our CAS integer/pointer - static constexpr base::system_details::cas_integer CAS_SIZE = base::system_details::CAS_SIZE; - - // States of the integer (tag indicating current content) - static constexpr base::system_details::cas_integer EMPTY_TAG = 0x0lu; - static constexpr base::system_details::cas_integer STAMP_TAG = 0x1lu; - static constexpr base::system_details::cas_integer TRADE_TAG = 0x2lu; - - // Bitmasks and shifts for cas_integer_, two variants: - // cas_integer_ = traded object | tag - // cas_integer_ = stamp | id | tag - static constexpr base::system_details::cas_integer TAG_SIZE = 2ul; - static constexpr base::system_details::cas_integer TAG_BITS = ~((~0x0ul) << TAG_SIZE); - - static constexpr base::system_details::cas_integer TRADED_OBJECT_SIZE = CAS_SIZE - TAG_SIZE; - static constexpr base::system_details::cas_integer TRADED_OBJECT_SHIFT = TAG_SIZE; - static constexpr base::system_details::cas_integer - TRADE_OBJECT_BITS = ~((~0x0ul) << TRADED_OBJECT_SIZE) << TRADED_OBJECT_SHIFT; - - static constexpr base::system_details::cas_integer ID_SIZE = 10ul; // Up to 1024 cores - static constexpr base::system_details::cas_integer ID_SHIFT = TAG_SIZE; - static constexpr base::system_details::cas_integer ID_BITS = ~((~0x0ul) << ID_SIZE) << ID_SHIFT; - - static constexpr base::system_details::cas_integer STAMP_SIZE = CAS_SIZE - TAG_SIZE - ID_SIZE; - static constexpr base::system_details::cas_integer STAMP_SHIFT = TAG_SIZE + ID_SIZE; - static constexpr base::system_details::cas_integer STAMP_BITS = ~((~0x0ul) << STAMP_SIZE) << STAMP_SHIFT; - - public: - void fill_with_stamp(base::system_details::cas_integer stamp, base::system_details::cas_integer deque_id) { - cas_integer_ = (((stamp << STAMP_SHIFT) & STAMP_BITS) | ((deque_id << ID_SHIFT) & ID_BITS) | STAMP_TAG); - } - base::system_details::cas_integer get_stamp() { - PLS_ASSERT(is_filled_with_stamp(), "Must only 
read out the tag when the traded field contains one."); - return (((base::system_details::cas_integer) cas_integer_) & STAMP_BITS) >> STAMP_SHIFT; - } - base::system_details::cas_integer get_deque_id() { - PLS_ASSERT(is_filled_with_stamp(), "Must only read out the tag when the traded field contains one."); - return (((base::system_details::cas_integer) cas_integer_) & ID_BITS) >> ID_SHIFT; - } - bool is_filled_with_stamp() { - return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == STAMP_TAG; - } - - void fill_with_trade_object(task *new_task) { - PLS_ASSERT((((base::system_details::cas_integer) new_task) & TAG_BITS) == 0, - "Must only store aligned objects in this data structure (last bits are needed for tag bit)"); - cas_integer_ = (((base::system_details::cas_integer) new_task) | TRADE_TAG); - } - task *get_trade_object() { - PLS_ASSERT(is_filled_with_object(), "Must only read out the object when the traded field contains one."); - return reinterpret_cast(((base::system_details::cas_integer) cas_integer_) & TRADE_OBJECT_BITS); - } - bool is_filled_with_object() { - return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == TRADE_TAG; - } - - bool is_empty() { - return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == EMPTY_TAG; - } - - private: - base::system_details::cas_integer cas_integer_{}; -}; - -} - -#endif //PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_ diff --git a/lib/pls/src/internal/scheduling/base_task.cpp b/lib/pls/src/internal/scheduling/base_task.cpp new file mode 100644 index 0000000..47b739e --- /dev/null +++ b/lib/pls/src/internal/scheduling/base_task.cpp @@ -0,0 +1,9 @@ +#include "pls/internal/scheduling/base_task.h" + +namespace pls { +namespace internal { +namespace scheduling { + +} +} +} diff --git a/lib/pls/src/internal/scheduling/external_trading_deque.cpp b/lib/pls/src/internal/scheduling/external_trading_deque.cpp deleted file mode 100644 index 927528e..0000000 --- 
a/lib/pls/src/internal/scheduling/external_trading_deque.cpp +++ /dev/null @@ -1,136 +0,0 @@ -#include "pls/internal/scheduling/external_trading_deque.h" - -namespace pls::internal::scheduling { - -optional external_trading_deque::peek_traded_object(task *target_task) { - traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(); - if (current_cas.is_filled_with_object()) { - return optional{current_cas.get_trade_object()}; - } else { - return optional{}; - } -} - -optional external_trading_deque::get_trade_object(task *target_task) { - traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(); - if (current_cas.is_filled_with_object()) { - task *result = current_cas.get_trade_object(); - traded_cas_field empty_cas; - if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas, empty_cas)) { - return optional{result}; - } - } - - return optional{}; -} - -void external_trading_deque::push_bot(task *published_task) { - auto expected_stamp = bot_internal_.stamp; - auto ¤t_entry = entries_[bot_internal_.value]; - - // Publish the prepared task in the deque. - current_entry.forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed); - current_entry.traded_task_.store(published_task, std::memory_order_relaxed); - - // Field that all threads synchronize on. - // This happens not in the deque itself, but in the published task. - traded_cas_field sync_cas_field; - sync_cas_field.fill_with_stamp(expected_stamp, thread_id_); - published_task->external_trading_deque_cas_.store(sync_cas_field, std::memory_order_release); - - // Advance the bot pointer. Linearization point for making the task public. 
- bot_internal_.stamp++; - bot_internal_.value++; - bot_.store(bot_internal_.value, std::memory_order_release); -} - -void external_trading_deque::reset_bot_and_top() { - bot_internal_.value = 0; - bot_internal_.stamp++; - - bot_.store(0); - top_.store({bot_internal_.stamp, 0}); -} - -void external_trading_deque::decrease_bot() { - bot_internal_.value--; - bot_.store(bot_internal_.value, std::memory_order_relaxed); -} - -optional external_trading_deque::pop_bot() { - if (bot_internal_.value == 0) { - reset_bot_and_top(); - return optional{}; - } - decrease_bot(); - - auto ¤t_entry = entries_[bot_internal_.value]; - auto *popped_task = current_entry.traded_task_.load(std::memory_order_relaxed); - auto expected_stamp = current_entry.forwarding_stamp_.load(std::memory_order_relaxed); - - // We know what value must be in the cas field if no other thread stole it. - traded_cas_field expected_sync_cas_field; - expected_sync_cas_field.fill_with_stamp(expected_stamp, thread_id_); - traded_cas_field empty_cas_field; - - if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, - empty_cas_field, - std::memory_order_acq_rel)) { - return optional{popped_task}; - } else { - reset_bot_and_top(); - return optional{}; - } -} - -external_trading_deque::peek_result external_trading_deque::peek_top() { - auto local_top = top_.load(); - auto local_bot = bot_.load(); - - if (local_top.value < local_bot) { - return peek_result{optional{entries_[local_top.value].traded_task_}, local_top}; - } else { - return peek_result{optional{}, local_top}; - } -} - -optional external_trading_deque::pop_top(task *offered_task, peek_result peek_result) { - stamped_integer expected_top = peek_result.top_pointer_; - auto local_bot = bot_.load(); - if (expected_top.value >= local_bot) { - return data_structures::optional{}; - } - - auto &target_entry = entries_[expected_top.value]; - - // Read our potential result - task *result = target_entry.traded_task_.load(); - 
unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load(); - - // Try to get it by CAS with the expected field entry, giving up our offered_task for it - traded_cas_field expected_sync_cas_field; - expected_sync_cas_field.fill_with_stamp(expected_top.stamp, thread_id_); - - traded_cas_field offered_field; - offered_field.fill_with_trade_object(offered_task); - - if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, offered_field)) { - // We got it, for sure move the top pointer forward. - top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1}); - // Return the stolen task - return data_structures::optional{result}; - } else { - // We did not get it...help forwarding the top pointer anyway. - if (expected_top.stamp == forwarding_stamp) { - // ...move the pointer forward if someone else put a valid trade object in there. - top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1}); - } else { - // ...we failed because the top tag lags behind...try to fix it. - // This means only updating the tag, as this location can still hold data we need. 
- top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value}); - } - return data_structures::optional{}; - } -} - -} diff --git a/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp b/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp new file mode 100644 index 0000000..c2d322d --- /dev/null +++ b/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp @@ -0,0 +1,137 @@ +#include "pls/internal/scheduling/lock_free/external_trading_deque.h" +#include "pls/internal/scheduling/lock_free/traded_cas_field.h" + +namespace pls::internal::scheduling::lock_free { + +optional external_trading_deque::peek_traded_object(task *target_task) { + traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(); + if (current_cas.is_filled_with_object()) { + return optional{current_cas.get_trade_object()}; + } else { + return optional{}; + } +} + +optional external_trading_deque::get_trade_object(task *target_task) { + traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(); + if (current_cas.is_filled_with_object()) { + task *result = current_cas.get_trade_object(); + traded_cas_field empty_cas; + if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas, empty_cas)) { + return optional{result}; + } + } + + return optional{}; +} + +void external_trading_deque::push_bot(task *published_task) { + auto expected_stamp = bot_internal_.stamp; + auto ¤t_entry = entries_[bot_internal_.value]; + + // Publish the prepared task in the deque. + current_entry.forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed); + current_entry.traded_task_.store(published_task, std::memory_order_relaxed); + + // Field that all threads synchronize on. + // This happens not in the deque itself, but in the published task. 
+ traded_cas_field sync_cas_field; + sync_cas_field.fill_with_stamp(expected_stamp, thread_id_); + published_task->external_trading_deque_cas_.store(sync_cas_field, std::memory_order_release); + + // Advance the bot pointer. Linearization point for making the task public. + bot_internal_.stamp++; + bot_internal_.value++; + bot_.store(bot_internal_.value, std::memory_order_release); +} + +void external_trading_deque::reset_bot_and_top() { + bot_internal_.value = 0; + bot_internal_.stamp++; + + bot_.store(0); + top_.store({bot_internal_.stamp, 0}); +} + +void external_trading_deque::decrease_bot() { + bot_internal_.value--; + bot_.store(bot_internal_.value, std::memory_order_relaxed); +} + +optional external_trading_deque::pop_bot() { + if (bot_internal_.value == 0) { + reset_bot_and_top(); + return optional{}; + } + decrease_bot(); + + auto ¤t_entry = entries_[bot_internal_.value]; + auto *popped_task = current_entry.traded_task_.load(std::memory_order_relaxed); + auto expected_stamp = current_entry.forwarding_stamp_.load(std::memory_order_relaxed); + + // We know what value must be in the cas field if no other thread stole it. 
+ traded_cas_field expected_sync_cas_field; + expected_sync_cas_field.fill_with_stamp(expected_stamp, thread_id_); + traded_cas_field empty_cas_field; + + if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, + empty_cas_field, + std::memory_order_acq_rel)) { + return optional{popped_task}; + } else { + reset_bot_and_top(); + return optional{}; + } +} + +external_trading_deque::peek_result external_trading_deque::peek_top() { + auto local_top = top_.load(); + auto local_bot = bot_.load(); + + if (local_top.value < local_bot) { + return peek_result{optional{entries_[local_top.value].traded_task_}, local_top}; + } else { + return peek_result{optional{}, local_top}; + } +} + +optional external_trading_deque::pop_top(task *offered_task, peek_result peek_result) { + stamped_integer expected_top = peek_result.top_pointer_; + auto local_bot = bot_.load(); + if (expected_top.value >= local_bot) { + return data_structures::optional{}; + } + + auto &target_entry = entries_[expected_top.value]; + + // Read our potential result + task *result = target_entry.traded_task_.load(); + unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load(); + + // Try to get it by CAS with the expected field entry, giving up our offered_task for it + traded_cas_field expected_sync_cas_field; + expected_sync_cas_field.fill_with_stamp(expected_top.stamp, thread_id_); + + traded_cas_field offered_field; + offered_field.fill_with_trade_object(offered_task); + + if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, offered_field)) { + // We got it, for sure move the top pointer forward. + top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1}); + // Return the stolen task + return data_structures::optional{result}; + } else { + // We did not get it...help forwarding the top pointer anyway. 
+ if (expected_top.stamp == forwarding_stamp) { + // ...move the pointer forward if someone else put a valid trade object in there. + top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1}); + } else { + // ...we failed because the top tag lags behind...try to fix it. + // This means only updating the tag, as this location can still hold data we need. + top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value}); + } + return data_structures::optional{}; + } +} + +} diff --git a/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp b/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp new file mode 100644 index 0000000..c279f34 --- /dev/null +++ b/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp @@ -0,0 +1,178 @@ +#include "pls/internal/scheduling/thread_state.h" +#include "pls/internal/scheduling/scheduler.h" + +#include "pls/internal/scheduling/lock_free/task_manager.h" +#include "pls/internal/scheduling/lock_free/task.h" + +namespace pls::internal::scheduling::lock_free { + +task_manager::task_manager(unsigned thread_id, + size_t num_tasks, + size_t stack_size, + std::shared_ptr &stack_allocator) : stack_allocator_{stack_allocator}, + tasks_{}, + deque_{thread_id, num_tasks} { + tasks_.reserve(num_tasks); + + for (size_t i = 0; i < num_tasks - 1; i++) { + char *stack_memory = stack_allocator->allocate_stack(stack_size); + tasks_.emplace_back(std::make_unique(stack_memory, stack_size, i, thread_id)); + + if (i > 0) { + tasks_[i - 1]->next_ = tasks_[i].get(); + tasks_[i]->prev_ = tasks_[i - 1].get(); + } + } +} + +task_manager::~task_manager() { + for (auto &task : tasks_) { + stack_allocator_->free_stack(task->stack_size_, task->stack_memory_); + } +} + +static task *find_task(unsigned id, unsigned depth) { + return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_task(depth); +} + +void task_manager::push_local_task(base_task *pushed_task) { + 
deque_.push_bot(static_cast(pushed_task)); +} + +base_task *task_manager::pop_local_task() { + auto result = deque_.pop_bot(); + if (result) { + return *result; + } else { + return nullptr; + } +} + +base_task *task_manager::steal_task(thread_state &stealing_state) { + PLS_ASSERT(stealing_state.get_active_task()->depth_ == 0, "Must only steal with clean task chain."); + PLS_ASSERT(scheduler::check_task_chain(*stealing_state.get_active_task()), "Must only steal with clean task chain."); + + auto peek = deque_.peek_top(); + if (peek.top_task_) { + // search for the task we want to trade in + task *stolen_task = static_cast(*peek.top_task_); + task *traded_task = static_cast(&scheduler::task_chain_at(stolen_task->depth_, stealing_state)); + + // keep a reference to the rest of the task chain that we keep + base_task *next_own_task = traded_task->next_; + // 'unchain' the traded tasks (to help us find bugs) + traded_task->next_ = nullptr; + + // perform the actual pop operation + auto pop_result_task = deque_.pop_top(traded_task, peek); + if (pop_result_task) { + PLS_ASSERT(stolen_task->thread_id_ != traded_task->thread_id_, + "It is impossible to steal an task we already own!"); + PLS_ASSERT(*pop_result_task == stolen_task, + "We must only steal the task that we peeked at!"); + + // TODO: the re-chaining should not be part of the task manager. + // The manager should only perform the steal + resource push. 
+ + // the steal was a success, link the chain so we own the stolen part + stolen_task->next_ = next_own_task; + next_own_task->prev_ = stolen_task; + + // update the resource stack associated with the stolen task + push_resource_on_task(stolen_task, traded_task); + + auto optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task); + if (optional_exchanged_task) { + // All good, we pushed the task over to the stack, nothing more to do + PLS_ASSERT(*optional_exchanged_task == traded_task, + "We are currently executing this, no one else can put another task in this field!"); + } else { + // The last other active thread took it as its spare resource... + // ...remove our traded object from the stack again (it must be empty now and no one must access it anymore). + auto current_root = stolen_task->resource_stack_root_.load(); + current_root.stamp++; + current_root.value = 0; + stolen_task->resource_stack_root_.store(current_root); + } + + return stolen_task; + } else { + // the steal failed, reset our chain to its old, clean state (re-link what we have broken) + traded_task->next_ = next_own_task; + + return nullptr; + } + } else { + return nullptr; + } +} + +base_task *task_manager::pop_clean_task_chain(base_task *base_task) { + task *popped_task = static_cast(base_task); + // Try to get a clean resource chain to go back to the main stealing loop + task *clean_chain = pop_resource_from_task(popped_task); + if (clean_chain == nullptr) { + // double-check if we are really last one or we only have unlucky timing + auto optional_cas_task = external_trading_deque::get_trade_object(popped_task); + if (optional_cas_task) { + clean_chain = *optional_cas_task; + } else { + clean_chain = pop_resource_from_task(popped_task); + } + } + + return clean_chain; +} + +void task_manager::push_resource_on_task(task *target_task, task *spare_task_chain) { + PLS_ASSERT(target_task->thread_id_ != spare_task_chain->thread_id_, + "Makes no sense to push task onto itself, 
as it is not clean by definition."); + PLS_ASSERT(target_task->depth_ == spare_task_chain->depth_, + "Must only push tasks with correct depth."); + + data_structures::stamped_integer current_root; + data_structures::stamped_integer target_root; + do { + current_root = target_task->resource_stack_root_.load(); + target_root.stamp = current_root.stamp + 1; + target_root.value = spare_task_chain->thread_id_ + 1; + + if (current_root.value == 0) { + // Empty, simply push in with no successor + spare_task_chain->resource_stack_next_.store(nullptr); + } else { + // Already an entry. Find its corresponding task and set it as our successor. + auto *current_root_task = find_task(current_root.value - 1, target_task->depth_); + spare_task_chain->resource_stack_next_.store(current_root_task); + } + + } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); +} + +task *task_manager::pop_resource_from_task(task *target_task) { + data_structures::stamped_integer current_root; + data_structures::stamped_integer target_root; + task *output_task; + do { + current_root = target_task->resource_stack_root_.load(); + if (current_root.value == 0) { + // Empty... + return nullptr; + } else { + // Found something, try to pop it + auto *current_root_task = find_task(current_root.value - 1, target_task->depth_); + auto *next_stack_task = current_root_task->resource_stack_next_.load(); + + target_root.stamp = current_root.stamp + 1; + target_root.value = next_stack_task != nullptr ? 
next_stack_task->thread_id_ + 1 : 0; + + output_task = current_root_task; + } + } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); + + PLS_ASSERT(scheduler::check_task_chain_backward(*output_task), "Must only pop proper task chains."); + output_task->resource_stack_next_.store(nullptr); + return output_task; +} + +} diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp index 29a776a..042f57d 100644 --- a/lib/pls/src/internal/scheduling/scheduler.cpp +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -38,10 +38,8 @@ void scheduler::work_thread_main_loop() { } void scheduler::work_thread_work_section() { - auto &my_state = thread_state::get(); - auto &my_task_manager = my_state.get_task_manager(); - - auto const num_threads = my_state.get_scheduler().num_threads(); + thread_state &my_state = thread_state::get(); + unsigned const num_threads = my_state.get_scheduler().num_threads(); if (my_state.get_thread_id() == 0) { // Main Thread, kick off by executing the user's main code block. @@ -50,42 +48,24 @@ void scheduler::work_thread_work_section() { unsigned int failed_steals = 0; while (!work_section_done_) { - PLS_ASSERT(my_task_manager.check_task_chain(), "Must start stealing with a clean task chain."); - - // TODO: move steal routine into separate function - const size_t target = my_state.get_rand() % num_threads; - if (target == my_state.get_thread_id()) { - continue; - } - - auto &target_state = my_state.get_scheduler().thread_state_for(target); - task *traded_task = target_state.get_task_manager().steal_task(my_task_manager); - - if (traded_task != nullptr) { - // The stealing procedure correctly changed our chain and active task. - // Now we need to perform the 'post steal' actions (manage resources and execute the stolen task). 
- PLS_ASSERT(my_task_manager.check_task_chain_forward(&my_task_manager.get_active_task()), + PLS_ASSERT(check_task_chain(*my_state.get_active_task()), "Must start stealing with a clean task chain."); + + size_t target; + do { + target = my_state.get_rand() % num_threads; + } while (target == my_state.get_thread_id()); + + thread_state &target_state = my_state.get_scheduler().thread_state_for(target); + base_task *stolen_task = target_state.get_task_manager().steal_task(my_state); + if (stolen_task) { + my_state.set_active_task(stolen_task); + // TODO: Figure out how to model 'steal' interaction . + // The scheduler should decide on 'what to steal' and on how 'to manage the chains'. + // The task_manager should perform the act of actually performing the steal/trade. + // Maybe also give the chain management to the task_manager and associate resources with the traded tasks. + PLS_ASSERT(check_task_chain_forward(*my_state.get_active_task()), "We are sole owner of this chain, it has to be valid!"); - // Move the traded in resource of this active task over to the stack of resources. - auto *stolen_task = &my_task_manager.get_active_task(); - // Push the traded in resource on the resource stack to clear the traded_field for later steals/spawns. - my_task_manager.push_resource_on_task(stolen_task, traded_task); - - auto optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task); - if (optional_exchanged_task) { - // All good, we pushed the task over to the stack, nothing more to do - PLS_ASSERT(*optional_exchanged_task == traded_task, - "We are currently executing this, no one else can put another task in this field!"); - } else { - // The last other active thread took it as its spare resource... - // ...remove our traded object from the stack again (it must be empty now and no one must access it anymore). 
- auto current_root = stolen_task->resource_stack_root_.load(); - current_root.stamp++; - current_root.value = 0; - stolen_task->resource_stack_root_.store(current_root); - } - // Execute the stolen task by jumping to it's continuation. PLS_ASSERT(stolen_task->continuation_.valid(), "A task that we can steal must have a valid continuation for us to start working."); @@ -102,6 +82,94 @@ void scheduler::work_thread_work_section() { } } +void scheduler::sync() { + thread_state &syncing_state = thread_state::get(); + + base_task *active_task = syncing_state.get_active_task(); + base_task *spawned_task = active_task->next_; + + if (active_task->is_synchronized_) { + return; // We are already the sole owner of last_task + } else { + auto continuation = + spawned_task->run_as_task([active_task, spawned_task, &syncing_state](context_switcher::continuation cont) { + active_task->continuation_ = std::move(cont); + syncing_state.set_active_task(spawned_task); + + return slow_return(syncing_state); + }); + + PLS_ASSERT(!continuation.valid(), + "We only return to a sync point, never jump to it directly." + "This must therefore never return an unfinished fiber/continuation."); + return; // We cleanly synced to the last one finishing work on last_task + } +} + +context_switcher::continuation scheduler::slow_return(thread_state &calling_state) { + base_task *this_task = calling_state.get_active_task(); + PLS_ASSERT(this_task->depth_ > 0, + "Must never try to return from a task at level 0 (no last task), as we must have a target to return to."); + base_task *last_task = this_task->prev_; + + // Slow return means we try to finish the child 'this_task' of 'last_task' and we + // do not know if we are the last child to finish. + // If we are not the last one, we get a spare task chain for our resources and can return to the main scheduling loop. 
+ base_task *pop_result = calling_state.get_task_manager().pop_clean_task_chain(last_task); + + if (pop_result != nullptr) { + base_task *clean_chain = pop_result; + + // We got a clean chain to fill up our resources. + PLS_ASSERT(last_task->depth_ == clean_chain->depth_, + "Resources must only reside in the correct depth!"); + PLS_ASSERT(last_task != clean_chain, + "We want to swap out the last task and its chain to use a clean one, thus they must differ."); + PLS_ASSERT(check_task_chain_backward(*clean_chain), + "Can only acquire clean chains for clean returns!"); + + // Acquire it/merge it with our task chain. + this_task->prev_ = clean_chain; + clean_chain->next_ = this_task; + + base_task *active_task = clean_chain; + while (active_task->depth_ > 0) { + active_task = active_task->prev_; + } + calling_state.set_active_task(active_task); + + // Jump back to the continuation in main scheduling loop. + context_switcher::continuation result_cont = std::move(thread_state::get().main_continuation()); + PLS_ASSERT(result_cont.valid(), "Must return a valid continuation."); + return result_cont; + } else { + // Make sure that we are owner of this full continuation/task chain. + last_task->next_ = this_task; + + // We are the last one working on this task. Thus the sync must be finished, continue working. + calling_state.set_active_task(last_task); + last_task->is_synchronized_ = true; + + // Jump to parent task and continue working on it. 
+ context_switcher::continuation result_cont = std::move(last_task->continuation_); + PLS_ASSERT(result_cont.valid(), "Must return a valid continuation."); + return result_cont; + } +} + +base_task &scheduler::task_chain_at(unsigned int depth, thread_state &calling_state) { + // TODO: possible optimize with cache array at steal events + base_task *result = calling_state.get_active_task(); + while (result->depth_ > depth) { + result = result->prev_; + } + while (result->depth_ < depth) { + result = result->next_; + } + + return *result; +} + void scheduler::terminate() { if (terminated_) { return; @@ -118,8 +186,30 @@ void scheduler::terminate() { } } -void scheduler::sync() { - thread_state::get().get_task_manager().sync(); +bool scheduler::check_task_chain_forward(base_task &start_task) { + base_task *current = &start_task; + while (current->next_) { + if (current->next_->prev_ != current) { + return false; + } + current = current->next_; + } + return true; +} + +bool scheduler::check_task_chain_backward(base_task &start_task) { + base_task *current = &start_task; + while (current->prev_) { + if (current->prev_->next_ != current) { + return false; + } + current = current->prev_; + } + return true; +} + +bool scheduler::check_task_chain(base_task &start_task) { + return check_task_chain_backward(start_task) && check_task_chain_forward(start_task); } } diff --git a/lib/pls/src/internal/scheduling/task.cpp b/lib/pls/src/internal/scheduling/task.cpp deleted file mode 100644 index 4b9b0e3..0000000 --- a/lib/pls/src/internal/scheduling/task.cpp +++ /dev/null @@ -1,9 +0,0 @@ -#include "pls/internal/scheduling/task.h" - -namespace pls { -namespace internal { -namespace scheduling { - -} -} -} diff --git a/lib/pls/src/internal/scheduling/task_manager.cpp b/lib/pls/src/internal/scheduling/task_manager.cpp deleted file mode 100644 index 8a2aa7d..0000000 --- a/lib/pls/src/internal/scheduling/task_manager.cpp +++ /dev/null @@ -1,241 +0,0 @@ -#include 
"pls/internal/scheduling/task_manager.h" - -#include "pls/internal/scheduling/task.h" -#include "pls/internal/scheduling/thread_state.h" -#include "pls/internal/scheduling/scheduler.h" - -namespace pls::internal::scheduling { - -task_manager::task_manager(unsigned thread_id, - size_t num_tasks, - size_t stack_size, - std::shared_ptr &stack_allocator) : stack_allocator_{stack_allocator}, - tasks_{}, - deque_{thread_id, num_tasks} { - tasks_.reserve(num_tasks); - - for (size_t i = 0; i < num_tasks - 1; i++) { - char *stack_memory = stack_allocator->allocate_stack(stack_size); - tasks_.emplace_back(std::make_unique(stack_memory, stack_size, i, thread_id)); - - if (i > 0) { - tasks_[i - 1]->next_ = tasks_[i].get(); - tasks_[i]->prev_ = tasks_[i - 1].get(); - } - } - active_task_ = tasks_[0].get(); -} - -task_manager::~task_manager() { - for (auto &task : tasks_) { - stack_allocator_->free_stack(task->stack_size_, task->stack_memory_); - } -} - -static task &find_task(unsigned id, unsigned depth) { - return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_this_thread_task(depth); -} - -task *task_manager::steal_task(task_manager &stealing_task_manager) { - PLS_ASSERT(stealing_task_manager.active_task_->depth_ == 0, "Must only steal with clean task chain."); - PLS_ASSERT(stealing_task_manager.check_task_chain(), "Must only steal with clean task chain."); - - auto peek = deque_.peek_top(); - if (peek.top_task_) { - // search for the task we want to trade in - task *stolen_task = *peek.top_task_; - task *traded_task = stealing_task_manager.active_task_; - for (unsigned i = 0; i < stolen_task->depth_; i++) { - traded_task = traded_task->next_; - } - - // keep a reference to the rest of the task chain that we keep - task *next_own_task = traded_task->next_; - // 'unchain' the traded tasks (to help us find bugs) - traded_task->next_ = nullptr; - - // perform the actual pop operation - auto pop_result_task = deque_.pop_top(traded_task, peek); - 
if (pop_result_task) { - PLS_ASSERT(stolen_task->thread_id_ != traded_task->thread_id_, - "It is impossible to steal an task we already own!"); - PLS_ASSERT(*pop_result_task == stolen_task, - "We must only steal the task that we peeked at!"); - - // the steal was a success, link the chain so we own the stolen part - stolen_task->next_ = next_own_task; - next_own_task->prev_ = stolen_task; - stealing_task_manager.active_task_ = stolen_task; - - return traded_task; - } else { - // the steal failed, reset our chain to its old, clean state (re-link what we have broken) - traded_task->next_ = next_own_task; - - return nullptr; - } - } else { - return nullptr; - } -} - -void task_manager::push_resource_on_task(task *target_task, task *spare_task_chain) { - PLS_ASSERT(target_task->thread_id_ != spare_task_chain->thread_id_, - "Makes no sense to push task onto itself, as it is not clean by definition."); - PLS_ASSERT(target_task->depth_ == spare_task_chain->depth_, "Must only push tasks with correct depth."); - - data_structures::stamped_integer current_root; - data_structures::stamped_integer target_root; - do { - current_root = target_task->resource_stack_root_.load(); - target_root.stamp = current_root.stamp + 1; - target_root.value = spare_task_chain->thread_id_ + 1; - - if (current_root.value == 0) { - // Empty, simply push in with no successor - spare_task_chain->resource_stack_next_.store(nullptr); - } else { - // Already an entry. Find it's corresponding task and set it as our successor. 
- auto &current_root_task = find_task(current_root.value - 1, target_task->depth_); - spare_task_chain->resource_stack_next_.store(&current_root_task); - } - - } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); -} - -task *task_manager::pop_resource_from_task(task *target_task) { - data_structures::stamped_integer current_root; - data_structures::stamped_integer target_root; - task *output_task; - do { - current_root = target_task->resource_stack_root_.load(); - if (current_root.value == 0) { - // Empty... - return nullptr; - } else { - // Found something, try to pop it - auto &current_root_task = find_task(current_root.value - 1, target_task->depth_); - auto *next_stack_task = current_root_task.resource_stack_next_.load(); - - target_root.stamp = current_root.stamp + 1; - target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0; - - output_task = &current_root_task; - } - } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); - - PLS_ASSERT(check_task_chain_backward(output_task), "Must only pop proper task chains."); - output_task->resource_stack_next_.store(nullptr); - return output_task; -} - -void task_manager::sync() { - auto *spawning_task_manager = this; - auto *last_task = spawning_task_manager->active_task_; - auto *spawned_task = spawning_task_manager->active_task_->next_; - - if (last_task->is_synchronized_) { - return; // We are already the sole owner of last_task - } else { - auto continuation = spawned_task->run_as_task([=](context_switcher::continuation cont) { - last_task->continuation_ = std::move(cont); - spawning_task_manager->active_task_ = spawned_task; - - context_switcher::continuation result_cont; - if (spawning_task_manager->try_clean_return(result_cont)) { - // We return back to the main scheduling loop - return result_cont; - } else { - // We finish up the last task - return result_cont; - } - }); - - PLS_ASSERT(!continuation.valid(), - "We only 
return to a sync point, never jump to it directly." - "This must therefore never return an unfinished fiber/continuation."); - - return; // We cleanly synced to the last one finishing work on last_task - } -} - -bool task_manager::try_clean_return(context_switcher::continuation &result_cont) { - task *this_task = active_task_; - task *last_task = active_task_->prev_; - - PLS_ASSERT(last_task != nullptr, - "Must never try to return from a task at level 0 (no last task), as we must have a target to return to."); - - // Try to get a clean resource chain to go back to the main stealing loop - task *clean_chain = pop_resource_from_task(last_task); - if (clean_chain == nullptr) { - // double-check if we are really last one or we only have unlucky timing - auto optional_cas_task = external_trading_deque::get_trade_object(last_task); - if (optional_cas_task) { - clean_chain = *optional_cas_task; - } else { - clean_chain = pop_resource_from_task(last_task); - } - } - - if (clean_chain != nullptr) { - // We got a clean chain to continue working on. - PLS_ASSERT(last_task->depth_ == clean_chain->depth_, - "Resources must only reside in the correct depth!"); - PLS_ASSERT(clean_chain != last_task, - "We want to swap out the last task and its chain to use a clean one, thus they must differ."); - PLS_ASSERT(check_task_chain_backward(clean_chain), - "Can only acquire clean chains for clean returns!"); - this_task->prev_ = clean_chain; - clean_chain->next_ = this_task; - - // Walk back chain to make first task active - active_task_ = clean_chain; - while (active_task_->prev_ != nullptr) { - active_task_ = active_task_->prev_; - } - - // jump back to the continuation in main scheduling loop, time to steal some work - result_cont = std::move(thread_state::get().main_continuation()); - PLS_ASSERT(result_cont.valid(), "Must return a valid continuation."); - return true; - } else { - // Make sure that we are owner fo this full continuation/task chain. 
- last_task->next_ = this_task; - this_task->prev_ = last_task; - - // We are the last one working on this task. Thus the sync must be finished, continue working. - active_task_ = last_task; - - last_task->is_synchronized_ = true; - result_cont = std::move(last_task->continuation_); - PLS_ASSERT(result_cont.valid(), "Must return a valid continuation."); - return false; - } -} - -bool task_manager::check_task_chain_forward(task *start_task) { - while (start_task->next_ != nullptr) { - if (start_task->next_->prev_ != start_task) { - return false; - } - start_task = start_task->next_; - } - return true; -} - -bool task_manager::check_task_chain_backward(task *start_task) { - while (start_task->prev_ != nullptr) { - if (start_task->prev_->next_ != start_task) { - return false; - } - start_task = start_task->prev_; - } - return true; -} - -bool task_manager::check_task_chain() { - return check_task_chain_backward(active_task_) && check_task_chain_forward(active_task_); -} - -} diff --git a/test/scheduling_tests.cpp b/test/scheduling_tests.cpp index 826bdf5..728d736 100644 --- a/test/scheduling_tests.cpp +++ b/test/scheduling_tests.cpp @@ -2,11 +2,12 @@ #include -#include "pls/internal/scheduling/traded_cas_field.h" -#include "pls/internal/scheduling/external_trading_deque.h" +#include "pls/internal/scheduling/lock_free/traded_cas_field.h" +#include "pls/internal/scheduling/lock_free/external_trading_deque.h" #include "pls/pls.h" using namespace pls::internal::scheduling; +using namespace pls::internal::scheduling::lock_free; constexpr int MAX_NUM_TASKS = 32; constexpr int MAX_STACK_SIZE = 1024 * 8; -- libgit2 0.26.0