#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/scheduler.h"

#include "pls/internal/scheduling/lock_free/task_manager.h"
#include "pls/internal/scheduling/lock_free/task.h"

namespace pls::internal::scheduling::lock_free {

task_manager::task_manager(unsigned thread_id,
                           size_t num_tasks,
                           size_t stack_size,
                           std::shared_ptr<stack_allocator> &stack_allocator) : stack_allocator_{stack_allocator},
                                                                                tasks_{},
                                                                                deque_{thread_id, num_tasks} {
  tasks_.reserve(num_tasks);

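  // Create num_tasks tasks, each backed by its own stack.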
  for (size_t i = 0; i < num_tasks; i++) {
    char *stack_memory = stack_allocator->allocate_stack(stack_size);
    for (size_t j = 0; j < stack_size; j += base::system_details::CACHE_LINE_SIZE) {
      stack_memory[j] = 'a'; // Touch all stacks, forces the OS to actually allocate them
    }
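    // Wrap the stack in a task object and link consecutive tasks into a doubly-linked chain.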
    tasks_.emplace_back(std::make_unique<task>(stack_memory, stack_size, i, thread_id));

    if (i > 0) {
      tasks_[i - 1]->next_ = tasks_[i].get();
      tasks_[i]->prev_ = tasks_[i - 1].get();
    }
  }
}

task_manager::~task_manager() {
  for (auto &task : tasks_) {
    stack_allocator_->free_stack(task->stack_size_, task->stack_memory_);
  }
}

void task_manager::push_local_task(base_task *pushed_task) {
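  // Local (owner-side) pushes and pops operate on the bottom end of the deque.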
  deque_.push_bot(static_cast<task *>(pushed_task));
}

base_task *task_manager::pop_local_task() {
  return deque_.pop_bot();
}

std::tuple<base_task *, base_task *, bool> task_manager::steal_task(thread_state &stealing_state) {
  PLS_ASSERT(stealing_state.get_active_task()->depth_ == 0, "Must only steal with clean task chain.");
  PLS_ASSERT_EXPENSIVE(scheduler::check_task_chain(*stealing_state.get_active_task()),
                       "Must only steal with clean task chain.");

  auto peek = deque_.peek_top();
  if (peek.top_task_) {
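    // A task is visible at the top of the victim's deque, try to steal it by trading in one of our own tasks.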
    task *stolen_task = peek.top_task_;
    // get a suitable task to trade in
    task *traded_task = static_cast<task *>(scheduler::get_trade_task(stolen_task, stealing_state));
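    // The chain following the traded-in task is handed back to the caller as the chain after the stolen task.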
    base_task *chain_after_stolen_task = traded_task->next_;

    // mark that we would like to push the traded-in task
    traded_task->prepare_for_push(stealing_state.get_thread_id());

    // perform the actual pop operation
    task *pop_result_task = deque_.pop_top(traded_task, peek);
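    // A null result means the top of the deque changed since our peek and the steal did not go through.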
    if (pop_result_task) {
      PLS_ASSERT(stolen_task->thread_id_ != traded_task->thread_id_,
                 "It is impossible to steal an task we already own!");
      PLS_ASSERT(pop_result_task == stolen_task,
                 "We must only steal the task that we peeked at!");

      // Update the resource stack associated with the stolen task.
      bool push_success = stolen_task->push_task_chain(traded_task, stealing_state.get_thread_id());

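      // Check whether the traded task is still in the cas field or was already taken from us.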
      auto peeked_traded_object = external_trading_deque::peek_traded_object(stolen_task);
      task *optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task, peeked_traded_object);
      if (optional_exchanged_task) {
        PLS_ASSERT(optional_exchanged_task == traded_task,
                   "We are currently executing this, no one else can put another task in this field!");
        PLS_ASSERT(push_success,
                   "Push must only be interrupted if someone took the task we tried to push!");
      } else {
        // Someone explicitly took the traded task from us, remove it from the stack if we pushed it.
        if (push_success) {
          stolen_task->reset_task_chain(traded_task);
        }
      }

      return std::tuple{stolen_task, chain_after_stolen_task, true};
    } else {
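      // The pop did not go through (most likely another thread raced us), report the failed steal attempt.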
      return std::tuple{nullptr, nullptr, false};
    }
  } else {
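    // The victim's deque is empty, there is nothing to steal here.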
    return std::tuple{nullptr, nullptr, true};
  }
}

base_task *task_manager::pop_clean_task_chain(base_task *base_task) {
  task *target_task = static_cast<task *>(base_task);

  traded_cas_field peeked_task_cas_before, peeked_task_cas_after;
  peeked_task_cas_after = external_trading_deque::peek_traded_object(target_task);
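  // Retry until we either pop a clean chain from the task's resource stack or find out that we are the last child.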
  while (true) {
    // Try to get a clean resource chain to go back to the main stealing loop
    peeked_task_cas_before = peeked_task_cas_after;
    task *pop_result = target_task->pop_task_chain();
    if (pop_result) {
      PLS_ASSERT(scheduler::check_task_chain_backward(*pop_result), "Must only pop proper task chains.");
      return pop_result; // Got something, so we are simply done here
    }
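    // The resource stack was empty, take a fresh snapshot of the cas field to check for concurrent changes.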
    peeked_task_cas_after = external_trading_deque::peek_traded_object(target_task);

    if (peeked_task_cas_before != peeked_task_cas_after) {
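      // The cas field changed while we popped, retry with the fresh snapshot.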
      continue;
    }

    PLS_ASSERT(!peeked_task_cas_after.is_filled_with_trade_request(),
               "The resource stack must never be empty while the task is up for being stolen.");

    if (peeked_task_cas_after.is_empty()) {
      // The task was 'stable' during our pop from the stack.
      // Or in other words: no other thread operated on the task.
      // We are therefore the last child and do not get a clean task chain.
      return nullptr;
    }

    // The task was stable, but has a potential resource attached in its cas field.
    // Try to grab it so we are not blocked by the other, preempted task.
    task *optional_cas_task = external_trading_deque::get_trade_object(target_task, peeked_task_cas_after);
    if (optional_cas_task) {
      // We got it, thus the other thread has not got it and will remove it from the queue.
      return optional_cas_task;
    }
  }
}

} // namespace pls::internal::scheduling::lock_free