Commit 468aae43 by Florian Fritz

Merge branch 'pure_fork_join' into 'master'

Change to pure fork-join task model

See merge request !11
parents 64e2238c fc12ab9e
Pipeline #1257 passed with stages
in 3 minutes 42 seconds
...@@ -281,3 +281,104 @@ parallel_for, 512 heat array size): ...@@ -281,3 +281,104 @@ parallel_for, 512 heat array size):
We observe solid performance from our implementation. We observe solid performance from our implementation.
(Again, not very scientific test environment, but good enough for (Again, not very scientific test environment, but good enough for
our general direction) our general direction)
### Commit 3bdaba42 - Move to pure fork-join tasks (remove two level)
We moved away from our two-level scheduler approach towards a
pure fork-join task model (in order to remove any lock's in the
code more easily and to make further tests simpler/more focused
on one specific aspecs.
These are the measurements made after the change
(without any performance optimizations done):
FFT Average:
<img src="media/3bdaba42_fft_average.png" width="400"/>
Heat Diffusion Average:
<img src="media/3bdaba42_heat_average.png" width="400"/>
Matrix Multiplication Average:
<img src="media/3bdaba42_matrix_average.png" width="400"/>
Unbalanced Tree Search Average:
<img src="media/3bdaba42_unbalanced_average.png" width="400"/>
We note that in heat diffusion, matrix multiplication and unbalanced
tree search - all three benchmarks with mostly enough work avaliable at
all time - our implementation performs head on head with intel's
TBB. Only the FFT benchmark is a major problem four our library.
We notice a MAJOR drop in performance exactly at the hyperthreading
mark, indicating problems with limited resources due to the spinning
threads (threads without any actual work) and the threads actually
performing work. Most likely there is a resource on the same cache
line used that hinders the working threads, but we can not really
figure out which one it is.
### Commit be2cdbfe - Locking Deque
Switching to a locking deque has not improved (or even slightly hurt)
performance, we therefore think that the deque itself is not the
portion slowing down our execution.
### Commit 5044f0a1 - Performance Bottelneck in FFT FIXED
By moving from directly calling one of the parallel invocations
```c++
scheduler::spawn_child(sub_task_2);
function1(); // Execute first function 'inline' without spawning a sub_task object
```
to spawning two tasks
```c++
scheduler::spawn_child(sub_task_2);
scheduler::spawn_child(sub_task_1);
```
we where able to fix the bad performance of our framework in the
FFT benchmark (where there is a lot spinning/idling of some
worker threads).
We think this is due to some sort of cache misses/bus contemption
on the finishing counters. This would make sense, as the drop
at the hyperthreading mark indicates problems with this part of the
CPU pipeline (althought it did not show clearly in our profiling runs).
We will now try to find the exact spot where the problem originates and
fix the source rather then 'circumventing' it with these extra tasks.
(This then aigain, should hopefully even boost all other workloads
performance, as contemption on the bus/cache is always bad)
After some research we think that the issue is down to many threads
referencing the same atomic reference counter. We think so because
even cache aligning the shared refernce count does not fix the issue
when using the direct function call. Also, forcing a new method call
(going down in the call stack one function call) is not solving the
issue (thus making sure that it is not related with some caching issue
in the call itself).
In conclusion there seems to be a hyperthreading issue with this
shared reference count. We keep this in mind if we eventually get
tasks with changing data memebers (as this problem could reappear there,
as then the ref_count actualy is in the same memory region as our
'user variables'). For now we leave the code like it is.
FFT Average with new call method:
<img src="media/5044f0a1_fft_average.png" width="400"/>
The performance of our new call method looks shockingly similar
to TBB with a slight, constant performance drop behind it.
This makes sense, as the basic principle (lock-free, classic work
stealing deque and the parallel call structure) are nearly the same.
We will see if minor optimizations can even close this last gap.
Overall the performance at this point is good enough to move on
to implementing more functionality and to running tests on different
queues/stealing tactics etc.
...@@ -19,15 +19,14 @@ int count_child_nodes(uts::node &node) { ...@@ -19,15 +19,14 @@ int count_child_nodes(uts::node &node) {
return child_count; return child_count;
} }
auto current_task = pls::fork_join_sub_task::current();
std::vector<int> results(children.size()); std::vector<int> results(children.size());
for (size_t i = 0; i < children.size(); i++) { for (size_t i = 0; i < children.size(); i++) {
size_t index = i; size_t index = i;
auto lambda = [&, index] { results[index] = count_child_nodes(children[index]); }; auto lambda = [&, index] { results[index] = count_child_nodes(children[index]); };
pls::fork_join_lambda_by_value<typeof(lambda)> sub_task(lambda); pls::lambda_task_by_value<typeof(lambda)> sub_task(lambda);
current_task->spawn_child(sub_task); pls::scheduler::spawn_child(sub_task);
} }
current_task->wait_for_all(); pls::scheduler::wait_for_all();
for (auto result : results) { for (auto result : results) {
child_count += result; child_count += result;
} }
...@@ -36,43 +35,41 @@ int count_child_nodes(uts::node &node) { ...@@ -36,43 +35,41 @@ int count_child_nodes(uts::node &node) {
} }
int unbalanced_tree_search(int seed, int root_children, double q, int normal_children) { int unbalanced_tree_search(int seed, int root_children, double q, int normal_children) {
static auto id = pls::unique_id::create(42);
int result; int result;
auto lambda = [&] { auto lambda = [&] {
uts::node root(seed, root_children, q, normal_children); uts::node root(seed, root_children, q, normal_children);
result = count_child_nodes(root); result = count_child_nodes(root);
}; };
pls::fork_join_lambda_by_reference<typeof(lambda)> task(lambda); pls::lambda_task_by_reference<typeof(lambda)> sub_task(lambda);
pls::fork_join_lambda_by_reference<typeof(lambda)> sub_task(lambda); pls::scheduler::spawn_child(sub_task);
pls::fork_join_task root_task{&sub_task, id}; pls::scheduler::wait_for_all();
pls::scheduler::execute_task(root_task);
return result; return result;
} }
//
//int main() {
// PROFILE_ENABLE
// pls::internal::helpers::run_mini_benchmark([&] {
// unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN);
// }, 8, 4000);
//
// PROFILE_SAVE("test_profile.prof")
//}
int main() { int main() {
PROFILE_ENABLE PROFILE_ENABLE
pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18}; pls::internal::helpers::run_mini_benchmark([&] {
pls::scheduler scheduler{&my_scheduler_memory, 8}; unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN);
}, 8, 2000);
scheduler.perform_work([&] {
PROFILE_MAIN_THREAD
for (int i = 0; i < 50; i++) {
PROFILE_WORK_BLOCK("Top Level")
int result = unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN);
std::cout << result << std::endl;
}
});
PROFILE_SAVE("test_profile.prof") PROFILE_SAVE("test_profile.prof")
} }
//int main() {
// PROFILE_ENABLE
// pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18};
// pls::scheduler scheduler{&my_scheduler_memory, 8};
//
// scheduler.perform_work([&] {
// PROFILE_MAIN_THREAD
// for (int i = 0; i < 50; i++) {
// PROFILE_WORK_BLOCK("Top Level")
// int result = unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN);
// std::cout << result << std::endl;
// }
// });
//
// PROFILE_SAVE("test_profile.prof")
//}
...@@ -91,7 +91,7 @@ int main() { ...@@ -91,7 +91,7 @@ int main() {
PROFILE_MAIN_THREAD PROFILE_MAIN_THREAD
// Call looks just the same, only requirement is // Call looks just the same, only requirement is
// the enclosure in the perform_work lambda. // the enclosure in the perform_work lambda.
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 10; i++) {
PROFILE_WORK_BLOCK("Top Level FFT") PROFILE_WORK_BLOCK("Top Level FFT")
complex_vector input = initial_input; complex_vector input = initial_input;
fft(input.begin(), input.size()); fft(input.begin(), input.size());
......
...@@ -7,8 +7,7 @@ ...@@ -7,8 +7,7 @@
#include <typeindex> #include <typeindex>
#include <tuple> #include <tuple>
#include <pls/internal/scheduling/root_task.h> #include <pls/pls.h>
#include <pls/internal/helpers/unique_id.h>
int main() { int main() {
......
...@@ -4,22 +4,22 @@ ...@@ -4,22 +4,22 @@
################################################################################# #################################################################################
# make sure a build type is set, default to release # make sure a build type is set, default to release
if(NOT CMAKE_BUILD_TYPE) if (NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE Release) set(CMAKE_BUILD_TYPE Release)
endif() endif ()
message("-- Using Build Type: " ${CMAKE_BUILD_TYPE}) message("-- Using Build Type: " ${CMAKE_BUILD_TYPE})
# Enable optimizations in release builds # Enable optimizations in release builds
if(CMAKE_BUILD_TYPE STREQUAL "Release") if (CMAKE_BUILD_TYPE STREQUAL "Release")
# Link time optimization # Link time optimization
set(CMAKE_CXX_FLAGS "-Wall -Wextra") set(CMAKE_CXX_FLAGS "-Wall -Wextra")
# -O2 is often seen as 'the most speed', # -O2 is often seen as 'the most speed',
# but inlining functions and SIMD/Vectorization is # but inlining functions and SIMD/Vectorization is
# only enabled by -O3, thus it's way faster in some # only enabled by -O3, thus it's way faster in some
# array calculations. # array calculations.
set(CMAKE_CXX_FLAGS_RELEASE "-O3") set(CMAKE_CXX_FLAGS_RELEASE "-O3 -march=native")
set(CMAKE_INTERPROCEDURAL_OPTIMIZATION TRUE) set(CMAKE_INTERPROCEDURAL_OPTIMIZATION TRUE)
else() else ()
set(CMAKE_CXX_FLAGS_DEBUG "-g -O0") set(CMAKE_CXX_FLAGS_DEBUG "-g -O0")
endif() endif ()
\ No newline at end of file
...@@ -20,7 +20,7 @@ add_library(pls STATIC ...@@ -20,7 +20,7 @@ add_library(pls STATIC
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h include/pls/internal/data_structures/aligned_stack_impl.h
include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp include/pls/internal/data_structures/locking_deque.h
include/pls/internal/data_structures/work_stealing_deque.h include/pls/internal/data_structures/work_stealing_deque_impl.h include/pls/internal/data_structures/work_stealing_deque.h include/pls/internal/data_structures/work_stealing_deque_impl.h
include/pls/internal/data_structures/stamped_integer.h include/pls/internal/data_structures/stamped_integer.h
...@@ -29,16 +29,12 @@ add_library(pls STATIC ...@@ -29,16 +29,12 @@ add_library(pls STATIC
include/pls/internal/helpers/mini_benchmark.h include/pls/internal/helpers/mini_benchmark.h
include/pls/internal/helpers/unique_id.h include/pls/internal/helpers/unique_id.h
include/pls/internal/scheduling/root_task.h src/internal/scheduling/root_task.cpp include/pls/internal/scheduling/thread_state.h
include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp
include/pls/internal/scheduling/abstract_task.h src/internal/scheduling/abstract_task.cpp
include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/scheduler_impl.h
include/pls/internal/scheduling/run_on_n_threads_task.h src/internal/scheduling/run_on_n_threads_task.cpp include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
include/pls/internal/scheduling/parallel_iterator_task.h include/pls/internal/scheduling/parallel_iterator_task_impl.h include/pls/internal/scheduling/lambda_task.h include/pls/internal/data_structures/deque.h)
src/internal/scheduling/parallel_iterator_task.cpp)
# Add everything in `./include` to be in the include path of this project # Add everything in `./include` to be in the include path of this project
target_include_directories(pls target_include_directories(pls
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
#ifndef PLS_PARALLEL_INVOKE_H #ifndef PLS_PARALLEL_INVOKE_H
#define PLS_PARALLEL_INVOKE_H #define PLS_PARALLEL_INVOKE_H
#include "pls/internal/scheduling/fork_join_task.h" #include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h"
namespace pls { namespace pls {
......
...@@ -2,70 +2,36 @@ ...@@ -2,70 +2,36 @@
#ifndef PLS_INVOKE_PARALLEL_IMPL_H #ifndef PLS_INVOKE_PARALLEL_IMPL_H
#define PLS_INVOKE_PARALLEL_IMPL_H #define PLS_INVOKE_PARALLEL_IMPL_H
#include <pls/internal/scheduling/fork_join_task.h> #include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/fork_join_task.h" #include "pls/internal/scheduling/lambda_task.h"
#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/helpers/unique_id.h" #include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/base/alignment.h"
namespace pls { namespace pls {
namespace algorithm { namespace algorithm {
namespace internal {
using namespace ::pls::internal::scheduling;
template<typename Body>
inline void run_body(const Body &internal_body, const abstract_task::id &id) {
// Make sure we are in the context of this invoke_parallel instance,
// if not we will spawn it as a new 'fork-join-style' task.
auto current_task = scheduler::current_task();
if (current_task->unique_id() == id) {
internal_body();
} else {
fork_join_lambda_by_reference<Body> root_body(internal_body);
fork_join_task root_task{&root_body, id};
scheduler::execute_task(root_task);
}
}
}
template<typename Function1, typename Function2> template<typename Function1, typename Function2>
void invoke_parallel(const Function1 &function1, const Function2 &function2) { void invoke_parallel(const Function1 &function1, const Function2 &function2) {
using namespace ::pls::internal::scheduling; using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
using namespace ::pls::internal::base;
static abstract_task::id id = unique_id::create<Function1, Function2>();
auto internal_body = [&]() {
auto current_task = fork_join_sub_task::current();
auto sub_task_2 = fork_join_lambda_by_reference<Function2>(function2);
current_task->spawn_child(sub_task_2); auto sub_task_1 = lambda_task_by_reference<Function1>(function1);
function1(); // Execute first function 'inline' without spawning a sub_task object auto sub_task_2 = lambda_task_by_reference<Function2>(function2);
current_task->wait_for_all();
};
internal::run_body(internal_body, id); scheduler::spawn_child(sub_task_2);
scheduler::spawn_child_and_wait(sub_task_1);
} }
template<typename Function1, typename Function2, typename Function3> template<typename Function1, typename Function2, typename Function3>
void invoke_parallel(const Function1 &function1, const Function2 &function2, const Function3 &function3) { void invoke_parallel(const Function1 &function1, const Function2 &function2, const Function3 &function3) {
using namespace ::pls::internal::scheduling; using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
static abstract_task::id id = unique_id::create<Function1, Function2, Function3>();
auto internal_body = [&]() {
auto current_task = fork_join_sub_task::current();
auto sub_task_2 = fork_join_lambda_by_reference<Function2>(function2);
auto sub_task_3 = fork_join_lambda_by_reference<Function3>(function3);
current_task->spawn_child(sub_task_2); auto sub_task_1 = lambda_task_by_reference<Function1>(function1);
current_task->spawn_child(sub_task_3); auto sub_task_2 = lambda_task_by_reference<Function2>(function2);
function1(); // Execute first function 'inline' without spawning a sub_task object auto sub_task_3 = lambda_task_by_reference<Function3>(function3);
current_task->wait_for_all();
};
internal::run_body(internal_body, id); scheduler::spawn_child(sub_task_3);
scheduler::spawn_child(sub_task_2);
scheduler::spawn_child_and_wait(sub_task_1);
} }
} }
......
...@@ -8,9 +8,6 @@ namespace algorithm { ...@@ -8,9 +8,6 @@ namespace algorithm {
template<typename RandomIt, typename Function> template<typename RandomIt, typename Function>
void parallel_for(RandomIt first, RandomIt last, const Function &function); void parallel_for(RandomIt first, RandomIt last, const Function &function);
template<typename RandomIt, typename Function>
void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &function);
} }
} }
#include "parallel_for_impl.h" #include "parallel_for_impl.h"
......
...@@ -2,21 +2,17 @@ ...@@ -2,21 +2,17 @@
#ifndef PLS_PARALLEL_FOR_IMPL_H #ifndef PLS_PARALLEL_FOR_IMPL_H
#define PLS_PARALLEL_FOR_IMPL_H #define PLS_PARALLEL_FOR_IMPL_H
#include "pls/internal/scheduling/fork_join_task.h" #include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/parallel_iterator_task.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/helpers/unique_id.h" #include "pls/internal/helpers/unique_id.h"
namespace pls { namespace pls {
namespace algorithm { namespace algorithm {
namespace internal {
template<typename RandomIt, typename Function> template<typename RandomIt, typename Function>
void parallel_for(RandomIt first, RandomIt last, const Function &function) { void parallel_for(RandomIt first, RandomIt last, const Function &function) {
using namespace ::pls::internal::scheduling; using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
using namespace ::pls::internal::base;
constexpr long min_elements = 4; constexpr long min_elements = 4;
long num_elements = std::distance(first, last); long num_elements = std::distance(first, last);
...@@ -29,39 +25,16 @@ void parallel_for(RandomIt first, RandomIt last, const Function &function) { ...@@ -29,39 +25,16 @@ void parallel_for(RandomIt first, RandomIt last, const Function &function) {
// Cut in half recursively // Cut in half recursively
long middle_index = num_elements / 2; long middle_index = num_elements / 2;
auto body = [=] { internal::parallel_for(first + middle_index, last, function); }; auto body2 = [=] { parallel_for(first + middle_index, last, function); };
fork_join_lambda_by_reference<decltype(body)> second_half_task(body); lambda_task_by_reference<decltype(body2)> second_half_task(body2);
fork_join_sub_task::current()->spawn_child(second_half_task); scheduler::spawn_child(second_half_task);
parallel_for(first, first + middle_index, function); auto body1 = [=] { parallel_for(first, first + middle_index, function); };
fork_join_sub_task::current()->wait_for_all(); lambda_task_by_reference<decltype(body1)> first_half_task(body1);
scheduler::spawn_child(first_half_task);
scheduler::wait_for_all();
} }
} }
}
template<typename RandomIt, typename Function>
void parallel_for(RandomIt first, RandomIt last, const Function &function) {
using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
using namespace ::pls::internal::base;
static abstract_task::id id = unique_id::create<RandomIt, Function>();
parallel_iterator_task<RandomIt, Function> iterator_task{first, last, function, id};
scheduler::execute_task(iterator_task);
}
template<typename RandomIt, typename Function>
void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &function) {
using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
using namespace ::pls::internal::base;
static abstract_task::id id = unique_id::create<RandomIt, Function>();
auto body = [=] { internal::parallel_for(first, last, function); };
fork_join_lambda_by_reference<decltype(body)> root_body(body);
fork_join_task root_task{&root_body, id};
scheduler::execute_task(root_task);
}
} }
} }
......
#ifndef PLS_DEQUE_H #ifndef PLS_DEQUE_H_
#define PLS_DEQUE_H #define PLS_DEQUE_H_
#include "pls/internal/base/spin_lock.h" #include "work_stealing_deque.h"
#include "locking_deque.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
namespace data_structures { namespace data_structures {
/**
* Turns any object into deque item when inheriting from this.
*/
class deque_item {
friend class deque_internal;
deque_item *prev_;
deque_item *next_;
};
class deque_internal {
protected:
deque_item *head_;
deque_item *tail_;
base::spin_lock lock_;
deque_item *pop_head_internal();
deque_item *pop_tail_internal();
void push_tail_internal(deque_item *new_item);
};
/**
* A double linked list based deque.
* Storage is therefore only needed for the individual items.
*
* @tparam Item The type of items stored in this deque
*/
template<typename Item> template<typename Item>
class deque : deque_internal { using deque = work_stealing_deque<Item>;
public:
explicit deque() : deque_internal{} {}
inline Item *pop_head() {
return static_cast<Item *>(pop_head_internal());
}
inline Item *pop_tail() {
return static_cast<Item *>(pop_tail_internal());
}
inline void push_tail(Item *new_item) {
push_tail_internal(new_item);
}
};
} }
} }
} }
#endif //PLS_DEQUE_H #endif //PLS_DEQUE_H_
#ifndef PLS_LOCKING_DEQUE_H
#define PLS_LOCKING_DEQUE_H
#include <mutex>
#include "pls/internal/base/spin_lock.h"
#include "pls/internal/data_structures/aligned_stack.h"
namespace pls {
namespace internal {
namespace data_structures {
/**
* Wraps any object into a deque item.
*/
template<typename Item>
struct locking_deque_item {
Item *item_;
locking_deque_item *prev_;
locking_deque_item *next_;
};
template<typename Item, typename Content>
struct locking_deque_container : public locking_deque_item<Item> {
Content content_;
public:
explicit locking_deque_container(const Content &content_) : content_{content_} {}
};
/**
* A double linked list based deque.
* Storage is therefore only needed for the individual items.
*
* @tparam Item The type of items stored in this deque
*/
template<typename Item>
class locking_deque {
aligned_stack *stack_;
locking_deque_item<Item> *head_;
locking_deque_item<Item> *tail_;
base::spin_lock lock_;
public:
using state = aligned_stack::state;
explicit locking_deque(aligned_stack *stack)
: stack_{stack}, head_{nullptr}, tail_{nullptr}, lock_{} {}
template<typename T>
T *push_tail(const T &new_item) {
static_assert(std::is_same<Item, T>::value || std::is_base_of<Item, T>::value,
"Must only push types of <Item> onto work_stealing_deque<Item>");
std::lock_guard<base::spin_lock> lock{lock_};
auto deque_item = stack_->push(locking_deque_container<Item, T>{new_item});
deque_item->item_ = &deque_item->content_;
if (tail_ != nullptr) {
tail_->next_ = deque_item;
} else {
head_ = deque_item;
}
deque_item->prev_ = tail_;
deque_item->next_ = nullptr;
tail_ = deque_item;
return &deque_item->content_;
}
Item *pop_tail() {
std::lock_guard<base::spin_lock> lock{lock_};
if (tail_ == nullptr) {
return nullptr;
}
auto result = tail_;
tail_ = tail_->prev_;
if (tail_ == nullptr) {
head_ = nullptr;
} else {
tail_->next_ = nullptr;
}
return result->item_;
}
Item *pop_head() {
std::lock_guard<base::spin_lock> lock{lock_};
if (head_ == nullptr) {
return nullptr;
}
auto result = head_;
head_ = head_->next_;
if (head_ == nullptr) {
tail_ = nullptr;
} else {
head_->prev_ = nullptr;
}
return result->item_;
}
void release_memory_until(state state) {
stack_->reset_state(state);
}
state save_state() {
return stack_->save_state();
}
};
}
}
}
#endif //PLS_LOCKING_DEQUE_H
...@@ -6,7 +6,6 @@ ...@@ -6,7 +6,6 @@
#include "pls/internal/base/error_handling.h" #include "pls/internal/base/error_handling.h"
#include "pls/internal/data_structures/stamped_integer.h" #include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/scheduling/thread_state.h"
#include "aligned_stack.h" #include "aligned_stack.h"
...@@ -22,10 +21,15 @@ using offset_t = stamped_integer::member_t; ...@@ -22,10 +21,15 @@ using offset_t = stamped_integer::member_t;
// Single Item in the deque // Single Item in the deque
class work_stealing_deque_item { class work_stealing_deque_item {
// TODO: In our opinion these atomic's are a pure formality to make the thread sanitizer happy,
// as the race occurs in 'pop_head', where ALL CASES reading a corrupt/old value are cases
// where the next CAS fails anywas, thus making these corrupted values have no influence on
// the overall program execution.
// ==> If we find performance problems in this queue, try removing the atoimcs again.
// Pointer to the actual data // Pointer to the actual data
pointer_t data_; std::atomic<pointer_t> data_;
// Index (relative to stack base) to the next and previous element // Index (relative to stack base) to the next and previous element
offset_t next_item_; std::atomic<offset_t> next_item_;
offset_t previous_item_; offset_t previous_item_;
public: public:
...@@ -33,7 +37,7 @@ class work_stealing_deque_item { ...@@ -33,7 +37,7 @@ class work_stealing_deque_item {
template<typename Item> template<typename Item>
Item *data() { Item *data() {
return reinterpret_cast<Item *>(data_); return reinterpret_cast<Item *>(data_.load());
} }
template<typename Item> template<typename Item>
...@@ -41,7 +45,7 @@ class work_stealing_deque_item { ...@@ -41,7 +45,7 @@ class work_stealing_deque_item {
data_ = reinterpret_cast<pointer_t >(data); data_ = reinterpret_cast<pointer_t >(data);
} }
offset_t next_item() const { return next_item_; } offset_t next_item() const { return next_item_.load(); }
void set_next_item(offset_t next_item) { next_item_ = next_item; } void set_next_item(offset_t next_item) { next_item_ = next_item; }
offset_t previous_item() const { return previous_item_; } offset_t previous_item() const { return previous_item_; }
...@@ -77,13 +81,6 @@ class work_stealing_deque { ...@@ -77,13 +81,6 @@ class work_stealing_deque {
tail_{other.tail_.load()}, tail_{other.tail_.load()},
previous_tail_{other.previous_tail_} {} previous_tail_{other.previous_tail_} {}
void reset_base_pointer();
work_stealing_deque_item *item_at(offset_t offset);
offset_t current_stack_offset();
template<typename T>
std::pair<work_stealing_deque_item, T> *allocate_item(const T &new_item);
template<typename T> template<typename T>
T *push_tail(const T &new_item); T *push_tail(const T &new_item);
Item *pop_tail(); Item *pop_tail();
...@@ -91,6 +88,14 @@ class work_stealing_deque { ...@@ -91,6 +88,14 @@ class work_stealing_deque {
void release_memory_until(state state); void release_memory_until(state state);
state save_state(); state save_state();
private:
void reset_base_pointer();
work_stealing_deque_item *item_at(offset_t offset);
offset_t current_stack_offset();
template<typename T>
std::pair<work_stealing_deque_item, T> *allocate_item(const T &new_item);
}; };
} }
......
#ifndef PLS_ABSTRACT_TASK_H
#define PLS_ABSTRACT_TASK_H
#include "pls/internal/base/swmr_spin_lock.h"
#include "pls/internal/helpers/unique_id.h"
namespace pls {
namespace internal {
namespace scheduling {
class abstract_task {
public:
using id = helpers::unique_id;
private:
unsigned int depth_;
abstract_task::id unique_id_;
abstract_task *volatile child_task_;
public:
abstract_task(const unsigned int depth, const abstract_task::id &unique_id) :
depth_{depth},
unique_id_{unique_id},
child_task_{nullptr} {}
virtual void execute() = 0;
void set_child(abstract_task *child_task) { child_task_ = child_task; }
abstract_task *child() const { return child_task_; }
void set_depth(unsigned int depth) { depth_ = depth; }
unsigned int depth() const { return depth_; }
id unique_id() const { return unique_id_; }
protected:
virtual bool internal_stealing(abstract_task *other_task) = 0;
virtual bool split_task(base::swmr_spin_lock *lock) = 0;
bool steal_work();
};
}
}
}
#endif //PLS_ABSTRACT_TASK_H
#ifndef PLS_TBB_LIKE_TASK_H
#define PLS_TBB_LIKE_TASK_H
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/data_structures/work_stealing_deque.h"
#include "abstract_task.h"
#include "thread_state.h"
namespace pls {
namespace internal {
namespace scheduling {
class fork_join_task;
class fork_join_sub_task {
friend class fork_join_task;
// Coordinate finishing of sub_tasks
std::atomic_uint32_t ref_count_;
fork_join_sub_task *parent_;
// Access to TBB scheduling environment
fork_join_task *tbb_task_;
bool executed = false;
int executed_at = -1;
// Stack Management (reset stack pointer after wait_for_all() calls)
data_structures::work_stealing_deque<fork_join_sub_task>::state deque_state_;
protected:
explicit fork_join_sub_task();
fork_join_sub_task(const fork_join_sub_task &other);
// Overwritten with behaviour of child tasks
virtual void execute_internal() = 0;
public:
// Only use them when actually executing this sub_task (only public for simpler API design)
template<typename T>
void spawn_child(T &sub_task);
void wait_for_all();
static fork_join_sub_task *current();
private:
void execute();
};
template<typename Function>
class fork_join_lambda_by_reference : public fork_join_sub_task {
const Function &function_;
public:
explicit fork_join_lambda_by_reference(const Function &function) : fork_join_sub_task{}, function_{function} {};
protected:
void execute_internal() override {
function_();
}
};
template<typename Function>
class fork_join_lambda_by_value : public fork_join_sub_task {
const Function function_;
public:
explicit fork_join_lambda_by_value(const Function &function) : fork_join_sub_task{}, function_{function} {};
protected:
void execute_internal() override {
function_();
}
};
class fork_join_task : public abstract_task {
friend class fork_join_sub_task;
fork_join_sub_task *root_task_;
fork_join_sub_task *currently_executing_;
// Double-Ended Queue management
data_structures::work_stealing_deque<fork_join_sub_task> deque_;
// Steal Management
fork_join_sub_task *last_stolen_;
fork_join_sub_task *get_local_sub_task();
fork_join_sub_task *get_stolen_sub_task();
bool internal_stealing(abstract_task *other_task) override;
bool split_task(base::swmr_spin_lock * /*lock*/) override;
public:
explicit fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id);
void execute() override;
fork_join_sub_task *currently_executing() const;
};
template<typename T>
void fork_join_sub_task::spawn_child(T &task) {
PROFILE_FORK_JOIN_STEALING("spawn_child")
static_assert(std::is_base_of<fork_join_sub_task, T>::value, "Only pass fork_join_sub_task subclasses!");
// Keep our refcount up to date
ref_count_++;
// Assign forced values
task.parent_ = this;
task.tbb_task_ = tbb_task_;
task.deque_state_ = tbb_task_->deque_.save_state();
// Push on our deque
const T const_task = task;
tbb_task_->deque_.push_tail(const_task);
}
}
}
}
#endif //PLS_TBB_LIKE_TASK_H
#ifndef PLS_LAMBDA_TASK_H_
#define PLS_LAMBDA_TASK_H_
#include "pls/internal/scheduling/task.h"
namespace pls {
namespace internal {
namespace scheduling {
template<typename Function>
class lambda_task_by_reference : public task {
const Function &function_;
public:
explicit lambda_task_by_reference(const Function &function) : task{}, function_{function} {};
protected:
void execute_internal() override {
function_();
}
};
template<typename Function>
class lambda_task_by_value : public task {
const Function function_;
public:
explicit lambda_task_by_value(const Function &function) : task{}, function_{function} {};
protected:
void execute_internal() override {
function_();
}
};
}
}
}
#endif //PLS_LAMBDA_TASK_H_
#ifndef PLS_PARALLEL_ITERATOR_TASK_H
#define PLS_PARALLEL_ITERATOR_TASK_H
#include "pls/internal/data_structures/stamped_integer.h"
#include "abstract_task.h"
namespace pls {
namespace internal {
namespace scheduling {
using data_structures::stamped_integer;
template<typename RandomIt, typename Function>
class parallel_iterator_task : public abstract_task {
alignas(64) const int step = 8;
alignas(64) RandomIt first_, last_;
alignas(64) Function function_;
// External stealing
alignas(64) std::atomic<size_t> first_index_;
alignas(64) std::atomic<size_t> to_be_processed_;
alignas(64) std::atomic<stamped_integer> last_index_;
alignas(64) parallel_iterator_task *parent_;
bool steal_front(size_t &stolen_max_index);
bool steal_back(size_t &stolen_first_index, size_t &stolen_last_index);
protected:
bool internal_stealing(abstract_task *other_task) override;
bool split_task(base::swmr_spin_lock * /*lock*/) override;
public:
explicit parallel_iterator_task(RandomIt first, RandomIt last, Function function, const abstract_task::id &id);
parallel_iterator_task(const parallel_iterator_task &other);
void execute() override;
};
}
}
}
#include "parallel_iterator_task_impl.h"
#endif //PLS_PARALLEL_ITERATOR_TASK_H
#ifndef PLS_PARALLEL_ITERATOR_TASK_IMPL_H
#define PLS_PARALLEL_ITERATOR_TASK_IMPL_H
#include "scheduler.h"
namespace pls {
namespace internal {
namespace scheduling {
template<typename RandomIt, typename Function>
parallel_iterator_task<RandomIt, Function>::parallel_iterator_task
(RandomIt first, RandomIt last, Function function, const abstract_task::id &id):
abstract_task(0, id),
first_{first},
last_{last},
function_{function},
first_index_{0},
to_be_processed_{std::distance(first, last)},
last_index_{stamped_integer{0, std::distance(first, last)}},
parent_{nullptr} {}
template<typename RandomIt, typename Function>
parallel_iterator_task<RandomIt,
Function>::parallel_iterator_task(const pls::internal::scheduling::parallel_iterator_task<
RandomIt,
Function> &other):
abstract_task{other.depth(), other.unique_id()},
first_{other.first_},
last_{other.last_},
function_{other.function_},
first_index_{other.first_index_.load()},
to_be_processed_{other.to_be_processed_.load()},
last_index_{other.last_index_.load()},
parent_{other.parent_} {}
template<typename RandomIt, typename Function>
void parallel_iterator_task<RandomIt, Function>::execute() {
// Start processing at beginning of our data
size_t current_index = 0;
auto current_iterator = first_;
// Keep going as long as we have data
while (true) {
// Claim next chunk of data for us
size_t local_max_index;
if (!steal_front(local_max_index)) {
break;
}
// Process Chunk
for (; current_index != local_max_index; current_index++) {
function_(*(current_iterator++));
}
}
to_be_processed_ -= current_index;
while (to_be_processed_.load() > 0)
steal_work();
if (parent_ != nullptr) {
parent_->to_be_processed_ -= std::distance(first_, last_);
}
}
template<typename RandomIt, typename Function>
bool parallel_iterator_task<RandomIt, Function>::steal_front(size_t &stolen_max) {
auto local_first_index = first_index_.load();
auto local_last_index = last_index_.load();
if (local_first_index >= local_last_index.value) {
return false;
}
// Proceed the first index == take part of the work for us
auto new_first_index = std::min(local_first_index + step, local_last_index.value);
first_index_ = new_first_index;
// Reload last index
local_last_index = last_index_.load();
// Enough distance
if (new_first_index < local_last_index.value) {
stolen_max = new_first_index;
return true;
}
// Fight over last element
if (new_first_index == local_last_index.value) {
auto new_last_index = stamped_integer{local_last_index.stamp + 1, local_last_index.value};
if (last_index_.compare_exchange_strong(local_last_index, new_last_index)) {
stolen_max = new_first_index;
return true;
}
}
// All iterator elements are assigned to some executor
return false;
}
template<typename RandomIt, typename Function>
bool parallel_iterator_task<RandomIt, Function>::steal_back(size_t &stolen_first_index, size_t &stolen_last_index) {
auto local_first_index = first_index_.load();
auto local_last_index = last_index_.load();
if (local_first_index >= local_last_index.value) {
return false;
}
// Try to steal using cas
auto target_last_index = std::max(local_last_index.value - step, local_first_index);
auto new_last_index = stamped_integer{local_last_index.stamp + 1, target_last_index};
if (last_index_.compare_exchange_strong(local_last_index, new_last_index)) {
stolen_first_index = new_last_index.value;
stolen_last_index = local_last_index.value;
return true;
}
return false;
}
template<typename RandomIt, typename Function>
bool parallel_iterator_task<RandomIt, Function>::split_task(base::swmr_spin_lock *lock) {
auto depth = this->depth();
auto id = this->unique_id();
size_t stolen_first_index, stolen_last_index;
if (!steal_back(stolen_first_index, stolen_last_index)) {
lock->reader_unlock();
return false;
}
lock->reader_unlock();
parallel_iterator_task new_task{first_ + stolen_first_index, first_ + stolen_last_index, function_, id};
new_task.parent_ = this;
scheduler::execute_task(new_task, depth);
return true;
}
template<typename RandomIt, typename Function>
bool parallel_iterator_task<RandomIt, Function>::internal_stealing(abstract_task */*other_task*/) {
// Do not allow for now, eases up on ABA problem
return false;
}
}
}
}
#endif //PLS_PARALLEL_ITERATOR_TASK_IMPL_H
#ifndef PLS_ROOT_MASTER_TASK_H
#define PLS_ROOT_MASTER_TASK_H
#include <mutex>
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/swmr_spin_lock.h"
#include "abstract_task.h"
namespace pls {
namespace internal {
namespace scheduling {
template<typename Function>
class root_task : public abstract_task {
Function function_;
std::atomic_uint8_t finished_;
public:
static constexpr auto create_id = helpers::unique_id::create<root_task<Function>>;
explicit root_task(Function function) :
abstract_task{0, create_id()},
function_{function},
finished_{0} {}
root_task(const root_task &other) :
abstract_task{0, create_id()},
function_{other.function_},
finished_{0} {}
bool finished() {
return finished_;
}
void execute() override {
PROFILE_WORK_BLOCK("execute root_task");
function_();
finished_ = 1;
}
bool internal_stealing(abstract_task * /*other_task*/) override {
return false;
}
bool split_task(base::swmr_spin_lock * /*lock*/) override {
return false;
}
};
template<typename Function>
class root_worker_task : public abstract_task {
root_task<Function> *master_task_;
public:
static constexpr auto create_id = root_task<Function>::create_id;
explicit root_worker_task(root_task<Function> *master_task) :
abstract_task{0, create_id()},
master_task_{master_task} {}
void execute() override {
PROFILE_WORK_BLOCK("execute root_task");
do {
steal_work();
} while (!master_task_->finished());
}
bool internal_stealing(abstract_task * /*other_task*/) override {
return false;
}
bool split_task(base::swmr_spin_lock * /*lock*/) override {
return false;
}
};
}
}
}
#endif //PLS_ROOT_MASTER_TASK_H
#ifndef PLS_RUN_ON_N_THREADS_TASK_H
#define PLS_RUN_ON_N_THREADS_TASK_H
#include <mutex>
#include "pls/internal/base/spin_lock.h"
#include "pls/internal/base/thread.h"
#include "abstract_task.h"
#include "thread_state.h"
#include "scheduler.h"
namespace pls {
namespace internal {
namespace scheduling {
template<typename Function>
class run_on_n_threads_task : public abstract_task {
template<typename F>
friend
class run_on_n_threads_task_worker;
Function function_;
// Improvement: Remove lock and replace by atomic variable (performance)
int counter;
base::spin_lock counter_lock_;
int decrement_counter() {
std::lock_guard<base::spin_lock> lock{counter_lock_};
counter--;
return counter;
}
int get_counter() {
std::lock_guard<base::spin_lock> lock{counter_lock_};
return counter;
}
public:
static constexpr auto create_id = helpers::unique_id::create<run_on_n_threads_task<Function>>;
run_on_n_threads_task(Function function, int num_threads) :
abstract_task{0, create_id()},
function_{function},
counter{num_threads - 1} {}
void execute() override {
// Execute our function ONCE
function_();
// Steal until we are finished (other threads executed)
do {
steal_work();
} while (get_counter() > 0);
std::cout << "Finished Master!" << std::endl;
}
bool internal_stealing(abstract_task * /*other_task*/) override {
return false;
}
bool split_task(base::swmr_spin_lock *lock) override;
};
template<typename Function>
class run_on_n_threads_task_worker : public abstract_task {
Function function_;
run_on_n_threads_task<Function> *root_;
public:
static constexpr auto create_id = helpers::unique_id::create<run_on_n_threads_task_worker<Function>>;
run_on_n_threads_task_worker(Function function, run_on_n_threads_task<Function> *root) :
abstract_task{0, create_id()},
function_{function},
root_{root} {}
void execute() override {
if (root_->decrement_counter() >= 0) {
function_();
std::cout << "Finished Worker!" << std::endl;
} else {
std::cout << "Abandoned Worker!" << std::endl;
}
}
bool internal_stealing(abstract_task * /*other_task*/) override {
return false;
}
bool split_task(base::swmr_spin_lock * /*lock*/) override {
return false;
}
};
template<typename Function>
bool run_on_n_threads_task<Function>::split_task(base::swmr_spin_lock *lock) {
if (get_counter() <= 0) {
return false;
}
// In success case, unlock.
lock->reader_unlock();
auto scheduler = base::this_thread::state<thread_state>()->scheduler_;
auto task = run_on_n_threads_task_worker<Function>{function_, this};
scheduler->execute_task(task, depth());
return true;
}
template<typename Function>
run_on_n_threads_task<Function> create_run_on_n_threads_task(Function function, int num_threads) {
return run_on_n_threads_task<Function>{function, num_threads};
}
}
}
}
#endif //PLS_RUN_ON_N_THREADS_TASK_H
...@@ -12,32 +12,55 @@ ...@@ -12,32 +12,55 @@
#include "pls/internal/base/thread.h" #include "pls/internal/base/thread.h"
#include "pls/internal/base/barrier.h" #include "pls/internal/base/barrier.h"
#include "thread_state.h" #include "pls/internal/scheduling/scheduler_memory.h"
#include "root_task.h" #include "pls/internal/scheduling/thread_state.h"
#include "scheduler_memory.h" #include "pls/internal/scheduling/task.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
namespace scheduling { namespace scheduling {
void worker_routine();
using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>; using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;
/**
* The scheduler is the central part of the dispatching-framework.
* It manages a pool of worker threads (creates, sleeps/wakes up, destroys)
* and allows to execute parallel sections.
*
* It works in close rellation with the 'task' class for scheduling.
*/
class scheduler { class scheduler {
friend void worker_routine(); friend class task;
const unsigned int num_threads_; const unsigned int num_threads_;
scheduler_memory *memory_; scheduler_memory *memory_;
base::barrier sync_barrier_; base::barrier sync_barrier_;
task *main_thread_root_task_;
std::atomic<bool> work_section_done_;
bool terminated_; bool terminated_;
public: public:
/**
* Initializes a scheduler instance with the given number of threads.
* This will spawn the threads and put them to sleep, ready to process an
* upcoming parallel section.
*
* @param memory All memory is allocated statically, thus the user is required to provide the memory instance.
* @param num_threads The number of worker threads to be created.
*/
explicit scheduler(scheduler_memory *memory, unsigned int num_threads); explicit scheduler(scheduler_memory *memory, unsigned int num_threads);
/**
* The scheduler is implicitly terminated as soon as it leaves the scope.
*/
~scheduler(); ~scheduler();
/** /**
* Wakes up the thread pool. * Wakes up the thread pool.
* Code inside the Function lambda can invoke all parallel APIs. * Code inside the Function lambda can invoke all parallel APIs.
* This is meant to cleanly sleep and wake up the scheduler during an application run,
* e.g. to run parallel code on a timer loop/after interrupts.
* *
* @param work_section generic function or lambda to be executed in the scheduler's context. * @param work_section generic function or lambda to be executed in the scheduler's context.
*/ */
...@@ -45,20 +68,46 @@ class scheduler { ...@@ -45,20 +68,46 @@ class scheduler {
void perform_work(Function work_section); void perform_work(Function work_section);
/** /**
* Executes a top-level-task (children of abstract_task) on this thread. * Explicitly terminate the worker threads. Scheduler must not be used after this.
* *
* @param task The task to be executed. * @param wait_for_workers Set to true if you wish to return from this method only after the workers are shut down.
* @param depth Optional: depth of the new task, otherwise set implicitly.
*/ */
template<typename Task> void terminate(bool wait_for_workers = true);
static void execute_task(Task &task, int depth = -1);
static abstract_task *current_task() { return base::this_thread::state<thread_state>()->current_task_; } /**
* Helper to spawn a child on the currently running task.
*
* @tparam T type of the new task
* @param sub_task the new task to be spawned
*/
template<typename T>
static void spawn_child(T &sub_task);
void terminate(bool wait_for_workers = true); /**
* Helper to spawn a child on the currently running task and waiting for it (skipping over the task-deque).
*
* @tparam T type of the new task
* @param sub_task the new task to be spawned
*/
template<typename T>
static void spawn_child_and_wait(T &sub_task);
/**
* Helper to wait for all children of the currently executing task.
*/
static void wait_for_all();
unsigned int num_threads() const { return num_threads_; } unsigned int num_threads() const { return num_threads_; }
thread_state *thread_state_for(size_t id) { return memory_->thread_state_for(id); }
private:
static void worker_routine();
thread_state *thread_state_for(size_t id);
task *get_local_task();
task *steal_task();
bool try_execute_local();
bool try_execute_stolen();
}; };
} }
......
...@@ -2,71 +2,41 @@ ...@@ -2,71 +2,41 @@
#ifndef PLS_SCHEDULER_IMPL_H #ifndef PLS_SCHEDULER_IMPL_H
#define PLS_SCHEDULER_IMPL_H #define PLS_SCHEDULER_IMPL_H
#include "pls/internal/scheduling/lambda_task.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
namespace scheduling { namespace scheduling {
// TODO: generally look into the performance implications of using many thread_state::get() calls
template<typename Function> template<typename Function>
void scheduler::perform_work(Function work_section) { void scheduler::perform_work(Function work_section) {
PROFILE_WORK_BLOCK("scheduler::perform_work") PROFILE_WORK_BLOCK("scheduler::perform_work")
root_task<Function> master{work_section};
// Push root task on stacks // if (execute_main_thread) {
auto new_master = memory_->task_stack_for(0)->push(master); // work_section();
memory_->thread_state_for(0)->root_task_ = new_master; //
memory_->thread_state_for(0)->current_task_ = new_master; // sync_barrier_.wait(); // Trigger threads to wake up
for (unsigned int i = 1; i < num_threads_; i++) { // sync_barrier_.wait(); // Wait for threads to finish
root_worker_task<Function> worker{new_master}; // } else {
auto new_worker = memory_->task_stack_for(0)->push(worker); lambda_task_by_reference<Function> root_task{work_section};
memory_->thread_state_for(i)->root_task_ = new_worker; main_thread_root_task_ = &root_task;
memory_->thread_state_for(i)->current_task_ = new_worker; work_section_done_ = false;
}
// Perform and wait for work
sync_barrier_.wait(); // Trigger threads to wake up sync_barrier_.wait(); // Trigger threads to wake up
sync_barrier_.wait(); // Wait for threads to finish sync_barrier_.wait(); // Wait for threads to finish
// }
// Clean up stack
memory_->task_stack_for(0)->pop<typeof(master)>();
for (unsigned int i = 1; i < num_threads_; i++) {
root_worker_task<Function> worker{new_master};
memory_->task_stack_for(0)->pop<typeof(worker)>();
}
} }
template<typename Task> template<typename T>
void scheduler::execute_task(Task &task, int depth) { void scheduler::spawn_child(T &sub_task) {
static_assert(std::is_base_of<abstract_task, Task>::value, "Only pass abstract_task subclasses!"); thread_state::get()->current_task_->spawn_child(sub_task);
}
auto my_state = base::this_thread::state<thread_state>();
abstract_task *old_task;
abstract_task *new_task;
// Init Task
old_task = my_state->current_task_;
new_task = my_state->task_stack_->push(task);
new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1);
{
my_state->lock_.writer_lock();
my_state->current_task_ = new_task;
old_task->set_child(new_task);
my_state->lock_.writer_unlock();
}
// Run Task
new_task->execute();
// Teardown state back to before the task was executed
my_state->task_stack_->pop<Task>();
{ template<typename T>
my_state->lock_.writer_lock(); void scheduler::spawn_child_and_wait(T &sub_task) {
old_task->set_child(nullptr); thread_state::get()->current_task_->spawn_child_and_wait(sub_task);
my_state->current_task_ = old_task;
my_state->lock_.writer_unlock();
}
} }
} }
......
#ifndef PLS_SCHEDULER_MEMORY_H
#define PLS_SCHEDULER_MEMORY_H
#include "pls/internal/data_structures/aligned_stack.h" #include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/base/thread.h" #include "pls/internal/base/thread.h"
#include "thread_state.h" #include "pls/internal/scheduling/thread_state.h"
#ifndef PLS_SCHEDULER_MEMORY_H
#define PLS_SCHEDULER_MEMORY_H
namespace pls { namespace pls {
namespace internal { namespace internal {
...@@ -14,11 +14,36 @@ void worker_routine(); ...@@ -14,11 +14,36 @@ void worker_routine();
using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>; using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;
class scheduler_memory { class scheduler_memory {
private:
size_t max_threads_;
thread_state **thread_states_;
scheduler_thread **threads_;
data_structures::aligned_stack **task_stacks_;
protected:
void init(size_t max_therads,
thread_state **thread_states,
scheduler_thread **threads,
data_structures::aligned_stack **task_stacks) {
max_threads_ = max_therads;
thread_states_ = thread_states;
threads_ = threads;
task_stacks_ = task_stacks;
}
public: public:
virtual size_t max_threads() const = 0; size_t max_threads() const {
virtual thread_state *thread_state_for(size_t id) = 0; return max_threads_;
virtual scheduler_thread *thread_for(size_t id) = 0; }
virtual data_structures::aligned_stack *task_stack_for(size_t id) = 0; thread_state *thread_state_for(size_t id) const {
return thread_states_[id];
}
scheduler_thread *thread_for(size_t id) const {
return threads_[id];
}
data_structures::aligned_stack *task_stack_for(size_t id) const {
return task_stacks_[id];
}
}; };
template<size_t MAX_THREADS, size_t TASK_STACK_SIZE> template<size_t MAX_THREADS, size_t TASK_STACK_SIZE>
...@@ -31,23 +56,30 @@ class static_scheduler_memory : public scheduler_memory { ...@@ -31,23 +56,30 @@ class static_scheduler_memory : public scheduler_memory {
using aligned_thread_stack = base::alignment::aligned_wrapper<std::array<char, TASK_STACK_SIZE>>; using aligned_thread_stack = base::alignment::aligned_wrapper<std::array<char, TASK_STACK_SIZE>>;
using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>; using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>;
// Actual Memory
std::array<aligned_thread, MAX_THREADS> threads_; std::array<aligned_thread, MAX_THREADS> threads_;
std::array<aligned_thread_state, MAX_THREADS> thread_states_; std::array<aligned_thread_state, MAX_THREADS> thread_states_;
std::array<aligned_thread_stack, MAX_THREADS> task_stacks_memory_; std::array<aligned_thread_stack, MAX_THREADS> task_stacks_memory_;
std::array<aligned_aligned_stack, MAX_THREADS> task_stacks_; std::array<aligned_aligned_stack, MAX_THREADS> task_stacks_;
// References for parent
std::array<scheduler_thread *, MAX_THREADS> thread_refs_;
std::array<thread_state *, MAX_THREADS> thread_state_refs_;
std::array<data_structures::aligned_stack *, MAX_THREADS> task_stack_refs_;
public: public:
static_scheduler_memory() { static_scheduler_memory() : scheduler_memory() {
for (size_t i = 0; i < MAX_THREADS; i++) { for (size_t i = 0; i < MAX_THREADS; i++) {
new((void *) task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i].pointer()->data(), new((void *) task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i].pointer()->data(),
TASK_STACK_SIZE); TASK_STACK_SIZE);
thread_refs_[i] = threads_[i].pointer();
thread_state_refs_[i] = thread_states_[i].pointer();
task_stack_refs_[i] = task_stacks_[i].pointer();
} }
}
size_t max_threads() const override { return MAX_THREADS; } init(MAX_THREADS, thread_state_refs_.data(), thread_refs_.data(), task_stack_refs_.data());
thread_state *thread_state_for(size_t id) override { return thread_states_[id].pointer(); } }
scheduler_thread *thread_for(size_t id) override { return threads_[id].pointer(); }
data_structures::aligned_stack *task_stack_for(size_t id) override { return task_stacks_[id].pointer(); }
}; };
class malloc_scheduler_memory : public scheduler_memory { class malloc_scheduler_memory : public scheduler_memory {
...@@ -60,18 +92,20 @@ class malloc_scheduler_memory : public scheduler_memory { ...@@ -60,18 +92,20 @@ class malloc_scheduler_memory : public scheduler_memory {
const size_t num_threads_; const size_t num_threads_;
// Actual Memory
aligned_thread *threads_; aligned_thread *threads_;
aligned_thread_state *thread_states_; aligned_thread_state *thread_states_;
char **task_stacks_memory_; char **task_stacks_memory_;
aligned_aligned_stack *task_stacks_; aligned_aligned_stack *task_stacks_;
// References for parent
scheduler_thread **thread_refs_;
thread_state **thread_state_refs_;
data_structures::aligned_stack **task_stack_refs_;
public: public:
explicit malloc_scheduler_memory(size_t num_threads, size_t memory_per_stack = 2 << 16); explicit malloc_scheduler_memory(size_t num_threads, size_t memory_per_stack = 2 << 16);
~malloc_scheduler_memory(); ~malloc_scheduler_memory();
size_t max_threads() const override { return num_threads_; }
thread_state *thread_state_for(size_t id) override { return thread_states_[id].pointer(); }
scheduler_thread *thread_for(size_t id) override { return threads_[id].pointer(); }
data_structures::aligned_stack *task_stack_for(size_t id) override { return task_stacks_[id].pointer(); }
}; };
} }
......
#ifndef PLS_TASK_H
#define PLS_TASK_H
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/data_structures/deque.h"
#include "pls/internal/scheduling/thread_state.h"
namespace pls {
namespace internal {
namespace scheduling {
class task {
friend class scheduler;
// Coordinate finishing of sub_tasks
std::atomic<unsigned int> ref_count_;
task *parent_;
// Stack Management (reset stack pointer after wait_for_all() calls)
data_structures::deque<task>::state deque_state_;
protected:
// TODO: Double Check with copy and move constructors, try to minimize overhead while keeping a clean API.
explicit task();
task(const task &other);
/**
* Overwrite this with the actual behaviour of concrete tasks.
*/
virtual void execute_internal() = 0;
template<typename T>
void spawn_child(T &&sub_task);
template<typename T>
void spawn_child_and_wait(T &&sub_task);
void wait_for_all();
private:
void execute();
};
template<typename T>
void task::spawn_child(T &&sub_task) {
PROFILE_FORK_JOIN_STEALING("spawn_child")
static_assert(std::is_base_of<task, typename std::remove_reference<T>::type>::value, "Only pass task subclasses!");
// Keep our refcount up to date
ref_count_++;
// Assign forced values (for stack and parent management)
sub_task.parent_ = this;
sub_task.deque_state_ = thread_state::get()->deque_.save_state();
// Push on our deque
const T const_task = sub_task;
thread_state::get()->deque_.push_tail(const_task);
}
template<typename T>
void task::spawn_child_and_wait(T &&sub_task) {
PROFILE_FORK_JOIN_STEALING("spawn_child")
static_assert(std::is_base_of<task, typename std::remove_reference<T>::type>::value, "Only pass task subclasses!");
// Assign forced values (for stack and parent management)
sub_task.parent_ = nullptr;
sub_task.deque_state_ = thread_state::get()->deque_.save_state();
sub_task.execute();
wait_for_all();
}
}
}
}
#endif //PLS_TASK_H
...@@ -4,9 +4,10 @@ ...@@ -4,9 +4,10 @@
#include <random> #include <random>
#include "pls/internal/base/thread.h"
#include "pls/internal/data_structures/aligned_stack.h" #include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/base/swmr_spin_lock.h" #include "pls/internal/data_structures/deque.h"
#include "abstract_task.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
...@@ -14,33 +15,40 @@ namespace scheduling { ...@@ -14,33 +15,40 @@ namespace scheduling {
// forward declaration // forward declaration
class scheduler; class scheduler;
class task;
struct thread_state { struct thread_state {
alignas(base::system_details::CACHE_LINE_SIZE) scheduler *scheduler_; alignas(base::system_details::CACHE_LINE_SIZE) scheduler *scheduler_;
alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *root_task_; alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_;
alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *current_task_;
alignas(base::system_details::CACHE_LINE_SIZE) data_structures::aligned_stack *task_stack_; alignas(base::system_details::CACHE_LINE_SIZE) data_structures::aligned_stack *task_stack_;
alignas(base::system_details::CACHE_LINE_SIZE) data_structures::deque<task> deque_;
alignas(base::system_details::CACHE_LINE_SIZE) size_t id_; alignas(base::system_details::CACHE_LINE_SIZE) size_t id_;
alignas(base::system_details::CACHE_LINE_SIZE) base::swmr_spin_lock lock_;
alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_; alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
thread_state() : thread_state() :
scheduler_{nullptr}, scheduler_{nullptr},
root_task_{nullptr},
current_task_{nullptr}, current_task_{nullptr},
task_stack_{nullptr}, task_stack_{nullptr},
deque_{task_stack_},
id_{0}, id_{0},
lock_{},
random_{id_} {}; random_{id_} {};
thread_state(scheduler *scheduler, data_structures::aligned_stack *task_stack, unsigned int id) : thread_state(scheduler *scheduler, data_structures::aligned_stack *task_stack, unsigned int id) :
scheduler_{scheduler}, scheduler_{scheduler},
root_task_{nullptr},
current_task_{nullptr}, current_task_{nullptr},
task_stack_{task_stack}, task_stack_{task_stack},
deque_{task_stack_},
id_{id}, id_{id},
lock_{},
random_{id_} {} random_{id_} {}
/**
* Convenience helper to get the thread_state instance associated with this thread.
* Must only be called on threads that are associated with a thread_state,
* this will most likely be threads created by the scheduler.
*
* @return The thread_state of this thread.
*/
static thread_state *get() { return base::this_thread::state<thread_state>(); }
}; };
} }
......
...@@ -3,8 +3,7 @@ ...@@ -3,8 +3,7 @@
#include "pls/algorithms/invoke_parallel.h" #include "pls/algorithms/invoke_parallel.h"
#include "pls/algorithms/parallel_for.h" #include "pls/algorithms/parallel_for.h"
#include "pls/internal/scheduling/abstract_task.h" #include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/fork_join_task.h"
#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/helpers/unique_id.h" #include "pls/internal/helpers/unique_id.h"
...@@ -14,17 +13,15 @@ using internal::scheduling::static_scheduler_memory; ...@@ -14,17 +13,15 @@ using internal::scheduling::static_scheduler_memory;
using internal::scheduling::malloc_scheduler_memory; using internal::scheduling::malloc_scheduler_memory;
using internal::scheduling::scheduler; using internal::scheduling::scheduler;
using task_id = internal::scheduling::abstract_task::id;
using unique_id = internal::helpers::unique_id; using unique_id = internal::helpers::unique_id;
using internal::scheduling::fork_join_sub_task; using internal::scheduling::task;
using internal::scheduling::fork_join_lambda_by_reference; using internal::scheduling::lambda_task_by_reference;
using internal::scheduling::fork_join_lambda_by_value; using internal::scheduling::lambda_task_by_value;
using internal::scheduling::fork_join_task; using internal::scheduling::task;
using algorithm::invoke_parallel; using algorithm::invoke_parallel;
using algorithm::parallel_for_fork_join;
using algorithm::parallel_for; using algorithm::parallel_for;
} }
......
#include <mutex>
#include "pls/internal/data_structures/deque.h"
namespace pls {
namespace internal {
namespace data_structures {
deque_item *deque_internal::pop_head_internal() {
std::lock_guard<base::spin_lock> lock{lock_};
if (head_ == nullptr) {
return nullptr;
}
deque_item *result = head_;
head_ = head_->next_;
if (head_ == nullptr) {
tail_ = nullptr;
} else {
head_->prev_ = nullptr;
}
return result;
}
deque_item *deque_internal::pop_tail_internal() {
std::lock_guard<base::spin_lock> lock{lock_};
if (tail_ == nullptr) {
return nullptr;
}
deque_item *result = tail_;
tail_ = tail_->prev_;
if (tail_ == nullptr) {
head_ = nullptr;
} else {
tail_->next_ = nullptr;
}
return result;
}
void deque_internal::push_tail_internal(deque_item *new_item) {
std::lock_guard<base::spin_lock> lock{lock_};
if (tail_ != nullptr) {
tail_->next_ = new_item;
} else {
head_ = new_item;
}
new_item->prev_ = tail_;
new_item->next_ = nullptr;
tail_ = new_item;
}
}
}
}
#include <pls/internal/base/backoff.h>
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/abstract_task.h"
#include "pls/internal/scheduling/scheduler.h"
namespace pls {
namespace internal {
namespace scheduling {
bool abstract_task::steal_work() {
// thread_local static base::backoff backoff{};
PROFILE_STEALING("abstract_task::steal_work")
const auto my_state = base::this_thread::state<thread_state>();
const auto my_scheduler = my_state->scheduler_;
const size_t my_id = my_state->id_;
const size_t offset = my_state->random_() % my_scheduler->num_threads();
const size_t max_tries = my_scheduler->num_threads(); // TODO: Tune this value
for (size_t i = 0; i < max_tries; i++) {
size_t target = (offset + i) % my_scheduler->num_threads();
if (target == my_id) {
continue;
}
auto target_state = my_scheduler->thread_state_for(target);
if (!target_state->lock_.reader_try_lock()) {
continue;
}
// Dig down to our level
PROFILE_STEALING("Go to our level")
abstract_task *current_task = target_state->root_task_;
while (current_task != nullptr && current_task->depth() < depth()) {
current_task = current_task->child();
}
PROFILE_END_BLOCK
// Try to steal 'internal', e.g. for_join_sub_tasks in a fork_join_task constellation
PROFILE_STEALING("Internal Steal")
if (current_task != nullptr) {
// See if it equals our type and depth of task
if (current_task->unique_id_ == unique_id_ &&
current_task->depth_ == depth_) {
if (internal_stealing(current_task)) {
// internal steal was a success, hand it back to the internal scheduler
target_state->lock_.reader_unlock();
// backoff.reset();
return true;
}
// No success, we need to steal work from a deeper level using 'top level task stealing'
current_task = current_task->child();
}
}
PROFILE_END_BLOCK;
// Execute 'top level task steal' if possible
// (only try deeper tasks to keep depth restricted stealing).
PROFILE_STEALING("Top Level Steal")
while (current_task != nullptr) {
auto lock = &target_state->lock_;
if (current_task->split_task(lock)) {
// top level steal was a success (we did a top level task steal)
// backoff.reset();
return false;
}
current_task = current_task->child_task_;
}
PROFILE_END_BLOCK;
target_state->lock_.reader_unlock();
}
// internal steal was no success
// backoff.do_backoff();
// base::this_thread::sleep(5);
return false;
}
}
}
}
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/fork_join_task.h"
namespace pls {
namespace internal {
namespace scheduling {
fork_join_sub_task::fork_join_sub_task() :
ref_count_{0},
parent_{nullptr},
tbb_task_{nullptr},
deque_state_{0} {}
fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task &other) :
ref_count_{0},
parent_{other.parent_},
tbb_task_{other.tbb_task_},
deque_state_{other.deque_state_} {}
void fork_join_sub_task::execute() {
PROFILE_WORK_BLOCK("execute sub_task")
auto last_executing = tbb_task_->currently_executing_;
tbb_task_->currently_executing_ = this;
execute_internal();
tbb_task_->currently_executing_ = last_executing;
PROFILE_END_BLOCK
wait_for_all();
if (parent_ != nullptr) {
parent_->ref_count_--;
}
}
void fork_join_sub_task::wait_for_all() {
while (ref_count_ > 0) {
PROFILE_STEALING("get local sub task")
fork_join_sub_task *local_task = tbb_task_->get_local_sub_task();
PROFILE_END_BLOCK
if (local_task != nullptr) {
local_task->execute();
} else {
// Try to steal work.
// External steal will be executed implicitly if success
PROFILE_STEALING("steal work")
bool internal_steal_success = tbb_task_->steal_work();
PROFILE_END_BLOCK
if (internal_steal_success) {
tbb_task_->last_stolen_->execute();
}
}
}
tbb_task_->deque_.release_memory_until(deque_state_);
}
fork_join_sub_task *fork_join_task::get_local_sub_task() {
return deque_.pop_tail();
}
fork_join_sub_task *fork_join_task::get_stolen_sub_task() {
return deque_.pop_head();
}
fork_join_sub_task *fork_join_sub_task::current() {
return dynamic_cast<fork_join_task *>(scheduler::current_task())->currently_executing();
}
bool fork_join_task::internal_stealing(abstract_task *other_task) {
PROFILE_STEALING("fork_join_task::internal_stealin")
auto cast_other_task = reinterpret_cast<fork_join_task *>(other_task);
auto stolen_sub_task = cast_other_task->get_stolen_sub_task();
if (stolen_sub_task == nullptr) {
return false;
} else {
// Make sub-task belong to our fork_join_task instance
stolen_sub_task->tbb_task_ = this;
stolen_sub_task->deque_state_ = deque_.save_state();
// We will execute this next without explicitly moving it onto our stack storage
last_stolen_ = stolen_sub_task;
return true;
}
}
bool fork_join_task::split_task(base::swmr_spin_lock *lock) {
PROFILE_STEALING("fork_join_task::split_task")
fork_join_sub_task *stolen_sub_task = get_stolen_sub_task();
if (stolen_sub_task == nullptr) {
return false;
}
fork_join_task task{stolen_sub_task, this->unique_id()};
// In success case, unlock.
lock->reader_unlock();
scheduler::execute_task(task, depth());
return true;
}
void fork_join_task::execute() {
PROFILE_WORK_BLOCK("execute fork_join_task");
// Bind this instance to our OS thread
// TODO: See if we did this right
// my_stack_ = base::this_thread::state<thread_state>()->task_stack_;
deque_.reset_base_pointer();
root_task_->tbb_task_ = this;
root_task_->deque_state_ = deque_.save_state();
// Execute it on our OS thread until its finished
root_task_->execute();
}
fork_join_sub_task *fork_join_task::currently_executing() const { return currently_executing_; }
fork_join_task::fork_join_task(fork_join_sub_task *root_task,
const abstract_task::id &id) :
abstract_task{0, id},
root_task_{root_task},
currently_executing_{nullptr},
deque_{base::this_thread::state<thread_state>()->task_stack_},
last_stolen_{nullptr} {}
}
}
}
#include "pls/internal/scheduling/parallel_iterator_task.h"
#include "pls/internal/scheduling/root_task.h"
namespace pls {
namespace internal {
namespace scheduling {
}
}
}
#include "pls/internal/scheduling/run_on_n_threads_task.h"
namespace pls {
namespace internal {
namespace scheduling {
}
}
}
#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/task.h"
#include "pls/internal/base/error_handling.h" #include "pls/internal/base/error_handling.h"
namespace pls { namespace pls {
...@@ -17,7 +20,7 @@ scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads) : ...@@ -17,7 +20,7 @@ scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads) :
for (unsigned int i = 0; i < num_threads_; i++) { for (unsigned int i = 0; i < num_threads_; i++) {
// Placement new is required, as the memory of `memory_` is not required to be initialized. // Placement new is required, as the memory of `memory_` is not required to be initialized.
new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i}; new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i};
new((void *) memory_->thread_for(i))base::thread<void (*)(), thread_state>(&worker_routine, new((void *) memory_->thread_for(i))base::thread<void (*)(), thread_state>(&scheduler::worker_routine,
memory_->thread_state_for(i)); memory_->thread_state_for(i));
} }
} }
...@@ -26,20 +29,38 @@ scheduler::~scheduler() { ...@@ -26,20 +29,38 @@ scheduler::~scheduler() {
terminate(); terminate();
} }
void worker_routine() { void scheduler::worker_routine() {
auto my_state = base::this_thread::state<thread_state>(); auto my_state = thread_state::get();
auto scheduler = my_state->scheduler_;
while (true) { while (true) {
my_state->scheduler_->sync_barrier_.wait(); // Wait to be triggered
if (my_state->scheduler_->terminated_) { scheduler->sync_barrier_.wait();
// Check for shutdown
if (scheduler->terminated_) {
return; return;
} }
// The root task must only return when all work is done, // Execute work
// because of this a simple call is enough to ensure the if (my_state->id_ == 0) {
// fork-join-section is done (logically joined back into our main thread). // Main Thread
my_state->root_task_->execute(); auto root_task = scheduler->main_thread_root_task_;
root_task->parent_ = nullptr;
root_task->deque_state_ = my_state->deque_.save_state();
root_task->execute();
scheduler->work_section_done_ = true;
} else {
// Worker Threads
while (!scheduler->work_section_done_) {
if (!scheduler->try_execute_local()) {
scheduler->try_execute_stolen();
}
}
}
// Sync back with main thread
my_state->scheduler_->sync_barrier_.wait(); my_state->scheduler_->sync_barrier_.wait();
} }
} }
...@@ -59,6 +80,69 @@ void scheduler::terminate(bool wait_for_workers) { ...@@ -59,6 +80,69 @@ void scheduler::terminate(bool wait_for_workers) {
} }
} }
task *scheduler::get_local_task() {
PROFILE_STEALING("Get Local Task")
return thread_state::get()->deque_.pop_tail();
}
task *scheduler::steal_task() {
PROFILE_STEALING("Steal Task")
// Data for victim selection
const auto my_state = thread_state::get();
const auto my_id = my_state->id_;
const size_t offset = my_state->random_() % num_threads();
const size_t max_tries = num_threads(); // TODO: Tune this value
// Current strategy: random start, then round robin from there
for (size_t i = 0; i < max_tries; i++) {
size_t target = (offset + i) % num_threads();
// Skip our self for stealing
target = ((target == my_id) + target) % num_threads();
auto target_state = thread_state_for(target);
// TODO: See if we should re-try popping if it failed due to contention
auto result = target_state->deque_.pop_head();
if (result != nullptr) {
return result;
}
// TODO: See if we should backoff here (per missed steal)
}
// TODO: See if we should backoff here (after a 'round' of missed steals)
return nullptr;
}
bool scheduler::try_execute_local() {
task *local_task = get_local_task();
if (local_task != nullptr) {
local_task->execute();
return true;
} else {
return false;
}
}
bool scheduler::try_execute_stolen() {
task *stolen_task = steal_task();
if (stolen_task != nullptr) {
stolen_task->deque_state_ = thread_state::get()->deque_.save_state();
stolen_task->execute();
return true;
}
return false;
}
void scheduler::wait_for_all() {
thread_state::get()->current_task_->wait_for_all();
}
thread_state *scheduler::thread_state_for(size_t id) { return memory_->thread_state_for(id); }
} }
} }
} }
...@@ -10,14 +10,25 @@ malloc_scheduler_memory::malloc_scheduler_memory(const size_t num_threads, const ...@@ -10,14 +10,25 @@ malloc_scheduler_memory::malloc_scheduler_memory(const size_t num_threads, const
reinterpret_cast<aligned_thread *>(base::alignment::allocate_aligned(num_threads * sizeof(aligned_thread))); reinterpret_cast<aligned_thread *>(base::alignment::allocate_aligned(num_threads * sizeof(aligned_thread)));
thread_states_ = reinterpret_cast<aligned_thread_state *>(base::alignment::allocate_aligned( thread_states_ = reinterpret_cast<aligned_thread_state *>(base::alignment::allocate_aligned(
num_threads * sizeof(aligned_thread_state))); num_threads * sizeof(aligned_thread_state)));
task_stacks_ = reinterpret_cast<aligned_aligned_stack *>(base::alignment::allocate_aligned( task_stacks_ = reinterpret_cast<aligned_aligned_stack *>(base::alignment::allocate_aligned(
num_threads * sizeof(aligned_aligned_stack))); num_threads * sizeof(aligned_aligned_stack)));
task_stacks_memory_ = reinterpret_cast<char **>(base::alignment::allocate_aligned(num_threads * sizeof(char *))); task_stacks_memory_ = reinterpret_cast<char **>(base::alignment::allocate_aligned(num_threads * sizeof(char *)));
thread_refs_ = static_cast<scheduler_thread **>(malloc(num_threads * sizeof(scheduler_thread *)));
thread_state_refs_ = static_cast<thread_state **>(malloc(num_threads * sizeof(thread_state *)));
task_stack_refs_ =
static_cast<data_structures::aligned_stack **>(malloc(num_threads * sizeof(data_structures::aligned_stack *)));
for (size_t i = 0; i < num_threads_; i++) { for (size_t i = 0; i < num_threads_; i++) {
task_stacks_memory_[i] = reinterpret_cast<char *>(base::alignment::allocate_aligned(memory_per_stack)); task_stacks_memory_[i] = reinterpret_cast<char *>(base::alignment::allocate_aligned(memory_per_stack));
new((void *) task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i], memory_per_stack); new((void *) task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i], memory_per_stack);
thread_refs_[i] = threads_[i].pointer();
thread_state_refs_[i] = thread_states_[i].pointer();
task_stack_refs_[i] = task_stacks_[i].pointer();
} }
init(num_threads, thread_state_refs_, thread_refs_, task_stack_refs_);
} }
malloc_scheduler_memory::~malloc_scheduler_memory() { malloc_scheduler_memory::~malloc_scheduler_memory() {
...@@ -29,6 +40,10 @@ malloc_scheduler_memory::~malloc_scheduler_memory() { ...@@ -29,6 +40,10 @@ malloc_scheduler_memory::~malloc_scheduler_memory() {
} }
free(task_stacks_); free(task_stacks_);
free(task_stacks_memory_); free(task_stacks_memory_);
free(thread_refs_);
free(thread_state_refs_);
free(task_stack_refs_);
} }
} }
......
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/thread_state.h"
namespace pls {
namespace internal {
namespace scheduling {
task::task() :
ref_count_{0},
parent_{nullptr},
deque_state_{0} {}
task::task(const task &other) :
ref_count_{0},
parent_{other.parent_},
deque_state_{other.deque_state_} {}
void task::execute() {
PROFILE_WORK_BLOCK("execute task")
auto last_executing = thread_state::get()->current_task_;
thread_state::get()->current_task_ = this;
execute_internal();
PROFILE_END_BLOCK
wait_for_all();
thread_state::get()->current_task_ = last_executing;
if (parent_ != nullptr) {
parent_->ref_count_--;
}
}
void task::wait_for_all() {
auto scheduler = thread_state::get()->scheduler_;
while (ref_count_ > 0) {
if (!scheduler->try_execute_local()) {
scheduler->try_execute_stolen();
}
}
thread_state::get()->deque_.release_memory_until(deque_state_);
}
}
}
}
#include "pls/internal/scheduling/thread_state.h"
namespace pls {
namespace internal {
namespace scheduling {
}
}
}
add_executable(tests add_executable(tests
main.cpp main.cpp
data_structures_test.cpp) data_structures_test.cpp
scheduling_tests.cpp)
target_link_libraries(tests catch2 pls) target_link_libraries(tests catch2 pls)
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
#include <pls/internal/base/system_details.h> #include <pls/internal/base/system_details.h>
#include <pls/internal/data_structures/aligned_stack.h> #include <pls/internal/data_structures/aligned_stack.h>
#include <pls/internal/data_structures/deque.h> #include <pls/internal/data_structures/locking_deque.h>
#include <pls/internal/data_structures/work_stealing_deque.h> #include <pls/internal/data_structures/work_stealing_deque.h>
#include <vector> #include <vector>
...@@ -77,58 +77,62 @@ TEST_CASE("aligned stack stores objects correctly", "[internal/data_structures/a ...@@ -77,58 +77,62 @@ TEST_CASE("aligned stack stores objects correctly", "[internal/data_structures/a
} }
TEST_CASE("deque stores objects correctly", "[internal/data_structures/deque.h]") { TEST_CASE("deque stores objects correctly", "[internal/data_structures/deque.h]") {
class my_item : public deque_item { class my_item {
}; };
deque<my_item> deque; constexpr long data_size = 2 << 14;
my_item one, two, three; char data[data_size];
aligned_stack stack{data, data_size};
locking_deque<int> deque{&stack};
int one = 1, two = 2, three = 3;
SECTION("add and remove items form the tail") { SECTION("add and remove items form the tail") {
deque.push_tail(&one); deque.push_tail(one);
deque.push_tail(&two); deque.push_tail(two);
deque.push_tail(&three); deque.push_tail(three);
REQUIRE(deque.pop_tail() == &three); REQUIRE(*deque.pop_tail() == three);
REQUIRE(deque.pop_tail() == &two); REQUIRE(*deque.pop_tail() == two);
REQUIRE(deque.pop_tail() == &one); REQUIRE(*deque.pop_tail() == one);
} }
SECTION("handles getting empty by popping the tail correctly") { SECTION("handles getting empty by popping the tail correctly") {
deque.push_tail(&one); deque.push_tail(one);
REQUIRE(deque.pop_tail() == &one); REQUIRE(*deque.pop_tail() == one);
deque.push_tail(&two); deque.push_tail(two);
REQUIRE(deque.pop_tail() == &two); REQUIRE(*deque.pop_tail() == two);
} }
SECTION("remove items form the head") { SECTION("remove items form the head") {
deque.push_tail(&one); deque.push_tail(one);
deque.push_tail(&two); deque.push_tail(two);
deque.push_tail(&three); deque.push_tail(three);
REQUIRE(deque.pop_head() == &one); REQUIRE(*deque.pop_head() == one);
REQUIRE(deque.pop_head() == &two); REQUIRE(*deque.pop_head() == two);
REQUIRE(deque.pop_head() == &three); REQUIRE(*deque.pop_head() == three);
} }
SECTION("handles getting empty by popping the head correctly") { SECTION("handles getting empty by popping the head correctly") {
deque.push_tail(&one); deque.push_tail(one);
REQUIRE(deque.pop_head() == &one); REQUIRE(*deque.pop_head() == one);
deque.push_tail(&two); deque.push_tail(two);
REQUIRE(deque.pop_head() == &two); REQUIRE(*deque.pop_head() == two);
} }
SECTION("handles getting empty by popping the head and tail correctly") { SECTION("handles getting empty by popping the head and tail correctly") {
deque.push_tail(&one); deque.push_tail(one);
REQUIRE(deque.pop_tail() == &one); REQUIRE(*deque.pop_tail() == one);
deque.push_tail(&two); deque.push_tail(two);
REQUIRE(deque.pop_head() == &two); REQUIRE(*deque.pop_head() == two);
deque.push_tail(&three); deque.push_tail(three);
REQUIRE(deque.pop_tail() == &three); REQUIRE(*deque.pop_tail() == three);
} }
} }
......
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
using namespace pls; using namespace pls;
class once_sub_task : public fork_join_sub_task { class once_sub_task : public task {
std::atomic<int> *counter_; std::atomic<int> *counter_;
int children_; int children_;
...@@ -18,12 +18,12 @@ class once_sub_task : public fork_join_sub_task { ...@@ -18,12 +18,12 @@ class once_sub_task : public fork_join_sub_task {
public: public:
explicit once_sub_task(std::atomic<int> *counter, int children) : explicit once_sub_task(std::atomic<int> *counter, int children) :
fork_join_sub_task(), task{},
counter_{counter}, counter_{counter},
children_{children} {} children_{children} {}
}; };
class force_steal_sub_task : public fork_join_sub_task { class force_steal_sub_task : public task {
std::atomic<int> *parent_counter_; std::atomic<int> *parent_counter_;
std::atomic<int> *overall_counter_; std::atomic<int> *overall_counter_;
...@@ -41,7 +41,7 @@ class force_steal_sub_task : public fork_join_sub_task { ...@@ -41,7 +41,7 @@ class force_steal_sub_task : public fork_join_sub_task {
public: public:
explicit force_steal_sub_task(std::atomic<int> *parent_counter, std::atomic<int> *overall_counter) : explicit force_steal_sub_task(std::atomic<int> *parent_counter, std::atomic<int> *overall_counter) :
fork_join_sub_task(), task{},
parent_counter_{parent_counter}, parent_counter_{parent_counter},
overall_counter_{overall_counter} {} overall_counter_{overall_counter} {}
}; };
...@@ -57,8 +57,7 @@ TEST_CASE("tbb task are scheduled correctly", "[internal/scheduling/fork_join_ta ...@@ -57,8 +57,7 @@ TEST_CASE("tbb task are scheduled correctly", "[internal/scheduling/fork_join_ta
my_scheduler.perform_work([&]() { my_scheduler.perform_work([&]() {
once_sub_task sub_task{&counter, start_counter}; once_sub_task sub_task{&counter, start_counter};
fork_join_task task{&sub_task, unique_id::create(42)}; scheduler::spawn_child(sub_task);
scheduler::execute_task(task);
}); });
REQUIRE(counter.load() == total_tasks); REQUIRE(counter.load() == total_tasks);
...@@ -70,8 +69,10 @@ TEST_CASE("tbb task are scheduled correctly", "[internal/scheduling/fork_join_ta ...@@ -70,8 +69,10 @@ TEST_CASE("tbb task are scheduled correctly", "[internal/scheduling/fork_join_ta
my_scheduler.perform_work([&]() { my_scheduler.perform_work([&]() {
std::atomic<int> dummy_parent{1}, overall_counter{8}; std::atomic<int> dummy_parent{1}, overall_counter{8};
force_steal_sub_task sub_task{&dummy_parent, &overall_counter}; force_steal_sub_task sub_task{&dummy_parent, &overall_counter};
fork_join_task task{&sub_task, unique_id::create(42)}; scheduler::spawn_child(sub_task);
scheduler::execute_task(task);
// Required, as child operates on our stack's memory!!!
scheduler::wait_for_all();
}); });
my_scheduler.terminate(true); my_scheduler.terminate(true);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment