fork_join_task.cpp 3.63 KB
Newer Older
1
#include "pls/internal/scheduling/scheduler.h"
2
#include "pls/internal/scheduling/fork_join_task.h"
3 4 5 6

namespace pls {
    namespace internal {
        namespace scheduling {
7
            fork_join_sub_task::fork_join_sub_task():
8
                base::deque_item{},
9 10 11
                ref_count_{0},
                parent_{nullptr},
                tbb_task_{nullptr},
12
                stack_state_{nullptr} {}
13

14
            fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task& other): base::deque_item(other) {
15
                // Do Nothing, will be inited after this anyways
16 17
            }

18
            void fork_join_sub_task::execute()  {
19
                tbb_task_->currently_executing_ = this;
20
                execute_internal();
21
                tbb_task_->currently_executing_ = nullptr;
22
                wait_for_all();
23 24 25 26

                if (parent_ != nullptr) {
                    parent_->ref_count_--;
                }
27 28
            }

29
            void fork_join_sub_task::spawn_child_internal(fork_join_sub_task* sub_task) {
30 31 32 33 34 35 36 37
                // Keep our refcount up to date
                ref_count_++;

                // Assign forced values
                sub_task->parent_ = this;
                sub_task->tbb_task_ = tbb_task_;
                sub_task->stack_state_ = tbb_task_->my_stack_->save_state();

38
                tbb_task_->deque_.push_tail(sub_task);
39 40
            }

41
            void fork_join_sub_task::wait_for_all() {
42
                while (ref_count_ > 0) {
43
                    fork_join_sub_task* local_task = tbb_task_->get_local_sub_task();
44 45 46 47
                    if (local_task != nullptr) {
                        local_task->execute();
                    } else {
                        // Try to steal work.
48
                        // External steal will be executed implicitly if success
49
                        if (tbb_task_->steal_work()) {
50
                            tbb_task_->last_stolen_->execute();
51 52 53
                        }
                    }
                }
54 55 56
                tbb_task_->my_stack_->reset_state(stack_state_);
            }

57
            fork_join_sub_task* fork_join_task::get_local_sub_task() {
58
                return deque_.pop_tail();
59 60
            }

61
            fork_join_sub_task* fork_join_task::get_stolen_sub_task() {
62
                return deque_.pop_head();
63 64
            }

65 66
            bool fork_join_task::internal_stealing(abstract_task* other_task) {
                auto cast_other_task = reinterpret_cast<fork_join_task*>(other_task);
67 68 69 70 71

                auto stolen_sub_task = cast_other_task->get_stolen_sub_task();
                if (stolen_sub_task == nullptr) {
                    return false;
                } else {
72
                    // Make sub-task belong to our fork_join_task instance
73 74 75 76 77 78 79 80 81
                    stolen_sub_task->tbb_task_ = this;
                    stolen_sub_task->stack_state_ = my_stack_->save_state();
                    // We will execute this next without explicitly moving it onto our stack storage
                    last_stolen_ = stolen_sub_task;

                    return true;
                }
            }

82 83
            bool fork_join_task::split_task(base::spin_lock* lock) {
                fork_join_sub_task* stolen_sub_task = get_stolen_sub_task();
84 85 86
                if (stolen_sub_task == nullptr) {
                    return false;
                }
87
                fork_join_task task{stolen_sub_task, this->unique_id()};
88

89 90 91
                // In success case, unlock.
                // TODO: this locking is complicated and error prone.
                lock->unlock();
92 93 94

                scheduler::execute_task(task, depth());
                return true;
95 96 97 98
            }
        }
    }
}