Commit 6802681d by FritzFlorian

Move functions using templates to *_impl.h header files.

parent a8a35479
Pipeline #1148 passed with stages
in 3 minutes 34 seconds
......@@ -4,7 +4,19 @@ A collection of stuff that we noticed during development.
Useful later on two write a project report and to go back
in time to find out why certain decisions where made.
## 09.02.2019 - Cache Alignment
## 11.04.2019 - Notes on C++ Templating
After working more with templating and talking to mike,
it seems like the common way to go is the following:
- If possible, add template arguments to
data containers only (separate from logic).
- If logic and data are coupled (like often with lambdas),
add the declaration of the interface into the normal header
some_class.h and add it's implementation into an extra implementation
file some_class_impl.h that is included at the end of the file.
## 09.04.2019 - Cache Alignment
Aligning the cache needs all parts (both data types with correct alignment
and base memory with correct alignment).
......
......@@ -2,16 +2,19 @@
add_library(pls STATIC
include/pls/pls.h src/pls.cpp
include/pls/algorithms/invoke_parallel.h src/algorithms/invoke_parallel.cpp
include/pls/algorithms/invoke_parallel.h
include/pls/algorithms/invoke_parallel_impl.h
include/pls/internal/base/spin_lock.h src/internal/base/spin_lock.cpp
include/pls/internal/base/thread.h src/internal/base/thread.cpp
include/pls/internal/base/thread_impl.h
include/pls/internal/base/barrier.h src/internal/base/barrier.cpp
include/pls/internal/base/system_details.h
include/pls/internal/base/error_handling.h
include/pls/internal/base/alignment.h src/internal/base/alignment.cpp
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h
include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp
include/pls/internal/helpers/prohibit_new.h
......@@ -22,6 +25,7 @@ add_library(pls STATIC
include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp
include/pls/internal/scheduling/abstract_task.h src/internal/scheduling/abstract_task.cpp
include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
include/pls/internal/scheduling/scheduler_impl.h
include/pls/internal/scheduling/run_on_n_threads_task.h src/internal/scheduling/run_on_n_threads_task.cpp
include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
......
......@@ -7,44 +7,15 @@
namespace pls {
namespace algorithm {
namespace internal {
using namespace ::pls::internal::scheduling;
template<typename Body>
inline void run_body(const Body& internal_body, const abstract_task::id& id) {
// Make sure we are in the context of this invoke_parallel instance,
// if not we will spawn it as a new 'fork-join-style' task.
auto current_task = scheduler::current_task();
if (current_task->unique_id() == id) {
auto current_sub_task = reinterpret_cast<fork_join_task*>(current_task)->currently_executing();
internal_body(current_sub_task);
} else {
fork_join_lambda<Body> root_body(&internal_body);
fork_join_task root_task{&root_body, id};
scheduler::execute_task(root_task);
}
}
}
template<typename Function1, typename Function2>
void invoke_parallel(const Function1& function1, const Function2& function2) {
using namespace ::pls::internal::scheduling;
static abstract_task::id id{PLS_UNIQUE_ID, true};
auto internal_body = [&] (fork_join_sub_task* this_task){
auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); };
auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1);
this_task->spawn_child(sub_task_1);
function2(); // Execute last function 'inline' without spawning a sub_task object
this_task->wait_for_all();
};
void invoke_parallel(const Function1& function1, const Function2& function2);
internal::run_body(internal_body, id);
}
template<typename Function1, typename Function2, typename Function3>
void invoke_parallel(const Function1& function1, const Function2& function2, const Function3& function3);
// ...and so on, add more if we decide to keep this design
}
}
#include "invoke_parallel_impl.h"
#endif //PLS_PARALLEL_INVOKE_H
#ifndef PLS_INVOKE_PARALLEL_IMPL_H
#define PLS_INVOKE_PARALLEL_IMPL_H
#include "pls/internal/scheduling/fork_join_task.h"
#include "pls/internal/scheduling/scheduler.h"
namespace pls {
namespace algorithm {
namespace internal {
using namespace ::pls::internal::scheduling;
template<typename Body>
inline void run_body(const Body& internal_body, const abstract_task::id& id) {
// Make sure we are in the context of this invoke_parallel instance,
// if not we will spawn it as a new 'fork-join-style' task.
auto current_task = scheduler::current_task();
if (current_task->unique_id() == id) {
auto current_sub_task = reinterpret_cast<fork_join_task*>(current_task)->currently_executing();
internal_body(current_sub_task);
} else {
fork_join_lambda<Body> root_body(&internal_body);
fork_join_task root_task{&root_body, id};
scheduler::execute_task(root_task);
}
}
}
template<typename Function1, typename Function2>
void invoke_parallel(const Function1& function1, const Function2& function2) {
using namespace ::pls::internal::scheduling;
static abstract_task::id id{PLS_UNIQUE_ID, true};
auto internal_body = [&] (fork_join_sub_task* this_task){
auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); };
auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1);
this_task->spawn_child(sub_task_1);
function2(); // Execute last function 'inline' without spawning a sub_task object
this_task->wait_for_all();
};
internal::run_body(internal_body, id);
}
template<typename Function1, typename Function2, typename Function3>
void invoke_parallel(const Function1& function1, const Function2& function2, const Function3& function3) {
using namespace ::pls::internal::scheduling;
static abstract_task::id id{PLS_UNIQUE_ID, true};
auto internal_body = [&] (fork_join_sub_task* this_task){
auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); };
auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1);
auto sub_task_body_2 = [&] (fork_join_sub_task*){ function2(); };
auto sub_task_2 = fork_join_lambda<decltype(sub_task_body_2)>(&sub_task_body_2);
this_task->spawn_child(sub_task_1);
this_task->spawn_child(sub_task_2);
function3(); // Execute last function 'inline' without spawning a sub_task object
this_task->wait_for_all();
};
internal::run_body(internal_body, id);
}
}
}
#endif //PLS_INVOKE_PARALLEL_IMPL_H
......@@ -14,7 +14,6 @@ namespace pls {
template<typename T>
struct aligned_wrapper {
alignas(system_details::CACHE_LINE_SIZE) unsigned char data[sizeof(T)];
T* pointer() { return reinterpret_cast<T*>(data); }
};
void* allocate_aligned(size_t size);
......
......@@ -49,14 +49,7 @@ namespace pls {
* @return The state pointer hold for this thread.
*/
template<typename T>
static T* state() {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
return reinterpret_cast<T*>(pthread_getspecific(local_storage_key_));
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
return reinterpret_cast<T*>(local_state_);
#endif
}
static T* state();
/**
* Stores a pointer to the thread local state object.
......@@ -67,18 +60,11 @@ namespace pls {
* @param state_pointer A pointer to the threads state object.
*/
template<typename T>
static void set_state(T* state_pointer) {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer);
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
local_state_ = state_pointer;
#endif
}
static void set_state(T* state_pointer);
};
/**
* Abstraction for starting a function in a sparate thread.
* Abstraction for starting a function in a separate thread.
*
* @tparam Function Lambda being started on the new thread.
* @tparam State State type held for this thread.
......@@ -108,52 +94,13 @@ namespace pls {
// Keep handle to native implementation
pthread_t pthread_thread_;
static void* start_pthread_internal(void* thread_pointer) {
auto my_thread = reinterpret_cast<thread*>(thread_pointer);
Function my_function_copy = my_thread->function_;
State* my_state_pointer_copy = my_thread->state_pointer_;
// Now we have copies of everything we need on the stack.
// The original thread object can be moved freely (no more
// references to its memory location).
my_thread->startup_flag_->clear();
this_thread::set_state(my_state_pointer_copy);
my_function_copy();
// Finished executing the user function
pthread_exit(nullptr);
}
static void* start_pthread_internal(void* thread_pointer);
public:
thread(): function_{}, state_pointer_{nullptr}, startup_flag_{nullptr}, pthread_thread_{} {}
explicit thread(const Function& function, State* state_pointer):
function_{function},
state_pointer_{state_pointer},
startup_flag_{nullptr},
pthread_thread_{} {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
if (!this_thread::local_storage_key_initialized_) {
pthread_key_create(&this_thread::local_storage_key_, nullptr);
this_thread::local_storage_key_initialized_ = true;
}
#endif
explicit thread(const Function& function, State* state_pointer);
// We only need this during startup, will be destroyed when out of scope
std::atomic_flag startup_flag{ATOMIC_FLAG_INIT};
startup_flag_ = &startup_flag;
startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return
pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *)(this));
while (startup_flag.test_and_set())
; // Busy waiting for the starting flag to clear
}
public:
void join() {
pthread_join(pthread_thread_, nullptr);
}
void join();
// make object move only
thread(thread&&) noexcept = default;
......@@ -164,17 +111,12 @@ namespace pls {
};
template<typename Function, typename State>
thread<Function, State> start_thread(const Function& function, State* state_pointer) {
return thread<Function, State>(function, state_pointer);
}
thread<Function, State> start_thread(const Function& function, State* state_pointer);
template<typename Function>
thread<Function, void> start_thread(const Function& function) {
return thread<Function, void>(function, nullptr);
}
thread<Function, void> start_thread(const Function& function);
}
}
}
#include "thread_impl.h"
#endif //PLS_THREAD_H
#ifndef PLS_THREAD_IMPL_H
#define PLS_THREAD_IMPL_H
namespace pls {
namespace internal {
namespace base {
template<typename T>
T* this_thread::state() {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
return reinterpret_cast<T*>(pthread_getspecific(local_storage_key_));
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
return reinterpret_cast<T*>(local_state_);
#endif
}
template<typename T>
void this_thread::set_state(T* state_pointer) {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer);
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
local_state_ = state_pointer;
#endif
}
template<typename Function, typename State>
void* thread<Function, State>::start_pthread_internal(void* thread_pointer) {
auto my_thread = reinterpret_cast<thread*>(thread_pointer);
Function my_function_copy = my_thread->function_;
State* my_state_pointer_copy = my_thread->state_pointer_;
// Now we have copies of everything we need on the stack.
// The original thread object can be moved freely (no more
// references to its memory location).
my_thread->startup_flag_->clear();
this_thread::set_state(my_state_pointer_copy);
my_function_copy();
// Finished executing the user function
pthread_exit(nullptr);
}
template<typename Function, typename State>
thread<Function, State>::thread(const Function& function, State* state_pointer):
function_{function},
state_pointer_{state_pointer},
startup_flag_{nullptr},
pthread_thread_{} {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
if (!this_thread::local_storage_key_initialized_) {
pthread_key_create(&this_thread::local_storage_key_, nullptr);
this_thread::local_storage_key_initialized_ = true;
}
#endif
// We only need this during startup, will be destroyed when out of scope
std::atomic_flag startup_flag{ATOMIC_FLAG_INIT};
startup_flag_ = &startup_flag;
startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return
pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *)(this));
while (startup_flag.test_and_set())
; // Busy waiting for the starting flag to clear
}
template<typename Function, typename State>
void thread<Function, State>::join() {
pthread_join(pthread_thread_, nullptr);
}
template<typename Function, typename State>
thread<Function, State> start_thread(const Function& function, State* state_pointer) {
return thread<Function, State>(function, state_pointer);
}
template<typename Function>
thread<Function, void> start_thread(const Function& function) {
return thread<Function, void>(function, nullptr);
}
}
}
}
#endif //PLS_THREAD_IMPL_H
......@@ -37,40 +37,18 @@ namespace pls {
aligned_stack(char* memory_region, std::size_t size);
template<typename T>
T* push(const T& object) {
// Copy-Construct
return new ((void*)push<T>())T(object);
}
T* push(const T& object);
template<typename T>
void* push() {
void* result = reinterpret_cast<T*>(head_);
// Move head to next aligned position after new object
head_ = base::alignment::next_alignment(head_ + sizeof(T));
if (head_ >= memory_end_) {
PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!");
}
return result;
}
void* push();
template<typename T>
T pop() {
head_ = head_ - base::alignment::next_alignment(sizeof(T));
return *reinterpret_cast<T*>(head_);
}
state save_state() {
return head_;
}
T pop();
void reset_state(state new_state) {
head_ = new_state;
}
state save_state() const { return head_; }
void reset_state(state new_state) { head_ = new_state; }
};
}
}
}
#include "aligned_stack_impl.h"
#endif //PLS_ALIGNED_STACK_H
#ifndef PLS_ALIGNED_STACK_IMPL_H
#define PLS_ALIGNED_STACK_IMPL_H
namespace pls {
namespace internal {
namespace data_structures {
template<typename T>
T* aligned_stack::push(const T& object) {
// Copy-Construct
return new ((void*)push<T>())T(object);
}
template<typename T>
void* aligned_stack::push() {
void* result = reinterpret_cast<T*>(head_);
// Move head to next aligned position after new object
head_ = base::alignment::next_alignment(head_ + sizeof(T));
if (head_ >= memory_end_) {
PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!");
}
return result;
}
template<typename T>
T aligned_stack::pop() {
head_ = head_ - base::alignment::next_alignment(sizeof(T));
return *reinterpret_cast<T*>(head_);
}
}
}
}
#endif //PLS_ALIGNED_STACK_IMPL_H
......@@ -77,27 +77,9 @@ namespace pls {
bool split_task(base::spin_lock* /*lock*/) override;
public:
explicit fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id):
abstract_task{0, id},
root_task_{root_task},
currently_executing_{nullptr},
my_stack_{nullptr},
deque_{},
last_stolen_{nullptr} {};
void execute() override {
PROFILE_WORK_BLOCK("execute fork_join_task");
// Bind this instance to our OS thread
my_stack_ = base::this_thread::state<thread_state>()->task_stack_;
root_task_->tbb_task_ = this;
root_task_->stack_state_ = my_stack_->save_state();
// Execute it on our OS thread until its finished
root_task_->execute();
}
fork_join_sub_task* currently_executing() const { return currently_executing_; }
explicit fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id);
void execute() override;
fork_join_sub_task* currently_executing() const;
};
template<typename T>
......
......@@ -34,76 +34,34 @@ namespace pls {
explicit scheduler(scheduler_memory* memory, unsigned int num_threads);
~scheduler();
/**
* Wakes up the thread pool.
* Code inside the Function lambda can invoke all parallel APIs.
*
* @param work_section generic function or lambda to be executed in the scheduler's context.
*/
template<typename Function>
void perform_work(Function work_section) {
PROFILE_WORK_BLOCK("scheduler::perform_work")
root_task<Function> master{work_section};
// Push root task on stacks
auto new_master = memory_->task_stack_for(0)->push(master);
memory_->thread_state_for(0)->root_task_ = new_master;
memory_->thread_state_for(0)->current_task_ = new_master;
for (unsigned int i = 1; i < num_threads_; i++) {
root_worker_task<Function> worker{new_master};
auto new_worker = memory_->task_stack_for(0)->push(worker);
memory_->thread_state_for(i)->root_task_ = new_worker;
memory_->thread_state_for(i)->current_task_ = new_worker;
}
// Perform and wait for work
sync_barrier_.wait(); // Trigger threads to wake up
sync_barrier_.wait(); // Wait for threads to finish
// Clean up stack
memory_->task_stack_for(0)->pop<typeof(master)>();
for (unsigned int i = 1; i < num_threads_; i++) {
root_worker_task<Function> worker{new_master};
memory_->task_stack_for(0)->pop<typeof(worker)>();
}
}
// TODO: See if we should place this differently (only for performance reasons)
void perform_work(Function work_section);
/**
* Executes a top-level-task (children of abstract_task) on this thread.
*
* @param task The task to be executed.
* @param depth Optional: depth of the new task, otherwise set implicitly.
*/
template<typename Task>
static void execute_task(Task& task, int depth=-1) {
static_assert(std::is_base_of<abstract_task, Task>::value, "Only pass abstract_task subclasses!");
auto my_state = base::this_thread::state<thread_state>();
abstract_task* old_task;
abstract_task* new_task;
// Init Task
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
old_task = my_state->current_task_;
new_task = my_state->task_stack_->push(task);
new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1);
my_state->current_task_ = new_task;
old_task->set_child(new_task);
}
// Run Task
new_task->execute();
// Teardown state back to before the task was executed
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
old_task->set_child(nullptr);
my_state->current_task_ = old_task;
my_state->task_stack_->pop<Task>();
}
}
static void execute_task(Task& task, int depth=-1);
static abstract_task* current_task() { return base::this_thread::state<thread_state>()->current_task_; }
void terminate(bool wait_for_workers=true);
unsigned int num_threads() const { return num_threads_; }
thread_state* thread_state_for(size_t id) { return memory_->thread_state_for(id); }
};
}
}
}
#include "scheduler_impl.h"
#endif //PLS_SCHEDULER_H
#ifndef PLS_SCHEDULER_IMPL_H
#define PLS_SCHEDULER_IMPL_H
namespace pls {
namespace internal {
namespace scheduling {
template<typename Function>
void scheduler::perform_work(Function work_section) {
PROFILE_WORK_BLOCK("scheduler::perform_work")
root_task<Function> master{work_section};
// Push root task on stacks
auto new_master = memory_->task_stack_for(0)->push(master);
memory_->thread_state_for(0)->root_task_ = new_master;
memory_->thread_state_for(0)->current_task_ = new_master;
for (unsigned int i = 1; i < num_threads_; i++) {
root_worker_task<Function> worker{new_master};
auto new_worker = memory_->task_stack_for(0)->push(worker);
memory_->thread_state_for(i)->root_task_ = new_worker;
memory_->thread_state_for(i)->current_task_ = new_worker;
}
// Perform and wait for work
sync_barrier_.wait(); // Trigger threads to wake up
sync_barrier_.wait(); // Wait for threads to finish
// Clean up stack
memory_->task_stack_for(0)->pop<typeof(master)>();
for (unsigned int i = 1; i < num_threads_; i++) {
root_worker_task<Function> worker{new_master};
memory_->task_stack_for(0)->pop<typeof(worker)>();
}
}
template<typename Task>
void scheduler::execute_task(Task& task, int depth) {
static_assert(std::is_base_of<abstract_task, Task>::value, "Only pass abstract_task subclasses!");
auto my_state = base::this_thread::state<thread_state>();
abstract_task* old_task;
abstract_task* new_task;
// Init Task
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
old_task = my_state->current_task_;
new_task = my_state->task_stack_->push(task);
new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1);
my_state->current_task_ = new_task;
old_task->set_child(new_task);
}
// Run Task
new_task->execute();
// Teardown state back to before the task was executed
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
old_task->set_child(nullptr);
my_state->current_task_ = old_task;
my_state->task_stack_->pop<Task>();
}
}
}
}
}
#endif //PLS_SCHEDULER_IMPL_H
#include "pls/algorithms/invoke_parallel.h"
......@@ -107,6 +107,28 @@ namespace pls {
scheduler::execute_task(task, depth());
return true;
}
void fork_join_task::execute() {
PROFILE_WORK_BLOCK("execute fork_join_task");
// Bind this instance to our OS thread
my_stack_ = base::this_thread::state<thread_state>()->task_stack_;
root_task_->tbb_task_ = this;
root_task_->stack_state_ = my_stack_->save_state();
// Execute it on our OS thread until its finished
root_task_->execute();
}
fork_join_sub_task* fork_join_task::currently_executing() const { return currently_executing_; }
fork_join_task::fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id):
abstract_task{0, id},
root_task_{root_task},
currently_executing_{nullptr},
my_stack_{nullptr},
deque_{},
last_stolen_{nullptr} {};
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment