// fork_join_task.cpp
#include "pls/internal/helpers/profiler.h"

#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/fork_join_task.h"

namespace pls {
namespace internal {
namespace scheduling {

fork_join_sub_task::fork_join_sub_task() :
    data_structures::deque_item{},
    ref_count_{0},
    parent_{nullptr},
    tbb_task_{nullptr},
    stack_state_{nullptr} {}

// Copy constructor. Only the deque_item base is copied from `other`; the
// reference count, parent, owning fork_join_task and saved stack state are
// reset, so the copy starts out detached just like a default-constructed task.
fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task &other)
    : data_structures::deque_item(other),
      ref_count_(0),
      parent_(nullptr),
      tbb_task_(nullptr),
      stack_state_(nullptr) {
}

// Runs this sub task to completion.
//
// While execute_internal() runs, the owning fork_join_task has this sub task
// registered as its currently executing one. Afterwards wait_for_all() is
// called, and finally the parent's reference count (if there is a parent) is
// decremented.
void fork_join_sub_task::execute() {
  PROFILE_WORK_BLOCK("execute sub_task")
  tbb_task_->currently_executing_ = this;
  execute_internal();
  tbb_task_->currently_executing_ = nullptr;
  PROFILE_END_BLOCK
  wait_for_all();

  // A task without a parent has nobody to report its completion to.
  if (parent_ == nullptr) {
    return;
  }
  parent_->ref_count_--;
}

// Registers `sub_task` as a child of this task and enqueues it.
//
// The child inherits this task's owning fork_join_task, records the current
// state of that task's stack, and is pushed onto the tail of its deque.
void fork_join_sub_task::spawn_child_internal(fork_join_sub_task *sub_task) {
  // Count the child before it is enqueued.
  ref_count_++;

  auto *const owner = tbb_task_;

  // These values are always overwritten, whatever the child held before.
  sub_task->parent_ = this;
  sub_task->tbb_task_ = owner;
  sub_task->stack_state_ = owner->my_stack_->save_state();

  // Enqueue last, once the child is fully set up.
  owner->deque_.push_tail(sub_task);
}

// Keeps working until this task's reference count has dropped to zero.
//
// Each iteration first tries to take a sub task from the owning
// fork_join_task's local deque and executes it. If the deque yields nothing,
// the owning task tries to steal work; on a successful internal steal the
// sub task stored in last_stolen_ is executed. Once the count reaches zero,
// the owning task's stack is reset to the state saved for this sub task.
void fork_join_sub_task::wait_for_all() {
  while (ref_count_ > 0) {
    PROFILE_STEALING("get local sub task")
    fork_join_sub_task *local_task = tbb_task_->get_local_sub_task();
    PROFILE_END_BLOCK
    if (local_task != nullptr) {
      local_task->execute();
    } else {
      // Try to steal work.
      // External steal will be executed implicitly if success
      PROFILE_STEALING("steal work")
      bool internal_steal_success = tbb_task_->steal_work();
      PROFILE_END_BLOCK
      if (internal_steal_success) {
        tbb_task_->last_stolen_->execute();
      }
    }
  }
  // Roll the stack back to the state recorded for this sub task.
  tbb_task_->my_stack_->reset_state(stack_state_);
}

// Takes the sub task at the tail of this task's deque, i.e. the end that
// spawn_child_internal() pushes to. Returns whatever pop_tail() yields;
// callers treat nullptr as "no local work".
fork_join_sub_task *fork_join_task::get_local_sub_task() {
  fork_join_sub_task *const from_tail = deque_.pop_tail();
  return from_tail;
}

// Takes the sub task at the head of this task's deque, the opposite end from
// the one used for local work. Returns whatever pop_head() yields; callers
// treat nullptr as "nothing to steal".
fork_join_sub_task *fork_join_task::get_stolen_sub_task() {
  fork_join_sub_task *const from_head = deque_.pop_head();
  return from_head;
}

// Tries to steal one sub task from `other_task` into this fork_join_task.
//
// `other_task` is treated as a fork_join_task (NOTE(review): this relies on
// the caller only passing tasks of that concrete type - confirm against the
// scheduler). On success the stolen sub task is re-bound to this instance,
// given a fresh stack state from our stack, and stored in last_stolen_ for
// the caller to execute.
//
// Returns true if a sub task was stolen, false if the other task's deque had
// nothing to offer.
bool fork_join_task::internal_stealing(abstract_task *other_task) {
  PROFILE_STEALING("fork_join_task::internal_stealin")
  // Base-to-derived downcast: static_cast applies any required pointer
  // adjustment, which reinterpret_cast would not.
  auto cast_other_task = static_cast<fork_join_task *>(other_task);

  auto stolen_sub_task = cast_other_task->get_stolen_sub_task();
  if (stolen_sub_task == nullptr) {
    return false;
  }

  // Make sub-task belong to our fork_join_task instance
  stolen_sub_task->tbb_task_ = this;
  stolen_sub_task->stack_state_ = my_stack_->save_state();
  // We will execute this next without explicitly moving it onto our stack storage
  last_stolen_ = stolen_sub_task;

  return true;
}

// Splits off one sub task from this task and runs it as its own
// fork_join_task.
//
// The sub task is taken from the head of our deque. If there is none, false
// is returned and `lock` is left untouched. Otherwise a new fork_join_task
// sharing our unique_id() is built around it, `lock` is released, and the new
// task is handed to scheduler::execute_task() at our depth(); true is
// returned afterwards.
bool fork_join_task::split_task(base::spin_lock *lock) {
  PROFILE_STEALING("fork_join_task::split_task")
  fork_join_sub_task *const split_root = get_stolen_sub_task();
  if (split_root == nullptr) {
    return false;
  }
  fork_join_task split_off{split_root, this->unique_id()};

  // The lock is released only on this success path.
  // TODO: this locking is complicated and error prone.
  lock->unlock();

  scheduler::execute_task(split_off, depth());
  return true;
}

// Entry point of the fork_join_task: binds it to the calling thread's task
// stack, attaches the root sub task to this instance and runs that root.
void fork_join_task::execute() {
  PROFILE_WORK_BLOCK("execute fork_join_task");

  // Use the task stack stored in the current thread's state.
  my_stack_ = base::this_thread::state<thread_state>()->task_stack_;

  // Attach the root sub task: record the stack state it starts from and make
  // this instance its owner.
  root_task_->stack_state_ = my_stack_->save_state();
  root_task_->tbb_task_ = this;

  // Run the root on this thread; execute() returns once it has finished.
  root_task_->execute();
}

// Returns the sub task currently registered as executing on this
// fork_join_task, or nullptr when none is registered.
fork_join_sub_task *fork_join_task::currently_executing() const {
  return currently_executing_;
}

fork_join_task::fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id) :
    abstract_task{0, id},
    root_task_{root_task},
    currently_executing_{nullptr},
    my_stack_{nullptr},
    deque_{},
    last_stolen_{nullptr} {}

}
}
}