diff --git a/PERFORMANCE.md b/PERFORMANCE.md
index ea9e983..7112439 100644
--- a/PERFORMANCE.md
+++ b/PERFORMANCE.md
@@ -281,3 +281,104 @@ parallel_for, 512 heat array size):
We observe solid performance from our implementation.
(Again, not a very scientific test environment, but good enough
for our general direction.)
+
+### Commit 3bdaba42 - Move to pure fork-join tasks (remove two-level scheduling)
+
+We moved away from our two-level scheduler approach towards a
+pure fork-join task model (in order to remove locks from the
+code more easily and to keep further tests simpler and focused
+on one specific aspect).
+These are the measurements taken after the change
+(without any performance optimizations applied):
+
+FFT Average:
+
+
+
+Heat Diffusion Average:
+
+
+
+Matrix Multiplication Average:
+
+
+
+Unbalanced Tree Search Average:
+
+
+
+
+We note that in heat diffusion, matrix multiplication and unbalanced
+tree search - all three benchmarks with mostly enough work available
+at all times - our implementation performs head to head with Intel's
+TBB. Only the FFT benchmark is a major problem for our library.
+We notice a MAJOR drop in performance exactly at the hyperthreading
+mark, indicating contention for limited resources between the spinning
+threads (threads without any actual work) and the threads actually
+performing work. Most likely some resource shared on a single cache
+line hinders the working threads, but we cannot yet
+figure out which one it is.
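+
+To illustrate the suspected effect, here is a minimal sketch of the
+usual mitigation (padding shared counters to a full cache line); the
+constant and the struct name are illustrative assumptions, not part
+of our code:
+
+```c++
+#include <atomic>
+#include <cstddef>
+
+// Assumption: 64-byte cache lines (typical for current x86 CPUs).
+constexpr std::size_t CACHE_LINE_SIZE = 64;
+
+// Forcing each counter onto its own cache line avoids false sharing:
+// a spinning thread polling its counter no longer invalidates the
+// line that a working thread's counter lives on.
+struct alignas(CACHE_LINE_SIZE) padded_counter {
+  std::atomic<unsigned int> value{0};
+};
+
+static_assert(sizeof(padded_counter) == CACHE_LINE_SIZE,
+              "counter must occupy exactly one cache line");
+```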
+
+### Commit be2cdbfe - Locking Deque
+
+Switching to a locking deque has not improved performance (it even
+hurt it slightly); we therefore think that the deque itself is not
+the part slowing down our execution.
+
+### Commit 5044f0a1 - Performance Bottleneck in FFT FIXED
+
+By moving from calling one of the parallel invocations directly
+
+```c++
+scheduler::spawn_child(sub_task_2);
+function1(); // Execute first function 'inline' without spawning a sub_task object
+```
+
+to spawning two tasks
+```c++
+scheduler::spawn_child(sub_task_2);
+scheduler::spawn_child(sub_task_1);
+```
+
+we were able to fix the poor performance of our framework in the
+FFT benchmark (where some worker threads spend a lot of time
+spinning/idling).
+
+We think this is due to some sort of cache misses/bus contention
+on the finishing counters. This would make sense, as the drop
+at the hyperthreading mark indicates problems with this part of the
+CPU pipeline (although it did not show clearly in our profiling runs).
+We will now try to find the exact spot where the problem originates and
+fix the source rather than 'circumventing' it with these extra tasks.
+(This, in turn, should hopefully boost the performance of all other
+workloads as well, as contention on the bus/cache is always bad.)
+
+
+After some research we think the issue comes down to many threads
+referencing the same atomic reference counter. We think so because
+even cache aligning the shared reference count does not fix the issue
+when using the direct function call. Likewise, forcing a new method
+call (going down one level in the call stack) does not solve the
+issue, ruling out a caching issue in the call itself.
+
+In conclusion, there seems to be a hyperthreading issue with this
+shared reference count. We keep this in mind in case we eventually get
+tasks with mutable data members (as the problem could reappear there,
+because the ref_count then actually sits in the same memory region as
+our 'user variables'). For now we leave the code as it is.
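+
+For illustration, a minimal sketch of the contended pattern we
+suspect (names and memory orders are assumptions, not our exact
+code): every finishing child decrements one shared atomic that the
+parent spins on, so each completion bounces the same cache line
+between hyperthread siblings:
+
+```c++
+#include <atomic>
+
+struct join_counter {
+  std::atomic<unsigned int> ref_count{0};
+
+  void child_spawned() { ref_count.fetch_add(1, std::memory_order_relaxed); }
+  void child_finished() { ref_count.fetch_sub(1, std::memory_order_acq_rel); }
+
+  // The parent re-reads the counter in a loop; every child_finished()
+  // invalidates the cache line, forcing a fresh load each iteration.
+  void wait_for_all() {
+    while (ref_count.load(std::memory_order_acquire) > 0) {
+      // spin, or better: steal and execute other work here
+    }
+  }
+};
+```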
+
+
+FFT Average with new call method:
+
+
+
+The performance of our new call method looks strikingly similar
+to TBB's, trailing it by a slight, constant margin.
+This makes sense, as the basic principles (a lock-free, classic
+work-stealing deque and the parallel call structure) are nearly
+the same.
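+
+As a reminder of that shared principle, a rough interface sketch of
+the classic work-stealing discipline (declarations only; our actual
+deque in work_stealing_deque.h uses stamped indices and CAS):
+
+```c++
+// The owning worker pushes and pops at the tail (LIFO, cache friendly),
+// while thieves take from the head (FIFO, oldest/largest tasks), so
+// owner and thieves only contend when a single task remains.
+template<typename Task>
+class work_stealing_deque_interface {
+ public:
+  void push_tail(Task *task);  // owner only
+  Task *pop_tail();            // owner only, may race with one thief
+  Task *pop_head();            // thieves, synchronized via CAS
+};
+```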
+
+We will see if minor optimizations can close this last gap entirely.
+Overall, the performance at this point is good enough to move on
+to implementing more functionality and to running tests on different
+queues/stealing tactics etc.
diff --git a/app/benchmark_unbalanced/main.cpp b/app/benchmark_unbalanced/main.cpp
index 1860877..e3e29d4 100644
--- a/app/benchmark_unbalanced/main.cpp
+++ b/app/benchmark_unbalanced/main.cpp
@@ -19,15 +19,14 @@ int count_child_nodes(uts::node &node) {
return child_count;
}
- auto current_task = pls::fork_join_sub_task::current();
std::vector<int> results(children.size());
for (size_t i = 0; i < children.size(); i++) {
size_t index = i;
auto lambda = [&, index] { results[index] = count_child_nodes(children[index]); };
- pls::fork_join_lambda_by_value sub_task(lambda);
- current_task->spawn_child(sub_task);
+ pls::lambda_task_by_value sub_task(lambda);
+ pls::scheduler::spawn_child(sub_task);
}
- current_task->wait_for_all();
+ pls::scheduler::wait_for_all();
for (auto result : results) {
child_count += result;
}
@@ -36,43 +35,41 @@ int count_child_nodes(uts::node &node) {
}
int unbalanced_tree_search(int seed, int root_children, double q, int normal_children) {
- static auto id = pls::unique_id::create(42);
int result;
auto lambda = [&] {
uts::node root(seed, root_children, q, normal_children);
result = count_child_nodes(root);
};
- pls::fork_join_lambda_by_reference task(lambda);
- pls::fork_join_lambda_by_reference sub_task(lambda);
- pls::fork_join_task root_task{&sub_task, id};
- pls::scheduler::execute_task(root_task);
+ pls::lambda_task_by_reference sub_task(lambda);
+ pls::scheduler::spawn_child(sub_task);
+ pls::scheduler::wait_for_all();
return result;
}
-//
-//int main() {
-// PROFILE_ENABLE
-// pls::internal::helpers::run_mini_benchmark([&] {
-// unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN);
-// }, 8, 4000);
-//
-// PROFILE_SAVE("test_profile.prof")
-//}
int main() {
PROFILE_ENABLE
- pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18};
- pls::scheduler scheduler{&my_scheduler_memory, 8};
-
- scheduler.perform_work([&] {
- PROFILE_MAIN_THREAD
- for (int i = 0; i < 50; i++) {
- PROFILE_WORK_BLOCK("Top Level")
- int result = unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN);
- std::cout << result << std::endl;
- }
- });
+ pls::internal::helpers::run_mini_benchmark([&] {
+ unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN);
+ }, 8, 2000);
PROFILE_SAVE("test_profile.prof")
}
+
+//int main() {
+// PROFILE_ENABLE
+// pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18};
+// pls::scheduler scheduler{&my_scheduler_memory, 8};
+//
+// scheduler.perform_work([&] {
+// PROFILE_MAIN_THREAD
+// for (int i = 0; i < 50; i++) {
+// PROFILE_WORK_BLOCK("Top Level")
+// int result = unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN);
+// std::cout << result << std::endl;
+// }
+// });
+//
+// PROFILE_SAVE("test_profile.prof")
+//}
diff --git a/app/invoke_parallel/main.cpp b/app/invoke_parallel/main.cpp
index e469bad..4382168 100644
--- a/app/invoke_parallel/main.cpp
+++ b/app/invoke_parallel/main.cpp
@@ -91,7 +91,7 @@ int main() {
PROFILE_MAIN_THREAD
// Call looks just the same, only requirement is
// the enclosure in the perform_work lambda.
- for (int i = 0; i < 1000; i++) {
+ for (int i = 0; i < 10; i++) {
PROFILE_WORK_BLOCK("Top Level FFT")
complex_vector input = initial_input;
fft(input.begin(), input.size());
diff --git a/app/playground/main.cpp b/app/playground/main.cpp
index d3a7a50..cde3abd 100644
--- a/app/playground/main.cpp
+++ b/app/playground/main.cpp
@@ -7,8 +7,7 @@
#include
#include
-#include
-#include
+#include
int main() {
diff --git a/cmake/SetupOptimizationLevel.cmake b/cmake/SetupOptimizationLevel.cmake
index a7e20f5..1f4031a 100644
--- a/cmake/SetupOptimizationLevel.cmake
+++ b/cmake/SetupOptimizationLevel.cmake
@@ -4,22 +4,22 @@
#################################################################################
# make sure a build type is set, default to release
-if(NOT CMAKE_BUILD_TYPE)
+if (NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE Release)
-endif()
+endif ()
message("-- Using Build Type: " ${CMAKE_BUILD_TYPE})
# Enable optimizations in release builds
-if(CMAKE_BUILD_TYPE STREQUAL "Release")
+if (CMAKE_BUILD_TYPE STREQUAL "Release")
# Link time optimization
set(CMAKE_CXX_FLAGS "-Wall -Wextra")
# -O2 is often seen as 'the most speed',
# but inlining functions and SIMD/Vectorization is
# only enabled by -O3, thus it's way faster in some
# array calculations.
- set(CMAKE_CXX_FLAGS_RELEASE "-O3")
+ set(CMAKE_CXX_FLAGS_RELEASE "-O3 -march=native")
set(CMAKE_INTERPROCEDURAL_OPTIMIZATION TRUE)
-else()
+else ()
set(CMAKE_CXX_FLAGS_DEBUG "-g -O0")
-endif()
\ No newline at end of file
+endif ()
diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt
index d834402..f30cd59 100644
--- a/lib/pls/CMakeLists.txt
+++ b/lib/pls/CMakeLists.txt
@@ -20,7 +20,7 @@ add_library(pls STATIC
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h
- include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp
+ include/pls/internal/data_structures/locking_deque.h
include/pls/internal/data_structures/work_stealing_deque.h include/pls/internal/data_structures/work_stealing_deque_impl.h
include/pls/internal/data_structures/stamped_integer.h
@@ -29,16 +29,12 @@ add_library(pls STATIC
include/pls/internal/helpers/mini_benchmark.h
include/pls/internal/helpers/unique_id.h
- include/pls/internal/scheduling/root_task.h src/internal/scheduling/root_task.cpp
- include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp
- include/pls/internal/scheduling/abstract_task.h src/internal/scheduling/abstract_task.cpp
+ include/pls/internal/scheduling/thread_state.h
include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
include/pls/internal/scheduling/scheduler_impl.h
- include/pls/internal/scheduling/run_on_n_threads_task.h src/internal/scheduling/run_on_n_threads_task.cpp
- include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp
+ include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
- include/pls/internal/scheduling/parallel_iterator_task.h include/pls/internal/scheduling/parallel_iterator_task_impl.h
- src/internal/scheduling/parallel_iterator_task.cpp)
+ include/pls/internal/scheduling/lambda_task.h include/pls/internal/data_structures/deque.h)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
diff --git a/lib/pls/include/pls/algorithms/invoke_parallel.h b/lib/pls/include/pls/algorithms/invoke_parallel.h
index 17b439e..e311a71 100644
--- a/lib/pls/include/pls/algorithms/invoke_parallel.h
+++ b/lib/pls/include/pls/algorithms/invoke_parallel.h
@@ -2,7 +2,7 @@
#ifndef PLS_PARALLEL_INVOKE_H
#define PLS_PARALLEL_INVOKE_H
-#include "pls/internal/scheduling/fork_join_task.h"
+#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/scheduler.h"
namespace pls {
diff --git a/lib/pls/include/pls/algorithms/invoke_parallel_impl.h b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h
index 9bfa185..ef634bd 100644
--- a/lib/pls/include/pls/algorithms/invoke_parallel_impl.h
+++ b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h
@@ -2,70 +2,36 @@
#ifndef PLS_INVOKE_PARALLEL_IMPL_H
#define PLS_INVOKE_PARALLEL_IMPL_H
-#include
-#include "pls/internal/scheduling/fork_join_task.h"
+#include "pls/internal/scheduling/task.h"
+#include "pls/internal/scheduling/lambda_task.h"
#include "pls/internal/scheduling/scheduler.h"
-#include "pls/internal/helpers/unique_id.h"
-#include "pls/internal/base/alignment.h"
+#include "pls/internal/scheduling/thread_state.h"
namespace pls {
namespace algorithm {
-namespace internal {
-
-using namespace ::pls::internal::scheduling;
-
-template
-inline void run_body(const Body &internal_body, const abstract_task::id &id) {
- // Make sure we are in the context of this invoke_parallel instance,
- // if not we will spawn it as a new 'fork-join-style' task.
- auto current_task = scheduler::current_task();
- if (current_task->unique_id() == id) {
- internal_body();
- } else {
- fork_join_lambda_by_reference root_body(internal_body);
- fork_join_task root_task{&root_body, id};
- scheduler::execute_task(root_task);
- }
-}
-}
template<typename Function1, typename Function2>
void invoke_parallel(const Function1 &function1, const Function2 &function2) {
using namespace ::pls::internal::scheduling;
- using namespace ::pls::internal::helpers;
- using namespace ::pls::internal::base;
- static abstract_task::id id = unique_id::create();
-
- auto internal_body = [&]() {
- auto current_task = fork_join_sub_task::current();
- auto sub_task_2 = fork_join_lambda_by_reference(function2);
- current_task->spawn_child(sub_task_2);
- function1(); // Execute first function 'inline' without spawning a sub_task object
- current_task->wait_for_all();
- };
+ auto sub_task_1 = lambda_task_by_reference(function1);
+ auto sub_task_2 = lambda_task_by_reference(function2);
- internal::run_body(internal_body, id);
+ scheduler::spawn_child(sub_task_2);
+ scheduler::spawn_child_and_wait(sub_task_1);
}
template<typename Function1, typename Function2, typename Function3>
void invoke_parallel(const Function1 &function1, const Function2 &function2, const Function3 &function3) {
using namespace ::pls::internal::scheduling;
- using namespace ::pls::internal::helpers;
- static abstract_task::id id = unique_id::create();
-
- auto internal_body = [&]() {
- auto current_task = fork_join_sub_task::current();
- auto sub_task_2 = fork_join_lambda_by_reference(function2);
- auto sub_task_3 = fork_join_lambda_by_reference(function3);
- current_task->spawn_child(sub_task_2);
- current_task->spawn_child(sub_task_3);
- function1(); // Execute first function 'inline' without spawning a sub_task object
- current_task->wait_for_all();
- };
+ auto sub_task_1 = lambda_task_by_reference(function1);
+ auto sub_task_2 = lambda_task_by_reference(function2);
+ auto sub_task_3 = lambda_task_by_reference(function3);
- internal::run_body(internal_body, id);
+ scheduler::spawn_child(sub_task_3);
+ scheduler::spawn_child(sub_task_2);
+ scheduler::spawn_child_and_wait(sub_task_1);
}
}
diff --git a/lib/pls/include/pls/algorithms/parallel_for.h b/lib/pls/include/pls/algorithms/parallel_for.h
index 58334fe..4df1104 100644
--- a/lib/pls/include/pls/algorithms/parallel_for.h
+++ b/lib/pls/include/pls/algorithms/parallel_for.h
@@ -8,9 +8,6 @@ namespace algorithm {
template<typename RandomIt, typename Function>
void parallel_for(RandomIt first, RandomIt last, const Function &function);
-template
-void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &function);
-
}
}
#include "parallel_for_impl.h"
diff --git a/lib/pls/include/pls/algorithms/parallel_for_impl.h b/lib/pls/include/pls/algorithms/parallel_for_impl.h
index 5b79468..b787b44 100644
--- a/lib/pls/include/pls/algorithms/parallel_for_impl.h
+++ b/lib/pls/include/pls/algorithms/parallel_for_impl.h
@@ -2,21 +2,17 @@
#ifndef PLS_PARALLEL_FOR_IMPL_H
#define PLS_PARALLEL_FOR_IMPL_H
-#include "pls/internal/scheduling/fork_join_task.h"
-#include "pls/internal/scheduling/parallel_iterator_task.h"
-#include "pls/internal/scheduling/scheduler.h"
+#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/helpers/unique_id.h"
namespace pls {
namespace algorithm {
-namespace internal {
+
template<typename RandomIt, typename Function>
void parallel_for(RandomIt first, RandomIt last, const Function &function) {
using namespace ::pls::internal::scheduling;
- using namespace ::pls::internal::helpers;
- using namespace ::pls::internal::base;
constexpr long min_elements = 4;
long num_elements = std::distance(first, last);
@@ -29,39 +25,16 @@ void parallel_for(RandomIt first, RandomIt last, const Function &function) {
// Cut in half recursively
long middle_index = num_elements / 2;
- auto body = [=] { internal::parallel_for(first + middle_index, last, function); };
- fork_join_lambda_by_reference second_half_task(body);
- fork_join_sub_task::current()->spawn_child(second_half_task);
+ auto body2 = [=] { parallel_for(first + middle_index, last, function); };
+ lambda_task_by_reference second_half_task(body2);
+ scheduler::spawn_child(second_half_task);
- parallel_for(first, first + middle_index, function);
- fork_join_sub_task::current()->wait_for_all();
+ auto body1 = [=] { parallel_for(first, first + middle_index, function); };
+ lambda_task_by_reference first_half_task(body1);
+ scheduler::spawn_child(first_half_task);
+ scheduler::wait_for_all();
}
}
-}
-
-template
-void parallel_for(RandomIt first, RandomIt last, const Function &function) {
- using namespace ::pls::internal::scheduling;
- using namespace ::pls::internal::helpers;
- using namespace ::pls::internal::base;
- static abstract_task::id id = unique_id::create();
-
- parallel_iterator_task iterator_task{first, last, function, id};
- scheduler::execute_task(iterator_task);
-}
-
-template
-void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &function) {
- using namespace ::pls::internal::scheduling;
- using namespace ::pls::internal::helpers;
- using namespace ::pls::internal::base;
- static abstract_task::id id = unique_id::create();
-
- auto body = [=] { internal::parallel_for(first, last, function); };
- fork_join_lambda_by_reference root_body(body);
- fork_join_task root_task{&root_body, id};
- scheduler::execute_task(root_task);
-}
}
}
diff --git a/lib/pls/include/pls/internal/data_structures/deque.h b/lib/pls/include/pls/internal/data_structures/deque.h
index 8f555da..f8a3f39 100644
--- a/lib/pls/include/pls/internal/data_structures/deque.h
+++ b/lib/pls/include/pls/internal/data_structures/deque.h
@@ -1,62 +1,19 @@
-#ifndef PLS_DEQUE_H
-#define PLS_DEQUE_H
+#ifndef PLS_DEQUE_H_
+#define PLS_DEQUE_H_
-#include "pls/internal/base/spin_lock.h"
+#include "work_stealing_deque.h"
+#include "locking_deque.h"
namespace pls {
namespace internal {
namespace data_structures {
-/**
- * Turns any object into deque item when inheriting from this.
- */
-class deque_item {
- friend class deque_internal;
-
- deque_item *prev_;
- deque_item *next_;
-
-};
-
-class deque_internal {
- protected:
- deque_item *head_;
- deque_item *tail_;
-
- base::spin_lock lock_;
-
- deque_item *pop_head_internal();
- deque_item *pop_tail_internal();
- void push_tail_internal(deque_item *new_item);
-};
-
-/**
- * A double linked list based deque.
- * Storage is therefore only needed for the individual items.
- *
- * @tparam Item The type of items stored in this deque
- */
template<typename Item>
-class deque : deque_internal {
- public:
- explicit deque() : deque_internal{} {}
-
- inline Item *pop_head() {
- return static_cast- (pop_head_internal());
- }
-
- inline Item *pop_tail() {
- return static_cast
- (pop_tail_internal());
- }
-
- inline void push_tail(Item *new_item) {
- push_tail_internal(new_item);
- }
-};
+using deque = work_stealing_deque<Item>;
}
}
}
-#endif //PLS_DEQUE_H
+#endif //PLS_DEQUE_H_
diff --git a/lib/pls/include/pls/internal/data_structures/locking_deque.h b/lib/pls/include/pls/internal/data_structures/locking_deque.h
new file mode 100644
index 0000000..0c2a253
--- /dev/null
+++ b/lib/pls/include/pls/internal/data_structures/locking_deque.h
@@ -0,0 +1,124 @@
+
+#ifndef PLS_LOCKING_DEQUE_H
+#define PLS_LOCKING_DEQUE_H
+
+#include <mutex>
+
+#include "pls/internal/base/spin_lock.h"
+#include "pls/internal/data_structures/aligned_stack.h"
+
+namespace pls {
+namespace internal {
+namespace data_structures {
+
+/**
+ * Wraps any object into a deque item.
+ */
+template<typename Item>
+struct locking_deque_item {
+ Item *item_;
+
+ locking_deque_item *prev_;
+ locking_deque_item *next_;
+
+};
+
+template<typename Item, typename Content>
+struct locking_deque_container : public locking_deque_item<Item> {
+ Content content_;
+
+ public:
+ explicit locking_deque_container(const Content &content_) : content_{content_} {}
+};
+
+/**
+ * A double linked list based deque.
+ * Storage is therefore only needed for the individual items.
+ *
+ * @tparam Item The type of items stored in this deque
+ */
+template<typename Item>
+class locking_deque {
+ aligned_stack *stack_;
+
+ locking_deque_item<Item> *head_;
+ locking_deque_item<Item> *tail_;
+
+ base::spin_lock lock_;
+
+ public:
+ using state = aligned_stack::state;
+
+ explicit locking_deque(aligned_stack *stack)
+ : stack_{stack}, head_{nullptr}, tail_{nullptr}, lock_{} {}
+
+ template<typename T>
+ T *push_tail(const T &new_item) {
+ static_assert(std::is_same<Item, T>::value || std::is_base_of<Item, T>::value,
+ "Must only push types of <Item> onto work_stealing_deque<Item>");
+
+ std::lock_guard<base::spin_lock> lock{lock_};
+ auto deque_item = stack_->push(locking_deque_container<Item, T>{new_item});
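+ // The container stores a copy of the pushed item; item_ points at that stack-resident copy.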
+ deque_item->item_ = &deque_item->content_;
+
+ if (tail_ != nullptr) {
+ tail_->next_ = deque_item;
+ } else {
+ head_ = deque_item;
+ }
+ deque_item->prev_ = tail_;
+ deque_item->next_ = nullptr;
+ tail_ = deque_item;
+
+ return &deque_item->content_;
+ }
+
+ Item *pop_tail() {
+ std::lock_guard<base::spin_lock> lock{lock_};
+
+ if (tail_ == nullptr) {
+ return nullptr;
+ }
+
+ auto result = tail_;
+ tail_ = tail_->prev_;
+ if (tail_ == nullptr) {
+ head_ = nullptr;
+ } else {
+ tail_->next_ = nullptr;
+ }
+
+ return result->item_;
+ }
+
+ Item *pop_head() {
+ std::lock_guard<base::spin_lock> lock{lock_};
+
+ if (head_ == nullptr) {
+ return nullptr;
+ }
+
+ auto result = head_;
+ head_ = head_->next_;
+ if (head_ == nullptr) {
+ tail_ = nullptr;
+ } else {
+ head_->prev_ = nullptr;
+ }
+
+ return result->item_;
+ }
+
+ void release_memory_until(state state) {
+ stack_->reset_state(state);
+ }
+ state save_state() {
+ return stack_->save_state();
+ }
+};
+
+}
+}
+}
+
+#endif //PLS_LOCKING_DEQUE_H
diff --git a/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h
index 5c8ce86..4c89b6b 100644
--- a/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h
+++ b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h
@@ -6,7 +6,6 @@
#include "pls/internal/base/error_handling.h"
#include "pls/internal/data_structures/stamped_integer.h"
-#include "pls/internal/scheduling/thread_state.h"
#include "aligned_stack.h"
@@ -22,10 +21,15 @@ using offset_t = stamped_integer::member_t;
// Single Item in the deque
class work_stealing_deque_item {
+ // TODO: In our opinion these atomics are a pure formality to make the thread sanitizer happy,
+ // as the race occurs in 'pop_head', where ALL CASES reading a corrupt/old value are cases
+ // where the next CAS fails anyway, thus making these corrupted values have no influence on
+ // the overall program execution.
+ // ==> If we find performance problems in this queue, try removing the atomics again.
// Pointer to the actual data
- pointer_t data_;
+ std::atomic<pointer_t> data_;
// Index (relative to stack base) to the next and previous element
- offset_t next_item_;
+ std::atomic<offset_t> next_item_;
offset_t previous_item_;
public:
@@ -33,7 +37,7 @@ class work_stealing_deque_item {
template<typename Item>
Item *data() {
- return reinterpret_cast<Item *>(data_);
+ return reinterpret_cast<Item *>(data_.load());
}
template<typename T>
@@ -41,7 +45,7 @@ class work_stealing_deque_item {
data_ = reinterpret_cast<pointer_t>(data);
}
- offset_t next_item() const { return next_item_; }
+ offset_t next_item() const { return next_item_.load(); }
void set_next_item(offset_t next_item) { next_item_ = next_item; }
offset_t previous_item() const { return previous_item_; }
@@ -77,13 +81,6 @@ class work_stealing_deque {
tail_{other.tail_.load()},
previous_tail_{other.previous_tail_} {}
- void reset_base_pointer();
- work_stealing_deque_item *item_at(offset_t offset);
- offset_t current_stack_offset();
-
- template<typename T>
- std::pair<work_stealing_deque_item, T> *allocate_item(const T &new_item);
-
template<typename T>
T *push_tail(const T &new_item);
Item *pop_tail();
@@ -91,6 +88,14 @@ class work_stealing_deque {
void release_memory_until(state state);
state save_state();
+
+ private:
+ void reset_base_pointer();
+ work_stealing_deque_item *item_at(offset_t offset);
+ offset_t current_stack_offset();
+
+ template<typename T>
+ std::pair<work_stealing_deque_item, T> *allocate_item(const T &new_item);
};
}
diff --git a/lib/pls/include/pls/internal/scheduling/abstract_task.h b/lib/pls/include/pls/internal/scheduling/abstract_task.h
deleted file mode 100644
index 21d7357..0000000
--- a/lib/pls/include/pls/internal/scheduling/abstract_task.h
+++ /dev/null
@@ -1,45 +0,0 @@
-
-#ifndef PLS_ABSTRACT_TASK_H
-#define PLS_ABSTRACT_TASK_H
-
-#include "pls/internal/base/swmr_spin_lock.h"
-#include "pls/internal/helpers/unique_id.h"
-
-namespace pls {
-namespace internal {
-namespace scheduling {
-
-class abstract_task {
- public:
- using id = helpers::unique_id;
-
- private:
- unsigned int depth_;
- abstract_task::id unique_id_;
- abstract_task *volatile child_task_;
-
- public:
- abstract_task(const unsigned int depth, const abstract_task::id &unique_id) :
- depth_{depth},
- unique_id_{unique_id},
- child_task_{nullptr} {}
-
- virtual void execute() = 0;
- void set_child(abstract_task *child_task) { child_task_ = child_task; }
- abstract_task *child() const { return child_task_; }
-
- void set_depth(unsigned int depth) { depth_ = depth; }
- unsigned int depth() const { return depth_; }
- id unique_id() const { return unique_id_; }
- protected:
- virtual bool internal_stealing(abstract_task *other_task) = 0;
- virtual bool split_task(base::swmr_spin_lock *lock) = 0;
-
- bool steal_work();
-};
-
-}
-}
-}
-
-#endif //PLS_ABSTRACT_TASK_H
diff --git a/lib/pls/include/pls/internal/scheduling/fork_join_task.h b/lib/pls/include/pls/internal/scheduling/fork_join_task.h
deleted file mode 100644
index 33a6c2f..0000000
--- a/lib/pls/include/pls/internal/scheduling/fork_join_task.h
+++ /dev/null
@@ -1,123 +0,0 @@
-
-#ifndef PLS_TBB_LIKE_TASK_H
-#define PLS_TBB_LIKE_TASK_H
-
-#include "pls/internal/helpers/profiler.h"
-
-#include "pls/internal/data_structures/aligned_stack.h"
-#include "pls/internal/data_structures/work_stealing_deque.h"
-
-#include "abstract_task.h"
-#include "thread_state.h"
-
-namespace pls {
-namespace internal {
-namespace scheduling {
-
-class fork_join_task;
-class fork_join_sub_task {
- friend class fork_join_task;
-
- // Coordinate finishing of sub_tasks
- std::atomic_uint32_t ref_count_;
- fork_join_sub_task *parent_;
-
- // Access to TBB scheduling environment
- fork_join_task *tbb_task_;
-
- bool executed = false;
- int executed_at = -1;
-
- // Stack Management (reset stack pointer after wait_for_all() calls)
- data_structures::work_stealing_deque::state deque_state_;
- protected:
- explicit fork_join_sub_task();
- fork_join_sub_task(const fork_join_sub_task &other);
-
- // Overwritten with behaviour of child tasks
- virtual void execute_internal() = 0;
-
- public:
- // Only use them when actually executing this sub_task (only public for simpler API design)
- template
- void spawn_child(T &sub_task);
- void wait_for_all();
-
- static fork_join_sub_task *current();
- private:
- void execute();
-};
-
-template
-class fork_join_lambda_by_reference : public fork_join_sub_task {
- const Function &function_;
-
- public:
- explicit fork_join_lambda_by_reference(const Function &function) : fork_join_sub_task{}, function_{function} {};
-
- protected:
- void execute_internal() override {
- function_();
- }
-};
-
-template
-class fork_join_lambda_by_value : public fork_join_sub_task {
- const Function function_;
-
- public:
- explicit fork_join_lambda_by_value(const Function &function) : fork_join_sub_task{}, function_{function} {};
-
- protected:
- void execute_internal() override {
- function_();
- }
-};
-
-class fork_join_task : public abstract_task {
- friend class fork_join_sub_task;
-
- fork_join_sub_task *root_task_;
- fork_join_sub_task *currently_executing_;
-
- // Double-Ended Queue management
- data_structures::work_stealing_deque deque_;
-
- // Steal Management
- fork_join_sub_task *last_stolen_;
-
- fork_join_sub_task *get_local_sub_task();
- fork_join_sub_task *get_stolen_sub_task();
-
- bool internal_stealing(abstract_task *other_task) override;
- bool split_task(base::swmr_spin_lock * /*lock*/) override;
-
- public:
- explicit fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id);
- void execute() override;
- fork_join_sub_task *currently_executing() const;
-};
-
-template
-void fork_join_sub_task::spawn_child(T &task) {
- PROFILE_FORK_JOIN_STEALING("spawn_child")
- static_assert(std::is_base_of::value, "Only pass fork_join_sub_task subclasses!");
-
- // Keep our refcount up to date
- ref_count_++;
-
- // Assign forced values
- task.parent_ = this;
- task.tbb_task_ = tbb_task_;
- task.deque_state_ = tbb_task_->deque_.save_state();
-
- // Push on our deque
- const T const_task = task;
- tbb_task_->deque_.push_tail(const_task);
-}
-
-}
-}
-}
-
-#endif //PLS_TBB_LIKE_TASK_H
diff --git a/lib/pls/include/pls/internal/scheduling/lambda_task.h b/lib/pls/include/pls/internal/scheduling/lambda_task.h
new file mode 100644
index 0000000..fb6cc4a
--- /dev/null
+++ b/lib/pls/include/pls/internal/scheduling/lambda_task.h
@@ -0,0 +1,41 @@
+
+#ifndef PLS_LAMBDA_TASK_H_
+#define PLS_LAMBDA_TASK_H_
+
+#include "pls/internal/scheduling/task.h"
+
+namespace pls {
+namespace internal {
+namespace scheduling {
+
+template<typename Function>
+class lambda_task_by_reference : public task {
+ const Function &function_;
+
+ public:
+ explicit lambda_task_by_reference(const Function &function) : task{}, function_{function} {};
+
+ protected:
+ void execute_internal() override {
+ function_();
+ }
+};
+
+template<typename Function>
+class lambda_task_by_value : public task {
+ const Function function_;
+
+ public:
+ explicit lambda_task_by_value(const Function &function) : task{}, function_{function} {};
+
+ protected:
+ void execute_internal() override {
+ function_();
+ }
+};
+
+}
+}
+}
+
+#endif //PLS_LAMBDA_TASK_H_
diff --git a/lib/pls/include/pls/internal/scheduling/parallel_iterator_task.h b/lib/pls/include/pls/internal/scheduling/parallel_iterator_task.h
deleted file mode 100644
index 304df79..0000000
--- a/lib/pls/include/pls/internal/scheduling/parallel_iterator_task.h
+++ /dev/null
@@ -1,45 +0,0 @@
-
-#ifndef PLS_PARALLEL_ITERATOR_TASK_H
-#define PLS_PARALLEL_ITERATOR_TASK_H
-
-#include "pls/internal/data_structures/stamped_integer.h"
-#include "abstract_task.h"
-
-namespace pls {
-namespace internal {
-namespace scheduling {
-
-using data_structures::stamped_integer;
-
-template
-class parallel_iterator_task : public abstract_task {
- alignas(64) const int step = 8;
-
- alignas(64) RandomIt first_, last_;
- alignas(64) Function function_;
-
- // External stealing
- alignas(64) std::atomic first_index_;
- alignas(64) std::atomic to_be_processed_;
- alignas(64) std::atomic last_index_;
-
- alignas(64) parallel_iterator_task *parent_;
-
- bool steal_front(size_t &stolen_max_index);
- bool steal_back(size_t &stolen_first_index, size_t &stolen_last_index);
-
- protected:
- bool internal_stealing(abstract_task *other_task) override;
- bool split_task(base::swmr_spin_lock * /*lock*/) override;
-
- public:
- explicit parallel_iterator_task(RandomIt first, RandomIt last, Function function, const abstract_task::id &id);
- parallel_iterator_task(const parallel_iterator_task &other);
- void execute() override;
-};
-}
-}
-}
-#include "parallel_iterator_task_impl.h"
-
-#endif //PLS_PARALLEL_ITERATOR_TASK_H
diff --git a/lib/pls/include/pls/internal/scheduling/parallel_iterator_task_impl.h b/lib/pls/include/pls/internal/scheduling/parallel_iterator_task_impl.h
deleted file mode 100644
index f3a2026..0000000
--- a/lib/pls/include/pls/internal/scheduling/parallel_iterator_task_impl.h
+++ /dev/null
@@ -1,144 +0,0 @@
-
-#ifndef PLS_PARALLEL_ITERATOR_TASK_IMPL_H
-#define PLS_PARALLEL_ITERATOR_TASK_IMPL_H
-
-#include "scheduler.h"
-namespace pls {
-namespace internal {
-namespace scheduling {
-template
-parallel_iterator_task::parallel_iterator_task
- (RandomIt first, RandomIt last, Function function, const abstract_task::id &id):
- abstract_task(0, id),
- first_{first},
- last_{last},
- function_{function},
- first_index_{0},
- to_be_processed_{std::distance(first, last)},
- last_index_{stamped_integer{0, std::distance(first, last)}},
- parent_{nullptr} {}
-
-template
-parallel_iterator_task::parallel_iterator_task(const pls::internal::scheduling::parallel_iterator_task<
- RandomIt,
- Function> &other):
- abstract_task{other.depth(), other.unique_id()},
- first_{other.first_},
- last_{other.last_},
- function_{other.function_},
- first_index_{other.first_index_.load()},
- to_be_processed_{other.to_be_processed_.load()},
- last_index_{other.last_index_.load()},
- parent_{other.parent_} {}
-
-template
-void parallel_iterator_task::execute() {
- // Start processing at beginning of our data
- size_t current_index = 0;
- auto current_iterator = first_;
-
- // Keep going as long as we have data
- while (true) {
- // Claim next chunk of data for us
- size_t local_max_index;
- if (!steal_front(local_max_index)) {
- break;
- }
-
- // Process Chunk
- for (; current_index != local_max_index; current_index++) {
- function_(*(current_iterator++));
- }
- }
-
- to_be_processed_ -= current_index;
- while (to_be_processed_.load() > 0)
- steal_work();
- if (parent_ != nullptr) {
- parent_->to_be_processed_ -= std::distance(first_, last_);
- }
-}
-
-template
-bool parallel_iterator_task::steal_front(size_t &stolen_max) {
- auto local_first_index = first_index_.load();
- auto local_last_index = last_index_.load();
-
- if (local_first_index >= local_last_index.value) {
- return false;
- }
-
- // Proceed the first index == take part of the work for us
- auto new_first_index = std::min(local_first_index + step, local_last_index.value);
- first_index_ = new_first_index;
- // Reload last index
- local_last_index = last_index_.load();
- // Enough distance
- if (new_first_index < local_last_index.value) {
- stolen_max = new_first_index;
- return true;
- }
-
- // Fight over last element
- if (new_first_index == local_last_index.value) {
- auto new_last_index = stamped_integer{local_last_index.stamp + 1, local_last_index.value};
- if (last_index_.compare_exchange_strong(local_last_index, new_last_index)) {
- stolen_max = new_first_index;
- return true;
- }
- }
-
- // All iterator elements are assigned to some executor
- return false;
-}
-
-template
-bool parallel_iterator_task::steal_back(size_t &stolen_first_index, size_t &stolen_last_index) {
- auto local_first_index = first_index_.load();
- auto local_last_index = last_index_.load();
-
- if (local_first_index >= local_last_index.value) {
- return false;
- }
-
- // Try to steal using cas
- auto target_last_index = std::max(local_last_index.value - step, local_first_index);
- auto new_last_index = stamped_integer{local_last_index.stamp + 1, target_last_index};
- if (last_index_.compare_exchange_strong(local_last_index, new_last_index)) {
- stolen_first_index = new_last_index.value;
- stolen_last_index = local_last_index.value;
- return true;
- }
-
- return false;
-}
-
-template
-bool parallel_iterator_task::split_task(base::swmr_spin_lock *lock) {
- auto depth = this->depth();
- auto id = this->unique_id();
- size_t stolen_first_index, stolen_last_index;
- if (!steal_back(stolen_first_index, stolen_last_index)) {
- lock->reader_unlock();
- return false;
- }
-
- lock->reader_unlock();
- parallel_iterator_task new_task{first_ + stolen_first_index, first_ + stolen_last_index, function_, id};
- new_task.parent_ = this;
- scheduler::execute_task(new_task, depth);
-
- return true;
-}
-
-template
-bool parallel_iterator_task::internal_stealing(abstract_task */*other_task*/) {
- // Do not allow for now, eases up on ABA problem
- return false;
-}
-}
-}
-}
-
-#endif //PLS_PARALLEL_ITERATOR_TASK_IMPL_H
diff --git a/lib/pls/include/pls/internal/scheduling/root_task.h b/lib/pls/include/pls/internal/scheduling/root_task.h
deleted file mode 100644
index 32a6ea2..0000000
--- a/lib/pls/include/pls/internal/scheduling/root_task.h
+++ /dev/null
@@ -1,82 +0,0 @@
-
-#ifndef PLS_ROOT_MASTER_TASK_H
-#define PLS_ROOT_MASTER_TASK_H
-
-#include
-
-#include "pls/internal/helpers/profiler.h"
-#include "pls/internal/base/swmr_spin_lock.h"
-
-#include "abstract_task.h"
-
-namespace pls {
-namespace internal {
-namespace scheduling {
-
-template
-class root_task : public abstract_task {
- Function function_;
- std::atomic_uint8_t finished_;
- public:
- static constexpr auto create_id = helpers::unique_id::create>;
-
- explicit root_task(Function function) :
- abstract_task{0, create_id()},
- function_{function},
- finished_{0} {}
- root_task(const root_task &other) :
- abstract_task{0, create_id()},
- function_{other.function_},
- finished_{0} {}
-
- bool finished() {
- return finished_;
- }
-
- void execute() override {
- PROFILE_WORK_BLOCK("execute root_task");
- function_();
- finished_ = 1;
- }
-
- bool internal_stealing(abstract_task * /*other_task*/) override {
- return false;
- }
-
- bool split_task(base::swmr_spin_lock * /*lock*/) override {
- return false;
- }
-};
-
-template
-class root_worker_task : public abstract_task {
- root_task *master_task_;
-
- public:
- static constexpr auto create_id = root_task::create_id;
-
- explicit root_worker_task(root_task *master_task) :
- abstract_task{0, create_id()},
- master_task_{master_task} {}
-
- void execute() override {
- PROFILE_WORK_BLOCK("execute root_task");
- do {
- steal_work();
- } while (!master_task_->finished());
- }
-
- bool internal_stealing(abstract_task * /*other_task*/) override {
- return false;
- }
-
- bool split_task(base::swmr_spin_lock * /*lock*/) override {
- return false;
- }
-};
-
-}
-}
-}
-
-#endif //PLS_ROOT_MASTER_TASK_H
diff --git a/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h b/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h
deleted file mode 100644
index f412cbd..0000000
--- a/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h
+++ /dev/null
@@ -1,120 +0,0 @@
-
-#ifndef PLS_RUN_ON_N_THREADS_TASK_H
-#define PLS_RUN_ON_N_THREADS_TASK_H
-
-#include
-
-#include "pls/internal/base/spin_lock.h"
-#include "pls/internal/base/thread.h"
-
-#include "abstract_task.h"
-#include "thread_state.h"
-#include "scheduler.h"
-
-namespace pls {
-namespace internal {
-namespace scheduling {
-
-template
-class run_on_n_threads_task : public abstract_task {
- template
- friend
- class run_on_n_threads_task_worker;
-
- Function function_;
-
- // Improvement: Remove lock and replace by atomic variable (performance)
- int counter;
- base::spin_lock counter_lock_;
-
- int decrement_counter() {
- std::lock_guard lock{counter_lock_};
- counter--;
- return counter;
- }
-
- int get_counter() {
- std::lock_guard lock{counter_lock_};
- return counter;
- }
- public:
- static constexpr auto create_id = helpers::unique_id::create>;
-
- run_on_n_threads_task(Function function, int num_threads) :
- abstract_task{0, create_id()},
- function_{function},
- counter{num_threads - 1} {}
-
- void execute() override {
- // Execute our function ONCE
- function_();
-
- // Steal until we are finished (other threads executed)
- do {
- steal_work();
- } while (get_counter() > 0);
-
- std::cout << "Finished Master!" << std::endl;
- }
-
- bool internal_stealing(abstract_task * /*other_task*/) override {
- return false;
- }
-
- bool split_task(base::swmr_spin_lock *lock) override;
-};
-
-template
-class run_on_n_threads_task_worker : public abstract_task {
- Function function_;
- run_on_n_threads_task *root_;
- public:
- static constexpr auto create_id = helpers::unique_id::create>;
-
- run_on_n_threads_task_worker(Function function, run_on_n_threads_task *root) :
- abstract_task{0, create_id()},
- function_{function},
- root_{root} {}
-
- void execute() override {
- if (root_->decrement_counter() >= 0) {
- function_();
- std::cout << "Finished Worker!" << std::endl;
- } else {
- std::cout << "Abandoned Worker!" << std::endl;
- }
- }
-
- bool internal_stealing(abstract_task * /*other_task*/) override {
- return false;
- }
-
- bool split_task(base::swmr_spin_lock * /*lock*/) override {
- return false;
- }
-};
-
-template
-bool run_on_n_threads_task::split_task(base::swmr_spin_lock *lock) {
- if (get_counter() <= 0) {
- return false;
- }
- // In success case, unlock.
- lock->reader_unlock();
-
- auto scheduler = base::this_thread::state()->scheduler_;
- auto task = run_on_n_threads_task_worker{function_, this};
- scheduler->execute_task(task, depth());
- return true;
-}
-
-template
-run_on_n_threads_task create_run_on_n_threads_task(Function function, int num_threads) {
- return run_on_n_threads_task{function, num_threads};
-}
-
-}
-}
-}
-
-#endif //PLS_RUN_ON_N_THREADS_TASK_H
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h
index 583262a..54075fc 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler.h
@@ -12,32 +12,55 @@
#include "pls/internal/base/thread.h"
#include "pls/internal/base/barrier.h"
-#include "thread_state.h"
-#include "root_task.h"
-#include "scheduler_memory.h"
+#include "pls/internal/scheduling/scheduler_memory.h"
+#include "pls/internal/scheduling/thread_state.h"
+#include "pls/internal/scheduling/task.h"
namespace pls {
namespace internal {
namespace scheduling {
-void worker_routine();
using scheduler_thread = base::thread;
+/**
+ * The scheduler is the central part of the dispatching-framework.
+ * It manages a pool of worker threads (creates, sleeps/wakes up, destroys)
+ * and allows the execution of parallel sections.
+ *
+ * It works in close relation with the 'task' class for scheduling.
+ */
class scheduler {
- friend void worker_routine();
-
+ friend class task;
const unsigned int num_threads_;
scheduler_memory *memory_;
base::barrier sync_barrier_;
+
+ task *main_thread_root_task_;
+ std::atomic<bool> work_section_done_;
+
bool terminated_;
public:
+ /**
+ * Initializes a scheduler instance with the given number of threads.
+ * This will spawn the threads and put them to sleep, ready to process an
+ * upcoming parallel section.
+ *
+ * @param memory All memory is allocated statically, thus the user is required to provide the memory instance.
+ * @param num_threads The number of worker threads to be created.
+ */
explicit scheduler(scheduler_memory *memory, unsigned int num_threads);
+
+ /**
+ * The scheduler is implicitly terminated as soon as it leaves the scope.
+ */
~scheduler();
/**
* Wakes up the thread pool.
* Code inside the Function lambda can invoke all parallel APIs.
+ * This is meant to cleanly sleep and wake up the scheduler during an application run,
+ * e.g. to run parallel code on a timer loop/after interrupts.
*
* @param work_section generic function or lambda to be executed in the scheduler's context.
*/
@@ -45,20 +68,46 @@ class scheduler {
void perform_work(Function work_section);
/**
- * Executes a top-level-task (children of abstract_task) on this thread.
+ * Explicitly terminate the worker threads. Scheduler must not be used after this.
*
- * @param task The task to be executed.
- * @param depth Optional: depth of the new task, otherwise set implicitly.
+ * @param wait_for_workers Set to true if you wish to return from this method only after the workers are shut down.
*/
- template
- static void execute_task(Task &task, int depth = -1);
+ void terminate(bool wait_for_workers = true);
- static abstract_task *current_task() { return base::this_thread::state()->current_task_; }
+ /**
+ * Helper to spawn a child on the currently running task.
+ *
+ * @tparam T type of the new task
+ * @param sub_task the new task to be spawned
+ */
+ template<typename T>
+ static void spawn_child(T &sub_task);
- void terminate(bool wait_for_workers = true);
+ /**
+ * Helper to spawn a child on the currently running task and wait for it (skipping over the task-deque).
+ *
+ * @tparam T type of the new task
+ * @param sub_task the new task to be spawned
+ */
+ template<typename T>
+ static void spawn_child_and_wait(T &sub_task);
+
+ /**
+ * Helper to wait for all children of the currently executing task.
+ */
+ static void wait_for_all();
unsigned int num_threads() const { return num_threads_; }
- thread_state *thread_state_for(size_t id) { return memory_->thread_state_for(id); }
+
+ private:
+ static void worker_routine();
+ thread_state *thread_state_for(size_t id);
+
+ task *get_local_task();
+ task *steal_task();
+
+ bool try_execute_local();
+ bool try_execute_stolen();
};
}
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index 0af46c7..98156dc 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -2,71 +2,41 @@
#ifndef PLS_SCHEDULER_IMPL_H
#define PLS_SCHEDULER_IMPL_H
+#include "pls/internal/scheduling/lambda_task.h"
+
namespace pls {
namespace internal {
namespace scheduling {
+// TODO: generally look into the performance implications of using many thread_state::get() calls
+
template<typename Function>
void scheduler::perform_work(Function work_section) {
PROFILE_WORK_BLOCK("scheduler::perform_work")
- root_task master{work_section};
- // Push root task on stacks
- auto new_master = memory_->task_stack_for(0)->push(master);
- memory_->thread_state_for(0)->root_task_ = new_master;
- memory_->thread_state_for(0)->current_task_ = new_master;
- for (unsigned int i = 1; i < num_threads_; i++) {
- root_worker_task worker{new_master};
- auto new_worker = memory_->task_stack_for(0)->push(worker);
- memory_->thread_state_for(i)->root_task_ = new_worker;
- memory_->thread_state_for(i)->current_task_ = new_worker;
- }
+// if (execute_main_thread) {
+// work_section();
+//
+// sync_barrier_.wait(); // Trigger threads to wake up
+// sync_barrier_.wait(); // Wait for threads to finish
+// } else {
+ lambda_task_by_reference root_task{work_section};
+ main_thread_root_task_ = &root_task;
+ work_section_done_ = false;
- // Perform and wait for work
sync_barrier_.wait(); // Trigger threads to wake up
sync_barrier_.wait(); // Wait for threads to finish
-
- // Clean up stack
- memory_->task_stack_for(0)->pop();
- for (unsigned int i = 1; i < num_threads_; i++) {
- root_worker_task worker{new_master};
- memory_->task_stack_for(0)->pop();
- }
+// }
}
-template
-void scheduler::execute_task(Task &task, int depth) {
- static_assert(std::is_base_of::value, "Only pass abstract_task subclasses!");
-
- auto my_state = base::this_thread::state();
- abstract_task *old_task;
- abstract_task *new_task;
-
- // Init Task
- old_task = my_state->current_task_;
- new_task = my_state->task_stack_->push(task);
-
- new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1);
-
- {
- my_state->lock_.writer_lock();
- my_state->current_task_ = new_task;
- old_task->set_child(new_task);
- my_state->lock_.writer_unlock();
- }
-
- // Run Task
- new_task->execute();
-
- // Teardown state back to before the task was executed
- my_state->task_stack_->pop();
+template<typename T>
+void scheduler::spawn_child(T &sub_task) {
+ thread_state::get()->current_task_->spawn_child(sub_task);
+}
- {
- my_state->lock_.writer_lock();
- old_task->set_child(nullptr);
- my_state->current_task_ = old_task;
- my_state->lock_.writer_unlock();
- }
+template<typename T>
+void scheduler::spawn_child_and_wait(T &sub_task) {
+ thread_state::get()->current_task_->spawn_child_and_wait(sub_task);
}
}
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
index 602736a..57a53dc 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
@@ -1,10 +1,10 @@
+#ifndef PLS_SCHEDULER_MEMORY_H
+#define PLS_SCHEDULER_MEMORY_H
+
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/base/thread.h"
-#include "thread_state.h"
-
-#ifndef PLS_SCHEDULER_MEMORY_H
-#define PLS_SCHEDULER_MEMORY_H
+#include "pls/internal/scheduling/thread_state.h"
namespace pls {
namespace internal {
@@ -14,11 +14,36 @@ void worker_routine();
using scheduler_thread = base::thread;
class scheduler_memory {
+ private:
+ size_t max_threads_;
+ thread_state **thread_states_;
+ scheduler_thread **threads_;
+ data_structures::aligned_stack **task_stacks_;
+
+ protected:
+ void init(size_t max_threads,
+ thread_state **thread_states,
+ scheduler_thread **threads,
+ data_structures::aligned_stack **task_stacks) {
+ max_threads_ = max_threads;
+ thread_states_ = thread_states;
+ threads_ = threads;
+ task_stacks_ = task_stacks;
+ }
+
public:
- virtual size_t max_threads() const = 0;
- virtual thread_state *thread_state_for(size_t id) = 0;
- virtual scheduler_thread *thread_for(size_t id) = 0;
- virtual data_structures::aligned_stack *task_stack_for(size_t id) = 0;
+ size_t max_threads() const {
+ return max_threads_;
+ }
+ thread_state *thread_state_for(size_t id) const {
+ return thread_states_[id];
+ }
+ scheduler_thread *thread_for(size_t id) const {
+ return threads_[id];
+ }
+ data_structures::aligned_stack *task_stack_for(size_t id) const {
+ return task_stacks_[id];
+ }
};
template<size_t MAX_THREADS, size_t TASK_STACK_SIZE>
@@ -31,23 +56,30 @@ class static_scheduler_memory : public scheduler_memory {
using aligned_thread_stack = base::alignment::aligned_wrapper<std::array<char, TASK_STACK_SIZE>>;
using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>;
+ // Actual Memory
std::array<aligned_thread, MAX_THREADS> threads_;
std::array<aligned_thread_state, MAX_THREADS> thread_states_;
std::array<aligned_thread_stack, MAX_THREADS> task_stacks_memory_;
std::array<aligned_aligned_stack, MAX_THREADS> task_stacks_;
+ // References for parent
+ std::array<scheduler_thread *, MAX_THREADS> thread_refs_;
+ std::array<thread_state *, MAX_THREADS> thread_state_refs_;
+ std::array<data_structures::aligned_stack *, MAX_THREADS> task_stack_refs_;
+
public:
- static_scheduler_memory() {
+ static_scheduler_memory() : scheduler_memory() {
for (size_t i = 0; i < MAX_THREADS; i++) {
new((void *) task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i].pointer()->data(),
TASK_STACK_SIZE);
+
+ thread_refs_[i] = threads_[i].pointer();
+ thread_state_refs_[i] = thread_states_[i].pointer();
+ task_stack_refs_[i] = task_stacks_[i].pointer();
}
- }
- size_t max_threads() const override { return MAX_THREADS; }
- thread_state *thread_state_for(size_t id) override { return thread_states_[id].pointer(); }
- scheduler_thread *thread_for(size_t id) override { return threads_[id].pointer(); }
- data_structures::aligned_stack *task_stack_for(size_t id) override { return task_stacks_[id].pointer(); }
+ init(MAX_THREADS, thread_state_refs_.data(), thread_refs_.data(), task_stack_refs_.data());
+ }
};
class malloc_scheduler_memory : public scheduler_memory {
@@ -60,18 +92,20 @@ class malloc_scheduler_memory : public scheduler_memory {
const size_t num_threads_;
+ // Actual Memory
aligned_thread *threads_;
aligned_thread_state *thread_states_;
char **task_stacks_memory_;
aligned_aligned_stack *task_stacks_;
+
+ // References for parent
+ scheduler_thread **thread_refs_;
+ thread_state **thread_state_refs_;
+ data_structures::aligned_stack **task_stack_refs_;
+
public:
explicit malloc_scheduler_memory(size_t num_threads, size_t memory_per_stack = 2 << 16);
~malloc_scheduler_memory();
-
- size_t max_threads() const override { return num_threads_; }
- thread_state *thread_state_for(size_t id) override { return thread_states_[id].pointer(); }
- scheduler_thread *thread_for(size_t id) override { return threads_[id].pointer(); }
- data_structures::aligned_stack *task_stack_for(size_t id) override { return task_stacks_[id].pointer(); }
};
}
diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h
new file mode 100644
index 0000000..cdeb9f2
--- /dev/null
+++ b/lib/pls/include/pls/internal/scheduling/task.h
@@ -0,0 +1,80 @@
+
+#ifndef PLS_TASK_H
+#define PLS_TASK_H
+
+#include "pls/internal/helpers/profiler.h"
+
+#include "pls/internal/data_structures/aligned_stack.h"
+#include "pls/internal/data_structures/deque.h"
+
+#include "pls/internal/scheduling/thread_state.h"
+
+namespace pls {
+namespace internal {
+namespace scheduling {
+
+class task {
+ friend class scheduler;
+
+ // Coordinate finishing of sub_tasks
+ std::atomic ref_count_;
+ task *parent_;
+
+ // Stack Management (reset stack pointer after wait_for_all() calls)
+ data_structures::deque<task>::state deque_state_;
+
+ protected:
+ // TODO: Double Check with copy and move constructors, try to minimize overhead while keeping a clean API.
+ explicit task();
+ task(const task &other);
+
+ /**
+ * Overwrite this with the actual behaviour of concrete tasks.
+ */
+ virtual void execute_internal() = 0;
+
+ template<typename T>
+ void spawn_child(T &&sub_task);
+ template<typename T>
+ void spawn_child_and_wait(T &&sub_task);
+ void wait_for_all();
+
+ private:
+ void execute();
+};
+
+template<typename T>
+void task::spawn_child(T &&sub_task) {
+ PROFILE_FORK_JOIN_STEALING("spawn_child")
+ static_assert(std::is_base_of<task, typename std::remove_reference<T>::type>::value, "Only pass task subclasses!");
+
+ // Keep our refcount up to date
+ ref_count_++;
+
+ // Assign forced values (for stack and parent management)
+ sub_task.parent_ = this;
+ sub_task.deque_state_ = thread_state::get()->deque_.save_state();
+
+ // Push on our deque
+ const T const_task = sub_task;
+ thread_state::get()->deque_.push_tail(const_task);
+}
+
+template<typename T>
+void task::spawn_child_and_wait(T &&sub_task) {
+ PROFILE_FORK_JOIN_STEALING("spawn_child")
+ static_assert(std::is_base_of<task, typename std::remove_reference<T>::type>::value, "Only pass task subclasses!");
+
+ // Assign forced values (for stack and parent management)
+ sub_task.parent_ = nullptr;
+ sub_task.deque_state_ = thread_state::get()->deque_.save_state();
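+ // Execute the child inline on this thread instead of pushing it onto the deque.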
+ sub_task.execute();
+
+ wait_for_all();
+}
+
+}
+}
+}
+
+#endif //PLS_TASK_H
diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h
index 0efbf78..0787555 100644
--- a/lib/pls/include/pls/internal/scheduling/thread_state.h
+++ b/lib/pls/include/pls/internal/scheduling/thread_state.h
@@ -4,9 +4,10 @@
#include <random>
+#include "pls/internal/base/thread.h"
+
#include "pls/internal/data_structures/aligned_stack.h"
-#include "pls/internal/base/swmr_spin_lock.h"
-#include "abstract_task.h"
+#include "pls/internal/data_structures/deque.h"
namespace pls {
namespace internal {
@@ -14,33 +15,40 @@ namespace scheduling {
// forward declaration
class scheduler;
+class task;
struct thread_state {
alignas(base::system_details::CACHE_LINE_SIZE) scheduler *scheduler_;
- alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *root_task_;
- alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *current_task_;
+ alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_;
alignas(base::system_details::CACHE_LINE_SIZE) data_structures::aligned_stack *task_stack_;
+ alignas(base::system_details::CACHE_LINE_SIZE) data_structures::deque<task> deque_;
alignas(base::system_details::CACHE_LINE_SIZE) size_t id_;
- alignas(base::system_details::CACHE_LINE_SIZE) base::swmr_spin_lock lock_;
alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
thread_state() :
scheduler_{nullptr},
- root_task_{nullptr},
current_task_{nullptr},
task_stack_{nullptr},
+ deque_{task_stack_},
id_{0},
- lock_{},
random_{id_} {};
thread_state(scheduler *scheduler, data_structures::aligned_stack *task_stack, unsigned int id) :
scheduler_{scheduler},
- root_task_{nullptr},
current_task_{nullptr},
task_stack_{task_stack},
+ deque_{task_stack_},
id_{id},
- lock_{},
random_{id_} {}
+
+ /**
+ * Convenience helper to get the thread_state instance associated with this thread.
+ * Must only be called on threads that are associated with a thread_state,
+ * this will most likely be threads created by the scheduler.
+ *
+ * @return The thread_state of this thread.
+ */
+ static thread_state *get() { return base::this_thread::state<thread_state>(); }
};
}
diff --git a/lib/pls/include/pls/pls.h b/lib/pls/include/pls/pls.h
index accbcc3..0602610 100644
--- a/lib/pls/include/pls/pls.h
+++ b/lib/pls/include/pls/pls.h
@@ -3,8 +3,7 @@
#include "pls/algorithms/invoke_parallel.h"
#include "pls/algorithms/parallel_for.h"
-#include "pls/internal/scheduling/abstract_task.h"
-#include "pls/internal/scheduling/fork_join_task.h"
+#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/helpers/unique_id.h"
@@ -14,17 +13,15 @@ using internal::scheduling::static_scheduler_memory;
using internal::scheduling::malloc_scheduler_memory;
using internal::scheduling::scheduler;
-using task_id = internal::scheduling::abstract_task::id;
using unique_id = internal::helpers::unique_id;
-using internal::scheduling::fork_join_sub_task;
-using internal::scheduling::fork_join_lambda_by_reference;
-using internal::scheduling::fork_join_lambda_by_value;
-using internal::scheduling::fork_join_task;
+using internal::scheduling::task;
+using internal::scheduling::lambda_task_by_reference;
+using internal::scheduling::lambda_task_by_value;
+using internal::scheduling::task;
using algorithm::invoke_parallel;
-using algorithm::parallel_for_fork_join;
using algorithm::parallel_for;
}
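
After this cleanup the public header only re-exports the single `task`
abstraction. A hedged sketch of the resulting user-facing API, mirroring the
usage in the tests below (class name and setup are illustrative, exact
signatures may differ):

```c++
#include "pls/pls.h"

// Illustrative user task; we assume execute_internal() is the override
// point, just as it was on the former fork_join_sub_task.
class hello_task : public pls::task {
 protected:
  void execute_internal() override {
    // user work; may call scheduler::spawn_child() for further children
  }
};

void run_example(pls::scheduler &my_scheduler) {
  my_scheduler.perform_work([&]() {
    hello_task t;
    pls::scheduler::spawn_child(t);
    pls::scheduler::wait_for_all();  // children live in this frame's memory
  });
}
```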
diff --git a/lib/pls/src/internal/data_structures/deque.cpp b/lib/pls/src/internal/data_structures/deque.cpp
deleted file mode 100644
index 9f13b0d..0000000
--- a/lib/pls/src/internal/data_structures/deque.cpp
+++ /dev/null
@@ -1,60 +0,0 @@
-#include <mutex>
-
-#include "pls/internal/data_structures/deque.h"
-
-namespace pls {
-namespace internal {
-namespace data_structures {
-
-deque_item *deque_internal::pop_head_internal() {
- std::lock_guard lock{lock_};
-
- if (head_ == nullptr) {
- return nullptr;
- }
-
- deque_item *result = head_;
- head_ = head_->next_;
- if (head_ == nullptr) {
- tail_ = nullptr;
- } else {
- head_->prev_ = nullptr;
- }
-
- return result;
-}
-
-deque_item *deque_internal::pop_tail_internal() {
- std::lock_guard lock{lock_};
-
- if (tail_ == nullptr) {
- return nullptr;
- }
-
- deque_item *result = tail_;
- tail_ = tail_->prev_;
- if (tail_ == nullptr) {
- head_ = nullptr;
- } else {
- tail_->next_ = nullptr;
- }
-
- return result;
-}
-
-void deque_internal::push_tail_internal(deque_item *new_item) {
- std::lock_guard lock{lock_};
-
- if (tail_ != nullptr) {
- tail_->next_ = new_item;
- } else {
- head_ = new_item;
- }
- new_item->prev_ = tail_;
- new_item->next_ = nullptr;
- tail_ = new_item;
-}
-
-}
-}
-}
diff --git a/lib/pls/src/internal/scheduling/abstract_task.cpp b/lib/pls/src/internal/scheduling/abstract_task.cpp
deleted file mode 100644
index 0fb15d2..0000000
--- a/lib/pls/src/internal/scheduling/abstract_task.cpp
+++ /dev/null
@@ -1,86 +0,0 @@
-#include
-#include "pls/internal/helpers/profiler.h"
-
-#include "pls/internal/scheduling/thread_state.h"
-#include "pls/internal/scheduling/abstract_task.h"
-#include "pls/internal/scheduling/scheduler.h"
-
-namespace pls {
-namespace internal {
-namespace scheduling {
-
-bool abstract_task::steal_work() {
-// thread_local static base::backoff backoff{};
-
- PROFILE_STEALING("abstract_task::steal_work")
- const auto my_state = base::this_thread::state();
- const auto my_scheduler = my_state->scheduler_;
-
- const size_t my_id = my_state->id_;
- const size_t offset = my_state->random_() % my_scheduler->num_threads();
- const size_t max_tries = my_scheduler->num_threads(); // TODO: Tune this value
- for (size_t i = 0; i < max_tries; i++) {
- size_t target = (offset + i) % my_scheduler->num_threads();
- if (target == my_id) {
- continue;
- }
- auto target_state = my_scheduler->thread_state_for(target);
-
- if (!target_state->lock_.reader_try_lock()) {
- continue;
- }
-
- // Dig down to our level
- PROFILE_STEALING("Go to our level")
- abstract_task *current_task = target_state->root_task_;
- while (current_task != nullptr && current_task->depth() < depth()) {
- current_task = current_task->child();
- }
- PROFILE_END_BLOCK
-
- // Try to steal 'internal', e.g. for_join_sub_tasks in a fork_join_task constellation
- PROFILE_STEALING("Internal Steal")
- if (current_task != nullptr) {
- // See if it equals our type and depth of task
- if (current_task->unique_id_ == unique_id_ &&
- current_task->depth_ == depth_) {
- if (internal_stealing(current_task)) {
- // internal steal was a success, hand it back to the internal scheduler
- target_state->lock_.reader_unlock();
-// backoff.reset();
- return true;
- }
-
- // No success, we need to steal work from a deeper level using 'top level task stealing'
- current_task = current_task->child();
- }
- }
- PROFILE_END_BLOCK;
-
-
- // Execute 'top level task steal' if possible
- // (only try deeper tasks to keep depth restricted stealing).
- PROFILE_STEALING("Top Level Steal")
- while (current_task != nullptr) {
- auto lock = &target_state->lock_;
- if (current_task->split_task(lock)) {
- // top level steal was a success (we did a top level task steal)
-// backoff.reset();
- return false;
- }
-
- current_task = current_task->child_task_;
- }
- PROFILE_END_BLOCK;
- target_state->lock_.reader_unlock();
- }
-
- // internal steal was no success
-// backoff.do_backoff();
-// base::this_thread::sleep(5);
- return false;
-}
-
-}
-}
-}
diff --git a/lib/pls/src/internal/scheduling/fork_join_task.cpp b/lib/pls/src/internal/scheduling/fork_join_task.cpp
deleted file mode 100644
index 115de70..0000000
--- a/lib/pls/src/internal/scheduling/fork_join_task.cpp
+++ /dev/null
@@ -1,129 +0,0 @@
-#include "pls/internal/helpers/profiler.h"
-
-#include "pls/internal/scheduling/scheduler.h"
-#include "pls/internal/scheduling/fork_join_task.h"
-
-namespace pls {
-namespace internal {
-namespace scheduling {
-
-fork_join_sub_task::fork_join_sub_task() :
- ref_count_{0},
- parent_{nullptr},
- tbb_task_{nullptr},
- deque_state_{0} {}
-
-fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task &other) :
- ref_count_{0},
- parent_{other.parent_},
- tbb_task_{other.tbb_task_},
- deque_state_{other.deque_state_} {}
-
-void fork_join_sub_task::execute() {
- PROFILE_WORK_BLOCK("execute sub_task")
- auto last_executing = tbb_task_->currently_executing_;
- tbb_task_->currently_executing_ = this;
- execute_internal();
- tbb_task_->currently_executing_ = last_executing;
- PROFILE_END_BLOCK
- wait_for_all();
-
- if (parent_ != nullptr) {
- parent_->ref_count_--;
- }
-}
-
-void fork_join_sub_task::wait_for_all() {
- while (ref_count_ > 0) {
- PROFILE_STEALING("get local sub task")
- fork_join_sub_task *local_task = tbb_task_->get_local_sub_task();
- PROFILE_END_BLOCK
- if (local_task != nullptr) {
- local_task->execute();
- } else {
- // Try to steal work.
- // External steal will be executed implicitly if success
- PROFILE_STEALING("steal work")
- bool internal_steal_success = tbb_task_->steal_work();
- PROFILE_END_BLOCK
- if (internal_steal_success) {
- tbb_task_->last_stolen_->execute();
- }
- }
- }
- tbb_task_->deque_.release_memory_until(deque_state_);
-}
-
-fork_join_sub_task *fork_join_task::get_local_sub_task() {
- return deque_.pop_tail();
-}
-
-fork_join_sub_task *fork_join_task::get_stolen_sub_task() {
- return deque_.pop_head();
-}
-
-fork_join_sub_task *fork_join_sub_task::current() {
- return dynamic_cast<fork_join_task *>(scheduler::current_task())->currently_executing();
-}
-
-bool fork_join_task::internal_stealing(abstract_task *other_task) {
- PROFILE_STEALING("fork_join_task::internal_stealin")
- auto cast_other_task = reinterpret_cast<fork_join_task *>(other_task);
-
- auto stolen_sub_task = cast_other_task->get_stolen_sub_task();
- if (stolen_sub_task == nullptr) {
- return false;
- } else {
- // Make sub-task belong to our fork_join_task instance
- stolen_sub_task->tbb_task_ = this;
- stolen_sub_task->deque_state_ = deque_.save_state();
- // We will execute this next without explicitly moving it onto our stack storage
- last_stolen_ = stolen_sub_task;
-
- return true;
- }
-}
-
-bool fork_join_task::split_task(base::swmr_spin_lock *lock) {
- PROFILE_STEALING("fork_join_task::split_task")
- fork_join_sub_task *stolen_sub_task = get_stolen_sub_task();
- if (stolen_sub_task == nullptr) {
- return false;
- }
- fork_join_task task{stolen_sub_task, this->unique_id()};
-
- // In success case, unlock.
- lock->reader_unlock();
-
- scheduler::execute_task(task, depth());
- return true;
-}
-
-void fork_join_task::execute() {
- PROFILE_WORK_BLOCK("execute fork_join_task");
-
- // Bind this instance to our OS thread
- // TODO: See if we did this right
- // my_stack_ = base::this_thread::state()->task_stack_;
- deque_.reset_base_pointer();
-
- root_task_->tbb_task_ = this;
- root_task_->deque_state_ = deque_.save_state();
-
- // Execute it on our OS thread until its finished
- root_task_->execute();
-}
-
-fork_join_sub_task *fork_join_task::currently_executing() const { return currently_executing_; }
-
-fork_join_task::fork_join_task(fork_join_sub_task *root_task,
- const abstract_task::id &id) :
- abstract_task{0, id},
- root_task_{root_task},
- currently_executing_{nullptr},
- deque_{base::this_thread::state()->task_stack_},
- last_stolen_{nullptr} {}
-
-}
-}
-}
diff --git a/lib/pls/src/internal/scheduling/parallel_iterator_task.cpp b/lib/pls/src/internal/scheduling/parallel_iterator_task.cpp
deleted file mode 100644
index 4b22133..0000000
--- a/lib/pls/src/internal/scheduling/parallel_iterator_task.cpp
+++ /dev/null
@@ -1 +0,0 @@
-#include "pls/internal/scheduling/parallel_iterator_task.h"
diff --git a/lib/pls/src/internal/scheduling/root_task.cpp b/lib/pls/src/internal/scheduling/root_task.cpp
deleted file mode 100644
index 7e3ef89..0000000
--- a/lib/pls/src/internal/scheduling/root_task.cpp
+++ /dev/null
@@ -1,9 +0,0 @@
-#include "pls/internal/scheduling/root_task.h"
-
-namespace pls {
-namespace internal {
-namespace scheduling {
-
-}
-}
-}
diff --git a/lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp b/lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp
deleted file mode 100644
index 178afbb..0000000
--- a/lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp
+++ /dev/null
@@ -1,9 +0,0 @@
-#include "pls/internal/scheduling/run_on_n_threads_task.h"
-
-namespace pls {
-namespace internal {
-namespace scheduling {
-
-}
-}
-}
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index 9491a06..4a36132 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -1,4 +1,7 @@
#include "pls/internal/scheduling/scheduler.h"
+#include "pls/internal/scheduling/thread_state.h"
+#include "pls/internal/scheduling/task.h"
+
#include "pls/internal/base/error_handling.h"
namespace pls {
@@ -17,7 +20,7 @@ scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads) :
for (unsigned int i = 0; i < num_threads_; i++) {
// Placement new is required, as the memory of `memory_` is not required to be initialized.
new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i};
- new((void *) memory_->thread_for(i))base::thread(&worker_routine,
+ new((void *) memory_->thread_for(i))base::thread(&scheduler::worker_routine,
memory_->thread_state_for(i));
}
}
@@ -26,20 +29,38 @@ scheduler::~scheduler() {
terminate();
}
-void worker_routine() {
- auto my_state = base::this_thread::state();
+void scheduler::worker_routine() {
+ auto my_state = thread_state::get();
+ auto scheduler = my_state->scheduler_;
while (true) {
- my_state->scheduler_->sync_barrier_.wait();
- if (my_state->scheduler_->terminated_) {
+ // Wait to be triggered
+ scheduler->sync_barrier_.wait();
+
+ // Check for shutdown
+ if (scheduler->terminated_) {
return;
}
- // The root task must only return when all work is done,
- // because of this a simple call is enough to ensure the
- // fork-join-section is done (logically joined back into our main thread).
- my_state->root_task_->execute();
+ // Execute work
+ if (my_state->id_ == 0) {
+ // Main Thread
+ auto root_task = scheduler->main_thread_root_task_;
+ root_task->parent_ = nullptr;
+ root_task->deque_state_ = my_state->deque_.save_state();
+ root_task->execute();
+ scheduler->work_section_done_ = true;
+ } else {
+ // Worker Threads
+ while (!scheduler->work_section_done_) {
+ if (!scheduler->try_execute_local()) {
+ scheduler->try_execute_stolen();
+ }
+ }
+ }
+
+ // Sync back with main thread
my_state->scheduler_->sync_barrier_.wait();
}
}
@@ -59,6 +80,69 @@ void scheduler::terminate(bool wait_for_workers) {
}
}
+task *scheduler::get_local_task() {
+ PROFILE_STEALING("Get Local Task")
+ return thread_state::get()->deque_.pop_tail();
+}
+
+task *scheduler::steal_task() {
+ PROFILE_STEALING("Steal Task")
+
+ // Data for victim selection
+ const auto my_state = thread_state::get();
+
+ const auto my_id = my_state->id_;
+ const size_t offset = my_state->random_() % num_threads();
+ const size_t max_tries = num_threads(); // TODO: Tune this value
+
+ // Current strategy: random start, then round robin from there
+ for (size_t i = 0; i < max_tries; i++) {
+ size_t target = (offset + i) % num_threads();
+
+ // Skip ourselves when picking a steal victim
+ target = ((target == my_id) + target) % num_threads();
+
+ auto target_state = thread_state_for(target);
+ // TODO: See if we should re-try popping if it failed due to contention
+ auto result = target_state->deque_.pop_head();
+ if (result != nullptr) {
+ return result;
+ }
+
+ // TODO: See if we should backoff here (per missed steal)
+ }
+
+ // TODO: See if we should backoff here (after a 'round' of missed steals)
+ return nullptr;
+}
+
+bool scheduler::try_execute_local() {
+ task *local_task = get_local_task();
+ if (local_task != nullptr) {
+ local_task->execute();
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool scheduler::try_execute_stolen() {
+ task *stolen_task = steal_task();
+ if (stolen_task != nullptr) {
+ stolen_task->deque_state_ = thread_state::get()->deque_.save_state();
+ stolen_task->execute();
+ return true;
+ }
+
+ return false;
+}
+
+void scheduler::wait_for_all() {
+ thread_state::get()->current_task_->wait_for_all();
+}
+
+thread_state *scheduler::thread_state_for(size_t id) { return memory_->thread_state_for(id); }
+
}
}
}
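
The victim-selection line in `steal_task()` is easy to misread, so here is a
standalone snippet (plain C++, independent of the library) showing what the
branch-free self-skip evaluates to:

```c++
#include <cstdio>
#include <cstddef>

// Demonstrates: target = ((target == my_id) + target) % num_threads;
// If the candidate is our own id the comparison adds 1, moving us on to the
// next thread (with wrap-around); otherwise it adds 0 and is a no-op.
int main() {
  const std::size_t num_threads = 4;
  const std::size_t my_id = 2;
  for (std::size_t candidate = 0; candidate < num_threads; candidate++) {
    std::size_t victim = ((candidate == my_id) + candidate) % num_threads;
    std::printf("candidate %zu -> victim %zu\n", candidate, victim);
  }
  return 0;
}
```

A side effect of this trick is that the neighbour `my_id + 1` can be probed
twice per round, which seems acceptable for this simple strategy.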
diff --git a/lib/pls/src/internal/scheduling/scheduler_memory.cpp b/lib/pls/src/internal/scheduling/scheduler_memory.cpp
index 7d46744..e59abd3 100644
--- a/lib/pls/src/internal/scheduling/scheduler_memory.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler_memory.cpp
@@ -10,14 +10,25 @@ malloc_scheduler_memory::malloc_scheduler_memory(const size_t num_threads, const
reinterpret_cast<aligned_thread *>(base::alignment::allocate_aligned(num_threads * sizeof(aligned_thread)));
thread_states_ = reinterpret_cast<aligned_thread_state *>(base::alignment::allocate_aligned(
num_threads * sizeof(aligned_thread_state)));
-
task_stacks_ = reinterpret_cast<aligned_aligned_stack *>(base::alignment::allocate_aligned(
num_threads * sizeof(aligned_aligned_stack)));
task_stacks_memory_ = reinterpret_cast<char **>(base::alignment::allocate_aligned(num_threads * sizeof(char *)));
+
+ thread_refs_ = static_cast<scheduler_thread **>(malloc(num_threads * sizeof(scheduler_thread *)));
+ thread_state_refs_ = static_cast<thread_state **>(malloc(num_threads * sizeof(thread_state *)));
+ task_stack_refs_ =
+ static_cast<data_structures::aligned_stack **>(malloc(num_threads * sizeof(data_structures::aligned_stack *)));
+
for (size_t i = 0; i < num_threads_; i++) {
task_stacks_memory_[i] = reinterpret_cast<char *>(base::alignment::allocate_aligned(memory_per_stack));
new((void *) task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i], memory_per_stack);
+
+ thread_refs_[i] = threads_[i].pointer();
+ thread_state_refs_[i] = thread_states_[i].pointer();
+ task_stack_refs_[i] = task_stacks_[i].pointer();
}
+
+ init(num_threads, thread_state_refs_, thread_refs_, task_stack_refs_);
}
malloc_scheduler_memory::~malloc_scheduler_memory() {
@@ -29,6 +40,10 @@ malloc_scheduler_memory::~malloc_scheduler_memory() {
}
free(task_stacks_);
free(task_stacks_memory_);
+
+ free(thread_refs_);
+ free(thread_state_refs_);
+ free(task_stack_refs_);
}
}
diff --git a/lib/pls/src/internal/scheduling/task.cpp b/lib/pls/src/internal/scheduling/task.cpp
new file mode 100644
index 0000000..5a4b1f4
--- /dev/null
+++ b/lib/pls/src/internal/scheduling/task.cpp
@@ -0,0 +1,50 @@
+#include "pls/internal/helpers/profiler.h"
+
+#include "pls/internal/scheduling/scheduler.h"
+#include "pls/internal/scheduling/task.h"
+#include "pls/internal/scheduling/thread_state.h"
+
+namespace pls {
+namespace internal {
+namespace scheduling {
+
+task::task() :
+ ref_count_{0},
+ parent_{nullptr},
+ deque_state_{0} {}
+
+task::task(const task &other) :
+ ref_count_{0},
+ parent_{other.parent_},
+ deque_state_{other.deque_state_} {}
+
+void task::execute() {
+ PROFILE_WORK_BLOCK("execute task")
+ auto last_executing = thread_state::get()->current_task_;
+ thread_state::get()->current_task_ = this;
+
+ execute_internal();
+ PROFILE_END_BLOCK
+
+ wait_for_all();
+ thread_state::get()->current_task_ = last_executing;
+
+ if (parent_ != nullptr) {
+ parent_->ref_count_--;
+ }
+}
+
+void task::wait_for_all() {
+ auto scheduler = thread_state::get()->scheduler_;
+
+ while (ref_count_ > 0) {
+ if (!scheduler->try_execute_local()) {
+ scheduler->try_execute_stolen();
+ }
+ }
+ thread_state::get()->deque_.release_memory_until(deque_state_);
+}
+
+}
+}
+}
diff --git a/lib/pls/src/internal/scheduling/thread_state.cpp b/lib/pls/src/internal/scheduling/thread_state.cpp
deleted file mode 100644
index f503b6a..0000000
--- a/lib/pls/src/internal/scheduling/thread_state.cpp
+++ /dev/null
@@ -1,9 +0,0 @@
-#include "pls/internal/scheduling/thread_state.h"
-
-namespace pls {
-namespace internal {
-namespace scheduling {
-
-}
-}
-}
diff --git a/media/3bdaba42_fft_average.png b/media/3bdaba42_fft_average.png
new file mode 100644
index 0000000..b558191
Binary files /dev/null and b/media/3bdaba42_fft_average.png differ
diff --git a/media/3bdaba42_heat_average.png b/media/3bdaba42_heat_average.png
new file mode 100644
index 0000000..edb98d2
Binary files /dev/null and b/media/3bdaba42_heat_average.png differ
diff --git a/media/3bdaba42_matrix_average.png b/media/3bdaba42_matrix_average.png
new file mode 100644
index 0000000..9538c36
Binary files /dev/null and b/media/3bdaba42_matrix_average.png differ
diff --git a/media/3bdaba42_unbalanced_average.png b/media/3bdaba42_unbalanced_average.png
new file mode 100644
index 0000000..ee1415b
Binary files /dev/null and b/media/3bdaba42_unbalanced_average.png differ
diff --git a/media/5044f0a1_fft_average.png b/media/5044f0a1_fft_average.png
new file mode 100644
index 0000000..73736b2
Binary files /dev/null and b/media/5044f0a1_fft_average.png differ
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 7f224a1..416b530 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -1,4 +1,5 @@
add_executable(tests
main.cpp
- data_structures_test.cpp)
+ data_structures_test.cpp
+ scheduling_tests.cpp)
target_link_libraries(tests catch2 pls)
diff --git a/test/data_structures_test.cpp b/test/data_structures_test.cpp
index 97f91ca..0b27ee0 100644
--- a/test/data_structures_test.cpp
+++ b/test/data_structures_test.cpp
@@ -3,7 +3,7 @@
#include
#include
-#include
+#include
#include
#include
@@ -77,58 +77,62 @@ TEST_CASE("aligned stack stores objects correctly", "[internal/data_structures/a
}
TEST_CASE("deque stores objects correctly", "[internal/data_structures/deque.h]") {
- class my_item : public deque_item {
+ class my_item {
};
- deque<my_item> deque;
- my_item one, two, three;
+ constexpr long data_size = 2 << 14;
+ char data[data_size];
+ aligned_stack stack{data, data_size};
+
+ locking_deque<int> deque{&stack};
+ int one = 1, two = 2, three = 3;
SECTION("add and remove items form the tail") {
- deque.push_tail(&one);
- deque.push_tail(&two);
- deque.push_tail(&three);
+ deque.push_tail(one);
+ deque.push_tail(two);
+ deque.push_tail(three);
- REQUIRE(deque.pop_tail() == &three);
- REQUIRE(deque.pop_tail() == &two);
- REQUIRE(deque.pop_tail() == &one);
+ REQUIRE(*deque.pop_tail() == three);
+ REQUIRE(*deque.pop_tail() == two);
+ REQUIRE(*deque.pop_tail() == one);
}
SECTION("handles getting empty by popping the tail correctly") {
- deque.push_tail(&one);
- REQUIRE(deque.pop_tail() == &one);
+ deque.push_tail(one);
+ REQUIRE(*deque.pop_tail() == one);
- deque.push_tail(&two);
- REQUIRE(deque.pop_tail() == &two);
+ deque.push_tail(two);
+ REQUIRE(*deque.pop_tail() == two);
}
SECTION("remove items form the head") {
- deque.push_tail(&one);
- deque.push_tail(&two);
- deque.push_tail(&three);
+ deque.push_tail(one);
+ deque.push_tail(two);
+ deque.push_tail(three);
- REQUIRE(deque.pop_head() == &one);
- REQUIRE(deque.pop_head() == &two);
- REQUIRE(deque.pop_head() == &three);
+ REQUIRE(*deque.pop_head() == one);
+ REQUIRE(*deque.pop_head() == two);
+ REQUIRE(*deque.pop_head() == three);
}
SECTION("handles getting empty by popping the head correctly") {
- deque.push_tail(&one);
- REQUIRE(deque.pop_head() == &one);
+ deque.push_tail(one);
+ REQUIRE(*deque.pop_head() == one);
- deque.push_tail(&two);
- REQUIRE(deque.pop_head() == &two);
+ deque.push_tail(two);
+ REQUIRE(*deque.pop_head() == two);
}
SECTION("handles getting empty by popping the head and tail correctly") {
- deque.push_tail(&one);
- REQUIRE(deque.pop_tail() == &one);
+ deque.push_tail(one);
+ REQUIRE(*deque.pop_tail() == one);
- deque.push_tail(&two);
- REQUIRE(deque.pop_head() == &two);
+ deque.push_tail(two);
+ REQUIRE(*deque.pop_head() == two);
- deque.push_tail(&three);
- REQUIRE(deque.pop_tail() == &three);
+ deque.push_tail(three);
+ REQUIRE(*deque.pop_tail() == three);
}
}
diff --git a/test/scheduling_tests.cpp b/test/scheduling_tests.cpp
index a442abd..710fc80 100644
--- a/test/scheduling_tests.cpp
+++ b/test/scheduling_tests.cpp
@@ -4,7 +4,7 @@
using namespace pls;
-class once_sub_task : public fork_join_sub_task {
+class once_sub_task : public task {
std::atomic<int> *counter_;
int children_;
@@ -18,12 +18,12 @@ class once_sub_task : public fork_join_sub_task {
public:
explicit once_sub_task(std::atomic<int> *counter, int children) :
- fork_join_sub_task(),
+ task{},
counter_{counter},
children_{children} {}
};
-class force_steal_sub_task : public fork_join_sub_task {
+class force_steal_sub_task : public task {
std::atomic<int> *parent_counter_;
std::atomic<int> *overall_counter_;
@@ -41,7 +41,7 @@ class force_steal_sub_task : public fork_join_sub_task {
public:
explicit force_steal_sub_task(std::atomic<int> *parent_counter, std::atomic<int> *overall_counter) :
- fork_join_sub_task(),
+ task{},
parent_counter_{parent_counter},
overall_counter_{overall_counter} {}
};
@@ -57,8 +57,7 @@ TEST_CASE("tbb task are scheduled correctly", "[internal/scheduling/fork_join_ta
my_scheduler.perform_work([&]() {
once_sub_task sub_task{&counter, start_counter};
- fork_join_task task{&sub_task, unique_id::create(42)};
- scheduler::execute_task(task);
+ scheduler::spawn_child(sub_task);
});
REQUIRE(counter.load() == total_tasks);
@@ -70,8 +69,10 @@ TEST_CASE("tbb task are scheduled correctly", "[internal/scheduling/fork_join_ta
my_scheduler.perform_work([&]() {
std::atomic<int> dummy_parent{1}, overall_counter{8};
force_steal_sub_task sub_task{&dummy_parent, &overall_counter};
- fork_join_task task{&sub_task, unique_id::create(42)};
- scheduler::execute_task(task);
+ scheduler::spawn_child(sub_task);
+
+ // Required, as the child operates on our stack's memory!
+ scheduler::wait_for_all();
});
my_scheduler.terminate(true);
}
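
The `wait_for_all()` added in the last test is not optional: the spawned child
is placed in memory that belongs to the spawning scope, so that scope must not
be left while children may still run. A standalone sketch of the same failure
mode in plain C++:

```c++
#include <functional>

// The lambda captures 'local' by reference; once make_task() returns, the
// reference dangles - the same hazard as a stolen child outliving the
// frame that spawned it without a wait_for_all().
std::function<void()> make_task() {
  int local = 42;                // lives in this frame's memory
  return [&local] { local++; };  // WRONG: outlives 'local'
}

int main() {
  auto dangling = make_task();
  // dangling();  // undefined behaviour - the captured frame is gone
  return 0;
}
```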