fork_join_task.cpp 3.52 KB
Newer Older
1
#include "pls/internal/scheduling/scheduler.h"
2
#include "pls/internal/scheduling/fork_join_task.h"
3 4 5 6

namespace pls {
    namespace internal {
        namespace scheduling {
7
            // Default constructor: creates a sub task that is not attached to anything yet.
            // The reference count starts at zero and the parent, owning task and saved
            // stack state pointers all start out as nullptr.
            fork_join_sub_task::fork_join_sub_task()
                : base::deque_item{}
                , ref_count_{0}
                , parent_{nullptr}
                , tbb_task_{nullptr}
                , stack_state_{nullptr} {}
13

14
            fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task& other): base::deque_item(other) {
15
                // Do Nothing, will be inited after this anyways
16 17
            }

18
            void fork_join_sub_task::execute()  {
19 20
                execute_internal();
                wait_for_all();
21 22 23 24

                if (parent_ != nullptr) {
                    parent_->ref_count_--;
                }
25 26
            }

27
            // Registers `sub_task` as a child of this sub task and enqueues it at the tail
            // of the owning task's deque.
            //
            // sub_task: the child to enqueue; its parent_, tbb_task_ and stack_state_ are
            //           overwritten here regardless of their previous values.
            void fork_join_sub_task::spawn_child_internal(fork_join_sub_task* sub_task) {
                // Count the child first, before it is pushed onto the deque.
                ref_count_++;

                auto* const owner = tbb_task_;
                fork_join_sub_task& child = *sub_task;

                // Forced values: the child belongs to us and to our owning task, and it
                // remembers the stack state as of the moment it was spawned.
                child.parent_ = this;
                child.tbb_task_ = owner;
                child.stack_state_ = owner->my_stack_->save_state();

                owner->deque_.push_tail(sub_task);
            }

39
            void fork_join_sub_task::wait_for_all() {
40
                while (ref_count_ > 0) {
41
                    fork_join_sub_task* local_task = tbb_task_->get_local_sub_task();
42 43 44 45
                    if (local_task != nullptr) {
                        local_task->execute();
                    } else {
                        // Try to steal work.
46
                        // External steal will be executed implicitly if success
47
                        if (tbb_task_->steal_work()) {
48
                            tbb_task_->last_stolen_->execute();
49 50 51
                        }
                    }
                }
52 53 54
                tbb_task_->my_stack_->reset_state(stack_state_);
            }

55
            // Pops a sub task from the tail of this task's deque and returns it.
            // Callers in this file treat a nullptr result as "no sub task available".
            fork_join_sub_task* fork_join_task::get_local_sub_task() {
                fork_join_sub_task* from_tail = deque_.pop_tail();
                return from_tail;
            }

59
            // Pops a sub task from the head of this task's deque and returns it.
            // Callers in this file treat a nullptr result as "nothing to steal".
            fork_join_sub_task* fork_join_task::get_stolen_sub_task() {
                fork_join_sub_task* from_head = deque_.pop_head();
                return from_head;
            }

63 64
            // Tries to take one sub task from the head of `other_task`'s deque and adopt
            // it into this fork_join_task.
            //
            // other_task: the task to steal from. It is downcast to fork_join_task
            //             without a runtime check.
            //             NOTE(review): assumes callers only pass fork_join_task
            //             instances - confirm against the scheduler.
            // Returns true if a sub task was taken (it is then available via
            // last_stolen_), false if the other task had nothing to steal.
            bool fork_join_task::internal_stealing(abstract_task* other_task) {
                // static_cast is the correct cast for a downcast within a class hierarchy:
                // it applies any base-to-derived pointer adjustment, whereas
                // reinterpret_cast only reinterprets the address and is wrong whenever
                // the base subobject is not located at offset 0.
                auto* cast_other_task = static_cast<fork_join_task*>(other_task);

                fork_join_sub_task* stolen_sub_task = cast_other_task->get_stolen_sub_task();
                if (stolen_sub_task == nullptr) {
                    return false;
                }

                // Make sub-task belong to our fork_join_task instance
                stolen_sub_task->tbb_task_ = this;
                stolen_sub_task->stack_state_ = my_stack_->save_state();
                // We will execute this next without explicitly moving it onto our stack storage
                last_stolen_ = stolen_sub_task;

                return true;
            }

80 81
            // Splits one sub task off the head of this task's deque and runs it as a new
            // fork_join_task that carries this task's unique id.
            //
            // lock: unlocked by this function ONLY when a sub task was found; on the
            //       failure path it is not touched.
            // Returns true if a sub task was split off and executed, false otherwise.
            bool fork_join_task::split_task(base::spin_lock* lock) {
                fork_join_sub_task* split_off = get_stolen_sub_task();
                if (split_off == nullptr) {
                    return false;
                }

                fork_join_task task{split_off, this->unique_id()};

                // In success case, unlock.
                // TODO: this locking is complicated and error prone.
                lock->unlock();

                scheduler::execute_task(task, depth());
                return true;
            }
        }
    }
}