scheduler.cpp 5.07 KB
Newer Older
1
#include "pls/internal/scheduling/scheduler.h"
2

3 4
#include "context_switcher/context_switcher.h"

5
#include "pls/internal/base/thread.h"
6
#include "pls/internal/base/error_handling.h"
7

8
namespace pls::internal::scheduling {
9

10
scheduler::scheduler(unsigned int num_threads, size_t computation_depth, size_t stack_size, bool reuse_thread) :
11
    num_threads_{num_threads},
12 13
    reuse_thread_{reuse_thread},
    sync_barrier_{num_threads + 1 - reuse_thread},
14 15 16 17
    worker_threads_{},
    thread_states_{},
    main_thread_starter_function_{nullptr},
    work_section_done_{false},
18 19
    terminated_{false} {

20 21 22
  worker_threads_.reserve(num_threads);
  task_managers_.reserve(num_threads);
  thread_states_.reserve(num_threads);
23
  for (unsigned int i = 0; i < num_threads_; i++) {
24 25 26
    auto &this_task_manager =
        task_managers_.emplace_back(std::make_unique<task_manager>(i, computation_depth, stack_size, stack_allocator_));
    auto &this_thread_state = thread_states_.emplace_back(std::make_unique<thread_state>(*this, i, *this_task_manager));
27 28

    if (reuse_thread && i == 0) {
29
      worker_threads_.emplace_back();
30 31
      continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one.
    }
32 33 34 35 36 37

    auto *this_thread_state_pointer = this_thread_state.get();
    worker_threads_.emplace_back([this_thread_state_pointer] {
      thread_state::set(this_thread_state_pointer);
      work_thread_main_loop();
    });
38 39 40 41 42 43 44
  }
}

scheduler::~scheduler() {
  terminate();
}

45 46
void scheduler::work_thread_main_loop() {
  auto &scheduler = thread_state::get().get_scheduler();
47
  while (true) {
48
    // Wait to be triggered
49
    scheduler.sync_barrier_.wait();
50 51

    // Check for shutdown
52
    if (scheduler.terminated_) {
53 54 55
      return;
    }

56
    scheduler.work_thread_work_section();
57

58
    // Sync back with main thread
59 60 61 62 63 64
    scheduler.sync_barrier_.wait();
  }
}

void scheduler::work_thread_work_section() {
  auto &my_state = thread_state::get();
65
  auto &my_task_manager = my_state.get_task_manager();
66 67

  auto const num_threads = my_state.get_scheduler().num_threads();
68

69
  if (my_state.get_thread_id() == 0) {
70
    // Main Thread, kick off by executing the user's main code block.
71
    main_thread_starter_function_->run();
72
  }
73

74
  unsigned int failed_steals = 0;
75
  while (!work_section_done_) {
76 77
    PLS_ASSERT(my_task_manager.check_task_chain(), "Must start stealing with a clean task chain.");

78 79
    // TODO: move steal routine into separate function
    const size_t target = my_state.get_rand() % num_threads;
80
    if (target == my_state.get_thread_id()) {
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
      continue;
    }

    auto &target_state = my_state.get_scheduler().thread_state_for(target);
    task *traded_task = target_state.get_task_manager().steal_task(my_task_manager);

    if (traded_task != nullptr) {
      // The stealing procedure correctly changed our chain and active task.
      // Now we need to perform the 'post steal' actions (manage resources and execute the stolen task).
      PLS_ASSERT(my_task_manager.check_task_chain_forward(&my_task_manager.get_active_task()),
                 "We are sole owner of this chain, it has to be valid!");

      // Move the traded in resource of this active task over to the stack of resources.
      auto *stolen_task = &my_task_manager.get_active_task();
      // Push the traded in resource on the resource stack to clear the traded_field for later steals/spawns.
      my_task_manager.push_resource_on_task(stolen_task, traded_task);

      auto optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task);
      if (optional_exchanged_task) {
        // All good, we pushed the task over to the stack, nothing more to do
        PLS_ASSERT(*optional_exchanged_task == traded_task,
                   "We are currently executing this, no one else can put another task in this field!");
      } else {
        // The last other active thread took it as its spare resource...
        // ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
        auto current_root = stolen_task->resource_stack_root_.load();
        current_root.stamp++;
        current_root.value = 0;
        stolen_task->resource_stack_root_.store(current_root);
110
      }
111 112 113 114

      // Execute the stolen task by jumping to it's continuation.
      PLS_ASSERT(stolen_task->continuation_.valid(),
                 "A task that we can steal must have a valid continuation for us to start working.");
115
      stolen_task->is_synchronized_ = false;
116 117
      context_switcher::switch_context(std::move(stolen_task->continuation_));
      // We will continue execution in this line when we finished the stolen work.
118 119 120 121 122 123
      failed_steals = 0;
    } else {
      failed_steals++;
      if (failed_steals >= num_threads) {
        base::this_thread::yield();
      }
124 125
    }
  }
126 127
}

128
void scheduler::terminate() {
129 130 131 132 133 134 135
  if (terminated_) {
    return;
  }

  terminated_ = true;
  sync_barrier_.wait();

136 137 138
  for (unsigned int i = 0; i < num_threads_; i++) {
    if (reuse_thread_ && i == 0) {
      continue;
139
    }
140
    worker_threads_[i].join();
141 142 143
  }
}

144 145 146 147
void scheduler::sync() {
  thread_state::get().get_task_manager().sync();
}

148
}