Commit f9e6fc51 by FritzFlorian

WIP: separate deque/stealing/trading logic from general scheduler logic.

parent 4469bcd3
Pipeline #1440 passed with stages
in 3 minutes 19 seconds
...@@ -21,7 +21,7 @@ class pls_matrix : public matrix::matrix<T, SIZE> { ...@@ -21,7 +21,7 @@ class pls_matrix : public matrix::matrix<T, SIZE> {
}; };
constexpr int MAX_NUM_TASKS = 32; constexpr int MAX_NUM_TASKS = 32;
constexpr int MAX_STACK_SIZE = 1024 * 1; constexpr int MAX_STACK_SIZE = 4096 * 1;
int main(int argc, char **argv) { int main(int argc, char **argv) {
int num_threads; int num_threads;
......
...@@ -32,15 +32,17 @@ class benchmark_runner { ...@@ -32,15 +32,17 @@ class benchmark_runner {
} }
public: public:
benchmark_runner(string csv_path, string csv_name) : csv_path_{std::move(csv_path)}, benchmark_runner(string csv_path, string csv_name, int num_measurements = 10000) : csv_path_{std::move(csv_path)},
csv_name_{std::move(csv_name)}, csv_name_{std::move(csv_name)},
times_{} { times_{} {
string command = "mkdir -p " + csv_path_; string command = "mkdir -p " + csv_path_;
int res = system(command.c_str()); int res = system(command.c_str());
if (res) { if (res) {
cout << "Error while creating directory!" << endl; cout << "Error while creating directory!" << endl;
exit(1); exit(1);
} }
times_.reserve(num_measurements);
} }
static void read_args(int argc, char **argv, int &num_threads, string &path) { static void read_args(int argc, char **argv, int &num_threads, string &path) {
......
# List all required files here (cmake best practice to NOT automate this step!) # List all required files here (cmake best practice to NOT automate this step!)
add_library(pls STATIC add_library(pls STATIC
include/pls/algorithms/loop_partition_strategy.h
include/pls/algorithms/for_each.h include/pls/algorithms/for_each_impl.h include/pls/algorithms/for_each.h include/pls/algorithms/for_each_impl.h
include/pls/algorithms/invoke.h include/pls/algorithms/invoke_impl.h include/pls/algorithms/invoke.h include/pls/algorithms/invoke_impl.h
include/pls/algorithms/loop_partition_strategy.h include/pls/algorithms/loop_partition_strategy.h
...@@ -30,12 +31,15 @@ add_library(pls STATIC ...@@ -30,12 +31,15 @@ add_library(pls STATIC
include/pls/internal/helpers/seqence.h include/pls/internal/helpers/seqence.h
include/pls/internal/helpers/member_function.h include/pls/internal/helpers/member_function.h
include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp
include/pls/internal/scheduling/scheduler.h include/pls/internal/scheduling/scheduler_impl.h src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler.h include/pls/internal/scheduling/scheduler_impl.h src/internal/scheduling/scheduler.cpp
include/pls/internal/scheduling/task_manager.h include/pls/internal/scheduling/task_manager_impl.h src/internal/scheduling/task_manager.cpp include/pls/internal/scheduling/base_task.h src/internal/scheduling/base_task.cpp
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp
include/pls/internal/scheduling/external_trading_deque.h src/internal/scheduling/external_trading_deque.cpp include/pls/internal/scheduling/task_manager.h
include/pls/internal/scheduling/traded_cas_field.h include/pls/algorithms/loop_partition_strategy.h)
include/pls/internal/scheduling/lock_free/task.h
include/pls/internal/scheduling/lock_free/task_manager.h src/internal/scheduling/lock_free/task_manager.cpp
include/pls/internal/scheduling/lock_free/external_trading_deque.h src/internal/scheduling/lock_free/external_trading_deque.cpp
include/pls/internal/scheduling/lock_free/traded_cas_field.h)
# Dependencies for pls # Dependencies for pls
target_link_libraries(pls Threads::Threads) target_link_libraries(pls Threads::Threads)
......
#ifndef PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_HELPERS_TSAN_FIBER_API_H_
#define PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_HELPERS_TSAN_FIBER_API_H_
extern "C" {
// Fiber switching API.
// - TSAN context for fiber can be created by __tsan_create_fiber
// and freed by __tsan_destroy_fiber.
// - TSAN context of current fiber or thread can be obtained
// by calling __tsan_get_current_fiber.
// - __tsan_switch_to_fiber should be called immediatly before switch
// to fiber, such as call of swapcontext.
// - Fiber name can be set by __tsan_set_fiber_name.
void *__tsan_get_current_fiber(void);
void *__tsan_create_fiber(unsigned flags);
void __tsan_destroy_fiber(void *fiber);
void __tsan_switch_to_fiber(void *fiber, unsigned flags);
void __tsan_set_fiber_name(void *fiber, const char *name);
};
#endif //PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_HELPERS_TSAN_FIBER_API_H_
#ifndef PLS_TASK_H #ifndef PLS_BASE_TASK_H
#define PLS_TASK_H #define PLS_BASE_TASK_H
#include <utility> #include <utility>
#include <atomic> #include <atomic>
...@@ -7,10 +7,6 @@ ...@@ -7,10 +7,6 @@
#include "context_switcher/continuation.h" #include "context_switcher/continuation.h"
#include "context_switcher/context_switcher.h" #include "context_switcher/context_switcher.h"
#include "pls/internal/base/system_details.h"
#include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/scheduling/traded_cas_field.h"
namespace pls::internal::scheduling { namespace pls::internal::scheduling {
/** /**
* A task is the smallest unit of execution seen by the runtime system. * A task is the smallest unit of execution seen by the runtime system.
...@@ -23,52 +19,48 @@ namespace pls::internal::scheduling { ...@@ -23,52 +19,48 @@ namespace pls::internal::scheduling {
* - initialized (no execution state) * - initialized (no execution state)
* - running (currently executing user code) * - running (currently executing user code)
* - suspended (suspended by switching to a different task). * - suspended (suspended by switching to a different task).
*
* This base_task can be extended by different trading/stealing implementations,
* to add for example additional flags. The scheduler itself always works solely with this base version.
*/ */
struct PLS_CACHE_ALIGN task { struct base_task {
task(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) : base_task(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) :
depth_{depth},
thread_id_{thread_id},
stack_memory_{stack_memory}, stack_memory_{stack_memory},
stack_size_{stack_size}, stack_size_{stack_size},
is_synchronized_{false}, is_synchronized_{false},
depth_{depth},
thread_id_{thread_id},
prev_{nullptr}, prev_{nullptr},
next_{nullptr} {} next_{nullptr} {}
// Do not allow accidental copy/move operations. // Do not allow accidental copy/move operations.
// The whole runtime relies on tasks never changing memory positions during execution. // The whole runtime relies on tasks never changing memory positions during execution.
// Create tasks ONCE and use them until the runtime is shut down. // Create tasks ONCE and use them until the runtime is shut down.
task(const task &other) = delete; base_task(const base_task &other) = delete;
task(task &&other) = delete; base_task(base_task &&other) = delete;
task &operator=(const task &other) = delete; base_task &operator=(const base_task &other) = delete;
task &operator=(task &&other) = delete; base_task &operator=(base_task &&other) = delete;
template<typename F> template<typename F>
context_switcher::continuation run_as_task(F &&lambda) { context_switcher::continuation run_as_task(F &&lambda) {
return context_switcher::enter_context(stack_memory_, stack_size_, std::forward<F>(lambda)); return context_switcher::enter_context(stack_memory_, stack_size_, std::forward<F>(lambda));
} }
// TODO: Proper access control and split it up into responsibilities // General task information
// Stack/Continuation Management unsigned depth_;
unsigned thread_id_;
// Stack/continuation management
char *stack_memory_; char *stack_memory_;
size_t stack_size_; size_t stack_size_;
context_switcher::continuation continuation_; context_switcher::continuation continuation_;
bool is_synchronized_; bool is_synchronized_;
// TODO: Clean up responsibilities // Linked list for trading/memory management
// Work-Stealing base_task *prev_;
std::atomic<traded_cas_field> external_trading_deque_cas_{}; base_task *next_;
std::atomic<task *> resource_stack_next_{};
std::atomic<data_structures::stamped_integer> resource_stack_root_{{0, 0}};
// Task Tree (we have a parent that we want to continue when we finish)
unsigned depth_;
unsigned thread_id_;
// Memory Linked List
task *prev_;
task *next_;
}; };
} }
#endif //PLS_TASK_H #endif //PLS_BASE_TASK_H
#ifndef PLS_HEAP_SCHEDULER_MEMORY_H
#define PLS_HEAP_SCHEDULER_MEMORY_H
#include <vector>
#include "pls/internal/base/thread.h"
#include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/thread_state_static.h"
namespace pls {
namespace internal {
namespace scheduling {
template<size_t NUM_TASKS, size_t STACK_SIZE>
class heap_scheduler_memory : public scheduler_memory {
public:
explicit heap_scheduler_memory(size_t max_threads) : max_threads_{max_threads},
thread_vector_{},
thread_state_vector_{},
thread_state_pointers_{} {
thread_vector_.reserve(max_threads);
thread_state_vector_.reserve(max_threads);
for (size_t i = 0; i < max_threads; i++) {
thread_vector_.emplace_back();
thread_state_vector_.emplace_back();
thread_state_pointers_.emplace_back(&thread_state_vector_[i].get_thread_state());
}
thread_states_array_ = thread_state_pointers_.data();
}
size_t max_threads() const override {
return max_threads_;
}
base::thread &thread_for(size_t id) override {
return thread_vector_[id];
}
private:
using thread_state_type = thread_state_static<NUM_TASKS, STACK_SIZE>;
// thread_state_type is aligned at the cache line and therefore overaligned (C++ 11 does not require
// the new operator to obey alignments bigger than 16, cache lines are usually 64).
// To allow this object to be allocated using 'new' (which the vector does internally),
// we need to wrap it in an non aligned object.
using thread_state_wrapper = base::alignment::cache_alignment_wrapper<thread_state_type>;
size_t max_threads_;
std::vector<base::thread> thread_vector_;
std::vector<thread_state_wrapper> thread_state_vector_;
std::vector<thread_state *> thread_state_pointers_;
};
}
}
}
#endif // PLS_HEOP_SCHEDULER_MEMORY_H
...@@ -12,10 +12,9 @@ ...@@ -12,10 +12,9 @@
#include "pls/internal/data_structures/optional.h" #include "pls/internal/data_structures/optional.h"
#include "pls/internal/data_structures/stamped_integer.h" #include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/scheduling/traded_cas_field.h" #include "pls/internal/scheduling/lock_free/task.h"
#include "pls/internal/scheduling/task.h"
namespace pls::internal::scheduling { namespace pls::internal::scheduling::lock_free {
using namespace data_structures; using namespace data_structures;
......
#ifndef PLS_LOCK_FREE_TASK_H_
#define PLS_LOCK_FREE_TASK_H_
#include "pls/internal/scheduling/base_task.h"
#include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/scheduling/lock_free/traded_cas_field.h"
namespace pls::internal::scheduling::lock_free {
struct task : public base_task {
task(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) :
base_task(stack_memory, stack_size, depth, thread_id) {}
// Additional info for lock-free stealing and resource trading.
std::atomic<traded_cas_field> external_trading_deque_cas_{};
std::atomic<base_task *> resource_stack_next_{};
std::atomic<data_structures::stamped_integer> resource_stack_root_{{0, 0}};
};
}
#endif //PLS_LOCK_FREE_TASK_H_
#ifndef PLS_LOCK_FREE_TASK_MANAGER_H_
#define PLS_LOCK_FREE_TASK_MANAGER_H_
#include <memory>
#include <utility>
#include <array>
#include "pls/internal/base/stack_allocator.h"
#include "pls/internal/scheduling/lock_free/task.h"
#include "pls/internal/scheduling/lock_free/external_trading_deque.h"
namespace pls::internal::scheduling {
struct thread_state;
}
namespace pls::internal::scheduling::lock_free {
/**
* Handles management of tasks in the system. Each thread has a local task manager,
* responsible for allocating, freeing and publishing tasks for stealing.
*
* All interaction for spawning, stealing and task trading are managed through this class.
*/
class task_manager {
using stack_allocator = pls::internal::base::stack_allocator;
public:
explicit task_manager(unsigned thread_id,
size_t num_tasks,
size_t stack_size,
std::shared_ptr<stack_allocator> &stack_allocator);
~task_manager();
task *get_task(size_t index) { return tasks_[index].get(); }
// Local scheduling
void push_local_task(base_task *pushed_task);
base_task *pop_local_task();
// Stealing work, automatically trades in another task
base_task *steal_task(thread_state &stealing_state);
// Sync/memory management
base_task *pop_clean_task_chain(base_task *task);
private:
// Internal helpers for resource stack on tasks
void push_resource_on_task(task *target_task, task *spare_task_chain);
task *pop_resource_from_task(task *target_task);
std::shared_ptr<stack_allocator> stack_allocator_;
std::vector<std::unique_ptr<task>> tasks_;
external_trading_deque deque_;
};
}
#endif //PLS_LOCK_FREE_TASK_MANAGER_H_
#ifndef PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_ #ifndef PLS_LOCK_FREE_TRADED_CAS_FIELD_H_
#define PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_ #define PLS_LOCK_FREE_TRADED_CAS_FIELD_H_
#include <atomic> #include <atomic>
#include "pls/internal/base/error_handling.h" #include "pls/internal/base/error_handling.h"
#include "pls/internal/base/system_details.h" #include "pls/internal/base/system_details.h"
namespace pls::internal::scheduling { namespace pls::internal::scheduling::lock_free {
struct task; struct task;
struct traded_cas_field { struct traded_cas_field {
...@@ -81,4 +81,4 @@ struct traded_cas_field { ...@@ -81,4 +81,4 @@ struct traded_cas_field {
} }
#endif //PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_ #endif //PLS_LOCK_FREE_TRADED_CAS_FIELD_H_
...@@ -16,15 +16,14 @@ ...@@ -16,15 +16,14 @@
#include "pls/internal/scheduling/task_manager.h" #include "pls/internal/scheduling/task_manager.h"
namespace pls::internal::scheduling { namespace pls::internal::scheduling {
struct task;
/** /**
* The scheduler is the central part of the dispatching-framework. * The scheduler is the central part of the dispatching-framework.
* It manages a pool of worker threads (creates, sleeps/wakes up, destroys) * It manages a pool of worker threads (creates, sleeps/wakes up, destroys)
* and allows to execute parallel sections. * and allows to execute parallel sections.
* *
* It works in close relation with the 'task' class for scheduling. * It works in close relation with the 'task' and 'task_manager' class for scheduling.
* The task_manager handles the data structure for stealing/resource trading,
* the scheduler handles the high level execution flow (allowing the stealing implementation to be exchanged).
*/ */
class scheduler { class scheduler {
public: public:
...@@ -85,17 +84,24 @@ class scheduler { ...@@ -85,17 +84,24 @@ class scheduler {
*/ */
static void sync(); static void sync();
thread_state &thread_state_for(unsigned int thread_id) { return *thread_states_[thread_id]; }
task_manager &task_manager_for(unsigned int thread_id) { return *task_managers_[thread_id]; }
/** /**
* Explicitly terminate the worker threads. Scheduler must not be used after this. * Explicitly terminate the worker threads. Scheduler must not be used after this.
*/ */
void terminate(); void terminate();
[[nodiscard]] unsigned int num_threads() const { return num_threads_; } [[nodiscard]] unsigned int num_threads() const { return num_threads_; }
[[nodiscard]] static base_task &task_chain_at(unsigned int depth, thread_state &calling_state);
static bool check_task_chain_forward(base_task &start_task);
static bool check_task_chain_backward(base_task &start_task);
static bool check_task_chain(base_task &start_task);
thread_state &thread_state_for(unsigned int thread_id) { return *thread_states_[thread_id]; }
task_manager &task_manager_for(unsigned int thread_id) { return *task_managers_[thread_id]; }
private: private:
static context_switcher::continuation slow_return(thread_state &calling_state);
static void work_thread_main_loop(); static void work_thread_main_loop();
void work_thread_work_section(); void work_thread_work_section();
......
...@@ -7,11 +7,12 @@ ...@@ -7,11 +7,12 @@
#include "context_switcher/context_switcher.h" #include "context_switcher/context_switcher.h"
#include "context_switcher/continuation.h" #include "context_switcher/continuation.h"
#include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/task.h"
#include "pls/internal/helpers/profiler.h" #include "pls/internal/helpers/profiler.h"
#include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/base_task.h"
#include "base_task.h"
namespace pls::internal::scheduling { namespace pls::internal::scheduling {
template<typename ALLOC> template<typename ALLOC>
...@@ -63,8 +64,8 @@ class scheduler::init_function_impl : public init_function { ...@@ -63,8 +64,8 @@ class scheduler::init_function_impl : public init_function {
public: public:
explicit init_function_impl(F &function) : function_{function} {} explicit init_function_impl(F &function) : function_{function} {}
void run() override { void run() override {
auto &root_task = thread_state::get().get_task_manager().get_active_task(); base_task *root_task = thread_state::get().get_active_task();
root_task.run_as_task([&](context_switcher::continuation cont) { root_task->run_as_task([root_task, this](::context_switcher::continuation cont) {
thread_state::get().main_continuation() = std::move(cont); thread_state::get().main_continuation() = std::move(cont);
function_(); function_();
thread_state::get().get_scheduler().work_section_done_.store(true); thread_state::get().get_scheduler().work_section_done_.store(true);
...@@ -100,7 +101,53 @@ void scheduler::perform_work(Function work_section) { ...@@ -100,7 +101,53 @@ void scheduler::perform_work(Function work_section) {
template<typename Function> template<typename Function>
void scheduler::spawn(Function &&lambda) { void scheduler::spawn(Function &&lambda) {
thread_state::get().get_task_manager().spawn_child(std::forward<Function>(lambda)); thread_state &spawning_state = thread_state::get();
base_task *last_task = spawning_state.get_active_task();
base_task *spawned_task = last_task->next_;
auto continuation = spawned_task->run_as_task([last_task, spawned_task, lambda, &spawning_state](auto cont) {
// allow stealing threads to continue the last task.
last_task->continuation_ = std::move(cont);
// we are now executing the new task, allow others to steal the last task continuation.
spawned_task->is_synchronized_ = true;
spawning_state.set_active_task(spawned_task);
spawning_state.get_task_manager().push_local_task(last_task);
// execute the lambda itself, which could lead to a different thread returning.
lambda();
thread_state &syncing_state = thread_state::get();
PLS_ASSERT(syncing_state.get_active_task() == spawned_task,
"Task manager must always point its active task onto whats executing.");
// try to pop a task of the syncing task manager.
// possible outcomes:
// - this is a different task manager, it must have an empty deque and fail
// - this is the same task manager and someone stole last tasks, thus this will fail
// - this is the same task manager and no one stole the last task, this this will succeed
base_task *popped_task = syncing_state.get_task_manager().pop_local_task();
if (popped_task) {
// Fast path, simply continue execution where we left of before spawn.
PLS_ASSERT(popped_task == last_task,
"Fast path, nothing can have changed until here.");
PLS_ASSERT(&spawning_state == &syncing_state,
"Fast path, we must only return if the task has not been stolen/moved to other thread.");
PLS_ASSERT(last_task->continuation_.valid(),
"Fast path, no one can have continued working on the last task.");
syncing_state.set_active_task(last_task);
return std::move(last_task->continuation_);
} else {
// Slow path, the last task was stolen. This path is common to sync() events.
return slow_return(syncing_state);
}
});
if (continuation.valid()) {
// We jumped in here from the main loop, keep track!
thread_state::get().main_continuation() = std::move(continuation);
}
} }
} }
......
#ifndef PLS_TASK_MANAGER_H_ #ifndef PLS_TASK_MANAGER_H_
#define PLS_TASK_MANAGER_H_ #define PLS_TASK_MANAGER_H_
#include <memory>
#include <utility>
#include <array>
#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/external_trading_deque.h"
#include "pls/internal/base/stack_allocator.h"
namespace pls::internal::scheduling {
/** /**
* Handles management of tasks in the system. Each thread has a local task manager, * Decision point for different task managing variants:
* responsible for allocating, freeing and publishing tasks for stealing. * - custom, lock-free variant (implemented)
* * - basic, locking variant (planned)
* All interaction for spawning, stealing and task trading are managed through this class. * - transactional variant (planned)
*/ */
class task_manager { #include "lock_free/task_manager.h"
using stack_allocator = pls::internal::base::stack_allocator;
public:
explicit task_manager(unsigned thread_id,
size_t num_tasks,
size_t stack_size,
std::shared_ptr<stack_allocator> &stack_allocator);
~task_manager();
void push_resource_on_task(task *target_task, task *spare_task_chain);
task *pop_resource_from_task(task *target_task);
task &get_this_thread_task(size_t depth) { namespace pls::internal::scheduling {
return *tasks_[depth];
}
task &get_active_task() {
return *active_task_;
}
void set_active_task(task *active_task) {
active_task_ = active_task;
}
template<typename F>
void spawn_child(F &&lambda);
void sync();
task *steal_task(task_manager &stealing_task_manager);
bool try_clean_return(context_switcher::continuation &result_cont);
/**
* Helper to check if a task chain is correctly chained forward form the given starting task.
*
* @param start_task The start of the 'to be clean' chain
* @return true if the chain is clean/consistent.
*/
bool check_task_chain_forward(task *start_task);
/**
* Helper to check if a task chain is correctly chained backward form the given starting task.
*
* @param start_task The end of the 'to be clean' chain
* @return true if the chain was is clean/consistent.
*/
bool check_task_chain_backward(task *start_task);
/**
* Check the task chain maintained by this task manager.
*
* @return true if the chain is in a clean/consistent state.
*/
bool check_task_chain();
private:
std::shared_ptr<stack_allocator> stack_allocator_;
std::vector<std::unique_ptr<task>> tasks_;
task *active_task_;
external_trading_deque deque_; #define PLS_DEQUE_LOCK_FREE 0
}; #define PLS_DEQUE_LOCKING 1
#define PLS_DEQUE_TRANSACTIONAL 2
#define PLS_DEQUE_VARIANT PLS_DEQUE_LOCK_FREE
#if PLS_DEQUE_VARIANT == PLS_DEQUE_LOCK_FREE
using pls::internal::scheduling::lock_free::task_manager;
#endif
#if PLS_DEQUE_VARIANT == PLS_DEQUE_LOCKING
#error "Not Implemented!"
#endif
#if PLS_DEQUE_VARIANT == PLS_DEQUE_TRANSACTIONAL
#error "Not Implemented!"
#endif
} }
#include "task_manager_impl.h"
#endif //PLS_TASK_MANAGER_H_ #endif //PLS_TASK_MANAGER_H_
#ifndef PLS_TASK_MANAGER_IMPL_H_
#define PLS_TASK_MANAGER_IMPL_H_
#include <memory>
#include <utility>
#include <array>
#include "context_switcher/continuation.h"
#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/thread_state.h"
namespace pls::internal::scheduling {
template<typename F>
void task_manager::spawn_child(F &&lambda) {
auto *spawning_task_manager = this;
auto *last_task = spawning_task_manager->active_task_;
auto *spawned_task = spawning_task_manager->active_task_->next_;
auto continuation =
spawned_task->run_as_task([=](context_switcher::continuation cont) {
// allow stealing threads to continue the last task.
last_task->continuation_ = std::move(cont);
// we are now executing the new task, allow others to steal the last task continuation.
spawned_task->is_synchronized_ = true;
spawning_task_manager->active_task_ = spawned_task;
spawning_task_manager->deque_.push_bot(last_task);
// execute the lambda itself, which could lead to a different thread returning.
lambda();
auto *syncing_task_manager = &thread_state::get().get_task_manager();
PLS_ASSERT(syncing_task_manager->active_task_ == spawned_task,
"Task manager must always point its active task onto whats executing.");
// try to pop a task of the syncing task manager.
// possible outcomes:
// - this is a different task manager, it must have an empty deque and fail
// - this is the same task manager and someone stole last tasks, thus this will fail
// - this is the same task manager and no one stole the last task, this this will succeed
auto pop_result = syncing_task_manager->deque_.pop_bot();
if (pop_result) {
// Fast path, simply continue execution where we left of before spawn.
PLS_ASSERT(*pop_result == last_task,
"Fast path, nothing can have changed until here.");
PLS_ASSERT(spawning_task_manager == syncing_task_manager,
"Fast path, nothing can have changed here.");
PLS_ASSERT(last_task->continuation_.valid(),
"Fast path, no one can have continued working on the last task.");
syncing_task_manager->active_task_ = last_task;
return std::move(last_task->continuation_);
} else {
// Slow path, the last task was stolen. Sync using the resource stack.
context_switcher::continuation result_cont;
if (syncing_task_manager->try_clean_return(result_cont)) {
// We return back to the main scheduling loop
PLS_ASSERT(result_cont.valid(), "Must only return valid continuations...");
return result_cont;
} else {
// We finish up the last task and are the sole owner again
PLS_ASSERT(result_cont.valid(), "Must only return valid continuations...");
return result_cont;
}
}
});
if (continuation.valid()) {
// We jumped in here from the main loop, keep track!
thread_state::get().main_continuation() = std::move(continuation);
}
}
}
#endif //PLS_TASK_MANAGER_IMPL_H_
...@@ -10,11 +10,12 @@ ...@@ -10,11 +10,12 @@
#include "pls/internal/base/system_details.h" #include "pls/internal/base/system_details.h"
#include "pls/internal/scheduling/base_task.h"
#include "pls/internal/scheduling/task_manager.h"
namespace pls::internal::scheduling { namespace pls::internal::scheduling {
class scheduler; class scheduler;
class task_manager;
/** /**
* Proxy-Object for thread local state needed during scheduling. * Proxy-Object for thread local state needed during scheduling.
* The main use is to perform thread_state::get() as a thread local * The main use is to perform thread_state::get() as a thread local
...@@ -29,8 +30,9 @@ struct PLS_CACHE_ALIGN thread_state { ...@@ -29,8 +30,9 @@ struct PLS_CACHE_ALIGN thread_state {
scheduler &scheduler_; scheduler &scheduler_;
task_manager &task_manager_; task_manager &task_manager_;
PLS_CACHE_ALIGN context_switcher::continuation main_loop_continuation_; context_switcher::continuation main_loop_continuation_;
PLS_CACHE_ALIGN std::minstd_rand random_; std::minstd_rand random_;
base_task *active_task_;
public: public:
explicit thread_state(scheduler &scheduler, explicit thread_state(scheduler &scheduler,
...@@ -39,7 +41,8 @@ struct PLS_CACHE_ALIGN thread_state { ...@@ -39,7 +41,8 @@ struct PLS_CACHE_ALIGN thread_state {
thread_id_{thread_id}, thread_id_{thread_id},
scheduler_{scheduler}, scheduler_{scheduler},
task_manager_{task_manager}, task_manager_{task_manager},
random_{static_cast<unsigned long>(std::chrono::steady_clock::now().time_since_epoch().count())} {}; random_{static_cast<unsigned long>(std::chrono::steady_clock::now().time_since_epoch().count()) + thread_id},
active_task_{task_manager.get_task(0)} {};
// Do not allow accidental copy/move operations. // Do not allow accidental copy/move operations.
thread_state(const thread_state &) = delete; thread_state(const thread_state &) = delete;
...@@ -69,6 +72,9 @@ struct PLS_CACHE_ALIGN thread_state { ...@@ -69,6 +72,9 @@ struct PLS_CACHE_ALIGN thread_state {
[[nodiscard]] context_switcher::continuation &main_continuation() { [[nodiscard]] context_switcher::continuation &main_continuation() {
return main_loop_continuation_; return main_loop_continuation_;
} }
void set_active_task(base_task *active_task) { active_task_ = active_task; }
base_task *get_active_task() const { return active_task_; }
}; };
} }
......
#include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/base_task.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
......
#include "pls/internal/scheduling/external_trading_deque.h" #include "pls/internal/scheduling/lock_free/external_trading_deque.h"
#include "pls/internal/scheduling/lock_free/traded_cas_field.h"
namespace pls::internal::scheduling { namespace pls::internal::scheduling::lock_free {
optional<task *> external_trading_deque::peek_traded_object(task *target_task) { optional<task *> external_trading_deque::peek_traded_object(task *target_task) {
traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(); traded_cas_field current_cas = target_task->external_trading_deque_cas_.load();
......
#include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/thread_state.h" #include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h"
namespace pls::internal::scheduling { #include "pls/internal/scheduling/lock_free/task_manager.h"
#include "pls/internal/scheduling/lock_free/task.h"
namespace pls::internal::scheduling::lock_free {
task_manager::task_manager(unsigned thread_id, task_manager::task_manager(unsigned thread_id,
size_t num_tasks, size_t num_tasks,
size_t stack_size, size_t stack_size,
std::shared_ptr<stack_allocator> &stack_allocator) : stack_allocator_{stack_allocator}, std::shared_ptr<stack_allocator> &stack_allocator) : stack_allocator_{stack_allocator},
tasks_{}, tasks_{},
deque_{thread_id, num_tasks} { deque_{thread_id, num_tasks} {
tasks_.reserve(num_tasks); tasks_.reserve(num_tasks);
for (size_t i = 0; i < num_tasks - 1; i++) { for (size_t i = 0; i < num_tasks - 1; i++) {
...@@ -23,7 +23,6 @@ task_manager::task_manager(unsigned thread_id, ...@@ -23,7 +23,6 @@ task_manager::task_manager(unsigned thread_id,
tasks_[i]->prev_ = tasks_[i - 1].get(); tasks_[i]->prev_ = tasks_[i - 1].get();
} }
} }
active_task_ = tasks_[0].get();
} }
task_manager::~task_manager() { task_manager::~task_manager() {
...@@ -32,25 +31,35 @@ task_manager::~task_manager() { ...@@ -32,25 +31,35 @@ task_manager::~task_manager() {
} }
} }
static task &find_task(unsigned id, unsigned depth) { static task *find_task(unsigned id, unsigned depth) {
return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_this_thread_task(depth); return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_task(depth);
}
void task_manager::push_local_task(base_task *pushed_task) {
deque_.push_bot(static_cast<task *>(pushed_task));
}
base_task *task_manager::pop_local_task() {
auto result = deque_.pop_bot();
if (result) {
return *result;
} else {
return nullptr;
}
} }
task *task_manager::steal_task(task_manager &stealing_task_manager) { base_task *task_manager::steal_task(thread_state &stealing_state) {
PLS_ASSERT(stealing_task_manager.active_task_->depth_ == 0, "Must only steal with clean task chain."); PLS_ASSERT(stealing_state.get_active_task()->depth_ == 0, "Must only steal with clean task chain.");
PLS_ASSERT(stealing_task_manager.check_task_chain(), "Must only steal with clean task chain."); PLS_ASSERT(scheduler::check_task_chain(*stealing_state.get_active_task()), "Must only steal with clean task chain.");
auto peek = deque_.peek_top(); auto peek = deque_.peek_top();
if (peek.top_task_) { if (peek.top_task_) {
// search for the task we want to trade in // search for the task we want to trade in
task *stolen_task = *peek.top_task_; task *stolen_task = static_cast<task *>(*peek.top_task_);
task *traded_task = stealing_task_manager.active_task_; task *traded_task = static_cast<task *>(&scheduler::task_chain_at(stolen_task->depth_, stealing_state));
for (unsigned i = 0; i < stolen_task->depth_; i++) {
traded_task = traded_task->next_;
}
// keep a reference to the rest of the task chain that we keep // keep a reference to the rest of the task chain that we keep
task *next_own_task = traded_task->next_; base_task *next_own_task = traded_task->next_;
// 'unchain' the traded tasks (to help us find bugs) // 'unchain' the traded tasks (to help us find bugs)
traded_task->next_ = nullptr; traded_task->next_ = nullptr;
...@@ -62,12 +71,31 @@ task *task_manager::steal_task(task_manager &stealing_task_manager) { ...@@ -62,12 +71,31 @@ task *task_manager::steal_task(task_manager &stealing_task_manager) {
PLS_ASSERT(*pop_result_task == stolen_task, PLS_ASSERT(*pop_result_task == stolen_task,
"We must only steal the task that we peeked at!"); "We must only steal the task that we peeked at!");
// TODO: the re-chaining should not be part of the task manager.
// The manager should only perform the steal + resource push.
// the steal was a success, link the chain so we own the stolen part // the steal was a success, link the chain so we own the stolen part
stolen_task->next_ = next_own_task; stolen_task->next_ = next_own_task;
next_own_task->prev_ = stolen_task; next_own_task->prev_ = stolen_task;
stealing_task_manager.active_task_ = stolen_task;
return traded_task; // update the resource stack associated with the stolen task
push_resource_on_task(stolen_task, traded_task);
auto optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task);
if (optional_exchanged_task) {
// All good, we pushed the task over to the stack, nothing more to do
PLS_ASSERT(*optional_exchanged_task == traded_task,
"We are currently executing this, no one else can put another task in this field!");
} else {
// The last other active thread took it as its spare resource...
// ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
auto current_root = stolen_task->resource_stack_root_.load();
current_root.stamp++;
current_root.value = 0;
stolen_task->resource_stack_root_.store(current_root);
}
return stolen_task;
} else { } else {
// the steal failed, reset our chain to its old, clean state (re-link what we have broken) // the steal failed, reset our chain to its old, clean state (re-link what we have broken)
traded_task->next_ = next_own_task; traded_task->next_ = next_own_task;
...@@ -79,10 +107,28 @@ task *task_manager::steal_task(task_manager &stealing_task_manager) { ...@@ -79,10 +107,28 @@ task *task_manager::steal_task(task_manager &stealing_task_manager) {
} }
} }
base_task *task_manager::pop_clean_task_chain(base_task *base_task) {
task *popped_task = static_cast<task *>(base_task);
// Try to get a clean resource chain to go back to the main stealing loop
task *clean_chain = pop_resource_from_task(popped_task);
if (clean_chain == nullptr) {
// double-check if we are really last one or we only have unlucky timing
auto optional_cas_task = external_trading_deque::get_trade_object(popped_task);
if (optional_cas_task) {
clean_chain = *optional_cas_task;
} else {
clean_chain = pop_resource_from_task(popped_task);
}
}
return clean_chain;
}
void task_manager::push_resource_on_task(task *target_task, task *spare_task_chain) { void task_manager::push_resource_on_task(task *target_task, task *spare_task_chain) {
PLS_ASSERT(target_task->thread_id_ != spare_task_chain->thread_id_, PLS_ASSERT(target_task->thread_id_ != spare_task_chain->thread_id_,
"Makes no sense to push task onto itself, as it is not clean by definition."); "Makes no sense to push task onto itself, as it is not clean by definition.");
PLS_ASSERT(target_task->depth_ == spare_task_chain->depth_, "Must only push tasks with correct depth."); PLS_ASSERT(target_task->depth_ == spare_task_chain->depth_,
"Must only push tasks with correct depth.");
data_structures::stamped_integer current_root; data_structures::stamped_integer current_root;
data_structures::stamped_integer target_root; data_structures::stamped_integer target_root;
...@@ -96,8 +142,8 @@ void task_manager::push_resource_on_task(task *target_task, task *spare_task_cha ...@@ -96,8 +142,8 @@ void task_manager::push_resource_on_task(task *target_task, task *spare_task_cha
spare_task_chain->resource_stack_next_.store(nullptr); spare_task_chain->resource_stack_next_.store(nullptr);
} else { } else {
// Already an entry. Find it's corresponding task and set it as our successor. // Already an entry. Find it's corresponding task and set it as our successor.
auto &current_root_task = find_task(current_root.value - 1, target_task->depth_); auto *current_root_task = find_task(current_root.value - 1, target_task->depth_);
spare_task_chain->resource_stack_next_.store(&current_root_task); spare_task_chain->resource_stack_next_.store(current_root_task);
} }
} while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
...@@ -114,128 +160,19 @@ task *task_manager::pop_resource_from_task(task *target_task) { ...@@ -114,128 +160,19 @@ task *task_manager::pop_resource_from_task(task *target_task) {
return nullptr; return nullptr;
} else { } else {
// Found something, try to pop it // Found something, try to pop it
auto &current_root_task = find_task(current_root.value - 1, target_task->depth_); auto *current_root_task = find_task(current_root.value - 1, target_task->depth_);
auto *next_stack_task = current_root_task.resource_stack_next_.load(); auto *next_stack_task = current_root_task->resource_stack_next_.load();
target_root.stamp = current_root.stamp + 1; target_root.stamp = current_root.stamp + 1;
target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0; target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0;
output_task = &current_root_task; output_task = current_root_task;
} }
} while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root)); } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
PLS_ASSERT(check_task_chain_backward(output_task), "Must only pop proper task chains."); PLS_ASSERT(scheduler::check_task_chain_backward(*output_task), "Must only pop proper task chains.");
output_task->resource_stack_next_.store(nullptr); output_task->resource_stack_next_.store(nullptr);
return output_task; return output_task;
} }
void task_manager::sync() {
auto *spawning_task_manager = this;
auto *last_task = spawning_task_manager->active_task_;
auto *spawned_task = spawning_task_manager->active_task_->next_;
if (last_task->is_synchronized_) {
return; // We are already the sole owner of last_task
} else {
auto continuation = spawned_task->run_as_task([=](context_switcher::continuation cont) {
last_task->continuation_ = std::move(cont);
spawning_task_manager->active_task_ = spawned_task;
context_switcher::continuation result_cont;
if (spawning_task_manager->try_clean_return(result_cont)) {
// We return back to the main scheduling loop
return result_cont;
} else {
// We finish up the last task
return result_cont;
}
});
PLS_ASSERT(!continuation.valid(),
"We only return to a sync point, never jump to it directly."
"This must therefore never return an unfinished fiber/continuation.");
return; // We cleanly synced to the last one finishing work on last_task
}
}
bool task_manager::try_clean_return(context_switcher::continuation &result_cont) {
task *this_task = active_task_;
task *last_task = active_task_->prev_;
PLS_ASSERT(last_task != nullptr,
"Must never try to return from a task at level 0 (no last task), as we must have a target to return to.");
// Try to get a clean resource chain to go back to the main stealing loop
task *clean_chain = pop_resource_from_task(last_task);
if (clean_chain == nullptr) {
// double-check if we are really last one or we only have unlucky timing
auto optional_cas_task = external_trading_deque::get_trade_object(last_task);
if (optional_cas_task) {
clean_chain = *optional_cas_task;
} else {
clean_chain = pop_resource_from_task(last_task);
}
}
if (clean_chain != nullptr) {
// We got a clean chain to continue working on.
PLS_ASSERT(last_task->depth_ == clean_chain->depth_,
"Resources must only reside in the correct depth!");
PLS_ASSERT(clean_chain != last_task,
"We want to swap out the last task and its chain to use a clean one, thus they must differ.");
PLS_ASSERT(check_task_chain_backward(clean_chain),
"Can only acquire clean chains for clean returns!");
this_task->prev_ = clean_chain;
clean_chain->next_ = this_task;
// Walk back chain to make first task active
active_task_ = clean_chain;
while (active_task_->prev_ != nullptr) {
active_task_ = active_task_->prev_;
}
// jump back to the continuation in main scheduling loop, time to steal some work
result_cont = std::move(thread_state::get().main_continuation());
PLS_ASSERT(result_cont.valid(), "Must return a valid continuation.");
return true;
} else {
// Make sure that we are owner fo this full continuation/task chain.
last_task->next_ = this_task;
this_task->prev_ = last_task;
// We are the last one working on this task. Thus the sync must be finished, continue working.
active_task_ = last_task;
last_task->is_synchronized_ = true;
result_cont = std::move(last_task->continuation_);
PLS_ASSERT(result_cont.valid(), "Must return a valid continuation.");
return false;
}
}
bool task_manager::check_task_chain_forward(task *start_task) {
while (start_task->next_ != nullptr) {
if (start_task->next_->prev_ != start_task) {
return false;
}
start_task = start_task->next_;
}
return true;
}
bool task_manager::check_task_chain_backward(task *start_task) {
while (start_task->prev_ != nullptr) {
if (start_task->prev_->next_ != start_task) {
return false;
}
start_task = start_task->prev_;
}
return true;
}
bool task_manager::check_task_chain() {
return check_task_chain_backward(active_task_) && check_task_chain_forward(active_task_);
}
} }
...@@ -38,10 +38,8 @@ void scheduler::work_thread_main_loop() { ...@@ -38,10 +38,8 @@ void scheduler::work_thread_main_loop() {
} }
void scheduler::work_thread_work_section() { void scheduler::work_thread_work_section() {
auto &my_state = thread_state::get(); thread_state &my_state = thread_state::get();
auto &my_task_manager = my_state.get_task_manager(); unsigned const num_threads = my_state.get_scheduler().num_threads();
auto const num_threads = my_state.get_scheduler().num_threads();
if (my_state.get_thread_id() == 0) { if (my_state.get_thread_id() == 0) {
// Main Thread, kick off by executing the user's main code block. // Main Thread, kick off by executing the user's main code block.
...@@ -50,42 +48,24 @@ void scheduler::work_thread_work_section() { ...@@ -50,42 +48,24 @@ void scheduler::work_thread_work_section() {
unsigned int failed_steals = 0; unsigned int failed_steals = 0;
while (!work_section_done_) { while (!work_section_done_) {
PLS_ASSERT(my_task_manager.check_task_chain(), "Must start stealing with a clean task chain."); PLS_ASSERT(check_task_chain(*my_state.get_active_task()), "Must start stealing with a clean task chain.");
// TODO: move steal routine into separate function size_t target;
const size_t target = my_state.get_rand() % num_threads; do {
if (target == my_state.get_thread_id()) { target = my_state.get_rand() % num_threads;
continue; } while (target == my_state.get_thread_id());
}
thread_state &target_state = my_state.get_scheduler().thread_state_for(target);
auto &target_state = my_state.get_scheduler().thread_state_for(target); base_task *stolen_task = target_state.get_task_manager().steal_task(my_state);
task *traded_task = target_state.get_task_manager().steal_task(my_task_manager); if (stolen_task) {
my_state.set_active_task(stolen_task);
if (traded_task != nullptr) { // TODO: Figure out how to model 'steal' interaction .
// The stealing procedure correctly changed our chain and active task. // The scheduler should decide on 'what to steal' and on how 'to manage the chains'.
// Now we need to perform the 'post steal' actions (manage resources and execute the stolen task). // The task_manager should perform the act of actually performing the steal/trade.
PLS_ASSERT(my_task_manager.check_task_chain_forward(&my_task_manager.get_active_task()), // Maybe also give the chain management to the task_manager and associate resources with the traded tasks.
PLS_ASSERT(check_task_chain_forward(*my_state.get_active_task()),
"We are sole owner of this chain, it has to be valid!"); "We are sole owner of this chain, it has to be valid!");
// Move the traded in resource of this active task over to the stack of resources.
auto *stolen_task = &my_task_manager.get_active_task();
// Push the traded in resource on the resource stack to clear the traded_field for later steals/spawns.
my_task_manager.push_resource_on_task(stolen_task, traded_task);
auto optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task);
if (optional_exchanged_task) {
// All good, we pushed the task over to the stack, nothing more to do
PLS_ASSERT(*optional_exchanged_task == traded_task,
"We are currently executing this, no one else can put another task in this field!");
} else {
// The last other active thread took it as its spare resource...
// ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
auto current_root = stolen_task->resource_stack_root_.load();
current_root.stamp++;
current_root.value = 0;
stolen_task->resource_stack_root_.store(current_root);
}
// Execute the stolen task by jumping to it's continuation. // Execute the stolen task by jumping to it's continuation.
PLS_ASSERT(stolen_task->continuation_.valid(), PLS_ASSERT(stolen_task->continuation_.valid(),
"A task that we can steal must have a valid continuation for us to start working."); "A task that we can steal must have a valid continuation for us to start working.");
...@@ -102,6 +82,94 @@ void scheduler::work_thread_work_section() { ...@@ -102,6 +82,94 @@ void scheduler::work_thread_work_section() {
} }
} }
void scheduler::sync() {
thread_state &syncing_state = thread_state::get();
base_task *active_task = syncing_state.get_active_task();
base_task *spawned_task = active_task->next_;
if (active_task->is_synchronized_) {
return; // We are already the sole owner of last_task
} else {
auto continuation =
spawned_task->run_as_task([active_task, spawned_task, &syncing_state](context_switcher::continuation cont) {
active_task->continuation_ = std::move(cont);
syncing_state.set_active_task(spawned_task);
return slow_return(syncing_state);
});
PLS_ASSERT(!continuation.valid(),
"We only return to a sync point, never jump to it directly."
"This must therefore never return an unfinished fiber/continuation.");
return; // We cleanly synced to the last one finishing work on last_task
}
}
context_switcher::continuation scheduler::slow_return(thread_state &calling_state) {
base_task *this_task = calling_state.get_active_task();
PLS_ASSERT(this_task->depth_ > 0,
"Must never try to return from a task at level 0 (no last task), as we must have a target to return to.");
base_task *last_task = this_task->prev_;
// Slow return means we try to finish the child 'this_task' of 'last_task' and we
// do not know if we are the last child to finish.
// If we are not the last one, we get a spare task chain for our resources and can return to the main scheduling loop.
base_task *pop_result = calling_state.get_task_manager().pop_clean_task_chain(last_task);
if (pop_result != nullptr) {
base_task *clean_chain = pop_result;
// We got a clean chain to fill up our resources.
PLS_ASSERT(last_task->depth_ == clean_chain->depth_,
"Resources must only reside in the correct depth!");
PLS_ASSERT(last_task != clean_chain,
"We want to swap out the last task and its chain to use a clean one, thus they must differ.");
PLS_ASSERT(check_task_chain_backward(*clean_chain),
"Can only acquire clean chains for clean returns!");
// Acquire it/merge it with our task chain.
this_task->prev_ = clean_chain;
clean_chain->next_ = this_task;
base_task *active_task = clean_chain;
while (active_task->depth_ > 0) {
active_task = active_task->prev_;
}
calling_state.set_active_task(active_task);
// Jump back to the continuation in main scheduling loop.
context_switcher::continuation result_cont = std::move(thread_state::get().main_continuation());
PLS_ASSERT(result_cont.valid(), "Must return a valid continuation.");
return result_cont;
} else {
// Make sure that we are owner of this full continuation/task chain.
last_task->next_ = this_task;
// We are the last one working on this task. Thus the sync must be finished, continue working.
calling_state.set_active_task(last_task);
last_task->is_synchronized_ = true;
// Jump to parent task and continue working on it.
context_switcher::continuation result_cont = std::move(last_task->continuation_);
PLS_ASSERT(result_cont.valid(), "Must return a valid continuation.");
return result_cont;
}
}
base_task &scheduler::task_chain_at(unsigned int depth, thread_state &calling_state) {
// TODO: possible optimize with cache array at steal events
base_task *result = calling_state.get_active_task();
while (result->depth_ > depth) {
result = result->prev_;
}
while (result->depth_ < depth) {
result = result->next_;
}
return *result;
}
void scheduler::terminate() { void scheduler::terminate() {
if (terminated_) { if (terminated_) {
return; return;
...@@ -118,8 +186,30 @@ void scheduler::terminate() { ...@@ -118,8 +186,30 @@ void scheduler::terminate() {
} }
} }
void scheduler::sync() { bool scheduler::check_task_chain_forward(base_task &start_task) {
thread_state::get().get_task_manager().sync(); base_task *current = &start_task;
while (current->next_) {
if (current->next_->prev_ != current) {
return false;
}
current = current->next_;
}
return true;
}
bool scheduler::check_task_chain_backward(base_task &start_task) {
base_task *current = &start_task;
while (current->prev_) {
if (current->prev_->next_ != current) {
return false;
}
current = current->prev_;
}
return true;
}
bool scheduler::check_task_chain(base_task &start_task) {
return check_task_chain_backward(start_task) && check_task_chain_forward(start_task);
} }
} }
...@@ -2,11 +2,12 @@ ...@@ -2,11 +2,12 @@
#include <atomic> #include <atomic>
#include "pls/internal/scheduling/traded_cas_field.h" #include "pls/internal/scheduling/lock_free/traded_cas_field.h"
#include "pls/internal/scheduling/external_trading_deque.h" #include "pls/internal/scheduling/lock_free/external_trading_deque.h"
#include "pls/pls.h" #include "pls/pls.h"
using namespace pls::internal::scheduling; using namespace pls::internal::scheduling;
using namespace pls::internal::scheduling::lock_free;
constexpr int MAX_NUM_TASKS = 32; constexpr int MAX_NUM_TASKS = 32;
constexpr int MAX_STACK_SIZE = 1024 * 8; constexpr int MAX_STACK_SIZE = 1024 * 8;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment