#include "pls/internal/scheduling/scheduler.h"

#include "context_switcher/context_switcher.h"

#include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/thread_state.h"

#include "pls/internal/base/thread.h"
#include "pls/internal/base/error_handling.h"

namespace pls {
namespace internal {
namespace scheduling {

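// Construct the scheduler on top of the pre-allocated `scheduler_memory`.
// The sync barrier is sized `num_threads + 1 - reuse_thread`: when the caller's thread
// is re-used it acts as worker 0, otherwise the coordinating thread participates in the
// barrier in addition to the `num_threads` worker threads.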
scheduler::scheduler(scheduler_memory &memory, const unsigned int num_threads, bool reuse_thread) :
    num_threads_{num_threads},
    reuse_thread_{reuse_thread},
    memory_{memory},
    sync_barrier_{num_threads + 1 - reuse_thread},
    terminated_{false} {
  if (num_threads_ > memory.max_threads()) {
    PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory.");
  }

  for (unsigned int i = 0; i < num_threads_; i++) {
    // Placement new is required, as the memory of `memory_` is not required to be initialized.
    memory.thread_state_for(i).set_scheduler(this);
    memory.thread_state_for(i).set_id(i);
    memory.thread_state_for(i).get_task_manager().set_thread_id(i);

    if (reuse_thread && i == 0) {
      continue; // Skip the first/main thread when re-using the user's thread, as the user's thread takes its place.
    }
    memory.thread_for(i) = base::thread(&scheduler::work_thread_main_loop, &memory_.thread_state_for(i));
  }
}

scheduler::~scheduler() {
  terminate();
}

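// Main loop executed by every spawned worker thread.
// Each work section is framed by two barrier waits: the first releases the workers into
// work_thread_work_section(), the second signals completion back to the coordinating thread.
// The terminated_ flag is checked right after the first wait so that terminate() can wake
// the workers up purely for shutdown.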
void scheduler::work_thread_main_loop() {
  auto &scheduler = thread_state::get().get_scheduler();
  while (true) {
    // Wait to be triggered
    scheduler.sync_barrier_.wait();

    // Check for shutdown
    if (scheduler.terminated_) {
      return;
    }

    scheduler.work_thread_work_section();

    // Sync back with main thread
    scheduler.sync_barrier_.wait();
  }
}

void scheduler::work_thread_work_section() {
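  // One work section: thread 0 kicks off the user's code, then every thread keeps
  // trying to steal work until the section is flagged as done.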
  auto &my_state = thread_state::get();
  auto &my_task_manager = my_state.get_task_manager();

  auto const num_threads = my_state.get_scheduler().num_threads();
  auto const my_id = my_state.get_id();

  if (my_state.get_id() == 0) {
    // Main thread: kick off the work section by executing the user's main code block.
    main_thread_starter_function_->run();
  }

  while (!work_section_done_) {
    PLS_ASSERT(my_task_manager.check_task_chain(), "Must start stealing with a clean task chain.");

    // Steal routine (executed continuously as long as there are no more fall-throughs).
    // TODO: move into separate function
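    // Victim selection: start at a random offset and walk over all thread states in
    // round-robin order, attempting one steal per victim. A successful steal resumes the
    // stolen continuation below; if every attempt fails we fall back to the outer loop.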
    const size_t offset = my_state.get_rand() % num_threads;
    const size_t max_tries = num_threads;
    for (size_t i = 0; i < max_tries; i++) {
      // Perform steal
      size_t target = (offset + i) % num_threads;
      auto &target_state = my_state.get_scheduler().thread_state_for(target);
      bool steal_success = target_state.get_task_manager().steal_task(my_task_manager);

      if (steal_success) {
        // The stealing procedure correctly changed our chain and active task.
        // Now we need to perform the 'post steal' actions (manage resources and execute the stolen task).
        PLS_ASSERT(my_task_manager.check_task_chain_forward(&my_task_manager.get_active_task()),
                   "We are sole owner of this chain, it has to be valid!");

        // Move the traded-in resource of this active task over to the stack of resources.
        auto *stolen_task = &my_task_manager.get_active_task();
        traded_cas_field stolen_task_cas = stolen_task->external_trading_deque_cas_.load();
        if (stolen_task_cas.is_filled_with_object()) {
          // Push the traded-in resource onto the resource stack to clear the traded field for later steals/spawns.
          auto *exchanged_task = stolen_task_cas.get_trade_object();
          my_task_manager.push_resource_on_task(stolen_task, exchanged_task);

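          // Try to reset the traded field back to empty. The CAS only succeeds if the field
          // still holds the object we just moved onto the resource stack, i.e. no other
          // thread has emptied it in the meantime.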
          traded_cas_field empty_field;
          traded_cas_field expected_field;
          expected_field.fill_with_trade_object(exchanged_task);
          if (stolen_task->external_trading_deque_cas_.compare_exchange_strong(expected_field, empty_field)) {
            // All good, nothing more to do
          } else {
            // The last other active thread took it as its spare resource...
            // ...so remove our traded object from the stack again (it must be empty by now and no one may access it anymore).
            PLS_ASSERT(expected_field.is_empty(),
                       "Must be empty, as otherwise no one will steal the 'spare traded task'.");

            auto current_root = stolen_task->resource_stack_root_.load();
            current_root.stamp++;
            current_root.value = 0;
            stolen_task->resource_stack_root_.store(current_root);
          }
        }

        // Execute the stolen task by jumping to its continuation.
        PLS_ASSERT(stolen_task->continuation_.valid(),
                   "A task that we can steal must have a valid continuation for us to start working.");
        context_switcher::switch_context(std::move(stolen_task->continuation_));

        // ...now we are done with this steal attempt; break out and re-enter the outer loop.
        break;
      }
    }
//    if (!my_cont_manager.falling_through()) {
//      base::this_thread::sleep(5);
//    }
  }
}

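// Shut the scheduler down: set the termination flag, release the worker threads through
// the sync barrier so their main loops can observe it, then join all OS threads
// (skipping index 0 when the caller's thread was re-used).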
void scheduler::terminate() {
  if (terminated_) {
    return;
  }

  terminated_ = true;
  sync_barrier_.wait();

  for (unsigned int i = 0; i < num_threads_; i++) {
    if (reuse_thread_ && i == 0) {
      continue;
    }
    memory_.thread_for(i).join();
  }
}

thread_state &scheduler::thread_state_for(size_t id) { return memory_.thread_state_for(id); }

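// Synchronization point for the calling thread. This simply forwards to the thread's
// task manager; presumably task_manager::sync() blocks until the tasks spawned in the
// current scope have completed.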
void scheduler::sync() {
  thread_state::get().get_task_manager().sync();
}

}
}
}
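
// Hypothetical usage sketch (illustration only, not part of this translation unit):
// the entry point that fills `main_thread_starter_function_` and resets `work_section_done_`
// is not defined here; it is assumed to be a member such as `perform_work(Function&&)`
// declared alongside this class in scheduler.h.
//
//   static_scheduler_memory<MAX_THREADS, ...> memory;   // assumed pre-allocated memory helper
//   scheduler sched{memory, /*num_threads=*/4};
//   sched.perform_work([] {
//     /* user code that spawns and syncs tasks */
//   });
//   // ~scheduler() calls terminate() and joins the worker threads.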