#ifndef PLS_SCHEDULER_H
#define PLS_SCHEDULER_H

#include <atomic>
#include <thread>
#include <vector>
#include <memory>

#include "pls/internal/build_flavour.h"

#include "pls/internal/base/barrier.h"
#include "pls/internal/base/stack_allocator.h"

#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/strain_local_resource.h"

#include "pls/internal/profiling/profiler.h"
namespace pls::internal::scheduling {
22 23 24 25 26
/**
 * The scheduler is the central part of the dispatching-framework.
 * It manages a pool of worker threads (creates, sleeps/wakes up, destroys)
 * and allows to execute parallel sections.
 *
27 28 29
 * It works in close relation with the 'task' and 'task_manager' class for scheduling.
 * The task_manager handles the data structure for stealing/resource trading,
 * the scheduler handles the high level execution flow (allowing the stealing implementation to be exchanged).
30
 */
31 32
class scheduler {
 public:
33 34
  /**
   * Initializes a scheduler instance with the given number of threads.
35 36 37 38 39 40
   * This will allocate ALL runtime resources, spawn the  worker threads
   * and put them to sleep, ready to process an upcoming parallel section.
   *
   * The initialization should be seen as a heavy and not very predictable operation.
   * After it is done the scheduler must (if configured correctly) never run out of resources
   * and deliver tight time bounds of randomized work-stealing.
41 42 43
   *
   * @param num_threads The number of worker threads to be created.
   */
44 45 46
  explicit scheduler(unsigned int num_threads,
                     size_t computation_depth,
                     size_t stack_size,
47
                     bool reuse_thread = true,
48
                     size_t serial_stack_size = 4096 * 1);
49 50 51 52 53 54

  template<typename ALLOC>
  explicit scheduler(unsigned int num_threads,
                     size_t computation_depth,
                     size_t stack_size,
                     bool reuse_thread,
55
                     size_t serial_stack_size,
56
                     ALLOC &&stack_allocator);
57 58 59

  /**
   * The scheduler is implicitly terminated as soon as it leaves the scope.
60
   * Resources follow a clean RAII style.
61
   */
62 63 64 65 66
  ~scheduler();

  /**
   * Wakes up the thread pool.
   * Code inside the Function lambda can invoke all parallel APIs.
67 68
   * This is meant to cleanly sleep and wake up the scheduler during an application run,
   * e.g. to run parallel code on a timer loop/after interrupts.
69 70 71 72 73 74
   *
   * @param work_section generic function or lambda to be executed in the scheduler's context.
   */
  template<typename Function>
  void perform_work(Function work_section);

75 76 77 78 79 80 81 82 83
  /**
   * Main parallelism construct, spawns a function for potential parallel execution.
   *
   * The result of the spawned function must not be relied on until sync() is called.
   * Best see the lambda as if executed on a thread, e.g. it can cause race conditions
   * and it is only finished after you join it back into the parent thread using sync().
   *
   * @param lambda the lambda to be executed in parallel.
   */
84
  template<typename Function>
85 86 87 88
  static void spawn(Function &&lambda) {
#ifdef PLS_SERIAL_ELUSION
    lambda();
#else
89 90 91 92 93
    if (thread_state::is_scheduler_active()) {
      spawn_internal(std::forward<Function>(lambda));
    } else {
      lambda();
    }
94 95
#endif
  }
96 97 98 99

  /**
   * Waits for all potentially parallel child tasks created with spawn(...).
   */
100 101 102 103
  static void sync() {
#ifdef PLS_SERIAL_ELUSION
    return;
#else
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
    if (thread_state::is_scheduler_active()) {
      sync_internal();
    } else {
      return;
    }
#endif
  }

  /**
   * Equivialent to calling spawn(lambda); sync();
   *
   * Faster than the direct notation, as stealing the continuation before the sync is not useful and only overhead.
 */
  template<typename Function>
  static void spawn_and_sync(Function &&lambda) {
#ifdef PLS_SERIAL_ELUSION
    lambda();
#else
    if (thread_state::is_scheduler_active()) {
123 124 125 126 127
#if PLS_PROFILING_ENABLED
      // no nee to re-do the profiling annotations for the optimized path
      spawn_internal(std::forward<Function>(lambda));
      sync_internal();
#else
128
      spawn_and_sync_internal(std::forward<Function>(lambda));
129
#endif
130 131 132
    } else {
      lambda();
    }
133 134
#endif
  }
135

136
  /**
137 138 139 140 141 142 143 144
   * Runs a function in the tail call portion of the stack.
   * Can have nested parallelism if the scheduler is configured accordingly.
   */
  template<typename Function>
  static void serial(Function &&lambda) {
#ifdef PLS_SERIAL_ELUSION
    lambda();
#else
145 146 147 148 149
    if (thread_state::is_scheduler_active()) {
      serial_internal(std::forward<Function>(lambda));
    } else {
      lambda();
    }
150 151 152 153
#endif
  }

  /**
154
   * Explicitly terminate the worker threads. Scheduler must not be used after this.
155
   */
156
  void terminate();
157

158
  [[nodiscard]] unsigned int num_threads() const { return num_threads_; }
159
  [[nodiscard]] static base_task *get_trade_task(base_task *stolen_task, thread_state &calling_state);
160 161 162 163 164 165 166

  static bool check_task_chain_forward(base_task &start_task);
  static bool check_task_chain_backward(base_task &start_task);
  static bool check_task_chain(base_task &start_task);

  thread_state &thread_state_for(unsigned int thread_id) { return *thread_states_[thread_id]; }
  task_manager &task_manager_for(unsigned int thread_id) { return *task_managers_[thread_id]; }
167

FritzFlorian committed
168 169 170 171 172 173
#if PLS_PROFILING_ENABLED
  profiling::profiler &get_profiler() {
    return profiler_;
  }
#endif

174
 private:
175 176
  template<typename Function>
  static void spawn_internal(Function &&lambda);
177 178
  template<typename Function>
  static void spawn_and_sync_internal(Function &&lambda);
179
  static void sync_internal();
180 181
  template<typename Function>
  static void serial_internal(Function &&lambda);
182

183
  static context_switcher::continuation slow_return(thread_state &calling_state, bool in_sync);
184

185 186
  static void work_thread_main_loop();
  void work_thread_work_section();
187

188 189 190 191
  const unsigned int num_threads_;
  const bool reuse_thread_;
  base::barrier sync_barrier_;

192 193 194 195
  std::vector<std::thread> worker_threads_;
  std::vector<std::unique_ptr<task_manager>> task_managers_;
  std::vector<std::unique_ptr<thread_state>> thread_states_;

196 197 198 199 200 201 202 203
  class init_function;
  template<typename F>
  class init_function_impl;

  init_function *main_thread_starter_function_;
  std::atomic<bool> work_section_done_;

  bool terminated_;
204

205
  std::shared_ptr<base::stack_allocator> stack_allocator_;
206

207 208
  size_t serial_stack_size_;

209 210 211 212
#if PLS_PROFILING_ENABLED
  profiling::profiler profiler_;
#endif

213 214 215 216 217 218
#if PLS_SLEEP_WORKERS_ON_EMPTY
  PLS_CACHE_ALIGN std::atomic<int32_t> empty_queue_counter_{0};

  void empty_queue_try_sleep_worker();
  void empty_queue_increase_counter();
  void empty_queue_decrease_counter_and_wake();
219
  void empty_queue_reset_and_wake_all();
220
#endif
221 222 223
};

}
#include "scheduler_impl.h"

#endif //PLS_SCHEDULER_H