diff --git a/app/benchmark_unbalanced/main.cpp b/app/benchmark_unbalanced/main.cpp index 1b5cda6..e3e29d4 100644 --- a/app/benchmark_unbalanced/main.cpp +++ b/app/benchmark_unbalanced/main.cpp @@ -19,15 +19,14 @@ int count_child_nodes(uts::node &node) { return child_count; } - auto current_task = pls::task::current(); std::vector results(children.size()); for (size_t i = 0; i < children.size(); i++) { size_t index = i; auto lambda = [&, index] { results[index] = count_child_nodes(children[index]); }; pls::lambda_task_by_value sub_task(lambda); - current_task->spawn_child(sub_task); + pls::scheduler::spawn_child(sub_task); } - current_task->wait_for_all(); + pls::scheduler::wait_for_all(); for (auto result : results) { child_count += result; } @@ -36,43 +35,41 @@ int count_child_nodes(uts::node &node) { } int unbalanced_tree_search(int seed, int root_children, double q, int normal_children) { - static auto id = pls::unique_id::create(42); int result; auto lambda = [&] { uts::node root(seed, root_children, q, normal_children); result = count_child_nodes(root); }; - pls::lambda_task_by_reference task(lambda); pls::lambda_task_by_reference sub_task(lambda); - pls::task root_task{&sub_task, id}; - pls::scheduler::execute_task(root_task); + pls::scheduler::spawn_child(sub_task); + pls::scheduler::wait_for_all(); return result; } -// -//int main() { -// PROFILE_ENABLE -// pls::internal::helpers::run_mini_benchmark([&] { -// unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN); -// }, 8, 4000); -// -// PROFILE_SAVE("test_profile.prof") -//} int main() { PROFILE_ENABLE - pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18}; - pls::scheduler scheduler{&my_scheduler_memory, 8}; - - scheduler.perform_work([&] { - PROFILE_MAIN_THREAD - for (int i = 0; i < 50; i++) { - PROFILE_WORK_BLOCK("Top Level") - int result = unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN); - std::cout << result << std::endl; - } - }); + pls::internal::helpers::run_mini_benchmark([&] { + unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN); + }, 8, 2000); PROFILE_SAVE("test_profile.prof") } + +//int main() { +// PROFILE_ENABLE +// pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18}; +// pls::scheduler scheduler{&my_scheduler_memory, 8}; +// +// scheduler.perform_work([&] { +// PROFILE_MAIN_THREAD +// for (int i = 0; i < 50; i++) { +// PROFILE_WORK_BLOCK("Top Level") +// int result = unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN); +// std::cout << result << std::endl; +// } +// }); +// +// PROFILE_SAVE("test_profile.prof") +//} diff --git a/app/invoke_parallel/main.cpp b/app/invoke_parallel/main.cpp index e469bad..4382168 100644 --- a/app/invoke_parallel/main.cpp +++ b/app/invoke_parallel/main.cpp @@ -91,7 +91,7 @@ int main() { PROFILE_MAIN_THREAD // Call looks just the same, only requirement is // the enclosure in the perform_work lambda. - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < 10; i++) { PROFILE_WORK_BLOCK("Top Level FFT") complex_vector input = initial_input; fft(input.begin(), input.size()); diff --git a/lib/pls/include/pls/internal/scheduling/lambda_task.h b/lib/pls/include/pls/internal/scheduling/lambda_task.h new file mode 100644 index 0000000..fb6cc4a --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/lambda_task.h @@ -0,0 +1,41 @@ + +#ifndef PLS_LAMBDA_TASK_H_ +#define PLS_LAMBDA_TASK_H_ + +#include "pls/internal/scheduling/task.h" + +namespace pls { +namespace internal { +namespace scheduling { + +template +class lambda_task_by_reference : public task { + const Function &function_; + + public: + explicit lambda_task_by_reference(const Function &function) : task{}, function_{function} {}; + + protected: + void execute_internal() override { + function_(); + } +}; + +template +class lambda_task_by_value : public task { + const Function function_; + + public: + explicit lambda_task_by_value(const Function &function) : task{}, function_{function} {}; + + protected: + void execute_internal() override { + function_(); + } +}; + +} +} +} + +#endif //PLS_LAMBDA_TASK_H_ diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h index 3768e67..f55fecf 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler.h @@ -22,12 +22,23 @@ namespace scheduling { using scheduler_thread = base::thread; +/** + * The scheduler is the central part of the dispatching-framework. + * It manages a pool of worker threads (creates, sleeps/wakes up, destroys) + * and allows to execute parallel sections. + * + * It works in close rellation with the 'task' class for scheduling. + */ class scheduler { friend class task; const unsigned int num_threads_; scheduler_memory *memory_; base::barrier sync_barrier_; + + task *main_thread_root_task_; + bool work_section_done_; + bool terminated_; public: /** @@ -85,6 +96,9 @@ class scheduler { task *get_local_task(); task *steal_task(); + + bool try_execute_local(); + bool try_execute_stolen(); }; } diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index b2157b9..46265b7 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -2,35 +2,30 @@ #ifndef PLS_SCHEDULER_IMPL_H #define PLS_SCHEDULER_IMPL_H +#include "pls/internal/scheduling/lambda_task.h" + namespace pls { namespace internal { namespace scheduling { +// TODO: generally look into the performance implications of using many thread_state::get() calls + template void scheduler::perform_work(Function work_section) { PROFILE_WORK_BLOCK("scheduler::perform_work") -// root_task master{work_section}; -// -// // Push root task on stacks -// auto new_master = memory_->task_stack_for(0)->push(master); -// memory_->thread_state_for(0)->root_task_ = new_master; -// memory_->thread_state_for(0)->current_task_ = new_master; -// for (unsigned int i = 1; i < num_threads_; i++) { -// root_worker_task worker{new_master}; -// auto new_worker = memory_->task_stack_for(0)->push(worker); -// memory_->thread_state_for(i)->root_task_ = new_worker; -// memory_->thread_state_for(i)->current_task_ = new_worker; -// } -// -// // Perform and wait for work -// sync_barrier_.wait(); // Trigger threads to wake up -// sync_barrier_.wait(); // Wait for threads to finish + +// if (execute_main_thread) { +// work_section(); // -// // Clean up stack -// memory_->task_stack_for(0)->pop(); -// for (unsigned int i = 1; i < num_threads_; i++) { -// root_worker_task worker{new_master}; -// memory_->task_stack_for(0)->pop(); +// sync_barrier_.wait(); // Trigger threads to wake up +// sync_barrier_.wait(); // Wait for threads to finish +// } else { + lambda_task_by_reference root_task{work_section}; + main_thread_root_task_ = &root_task; + work_section_done_ = false; + + sync_barrier_.wait(); // Trigger threads to wake up + sync_barrier_.wait(); // Wait for threads to finish // } } @@ -39,12 +34,6 @@ void scheduler::spawn_child(T &sub_task) { thread_state::get()->current_task_->spawn_child(sub_task); } -void scheduler::wait_for_all() { - thread_state::get()->current_task_->wait_for_all(); -} - -thread_state *scheduler::thread_state_for(size_t id) { return memory_->thread_state_for(id); } - } } } diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h index ed4d928..b11ec5f 100644 --- a/lib/pls/include/pls/internal/scheduling/task.h +++ b/lib/pls/include/pls/internal/scheduling/task.h @@ -39,14 +39,12 @@ class task { private: void execute(); - bool try_execute_local(); - bool try_execute_stolen(); }; template void task::spawn_child(T &sub_task) { PROFILE_FORK_JOIN_STEALING("spawn_child") - static_assert(std::is_base_of::value, "Only pass task subclasses!"); + static_assert(std::is_base_of::value, "Only pass task subclasses!"); // Keep our refcount up to date ref_count_++; diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h index 3b08c5c..814c0cc 100644 --- a/lib/pls/include/pls/internal/scheduling/thread_state.h +++ b/lib/pls/include/pls/internal/scheduling/thread_state.h @@ -19,7 +19,6 @@ class task; struct thread_state { alignas(base::system_details::CACHE_LINE_SIZE) scheduler *scheduler_; - alignas(base::system_details::CACHE_LINE_SIZE) task *root_task_; alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_; alignas(base::system_details::CACHE_LINE_SIZE) data_structures::aligned_stack *task_stack_; alignas(base::system_details::CACHE_LINE_SIZE) data_structures::work_stealing_deque deque_; @@ -28,7 +27,6 @@ struct thread_state { thread_state() : scheduler_{nullptr}, - root_task_{nullptr}, current_task_{nullptr}, task_stack_{nullptr}, deque_{task_stack_}, @@ -37,7 +35,6 @@ struct thread_state { thread_state(scheduler *scheduler, data_structures::aligned_stack *task_stack, unsigned int id) : scheduler_{scheduler}, - root_task_{nullptr}, current_task_{nullptr}, task_stack_{task_stack}, deque_{task_stack_}, diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp index a636103..7f725b2 100644 --- a/lib/pls/src/internal/scheduling/scheduler.cpp +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -30,19 +30,37 @@ scheduler::~scheduler() { } void scheduler::worker_routine() { - auto my_state = base::this_thread::state(); + auto my_state = thread_state::get(); + auto scheduler = my_state->scheduler_; while (true) { - my_state->scheduler_->sync_barrier_.wait(); - if (my_state->scheduler_->terminated_) { + // Wait to be triggered + scheduler->sync_barrier_.wait(); + + // Check for shutdown + if (scheduler->terminated_) { return; } - // The root task must only return when all work is done, - // because of this a simple call is enough to ensure the - // fork-join-section is done (logically joined back into our main thread). - my_state->root_task_->execute(); + // Execute work + if (my_state->id_ == 0) { + // Main Thread + auto root_task = scheduler->main_thread_root_task_; + root_task->parent_ = nullptr; + root_task->deque_state_ = my_state->deque_.save_state(); + + root_task->execute(); + scheduler->work_section_done_ = true; + } else { + // Worker Threads + while (!scheduler->work_section_done_) { + if (!scheduler->try_execute_local()) { + scheduler->try_execute_stolen(); + } + } + } + // Sync back with main thread my_state->scheduler_->sync_barrier_.wait(); } } @@ -100,6 +118,33 @@ task *scheduler::steal_task() { return nullptr; } +bool scheduler::try_execute_local() { + task *local_task = get_local_task(); + if (local_task != nullptr) { + local_task->execute(); + return true; + } else { + return false; + } +} + +bool scheduler::try_execute_stolen() { + task *stolen_task = steal_task(); + if (stolen_task != nullptr) { + stolen_task->deque_state_ = thread_state::get()->deque_.save_state(); + stolen_task->execute(); + return true; + } + + return false; +} + +void scheduler::wait_for_all() { + thread_state::get()->current_task_->wait_for_all(); +} + +thread_state *scheduler::thread_state_for(size_t id) { return memory_->thread_state_for(id); } + } } } diff --git a/lib/pls/src/internal/scheduling/task.cpp b/lib/pls/src/internal/scheduling/task.cpp index 118e409..7c06bc7 100644 --- a/lib/pls/src/internal/scheduling/task.cpp +++ b/lib/pls/src/internal/scheduling/task.cpp @@ -36,31 +36,12 @@ void task::execute() { } } -bool task::try_execute_local() { - task *local_task = thread_state::get()->scheduler_->get_local_task(); - if (local_task != nullptr) { - local_task->execute(); - return true; - } else { - return false; - } -} - -bool task::try_execute_stolen() { - task *stolen_task = thread_state::get()->scheduler_->steal_task(); - if (stolen_task != nullptr) { - stolen_task->deque_state_ = thread_state::get()->deque_.save_state(); - stolen_task->execute(); - return true; - } - - return false; -} - void task::wait_for_all() { + auto scheduler = thread_state::get()->scheduler_; + while (ref_count_ > 0) { - if (!try_execute_local()) { - try_execute_stolen(); + if (!scheduler->try_execute_local()) { + scheduler->try_execute_stolen(); } } thread_state::get()->deque_.release_memory_until(deque_state_);