scheduler.cpp 3.14 KB
Newer Older
1
#include "pls/internal/scheduling/scheduler.h"
2 3

#include "pls/internal/scheduling/task_manager.h"
4 5
#include "pls/internal/scheduling/thread_state.h"

6
#include "pls/internal/base/thread.h"
7
#include "pls/internal/base/error_handling.h"
8 9

namespace pls {
10 11 12
namespace internal {
namespace scheduling {

13
scheduler::scheduler(scheduler_memory &memory, const unsigned int num_threads, bool reuse_thread) :
14
    num_threads_{num_threads},
15
    reuse_thread_{reuse_thread},
16
    memory_{memory},
17
    sync_barrier_{num_threads + 1 - reuse_thread},
18
    terminated_{false} {
19
  if (num_threads_ > memory.max_threads()) {
20 21 22 23 24
    PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory.");
  }

  for (unsigned int i = 0; i < num_threads_; i++) {
    // Placement new is required, as the memory of `memory_` is not required to be initialized.
25 26
    memory.thread_state_for(i).set_scheduler(this);
    memory.thread_state_for(i).set_id(i);
27
    memory.thread_state_for(i).get_task_manager().set_thread_id(i);
28 29 30 31

    if (reuse_thread && i == 0) {
      continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one.
    }
32
    memory.thread_for(i) = base::thread(&scheduler::work_thread_main_loop, &memory_.thread_state_for(i));
33 34 35 36 37 38 39
  }
}

scheduler::~scheduler() {
  terminate();
}

40 41
void scheduler::work_thread_main_loop() {
  auto &scheduler = thread_state::get().get_scheduler();
42
  while (true) {
43
    // Wait to be triggered
44
    scheduler.sync_barrier_.wait();
45 46

    // Check for shutdown
47
    if (scheduler.terminated_) {
48 49 50
      return;
    }

51
    scheduler.work_thread_work_section();
52

53
    // Sync back with main thread
54 55 56 57 58 59
    scheduler.sync_barrier_.wait();
  }
}

void scheduler::work_thread_work_section() {
  auto &my_state = thread_state::get();
60
  auto &my_task_manager = my_state.get_task_manager();
61 62 63

  auto const num_threads = my_state.get_scheduler().num_threads();
  auto const my_id = my_state.get_id();
64 65

  if (my_state.get_id() == 0) {
66
    // Main Thread, kick off by executing the user's main code block.
67
    main_thread_starter_function_->run();
68
  }
69 70

  do {
71 72
    // Steal Routine (will be continuously executed when there are no more fall through's).
    // TODO: move into separate function
73 74 75 76 77 78 79 80 81 82 83
//    const size_t offset = my_state.get_rand() % num_threads;
//    const size_t max_tries = num_threads;
//    for (size_t i = 0; i < max_tries; i++) {
//      size_t target = (offset + i) % num_threads;
//      auto &target_state = my_state.get_scheduler().thread_state_for(target);
//
//      auto *stolen_task = target_state.get_task_manager().steal_remote_task(my_cont_manager);
//      if (stolen_task != nullptr) {
//        stolen_task->execute();
//      }
//    }
84 85 86
//    if (!my_cont_manager.falling_through()) {
//      base::this_thread::sleep(5);
//    }
87
  } while (!work_section_done_);
88 89
}

90
void scheduler::terminate() {
91 92 93 94 95 96 97
  if (terminated_) {
    return;
  }

  terminated_ = true;
  sync_barrier_.wait();

98 99 100
  for (unsigned int i = 0; i < num_threads_; i++) {
    if (reuse_thread_ && i == 0) {
      continue;
101
    }
102
    memory_.thread_for(i).join();
103 104 105
  }
}

106
thread_state &scheduler::thread_state_for(size_t id) { return memory_.thread_state_for(id); }
107

108 109
}
}
110
}