#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/scheduler.h"

#include "pls/internal/scheduling/lock_free/task_manager.h"
#include "pls/internal/scheduling/lock_free/task.h"

namespace pls::internal::scheduling::lock_free {

// NOTE(review): the explicit template argument lists in this file (std::shared_ptr<...>,
// std::make_unique<...>, static_cast<...>, std::tuple<...>) were reconstructed from how the
// values are used below. Confirm the shared_ptr element type and the steal_task return type
// against the declarations in task_manager.h.

/**
 * Creates the task chain owned by this manager.
 *
 * Allocates one stack of stack_size bytes per task from stack_allocator, constructs num_tasks
 * tasks on top of these stacks and links neighbouring tasks through their next_/prev_ pointers.
 *
 * @param thread_id Id handed to the deque and to every created task.
 * @param num_tasks Number of tasks to create (also handed to the deque).
 * @param stack_size Size in bytes of each task's stack.
 * @param stack_allocator Allocator the stacks are taken from; a copy of the pointer is kept
 *                        so the destructor can return the stacks.
 */
task_manager::task_manager(unsigned thread_id,
                           size_t num_tasks,
                           size_t stack_size,
                           std::shared_ptr<stack_allocator> &stack_allocator) :
    stack_allocator_{stack_allocator},
    tasks_{},
    deque_{thread_id, num_tasks} {
  tasks_.reserve(num_tasks);

  for (size_t i = 0; i < num_tasks; i++) {
    char *stack_memory = stack_allocator->allocate_stack(stack_size);
    for (size_t j = 0; j < stack_size; j += base::system_details::CACHE_LINE_SIZE) {
      stack_memory[j] = 'a'; // Touch all stacks, forces the OS to actually allocate them
    }

    // The index i is passed to the task (presumably its depth in the chain - confirm in task.h).
    tasks_.emplace_back(std::make_unique<task>(stack_memory, stack_size, i, thread_id));

    // Link the new task with its predecessor to form a doubly linked chain.
    if (i > 0) {
      tasks_[i - 1]->next_ = tasks_[i].get();
      tasks_[i]->prev_ = tasks_[i - 1].get();
    }
  }
}

/**
 * Hands the stack of every owned task back to the stack allocator.
 */
task_manager::~task_manager() {
  for (auto &task : tasks_) {
    stack_allocator_->free_stack(task->stack_size_, task->stack_memory_);
  }
}

/**
 * Pushes pushed_task onto the bottom of this manager's deque.
 *
 * @param pushed_task Task to push; it is cast down to the lock_free task type.
 */
void task_manager::push_local_task(base_task *pushed_task) {
  deque_.push_bot(static_cast<task *>(pushed_task));
}

/**
 * Pops a task from the bottom of this manager's deque.
 *
 * @return Whatever deque_.pop_bot() yields.
 */
base_task *task_manager::pop_local_task() {
  return deque_.pop_bot();
}

/**
 * Tries to steal the top task of this manager's deque on behalf of stealing_state,
 * trading in one of the stealing thread's tasks in exchange.
 *
 * @param stealing_state State of the stealing thread; its active task must have depth_ == 0.
 * @return A tuple of (stolen task, traded task's former next_ pointer, flag):
 *         - {stolen_task, chain, true} if the pop from the top of the deque succeeded,
 *         - {nullptr, nullptr, false}  if a top task was peeked but the pop failed,
 *         - {nullptr, nullptr, true}   if no top task was found when peeking.
 */
std::tuple<base_task *, base_task *, bool> task_manager::steal_task(thread_state &stealing_state) {
  PLS_ASSERT(stealing_state.get_active_task()->depth_ == 0, "Must only steal with clean task chain.");
  PLS_ASSERT_EXPENSIVE(scheduler::check_task_chain(*stealing_state.get_active_task()),
                       "Must only steal with clean task chain.");

  auto peek = deque_.peek_top();
  if (peek.top_task_) {
    task *stolen_task = peek.top_task_;
    // get a suitable task to trade in
    task *traded_task = static_cast<task *>(scheduler::get_trade_task(stolen_task, stealing_state));
    // Remember the traded task's successor before the trade is attempted; it is returned to the caller.
    base_task *chain_after_stolen_task = traded_task->next_;

    // mark that we would like to push the traded in task
    traded_task->prepare_for_push(stealing_state.get_thread_id());

    // perform the actual pop operation
    task *pop_result_task = deque_.pop_top(traded_task, peek);
    if (pop_result_task) {
      PLS_ASSERT(stolen_task->thread_id_ != traded_task->thread_id_,
                 "It is impossible to steal an task we already own!");
      PLS_ASSERT(pop_result_task == stolen_task,
                 "We must only steal the task that we peeked at!");

      // Update the resource stack associated with the stolen task.
      bool push_success = stolen_task->push_task_chain(traded_task, stealing_state.get_thread_id());

      // Check whether our traded task is still stored in the stolen task's trade field.
      auto peeked_traded_object = external_trading_deque::peek_traded_object(stolen_task);
      task *optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task, peeked_traded_object);
      if (optional_exchanged_task) {
        PLS_ASSERT(optional_exchanged_task == traded_task,
                   "We are currently executing this, no one else can put another task in this field!");
        PLS_ASSERT(push_success,
                   "Push must only be interrupted if someone took the task we tried to push!");
      } else {
        // Someone explicitly took the traded task from us, remove it from the stack if we pushed it.
        if (push_success) {
          stolen_task->reset_task_chain(traded_task);
        }
      }

      return std::tuple{stolen_task, chain_after_stolen_task, true};
    } else {
      return std::tuple{nullptr, nullptr, false};
    }
  } else {
    return std::tuple{nullptr, nullptr, true};
  }
}

/**
 * Tries to obtain a task chain from the given task, either from its task chain stack
 * (pop_task_chain) or from the object stored in its trade field.
 *
 * Retries for as long as the task's trade field changes while popping.
 *
 * @param base_task Task to take a chain from; it is cast down to the lock_free task type.
 * @return The obtained task, or nullptr if the trade field was unchanged across the failed pop
 *         and is empty.
 */
base_task *task_manager::pop_clean_task_chain(base_task *base_task) {
  task *target_task = static_cast<task *>(base_task);

  traded_cas_field peeked_task_cas_before, peeked_task_cas_after;
  peeked_task_cas_after = external_trading_deque::peek_traded_object(target_task);
  while (true) {
    // Try to get a clean resource chain to go back to the main stealing loop
    peeked_task_cas_before = peeked_task_cas_after;
    task *pop_result = target_task->pop_task_chain();
    if (pop_result) {
      PLS_ASSERT(scheduler::check_task_chain_backward(*pop_result), "Must only pop proper task chains.");
      return pop_result; // Got something, so we are simply done here
    }
    peeked_task_cas_after = external_trading_deque::peek_traded_object(target_task);

    // The trade field changed while we were popping, so our view is outdated: try again.
    if (peeked_task_cas_before != peeked_task_cas_after) {
      continue;
    }

    PLS_ASSERT(!peeked_task_cas_after.is_filled_with_trade_request(),
               "The resource stack must never be empty while the task is up for being stolen.");

    if (peeked_task_cas_after.is_empty()) {
      // The task was 'stable' during our pop from the stack.
      // Or in other words: no other thread operated on the task.
      // We are therefore the last child and do not get a clean task chain.
      return nullptr;
    }

    // The task was stable, but has a potential resource attached in its cas field.
    // Try to get it to not be blocked by the other preempted task.
    task *optional_cas_task = external_trading_deque::get_trade_object(target_task, peeked_task_cas_after);
    if (optional_cas_task) {
      // We got it, thus the other thread has not got it and will remove it from the queue.
      return optional_cas_task;
    }
    // Otherwise we did not get the trade object; loop and retry the pop.
  }
}

}