Commit aa270645 by FritzFlorian

Reformat code to fit GNU code formatting style.

parent 3ff10baa
Pipeline #1157 passed with stages in 3 minutes 36 seconds
@@ -12,76 +12,75 @@ static constexpr int INPUT_SIZE = 2064;
typedef std::vector<std::complex<double>> complex_vector;
void divide(complex_vector::iterator data, int n) {
complex_vector tmp_odd_elements(n / 2);
for (int i = 0; i < n / 2; i++) {
tmp_odd_elements[i] = data[i * 2 + 1];
}
for (int i = 0; i < n / 2; i++) {
data[i] = data[i * 2];
}
for (int i = 0; i < n / 2; i++) {
data[i + n / 2] = tmp_odd_elements[i];
}
complex_vector tmp_odd_elements(n / 2);
for (int i = 0; i < n / 2; i++) {
tmp_odd_elements[i] = data[i * 2 + 1];
}
for (int i = 0; i < n / 2; i++) {
data[i] = data[i * 2];
}
for (int i = 0; i < n / 2; i++) {
data[i + n / 2] = tmp_odd_elements[i];
}
}
void combine(complex_vector::iterator data, int n) {
for (int i = 0; i < n / 2; i++) {
std::complex<double> even = data[i];
std::complex<double> odd = data[i + n / 2];
for (int i = 0; i < n / 2; i++) {
std::complex<double> even = data[i];
std::complex<double> odd = data[i + n / 2];
// w is the "twiddle-factor".
// this could be cached, but we run the same 'data_structures' algorithm parallel/serial,
// so it won't impact the performance comparison.
std::complex<double> w = exp(std::complex<double>(0, -2. * M_PI * i / n));
// w is the "twiddle-factor".
// this could be cached, but we run the same 'data_structures' algorithm parallel/serial,
// so it won't impact the performance comparison.
std::complex<double> w = exp(std::complex<double>(0, -2. * M_PI * i / n));
data[i] = even + w * odd;
data[i + n / 2] = even - w * odd;
}
data[i] = even + w * odd;
data[i + n / 2] = even - w * odd;
}
}
void fft(complex_vector::iterator data, int n) {
if (n < 2) {
return;
}
if (n < 2) {
return;
}
divide(data, n);
if (n <= CUTOFF) {
fft(data, n / 2);
fft(data + n / 2, n / 2);
} else {
pls::invoke_parallel(
[&] { fft(data, n / 2); },
[&] { fft(data + n / 2, n / 2); }
);
}
combine(data, n);
divide(data, n);
if (n <= CUTOFF) {
fft(data, n / 2);
fft(data + n / 2, n / 2);
} else {
pls::invoke_parallel(
[&] { fft(data, n / 2); },
[&] { fft(data + n / 2, n / 2); }
);
}
combine(data, n);
}
complex_vector prepare_input(int input_size) {
std::vector<double> known_frequencies{2, 11, 52, 88, 256};
complex_vector data(input_size);
std::vector<double> known_frequencies{2, 11, 52, 88, 256};
complex_vector data(input_size);
// Set our input data to match a time series of the known_frequencies.
// When applying fft to this time-series we should find these frequencies.
for (int i = 0; i < input_size; i++) {
data[i] = std::complex<double>(0.0, 0.0);
for (auto frequencie : known_frequencies) {
data[i] += sin(2 * M_PI * frequencie * i / input_size);
}
// Set our input data to match a time series of the known_frequencies.
// When applying fft to this time-series we should find these frequencies.
for (int i = 0; i < input_size; i++) {
data[i] = std::complex<double>(0.0, 0.0);
for (auto frequencie : known_frequencies) {
data[i] += sin(2 * M_PI * frequencie * i / input_size);
}
}
return data;
return data;
}
int main() {
PROFILE_ENABLE
complex_vector initial_input = prepare_input(INPUT_SIZE);
PROFILE_ENABLE
complex_vector initial_input = prepare_input(INPUT_SIZE);
pls::internal::helpers::run_mini_benchmark([&] {
complex_vector input = initial_input;
fft(input.begin(), input.size());
}, 8, 4000);
pls::internal::helpers::run_mini_benchmark([&] {
complex_vector input = initial_input;
fft(input.begin(), input.size());
}, 8, 4000);
PROFILE_SAVE("test_profile.prof")
PROFILE_SAVE("test_profile.prof")
}
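As an aside to the twiddle-factor comment in combine() above: caching the factors would look roughly like the sketch below. This is an illustration only and not part of this commit; make_twiddles is a hypothetical helper, and the benchmark deliberately recomputes the factors so the serial and parallel runs stay comparable.

// Illustration only, not part of this commit. Assumes the same includes as the benchmark
// above (<complex>, <vector>, M_PI). 'make_twiddles' is a hypothetical helper name.
std::vector<std::complex<double>> make_twiddles(int n) {
  std::vector<std::complex<double>> w(n / 2);
  for (int i = 0; i < n / 2; i++) {
    w[i] = std::exp(std::complex<double>(0, -2. * M_PI * i / n));
  }
  return w;
}
// combine() could then look up w[i] instead of calling exp() for every butterfly.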
@@ -8,44 +8,44 @@ static pls::static_scheduler_memory<8, 2 << 14> my_scheduler_memory;
static constexpr int CUTOFF = 10;
long fib_serial(long n) {
if (n == 0) {
return 0;
}
if (n == 1) {
return 1;
}
return fib_serial(n - 1) + fib_serial(n - 2);
if (n == 0) {
return 0;
}
if (n == 1) {
return 1;
}
return fib_serial(n - 1) + fib_serial(n - 2);
}
long fib(long n) {
if (n <= CUTOFF) {
return fib_serial(n);
}
// Actual 'invoke_parallel' logic/code
int left, right;
pls::invoke_parallel(
[&] { left = fib(n - 1); },
[&] { right = fib(n - 2); }
);
return left + right;
if (n <= CUTOFF) {
return fib_serial(n);
}
// Actual 'invoke_parallel' logic/code
int left, right;
pls::invoke_parallel(
[&] { left = fib(n - 1); },
[&] { right = fib(n - 2); }
);
return left + right;
}
int main() {
PROFILE_ENABLE
pls::scheduler scheduler{&my_scheduler_memory, 8};
long result;
scheduler.perform_work([&] {
PROFILE_MAIN_THREAD
// Call looks just the same, only requirement is
// the enclosure in the perform_work lambda.
for (int i = 0; i < 10; i++) {
result = fib(30);
std::cout << "Fib(30)=" << result << std::endl;
}
});
PROFILE_SAVE("test_profile.prof")
PROFILE_ENABLE
pls::scheduler scheduler{&my_scheduler_memory, 8};
long result;
scheduler.perform_work([&] {
PROFILE_MAIN_THREAD
// Call looks just the same, only requirement is
// the enclosure in the perform_work lambda.
for (int i = 0; i < 10; i++) {
result = fib(30);
std::cout << "Fib(30)=" << result << std::endl;
}
});
PROFILE_SAVE("test_profile.prof")
}
@@ -10,8 +10,9 @@
#include <pls/internal/scheduling/root_task.h>
#include <pls/internal/helpers/unique_id.h>
int main() {
std::cout << pls::internal::scheduling::root_task<void(*)>::create_id().type_.hash_code() << std::endl;
std::cout << pls::internal::helpers::unique_id::create<pls::internal::scheduling::root_task<void(*)>>().type_.hash_code() << std::endl;
std::cout << pls::internal::scheduling::root_task<void (*)>::create_id().type_.hash_code() << std::endl;
std::cout
<< pls::internal::helpers::unique_id::create<pls::internal::scheduling::root_task<void (*)>>().type_.hash_code()
<< std::endl;
}
@@ -5,9 +5,8 @@ using namespace pls::internal::base;
int global = 0;
int main() {
// Try to use every feature, to trigger the prohibited use of new if found somewhere
auto t1 = start_thread([] (){});
t1.join();
// Try to use every feature, to trigger the prohibited use of new if found somewhere
auto t1 = start_thread([]() {});
t1.join();
}
@@ -6,15 +6,17 @@
#include "pls/internal/scheduling/scheduler.h"
namespace pls {
namespace algorithm {
template<typename Function1, typename Function2>
void invoke_parallel(const Function1& function1, const Function2& function2);
namespace algorithm {
template<typename Function1, typename Function2, typename Function3>
void invoke_parallel(const Function1& function1, const Function2& function2, const Function3& function3);
template<typename Function1, typename Function2>
void invoke_parallel(const Function1 &function1, const Function2 &function2);
// ...and so on, add more if we decide to keep this design
}
template<typename Function1, typename Function2, typename Function3>
void invoke_parallel(const Function1 &function1, const Function2 &function2, const Function3 &function3);
// ...and so on, add more if we decide to keep this design
}
}
#include "invoke_parallel_impl.h"
@@ -7,65 +7,67 @@
#include "pls/internal/helpers/unique_id.h"
namespace pls {
namespace algorithm {
namespace internal {
using namespace ::pls::internal::scheduling;
namespace algorithm {
namespace internal {
template<typename Body>
inline void run_body(const Body& internal_body, const abstract_task::id& id) {
// Make sure we are in the context of this invoke_parallel instance,
// if not we will spawn it as a new 'fork-join-style' task.
auto current_task = scheduler::current_task();
if (current_task->unique_id() == id) {
auto current_sub_task = reinterpret_cast<fork_join_task*>(current_task)->currently_executing();
internal_body(current_sub_task);
} else {
fork_join_lambda<Body> root_body(&internal_body);
fork_join_task root_task{&root_body, id};
scheduler::execute_task(root_task);
}
}
}
using namespace ::pls::internal::scheduling;
template<typename Function1, typename Function2>
void invoke_parallel(const Function1& function1, const Function2& function2) {
using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
static abstract_task::id id = unique_id::create<Function1, Function2>();
template<typename Body>
inline void run_body(const Body &internal_body, const abstract_task::id &id) {
// Make sure we are in the context of this invoke_parallel instance,
// if not we will spawn it as a new 'fork-join-style' task.
auto current_task = scheduler::current_task();
if (current_task->unique_id() == id) {
auto current_sub_task = reinterpret_cast<fork_join_task *>(current_task)->currently_executing();
internal_body(current_sub_task);
} else {
fork_join_lambda<Body> root_body(&internal_body);
fork_join_task root_task{&root_body, id};
scheduler::execute_task(root_task);
}
}
}
auto internal_body = [&] (fork_join_sub_task* this_task){
auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); };
auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1);
template<typename Function1, typename Function2>
void invoke_parallel(const Function1 &function1, const Function2 &function2) {
using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
static abstract_task::id id = unique_id::create<Function1, Function2>();
this_task->spawn_child(sub_task_1);
function2(); // Execute last function 'inline' without spawning a sub_task object
this_task->wait_for_all();
};
auto internal_body = [&](fork_join_sub_task *this_task) {
auto sub_task_body_1 = [&](fork_join_sub_task *) { function1(); };
auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1);
internal::run_body(internal_body, id);
}
this_task->spawn_child(sub_task_1);
function2(); // Execute last function 'inline' without spawning a sub_task object
this_task->wait_for_all();
};
internal::run_body(internal_body, id);
}
template<typename Function1, typename Function2, typename Function3>
void invoke_parallel(const Function1& function1, const Function2& function2, const Function3& function3) {
using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
static abstract_task::id id = unique_id::create<Function1, Function2, Function3>();
template<typename Function1, typename Function2, typename Function3>
void invoke_parallel(const Function1 &function1, const Function2 &function2, const Function3 &function3) {
using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
static abstract_task::id id = unique_id::create<Function1, Function2, Function3>();
auto internal_body = [&] (fork_join_sub_task* this_task){
auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); };
auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1);
auto sub_task_body_2 = [&] (fork_join_sub_task*){ function2(); };
auto sub_task_2 = fork_join_lambda<decltype(sub_task_body_2)>(&sub_task_body_2);
auto internal_body = [&](fork_join_sub_task *this_task) {
auto sub_task_body_1 = [&](fork_join_sub_task *) { function1(); };
auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1);
auto sub_task_body_2 = [&](fork_join_sub_task *) { function2(); };
auto sub_task_2 = fork_join_lambda<decltype(sub_task_body_2)>(&sub_task_body_2);
this_task->spawn_child(sub_task_1);
this_task->spawn_child(sub_task_2);
function3(); // Execute last function 'inline' without spawning a sub_task object
this_task->wait_for_all();
};
this_task->spawn_child(sub_task_1);
this_task->spawn_child(sub_task_2);
function3(); // Execute last function 'inline' without spawning a sub_task object
this_task->wait_for_all();
};
internal::run_body(internal_body, id);
}
}
internal::run_body(internal_body, id);
}
}
}
#endif //PLS_INVOKE_PARALLEL_IMPL_H
@@ -8,21 +8,23 @@
#include "system_details.h"
namespace pls {
namespace internal {
namespace base {
namespace alignment {
template<typename T>
struct aligned_wrapper {
alignas(system_details::CACHE_LINE_SIZE) unsigned char data[sizeof(T)];
T* pointer() { return reinterpret_cast<T*>(data); }
};
void* allocate_aligned(size_t size);
std::uintptr_t next_alignment(std::uintptr_t size);
char* next_alignment(char* pointer);
}
}
}
namespace internal {
namespace base {
namespace alignment {
template<typename T>
struct aligned_wrapper {
alignas(system_details::CACHE_LINE_SIZE) unsigned char data[sizeof(T)];
T *pointer() { return reinterpret_cast<T *>(data); }
};
void *allocate_aligned(size_t size);
std::uintptr_t next_alignment(std::uintptr_t size);
char *next_alignment(char *pointer);
}
}
}
}
#endif //PLS_ALIGNMENT_H
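For orientation (not part of this commit): aligned_wrapper only reserves cache-line aligned raw storage, so constructing an object into it is the caller's job via placement new, exactly as static_scheduler_memory does for its task stacks further down in this diff. A minimal sketch:

// Illustration only, not part of this commit.
#include <new>  // placement new
#include "pls/internal/base/alignment.h"

void alignment_example() {
  pls::internal::base::alignment::aligned_wrapper<int> slot;
  int *value = new (slot.pointer()) int(42);  // placement-construct into the aligned buffer
  *value += 1;                                // afterwards, always access it through pointer()
}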
@@ -5,27 +5,29 @@
#include <pthread.h>
namespace pls {
namespace internal {
namespace base {
/**
* Provides standard barrier behaviour.
* `count` threads have to call `wait()` before any of the `wait()` calls returns,
* thus blocking all threads until everyone reached the barrier.
*
* PORTABILITY:
* Current implementation is based on pthreads.
*/
class barrier {
pthread_barrier_t barrier_;
public:
explicit barrier(unsigned int count);
~barrier();
void wait();
};
}
}
namespace internal {
namespace base {
/**
* Provides standard barrier behaviour.
* `count` threads have to call `wait()` before any of the `wait()` calls returns,
* thus blocking all threads until everyone reached the barrier.
*
* PORTABILITY:
* Current implementation is based on pthreads.
*/
class barrier {
pthread_barrier_t barrier_;
public:
explicit barrier(unsigned int count);
~barrier();
void wait();
};
}
}
}
#endif //PLS_BARRIER_H
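A minimal rendezvous sketch for the barrier above (illustration only, not part of this commit). It pairs the barrier with the start_thread() helper from base/thread.h shown later in this diff; the include path for barrier.h itself is not visible in this hunk and is assumed.

// Illustration only, not part of this commit.
#include "pls/internal/base/thread.h"
// #include "pls/internal/base/barrier.h"  // assumed path, not shown in this diff

void rendezvous_example() {
  pls::internal::base::barrier sync{2};           // two participants: main thread + worker
  auto worker = pls::internal::base::start_thread([&] {
    // ... per-thread setup ...
    sync.wait();                                  // blocks until the main thread arrives
  });
  sync.wait();                                    // both threads pass this point together
  worker.join();
}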
@@ -6,12 +6,14 @@
#include "ttas_spin_lock.h"
namespace pls {
namespace internal {
namespace base {
// Default Spin-Lock implementation for this project.
using spin_lock = tas_spin_lock;
}
}
namespace internal {
namespace base {
// Default Spin-Lock implementation for this project.
using spin_lock = tas_spin_lock;
}
}
}
#endif //PLS_SPINLOCK_H
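For orientation (not part of this commit): the spin_lock alias above provides lock()/unlock(), i.e. it is BasicLockable, which is why the scheduler code later in this diff guards it with std::lock_guard. A minimal sketch:

// Illustration only, not part of this commit.
#include <mutex>  // std::lock_guard
#include "pls/internal/base/spin_lock.h"

pls::internal::base::spin_lock counter_lock;
int counter = 0;

void increment() {
  std::lock_guard<pls::internal::base::spin_lock> guard{counter_lock};  // lock() now, unlock() on scope exit
  counter++;
}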
@@ -5,29 +5,31 @@
#include <cstdint>
namespace pls {
namespace internal {
namespace base {
/**
* Collection of system details, e.g. hardware cache line size.
*
* PORTABILITY:
* Currently sane default values for x86.
*/
namespace system_details {
/**
* Most processors have 64 byte cache lines
*/
constexpr std::uintptr_t CACHE_LINE_SIZE = 64;
namespace internal {
namespace base {
/**
* Choose one of the following ways to store thread specific data.
* Try to choose the fastest available on this processor/system.
*/
/**
* Collection of system details, e.g. hardware cache line size.
*
* PORTABILITY:
* Currently sane default values for x86.
*/
namespace system_details {
/**
* Most processors have 64 byte cache lines
*/
constexpr std::uintptr_t CACHE_LINE_SIZE = 64;
/**
* Choose one of the following ways to store thread specific data.
* Try to choose the fastest available on this processor/system.
*/
// #define PLS_THREAD_SPECIFIC_PTHREAD
#define PLS_THREAD_SPECIFIC_COMPILER
}
}
}
#define PLS_THREAD_SPECIFIC_COMPILER
}
}
}
}
#endif //PLS_SYSTEM_DETAILS_H
@@ -10,30 +10,30 @@
#include "pls/internal/base/thread.h"
namespace pls {
namespace internal {
namespace base {
/**
* A simple set and test_and_set based spin lock implementation.
*
* PORTABILITY:
* Current implementation is based on C++ 11 atomic_flag.
*/
class tas_spin_lock {
std::atomic_flag flag_;
unsigned int yield_at_tries_;
public:
tas_spin_lock(): flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{1024} {};
tas_spin_lock(const tas_spin_lock& other): flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{other.yield_at_tries_} {}
void lock();
bool try_lock(unsigned int num_tries=1);
void unlock();
};
}
}
}
namespace internal {
namespace base {
/**
* A simple set and test_and_set based spin lock implementation.
*
* PORTABILITY:
* Current implementation is based on C++ 11 atomic_flag.
*/
class tas_spin_lock {
std::atomic_flag flag_;
unsigned int yield_at_tries_;
public:
tas_spin_lock() : flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{1024} {};
tas_spin_lock(const tas_spin_lock &other) : flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{other.yield_at_tries_} {}
void lock();
bool try_lock(unsigned int num_tries = 1);
void unlock();
};
}
}
}
#endif //PLS_TAS_SPIN_LOCK_H
@@ -13,109 +13,112 @@
#include "system_details.h"
namespace pls {
namespace internal {
namespace base {
using thread_entrypoint = void();
/**
* Static methods that can be performed on the current thread.
*
* usage:
* this_thread::yield();
* T* state = this_thread::state<T>();
*
* PORTABILITY:
* Current implementation is based on pthreads.
*/
class this_thread {
template<typename Function, typename State>
friend class thread;
namespace internal {
namespace base {
using thread_entrypoint = void();
/**
* Static methods that can be performed on the current thread.
*
* usage:
* this_thread::yield();
* T* state = this_thread::state<T>();
*
* PORTABILITY:
* Current implementation is based on pthreads.
*/
class this_thread {
template<typename Function, typename State>
friend
class thread;
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
static pthread_key_t local_storage_key_;
static bool local_storage_key_initialized_;
static pthread_key_t local_storage_key_;
static bool local_storage_key_initialized_;
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
static __thread void* local_state_;
static __thread void *local_state_;
#endif
public:
static void yield() {
pthread_yield();
}
/**
* Retrieves the local state pointer.
*
* @tparam T The type of the state that is stored.
* @return The state pointer held for this thread.
*/
template<typename T>
static T* state();
/**
* Stores a pointer to the thread local state object.
* The memory management for this has to be done by the user,
* we only keep the pointer.
*
* @tparam T The type of the state that is stored.
* @param state_pointer A pointer to the threads state object.
*/
template<typename T>
static void set_state(T* state_pointer);
};
/**
* Abstraction for starting a function in a separate thread.
*
* @tparam Function Lambda being started on the new thread.
* @tparam State State type held for this thread.
*
* usage:
* T* state;
* auto thread = start_thread([] {
* // Run on new thread
* }, state);
* thread.join(); // Wait for it to finish
*
* PORTABILITY:
* Current implementation is based on pthreads.
*/
template<typename Function, typename State>
class thread {
friend class this_thread;
// Keep a copy of the function (lambda) in this object to make sure it is valid when called!
Function function_;
State* state_pointer_;
// We need to wait for the started function to read
// the function_ and state_pointer_ property before returning
// from the constructor, as the object might be moved after this.
std::atomic_flag* startup_flag_;
// Keep handle to native implementation
pthread_t pthread_thread_;
static void* start_pthread_internal(void* thread_pointer);
public:
explicit thread(const Function& function, State* state_pointer);
public:
void join();
// make object move only
thread(thread&&) noexcept = default;
thread& operator=(thread&&) noexcept = default;
thread(const thread&) = delete;
thread& operator=(const thread&) = delete;
};
template<typename Function, typename State>
thread<Function, State> start_thread(const Function& function, State* state_pointer);
template<typename Function>
thread<Function, void> start_thread(const Function& function);
}
}
public:
static void yield() {
pthread_yield();
}
/**
* Retrieves the local state pointer.
*
* @tparam T The type of the state that is stored.
* @return The state pointer held for this thread.
*/
template<typename T>
static T *state();
/**
* Stores a pointer to the thread local state object.
* The memory management for this has to be done by the user,
* we only keep the pointer.
*
* @tparam T The type of the state that is stored.
* @param state_pointer A pointer to the threads state object.
*/
template<typename T>
static void set_state(T *state_pointer);
};
/**
* Abstraction for starting a function in a separate thread.
*
* @tparam Function Lambda being started on the new thread.
* @tparam State State type held for this thread.
*
* usage:
* T* state;
* auto thread = start_thread([] {
* // Run on new thread
* }, state);
* thread.join(); // Wait for it to finish
*
* PORTABILITY:
* Current implementation is based on pthreads.
*/
template<typename Function, typename State>
class thread {
friend class this_thread;
// Keep a copy of the function (lambda) in this object to make sure it is valid when called!
Function function_;
State *state_pointer_;
// We need to wait for the started function to read
// the function_ and state_pointer_ property before returning
// from the constructor, as the object might be moved after this.
std::atomic_flag *startup_flag_;
// Keep handle to native implementation
pthread_t pthread_thread_;
static void *start_pthread_internal(void *thread_pointer);
public:
explicit thread(const Function &function, State *state_pointer);
public:
void join();
// make object move only
thread(thread &&) noexcept = default;
thread &operator=(thread &&) noexcept = default;
thread(const thread &) = delete;
thread &operator=(const thread &) = delete;
};
template<typename Function, typename State>
thread<Function, State> start_thread(const Function &function, State *state_pointer);
template<typename Function>
thread<Function, void> start_thread(const Function &function);
}
}
}
#include "thread_impl.h"
@@ -3,86 +3,87 @@
#define PLS_THREAD_IMPL_H
namespace pls {
namespace internal {
namespace base {
template<typename T>
T* this_thread::state() {
namespace internal {
namespace base {
template<typename T>
T *this_thread::state() {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
return reinterpret_cast<T*>(pthread_getspecific(local_storage_key_));
return reinterpret_cast<T*>(pthread_getspecific(local_storage_key_));
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
return reinterpret_cast<T*>(local_state_);
return reinterpret_cast<T *>(local_state_);
#endif
}
}
template<typename T>
void this_thread::set_state(T* state_pointer) {
template<typename T>
void this_thread::set_state(T *state_pointer) {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer);
pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer);
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
local_state_ = state_pointer;
local_state_ = state_pointer;
#endif
}
}
template<typename Function, typename State>
void* thread<Function, State>::start_pthread_internal(void* thread_pointer) {
auto my_thread = reinterpret_cast<thread*>(thread_pointer);
Function my_function_copy = my_thread->function_;
State* my_state_pointer_copy = my_thread->state_pointer_;
template<typename Function, typename State>
void *thread<Function, State>::start_pthread_internal(void *thread_pointer) {
auto my_thread = reinterpret_cast<thread *>(thread_pointer);
Function my_function_copy = my_thread->function_;
State *my_state_pointer_copy = my_thread->state_pointer_;
// Now we have copies of everything we need on the stack.
// The original thread object can be moved freely (no more
// references to its memory location).
my_thread->startup_flag_->clear();
// Now we have copies of everything we need on the stack.
// The original thread object can be moved freely (no more
// references to its memory location).
my_thread->startup_flag_->clear();
this_thread::set_state(my_state_pointer_copy);
my_function_copy();
this_thread::set_state(my_state_pointer_copy);
my_function_copy();
// Finished executing the user function
pthread_exit(nullptr);
}
// Finished executing the user function
pthread_exit(nullptr);
}
template<typename Function, typename State>
thread<Function, State>::thread(const Function& function, State* state_pointer):
function_{function},
state_pointer_{state_pointer},
startup_flag_{nullptr},
pthread_thread_{} {
template<typename Function, typename State>
thread<Function, State>::thread(const Function &function, State *state_pointer):
function_{function},
state_pointer_{state_pointer},
startup_flag_{nullptr},
pthread_thread_{} {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
if (!this_thread::local_storage_key_initialized_) {
pthread_key_create(&this_thread::local_storage_key_, nullptr);
this_thread::local_storage_key_initialized_ = true;
}
if (!this_thread::local_storage_key_initialized_) {
pthread_key_create(&this_thread::local_storage_key_, nullptr);
this_thread::local_storage_key_initialized_ = true;
}
#endif
// We only need this during startup, will be destroyed when out of scope
std::atomic_flag startup_flag{ATOMIC_FLAG_INIT};
startup_flag_ = &startup_flag;
startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return
pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *)(this));
while (startup_flag.test_and_set())
; // Busy waiting for the starting flag to clear
}
template<typename Function, typename State>
void thread<Function, State>::join() {
pthread_join(pthread_thread_, nullptr);
}
template<typename Function, typename State>
thread<Function, State> start_thread(const Function& function, State* state_pointer) {
return thread<Function, State>(function, state_pointer);
}
template<typename Function>
thread<Function, void> start_thread(const Function& function) {
return thread<Function, void>(function, nullptr);
}
}
}
// We only need this during startup, will be destroyed when out of scope
std::atomic_flag startup_flag{ATOMIC_FLAG_INIT};
startup_flag_ = &startup_flag;
startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return
pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *) (this));
while (startup_flag.test_and_set()); // Busy waiting for the starting flag to clear
}
template<typename Function, typename State>
void thread<Function, State>::join() {
pthread_join(pthread_thread_, nullptr);
}
template<typename Function, typename State>
thread<Function, State> start_thread(const Function &function, State *state_pointer) {
return thread<Function, State>(function, state_pointer);
}
template<typename Function>
thread<Function, void> start_thread(const Function &function) {
return thread<Function, void>(function, nullptr);
}
}
}
}
#endif //PLS_THREAD_IMPL_H
@@ -8,30 +8,28 @@
#include "pls/internal/base/thread.h"
namespace pls {
namespace internal {
namespace base {
/**
* A simple set and test_and_set based spin lock implementation.
*
* PORTABILITY:
* Current implementation is based on C++ 11 atomic_flag.
*/
class ttas_spin_lock {
std::atomic<int> flag_;
const unsigned int yield_at_tries_;
public:
ttas_spin_lock(): flag_{0}, yield_at_tries_{1024} {};
ttas_spin_lock(const ttas_spin_lock& other): flag_{0}, yield_at_tries_{other.yield_at_tries_} {}
void lock();
bool try_lock(unsigned int num_tries=1);
void unlock();
};
}
}
namespace internal {
namespace base {
/**
* A simple set and test_and_set based spin lock implementation.
*
* PORTABILITY:
* Current implementation is based on C++ 11 atomic_flag.
*/
class ttas_spin_lock {
std::atomic<int> flag_;
const unsigned int yield_at_tries_;
public:
ttas_spin_lock() : flag_{0}, yield_at_tries_{1024} {};
ttas_spin_lock(const ttas_spin_lock &other) : flag_{0}, yield_at_tries_{other.yield_at_tries_} {}
void lock();
bool try_lock(unsigned int num_tries = 1);
void unlock();
};
}
}
}
#endif //PLS_TTAS_SPIN_LOCK_H
@@ -9,45 +9,48 @@
#include "pls/internal/base/alignment.h"
namespace pls {
namespace internal {
namespace data_structures {
/**
* Generic stack-like data structure that allows allocating arbitrary objects in a given memory region.
* The objects will be stored aligned in the stack, making the storage cache friendly and very fast
* (as long as one can live with the stack restrictions).
*
* IMPORTANT: Does not call destructors on stored objects! Do not allocate resources in the objects!
*
* Usage:
* aligned_stack stack{pointer_to_memory, size_of_memory};
* T* pointer = stack.push(some_object); // Copy-Construct the object on top of stack
* stack.pop<T>(); // Deconstruct the top object of type T
*/
class aligned_stack {
// Keep bounds of our memory block
char* memory_start_;
char* memory_end_;
// Current head will always be aligned to cache lines
char* head_;
public:
typedef char* state;
aligned_stack(): memory_start_{nullptr}, memory_end_{nullptr}, head_{nullptr} {};
aligned_stack(char* memory_region, std::size_t size);
template<typename T>
T* push(const T& object);
template<typename T>
void* push();
template<typename T>
T pop();
state save_state() const { return head_; }
void reset_state(state new_state) { head_ = new_state; }
};
}
}
namespace internal {
namespace data_structures {
/**
* Generic stack-like data structure that allows allocating arbitrary objects in a given memory region.
* The objects will be stored aligned in the stack, making the storage cache friendly and very fast
* (as long as one can live with the stack restrictions).
*
* IMPORTANT: Does not call destructors on stored objects! Do not allocate resources in the objects!
*
* Usage:
* aligned_stack stack{pointer_to_memory, size_of_memory};
* T* pointer = stack.push(some_object); // Copy-Construct the object on top of stack
* stack.pop<T>(); // Deconstruct the top object of type T
*/
class aligned_stack {
// Keep bounds of our memory block
char *memory_start_;
char *memory_end_;
// Current head will always be aligned to cache lines
char *head_;
public:
typedef char *state;
aligned_stack() : memory_start_{nullptr}, memory_end_{nullptr}, head_{nullptr} {};
aligned_stack(char *memory_region, std::size_t size);
template<typename T>
T *push(const T &object);
template<typename T>
void *push();
template<typename T>
T pop();
state save_state() const { return head_; }
void reset_state(state new_state) { head_ = new_state; }
};
}
}
}
#include "aligned_stack_impl.h"
@@ -3,34 +3,36 @@
#define PLS_ALIGNED_STACK_IMPL_H
namespace pls {
namespace internal {
namespace data_structures {
template<typename T>
T* aligned_stack::push(const T& object) {
// Copy-Construct
return new ((void*)push<T>())T(object);
}
template<typename T>
void* aligned_stack::push() {
void* result = reinterpret_cast<T*>(head_);
// Move head to next aligned position after new object
head_ = base::alignment::next_alignment(head_ + sizeof(T));
if (head_ >= memory_end_) {
PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!");
}
return result;
}
template<typename T>
T aligned_stack::pop() {
head_ = head_ - base::alignment::next_alignment(sizeof(T));
return *reinterpret_cast<T*>(head_);
}
}
}
namespace internal {
namespace data_structures {
template<typename T>
T *aligned_stack::push(const T &object) {
// Copy-Construct
return new((void *) push < T > ())T(object);
}
template<typename T>
void *aligned_stack::push() {
void *result = reinterpret_cast<T *>(head_);
// Move head to next aligned position after new object
head_ = base::alignment::next_alignment(head_ + sizeof(T));
if (head_ >= memory_end_) {
PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!");
}
return result;
}
template<typename T>
T aligned_stack::pop() {
head_ = head_ - base::alignment::next_alignment(sizeof(T));
return *reinterpret_cast<T *>(head_);
}
}
}
}
#endif //PLS_ALIGNED_STACK_IMPL_H
@@ -5,56 +5,58 @@
#include "pls/internal/base/spin_lock.h"
namespace pls {
namespace internal {
namespace data_structures {
/**
* Turns any object into deque item when inheriting from this.
*/
class deque_item {
friend class deque_internal;
deque_item* prev_;
deque_item* next_;
};
class deque_internal {
protected:
deque_item* head_;
deque_item* tail_;
base::spin_lock lock_;
deque_item* pop_head_internal();
deque_item* pop_tail_internal();
void push_tail_internal(deque_item *new_item);
};
/**
* A double linked list based deque.
* Storage is therefore only needed for the individual items.
*
* @tparam Item The type of items stored in this deque
*/
template<typename Item>
class deque: deque_internal {
public:
explicit deque(): deque_internal{} {}
inline Item* pop_head() {
return static_cast<Item*>(pop_head_internal());
}
inline Item* pop_tail() {
return static_cast<Item*>(pop_tail_internal());
}
inline void push_tail(Item* new_item) {
push_tail_internal(new_item);
}
};
}
}
namespace internal {
namespace data_structures {
/**
* Turns any object into deque item when inheriting from this.
*/
class deque_item {
friend class deque_internal;
deque_item *prev_;
deque_item *next_;
};
class deque_internal {
protected:
deque_item *head_;
deque_item *tail_;
base::spin_lock lock_;
deque_item *pop_head_internal();
deque_item *pop_tail_internal();
void push_tail_internal(deque_item *new_item);
};
/**
* A double linked list based deque.
* Storage is therefore only needed for the individual items.
*
* @tparam Item The type of items stored in this deque
*/
template<typename Item>
class deque : deque_internal {
public:
explicit deque() : deque_internal{} {}
inline Item *pop_head() {
return static_cast<Item *>(pop_head_internal());
}
inline Item *pop_tail() {
return static_cast<Item *>(pop_tail_internal());
}
inline void push_tail(Item *new_item) {
push_tail_internal(new_item);
}
};
}
}
}
#endif //PLS_DEQUE_H
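For orientation (illustration only, not part of this commit): items stored in this deque must inherit from deque_item, as fork_join_sub_task does later in this diff, and the deque only links items without owning their memory. In the sketch below, my_item is a made-up type, and the return value for an empty deque is not visible in this hunk (presumably nullptr).

// Illustration only, not part of this commit.
struct my_item : public pls::internal::data_structures::deque_item {
  int value;
};

void deque_example() {
  pls::internal::data_structures::deque<my_item> queue;

  my_item item;
  item.value = 1;
  queue.push_tail(&item);              // the deque links the item; it does not copy or own it

  my_item *popped = queue.pop_tail();  // take back from the tail
  // pop_head() takes from the opposite end (e.g. for stealing).
  (void) popped;
}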
@@ -9,45 +9,47 @@
#include <iostream>
namespace pls {
namespace internal {
namespace helpers {
// TODO: Clean up (separate into small functions and .cpp file)
template<typename Function>
void run_mini_benchmark(const Function& lambda, size_t max_threads, unsigned long max_runtime_ms=1000) {
using namespace std;
using namespace pls::internal::scheduling;
malloc_scheduler_memory scheduler_memory{max_threads};
for (unsigned int num_threads = 1; num_threads <= max_threads; num_threads++) {
scheduler local_scheduler{&scheduler_memory, num_threads};
chrono::high_resolution_clock::time_point start_time;
chrono::high_resolution_clock::time_point end_time;
unsigned long iterations = 0;
local_scheduler.perform_work([&] {
start_time = chrono::high_resolution_clock::now();
end_time = start_time;
chrono::high_resolution_clock::time_point planned_end_time = start_time + chrono::milliseconds(max_runtime_ms);
while (end_time < planned_end_time) {
lambda();
end_time = chrono::high_resolution_clock::now();
iterations++;
}
});
long time = chrono::duration_cast<chrono::microseconds>(end_time - start_time).count();
double time_per_iteration = (double)time / iterations;
std::cout << time_per_iteration;
if (num_threads < max_threads) {
std::cout << ",";
}
}
std::cout << std::endl;
}
}
namespace internal {
namespace helpers {
// TODO: Clean up (separate into small functions and .cpp file)
template<typename Function>
void run_mini_benchmark(const Function &lambda, size_t max_threads, unsigned long max_runtime_ms = 1000) {
using namespace std;
using namespace pls::internal::scheduling;
malloc_scheduler_memory scheduler_memory{max_threads};
for (unsigned int num_threads = 1; num_threads <= max_threads; num_threads++) {
scheduler local_scheduler{&scheduler_memory, num_threads};
chrono::high_resolution_clock::time_point start_time;
chrono::high_resolution_clock::time_point end_time;
unsigned long iterations = 0;
local_scheduler.perform_work([&] {
start_time = chrono::high_resolution_clock::now();
end_time = start_time;
chrono::high_resolution_clock::time_point planned_end_time = start_time + chrono::milliseconds(max_runtime_ms);
while (end_time < planned_end_time) {
lambda();
end_time = chrono::high_resolution_clock::now();
iterations++;
}
});
long time = chrono::duration_cast<chrono::microseconds>(end_time - start_time).count();
double time_per_iteration = (double) time / iterations;
std::cout << time_per_iteration;
if (num_threads < max_threads) {
std::cout << ",";
}
}
std::cout << std::endl;
}
}
}
}
#endif //PLS_MINI_BENCHMARK_H
@@ -15,9 +15,9 @@
#ifdef NEW_LINK_ERROR
// This will cause a linker error if new is used in the code.
// We also exit if it is somehow still called.
inline void * operator new (std::size_t) {
extern int bare_new_erroneously_called();
exit(bare_new_erroneously_called() | 1);
inline void *operator new(std::size_t) {
extern int bare_new_erroneously_called();
exit(bare_new_erroneously_called() | 1);
}
#else
// Use this + debugging point to find out where we use a new
@@ -7,25 +7,27 @@
#include <stdint.h>
namespace pls {
namespace internal {
namespace helpers {
struct unique_id {
const uint32_t id_;
const std::type_info& type_;
bool operator==(const unique_id& other) const { return id_ == other.id_ && type_ == other.type_; }
namespace internal {
namespace helpers {
static constexpr unique_id create(const uint32_t id) {
return unique_id(id, typeid(void));
}
template<typename ...T>
static constexpr unique_id create() {
return unique_id(UINT32_MAX, typeid(std::tuple<T...>));
}
private:
explicit constexpr unique_id(const uint32_t id, const std::type_info& type): id_{id}, type_{type} {};
};
}
}
struct unique_id {
const uint32_t id_;
const std::type_info &type_;
bool operator==(const unique_id &other) const { return id_ == other.id_ && type_ == other.type_; }
static constexpr unique_id create(const uint32_t id) {
return unique_id(id, typeid(void));
}
template<typename ...T>
static constexpr unique_id create() {
return unique_id(UINT32_MAX, typeid(std::tuple<T...>));
}
private:
explicit constexpr unique_id(const uint32_t id, const std::type_info &type) : id_{id}, type_{type} {};
};
}
}
}
#endif //PLS_UNIQUE_ID_H
@@ -6,38 +6,40 @@
#include "pls/internal/helpers/unique_id.h"
namespace pls {
namespace internal {
namespace scheduling {
class abstract_task {
public:
using id = helpers::unique_id;
private:
unsigned int depth_;
abstract_task::id unique_id_;
abstract_task* child_task_;
public:
abstract_task(const unsigned int depth, const abstract_task::id& unique_id):
depth_{depth},
unique_id_{unique_id},
child_task_{nullptr} {}
virtual void execute() = 0;
void set_child(abstract_task* child_task) { child_task_ = child_task; }
abstract_task* child() { return child_task_; }
void set_depth(unsigned int depth) { depth_ = depth; }
unsigned int depth() const { return depth_; }
id unique_id() const { return unique_id_; }
protected:
virtual bool internal_stealing(abstract_task* other_task) = 0;
virtual bool split_task(base::spin_lock* lock) = 0;
bool steal_work();
};
}
}
namespace internal {
namespace scheduling {
class abstract_task {
public:
using id = helpers::unique_id;
private:
unsigned int depth_;
abstract_task::id unique_id_;
abstract_task *child_task_;
public:
abstract_task(const unsigned int depth, const abstract_task::id &unique_id) :
depth_{depth},
unique_id_{unique_id},
child_task_{nullptr} {}
virtual void execute() = 0;
void set_child(abstract_task *child_task) { child_task_ = child_task; }
abstract_task *child() { return child_task_; }
void set_depth(unsigned int depth) { depth_ = depth; }
unsigned int depth() const { return depth_; }
id unique_id() const { return unique_id_; }
protected:
virtual bool internal_stealing(abstract_task *other_task) = 0;
virtual bool split_task(base::spin_lock *lock) = 0;
bool steal_work();
};
}
}
}
#endif //PLS_ABSTRACT_TASK_H
@@ -11,87 +11,89 @@
#include "thread_state.h"
namespace pls {
namespace internal {
namespace scheduling {
class fork_join_task;
class fork_join_sub_task: public data_structures::deque_item {
friend class fork_join_task;
// Coordinate finishing of sub_tasks
std::atomic_uint32_t ref_count_;
fork_join_sub_task* parent_;
// Access to TBB scheduling environment
fork_join_task* tbb_task_;
// Stack Management (reset stack pointer after wait_for_all() calls)
data_structures::aligned_stack::state stack_state_;
protected:
explicit fork_join_sub_task();
fork_join_sub_task(const fork_join_sub_task& other);
// Overwritten with behaviour of child tasks
virtual void execute_internal() = 0;
public:
// Only use them when actually executing this sub_task (only public for simpler API design)
template<typename T>
void spawn_child(const T& sub_task);
void wait_for_all();
private:
void spawn_child_internal(fork_join_sub_task* sub_task);
void execute();
};
template<typename Function>
class fork_join_lambda: public fork_join_sub_task {
const Function* function_;
public:
explicit fork_join_lambda(const Function* function): function_{function} {};
protected:
void execute_internal() override {
(*function_)(this);
}
};
class fork_join_task: public abstract_task {
friend class fork_join_sub_task;
fork_join_sub_task* root_task_;
fork_join_sub_task* currently_executing_;
data_structures::aligned_stack* my_stack_;
// Double-Ended Queue management
data_structures::deque<fork_join_sub_task> deque_;
// Steal Management
fork_join_sub_task* last_stolen_;
fork_join_sub_task* get_local_sub_task();
fork_join_sub_task* get_stolen_sub_task();
bool internal_stealing(abstract_task* other_task) override;
bool split_task(base::spin_lock* /*lock*/) override;
public:
explicit fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id);
void execute() override;
fork_join_sub_task* currently_executing() const;
};
template<typename T>
void fork_join_sub_task::spawn_child(const T& task) {
PROFILE_FORK_JOIN_STEALING("spawn_child")
static_assert(std::is_base_of<fork_join_sub_task, T>::value, "Only pass fork_join_sub_task subclasses!");
T* new_task = tbb_task_->my_stack_->push(task);
spawn_child_internal(new_task);
}
}
}
namespace internal {
namespace scheduling {
class fork_join_task;
class fork_join_sub_task : public data_structures::deque_item {
friend class fork_join_task;
// Coordinate finishing of sub_tasks
std::atomic_uint32_t ref_count_;
fork_join_sub_task *parent_;
// Access to TBB scheduling environment
fork_join_task *tbb_task_;
// Stack Management (reset stack pointer after wait_for_all() calls)
data_structures::aligned_stack::state stack_state_;
protected:
explicit fork_join_sub_task();
fork_join_sub_task(const fork_join_sub_task &other);
// Overwritten with behaviour of child tasks
virtual void execute_internal() = 0;
public:
// Only use them when actually executing this sub_task (only public for simpler API design)
template<typename T>
void spawn_child(const T &sub_task);
void wait_for_all();
private:
void spawn_child_internal(fork_join_sub_task *sub_task);
void execute();
};
template<typename Function>
class fork_join_lambda : public fork_join_sub_task {
const Function *function_;
public:
explicit fork_join_lambda(const Function *function) : function_{function} {};
protected:
void execute_internal() override {
(*function_)(this);
}
};
class fork_join_task : public abstract_task {
friend class fork_join_sub_task;
fork_join_sub_task *root_task_;
fork_join_sub_task *currently_executing_;
data_structures::aligned_stack *my_stack_;
// Double-Ended Queue management
data_structures::deque<fork_join_sub_task> deque_;
// Steal Management
fork_join_sub_task *last_stolen_;
fork_join_sub_task *get_local_sub_task();
fork_join_sub_task *get_stolen_sub_task();
bool internal_stealing(abstract_task *other_task) override;
bool split_task(base::spin_lock * /*lock*/) override;
public:
explicit fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id);
void execute() override;
fork_join_sub_task *currently_executing() const;
};
template<typename T>
void fork_join_sub_task::spawn_child(const T &task) {
PROFILE_FORK_JOIN_STEALING("spawn_child")
static_assert(std::is_base_of<fork_join_sub_task, T>::value, "Only pass fork_join_sub_task subclasses!");
T *new_task = tbb_task_->my_stack_->push(task);
spawn_child_internal(new_task);
}
}
}
}
#endif //PLS_TBB_LIKE_TASK_H
@@ -10,71 +10,73 @@
#include "abstract_task.h"
namespace pls {
namespace internal {
namespace scheduling {
template<typename Function>
class root_task : public abstract_task {
Function function_;
std::atomic_uint8_t finished_;
public:
static constexpr auto create_id = helpers::unique_id::create<root_task<Function>>;
explicit root_task(Function function):
abstract_task{0, create_id()},
function_{function},
finished_{0} {}
root_task(const root_task& other):
abstract_task{0, create_id()},
function_{other.function_},
finished_{0} {}
bool finished() {
return finished_;
}
void execute() override {
PROFILE_WORK_BLOCK("execute root_task");
function_();
finished_ = 1;
}
bool internal_stealing(abstract_task* /*other_task*/) override {
return false;
}
bool split_task(base::spin_lock* /*lock*/) override {
return false;
}
};
template<typename Function>
class root_worker_task : public abstract_task {
root_task<Function>* master_task_;
public:
static constexpr auto create_id = root_task<Function>::create_id;
explicit root_worker_task(root_task<Function>* master_task):
abstract_task{0, create_id()},
master_task_{master_task} {}
void execute() override {
PROFILE_WORK_BLOCK("execute root_task");
do {
steal_work();
} while (!master_task_->finished());
}
bool internal_stealing(abstract_task* /*other_task*/) override {
return false;
}
bool split_task(base::spin_lock* /*lock*/) override {
return false;
}
};
}
}
namespace internal {
namespace scheduling {
template<typename Function>
class root_task : public abstract_task {
Function function_;
std::atomic_uint8_t finished_;
public:
static constexpr auto create_id = helpers::unique_id::create<root_task<Function>>;
explicit root_task(Function function) :
abstract_task{0, create_id()},
function_{function},
finished_{0} {}
root_task(const root_task &other) :
abstract_task{0, create_id()},
function_{other.function_},
finished_{0} {}
bool finished() {
return finished_;
}
void execute() override {
PROFILE_WORK_BLOCK("execute root_task");
function_();
finished_ = 1;
}
bool internal_stealing(abstract_task * /*other_task*/) override {
return false;
}
bool split_task(base::spin_lock * /*lock*/) override {
return false;
}
};
template<typename Function>
class root_worker_task : public abstract_task {
root_task<Function> *master_task_;
public:
static constexpr auto create_id = root_task<Function>::create_id;
explicit root_worker_task(root_task<Function> *master_task) :
abstract_task{0, create_id()},
master_task_{master_task} {}
void execute() override {
PROFILE_WORK_BLOCK("execute root_task");
do {
steal_work();
} while (!master_task_->finished());
}
bool internal_stealing(abstract_task * /*other_task*/) override {
return false;
}
bool split_task(base::spin_lock * /*lock*/) override {
return false;
}
};
}
}
}
#endif //PLS_ROOT_MASTER_TASK_H
@@ -12,107 +12,110 @@
#include "scheduler.h"
namespace pls {
namespace internal {
namespace scheduling {
template<typename Function>
class run_on_n_threads_task : public abstract_task {
template<typename F>
friend class run_on_n_threads_task_worker;
Function function_;
// Improvement: Remove lock and replace by atomic variable (performance)
int counter;
base::spin_lock counter_lock_;
int decrement_counter() {
std::lock_guard<base::spin_lock> lock{counter_lock_};
counter--;
return counter;
}
int get_counter() {
std::lock_guard<base::spin_lock> lock{counter_lock_};
return counter;
}
public:
static constexpr auto create_id = helpers::unique_id::create<run_on_n_threads_task<Function>>;
run_on_n_threads_task(Function function, int num_threads):
abstract_task{0, create_id()},
function_{function},
counter{num_threads - 1} {}
void execute() override {
// Execute our function ONCE
function_();
// Steal until we are finished (other threads executed)
do {
steal_work();
} while (get_counter() > 0);
std::cout << "Finished Master!" << std::endl;
}
bool internal_stealing(abstract_task* /*other_task*/) override {
return false;
}
bool split_task(base::spin_lock* lock) override;
};
template<typename Function>
class run_on_n_threads_task_worker : public abstract_task {
Function function_;
run_on_n_threads_task<Function>* root_;
public:
static constexpr auto create_id = helpers::unique_id::create<run_on_n_threads_task_worker<Function>>;
run_on_n_threads_task_worker(Function function, run_on_n_threads_task<Function>* root):
abstract_task{0, create_id()},
function_{function},
root_{root} {}
void execute() override {
if (root_->decrement_counter() >= 0) {
function_();
std::cout << "Finished Worker!" << std::endl;
} else {
std::cout << "Abandoned Worker!" << std::endl;
}
}
bool internal_stealing(abstract_task* /*other_task*/) override {
return false;
}
bool split_task(base::spin_lock* /*lock*/) override {
return false;
}
};
template<typename Function>
bool run_on_n_threads_task<Function>::split_task(base::spin_lock* lock) {
if (get_counter() <= 0) {
return false;
}
// In success case, unlock.
// TODO: this locking is complicated and error prone.
lock->unlock();
auto scheduler = base::this_thread::state<thread_state>()->scheduler_;
auto task = run_on_n_threads_task_worker<Function>{function_, this};
scheduler->execute_task(task, depth());
return true;
}
template<typename Function>
run_on_n_threads_task<Function> create_run_on_n_threads_task(Function function, int num_threads) {
return run_on_n_threads_task<Function>{function, num_threads};
}
}
namespace internal {
namespace scheduling {
template<typename Function>
class run_on_n_threads_task : public abstract_task {
template<typename F>
friend
class run_on_n_threads_task_worker;
Function function_;
// Improvement: Remove lock and replace by atomic variable (performance)
int counter;
base::spin_lock counter_lock_;
int decrement_counter() {
std::lock_guard<base::spin_lock> lock{counter_lock_};
counter--;
return counter;
}
int get_counter() {
std::lock_guard<base::spin_lock> lock{counter_lock_};
return counter;
}
public:
static constexpr auto create_id = helpers::unique_id::create<run_on_n_threads_task<Function>>;
run_on_n_threads_task(Function function, int num_threads) :
abstract_task{0, create_id()},
function_{function},
counter{num_threads - 1} {}
void execute() override {
// Execute our function ONCE
function_();
// Steal until we are finished (other threads executed)
do {
steal_work();
} while (get_counter() > 0);
std::cout << "Finished Master!" << std::endl;
}
bool internal_stealing(abstract_task * /*other_task*/) override {
return false;
}
bool split_task(base::spin_lock *lock) override;
};
template<typename Function>
class run_on_n_threads_task_worker : public abstract_task {
Function function_;
run_on_n_threads_task<Function> *root_;
public:
static constexpr auto create_id = helpers::unique_id::create<run_on_n_threads_task_worker<Function>>;
run_on_n_threads_task_worker(Function function, run_on_n_threads_task<Function> *root) :
abstract_task{0, create_id()},
function_{function},
root_{root} {}
void execute() override {
if (root_->decrement_counter() >= 0) {
function_();
std::cout << "Finished Worker!" << std::endl;
} else {
std::cout << "Abandoned Worker!" << std::endl;
}
}
bool internal_stealing(abstract_task * /*other_task*/) override {
return false;
}
bool split_task(base::spin_lock * /*lock*/) override {
return false;
}
};
template<typename Function>
bool run_on_n_threads_task<Function>::split_task(base::spin_lock *lock) {
if (get_counter() <= 0) {
return false;
}
// In success case, unlock.
// TODO: this locking is complicated and error prone.
lock->unlock();
auto scheduler = base::this_thread::state<thread_state>()->scheduler_;
auto task = run_on_n_threads_task_worker<Function>{function_, this};
scheduler->execute_task(task, depth());
return true;
}
template<typename Function>
run_on_n_threads_task<Function> create_run_on_n_threads_task(Function function, int num_threads) {
return run_on_n_threads_task<Function>{function, num_threads};
}
}
}
}
#endif //PLS_RUN_ON_N_THREADS_TASK_H
@@ -17,50 +17,52 @@
#include "scheduler_memory.h"
namespace pls {
namespace internal {
namespace scheduling {
void worker_routine();
using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;
class scheduler {
friend void worker_routine();
const unsigned int num_threads_;
scheduler_memory* memory_;
base::barrier sync_barrier_;
bool terminated_;
public:
explicit scheduler(scheduler_memory* memory, unsigned int num_threads);
~scheduler();
/**
* Wakes up the thread pool.
* Code inside the Function lambda can invoke all parallel APIs.
*
* @param work_section generic function or lambda to be executed in the scheduler's context.
*/
template<typename Function>
void perform_work(Function work_section);
/**
* Executes a top-level-task (children of abstract_task) on this thread.
*
* @param task The task to be executed.
* @param depth Optional: depth of the new task, otherwise set implicitly.
*/
template<typename Task>
static void execute_task(Task& task, int depth=-1);
static abstract_task* current_task() { return base::this_thread::state<thread_state>()->current_task_; }
void terminate(bool wait_for_workers=true);
unsigned int num_threads() const { return num_threads_; }
thread_state* thread_state_for(size_t id) { return memory_->thread_state_for(id); }
};
}
}
namespace internal {
namespace scheduling {
void worker_routine();
using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;
class scheduler {
friend void worker_routine();
const unsigned int num_threads_;
scheduler_memory *memory_;
base::barrier sync_barrier_;
bool terminated_;
public:
explicit scheduler(scheduler_memory *memory, unsigned int num_threads);
~scheduler();
/**
* Wakes up the thread pool.
* Code inside the Function lambda can invoke all parallel APIs.
*
* @param work_section generic function or lambda to be executed in the scheduler's context.
*/
template<typename Function>
void perform_work(Function work_section);
/**
* Executes a top-level-task (children of abstract_task) on this thread.
*
* @param task The task to be executed.
* @param depth Optional: depth of the new task, otherwise set implicitly.
*/
template<typename Task>
static void execute_task(Task &task, int depth = -1);
static abstract_task *current_task() { return base::this_thread::state<thread_state>()->current_task_; }
void terminate(bool wait_for_workers = true);
unsigned int num_threads() const { return num_threads_; }
thread_state *thread_state_for(size_t id) { return memory_->thread_state_for(id); }
};
}
}
}
#include "scheduler_impl.h"
@@ -3,70 +3,72 @@
#define PLS_SCHEDULER_IMPL_H
namespace pls {
namespace internal {
namespace scheduling {
template<typename Function>
void scheduler::perform_work(Function work_section) {
PROFILE_WORK_BLOCK("scheduler::perform_work")
root_task<Function> master{work_section};
// Push root task on stacks
auto new_master = memory_->task_stack_for(0)->push(master);
memory_->thread_state_for(0)->root_task_ = new_master;
memory_->thread_state_for(0)->current_task_ = new_master;
for (unsigned int i = 1; i < num_threads_; i++) {
root_worker_task<Function> worker{new_master};
auto new_worker = memory_->task_stack_for(0)->push(worker);
memory_->thread_state_for(i)->root_task_ = new_worker;
memory_->thread_state_for(i)->current_task_ = new_worker;
}
// Perform and wait for work
sync_barrier_.wait(); // Trigger threads to wake up
sync_barrier_.wait(); // Wait for threads to finish
// Clean up stack
memory_->task_stack_for(0)->pop<typeof(master)>();
for (unsigned int i = 1; i < num_threads_; i++) {
root_worker_task<Function> worker{new_master};
memory_->task_stack_for(0)->pop<typeof(worker)>();
}
}
template<typename Task>
void scheduler::execute_task(Task& task, int depth) {
static_assert(std::is_base_of<abstract_task, Task>::value, "Only pass abstract_task subclasses!");
auto my_state = base::this_thread::state<thread_state>();
abstract_task* old_task;
abstract_task* new_task;
// Init Task
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
old_task = my_state->current_task_;
new_task = my_state->task_stack_->push(task);
new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1);
my_state->current_task_ = new_task;
old_task->set_child(new_task);
}
// Run Task
new_task->execute();
// Teardown state back to before the task was executed
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
old_task->set_child(nullptr);
my_state->current_task_ = old_task;
my_state->task_stack_->pop<Task>();
}
}
}
}
namespace internal {
namespace scheduling {
template<typename Function>
void scheduler::perform_work(Function work_section) {
PROFILE_WORK_BLOCK("scheduler::perform_work")
root_task<Function> master{work_section};
// Push root task on stacks
auto new_master = memory_->task_stack_for(0)->push(master);
memory_->thread_state_for(0)->root_task_ = new_master;
memory_->thread_state_for(0)->current_task_ = new_master;
for (unsigned int i = 1; i < num_threads_; i++) {
root_worker_task<Function> worker{new_master};
auto new_worker = memory_->task_stack_for(0)->push(worker);
memory_->thread_state_for(i)->root_task_ = new_worker;
memory_->thread_state_for(i)->current_task_ = new_worker;
}
// Perform and wait for work
sync_barrier_.wait(); // Trigger threads to wake up
sync_barrier_.wait(); // Wait for threads to finish
// Clean up stack
memory_->task_stack_for(0)->pop<typeof(master)>();
for (unsigned int i = 1; i < num_threads_; i++) {
root_worker_task<Function> worker{new_master};
memory_->task_stack_for(0)->pop<typeof(worker)>();
}
}
template<typename Task>
void scheduler::execute_task(Task &task, int depth) {
static_assert(std::is_base_of<abstract_task, Task>::value, "Only pass abstract_task subclasses!");
auto my_state = base::this_thread::state<thread_state>();
abstract_task *old_task;
abstract_task *new_task;
// Init Task
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
old_task = my_state->current_task_;
new_task = my_state->task_stack_->push(task);
new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1);
my_state->current_task_ = new_task;
old_task->set_child(new_task);
}
// Run Task
new_task->execute();
// Teardown state back to before the task was executed
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
old_task->set_child(nullptr);
my_state->current_task_ = old_task;
my_state->task_stack_->pop<Task>();
}
}
}
}
}
#endif //PLS_SCHEDULER_IMPL_H
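For orientation, a minimal sketch of how these templates are driven in practice, modelled on the fork_join unit test at the end of this diff; the includes and the thread count are illustrative, not prescribed:

#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/scheduling/fork_join_task.h"
#include "pls/internal/helpers/unique_id.h"

using namespace pls::internal::scheduling;

void run_root_task(fork_join_sub_task &root, scheduler_memory *memory) {
  scheduler my_scheduler{memory, 2};  // two worker threads; memory must be sized for at least that many
  my_scheduler.perform_work([&] {
    fork_join_task task{&root, pls::internal::helpers::unique_id::create(42)};
    scheduler::execute_task(task);    // returns once the whole task tree is joined
  });
  my_scheduler.terminate(true);       // wake the workers one last time and join them
}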
......@@ -7,72 +7,75 @@
#define PLS_SCHEDULER_MEMORY_H
namespace pls {
namespace internal {
namespace scheduling {
void worker_routine();
using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;
namespace internal {
namespace scheduling {
class scheduler_memory {
public:
virtual size_t max_threads() const = 0;
virtual thread_state* thread_state_for(size_t id) = 0;
virtual scheduler_thread* thread_for(size_t id) = 0;
virtual data_structures::aligned_stack* task_stack_for(size_t id) = 0;
};
void worker_routine();
using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;
template<size_t MAX_THREADS, size_t TASK_STACK_SIZE>
class static_scheduler_memory: public scheduler_memory {
// Every one of these types has to live on its own cache line,
// as each thread uses one of them independently.
// Therefore it would be a major performance hit if we shared cache lines on these.
using aligned_thread = base::alignment::aligned_wrapper<scheduler_thread>;
using aligned_thread_state = base::alignment::aligned_wrapper<thread_state>;
using aligned_thread_stack = base::alignment::aligned_wrapper<std::array<char, TASK_STACK_SIZE>>;
using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>;
class scheduler_memory {
public:
virtual size_t max_threads() const = 0;
virtual thread_state *thread_state_for(size_t id) = 0;
virtual scheduler_thread *thread_for(size_t id) = 0;
virtual data_structures::aligned_stack *task_stack_for(size_t id) = 0;
};
std::array<aligned_thread, MAX_THREADS> threads_;
std::array<aligned_thread_state, MAX_THREADS> thread_states_;
std::array<aligned_thread_stack, MAX_THREADS> task_stacks_memory_;
std::array<aligned_aligned_stack, MAX_THREADS> task_stacks_;
template<size_t MAX_THREADS, size_t TASK_STACK_SIZE>
class static_scheduler_memory : public scheduler_memory {
// Every one of these types has to live on its own cache line,
// as each thread uses one of them independently.
// Therefore it would be a major performance hit if we shared cache lines on these.
using aligned_thread = base::alignment::aligned_wrapper<scheduler_thread>;
using aligned_thread_state = base::alignment::aligned_wrapper<thread_state>;
using aligned_thread_stack = base::alignment::aligned_wrapper<std::array<char, TASK_STACK_SIZE>>;
using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>;
public:
static_scheduler_memory() {
for (size_t i = 0; i < MAX_THREADS; i++) {
new ((void*)task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i].pointer()->data(), TASK_STACK_SIZE);
}
}
std::array<aligned_thread, MAX_THREADS> threads_;
std::array<aligned_thread_state, MAX_THREADS> thread_states_;
std::array<aligned_thread_stack, MAX_THREADS> task_stacks_memory_;
std::array<aligned_aligned_stack, MAX_THREADS> task_stacks_;
size_t max_threads() const override { return MAX_THREADS; }
thread_state* thread_state_for(size_t id) override { return thread_states_[id].pointer(); }
scheduler_thread* thread_for(size_t id) override { return threads_[id].pointer(); }
data_structures::aligned_stack* task_stack_for(size_t id) override { return task_stacks_[id].pointer(); }
};
public:
static_scheduler_memory() {
for (size_t i = 0; i < MAX_THREADS; i++) {
new((void *) task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i].pointer()->data(),
TASK_STACK_SIZE);
}
}
class malloc_scheduler_memory: public scheduler_memory {
// Every one of these types has to live on its own cache line,
// as each thread uses one of them independently.
// Therefore it would be a major performance hit if we shared cache lines on these.
using aligned_thread = base::alignment::aligned_wrapper<scheduler_thread>;
using aligned_thread_state = base::alignment::aligned_wrapper<thread_state>;
using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>;
size_t max_threads() const override { return MAX_THREADS; }
thread_state *thread_state_for(size_t id) override { return thread_states_[id].pointer(); }
scheduler_thread *thread_for(size_t id) override { return threads_[id].pointer(); }
data_structures::aligned_stack *task_stack_for(size_t id) override { return task_stacks_[id].pointer(); }
};
const size_t num_threads_;
class malloc_scheduler_memory : public scheduler_memory {
// Every one of these types has to live on its own cache line,
// as each thread uses one of them independently.
// Therefore it would be a major performance hit if we shared cache lines on these.
using aligned_thread = base::alignment::aligned_wrapper<scheduler_thread>;
using aligned_thread_state = base::alignment::aligned_wrapper<thread_state>;
using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>;
aligned_thread* threads_;
aligned_thread_state * thread_states_;
char** task_stacks_memory_;
aligned_aligned_stack * task_stacks_;
public:
explicit malloc_scheduler_memory(size_t num_threads, size_t memory_per_stack = 2 << 16);
~malloc_scheduler_memory();
const size_t num_threads_;
size_t max_threads() const override { return num_threads_; }
thread_state* thread_state_for(size_t id) override { return thread_states_[id].pointer(); }
scheduler_thread* thread_for(size_t id) override { return threads_[id].pointer(); }
data_structures::aligned_stack* task_stack_for(size_t id) override { return task_stacks_[id].pointer(); }
};
}
}
aligned_thread *threads_;
aligned_thread_state *thread_states_;
char **task_stacks_memory_;
aligned_aligned_stack *task_stacks_;
public:
explicit malloc_scheduler_memory(size_t num_threads, size_t memory_per_stack = 2 << 16);
~malloc_scheduler_memory();
size_t max_threads() const override { return num_threads_; }
thread_state *thread_state_for(size_t id) override { return thread_states_[id].pointer(); }
scheduler_thread *thread_for(size_t id) override { return threads_[id].pointer(); }
data_structures::aligned_stack *task_stack_for(size_t id) override { return task_stacks_[id].pointer(); }
};
}
}
}
#endif //PLS_SCHEDULER_MEMORY_H
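A minimal sketch of instantiating the two variants declared above; the sizes are placeholder values (the static variant fixes them at compile time, the malloc variant takes them at run time):

#include "pls/internal/scheduling/scheduler_memory.h"

using namespace pls::internal::scheduling;

// Everything (threads, thread states, task stacks) lives inside the object,
// each element padded onto its own cache line.
static static_scheduler_memory<8, 2 << 12> preallocated_memory;

// The same layout, but carved out of aligned heap allocations at run time.
malloc_scheduler_memory heap_memory{8, 2 << 16};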
......@@ -8,38 +8,40 @@
#include "abstract_task.h"
namespace pls {
namespace internal {
namespace scheduling {
// forward declaration
class scheduler;
struct thread_state {
scheduler* scheduler_;
abstract_task* root_task_;
abstract_task* current_task_;
data_structures::aligned_stack* task_stack_;
size_t id_;
base::spin_lock lock_;
std::minstd_rand random_;
thread_state():
scheduler_{nullptr},
root_task_{nullptr},
current_task_{nullptr},
task_stack_{nullptr},
id_{0},
random_{id_} {};
thread_state(scheduler* scheduler, data_structures::aligned_stack* task_stack, unsigned int id):
scheduler_{scheduler},
root_task_{nullptr},
current_task_{nullptr},
task_stack_{task_stack},
id_{id},
random_{id_} {}
};
}
}
namespace internal {
namespace scheduling {
// forward declaration
class scheduler;
struct thread_state {
scheduler *scheduler_;
abstract_task *root_task_;
abstract_task *current_task_;
data_structures::aligned_stack *task_stack_;
size_t id_;
base::spin_lock lock_;
std::minstd_rand random_;
thread_state() :
scheduler_{nullptr},
root_task_{nullptr},
current_task_{nullptr},
task_stack_{nullptr},
id_{0},
random_{id_} {};
thread_state(scheduler *scheduler, data_structures::aligned_stack *task_stack, unsigned int id) :
scheduler_{scheduler},
root_task_{nullptr},
current_task_{nullptr},
task_stack_{task_stack},
id_{id},
random_{id_} {}
};
}
}
}
#endif //PLS_THREAD_STATE_H
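A minimal sketch of how worker-side code reaches this per-thread bookkeeping; the helper function is hypothetical and only assumes the thread-local state<>() accessor used by the scheduler sources below (and that it yields null outside a worker thread):

#include "pls/internal/scheduling/thread_state.h"

using namespace pls::internal;

// Depth of the task the calling worker is currently executing, -1 otherwise.
int current_task_depth() {
  auto my_state = base::this_thread::state<scheduling::thread_state>();
  if (my_state == nullptr || my_state->current_task_ == nullptr) {
    return -1;
  }
  return my_state->current_task_->depth();
}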
......@@ -8,18 +8,20 @@
#include "pls/internal/helpers/unique_id.h"
namespace pls {
using internal::scheduling::static_scheduler_memory;
using internal::scheduling::malloc_scheduler_memory;
using internal::scheduling::scheduler;
using task_id = internal::scheduling::abstract_task::id;
using internal::scheduling::static_scheduler_memory;
using internal::scheduling::malloc_scheduler_memory;
using unique_id = internal::helpers::unique_id;
using internal::scheduling::scheduler;
using task_id = internal::scheduling::abstract_task::id;
using internal::scheduling::fork_join_sub_task;
using internal::scheduling::fork_join_task;
using unique_id = internal::helpers::unique_id;
using internal::scheduling::fork_join_sub_task;
using internal::scheduling::fork_join_task;
using algorithm::invoke_parallel;
using algorithm::invoke_parallel;
}
#endif
......@@ -2,26 +2,28 @@
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace base {
namespace alignment {
void* allocate_aligned(size_t size) {
return aligned_alloc(system_details::CACHE_LINE_SIZE, size);
}
namespace internal {
namespace base {
namespace alignment {
std::uintptr_t next_alignment(std::uintptr_t size) {
std::uintptr_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE;
if (miss_alignment == 0) {
return size;
} else {
return size + (base::system_details::CACHE_LINE_SIZE - miss_alignment);
}
}
void *allocate_aligned(size_t size) {
return aligned_alloc(system_details::CACHE_LINE_SIZE, size);
}
std::uintptr_t next_alignment(std::uintptr_t size) {
std::uintptr_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE;
if (miss_alignment == 0) {
return size;
} else {
return size + (base::system_details::CACHE_LINE_SIZE - miss_alignment);
}
}
char *next_alignment(char *pointer) {
return reinterpret_cast<char *>(next_alignment(reinterpret_cast<std::uintptr_t >(pointer)));
}
char* next_alignment(char* pointer) {
return reinterpret_cast<char*>(next_alignment(reinterpret_cast<std::uintptr_t >(pointer)));
}
}
}
}
}
}
}
}
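The rounding rule in next_alignment is easy to check by hand; a small standalone illustration, assuming a cache line size of 64 bytes (the actual value comes from system_details::CACHE_LINE_SIZE):

#include <cstdint>

// Same arithmetic as next_alignment above, with 64 standing in for the cache line size.
constexpr std::uintptr_t next_alignment_64(std::uintptr_t size) {
  return (size % 64) == 0 ? size : size + (64 - (size % 64));
}

static_assert(next_alignment_64(64) == 64, "already aligned values pass through");
static_assert(next_alignment_64(100) == 128, "100 % 64 == 36, so 100 + (64 - 36)");
static_assert(next_alignment_64(130) == 192, "130 % 64 == 2, so 130 + (64 - 2)");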
#include "pls/internal/base/barrier.h"
namespace pls {
namespace internal {
namespace base {
barrier::barrier(const unsigned int count): barrier_{} {
pthread_barrier_init(&barrier_, nullptr, count);
}
namespace internal {
namespace base {
barrier::~barrier() {
pthread_barrier_destroy(&barrier_);
}
barrier::barrier(const unsigned int count) : barrier_{} {
pthread_barrier_init(&barrier_, nullptr, count);
}
barrier::~barrier() {
pthread_barrier_destroy(&barrier_);
}
void barrier::wait() {
pthread_barrier_wait(&barrier_);
}
}
}
void barrier::wait() {
pthread_barrier_wait(&barrier_);
}
}
}
}
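The barrier is a thin RAII wrapper over pthread_barrier_t; the scheduler below uses it for a two-phase handshake (one wait() to release the workers, one to collect them). A minimal standalone sketch of that pattern:

#include <thread>
#include "pls/internal/base/barrier.h"

void barrier_handshake() {
  pls::internal::base::barrier sync{2};  // main thread + one worker

  std::thread worker([&] {
    sync.wait();  // phase 1: wait for the go signal
    // ... perform the work section ...
    sync.wait();  // phase 2: report completion
  });

  sync.wait();  // release the worker
  sync.wait();  // block until the worker is done
  worker.join();
}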
......@@ -2,33 +2,35 @@
#include "pls/internal/base/tas_spin_lock.h"
namespace pls {
namespace internal {
namespace base {
void tas_spin_lock::lock() {
PROFILE_LOCK("Acquire Lock")
int tries = 0;
while (flag_.test_and_set(std::memory_order_acquire)) {
tries++;
if (tries % yield_at_tries_ == 0) {
this_thread::yield();
}
}
}
namespace internal {
namespace base {
bool tas_spin_lock::try_lock(unsigned int num_tries) {
PROFILE_LOCK("Try Acquire Lock")
while (flag_.test_and_set(std::memory_order_acquire)) {
num_tries--;
if (num_tries <= 0) {
return false;
}
}
return true;
}
void tas_spin_lock::lock() {
PROFILE_LOCK("Acquire Lock")
int tries = 0;
while (flag_.test_and_set(std::memory_order_acquire)) {
tries++;
if (tries % yield_at_tries_ == 0) {
this_thread::yield();
}
}
}
void tas_spin_lock::unlock() {
flag_.clear(std::memory_order_release);
}
}
bool tas_spin_lock::try_lock(unsigned int num_tries) {
PROFILE_LOCK("Try Acquire Lock")
while (flag_.test_and_set(std::memory_order_acquire)) {
num_tries--;
if (num_tries <= 0) {
return false;
}
}
return true;
}
void tas_spin_lock::unlock() {
flag_.clear(std::memory_order_release);
}
}
}
}
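Both spin locks satisfy the BasicLockable requirements (lock()/unlock()), so they compose with std::lock_guard exactly as the spinlock unit test further down does. A minimal sketch:

#include <mutex>
#include "pls/internal/base/tas_spin_lock.h"

void increment_under_lock(pls::internal::base::tas_spin_lock &lock, int &counter) {
  std::lock_guard<pls::internal::base::tas_spin_lock> guard{lock};
  counter++;
}  // guard releases the lock here via unlock()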
#include "pls/internal/base/thread.h"
namespace pls {
namespace internal {
namespace base {
namespace internal {
namespace base {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
pthread_key_t this_thread::local_storage_key_ = false;
bool this_thread::local_storage_key_initialized_;
pthread_key_t this_thread::local_storage_key_ = false;
bool this_thread::local_storage_key_initialized_;
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
__thread void* this_thread::local_state_;
__thread void *this_thread::local_state_;
#endif
// implementation in header (C++ templating)
}
}
// implementation in header (C++ templating)
}
}
}
......@@ -2,46 +2,48 @@
#include "pls/internal/base/ttas_spin_lock.h"
namespace pls {
namespace internal {
namespace base {
void ttas_spin_lock::lock() {
PROFILE_LOCK("Acquire Lock")
int tries = 0;
int expected = 0;
do {
while (flag_.load(std::memory_order_relaxed) == 1) {
tries++;
if (tries % yield_at_tries_ == 0) {
this_thread::yield();
}
}
expected = 0;
} while (!flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire));
}
bool ttas_spin_lock::try_lock(unsigned int num_tries) {
PROFILE_LOCK("Try Acquire Lock")
int expected = 0;
do {
while (flag_.load(std::memory_order_relaxed) == 1) {
num_tries--;
if (num_tries <= 0) {
return false;
}
}
expected = 0;
} while (!flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire));
return true;
}
void ttas_spin_lock::unlock() {
flag_.store(0, std::memory_order_release);
}
}
namespace internal {
namespace base {
void ttas_spin_lock::lock() {
PROFILE_LOCK("Acquire Lock")
int tries = 0;
int expected = 0;
do {
while (flag_.load(std::memory_order_relaxed) == 1) {
tries++;
if (tries % yield_at_tries_ == 0) {
this_thread::yield();
}
}
expected = 0;
} while (!flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire));
}
bool ttas_spin_lock::try_lock(unsigned int num_tries) {
PROFILE_LOCK("Try Acquire Lock")
int expected = 0;
do {
while (flag_.load(std::memory_order_relaxed) == 1) {
num_tries--;
if (num_tries <= 0) {
return false;
}
}
expected = 0;
} while (!flag_.compare_exchange_weak(expected, 1, std::memory_order_acquire));
return true;
}
void ttas_spin_lock::unlock() {
flag_.store(0, std::memory_order_release);
}
}
}
}
......@@ -2,12 +2,14 @@
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace data_structures {
aligned_stack::aligned_stack(char* memory_region, const std::size_t size):
memory_start_{memory_region},
memory_end_{memory_region + size},
head_{base::alignment::next_alignment(memory_start_)} {}
}
}
namespace internal {
namespace data_structures {
aligned_stack::aligned_stack(char *memory_region, const std::size_t size) :
memory_start_{memory_region},
memory_end_{memory_region + size},
head_{base::alignment::next_alignment(memory_start_)} {}
}
}
}
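The aligned_stack tests near the end of this diff exercise the whole interface; a condensed sketch of the same push/pop pattern (buffer size chosen arbitrarily):

#include <array>
#include "pls/internal/data_structures/aligned_stack.h"

void aligned_stack_example() {
  char buffer[1024];
  pls::internal::data_structures::aligned_stack stack{buffer, 1024};

  std::array<char, 5> values{'a', 'b', 'c', 'd', 'e'};
  stack.push(values);                                 // copied onto the stack, cache-line aligned
  auto read_back = stack.pop<std::array<char, 5>>();  // popped again in LIFO order
  (void) read_back;
}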
......@@ -3,56 +3,58 @@
#include "pls/internal/data_structures/deque.h"
namespace pls {
namespace internal {
namespace data_structures {
deque_item* deque_internal::pop_head_internal() {
std::lock_guard<base::spin_lock> lock{lock_};
if (head_ == nullptr) {
return nullptr;
}
deque_item* result = head_;
head_ = head_->prev_;
if (head_ == nullptr) {
tail_ = nullptr;
} else {
head_->next_ = nullptr;
}
return result;
}
deque_item* deque_internal::pop_tail_internal() {
std::lock_guard<base::spin_lock> lock{lock_};
if (tail_ == nullptr) {
return nullptr;
}
deque_item* result = tail_;
tail_ = tail_->next_;
if (tail_ == nullptr) {
head_ = nullptr;
} else {
tail_->prev_ = nullptr;
}
return result;
}
void deque_internal::push_tail_internal(deque_item *new_item) {
std::lock_guard<base::spin_lock> lock{lock_};
if (tail_ != nullptr) {
tail_->prev_ = new_item;
} else {
head_ = new_item;
}
new_item->next_ = tail_;
new_item->prev_ = nullptr;
tail_ = new_item;
}
}
}
namespace internal {
namespace data_structures {
deque_item *deque_internal::pop_head_internal() {
std::lock_guard<base::spin_lock> lock{lock_};
if (head_ == nullptr) {
return nullptr;
}
deque_item *result = head_;
head_ = head_->prev_;
if (head_ == nullptr) {
tail_ = nullptr;
} else {
head_->next_ = nullptr;
}
return result;
}
deque_item *deque_internal::pop_tail_internal() {
std::lock_guard<base::spin_lock> lock{lock_};
if (tail_ == nullptr) {
return nullptr;
}
deque_item *result = tail_;
tail_ = tail_->next_;
if (tail_ == nullptr) {
head_ = nullptr;
} else {
tail_->prev_ = nullptr;
}
return result;
}
void deque_internal::push_tail_internal(deque_item *new_item) {
std::lock_guard<base::spin_lock> lock{lock_};
if (tail_ != nullptr) {
tail_->prev_ = new_item;
} else {
head_ = new_item;
}
new_item->next_ = tail_;
new_item->prev_ = nullptr;
tail_ = new_item;
}
}
}
}
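Usage mirrors the deque unit test below: items derive from deque_item, the owner pushes and pops at the tail, thieves pop at the head (which is exactly how fork_join_task uses it). A minimal sketch:

#include "pls/internal/data_structures/deque.h"

using namespace pls::internal::data_structures;

struct my_item : public deque_item {};

void deque_example() {
  deque<my_item> work_queue;
  my_item one, two;

  work_queue.push_tail(&one);
  work_queue.push_tail(&two);

  my_item *local = work_queue.pop_tail();   // LIFO end for the owning thread -> &two
  my_item *stolen = work_queue.pop_head();  // FIFO end for stealing threads  -> &one
  (void) local;
  (void) stolen;
}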
......@@ -5,72 +5,74 @@
#include "pls/internal/scheduling/scheduler.h"
namespace pls {
namespace internal {
namespace scheduling {
bool abstract_task::steal_work() {
PROFILE_STEALING("abstract_task::steal_work")
const auto my_state = base::this_thread::state<thread_state>();
const auto my_scheduler = my_state->scheduler_;
namespace internal {
namespace scheduling {
const size_t my_id = my_state->id_;
const size_t offset = my_state->random_() % my_scheduler->num_threads();
const size_t max_tries = 1; // my_scheduler->num_threads(); TODO: Tune this value
for (size_t i = 0; i < max_tries; i++) {
size_t target = (offset + i) % my_scheduler->num_threads();
if (target == my_id) {
continue;
}
auto target_state = my_scheduler->thread_state_for(target);
bool abstract_task::steal_work() {
PROFILE_STEALING("abstract_task::steal_work")
const auto my_state = base::this_thread::state<thread_state>();
const auto my_scheduler = my_state->scheduler_;
// TODO: Cleaner Locking Using std::guarded_lock
target_state->lock_.lock();
const size_t my_id = my_state->id_;
const size_t offset = my_state->random_() % my_scheduler->num_threads();
const size_t max_tries = 1; // my_scheduler->num_threads(); TODO: Tune this value
for (size_t i = 0; i < max_tries; i++) {
size_t target = (offset + i) % my_scheduler->num_threads();
if (target == my_id) {
continue;
}
auto target_state = my_scheduler->thread_state_for(target);
// Dig down to our level
PROFILE_STEALING("Go to our level")
abstract_task* current_task = target_state->root_task_;
while (current_task != nullptr && current_task->depth() < depth()) {
current_task = current_task->child_task_;
}
PROFILE_END_BLOCK
// TODO: Cleaner Locking Using std::guarded_lock
target_state->lock_.lock();
// Try to steal 'internal', e.g. fork_join_sub_tasks in a fork_join_task constellation
PROFILE_STEALING("Internal Steal")
if (current_task != nullptr) {
// See if it equals our type and depth of task
if (current_task->unique_id_ == unique_id_ &&
current_task->depth_ == depth_) {
if (internal_stealing(current_task)) {
// internal steal was a success, hand it back to the internal scheduler
target_state->lock_.unlock();
return true;
}
// Dig down to our level
PROFILE_STEALING("Go to our level")
abstract_task *current_task = target_state->root_task_;
while (current_task != nullptr && current_task->depth() < depth()) {
current_task = current_task->child_task_;
}
PROFILE_END_BLOCK
// No success, we need to steal work from a deeper level using 'top level task stealing'
current_task = current_task->child_task_;
}
}
PROFILE_END_BLOCK;
// Try to steal 'internal', e.g. fork_join_sub_tasks in a fork_join_task constellation
PROFILE_STEALING("Internal Steal")
if (current_task != nullptr) {
// See if it equals our type and depth of task
if (current_task->unique_id_ == unique_id_ &&
current_task->depth_ == depth_) {
if (internal_stealing(current_task)) {
// internal steal was a success, hand it back to the internal scheduler
target_state->lock_.unlock();
return true;
}
// No success, we need to steal work from a deeper level using 'top level task stealing'
current_task = current_task->child_task_;
}
}
PROFILE_END_BLOCK;
// Execute 'top level task steal' if possible
// (only try deeper tasks to keep depth restricted stealing).
PROFILE_STEALING("Top Level Steal")
while (current_task != nullptr) {
auto lock = &target_state->lock_;
if (current_task->split_task(lock)) {
// internal steal was no success (we did a top level task steal)
return false;
}
current_task = current_task->child_task_;
}
PROFILE_END_BLOCK;
target_state->lock_.unlock();
}
// Execute 'top level task steal' if possible
// (only try deeper tasks to keep depth restricted stealing).
PROFILE_STEALING("Top Level Steal")
while (current_task != nullptr) {
auto lock = &target_state->lock_;
if (current_task->split_task(lock)) {
// internal steal was no success (we did a top level task steal)
return false;
}
// internal steal was no success
return false;
};
}
current_task = current_task->child_task_;
}
PROFILE_END_BLOCK;
target_state->lock_.unlock();
}
// internal steal was no success
return false;
}
}
}
}
......@@ -4,131 +4,133 @@
#include "pls/internal/scheduling/fork_join_task.h"
namespace pls {
namespace internal {
namespace scheduling {
fork_join_sub_task::fork_join_sub_task():
data_structures::deque_item{},
ref_count_{0},
parent_{nullptr},
tbb_task_{nullptr},
stack_state_{nullptr} {}
fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task& other):
data_structures::deque_item(other),
ref_count_{0},
parent_{nullptr},
tbb_task_{nullptr},
stack_state_{nullptr} {}
void fork_join_sub_task::execute() {
PROFILE_WORK_BLOCK("execute sub_task")
tbb_task_->currently_executing_ = this;
execute_internal();
tbb_task_->currently_executing_ = nullptr;
PROFILE_END_BLOCK
wait_for_all();
if (parent_ != nullptr) {
parent_->ref_count_--;
}
}
void fork_join_sub_task::spawn_child_internal(fork_join_sub_task* sub_task) {
// Keep our refcount up to date
ref_count_++;
// Assign forced values
sub_task->parent_ = this;
sub_task->tbb_task_ = tbb_task_;
sub_task->stack_state_ = tbb_task_->my_stack_->save_state();
tbb_task_->deque_.push_tail(sub_task);
}
void fork_join_sub_task::wait_for_all() {
while (ref_count_ > 0) {
PROFILE_STEALING("get local sub task")
fork_join_sub_task* local_task = tbb_task_->get_local_sub_task();
PROFILE_END_BLOCK
if (local_task != nullptr) {
local_task->execute();
} else {
// Try to steal work.
// External steal will be executed implicitly on success
PROFILE_STEALING("steal work")
bool internal_steal_success = tbb_task_->steal_work();
PROFILE_END_BLOCK
if (internal_steal_success) {
tbb_task_->last_stolen_->execute();
}
}
}
tbb_task_->my_stack_->reset_state(stack_state_);
}
fork_join_sub_task* fork_join_task::get_local_sub_task() {
return deque_.pop_tail();
}
fork_join_sub_task* fork_join_task::get_stolen_sub_task() {
return deque_.pop_head();
}
bool fork_join_task::internal_stealing(abstract_task* other_task) {
PROFILE_STEALING("fork_join_task::internal_stealin")
auto cast_other_task = reinterpret_cast<fork_join_task*>(other_task);
auto stolen_sub_task = cast_other_task->get_stolen_sub_task();
if (stolen_sub_task == nullptr) {
return false;
} else {
// Make sub-task belong to our fork_join_task instance
stolen_sub_task->tbb_task_ = this;
stolen_sub_task->stack_state_ = my_stack_->save_state();
// We will execute this next without explicitly moving it onto our stack storage
last_stolen_ = stolen_sub_task;
return true;
}
}
bool fork_join_task::split_task(base::spin_lock* lock) {
PROFILE_STEALING("fork_join_task::split_task")
fork_join_sub_task* stolen_sub_task = get_stolen_sub_task();
if (stolen_sub_task == nullptr) {
return false;
}
fork_join_task task{stolen_sub_task, this->unique_id()};
// In success case, unlock.
// TODO: this locking is complicated and error prone.
lock->unlock();
scheduler::execute_task(task, depth());
return true;
}
void fork_join_task::execute() {
PROFILE_WORK_BLOCK("execute fork_join_task");
// Bind this instance to our OS thread
my_stack_ = base::this_thread::state<thread_state>()->task_stack_;
root_task_->tbb_task_ = this;
root_task_->stack_state_ = my_stack_->save_state();
// Execute it on our OS thread until it's finished
root_task_->execute();
}
fork_join_sub_task* fork_join_task::currently_executing() const { return currently_executing_; }
fork_join_task::fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id):
abstract_task{0, id},
root_task_{root_task},
currently_executing_{nullptr},
my_stack_{nullptr},
deque_{},
last_stolen_{nullptr} {};
}
namespace internal {
namespace scheduling {
fork_join_sub_task::fork_join_sub_task() :
data_structures::deque_item{},
ref_count_{0},
parent_{nullptr},
tbb_task_{nullptr},
stack_state_{nullptr} {}
fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task &other) :
data_structures::deque_item(other),
ref_count_{0},
parent_{nullptr},
tbb_task_{nullptr},
stack_state_{nullptr} {}
void fork_join_sub_task::execute() {
PROFILE_WORK_BLOCK("execute sub_task")
tbb_task_->currently_executing_ = this;
execute_internal();
tbb_task_->currently_executing_ = nullptr;
PROFILE_END_BLOCK
wait_for_all();
if (parent_ != nullptr) {
parent_->ref_count_--;
}
}
void fork_join_sub_task::spawn_child_internal(fork_join_sub_task *sub_task) {
// Keep our refcount up to date
ref_count_++;
// Assign forced values
sub_task->parent_ = this;
sub_task->tbb_task_ = tbb_task_;
sub_task->stack_state_ = tbb_task_->my_stack_->save_state();
tbb_task_->deque_.push_tail(sub_task);
}
void fork_join_sub_task::wait_for_all() {
while (ref_count_ > 0) {
PROFILE_STEALING("get local sub task")
fork_join_sub_task *local_task = tbb_task_->get_local_sub_task();
PROFILE_END_BLOCK
if (local_task != nullptr) {
local_task->execute();
} else {
// Try to steal work.
// External steal will be executed implicitly on success
PROFILE_STEALING("steal work")
bool internal_steal_success = tbb_task_->steal_work();
PROFILE_END_BLOCK
if (internal_steal_success) {
tbb_task_->last_stolen_->execute();
}
}
}
tbb_task_->my_stack_->reset_state(stack_state_);
}
fork_join_sub_task *fork_join_task::get_local_sub_task() {
return deque_.pop_tail();
}
fork_join_sub_task *fork_join_task::get_stolen_sub_task() {
return deque_.pop_head();
}
bool fork_join_task::internal_stealing(abstract_task *other_task) {
PROFILE_STEALING("fork_join_task::internal_stealin")
auto cast_other_task = reinterpret_cast<fork_join_task *>(other_task);
auto stolen_sub_task = cast_other_task->get_stolen_sub_task();
if (stolen_sub_task == nullptr) {
return false;
} else {
// Make sub-task belong to our fork_join_task instance
stolen_sub_task->tbb_task_ = this;
stolen_sub_task->stack_state_ = my_stack_->save_state();
// We will execute this next without explicitly moving it onto our stack storage
last_stolen_ = stolen_sub_task;
return true;
}
}
bool fork_join_task::split_task(base::spin_lock *lock) {
PROFILE_STEALING("fork_join_task::split_task")
fork_join_sub_task *stolen_sub_task = get_stolen_sub_task();
if (stolen_sub_task == nullptr) {
return false;
}
fork_join_task task{stolen_sub_task, this->unique_id()};
// In success case, unlock.
// TODO: this locking is complicated and error prone.
lock->unlock();
scheduler::execute_task(task, depth());
return true;
}
void fork_join_task::execute() {
PROFILE_WORK_BLOCK("execute fork_join_task");
// Bind this instance to our OS thread
my_stack_ = base::this_thread::state<thread_state>()->task_stack_;
root_task_->tbb_task_ = this;
root_task_->stack_state_ = my_stack_->save_state();
// Execute it on our OS thread until it's finished
root_task_->execute();
}
fork_join_sub_task *fork_join_task::currently_executing() const { return currently_executing_; }
fork_join_task::fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id) :
abstract_task{0, id},
root_task_{root_task},
currently_executing_{nullptr},
my_stack_{nullptr},
deque_{},
last_stolen_{nullptr} {}
}
}
}
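The fork_join test at the bottom of this diff shows the intended extension point: derive from fork_join_sub_task, do the work in execute_internal(), and create children through spawn_child. A trimmed-down sketch of that pattern (spawn_child is assumed to be the templated front end to spawn_child_internal above):

#include <atomic>
#include "pls/internal/scheduling/fork_join_task.h"

using namespace pls::internal::scheduling;

class counting_sub_task : public fork_join_sub_task {
  std::atomic<int> *counter_;
  int children_;

 protected:
  void execute_internal() override {
    (*counter_)++;
    for (int i = 0; i < children_; i++) {
      // The child is copied into the owning fork_join_task's stack and deque.
      spawn_child(counting_sub_task(counter_, children_ - 1));
    }
  }

 public:
  counting_sub_task(std::atomic<int> *counter, int children) :
      fork_join_sub_task(),
      counter_{counter},
      children_{children} {}
};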
#include "pls/internal/scheduling/root_task.h"
namespace pls {
namespace internal {
namespace scheduling {
namespace internal {
namespace scheduling {
}
}
}
}
}
#include "pls/internal/scheduling/run_on_n_threads_task.h"
namespace pls {
namespace internal {
namespace scheduling {
namespace internal {
namespace scheduling {
}
}
}
}
}
......@@ -2,60 +2,63 @@
#include "pls/internal/base/error_handling.h"
namespace pls {
namespace internal {
namespace scheduling {
scheduler::scheduler(scheduler_memory* memory, const unsigned int num_threads):
num_threads_{num_threads},
memory_{memory},
sync_barrier_{num_threads + 1},
terminated_{false} {
if (num_threads_ > memory_->max_threads()) {
PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory.");
}
for (unsigned int i = 0; i < num_threads_; i++) {
// Placement new is required, as the memory provided by `memory_` is not guaranteed to be initialized.
new((void*)memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i};
new ((void*)memory_->thread_for(i))base::thread<void(*)(), thread_state>(&worker_routine, memory_->thread_state_for(i));
}
}
scheduler::~scheduler() {
terminate();
}
void worker_routine() {
auto my_state = base::this_thread::state<thread_state>();
while (true) {
my_state->scheduler_->sync_barrier_.wait();
if (my_state->scheduler_->terminated_) {
return;
}
// The root task must only return when all work is done;
// because of this, a simple call is enough to ensure the
// fork-join section is done (logically joined back into our main thread).
my_state->root_task_->execute();
my_state->scheduler_->sync_barrier_.wait();
}
}
void scheduler::terminate(bool wait_for_workers) {
if (terminated_) {
return;
}
terminated_ = true;
sync_barrier_.wait();
if (wait_for_workers) {
for (unsigned int i = 0; i < num_threads_; i++) {
memory_->thread_for(i)->join();
}
}
}
}
namespace internal {
namespace scheduling {
scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads) :
num_threads_{num_threads},
memory_{memory},
sync_barrier_{num_threads + 1},
terminated_{false} {
if (num_threads_ > memory_->max_threads()) {
PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory.");
}
for (unsigned int i = 0; i < num_threads_; i++) {
// Placement new is required, as the memory provided by `memory_` is not guaranteed to be initialized.
new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i};
new((void *) memory_->thread_for(i))base::thread<void (*)(), thread_state>(&worker_routine,
memory_->thread_state_for(i));
}
}
scheduler::~scheduler() {
terminate();
}
void worker_routine() {
auto my_state = base::this_thread::state<thread_state>();
while (true) {
my_state->scheduler_->sync_barrier_.wait();
if (my_state->scheduler_->terminated_) {
return;
}
// The root task must only return when all work is done;
// because of this, a simple call is enough to ensure the
// fork-join section is done (logically joined back into our main thread).
my_state->root_task_->execute();
my_state->scheduler_->sync_barrier_.wait();
}
}
void scheduler::terminate(bool wait_for_workers) {
if (terminated_) {
return;
}
terminated_ = true;
sync_barrier_.wait();
if (wait_for_workers) {
for (unsigned int i = 0; i < num_threads_; i++) {
memory_->thread_for(i)->join();
}
}
}
}
}
}
#include "pls/internal/scheduling/scheduler_memory.h"
namespace pls {
namespace internal {
namespace scheduling {
malloc_scheduler_memory::malloc_scheduler_memory(const size_t num_threads, const size_t memory_per_stack):
num_threads_{num_threads} {
threads_ = reinterpret_cast<aligned_thread *>(base::alignment::allocate_aligned(num_threads * sizeof(aligned_thread)));
thread_states_ = reinterpret_cast<aligned_thread_state *>(base::alignment::allocate_aligned(num_threads * sizeof(aligned_thread_state)));
namespace internal {
namespace scheduling {
task_stacks_ = reinterpret_cast<aligned_aligned_stack *>(base::alignment::allocate_aligned(num_threads * sizeof(aligned_aligned_stack)));
task_stacks_memory_ = reinterpret_cast<char**>(base::alignment::allocate_aligned(num_threads * sizeof(char*)));
for (size_t i = 0; i < num_threads_; i++) {
task_stacks_memory_[i] = reinterpret_cast<char*>(base::alignment::allocate_aligned(memory_per_stack));
new ((void*)task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i], memory_per_stack);
}
}
malloc_scheduler_memory::malloc_scheduler_memory(const size_t num_threads, const size_t memory_per_stack) :
num_threads_{num_threads} {
threads_ =
reinterpret_cast<aligned_thread *>(base::alignment::allocate_aligned(num_threads * sizeof(aligned_thread)));
thread_states_ = reinterpret_cast<aligned_thread_state *>(base::alignment::allocate_aligned(
num_threads * sizeof(aligned_thread_state)));
malloc_scheduler_memory::~malloc_scheduler_memory() {
free(threads_);
free(thread_states_);
task_stacks_ = reinterpret_cast<aligned_aligned_stack *>(base::alignment::allocate_aligned(
num_threads * sizeof(aligned_aligned_stack)));
task_stacks_memory_ = reinterpret_cast<char **>(base::alignment::allocate_aligned(num_threads * sizeof(char *)));
for (size_t i = 0; i < num_threads_; i++) {
task_stacks_memory_[i] = reinterpret_cast<char *>(base::alignment::allocate_aligned(memory_per_stack));
new((void *) task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i], memory_per_stack);
}
}
malloc_scheduler_memory::~malloc_scheduler_memory() {
free(threads_);
free(thread_states_);
for (size_t i = 0; i < num_threads_; i++) {
free(task_stacks_memory_[i]);
}
free(task_stacks_);
free(task_stacks_memory_);
}
for (size_t i = 0; i < num_threads_; i++) {
free(task_stacks_memory_[i]);
}
free(task_stacks_);
free(task_stacks_memory_);
}
}
}
}
}
}
#include "pls/internal/scheduling/thread_state.h"
namespace pls {
namespace internal {
namespace scheduling {
namespace internal {
namespace scheduling {
}
}
}
}
}
......@@ -13,73 +13,73 @@ static bool base_tests_visited;
static int base_tests_local_value_one;
static vector<int> base_tests_local_value_two;
TEST_CASE( "thread creation and joining", "[internal/data_structures/thread.h]") {
base_tests_visited = false;
auto t1 = start_thread([]() { base_tests_visited = true; });
t1.join();
TEST_CASE("thread creation and joining", "[internal/data_structures/thread.h]") {
base_tests_visited = false;
auto t1 = start_thread([]() { base_tests_visited = true; });
t1.join();
REQUIRE(base_tests_visited);
REQUIRE(base_tests_visited);
}
TEST_CASE( "thread state", "[internal/data_structures/thread.h]") {
int state_one = 1;
vector<int> state_two{1, 2};
TEST_CASE("thread state", "[internal/data_structures/thread.h]") {
int state_one = 1;
vector<int> state_two{1, 2};
auto t1 = start_thread([]() { base_tests_local_value_one = *this_thread::state<int>(); }, &state_one);
auto t2 = start_thread([]() { base_tests_local_value_two = *this_thread::state<vector<int>>(); }, &state_two);
t1.join();
t2.join();
auto t1 = start_thread([]() { base_tests_local_value_one = *this_thread::state<int>(); }, &state_one);
auto t2 = start_thread([]() { base_tests_local_value_two = *this_thread::state<vector<int>>(); }, &state_two);
t1.join();
t2.join();
REQUIRE(base_tests_local_value_one == 1);
REQUIRE(base_tests_local_value_two == vector<int>{1, 2});
REQUIRE(base_tests_local_value_one == 1);
REQUIRE(base_tests_local_value_two == vector<int>{1, 2});
}
int base_tests_shared_counter;
TEST_CASE( "spinlock protects concurrent counter", "[internal/data_structures/spinlock.h]") {
constexpr int num_iterations = 1000000;
base_tests_shared_counter = 0;
spin_lock lock{};
SECTION( "lock can be used by itself" ) {
auto t1 = start_thread([&]() {
for (int i = 0; i < num_iterations; i++) {
lock.lock();
base_tests_shared_counter++;
lock.unlock();
}
});
auto t2 = start_thread([&]() {
for (int i = 0; i < num_iterations; i++) {
lock.lock();
base_tests_shared_counter--;
lock.unlock();
}
});
t1.join();
t2.join();
REQUIRE(base_tests_shared_counter == 0);
}
SECTION( "lock can be used with std::lock_guard" ) {
auto t1 = start_thread([&]() {
for (int i = 0; i < num_iterations; i++) {
std::lock_guard<spin_lock> my_lock{lock};
base_tests_shared_counter++;
}
});
auto t2 = start_thread([&]() {
for (int i = 0; i < num_iterations; i++) {
std::lock_guard<spin_lock> my_lock{lock};
base_tests_shared_counter--;
}
});
t1.join();
t2.join();
REQUIRE(base_tests_shared_counter == 0);
}
TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spinlock.h]") {
constexpr int num_iterations = 1000000;
base_tests_shared_counter = 0;
spin_lock lock{};
SECTION("lock can be used by itself") {
auto t1 = start_thread([&]() {
for (int i = 0; i < num_iterations; i++) {
lock.lock();
base_tests_shared_counter++;
lock.unlock();
}
});
auto t2 = start_thread([&]() {
for (int i = 0; i < num_iterations; i++) {
lock.lock();
base_tests_shared_counter--;
lock.unlock();
}
});
t1.join();
t2.join();
REQUIRE(base_tests_shared_counter == 0);
}
SECTION("lock can be used with std::lock_guard") {
auto t1 = start_thread([&]() {
for (int i = 0; i < num_iterations; i++) {
std::lock_guard<spin_lock> my_lock{lock};
base_tests_shared_counter++;
}
});
auto t2 = start_thread([&]() {
for (int i = 0; i < num_iterations; i++) {
std::lock_guard<spin_lock> my_lock{lock};
base_tests_shared_counter--;
}
});
t1.join();
t2.join();
REQUIRE(base_tests_shared_counter == 0);
}
}
......@@ -12,122 +12,121 @@ using namespace pls::internal::data_structures;
using namespace pls::internal::base;
using namespace std;
TEST_CASE( "aligned stack stores objects correctly", "[internal/data_structures/aligned_stack.h]") {
constexpr long data_size = 1024;
char data[data_size];
aligned_stack stack{data, data_size};
SECTION( "stack correctly pushes sub linesize objects" ) {
std::array<char, 5> small_data_one{'a', 'b', 'c', 'd', 'e'};
std::array<char, 64> small_data_two{};
std::array<char, 1> small_data_three{'A'};
auto pointer_one = stack.push(small_data_one);
auto pointer_two = stack.push(small_data_two);
auto pointer_three = stack.push(small_data_three);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_one) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_two) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_three) % system_details::CACHE_LINE_SIZE == 0);
}
SECTION( "stack correctly pushes above linesize objects" ) {
std::array<char, 5> small_data_one{'a', 'b', 'c', 'd', 'e'};
std::array<char, system_details::CACHE_LINE_SIZE + 10> big_data_one{};
auto big_pointer_one = stack.push(big_data_one);
auto small_pointer_one = stack.push(small_data_one);
REQUIRE(reinterpret_cast<std::uintptr_t>(big_pointer_one) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(small_pointer_one) % system_details::CACHE_LINE_SIZE == 0);
}
SECTION( "stack correctly stores and retrieves objects" ) {
std::array<char, 5> data_one{'a', 'b', 'c', 'd', 'e'};
stack.push(data_one);
auto retrieved_data = stack.pop<std::array<char, 5>>();
REQUIRE(retrieved_data == std::array<char, 5>{'a', 'b', 'c', 'd', 'e'});
}
SECTION( "stack can push and pop multiple times with correct alignment" ) {
std::array<char, 5> small_data_one{'a', 'b', 'c', 'd', 'e'};
std::array<char, 64> small_data_two{};
std::array<char, 1> small_data_three{'A'};
auto pointer_one = stack.push(small_data_one);
auto pointer_two = stack.push(small_data_two);
auto pointer_three = stack.push(small_data_three);
stack.pop<typeof(small_data_three)>();
stack.pop<typeof(small_data_two)>();
auto pointer_four = stack.push(small_data_two);
auto pointer_five = stack.push(small_data_three);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_one) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_two) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_three) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_four) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_five) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(pointer_four == pointer_two);
REQUIRE(pointer_five == pointer_three);
}
TEST_CASE("aligned stack stores objects correctly", "[internal/data_structures/aligned_stack.h]") {
constexpr long data_size = 1024;
char data[data_size];
aligned_stack stack{data, data_size};
SECTION("stack correctly pushes sub linesize objects") {
std::array<char, 5> small_data_one{'a', 'b', 'c', 'd', 'e'};
std::array<char, 64> small_data_two{};
std::array<char, 1> small_data_three{'A'};
auto pointer_one = stack.push(small_data_one);
auto pointer_two = stack.push(small_data_two);
auto pointer_three = stack.push(small_data_three);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_one) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_two) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_three) % system_details::CACHE_LINE_SIZE == 0);
}
SECTION("stack correctly pushes above linesize objects") {
std::array<char, 5> small_data_one{'a', 'b', 'c', 'd', 'e'};
std::array<char, system_details::CACHE_LINE_SIZE + 10> big_data_one{};
auto big_pointer_one = stack.push(big_data_one);
auto small_pointer_one = stack.push(small_data_one);
REQUIRE(reinterpret_cast<std::uintptr_t>(big_pointer_one) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(small_pointer_one) % system_details::CACHE_LINE_SIZE == 0);
}
SECTION("stack correctly stores and retrieves objects") {
std::array<char, 5> data_one{'a', 'b', 'c', 'd', 'e'};
stack.push(data_one);
auto retrieved_data = stack.pop<std::array<char, 5>>();
REQUIRE(retrieved_data == std::array<char, 5>{'a', 'b', 'c', 'd', 'e'});
}
SECTION("stack can push and pop multiple times with correct alignment") {
std::array<char, 5> small_data_one{'a', 'b', 'c', 'd', 'e'};
std::array<char, 64> small_data_two{};
std::array<char, 1> small_data_three{'A'};
auto pointer_one = stack.push(small_data_one);
auto pointer_two = stack.push(small_data_two);
auto pointer_three = stack.push(small_data_three);
stack.pop<typeof(small_data_three)>();
stack.pop<typeof(small_data_two)>();
auto pointer_four = stack.push(small_data_two);
auto pointer_five = stack.push(small_data_three);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_one) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_two) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_three) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_four) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_five) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(pointer_four == pointer_two);
REQUIRE(pointer_five == pointer_three);
}
}
TEST_CASE( "deque stores objects correctly", "[internal/data_structures/deque.h]") {
class my_item: public deque_item {
TEST_CASE("deque stores objects correctly", "[internal/data_structures/deque.h]") {
class my_item : public deque_item {
};
};
deque<my_item> deque;
my_item one, two, three;
deque<my_item> deque;
my_item one, two, three;
SECTION( "add and remove items form the tail" ) {
deque.push_tail(&one);
deque.push_tail(&two);
deque.push_tail(&three);
SECTION("add and remove items form the tail") {
deque.push_tail(&one);
deque.push_tail(&two);
deque.push_tail(&three);
REQUIRE(deque.pop_tail() == &three);
REQUIRE(deque.pop_tail() == &two);
REQUIRE(deque.pop_tail() == &one);
}
REQUIRE(deque.pop_tail() == &three);
REQUIRE(deque.pop_tail() == &two);
REQUIRE(deque.pop_tail() == &one);
}
SECTION( "handles getting empty by popping the tail correctly" ) {
deque.push_tail(&one);
REQUIRE(deque.pop_tail() == &one);
SECTION("handles getting empty by popping the tail correctly") {
deque.push_tail(&one);
REQUIRE(deque.pop_tail() == &one);
deque.push_tail(&two);
REQUIRE(deque.pop_tail() == &two);
}
deque.push_tail(&two);
REQUIRE(deque.pop_tail() == &two);
}
SECTION( "remove items form the head" ) {
deque.push_tail(&one);
deque.push_tail(&two);
deque.push_tail(&three);
SECTION("remove items form the head") {
deque.push_tail(&one);
deque.push_tail(&two);
deque.push_tail(&three);
REQUIRE(deque.pop_head() == &one);
REQUIRE(deque.pop_head() == &two);
REQUIRE(deque.pop_head() == &three);
}
REQUIRE(deque.pop_head() == &one);
REQUIRE(deque.pop_head() == &two);
REQUIRE(deque.pop_head() == &three);
}
SECTION( "handles getting empty by popping the head correctly" ) {
deque.push_tail(&one);
REQUIRE(deque.pop_head() == &one);
SECTION("handles getting empty by popping the head correctly") {
deque.push_tail(&one);
REQUIRE(deque.pop_head() == &one);
deque.push_tail(&two);
REQUIRE(deque.pop_head() == &two);
}
deque.push_tail(&two);
REQUIRE(deque.pop_head() == &two);
}
SECTION( "handles getting empty by popping the head and tail correctly" ) {
deque.push_tail(&one);
REQUIRE(deque.pop_tail() == &one);
SECTION("handles getting empty by popping the head and tail correctly") {
deque.push_tail(&one);
REQUIRE(deque.pop_tail() == &one);
deque.push_tail(&two);
REQUIRE(deque.pop_head() == &two);
deque.push_tail(&two);
REQUIRE(deque.pop_head() == &two);
deque.push_tail(&three);
REQUIRE(deque.pop_tail() == &three);
}
deque.push_tail(&three);
REQUIRE(deque.pop_tail() == &three);
}
}
......@@ -4,76 +4,75 @@
using namespace pls;
class once_sub_task: public fork_join_sub_task {
std::atomic<int>* counter_;
int children_;
class once_sub_task : public fork_join_sub_task {
std::atomic<int> *counter_;
int children_;
protected:
void execute_internal() override {
(*counter_)++;
for (int i = 0; i < children_; i++) {
spawn_child(once_sub_task(counter_, children_ - 1));
}
protected:
void execute_internal() override {
(*counter_)++;
for (int i = 0; i < children_; i++) {
spawn_child(once_sub_task(counter_, children_ - 1));
}
}
public:
explicit once_sub_task(std::atomic<int>* counter, int children):
fork_join_sub_task(),
counter_{counter},
children_{children} {}
public:
explicit once_sub_task(std::atomic<int> *counter, int children) :
fork_join_sub_task(),
counter_{counter},
children_{children} {}
};
class force_steal_sub_task: public fork_join_sub_task {
std::atomic<int>* parent_counter_;
std::atomic<int>* overall_counter_;
class force_steal_sub_task : public fork_join_sub_task {
std::atomic<int> *parent_counter_;
std::atomic<int> *overall_counter_;
protected:
void execute_internal() override {
(*overall_counter_)--;
if (overall_counter_->load() > 0) {
std::atomic<int> counter{1};
spawn_child(force_steal_sub_task(&counter, overall_counter_));
while (counter.load() > 0)
; // Spin...
}
(*parent_counter_)--;
protected:
void execute_internal() override {
(*overall_counter_)--;
if (overall_counter_->load() > 0) {
std::atomic<int> counter{1};
spawn_child(force_steal_sub_task(&counter, overall_counter_));
while (counter.load() > 0); // Spin...
}
public:
explicit force_steal_sub_task(std::atomic<int>* parent_counter, std::atomic<int>* overall_counter):
fork_join_sub_task(),
parent_counter_{parent_counter},
overall_counter_{overall_counter} {}
(*parent_counter_)--;
}
public:
explicit force_steal_sub_task(std::atomic<int> *parent_counter, std::atomic<int> *overall_counter) :
fork_join_sub_task(),
parent_counter_{parent_counter},
overall_counter_{overall_counter} {}
};
TEST_CASE( "tbb task are scheduled correctly", "[internal/scheduling/fork_join_task.h]") {
malloc_scheduler_memory my_scheduler_memory{8, 2 << 12};
TEST_CASE("tbb task are scheduled correctly", "[internal/scheduling/fork_join_task.h]") {
malloc_scheduler_memory my_scheduler_memory{8, 2 << 12};
SECTION("tasks are executed exactly once") {
scheduler my_scheduler{&my_scheduler_memory, 2};
int start_counter = 4;
int total_tasks = 1 + 4 + 4 * 3 + 4 * 3 * 2 + 4 * 3 * 2 * 1;
std::atomic<int> counter{0};
SECTION("tasks are executed exactly once") {
scheduler my_scheduler{&my_scheduler_memory, 2};
int start_counter = 4;
int total_tasks = 1 + 4 + 4 * 3 + 4 * 3 * 2 + 4 * 3 * 2 * 1;
std::atomic<int> counter{0};
my_scheduler.perform_work([&] (){
once_sub_task sub_task{&counter, start_counter};
fork_join_task task{&sub_task, unique_id::create(42)};
scheduler::execute_task(task);
});
my_scheduler.perform_work([&]() {
once_sub_task sub_task{&counter, start_counter};
fork_join_task task{&sub_task, unique_id::create(42)};
scheduler::execute_task(task);
});
REQUIRE(counter.load() == total_tasks);
my_scheduler.terminate(true);
}
REQUIRE(counter.load() == total_tasks);
my_scheduler.terminate(true);
}
SECTION("tasks can be stolen") {
scheduler my_scheduler{&my_scheduler_memory, 8};
my_scheduler.perform_work([&] (){
std::atomic<int> dummy_parent{1}, overall_counter{8};
force_steal_sub_task sub_task{&dummy_parent, &overall_counter};
fork_join_task task{&sub_task, unique_id::create(42)};
scheduler::execute_task(task);
});
my_scheduler.terminate(true);
}
SECTION("tasks can be stolen") {
scheduler my_scheduler{&my_scheduler_memory, 8};
my_scheduler.perform_work([&]() {
std::atomic<int> dummy_parent{1}, overall_counter{8};
force_steal_sub_task sub_task{&dummy_parent, &overall_counter};
fork_join_task task{&sub_task, unique_id::create(42)};
scheduler::execute_task(task);
});
my_scheduler.terminate(true);
}
}