#ifndef PLS_SCHEDULER_H
#define PLS_SCHEDULER_H

// NOTE(review): in this copy of the header, the contents of every angle-bracket
// pair appear to be missing:
//   - the system header names of the first four #include lines;
//   - the parameter lists after each `template` keyword;
//   - the template arguments of std::forward / std::vector / std::atomic /
//     std::shared_ptr.
// As shown, this header cannot compile. TODO: restore from version control
// rather than guessing the exact types.
#include
#include
#include
#include

#include "pls/internal/build_flavour.h"
#include "pls/internal/base/barrier.h"
#include "pls/internal/base/stack_allocator.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/strain_local_resource.h"
#include "pls/internal/profiling/profiler.h"

namespace pls::internal::scheduling {

/**
 * The scheduler is the central part of the dispatching framework.
 * It manages a pool of worker threads (creates, sleeps/wakes up, destroys)
 * and allows parallel sections to be executed.
 *
 * It works in close relation with the 'task' and 'task_manager' classes for scheduling.
 * The task_manager handles the data structure for stealing/resource trading,
 * the scheduler handles the high level execution flow (allowing the stealing implementation to be exchanged).
 *
 * All public parallel constructs (spawn, sync, spawn_and_sync, serial) are static.
 * When PLS_SERIAL_ELUSION is defined they compile down to plain serial calls.
 * When thread_state::is_scheduler_active() is false they run the lambda inline
 * on the calling thread (sync() is then a no-op).
 */
class scheduler {
 public:
  /**
   * Initializes a scheduler instance with the given number of threads.
   * This will allocate ALL runtime resources, spawn the worker threads
   * and put them to sleep, ready to process an upcoming parallel section.
   *
   * The initialization should be seen as a heavy and not very predictable operation.
   * After it is done the scheduler must (if configured correctly) never run out of resources
   * and deliver tight time bounds of randomized work-stealing.
   *
   * @param num_threads The number of worker threads to be created.
   * @param computation_depth Presumably the maximum task nesting depth resources are
   *        pre-allocated for (TODO confirm against scheduler_impl.h).
   * @param stack_size Presumably the per-task stack size (TODO confirm units/meaning).
   * @param reuse_thread Defaults to true. Presumably controls whether the calling thread
   *        takes part as a worker (TODO confirm).
   * @param serial_stack_size Defaults to 4096 * 1. Presumably the stack size used by
   *        serial(...) sections (TODO confirm units).
   */
  explicit scheduler(unsigned int num_threads,
                     size_t computation_depth,
                     size_t stack_size,
                     bool reuse_thread = true,
                     size_t serial_stack_size = 4096 * 1);

  /**
   * Same parameters as the constructor above, plus an explicitly supplied stack allocator.
   * NOTE(review): presumably the allocator ends up in stack_allocator_; verify in scheduler_impl.h.
   */
  template
  explicit scheduler(unsigned int num_threads,
                     size_t computation_depth,
                     size_t stack_size,
                     bool reuse_thread,
                     size_t serial_stack_size,
                     ALLOC &&stack_allocator);

  /**
   * The scheduler is implicitly terminated as soon as it leaves the scope.
   * Resources follow a clean RAII style.
   */
  ~scheduler();

  /**
   * Wakes up the thread pool.
   * Code inside the Function lambda can invoke all parallel APIs.
   * This is meant to cleanly sleep and wake up the scheduler during an application run,
   * e.g. to run parallel code on a timer loop/after interrupts.
   *
   * @param work_section generic function or lambda to be executed in the scheduler's context.
   */
  template
  void perform_work(Function work_section);

  /**
   * Main parallelism construct, spawns a function for potential parallel execution.
   *
   * The result of the spawned function must not be relied on until sync() is called.
   * Best see the lambda as if executed on a thread, e.g. it can cause race conditions
   * and it is only finished after you join it back into the parent thread using sync().
   *
   * @param lambda the lambda to be executed in parallel.
   */
  template
  static void spawn(Function &&lambda) {
#ifdef PLS_SERIAL_ELUSION
    // Serial elision: parallelism is compiled out, run the lambda directly.
    lambda();
#else
    if (thread_state::is_scheduler_active()) {
      spawn_internal(std::forward(lambda));
    } else {
      // Called outside an active scheduler: execute inline on the calling thread.
      lambda();
    }
#endif
  }

  /**
   * Waits for all potentially parallel child tasks created with spawn(...).
   * This is a no-op under PLS_SERIAL_ELUSION or when no scheduler is active
   * (in both cases spawn(...) already ran the children inline).
   */
  static void sync() {
#ifdef PLS_SERIAL_ELUSION
    return;
#else
    if (thread_state::is_scheduler_active()) {
      sync_internal();
    } else {
      return;
    }
#endif
  }

  /**
   * Equivalent to calling spawn(lambda); sync();
   *
   * Faster than the direct notation, as stealing the continuation before the sync is not useful and only overhead.
   * When PLS_PROFILING_ENABLED is set, the plain spawn_internal + sync_internal path is used instead
   * of the optimized spawn_and_sync_internal.
   */
  template
  static void spawn_and_sync(Function &&lambda) {
#ifdef PLS_SERIAL_ELUSION
    lambda();
#else
    if (thread_state::is_scheduler_active()) {
#if PLS_PROFILING_ENABLED
      // no need to re-do the profiling annotations for the optimized path
      spawn_internal(std::forward(lambda));
      sync_internal();
#else
      spawn_and_sync_internal(std::forward(lambda));
#endif
    } else {
      lambda();
    }
#endif
  }

  /**
   * Runs a function in the tail call portion of the stack.
   * Can have nested parallelism if the scheduler is configured accordingly.
   * Like spawn(...), it runs the lambda directly under PLS_SERIAL_ELUSION or when no scheduler is active.
   */
  template
  static void serial(Function &&lambda) {
#ifdef PLS_SERIAL_ELUSION
    lambda();
#else
    if (thread_state::is_scheduler_active()) {
      serial_internal(std::forward(lambda));
    } else {
      lambda();
    }
#endif
  }

  /**
   * Explicitly terminate the worker threads. Scheduler must not be used after this.
   */
  void terminate();

  /** @return the stored thread count (num_threads_). */
  [[nodiscard]] unsigned int num_threads() const { return num_threads_; }

  // NOTE(review): the semantics of the following task-trading / task-chain helpers are
  // defined outside this header; presumably the check_* functions validate task-chain
  // links (TODO confirm).
  [[nodiscard]] static base_task *get_trade_task(base_task *stolen_task, thread_state &calling_state);
  static bool check_task_chain_forward(base_task &start_task);
  static bool check_task_chain_backward(base_task &start_task);
  static bool check_task_chain(base_task &start_task);

  // Per-thread accessors. thread_id is NOT range-checked here; presumably it must be < num_threads().
  thread_state &thread_state_for(unsigned int thread_id) { return *thread_states_[thread_id]; }
  task_manager &task_manager_for(unsigned int thread_id) { return *task_managers_[thread_id]; }

#if PLS_PROFILING_ENABLED
  profiling::profiler &get_profiler() { return profiler_; }
#endif

 private:
  // Implementations behind the public static constructs above (defined outside this header).
  template
  static void spawn_internal(Function &&lambda);
  template
  static void spawn_and_sync_internal(Function &&lambda);
  static void sync_internal();
  template
  static void serial_internal(Function &&lambda);

  static context_switcher::continuation slow_return(thread_state &calling_state, bool in_sync);

  static void work_thread_main_loop();
  void work_thread_work_section();

  const unsigned int num_threads_;  // value returned by num_threads()
  const bool reuse_thread_;

  base::barrier sync_barrier_;

  // One entry per worker; indexed by thread_id in thread_state_for / task_manager_for.
  std::vector worker_threads_;
  std::vector> task_managers_;
  std::vector> thread_states_;

  class init_function;
  template
  class init_function_impl;

  // NOTE(review): ownership of this raw pointer is not visible here — confirm who frees it.
  init_function *main_thread_starter_function_;
  std::atomic work_section_done_;

  bool terminated_;

  std::shared_ptr stack_allocator_;
  size_t serial_stack_size_;

#if PLS_PROFILING_ENABLED
  profiling::profiler profiler_;
#endif

#if PLS_SLEEP_WORKERS_ON_EMPTY
  // Cache-aligned (PLS_CACHE_ALIGN) counter used by the empty-queue sleep/wake helpers below.
  PLS_CACHE_ALIGN std::atomic empty_queue_counter_{0};
  void empty_queue_try_sleep_worker();
  void empty_queue_increase_counter();
  void empty_queue_decrease_counter_and_wake();
  void empty_queue_reset_and_wake_all();
#endif
};

}  // namespace pls::internal::scheduling

// Out-of-line definitions (presumably including the member templates declared above).
#include "scheduler_impl.h"

#endif //PLS_SCHEDULER_H