fork_join_task.cpp 4.25 KB
Newer Older
1
#include "pls/internal/helpers/profiler.h"
2

3
#include "pls/internal/scheduling/scheduler.h"
4
#include "pls/internal/scheduling/fork_join_task.h"
5 6 7 8

namespace pls {
    namespace internal {
        namespace scheduling {
9
            // Default constructor: builds a sub-task that is not yet attached to anything.
            // The reference count starts at zero, and the parent link, the owning task
            // and the saved stack state are all null. Those fields are filled in later
            // by spawn_child_internal() and fork_join_task::internal_stealing().
            fork_join_sub_task::fork_join_sub_task()
                : data_structures::deque_item{}
                , ref_count_{0}
                , parent_{nullptr}
                , tbb_task_{nullptr}
                , stack_state_{nullptr} {
            }
15

16
            // Copy constructor: only the deque_item base is copied from `other`.
            // The reference count, parent link, owning task and saved stack state are
            // NOT taken over; the copy starts out with the same zero/null values as a
            // default-constructed sub-task.
            fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task& other)
                : data_structures::deque_item(other)
                , ref_count_{0}
                , parent_{nullptr}
                , tbb_task_{nullptr}
                , stack_state_{nullptr} {
            }
22

23
            void fork_join_sub_task::execute()  {
24
                PROFILE_WORK_BLOCK("execute sub_task")
25
                tbb_task_->currently_executing_ = this;
26
                execute_internal();
27
                tbb_task_->currently_executing_ = nullptr;
28
                PROFILE_END_BLOCK
29
                wait_for_all();
30 31 32 33

                if (parent_ != nullptr) {
                    parent_->ref_count_--;
                }
34 35
            }

36
            // Registers `sub_task` as a child of this sub-task and queues it on the
            // owning task's deque.
            //
            // sub_task: the child to spawn; its parent, owning task and stack state
            //           are overwritten here.
            void fork_join_sub_task::spawn_child_internal(fork_join_sub_task* sub_task) {
                // Count the child before it becomes visible in the deque; the child
                // decrements this counter again at the end of its execute().
                ref_count_++;

                auto* const owner = tbb_task_;

                // Forced values: the child runs on our owning task, remembers the
                // current stack state and reports back to us.
                sub_task->stack_state_ = owner->my_stack_->save_state();
                sub_task->tbb_task_ = owner;
                sub_task->parent_ = this;

                // Queue the child last, once it is fully set up.
                owner->deque_.push_tail(sub_task);
            }

48
            void fork_join_sub_task::wait_for_all() {
49
                while (ref_count_ > 0) {
50
                    PROFILE_STEALING("get local sub task")
51
                    fork_join_sub_task* local_task = tbb_task_->get_local_sub_task();
52
                    PROFILE_END_BLOCK
53 54 55 56
                    if (local_task != nullptr) {
                        local_task->execute();
                    } else {
                        // Try to steal work.
57
                        // External steal will be executed implicitly if success
58
                        PROFILE_STEALING("steal work")
59
                        bool internal_steal_success = tbb_task_->steal_work();
60
                        PROFILE_END_BLOCK
61
                        if (internal_steal_success) {
62
                            tbb_task_->last_stolen_->execute();
63 64 65
                        }
                    }
                }
66 67 68
                tbb_task_->my_stack_->reset_state(stack_state_);
            }

69
            // Takes the next sub-task from the tail of this task's deque, the same
            // end that spawn_child_internal() pushes to.
            // Returns whatever deque_.pop_tail() yields; callers treat nullptr as
            // "nothing available".
            fork_join_sub_task* fork_join_task::get_local_sub_task() {
                fork_join_sub_task* const from_tail = deque_.pop_tail();
                return from_tail;
            }

73
            // Takes a sub-task from the head of this task's deque, the end opposite
            // to where local work is pushed and popped. Used by the stealing paths.
            // Returns whatever deque_.pop_head() yields; callers treat nullptr as
            // "nothing available".
            fork_join_sub_task* fork_join_task::get_stolen_sub_task() {
                fork_join_sub_task* const from_head = deque_.pop_head();
                return from_head;
            }

77
            // Tries to steal one sub-task from another fork_join_task.
            //
            // other_task: the victim task; it is treated as a fork_join_task.
            // Returns true if a sub-task was taken. In that case it has been
            // re-homed to this task and stored in last_stolen_ for the caller to
            // execute. Returns false if the victim had nothing to steal.
            bool fork_join_task::internal_stealing(abstract_task* other_task) {
                PROFILE_STEALING("fork_join_task::internal_stealin")
                // Base-to-derived pointer conversion: static_cast is the correct named
                // cast here because it applies any base-subobject pointer adjustment,
                // which reinterpret_cast does not.
                // NOTE(review): assumes fork_join_task publicly derives from
                // abstract_task (declared in the header, not visible here) - confirm.
                auto cast_other_task = static_cast<fork_join_task*>(other_task);

                auto stolen_sub_task = cast_other_task->get_stolen_sub_task();
                if (stolen_sub_task == nullptr) {
                    return false;
                }

                // Make sub-task belong to our fork_join_task instance
                stolen_sub_task->tbb_task_ = this;
                stolen_sub_task->stack_state_ = my_stack_->save_state();
                // We will execute this next without explicitly moving it onto our stack storage
                last_stolen_ = stolen_sub_task;

                return true;
            }

95
            // Splits one sub-task off this task's deque and runs it as a new
            // fork_join_task that shares this task's unique id.
            //
            // lock: unlocked by this function ONLY on the success path, before the
            //       new task is executed. On failure it is left untouched.
            // Returns true if a sub-task was split off and executed, false if the
            // deque had nothing to give.
            bool fork_join_task::split_task(base::spin_lock* lock) {
                PROFILE_STEALING("fork_join_task::split_task")
                fork_join_sub_task* const split_off = get_stolen_sub_task();
                if (!split_off) {
                    // Nothing to split; the lock is not released here.
                    return false;
                }
                fork_join_task task{split_off, this->unique_id()};

                // In success case, unlock.
                // TODO: this locking is complicated and error prone.
                lock->unlock();

                scheduler::execute_task(task, depth());
                return true;
            }
        }
    }
}