diff --git a/app/benchmark_unbalanced/main.cpp b/app/benchmark_unbalanced/main.cpp
index 603bd78..1b5cda6 100644
--- a/app/benchmark_unbalanced/main.cpp
+++ b/app/benchmark_unbalanced/main.cpp
@@ -24,7 +24,7 @@ int count_child_nodes(uts::node &node) {
   for (size_t i = 0; i < children.size(); i++) {
     size_t index = i;
     auto lambda = [&, index] { results[index] = count_child_nodes(children[index]); };
-    pls::fork_join_lambda_by_value sub_task(lambda);
+    pls::lambda_task_by_value sub_task(lambda);
     current_task->spawn_child(sub_task);
   }
   current_task->wait_for_all();
@@ -43,8 +43,8 @@ int unbalanced_tree_search(int seed, int root_children, double q, int normal_chi
     uts::node root(seed, root_children, q, normal_children);
     result = count_child_nodes(root);
   };
-  pls::fork_join_lambda_by_reference task(lambda);
-  pls::fork_join_lambda_by_reference sub_task(lambda);
+  pls::lambda_task_by_reference task(lambda);
+  pls::lambda_task_by_reference sub_task(lambda);
   pls::task root_task{&sub_task, id};
   pls::scheduler::execute_task(root_task);
 
diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt
index 42a1c06..c777def 100644
--- a/lib/pls/CMakeLists.txt
+++ b/lib/pls/CMakeLists.txt
@@ -20,7 +20,7 @@ add_library(pls STATIC
 
         include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
         include/pls/internal/data_structures/aligned_stack_impl.h
-        include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp
+        include/pls/internal/data_structures/locking_deque.h src/internal/data_structures/locking_deque.cpp
         include/pls/internal/data_structures/work_stealing_deque.h include/pls/internal/data_structures/work_stealing_deque_impl.h
         include/pls/internal/data_structures/stamped_integer.h
@@ -29,15 +29,12 @@ add_library(pls STATIC
         include/pls/internal/helpers/mini_benchmark.h
         include/pls/internal/helpers/unique_id.h
 
-        src/internal/scheduling/root_task.cpp
         include/pls/internal/scheduling/thread_state.h
-        src/internal/scheduling/abstract_task.cpp
         include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
         include/pls/internal/scheduling/scheduler_impl.h
-        src/internal/scheduling/run_on_n_threads_task.cpp
         include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
         include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
-        src/internal/scheduling/parallel_iterator_task.cpp)
+        include/pls/internal/scheduling/lambda_task.h)
 
 # Add everything in `./include` to be in the include path of this project
 target_include_directories(pls
diff --git a/lib/pls/include/pls/algorithms/invoke_parallel_impl.h b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h
index ec89a0f..4b337bd 100644
--- a/lib/pls/include/pls/algorithms/invoke_parallel_impl.h
+++ b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h
@@ -2,70 +2,35 @@
 #ifndef PLS_INVOKE_PARALLEL_IMPL_H
 #define PLS_INVOKE_PARALLEL_IMPL_H
 
-#include 
 #include "pls/internal/scheduling/task.h"
+#include "pls/internal/scheduling/lambda_task.h"
 #include "pls/internal/scheduling/scheduler.h"
-#include "pls/internal/helpers/unique_id.h"
-#include "pls/internal/base/alignment.h"
 
 namespace pls {
 namespace algorithm {
-namespace internal {
-
-using namespace ::pls::internal::scheduling;
-
-template<typename Body>
-inline void run_body(const Body &internal_body, const abstract_task::id &id) {
-  // Make sure we are in the context of this invoke_parallel instance,
-  // if not we will spawn it as a new 'fork-join-style' task.
-  auto current_task = scheduler::current_task();
-  if (current_task->unique_id() == id) {
-    internal_body();
-  } else {
-    fork_join_lambda_by_reference root_body(internal_body);
-    task root_task{&root_body, id};
-    scheduler::execute_task(root_task);
-  }
-}
-}
 
 template<typename Function1, typename Function2>
 void invoke_parallel(const Function1 &function1, const Function2 &function2) {
   using namespace ::pls::internal::scheduling;
-  using namespace ::pls::internal::helpers;
-  using namespace ::pls::internal::base;
-  static abstract_task::id id = unique_id::create();
-
-  auto internal_body = [&]() {
-    auto current_task = task::current();
-    auto sub_task_2 = fork_join_lambda_by_reference(function2);
-    current_task->spawn_child(sub_task_2);
-    function1(); // Execute first function 'inline' without spawning a sub_task object
-    current_task->wait_for_all();
-  };
+  auto sub_task_2 = lambda_task_by_reference(function2);
 
-  internal::run_body(internal_body, id);
+  scheduler::spawn_child(sub_task_2);
+  function1(); // Execute first function 'inline' without spawning a sub_task object
+  scheduler::wait_for_all();
 }
 
 template<typename Function1, typename Function2, typename Function3>
 void invoke_parallel(const Function1 &function1, const Function2 &function2, const Function3 &function3) {
   using namespace ::pls::internal::scheduling;
-  using namespace ::pls::internal::helpers;
-  static abstract_task::id id = unique_id::create();
-
-  auto internal_body = [&]() {
-    auto current_task = task::current();
-    auto sub_task_2 = fork_join_lambda_by_reference(function2);
-    auto sub_task_3 = fork_join_lambda_by_reference(function3);
-    current_task->spawn_child(sub_task_2);
-    current_task->spawn_child(sub_task_3);
-    function1(); // Execute first function 'inline' without spawning a sub_task object
-    current_task->wait_for_all();
-  };
+  auto sub_task_2 = lambda_task_by_reference(function2);
+  auto sub_task_3 = lambda_task_by_reference(function3);
 
-  internal::run_body(internal_body, id);
+  scheduler::spawn_child(sub_task_2);
+  scheduler::spawn_child(sub_task_3);
+  function1(); // Execute first function 'inline' without spawning a sub_task object
+  scheduler::wait_for_all();
 }
 
 }
diff --git a/lib/pls/include/pls/algorithms/parallel_for.h b/lib/pls/include/pls/algorithms/parallel_for.h
index 58334fe..4df1104 100644
--- a/lib/pls/include/pls/algorithms/parallel_for.h
+++ b/lib/pls/include/pls/algorithms/parallel_for.h
@@ -8,9 +8,6 @@ namespace algorithm {
 template<typename RandomIt, typename Function>
 void parallel_for(RandomIt first, RandomIt last, const Function &function);
 
-template<typename RandomIt, typename Function>
-void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &function);
-
 }
 }
 #include "parallel_for_impl.h"
diff --git a/lib/pls/include/pls/algorithms/parallel_for_impl.h b/lib/pls/include/pls/algorithms/parallel_for_impl.h
index f6caa97..786b875 100644
--- a/lib/pls/include/pls/algorithms/parallel_for_impl.h
+++ b/lib/pls/include/pls/algorithms/parallel_for_impl.h
@@ -3,20 +3,16 @@
 #define PLS_PARALLEL_FOR_IMPL_H
 
 #include "pls/internal/scheduling/task.h"
-#include "pls/internal/scheduling/parallel_iterator_task.h"
-#include "pls/internal/scheduling/scheduler.h"
 #include "pls/internal/scheduling/scheduler.h"
 #include "pls/internal/helpers/unique_id.h"
 
 namespace pls {
 namespace algorithm {
-namespace internal {
+
 template<typename RandomIt, typename Function>
 void parallel_for(RandomIt first, RandomIt last, const Function &function) {
   using namespace ::pls::internal::scheduling;
-  using namespace ::pls::internal::helpers;
-  using namespace ::pls::internal::base;
 
   constexpr long min_elements = 4;
   long num_elements = std::distance(first, last);
@@ -29,39 +25,14 @@ void parallel_for(RandomIt first, RandomIt last, const Function &function) {
     // Cut in half recursively
     long middle_index = num_elements / 2;
 
-    auto body = [=] { internal::parallel_for(first + middle_index, last, function); };
-    fork_join_lambda_by_reference second_half_task(body);
-    task::current()->spawn_child(second_half_task);
+    auto body = [=] { parallel_for(first + middle_index, last, function); };
+    lambda_task_by_reference second_half_task(body);
+    scheduler::spawn_child(second_half_task);
 
     parallel_for(first, first + middle_index, function);
-    task::current()->wait_for_all();
+    scheduler::wait_for_all();
   }
 }
-}
-
-template<typename RandomIt, typename Function>
-void parallel_for(RandomIt first, RandomIt last, const Function &function) {
-  using namespace ::pls::internal::scheduling;
-  using namespace ::pls::internal::helpers;
-  using namespace ::pls::internal::base;
-  static abstract_task::id id = unique_id::create();
-
-  parallel_iterator_task iterator_task{first, last, function, id};
-  scheduler::execute_task(iterator_task);
-}
-
-template<typename RandomIt, typename Function>
-void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &function) {
-  using namespace ::pls::internal::scheduling;
-  using namespace ::pls::internal::helpers;
-  using namespace ::pls::internal::base;
-  static abstract_task::id id = unique_id::create();
-
-  auto body = [=] { internal::parallel_for(first, last, function); };
-  fork_join_lambda_by_reference root_body(body);
-  task root_task{&root_body, id};
-  scheduler::execute_task(root_task);
-}
 
 }
 }
diff --git a/lib/pls/include/pls/internal/data_structures/deque.h b/lib/pls/include/pls/internal/data_structures/locking_deque.h
similarity index 74%
rename from lib/pls/include/pls/internal/data_structures/deque.h
rename to lib/pls/include/pls/internal/data_structures/locking_deque.h
index 8f555da..fb19071 100644
--- a/lib/pls/include/pls/internal/data_structures/deque.h
+++ b/lib/pls/include/pls/internal/data_structures/locking_deque.h
@@ -11,24 +11,24 @@ namespace data_structures {
 
 /**
  * Turns any object into deque item when inheriting from this.
 */
-class deque_item {
-  friend class deque_internal;
+class locking_deque_item {
+  friend class locking_deque_internal;
 
-  deque_item *prev_;
-  deque_item *next_;
+  locking_deque_item *prev_;
+  locking_deque_item *next_;
 };
 
-class deque_internal {
+class locking_deque_internal {
  protected:
-  deque_item *head_;
-  deque_item *tail_;
+  locking_deque_item *head_;
+  locking_deque_item *tail_;
 
   base::spin_lock lock_;
 
-  deque_item *pop_head_internal();
-  deque_item *pop_tail_internal();
-  void push_tail_internal(deque_item *new_item);
+  locking_deque_item *pop_head_internal();
+  locking_deque_item *pop_tail_internal();
+  void push_tail_internal(locking_deque_item *new_item);
 };
 
 /**
@@ -38,9 +38,9 @@ class deque_internal {
  * @tparam Item The type of items stored in this deque
 */
 template<typename Item>
-class deque : deque_internal {
+class locking_deque : locking_deque_internal {
 public:
-  explicit deque() : deque_internal{} {}
+  explicit locking_deque() : locking_deque_internal{} {}
 
   inline Item *pop_head() {
     return static_cast<Item *>(pop_head_internal());
diff --git a/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h
index 5c8ce86..ffdf77f 100644
--- a/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h
+++ b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h
@@ -6,7 +6,6 @@
 
 #include "pls/internal/base/error_handling.h"
 #include "pls/internal/data_structures/stamped_integer.h"
-#include "pls/internal/scheduling/thread_state.h"
 
 #include "aligned_stack.h"
 
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h
index 28a9f09..3768e67 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler.h
@@ -12,20 +12,18 @@
 #include "pls/internal/base/thread.h"
 #include "pls/internal/base/barrier.h"
 
-#include "pls/internal/scheduling/thread_state.h"
 #include "pls/internal/scheduling/scheduler_memory.h"
+#include "pls/internal/scheduling/thread_state.h"
+#include "pls/internal/scheduling/task.h"
 
 namespace pls {
 namespace internal {
 namespace scheduling {
 
-void worker_routine();
 
 using scheduler_thread = base::thread;
 
 class scheduler {
   friend class task;
-  friend void worker_routine();
-
   const unsigned int num_threads_;
   scheduler_memory *memory_;
@@ -72,22 +70,18 @@ class scheduler {
    * @param sub_task the new task to be spawned
    */
   template<typename T>
-  void spawn_child(T &sub_task) {
-    this_thread_state()->current_task_->spawn_child(sub_task);
-  }
+  static void spawn_child(T &sub_task);
 
   /**
    * Helper to wait for all children of the currently executing task.
   */
-  void wait_for_all() {
-    this_thread_state()->current_task_->wait_for_all();
-  }
+  static void wait_for_all();
 
   unsigned int num_threads() const { return num_threads_; }
 
  private:
-  // Helpers for accessing thread states
-  thread_state *thread_state_for(size_t id) { return memory_->thread_state_for(id); }
+  static void worker_routine();
+  thread_state *thread_state_for(size_t id);
 
   task *get_local_task();
   task *steal_task();
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index f345d49..b2157b9 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -9,31 +9,42 @@ namespace scheduling {
 template<typename Function>
 void scheduler::perform_work(Function work_section) {
   PROFILE_WORK_BLOCK("scheduler::perform_work")
-  root_task master{work_section};
-
-  // Push root task on stacks
-  auto new_master = memory_->task_stack_for(0)->push(master);
-  memory_->thread_state_for(0)->root_task_ = new_master;
-  memory_->thread_state_for(0)->current_task_ = new_master;
-  for (unsigned int i = 1; i < num_threads_; i++) {
-    root_worker_task worker{new_master};
-    auto new_worker = memory_->task_stack_for(0)->push(worker);
-    memory_->thread_state_for(i)->root_task_ = new_worker;
-    memory_->thread_state_for(i)->current_task_ = new_worker;
-  }
-
-  // Perform and wait for work
-  sync_barrier_.wait(); // Trigger threads to wake up
-  sync_barrier_.wait(); // Wait for threads to finish
-
-  // Clean up stack
-  memory_->task_stack_for(0)->pop();
-  for (unsigned int i = 1; i < num_threads_; i++) {
-    root_worker_task worker{new_master};
-    memory_->task_stack_for(0)->pop();
-  }
+//  root_task master{work_section};
+//
+//  // Push root task on stacks
+//  auto new_master = memory_->task_stack_for(0)->push(master);
+//  memory_->thread_state_for(0)->root_task_ = new_master;
+//  memory_->thread_state_for(0)->current_task_ = new_master;
+//  for (unsigned int i = 1; i < num_threads_; i++) {
+//    root_worker_task worker{new_master};
+//    auto new_worker = memory_->task_stack_for(0)->push(worker);
+//    memory_->thread_state_for(i)->root_task_ = new_worker;
+//    memory_->thread_state_for(i)->current_task_ = new_worker;
+//  }
+//
+//  // Perform and wait for work
+//  sync_barrier_.wait(); // Trigger threads to wake up
+//  sync_barrier_.wait(); // Wait for threads to finish
+//
+//  // Clean up stack
+//  memory_->task_stack_for(0)->pop();
+//  for (unsigned int i = 1; i < num_threads_; i++) {
+//    root_worker_task worker{new_master};
+//    memory_->task_stack_for(0)->pop();
+//  }
 }
 
+template<typename T>
+void scheduler::spawn_child(T &sub_task) {
+  thread_state::get()->current_task_->spawn_child(sub_task);
+}
+
+void scheduler::wait_for_all() {
+  thread_state::get()->current_task_->wait_for_all();
+}
+
+thread_state *scheduler::thread_state_for(size_t id) { return memory_->thread_state_for(id); }
+
 }
 }
 }
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
index 602736a..988ef1d 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
@@ -1,10 +1,10 @@
+#ifndef PLS_SCHEDULER_MEMORY_H
+#define PLS_SCHEDULER_MEMORY_H
+
 #include "pls/internal/data_structures/aligned_stack.h"
 #include "pls/internal/base/thread.h"
-#include "thread_state.h"
-
-#ifndef PLS_SCHEDULER_MEMORY_H
-#define PLS_SCHEDULER_MEMORY_H
+#include "pls/internal/scheduling/thread_state.h"
 
 namespace pls {
 namespace internal {
diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h
index 68c856e..ed4d928 100644
--- a/lib/pls/include/pls/internal/scheduling/task.h
+++ b/lib/pls/include/pls/internal/scheduling/task.h
@@ -1,14 +1,13 @@
-#ifndef PLS_TBB_LIKE_TASK_H
-#define PLS_TBB_LIKE_TASK_H
+#ifndef PLS_TASK_H
+#define PLS_TASK_H
 
 #include "pls/internal/helpers/profiler.h"
 
 #include "pls/internal/data_structures/aligned_stack.h"
 #include "pls/internal/data_structures/work_stealing_deque.h"
 
-#include "abstract_task.h"
-#include "thread_state.h"
+#include "pls/internal/scheduling/thread_state.h"
 
 namespace pls {
 namespace internal {
@@ -37,55 +36,32 @@ class task {
   template<typename T>
   void spawn_child(T &sub_task);
   void wait_for_all();
+
  private:
   void execute();
-};
-
-template<typename Function>
-class fork_join_lambda_by_reference : public task {
-  const Function &function_;
-
- public:
-  explicit fork_join_lambda_by_reference(const Function &function) : task{}, function_{function} {};
-
- protected:
-  void execute_internal() override {
-    function_();
-  }
-};
-
-template<typename Function>
-class fork_join_lambda_by_value : public task {
-  const Function function_;
-
- public:
-  explicit fork_join_lambda_by_value(const Function &function) : task{}, function_{function} {};
-
- protected:
-  void execute_internal() override {
-    function_();
-  }
+  bool try_execute_local();
+  bool try_execute_stolen();
 };
 
 template<typename T>
 void task::spawn_child(T &sub_task) {
   PROFILE_FORK_JOIN_STEALING("spawn_child")
-  static_assert(std::is_base_of::value, "Only pass task subclasses!");
+  static_assert(std::is_base_of::value, "Only pass task subclasses!");
 
   // Keep our refcount up to date
   ref_count_++;
 
-  // Assign forced values
+  // Assign forced values (for stack and parent management)
   sub_task.parent_ = this;
-  sub_task.deque_state_ = scheduler::this_thread_state()->deque_.save_state();
+  sub_task.deque_state_ = thread_state::get()->deque_.save_state();
 
   // Push on our deque
   const T const_task = sub_task;
-  scheduler::this_thread_state()->deque_.push_tail(const_task);
+  thread_state::get()->deque_.push_tail(const_task);
 }
 
 }
 }
 }
 
-#endif //PLS_TBB_LIKE_TASK_H
+#endif //PLS_TASK_H
diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h
index b7fb030..3b08c5c 100644
--- a/lib/pls/include/pls/internal/scheduling/thread_state.h
+++ b/lib/pls/include/pls/internal/scheduling/thread_state.h
@@ -4,9 +4,10 @@
 #include 
+#include "pls/internal/base/thread.h"
+
 #include "pls/internal/data_structures/aligned_stack.h"
-#include "pls/internal/data_structures/deque.h"
-#include "pls/internal/scheduling/task.h"
+#include "pls/internal/data_structures/work_stealing_deque.h"
 
 namespace pls {
 namespace internal {
@@ -14,13 +15,14 @@ namespace scheduling {
 
 // forward declaration
 class scheduler;
+class task;
 
 struct thread_state {
   alignas(base::system_details::CACHE_LINE_SIZE) scheduler *scheduler_;
   alignas(base::system_details::CACHE_LINE_SIZE) task *root_task_;
   alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_;
   alignas(base::system_details::CACHE_LINE_SIZE) data_structures::aligned_stack *task_stack_;
-  alignas(base::system_details::CACHE_LINE_SIZE) data_structures::deque deque_;
+  alignas(base::system_details::CACHE_LINE_SIZE) data_structures::work_stealing_deque deque_;
   alignas(base::system_details::CACHE_LINE_SIZE) size_t id_;
   alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
 
@@ -50,11 +52,6 @@ struct thread_state {
    * @return The thread_state of this thread.
    */
   static thread_state *get() { return base::this_thread::state(); }
-  static scheduler *scheduler() { return get()->scheduler_; }
-  static task *current_task() { return get()->current_task_; }
-  static void set_current_task(task *task) { get()->current_task_ = task; };
-  static data_structures::aligned_stack *task_stack() { return get()->task_stack_; }
-  static data_structures::deque *deque() { return get()->deque_; }
 };
 
 }
diff --git a/lib/pls/include/pls/pls.h b/lib/pls/include/pls/pls.h
index 36c83d0..df3ade5 100644
--- a/lib/pls/include/pls/pls.h
+++ b/lib/pls/include/pls/pls.h
@@ -19,12 +19,11 @@ using task_id = internal::scheduling::abstract_task::id;
 using unique_id = internal::helpers::unique_id;
 
 using internal::scheduling::task;
-using internal::scheduling::fork_join_lambda_by_reference;
-using internal::scheduling::fork_join_lambda_by_value;
+using internal::scheduling::lambda_task_by_reference;
+using internal::scheduling::lambda_task_by_value;
 using internal::scheduling::task;
 
 using algorithm::invoke_parallel;
-using algorithm::parallel_for_fork_join;
 using algorithm::parallel_for;
 
 }
diff --git a/lib/pls/src/internal/data_structures/deque.cpp b/lib/pls/src/internal/data_structures/locking_deque.cpp
similarity index 87%
rename from lib/pls/src/internal/data_structures/deque.cpp
rename to lib/pls/src/internal/data_structures/locking_deque.cpp
index 9f13b0d..90971ce 100644
--- a/lib/pls/src/internal/data_structures/deque.cpp
+++ b/lib/pls/src/internal/data_structures/locking_deque.cpp
@@ -1,19 +1,19 @@
 #include 
 
-#include "pls/internal/data_structures/deque.h"
+#include "pls/internal/data_structures/locking_deque.h"
 
 namespace pls {
 namespace internal {
 namespace data_structures {
 
-deque_item *deque_internal::pop_head_internal() {
+locking_deque_item *locking_deque_internal::pop_head_internal() {
   std::lock_guard lock{lock_};
 
   if (head_ == nullptr) {
     return nullptr;
   }
 
-  deque_item *result = head_;
+  locking_deque_item *result = head_;
   head_ = head_->next_;
   if (head_ == nullptr) {
     tail_ = nullptr;
@@ -24,14 +24,14 @@ deque_item *deque_internal::pop_head_internal() {
   return result;
 }
 
-deque_item *deque_internal::pop_tail_internal() {
+locking_deque_item *locking_deque_internal::pop_tail_internal() {
   std::lock_guard lock{lock_};
 
   if (tail_ == nullptr) {
     return nullptr;
   }
 
-  deque_item *result = tail_;
+  locking_deque_item *result = tail_;
   tail_ = tail_->prev_;
   if (tail_ == nullptr) {
     head_ = nullptr;
@@ -42,7 +42,7 @@ deque_item *deque_internal::pop_tail_internal() {
   return result;
 }
 
-void deque_internal::push_tail_internal(deque_item *new_item) {
+void locking_deque_internal::push_tail_internal(locking_deque_item *new_item) {
   std::lock_guard lock{lock_};
 
   if (tail_ != nullptr) {
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index 9491a06..a636103 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -1,4 +1,7 @@
 #include "pls/internal/scheduling/scheduler.h"
+#include "pls/internal/scheduling/thread_state.h"
+#include "pls/internal/scheduling/task.h"
+
 #include "pls/internal/base/error_handling.h"
 
 namespace pls {
@@ -17,7 +20,7 @@ scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads) :
   for (unsigned int i = 0; i < num_threads_; i++) {
     // Placement new is required, as the memory of `memory_` is not required to be initialized.
     new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i};
-    new((void *) memory_->thread_for(i))base::thread(&worker_routine,
+    new((void *) memory_->thread_for(i))base::thread(&scheduler::worker_routine,
                                                      memory_->thread_state_for(i));
   }
 }
@@ -26,7 +29,7 @@ scheduler::~scheduler() {
   terminate();
 }
 
-void worker_routine() {
+void scheduler::worker_routine() {
   auto my_state = base::this_thread::state();
 
   while (true) {
@@ -59,6 +62,44 @@ void scheduler::terminate(bool wait_for_workers) {
   }
 }
 
+task *scheduler::get_local_task() {
+  PROFILE_STEALING("Get Local Task")
+  return thread_state::get()->deque_.pop_tail();
+}
+
+task *scheduler::steal_task() {
+  PROFILE_STEALING("Steal Task")
+
+  // Data for victim selection
+  const auto my_state = thread_state::get();
+
+  const auto my_id = my_state->id_;
+  const size_t offset = my_state->random_() % num_threads();
+  const size_t max_tries = num_threads(); // TODO: Tune this value
+
+  // Current strategy: random start, then round robin from there
+  for (size_t i = 0; i < max_tries; i++) {
+    size_t target = (offset + i) % num_threads();
+
+    // Skip our self for stealing
+    if (target == my_id) {
+      continue;
+    }
+
+    auto target_state = thread_state_for(target);
+    // TODO: See if we should re-try popping if it failed due to contention
+    auto result = target_state->deque_.pop_head();
+    if (result != nullptr) {
+      return result;
+    }
+
+    // TODO: See if we should backoff here (per missed steal)
+  }
+
+  // TODO: See if we should backoff here (after a 'round' of missed steals)
+  return nullptr;
+}
+
 }
 }
 }
diff --git a/lib/pls/src/internal/scheduling/task.cpp b/lib/pls/src/internal/scheduling/task.cpp
index 276a407..118e409 100644
--- a/lib/pls/src/internal/scheduling/task.cpp
+++ b/lib/pls/src/internal/scheduling/task.cpp
@@ -36,73 +36,34 @@ void task::execute() {
   }
 }
 
-void task::wait_for_all() {
-  while (ref_count_ > 0) {
-    PROFILE_STEALING("get local sub task")
-    task *local_task = thread_state::get()->scheduler_->get_local_task();
-    PROFILE_END_BLOCK
-    if (local_task != nullptr) {
-      local_task->execute();
-    } else {
-      // Try to steal work.
-      // External steal will be executed implicitly if success
-      PROFILE_STEALING("steal work")
-      task *stolen_task = thread_state::get()->scheduler_->steal_task();
-      PROFILE_END_BLOCK
-      if (stolen_task != nullptr) {
-        stolen_task->execute();
-      }
-    }
-  }
-  tbb_task_->deque_.release_memory_until(deque_state_);
-}
-
-bool task::internal_stealing(abstract_task *other_task) {
-  PROFILE_STEALING("task::internal_stealin")
-  auto cast_other_task = reinterpret_cast(other_task);
-
-  auto stolen_sub_task = cast_other_task->get_stolen_sub_task();
-  if (stolen_sub_task == nullptr) {
-    return false;
-  } else {
-    // Make sub-task belong to our task instance
-    stolen_sub_task->tbb_task_ = this;
-    stolen_sub_task->deque_state_ = deque_.save_state();
-    // We will execute this next without explicitly moving it onto our stack storage
-    last_stolen_ = stolen_sub_task;
-
+bool task::try_execute_local() {
+  task *local_task = thread_state::get()->scheduler_->get_local_task();
+  if (local_task != nullptr) {
+    local_task->execute();
     return true;
+  } else {
+    return false;
   }
 }
 
-bool task::split_task(base::swmr_spin_lock *lock) {
-  PROFILE_STEALING("task::split_task")
-  fork_join_sub_task *stolen_sub_task = get_stolen_sub_task();
-  if (stolen_sub_task == nullptr) {
-    return false;
+bool task::try_execute_stolen() {
+  task *stolen_task = thread_state::get()->scheduler_->steal_task();
+  if (stolen_task != nullptr) {
+    stolen_task->deque_state_ = thread_state::get()->deque_.save_state();
+    stolen_task->execute();
+    return true;
   }
-  task task{stolen_sub_task, this->unique_id()};
-
-  // In success case, unlock.
-  lock->reader_unlock();
-  scheduler::execute_task(task, depth());
-  return true;
+  return false;
 }
 
-void task::execute() {
-  PROFILE_WORK_BLOCK("execute task");
-
-  // Bind this instance to our OS thread
-  // TODO: See if we did this right
-  //  my_stack_ = base::this_thread::state()->task_stack_;
-  deque_.reset_base_pointer();
-
-  root_task_->tbb_task_ = this;
-  root_task_->deque_state_ = deque_.save_state();
-
-  // Execute it on our OS thread until its finished
-  root_task_->execute();
+void task::wait_for_all() {
+  while (ref_count_ > 0) {
+    if (!try_execute_local()) {
+      try_execute_stolen();
+    }
+  }
+  thread_state::get()->deque_.release_memory_until(deque_state_);
 }
 
 }
diff --git a/test/data_structures_test.cpp b/test/data_structures_test.cpp
index 97f91ca..add0e4b 100644
--- a/test/data_structures_test.cpp
+++ b/test/data_structures_test.cpp
@@ -3,7 +3,7 @@
 #include 
 #include 
 
-#include <pls/internal/data_structures/deque.h>
+#include <pls/internal/data_structures/locking_deque.h>
 
 #include 
 #include 
@@ -77,11 +77,11 @@ TEST_CASE("aligned stack stores objects correctly", "[internal/data_structures/a
 }
 
 TEST_CASE("deque stores objects correctly", "[internal/data_structures/deque.h]") {
-  class my_item : public deque_item {
+  class my_item : public locking_deque_item {
 
   };
 
-  deque deque;
+  locking_deque deque;
   my_item one, two, three;
 
   SECTION("add and remove items form the tail") {
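Usage sketch (not part of the patch): how the renamed lambda task API is meant to be used after this change, mirroring the two-function invoke_parallel overload above. The decltype(...) template argument and the helper name are assumptions, since the original angle-bracket arguments are stripped from this diff, and the code is assumed to run from inside scheduler::perform_work().

#include "pls/pls.h"

// Hypothetical helper, assumed to be called from within scheduler::perform_work().
void run_both(int &left, int &right) {
  auto second = [&] { right = 2; };
  // decltype(second) is an assumption; the original template argument is not shown in the diff.
  pls::lambda_task_by_reference<decltype(second)> second_task(second);

  pls::scheduler::spawn_child(second_task); // push the child onto this thread's work-stealing deque
  left = 1;                                 // first half executes inline, without a task object
  pls::scheduler::wait_for_all();           // join all children spawned by the current task
}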