#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/scheduler.h"

#include "pls/internal/scheduling/lock_free/task_manager.h"
#include "pls/internal/scheduling/lock_free/task.h"

namespace pls::internal::scheduling::lock_free {

/**
 * Creates the task chain owned by one worker thread.
 *
 * One stack is requested from the allocator per task and the tasks are
 * doubly linked (prev_/next_) in order of increasing depth.
 *
 * @param thread_id       id of the owning thread, forwarded to every task and to the deque
 * @param num_tasks       capacity handed to the deque; the chain itself holds num_tasks - 1 tasks
 * @param stack_size      size passed to allocate_stack() for every task
 * @param stack_allocator allocator the stacks are taken from; a copy of the pointer is
 *                        kept so the destructor can return the stacks
 *
 * NOTE(review): the element type of the shared_ptr is assumed to be
 * `stack_allocator` -- confirm against the declaration in task_manager.h.
 */
task_manager::task_manager(unsigned thread_id,
                           size_t num_tasks,
                           size_t stack_size,
                           std::shared_ptr<stack_allocator> &stack_allocator)
    : stack_allocator_{stack_allocator},
      tasks_{},
      deque_{thread_id, num_tasks} {
  tasks_.reserve(num_tasks);

  // Written as `i + 1 < num_tasks` instead of `i < num_tasks - 1`: size_t is unsigned,
  // so the subtraction would wrap around for num_tasks == 0 and the loop would
  // effectively never end. For every num_tasks >= 1 both forms are identical.
  // NOTE(review): storage is reserved for num_tasks entries, yet only
  // num_tasks - 1 tasks are created -- confirm that this is intended.
  for (size_t i = 0; i + 1 < num_tasks; i++) {
    char *stack_memory = stack_allocator->allocate_stack(stack_size);
    tasks_.emplace_back(std::make_unique<task>(stack_memory, stack_size, i, thread_id));

    // Link the new task with its predecessor in the chain.
    if (i > 0) {
      tasks_[i - 1]->next_ = tasks_[i].get();
      tasks_[i]->prev_ = tasks_[i - 1].get();
    }
  }
}

/**
 * Hands the stack of every owned task back to the stack allocator.
 */
task_manager::~task_manager() {
  for (auto &task : tasks_) {
    stack_allocator_->free_stack(task->stack_size_, task->stack_memory_);
  }
}

/**
 * Looks up the task at the given depth in the task manager of thread `id`,
 * going through the scheduler of the calling thread's thread_state.
 */
static task *find_task(unsigned id, unsigned depth) {
  return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_task(depth);
}

/**
 * Pushes the given task onto the bottom of this manager's deque.
 */
void task_manager::push_local_task(base_task *pushed_task) {
  deque_.push_bot(static_cast<task *>(pushed_task));
}

/**
 * Pops a task from the bottom of this manager's deque.
 *
 * @return the popped task, or nullptr if pop_bot() yielded nothing
 */
base_task *task_manager::pop_local_task() {
  auto result = deque_.pop_bot();
  if (result) {
    return *result;
  } else {
    return nullptr;
  }
}

/**
 * Tries to steal the top task of this manager's deque on behalf of another thread.
 *
 * The stealing thread trades in its own task of the same depth as the stolen one.
 * On success the stolen task is linked in front of the remainder of the stealer's
 * chain and the traded task is pushed onto the stolen task's resource stack.
 *
 * @param stealing_state thread state of the stealing thread; its active task must
 *                       have depth 0 and a valid task chain (asserted)
 * @return the stolen task, or nullptr if the deque was empty or the pop failed
 */
base_task *task_manager::steal_task(thread_state &stealing_state) {
  PLS_ASSERT(stealing_state.get_active_task()->depth_ == 0, "Must only steal with clean task chain.");
  PLS_ASSERT(scheduler::check_task_chain(*stealing_state.get_active_task()), "Must only steal with clean task chain.");

  auto peek = deque_.peek_top();
  if (!peek.top_task_) {
    return nullptr;
  }

  // search for the task we want to trade in
  task *stolen_task = static_cast<task *>(*peek.top_task_);
  task *traded_task = static_cast<task *>(&scheduler::task_chain_at(stolen_task->depth_, stealing_state));

  // keep a reference to the rest of the task chain that we keep
  base_task *next_own_task = traded_task->next_;
  // 'unchain' the traded tasks (to help us find bugs)
  traded_task->next_ = nullptr;

  // perform the actual pop operation
  auto pop_result_task = deque_.pop_top(traded_task, peek);
  if (!pop_result_task) {
    // the steal failed, reset our chain to its old, clean state (re-link what we have broken)
    traded_task->next_ = next_own_task;
    return nullptr;
  }

  PLS_ASSERT(stolen_task->thread_id_ != traded_task->thread_id_,
             "It is impossible to steal an task we already own!");
  PLS_ASSERT(*pop_result_task == stolen_task,
             "We must only steal the task that we peeked at!");

  // TODO: the re-chaining should not be part of the task manager.
  //       The manager should only perform the steal + resource push.

  // the steal was a success, link the chain so we own the stolen part
  // NOTE(review): next_own_task is dereferenced unconditionally, which relies on the
  // traded task never being the last element of its chain -- confirm with the scheduler.
  stolen_task->next_ = next_own_task;
  next_own_task->prev_ = stolen_task;

  // update the resource stack associated with the stolen task
  push_resource_on_task(stolen_task, traded_task);

  auto optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task);
  if (optional_exchanged_task) {
    // All good, we pushed the task over to the stack, nothing more to do
    PLS_ASSERT(*optional_exchanged_task == traded_task,
               "We are currently executing this, no one else can put another task in this field!");
  } else {
    // The last other active thread took it as its spare resource...
    // ...remove our traded object from the stack again
    // (it must be empty now and no one must access it anymore).
    auto current_root = stolen_task->resource_stack_root_.load();
    current_root.stamp++;
    current_root.value = 0;
    stolen_task->resource_stack_root_.store(current_root);
  }

  return stolen_task;
}

/**
 * Obtains a clean task chain to go back to the main stealing loop with.
 *
 * First tries the resource stack of the given task. If that is empty, the task's
 * trade object is checked, and only if that is empty as well the resource stack
 * is tried a second time.
 *
 * @return the clean chain, or nullptr if none of the three attempts produced one
 */
base_task *task_manager::pop_clean_task_chain(base_task *base_task) {
  task *popped_task = static_cast<task *>(base_task);

  // Try to get a clean resource chain to go back to the main stealing loop
  task *clean_chain = pop_resource_from_task(popped_task);
  if (clean_chain == nullptr) {
    // double-check if we are really last one or we only have unlucky timing
    auto optional_cas_task = external_trading_deque::get_trade_object(popped_task);
    if (optional_cas_task) {
      clean_chain = *optional_cas_task;
    } else {
      clean_chain = pop_resource_from_task(popped_task);
    }
  }

  return clean_chain;
}

/**
 * Pushes spare_task_chain onto the resource stack rooted at target_task.
 *
 * The root is a stamped integer: value 0 means "empty", any other value is the
 * thread id of the top entry plus one. Every update increments the stamp. The push
 * is retried until the compare-exchange on the root succeeds.
 *
 * @param target_task      task whose resource stack receives the entry
 * @param spare_task_chain task to push; must be of the same depth as target_task
 *                         and belong to a different thread (asserted)
 */
void task_manager::push_resource_on_task(task *target_task, task *spare_task_chain) {
  PLS_ASSERT(target_task->thread_id_ != spare_task_chain->thread_id_,
             "Makes no sense to push task onto itself, as it is not clean by definition.");
  PLS_ASSERT(target_task->depth_ == spare_task_chain->depth_,
             "Must only push tasks with correct depth.");

  data_structures::stamped_integer current_root;
  data_structures::stamped_integer target_root;
  do {
    current_root = target_task->resource_stack_root_.load();
    target_root.stamp = current_root.stamp + 1;
    target_root.value = spare_task_chain->thread_id_ + 1;

    if (current_root.value == 0) {
      // Empty, simply push in with no successor
      spare_task_chain->resource_stack_next_.store(nullptr);
    } else {
      // Already an entry. Find its corresponding task and set it as our successor.
      auto *current_root_task = find_task(current_root.value - 1, target_task->depth_);
      spare_task_chain->resource_stack_next_.store(current_root_task);
    }
  } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
}

/**
 * Pops the top entry from the resource stack rooted at target_task.
 *
 * The pop is retried until the compare-exchange on the root succeeds; the new
 * root refers to the successor of the popped entry (or 0 if there is none).
 *
 * @return the popped task with its resource_stack_next_ reset to nullptr,
 *         or nullptr if the stack was observed empty
 */
task *task_manager::pop_resource_from_task(task *target_task) {
  data_structures::stamped_integer current_root;
  data_structures::stamped_integer target_root;
  // Always assigned before the loop can exit; initialized so it never holds garbage.
  task *output_task = nullptr;
  do {
    current_root = target_task->resource_stack_root_.load();
    if (current_root.value == 0) {
      // Empty...
      return nullptr;
    } else {
      // Found something, try to pop it
      auto *current_root_task = find_task(current_root.value - 1, target_task->depth_);
      auto *next_stack_task = current_root_task->resource_stack_next_.load();

      target_root.stamp = current_root.stamp + 1;
      target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0;

      output_task = current_root_task;
    }
  } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));

  PLS_ASSERT(scheduler::check_task_chain_backward(*output_task), "Must only pop proper task chains.");
  output_task->resource_stack_next_.store(nullptr);
  return output_task;
}

}