#include "pls/internal/helpers/profiler.h"

#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/fork_join_task.h"

namespace pls {
namespace internal {
namespace scheduling {

/**
 * Creates a sub-task that is not yet attached to anything: no outstanding
 * children, no parent, no owning fork_join_task and no saved stack state.
 */
fork_join_sub_task::fork_join_sub_task() :
    data_structures::deque_item{},
    ref_count_{0},
    parent_{nullptr},
    tbb_task_{nullptr},
    stack_state_{nullptr} {}

/**
 * Copies only the deque_item base of `other`. All scheduling bookkeeping
 * (ref count, parent, owning task, stack state) is reset rather than copied,
 * so the copy starts out detached exactly like a default-constructed sub-task.
 */
fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task &other) :
    data_structures::deque_item(other),
    ref_count_{0},
    parent_{nullptr},
    tbb_task_{nullptr},
    stack_state_{nullptr} {}

/**
 * Runs this sub-task to completion.
 *
 * Registers itself as the currently executing sub-task of its owning
 * fork_join_task while the user code (execute_internal) runs, then blocks in
 * wait_for_all() until every spawned child has finished, and finally signals
 * completion to the parent (if any) by decrementing the parent's ref count.
 *
 * Requires tbb_task_ to be set (done by spawn_child_internal,
 * fork_join_task::internal_stealing or fork_join_task::execute).
 */
void fork_join_sub_task::execute() {
  PROFILE_WORK_BLOCK("execute sub_task")
  tbb_task_->currently_executing_ = this;
  execute_internal();
  tbb_task_->currently_executing_ = nullptr;
  PROFILE_END_BLOCK

  // Children spawned by execute_internal() must be done before we report
  // ourselves as finished to the parent.
  wait_for_all();

  if (parent_ != nullptr) {
    parent_->ref_count_--;
  }
}

/**
 * Registers `sub_task` as a child of this sub-task and makes it available
 * for execution.
 *
 * @param sub_task The child to enqueue; its parent, owning task and stack
 *                 state are overwritten here.
 */
void fork_join_sub_task::spawn_child_internal(fork_join_sub_task *sub_task) {
  // Keep our refcount up to date
  ref_count_++;

  // Assign forced values
  sub_task->parent_ = this;
  sub_task->tbb_task_ = tbb_task_;
  sub_task->stack_state_ = tbb_task_->my_stack_->save_state();

  tbb_task_->deque_.push_tail(sub_task);
}

/**
 * Blocks until all children spawned by this sub-task have completed
 * (ref_count_ drops to zero). While waiting, the calling thread keeps itself
 * busy: it first executes sub-tasks from the tail of the owning task's deque
 * and, when that is empty, tries to steal work.
 *
 * Once all children are done, the task stack is reset to the state recorded
 * in stack_state_.
 */
void fork_join_sub_task::wait_for_all() {
  while (ref_count_ > 0) {
    PROFILE_STEALING("get local sub task")
    fork_join_sub_task *local_task = tbb_task_->get_local_sub_task();
    PROFILE_END_BLOCK

    if (local_task != nullptr) {
      local_task->execute();
    } else {
      // Try to steal work.
      // External steal will be executed implicitly if success
      PROFILE_STEALING("steal work")
      bool internal_steal_success = tbb_task_->steal_work();
      PROFILE_END_BLOCK

      if (internal_steal_success) {
        // internal_stealing() recorded the stolen sub-task in last_stolen_.
        tbb_task_->last_stolen_->execute();
      }
    }
  }

  tbb_task_->my_stack_->reset_state(stack_state_);
}

/**
 * Takes the next sub-task for the owning thread from the tail of the deque.
 *
 * @return The popped sub-task, or nullptr (callers treat nullptr as
 *         "nothing available").
 */
fork_join_sub_task *fork_join_task::get_local_sub_task() {
  return deque_.pop_tail();
}

/**
 * Takes a sub-task from the head of the deque, i.e. the end opposite to the
 * one used by get_local_sub_task(). Used on the stealing paths.
 *
 * @return The popped sub-task, or nullptr (callers treat nullptr as
 *         "nothing available").
 */
fork_join_sub_task *fork_join_task::get_stolen_sub_task() {
  return deque_.pop_head();
}

/**
 * Attempts to steal one sub-task from `other_task` into this task.
 *
 * On success the stolen sub-task is re-bound to this fork_join_task and
 * remembered in last_stolen_; the caller is expected to execute it next
 * (see fork_join_sub_task::wait_for_all).
 *
 * @param other_task The victim task. It is treated as a fork_join_task
 *                   without any runtime check.
 *                   NOTE(review): presumably the scheduler only pairs tasks
 *                   of the same kind here - confirm against the caller.
 * @return true if a sub-task was stolen, false if the victim had none.
 */
bool fork_join_task::internal_stealing(abstract_task *other_task) {
  PROFILE_STEALING("fork_join_task::internal_stealin")
  // The cast needs an explicit target type; the victim is used as a
  // fork_join_task below.
  auto cast_other_task = reinterpret_cast<fork_join_task *>(other_task);

  auto stolen_sub_task = cast_other_task->get_stolen_sub_task();
  if (stolen_sub_task == nullptr) {
    return false;
  } else {
    // Make sub-task belong to our fork_join_task instance
    stolen_sub_task->tbb_task_ = this;
    stolen_sub_task->stack_state_ = my_stack_->save_state();
    // We will execute this next without explicitly moving it onto our stack storage
    last_stolen_ = stolen_sub_task;

    return true;
  }
}

/**
 * Attempts to split off work from this task: takes one sub-task from the
 * head of the deque, wraps it in a new fork_join_task carrying the same
 * unique id, and runs that task through the scheduler at this task's depth.
 *
 * @param lock Released here, but only when a sub-task was found, right
 *             before the new task is executed. When false is returned the
 *             lock has not been touched.
 * @return true if a sub-task was split off and executed, false otherwise.
 */
bool fork_join_task::split_task(base::spin_lock *lock) {
  PROFILE_STEALING("fork_join_task::split_task")
  fork_join_sub_task *stolen_sub_task = get_stolen_sub_task();
  if (stolen_sub_task == nullptr) {
    return false;
  }
  fork_join_task task{stolen_sub_task, this->unique_id()};

  // In success case, unlock.
  // TODO: this locking is complicated and error prone.
  lock->unlock();

  scheduler::execute_task(task, depth());
  return true;
}

/**
 * Entry point of the task: binds it to the calling thread's task stack,
 * attaches the root sub-task to this task and runs the root sub-task.
 */
void fork_join_task::execute() {
  PROFILE_WORK_BLOCK("execute fork_join_task");

  // Bind this instance to our OS thread
  my_stack_ = base::this_thread::state()->task_stack_;
  root_task_->tbb_task_ = this;
  root_task_->stack_state_ = my_stack_->save_state();

  // Execute it on our OS thread until its finished
  root_task_->execute();
}

/**
 * @return The sub-task whose execute_internal() is currently running on this
 *         task, or nullptr when none is (it is cleared again as soon as
 *         execute_internal() returns).
 */
fork_join_sub_task *fork_join_task::currently_executing() const {
  return currently_executing_;
}

/**
 * Creates a fork-join task around `root_task`.
 *
 * @param root_task The sub-task that execute() will run; stored as a raw
 *                  pointer, so it must remain valid while this task is used.
 * @param id        Identifier forwarded to abstract_task (together with a
 *                  first argument of 0).
 */
fork_join_task::fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id) :
    abstract_task{0, id},
    root_task_{root_task},
    currently_executing_{nullptr},
    my_stack_{nullptr},
    deque_{},
    last_stolen_{nullptr} {}

}  // namespace scheduling
}  // namespace internal
}  // namespace pls