#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/task.h"

#include "pls/internal/base/error_handling.h"

namespace pls {
namespace internal {
namespace scheduling {

// Constructs the scheduler on top of pre-allocated memory and spawns the
// worker threads. The barrier is sized num_threads + 1 because the main
// (coordinating) thread participates in the rendezvous as well.
// Fails hard if the caller asks for more threads than `memory` was sized for.
scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads) :
    num_threads_{num_threads},
    memory_{memory},
    sync_barrier_{num_threads + 1},
    terminated_{false} {
  if (num_threads_ > memory_->max_threads()) {
    PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory.");
  }

  for (unsigned int i = 0; i < num_threads_; i++) {
    // Placement new is required, as the memory of `memory_` is not required to be initialized.
    new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i};
    new((void *) memory_->thread_for(i))base::thread<void (*)(), thread_state>(&scheduler::worker_routine,
                                                                               memory_->thread_state_for(i));
  }
}

scheduler::~scheduler() {
  // Ensure worker threads are shut down; terminate() is a no-op if the
  // scheduler was already terminated explicitly before destruction.
  terminate();
}

32
void scheduler::worker_routine() {
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
  auto my_state = base::this_thread::state<thread_state>();

  while (true) {
    my_state->scheduler_->sync_barrier_.wait();
    if (my_state->scheduler_->terminated_) {
      return;
    }

    // The root task must only return when all work is done,
    // because of this a simple call is enough to ensure the
    // fork-join-section is done (logically joined back into our main thread).
    my_state->root_task_->execute();

    my_state->scheduler_->sync_barrier_.wait();
  }
}

// Shuts the scheduler down. Idempotent: a repeated call (e.g. the destructor
// after an explicit terminate()) returns immediately.
// Sets the termination flag, then releases the workers blocked at the top of
// their loop via the barrier; each worker observes terminated_ == true and
// exits. If wait_for_workers is set, blocks until every OS thread has joined.
// NOTE(review): terminated_ is read by workers without explicit
// synchronization beyond the barrier — presumably the barrier provides the
// necessary ordering; verify against the barrier implementation.
void scheduler::terminate(bool wait_for_workers) {
  if (terminated_) {
    return;
  }

  terminated_ = true;
  sync_barrier_.wait();

  if (wait_for_workers) {
    for (unsigned int i = 0; i < num_threads_; i++) {
      memory_->thread_for(i)->join();
    }
  }
}

65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
task *scheduler::get_local_task() {
  PROFILE_STEALING("Get Local Task")
  return thread_state::get()->deque_.pop_tail();
}

// Attempts to steal a task from another worker's deque (head end).
// Victim selection: pick a random starting worker, then probe round robin
// from there, skipping ourselves. Returns the first successfully stolen
// task, or nullptr after one full round of misses.
task *scheduler::steal_task() {
  PROFILE_STEALING("Steal Task")

  // Data for victim selection
  const auto self_state = thread_state::get();

  const auto self_id = self_state->id_;
  const size_t start = self_state->random_() % num_threads();
  const size_t attempts = num_threads(); // TODO: Tune this value

  // Current strategy: random start, then round robin from there
  for (size_t attempt = 0; attempt < attempts; attempt++) {
    const size_t victim = (start + attempt) % num_threads();

    // Never steal from our own deque here; that is get_local_task()'s job.
    if (victim == self_id) {
      continue;
    }

    // TODO: See if we should re-try popping if it failed due to contention
    auto stolen = thread_state_for(victim)->deque_.pop_head();
    if (stolen != nullptr) {
      return stolen;
    }

    // TODO: See if we should backoff here (per missed steal)
  }

  // TODO: See if we should backoff here (after a 'round' of missed steals)
  return nullptr;
}

}
}
}