#include "pls/internal/scheduling/scheduler.h"

#include "context_switcher/context_switcher.h"

#include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/thread_state.h"

#include "pls/internal/base/thread.h"
#include "pls/internal/base/error_handling.h"

namespace pls {
namespace internal {
namespace scheduling {

scheduler::scheduler(scheduler_memory &memory, const unsigned int num_threads, bool reuse_thread) :
16
    num_threads_{num_threads},
17
    reuse_thread_{reuse_thread},
18
    memory_{memory},
19
    sync_barrier_{num_threads + 1 - reuse_thread},
20
    terminated_{false} {
21
  if (num_threads_ > memory.max_threads()) {
22 23 24 25 26
    PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory.");
  }

  for (unsigned int i = 0; i < num_threads_; i++) {
    // Placement new is required, as the memory of `memory_` is not required to be initialized.
27 28
    memory.thread_state_for(i).set_scheduler(this);
    memory.thread_state_for(i).set_id(i);
29
    memory.thread_state_for(i).get_task_manager().set_thread_id(i);
30 31 32 33

    if (reuse_thread && i == 0) {
      continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one.
    }
34
    memory.thread_for(i) = base::thread(&scheduler::work_thread_main_loop, &memory_.thread_state_for(i));
35 36 37 38 39 40 41
  }
}

// Shuts the scheduler down before its memory goes away.
// terminate() is idempotent, so an explicit earlier terminate() call is safe.
scheduler::~scheduler() {
  terminate();
}
/**
 * Entry point for every worker thread (and the reused main thread).
 *
 * Workers park on the sync barrier until the coordinating thread either starts a
 * work section or signals shutdown via `terminated_`. After finishing a work
 * section they rendezvous on the barrier again so the coordinator knows all
 * workers are done.
 */
void scheduler::work_thread_main_loop() {
  auto &scheduler = thread_state::get().get_scheduler();
  while (true) {
    // Wait to be triggered
    scheduler.sync_barrier_.wait();

    // Check for shutdown
    if (scheduler.terminated_) {
      return;
    }

    scheduler.work_thread_work_section();

    // Sync back with main thread
    scheduler.sync_barrier_.wait();
  }
}

void scheduler::work_thread_work_section() {
  auto &my_state = thread_state::get();
62
  auto &my_task_manager = my_state.get_task_manager();
63 64 65

  auto const num_threads = my_state.get_scheduler().num_threads();
  auto const my_id = my_state.get_id();
66 67

  if (my_state.get_id() == 0) {
68
    // Main Thread, kick off by executing the user's main code block.
69
    main_thread_starter_function_->run();
70
  }
71

72
  while (!work_section_done_) {
73 74
    // Steal Routine (will be continuously executed when there are no more fall through's).
    // TODO: move into separate function
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
    const size_t offset = my_state.get_rand() % num_threads;
    const size_t max_tries = num_threads;
    for (size_t i = 0; i < max_tries; i++) {
      // Perform steal
      size_t target = (offset + i) % num_threads;
      auto &target_state = my_state.get_scheduler().thread_state_for(target);
      auto *stolen_task = target_state.get_task_manager().steal_task(my_task_manager);

      // Handle successful steal
      if (stolen_task != nullptr) {
        // Adapt our task chain
        // Note: This differs from how it worked before. The aquiring of new chains happens
        //       right at the steal. Whenever we start to work on an continuation we aquire the full
        //       'dirty' chain below it. We fix this up at the sync points later on by popping of the resource stack.
        auto *exchanged_task = &my_task_manager.get_active_task();
        for (unsigned j = 0; j < stolen_task->depth_; j++) {
          exchanged_task = exchanged_task->next_;
        }
        auto *next_own_task = exchanged_task->next_;

        next_own_task->prev_ = stolen_task;
        stolen_task->next_ = next_own_task;

        my_task_manager.set_active_task(stolen_task);

        // move the traded in resource of this active task over to the stack of resources.
        my_task_manager.push_resource_on_task(stolen_task, exchanged_task);
        traded_cas_field empty_field;
        traded_cas_field expected_field;
        expected_field.fill_with_trade_object(exchanged_task);
        if (stolen_task->traded_field_.compare_exchange_strong(expected_field, empty_field)) {
          // All good, nothing more to do
        } else {
          // The last other active thread took it as its spare resource...
          // ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
          PLS_ASSERT(expected_field.is_empty(),
                     "Must be empty, as otherwise no one will steal the 'spare traded task'.");

          auto current_root = stolen_task->resource_stack_root_.load();
          current_root.stamp++;
          current_root.value = 0;
          stolen_task->resource_stack_root_.store(current_root);
        }

        // execute the stolen task by jumping to it's continuation.
        PLS_ASSERT(stolen_task->continuation_.valid(),
                   "A task that we can steal must have a valid continuation for us to start working.");
        context_switcher::switch_context(std::move(stolen_task->continuation_));

        // ...now we are done with this steal attempt, loop over.
        break;
      }
    }
128 129 130
//    if (!my_cont_manager.falling_through()) {
//      base::this_thread::sleep(5);
//    }
131
  }
132 133
}
void scheduler::terminate() {
135 136 137 138 139 140 141
  if (terminated_) {
    return;
  }

  terminated_ = true;
  sync_barrier_.wait();

142 143 144
  for (unsigned int i = 0; i < num_threads_; i++) {
    if (reuse_thread_ && i == 0) {
      continue;
145
    }
146
    memory_.thread_for(i).join();
147 148 149
  }
}
// Accessor for the per-worker state of thread `id` (expects 0 <= id < num_threads()).
thread_state &scheduler::thread_state_for(size_t id) { return memory_.thread_state_for(id); }

}
}
}