// task_manager.cpp — per-thread task management (task chains, stealing, resource trading)
// for the pls scheduling runtime.
#include "pls/internal/scheduling/task_manager.h"

#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/scheduler.h"
namespace pls::internal::scheduling {

// Builds this thread's pool of tasks (one per spawn depth), each with its own
// coroutine stack from the shared allocator, and chains them into a doubly
// linked list. The shallowest task becomes the initially active task.
//
// NOTE(review): the loop creates num_tasks - 1 tasks although num_tasks slots
// are reserved — confirm this off-by-one is intentional (e.g. the deque holding
// the final slot). Also assumes num_tasks >= 2; num_tasks == 0 would underflow.
task_manager::task_manager(unsigned thread_id,
                           size_t num_tasks,
                           size_t stack_size,
                           std::shared_ptr<stack_allocator> &stack_allocator) : stack_allocator_{stack_allocator},
                                                                               tasks_{},
                                                                               deque_{thread_id, num_tasks} {
  tasks_.reserve(num_tasks);

  task *previous_task = nullptr;
  for (size_t depth = 0; depth < num_tasks - 1; depth++) {
    // Every task owns a dedicated stack for its fiber/continuation.
    char *stack_memory = stack_allocator->allocate_stack(stack_size);
    tasks_.emplace_back(std::make_unique<task>(stack_memory, stack_size, depth, thread_id));

    // Link the freshly created task into the doubly linked chain as we go.
    task *current_task = tasks_.back().get();
    if (previous_task != nullptr) {
      previous_task->next_ = current_task;
      current_task->prev_ = previous_task;
    }
    previous_task = current_task;
  }
  active_task_ = tasks_[0].get();
}

// Hands every task's coroutine stack back to the allocator it was taken from.
// The task objects themselves are released by the unique_ptr vector.
task_manager::~task_manager() {
  for (size_t i = 0; i < tasks_.size(); i++) {
    auto &owned_task = tasks_[i];
    stack_allocator_->free_stack(owned_task->stack_size_, owned_task->stack_memory_);
  }
}

35
static task &find_task(unsigned id, unsigned depth) {
36 37 38
  return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_this_thread_task(depth);
}

// Attempts to steal the top task of this manager's deque on behalf of the
// thief (`stealing_task_manager`). A steal is a trade: the thief offers one
// of its own tasks of equal depth in exchange for the stolen one, so both
// sides always keep a complete task chain.
//
// Returns the traded-away task on success (the thief must then push it as a
// resource / hand it over), or nullptr if the deque was empty or the pop
// raced with a concurrent operation.
task *task_manager::steal_task(task_manager &stealing_task_manager) {
  PLS_ASSERT(stealing_task_manager.active_task_->depth_ == 0, "Must only steal with clean task chain.");
  PLS_ASSERT(stealing_task_manager.check_task_chain(), "Must only steal with clean task chain.");

  // Peek first: we must know the target task's depth to pick a matching trade
  // object before attempting the actual pop.
  auto peek = deque_.peek_top();
  if (peek.top_task_) {
    // search for the task we want to trade in
    task *stolen_task = *peek.top_task_;
    // Walk the thief's chain from its depth-0 active task down to the same
    // depth as the task we are about to steal.
    task *traded_task = stealing_task_manager.active_task_;
    for (unsigned i = 0; i < stolen_task->depth_; i++) {
      traded_task = traded_task->next_;
    }

    // keep a reference to the rest of the task chain that we keep
    task *next_own_task = traded_task->next_;
    // 'unchain' the traded tasks (to help us find bugs)
    traded_task->next_ = nullptr;

    // perform the actual pop operation
    auto pop_result_task = deque_.pop_top(traded_task, peek);
    if (pop_result_task) {
      PLS_ASSERT(stolen_task->thread_id_ != traded_task->thread_id_,
                 "It is impossible to steal an task we already own!");
      PLS_ASSERT(*pop_result_task == stolen_task,
                 "We must only steal the task that we peeked at!");

      // the steal was a success, link the chain so we own the stolen part
      stolen_task->next_ = next_own_task;
      next_own_task->prev_ = stolen_task;
      stealing_task_manager.active_task_ = stolen_task;

      return traded_task;
    } else {
      // the steal failed, reset our chain to its old, clean state (re-link what we have broken)
      traded_task->next_ = next_own_task;

      return nullptr;
    }
  } else {
    // Nothing visible on the victim's deque.
    return nullptr;
  }
}

// Pushes a clean spare task chain onto target_task's lock-free resource
// stack. The stack is encoded in a single stamped integer: value 0 means
// empty, otherwise value == owning thread_id + 1 of the top chain; the stamp
// is bumped on every update to guard against ABA. Linking between stack
// entries goes through each task's resource_stack_next_ pointer.
void task_manager::push_resource_on_task(task *target_task, task *spare_task_chain) {
  PLS_ASSERT(target_task->thread_id_ != spare_task_chain->thread_id_,
             "Makes no sense to push task onto itself, as it is not clean by definition.");
  PLS_ASSERT(target_task->depth_ == spare_task_chain->depth_, "Must only push tasks with correct depth.");

  data_structures::stamped_integer current_root;
  data_structures::stamped_integer target_root;
  do {
    // Re-read the root each attempt; the CAS below only succeeds if it is
    // still unchanged when we try to install our new top entry.
    current_root = target_task->resource_stack_root_.load();
    target_root.stamp = current_root.stamp + 1;
    target_root.value = spare_task_chain->thread_id_ + 1;

    if (current_root.value == 0) {
      // Empty, simply push in with no successor
      spare_task_chain->resource_stack_next_.store(nullptr);
    } else {
      // Already an entry. Find its corresponding task and set it as our successor.
      auto &current_root_task = find_task(current_root.value - 1, target_task->depth_);
      spare_task_chain->resource_stack_next_.store(&current_root_task);
    }

  } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
}

// Pops the top spare task chain from target_task's lock-free resource stack.
// Returns nullptr if the stack is empty. Uses the same stamped-integer
// encoding as push_resource_on_task (value 0 == empty, otherwise
// thread_id + 1 of the top entry; stamp bumped per update against ABA).
task *task_manager::pop_resource_from_task(task *target_task) {
  data_structures::stamped_integer current_root;
  data_structures::stamped_integer target_root;
  task *output_task;
  do {
    current_root = target_task->resource_stack_root_.load();
    if (current_root.value == 0) {
      // Empty...
      return nullptr;
    } else {
      // Found something, try to pop it
      auto &current_root_task = find_task(current_root.value - 1, target_task->depth_);
      auto *next_stack_task = current_root_task.resource_stack_next_.load();

      // New root points at the popped entry's successor (or empty if none).
      target_root.stamp = current_root.stamp + 1;
      target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0;

      output_task = &current_root_task;
    }
  } while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));

  PLS_ASSERT(check_task_chain_backward(output_task), "Must only pop proper task chains.");
  // Detach the popped chain from the stack before handing it out.
  output_task->resource_stack_next_.store(nullptr);
  return output_task;
}

132
void task_manager::sync() {
133 134 135
  auto *spawning_task_manager = this;
  auto *last_task = spawning_task_manager->active_task_;
  auto *spawned_task = spawning_task_manager->active_task_->next_;
136

137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
  if (last_task->is_synchronized_) {
    return; // We are already the sole owner of last_task
  } else {
    auto continuation = spawned_task->run_as_task([=](context_switcher::continuation cont) {
      last_task->continuation_ = std::move(cont);
      spawning_task_manager->active_task_ = spawned_task;

      context_switcher::continuation result_cont;
      if (spawning_task_manager->try_clean_return(result_cont)) {
        // We return back to the main scheduling loop
        return result_cont;
      } else {
        // We finish up the last task
        return result_cont;
      }
    });

    PLS_ASSERT(!continuation.valid(),
               "We only return to a sync point, never jump to it directly."
               "This must therefore never return an unfinished fiber/continuation.");

    return; // We cleanly synced to the last one finishing work on last_task
  }
160 161 162 163 164 165
}

// Called when the currently active task finishes at a sync point. Tries to
// acquire a clean task chain for last_task's depth so this thread can return
// to the main stealing loop; fills result_cont with the continuation to jump
// to in either case.
//
// Returns true when a clean chain was acquired (result_cont is the main
// scheduling loop's continuation); false when this thread turned out to be
// the last owner of last_task and must finish its remaining work
// (result_cont is last_task's stored continuation).
bool task_manager::try_clean_return(context_switcher::continuation &result_cont) {
  task *this_task = active_task_;
  task *last_task = active_task_->prev_;

  PLS_ASSERT(last_task != nullptr,
             "Must never try to return from a task at level 0 (no last task), as we must have a target to return to.");

  // Try to get a clean resource chain to go back to the main stealing loop
  task *clean_chain = pop_resource_from_task(last_task);
  if (clean_chain == nullptr) {
    // double-check if we are really last one or we only have unlucky timing
    auto optional_cas_task = external_trading_deque::get_trade_object(last_task);
    if (optional_cas_task) {
      clean_chain = *optional_cas_task;
    } else {
      // Second attempt: a racing thief may have pushed its traded chain
      // onto the resource stack in the meantime.
      clean_chain = pop_resource_from_task(last_task);
    }
  }

  if (clean_chain != nullptr) {
    // We got a clean chain to continue working on.
    PLS_ASSERT(last_task->depth_ == clean_chain->depth_,
               "Resources must only reside in the correct depth!");
    PLS_ASSERT(clean_chain != last_task,
               "We want to swap out the last task and its chain to use a clean one, thus they must differ.");
    PLS_ASSERT(check_task_chain_backward(clean_chain),
               "Can only acquire clean chains for clean returns!");
    // Splice the clean chain in place of the (stolen) last task.
    this_task->prev_ = clean_chain;
    clean_chain->next_ = this_task;

    // Walk back chain to make first task active
    active_task_ = clean_chain;
    while (active_task_->prev_ != nullptr) {
      active_task_ = active_task_->prev_;
    }

    // jump back to the continuation in main scheduling loop, time to steal some work
    result_cont = std::move(thread_state::get().main_continuation());
    PLS_ASSERT(result_cont.valid(), "Must return a valid continuation.");
    return true;
  } else {
    // Make sure that we are owner of this full continuation/task chain.
    last_task->next_ = this_task;
    this_task->prev_ = last_task;

    // We are the last one working on this task. Thus the sync must be finished, continue working.
    active_task_ = last_task;

    last_task->is_synchronized_ = true;
    result_cont = std::move(last_task->continuation_);
    PLS_ASSERT(result_cont.valid(), "Must return a valid continuation.");
    return false;
  }
}

// Walks the chain forward via next_ and verifies every link is mirrored by
// the matching prev_ pointer. Returns false on the first inconsistency.
bool task_manager::check_task_chain_forward(task *start_task) {
  for (task *current = start_task; current->next_ != nullptr; current = current->next_) {
    if (current->next_->prev_ != current) {
      return false;
    }
  }
  return true;
}

// Walks the chain backward via prev_ and verifies every link is mirrored by
// the matching next_ pointer. Returns false on the first inconsistency.
bool task_manager::check_task_chain_backward(task *start_task) {
  for (task *current = start_task; current->prev_ != nullptr; current = current->prev_) {
    if (current->prev_->next_ != current) {
      return false;
    }
  }
  return true;
}

bool task_manager::check_task_chain() {
238
  return check_task_chain_backward(active_task_) && check_task_chain_forward(active_task_);
239 240
}

}  // namespace pls::internal::scheduling