scheduler.cpp 4.14 KB
Newer Older
1
#include "pls/internal/scheduling/scheduler.h"
2 3 4
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/task.h"

5
#include "pls/internal/base/error_handling.h"
6 7

namespace pls {
8 9 10
namespace internal {
namespace scheduling {

11
scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads, bool reuse_thread) :
12
    num_threads_{num_threads},
13
    reuse_thread_{reuse_thread},
14
    memory_{memory},
15
    sync_barrier_{num_threads + 1 - reuse_thread},
16 17 18 19 20 21 22 23
    terminated_{false} {
  if (num_threads_ > memory_->max_threads()) {
    PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory.");
  }

  for (unsigned int i = 0; i < num_threads_; i++) {
    // Placement new is required, as the memory of `memory_` is not required to be initialized.
    new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i};
24 25 26 27

    if (reuse_thread && i == 0) {
      continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one.
    }
28
    new((void *) memory_->thread_for(i))base::thread<void (*)(), thread_state>(&scheduler::worker_routine,
29
                                                                               memory_->thread_state_for(i));
30

31 32 33 34 35 36 37
  }
}

scheduler::~scheduler() {
  terminate();
}

38
void scheduler::worker_routine() {
39 40
  auto my_state = thread_state::get();
  auto scheduler = my_state->scheduler_;
41 42

  while (true) {
43 44 45 46 47
    // Wait to be triggered
    scheduler->sync_barrier_.wait();

    // Check for shutdown
    if (scheduler->terminated_) {
48 49 50
      return;
    }

51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
    // Execute work
    if (my_state->id_ == 0) {
      // Main Thread
      auto root_task = scheduler->main_thread_root_task_;
      root_task->parent_ = nullptr;
      root_task->deque_state_ = my_state->deque_.save_state();

      root_task->execute();
      scheduler->work_section_done_ = true;
    } else {
      // Worker Threads
      while (!scheduler->work_section_done_) {
        if (!scheduler->try_execute_local()) {
          scheduler->try_execute_stolen();
        }
      }
    }
68

69
    // Sync back with main thread
70 71 72 73 74 75 76 77 78 79 80 81 82 83
    my_state->scheduler_->sync_barrier_.wait();
  }
}

void scheduler::terminate(bool wait_for_workers) {
  if (terminated_) {
    return;
  }

  terminated_ = true;
  sync_barrier_.wait();

  if (wait_for_workers) {
    for (unsigned int i = 0; i < num_threads_; i++) {
84 85 86
      if (reuse_thread_ && i == 0) {
        continue;
      }
87
      memory_->thread_for(i)->join();
88
    }
89 90 91
  }
}

92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
task *scheduler::get_local_task() {
  PROFILE_STEALING("Get Local Task")
  return thread_state::get()->deque_.pop_tail();
}

task *scheduler::steal_task() {
  PROFILE_STEALING("Steal Task")

  // Data for victim selection
  const auto my_state = thread_state::get();

  const auto my_id = my_state->id_;
  const size_t offset = my_state->random_() % num_threads();
  const size_t max_tries = num_threads(); // TODO: Tune this value

  // Current strategy: random start, then round robin from there
  for (size_t i = 0; i < max_tries; i++) {
    size_t target = (offset + i) % num_threads();

    // Skip our self for stealing
112
    target = ((target == my_id) + target) % num_threads();
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127

    auto target_state = thread_state_for(target);
    // TODO: See if we should re-try popping if it failed due to contention
    auto result = target_state->deque_.pop_head();
    if (result != nullptr) {
      return result;
    }

    // TODO: See if we should backoff here (per missed steal)
  }

  // TODO: See if we should backoff here (after a 'round' of missed steals)
  return nullptr;
}

128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
bool scheduler::try_execute_local() {
  task *local_task = get_local_task();
  if (local_task != nullptr) {
    local_task->execute();
    return true;
  } else {
    return false;
  }
}

bool scheduler::try_execute_stolen() {
  task *stolen_task = steal_task();
  if (stolen_task != nullptr) {
    stolen_task->deque_state_ = thread_state::get()->deque_.save_state();
    stolen_task->execute();
    return true;
  }

  return false;
}

void scheduler::wait_for_all() {
  thread_state::get()->current_task_->wait_for_all();
}

thread_state *scheduler::thread_state_for(size_t id) { return memory_->thread_state_for(id); }

155 156
}
}
157
}