diff --git a/CMakeLists.txt b/CMakeLists.txt
index 941c725..0a66e63 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -7,7 +7,7 @@ set(CMAKE_CXX_STANDARD 11)
# separate library and test/example executable output paths.
set(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/bin)
-set(LIBRARY_OUTPUT_PATH ${CMAKE_BINARY_DIR}/lib)
+set(LIBRARY_OUTPUT_PATH ${CMAKE_BINARY_DIR}/lib)
# specific setup code is located in individual files.
include(cmake/DisabelInSource.cmake)
@@ -34,11 +34,12 @@ add_subdirectory(app/playground)
add_subdirectory(app/test_for_new)
add_subdirectory(app/invoke_parallel)
add_subdirectory(app/benchmark_fft)
+add_subdirectory(app/benchmark_unbalanced)
# Add optional tests
option(PACKAGE_TESTS "Build the tests" ON)
-if(PACKAGE_TESTS)
+if (PACKAGE_TESTS)
enable_testing()
add_subdirectory(test)
add_test(NAME AllTests COMMAND tests)
-endif()
+endif ()
diff --git a/NOTES.md b/NOTES.md
index 076b2ca..da478c6 100644
--- a/NOTES.md
+++ b/NOTES.md
@@ -4,6 +4,33 @@ A collection of stuff that we noticed during development.
Useful later on to write a project report and to go back
in time to find out why certain decisions were made.
+## 06.05.2019 - Relaxed Atomics
+
+At the end of the atomics talk it is mentioned how relaxed
+atomics map to modern hardware, the claim being that starting with
+ARMv8 and on x86 there is no real need for relaxed atomics,
+as the 'strict' ordering has no real performance impact.
+
+We will therefore ignore relaxed orderings for now (taking some potential
+performance hits on our Banana Pi M2's ARMv7 architecture),
+as they add a lot of complexity and introduce many possibilities
+for subtle concurrency bugs.
+TODO: find some papers on the topic in case we want to mention
+it in our project report.
+
+## 06.05.2019 - Atomics
+
+Atomics are a key part of getting lock-free code running and are also
+often useful for details like counters to spin on (e.g. 'is this task
+finished yet?'). Resources for understanding them can be found in
+[a talk online](https://herbsutter.com/2013/02/11/atomic-weapons-the-c-memory-model-and-modern-hardware/)
+and in the book 'C++ Concurrency in Action' by Anthony Williams.
+
+The key takeaway is that we need to be careful about the possible
+orderings of variable reads/writes. We will start off
+using strict (sequentially consistent) ordering and will have to see if this
+impacts performance and if we need to optimize it.
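+
+As a minimal sketch of what this means in practice (hypothetical example
+code, not taken from the library): a counter to spin on simply uses the
+default `memory_order_seq_cst` operations, skipping any relaxed-ordering
+optimization.
+
+```cpp
+#include <atomic>
+
+std::atomic<int> unfinished_children{2};
+
+void on_child_finished() {
+  unfinished_children.fetch_sub(1); // Default ordering: memory_order_seq_cst.
+}
+
+void wait_for_children() {
+  while (unfinished_children.load() != 0); // Spin, again with the strict default.
+}
+```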
+
## 12.04.2019 - Unique IDs
Assigning unique IDs to logical different tasks is key to the
diff --git a/PERFORMANCE.md b/PERFORMANCE.md
index c19cc1c..abe4521 100644
--- a/PERFORMANCE.md
+++ b/PERFORMANCE.md
to contention, but could not resolve it with any combination
of `tas_spinlock` vs `ttas_spinlock` and `lock` vs `try_lock`.
This issue clearly needs further investigation.
+
+### Commit aa27064 - Performance with ttas spinlocks (and 'full blocking' top level)
+
+
+
+### Commit d16ad3e - Performance with rw-lock and backoff
+
+
+
+### Commit 18b2d744 - Performance with lock-free deque
+
+After much tinkering we still have performance problems with higher
+thread counts in the FFT benchmark. Upwards of 4-5 threads the
+performance gains start to saturate (before removing the top level
+locks we even saw a slight drop in performance).
+
+Currently the FFT benchmark shows the following results (average):
+
+
+
+On the positive side, the overall trend of a 'performance drop'
+at the hyperthreading mark is not really bad anymore; it now
+seems similar to EMBB (with backoff + lock-free deque + top level
+reader-writer lock). This is partly because the spike at 4 threads
+is lower (less performance at 4 threads). We also see better times
+on the multiprogrammed system with the lock-free deque.
+
+This is discouraging after many tests. To see where the overhead lies
+we also implemented the unbalanced tree search benchmark,
+resulting in the following, surprisingly good, results (average):
+
+
+
+The main difference between the two benchmarks is that the second
+one has more work and the work is relatively independent.
+Additionally, the first one uses our high level API (parallel invoke),
+while the second one uses our low level API (see the sketch below).
+It is worth investigating whether the high level API or the structure
+of the memory accesses in FFT is the problem.
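+
+For reference, a sketch of the two styles (condensed from the benchmark
+sources in this commit, not standalone programs):
+
+```cpp
+// High level API (FFT benchmark): a single call, scheduling stays hidden.
+pls::invoke_parallel(
+    [&] { fft(data, n / 2); },
+    [&] { fft(data + n / 2, n / 2); }
+);
+
+// Low level API (unbalanced tree search): explicit sub-task objects.
+auto current_task = pls::fork_join_sub_task::current();
+auto lambda = [&, i] { results[i] = count_child_nodes(children[i]); };
+pls::fork_join_lambda_by_value<decltype(lambda)> sub_task(lambda);
+current_task->spawn_child(sub_task);
+current_task->wait_for_all();
+```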
+
+### Commit cf056856 - Remove two-level scheduler
+
+In this test we replace the two level scheduler with ONLY fork_join
+tasks. This removes the top level steal overhead and performs only
+internal stealing. For this we set the fork_join task as the only
+possible task type, removed the top level rw-lock and the 'digging
+down' to our level, and solely use internal stealing.
+
+Average results FFT:
+
+
+
+Average results Unbalanced:
+
+
+
+There seems to be only a minor performance difference between the two,
+suggesting that our two-level approach is not the part causing our
+weaker performance.
diff --git a/README.md b/README.md
index 16565fa..75d5d8a 100644
--- a/README.md
+++ b/README.md
@@ -7,6 +7,8 @@
This section will give a brief introduction on how to get a minimal
project setup that uses the PLS library.
+Further [general notes](NOTES.md) and [performance notes](PERFORMANCE.md) can be found in
+their respective files.
Further notes on [performance](PERFORMANCE.md) and general
[notes](NOTES.md) on the development progress can be found in
diff --git a/app/benchmark_fft/main.cpp b/app/benchmark_fft/main.cpp
index 0b0f505..f6ed20e 100644
--- a/app/benchmark_fft/main.cpp
+++ b/app/benchmark_fft/main.cpp
@@ -6,82 +6,81 @@
#include <complex>
#include <vector>
-static constexpr int CUTOFF = 10;
+static constexpr int CUTOFF = 16;
static constexpr int NUM_ITERATIONS = 1000;
-static constexpr int INPUT_SIZE = 2064;
+static constexpr int INPUT_SIZE = 8192;
typedef std::vector<std::complex<double>> complex_vector;
void divide(complex_vector::iterator data, int n) {
- complex_vector tmp_odd_elements(n / 2);
- for (int i = 0; i < n / 2; i++) {
- tmp_odd_elements[i] = data[i * 2 + 1];
- }
- for (int i = 0; i < n / 2; i++) {
- data[i] = data[i * 2];
- }
- for (int i = 0; i < n / 2; i++) {
- data[i + n / 2] = tmp_odd_elements[i];
- }
+ complex_vector tmp_odd_elements(n / 2);
+ for (int i = 0; i < n / 2; i++) {
+ tmp_odd_elements[i] = data[i * 2 + 1];
+ }
+ for (int i = 0; i < n / 2; i++) {
+ data[i] = data[i * 2];
+ }
+ for (int i = 0; i < n / 2; i++) {
+ data[i + n / 2] = tmp_odd_elements[i];
+ }
}
void combine(complex_vector::iterator data, int n) {
- for (int i = 0; i < n / 2; i++) {
- std::complex<double> even = data[i];
- std::complex<double> odd = data[i + n / 2];
+ for (int i = 0; i < n / 2; i++) {
+ std::complex<double> even = data[i];
+ std::complex<double> odd = data[i + n / 2];
- // w is the "twiddle-factor".
- // this could be cached, but we run the same 'data_structures' algorithm parallel/serial,
- // so it won't impact the performance comparison.
- std::complex<double> w = exp(std::complex<double>(0, -2. * M_PI * i / n));
+ // w is the "twiddle-factor".
+ // this could be cached, but we run the same 'data_structures' algorithm parallel/serial,
+ // so it won't impact the performance comparison.
+ std::complex<double> w = exp(std::complex<double>(0, -2. * M_PI * i / n));
- data[i] = even + w * odd;
- data[i + n / 2] = even - w * odd;
- }
+ data[i] = even + w * odd;
+ data[i + n / 2] = even - w * odd;
+ }
}
void fft(complex_vector::iterator data, int n) {
- if (n < 2) {
- return;
- }
+ if (n < 2) {
+ return;
+ }
- divide(data, n);
- if (n <= CUTOFF) {
- fft(data, n / 2);
- fft(data + n / 2, n / 2);
- } else {
- pls::invoke_parallel(
- [&] { fft(data, n / 2); },
- [&] { fft(data + n / 2, n / 2); }
- );
- }
- combine(data, n);
+ divide(data, n);
+ if (n <= CUTOFF) {
+ fft(data, n / 2);
+ fft(data + n / 2, n / 2);
+ } else {
+ pls::invoke_parallel(
+ [&] { fft(data, n / 2); },
+ [&] { fft(data + n / 2, n / 2); }
+ );
+ }
+ combine(data, n);
}
complex_vector prepare_input(int input_size) {
- std::vector<int> known_frequencies{2, 11, 52, 88, 256};
- complex_vector data(input_size);
+ std::vector<int> known_frequencies{2, 11, 52, 88, 256};
+ complex_vector data(input_size);
- // Set our input data to match a time series of the known_frequencies.
- // When applying fft to this time-series we should find these frequencies.
- for (int i = 0; i < input_size; i++) {
- data[i] = std::complex<double>(0.0, 0.0);
- for (auto frequencie : known_frequencies) {
- data[i] += sin(2 * M_PI * frequencie * i / input_size);
- }
+ // Set our input data to match a time series of the known_frequencies.
+ // When applying fft to this time-series we should find these frequencies.
+ for (int i = 0; i < input_size; i++) {
+ data[i] = std::complex<double>(0.0, 0.0);
+ for (auto frequencie : known_frequencies) {
+ data[i] += sin(2 * M_PI * frequencie * i / input_size);
}
+ }
- return data;
+ return data;
}
-
int main() {
- PROFILE_ENABLE
- complex_vector initial_input = prepare_input(INPUT_SIZE);
+ PROFILE_ENABLE
+ complex_vector initial_input = prepare_input(INPUT_SIZE);
- pls::internal::helpers::run_mini_benchmark([&] {
- complex_vector input = initial_input;
- fft(input.begin(), input.size());
- }, 8, 4000);
+ pls::internal::helpers::run_mini_benchmark([&] {
+ complex_vector input = initial_input;
+ fft(input.begin(), input.size());
+ }, 8, 4000);
- PROFILE_SAVE("test_profile.prof")
+ PROFILE_SAVE("test_profile.prof")
}
diff --git a/app/benchmark_unbalanced/CMakeLists.txt b/app/benchmark_unbalanced/CMakeLists.txt
new file mode 100644
index 0000000..00c95ab
--- /dev/null
+++ b/app/benchmark_unbalanced/CMakeLists.txt
@@ -0,0 +1,5 @@
+add_executable(benchmark_unbalanced main.cpp node.h node.cpp picosha2.h)
+target_link_libraries(benchmark_unbalanced pls)
+if (EASY_PROFILER)
+ target_link_libraries(benchmark_unbalanced easy_profiler)
+endif ()
diff --git a/app/benchmark_unbalanced/LICENSE_PICOSA2 b/app/benchmark_unbalanced/LICENSE_PICOSA2
new file mode 100644
index 0000000..4e22100
--- /dev/null
+++ b/app/benchmark_unbalanced/LICENSE_PICOSA2
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2017 okdshin
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/app/benchmark_unbalanced/main.cpp b/app/benchmark_unbalanced/main.cpp
new file mode 100644
index 0000000..27cdaf3
--- /dev/null
+++ b/app/benchmark_unbalanced/main.cpp
@@ -0,0 +1,78 @@
+#include <pls/pls.h>
+#include <pls/internal/helpers/profiler.h>
+#include <pls/internal/helpers/mini_benchmark.h>
+
+#include "node.h"
+
+const int SEED = 42;
+const int ROOT_CHILDREN = 140;
+const double Q = 0.124875;
+const int NORMAL_CHILDREN = 8;
+
+const int NUM_NODES = 71069;
+
+int count_child_nodes(uts::node &node) {
+ int child_count = 1;
+ std::vector<uts::node> children = node.spawn_child_nodes();
+
+ if (children.empty()) {
+ return child_count;
+ }
+
+ auto current_task = pls::fork_join_sub_task::current();
+ std::vector<int> results(children.size());
+ for (size_t i = 0; i < children.size(); i++) {
+ size_t index = i;
+ auto lambda = [&, index] { results[index] = count_child_nodes(children[index]); };
+ pls::fork_join_lambda_by_value<decltype(lambda)> sub_task(lambda);
+ current_task->spawn_child(sub_task);
+ }
+ current_task->wait_for_all();
+ for (auto result : results) {
+ child_count += result;
+ }
+
+ return child_count;
+}
+
+int unbalanced_tree_search(int seed, int root_children, double q, int normal_children) {
+ static auto id = pls::unique_id::create(42);
+ int result;
+
+ auto lambda = [&] {
+ uts::node root(seed, root_children, q, normal_children);
+ result = count_child_nodes(root);
+ };
+ pls::fork_join_lambda_by_reference<decltype(lambda)> sub_task(lambda);
+ pls::fork_join_task root_task{&sub_task, id};
+ pls::scheduler::execute_task(root_task);
+
+ return result;
+}
+
+int main() {
+ PROFILE_ENABLE
+ pls::internal::helpers::run_mini_benchmark([&] {
+ unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN);
+ }, 8, 4000);
+
+ PROFILE_SAVE("test_profile.prof")
+}
+
+//int main() {
+// PROFILE_ENABLE
+// pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18};
+// pls::scheduler scheduler{&my_scheduler_memory, 8};
+//
+// scheduler.perform_work([&] {
+// PROFILE_MAIN_THREAD
+// for (int i = 0; i < 10; i++) {
+// PROFILE_WORK_BLOCK("Top Level")
+// int result = unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN);
+// std::cout << result << std::endl;
+// }
+// });
+//
+// PROFILE_SAVE("test_profile.prof")
+//}
diff --git a/app/benchmark_unbalanced/node.cpp b/app/benchmark_unbalanced/node.cpp
new file mode 100644
index 0000000..1cb931e
--- /dev/null
+++ b/app/benchmark_unbalanced/node.cpp
@@ -0,0 +1,28 @@
+#include "node.h"
+
+namespace uts {
+node_state node::generate_child_state(uint32_t index) {
+ node_state result;
+
+ picosha2::hash256_one_by_one hasher;
+ hasher.process(state_.begin(), state_.end());
+ auto index_begin = reinterpret_cast<uint8_t *>(&index);
+ hasher.process(index_begin, index_begin + 4);
+ hasher.finish();
+ hasher.get_hash_bytes(result.begin(), result.end());
+
+ return result;
+}
+
+double node::get_state_random() {
+ int32_t state_random_integer;
+ uint32_t b = ((uint32_t) state_[16] << 24) |
+ ((uint32_t) state_[17] << 16) |
+ ((uint32_t) state_[18] << 8) |
+ ((uint32_t) state_[19] << 0);
+ b = b & 0x7fffffff; // Mask out negative values
+ state_random_integer = static_cast<int32_t>(b);
+
+ return (double) state_random_integer / (double) INT32_MAX;
+}
+}
diff --git a/app/benchmark_unbalanced/node.h b/app/benchmark_unbalanced/node.h
new file mode 100644
index 0000000..5111059
--- /dev/null
+++ b/app/benchmark_unbalanced/node.h
@@ -0,0 +1,73 @@
+
+#ifndef UTS_NODE_H
+#define UTS_NODE_H
+
+#include <array>
+#include <vector>
+#include <cstdint>
+
+#include "picosha2.h"
+
+namespace uts {
+using node_state = std::array<uint8_t, 20>;
+
+/**
+ * Node of an unbalanced binomial tree (https://www.cs.unc.edu/~olivier/LCPC06.pdf).
+ * To build up the tree recursively call spawn_child_nodes on each node until leaves are reached.
+ * The tree is not built up directly in memory, but rather by the recursive calls.
+ */
+class node {
+ // The state is used to allow a deterministic tree construction using sha256 hashes.
+ node_state state_;
+
+ // Set this to a positive number for the root node to start the tree with a specific size
+ int root_children_;
+
+ // general branching factors
+ double q_;
+ int b_;
+
+ // Private constructor for children
+ node(node_state state, double q, int b) : state_{state}, root_children_{-1}, q_{q}, b_{b} {}
+
+ node_state generate_child_state(uint32_t index);
+ double get_state_random();
+
+ public:
+ node(int seed, int root_children, double q, int b) : state_({{}}), root_children_{root_children}, q_{q}, b_{b} {
+ for (int i = 0; i < 16; i++) {
+ state_[i] = 0;
+ }
+ state_[16] = static_cast<uint8_t>(0xFF & (seed >> 24));
+ state_[17] = static_cast<uint8_t>(0xFF & (seed >> 16));
+ state_[18] = static_cast<uint8_t>(0xFF & (seed >> 8));
+ state_[19] = static_cast<uint8_t>(0xFF & (seed >> 0));
+
+ picosha2::hash256_one_by_one hasher;
+ hasher.process(state_.begin(), state_.end());
+ hasher.finish();
+ hasher.get_hash_bytes(state_.begin(), state_.end());
+ }
+
+ std::vector<node> spawn_child_nodes() {
+ double state_random = get_state_random();
+ int num_children;
+ if (root_children_ > 0) {
+ num_children = root_children_; // Root always spawns children
+ } else if (state_random < q_) {
+ num_children = b_;
+ } else {
+ num_children = 0;
+ }
+
+ std::vector<node> result;
+ for (int i = 0; i < num_children; i++) {
+ result.push_back(node(generate_child_state(i), q_, b_));
+ }
+
+ return result;
+ }
+};
+}
+
+#endif //UTS_NODE_H
diff --git a/app/benchmark_unbalanced/picosha2.h b/app/benchmark_unbalanced/picosha2.h
new file mode 100644
index 0000000..bc00c74
--- /dev/null
+++ b/app/benchmark_unbalanced/picosha2.h
@@ -0,0 +1,377 @@
+/*
+The MIT License (MIT)
+
+Copyright (C) 2017 okdshin
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+*/
+#ifndef PICOSHA2_H
+#define PICOSHA2_H
+// picosha2:20140213
+
+#ifndef PICOSHA2_BUFFER_SIZE_FOR_INPUT_ITERATOR
+#define PICOSHA2_BUFFER_SIZE_FOR_INPUT_ITERATOR \
+ 1048576 //=1024*1024: default is 1MB memory
+#endif
+
+#include <algorithm>
+#include <cassert>
+#include <iterator>
+#include <sstream>
+#include <vector>
+#include <fstream>
+namespace picosha2 {
+typedef unsigned long word_t;
+typedef unsigned char byte_t;
+
+static const size_t k_digest_size = 32;
+
+namespace detail {
+inline byte_t mask_8bit(byte_t x) { return x & 0xff; }
+
+inline word_t mask_32bit(word_t x) { return x & 0xffffffff; }
+
+const word_t add_constant[64] = {
+ 0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1,
+ 0x923f82a4, 0xab1c5ed5, 0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3,
+ 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174, 0xe49b69c1, 0xefbe4786,
+ 0x0fc19dc6, 0x240ca1cc, 0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da,
+ 0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7, 0xc6e00bf3, 0xd5a79147,
+ 0x06ca6351, 0x14292967, 0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13,
+ 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85, 0xa2bfe8a1, 0xa81a664b,
+ 0xc24b8b70, 0xc76c51a3, 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070,
+ 0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 0x391c0cb3, 0x4ed8aa4a,
+ 0x5b9cca4f, 0x682e6ff3, 0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208,
+ 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2};
+
+const word_t initial_message_digest[8] = {0x6a09e667, 0xbb67ae85, 0x3c6ef372,
+ 0xa54ff53a, 0x510e527f, 0x9b05688c,
+ 0x1f83d9ab, 0x5be0cd19};
+
+inline word_t ch(word_t x, word_t y, word_t z) { return (x & y) ^ ((~x) & z); }
+
+inline word_t maj(word_t x, word_t y, word_t z) {
+ return (x & y) ^ (x & z) ^ (y & z);
+}
+
+inline word_t rotr(word_t x, std::size_t n) {
+ assert(n < 32);
+ return mask_32bit((x >> n) | (x << (32 - n)));
+}
+
+inline word_t bsig0(word_t x) { return rotr(x, 2) ^ rotr(x, 13) ^ rotr(x, 22); }
+
+inline word_t bsig1(word_t x) { return rotr(x, 6) ^ rotr(x, 11) ^ rotr(x, 25); }
+
+inline word_t shr(word_t x, std::size_t n) {
+ assert(n < 32);
+ return x >> n;
+}
+
+inline word_t ssig0(word_t x) { return rotr(x, 7) ^ rotr(x, 18) ^ shr(x, 3); }
+
+inline word_t ssig1(word_t x) { return rotr(x, 17) ^ rotr(x, 19) ^ shr(x, 10); }
+
+template <typename RaIter1, typename RaIter2>
+void hash256_block(RaIter1 message_digest, RaIter2 first, RaIter2 last) {
+ assert(first + 64 == last);
+ static_cast<void>(last); // for avoiding unused-variable warning
+ word_t w[64];
+ std::fill(w, w + 64, 0);
+ for (std::size_t i = 0; i < 16; ++i) {
+ w[i] = (static_cast<word_t>(mask_8bit(*(first + i * 4))) << 24) |
+ (static_cast<word_t>(mask_8bit(*(first + i * 4 + 1))) << 16) |
+ (static_cast<word_t>(mask_8bit(*(first + i * 4 + 2))) << 8) |
+ (static_cast<word_t>(mask_8bit(*(first + i * 4 + 3))));
+ }
+ for (std::size_t i = 16; i < 64; ++i) {
+ w[i] = mask_32bit(ssig1(w[i - 2]) + w[i - 7] + ssig0(w[i - 15]) +
+ w[i - 16]);
+ }
+
+ word_t a = *message_digest;
+ word_t b = *(message_digest + 1);
+ word_t c = *(message_digest + 2);
+ word_t d = *(message_digest + 3);
+ word_t e = *(message_digest + 4);
+ word_t f = *(message_digest + 5);
+ word_t g = *(message_digest + 6);
+ word_t h = *(message_digest + 7);
+
+ for (std::size_t i = 0; i < 64; ++i) {
+ word_t temp1 = h + bsig1(e) + ch(e, f, g) + add_constant[i] + w[i];
+ word_t temp2 = bsig0(a) + maj(a, b, c);
+ h = g;
+ g = f;
+ f = e;
+ e = mask_32bit(d + temp1);
+ d = c;
+ c = b;
+ b = a;
+ a = mask_32bit(temp1 + temp2);
+ }
+ *message_digest += a;
+ *(message_digest + 1) += b;
+ *(message_digest + 2) += c;
+ *(message_digest + 3) += d;
+ *(message_digest + 4) += e;
+ *(message_digest + 5) += f;
+ *(message_digest + 6) += g;
+ *(message_digest + 7) += h;
+ for (std::size_t i = 0; i < 8; ++i) {
+ *(message_digest + i) = mask_32bit(*(message_digest + i));
+ }
+}
+
+} // namespace detail
+
+template <typename InIter>
+void output_hex(InIter first, InIter last, std::ostream& os) {
+ os.setf(std::ios::hex, std::ios::basefield);
+ while (first != last) {
+ os.width(2);
+ os.fill('0');
+ os << static_cast<unsigned int>(*first);
+ ++first;
+ }
+ os.setf(std::ios::dec, std::ios::basefield);
+}
+
+template <typename InIter>
+void bytes_to_hex_string(InIter first, InIter last, std::string& hex_str) {
+ std::ostringstream oss;
+ output_hex(first, last, oss);
+ hex_str.assign(oss.str());
+}
+
+template <typename InContainer>
+void bytes_to_hex_string(const InContainer& bytes, std::string& hex_str) {
+ bytes_to_hex_string(bytes.begin(), bytes.end(), hex_str);
+}
+
+template <typename InIter>
+std::string bytes_to_hex_string(InIter first, InIter last) {
+ std::string hex_str;
+ bytes_to_hex_string(first, last, hex_str);
+ return hex_str;
+}
+
+template <typename InContainer>
+std::string bytes_to_hex_string(const InContainer& bytes) {
+ std::string hex_str;
+ bytes_to_hex_string(bytes, hex_str);
+ return hex_str;
+}
+
+class hash256_one_by_one {
+ public:
+ hash256_one_by_one() { init(); }
+
+ void init() {
+ buffer_.clear();
+ std::fill(data_length_digits_, data_length_digits_ + 4, 0);
+ std::copy(detail::initial_message_digest,
+ detail::initial_message_digest + 8, h_);
+ }
+
+ template <typename RaIter>
+ void process(RaIter first, RaIter last) {
+ add_to_data_length(static_cast<word_t>(std::distance(first, last)));
+ std::copy(first, last, std::back_inserter(buffer_));
+ std::size_t i = 0;
+ for (; i + 64 <= buffer_.size(); i += 64) {
+ detail::hash256_block(h_, buffer_.begin() + i,
+ buffer_.begin() + i + 64);
+ }
+ buffer_.erase(buffer_.begin(), buffer_.begin() + i);
+ }
+
+ void finish() {
+ byte_t temp[64];
+ std::fill(temp, temp + 64, 0);
+ std::size_t remains = buffer_.size();
+ std::copy(buffer_.begin(), buffer_.end(), temp);
+ temp[remains] = 0x80;
+
+ if (remains > 55) {
+ std::fill(temp + remains + 1, temp + 64, 0);
+ detail::hash256_block(h_, temp, temp + 64);
+ std::fill(temp, temp + 64 - 4, 0);
+ } else {
+ std::fill(temp + remains + 1, temp + 64 - 4, 0);
+ }
+
+ write_data_bit_length(&(temp[56]));
+ detail::hash256_block(h_, temp, temp + 64);
+ }
+
+ template <typename OutIter>
+ void get_hash_bytes(OutIter first, OutIter last) const {
+ for (const word_t* iter = h_; iter != h_ + 8; ++iter) {
+ for (std::size_t i = 0; i < 4 && first != last; ++i) {
+ *(first++) = detail::mask_8bit(
+ static_cast<byte_t>((*iter >> (24 - 8 * i))));
+ }
+ }
+ }
+
+ private:
+ void add_to_data_length(word_t n) {
+ word_t carry = 0;
+ data_length_digits_[0] += n;
+ for (std::size_t i = 0; i < 4; ++i) {
+ data_length_digits_[i] += carry;
+ if (data_length_digits_[i] >= 65536u) {
+ carry = data_length_digits_[i] >> 16;
+ data_length_digits_[i] &= 65535u;
+ } else {
+ break;
+ }
+ }
+ }
+ void write_data_bit_length(byte_t* begin) {
+ word_t data_bit_length_digits[4];
+ std::copy(data_length_digits_, data_length_digits_ + 4,
+ data_bit_length_digits);
+
+ // convert byte length to bit length (multiply 8 or shift 3 times left)
+ word_t carry = 0;
+ for (std::size_t i = 0; i < 4; ++i) {
+ word_t before_val = data_bit_length_digits[i];
+ data_bit_length_digits[i] <<= 3;
+ data_bit_length_digits[i] |= carry;
+ data_bit_length_digits[i] &= 65535u;
+ carry = (before_val >> (16 - 3)) & 65535u;
+ }
+
+ // write data_bit_length
+ for (int i = 3; i >= 0; --i) {
+ (*begin++) = static_cast(data_bit_length_digits[i] >> 8);
+ (*begin++) = static_cast(data_bit_length_digits[i]);
+ }
+ }
+ std::vector buffer_;
+ word_t data_length_digits_[4]; // as 64bit integer (16bit x 4 integer)
+ word_t h_[8];
+};
+
+inline void get_hash_hex_string(const hash256_one_by_one& hasher,
+ std::string& hex_str) {
+ byte_t hash[k_digest_size];
+ hasher.get_hash_bytes(hash, hash + k_digest_size);
+ return bytes_to_hex_string(hash, hash + k_digest_size, hex_str);
+}
+
+inline std::string get_hash_hex_string(const hash256_one_by_one& hasher) {
+ std::string hex_str;
+ get_hash_hex_string(hasher, hex_str);
+ return hex_str;
+}
+
+namespace impl {
+template <typename RaIter, typename OutIter>
+void hash256_impl(RaIter first, RaIter last, OutIter first2, OutIter last2, int,
+ std::random_access_iterator_tag) {
+ hash256_one_by_one hasher;
+ // hasher.init();
+ hasher.process(first, last);
+ hasher.finish();
+ hasher.get_hash_bytes(first2, last2);
+}
+
+template <typename InputIter, typename OutIter>
+void hash256_impl(InputIter first, InputIter last, OutIter first2,
+ OutIter last2, int buffer_size, std::input_iterator_tag) {
+ std::vector buffer(buffer_size);
+ hash256_one_by_one hasher;
+ // hasher.init();
+ while (first != last) {
+ int size = buffer_size;
+ for (int i = 0; i != buffer_size; ++i, ++first) {
+ if (first == last) {
+ size = i;
+ break;
+ }
+ buffer[i] = *first;
+ }
+ hasher.process(buffer.begin(), buffer.begin() + size);
+ }
+ hasher.finish();
+ hasher.get_hash_bytes(first2, last2);
+}
+}
+
+template <typename InIter, typename OutIter>
+void hash256(InIter first, InIter last, OutIter first2, OutIter last2,
+ int buffer_size = PICOSHA2_BUFFER_SIZE_FOR_INPUT_ITERATOR) {
+ picosha2::impl::hash256_impl(
+ first, last, first2, last2, buffer_size,
+ typename std::iterator_traits::iterator_category());
+}
+
+template <typename InIter, typename OutContainer>
+void hash256(InIter first, InIter last, OutContainer& dst) {
+ hash256(first, last, dst.begin(), dst.end());
+}
+
+template <typename InContainer, typename OutIter>
+void hash256(const InContainer& src, OutIter first, OutIter last) {
+ hash256(src.begin(), src.end(), first, last);
+}
+
+template <typename InContainer, typename OutContainer>
+void hash256(const InContainer& src, OutContainer& dst) {
+ hash256(src.begin(), src.end(), dst.begin(), dst.end());
+}
+
+template <typename InIter>
+void hash256_hex_string(InIter first, InIter last, std::string& hex_str) {
+ byte_t hashed[k_digest_size];
+ hash256(first, last, hashed, hashed + k_digest_size);
+ std::ostringstream oss;
+ output_hex(hashed, hashed + k_digest_size, oss);
+ hex_str.assign(oss.str());
+}
+
+template <typename InIter>
+std::string hash256_hex_string(InIter first, InIter last) {
+ std::string hex_str;
+ hash256_hex_string(first, last, hex_str);
+ return hex_str;
+}
+
+inline void hash256_hex_string(const std::string& src, std::string& hex_str) {
+ hash256_hex_string(src.begin(), src.end(), hex_str);
+}
+
+template <typename InContainer>
+void hash256_hex_string(const InContainer& src, std::string& hex_str) {
+ hash256_hex_string(src.begin(), src.end(), hex_str);
+}
+
+template <typename InContainer>
+std::string hash256_hex_string(const InContainer& src) {
+ return hash256_hex_string(src.begin(), src.end());
+}
+template <typename OutIter>
+void hash256(std::ifstream& f, OutIter first, OutIter last) {
+    hash256(std::istreambuf_iterator<char>(f), std::istreambuf_iterator<char>(),
+            first, last);
+}
+}// namespace picosha2
+#endif // PICOSHA2_H
diff --git a/app/invoke_parallel/main.cpp b/app/invoke_parallel/main.cpp
index 4ae48ef..4382168 100644
--- a/app/invoke_parallel/main.cpp
+++ b/app/invoke_parallel/main.cpp
@@ -2,50 +2,101 @@
#include <pls/internal/helpers/profiler.h>
#include <iostream>
+#include <complex>
+#include <vector>
-static pls::static_scheduler_memory<8, 2 << 14> my_scheduler_memory;
+static constexpr int CUTOFF = 16;
+static constexpr int INPUT_SIZE = 8192;
+typedef std::vector<std::complex<double>> complex_vector;
-static constexpr int CUTOFF = 10;
+void divide(complex_vector::iterator data, int n) {
+ complex_vector tmp_odd_elements(n / 2);
+ for (int i = 0; i < n / 2; i++) {
+ tmp_odd_elements[i] = data[i * 2 + 1];
+ }
+ for (int i = 0; i < n / 2; i++) {
+ data[i] = data[i * 2];
+ }
+ for (int i = 0; i < n / 2; i++) {
+ data[i + n / 2] = tmp_odd_elements[i];
+ }
+}
-long fib_serial(long n) {
- if (n == 0) {
- return 0;
- }
- if (n == 1) {
- return 1;
- }
+void combine(complex_vector::iterator data, int n) {
+ for (int i = 0; i < n / 2; i++) {
+ std::complex<double> even = data[i];
+ std::complex<double> odd = data[i + n / 2];
+
+ // w is the "twiddle-factor".
+ // this could be cached, but we run the same 'data_structures' algorithm parallel/serial,
+ // so it won't impact the performance comparison.
+ std::complex<double> w = exp(std::complex<double>(0, -2. * M_PI * i / n));
- return fib_serial(n - 1) + fib_serial(n - 2);
+ data[i] = even + w * odd;
+ data[i + n / 2] = even - w * odd;
+ }
}
-long fib(long n) {
- if (n <= CUTOFF) {
- return fib_serial(n);
- }
+void fft(complex_vector::iterator data, int n) {
+ if (n < 2) {
+ return;
+ }
- // Actual 'invoke_parallel' logic/code
- int left, right;
+ PROFILE_WORK_BLOCK("Divide")
+ divide(data, n);
+ PROFILE_END_BLOCK
+ PROFILE_WORK_BLOCK("Invoke Parallel")
+ if (n == CUTOFF) {
+ PROFILE_WORK_BLOCK("FFT Serial")
+ fft(data, n / 2);
+ fft(data + n / 2, n / 2);
+ } else if (n <= CUTOFF) {
+ fft(data, n / 2);
+ fft(data + n / 2, n / 2);
+ } else {
pls::invoke_parallel(
- [&] { left = fib(n - 1); },
- [&] { right = fib(n - 2); }
+ [&] { fft(data, n / 2); },
+ [&] { fft(data + n / 2, n / 2); }
);
- return left + right;
+ }
+ PROFILE_END_BLOCK
+ PROFILE_WORK_BLOCK("Combine")
+ combine(data, n);
+ PROFILE_END_BLOCK
+}
+
+complex_vector prepare_input(int input_size) {
+ std::vector<int> known_frequencies{2, 11, 52, 88, 256};
+ complex_vector data(input_size);
+
+ // Set our input data to match a time series of the known_frequencies.
+ // When applying fft to this time-series we should find these frequencies.
+ for (int i = 0; i < input_size; i++) {
+ data[i] = std::complex<double>(0.0, 0.0);
+ for (auto frequencie : known_frequencies) {
+ data[i] += sin(2 * M_PI * frequencie * i / input_size);
+ }
+ }
+
+ return data;
}
int main() {
- PROFILE_ENABLE
- pls::scheduler scheduler{&my_scheduler_memory, 8};
-
- long result;
- scheduler.perform_work([&] {
- PROFILE_MAIN_THREAD
- // Call looks just the same, only requirement is
- // the enclosure in the perform_work lambda.
- for (int i = 0; i < 10; i++) {
- result = fib(30);
- std::cout << "Fib(30)=" << result << std::endl;
- }
- });
-
- PROFILE_SAVE("test_profile.prof")
+ PROFILE_ENABLE
+ pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 14};
+ pls::scheduler scheduler{&my_scheduler_memory, 8};
+
+ complex_vector initial_input = prepare_input(INPUT_SIZE);
+ scheduler.perform_work([&] {
+ PROFILE_MAIN_THREAD
+ // Call looks just the same, only requirement is
+ // the enclosure in the perform_work lambda.
+ for (int i = 0; i < 10; i++) {
+ PROFILE_WORK_BLOCK("Top Level FFT")
+ complex_vector input = initial_input;
+ fft(input.begin(), input.size());
+ }
+ });
+
+ PROFILE_SAVE("test_profile.prof")
}
diff --git a/app/playground/main.cpp b/app/playground/main.cpp
index cc7e784..d3a7a50 100644
--- a/app/playground/main.cpp
+++ b/app/playground/main.cpp
@@ -10,8 +10,6 @@
#include
#include
-
int main() {
- std::cout << pls::internal::scheduling::root_task::create_id().type_.hash_code() << std::endl;
- std::cout << pls::internal::helpers::unique_id::create>().type_.hash_code() << std::endl;
+
}
diff --git a/app/test_for_new/main.cpp b/app/test_for_new/main.cpp
index 2e74529..51f454c 100644
--- a/app/test_for_new/main.cpp
+++ b/app/test_for_new/main.cpp
@@ -5,9 +5,8 @@ using namespace pls::internal::base;
int global = 0;
-
int main() {
- // Try to use every feature, to trigger the prohibited use of new if found somewhere
- auto t1 = start_thread([] (){});
- t1.join();
+ // Try to use every feature, to trigger the prohibited use of new if found somewhere
+ auto t1 = start_thread([]() {});
+ t1.join();
}
diff --git a/lib/pls/include/pls/algorithms/invoke_parallel.h b/lib/pls/include/pls/algorithms/invoke_parallel.h
index dc44469..17b439e 100644
--- a/lib/pls/include/pls/algorithms/invoke_parallel.h
+++ b/lib/pls/include/pls/algorithms/invoke_parallel.h
@@ -6,15 +6,17 @@
#include "pls/internal/scheduling/scheduler.h"
namespace pls {
- namespace algorithm {
- template<typename Function1, typename Function2>
- void invoke_parallel(const Function1& function1, const Function2& function2);
+namespace algorithm {
- template<typename Function1, typename Function2, typename Function3>
- void invoke_parallel(const Function1& function1, const Function2& function2, const Function3& function3);
+template<typename Function1, typename Function2>
+void invoke_parallel(const Function1 &function1, const Function2 &function2);
- // ...and so on, add more if we decide to keep this design
- }
+template<typename Function1, typename Function2, typename Function3>
+void invoke_parallel(const Function1 &function1, const Function2 &function2, const Function3 &function3);
+
+// ...and so on, add more if we decide to keep this design
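+//
+// Usage sketch (calls must happen inside scheduler::perform_work; see
+// app/invoke_parallel for a full example):
+//
+//   pls::invoke_parallel(
+//       [&] { left = fib(n - 1); },
+//       [&] { right = fib(n - 2); }
+//   );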
+
+}
}
#include "invoke_parallel_impl.h"
diff --git a/lib/pls/include/pls/algorithms/invoke_parallel_impl.h b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h
index 7dfbef8..9bfa185 100644
--- a/lib/pls/include/pls/algorithms/invoke_parallel_impl.h
+++ b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h
@@ -2,70 +2,73 @@
#ifndef PLS_INVOKE_PARALLEL_IMPL_H
#define PLS_INVOKE_PARALLEL_IMPL_H
+#include
#include "pls/internal/scheduling/fork_join_task.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/helpers/unique_id.h"
+#include "pls/internal/base/alignment.h"
namespace pls {
- namespace algorithm {
- namespace internal {
- using namespace ::pls::internal::scheduling;
+namespace algorithm {
+namespace internal {
- template<typename Body>
- inline void run_body(const Body& internal_body, const abstract_task::id& id) {
- // Make sure we are in the context of this invoke_parallel instance,
- // if not we will spawn it as a new 'fork-join-style' task.
- auto current_task = scheduler::current_task();
- if (current_task->unique_id() == id) {
- auto current_sub_task = reinterpret_cast<fork_join_task*>(current_task)->currently_executing();
- internal_body(current_sub_task);
- } else {
- fork_join_lambda<Body> root_body(&internal_body);
- fork_join_task root_task{&root_body, id};
- scheduler::execute_task(root_task);
- }
- }
- }
+using namespace ::pls::internal::scheduling;
- template<typename Function1, typename Function2>
- void invoke_parallel(const Function1& function1, const Function2& function2) {
- using namespace ::pls::internal::scheduling;
- using namespace ::pls::internal::helpers;
- static abstract_task::id id = unique_id::create();
+template<typename Body>
+inline void run_body(const Body &internal_body, const abstract_task::id &id) {
+ // Make sure we are in the context of this invoke_parallel instance,
+ // if not we will spawn it as a new 'fork-join-style' task.
+ auto current_task = scheduler::current_task();
+ if (current_task->unique_id() == id) {
+ internal_body();
+ } else {
+ fork_join_lambda_by_reference<Body> root_body(internal_body);
+ fork_join_task root_task{&root_body, id};
+ scheduler::execute_task(root_task);
+ }
+}
+}
- auto internal_body = [&] (fork_join_sub_task* this_task){
- auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); };
- auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1);
+template<typename Function1, typename Function2>
+void invoke_parallel(const Function1 &function1, const Function2 &function2) {
+ using namespace ::pls::internal::scheduling;
+ using namespace ::pls::internal::helpers;
+ using namespace ::pls::internal::base;
+ static abstract_task::id id = unique_id::create();
- this_task->spawn_child(sub_task_1);
- function2(); // Execute last function 'inline' without spawning a sub_task object
- this_task->wait_for_all();
- };
+ auto internal_body = [&]() {
+ auto current_task = fork_join_sub_task::current();
+ auto sub_task_2 = fork_join_lambda_by_reference<Function2>(function2);
- internal::run_body(internal_body, id);
- }
+ current_task->spawn_child(sub_task_2);
+ function1(); // Execute first function 'inline' without spawning a sub_task object
+ current_task->wait_for_all();
+ };
+
+ internal::run_body(internal_body, id);
+}
- template<typename Function1, typename Function2, typename Function3>
- void invoke_parallel(const Function1& function1, const Function2& function2, const Function3& function3) {
- using namespace ::pls::internal::scheduling;
- using namespace ::pls::internal::helpers;
- static abstract_task::id id = unique_id::create();
+template<typename Function1, typename Function2, typename Function3>
+void invoke_parallel(const Function1 &function1, const Function2 &function2, const Function3 &function3) {
+ using namespace ::pls::internal::scheduling;
+ using namespace ::pls::internal::helpers;
+ static abstract_task::id id = unique_id::create();
- auto internal_body = [&] (fork_join_sub_task* this_task){
- auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); };
- auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1);
- auto sub_task_body_2 = [&] (fork_join_sub_task*){ function2(); };
- auto sub_task_2 = fork_join_lambda<decltype(sub_task_body_2)>(&sub_task_body_2);
+ auto internal_body = [&]() {
+ auto current_task = fork_join_sub_task::current();
+ auto sub_task_2 = fork_join_lambda_by_reference<Function2>(function2);
+ auto sub_task_3 = fork_join_lambda_by_reference<Function3>(function3);
- this_task->spawn_child(sub_task_1);
- this_task->spawn_child(sub_task_2);
- function3(); // Execute last function 'inline' without spawning a sub_task object
- this_task->wait_for_all();
- };
+ current_task->spawn_child(sub_task_2);
+ current_task->spawn_child(sub_task_3);
+ function1(); // Execute first function 'inline' without spawning a sub_task object
+ current_task->wait_for_all();
+ };
- internal::run_body(internal_body, id);
- }
- }
+ internal::run_body(internal_body, id);
+}
+
+}
}
#endif //PLS_INVOKE_PARALLEL_IMPL_H
diff --git a/lib/pls/include/pls/internal/base/alignment.h b/lib/pls/include/pls/internal/base/alignment.h
index 2dfb474..5797932 100644
--- a/lib/pls/include/pls/internal/base/alignment.h
+++ b/lib/pls/include/pls/internal/base/alignment.h
@@ -8,21 +8,45 @@
#include "system_details.h"
namespace pls {
- namespace internal {
- namespace base {
- namespace alignment {
- template<typename T>
- struct aligned_wrapper {
- alignas(system_details::CACHE_LINE_SIZE) unsigned char data[sizeof(T)];
- T* pointer() { return reinterpret_cast<T*>(data); }
- };
- void* allocate_aligned(size_t size);
-
- std::uintptr_t next_alignment(std::uintptr_t size);
- char* next_alignment(char* pointer);
- }
- }
- }
+namespace internal {
+namespace base {
+namespace alignment {
+
+template<typename T>
+struct aligned_wrapper {
+ alignas(system_details::CACHE_LINE_SIZE) unsigned char data[sizeof(T)];
+ T *pointer() { return reinterpret_cast<T *>(data); }
+};
+void *allocate_aligned(size_t size);
+
+system_details::pointer_t next_alignment(system_details::pointer_t size);
+system_details::pointer_t previous_alignment(system_details::pointer_t size);
+char *next_alignment(char *pointer);
+
+}
+
+template<typename T>
+struct aligned_aba_pointer {
+ const system_details::pointer_t pointer_;
+
+ explicit aligned_aba_pointer(T *pointer, unsigned int aba = 0) : pointer_{
+ reinterpret_cast<system_details::pointer_t>(pointer) + aba} {}
+
+ T *pointer() const {
+ return reinterpret_cast<T *>(pointer_ & system_details::CACHE_LINE_ADDRESS_USED_BITS);
+ }
+
+ unsigned int aba() const {
+ return pointer_ & system_details::CACHE_LINE_ADDRESS_UNUSED_BITS;
+ }
+
+ aligned_aba_pointer set_aba(unsigned int aba) const {
+ return aligned_aba_pointer(pointer(), aba);
+ }
+};
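+
+// Usage sketch (hypothetical names; assumes the pointed-to object is cache
+// line aligned, so the lower CACHE_LINE_ADDRESS_BITS of its address are zero
+// and free to hold the ABA tag):
+//
+//   aligned_aba_pointer<my_node> tagged{node_pointer, /* aba */ 1};
+//   my_node *raw = tagged.pointer();              // Tag bits masked out.
+//   auto next = tagged.set_aba(tagged.aba() + 1); // Bump tag for the next CAS.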
+
+}
+}
}
#endif //PLS_ALIGNMENT_H
diff --git a/lib/pls/include/pls/internal/base/backoff.h b/lib/pls/include/pls/internal/base/backoff.h
new file mode 100644
index 0000000..9e7cbf5
--- /dev/null
+++ b/lib/pls/include/pls/internal/base/backoff.h
@@ -0,0 +1,54 @@
+
+#ifndef PLS_BACKOFF_H_
+#define PLS_BACKOFF_H_
+
+#include "pls/internal/base/system_details.h"
+#include "pls/internal/helpers/profiler.h"
+#include "pls/internal/base/thread.h"
+
+#include
+#include
+
+namespace pls {
+namespace internal {
+namespace base {
+
+class backoff {
+ const unsigned long INITIAL_SPIN_ITERS = 2u << 4u;
+ const unsigned long MAX_SPIN_ITERS = 2u << 8u;
+ const unsigned long MAX_ITERS = 2u << 10u;
+ const unsigned long YELD_ITERS = 2u << 10u;
+
+ unsigned long current_ = INITIAL_SPIN_ITERS;
+ std::minstd_rand random_;
+
+ static void spin(unsigned long iterations) {
+ for (volatile unsigned long i = 0; i < iterations; i++)
+ system_details::relax_cpu(); // Spin
+ }
+
+ public:
+ backoff() : current_{INITIAL_SPIN_ITERS}, random_{std::random_device{}()} {}
+
+ void do_backoff() {
+ PROFILE_LOCK("Backoff")
+ spin(random_() % std::min(current_, MAX_SPIN_ITERS));
+
+ if (current_ >= YELD_ITERS) {
+ PROFILE_LOCK("Yield")
+ this_thread::yield();
+ }
+
+ current_ = std::min(current_ * 2, MAX_ITERS);
+ }
+
+ void reset() {
+ current_ = INITIAL_SPIN_ITERS;
+ }
+};
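+
+// Usage sketch for a contended spin loop (try_lock() is a placeholder for
+// whatever operation is being retried):
+//
+//   backoff lock_backoff;
+//   while (!try_lock()) {
+//     lock_backoff.do_backoff(); // Spin exponentially longer, later yield.
+//   }
+//   lock_backoff.reset();        // Acquired; start small again next time.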
+
+}
+}
+}
+
+#endif //PLS_BACKOFF_H_
diff --git a/lib/pls/include/pls/internal/base/barrier.h b/lib/pls/include/pls/internal/base/barrier.h
index 996f0e0..7f7653e 100644
--- a/lib/pls/include/pls/internal/base/barrier.h
+++ b/lib/pls/include/pls/internal/base/barrier.h
@@ -5,27 +5,29 @@
#include
namespace pls {
- namespace internal {
- namespace base {
- /**
- * Provides standard barrier behaviour.
- * `count` threads have to call `wait()` before any of the `wait()` calls returns,
- * thus blocking all threads until everyone reached the barrier.
- *
- * PORTABILITY:
- * Current implementation is based on pthreads.
- */
- class barrier {
- pthread_barrier_t barrier_;
-
- public:
- explicit barrier(unsigned int count);
- ~barrier();
-
- void wait();
- };
- }
- }
+namespace internal {
+namespace base {
+
+/**
+ * Provides standard barrier behaviour.
+ * `count` threads have to call `wait()` before any of the `wait()` calls returns,
+ * thus blocking all threads until everyone reached the barrier.
+ *
+ * PORTABILITY:
+ * Current implementation is based on pthreads.
+ */
+class barrier {
+ pthread_barrier_t barrier_;
+
+ public:
+ explicit barrier(unsigned int count);
+ ~barrier();
+
+ void wait();
+};
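+
+// Usage sketch (thread API from pls/internal/base/thread.h): both wait()
+// calls return only once both threads have reached the barrier.
+//
+//   barrier sync_point{2};
+//   auto t1 = start_thread([&] { /* phase one */ sync_point.wait(); /* phase two */ });
+//   auto t2 = start_thread([&] { /* phase one */ sync_point.wait(); /* phase two */ });
+//   t1.join(); t2.join();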
+
+}
+}
}
#endif //PLS_BARRIER_H
diff --git a/lib/pls/include/pls/internal/base/error_handling.h b/lib/pls/include/pls/internal/base/error_handling.h
index 235964e..381758a 100644
--- a/lib/pls/include/pls/internal/base/error_handling.h
+++ b/lib/pls/include/pls/internal/base/error_handling.h
@@ -11,5 +11,6 @@
* (or its inclusion adds too much overhead).
*/
#define PLS_ERROR(msg) std::cout << msg << std::endl; exit(1);
+#define PLS_ASSERT(cond, msg) if (!(cond)) { PLS_ERROR(msg) }
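+// Usage sketch: PLS_ASSERT(pointer != nullptr, "Must not pass a null pointer!")
+// prints the message and exits the program when the condition does not hold.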
#endif //PLS_ERROR_HANDLING_H
diff --git a/lib/pls/include/pls/internal/base/spin_lock.h b/lib/pls/include/pls/internal/base/spin_lock.h
index 36f4bc3..497dcb1 100644
--- a/lib/pls/include/pls/internal/base/spin_lock.h
+++ b/lib/pls/include/pls/internal/base/spin_lock.h
@@ -6,12 +6,14 @@
#include "ttas_spin_lock.h"
namespace pls {
- namespace internal {
- namespace base {
- // Default Spin-Lock implementation for this project.
- using spin_lock = tas_spin_lock;
- }
- }
+namespace internal {
+namespace base {
+
+// Default Spin-Lock implementation for this project.
+using spin_lock = ttas_spin_lock;
+
+}
+}
}
#endif //PLS_SPINLOCK_H
diff --git a/lib/pls/include/pls/internal/base/swmr_spin_lock.h b/lib/pls/include/pls/internal/base/swmr_spin_lock.h
new file mode 100644
index 0000000..bfe9284
--- /dev/null
+++ b/lib/pls/include/pls/internal/base/swmr_spin_lock.h
@@ -0,0 +1,38 @@
+
+#ifndef PLS_SWMR_SPIN_LOCK_LOCK_H_
+#define PLS_SWMR_SPIN_LOCK_LOCK_H_
+
+#include
+
+#include "pls/internal/helpers/profiler.h"
+
+namespace pls {
+namespace internal {
+namespace base {
+
+/**
+ * Single writer, multiple reader spin lock.
+ * The writer is required to be the same thread all the time (single writer),
+ * while multiple threads can read.
+ * Readers fail to lock while the writer requests the lock;
+ * the writer then acquires the lock once all remaining readers have left the critical section.
+ */
+class swmr_spin_lock {
+ std::atomic readers_;
+ std::atomic write_request_;
+
+ public:
+ explicit swmr_spin_lock() : readers_{0}, write_request_{0} {}
+
+ bool reader_try_lock();
+ void reader_unlock();
+
+ void writer_lock();
+ void writer_unlock();
+};
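+
+// Usage sketch (single writer thread, e.g. the owner of a data structure;
+// readers are other threads, e.g. performing steal attempts):
+//
+//   if (lock.reader_try_lock()) { // Fails while a write request is pending.
+//     /* ... read shared state ... */
+//     lock.reader_unlock();
+//   }
+//
+//   lock.writer_lock();           // Owning thread only; waits for readers.
+//   /* ... mutate shared state ... */
+//   lock.writer_unlock();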
+
+}
+}
+}
+
+#endif //PLS_SWMR_SPIN_LOCK_LOCK_H_
diff --git a/lib/pls/include/pls/internal/base/system_details.h b/lib/pls/include/pls/internal/base/system_details.h
index 28f7dff..92e4c65 100644
--- a/lib/pls/include/pls/internal/base/system_details.h
+++ b/lib/pls/include/pls/internal/base/system_details.h
@@ -3,31 +3,76 @@
#define PLS_SYSTEM_DETAILS_H
#include
+#if (COMPILER == MVCC)
+#include
+#endif
namespace pls {
- namespace internal {
- namespace base {
- /**
- * Collection of system details, e.g. hardware cache line size.
- *
- * PORTABILITY:
- * Currently sane default values for x86.
- */
- namespace system_details {
- /**
- * Most processors have 64 byte cache lines
- */
- constexpr std::uintptr_t CACHE_LINE_SIZE = 64;
-
- /**
- * Choose one of the following ways to store thread specific data.
- * Try to choose the fastest available on this processor/system.
- */
-// #define PLS_THREAD_SPECIFIC_PTHREAD
- #define PLS_THREAD_SPECIFIC_COMPILER
- }
- }
- }
+namespace internal {
+namespace base {
+
+/**
+ * Collection of system details, e.g. hardware cache line size.
+ *
+ * PORTABILITY:
+ * Currently sane default values for x86.
+ */
+namespace system_details {
+
+/**
+ * Pointer Types needed for ABA protection mixed into addresses.
+ * pointer_t should be an integer type capable of holding ANY pointer value.
+ */
+using pointer_t = std::uintptr_t;
+constexpr pointer_t ZERO_POINTER = 0;
+constexpr pointer_t MAX_POINTER = ~ZERO_POINTER;
+
+/**
+ * Biggest type that supports atomic CAS operations.
+ * Usually it is sane to assume a pointer can be swapped in a single CAS operation.
+ */
+using cas_integer = pointer_t;
+constexpr cas_integer MIN_CAS_INTEGER = 0;
+constexpr cas_integer MAX_CAS_INTEGER = ~MIN_CAS_INTEGER;
+constexpr cas_integer FIRST_HALF_CAS_INTEGER = MAX_CAS_INTEGER << ((sizeof(cas_integer) / 2) * 8);
+constexpr cas_integer SECOND_HALF_CAS_INTEGER = ~FIRST_HALF_CAS_INTEGER;
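+// Example (assuming a 64 bit cas_integer): FIRST_HALF_CAS_INTEGER is then
+// 0xFFFFFFFF00000000 and SECOND_HALF_CAS_INTEGER is 0x00000000FFFFFFFF,
+// i.e. two values (say a stamp and an index) can share one CAS-able word.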
+
+/**
+ * Most processors have 64 byte cache lines (last 6 bit of the address are zero at line beginnings).
+ */
+constexpr unsigned int CACHE_LINE_ADDRESS_BITS = 6;
+constexpr pointer_t CACHE_LINE_SIZE = 2u << (CACHE_LINE_ADDRESS_BITS - 1);
+constexpr pointer_t CACHE_LINE_ADDRESS_USED_BITS = MAX_POINTER << CACHE_LINE_ADDRESS_BITS;
+constexpr pointer_t CACHE_LINE_ADDRESS_UNUSED_BITS = ~CACHE_LINE_ADDRESS_USED_BITS;
+
+/**
+ * Choose one of the following ways to store thread specific data.
+ * Try to choose the fastest available on this processor/system.
+ */
+//#define PLS_THREAD_SPECIFIC_PTHREAD
+#define PLS_THREAD_SPECIFIC_COMPILER
+
+/**
+ * When spinning one wants to 'relax' the CPU from some task,
+ * e.g. disabling speculative execution/branch prediction
+ * or reducing its clock speed.
+ * This is both good for power draw, as well as for hyperthreading.
+ *
+ * Choose the implementation appropriate for your compiler-cpu combination.
+ */
+#if (COMPILER == MVCC)
+inline void relax_cpu() {
+ _mm_pause();
+}
+#elif (COMPILER == GCC || COMPILER == LLVM)
+inline void relax_cpu() {
+ asm("pause");
+}
+#endif
+
+}
+}
+}
}
#endif //PLS_SYSTEM_DETAILS_H
diff --git a/lib/pls/include/pls/internal/base/tas_spin_lock.h b/lib/pls/include/pls/internal/base/tas_spin_lock.h
index 17db757..74a11a5 100644
--- a/lib/pls/include/pls/internal/base/tas_spin_lock.h
+++ b/lib/pls/include/pls/internal/base/tas_spin_lock.h
@@ -10,30 +10,29 @@
#include "pls/internal/base/thread.h"
namespace pls {
- namespace internal {
- namespace base {
- /**
- * A simple set and test_and_set based spin lock implementation.
- *
- * PORTABILITY:
- * Current implementation is based on C++ 11 atomic_flag.
- */
- class tas_spin_lock {
- std::atomic_flag flag_;
- unsigned int yield_at_tries_;
-
-
- public:
- tas_spin_lock(): flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{1024} {};
- tas_spin_lock(const tas_spin_lock& other): flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{other.yield_at_tries_} {}
-
- void lock();
- bool try_lock(unsigned int num_tries=1);
- void unlock();
- };
- }
- }
-}
+namespace internal {
+namespace base {
+
+/**
+ * A simple set and test_and_set based spin lock implementation.
+ *
+ * PORTABILITY:
+ * Current implementation is based on C++ 11 atomic_flag.
+ */
+class tas_spin_lock {
+ std::atomic_flag flag_;
+
+ public:
+ tas_spin_lock() : flag_{ATOMIC_FLAG_INIT} {};
+ tas_spin_lock(const tas_spin_lock &/*other*/) : flag_{ATOMIC_FLAG_INIT} {}
+
+ void lock();
+ bool try_lock(unsigned int num_tries = 1);
+ void unlock();
+};
+}
+}
+}
#endif //PLS_TAS_SPIN_LOCK_H
diff --git a/lib/pls/include/pls/internal/base/thread.h b/lib/pls/include/pls/internal/base/thread.h
index c06b10b..3ffdf88 100644
--- a/lib/pls/include/pls/internal/base/thread.h
+++ b/lib/pls/include/pls/internal/base/thread.h
@@ -9,113 +9,122 @@
#include <pthread.h>
#include <atomic>
#include <functional>
+#include <ctime>
#include "system_details.h"
namespace pls {
- namespace internal {
- namespace base {
- using thread_entrypoint = void();
-
- /**
- * Static methods than can be performed on the current thread.
- *
- * usage:
- * this_thread::yield();
- * T* state = this_thread::state();
- *
- * PORTABILITY:
- * Current implementation is based on pthreads.
- */
- class this_thread {
- template<typename Function, typename State>
- friend class thread;
+namespace internal {
+namespace base {
+
+using thread_entrypoint = void();
+
+/**
+ * Static methods that can be performed on the current thread.
+ *
+ * usage:
+ * this_thread::yield();
+ * T* state = this_thread::state();
+ *
+ * PORTABILITY:
+ * Current implementation is based on pthreads.
+ */
+class this_thread {
+ template<typename Function, typename State>
+ friend
+ class thread;
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
- static pthread_key_t local_storage_key_;
- static bool local_storage_key_initialized_;
+ static pthread_key_t local_storage_key_;
+ static bool local_storage_key_initialized_;
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
- static __thread void* local_state_;
+ static __thread void *local_state_;
#endif
- public:
- static void yield() {
- pthread_yield();
- }
-
- /**
- * Retrieves the local state pointer.
- *
- * @tparam T The type of the state that is stored.
- * @return The state pointer hold for this thread.
- */
- template<typename T>
- static T* state();
-
- /**
- * Stores a pointer to the thread local state object.
- * The memory management for this has to be done by the user,
- * we only keep the pointer.
- *
- * @tparam T The type of the state that is stored.
- * @param state_pointer A pointer to the threads state object.
- */
- template<typename T>
- static void set_state(T* state_pointer);
- };
-
- /**
- * Abstraction for starting a function in a separate thread.
- *
- * @tparam Function Lambda being started on the new thread.
- * @tparam State State type held for this thread.
- *
- * usage:
- * T* state;
- * auto thread = start_thread([] {
- * // Run on new thread
- * }, state);
- * thread.join(); // Wait for it to finish
- *
- * PORTABILITY:
- * Current implementation is based on pthreads.
- */
- template<typename Function, typename State>
- class thread {
- friend class this_thread;
- // Keep a copy of the function (lambda) in this object to make sure it is valid when called!
- Function function_;
- State* state_pointer_;
-
- // Wee need to wait for the started function to read
- // the function_ and state_pointer_ property before returning
- // from the constructor, as the object might be moved after this.
- std::atomic_flag* startup_flag_;
-
- // Keep handle to native implementation
- pthread_t pthread_thread_;
-
- static void* start_pthread_internal(void* thread_pointer);
-
- public:
- explicit thread(const Function& function, State* state_pointer);
-
- public:
- void join();
-
- // make object move only
- thread(thread&&) noexcept = default;
- thread& operator=(thread&&) noexcept = default;
-
- thread(const thread&) = delete;
- thread& operator=(const thread&) = delete;
- };
-
- template<typename Function, typename State>
- thread<Function, State> start_thread(const Function& function, State* state_pointer);
- template<typename Function>
- thread<Function, void> start_thread(const Function& function);
- }
- }
+ public:
+ static void yield() {
+ pthread_yield();
+ }
+
+ static void sleep(long microseconds) {
+ timespec time{microseconds / 1000000, (microseconds % 1000000) * 1000}; // Keep tv_nsec below one second, as nanosleep requires.
+ nanosleep(&time, nullptr);
+ }
+
+ /**
+ * Retrieves the local state pointer.
+ *
+ * @tparam T The type of the state that is stored.
+ * @return The state pointer held for this thread.
+ */
+ template<typename T>
+ static T *state();
+
+ /**
+ * Stores a pointer to the thread local state object.
+ * The memory management for this has to be done by the user,
+ * we only keep the pointer.
+ *
+ * @tparam T The type of the state that is stored.
+ * @param state_pointer A pointer to the threads state object.
+ */
+ template<typename T>
+ static void set_state(T *state_pointer);
+};
+
+/**
+ * Abstraction for starting a function in a separate thread.
+ *
+ * @tparam Function Lambda being started on the new thread.
+ * @tparam State State type held for this thread.
+ *
+ * usage:
+ * T* state;
+ * auto thread = start_thread([] {
+ * // Run on new thread
+ * }, state);
+ * thread.join(); // Wait for it to finish
+ *
+ * PORTABILITY:
+ * Current implementation is based on pthreads.
+ */
+template<typename Function, typename State>
+class thread {
+ friend class this_thread;
+ // Keep a copy of the function (lambda) in this object to make sure it is valid when called!
+ Function function_;
+ State *state_pointer_;
+
+ // We need to wait for the started function to read
+ // the function_ and state_pointer_ property before returning
+ // from the constructor, as the object might be moved after this.
+ std::atomic_flag *startup_flag_;
+
+ // Keep handle to native implementation
+ pthread_t pthread_thread_;
+
+ static void *start_pthread_internal(void *thread_pointer);
+
+ public:
+ explicit thread(const Function &function, State *state_pointer);
+
+ public:
+ void join();
+
+ // make object move only
+ thread(thread &&) noexcept = default;
+ thread &operator=(thread &&) noexcept = default;
+
+ thread(const thread &) = delete;
+ thread &operator=(const thread &) = delete;
+};
+
+template<typename Function, typename State>
+thread<Function, State> start_thread(const Function &function, State *state_pointer);
+template<typename Function>
+thread<Function, void> start_thread(const Function &function);
+
+}
+}
}
#include "thread_impl.h"
diff --git a/lib/pls/include/pls/internal/base/thread_impl.h b/lib/pls/include/pls/internal/base/thread_impl.h
index 1ac356a..64320b6 100644
--- a/lib/pls/include/pls/internal/base/thread_impl.h
+++ b/lib/pls/include/pls/internal/base/thread_impl.h
@@ -3,86 +3,87 @@
#define PLS_THREAD_IMPL_H
namespace pls {
- namespace internal {
- namespace base {
- template<typename T>
- T* this_thread::state() {
+namespace internal {
+namespace base {
+
+template<typename T>
+T *this_thread::state() {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
- return reinterpret_cast<T*>(pthread_getspecific(local_storage_key_));
+ return reinterpret_cast<T *>(pthread_getspecific(local_storage_key_));
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
- return reinterpret_cast<T*>(local_state_);
+ return reinterpret_cast<T *>(local_state_);
#endif
- }
+}
- template<typename T>
- void this_thread::set_state(T* state_pointer) {
+template<typename T>
+void this_thread::set_state(T *state_pointer) {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
- pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer);
+ pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer);
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
- local_state_ = state_pointer;
+ local_state_ = state_pointer;
#endif
- }
+}
- template<typename Function, typename State>
- void* thread<Function, State>::start_pthread_internal(void* thread_pointer) {
- auto my_thread = reinterpret_cast<thread*>(thread_pointer);
- Function my_function_copy = my_thread->function_;
- State* my_state_pointer_copy = my_thread->state_pointer_;
+template<typename Function, typename State>
+void *thread<Function, State>::start_pthread_internal(void *thread_pointer) {
+ auto my_thread = reinterpret_cast<thread *>(thread_pointer);
+ Function my_function_copy = my_thread->function_;
+ State *my_state_pointer_copy = my_thread->state_pointer_;
- // Now we have copies of everything we need on the stack.
- // The original thread object can be moved freely (no more
- // references to its memory location).
- my_thread->startup_flag_->clear();
+ // Now we have copies of everything we need on the stack.
+ // The original thread object can be moved freely (no more
+ // references to its memory location).
+ my_thread->startup_flag_->clear();
- this_thread::set_state(my_state_pointer_copy);
- my_function_copy();
+ this_thread::set_state(my_state_pointer_copy);
+ my_function_copy();
- // Finished executing the user function
- pthread_exit(nullptr);
- }
+ // Finished executing the user function
+ pthread_exit(nullptr);
+}
- template<typename Function, typename State>
- thread<Function, State>::thread(const Function& function, State* state_pointer):
- function_{function},
- state_pointer_{state_pointer},
- startup_flag_{nullptr},
- pthread_thread_{} {
+template<typename Function, typename State>
+thread<Function, State>::thread(const Function &function, State *state_pointer):
+ function_{function},
+ state_pointer_{state_pointer},
+ startup_flag_{nullptr},
+ pthread_thread_{} {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
- if (!this_thread::local_storage_key_initialized_) {
- pthread_key_create(&this_thread::local_storage_key_, nullptr);
- this_thread::local_storage_key_initialized_ = true;
- }
+ if (!this_thread::local_storage_key_initialized_) {
+ pthread_key_create(&this_thread::local_storage_key_, nullptr);
+ this_thread::local_storage_key_initialized_ = true;
+ }
#endif
- // We only need this during startup, will be destroyed when out of scope
- std::atomic_flag startup_flag{ATOMIC_FLAG_INIT};
- startup_flag_ = &startup_flag;
-
- startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return
- pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *)(this));
- while (startup_flag.test_and_set())
- ; // Busy waiting for the starting flag to clear
- }
-
- template<typename Function, typename State>
- void thread<Function, State>::join() {
- pthread_join(pthread_thread_, nullptr);
- }
-
- template<typename Function, typename State>
- thread<Function, State> start_thread(const Function& function, State* state_pointer) {
- return thread<Function, State>(function, state_pointer);
- }
-
- template<typename Function>
- thread<Function, void> start_thread(const Function& function) {
- return thread<Function, void>(function, nullptr);
- }
- }
- }
+ // We only need this during startup, will be destroyed when out of scope
+ std::atomic_flag startup_flag{ATOMIC_FLAG_INIT};
+ startup_flag_ = &startup_flag;
+
+ startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return
+ pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *) (this));
+ while (startup_flag.test_and_set()); // Busy waiting for the starting flag to clear
+}
+
+template<typename Function, typename State>
+void thread<Function, State>::join() {
+ pthread_join(pthread_thread_, nullptr);
+}
+
+template<typename Function, typename State>
+thread<Function, State> start_thread(const Function &function, State *state_pointer) {
+ return thread<Function, State>(function, state_pointer);
+}
+
+template<typename Function>
+thread<Function, void> start_thread(const Function &function) {
+ return thread<Function, void>(function, nullptr);
+}
+
+}
+}
}
#endif //PLS_THREAD_IMPL_H
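
For orientation, a call site of this API could look like the following (hypothetical usage sketch; the lambda body and the state variable are made up):

    #include "pls/internal/base/thread.h"

    void example() {
      int state = 42;
      auto my_thread = pls::internal::base::start_thread([] {
        // Runs on the new thread; the state pointer passed below is
        // readable here through this_thread::state<int>().
      }, &state);
      my_thread.join();
    }
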
diff --git a/lib/pls/include/pls/internal/base/ttas_spin_lock.h b/lib/pls/include/pls/internal/base/ttas_spin_lock.h
index 592c847..787f772 100644
--- a/lib/pls/include/pls/internal/base/ttas_spin_lock.h
+++ b/lib/pls/include/pls/internal/base/ttas_spin_lock.h
@@ -6,32 +6,30 @@
#include <atomic>
#include "pls/internal/base/thread.h"
+#include "pls/internal/base/backoff.h"
namespace pls {
- namespace internal {
- namespace base {
- /**
- * A simple set and test_and_set based spin lock implementation.
- *
- * PORTABILITY:
- * Current implementation is based on C++ 11 atomic_flag.
- */
- class ttas_spin_lock {
- std::atomic<int> flag_;
- const unsigned int yield_at_tries_;
-
-
- public:
- ttas_spin_lock(): flag_{0}, yield_at_tries_{1024} {};
- ttas_spin_lock(const ttas_spin_lock& other): flag_{0}, yield_at_tries_{other.yield_at_tries_} {}
-
- void lock();
- bool try_lock(unsigned int num_tries=1);
- void unlock();
- };
- }
- }
+namespace internal {
+namespace base {
+/**
+ * A simple set and test_and_set based spin lock implementation.
+ *
+ * PORTABILITY:
+ * Current implementation is based on C++ 11 atomic_flag.
+ */
+class ttas_spin_lock {
+ std::atomic<int> flag_;
+
+ public:
+ ttas_spin_lock() : flag_{0} {};
+ ttas_spin_lock(const ttas_spin_lock &/*other*/) : flag_{0} {}
+
+ void lock();
+ bool try_lock(unsigned int num_tries = 1);
+ void unlock();
+};
+}
+}
}
-
#endif //PLS_TTAS_SPIN_LOCK_H
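
For reference, the test-and-test-and-set idea the class name refers to: spin on a plain load first and only attempt the expensive atomic exchange once the lock looks free. A generic C++11 sketch (not the project's actual .cpp implementation, which additionally uses the new backoff header included above):

    #include <atomic>

    class ttas_lock_sketch {
      std::atomic<int> flag_{0};

     public:
      void lock() {
        while (true) {
          while (flag_.load() == 1)
            ; // test: spin on a plain load, stays in the local cache
          int expected = 0;
          if (flag_.compare_exchange_weak(expected, 1))
            return; // test-and-set: only one contender succeeds
        }
      }

      bool try_lock() {
        int expected = 0;
        return flag_.compare_exchange_strong(expected, 1);
      }

      void unlock() { flag_.store(0); }
    };
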
diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack.h b/lib/pls/include/pls/internal/data_structures/aligned_stack.h
index 3bb3f72..2e04702 100644
--- a/lib/pls/include/pls/internal/data_structures/aligned_stack.h
+++ b/lib/pls/include/pls/internal/data_structures/aligned_stack.h
@@ -9,45 +9,50 @@
#include "pls/internal/base/alignment.h"
namespace pls {
- namespace internal {
- namespace data_structures {
- /**
- * Generic stack-like data structure that allows to allocate arbitrary objects in a given memory region.
- * The objects will be stored aligned in the stack, making the storage cache friendly and very fast
- * (as long as one can live with the stack restrictions).
- *
- * IMPORTANT: Does not call destructors on stored objects! Do not allocate resources in the objects!
- *
- * Usage:
- * aligned_stack stack{pointer_to_memory, size_of_memory};
- * T* pointer = stack.push(some_object); // Copy-Constrict the object on top of stack
- * stack.pop<T>(); // Deconstruct the top object of type T
- */
- class aligned_stack {
- // Keep bounds of our memory block
- char* memory_start_;
- char* memory_end_;
-
- // Current head will always be aligned to cache lines
- char* head_;
- public:
- typedef char* state;
-
- aligned_stack(): memory_start_{nullptr}, memory_end_{nullptr}, head_{nullptr} {};
- aligned_stack(char* memory_region, std::size_t size);
-
- template<typename T>
- T* push(const T& object);
- template<typename T>
- void* push();
- template<typename T>
- T pop();
-
- state save_state() const { return head_; }
- void reset_state(state new_state) { head_ = new_state; }
- };
- }
- }
+namespace internal {
+namespace data_structures {
+
+using base::system_details::pointer_t;
+
+/**
+ * Generic stack-like data structure that allows allocating arbitrary objects in a given memory region.
+ * The objects will be stored aligned in the stack, making the storage cache friendly and very fast
+ * (as long as one can live with the stack restrictions).
+ *
+ * IMPORTANT: Does not call destructors on stored objects! Do not allocate resources in the objects!
+ *
+ * Usage:
+ * aligned_stack stack{pointer_to_memory, size_of_memory};
+ * T* pointer = stack.push(some_object); // Copy-construct the object on top of the stack
+ * stack.pop<T>(); // Remove the top object of type T
+ */
+class aligned_stack {
+ // Keep bounds of our memory block
+ pointer_t memory_start_;
+ pointer_t memory_end_;
+
+ // Current head will always be aligned to cache lines
+ pointer_t head_;
+ public:
+ typedef pointer_t state;
+
+ aligned_stack() : memory_start_{0}, memory_end_{0}, head_{0} {};
+ aligned_stack(pointer_t memory_region, std::size_t size);
+ aligned_stack(char *memory_region, std::size_t size);
+
+ template<typename T>
+ T *push(const T &object);
+ template<typename T>
+ void *push();
+ template<typename T>
+ T pop();
+
+ state save_state() const { return head_; }
+ void reset_state(state new_state) { head_ = new_state; }
+};
+
+}
+}
}
#include "aligned_stack_impl.h"
diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h
index 8a3a759..849971a 100644
--- a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h
+++ b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h
@@ -3,34 +3,36 @@
#define PLS_ALIGNED_STACK_IMPL_H
namespace pls {
- namespace internal {
- namespace data_structures {
- template<typename T>
- T* aligned_stack::push(const T& object) {
- // Copy-Construct
- return new ((void*)push<T>())T(object);
- }
-
- template<typename T>
- void* aligned_stack::push() {
- void* result = reinterpret_cast(head_);
-
- // Move head to next aligned position after new object
- head_ = base::alignment::next_alignment(head_ + sizeof(T));
- if (head_ >= memory_end_) {
- PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!");
- }
-
- return result;
- }
-
- template<typename T>
- T aligned_stack::pop() {
- head_ = head_ - base::alignment::next_alignment(sizeof(T));
- return *reinterpret_cast<T*>(head_);
- }
- }
- }
+namespace internal {
+namespace data_structures {
+
+template<typename T>
+T *aligned_stack::push(const T &object) {
+ // Copy-Construct
+ return new(push<T>())T(object);
+}
+
+template<typename T>
+void *aligned_stack::push() {
+ void *result = reinterpret_cast<void *>(head_);
+
+ // Move head to next aligned position after new object
+ head_ = base::alignment::next_alignment(head_ + sizeof(T));
+ if (head_ >= memory_end_) {
+ PLS_ERROR("Tried to allocate object on aligned_stack without sufficient memory!");
+ }
+
+ return result;
+}
+
+template<typename T>
+T aligned_stack::pop() {
+ head_ = head_ - base::alignment::next_alignment(sizeof(T));
+ return *reinterpret_cast<T *>(head_);
+}
+
+}
+}
}
#endif //PLS_ALIGNED_STACK_IMPL_H
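
next_alignment itself is not part of this diff; the usual round-up-to-the-next-cache-line formula it presumably implements looks like this (sketch, assuming a power-of-two cache line size):

    #include <cstdint>

    constexpr std::uintptr_t CACHE_LINE_SIZE = 64; // assumption for this sketch

    inline std::uintptr_t next_alignment(std::uintptr_t value) {
      // Round up to the next multiple of CACHE_LINE_SIZE.
      return (value + CACHE_LINE_SIZE - 1) & ~(CACHE_LINE_SIZE - 1);
    }
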
diff --git a/lib/pls/include/pls/internal/data_structures/deque.h b/lib/pls/include/pls/internal/data_structures/deque.h
index 8652cc3..8f555da 100644
--- a/lib/pls/include/pls/internal/data_structures/deque.h
+++ b/lib/pls/include/pls/internal/data_structures/deque.h
@@ -5,56 +5,58 @@
#include "pls/internal/base/spin_lock.h"
namespace pls {
- namespace internal {
- namespace data_structures {
- /**
- * Turns any object into deque item when inheriting from this.
- */
- class deque_item {
- friend class deque_internal;
-
- deque_item* prev_;
- deque_item* next_;
-
- };
-
- class deque_internal {
- protected:
- deque_item* head_;
- deque_item* tail_;
-
- base::spin_lock lock_;
-
- deque_item* pop_head_internal();
- deque_item* pop_tail_internal();
- void push_tail_internal(deque_item *new_item);
- };
-
- /**
- * A double linked list based deque.
- * Storage is therefore only needed for the individual items.
- *
- * @tparam Item The type of items stored in this deque
- */
- template<typename Item>
- class deque: deque_internal {
- public:
- explicit deque(): deque_internal{} {}
-
- inline Item* pop_head() {
- return static_cast<Item*>(pop_head_internal());
- }
-
- inline Item* pop_tail() {
- return static_cast<Item*>(pop_tail_internal());
- }
-
- inline void push_tail(Item* new_item) {
- push_tail_internal(new_item);
- }
- };
- }
- }
+namespace internal {
+namespace data_structures {
+
+/**
+ * Turns any object into a deque item when inheriting from this.
+ */
+class deque_item {
+ friend class deque_internal;
+
+ deque_item *prev_;
+ deque_item *next_;
+
+};
+
+class deque_internal {
+ protected:
+ deque_item *head_;
+ deque_item *tail_;
+
+ base::spin_lock lock_;
+
+ deque_item *pop_head_internal();
+ deque_item *pop_tail_internal();
+ void push_tail_internal(deque_item *new_item);
+};
+
+/**
+ * A doubly linked list based deque.
+ * Storage is therefore only needed for the individual items.
+ *
+ * @tparam Item The type of items stored in this deque
+ */
+template<typename Item>
+class deque : deque_internal {
+ public:
+ explicit deque() : deque_internal{} {}
+
+ inline Item *pop_head() {
+ return static_cast<Item *>(pop_head_internal());
+ }
+
+ inline Item *pop_tail() {
+ return static_cast<Item *>(pop_tail_internal());
+ }
+
+ inline void push_tail(Item *new_item) {
+ push_tail_internal(new_item);
+ }
+};
+
+}
+}
}
#endif //PLS_DEQUE_H
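
Because the deque is intrusive (items carry their own prev/next pointers by inheriting from deque_item), the container never allocates. A hypothetical usage sketch:

    #include "pls/internal/data_structures/deque.h"

    struct my_item : pls::internal::data_structures::deque_item {
      int payload;
    };

    void example() {
      pls::internal::data_structures::deque<my_item> my_deque;
      my_item item{};
      item.payload = 1;

      my_deque.push_tail(&item);             // links the item itself, no copy
      my_item *popped = my_deque.pop_tail(); // == &item
      (void) popped;
    }
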
diff --git a/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h
new file mode 100644
index 0000000..23f734d
--- /dev/null
+++ b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h
@@ -0,0 +1,240 @@
+
+#ifndef PLS_WORK_STEALING_DEQUE_H_
+#define PLS_WORK_STEALING_DEQUE_H_
+
+#include <atomic>
+#include <mutex>
+#include <utility>
+
+#include "pls/internal/base/system_details.h"
+#include "pls/internal/base/spin_lock.h"
+#include "pls/internal/base/error_handling.h"
+
+#include "aligned_stack.h"
+
+namespace pls {
+namespace internal {
+namespace data_structures {
+
+using cas_integer = base::system_details::cas_integer;
+using pointer_t = base::system_details::pointer_t;
+static cas_integer get_stamp(cas_integer n) {
+ return (n & base::system_details::FIRST_HALF_CAS_INTEGER) >> ((sizeof(cas_integer) / 2) * 8);
+}
+static cas_integer get_offset(cas_integer n) {
+ return n & base::system_details::SECOND_HALF_CAS_INTEGER;
+}
+static cas_integer set_stamp(cas_integer n, cas_integer new_value) {
+ return (new_value << ((sizeof(cas_integer) / 2) * 8)) | (n & base::system_details::SECOND_HALF_CAS_INTEGER);
+}
+//static cas_integer set_offset(cas_integer n, cas_integer new_value) {
+// return new_value | (n & base::system_details::FIRST_HALF_CAS_INTEGER);
+//}
+
+class work_stealing_deque_item {
+ // Pointer to the actual data
+ pointer_t data_;
+ // Index (relative to stack base) to the next and previous element
+ cas_integer next_item_;
+ cas_integer previous_item_;
+
+ public:
+ work_stealing_deque_item() : data_{0}, next_item_{0}, previous_item_{0} {}
+
+ template<typename Item>
+ Item *data() {
+ return reinterpret_cast<Item *>(data_);
+ }
+
+ template<typename Item>
+ void set_data(Item *data) {
+ data_ = reinterpret_cast<pointer_t>(data);
+ }
+
+ cas_integer next_item() {
+ return next_item_;
+ }
+ void set_next_item(cas_integer next_item) {
+ next_item_ = next_item;
+ }
+ cas_integer previous_item() {
+ return previous_item_;
+ }
+ void set_previous_item(cas_integer previous_item) {
+ previous_item_ = previous_item;
+ }
+};
+static_assert(sizeof(work_stealing_deque_item) < base::system_details::CACHE_LINE_SIZE,
+ "Work stealing deque relies on memory layout and requires cache lines to be longer than one 'work_stealing_deque_item' instance!");
+
+template<typename Item>
+class work_stealing_deque {
+ // Deque 'takes over' stack and handles memory management while in use.
+ // At any point in time the deque can stop using more memory and the stack can be used by other entities.
+ aligned_stack *stack_;
+ pointer_t base_pointer_;
+
+ std::atomic<cas_integer> head_;
+ std::atomic<cas_integer> tail_;
+ cas_integer previous_tail_;
+
+ base::spin_lock lock_{}; // TODO: Remove after debugging
+
+
+ public:
+ using state = aligned_stack::state;
+
+ explicit work_stealing_deque(aligned_stack *stack) : stack_{stack},
+ base_pointer_{0},
+ head_{0},
+ tail_{0},
+ previous_tail_{0} {
+ reset_base_pointer();
+ }
+ work_stealing_deque(const work_stealing_deque &other) : stack_{other.stack_},
+ base_pointer_{other.base_pointer_},
+ head_{other.head_.load()},
+ tail_{other.tail_.load()},
+ previous_tail_{other.previous_tail_} {}
+
+ void reset_base_pointer() {
+ base_pointer_ = reinterpret_cast<pointer_t>(stack_->save_state()); // Keep the base of our region in the stack
+ }
+
+ work_stealing_deque_item *item_at(cas_integer position) {
+ return reinterpret_cast<work_stealing_deque_item *>(base_pointer_
+ + (base::system_details::CACHE_LINE_SIZE * position));
+ }
+
+ cas_integer current_stack_offset() {
+ return (stack_->save_state() - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
+ }
+
+ template<typename T>
+ std::pair<work_stealing_deque_item, T> *allocate_item(const T &new_item) {
+ // 'Union' type to push both on stack
+ using pair_t = std::pair<work_stealing_deque_item, T>;
+ // Allocate space on stack
+ auto new_pair = reinterpret_cast<pair_t *>(stack_->push<pair_t>());
+ // Initialize memory on stack
+ new((void *) &(new_pair->first)) work_stealing_deque_item();
+ new((void *) &(new_pair->second)) T(new_item);
+
+ return new_pair;
+ }
+
+ template<typename T>
+ Item *push_tail(const T &new_item) {
+ cas_integer local_tail = tail_;
+
+ auto new_pair = allocate_item(new_item);
+ // Prepare current tail to point to correct next items
+ auto tail_deque_item = item_at(local_tail);
+ tail_deque_item->set_data(&(new_pair->second));
+ tail_deque_item->set_next_item(current_stack_offset());
+ tail_deque_item->set_previous_item(previous_tail_);
+ previous_tail_ = local_tail;
+
+ // Linearization point, item appears after this write
+ cas_integer new_tail = current_stack_offset();
+ tail_ = new_tail;
+
+ return &(new_pair->second);
+ }
+
+ Item *pop_tail() {
+ cas_integer local_tail = tail_;
+ cas_integer local_head = head_;
+
+ if (local_tail <= get_offset(local_head)) {
+ return nullptr; // EMPTY
+ }
+
+ work_stealing_deque_item *previous_tail_item = item_at(previous_tail_);
+ cas_integer new_tail = previous_tail_;
+ previous_tail_ = previous_tail_item->previous_item();
+
+ // Publish our wish to set the tail back
+ tail_ = new_tail;
+ // Get the state of local head AFTER we published our wish
+ local_head = head_; // Linearization point, outside knows list is empty
+
+ if (get_offset(local_head) < new_tail) {
+ return previous_tail_item->data<Item>(); // Success, enough distance to other threads
+ }
+
+ if (get_offset(local_head) == new_tail) {
+ cas_integer new_head = set_stamp(new_tail, get_stamp(local_head) + 1);
+ // Try competing with consumers by updating the head's stamp value
+ if (head_.compare_exchange_strong(local_head, new_head)) {
+ return previous_tail_item->data<Item>(); // SUCCESS, we won the competition with other threads
+ }
+ }
+
+ // Some other thread either won the competition or it already set the head further than we are
+ // before we even tried to compete with it.
+ // Reset the queue into an empty state => head_ = tail_
+ tail_ = get_offset(local_head); // ...we give up to the other winning thread
+
+ return nullptr; // EMPTY, we lost the competition with other threads
+ }
+
+ Item *pop_head() {
+ cas_integer local_head = head_;
+ cas_integer local_tail = tail_;
+
+ if (local_tail <= get_offset(local_head)) {
+ return nullptr; // EMPTY
+ }
+ // Load info on current deque item.
+ // In case we have a race with a new (aba) overwritten item at this position,
+ // there has to be a competition over the tail -> the stamp increased and our next
+ // operation will fail anyways!
+ work_stealing_deque_item *head_deque_item = item_at(get_offset(local_head));
+ cas_integer next_item_offset = head_deque_item->next_item();
+ Item *head_data_item = head_deque_item->data<Item>();
+
+ // We try to set the head to this new position.
+ // Possible outcomes:
+ // 1) no one interrupted us, we win this competition
+ // 2) other thread took the head, we lose to this
+ // 3) owning thread removed tail, we lose to this
+ cas_integer new_head = set_stamp(next_item_offset, get_stamp(local_head) + 1);
+ if (head_.compare_exchange_strong(local_head, new_head)) {
+ return head_data_item; // SUCCESS, we won the competition
+ }
+
+ return nullptr; // EMPTY, we lost the competition
+
+ }
+
+ void release_memory_until(state state) {
+ cas_integer item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
+
+ cas_integer local_head = head_;
+ cas_integer local_tail = tail_;
+
+ stack_->reset_state(state);
+
+ if (item_offset < local_tail) {
+ tail_ = item_offset;
+ if (get_offset(local_head) >= local_tail) {
+ head_ = set_stamp(item_offset, get_stamp(local_head) + 1);
+ }
+ }
+ }
+
+ void release_memory_until(Item *item) {
+ release_memory_until(reinterpret_cast<pointer_t>(item));
+ }
+
+ state save_state() {
+ return stack_->save_state();
+ }
+};
+
+}
+}
+}
+
+#endif //PLS_WORK_STEALING_DEQUE_H_
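
To make the stamp arithmetic above concrete: on a platform where cas_integer is 64 bits wide, the stamp occupies the upper 32 bits and the stack offset the lower 32. A tiny round-trip demonstration (masks and values are illustrative, mirroring get_stamp/get_offset/set_stamp):

    #include <cstdint>
    #include <cassert>

    using cas_integer = std::uint64_t; // assumption: 64-bit CAS integer
    constexpr cas_integer SECOND_HALF = 0xFFFFFFFFull; // offset mask (lower half)
    constexpr cas_integer FIRST_HALF = ~SECOND_HALF;   // stamp mask (upper half)

    int main() {
      cas_integer head = (7ull << 32u) | 1024ull; // stamp 7, offset 1024
      assert(((head & FIRST_HALF) >> 32u) == 7);  // get_stamp
      assert((head & SECOND_HALF) == 1024);       // get_offset

      // pop_tail/pop_head bump the stamp when moving the head; a concurrent
      // thief still holding the old packed value then fails its CAS, which is
      // what protects the deque against ABA on reused stack memory.
      cas_integer bumped = (8ull << 32u) | (head & SECOND_HALF); // set_stamp
      assert(bumped != head && (bumped & SECOND_HALF) == (head & SECOND_HALF));
      return 0;
    }
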
diff --git a/lib/pls/include/pls/internal/helpers/mini_benchmark.h b/lib/pls/include/pls/internal/helpers/mini_benchmark.h
index dac7237..d5bcce4 100644
--- a/lib/pls/include/pls/internal/helpers/mini_benchmark.h
+++ b/lib/pls/include/pls/internal/helpers/mini_benchmark.h
@@ -9,45 +9,47 @@
#include <chrono>
namespace pls {
- namespace internal {
- namespace helpers {
- // TODO: Clean up (separate into small functions and .cpp file)
- template<typename Function>
- void run_mini_benchmark(const Function& lambda, size_t max_threads, unsigned long max_runtime_ms=1000) {
- using namespace std;
- using namespace pls::internal::scheduling;
-
- malloc_scheduler_memory scheduler_memory{max_threads};
- for (unsigned int num_threads = 1; num_threads <= max_threads; num_threads++) {
- scheduler local_scheduler{&scheduler_memory, num_threads};
-
- chrono::high_resolution_clock::time_point start_time;
- chrono::high_resolution_clock::time_point end_time;
- unsigned long iterations = 0;
- local_scheduler.perform_work([&] {
- start_time = chrono::high_resolution_clock::now();
- end_time = start_time;
- chrono::high_resolution_clock::time_point planned_end_time = start_time + chrono::milliseconds(max_runtime_ms);
-
- while (end_time < planned_end_time) {
- lambda();
- end_time = chrono::high_resolution_clock::now();
- iterations++;
- }
- });
-
- long time = chrono::duration_cast<chrono::microseconds>(end_time - start_time).count();
- double time_per_iteration = (double)time / iterations;
-
- std::cout << time_per_iteration;
- if (num_threads < max_threads) {
- std::cout << ",";
- }
- }
- std::cout << std::endl;
- }
- }
+namespace internal {
+namespace helpers {
+
+// TODO: Clean up (separate into small functions and .cpp file)
+template<typename Function>
+void run_mini_benchmark(const Function &lambda, size_t max_threads, unsigned long max_runtime_ms = 1000) {
+ using namespace std;
+ using namespace pls::internal::scheduling;
+
+ malloc_scheduler_memory scheduler_memory{max_threads};
+ for (unsigned int num_threads = 1; num_threads <= max_threads; num_threads++) {
+ scheduler local_scheduler{&scheduler_memory, num_threads};
+
+ chrono::high_resolution_clock::time_point start_time;
+ chrono::high_resolution_clock::time_point end_time;
+ unsigned long iterations = 0;
+ local_scheduler.perform_work([&] {
+ start_time = chrono::high_resolution_clock::now();
+ end_time = start_time;
+ chrono::high_resolution_clock::time_point planned_end_time = start_time + chrono::milliseconds(max_runtime_ms);
+
+ while (end_time < planned_end_time) {
+ lambda();
+ end_time = chrono::high_resolution_clock::now();
+ iterations++;
+ }
+ });
+
+ long time = chrono::duration_cast<chrono::microseconds>(end_time - start_time).count();
+ double time_per_iteration = (double) time / iterations;
+
+ std::cout << time_per_iteration;
+ if (num_threads < max_threads) {
+ std::cout << ",";
}
+ }
+ std::cout << std::endl;
+}
+
+}
+}
}
#endif //PLS_MINI_BENCHMARK_H
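
Called like this (with a made-up workload), the helper runs the lambda for roughly a second per thread count and prints one time-per-iteration value per configuration, comma separated:

    #include "pls/internal/helpers/mini_benchmark.h"

    int main() {
      pls::internal::helpers::run_mini_benchmark([] {
        volatile int sink = 0;
        for (int i = 0; i < 1000; i++) sink += i; // stand-in for the real workload
      }, 8, 1000); // up to 8 threads, ~1000 ms per thread count
      return 0;
    }
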
diff --git a/lib/pls/include/pls/internal/helpers/prohibit_new.h b/lib/pls/include/pls/internal/helpers/prohibit_new.h
index 4ea59cc..d55b34e 100644
--- a/lib/pls/include/pls/internal/helpers/prohibit_new.h
+++ b/lib/pls/include/pls/internal/helpers/prohibit_new.h
@@ -15,9 +15,9 @@
#ifdef NEW_LINK_ERROR
// This will cause a linker error if new is used in the code.
// We also exit if it is somehow still called.
-inline void * operator new (std::size_t) {
- extern int bare_new_erroneously_called();
- exit(bare_new_erroneously_called() | 1);
+inline void *operator new(std::size_t) {
+ extern int bare_new_erroneously_called();
+ exit(bare_new_erroneously_called() | 1);
}
#else
// Use this + debugging point to find out where we use a new
diff --git a/lib/pls/include/pls/internal/helpers/unique_id.h b/lib/pls/include/pls/internal/helpers/unique_id.h
index 918021c..358d17e 100644
--- a/lib/pls/include/pls/internal/helpers/unique_id.h
+++ b/lib/pls/include/pls/internal/helpers/unique_id.h
@@ -7,25 +7,27 @@
#include <cstdint>
namespace pls {
- namespace internal {
- namespace helpers {
- struct unique_id {
- const uint32_t id_;
- const std::type_info& type_;
- bool operator==(const unique_id& other) const { return id_ == other.id_ && type_ == other.type_; }
+namespace internal {
+namespace helpers {
- static constexpr unique_id create(const uint32_t id) {
- return unique_id(id, typeid(void));
- }
- template<typename ...T>
- static constexpr unique_id create() {
- return unique_id(UINT32_MAX, typeid(std::tuple<T...>));
- }
- private:
- explicit constexpr unique_id(const uint32_t id, const std::type_info& type): id_{id}, type_{type} {};
- };
- }
- }
+struct unique_id {
+ const uint32_t id_;
+ const std::type_info &type_;
+ bool operator==(const unique_id &other) const { return id_ == other.id_ && type_ == other.type_; }
+
+ static constexpr unique_id create(const uint32_t id) {
+ return unique_id(id, typeid(void));
+ }
+ template<typename ...T>
+ static constexpr unique_id create() {
+ return unique_id(UINT32_MAX, typeid(std::tuple<T...>));
+ }
+ private:
+ explicit constexpr unique_id(const uint32_t id, const std::type_info &type) : id_{id}, type_{type} {};
+};
+
+}
+}
}
#endif //PLS_UNIQUE_ID_H
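
Both factory functions in use (sketch; the numeric id is arbitrary):

    #include "pls/internal/helpers/unique_id.h"

    void example() {
      using pls::internal::helpers::unique_id;

      auto by_number = unique_id::create(42);          // id from a fixed number
      auto by_type = unique_id::create<int, double>(); // id derived from a type list
      bool same = by_number == unique_id::create(42);  // true: same id and type
      (void) by_type;
      (void) same;
    }
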
diff --git a/lib/pls/include/pls/internal/scheduling/abstract_task.h b/lib/pls/include/pls/internal/scheduling/abstract_task.h
index c239811..21d7357 100644
--- a/lib/pls/include/pls/internal/scheduling/abstract_task.h
+++ b/lib/pls/include/pls/internal/scheduling/abstract_task.h
@@ -2,42 +2,44 @@
#ifndef PLS_ABSTRACT_TASK_H
#define PLS_ABSTRACT_TASK_H
-#include "pls/internal/base/spin_lock.h"
+#include "pls/internal/base/swmr_spin_lock.h"
#include "pls/internal/helpers/unique_id.h"
namespace pls {
- namespace internal {
- namespace scheduling {
- class abstract_task {
- public:
- using id = helpers::unique_id;
-
- private:
- unsigned int depth_;
- abstract_task::id unique_id_;
- abstract_task* child_task_;
-
- public:
- abstract_task(const unsigned int depth, const abstract_task::id& unique_id):
- depth_{depth},
- unique_id_{unique_id},
- child_task_{nullptr} {}
-
- virtual void execute() = 0;
- void set_child(abstract_task* child_task) { child_task_ = child_task; }
- abstract_task* child() { return child_task_; }
-
- void set_depth(unsigned int depth) { depth_ = depth; }
- unsigned int depth() const { return depth_; }
- id unique_id() const { return unique_id_; }
- protected:
- virtual bool internal_stealing(abstract_task* other_task) = 0;
- virtual bool split_task(base::spin_lock* lock) = 0;
-
- bool steal_work();
- };
- }
- }
+namespace internal {
+namespace scheduling {
+
+class abstract_task {
+ public:
+ using id = helpers::unique_id;
+
+ private:
+ unsigned int depth_;
+ abstract_task::id unique_id_;
+ abstract_task *volatile child_task_;
+
+ public:
+ abstract_task(const unsigned int depth, const abstract_task::id &unique_id) :
+ depth_{depth},
+ unique_id_{unique_id},
+ child_task_{nullptr} {}
+
+ virtual void execute() = 0;
+ void set_child(abstract_task *child_task) { child_task_ = child_task; }
+ abstract_task *child() const { return child_task_; }
+
+ void set_depth(unsigned int depth) { depth_ = depth; }
+ unsigned int depth() const { return depth_; }
+ id unique_id() const { return unique_id_; }
+ protected:
+ virtual bool internal_stealing(abstract_task *other_task) = 0;
+ virtual bool split_task(base::swmr_spin_lock *lock) = 0;
+
+ bool steal_work();
+};
+
+}
+}
}
#endif //PLS_ABSTRACT_TASK_H
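
To show which hooks a concrete task must provide, a minimal hypothetical subclass could look as follows; real implementations such as fork_join_task do actual stealing and splitting in the two protected overrides:

    #include "pls/internal/scheduling/abstract_task.h"

    class noop_task : public pls::internal::scheduling::abstract_task {
     public:
      explicit noop_task(const id &task_id) : abstract_task{0, task_id} {}

      void execute() override { /* the task's actual work goes here */ }

     protected:
      // Return true only if work was actually acquired from the other task.
      bool internal_stealing(abstract_task * /*other_task*/) override { return false; }
      // Return true only if this task could be split for a waiting thief.
      bool split_task(pls::internal::base::swmr_spin_lock * /*lock*/) override { return false; }
    };
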
diff --git a/lib/pls/include/pls/internal/scheduling/fork_join_task.h b/lib/pls/include/pls/internal/scheduling/fork_join_task.h
index 6a54f09..33a6c2f 100644
--- a/lib/pls/include/pls/internal/scheduling/fork_join_task.h
+++ b/lib/pls/include/pls/internal/scheduling/fork_join_task.h
@@ -5,94 +5,119 @@
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/data_structures/aligned_stack.h"
-#include "pls/internal/data_structures/deque.h"
+#include "pls/internal/data_structures/work_stealing_deque.h"
#include "abstract_task.h"
#include "thread_state.h"
namespace pls {
- namespace internal {
- namespace scheduling {
- class fork_join_task;
- class fork_join_sub_task: public data_structures::deque_item {
- friend class fork_join_task;
-
- // Coordinate finishing of sub_tasks
- std::atomic_uint32_t ref_count_;
- fork_join_sub_task* parent_;
-
- // Access to TBB scheduling environment
- fork_join_task* tbb_task_;
-
- // Stack Management (reset stack pointer after wait_for_all() calls)
- data_structures::aligned_stack::state stack_state_;
- protected:
- explicit fork_join_sub_task();
- fork_join_sub_task(const fork_join_sub_task& other);
-
- // Overwritten with behaviour of child tasks
- virtual void execute_internal() = 0;
-
- public:
- // Only use them when actually executing this sub_task (only public for simpler API design)
- template<typename T>
- void spawn_child(const T& sub_task);
- void wait_for_all();
-
- private:
- void spawn_child_internal(fork_join_sub_task* sub_task);
- void execute();
- };
-
- template<typename Function>
- class fork_join_lambda: public fork_join_sub_task {
- const Function* function_;
-
- public:
- explicit fork_join_lambda(const Function* function): function_{function} {};
-
- protected:
- void execute_internal() override {
- (*function_)(this);
- }
- };
-
- class fork_join_task: public abstract_task {
- friend class fork_join_sub_task;
-
- fork_join_sub_task* root_task_;
- fork_join_sub_task* currently_executing_;
- data_structures::aligned_stack* my_stack_;
-
- // Double-Ended Queue management
- data_structures::deque<fork_join_sub_task> deque_;
-
- // Steal Management
- fork_join_sub_task* last_stolen_;
-
- fork_join_sub_task* get_local_sub_task();
- fork_join_sub_task* get_stolen_sub_task();
-
- protected:
- bool internal_stealing(abstract_task* other_task) override;
- bool split_task(base::spin_lock* /*lock*/) override;
-
- public:
- explicit fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id);
- void execute() override;
- fork_join_sub_task* currently_executing() const;
- };
-
- template<typename T>
- void fork_join_sub_task::spawn_child(const T& task) {
- PROFILE_FORK_JOIN_STEALING("spawn_child")
- static_assert(std::is_base_of<fork_join_sub_task, T>::value, "Only pass fork_join_sub_task subclasses!");
-
- T* new_task = tbb_task_->my_stack_->push(task);
- spawn_child_internal(new_task);
- }
- }
- }
+namespace internal {
+namespace scheduling {
+
+class fork_join_task;
+class fork_join_sub_task {
+ friend class fork_join_task;
+
+ // Coordinate finishing of sub_tasks
+ std::atomic_uint32_t ref_count_;
+ fork_join_sub_task *parent_;
+
+ // Access to TBB scheduling environment
+ fork_join_task *tbb_task_;
+
+ bool executed = false;
+ int executed_at = -1;
+
+ // Stack Management (reset stack pointer after wait_for_all() calls)
+ data_structures::work_stealing_deque<fork_join_sub_task>::state deque_state_;
+ protected:
+ explicit fork_join_sub_task();
+ fork_join_sub_task(const fork_join_sub_task &other);
+
+ // Overwritten with behaviour of child tasks
+ virtual void execute_internal() = 0;
+
+ public:
+ // Only use them when actually executing this sub_task (only public for simpler API design)
+ template<typename T>
+ void spawn_child(T &sub_task);
+ void wait_for_all();
+
+ static fork_join_sub_task *current();
+ private:
+ void execute();
+};
+
+template<typename Function>
+class fork_join_lambda_by_reference : public fork_join_sub_task {
+ const Function &function_;
+
+ public:
+ explicit fork_join_lambda_by_reference(const Function &function) : fork_join_sub_task{}, function_{function} {};
+
+ protected:
+ void execute_internal() override {
+ function_();
+ }
+};
+
+template<typename Function>
+class fork_join_lambda_by_value : public fork_join_sub_task {
+ const Function function_;
+
+ public:
+ explicit fork_join_lambda_by_value(const Function &function) : fork_join_sub_task{}, function_{function} {};
+
+ protected:
+ void execute_internal() override {
+ function_();
+ }
+};
+
+class fork_join_task : public abstract_task {
+ friend class fork_join_sub_task;
+
+ fork_join_sub_task *root_task_;
+ fork_join_sub_task *currently_executing_;
+
+ // Double-Ended Queue management
+ data_structures::work_stealing_deque<fork_join_sub_task> deque_;
+
+ // Steal Management
+ fork_join_sub_task *last_stolen_;
+
+ fork_join_sub_task *get_local_sub_task();
+ fork_join_sub_task *get_stolen_sub_task();
+
+ bool internal_stealing(abstract_task *other_task) override;
+ bool split_task(base::swmr_spin_lock * /*lock*/) override;
+
+ public:
+ explicit fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id);
+ void execute() override;
+ fork_join_sub_task *currently_executing() const;
+};
+
+template<typename T>