#include "pls/internal/scheduling/task_manager.h"

#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/scheduler.h"

namespace pls::internal::scheduling {

// Builds the per-thread task manager: pre-allocates one coroutine stack per task,
// wraps each in a `task` object and links them into a doubly-linked chain
// (next_/prev_). The first task in the chain becomes the active task.
//
// Note: the deque is sized for `num_tasks`, while the stacks are owned here and
// released again in the destructor.
task_manager::task_manager(unsigned thread_id,
                           size_t num_tasks,
                           size_t stack_size,
                           stack_allocator &stack_allocator) : num_tasks_{num_tasks},
                                                               stack_allocator_{stack_allocator},
                                                               tasks_{},
                                                               deque_{thread_id, num_tasks_} {
  tasks_.reserve(num_tasks);

  // NOTE(review): only `num_tasks - 1` tasks are created although `num_tasks`
  // slots are reserved — presumably one "task" is accounted for elsewhere
  // (e.g. the scheduler's main task); confirm against the callers.
  // Also assumes num_tasks >= 2: with 0 or 1 the loop creates nothing and
  // `tasks_[0]` below would be out of bounds.
  for (size_t i = 0; i < num_tasks - 1; i++) {
    char *stack_memory = stack_allocator.allocate_stack(stack_size);
    tasks_.emplace_back(std::make_unique<task>(stack_memory, stack_size, i, thread_id));

    // Chain each new task to its predecessor (the first task has no prev_).
    if (i > 0) {
      tasks_[i - 1]->next_ = tasks_[i].get();
      tasks_[i]->prev_ = tasks_[i - 1].get();
    }
  }
  active_task_ = tasks_[0].get();
}

// Hand every coroutine stack allocated in the constructor back to the allocator.
// The task objects themselves are released by the unique_ptrs in tasks_.
task_manager::~task_manager() {
  for (size_t i = 0; i < tasks_.size(); i++) {
    auto &owned_task = tasks_[i];
    stack_allocator_.free_stack(owned_task->stack_size_, owned_task->stack_memory_);
  }
}

// Resolve the task object that lives at chain position `depth` in the
// task manager owned by thread `id` (file-local helper).
static task &find_task(unsigned id, unsigned depth) {
  auto &our_scheduler = thread_state::get().get_scheduler();
  auto &target_thread_state = our_scheduler.thread_state_for(id);
  auto &target_manager = target_thread_state.get_task_manager();
  return target_manager.get_this_thread_task(depth);
}

// Attempt to steal the top task of this manager's deque on behalf of
// `stealing_task_manager`. The steal is a *trade*: the thief offers the task at
// the same depth from its own (clean) chain in exchange for the stolen one.
//
// Returns the traded-away task on success (the thief now owns the stolen task
// and everything below it in its chain), or nullptr if the deque was empty or
// the pop raced with another thief/the owner.
task *task_manager::steal_task(task_manager &stealing_task_manager) {
  PLS_ASSERT(stealing_task_manager.active_task_->depth_ == 0, "Must only steal with clean task chain.");
  PLS_ASSERT(stealing_task_manager.check_task_chain(), "Must only steal with clean task chain.");

  auto peek = deque_.peek_top();
  if (peek.top_task_) {
    // search for the task we want to trade in: walk the thief's chain down to
    // the same depth as the task we are about to steal
    task *stolen_task = *peek.top_task_;
    task *traded_task = stealing_task_manager.active_task_;
    for (unsigned i = 0; i < stolen_task->depth_; i++) {
      traded_task = traded_task->next_;
    }

    // keep a reference to the rest of the task chain that we keep
    task *next_own_task = traded_task->next_;
    // 'unchain' the traded tasks (to help us find bugs)
    traded_task->next_ = nullptr;

    // perform the actual pop operation (pop_top validates against the earlier
    // peek, so a concurrent change makes it fail rather than pop a wrong task)
    auto pop_result_task = deque_.pop_top(traded_task, peek);
    if (pop_result_task) {
      PLS_ASSERT(stolen_task->thread_id_ != traded_task->thread_id_,
                 "It is impossible to steal an task we already own!");
      PLS_ASSERT(*pop_result_task == stolen_task,
                 "We must only steal the task that we peeked at!");

      // the steal was a success, link the chain so we own the stolen part
      stolen_task->next_ = next_own_task;
      next_own_task->prev_ = stolen_task;
      stealing_task_manager.active_task_ = stolen_task;

      return traded_task;
    } else {
      // the steal failed, reset our chain to its old, clean state (re-link what we have broken)
      traded_task->next_ = next_own_task;

      return nullptr;
    }
  } else {
    // Deque empty — nothing to steal.
    return nullptr;
  }
}

// Push a spare (clean) task chain onto `target_task`'s lock-free resource stack.
//
// The stack is encoded in `resource_stack_root_`, a stamped integer:
//   value == 0            -> stack is empty
//   value == thread_id+1  -> head is the task of that thread at this depth
// The stamp is bumped on every update to avoid ABA problems, and the actual
// linking uses `resource_stack_next_` on the pushed task.
void task_manager::push_resource_on_task(task *target_task, task *spare_task_chain) {
  PLS_ASSERT(target_task->thread_id_ != spare_task_chain->thread_id_,
             "Makes no sense to push task onto itself, as it is not clean by definition.");
  PLS_ASSERT(target_task->depth_ == spare_task_chain->depth_, "Must only push tasks with correct depth.");

  data_structures::stamped_integer current_root;
  data_structures::stamped_integer target_root;
  do {
    current_root = target_task->resource_stack_root_.load();
    target_root.stamp = current_root.stamp + 1;
    target_root.value = spare_task_chain->thread_id_ + 1;

    if (current_root.value == 0) {
      // Empty, simply push in with no successor
      spare_task_chain->resource_stack_next_.store(nullptr);
    } else {
      // Already an entry. Find its corresponding task and set it as our successor.
      auto &current_root_task = find_task(current_root.value - 1, target_task->depth_);
      spare_task_chain->resource_stack_next_.store(&current_root_task);
    }

    // Retry the whole read-modify-write if someone else changed the root meanwhile.
  } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
}

// Pop the head of `target_task`'s lock-free resource stack.
//
// Counterpart to push_resource_on_task: the root's `value` encodes thread_id+1
// of the head task (0 = empty), the new root is taken from the head's
// `resource_stack_next_`, and the stamp is bumped against ABA.
//
// Returns the popped task chain, or nullptr if the stack is empty.
task *task_manager::pop_resource_from_task(task *target_task) {
  data_structures::stamped_integer current_root;
  data_structures::stamped_integer target_root;
  task *output_task;
  do {
    current_root = target_task->resource_stack_root_.load();
    if (current_root.value == 0) {
      // Empty...
      return nullptr;
    } else {
      // Found something, try to pop it
      auto &current_root_task = find_task(current_root.value - 1, target_task->depth_);
      auto *next_stack_task = current_root_task.resource_stack_next_.load();

      target_root.stamp = current_root.stamp + 1;
      // New head is the popped task's successor (or empty if it had none).
      target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0;

      output_task = &current_root_task;
    }
    // Retry if the root changed between our load and the exchange.
  } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));

  PLS_ASSERT(check_task_chain_backward(output_task), "Must only pop proper task chains.");
  // Detach the popped task from the stack (mirrors the 'unchain to find bugs' style elsewhere).
  output_task->resource_stack_next_.store(nullptr);
  return output_task;
}

// Synchronization point after a spawn: wait until the work spawned from the
// current task has finished before continuing.
//
// Fast path: if `last_task` is already synchronized we are its sole owner and
// can simply continue. Otherwise we switch onto the spawned task's fiber,
// stash our continuation into `last_task`, and let try_clean_return decide
// whether we go back to the main scheduling loop or finish last_task ourselves.
void task_manager::sync() {
  auto *spawning_task_manager = this;
  auto *last_task = spawning_task_manager->active_task_;
  auto *spawned_task = spawning_task_manager->active_task_->next_;

  if (last_task->is_synchronized_) {
    return; // We are already the sole owner of last_task
  } else {
    auto continuation = spawned_task->run_as_task([=](context_switcher::continuation cont) {
      // Remember where to resume last_task, then make the spawned task active.
      last_task->continuation_ = std::move(cont);
      spawning_task_manager->active_task_ = spawned_task;

      // NOTE(review): both branches return result_cont — try_clean_return
      // fills it with either the main-loop continuation (true) or last_task's
      // continuation (false), so the branch only documents which one it is.
      context_switcher::continuation result_cont;
      if (spawning_task_manager->try_clean_return(result_cont)) {
        // We return back to the main scheduling loop
        return result_cont;
      } else {
        // We finish up the last task
        return result_cont;
      }
    });

    PLS_ASSERT(!continuation.valid(),
               "We only return to a sync point, never jump to it directly."
               "This must therefore never return an unfinished fiber/continuation.");

    return; // We cleanly synced to the last one finishing work on last_task
  }
}

// Decide how to leave the currently finishing task.
//
// If someone stole work from `last_task`, we try to acquire a *clean* spare
// chain (from its resource stack or the trading deque) and return to the main
// scheduling loop to steal new work (returns true, result_cont = main loop).
// If we turn out to be the last worker on last_task, we take back ownership of
// the whole chain and resume last_task directly (returns false,
// result_cont = last_task's stored continuation).
bool task_manager::try_clean_return(context_switcher::continuation &result_cont) {
  task *this_task = active_task_;
  task *last_task = active_task_->prev_;

  PLS_ASSERT(last_task != nullptr,
             "Must never try to return from a task at level 0 (no last task), as we must have a target to return to.");

  // Try to get a clean resource chain to go back to the main stealing loop
  task *clean_chain = pop_resource_from_task(last_task);
  if (clean_chain == nullptr) {
    // double-check if we are really last one or we only have unlucky timing
    auto optional_cas_task = external_trading_deque::get_trade_object(last_task);
    if (optional_cas_task) {
      clean_chain = *optional_cas_task;
    } else {
      // The trade object is gone, so the thief must have pushed its chain by now.
      clean_chain = pop_resource_from_task(last_task);
    }
  }

  if (clean_chain != nullptr) {
    // We got a clean chain to continue working on.
    PLS_ASSERT(last_task->depth_ == clean_chain->depth_,
               "Resources must only reside in the correct depth!");
    PLS_ASSERT(clean_chain != last_task,
               "We want to swap out the last task and its chain to use a clean one, thus they must differ.");
    PLS_ASSERT(check_task_chain_backward(clean_chain),
               "Can only acquire clean chains for clean returns!");
    // Splice the clean chain in below this_task, replacing last_task's part.
    this_task->prev_ = clean_chain;
    clean_chain->next_ = this_task;

    // Walk back chain to make first task active
    active_task_ = clean_chain;
    while (active_task_->prev_ != nullptr) {
      active_task_ = active_task_->prev_;
    }

    // jump back to the continuation in main scheduling loop, time to steal some work
    result_cont = std::move(thread_state::get().main_continuation());
    PLS_ASSERT(result_cont.valid(), "Must return a valid continuation.");
    return true;
  } else {
    // Make sure that we are owner of this full continuation/task chain.
    last_task->next_ = this_task;
    this_task->prev_ = last_task;

    // We are the last one working on this task. Thus the sync must be finished, continue working.
    active_task_ = last_task;

    last_task->is_synchronized_ = true;
    result_cont = std::move(last_task->continuation_);
    PLS_ASSERT(result_cont.valid(), "Must return a valid continuation.");
    return false;
  }
}

// Debug check: walking towards the chain's tail, every next_/prev_ pair
// must be symmetric. Returns false on the first broken back-link.
bool task_manager::check_task_chain_forward(task *start_task) {
  for (task *current = start_task; current->next_ != nullptr; current = current->next_) {
    task *following = current->next_;
    if (following->prev_ != current) {
      return false;
    }
  }
  return true;
}

// Debug check: walking towards the chain's head, every prev_/next_ pair
// must be symmetric. Returns false on the first broken forward-link.
bool task_manager::check_task_chain_backward(task *start_task) {
  for (task *current = start_task; current->prev_ != nullptr; current = current->prev_) {
    task *preceding = current->prev_;
    if (preceding->next_ != current) {
      return false;
    }
  }
  return true;
}

bool task_manager::check_task_chain() {
239
  return check_task_chain_backward(active_task_) && check_task_chain_forward(active_task_);
240 241
}

}