fork_join_task.cpp 3.72 KB
Newer Older
1
#include "pls/internal/helpers/profiler.h"
2

3
#include "pls/internal/scheduling/scheduler.h"
4
#include "pls/internal/scheduling/fork_join_task.h"
5 6

namespace pls {
7 8 9 10 11 12 13
namespace internal {
namespace scheduling {

fork_join_sub_task::fork_join_sub_task() :
    ref_count_{0},
    parent_{nullptr},
    tbb_task_{nullptr},
14
    deque_state_{0} {}
15 16 17

fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task &other) :
    ref_count_{0},
18 19 20
    parent_{other.parent_},
    tbb_task_{other.tbb_task_},
    deque_state_{other.deque_state_} {}
21 22 23 24

void fork_join_sub_task::execute() {
  PROFILE_WORK_BLOCK("execute sub_task")
  tbb_task_->currently_executing_ = this;
25 26 27 28 29
  if (executed) {
    PLS_ERROR("Double Execution!")
  }
  executed = true;
  executed_at = base::this_thread::state<thread_state>()->id_;
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
  execute_internal();
  tbb_task_->currently_executing_ = nullptr;
  PROFILE_END_BLOCK
  wait_for_all();

  if (parent_ != nullptr) {
    parent_->ref_count_--;
  }
}

void fork_join_sub_task::wait_for_all() {
  while (ref_count_ > 0) {
    PROFILE_STEALING("get local sub task")
    fork_join_sub_task *local_task = tbb_task_->get_local_sub_task();
    PROFILE_END_BLOCK
    if (local_task != nullptr) {
      local_task->execute();
    } else {
48 49 50 51 52 53 54
      // Try to steal work.
      // External steal will be executed implicitly if success
      PROFILE_STEALING("steal work")
      bool internal_steal_success = tbb_task_->steal_work();
      PROFILE_END_BLOCK
      if (internal_steal_success) {
        tbb_task_->last_stolen_->execute();
55
      }
56
    }
57
  }
58
  tbb_task_->deque_.release_memory_until(deque_state_);
59 60 61 62 63 64 65
}

fork_join_sub_task *fork_join_task::get_local_sub_task() {
  return deque_.pop_tail();
}

fork_join_sub_task *fork_join_task::get_stolen_sub_task() {
66
  return deque_.pop_head();
67 68
}

69 70 71 72
fork_join_sub_task *fork_join_sub_task::current() {
  return dynamic_cast<fork_join_task *>(scheduler::current_task())->currently_executing();
}

73 74 75 76 77 78 79 80 81 82
bool fork_join_task::internal_stealing(abstract_task *other_task) {
  PROFILE_STEALING("fork_join_task::internal_stealin")
  auto cast_other_task = reinterpret_cast<fork_join_task *>(other_task);

  auto stolen_sub_task = cast_other_task->get_stolen_sub_task();
  if (stolen_sub_task == nullptr) {
    return false;
  } else {
    // Make sub-task belong to our fork_join_task instance
    stolen_sub_task->tbb_task_ = this;
83
    stolen_sub_task->deque_state_ = deque_.save_state();
84 85 86 87 88 89 90
    // We will execute this next without explicitly moving it onto our stack storage
    last_stolen_ = stolen_sub_task;

    return true;
  }
}

91
bool fork_join_task::split_task(base::swmr_spin_lock *lock) {
92 93 94 95 96 97 98 99
  PROFILE_STEALING("fork_join_task::split_task")
  fork_join_sub_task *stolen_sub_task = get_stolen_sub_task();
  if (stolen_sub_task == nullptr) {
    return false;
  }
  fork_join_task task{stolen_sub_task, this->unique_id()};

  // In success case, unlock.
100
  lock->reader_unlock();
101 102 103 104 105 106 107 108 109

  scheduler::execute_task(task, depth());
  return true;
}

void fork_join_task::execute() {
  PROFILE_WORK_BLOCK("execute fork_join_task");

  // Bind this instance to our OS thread
110 111 112 113
  // TODO: See if we did this right
  // my_stack_ = base::this_thread::state<thread_state>()->task_stack_;
  deque_.reset_base_pointer();

114
  root_task_->tbb_task_ = this;
115
  root_task_->deque_state_ = deque_.save_state();
116 117 118 119 120 121 122

  // Execute it on our OS thread until its finished
  root_task_->execute();
}

fork_join_sub_task *fork_join_task::currently_executing() const { return currently_executing_; }

123 124
fork_join_task::fork_join_task(fork_join_sub_task *root_task,
                               const abstract_task::id &id) :
125 126 127
    abstract_task{0, id},
    root_task_{root_task},
    currently_executing_{nullptr},
128
    deque_{base::this_thread::state<thread_state>()->task_stack_},
129 130 131 132
    last_stolen_{nullptr} {}

}
}
133
}