Commit 374153ce by FritzFlorian

WIP: compiling version without abstract_task.

Please note that this currently does not execute; it only removes the 'old aspects'. We are still missing launching the root fork_join task.
parent a7073632
...@@ -24,7 +24,7 @@ int count_child_nodes(uts::node &node) { ...@@ -24,7 +24,7 @@ int count_child_nodes(uts::node &node) {
for (size_t i = 0; i < children.size(); i++) { for (size_t i = 0; i < children.size(); i++) {
size_t index = i; size_t index = i;
auto lambda = [&, index] { results[index] = count_child_nodes(children[index]); }; auto lambda = [&, index] { results[index] = count_child_nodes(children[index]); };
pls::fork_join_lambda_by_value<typeof(lambda)> sub_task(lambda); pls::lambda_task_by_value<typeof(lambda)> sub_task(lambda);
current_task->spawn_child(sub_task); current_task->spawn_child(sub_task);
} }
current_task->wait_for_all(); current_task->wait_for_all();
...@@ -43,8 +43,8 @@ int unbalanced_tree_search(int seed, int root_children, double q, int normal_chi ...@@ -43,8 +43,8 @@ int unbalanced_tree_search(int seed, int root_children, double q, int normal_chi
uts::node root(seed, root_children, q, normal_children); uts::node root(seed, root_children, q, normal_children);
result = count_child_nodes(root); result = count_child_nodes(root);
}; };
pls::fork_join_lambda_by_reference<typeof(lambda)> task(lambda); pls::lambda_task_by_reference<typeof(lambda)> task(lambda);
pls::fork_join_lambda_by_reference<typeof(lambda)> sub_task(lambda); pls::lambda_task_by_reference<typeof(lambda)> sub_task(lambda);
pls::task root_task{&sub_task, id}; pls::task root_task{&sub_task, id};
pls::scheduler::execute_task(root_task); pls::scheduler::execute_task(root_task);
......
...@@ -20,7 +20,7 @@ add_library(pls STATIC ...@@ -20,7 +20,7 @@ add_library(pls STATIC
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h include/pls/internal/data_structures/aligned_stack_impl.h
include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp include/pls/internal/data_structures/locking_deque.h src/internal/data_structures/locking_deque.cpp
include/pls/internal/data_structures/work_stealing_deque.h include/pls/internal/data_structures/work_stealing_deque_impl.h include/pls/internal/data_structures/work_stealing_deque.h include/pls/internal/data_structures/work_stealing_deque_impl.h
include/pls/internal/data_structures/stamped_integer.h include/pls/internal/data_structures/stamped_integer.h
...@@ -29,15 +29,12 @@ add_library(pls STATIC ...@@ -29,15 +29,12 @@ add_library(pls STATIC
include/pls/internal/helpers/mini_benchmark.h include/pls/internal/helpers/mini_benchmark.h
include/pls/internal/helpers/unique_id.h include/pls/internal/helpers/unique_id.h
src/internal/scheduling/root_task.cpp
include/pls/internal/scheduling/thread_state.h include/pls/internal/scheduling/thread_state.h
src/internal/scheduling/abstract_task.cpp
include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/scheduler_impl.h
src/internal/scheduling/run_on_n_threads_task.cpp
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
src/internal/scheduling/parallel_iterator_task.cpp) include/pls/internal/scheduling/lambda_task.h)
# Add everything in `./include` to be in the include path of this project # Add everything in `./include` to be in the include path of this project
target_include_directories(pls target_include_directories(pls
......
...@@ -2,70 +2,35 @@ ...@@ -2,70 +2,35 @@
#ifndef PLS_INVOKE_PARALLEL_IMPL_H #ifndef PLS_INVOKE_PARALLEL_IMPL_H
#define PLS_INVOKE_PARALLEL_IMPL_H #define PLS_INVOKE_PARALLEL_IMPL_H
#include <pls/internal/scheduling/task.h>
#include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/lambda_task.h"
#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/helpers/unique_id.h"
#include "pls/internal/base/alignment.h"
namespace pls { namespace pls {
namespace algorithm { namespace algorithm {
namespace internal {
using namespace ::pls::internal::scheduling;
template<typename Body>
inline void run_body(const Body &internal_body, const abstract_task::id &id) {
// Make sure we are in the context of this invoke_parallel instance,
// if not we will spawn it as a new 'fork-join-style' task.
auto current_task = scheduler::current_task();
if (current_task->unique_id() == id) {
internal_body();
} else {
fork_join_lambda_by_reference<Body> root_body(internal_body);
task root_task{&root_body, id};
scheduler::execute_task(root_task);
}
}
}
template<typename Function1, typename Function2> template<typename Function1, typename Function2>
void invoke_parallel(const Function1 &function1, const Function2 &function2) { void invoke_parallel(const Function1 &function1, const Function2 &function2) {
using namespace ::pls::internal::scheduling; using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
using namespace ::pls::internal::base;
static abstract_task::id id = unique_id::create<Function1, Function2>();
auto internal_body = [&]() {
auto current_task = task::current();
auto sub_task_2 = fork_join_lambda_by_reference<Function2>(function2);
current_task->spawn_child(sub_task_2); auto sub_task_2 = lambda_task_by_reference<Function2>(function2);
function1(); // Execute first function 'inline' without spawning a sub_task object
current_task->wait_for_all();
};
internal::run_body(internal_body, id); scheduler::spawn_child(sub_task_2);
function1(); // Execute first function 'inline' without spawning a sub_task object
scheduler::wait_for_all();
} }
template<typename Function1, typename Function2, typename Function3> template<typename Function1, typename Function2, typename Function3>
void invoke_parallel(const Function1 &function1, const Function2 &function2, const Function3 &function3) { void invoke_parallel(const Function1 &function1, const Function2 &function2, const Function3 &function3) {
using namespace ::pls::internal::scheduling; using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
static abstract_task::id id = unique_id::create<Function1, Function2, Function3>();
auto internal_body = [&]() {
auto current_task = task::current();
auto sub_task_2 = fork_join_lambda_by_reference<Function2>(function2);
auto sub_task_3 = fork_join_lambda_by_reference<Function3>(function3);
current_task->spawn_child(sub_task_2); auto sub_task_2 = lambda_task_by_reference<Function2>(function2);
current_task->spawn_child(sub_task_3); auto sub_task_3 = lambda_task_by_reference<Function3>(function3);
function1(); // Execute first function 'inline' without spawning a sub_task object
current_task->wait_for_all();
};
internal::run_body(internal_body, id); scheduler::spawn_child(sub_task_2);
scheduler::spawn_child(sub_task_3);
function1(); // Execute first function 'inline' without spawning a sub_task object
scheduler::wait_for_all();
} }
} }
......
...@@ -8,9 +8,6 @@ namespace algorithm { ...@@ -8,9 +8,6 @@ namespace algorithm {
template<typename RandomIt, typename Function> template<typename RandomIt, typename Function>
void parallel_for(RandomIt first, RandomIt last, const Function &function); void parallel_for(RandomIt first, RandomIt last, const Function &function);
template<typename RandomIt, typename Function>
void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &function);
} }
} }
#include "parallel_for_impl.h" #include "parallel_for_impl.h"
......
...@@ -3,20 +3,16 @@ ...@@ -3,20 +3,16 @@
#define PLS_PARALLEL_FOR_IMPL_H #define PLS_PARALLEL_FOR_IMPL_H
#include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/parallel_iterator_task.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/helpers/unique_id.h" #include "pls/internal/helpers/unique_id.h"
namespace pls { namespace pls {
namespace algorithm { namespace algorithm {
namespace internal {
template<typename RandomIt, typename Function> template<typename RandomIt, typename Function>
void parallel_for(RandomIt first, RandomIt last, const Function &function) { void parallel_for(RandomIt first, RandomIt last, const Function &function) {
using namespace ::pls::internal::scheduling; using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
using namespace ::pls::internal::base;
constexpr long min_elements = 4; constexpr long min_elements = 4;
long num_elements = std::distance(first, last); long num_elements = std::distance(first, last);
...@@ -29,39 +25,14 @@ void parallel_for(RandomIt first, RandomIt last, const Function &function) { ...@@ -29,39 +25,14 @@ void parallel_for(RandomIt first, RandomIt last, const Function &function) {
// Cut in half recursively // Cut in half recursively
long middle_index = num_elements / 2; long middle_index = num_elements / 2;
auto body = [=] { internal::parallel_for(first + middle_index, last, function); }; auto body = [=] { parallel_for(first + middle_index, last, function); };
fork_join_lambda_by_reference<decltype(body)> second_half_task(body); lambda_task_by_reference<decltype(body)> second_half_task(body);
task::current()->spawn_child(second_half_task); scheduler::spawn_child(second_half_task);
parallel_for(first, first + middle_index, function); parallel_for(first, first + middle_index, function);
task::current()->wait_for_all(); scheduler::wait_for_all();
} }
} }
}
template<typename RandomIt, typename Function>
void parallel_for(RandomIt first, RandomIt last, const Function &function) {
using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
using namespace ::pls::internal::base;
static abstract_task::id id = unique_id::create<RandomIt, Function>();
parallel_iterator_task<RandomIt, Function> iterator_task{first, last, function, id};
scheduler::execute_task(iterator_task);
}
template<typename RandomIt, typename Function>
void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &function) {
using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
using namespace ::pls::internal::base;
static abstract_task::id id = unique_id::create<RandomIt, Function>();
auto body = [=] { internal::parallel_for(first, last, function); };
fork_join_lambda_by_reference<decltype(body)> root_body(body);
task root_task{&root_body, id};
scheduler::execute_task(root_task);
}
} }
} }
......
...@@ -11,24 +11,24 @@ namespace data_structures { ...@@ -11,24 +11,24 @@ namespace data_structures {
/** /**
* Turns any object into deque item when inheriting from this. * Turns any object into deque item when inheriting from this.
*/ */
class deque_item { class locking_deque_item {
friend class deque_internal; friend class locking_deque_internal;
deque_item *prev_; locking_deque_item *prev_;
deque_item *next_; locking_deque_item *next_;
}; };
class deque_internal { class locking_deque_internal {
protected: protected:
deque_item *head_; locking_deque_item *head_;
deque_item *tail_; locking_deque_item *tail_;
base::spin_lock lock_; base::spin_lock lock_;
deque_item *pop_head_internal(); locking_deque_item *pop_head_internal();
deque_item *pop_tail_internal(); locking_deque_item *pop_tail_internal();
void push_tail_internal(deque_item *new_item); void push_tail_internal(locking_deque_item *new_item);
}; };
/** /**
...@@ -38,9 +38,9 @@ class deque_internal { ...@@ -38,9 +38,9 @@ class deque_internal {
* @tparam Item The type of items stored in this deque * @tparam Item The type of items stored in this deque
*/ */
template<typename Item> template<typename Item>
class deque : deque_internal { class locking_deque : locking_deque_internal {
public: public:
explicit deque() : deque_internal{} {} explicit locking_deque() : locking_deque_internal{} {}
inline Item *pop_head() { inline Item *pop_head() {
return static_cast<Item *>(pop_head_internal()); return static_cast<Item *>(pop_head_internal());
......
...@@ -6,7 +6,6 @@ ...@@ -6,7 +6,6 @@
#include "pls/internal/base/error_handling.h" #include "pls/internal/base/error_handling.h"
#include "pls/internal/data_structures/stamped_integer.h" #include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/scheduling/thread_state.h"
#include "aligned_stack.h" #include "aligned_stack.h"
......
...@@ -12,20 +12,18 @@ ...@@ -12,20 +12,18 @@
#include "pls/internal/base/thread.h" #include "pls/internal/base/thread.h"
#include "pls/internal/base/barrier.h" #include "pls/internal/base/barrier.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/scheduler_memory.h" #include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/task.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
namespace scheduling { namespace scheduling {
void worker_routine();
using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>; using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;
class scheduler { class scheduler {
friend class task; friend class task;
friend void worker_routine();
const unsigned int num_threads_; const unsigned int num_threads_;
scheduler_memory *memory_; scheduler_memory *memory_;
...@@ -72,22 +70,18 @@ class scheduler { ...@@ -72,22 +70,18 @@ class scheduler {
* @param sub_task the new task to be spawned * @param sub_task the new task to be spawned
*/ */
template<typename T> template<typename T>
void spawn_child(T &sub_task) { static void spawn_child(T &sub_task);
this_thread_state()->current_task_->spawn_child(sub_task);
}
/** /**
* Helper to wait for all children of the currently executing task. * Helper to wait for all children of the currently executing task.
*/ */
void wait_for_all() { static void wait_for_all();
this_thread_state()->current_task_->wait_for_all();
}
unsigned int num_threads() const { return num_threads_; } unsigned int num_threads() const { return num_threads_; }
private: private:
// Helpers for accessing thread states static void worker_routine();
thread_state *thread_state_for(size_t id) { return memory_->thread_state_for(id); } thread_state *thread_state_for(size_t id);
task *get_local_task(); task *get_local_task();
task *steal_task(); task *steal_task();
......
...@@ -9,31 +9,42 @@ namespace scheduling { ...@@ -9,31 +9,42 @@ namespace scheduling {
template<typename Function> template<typename Function>
void scheduler::perform_work(Function work_section) { void scheduler::perform_work(Function work_section) {
PROFILE_WORK_BLOCK("scheduler::perform_work") PROFILE_WORK_BLOCK("scheduler::perform_work")
root_task<Function> master{work_section}; // root_task<Function> master{work_section};
//
// Push root task on stacks // // Push root task on stacks
auto new_master = memory_->task_stack_for(0)->push(master); // auto new_master = memory_->task_stack_for(0)->push(master);
memory_->thread_state_for(0)->root_task_ = new_master; // memory_->thread_state_for(0)->root_task_ = new_master;
memory_->thread_state_for(0)->current_task_ = new_master; // memory_->thread_state_for(0)->current_task_ = new_master;
for (unsigned int i = 1; i < num_threads_; i++) { // for (unsigned int i = 1; i < num_threads_; i++) {
root_worker_task<Function> worker{new_master}; // root_worker_task<Function> worker{new_master};
auto new_worker = memory_->task_stack_for(0)->push(worker); // auto new_worker = memory_->task_stack_for(0)->push(worker);
memory_->thread_state_for(i)->root_task_ = new_worker; // memory_->thread_state_for(i)->root_task_ = new_worker;
memory_->thread_state_for(i)->current_task_ = new_worker; // memory_->thread_state_for(i)->current_task_ = new_worker;
} // }
//
// Perform and wait for work // // Perform and wait for work
sync_barrier_.wait(); // Trigger threads to wake up // sync_barrier_.wait(); // Trigger threads to wake up
sync_barrier_.wait(); // Wait for threads to finish // sync_barrier_.wait(); // Wait for threads to finish
//
// Clean up stack // // Clean up stack
memory_->task_stack_for(0)->pop<typeof(master)>(); // memory_->task_stack_for(0)->pop<typeof(master)>();
for (unsigned int i = 1; i < num_threads_; i++) { // for (unsigned int i = 1; i < num_threads_; i++) {
root_worker_task<Function> worker{new_master}; // root_worker_task<Function> worker{new_master};
memory_->task_stack_for(0)->pop<typeof(worker)>(); // memory_->task_stack_for(0)->pop<typeof(worker)>();
} // }
} }
template<typename T>
void scheduler::spawn_child(T &sub_task) {
thread_state::get()->current_task_->spawn_child(sub_task);
}
void scheduler::wait_for_all() {
thread_state::get()->current_task_->wait_for_all();
}
thread_state *scheduler::thread_state_for(size_t id) { return memory_->thread_state_for(id); }
} }
} }
} }
......
#ifndef PLS_SCHEDULER_MEMORY_H
#define PLS_SCHEDULER_MEMORY_H
#include "pls/internal/data_structures/aligned_stack.h" #include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/base/thread.h" #include "pls/internal/base/thread.h"
#include "thread_state.h" #include "pls/internal/scheduling/thread_state.h"
#ifndef PLS_SCHEDULER_MEMORY_H
#define PLS_SCHEDULER_MEMORY_H
namespace pls { namespace pls {
namespace internal { namespace internal {
......
#ifndef PLS_TBB_LIKE_TASK_H #ifndef PLS_TASK_H
#define PLS_TBB_LIKE_TASK_H #define PLS_TASK_H
#include "pls/internal/helpers/profiler.h" #include "pls/internal/helpers/profiler.h"
#include "pls/internal/data_structures/aligned_stack.h" #include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/data_structures/work_stealing_deque.h" #include "pls/internal/data_structures/work_stealing_deque.h"
#include "abstract_task.h" #include "pls/internal/scheduling/thread_state.h"
#include "thread_state.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
...@@ -37,55 +36,32 @@ class task { ...@@ -37,55 +36,32 @@ class task {
template<typename T> template<typename T>
void spawn_child(T &sub_task); void spawn_child(T &sub_task);
void wait_for_all(); void wait_for_all();
private: private:
void execute(); void execute();
}; bool try_execute_local();
bool try_execute_stolen();
template<typename Function>
class fork_join_lambda_by_reference : public task {
const Function &function_;
public:
explicit fork_join_lambda_by_reference(const Function &function) : task{}, function_{function} {};
protected:
void execute_internal() override {
function_();
}
};
template<typename Function>
class fork_join_lambda_by_value : public task {
const Function function_;
public:
explicit fork_join_lambda_by_value(const Function &function) : task{}, function_{function} {};
protected:
void execute_internal() override {
function_();
}
}; };
template<typename T> template<typename T>
void task::spawn_child(T &sub_task) { void task::spawn_child(T &sub_task) {
PROFILE_FORK_JOIN_STEALING("spawn_child") PROFILE_FORK_JOIN_STEALING("spawn_child")
static_assert(std::is_base_of<sub_task, T>::value, "Only pass task subclasses!"); static_assert(std::is_base_of<T, task>::value, "Only pass task subclasses!");
// Keep our refcount up to date // Keep our refcount up to date
ref_count_++; ref_count_++;
// Assign forced values // Assign forced values (for stack and parent management)
sub_task.parent_ = this; sub_task.parent_ = this;
sub_task.deque_state_ = scheduler::this_thread_state()->deque_.save_state(); sub_task.deque_state_ = thread_state::get()->deque_.save_state();
// Push on our deque // Push on our deque
const T const_task = sub_task; const T const_task = sub_task;
scheduler::this_thread_state()->deque_.push_tail(const_task); thread_state::get()->deque_.push_tail(const_task);
} }
} }
} }
} }
#endif //PLS_TBB_LIKE_TASK_H #endif //PLS_TASK_H
...@@ -4,9 +4,10 @@ ...@@ -4,9 +4,10 @@
#include <random> #include <random>
#include "pls/internal/base/thread.h"
#include "pls/internal/data_structures/aligned_stack.h" #include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/data_structures/deque.h" #include "pls/internal/data_structures/work_stealing_deque.h"
#include "pls/internal/scheduling/task.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
...@@ -14,13 +15,14 @@ namespace scheduling { ...@@ -14,13 +15,14 @@ namespace scheduling {
// forward declaration // forward declaration
class scheduler; class scheduler;
class task;
struct thread_state { struct thread_state {
alignas(base::system_details::CACHE_LINE_SIZE) scheduler *scheduler_; alignas(base::system_details::CACHE_LINE_SIZE) scheduler *scheduler_;
alignas(base::system_details::CACHE_LINE_SIZE) task *root_task_; alignas(base::system_details::CACHE_LINE_SIZE) task *root_task_;
alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_; alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_;
alignas(base::system_details::CACHE_LINE_SIZE) data_structures::aligned_stack *task_stack_; alignas(base::system_details::CACHE_LINE_SIZE) data_structures::aligned_stack *task_stack_;
alignas(base::system_details::CACHE_LINE_SIZE) data_structures::deque<task> deque_; alignas(base::system_details::CACHE_LINE_SIZE) data_structures::work_stealing_deque<task> deque_;
alignas(base::system_details::CACHE_LINE_SIZE) size_t id_; alignas(base::system_details::CACHE_LINE_SIZE) size_t id_;
alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_; alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
...@@ -50,11 +52,6 @@ struct thread_state { ...@@ -50,11 +52,6 @@ struct thread_state {
* @return The thread_state of this thread. * @return The thread_state of this thread.
*/ */
static thread_state *get() { return base::this_thread::state<thread_state>(); } static thread_state *get() { return base::this_thread::state<thread_state>(); }
static scheduler *scheduler() { return get()->scheduler_; }
static task *current_task() { return get()->current_task_; }
static void set_current_task(task *task) { get()->current_task_ = task; };
static data_structures::aligned_stack *task_stack() { return get()->task_stack_; }
static data_structures::deque<task> *deque() { return get()->deque_; }
}; };
} }
......
...@@ -19,12 +19,11 @@ using task_id = internal::scheduling::abstract_task::id; ...@@ -19,12 +19,11 @@ using task_id = internal::scheduling::abstract_task::id;
using unique_id = internal::helpers::unique_id; using unique_id = internal::helpers::unique_id;
using internal::scheduling::task; using internal::scheduling::task;
using internal::scheduling::fork_join_lambda_by_reference; using internal::scheduling::lambda_task_by_reference;
using internal::scheduling::fork_join_lambda_by_value; using internal::scheduling::lambda_task_by_value;
using internal::scheduling::task; using internal::scheduling::task;
using algorithm::invoke_parallel; using algorithm::invoke_parallel;
using algorithm::parallel_for_fork_join;
using algorithm::parallel_for; using algorithm::parallel_for;
} }
......
#include <mutex> #include <mutex>
#include "pls/internal/data_structures/deque.h" #include "pls/internal/data_structures/locking_deque.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
namespace data_structures { namespace data_structures {
deque_item *deque_internal::pop_head_internal() { locking_deque_item *locking_deque_internal::pop_head_internal() {
std::lock_guard<base::spin_lock> lock{lock_}; std::lock_guard<base::spin_lock> lock{lock_};
if (head_ == nullptr) { if (head_ == nullptr) {
return nullptr; return nullptr;
} }
deque_item *result = head_; locking_deque_item *result = head_;
head_ = head_->next_; head_ = head_->next_;
if (head_ == nullptr) { if (head_ == nullptr) {
tail_ = nullptr; tail_ = nullptr;
...@@ -24,14 +24,14 @@ deque_item *deque_internal::pop_head_internal() { ...@@ -24,14 +24,14 @@ deque_item *deque_internal::pop_head_internal() {
return result; return result;
} }
deque_item *deque_internal::pop_tail_internal() { locking_deque_item *locking_deque_internal::pop_tail_internal() {
std::lock_guard<base::spin_lock> lock{lock_}; std::lock_guard<base::spin_lock> lock{lock_};
if (tail_ == nullptr) { if (tail_ == nullptr) {
return nullptr; return nullptr;
} }
deque_item *result = tail_; locking_deque_item *result = tail_;
tail_ = tail_->prev_; tail_ = tail_->prev_;
if (tail_ == nullptr) { if (tail_ == nullptr) {
head_ = nullptr; head_ = nullptr;
...@@ -42,7 +42,7 @@ deque_item *deque_internal::pop_tail_internal() { ...@@ -42,7 +42,7 @@ deque_item *deque_internal::pop_tail_internal() {
return result; return result;
} }
void deque_internal::push_tail_internal(deque_item *new_item) { void locking_deque_internal::push_tail_internal(locking_deque_item *new_item) {
std::lock_guard<base::spin_lock> lock{lock_}; std::lock_guard<base::spin_lock> lock{lock_};
if (tail_ != nullptr) { if (tail_ != nullptr) {
......
#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/task.h"
#include "pls/internal/base/error_handling.h" #include "pls/internal/base/error_handling.h"
namespace pls { namespace pls {
...@@ -17,7 +20,7 @@ scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads) : ...@@ -17,7 +20,7 @@ scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads) :
for (unsigned int i = 0; i < num_threads_; i++) { for (unsigned int i = 0; i < num_threads_; i++) {
// Placement new is required, as the memory of `memory_` is not required to be initialized. // Placement new is required, as the memory of `memory_` is not required to be initialized.
new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i}; new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i};
new((void *) memory_->thread_for(i))base::thread<void (*)(), thread_state>(&worker_routine, new((void *) memory_->thread_for(i))base::thread<void (*)(), thread_state>(&scheduler::worker_routine,
memory_->thread_state_for(i)); memory_->thread_state_for(i));
} }
} }
...@@ -26,7 +29,7 @@ scheduler::~scheduler() { ...@@ -26,7 +29,7 @@ scheduler::~scheduler() {
terminate(); terminate();
} }
void worker_routine() { void scheduler::worker_routine() {
auto my_state = base::this_thread::state<thread_state>(); auto my_state = base::this_thread::state<thread_state>();
while (true) { while (true) {
...@@ -59,6 +62,44 @@ void scheduler::terminate(bool wait_for_workers) { ...@@ -59,6 +62,44 @@ void scheduler::terminate(bool wait_for_workers) {
} }
} }
task *scheduler::get_local_task() {
PROFILE_STEALING("Get Local Task")
return thread_state::get()->deque_.pop_tail();
}
task *scheduler::steal_task() {
PROFILE_STEALING("Steal Task")
// Data for victim selection
const auto my_state = thread_state::get();
const auto my_id = my_state->id_;
const size_t offset = my_state->random_() % num_threads();
const size_t max_tries = num_threads(); // TODO: Tune this value
// Current strategy: random start, then round robin from there
for (size_t i = 0; i < max_tries; i++) {
size_t target = (offset + i) % num_threads();
// Skip our self for stealing
if (target == my_id) {
continue;
}
auto target_state = thread_state_for(target);
// TODO: See if we should re-try popping if it failed due to contention
auto result = target_state->deque_.pop_head();
if (result != nullptr) {
return result;
}
// TODO: See if we should backoff here (per missed steal)
}
// TODO: See if we should backoff here (after a 'round' of missed steals)
return nullptr;
}
} }
} }
} }
...@@ -36,73 +36,34 @@ void task::execute() { ...@@ -36,73 +36,34 @@ void task::execute() {
} }
} }
void task::wait_for_all() { bool task::try_execute_local() {
while (ref_count_ > 0) { task *local_task = thread_state::get()->scheduler_->get_local_task();
PROFILE_STEALING("get local sub task") if (local_task != nullptr) {
task *local_task = thread_state::get()->scheduler_->get_local_task(); local_task->execute();
PROFILE_END_BLOCK
if (local_task != nullptr) {
local_task->execute();
} else {
// Try to steal work.
// External steal will be executed implicitly if success
PROFILE_STEALING("steal work")
task *stolen_task = thread_state::get()->scheduler_->steal_task();
PROFILE_END_BLOCK
if (stolen_task != nullptr) {
stolen_task->execute();
}
}
}
tbb_task_->deque_.release_memory_until(deque_state_);
}
bool task::internal_stealing(abstract_task *other_task) {
PROFILE_STEALING("task::internal_stealin")
auto cast_other_task = reinterpret_cast<task *>(other_task);
auto stolen_sub_task = cast_other_task->get_stolen_sub_task();
if (stolen_sub_task == nullptr) {
return false;
} else {
// Make sub-task belong to our task instance
stolen_sub_task->tbb_task_ = this;
stolen_sub_task->deque_state_ = deque_.save_state();
// We will execute this next without explicitly moving it onto our stack storage
last_stolen_ = stolen_sub_task;
return true; return true;
} else {
return false;
} }
} }
bool task::split_task(base::swmr_spin_lock *lock) { bool task::try_execute_stolen() {
PROFILE_STEALING("task::split_task") task *stolen_task = thread_state::get()->scheduler_->steal_task();
fork_join_sub_task *stolen_sub_task = get_stolen_sub_task(); if (stolen_task != nullptr) {
if (stolen_sub_task == nullptr) { stolen_task->deque_state_ = thread_state::get()->deque_.save_state();
return false; stolen_task->execute();
return true;
} }
task task{stolen_sub_task, this->unique_id()};
// In success case, unlock.
lock->reader_unlock();
scheduler::execute_task(task, depth()); return false;
return true;
} }
void task::execute() { void task::wait_for_all() {
PROFILE_WORK_BLOCK("execute task"); while (ref_count_ > 0) {
if (!try_execute_local()) {
// Bind this instance to our OS thread try_execute_stolen();
// TODO: See if we did this right }
// my_stack_ = base::this_thread::state<thread_state>()->task_stack_; }
deque_.reset_base_pointer(); thread_state::get()->deque_.release_memory_until(deque_state_);
root_task_->tbb_task_ = this;
root_task_->deque_state_ = deque_.save_state();
// Execute it on our OS thread until its finished
root_task_->execute();
} }
} }
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
#include <pls/internal/base/system_details.h> #include <pls/internal/base/system_details.h>
#include <pls/internal/data_structures/aligned_stack.h> #include <pls/internal/data_structures/aligned_stack.h>
#include <pls/internal/data_structures/deque.h> #include <pls/internal/data_structures/locking_deque.h>
#include <pls/internal/data_structures/work_stealing_deque.h> #include <pls/internal/data_structures/work_stealing_deque.h>
#include <vector> #include <vector>
...@@ -77,11 +77,11 @@ TEST_CASE("aligned stack stores objects correctly", "[internal/data_structures/a ...@@ -77,11 +77,11 @@ TEST_CASE("aligned stack stores objects correctly", "[internal/data_structures/a
} }
TEST_CASE("deque stores objects correctly", "[internal/data_structures/deque.h]") { TEST_CASE("deque stores objects correctly", "[internal/data_structures/deque.h]") {
class my_item : public deque_item { class my_item : public locking_deque_item {
}; };
deque<my_item> deque; locking_deque<my_item> deque;
my_item one, two, three; my_item one, two, three;
SECTION("add and remove items form the tail") { SECTION("add and remove items form the tail") {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment