// task_manager.cpp
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/scheduler.h"

#include "pls/internal/scheduling/lock_free/task_manager.h"
#include "pls/internal/scheduling/lock_free/task.h"

namespace pls::internal::scheduling::lock_free {

/// Construct the per-thread task manager.
///
/// Pre-allocates one coroutine stack per task slot and chains the created
/// tasks into a doubly-linked list (next_/prev_), which forms the task chain
/// used by the stealing/trading machinery.
///
/// @param thread_id       id of the owning worker thread (stored in each task)
/// @param num_tasks       number of task slots; num_tasks - 1 tasks are created
/// @param stack_size      byte size of each allocated coroutine stack
/// @param stack_allocator allocator used for the stacks; kept in
///                        stack_allocator_ so the destructor can free them
task_manager::task_manager(unsigned thread_id,
                           size_t num_tasks,
                           size_t stack_size,
                           std::shared_ptr<stack_allocator> &stack_allocator) : stack_allocator_{stack_allocator},
                                                                                tasks_{},
                                                                                deque_{thread_id, num_tasks} {
  tasks_.reserve(num_tasks);

  // 'i + 1 < num_tasks' instead of 'i < num_tasks - 1': the latter underflows
  // size_t when num_tasks == 0, turning the loop into an (almost) endless one.
  // For num_tasks >= 1 both forms create exactly num_tasks - 1 tasks.
  for (size_t i = 0; i + 1 < num_tasks; i++) {
    char *stack_memory = stack_allocator->allocate_stack(stack_size);
    tasks_.emplace_back(std::make_unique<task>(stack_memory, stack_size, i, thread_id));

    // Link neighbouring tasks into the doubly-linked task chain.
    if (i > 0) {
      tasks_[i - 1]->next_ = tasks_[i].get();
      tasks_[i]->prev_ = tasks_[i - 1].get();
    }
  }
}

/// Return every coroutine stack owned by this manager to its allocator.
task_manager::~task_manager() {
  for (const auto &owned_task : tasks_) {
    stack_allocator_->free_stack(owned_task->stack_size_, owned_task->stack_memory_);
  }
}

// Resolve the task at the given depth that is owned by the thread with the
// given id, by walking through the scheduler's per-thread state.
static task *find_task(unsigned id, unsigned depth) {
  auto &owning_thread_state = thread_state::get().get_scheduler().thread_state_for(id);
  return owning_thread_state.get_task_manager().get_task(depth);
}

/// Place a task on the bottom (owner side) of this thread's deque.
void task_manager::push_local_task(base_task *pushed_task) {
  auto *concrete_task = static_cast<task *>(pushed_task);
  deque_.push_bot(concrete_task);
}

/// Take back the most recently pushed task from the owner side of the deque.
/// Returns nullptr when the deque is empty (or the pop lost a race).
base_task *task_manager::pop_local_task() {
  if (auto popped = deque_.pop_bot()) {
    return *popped;
  }
  return nullptr;
}

/// Attempt to steal the top task of this manager's deque on behalf of the
/// stealing thread. The steal is a trade: a task of the same depth from the
/// stealing thread's (clean) chain is offered in exchange for the stolen one.
///
/// @param stealing_state thread_state of the stealing thread; its active task
///                       must be at depth 0 with a clean task chain (asserted).
/// @return the stolen task on success, nullptr if the deque was empty or the
///         pop/trade raced with another thread and failed.
base_task *task_manager::steal_task(thread_state &stealing_state) {
  PLS_ASSERT(stealing_state.get_active_task()->depth_ == 0, "Must only steal with clean task chain.");
  PLS_ASSERT(scheduler::check_task_chain(*stealing_state.get_active_task()), "Must only steal with clean task chain.");

  auto peek = deque_.peek_top();
  if (peek.top_task_) {
    // search for the task we want to trade in
    task *stolen_task = static_cast<task *>(*peek.top_task_);
    task *traded_task = static_cast<task *>(&scheduler::task_chain_at(stolen_task->depth_, stealing_state));

    // keep a reference to the rest of the task chain that we keep
    base_task *next_own_task = traded_task->next_;
    // 'unchain' the traded tasks (to help us find bugs)
    traded_task->next_ = nullptr;

    // perform the actual pop operation
    auto pop_result_task = deque_.pop_top(traded_task, peek);
    if (pop_result_task) {
      PLS_ASSERT(stolen_task->thread_id_ != traded_task->thread_id_,
                 "It is impossible to steal an task we already own!");
      PLS_ASSERT(*pop_result_task == stolen_task,
                 "We must only steal the task that we peeked at!");

      // TODO: the re-chaining should not be part of the task manager.
      //  The manager should only perform the steal + resource push.

      // the steal was a success, link the chain so we own the stolen part
      stolen_task->next_ = next_own_task;
      next_own_task->prev_ = stolen_task;

      // update the resource stack associated with the stolen task
      push_resource_on_task(stolen_task, traded_task);

      // Check whether the traded task is still in the deque's trade slot;
      // if so, take it out ourselves (the resource-stack push above already
      // made it available to the previous owner).
      auto optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task);
      if (optional_exchanged_task) {
        // All good, we pushed the task over to the stack, nothing more to do
        PLS_ASSERT(*optional_exchanged_task == traded_task,
                   "We are currently executing this, no one else can put another task in this field!");
      } else {
        // The last other active thread took it as its spare resource...
        // ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
        // Bumping the stamp while zeroing the value marks the stack as empty
        // for subsequent CAS-based operations on resource_stack_root_.
        auto current_root = stolen_task->resource_stack_root_.load();
        current_root.stamp++;
        current_root.value = 0;
        stolen_task->resource_stack_root_.store(current_root);
      }

      return stolen_task;
    } else {
      // the steal failed, reset our chain to its old, clean state (re-link what we have broken)
      traded_task->next_ = next_own_task;

      return nullptr;
    }
  } else {
    // Deque is empty, nothing to steal.
    return nullptr;
  }
}

/// After finishing a (potentially stolen) task, try to acquire a clean task
/// chain so the thread can return to the main stealing loop.
/// Returns nullptr if no clean chain could be obtained.
base_task *task_manager::pop_clean_task_chain(base_task *base_task) {
  auto *popped_task = static_cast<task *>(base_task);

  // First attempt: pop a clean chain off the task's resource stack.
  if (task *first_attempt = pop_resource_from_task(popped_task)) {
    return first_attempt;
  }

  // The stack looked empty — double-check whether we are really the last
  // active thread or just had unlucky timing with a racing steal.
  auto optional_cas_task = external_trading_deque::get_trade_object(popped_task);
  if (optional_cas_task) {
    return *optional_cas_task;
  }

  // The trade object is gone, so a matching push must have happened
  // (or will yield nothing): retry the resource stack once more.
  return pop_resource_from_task(popped_task);
}

/// Push a spare task chain onto the lock-free resource stack that hangs off
/// target_task. The stack is intrusive: resource_stack_root_ encodes the top
/// entry as (thread_id + 1) in its value field (0 == empty), and each entry
/// links to the next via resource_stack_next_.
///
/// @param target_task      task whose resource stack receives the entry
/// @param spare_task_chain chain to push; must belong to a different thread
///                         and have the same depth as target_task (asserted).
void task_manager::push_resource_on_task(task *target_task, task *spare_task_chain) {
  PLS_ASSERT(target_task->thread_id_ != spare_task_chain->thread_id_,
             "Makes no sense to push task onto itself, as it is not clean by definition.");
  PLS_ASSERT(target_task->depth_ == spare_task_chain->depth_,
             "Must only push tasks with correct depth.");

  data_structures::stamped_integer current_root;
  data_structures::stamped_integer target_root;
  // CAS retry loop: re-read the root, prepare the new top, link the previous
  // top below us, then attempt to swing the root. The stamp is bumped on every
  // update — presumably as ABA protection for the CAS; confirm against the
  // stamped_integer design.
  do {
    current_root = target_task->resource_stack_root_.load();
    target_root.stamp = current_root.stamp + 1;
    // value encodes the owning thread id shifted by one so that 0 means empty.
    target_root.value = spare_task_chain->thread_id_ + 1;

    if (current_root.value == 0) {
      // Empty, simply push in with no successor
      spare_task_chain->resource_stack_next_.store(nullptr);
    } else {
      // Already an entry. Find it's corresponding task and set it as our successor.
      auto *current_root_task = find_task(current_root.value - 1, target_task->depth_);
      spare_task_chain->resource_stack_next_.store(current_root_task);
    }

  } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
}

/// Pop the top entry from target_task's lock-free resource stack.
/// Counterpart to push_resource_on_task: value == 0 in the stacked root means
/// the stack is empty, otherwise value - 1 is the thread id owning the top
/// entry at target_task's depth.
///
/// @param target_task task whose resource stack is popped
/// @return the popped task chain, or nullptr if the stack was empty.
task *task_manager::pop_resource_from_task(task *target_task) {
  data_structures::stamped_integer current_root;
  data_structures::stamped_integer target_root;
  task *output_task;
  // CAS retry loop mirroring the push: read root, compute the new root from
  // the top entry's successor, then try to swing the root atomically.
  do {
    current_root = target_task->resource_stack_root_.load();
    if (current_root.value == 0) {
      // Empty...
      return nullptr;
    } else {
      // Found something, try to pop it
      auto *current_root_task = find_task(current_root.value - 1, target_task->depth_);
      auto *next_stack_task = current_root_task->resource_stack_next_.load();

      // Bump the stamp on every update (presumably ABA protection for the
      // CAS); the new value points at the successor, or 0 if none remains.
      target_root.stamp = current_root.stamp + 1;
      target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0;

      output_task = current_root_task;
    }
  } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));

  PLS_ASSERT(scheduler::check_task_chain_backward(*output_task), "Must only pop proper task chains.");
  // Detach the popped entry from the stack before handing it out.
  output_task->resource_stack_next_.store(nullptr);
  return output_task;
}

}