diff --git a/app/benchmark_matrix/main.cpp b/app/benchmark_matrix/main.cpp index 6bc24c8..5d69b36 100644 --- a/app/benchmark_matrix/main.cpp +++ b/app/benchmark_matrix/main.cpp @@ -21,7 +21,7 @@ class pls_matrix : public matrix::matrix { }; constexpr int MAX_NUM_TASKS = 32; -constexpr int MAX_STACK_SIZE = 1024 * 1; +constexpr int MAX_STACK_SIZE = 4096 * 1; int main(int argc, char **argv) { int num_threads; diff --git a/extern/benchmark_runner/benchmark_runner.h b/extern/benchmark_runner/benchmark_runner.h index 7a9ef2b..37a2ac6 100644 --- a/extern/benchmark_runner/benchmark_runner.h +++ b/extern/benchmark_runner/benchmark_runner.h @@ -32,15 +32,17 @@ class benchmark_runner { } public: - benchmark_runner(string csv_path, string csv_name) : csv_path_{std::move(csv_path)}, - csv_name_{std::move(csv_name)}, - times_{} { + benchmark_runner(string csv_path, string csv_name, int num_measurements = 10000) : csv_path_{std::move(csv_path)}, + csv_name_{std::move(csv_name)}, + times_{} { string command = "mkdir -p " + csv_path_; int res = system(command.c_str()); if (res) { cout << "Error while creating directory!" << endl; exit(1); } + + times_.reserve(num_measurements); } static void read_args(int argc, char **argv, int &num_threads, string &path) { diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 7c967fc..b0fec82 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -1,5 +1,6 @@ # List all required files here (cmake best practice to NOT automate this step!) add_library(pls STATIC + include/pls/algorithms/loop_partition_strategy.h include/pls/algorithms/for_each.h include/pls/algorithms/for_each_impl.h include/pls/algorithms/invoke.h include/pls/algorithms/invoke_impl.h include/pls/algorithms/loop_partition_strategy.h @@ -30,12 +31,15 @@ add_library(pls STATIC include/pls/internal/helpers/seqence.h include/pls/internal/helpers/member_function.h - include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp include/pls/internal/scheduling/scheduler.h include/pls/internal/scheduling/scheduler_impl.h src/internal/scheduling/scheduler.cpp - include/pls/internal/scheduling/task_manager.h include/pls/internal/scheduling/task_manager_impl.h src/internal/scheduling/task_manager.cpp - include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp - include/pls/internal/scheduling/external_trading_deque.h src/internal/scheduling/external_trading_deque.cpp - include/pls/internal/scheduling/traded_cas_field.h include/pls/algorithms/loop_partition_strategy.h) + include/pls/internal/scheduling/base_task.h src/internal/scheduling/base_task.cpp + include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp + include/pls/internal/scheduling/task_manager.h + + include/pls/internal/scheduling/lock_free/task.h + include/pls/internal/scheduling/lock_free/task_manager.h src/internal/scheduling/lock_free/task_manager.cpp + include/pls/internal/scheduling/lock_free/external_trading_deque.h src/internal/scheduling/lock_free/external_trading_deque.cpp + include/pls/internal/scheduling/lock_free/traded_cas_field.h) # Dependencies for pls target_link_libraries(pls Threads::Threads) diff --git a/lib/pls/include/pls/internal/helpers/tsan_fiber_api.h b/lib/pls/include/pls/internal/helpers/tsan_fiber_api.h deleted file mode 100644 index 04df6f0..0000000 --- a/lib/pls/include/pls/internal/helpers/tsan_fiber_api.h +++ /dev/null @@ -1,21 +0,0 @@ - -#ifndef PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_HELPERS_TSAN_FIBER_API_H_ -#define PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_HELPERS_TSAN_FIBER_API_H_ - -extern "C" { -// Fiber switching API. -// - TSAN context for fiber can be created by __tsan_create_fiber -// and freed by __tsan_destroy_fiber. -// - TSAN context of current fiber or thread can be obtained -// by calling __tsan_get_current_fiber. -// - __tsan_switch_to_fiber should be called immediatly before switch -// to fiber, such as call of swapcontext. -// - Fiber name can be set by __tsan_set_fiber_name. -void *__tsan_get_current_fiber(void); -void *__tsan_create_fiber(unsigned flags); -void __tsan_destroy_fiber(void *fiber); -void __tsan_switch_to_fiber(void *fiber, unsigned flags); -void __tsan_set_fiber_name(void *fiber, const char *name); -}; - -#endif //PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_HELPERS_TSAN_FIBER_API_H_ diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/base_task.h similarity index 66% rename from lib/pls/include/pls/internal/scheduling/task.h rename to lib/pls/include/pls/internal/scheduling/base_task.h index 32888fd..5cbe16e 100644 --- a/lib/pls/include/pls/internal/scheduling/task.h +++ b/lib/pls/include/pls/internal/scheduling/base_task.h @@ -1,5 +1,5 @@ -#ifndef PLS_TASK_H -#define PLS_TASK_H +#ifndef PLS_BASE_TASK_H +#define PLS_BASE_TASK_H #include #include @@ -7,10 +7,6 @@ #include "context_switcher/continuation.h" #include "context_switcher/context_switcher.h" -#include "pls/internal/base/system_details.h" -#include "pls/internal/data_structures/stamped_integer.h" -#include "pls/internal/scheduling/traded_cas_field.h" - namespace pls::internal::scheduling { /** * A task is the smallest unit of execution seen by the runtime system. @@ -23,52 +19,48 @@ namespace pls::internal::scheduling { * - initialized (no execution state) * - running (currently executing user code) * - suspended (suspended by switching to a different task). + * + * This base_task can be extended by different trading/stealing implementations, + * to add for example additional flags. The scheduler itself always works solely with this base version. */ -struct PLS_CACHE_ALIGN task { - task(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) : +struct base_task { + base_task(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) : + depth_{depth}, + thread_id_{thread_id}, stack_memory_{stack_memory}, stack_size_{stack_size}, is_synchronized_{false}, - depth_{depth}, - thread_id_{thread_id}, prev_{nullptr}, next_{nullptr} {} // Do not allow accidental copy/move operations. // The whole runtime relies on tasks never changing memory positions during execution. // Create tasks ONCE and use them until the runtime is shut down. - task(const task &other) = delete; - task(task &&other) = delete; - task &operator=(const task &other) = delete; - task &operator=(task &&other) = delete; + base_task(const base_task &other) = delete; + base_task(base_task &&other) = delete; + base_task &operator=(const base_task &other) = delete; + base_task &operator=(base_task &&other) = delete; template context_switcher::continuation run_as_task(F &&lambda) { return context_switcher::enter_context(stack_memory_, stack_size_, std::forward(lambda)); } - // TODO: Proper access control and split it up into responsibilities - // Stack/Continuation Management + // General task information + unsigned depth_; + unsigned thread_id_; + + // Stack/continuation management char *stack_memory_; size_t stack_size_; context_switcher::continuation continuation_; bool is_synchronized_; - // TODO: Clean up responsibilities - // Work-Stealing - std::atomic external_trading_deque_cas_{}; - std::atomic resource_stack_next_{}; - std::atomic resource_stack_root_{{0, 0}}; - - // Task Tree (we have a parent that we want to continue when we finish) - unsigned depth_; - unsigned thread_id_; - - // Memory Linked List - task *prev_; - task *next_; + // Linked list for trading/memory management + base_task *prev_; + base_task *next_; }; } -#endif //PLS_TASK_H +#endif //PLS_BASE_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/heap_scheduler_memory.h b/lib/pls/include/pls/internal/scheduling/heap_scheduler_memory.h deleted file mode 100644 index 07c44c2..0000000 --- a/lib/pls/include/pls/internal/scheduling/heap_scheduler_memory.h +++ /dev/null @@ -1,59 +0,0 @@ -#ifndef PLS_HEAP_SCHEDULER_MEMORY_H -#define PLS_HEAP_SCHEDULER_MEMORY_H - -#include - -#include "pls/internal/base/thread.h" - -#include "pls/internal/scheduling/scheduler_memory.h" -#include "pls/internal/scheduling/thread_state.h" -#include "pls/internal/scheduling/thread_state_static.h" - -namespace pls { -namespace internal { -namespace scheduling { - -template -class heap_scheduler_memory : public scheduler_memory { - public: - explicit heap_scheduler_memory(size_t max_threads) : max_threads_{max_threads}, - thread_vector_{}, - thread_state_vector_{}, - thread_state_pointers_{} { - thread_vector_.reserve(max_threads); - thread_state_vector_.reserve(max_threads); - - for (size_t i = 0; i < max_threads; i++) { - thread_vector_.emplace_back(); - thread_state_vector_.emplace_back(); - thread_state_pointers_.emplace_back(&thread_state_vector_[i].get_thread_state()); - } - thread_states_array_ = thread_state_pointers_.data(); - } - - size_t max_threads() const override { - return max_threads_; - } - - base::thread &thread_for(size_t id) override { - return thread_vector_[id]; - } - private: - using thread_state_type = thread_state_static; - // thread_state_type is aligned at the cache line and therefore overaligned (C++ 11 does not require - // the new operator to obey alignments bigger than 16, cache lines are usually 64). - // To allow this object to be allocated using 'new' (which the vector does internally), - // we need to wrap it in an non aligned object. - using thread_state_wrapper = base::alignment::cache_alignment_wrapper; - - size_t max_threads_; - std::vector thread_vector_; - std::vector thread_state_vector_; - std::vector thread_state_pointers_; -}; - -} -} -} - -#endif // PLS_HEOP_SCHEDULER_MEMORY_H diff --git a/lib/pls/include/pls/internal/scheduling/external_trading_deque.h b/lib/pls/include/pls/internal/scheduling/lock_free/external_trading_deque.h similarity index 97% rename from lib/pls/include/pls/internal/scheduling/external_trading_deque.h rename to lib/pls/include/pls/internal/scheduling/lock_free/external_trading_deque.h index e873fe1..75b95e2 100644 --- a/lib/pls/include/pls/internal/scheduling/external_trading_deque.h +++ b/lib/pls/include/pls/internal/scheduling/lock_free/external_trading_deque.h @@ -12,10 +12,9 @@ #include "pls/internal/data_structures/optional.h" #include "pls/internal/data_structures/stamped_integer.h" -#include "pls/internal/scheduling/traded_cas_field.h" -#include "pls/internal/scheduling/task.h" +#include "pls/internal/scheduling/lock_free/task.h" -namespace pls::internal::scheduling { +namespace pls::internal::scheduling::lock_free { using namespace data_structures; diff --git a/lib/pls/include/pls/internal/scheduling/lock_free/task.h b/lib/pls/include/pls/internal/scheduling/lock_free/task.h new file mode 100644 index 0000000..50d4b41 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/lock_free/task.h @@ -0,0 +1,23 @@ + +#ifndef PLS_LOCK_FREE_TASK_H_ +#define PLS_LOCK_FREE_TASK_H_ + +#include "pls/internal/scheduling/base_task.h" +#include "pls/internal/data_structures/stamped_integer.h" +#include "pls/internal/scheduling/lock_free/traded_cas_field.h" + +namespace pls::internal::scheduling::lock_free { + +struct task : public base_task { + task(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) : + base_task(stack_memory, stack_size, depth, thread_id) {} + + // Additional info for lock-free stealing and resource trading. + std::atomic external_trading_deque_cas_{}; + std::atomic resource_stack_next_{}; + std::atomic resource_stack_root_{{0, 0}}; +}; + +} + +#endif //PLS_LOCK_FREE_TASK_H_ diff --git a/lib/pls/include/pls/internal/scheduling/lock_free/task_manager.h b/lib/pls/include/pls/internal/scheduling/lock_free/task_manager.h new file mode 100644 index 0000000..013a2d2 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/lock_free/task_manager.h @@ -0,0 +1,59 @@ + +#ifndef PLS_LOCK_FREE_TASK_MANAGER_H_ +#define PLS_LOCK_FREE_TASK_MANAGER_H_ + +#include +#include +#include + +#include "pls/internal/base/stack_allocator.h" + +#include "pls/internal/scheduling/lock_free/task.h" +#include "pls/internal/scheduling/lock_free/external_trading_deque.h" + +namespace pls::internal::scheduling { +struct thread_state; +} + +namespace pls::internal::scheduling::lock_free { + +/** + * Handles management of tasks in the system. Each thread has a local task manager, + * responsible for allocating, freeing and publishing tasks for stealing. + * + * All interaction for spawning, stealing and task trading are managed through this class. + */ +class task_manager { + using stack_allocator = pls::internal::base::stack_allocator; + + public: + explicit task_manager(unsigned thread_id, + size_t num_tasks, + size_t stack_size, + std::shared_ptr &stack_allocator); + ~task_manager(); + + task *get_task(size_t index) { return tasks_[index].get(); } + + // Local scheduling + void push_local_task(base_task *pushed_task); + base_task *pop_local_task(); + + // Stealing work, automatically trades in another task + base_task *steal_task(thread_state &stealing_state); + + // Sync/memory management + base_task *pop_clean_task_chain(base_task *task); + private: + // Internal helpers for resource stack on tasks + void push_resource_on_task(task *target_task, task *spare_task_chain); + task *pop_resource_from_task(task *target_task); + + std::shared_ptr stack_allocator_; + std::vector> tasks_; + + external_trading_deque deque_; +}; +} + +#endif //PLS_LOCK_FREE_TASK_MANAGER_H_ diff --git a/lib/pls/include/pls/internal/scheduling/traded_cas_field.h b/lib/pls/include/pls/internal/scheduling/lock_free/traded_cas_field.h similarity index 95% rename from lib/pls/include/pls/internal/scheduling/traded_cas_field.h rename to lib/pls/include/pls/internal/scheduling/lock_free/traded_cas_field.h index 3d31528..d61a28d 100644 --- a/lib/pls/include/pls/internal/scheduling/traded_cas_field.h +++ b/lib/pls/include/pls/internal/scheduling/lock_free/traded_cas_field.h @@ -1,13 +1,13 @@ -#ifndef PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_ -#define PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_ +#ifndef PLS_LOCK_FREE_TRADED_CAS_FIELD_H_ +#define PLS_LOCK_FREE_TRADED_CAS_FIELD_H_ #include #include "pls/internal/base/error_handling.h" #include "pls/internal/base/system_details.h" -namespace pls::internal::scheduling { +namespace pls::internal::scheduling::lock_free { struct task; struct traded_cas_field { @@ -81,4 +81,4 @@ struct traded_cas_field { } -#endif //PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_ +#endif //PLS_LOCK_FREE_TRADED_CAS_FIELD_H_ diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h index ec0e4fb..0162b82 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler.h @@ -16,15 +16,14 @@ #include "pls/internal/scheduling/task_manager.h" namespace pls::internal::scheduling { - -struct task; - /** * The scheduler is the central part of the dispatching-framework. * It manages a pool of worker threads (creates, sleeps/wakes up, destroys) * and allows to execute parallel sections. * - * It works in close relation with the 'task' class for scheduling. + * It works in close relation with the 'task' and 'task_manager' class for scheduling. + * The task_manager handles the data structure for stealing/resource trading, + * the scheduler handles the high level execution flow (allowing the stealing implementation to be exchanged). */ class scheduler { public: @@ -85,17 +84,24 @@ class scheduler { */ static void sync(); - thread_state &thread_state_for(unsigned int thread_id) { return *thread_states_[thread_id]; } - task_manager &task_manager_for(unsigned int thread_id) { return *task_managers_[thread_id]; } - /** * Explicitly terminate the worker threads. Scheduler must not be used after this. */ void terminate(); [[nodiscard]] unsigned int num_threads() const { return num_threads_; } + [[nodiscard]] static base_task &task_chain_at(unsigned int depth, thread_state &calling_state); + + static bool check_task_chain_forward(base_task &start_task); + static bool check_task_chain_backward(base_task &start_task); + static bool check_task_chain(base_task &start_task); + + thread_state &thread_state_for(unsigned int thread_id) { return *thread_states_[thread_id]; } + task_manager &task_manager_for(unsigned int thread_id) { return *task_managers_[thread_id]; } private: + static context_switcher::continuation slow_return(thread_state &calling_state); + static void work_thread_main_loop(); void work_thread_work_section(); diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index 3c3a73b..bdcc241 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -7,11 +7,12 @@ #include "context_switcher/context_switcher.h" #include "context_switcher/continuation.h" -#include "pls/internal/scheduling/task_manager.h" -#include "pls/internal/scheduling/task.h" - #include "pls/internal/helpers/profiler.h" +#include "pls/internal/scheduling/task_manager.h" +#include "pls/internal/scheduling/base_task.h" +#include "base_task.h" + namespace pls::internal::scheduling { template @@ -63,8 +64,8 @@ class scheduler::init_function_impl : public init_function { public: explicit init_function_impl(F &function) : function_{function} {} void run() override { - auto &root_task = thread_state::get().get_task_manager().get_active_task(); - root_task.run_as_task([&](context_switcher::continuation cont) { + base_task *root_task = thread_state::get().get_active_task(); + root_task->run_as_task([root_task, this](::context_switcher::continuation cont) { thread_state::get().main_continuation() = std::move(cont); function_(); thread_state::get().get_scheduler().work_section_done_.store(true); @@ -100,7 +101,53 @@ void scheduler::perform_work(Function work_section) { template void scheduler::spawn(Function &&lambda) { - thread_state::get().get_task_manager().spawn_child(std::forward(lambda)); + thread_state &spawning_state = thread_state::get(); + + base_task *last_task = spawning_state.get_active_task(); + base_task *spawned_task = last_task->next_; + + auto continuation = spawned_task->run_as_task([last_task, spawned_task, lambda, &spawning_state](auto cont) { + // allow stealing threads to continue the last task. + last_task->continuation_ = std::move(cont); + + // we are now executing the new task, allow others to steal the last task continuation. + spawned_task->is_synchronized_ = true; + spawning_state.set_active_task(spawned_task); + spawning_state.get_task_manager().push_local_task(last_task); + + // execute the lambda itself, which could lead to a different thread returning. + lambda(); + thread_state &syncing_state = thread_state::get(); + PLS_ASSERT(syncing_state.get_active_task() == spawned_task, + "Task manager must always point its active task onto whats executing."); + + // try to pop a task of the syncing task manager. + // possible outcomes: + // - this is a different task manager, it must have an empty deque and fail + // - this is the same task manager and someone stole last tasks, thus this will fail + // - this is the same task manager and no one stole the last task, this this will succeed + base_task *popped_task = syncing_state.get_task_manager().pop_local_task(); + if (popped_task) { + // Fast path, simply continue execution where we left of before spawn. + PLS_ASSERT(popped_task == last_task, + "Fast path, nothing can have changed until here."); + PLS_ASSERT(&spawning_state == &syncing_state, + "Fast path, we must only return if the task has not been stolen/moved to other thread."); + PLS_ASSERT(last_task->continuation_.valid(), + "Fast path, no one can have continued working on the last task."); + + syncing_state.set_active_task(last_task); + return std::move(last_task->continuation_); + } else { + // Slow path, the last task was stolen. This path is common to sync() events. + return slow_return(syncing_state); + } + }); + + if (continuation.valid()) { + // We jumped in here from the main loop, keep track! + thread_state::get().main_continuation() = std::move(continuation); + } } } diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h index 7ec2637..92a0899 100644 --- a/lib/pls/include/pls/internal/scheduling/task_manager.h +++ b/lib/pls/include/pls/internal/scheduling/task_manager.h @@ -1,86 +1,31 @@ #ifndef PLS_TASK_MANAGER_H_ #define PLS_TASK_MANAGER_H_ - -#include -#include -#include - -#include "pls/internal/scheduling/task.h" -#include "pls/internal/scheduling/external_trading_deque.h" - -#include "pls/internal/base/stack_allocator.h" - -namespace pls::internal::scheduling { - /** - * Handles management of tasks in the system. Each thread has a local task manager, - * responsible for allocating, freeing and publishing tasks for stealing. - * - * All interaction for spawning, stealing and task trading are managed through this class. + * Decision point for different task managing variants: + * - custom, lock-free variant (implemented) + * - basic, locking variant (planned) + * - transactional variant (planned) */ -class task_manager { - using stack_allocator = pls::internal::base::stack_allocator; - - public: - explicit task_manager(unsigned thread_id, - size_t num_tasks, - size_t stack_size, - std::shared_ptr &stack_allocator); - ~task_manager(); - - void push_resource_on_task(task *target_task, task *spare_task_chain); - task *pop_resource_from_task(task *target_task); +#include "lock_free/task_manager.h" - task &get_this_thread_task(size_t depth) { - return *tasks_[depth]; - } - - task &get_active_task() { - return *active_task_; - } - void set_active_task(task *active_task) { - active_task_ = active_task; - } - - template - void spawn_child(F &&lambda); - void sync(); - - task *steal_task(task_manager &stealing_task_manager); - - bool try_clean_return(context_switcher::continuation &result_cont); - - /** - * Helper to check if a task chain is correctly chained forward form the given starting task. - * - * @param start_task The start of the 'to be clean' chain - * @return true if the chain is clean/consistent. - */ - bool check_task_chain_forward(task *start_task); - /** - * Helper to check if a task chain is correctly chained backward form the given starting task. - * - * @param start_task The end of the 'to be clean' chain - * @return true if the chain was is clean/consistent. - */ - bool check_task_chain_backward(task *start_task); - /** - * Check the task chain maintained by this task manager. - * - * @return true if the chain is in a clean/consistent state. - */ - bool check_task_chain(); - - private: - std::shared_ptr stack_allocator_; - std::vector> tasks_; - task *active_task_; +namespace pls::internal::scheduling { - external_trading_deque deque_; -}; +#define PLS_DEQUE_LOCK_FREE 0 +#define PLS_DEQUE_LOCKING 1 +#define PLS_DEQUE_TRANSACTIONAL 2 +#define PLS_DEQUE_VARIANT PLS_DEQUE_LOCK_FREE + +#if PLS_DEQUE_VARIANT == PLS_DEQUE_LOCK_FREE +using pls::internal::scheduling::lock_free::task_manager; +#endif +#if PLS_DEQUE_VARIANT == PLS_DEQUE_LOCKING +#error "Not Implemented!" +#endif +#if PLS_DEQUE_VARIANT == PLS_DEQUE_TRANSACTIONAL +#error "Not Implemented!" +#endif } -#include "task_manager_impl.h" #endif //PLS_TASK_MANAGER_H_ diff --git a/lib/pls/include/pls/internal/scheduling/task_manager_impl.h b/lib/pls/include/pls/internal/scheduling/task_manager_impl.h deleted file mode 100644 index 07d1d78..0000000 --- a/lib/pls/include/pls/internal/scheduling/task_manager_impl.h +++ /dev/null @@ -1,78 +0,0 @@ - -#ifndef PLS_TASK_MANAGER_IMPL_H_ -#define PLS_TASK_MANAGER_IMPL_H_ - -#include -#include -#include - -#include "context_switcher/continuation.h" - -#include "pls/internal/scheduling/task.h" -#include "pls/internal/scheduling/thread_state.h" - -namespace pls::internal::scheduling { - -template -void task_manager::spawn_child(F &&lambda) { - auto *spawning_task_manager = this; - auto *last_task = spawning_task_manager->active_task_; - auto *spawned_task = spawning_task_manager->active_task_->next_; - - auto continuation = - spawned_task->run_as_task([=](context_switcher::continuation cont) { - // allow stealing threads to continue the last task. - last_task->continuation_ = std::move(cont); - - // we are now executing the new task, allow others to steal the last task continuation. - spawned_task->is_synchronized_ = true; - spawning_task_manager->active_task_ = spawned_task; - spawning_task_manager->deque_.push_bot(last_task); - - // execute the lambda itself, which could lead to a different thread returning. - lambda(); - auto *syncing_task_manager = &thread_state::get().get_task_manager(); - PLS_ASSERT(syncing_task_manager->active_task_ == spawned_task, - "Task manager must always point its active task onto whats executing."); - - // try to pop a task of the syncing task manager. - // possible outcomes: - // - this is a different task manager, it must have an empty deque and fail - // - this is the same task manager and someone stole last tasks, thus this will fail - // - this is the same task manager and no one stole the last task, this this will succeed - auto pop_result = syncing_task_manager->deque_.pop_bot(); - if (pop_result) { - // Fast path, simply continue execution where we left of before spawn. - PLS_ASSERT(*pop_result == last_task, - "Fast path, nothing can have changed until here."); - PLS_ASSERT(spawning_task_manager == syncing_task_manager, - "Fast path, nothing can have changed here."); - PLS_ASSERT(last_task->continuation_.valid(), - "Fast path, no one can have continued working on the last task."); - - syncing_task_manager->active_task_ = last_task; - return std::move(last_task->continuation_); - } else { - // Slow path, the last task was stolen. Sync using the resource stack. - context_switcher::continuation result_cont; - if (syncing_task_manager->try_clean_return(result_cont)) { - // We return back to the main scheduling loop - PLS_ASSERT(result_cont.valid(), "Must only return valid continuations..."); - return result_cont; - } else { - // We finish up the last task and are the sole owner again - PLS_ASSERT(result_cont.valid(), "Must only return valid continuations..."); - return result_cont; - } - } - }); - - if (continuation.valid()) { - // We jumped in here from the main loop, keep track! - thread_state::get().main_continuation() = std::move(continuation); - } -} - -} - -#endif //PLS_TASK_MANAGER_IMPL_H_ diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h index 7f717fd..5df607c 100644 --- a/lib/pls/include/pls/internal/scheduling/thread_state.h +++ b/lib/pls/include/pls/internal/scheduling/thread_state.h @@ -10,11 +10,12 @@ #include "pls/internal/base/system_details.h" +#include "pls/internal/scheduling/base_task.h" +#include "pls/internal/scheduling/task_manager.h" + namespace pls::internal::scheduling { class scheduler; -class task_manager; - /** * Proxy-Object for thread local state needed during scheduling. * The main use is to perform thread_state::get() as a thread local @@ -29,8 +30,9 @@ struct PLS_CACHE_ALIGN thread_state { scheduler &scheduler_; task_manager &task_manager_; - PLS_CACHE_ALIGN context_switcher::continuation main_loop_continuation_; - PLS_CACHE_ALIGN std::minstd_rand random_; + context_switcher::continuation main_loop_continuation_; + std::minstd_rand random_; + base_task *active_task_; public: explicit thread_state(scheduler &scheduler, @@ -39,7 +41,8 @@ struct PLS_CACHE_ALIGN thread_state { thread_id_{thread_id}, scheduler_{scheduler}, task_manager_{task_manager}, - random_{static_cast(std::chrono::steady_clock::now().time_since_epoch().count())} {}; + random_{static_cast(std::chrono::steady_clock::now().time_since_epoch().count()) + thread_id}, + active_task_{task_manager.get_task(0)} {}; // Do not allow accidental copy/move operations. thread_state(const thread_state &) = delete; @@ -69,6 +72,9 @@ struct PLS_CACHE_ALIGN thread_state { [[nodiscard]] context_switcher::continuation &main_continuation() { return main_loop_continuation_; } + + void set_active_task(base_task *active_task) { active_task_ = active_task; } + base_task *get_active_task() const { return active_task_; } }; } diff --git a/lib/pls/src/internal/scheduling/task.cpp b/lib/pls/src/internal/scheduling/base_task.cpp similarity index 85% rename from lib/pls/src/internal/scheduling/task.cpp rename to lib/pls/src/internal/scheduling/base_task.cpp index 4b9b0e3..47b739e 100644 --- a/lib/pls/src/internal/scheduling/task.cpp +++ b/lib/pls/src/internal/scheduling/base_task.cpp @@ -1,4 +1,4 @@ -#include "pls/internal/scheduling/task.h" +#include "pls/internal/scheduling/base_task.h" namespace pls { namespace internal { diff --git a/lib/pls/src/internal/scheduling/external_trading_deque.cpp b/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp similarity index 97% rename from lib/pls/src/internal/scheduling/external_trading_deque.cpp rename to lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp index 927528e..c2d322d 100644 --- a/lib/pls/src/internal/scheduling/external_trading_deque.cpp +++ b/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp @@ -1,6 +1,7 @@ -#include "pls/internal/scheduling/external_trading_deque.h" +#include "pls/internal/scheduling/lock_free/external_trading_deque.h" +#include "pls/internal/scheduling/lock_free/traded_cas_field.h" -namespace pls::internal::scheduling { +namespace pls::internal::scheduling::lock_free { optional external_trading_deque::peek_traded_object(task *target_task) { traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(); diff --git a/lib/pls/src/internal/scheduling/task_manager.cpp b/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp similarity index 54% rename from lib/pls/src/internal/scheduling/task_manager.cpp rename to lib/pls/src/internal/scheduling/lock_free/task_manager.cpp index 8a2aa7d..c279f34 100644 --- a/lib/pls/src/internal/scheduling/task_manager.cpp +++ b/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp @@ -1,17 +1,17 @@ -#include "pls/internal/scheduling/task_manager.h" - -#include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/thread_state.h" #include "pls/internal/scheduling/scheduler.h" -namespace pls::internal::scheduling { +#include "pls/internal/scheduling/lock_free/task_manager.h" +#include "pls/internal/scheduling/lock_free/task.h" + +namespace pls::internal::scheduling::lock_free { task_manager::task_manager(unsigned thread_id, size_t num_tasks, size_t stack_size, std::shared_ptr &stack_allocator) : stack_allocator_{stack_allocator}, - tasks_{}, - deque_{thread_id, num_tasks} { + tasks_{}, + deque_{thread_id, num_tasks} { tasks_.reserve(num_tasks); for (size_t i = 0; i < num_tasks - 1; i++) { @@ -23,7 +23,6 @@ task_manager::task_manager(unsigned thread_id, tasks_[i]->prev_ = tasks_[i - 1].get(); } } - active_task_ = tasks_[0].get(); } task_manager::~task_manager() { @@ -32,25 +31,35 @@ task_manager::~task_manager() { } } -static task &find_task(unsigned id, unsigned depth) { - return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_this_thread_task(depth); +static task *find_task(unsigned id, unsigned depth) { + return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_task(depth); +} + +void task_manager::push_local_task(base_task *pushed_task) { + deque_.push_bot(static_cast(pushed_task)); +} + +base_task *task_manager::pop_local_task() { + auto result = deque_.pop_bot(); + if (result) { + return *result; + } else { + return nullptr; + } } -task *task_manager::steal_task(task_manager &stealing_task_manager) { - PLS_ASSERT(stealing_task_manager.active_task_->depth_ == 0, "Must only steal with clean task chain."); - PLS_ASSERT(stealing_task_manager.check_task_chain(), "Must only steal with clean task chain."); +base_task *task_manager::steal_task(thread_state &stealing_state) { + PLS_ASSERT(stealing_state.get_active_task()->depth_ == 0, "Must only steal with clean task chain."); + PLS_ASSERT(scheduler::check_task_chain(*stealing_state.get_active_task()), "Must only steal with clean task chain."); auto peek = deque_.peek_top(); if (peek.top_task_) { // search for the task we want to trade in - task *stolen_task = *peek.top_task_; - task *traded_task = stealing_task_manager.active_task_; - for (unsigned i = 0; i < stolen_task->depth_; i++) { - traded_task = traded_task->next_; - } + task *stolen_task = static_cast(*peek.top_task_); + task *traded_task = static_cast(&scheduler::task_chain_at(stolen_task->depth_, stealing_state)); // keep a reference to the rest of the task chain that we keep - task *next_own_task = traded_task->next_; + base_task *next_own_task = traded_task->next_; // 'unchain' the traded tasks (to help us find bugs) traded_task->next_ = nullptr; @@ -62,12 +71,31 @@ task *task_manager::steal_task(task_manager &stealing_task_manager) { PLS_ASSERT(*pop_result_task == stolen_task, "We must only steal the task that we peeked at!"); + // TODO: the re-chaining should not be part of the task manager. + // The manager should only perform the steal + resource push. + // the steal was a success, link the chain so we own the stolen part stolen_task->next_ = next_own_task; next_own_task->prev_ = stolen_task; - stealing_task_manager.active_task_ = stolen_task; - return traded_task; + // update the resource stack associated with the stolen task + push_resource_on_task(stolen_task, traded_task); + + auto optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task); + if (optional_exchanged_task) { + // All good, we pushed the task over to the stack, nothing more to do + PLS_ASSERT(*optional_exchanged_task == traded_task, + "We are currently executing this, no one else can put another task in this field!"); + } else { + // The last other active thread took it as its spare resource... + // ...remove our traded object from the stack again (it must be empty now and no one must access it anymore). + auto current_root = stolen_task->resource_stack_root_.load(); + current_root.stamp++; + current_root.value = 0; + stolen_task->resource_stack_root_.store(current_root); + } + + return stolen_task; } else { // the steal failed, reset our chain to its old, clean state (re-link what we have broken) traded_task->next_ = next_own_task; @@ -79,10 +107,28 @@ task *task_manager::steal_task(task_manager &stealing_task_manager) { } } +base_task *task_manager::pop_clean_task_chain(base_task *base_task) { + task *popped_task = static_cast(base_task); + // Try to get a clean resource chain to go back to the main stealing loop + task *clean_chain = pop_resource_from_task(popped_task); + if (clean_chain == nullptr) { + // double-check if we are really last one or we only have unlucky timing + auto optional_cas_task = external_trading_deque::get_trade_object(popped_task); + if (optional_cas_task) { + clean_chain = *optional_cas_task; + } else { + clean_chain = pop_resource_from_task(popped_task); + } + } + + return clean_chain; +} + void task_manager::push_resource_on_task(task *target_task, task *spare_task_chain) { PLS_ASSERT(target_task->thread_id_ != spare_task_chain->thread_id_, "Makes no sense to push task onto itself, as it is not clean by definition."); - PLS_ASSERT(target_task->depth_ == spare_task_chain->depth_, "Must only push tasks with correct depth."); + PLS_ASSERT(target_task->depth_ == spare_task_chain->depth_, + "Must only push tasks with correct depth."); data_structures::stamped_integer current_root; data_structures::stamped_integer target_root; @@ -96,8 +142,8 @@ void task_manager::push_resource_on_task(task *target_task, task *spare_task_cha spare_task_chain->resource_stack_next_.store(nullptr); } else { // Already an entry. Find it's corresponding task and set it as our successor. - auto ¤t_root_task = find_task(current_root.value - 1, target_task->depth_); - spare_task_chain->resource_stack_next_.store(¤t_root_task); + auto *current_root_task = find_task(current_root.value - 1, target_task->depth_); + spare_task_chain->resource_stack_next_.store(current_root_task); } } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); @@ -114,128 +160,19 @@ task *task_manager::pop_resource_from_task(task *target_task) { return nullptr; } else { // Found something, try to pop it - auto ¤t_root_task = find_task(current_root.value - 1, target_task->depth_); - auto *next_stack_task = current_root_task.resource_stack_next_.load(); + auto *current_root_task = find_task(current_root.value - 1, target_task->depth_); + auto *next_stack_task = current_root_task->resource_stack_next_.load(); target_root.stamp = current_root.stamp + 1; target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0; - output_task = ¤t_root_task; + output_task = current_root_task; } } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); - PLS_ASSERT(check_task_chain_backward(output_task), "Must only pop proper task chains."); + PLS_ASSERT(scheduler::check_task_chain_backward(*output_task), "Must only pop proper task chains."); output_task->resource_stack_next_.store(nullptr); return output_task; } -void task_manager::sync() { - auto *spawning_task_manager = this; - auto *last_task = spawning_task_manager->active_task_; - auto *spawned_task = spawning_task_manager->active_task_->next_; - - if (last_task->is_synchronized_) { - return; // We are already the sole owner of last_task - } else { - auto continuation = spawned_task->run_as_task([=](context_switcher::continuation cont) { - last_task->continuation_ = std::move(cont); - spawning_task_manager->active_task_ = spawned_task; - - context_switcher::continuation result_cont; - if (spawning_task_manager->try_clean_return(result_cont)) { - // We return back to the main scheduling loop - return result_cont; - } else { - // We finish up the last task - return result_cont; - } - }); - - PLS_ASSERT(!continuation.valid(), - "We only return to a sync point, never jump to it directly." - "This must therefore never return an unfinished fiber/continuation."); - - return; // We cleanly synced to the last one finishing work on last_task - } -} - -bool task_manager::try_clean_return(context_switcher::continuation &result_cont) { - task *this_task = active_task_; - task *last_task = active_task_->prev_; - - PLS_ASSERT(last_task != nullptr, - "Must never try to return from a task at level 0 (no last task), as we must have a target to return to."); - - // Try to get a clean resource chain to go back to the main stealing loop - task *clean_chain = pop_resource_from_task(last_task); - if (clean_chain == nullptr) { - // double-check if we are really last one or we only have unlucky timing - auto optional_cas_task = external_trading_deque::get_trade_object(last_task); - if (optional_cas_task) { - clean_chain = *optional_cas_task; - } else { - clean_chain = pop_resource_from_task(last_task); - } - } - - if (clean_chain != nullptr) { - // We got a clean chain to continue working on. - PLS_ASSERT(last_task->depth_ == clean_chain->depth_, - "Resources must only reside in the correct depth!"); - PLS_ASSERT(clean_chain != last_task, - "We want to swap out the last task and its chain to use a clean one, thus they must differ."); - PLS_ASSERT(check_task_chain_backward(clean_chain), - "Can only acquire clean chains for clean returns!"); - this_task->prev_ = clean_chain; - clean_chain->next_ = this_task; - - // Walk back chain to make first task active - active_task_ = clean_chain; - while (active_task_->prev_ != nullptr) { - active_task_ = active_task_->prev_; - } - - // jump back to the continuation in main scheduling loop, time to steal some work - result_cont = std::move(thread_state::get().main_continuation()); - PLS_ASSERT(result_cont.valid(), "Must return a valid continuation."); - return true; - } else { - // Make sure that we are owner fo this full continuation/task chain. - last_task->next_ = this_task; - this_task->prev_ = last_task; - - // We are the last one working on this task. Thus the sync must be finished, continue working. - active_task_ = last_task; - - last_task->is_synchronized_ = true; - result_cont = std::move(last_task->continuation_); - PLS_ASSERT(result_cont.valid(), "Must return a valid continuation."); - return false; - } -} - -bool task_manager::check_task_chain_forward(task *start_task) { - while (start_task->next_ != nullptr) { - if (start_task->next_->prev_ != start_task) { - return false; - } - start_task = start_task->next_; - } - return true; -} - -bool task_manager::check_task_chain_backward(task *start_task) { - while (start_task->prev_ != nullptr) { - if (start_task->prev_->next_ != start_task) { - return false; - } - start_task = start_task->prev_; - } - return true; -} - -bool task_manager::check_task_chain() { - return check_task_chain_backward(active_task_) && check_task_chain_forward(active_task_); -} - } diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp index 29a776a..042f57d 100644 --- a/lib/pls/src/internal/scheduling/scheduler.cpp +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -38,10 +38,8 @@ void scheduler::work_thread_main_loop() { } void scheduler::work_thread_work_section() { - auto &my_state = thread_state::get(); - auto &my_task_manager = my_state.get_task_manager(); - - auto const num_threads = my_state.get_scheduler().num_threads(); + thread_state &my_state = thread_state::get(); + unsigned const num_threads = my_state.get_scheduler().num_threads(); if (my_state.get_thread_id() == 0) { // Main Thread, kick off by executing the user's main code block. @@ -50,42 +48,24 @@ void scheduler::work_thread_work_section() { unsigned int failed_steals = 0; while (!work_section_done_) { - PLS_ASSERT(my_task_manager.check_task_chain(), "Must start stealing with a clean task chain."); - - // TODO: move steal routine into separate function - const size_t target = my_state.get_rand() % num_threads; - if (target == my_state.get_thread_id()) { - continue; - } - - auto &target_state = my_state.get_scheduler().thread_state_for(target); - task *traded_task = target_state.get_task_manager().steal_task(my_task_manager); - - if (traded_task != nullptr) { - // The stealing procedure correctly changed our chain and active task. - // Now we need to perform the 'post steal' actions (manage resources and execute the stolen task). - PLS_ASSERT(my_task_manager.check_task_chain_forward(&my_task_manager.get_active_task()), + PLS_ASSERT(check_task_chain(*my_state.get_active_task()), "Must start stealing with a clean task chain."); + + size_t target; + do { + target = my_state.get_rand() % num_threads; + } while (target == my_state.get_thread_id()); + + thread_state &target_state = my_state.get_scheduler().thread_state_for(target); + base_task *stolen_task = target_state.get_task_manager().steal_task(my_state); + if (stolen_task) { + my_state.set_active_task(stolen_task); + // TODO: Figure out how to model 'steal' interaction . + // The scheduler should decide on 'what to steal' and on how 'to manage the chains'. + // The task_manager should perform the act of actually performing the steal/trade. + // Maybe also give the chain management to the task_manager and associate resources with the traded tasks. + PLS_ASSERT(check_task_chain_forward(*my_state.get_active_task()), "We are sole owner of this chain, it has to be valid!"); - // Move the traded in resource of this active task over to the stack of resources. - auto *stolen_task = &my_task_manager.get_active_task(); - // Push the traded in resource on the resource stack to clear the traded_field for later steals/spawns. - my_task_manager.push_resource_on_task(stolen_task, traded_task); - - auto optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task); - if (optional_exchanged_task) { - // All good, we pushed the task over to the stack, nothing more to do - PLS_ASSERT(*optional_exchanged_task == traded_task, - "We are currently executing this, no one else can put another task in this field!"); - } else { - // The last other active thread took it as its spare resource... - // ...remove our traded object from the stack again (it must be empty now and no one must access it anymore). - auto current_root = stolen_task->resource_stack_root_.load(); - current_root.stamp++; - current_root.value = 0; - stolen_task->resource_stack_root_.store(current_root); - } - // Execute the stolen task by jumping to it's continuation. PLS_ASSERT(stolen_task->continuation_.valid(), "A task that we can steal must have a valid continuation for us to start working."); @@ -102,6 +82,94 @@ void scheduler::work_thread_work_section() { } } +void scheduler::sync() { + thread_state &syncing_state = thread_state::get(); + + base_task *active_task = syncing_state.get_active_task(); + base_task *spawned_task = active_task->next_; + + if (active_task->is_synchronized_) { + return; // We are already the sole owner of last_task + } else { + auto continuation = + spawned_task->run_as_task([active_task, spawned_task, &syncing_state](context_switcher::continuation cont) { + active_task->continuation_ = std::move(cont); + syncing_state.set_active_task(spawned_task); + + return slow_return(syncing_state); + }); + + PLS_ASSERT(!continuation.valid(), + "We only return to a sync point, never jump to it directly." + "This must therefore never return an unfinished fiber/continuation."); + return; // We cleanly synced to the last one finishing work on last_task + } +} + +context_switcher::continuation scheduler::slow_return(thread_state &calling_state) { + base_task *this_task = calling_state.get_active_task(); + PLS_ASSERT(this_task->depth_ > 0, + "Must never try to return from a task at level 0 (no last task), as we must have a target to return to."); + base_task *last_task = this_task->prev_; + + // Slow return means we try to finish the child 'this_task' of 'last_task' and we + // do not know if we are the last child to finish. + // If we are not the last one, we get a spare task chain for our resources and can return to the main scheduling loop. + base_task *pop_result = calling_state.get_task_manager().pop_clean_task_chain(last_task); + + if (pop_result != nullptr) { + base_task *clean_chain = pop_result; + + // We got a clean chain to fill up our resources. + PLS_ASSERT(last_task->depth_ == clean_chain->depth_, + "Resources must only reside in the correct depth!"); + PLS_ASSERT(last_task != clean_chain, + "We want to swap out the last task and its chain to use a clean one, thus they must differ."); + PLS_ASSERT(check_task_chain_backward(*clean_chain), + "Can only acquire clean chains for clean returns!"); + + // Acquire it/merge it with our task chain. + this_task->prev_ = clean_chain; + clean_chain->next_ = this_task; + + base_task *active_task = clean_chain; + while (active_task->depth_ > 0) { + active_task = active_task->prev_; + } + calling_state.set_active_task(active_task); + + // Jump back to the continuation in main scheduling loop. + context_switcher::continuation result_cont = std::move(thread_state::get().main_continuation()); + PLS_ASSERT(result_cont.valid(), "Must return a valid continuation."); + return result_cont; + } else { + // Make sure that we are owner of this full continuation/task chain. + last_task->next_ = this_task; + + // We are the last one working on this task. Thus the sync must be finished, continue working. + calling_state.set_active_task(last_task); + last_task->is_synchronized_ = true; + + // Jump to parent task and continue working on it. + context_switcher::continuation result_cont = std::move(last_task->continuation_); + PLS_ASSERT(result_cont.valid(), "Must return a valid continuation."); + return result_cont; + } +} + +base_task &scheduler::task_chain_at(unsigned int depth, thread_state &calling_state) { + // TODO: possible optimize with cache array at steal events + base_task *result = calling_state.get_active_task(); + while (result->depth_ > depth) { + result = result->prev_; + } + while (result->depth_ < depth) { + result = result->next_; + } + + return *result; +} + void scheduler::terminate() { if (terminated_) { return; @@ -118,8 +186,30 @@ void scheduler::terminate() { } } -void scheduler::sync() { - thread_state::get().get_task_manager().sync(); +bool scheduler::check_task_chain_forward(base_task &start_task) { + base_task *current = &start_task; + while (current->next_) { + if (current->next_->prev_ != current) { + return false; + } + current = current->next_; + } + return true; +} + +bool scheduler::check_task_chain_backward(base_task &start_task) { + base_task *current = &start_task; + while (current->prev_) { + if (current->prev_->next_ != current) { + return false; + } + current = current->prev_; + } + return true; +} + +bool scheduler::check_task_chain(base_task &start_task) { + return check_task_chain_backward(start_task) && check_task_chain_forward(start_task); } } diff --git a/test/scheduling_tests.cpp b/test/scheduling_tests.cpp index 826bdf5..728d736 100644 --- a/test/scheduling_tests.cpp +++ b/test/scheduling_tests.cpp @@ -2,11 +2,12 @@ #include -#include "pls/internal/scheduling/traded_cas_field.h" -#include "pls/internal/scheduling/external_trading_deque.h" +#include "pls/internal/scheduling/lock_free/traded_cas_field.h" +#include "pls/internal/scheduling/lock_free/external_trading_deque.h" #include "pls/pls.h" using namespace pls::internal::scheduling; +using namespace pls::internal::scheduling::lock_free; constexpr int MAX_NUM_TASKS = 32; constexpr int MAX_STACK_SIZE = 1024 * 8;