// scheduler.cpp
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/task.h"

#include "pls/internal/base/error_handling.h"

namespace pls {
namespace internal {
namespace scheduling {

scheduler::scheduler(scheduler_memory &memory, const unsigned int num_threads, bool reuse_thread) :
12
    num_threads_{num_threads},
13
    reuse_thread_{reuse_thread},
14
    memory_{memory},
15
    sync_barrier_{num_threads + 1 - reuse_thread},
16
    terminated_{false} {
17
  if (num_threads_ > memory.max_threads()) {
18 19 20 21 22
    PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory.");
  }

  for (unsigned int i = 0; i < num_threads_; i++) {
    // Placement new is required, as the memory of `memory_` is not required to be initialized.
23 24
    memory.thread_state_for(i).scheduler_ = this;
    memory.thread_state_for(i).id_ = i;
25 26 27 28

    if (reuse_thread && i == 0) {
      continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one.
    }
29
    memory.thread_for(i) = base::thread(&scheduler::work_thread_main_loop, &memory_.thread_state_for(i));
30 31 32 33 34 35 36
  }
}

// Destruction shuts the scheduler down; terminate() returns early if it already ran.
scheduler::~scheduler() {
  this->terminate();
}

// Entry point executed by each worker thread.
//
// The worker alternates between waiting on the scheduler's barrier and running one
// work section, and leaves the loop once it observes the termination flag after
// being woken up.
void scheduler::work_thread_main_loop() {
  auto &owner = thread_state::get().get_scheduler();

  // Block until triggered for the first time.
  owner.sync_barrier_.wait();

  // A set termination flag after a wake-up means shutdown instead of more work.
  while (!owner.terminated_) {
    owner.work_thread_work_section();

    // Sync back with the main thread once the work section is over ...
    owner.sync_barrier_.wait();
    // ... then block again until the next trigger.
    owner.sync_barrier_.wait();
  }
}

// Body of a single work section, run by every participating thread.
//
// The thread with id 0 first runs the user's main code block. Afterwards every
// thread loops until work_section_done_ is set.
void scheduler::work_thread_work_section() {
  const bool is_main_thread = thread_state::get().get_id() == 0;
  if (is_main_thread) {
    // Main thread: kick off by executing the user's main code block.
    main_thread_starter_function_->run();
  }

  for (;;) {
    // TODO: Implement other threads, for now we are happy if it compiles and runs on one thread
    //       For now we can test without this, as the fast path should never hit this.
    // 1) Try Steal
    // 2) Copy Over
    // 3) Finish Steal
    // 4) Execute Local Copy

    // The exit condition is checked after each pass, so the body runs at least once.
    if (work_section_done_) {
      break;
    }
  }
}

void scheduler::terminate() {
74 75 76 77 78 79 80
  if (terminated_) {
    return;
  }

  terminated_ = true;
  sync_barrier_.wait();

81 82 83
  for (unsigned int i = 0; i < num_threads_; i++) {
    if (reuse_thread_ && i == 0) {
      continue;
84
    }
85
    memory_.thread_for(i).join();
86 87 88
  }
}

// Returns the thread state stored for the given thread id; the lookup is
// forwarded unchanged to the scheduler's memory.
thread_state &scheduler::thread_state_for(size_t id) {
  return memory_.thread_state_for(id);
}
}
}
}