// task_manager.cpp
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/scheduler.h"

#include "pls/internal/scheduling/lock_free/task_manager.h"
#include "pls/internal/scheduling/lock_free/task.h"

namespace pls::internal::scheduling::lock_free {

// Builds the per-thread task manager.
//
// For each of the num_tasks slots a stack of stack_size bytes is requested from the given
// allocator and a task object is created on top of it. Neighbouring tasks are linked to each
// other through their prev_/next_ pointers, so the tasks form one chain ordered by index.
//
// thread_id:       forwarded to every created task and to the trading deque.
// num_tasks:       number of tasks/stacks to create; also forwarded to the trading deque.
// stack_size:      size requested from the allocator for every task stack.
// stack_allocator: allocator the stacks are taken from; it is also stored in stack_allocator_,
//                  which the destructor uses to free the stacks again.
task_manager::task_manager(unsigned thread_id,
                           size_t num_tasks,
                           size_t stack_size,
                           std::shared_ptr<stack_allocator> &stack_allocator) : stack_allocator_{stack_allocator},
                                                                                tasks_{},
                                                                                deque_{thread_id, num_tasks} {
  // The final number of tasks is known up front, so reserve once instead of growing repeatedly.
  tasks_.reserve(num_tasks);

  for (size_t i = 0; i < num_tasks; i++) {
    char *stack_memory = stack_allocator->allocate_stack(stack_size);
    tasks_.emplace_back(std::make_unique<task>(stack_memory, stack_size, i, thread_id));

    // Link the freshly created task with its predecessor (the first task has none).
    if (i > 0) {
      tasks_[i - 1]->next_ = tasks_[i].get();
      tasks_[i]->prev_ = tasks_[i - 1].get();
    }
  }
}

// Returns the stack memory of every managed task to stack_allocator_.
task_manager::~task_manager() {
  for (const auto &entry : tasks_) {
    task &owned = *entry;
    stack_allocator_->free_stack(owned.stack_size_, owned.stack_memory_);
  }
}

// Pushes the given task onto the bottom of this manager's trading deque.
// The deque stores lock_free::task pointers, hence the downcast from base_task.
void task_manager::push_local_task(base_task *pushed_task) {
  task *const local_task = static_cast<task *>(pushed_task);
  deque_.push_bot(local_task);
}

// Pops a task from the bottom of this manager's trading deque.
// Returns the popped task, or nullptr if the pop yielded nothing.
base_task *task_manager::pop_local_task() {
  auto popped = deque_.pop_bot();
  if (!popped) {
    return nullptr;
  }
  return *popped;
}

47
std::tuple<base_task *, base_task *> task_manager::steal_task(thread_state &stealing_state) {
48 49 50 51 52 53
  PLS_ASSERT(stealing_state.get_active_task()->depth_ == 0, "Must only steal with clean task chain.");
  PLS_ASSERT(scheduler::check_task_chain(*stealing_state.get_active_task()), "Must only steal with clean task chain.");

  auto peek = deque_.peek_top();
  if (peek.top_task_) {
    task *stolen_task = static_cast<task *>(*peek.top_task_);
54 55
    // get a suitable task to trade in
    // TODO: opt. add debug marker to traded in tasks that we do not accidentally use them.
56
    task *traded_task = static_cast<task *>(&scheduler::task_chain_at(stolen_task->depth_, stealing_state));
57
    base_task *chain_after_stolen_task = traded_task->next_;
58 59 60 61 62 63 64 65 66 67

    // perform the actual pop operation
    auto pop_result_task = deque_.pop_top(traded_task, peek);
    if (pop_result_task) {
      PLS_ASSERT(stolen_task->thread_id_ != traded_task->thread_id_,
                 "It is impossible to steal an task we already own!");
      PLS_ASSERT(*pop_result_task == stolen_task,
                 "We must only steal the task that we peeked at!");

      // update the resource stack associated with the stolen task
68
      stolen_task->push_task_chain(traded_task);
69 70 71 72 73 74 75 76 77

      auto optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task);
      if (optional_exchanged_task) {
        // All good, we pushed the task over to the stack, nothing more to do
        PLS_ASSERT(*optional_exchanged_task == traded_task,
                   "We are currently executing this, no one else can put another task in this field!");
      } else {
        // The last other active thread took it as its spare resource...
        // ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
78
        stolen_task->reset_task_chain();
79 80
      }

81
      return std::pair{stolen_task, chain_after_stolen_task};
82
    } else {
83
      return std::pair{nullptr, nullptr};
84 85
    }
  } else {
86
    return std::pair{nullptr, nullptr};
87 88 89 90 91 92
  }
}

// Obtains a task chain for the given task.
//
// First tries to pop an entry from the task's own task chain. If that yields nothing, the
// task's trade object is checked: when one is present it is used as the result, otherwise the
// pop from the task chain is attempted a second time.
//
// base_task: task whose chain is queried; it is downcast to lock_free::task.
// Returns the chain obtained by the steps above.
base_task *task_manager::pop_clean_task_chain(base_task *base_task) {
  task *popped_task = static_cast<task *>(base_task);
  // Try to get a clean resource chain to go back to the main stealing loop
  task *clean_chain = popped_task->pop_task_chain();
  if (clean_chain == nullptr) {
    // double-check if we are really last one or we only have unlucky timing
    auto optional_cas_task = external_trading_deque::get_trade_object(popped_task);
    if (optional_cas_task) {
      clean_chain = *optional_cas_task;
    } else {
      clean_chain = popped_task->pop_task_chain();
    }
  }

  return clean_chain;
}

}