Commit dd8fb1e9 by FritzFlorian

Merge remote-tracking branch 'remotes/origin/master' into parallel_for

# Conflicts:
#	PERFORMANCE.md
#	lib/pls/CMakeLists.txt
#	lib/pls/include/pls/internal/scheduling/fork_join_task.h
parents 4fe555b7 70f72790
@@ -7,7 +7,7 @@ set(CMAKE_CXX_STANDARD 11)
# separate library and test/example executable output paths.
set(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/bin)
set(LIBRARY_OUTPUT_PATH ${CMAKE_BINARY_DIR}/lib)
# specific setup code is located in individual files.
include(cmake/DisabelInSource.cmake)
@@ -34,11 +34,12 @@ add_subdirectory(app/playground)
add_subdirectory(app/test_for_new)
add_subdirectory(app/invoke_parallel)
add_subdirectory(app/benchmark_fft)
+add_subdirectory(app/benchmark_unbalanced)

# Add optional tests
option(PACKAGE_TESTS "Build the tests" ON)
-if(PACKAGE_TESTS)
+if (PACKAGE_TESTS)
  enable_testing()
  add_subdirectory(test)
  add_test(NAME AllTests COMMAND tests)
-endif()
+endif ()
@@ -4,6 +4,33 @@ A collection of stuff that we noticed during development.
Useful later on to write a project report and to go back
in time to find out why certain decisions were made.
## 06.05.2019 - Relaxed Atomics

At the end of the talk on atomics it is mentioned how relaxed
atomics correspond to modern hardware, stating that starting with
ARMv8 and on x86 there is no real need for relaxed atomics,
as the 'strict' ordering has no real performance impact.
We will therefore ignore relaxed orderings for now (taking some potential
performance hits on our Banana Pi M2's ARMv7 architecture),
as they add a lot of complexity and introduce many possibilities
for subtle concurrency bugs.

TODO: find some papers on the topic in case we want to mention
it in our project report.
## 06.05.2019 - Atomics

Atomics are a big part of getting lock-free code running and are also
often useful for details like counters to spin on (e.g. checking whether
a task is finished). Resources for understanding them are found in
[a talk online](https://herbsutter.com/2013/02/11/atomic-weapons-the-c-memory-model-and-modern-hardware/)
and in the book 'C++ Concurrency in Action' by Anthony Williams.
The key takeaway is that we need to be careful about the possible
orderings of variable reads/writes. We will start off with
strict ordering and will have to see if this
impacts performance and whether we need to optimize it.
## 12.04.2019 - Unique IDs

Assigning unique IDs to logically different tasks is key to the
...
@@ -63,3 +63,62 @@ to contention, but could not resolve it with any combination
of `tas_spinlock` vs `ttas_spinlock` and `lock` vs `try_lock`.
This issue clearly needs further investigation.
### Commit aa27064 - Performance with ttas spinlocks (and 'full blocking' top level)
<img src="media/aa27064_fft_average.png" width="400"/>
### Commit d16ad3e - Performance with rw-lock and backoff
<img src="media/d16ad3e_fft_average.png" width="400"/>
### Commit 18b2d744 - Performance with lock-free deque
After much tinkering we still have performance problems with higher
thread counts in the FFT benchmark. Upwards of 4-5 threads the
performance gains start to saturate (before removing the top-level
locks we even saw a slight drop in performance).
Currently the FFT benchmark shows the following results (average):
<img src="media/18b2d744_fft_average.png" width="400"/>
On the positive side, the overall trend of 'performance drops'
at the hyperthreading mark is not really bad anymore; it now
seems similar to EMBB (with backoff + lock-free deque + top-level
reader-writer lock). This comes partly because the spike at 4 threads
is lower (less performance at 4 threads). We also see better times
on the multiprogrammed system with the lock-free deque.

This is discouraging after many tests. To see where the overhead lies
we also implemented the unbalanced tree search benchmark,
resulting in the following, surprisingly good, results (average):
<img src="media/18b2d744_unbalanced_average.png" width="400"/>
The main difference between the two benchmarks is that the second
one has more work and the work is relatively independent.
Additionally, the first one uses our high-level API (parallel invoke),
while the second one uses our low-level API.
It is worth investigating whether our high-level API or the structure
of the memory accesses in FFT is the problem.
### Commit cf056856 - Remove two-level scheduler
In this test we replace the two-level scheduler with ONLY fork_join
tasks. This removes the top-level steal overhead and performs only
internal stealing. For this we made the fork_join task the only
possible task type, removed the top-level rw-lock and the 'digging
down' to our level, and solely use internal stealing.
Average results FFT:
<img src="media/cf056856_fft_average.png" width="400"/>
Average results Unbalanced:
<img src="media/cf056856_unbalanced_average.png" width="400"/>
There seems to be only a minor performance difference between the two,
suggesting that our two-level approach is not the part causing our
weaker performance.
@@ -7,6 +7,8 @@
This section will give a brief introduction on how to get a minimal
project setup that uses the PLS library.
+Further [general notes](NOTES.md) and [performance notes](PERFORMANCE.md) can be found in
+their respective files.
Further notes on [performance](PERFORMANCE.md) and general
[notes](NOTES.md) on the development progress can be found in
...
@@ -6,82 +6,81 @@
#include <complex>
#include <vector>

-static constexpr int CUTOFF = 10;
+static constexpr int CUTOFF = 16;
static constexpr int NUM_ITERATIONS = 1000;
-static constexpr int INPUT_SIZE = 2064;
+static constexpr int INPUT_SIZE = 8192;
typedef std::vector<std::complex<double>> complex_vector;

void divide(complex_vector::iterator data, int n) {
  complex_vector tmp_odd_elements(n / 2);
  for (int i = 0; i < n / 2; i++) {
    tmp_odd_elements[i] = data[i * 2 + 1];
  }
  for (int i = 0; i < n / 2; i++) {
    data[i] = data[i * 2];
  }
  for (int i = 0; i < n / 2; i++) {
    data[i + n / 2] = tmp_odd_elements[i];
  }
}

void combine(complex_vector::iterator data, int n) {
  for (int i = 0; i < n / 2; i++) {
    std::complex<double> even = data[i];
    std::complex<double> odd = data[i + n / 2];

    // w is the "twiddle-factor".
    // this could be cached, but we run the same 'data_structures' algorithm parallel/serial,
    // so it won't impact the performance comparison.
    std::complex<double> w = exp(std::complex<double>(0, -2. * M_PI * i / n));

    data[i] = even + w * odd;
    data[i + n / 2] = even - w * odd;
  }
}

void fft(complex_vector::iterator data, int n) {
  if (n < 2) {
    return;
  }

  divide(data, n);
  if (n <= CUTOFF) {
    fft(data, n / 2);
    fft(data + n / 2, n / 2);
  } else {
    pls::invoke_parallel(
        [&] { fft(data, n / 2); },
        [&] { fft(data + n / 2, n / 2); }
    );
  }
  combine(data, n);
}

complex_vector prepare_input(int input_size) {
  std::vector<double> known_frequencies{2, 11, 52, 88, 256};
  complex_vector data(input_size);

  // Set our input data to match a time series of the known_frequencies.
  // When applying fft to this time-series we should find these frequencies.
  for (int i = 0; i < input_size; i++) {
    data[i] = std::complex<double>(0.0, 0.0);
    for (auto frequencie : known_frequencies) {
      data[i] += sin(2 * M_PI * frequencie * i / input_size);
    }
  }

  return data;
}

int main() {
  PROFILE_ENABLE
  complex_vector initial_input = prepare_input(INPUT_SIZE);

  pls::internal::helpers::run_mini_benchmark([&] {
    complex_vector input = initial_input;
    fft(input.begin(), input.size());
  }, 8, 4000);

  PROFILE_SAVE("test_profile.prof")
}
add_executable(benchmark_unbalanced main.cpp node.h node.cpp picosha2.h)
target_link_libraries(benchmark_unbalanced pls)
if (EASY_PROFILER)
target_link_libraries(benchmark_unbalanced easy_profiler)
endif ()
MIT License
Copyright (c) 2017 okdshin
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
\ No newline at end of file
#include <pls/pls.h>
#include <pls/internal/helpers/profiler.h>
#include <pls/internal/helpers/mini_benchmark.h>
#include "node.h"
const int SEED = 42;
const int ROOT_CHILDREN = 140;
const double Q = 0.124875;
const int NORMAL_CHILDREN = 8;
const int NUM_NODES = 71069;
int count_child_nodes(uts::node &node) {
int child_count = 1;
std::vector<uts::node> children = node.spawn_child_nodes();
if (children.empty()) {
return child_count;
}
auto current_task = pls::fork_join_sub_task::current();
std::vector<int> results(children.size());
for (size_t i = 0; i < children.size(); i++) {
size_t index = i;
auto lambda = [&, index] { results[index] = count_child_nodes(children[index]); };
pls::fork_join_lambda_by_value<typeof(lambda)> sub_task(lambda);
current_task->spawn_child(sub_task);
}
current_task->wait_for_all();
for (auto result : results) {
child_count += result;
}
return child_count;
}
int unbalanced_tree_search(int seed, int root_children, double q, int normal_children) {
static auto id = pls::unique_id::create(42);
int result;
auto lambda = [&] {
uts::node root(seed, root_children, q, normal_children);
result = count_child_nodes(root);
};
pls::fork_join_lambda_by_reference<typeof(lambda)> sub_task(lambda);
pls::fork_join_task root_task{&sub_task, id};
pls::scheduler::execute_task(root_task);
return result;
}
int main() {
PROFILE_ENABLE
pls::internal::helpers::run_mini_benchmark([&] {
unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN);
}, 8, 4000);
PROFILE_SAVE("test_profile.prof")
}
//int main() {
// PROFILE_ENABLE
// pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18};
// pls::scheduler scheduler{&my_scheduler_memory, 8};
//
// scheduler.perform_work([&] {
// PROFILE_MAIN_THREAD
// for (int i = 0; i < 10; i++) {
// PROFILE_WORK_BLOCK("Top Level")
// int result = unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN);
// std::cout << result << std::endl;
// }
// });
//
// PROFILE_SAVE("test_profile.prof")
//}
#include "node.h"
namespace uts {
node_state node::generate_child_state(uint32_t index) {
node_state result;
picosha2::hash256_one_by_one hasher;
hasher.process(state_.begin(), state_.end());
auto index_begin = reinterpret_cast<uint8_t *>(&index);
hasher.process(index_begin, index_begin + 4);
hasher.finish();
hasher.get_hash_bytes(result.begin(), result.end());
return result;
}
double node::get_state_random() {
int32_t state_random_integer;
uint32_t b = ((uint32_t) state_[16] << 24) |
((uint32_t) state_[17] << 16) |
((uint32_t) state_[18] << 8) |
((uint32_t) state_[19] << 0);
b = b & 0x7fffffff; // Mask out negative values
state_random_integer = static_cast<int32_t>(b);
return (double) state_random_integer / (double) INT32_MAX;
}
}
#ifndef UTS_NODE_H
#define UTS_NODE_H
#include <cstdint>
#include <array>
#include <vector>
#include "picosha2.h"
namespace uts {
using node_state = std::array<uint8_t, 20>;
/**
 * Node of an unbalanced binomial tree (https://www.cs.unc.edu/~olivier/LCPC06.pdf).
 * To build up the tree recursively call spawn_child_nodes on each node until leaves are reached.
* The tree is not built up directly in memory, but rather by the recursive calls.
*/
class node {
// The state is used to allow a deterministic tree construction using sha256 hashes.
node_state state_;
// Set this to a positive number for the root node to start the tree with a specific size
int root_children_;
// general branching factors
double q_;
int b_;
// Private constructor for children
node(node_state state, double q, int b) : state_{state}, root_children_{-1}, q_{q}, b_{b} {}
std::array<uint8_t, 20> generate_child_state(uint32_t index);
double get_state_random();
public:
node(int seed, int root_children, double q, int b) : state_({{}}), root_children_{root_children}, q_{q}, b_{b} {
for (int i = 0; i < 16; i++) {
state_[i] = 0;
}
state_[16] = static_cast<uint8_t>(0xFF & (seed >> 24));
state_[17] = static_cast<uint8_t>(0xFF & (seed >> 16));
state_[18] = static_cast<uint8_t>(0xFF & (seed >> 8));
state_[19] = static_cast<uint8_t>(0xFF & (seed >> 0));
picosha2::hash256_one_by_one hasher;
hasher.process(state_.begin(), state_.end());
hasher.finish();
hasher.get_hash_bytes(state_.begin(), state_.end());
}
std::vector<node> spawn_child_nodes() {
double state_random = get_state_random();
int num_children;
if (root_children_ > 0) {
num_children = root_children_; // Root always spawns children
} else if (state_random < q_) {
num_children = b_;
} else {
num_children = 0;
}
std::vector<node> result;
for (int i = 0; i < num_children; i++) {
result.push_back(node(generate_child_state(i), q_, b_));
}
return result;
}
};
}
#endif //UTS_NODE_H
@@ -2,50 +2,101 @@
#include <pls/internal/helpers/profiler.h>

#include <iostream>
+#include <complex>
+#include <vector>

-static pls::static_scheduler_memory<8, 2 << 14> my_scheduler_memory;
-
-static constexpr int CUTOFF = 10;
-
-long fib_serial(long n) {
-  if (n == 0) {
-    return 0;
-  }
-  if (n == 1) {
-    return 1;
-  }
-
-  return fib_serial(n - 1) + fib_serial(n - 2);
-}
-
-long fib(long n) {
-  if (n <= CUTOFF) {
-    return fib_serial(n);
-  }
-
-  // Actual 'invoke_parallel' logic/code
-  int left, right;
-  pls::invoke_parallel(
-      [&] { left = fib(n - 1); },
-      [&] { right = fib(n - 2); }
-  );
-  return left + right;
-}
+static constexpr int CUTOFF = 16;
+static constexpr int INPUT_SIZE = 8192;
+typedef std::vector<std::complex<double>> complex_vector;
+
+void divide(complex_vector::iterator data, int n) {
+  complex_vector tmp_odd_elements(n / 2);
+  for (int i = 0; i < n / 2; i++) {
+    tmp_odd_elements[i] = data[i * 2 + 1];
+  }
+  for (int i = 0; i < n / 2; i++) {
+    data[i] = data[i * 2];
+  }
+  for (int i = 0; i < n / 2; i++) {
+    data[i + n / 2] = tmp_odd_elements[i];
+  }
+}
+
+void combine(complex_vector::iterator data, int n) {
+  for (int i = 0; i < n / 2; i++) {
+    std::complex<double> even = data[i];
+    std::complex<double> odd = data[i + n / 2];
+
+    // w is the "twiddle-factor".
+    // this could be cached, but we run the same 'data_structures' algorithm parallel/serial,
+    // so it won't impact the performance comparison.
+    std::complex<double> w = exp(std::complex<double>(0, -2. * M_PI * i / n));
+
+    data[i] = even + w * odd;
+    data[i + n / 2] = even - w * odd;
+  }
+}
+
+void fft(complex_vector::iterator data, int n) {
+  if (n < 2) {
+    return;
+  }
+
+  PROFILE_WORK_BLOCK("Divide")
+  divide(data, n);
+  PROFILE_END_BLOCK
+  PROFILE_WORK_BLOCK("Invoke Parallel")
+  if (n == CUTOFF) {
+    PROFILE_WORK_BLOCK("FFT Serial")
+    fft(data, n / 2);
+    fft(data + n / 2, n / 2);
+  } else if (n <= CUTOFF) {
+    fft(data, n / 2);
+    fft(data + n / 2, n / 2);
+  } else {
+    pls::invoke_parallel(
+        [&] { fft(data, n / 2); },
+        [&] { fft(data + n / 2, n / 2); }
+    );
+  }
+  PROFILE_END_BLOCK
+  PROFILE_WORK_BLOCK("Combine")
+  combine(data, n);
+  PROFILE_END_BLOCK
+}
+
+complex_vector prepare_input(int input_size) {
+  std::vector<double> known_frequencies{2, 11, 52, 88, 256};
+  complex_vector data(input_size);
+
+  // Set our input data to match a time series of the known_frequencies.
+  // When applying fft to this time-series we should find these frequencies.
+  for (int i = 0; i < input_size; i++) {
+    data[i] = std::complex<double>(0.0, 0.0);
+    for (auto frequencie : known_frequencies) {
+      data[i] += sin(2 * M_PI * frequencie * i / input_size);
+    }
+  }
+
+  return data;
+}

int main() {
  PROFILE_ENABLE
-  pls::scheduler scheduler{&my_scheduler_memory, 8};
-
-  long result;
-  scheduler.perform_work([&] {
-    PROFILE_MAIN_THREAD
-    // Call looks just the same, only requirement is
-    // the enclosure in the perform_work lambda.
-    for (int i = 0; i < 10; i++) {
-      result = fib(30);
-      std::cout << "Fib(30)=" << result << std::endl;
-    }
-  });
-
-  PROFILE_SAVE("test_profile.prof")
+  pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 14};
+  pls::scheduler scheduler{&my_scheduler_memory, 8};
+
+  complex_vector initial_input = prepare_input(INPUT_SIZE);
+  scheduler.perform_work([&] {
+    PROFILE_MAIN_THREAD
+    // Call looks just the same, only requirement is
+    // the enclosure in the perform_work lambda.
+    for (int i = 0; i < 10; i++) {
+      PROFILE_WORK_BLOCK("Top Level FFT")
+      complex_vector input = initial_input;
+      fft(input.begin(), input.size());
+    }
+  });
+
+  PROFILE_SAVE("test_profile.prof")
}
@@ -10,8 +10,6 @@
#include <pls/internal/scheduling/root_task.h>
#include <pls/internal/helpers/unique_id.h>

int main() {
-  std::cout << pls::internal::scheduling::root_task<void(*)>::create_id().type_.hash_code() << std::endl;
-  std::cout << pls::internal::helpers::unique_id::create<pls::internal::scheduling::root_task<void(*)>>().type_.hash_code() << std::endl;
}
@@ -5,9 +5,8 @@ using namespace pls::internal::base;
int global = 0;

int main() {
  // Try to use every feature, to trigger the prohibited use of new if found somewhere
-  auto t1 = start_thread([] (){});
+  auto t1 = start_thread([]() {});
  t1.join();
}
@@ -6,15 +6,17 @@
#include "pls/internal/scheduling/scheduler.h"

namespace pls {
namespace algorithm {

-template<typename Function1, typename Function2>
-void invoke_parallel(const Function1& function1, const Function2& function2);
-
-template<typename Function1, typename Function2, typename Function3>
-void invoke_parallel(const Function1& function1, const Function2& function2, const Function3& function3);
+template<typename Function1, typename Function2>
+void invoke_parallel(const Function1 &function1, const Function2 &function2);
+
+template<typename Function1, typename Function2, typename Function3>
+void invoke_parallel(const Function1 &function1, const Function2 &function2, const Function3 &function3);

// ...and so on, add more if we decide to keep this design
}
}

#include "invoke_parallel_impl.h"
...
@@ -2,70 +2,73 @@
#ifndef PLS_INVOKE_PARALLEL_IMPL_H
#define PLS_INVOKE_PARALLEL_IMPL_H

-#include <pls/internal/scheduling/fork_join_task.h>
#include "pls/internal/scheduling/fork_join_task.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/helpers/unique_id.h"
+#include "pls/internal/base/alignment.h"

namespace pls {
namespace algorithm {
namespace internal {

using namespace ::pls::internal::scheduling;

-template<typename Body>
-inline void run_body(const Body& internal_body, const abstract_task::id& id) {
-  // Make sure we are in the context of this invoke_parallel instance,
-  // if not we will spawn it as a new 'fork-join-style' task.
-  auto current_task = scheduler::current_task();
-  if (current_task->unique_id() == id) {
-    auto current_sub_task = reinterpret_cast<fork_join_task*>(current_task)->currently_executing();
-    internal_body(current_sub_task);
-  } else {
-    fork_join_lambda<Body> root_body(&internal_body);
-    fork_join_task root_task{&root_body, id};
-    scheduler::execute_task(root_task);
-  }
-}
+template<typename Body>
+inline void run_body(const Body &internal_body, const abstract_task::id &id) {
+  // Make sure we are in the context of this invoke_parallel instance,
+  // if not we will spawn it as a new 'fork-join-style' task.
+  auto current_task = scheduler::current_task();
+  if (current_task->unique_id() == id) {
+    internal_body();
+  } else {
+    fork_join_lambda_by_reference<Body> root_body(internal_body);
+    fork_join_task root_task{&root_body, id};
+    scheduler::execute_task(root_task);
+  }
+}
}

-template<typename Function1, typename Function2>
-void invoke_parallel(const Function1& function1, const Function2& function2) {
-  using namespace ::pls::internal::scheduling;
-  using namespace ::pls::internal::helpers;
-  static abstract_task::id id = unique_id::create<Function1, Function2>();
-
-  auto internal_body = [&] (fork_join_sub_task* this_task){
-    auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); };
-    auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1);
-
-    this_task->spawn_child(sub_task_1);
-    function2(); // Execute last function 'inline' without spawning a sub_task object
-    this_task->wait_for_all();
-  };
-
-  internal::run_body(internal_body, id);
-}
+template<typename Function1, typename Function2>
+void invoke_parallel(const Function1 &function1, const Function2 &function2) {
+  using namespace ::pls::internal::scheduling;
+  using namespace ::pls::internal::helpers;
+  using namespace ::pls::internal::base;
+  static abstract_task::id id = unique_id::create<Function1, Function2>();
+
+  auto internal_body = [&]() {
+    auto current_task = fork_join_sub_task::current();
+    auto sub_task_2 = fork_join_lambda_by_reference<Function2>(function2);
+
+    current_task->spawn_child(sub_task_2);
+    function1(); // Execute first function 'inline' without spawning a sub_task object
+    current_task->wait_for_all();
+  };
+
+  internal::run_body(internal_body, id);
+}

-template<typename Function1, typename Function2, typename Function3>
-void invoke_parallel(const Function1& function1, const Function2& function2, const Function3& function3) {
-  using namespace ::pls::internal::scheduling;
-  using namespace ::pls::internal::helpers;
-  static abstract_task::id id = unique_id::create<Function1, Function2, Function3>();
-
-  auto internal_body = [&] (fork_join_sub_task* this_task){
-    auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); };
-    auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1);
-    auto sub_task_body_2 = [&] (fork_join_sub_task*){ function2(); };
-    auto sub_task_2 = fork_join_lambda<decltype(sub_task_body_2)>(&sub_task_body_2);
-
-    this_task->spawn_child(sub_task_1);
-    this_task->spawn_child(sub_task_2);
-    function3(); // Execute last function 'inline' without spawning a sub_task object
-    this_task->wait_for_all();
-  };
-
-  internal::run_body(internal_body, id);
-}
+template<typename Function1, typename Function2, typename Function3>
+void invoke_parallel(const Function1 &function1, const Function2 &function2, const Function3 &function3) {
+  using namespace ::pls::internal::scheduling;
+  using namespace ::pls::internal::helpers;
+  static abstract_task::id id = unique_id::create<Function1, Function2, Function3>();
+
+  auto internal_body = [&]() {
+    auto current_task = fork_join_sub_task::current();
+    auto sub_task_2 = fork_join_lambda_by_reference<Function2>(function2);
+    auto sub_task_3 = fork_join_lambda_by_reference<Function3>(function3);
+
+    current_task->spawn_child(sub_task_2);
+    current_task->spawn_child(sub_task_3);
+    function1(); // Execute first function 'inline' without spawning a sub_task object
+    current_task->wait_for_all();
+  };
+
+  internal::run_body(internal_body, id);
+}

}
}

#endif //PLS_INVOKE_PARALLEL_IMPL_H
@@ -8,21 +8,45 @@
#include "system_details.h"

namespace pls {
namespace internal {
namespace base {
namespace alignment {

template<typename T>
struct aligned_wrapper {
  alignas(system_details::CACHE_LINE_SIZE) unsigned char data[sizeof(T)];
  T *pointer() { return reinterpret_cast<T *>(data); }
};

void *allocate_aligned(size_t size);

-std::uintptr_t next_alignment(std::uintptr_t size);
+system_details::pointer_t next_alignment(system_details::pointer_t size);
+system_details::pointer_t previous_alignment(system_details::pointer_t size);
char *next_alignment(char *pointer);

}

+template<typename T>
+struct aligned_aba_pointer {
+  const system_details::pointer_t pointer_;
+
+  explicit aligned_aba_pointer(T *pointer, unsigned int aba = 0) : pointer_{
+      reinterpret_cast<system_details::pointer_t >(pointer) + aba} {}
+
+  T *pointer() const {
+    return reinterpret_cast<T *>(pointer_ & system_details::CACHE_LINE_ADDRESS_USED_BITS);
+  }
+
+  unsigned int aba() const {
+    return pointer_ & system_details::CACHE_LINE_ADDRESS_UNUSED_BITS;
+  }
+
+  aligned_aba_pointer set_aba(unsigned int aba) const {
+    return aligned_aba_pointer(pointer(), aba);
+  }
+};

}
}
}

#endif //PLS_ALIGNMENT_H
#ifndef PLS_BACKOFF_H_
#define PLS_BACKOFF_H_
#include "pls/internal/base/system_details.h"
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/thread.h"
#include <random>
#include <math.h>
namespace pls {
namespace internal {
namespace base {
class backoff {
const unsigned long INITIAL_SPIN_ITERS = 2u << 4u;
const unsigned long MAX_SPIN_ITERS = 2u << 8u;
const unsigned long MAX_ITERS = 2u << 10u;
const unsigned long YELD_ITERS = 2u << 10u;
unsigned long current_ = INITIAL_SPIN_ITERS;
std::minstd_rand random_;
static void spin(unsigned long iterations) {
for (volatile unsigned long i = 0; i < iterations; i++)
system_details::relax_cpu(); // Spin
}
public:
backoff() : current_{INITIAL_SPIN_ITERS}, random_{std::random_device{}()} {}
void do_backoff() {
PROFILE_LOCK("Backoff")
spin(random_() % std::min(current_, MAX_SPIN_ITERS));
if (current_ >= YELD_ITERS) {
PROFILE_LOCK("Yield")
this_thread::yield();
}
current_ = std::min(current_ * 2, MAX_ITERS);
}
void reset() {
current_ = INITIAL_SPIN_ITERS;
}
};
}
}
}
#endif //PLS_BACKOFF_H_
@@ -5,27 +5,29 @@
#include <pthread.h>

namespace pls {
namespace internal {
namespace base {

/**
 * Provides standard barrier behaviour.
 * `count` threads have to call `wait()` before any of the `wait()` calls returns,
 * thus blocking all threads until everyone reached the barrier.
 *
 * PORTABILITY:
 * Current implementation is based on pthreads.
 */
class barrier {
  pthread_barrier_t barrier_;

 public:
  explicit barrier(unsigned int count);
  ~barrier();

  void wait();
};

}
}
}

#endif //PLS_BARRIER_H
@@ -11,5 +11,6 @@
 * (or its inclusion adds too much overhead).
 */
#define PLS_ERROR(msg) std::cout << msg << std::endl; exit(1);
+#define PLS_ASSERT(cond, msg) if (!(cond)) { PLS_ERROR(msg) }

#endif //PLS_ERROR_HANDLING_H
@@ -6,12 +6,14 @@
#include "ttas_spin_lock.h"

namespace pls {
namespace internal {
namespace base {

// Default Spin-Lock implementation for this project.
-using spin_lock = tas_spin_lock;
+using spin_lock = ttas_spin_lock;

}
}
}

#endif //PLS_SPINLOCK_H
#ifndef PLS_SWMR_SPIN_LOCK_LOCK_H_
#define PLS_SWMR_SPIN_LOCK_LOCK_H_
#include <atomic>
#include "pls/internal/helpers/profiler.h"
namespace pls {
namespace internal {
namespace base {
/**
* Single writer, multiple reader spin lock.
* The writer is required to be the same thread all the time (single writer),
* while multiple threads can read.
 * Readers fail to lock while the writer requests the lock,
 * and the writer acquires the lock after all remaining readers have left the critical section.
*/
class swmr_spin_lock {
std::atomic<int> readers_;
std::atomic<int> write_request_;
public:
explicit swmr_spin_lock() : readers_{0}, write_request_{0} {}
bool reader_try_lock();
void reader_unlock();
void writer_lock();
void writer_unlock();
};
}
}
}
#endif //PLS_SWMR_SPIN_LOCK_LOCK_H_
...@@ -3,31 +3,76 @@
#define PLS_SYSTEM_DETAILS_H

#include <cstdint>
#if (COMPILER == MVCC)
#include <emmintrin.h>
#endif

namespace pls {
namespace internal {
namespace base {

/**
 * Collection of system details, e.g. hardware cache line size.
 *
 * PORTABILITY:
 * Currently sane default values for x86.
 */
namespace system_details {

/**
 * Pointer Types needed for ABA protection mixed into addresses.
 * pointer_t should be an integer type capable of holding ANY pointer value.
 */
using pointer_t = std::uintptr_t;
constexpr pointer_t ZERO_POINTER = 0;
constexpr pointer_t MAX_POINTER = ~ZERO_POINTER;

/**
 * Biggest type that supports atomic CAS operations.
 * Usually it is sane to assume a pointer can be swapped in a single CAS operation.
 */
using cas_integer = pointer_t;
constexpr cas_integer MIN_CAS_INTEGER = 0;
constexpr cas_integer MAX_CAS_INTEGER = ~MIN_CAS_INTEGER;
constexpr cas_integer FIRST_HALF_CAS_INTEGER = MAX_CAS_INTEGER << ((sizeof(cas_integer) / 2) * 8);
constexpr cas_integer SECOND_HALF_CAS_INTEGER = ~FIRST_HALF_CAS_INTEGER;

/**
 * Most processors have 64 byte cache lines (last 6 bit of the address are zero at line beginnings).
 */
constexpr unsigned int CACHE_LINE_ADDRESS_BITS = 6;
constexpr pointer_t CACHE_LINE_SIZE = 2u << (CACHE_LINE_ADDRESS_BITS - 1);
constexpr pointer_t CACHE_LINE_ADDRESS_USED_BITS = MAX_POINTER << CACHE_LINE_ADDRESS_BITS;
constexpr pointer_t CACHE_LINE_ADDRESS_UNUSED_BITS = ~CACHE_LINE_ADDRESS_USED_BITS;

/**
 * Choose one of the following ways to store thread specific data.
 * Try to choose the fastest available on this processor/system.
 */
//#define PLS_THREAD_SPECIFIC_PTHREAD
#define PLS_THREAD_SPECIFIC_COMPILER

/**
 * When spinning one wants to 'relax' the CPU from some task,
 * e.g. disabling speculative execution/branch prediction
 * or reducing its clock speed.
 * This is both good for power draw, as well as for hyperthreading.
 *
 * Choose the implementation appropriate for your compiler-cpu combination.
 */
#if (COMPILER == MVCC)
inline void relax_cpu() {
  _mm_pause();
}
#elif (COMPILER == GCC || COMPILER == LLVM)
inline void relax_cpu() {
  asm("pause");
}
#endif

}
}
}
}
#endif //PLS_SYSTEM_DETAILS_H
...@@ -10,30 +10,29 @@
#include "pls/internal/base/thread.h"

namespace pls {
namespace internal {
namespace base {

/**
 * A simple set and test_and_set based spin lock implementation.
 *
 * PORTABILITY:
 * Current implementation is based on C++ 11 atomic_flag.
 */
class tas_spin_lock {
  std::atomic_flag flag_;

 public:
  tas_spin_lock() : flag_{ATOMIC_FLAG_INIT} {};
  tas_spin_lock(const tas_spin_lock &/*other*/) : flag_{ATOMIC_FLAG_INIT} {}

  void lock();
  bool try_lock(unsigned int num_tries = 1);
  void unlock();
};

}
}
}
#endif //PLS_TAS_SPIN_LOCK_H
...@@ -9,113 +9,122 @@
#include <functional>
#include <pthread.h>
#include <atomic>
#include <time.h>

#include "system_details.h"

namespace pls {
namespace internal {
namespace base {

using thread_entrypoint = void();

/**
 * Static methods that can be performed on the current thread.
 *
 * usage:
 * this_thread::yield();
 * T* state = this_thread::state<T>();
 *
 * PORTABILITY:
 * Current implementation is based on pthreads.
 */
class this_thread {
  template<typename Function, typename State>
  friend class thread;

#ifdef PLS_THREAD_SPECIFIC_PTHREAD
  static pthread_key_t local_storage_key_;
  static bool local_storage_key_initialized_;
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
  static __thread void *local_state_;
#endif

 public:
  static void yield() {
    pthread_yield();
  }

  static void sleep(long microseconds) {
    timespec time{0, 1000 * microseconds};
    nanosleep(&time, nullptr);
  }

  /**
   * Retrieves the local state pointer.
   *
   * @tparam T The type of the state that is stored.
   * @return The state pointer held for this thread.
   */
  template<typename T>
  static T *state();

  /**
   * Stores a pointer to the thread local state object.
   * The memory management for this has to be done by the user,
   * we only keep the pointer.
   *
   * @tparam T The type of the state that is stored.
   * @param state_pointer A pointer to the thread's state object.
   */
  template<typename T>
  static void set_state(T *state_pointer);
};

/**
 * Abstraction for starting a function in a separate thread.
 *
 * @tparam Function Lambda being started on the new thread.
 * @tparam State State type held for this thread.
 *
 * usage:
 * T* state;
 * auto thread = start_thread([] {
 *   // Run on new thread
 * }, state);
 * thread.join(); // Wait for it to finish
 *
 * PORTABILITY:
 * Current implementation is based on pthreads.
 */
template<typename Function, typename State>
class thread {
  friend class this_thread;

  // Keep a copy of the function (lambda) in this object to make sure it is valid when called!
  Function function_;
  State *state_pointer_;

  // We need to wait for the started function to read
  // the function_ and state_pointer_ properties before returning
  // from the constructor, as the object might be moved after this.
  std::atomic_flag *startup_flag_;

  // Keep handle to native implementation
  pthread_t pthread_thread_;

  static void *start_pthread_internal(void *thread_pointer);

 public:
  explicit thread(const Function &function, State *state_pointer);

 public:
  void join();

  // make object move only
  thread(thread &&) noexcept = default;
  thread &operator=(thread &&) noexcept = default;

  thread(const thread &) = delete;
  thread &operator=(const thread &) = delete;
};

template<typename Function, typename State>
thread<Function, State> start_thread(const Function &function, State *state_pointer);
template<typename Function>
thread<Function, void> start_thread(const Function &function);

}
}
}
#include "thread_impl.h"
...
...@@ -3,86 +3,87 @@
#define PLS_THREAD_IMPL_H

namespace pls {
namespace internal {
namespace base {

template<typename T>
T *this_thread::state() {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
  return reinterpret_cast<T*>(pthread_getspecific(local_storage_key_));
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
  return reinterpret_cast<T *>(local_state_);
#endif
}

template<typename T>
void this_thread::set_state(T *state_pointer) {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
  pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer);
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
  local_state_ = state_pointer;
#endif
}

template<typename Function, typename State>
void *thread<Function, State>::start_pthread_internal(void *thread_pointer) {
  auto my_thread = reinterpret_cast<thread *>(thread_pointer);
  Function my_function_copy = my_thread->function_;
  State *my_state_pointer_copy = my_thread->state_pointer_;

  // Now we have copies of everything we need on the stack.
  // The original thread object can be moved freely (no more
  // references to its memory location).
  my_thread->startup_flag_->clear();

  this_thread::set_state(my_state_pointer_copy);
  my_function_copy();

  // Finished executing the user function
  pthread_exit(nullptr);
}

template<typename Function, typename State>
thread<Function, State>::thread(const Function &function, State *state_pointer):
    function_{function},
    state_pointer_{state_pointer},
    startup_flag_{nullptr},
    pthread_thread_{} {

#ifdef PLS_THREAD_SPECIFIC_PTHREAD
  if (!this_thread::local_storage_key_initialized_) {
    pthread_key_create(&this_thread::local_storage_key_, nullptr);
    this_thread::local_storage_key_initialized_ = true;
  }
#endif

  // We only need this during startup, will be destroyed when out of scope
  std::atomic_flag startup_flag{ATOMIC_FLAG_INIT};
  startup_flag_ = &startup_flag;

  startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return
  pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *) (this));
  while (startup_flag.test_and_set()); // Busy waiting for the starting flag to clear
}

template<typename Function, typename State>
void thread<Function, State>::join() {
  pthread_join(pthread_thread_, nullptr);
}

template<typename Function, typename State>
thread<Function, State> start_thread(const Function &function, State *state_pointer) {
  return thread<Function, State>(function, state_pointer);
}

template<typename Function>
thread<Function, void> start_thread(const Function &function) {
  return thread<Function, void>(function, nullptr);
}

}
}
}
#endif //PLS_THREAD_IMPL_H
...@@ -6,32 +6,30 @@
#include <iostream>

#include "pls/internal/base/thread.h"
#include "pls/internal/base/backoff.h"

namespace pls {
namespace internal {
namespace base {

/**
 * A simple test-test-and-set (TTAS) based spin lock implementation.
 *
 * PORTABILITY:
 * Current implementation is based on C++ 11 atomics.
 */
class ttas_spin_lock {
  std::atomic<int> flag_;

 public:
  ttas_spin_lock() : flag_{0} {};
  ttas_spin_lock(const ttas_spin_lock &/*other*/) : flag_{0} {}

  void lock();
  bool try_lock(unsigned int num_tries = 1);
  void unlock();
};

}
}
}
#endif //PLS_TTAS_SPIN_LOCK_H
...@@ -9,45 +9,50 @@
#include "pls/internal/base/alignment.h"

namespace pls {
namespace internal {
namespace data_structures {

using base::system_details::pointer_t;

/**
 * Generic stack-like data structure that allows allocating arbitrary objects in a given memory region.
 * The objects will be stored aligned in the stack, making the storage cache friendly and very fast
 * (as long as one can live with the stack restrictions).
 *
 * IMPORTANT: Does not call destructors on stored objects! Do not allocate resources in the objects!
 *
 * Usage:
 * aligned_stack stack{pointer_to_memory, size_of_memory};
 * T* pointer = stack.push(some_object); // Copy-construct the object on top of stack
 * stack.pop<T>(); // Deconstruct the top object of type T
 */
class aligned_stack {
  // Keep bounds of our memory block
  pointer_t memory_start_;
  pointer_t memory_end_;

  // Current head will always be aligned to cache lines
  pointer_t head_;

 public:
  typedef pointer_t state;

  aligned_stack() : memory_start_{0}, memory_end_{0}, head_{0} {};
  aligned_stack(pointer_t memory_region, std::size_t size);
  aligned_stack(char *memory_region, std::size_t size);

  template<typename T>
  T *push(const T &object);
  template<typename T>
  void *push();
  template<typename T>
  T pop();

  state save_state() const { return head_; }
  void reset_state(state new_state) { head_ = new_state; }
};

}
}
}
#include "aligned_stack_impl.h"
...
...@@ -3,34 +3,36 @@
#define PLS_ALIGNED_STACK_IMPL_H

namespace pls {
namespace internal {
namespace data_structures {

template<typename T>
T *aligned_stack::push(const T &object) {
  // Copy-construct the object into the allocated slot
  return new(push<T>()) T(object);
}

template<typename T>
void *aligned_stack::push() {
  void *result = reinterpret_cast<T *>(head_);

  // Move head to next aligned position after new object
  head_ = base::alignment::next_alignment(head_ + sizeof(T));
  if (head_ >= memory_end_) {
    PLS_ERROR("Tried to allocate object on aligned_stack without sufficient memory!");
  }

  return result;
}

template<typename T>
T aligned_stack::pop() {
  head_ = head_ - base::alignment::next_alignment(sizeof(T));
  return *reinterpret_cast<T *>(head_);
}

}
}
}
#endif //PLS_ALIGNED_STACK_IMPL_H
...@@ -5,56 +5,58 @@
#include "pls/internal/base/spin_lock.h"

namespace pls {
namespace internal {
namespace data_structures {

/**
 * Turns any object into a deque item when inheriting from this.
 */
class deque_item {
  friend class deque_internal;

  deque_item *prev_;
  deque_item *next_;
};

class deque_internal {
 protected:
  deque_item *head_;
  deque_item *tail_;

  base::spin_lock lock_;

  deque_item *pop_head_internal();
  deque_item *pop_tail_internal();
  void push_tail_internal(deque_item *new_item);
};

/**
 * A double linked list based deque.
 * Storage is therefore only needed for the individual items.
 *
 * @tparam Item The type of items stored in this deque
 */
template<typename Item>
class deque : deque_internal {
 public:
  explicit deque() : deque_internal{} {}

  inline Item *pop_head() {
    return static_cast<Item *>(pop_head_internal());
  }

  inline Item *pop_tail() {
    return static_cast<Item *>(pop_tail_internal());
  }

  inline void push_tail(Item *new_item) {
    push_tail_internal(new_item);
  }
};

}
}
}
#endif //PLS_DEQUE_H
#ifndef PLS_WORK_STEALING_DEQUE_H_
#define PLS_WORK_STEALING_DEQUE_H_
#include <atomic>
#include <mutex>
#include <pls/internal/scheduling/thread_state.h>
#include "pls/internal/base/system_details.h"
#include "pls/internal/base/spin_lock.h"
#include "pls/internal/base/error_handling.h"
#include "aligned_stack.h"
namespace pls {
namespace internal {
namespace data_structures {
using cas_integer = base::system_details::cas_integer;
using pointer_t = base::system_details::pointer_t;
static cas_integer get_stamp(cas_integer n) {
return (n & base::system_details::FIRST_HALF_CAS_INTEGER) >> ((sizeof(cas_integer) / 2) * 8);
}
static cas_integer get_offset(cas_integer n) {
return n & base::system_details::SECOND_HALF_CAS_INTEGER;
}
static cas_integer set_stamp(cas_integer n, cas_integer new_value) {
return (new_value << ((sizeof(cas_integer) / 2) * 8)) | (n & base::system_details::SECOND_HALF_CAS_INTEGER);
}
//static cas_integer set_offset(cas_integer n, cas_integer new_value) {
// return new_value | (n & base::system_details::FIRST_HALF_CAS_INTEGER);
//}
class work_stealing_deque_item {
// Pointer to the actual data
pointer_t data_;
// Index (relative to stack base) to the next and previous element
cas_integer next_item_;
cas_integer previous_item_;
public:
work_stealing_deque_item() : data_{0}, next_item_{0}, previous_item_{0} {}
template<typename Item>
Item *data() {
return reinterpret_cast<Item *>(data_);
}
template<typename Item>
void set_data(Item *data) {
data_ = reinterpret_cast<pointer_t>(data);
}
cas_integer next_item() {
return next_item_;
}
void set_next_item(cas_integer next_item) {
next_item_ = next_item;
}
cas_integer previous_item() {
return previous_item_;
}
void set_previous_item(cas_integer previous_item) {
previous_item_ = previous_item;
}
};
static_assert(sizeof(work_stealing_deque_item) < base::system_details::CACHE_LINE_SIZE,
"Work stealing deque relies on memory layout and requires cache lines to be longer than one 'work_stealing_deque_item' instance!");
template<typename Item>
class work_stealing_deque {
// Deque 'takes over' stack and handles memory management while in use.
// At any point in time the deque can stop using more memory and the stack can be used by other entities.
aligned_stack *stack_;
pointer_t base_pointer_;
std::atomic<cas_integer> head_;
std::atomic<cas_integer> tail_;
cas_integer previous_tail_;
base::spin_lock lock_{}; // TODO: Remove after debugging
public:
using state = aligned_stack::state;
explicit work_stealing_deque(aligned_stack *stack) : stack_{stack},
base_pointer_{0},
head_{0},
tail_{0},
previous_tail_{0} {
reset_base_pointer();
}
work_stealing_deque(const work_stealing_deque &other) : stack_{other.stack_},
base_pointer_{other.base_pointer_},
head_{other.head_.load()},
tail_{other.tail_.load()},
previous_tail_{other.previous_tail_} {}
void reset_base_pointer() {
base_pointer_ = reinterpret_cast<pointer_t>(stack_->save_state()); // Keep the base of our region in the stack
}
work_stealing_deque_item *item_at(cas_integer position) {
return reinterpret_cast<work_stealing_deque_item *>(base_pointer_
+ (base::system_details::CACHE_LINE_SIZE * position));
}
cas_integer current_stack_offset() {
return (stack_->save_state() - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
}
template<typename T>
std::pair<work_stealing_deque_item, T> *allocate_item(const T &new_item) {
// 'Union' type to push both on stack
using pair_t = std::pair<work_stealing_deque_item, T>;
// Allocate space on stack
auto new_pair = reinterpret_cast<pair_t *>(stack_->push<pair_t>());
// Initialize memory on stack
new((void *) &(new_pair->first)) work_stealing_deque_item();
new((void *) &(new_pair->second)) T(new_item);
return new_pair;
}
template<typename T>
Item *push_tail(const T &new_item) {
cas_integer local_tail = tail_;
auto new_pair = allocate_item(new_item);
// Prepare current tail to point to correct next items
auto tail_deque_item = item_at(local_tail);
tail_deque_item->set_data(&(new_pair->second));
tail_deque_item->set_next_item(current_stack_offset());
tail_deque_item->set_previous_item(previous_tail_);
previous_tail_ = local_tail;
// Linearization point, item appears after this write
cas_integer new_tail = current_stack_offset();
tail_ = new_tail;
return &(new_pair->second);
}
Item *pop_tail() {
cas_integer local_tail = tail_;
cas_integer local_head = head_;
if (local_tail <= get_offset(local_head)) {
return nullptr; // EMPTY
}
work_stealing_deque_item *previous_tail_item = item_at(previous_tail_);
cas_integer new_tail = previous_tail_;
previous_tail_ = previous_tail_item->previous_item();
// Publish our wish to set the tail back
tail_ = new_tail;
// Get the state of local head AFTER we published our wish
local_head = head_; // Linearization point, outside knows list is empty
if (get_offset(local_head) < new_tail) {
return previous_tail_item->data<Item>(); // Success, enough distance to other threads
}
if (get_offset(local_head) == new_tail) {
cas_integer new_head = set_stamp(new_tail, get_stamp(local_head) + 1);
// Try competing with consumers by updating the head's stamp value
if (head_.compare_exchange_strong(local_head, new_head)) {
return previous_tail_item->data<Item>(); // SUCCESS, we won the competition with other threads
}
}
// Some other thread either won the competition or it already set the head further than we are
// before we even tried to compete with it.
// Reset the queue into an empty state => head_ = tail_
tail_ = get_offset(local_head); // ...we give up to the other winning thread
return nullptr; // EMPTY, we lost the competition with other threads
}
Item *pop_head() {
cas_integer local_head = head_;
cas_integer local_tail = tail_;
if (local_tail <= get_offset(local_head)) {
return nullptr; // EMPTY
}
// Load info on current deque item.
// In case we have a race with a new (aba) overwritten item at this position,
// there has to be a competition over the tail -> the stamp increased and our next
// operation will fail anyways!
work_stealing_deque_item *head_deque_item = item_at(get_offset(local_head));
cas_integer next_item_offset = head_deque_item->next_item();
Item *head_data_item = head_deque_item->data<Item>();
// We try to set the head to this new position.
// Possible outcomes:
// 1) no one interrupted us, we win this competition
// 2) other thread took the head, we lose to this
// 3) owning thread removed tail, we lose to this
cas_integer new_head = set_stamp(next_item_offset, get_stamp(local_head) + 1);
if (head_.compare_exchange_strong(local_head, new_head)) {
return head_data_item; // SUCCESS, we won the competition
}
return nullptr; // EMPTY, we lost the competition
}
void release_memory_until(state state) {
cas_integer item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
cas_integer local_head = head_;
cas_integer local_tail = tail_;
stack_->reset_state(state);
if (item_offset < local_tail) {
tail_ = item_offset;
if (get_offset(local_head) >= local_tail) {
head_ = set_stamp(item_offset, get_stamp(local_head) + 1);
}
}
}
void release_memory_until(Item *item) {
release_memory_until(reinterpret_cast<pointer_t>(item));
}
state save_state() {
return stack_->save_state();
}
};
}
}
}
#endif //PLS_WORK_STEALING_DEQUE_H_
...@@ -9,45 +9,47 @@
#include <iostream>

namespace pls {
namespace internal {
namespace helpers {

// TODO: Clean up (separate into small functions and .cpp file)
template<typename Function>
void run_mini_benchmark(const Function &lambda, size_t max_threads, unsigned long max_runtime_ms = 1000) {
  using namespace std;
  using namespace pls::internal::scheduling;

  malloc_scheduler_memory scheduler_memory{max_threads};
  for (unsigned int num_threads = 1; num_threads <= max_threads; num_threads++) {
    scheduler local_scheduler{&scheduler_memory, num_threads};

    chrono::high_resolution_clock::time_point start_time;
    chrono::high_resolution_clock::time_point end_time;
    unsigned long iterations = 0;
    local_scheduler.perform_work([&] {
      start_time = chrono::high_resolution_clock::now();
      end_time = start_time;
      chrono::high_resolution_clock::time_point planned_end_time = start_time + chrono::milliseconds(max_runtime_ms);

      while (end_time < planned_end_time) {
        lambda();
        end_time = chrono::high_resolution_clock::now();
        iterations++;
      }
    });

    long time = chrono::duration_cast<chrono::microseconds>(end_time - start_time).count();
    double time_per_iteration = (double) time / iterations;

    std::cout << time_per_iteration;
    if (num_threads < max_threads) {
      std::cout << ",";
    }
  }
  std::cout << std::endl;
}

}
}
}
#endif //PLS_MINI_BENCHMARK_H
@@ -15,9 +15,9 @@

#ifdef NEW_LINK_ERROR
// This will cause a linker error if new is used in the code.
// We also exit if it is somehow still called.
inline void *operator new(std::size_t) {
  extern int bare_new_erroneously_called();
  exit(bare_new_erroneously_called() | 1);
}
#else
// Use this + debugging point to find out where we use a new
...
@@ -7,25 +7,27 @@

#include <stdint.h>

namespace pls {
namespace internal {
namespace helpers {

struct unique_id {
  const uint32_t id_;
  const std::type_info &type_;
  bool operator==(const unique_id &other) const { return id_ == other.id_ && type_ == other.type_; }

  static constexpr unique_id create(const uint32_t id) {
    return unique_id(id, typeid(void));
  }
  template<typename ...T>
  static constexpr unique_id create() {
    return unique_id(UINT32_MAX, typeid(std::tuple<T...>));
  }
 private:
  explicit constexpr unique_id(const uint32_t id, const std::type_info &type) : id_{id}, type_{type} {};
};

}
}
}
#endif //PLS_UNIQUE_ID_H
@@ -2,42 +2,44 @@

#ifndef PLS_ABSTRACT_TASK_H
#define PLS_ABSTRACT_TASK_H

#include "pls/internal/base/swmr_spin_lock.h"
#include "pls/internal/helpers/unique_id.h"

namespace pls {
namespace internal {
namespace scheduling {

class abstract_task {
 public:
  using id = helpers::unique_id;

 private:
  unsigned int depth_;
  abstract_task::id unique_id_;
  abstract_task *volatile child_task_;

 public:
  abstract_task(const unsigned int depth, const abstract_task::id &unique_id) :
      depth_{depth},
      unique_id_{unique_id},
      child_task_{nullptr} {}

  virtual void execute() = 0;
  void set_child(abstract_task *child_task) { child_task_ = child_task; }
  abstract_task *child() const { return child_task_; }

  void set_depth(unsigned int depth) { depth_ = depth; }
  unsigned int depth() const { return depth_; }
  id unique_id() const { return unique_id_; }
 protected:
  virtual bool internal_stealing(abstract_task *other_task) = 0;
  virtual bool split_task(base::swmr_spin_lock *lock) = 0;

  bool steal_work();
};

}
}
}
#endif //PLS_ABSTRACT_TASK_H
@@ -5,94 +5,119 @@

#include "pls/internal/helpers/profiler.h"

#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/data_structures/work_stealing_deque.h"

#include "abstract_task.h"
#include "thread_state.h"

namespace pls {
namespace internal {
namespace scheduling {

class fork_join_task;
class fork_join_sub_task {
  friend class fork_join_task;

  // Coordinate finishing of sub_tasks
  std::atomic_uint32_t ref_count_;
  fork_join_sub_task *parent_;

  // Access to TBB scheduling environment
  fork_join_task *tbb_task_;

  bool executed = false;
  int executed_at = -1;

  // Stack Management (reset stack pointer after wait_for_all() calls)
  data_structures::work_stealing_deque<fork_join_sub_task>::state deque_state_;
 protected:
  explicit fork_join_sub_task();
  fork_join_sub_task(const fork_join_sub_task &other);

  // Overwritten with behaviour of child tasks
  virtual void execute_internal() = 0;

 public:
  // Only use them when actually executing this sub_task (only public for simpler API design)
  template<typename T>
  void spawn_child(T &sub_task);
  void wait_for_all();

  static fork_join_sub_task *current();
 private:
  void execute();
};

template<typename Function>
class fork_join_lambda_by_reference : public fork_join_sub_task {
  const Function &function_;

 public:
  explicit fork_join_lambda_by_reference(const Function &function) : fork_join_sub_task{}, function_{function} {};

 protected:
  void execute_internal() override {
    function_();
  }
};

template<typename Function>
class fork_join_lambda_by_value : public fork_join_sub_task {
  const Function function_;

 public:
  explicit fork_join_lambda_by_value(const Function &function) : fork_join_sub_task{}, function_{function} {};

 protected:
  void execute_internal() override {
    function_();
  }
};

class fork_join_task : public abstract_task {
  friend class fork_join_sub_task;

  fork_join_sub_task *root_task_;
  fork_join_sub_task *currently_executing_;

  // Double-Ended Queue management
  data_structures::work_stealing_deque<fork_join_sub_task> deque_;

  // Steal Management
  fork_join_sub_task *last_stolen_;

  fork_join_sub_task *get_local_sub_task();
  fork_join_sub_task *get_stolen_sub_task();

  bool internal_stealing(abstract_task *other_task) override;
  bool split_task(base::swmr_spin_lock * /*lock*/) override;

 public:
  explicit fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id);
  void execute() override;
  fork_join_sub_task *currently_executing() const;
};

template<typename T>
void fork_join_sub_task::spawn_child(T &task) {
  PROFILE_FORK_JOIN_STEALING("spawn_child")
  static_assert(std::is_base_of<fork_join_sub_task, T>::value, "Only pass fork_join_sub_task subclasses!");

  // Keep our refcount up to date
  ref_count_++;

  // Assign forced values
  task.parent_ = this;
  task.tbb_task_ = tbb_task_;
  task.deque_state_ = tbb_task_->deque_.save_state();

  // Push on our deque
  const T const_task = task;
  tbb_task_->deque_.push_tail(const_task);
}

}
}
}
#endif //PLS_TBB_LIKE_TASK_H
@@ -5,76 +5,78 @@

#include <mutex>

#include "pls/internal/helpers/profiler.h"

#include "pls/internal/base/swmr_spin_lock.h"
#include "abstract_task.h"

namespace pls {
namespace internal {
namespace scheduling {

template<typename Function>
class root_task : public abstract_task {
  Function function_;
  std::atomic_uint8_t finished_;
 public:
  static constexpr auto create_id = helpers::unique_id::create<root_task<Function>>;

  explicit root_task(Function function) :
      abstract_task{0, create_id()},
      function_{function},
      finished_{0} {}
  root_task(const root_task &other) :
      abstract_task{0, create_id()},
      function_{other.function_},
      finished_{0} {}

  bool finished() {
    return finished_;
  }

  void execute() override {
    PROFILE_WORK_BLOCK("execute root_task");
    function_();
    finished_ = 1;
  }

  bool internal_stealing(abstract_task * /*other_task*/) override {
    return false;
  }

  bool split_task(base::swmr_spin_lock * /*lock*/) override {
    return false;
  }
};

template<typename Function>
class root_worker_task : public abstract_task {
  root_task<Function> *master_task_;

 public:
  static constexpr auto create_id = root_task<Function>::create_id;

  explicit root_worker_task(root_task<Function> *master_task) :
      abstract_task{0, create_id()},
      master_task_{master_task} {}

  void execute() override {
    PROFILE_WORK_BLOCK("execute root_task");
    do {
      steal_work();
    } while (!master_task_->finished());
  }

  bool internal_stealing(abstract_task * /*other_task*/) override {
    return false;
  }

  bool split_task(base::swmr_spin_lock * /*lock*/) override {
    return false;
  }
};

}
}
}
#endif //PLS_ROOT_MASTER_TASK_H
@@ -12,107 +12,109 @@

#include "scheduler.h"

namespace pls {
namespace internal {
namespace scheduling {

template<typename Function>
class run_on_n_threads_task : public abstract_task {
  template<typename F>
  friend class run_on_n_threads_task_worker;

  Function function_;

  // Improvement: Remove lock and replace by atomic variable (performance)
  int counter;
  base::spin_lock counter_lock_;

  int decrement_counter() {
    std::lock_guard<base::spin_lock> lock{counter_lock_};
    counter--;
    return counter;
  }

  int get_counter() {
    std::lock_guard<base::spin_lock> lock{counter_lock_};
    return counter;
  }
 public:
  static constexpr auto create_id = helpers::unique_id::create<run_on_n_threads_task<Function>>;

  run_on_n_threads_task(Function function, int num_threads) :
      abstract_task{0, create_id()},
      function_{function},
      counter{num_threads - 1} {}

  void execute() override {
    // Execute our function ONCE
    function_();

    // Steal until we are finished (other threads executed)
    do {
      steal_work();
    } while (get_counter() > 0);

    std::cout << "Finished Master!" << std::endl;
  }

  bool internal_stealing(abstract_task * /*other_task*/) override {
    return false;
  }

  bool split_task(base::swmr_spin_lock *lock) override;
};

template<typename Function>
class run_on_n_threads_task_worker : public abstract_task {
  Function function_;
  run_on_n_threads_task<Function> *root_;
 public:
  static constexpr auto create_id = helpers::unique_id::create<run_on_n_threads_task_worker<Function>>;

  run_on_n_threads_task_worker(Function function, run_on_n_threads_task<Function> *root) :
      abstract_task{0, create_id()},
      function_{function},
      root_{root} {}

  void execute() override {
    if (root_->decrement_counter() >= 0) {
      function_();
      std::cout << "Finished Worker!" << std::endl;
    } else {
      std::cout << "Abandoned Worker!" << std::endl;
    }
  }

  bool internal_stealing(abstract_task * /*other_task*/) override {
    return false;
  }

  bool split_task(base::swmr_spin_lock * /*lock*/) override {
    return false;
  }
};

template<typename Function>
bool run_on_n_threads_task<Function>::split_task(base::swmr_spin_lock *lock) {
  if (get_counter() <= 0) {
    return false;
  }
  // In success case, unlock.
  lock->reader_unlock();

  auto scheduler = base::this_thread::state<thread_state>()->scheduler_;
  auto task = run_on_n_threads_task_worker<Function>{function_, this};
  scheduler->execute_task(task, depth());
  return true;
}

template<typename Function>
run_on_n_threads_task<Function> create_run_on_n_threads_task(Function function, int num_threads) {
  return run_on_n_threads_task<Function>{function, num_threads};
}

}
}
}
#endif //PLS_RUN_ON_N_THREADS_TASK_H
@@ -17,50 +17,52 @@

#include "scheduler_memory.h"

namespace pls {
namespace internal {
namespace scheduling {

void worker_routine();
using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;

class scheduler {
  friend void worker_routine();

  const unsigned int num_threads_;
  scheduler_memory *memory_;

  base::barrier sync_barrier_;
  bool terminated_;
 public:
  explicit scheduler(scheduler_memory *memory, unsigned int num_threads);
  ~scheduler();

  /**
   * Wakes up the thread pool.
   * Code inside the Function lambda can invoke all parallel APIs.
   *
   * @param work_section generic function or lambda to be executed in the scheduler's context.
   */
  template<typename Function>
  void perform_work(Function work_section);

  /**
   * Executes a top-level-task (children of abstract_task) on this thread.
   *
   * @param task The task to be executed.
   * @param depth Optional: depth of the new task, otherwise set implicitly.
   */
  template<typename Task>
  static void execute_task(Task &task, int depth = -1);

  static abstract_task *current_task() { return base::this_thread::state<thread_state>()->current_task_; }

  void terminate(bool wait_for_workers = true);

  unsigned int num_threads() const { return num_threads_; }
  thread_state *thread_state_for(size_t id) { return memory_->thread_state_for(id); }
};

}
}
}

#include "scheduler_impl.h"
...
@@ -3,70 +3,74 @@
#define PLS_SCHEDULER_IMPL_H

namespace pls {
namespace internal {
namespace scheduling {

template<typename Function>
void scheduler::perform_work(Function work_section) {
  PROFILE_WORK_BLOCK("scheduler::perform_work")
  root_task<Function> master{work_section};

  // Push root task on stacks
  auto new_master = memory_->task_stack_for(0)->push(master);
  memory_->thread_state_for(0)->root_task_ = new_master;
  memory_->thread_state_for(0)->current_task_ = new_master;
  for (unsigned int i = 1; i < num_threads_; i++) {
    root_worker_task<Function> worker{new_master};
    auto new_worker = memory_->task_stack_for(0)->push(worker);
    memory_->thread_state_for(i)->root_task_ = new_worker;
    memory_->thread_state_for(i)->current_task_ = new_worker;
  }

  // Perform and wait for work
  sync_barrier_.wait(); // Trigger threads to wake up
  sync_barrier_.wait(); // Wait for threads to finish

  // Clean up stack
  memory_->task_stack_for(0)->pop<typeof(master)>();
  for (unsigned int i = 1; i < num_threads_; i++) {
    root_worker_task<Function> worker{new_master};
    memory_->task_stack_for(0)->pop<typeof(worker)>();
  }
}

template<typename Task>
void scheduler::execute_task(Task &task, int depth) {
  static_assert(std::is_base_of<abstract_task, Task>::value, "Only pass abstract_task subclasses!");

  auto my_state = base::this_thread::state<thread_state>();
  abstract_task *old_task;
  abstract_task *new_task;

  // Init Task
  old_task = my_state->current_task_;
  new_task = my_state->task_stack_->push(task);

  new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1);

  {
    my_state->lock_.writer_lock();
    my_state->current_task_ = new_task;
    old_task->set_child(new_task);
    my_state->lock_.writer_unlock();
  }

  // Run Task
  new_task->execute();

  // Teardown state back to before the task was executed
  my_state->task_stack_->pop<Task>();

  {
    my_state->lock_.writer_lock();
    old_task->set_child(nullptr);
    my_state->current_task_ = old_task;
    my_state->lock_.writer_unlock();
  }
}

}
}
}
#endif //PLS_SCHEDULER_IMPL_H
@@ -7,72 +7,75 @@
#define PLS_SCHEDULER_MEMORY_H

namespace pls {
namespace internal {
namespace scheduling {

void worker_routine();
using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;

class scheduler_memory {
 public:
  virtual size_t max_threads() const = 0;
  virtual thread_state *thread_state_for(size_t id) = 0;
  virtual scheduler_thread *thread_for(size_t id) = 0;
  virtual data_structures::aligned_stack *task_stack_for(size_t id) = 0;
};

template<size_t MAX_THREADS, size_t TASK_STACK_SIZE>
class static_scheduler_memory : public scheduler_memory {
  // Everyone of these types has to live on its own cache line,
  // as each thread uses one of them independently.
  // Therefore it would be a major performance hit if we shared cache lines on these.
  using aligned_thread = base::alignment::aligned_wrapper<scheduler_thread>;
  using aligned_thread_state = base::alignment::aligned_wrapper<thread_state>;
  using aligned_thread_stack = base::alignment::aligned_wrapper<std::array<char, TASK_STACK_SIZE>>;
  using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>;

  std::array<aligned_thread, MAX_THREADS> threads_;
  std::array<aligned_thread_state, MAX_THREADS> thread_states_;
  std::array<aligned_thread_stack, MAX_THREADS> task_stacks_memory_;
  std::array<aligned_aligned_stack, MAX_THREADS> task_stacks_;

 public:
  static_scheduler_memory() {
    for (size_t i = 0; i < MAX_THREADS; i++) {
      new((void *) task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i].pointer()->data(),
                                                                             TASK_STACK_SIZE);
    }
  }

  size_t max_threads() const override { return MAX_THREADS; }
  thread_state *thread_state_for(size_t id) override { return thread_states_[id].pointer(); }
  scheduler_thread *thread_for(size_t id) override { return threads_[id].pointer(); }
  data_structures::aligned_stack *task_stack_for(size_t id) override { return task_stacks_[id].pointer(); }
};

class malloc_scheduler_memory : public scheduler_memory {
  // Everyone of these types has to live on its own cache line,
  // as each thread uses one of them independently.
  // Therefore it would be a major performance hit if we shared cache lines on these.
  using aligned_thread = base::alignment::aligned_wrapper<scheduler_thread>;
  using aligned_thread_state = base::alignment::aligned_wrapper<thread_state>;
  using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>;

  const size_t num_threads_;

  aligned_thread *threads_;
  aligned_thread_state *thread_states_;
  char **task_stacks_memory_;
  aligned_aligned_stack *task_stacks_;
 public:
  explicit malloc_scheduler_memory(size_t num_threads, size_t memory_per_stack = 2 << 16);
  ~malloc_scheduler_memory();

  size_t max_threads() const override { return num_threads_; }
  thread_state *thread_state_for(size_t id) override { return thread_states_[id].pointer(); }
  scheduler_thread *thread_for(size_t id) override { return threads_[id].pointer(); }
  data_structures::aligned_stack *task_stack_for(size_t id) override { return task_stacks_[id].pointer(); }
};

}
}
}
#endif //PLS_SCHEDULER_MEMORY_H
@@ -5,41 +5,46 @@
#include <random>

#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/base/swmr_spin_lock.h"
#include "abstract_task.h"

namespace pls {
namespace internal {
namespace scheduling {

// forward declaration
class scheduler;

struct thread_state {
  alignas(base::system_details::CACHE_LINE_SIZE) scheduler *scheduler_;
  alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *root_task_;
  alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *current_task_;
  alignas(base::system_details::CACHE_LINE_SIZE) data_structures::aligned_stack *task_stack_;
  alignas(base::system_details::CACHE_LINE_SIZE) size_t id_;
  alignas(base::system_details::CACHE_LINE_SIZE) base::swmr_spin_lock lock_;
  alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;

  thread_state() :
      scheduler_{nullptr},
      root_task_{nullptr},
      current_task_{nullptr},
      task_stack_{nullptr},
      id_{0},
      lock_{},
      random_{id_} {};

  thread_state(scheduler *scheduler, data_structures::aligned_stack *task_stack, unsigned int id) :
      scheduler_{scheduler},
      root_task_{nullptr},
      current_task_{nullptr},
      task_stack_{task_stack},
      id_{id},
      lock_{},
      random_{id_} {}
};

}
}
}
#endif //PLS_THREAD_STATE_H
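The `alignas(CACHE_LINE_SIZE)` annotations added above give each hot member of `thread_state` its own cache line, so a worker writing one member does not invalidate the line holding a neighbour read by another thread (false sharing). A minimal standalone sketch of the same idea — the 64-byte line size and all names here are illustrative assumptions, not the library's actual definitions:

```cpp
#include <cassert>
#include <cstddef>

// Assumed cache line size; real code should take it from the platform.
constexpr std::size_t kCacheLineSize = 64;

// Each member starts on its own cache line, so concurrent writers
// to counter_a and counter_b never contend on the same line.
struct padded_state {
  alignas(kCacheLineSize) unsigned long counter_a;
  alignas(kCacheLineSize) unsigned long counter_b;
};

static_assert(alignof(padded_state) == kCacheLineSize,
              "whole struct is line-aligned");
static_assert(offsetof(padded_state, counter_b) % kCacheLineSize == 0,
              "members start on distinct cache lines");
```

The price is memory: the sketch above occupies two full cache lines for sixteen bytes of payload, which is why this padding is reserved for per-thread hot data.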
@@ -8,18 +8,22 @@
#include "pls/internal/helpers/unique_id.h"

namespace pls {

using internal::scheduling::static_scheduler_memory;
using internal::scheduling::malloc_scheduler_memory;
using internal::scheduling::scheduler;
using task_id = internal::scheduling::abstract_task::id;
using unique_id = internal::helpers::unique_id;

using internal::scheduling::fork_join_sub_task;
using internal::scheduling::fork_join_lambda_by_reference;
using internal::scheduling::fork_join_lambda_by_value;
using internal::scheduling::fork_join_task;

using algorithm::invoke_parallel;

}
#endif
@@ -2,26 +2,37 @@
#include "pls/internal/base/system_details.h"

namespace pls {
namespace internal {
namespace base {
namespace alignment {

void *allocate_aligned(size_t size) {
  return aligned_alloc(system_details::CACHE_LINE_SIZE, size);
}

system_details::pointer_t next_alignment(system_details::pointer_t size) {
  system_details::pointer_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE;
  if (miss_alignment == 0) {
    return size;
  } else {
    return size + (base::system_details::CACHE_LINE_SIZE - miss_alignment);
  }
}

system_details::pointer_t previous_alignment(system_details::pointer_t size) {
  system_details::pointer_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE;
  if (miss_alignment == 0) {
    return size;
  } else {
    return size - miss_alignment;
  }
}

char *next_alignment(char *pointer) {
  return reinterpret_cast<char *>(next_alignment(reinterpret_cast<system_details::pointer_t>(pointer)));
}

}
}
}
}
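The rounding helpers above (including the newly added `previous_alignment`) are plain modular arithmetic on addresses. A self-contained sketch with the same semantics — the 64-byte line size is an assumption standing in for `system_details::CACHE_LINE_SIZE`:

```cpp
#include <cassert>
#include <cstdint>

constexpr std::uintptr_t kCacheLineSize = 64; // assumed line size

// Round up to the next cache-line boundary (identity if already aligned).
std::uintptr_t next_alignment(std::uintptr_t value) {
  std::uintptr_t miss = value % kCacheLineSize;
  return miss == 0 ? value : value + (kCacheLineSize - miss);
}

// Round down to the previous cache-line boundary.
std::uintptr_t previous_alignment(std::uintptr_t value) {
  return value - (value % kCacheLineSize);
}
```

`next_alignment` is what bumps the stack head past the unaligned tail of an allocation; `previous_alignment` is useful when a deque base pointer must be pulled back onto a boundary.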
#include "pls/internal/base/barrier.h"

namespace pls {
namespace internal {
namespace base {

barrier::barrier(const unsigned int count) : barrier_{} {
  pthread_barrier_init(&barrier_, nullptr, count);
}

barrier::~barrier() {
  pthread_barrier_destroy(&barrier_);
}

void barrier::wait() {
  pthread_barrier_wait(&barrier_);
}

}
}
}
#include "pls/internal/base/swmr_spin_lock.h"
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace base {
bool swmr_spin_lock::reader_try_lock() {
PROFILE_LOCK("Try Acquire Read Lock")
if (write_request_.load(std::memory_order_acquire) == 1) {
return false;
}
// We think we can enter the region
readers_.fetch_add(1, std::memory_order_acquire);
if (write_request_.load(std::memory_order_acquire) == 1) {
    // Whoops, a writer acquired the lock in the meantime, so we back off again
readers_.fetch_add(-1, std::memory_order_release);
return false;
}
return true;
}
void swmr_spin_lock::reader_unlock() {
PROFILE_LOCK("Release Read Lock")
readers_--;
}
void swmr_spin_lock::writer_lock() {
PROFILE_LOCK("Acquire Write Lock")
// Tell the readers that we would like to write
write_request_ = 1;
// Wait for all of them to exit the critical section
while (readers_ > 0)
    system_details::relax_cpu(); // Spin; cheap, since this is only a relaxed load
}
void swmr_spin_lock::writer_unlock() {
PROFILE_LOCK("Release Write Lock")
write_request_ = 0;
}
}
}
}
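The new `swmr_spin_lock` above exploits the scheduler's access pattern: many thieves read a victim's task tree, but only the victim itself ever writes it. Readers announce themselves in `readers_` and retreat if a `write_request_` is pending, while the single writer raises the flag and spins until the reader count drains. A standalone sketch of the same protocol — class name and member layout are chosen here for illustration, the real declaration lives in `swmr_spin_lock.h`:

```cpp
#include <atomic>

// Single-writer/multiple-readers spin lock sketch: readers never block
// each other, and the one writer waits for the reader count to drain.
class swmr_lock {
  std::atomic<int> readers_{0};
  std::atomic<int> write_request_{0};

 public:
  bool reader_try_lock() {
    if (write_request_.load(std::memory_order_acquire) == 1) {
      return false;
    }
    // Optimistically enter, then re-check for a pending writer.
    readers_.fetch_add(1, std::memory_order_acquire);
    if (write_request_.load(std::memory_order_acquire) == 1) {
      readers_.fetch_sub(1, std::memory_order_release); // back off again
      return false;
    }
    return true;
  }

  void reader_unlock() { readers_.fetch_sub(1, std::memory_order_release); }

  void writer_lock() {
    write_request_.store(1, std::memory_order_seq_cst);
    while (readers_.load(std::memory_order_acquire) > 0) {
      // spin until the critical section is empty
    }
  }

  void writer_unlock() { write_request_.store(0, std::memory_order_release); }
};
```

Note the double check in `reader_try_lock`: a reader may increment `readers_` just as the writer raises the request, so it must verify the flag again after announcing itself and retreat on conflict, exactly as the source does.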
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/tas_spin_lock.h"
#include "pls/internal/base/backoff.h"

namespace pls {
namespace internal {
namespace base {

void tas_spin_lock::lock() {
  PROFILE_LOCK("Acquire Lock")
  backoff backoff_strategy;

  while (true) {
    if (flag_.test_and_set(std::memory_order_acquire) == 0) {
      return;
    }
    backoff_strategy.do_backoff();
  }
}

bool tas_spin_lock::try_lock(unsigned int num_tries) {
  PROFILE_LOCK("Try Acquire Lock")
  backoff backoff_strategy;

  while (true) {
    if (flag_.test_and_set(std::memory_order_acquire) == 0) {
      return true;
    }
    num_tries--;
    if (num_tries <= 0) {
      return false;
    }
    backoff_strategy.do_backoff();
  }
}

void tas_spin_lock::unlock() {
  PROFILE_LOCK("Unlock")
  flag_.clear(std::memory_order_release);
}

}
}
}
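Both spin locks now delegate their waiting policy to a `backoff` helper instead of yielding every N tries. The helper's implementation is not part of this diff; a common sketch of such a strategy is capped exponential backoff — all names and tuning constants below are assumptions for illustration:

```cpp
#include <atomic>
#include <cstdint>

// Capped exponential backoff sketch: busy-wait a little at first,
// then double the wait on every failed attempt up to a cap.
class backoff {
  static constexpr std::uint64_t kInitialIters = 4;   // assumed tuning constants
  static constexpr std::uint64_t kMaxIters = 4096;
  std::uint64_t current_ = kInitialIters;

 public:
  void do_backoff() {
    // Busy-wait for the current number of iterations.
    for (std::uint64_t i = 0; i < current_; i++) {
      std::atomic_signal_fence(std::memory_order_seq_cst); // keep the loop alive
    }
    // Double the wait, capped at the maximum.
    current_ = (current_ * 2 < kMaxIters) ? current_ * 2 : kMaxIters;
  }

  void reset() { current_ = kInitialIters; }

  std::uint64_t current_iterations() const { return current_; }
};
```

The `reset()` hook matters for the scheduler: after a successful steal the contention is presumed gone, so the wait window is shrunk back to its initial size, as the `backoff.reset()` calls in `steal_work` show.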
#include "pls/internal/base/thread.h"

namespace pls {
namespace internal {
namespace base {

#ifdef PLS_THREAD_SPECIFIC_PTHREAD
pthread_key_t this_thread::local_storage_key_ = false;
bool this_thread::local_storage_key_initialized_;
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
__thread void *this_thread::local_state_;
#endif

// implementation in header (C++ templating)

}
}
}
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/ttas_spin_lock.h"
#include "pls/internal/base/backoff.h"

namespace pls {
namespace internal {
namespace base {

void ttas_spin_lock::lock() {
  PROFILE_LOCK("Acquire Lock")
  int expected = 0;
  backoff backoff_;

  while (true) {
    while (flag_.load(std::memory_order_relaxed) == 1)
      system_details::relax_cpu(); // Spin

    expected = 0;
    if (flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire)) {
      return;
    }
    backoff_.do_backoff();
  }
}

bool ttas_spin_lock::try_lock(unsigned int num_tries) {
  PROFILE_LOCK("Try Acquire Lock")
  int expected = 0;
  backoff backoff_;

  while (true) {
    while (flag_.load() == 1) {
      num_tries--;
      if (num_tries <= 0) {
        return false;
      }
      system_details::relax_cpu();
    }

    expected = 0;
    if (flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire)) {
      return true;
    }
    num_tries--;
    if (num_tries <= 0) {
      return false;
    }
    backoff_.do_backoff();
  }
}

void ttas_spin_lock::unlock() {
  PROFILE_LOCK("Unlock")
  flag_.store(0, std::memory_order_release);
}

}
}
}
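For reference, the test-and-test-and-set idea in isolation: spin on a cheap relaxed load until the lock looks free (keeping the cache line in shared state), and only then attempt the expensive atomic read-modify-write. A minimal standalone version with the profiling, relax, and backoff hooks stripped out — the class name here is illustrative, not the library's:

```cpp
#include <atomic>

class ttas_lock {
  std::atomic<int> flag_{0};

 public:
  void lock() {
    while (true) {
      // Test: read-only spin generates no coherence traffic while contended.
      while (flag_.load(std::memory_order_relaxed) == 1) {
      }
      // Test-and-set: only now pay for the atomic exchange.
      int expected = 0;
      if (flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire)) {
        return;
      }
    }
  }

  bool try_lock() {
    int expected = 0;
    return flag_.compare_exchange_strong(expected, 1, std::memory_order_acquire);
  }

  void unlock() { flag_.store(0, std::memory_order_release); }
};
```

This is why the source resets `expected = 0` before each `compare_exchange_weak`: a failed CAS overwrites `expected` with the observed value, so it must be re-armed before the next attempt.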
@@ -2,12 +2,19 @@
#include "pls/internal/base/system_details.h"

namespace pls {
namespace internal {
namespace data_structures {

aligned_stack::aligned_stack(pointer_t memory_region, const std::size_t size) :
    memory_start_{memory_region},
    memory_end_{memory_region + size},
    head_{base::alignment::next_alignment(memory_start_)} {}

aligned_stack::aligned_stack(char *memory_region, const std::size_t size) :
    memory_start_{(pointer_t) memory_region},
    memory_end_{(pointer_t) memory_region + size},
    head_{base::alignment::next_alignment(memory_start_)} {}

}
}
}
@@ -3,56 +3,58 @@
#include "pls/internal/data_structures/deque.h"

namespace pls {
namespace internal {
namespace data_structures {

deque_item *deque_internal::pop_head_internal() {
  std::lock_guard<base::spin_lock> lock{lock_};

  if (head_ == nullptr) {
    return nullptr;
  }

  deque_item *result = head_;
  head_ = head_->next_;
  if (head_ == nullptr) {
    tail_ = nullptr;
  } else {
    head_->prev_ = nullptr;
  }

  return result;
}

deque_item *deque_internal::pop_tail_internal() {
  std::lock_guard<base::spin_lock> lock{lock_};

  if (tail_ == nullptr) {
    return nullptr;
  }

  deque_item *result = tail_;
  tail_ = tail_->prev_;
  if (tail_ == nullptr) {
    head_ = nullptr;
  } else {
    tail_->next_ = nullptr;
  }

  return result;
}

void deque_internal::push_tail_internal(deque_item *new_item) {
  std::lock_guard<base::spin_lock> lock{lock_};

  if (tail_ != nullptr) {
    tail_->next_ = new_item;
  } else {
    head_ = new_item;
  }
  new_item->prev_ = tail_;
  new_item->next_ = nullptr;
  tail_ = new_item;
}

}
}
}
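The merge flips the link orientation: `next_` now points from head towards tail, so `pop_head` follows `next_` and `pop_tail` follows `prev_`, the reverse of the old code. The same lock-protected doubly-linked deque as a self-contained sketch — `std::mutex` stands in for the project's `spin_lock`, and the intrusive items carry a test payload:

```cpp
#include <mutex>

struct deque_item {
  deque_item *prev_ = nullptr; // towards the head
  deque_item *next_ = nullptr; // towards the tail
  int value = 0;               // payload for illustration only
};

class locked_deque {
  deque_item *head_ = nullptr;
  deque_item *tail_ = nullptr;
  std::mutex lock_;

 public:
  void push_tail(deque_item *item) {
    std::lock_guard<std::mutex> lock{lock_};
    if (tail_ != nullptr) {
      tail_->next_ = item;
    } else {
      head_ = item; // first element: head and tail coincide
    }
    item->prev_ = tail_;
    item->next_ = nullptr;
    tail_ = item;
  }

  deque_item *pop_head() { // thieves steal from the head
    std::lock_guard<std::mutex> lock{lock_};
    if (head_ == nullptr) return nullptr;
    deque_item *result = head_;
    head_ = head_->next_;
    if (head_ == nullptr) {
      tail_ = nullptr;
    } else {
      head_->prev_ = nullptr;
    }
    return result;
  }

  deque_item *pop_tail() { // the owner pops from the tail
    std::lock_guard<std::mutex> lock{lock_};
    if (tail_ == nullptr) return nullptr;
    deque_item *result = tail_;
    tail_ = tail_->prev_;
    if (tail_ == nullptr) {
      head_ = nullptr;
    } else {
      tail_->next_ = nullptr;
    }
    return result;
  }
};
```

The owner works LIFO at the tail for locality while thieves take the oldest (usually largest) sub-task from the head — the classic work-stealing split that `get_local_sub_task`/`get_stolen_sub_task` map onto `pop_tail`/`pop_head`.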
#include <pls/internal/base/backoff.h>
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/scheduling/thread_state.h"
@@ -5,72 +6,80 @@
#include "pls/internal/scheduling/scheduler.h"

namespace pls {
namespace internal {
namespace scheduling {

bool abstract_task::steal_work() {
  thread_local static base::backoff backoff{};

  PROFILE_STEALING("abstract_task::steal_work")
  const auto my_state = base::this_thread::state<thread_state>();
  const auto my_scheduler = my_state->scheduler_;

  const size_t my_id = my_state->id_;
  const size_t offset = my_state->random_() % my_scheduler->num_threads();
  const size_t max_tries = my_scheduler->num_threads() - 1; // TODO: Tune this value
  for (size_t i = 0; i < max_tries; i++) {
    size_t target = (offset + i) % my_scheduler->num_threads();
    if (target == my_id) {
      target = (target + 1) % my_scheduler->num_threads();
    }
    auto target_state = my_scheduler->thread_state_for(target);
    if (!target_state->lock_.reader_try_lock()) {
      continue;
    }

    // Dig down to our level
    PROFILE_STEALING("Go to our level")
    abstract_task *current_task = target_state->root_task_;
    while (current_task != nullptr && current_task->depth() < depth()) {
      current_task = current_task->child();
    }
    PROFILE_END_BLOCK

    // Try to steal 'internal', e.g. fork_join_sub_tasks in a fork_join_task constellation
    PROFILE_STEALING("Internal Steal")
    if (current_task != nullptr) {
      // See if it equals our type and depth of task
      if (current_task->unique_id_ == unique_id_ &&
          current_task->depth_ == depth_) {
        if (internal_stealing(current_task)) {
          // Internal steal was a success, hand it back to the internal scheduler
          target_state->lock_.reader_unlock();
          backoff.reset();
          return true;
        }

        // No success, we need to steal work from a deeper level using 'top level task stealing'
        current_task = current_task->child();
      }
    }
    PROFILE_END_BLOCK;

    // Execute 'top level task steal' if possible
    // (only try deeper tasks to keep depth restricted stealing).
    PROFILE_STEALING("Top Level Steal")
    while (current_task != nullptr) {
      auto lock = &target_state->lock_;
      if (current_task->split_task(lock)) {
        // Top level steal was a success (we did a top level task steal)
        backoff.reset();
        return false;
      }
      current_task = current_task->child_task_;
    }
    PROFILE_END_BLOCK;
    target_state->lock_.reader_unlock();
  }

  // No steal attempt succeeded; back off before trying again
  backoff.do_backoff();
  return false;
}

}
}
}
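The victim-selection change above is easy to misread: the thief probes `num_threads - 1` consecutive slots starting at a random offset, and whenever the probe lands on its own id it is redirected to the next slot instead of being skipped. The index arithmetic in isolation, extracted into a pure function for illustration (the function name and signature are choices made here; in the source the offset comes from the per-thread `std::minstd_rand`):

```cpp
#include <cstddef>
#include <vector>

// Compute the probe sequence for one thief, mirroring the index
// arithmetic in abstract_task::steal_work (sketch only).
std::vector<std::size_t> steal_victims(std::size_t my_id,
                                       std::size_t num_threads,
                                       std::size_t offset) {
  std::vector<std::size_t> victims;
  const std::size_t max_tries = num_threads - 1;
  for (std::size_t i = 0; i < max_tries; i++) {
    std::size_t target = (offset + i) % num_threads;
    if (target == my_id) {
      // Never steal from ourselves; fall through to the neighbour instead.
      target = (target + 1) % num_threads;
    }
    victims.push_back(target);
  }
  return victims;
}
```

Note the redirect can probe a neighbour twice in one sweep; the guarantee is only that the thief never targets itself and makes at most `num_threads - 1` attempts per call.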
@@ -4,131 +4,126 @@
#include "pls/internal/scheduling/fork_join_task.h"

namespace pls {
namespace internal {
namespace scheduling {

fork_join_sub_task::fork_join_sub_task() :
    ref_count_{0},
    parent_{nullptr},
    tbb_task_{nullptr},
    deque_state_{0} {}

fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task &other) :
    ref_count_{0},
    parent_{other.parent_},
    tbb_task_{other.tbb_task_},
    deque_state_{other.deque_state_} {}

void fork_join_sub_task::execute() {
  PROFILE_WORK_BLOCK("execute sub_task")
  auto last_executing = tbb_task_->currently_executing_;
  tbb_task_->currently_executing_ = this;
  execute_internal();
  tbb_task_->currently_executing_ = last_executing;
  PROFILE_END_BLOCK
  wait_for_all();

  if (parent_ != nullptr) {
    parent_->ref_count_--;
  }
}

void fork_join_sub_task::wait_for_all() {
  while (ref_count_ > 0) {
    PROFILE_STEALING("get local sub task")
    fork_join_sub_task *local_task = tbb_task_->get_local_sub_task();
    PROFILE_END_BLOCK
    if (local_task != nullptr) {
      local_task->execute();
    } else {
      // Try to steal work.
      // External steal will be executed implicitly if success
      PROFILE_STEALING("steal work")
      bool internal_steal_success = tbb_task_->steal_work();
      PROFILE_END_BLOCK
      if (internal_steal_success) {
        tbb_task_->last_stolen_->execute();
      }
    }
  }
  tbb_task_->deque_.release_memory_until(deque_state_);
}

fork_join_sub_task *fork_join_task::get_local_sub_task() {
  return deque_.pop_tail();
}

fork_join_sub_task *fork_join_task::get_stolen_sub_task() {
  return deque_.pop_head();
}

fork_join_sub_task *fork_join_sub_task::current() {
  return dynamic_cast<fork_join_task *>(scheduler::current_task())->currently_executing();
}

bool fork_join_task::internal_stealing(abstract_task *other_task) {
  PROFILE_STEALING("fork_join_task::internal_stealing")
  auto cast_other_task = reinterpret_cast<fork_join_task *>(other_task);

  auto stolen_sub_task = cast_other_task->get_stolen_sub_task();
  if (stolen_sub_task == nullptr) {
    return false;
  } else {
    // Make sub-task belong to our fork_join_task instance
    stolen_sub_task->tbb_task_ = this;
    stolen_sub_task->deque_state_ = deque_.save_state();
    // We will execute this next without explicitly moving it onto our stack storage
    last_stolen_ = stolen_sub_task;

    return true;
  }
}

bool fork_join_task::split_task(base::swmr_spin_lock *lock) {
  PROFILE_STEALING("fork_join_task::split_task")
  fork_join_sub_task *stolen_sub_task = get_stolen_sub_task();
  if (stolen_sub_task == nullptr) {
    return false;
  }
  fork_join_task task{stolen_sub_task, this->unique_id()};

  // In success case, unlock.
  lock->reader_unlock();
  scheduler::execute_task(task, depth());
  return true;
}

void fork_join_task::execute() {
  PROFILE_WORK_BLOCK("execute fork_join_task");

  // Bind this instance to our OS thread
  // TODO: See if we did this right
  // my_stack_ = base::this_thread::state<thread_state>()->task_stack_;
  deque_.reset_base_pointer();

  root_task_->tbb_task_ = this;
  root_task_->deque_state_ = deque_.save_state();

  // Execute it on our OS thread until its finished
  root_task_->execute();
}

fork_join_sub_task *fork_join_task::currently_executing() const { return currently_executing_; }

fork_join_task::fork_join_task(fork_join_sub_task *root_task,
                               const abstract_task::id &id) :
    abstract_task{0, id},
    root_task_{root_task},
    currently_executing_{nullptr},
    deque_{base::this_thread::state<thread_state>()->task_stack_},
    last_stolen_{nullptr} {}

}
}
}
#include "pls/internal/scheduling/root_task.h"

namespace pls {
namespace internal {
namespace scheduling {

}
}
}
#include "pls/internal/scheduling/run_on_n_threads_task.h"

namespace pls {
namespace internal {
namespace scheduling {

}
}
}
@@ -2,60 +2,63 @@
#include "pls/internal/base/error_handling.h"

namespace pls {
namespace internal {
namespace scheduling {

scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads) :
    num_threads_{num_threads},
    memory_{memory},
    sync_barrier_{num_threads + 1},
    terminated_{false} {
  if (num_threads_ > memory_->max_threads()) {
    PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory.");
  }

  for (unsigned int i = 0; i < num_threads_; i++) {
    // Placement new is required, as the memory of `memory_` is not required to be initialized.
    new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i};
    new((void *) memory_->thread_for(i)) base::thread<void (*)(), thread_state>(&worker_routine,
                                                                                memory_->thread_state_for(i));
  }
}

scheduler::~scheduler() {
  terminate();
}

void worker_routine() {
  auto my_state = base::this_thread::state<thread_state>();

  while (true) {
    my_state->scheduler_->sync_barrier_.wait();
    if (my_state->scheduler_->terminated_) {
      return;
    }

    // The root task must only return when all work is done,
    // because of this a simple call is enough to ensure the
    // fork-join-section is done (logically joined back into our main thread).
    my_state->root_task_->execute();

    my_state->scheduler_->sync_barrier_.wait();
  }
}

void scheduler::terminate(bool wait_for_workers) {
  if (terminated_) {
    return;
  }

  terminated_ = true;
  sync_barrier_.wait();

  if (wait_for_workers) {
    for (unsigned int i = 0; i < num_threads_; i++) {
      memory_->thread_for(i)->join();
    }
  }
}

}
}
}
#include "pls/internal/scheduling/scheduler_memory.h"

namespace pls {
namespace internal {
namespace scheduling {

malloc_scheduler_memory::malloc_scheduler_memory(const size_t num_threads, const size_t memory_per_stack) :
    num_threads_{num_threads} {
  threads_ =
      reinterpret_cast<aligned_thread *>(base::alignment::allocate_aligned(num_threads * sizeof(aligned_thread)));
  thread_states_ = reinterpret_cast<aligned_thread_state *>(base::alignment::allocate_aligned(
      num_threads * sizeof(aligned_thread_state)));

  task_stacks_ = reinterpret_cast<aligned_aligned_stack *>(base::alignment::allocate_aligned(
      num_threads * sizeof(aligned_aligned_stack)));
  task_stacks_memory_ = reinterpret_cast<char **>(base::alignment::allocate_aligned(num_threads * sizeof(char *)));
  for (size_t i = 0; i < num_threads_; i++) {
    task_stacks_memory_[i] = reinterpret_cast<char *>(base::alignment::allocate_aligned(memory_per_stack));
    new((void *) task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i], memory_per_stack);
  }
}

malloc_scheduler_memory::~malloc_scheduler_memory() {
  free(threads_);
  free(thread_states_);

  for (size_t i = 0; i < num_threads_; i++) {
    free(task_stacks_memory_[i]);
  }
  free(task_stacks_);
  free(task_stacks_memory_);
}

}
}
}
#include "pls/internal/scheduling/thread_state.h"

namespace pls {
namespace internal {
namespace scheduling {

}
}
}
add_executable(tests
        main.cpp
        data_structures_test.cpp)
target_link_libraries(tests catch2 pls)
@@ -13,73 +13,73 @@ static bool base_tests_visited;
static int base_tests_local_value_one;
static vector<int> base_tests_local_value_two;

TEST_CASE("thread creation and joining", "[internal/data_structures/thread.h]") {
  base_tests_visited = false;
  auto t1 = start_thread([]() { base_tests_visited = true; });
  t1.join();

  REQUIRE(base_tests_visited);
}

TEST_CASE("thread state", "[internal/data_structures/thread.h]") {
  int state_one = 1;
  vector<int> state_two{1, 2};

  auto t1 = start_thread([]() { base_tests_local_value_one = *this_thread::state<int>(); }, &state_one);
  auto t2 = start_thread([]() { base_tests_local_value_two = *this_thread::state<vector<int>>(); }, &state_two);
  t1.join();
  t2.join();

  REQUIRE(base_tests_local_value_one == 1);
  REQUIRE(base_tests_local_value_two == vector<int>{1, 2});
}

int base_tests_shared_counter;

TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spinlock.h]") {
  constexpr int num_iterations = 1000000;
  base_tests_shared_counter = 0;
  spin_lock lock{};

  SECTION("lock can be used by itself") {
    auto t1 = start_thread([&]() {
      for (int i = 0; i < num_iterations; i++) {
        lock.lock();
        base_tests_shared_counter++;
        lock.unlock();
      }
    });
    auto t2 = start_thread([&]() {
      for (int i = 0; i < num_iterations; i++) {
        lock.lock();
        base_tests_shared_counter--;
        lock.unlock();
      }
    });

    t1.join();
    t2.join();

    REQUIRE(base_tests_shared_counter == 0);
  }

  SECTION("lock can be used with std::lock_guard") {
    auto t1 = start_thread([&]() {
      for (int i = 0; i < num_iterations; i++) {
        std::lock_guard<spin_lock> my_lock{lock};
        base_tests_shared_counter++;
      }
    });
    auto t2 = start_thread([&]() {
      for (int i = 0; i < num_iterations; i++) {
        std::lock_guard<spin_lock> my_lock{lock};
        base_tests_shared_counter--;
      }
    });

    t1.join();
    t2.join();

    REQUIRE(base_tests_shared_counter == 0);
  }
}
...@@ -4,76 +4,75 @@

using namespace pls;

class once_sub_task : public fork_join_sub_task {
  std::atomic<int> *counter_;
  int children_;

 protected:
  void execute_internal() override {
    (*counter_)++;
    for (int i = 0; i < children_; i++) {
      spawn_child(once_sub_task(counter_, children_ - 1));
    }
  }

 public:
  explicit once_sub_task(std::atomic<int> *counter, int children) :
      fork_join_sub_task(),
      counter_{counter},
      children_{children} {}
};
class force_steal_sub_task : public fork_join_sub_task {
  std::atomic<int> *parent_counter_;
  std::atomic<int> *overall_counter_;

 protected:
  void execute_internal() override {
    (*overall_counter_)--;
    if (overall_counter_->load() > 0) {
      std::atomic<int> counter{1};
      spawn_child(force_steal_sub_task(&counter, overall_counter_));
      while (counter.load() > 0); // Spin...
    }

    (*parent_counter_)--;
  }

 public:
  explicit force_steal_sub_task(std::atomic<int> *parent_counter, std::atomic<int> *overall_counter) :
      fork_join_sub_task(),
      parent_counter_{parent_counter},
      overall_counter_{overall_counter} {}
};
TEST_CASE("tbb task are scheduled correctly", "[internal/scheduling/fork_join_task.h]") {
  malloc_scheduler_memory my_scheduler_memory{8, 2 << 12};

  SECTION("tasks are executed exactly once") {
    scheduler my_scheduler{&my_scheduler_memory, 2};
    int start_counter = 4;
    int total_tasks = 1 + 4 + 4 * 3 + 4 * 3 * 2 + 4 * 3 * 2 * 1;
    std::atomic<int> counter{0};

    my_scheduler.perform_work([&]() {
      once_sub_task sub_task{&counter, start_counter};
      fork_join_task task{&sub_task, unique_id::create(42)};
      scheduler::execute_task(task);
    });

    REQUIRE(counter.load() == total_tasks);
    my_scheduler.terminate(true);
  }

  SECTION("tasks can be stolen") {
    scheduler my_scheduler{&my_scheduler_memory, 8};
    my_scheduler.perform_work([&]() {
      std::atomic<int> dummy_parent{1}, overall_counter{8};
      force_steal_sub_task sub_task{&dummy_parent, &overall_counter};
      fork_join_task task{&sub_task, unique_id::create(42)};
      scheduler::execute_task(task);
    });
    my_scheduler.terminate(true);
  }
}