Commit 14cc1155 by Florian Fritz

Merge branch 'clean_up_task' into 'master'

Merge: Clean Up

See merge request !8
parents a8a35479 3bbd8951
Pipeline #1151 passed with stages
in 3 minutes 31 seconds
......@@ -4,7 +4,67 @@ A collection of stuff that we noticed during development.
Useful later on to write a project report and to go back
in time to find out why certain decisions were made.
## 09.02.2019 - Cache Alignment
## 12.04.2019 - Unique IDs
Assigning unique IDs to logically different tasks is key to the
current model of separating timing/memory guarantees for certain
parallel patterns.
We do want to assign these IDs automatically in most cases (a call to
some parallel API should not require the end user to specify a
unique ID for each different call, as this would be error prone).
Instead we want to make sure that each DIFFERENT API call is separated
automatically, while leaving the option of manual ID assignment for later
implementations of GPU offloading (tasks need identifiers for
this to work properly).
Our first approach was using the `__COUNTER__` macro,
but this only works in ONE COMPILATION UNIT and is not
portable to all compilers.
As all our unique instances are coupled to a class/type,
we decided to implement the unique IDs using
`typeid(tuple<T...>)` in automatic cases (bound to a specific type)
and fully manual IDs in the other cases. All this is wrapped in a
`helpers::unique_id` class.
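
A minimal usage sketch of the resulting API (matching the
`helpers::unique_id` class further down in this diff; the variable
names are hypothetical):

```cpp
#include "pls/internal/helpers/unique_id.h"

using pls::internal::helpers::unique_id;

// Automatic: the ID is derived from typeid(std::tuple<int, float>),
// so every call site using the same type list gets the same ID.
auto automatic_id = unique_id::create<int, float>();

// Manual: a fixed numeric ID, e.g. for later GPU offloading.
auto manual_id = unique_id::create(42);
```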
## 11.04.2019 - Lambda Pointer Abstraction
The question is whether we can use a pointer to a lambda without
templating (on the type of the lambda) at the place where
we use it.
We looked into different techniques to achieve this:
- Using std::function<...>
- Using custom wrappers
`std::function` uses dynamic memory, ruling it out.
All methods that we can think of involve storing a pointer
to the lambda and then calling it later on.
This works well enough with a simple wrapper, but has one
major downside: it involves virtual function calls,
making it impossible to inline the lambda.
This property broke the technique for us in most places,
as inlining is crucial, especially in small functions like loop
iterations. See `invoke_parallel_impl.h` for an example where we
did this (a wrapper with a virtual function call). We only did it there,
as the generic `fork_join_sub_task` requires the virtual call anyway,
so we do not lose ANY performance with this technique.
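
A sketch of the wrapper technique with hypothetical names (the real
code is the `fork_join_lambda` wrapper in `invoke_parallel_impl.h`);
the virtual `run()` is exactly what blocks inlining:

```cpp
// Type-erased view of a lambda: consumers only see the base class,
// so every invocation goes through a virtual dispatch.
class callable_base {
public:
    virtual void run() = 0;
    virtual ~callable_base() = default;
};

template<typename Lambda>
class lambda_wrapper : public callable_base {
    const Lambda* lambda_;  // only stores a pointer, no dynamic memory
public:
    explicit lambda_wrapper(const Lambda* lambda): lambda_{lambda} {}
    // Virtual call boundary: the compiler can no longer inline *lambda_.
    void run() override { (*lambda_)(); }
};

// Usage: the wrapper lives on the stack, the consumer takes a
// callable_base* and never needs the lambda's type.
// auto body = [] { /* loop iteration */ };
// lambda_wrapper<decltype(body)> wrapper{&body};
// callable_base* erased = &wrapper;
```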
## 11.04.2019 - Notes on C++ Templating
After working more with templating and talking to Mike,
it seems like the common way to go is the following:
- If possible, add template arguments to
data containers only (separate from logic).
- If logic and data are coupled (as is often the case with lambdas),
add the declaration of the interface into the normal header
`some_class.h` and add its implementation into an extra implementation
file `some_class_impl.h` that is included at the end of the header
(see the sketch below).
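
A hypothetical `some_class.h`/`some_class_impl.h` pair showing the
layout (the same pattern this commit applies to `thread.h`,
`aligned_stack.h` and `scheduler.h`):

```cpp
// some_class.h -- interface only
#ifndef SOME_CLASS_H
#define SOME_CLASS_H

class some_class {
public:
    template<typename Function>
    void apply(const Function& function);  // declared, not defined
};

#include "some_class_impl.h"  // template definitions pulled in at the end
#endif // SOME_CLASS_H

// some_class_impl.h -- template implementations
#ifndef SOME_CLASS_IMPL_H
#define SOME_CLASS_IMPL_H

template<typename Function>
void some_class::apply(const Function& function) {
    function();  // implementation lives in the _impl header
}

#endif // SOME_CLASS_IMPL_H
```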
## 09.04.2019 - Cache Alignment
Cache alignment requires both parts: the data types must be
declared with the correct alignment AND the base memory holding them
must itself start at a correctly aligned address.
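
A sketch of both ingredients (C++17's `std::aligned_alloc` and the
64-byte line size are assumptions here; the project itself uses its
own `allocate_aligned` helper, visible in `alignment.h` below):

```cpp
#include <cstdlib>
#include <new>

static constexpr std::size_t CACHE_LINE_SIZE = 64;  // assumed line size

// Ingredient 1: the type itself requests cache-line alignment...
struct alignas(CACHE_LINE_SIZE) aligned_counter {
    unsigned long value;
};

// Ingredient 2: ...and the backing memory must actually be aligned.
// (aligned_alloc requires size to be a multiple of the alignment;
// sizeof(aligned_counter) is padded to 64 bytes by the alignas.)
aligned_counter* make_counter() {
    void* memory = std::aligned_alloc(CACHE_LINE_SIZE, sizeof(aligned_counter));
    return new (memory) aligned_counter{0};
}
```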
......
add_executable(playground main.cpp)
# Example for adding the library to your app (as a cmake project dependency)
target_link_libraries(playground pls)
\ No newline at end of file
target_link_libraries(playground pls)
......@@ -4,16 +4,14 @@
#include <array>
#include <atomic>
#include <memory>
#include <typeindex>
#include <tuple>
#include <pls/pls.h>
#include <pls/internal/helpers/prohibit_new.h>
#include <pls/internal/scheduling/thread_state.h>
#include <pls/internal/scheduling/root_task.h>
#include <pls/internal/helpers/unique_id.h>
using namespace pls;
int main() {
malloc_scheduler_memory sched_memory{8};
std::cout << (std::uintptr_t)sched_memory.thread_for(0) % 64 << ", " << (std::uintptr_t)sched_memory.thread_for(1) % 64 << ", " << (std::uintptr_t)sched_memory.thread_for(2) % 64 << ", " << std::endl;
std::cout << (std::uintptr_t)sched_memory.thread_state_for(0) % 64 << ", " << (std::uintptr_t)sched_memory.thread_state_for(1) % 64 << ", " << (std::uintptr_t)sched_memory.thread_state_for(2) % 64 << ", " << std::endl;
std::cout << (std::uintptr_t)sched_memory.task_stack_for(0) % 64 << ", " << (std::uintptr_t)sched_memory.task_stack_for(1) % 64 << ", " << (std::uintptr_t)sched_memory.task_stack_for(2) % 64 << ", " << std::endl;
std::cout << pls::internal::scheduling::root_task<void(*)>::create_id().type_.hash_code() << std::endl;
std::cout << pls::internal::helpers::unique_id::create<pls::internal::scheduling::root_task<void(*)>>().type_.hash_code() << std::endl;
}
......@@ -2,16 +2,19 @@
add_library(pls STATIC
include/pls/pls.h src/pls.cpp
include/pls/algorithms/invoke_parallel.h src/algorithms/invoke_parallel.cpp
include/pls/algorithms/invoke_parallel.h
include/pls/algorithms/invoke_parallel_impl.h
include/pls/internal/base/spin_lock.h src/internal/base/spin_lock.cpp
include/pls/internal/base/thread.h src/internal/base/thread.cpp
include/pls/internal/base/thread_impl.h
include/pls/internal/base/barrier.h src/internal/base/barrier.cpp
include/pls/internal/base/system_details.h
include/pls/internal/base/error_handling.h
include/pls/internal/base/alignment.h src/internal/base/alignment.cpp
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h
include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp
include/pls/internal/helpers/prohibit_new.h
......@@ -22,10 +25,11 @@ add_library(pls STATIC
include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp
include/pls/internal/scheduling/abstract_task.h src/internal/scheduling/abstract_task.cpp
include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
include/pls/internal/scheduling/scheduler_impl.h
include/pls/internal/scheduling/run_on_n_threads_task.h src/internal/scheduling/run_on_n_threads_task.cpp
include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
)
include/pls/internal/helpers/unique_id.h)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
......
......@@ -7,44 +7,15 @@
namespace pls {
namespace algorithm {
namespace internal {
using namespace ::pls::internal::scheduling;
template<typename Body>
inline void run_body(const Body& internal_body, const abstract_task::id& id) {
// Make sure we are in the context of this invoke_parallel instance,
// if not, we will spawn it as a new 'fork-join-style' task.
auto current_task = scheduler::current_task();
if (current_task->unique_id() == id) {
auto current_sub_task = reinterpret_cast<fork_join_task*>(current_task)->currently_executing();
internal_body(current_sub_task);
} else {
fork_join_lambda<Body> root_body(&internal_body);
fork_join_task root_task{&root_body, id};
scheduler::execute_task(root_task);
}
}
}
template<typename Function1, typename Function2>
void invoke_parallel(const Function1& function1, const Function2& function2) {
using namespace ::pls::internal::scheduling;
static abstract_task::id id{PLS_UNIQUE_ID, true};
auto internal_body = [&] (fork_join_sub_task* this_task){
auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); };
auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1);
this_task->spawn_child(sub_task_1);
function2(); // Execute last function 'inline' without spawning a sub_task object
this_task->wait_for_all();
};
void invoke_parallel(const Function1& function1, const Function2& function2);
internal::run_body(internal_body, id);
}
template<typename Function1, typename Function2, typename Function3>
void invoke_parallel(const Function1& function1, const Function2& function2, const Function3& function3);
// ...and so on, add more if we decide to keep this design
}
}
#include "invoke_parallel_impl.h"
#endif //PLS_PARALLEL_INVOKE_H
#ifndef PLS_INVOKE_PARALLEL_IMPL_H
#define PLS_INVOKE_PARALLEL_IMPL_H
#include "pls/internal/scheduling/fork_join_task.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/helpers/unique_id.h"
namespace pls {
namespace algorithm {
namespace internal {
using namespace ::pls::internal::scheduling;
template<typename Body>
inline void run_body(const Body& internal_body, const abstract_task::id& id) {
// Make sure we are in the context of this invoke_parallel instance,
// if not, we will spawn it as a new 'fork-join-style' task.
auto current_task = scheduler::current_task();
if (current_task->unique_id() == id) {
auto current_sub_task = reinterpret_cast<fork_join_task*>(current_task)->currently_executing();
internal_body(current_sub_task);
} else {
fork_join_lambda<Body> root_body(&internal_body);
fork_join_task root_task{&root_body, id};
scheduler::execute_task(root_task);
}
}
}
template<typename Function1, typename Function2>
void invoke_parallel(const Function1& function1, const Function2& function2) {
using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
static abstract_task::id id = unique_id::create<Function1, Function2>();
auto internal_body = [&] (fork_join_sub_task* this_task){
auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); };
auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1);
this_task->spawn_child(sub_task_1);
function2(); // Execute last function 'inline' without spawning a sub_task object
this_task->wait_for_all();
};
internal::run_body(internal_body, id);
}
template<typename Function1, typename Function2, typename Function3>
void invoke_parallel(const Function1& function1, const Function2& function2, const Function3& function3) {
using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
static abstract_task::id id = unique_id::create<Function1, Function2, Function3>();
auto internal_body = [&] (fork_join_sub_task* this_task){
auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); };
auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1);
auto sub_task_body_2 = [&] (fork_join_sub_task*){ function2(); };
auto sub_task_2 = fork_join_lambda<decltype(sub_task_body_2)>(&sub_task_body_2);
this_task->spawn_child(sub_task_1);
this_task->spawn_child(sub_task_2);
function3(); // Execute last function 'inline' without spawning a sub_task object
this_task->wait_for_all();
};
internal::run_body(internal_body, id);
}
}
}
#endif //PLS_INVOKE_PARALLEL_IMPL_H
......@@ -14,7 +14,6 @@ namespace pls {
template<typename T>
struct aligned_wrapper {
alignas(system_details::CACHE_LINE_SIZE) unsigned char data[sizeof(T)];
T* pointer() { return reinterpret_cast<T*>(data); }
};
void* allocate_aligned(size_t size);
......
......@@ -18,7 +18,7 @@ namespace pls {
*/
class spin_lock {
std::atomic_flag flag_;
int yield_at_tries_;
unsigned int yield_at_tries_;
public:
......
......@@ -49,14 +49,7 @@ namespace pls {
* @return The state pointer held for this thread.
*/
template<typename T>
static T* state() {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
return reinterpret_cast<T*>(pthread_getspecific(local_storage_key_));
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
return reinterpret_cast<T*>(local_state_);
#endif
}
static T* state();
/**
* Stores a pointer to the thread local state object.
......@@ -67,18 +60,11 @@ namespace pls {
* @param state_pointer A pointer to the threads state object.
*/
template<typename T>
static void set_state(T* state_pointer) {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer);
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
local_state_ = state_pointer;
#endif
}
static void set_state(T* state_pointer);
};
/**
* Abstraction for starting a function in a sparate thread.
* Abstraction for starting a function in a separate thread.
*
* @tparam Function Lambda being started on the new thread.
* @tparam State State type held for this thread.
......@@ -108,52 +94,13 @@ namespace pls {
// Keep handle to native implementation
pthread_t pthread_thread_;
static void* start_pthread_internal(void* thread_pointer) {
auto my_thread = reinterpret_cast<thread*>(thread_pointer);
Function my_function_copy = my_thread->function_;
State* my_state_pointer_copy = my_thread->state_pointer_;
// Now we have copies of everything we need on the stack.
// The original thread object can be moved freely (no more
// references to its memory location).
my_thread->startup_flag_->clear();
this_thread::set_state(my_state_pointer_copy);
my_function_copy();
// Finished executing the user function
pthread_exit(nullptr);
}
static void* start_pthread_internal(void* thread_pointer);
public:
thread(): function_{}, state_pointer_{nullptr}, startup_flag_{nullptr}, pthread_thread_{} {}
explicit thread(const Function& function, State* state_pointer):
function_{function},
state_pointer_{state_pointer},
startup_flag_{nullptr},
pthread_thread_{} {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
if (!this_thread::local_storage_key_initialized_) {
pthread_key_create(&this_thread::local_storage_key_, nullptr);
this_thread::local_storage_key_initialized_ = true;
}
#endif
explicit thread(const Function& function, State* state_pointer);
// We only need this during startup; it will be destroyed when it goes out of scope
std::atomic_flag startup_flag{ATOMIC_FLAG_INIT};
startup_flag_ = &startup_flag;
startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return
pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *)(this));
while (startup_flag.test_and_set())
; // Busy waiting for the starting flag to clear
}
public:
void join() {
pthread_join(pthread_thread_, nullptr);
}
void join();
// make object move only
thread(thread&&) noexcept = default;
......@@ -164,17 +111,12 @@ namespace pls {
};
template<typename Function, typename State>
thread<Function, State> start_thread(const Function& function, State* state_pointer) {
return thread<Function, State>(function, state_pointer);
}
thread<Function, State> start_thread(const Function& function, State* state_pointer);
template<typename Function>
thread<Function, void> start_thread(const Function& function) {
return thread<Function, void>(function, nullptr);
}
thread<Function, void> start_thread(const Function& function);
}
}
}
#include "thread_impl.h"
#endif //PLS_THREAD_H
#ifndef PLS_THREAD_IMPL_H
#define PLS_THREAD_IMPL_H
namespace pls {
namespace internal {
namespace base {
template<typename T>
T* this_thread::state() {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
return reinterpret_cast<T*>(pthread_getspecific(local_storage_key_));
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
return reinterpret_cast<T*>(local_state_);
#endif
}
template<typename T>
void this_thread::set_state(T* state_pointer) {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer);
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
local_state_ = state_pointer;
#endif
}
template<typename Function, typename State>
void* thread<Function, State>::start_pthread_internal(void* thread_pointer) {
auto my_thread = reinterpret_cast<thread*>(thread_pointer);
Function my_function_copy = my_thread->function_;
State* my_state_pointer_copy = my_thread->state_pointer_;
// Now we have copies of everything we need on the stack.
// The original thread object can be moved freely (no more
// references to its memory location).
my_thread->startup_flag_->clear();
this_thread::set_state(my_state_pointer_copy);
my_function_copy();
// Finished executing the user function
pthread_exit(nullptr);
}
template<typename Function, typename State>
thread<Function, State>::thread(const Function& function, State* state_pointer):
function_{function},
state_pointer_{state_pointer},
startup_flag_{nullptr},
pthread_thread_{} {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
if (!this_thread::local_storage_key_initialized_) {
pthread_key_create(&this_thread::local_storage_key_, nullptr);
this_thread::local_storage_key_initialized_ = true;
}
#endif
// We only need this during startup; it will be destroyed when it goes out of scope
std::atomic_flag startup_flag{ATOMIC_FLAG_INIT};
startup_flag_ = &startup_flag;
startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return
pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *)(this));
while (startup_flag.test_and_set())
; // Busy waiting for the starting flag to clear
}
template<typename Function, typename State>
void thread<Function, State>::join() {
pthread_join(pthread_thread_, nullptr);
}
template<typename Function, typename State>
thread<Function, State> start_thread(const Function& function, State* state_pointer) {
return thread<Function, State>(function, state_pointer);
}
template<typename Function>
thread<Function, void> start_thread(const Function& function) {
return thread<Function, void>(function, nullptr);
}
}
}
}
#endif //PLS_THREAD_IMPL_H
......@@ -37,40 +37,18 @@ namespace pls {
aligned_stack(char* memory_region, std::size_t size);
template<typename T>
T* push(const T& object) {
// Copy-Construct
return new ((void*)push<T>())T(object);
}
T* push(const T& object);
template<typename T>
void* push() {
void* result = reinterpret_cast<T*>(head_);
// Move head to next aligned position after new object
head_ = base::alignment::next_alignment(head_ + sizeof(T));
if (head_ >= memory_end_) {
PLS_ERROR("Tried to allocate object on aligned_stack without sufficient memory!");
}
return result;
}
void* push();
template<typename T>
T pop() {
head_ = head_ - base::alignment::next_alignment(sizeof(T));
return *reinterpret_cast<T*>(head_);
}
state save_state() {
return head_;
}
T pop();
void reset_state(state new_state) {
head_ = new_state;
}
state save_state() const { return head_; }
void reset_state(state new_state) { head_ = new_state; }
};
}
}
}
#include "aligned_stack_impl.h"
#endif //PLS_ALIGNED_STACK_H
#ifndef PLS_ALIGNED_STACK_IMPL_H
#define PLS_ALIGNED_STACK_IMPL_H
namespace pls {
namespace internal {
namespace data_structures {
template<typename T>
T* aligned_stack::push(const T& object) {
// Copy-Construct
return new ((void*)push<T>())T(object);
}
template<typename T>
void* aligned_stack::push() {
void* result = reinterpret_cast<T*>(head_);
// Move head to next aligned position after new object
head_ = base::alignment::next_alignment(head_ + sizeof(T));
if (head_ >= memory_end_) {
PLS_ERROR("Tried to allocate object on aligned_stack without sufficient memory!");
}
return result;
}
template<typename T>
T aligned_stack::pop() {
head_ = head_ - base::alignment::next_alignment(sizeof(T));
return *reinterpret_cast<T*>(head_);
}
}
}
}
#endif //PLS_ALIGNED_STACK_IMPL_H
......@@ -13,7 +13,7 @@ namespace pls {
namespace helpers {
// TODO: Clean up (separate into small functions and .cpp file)
template<typename Function>
void run_mini_benchmark(const Function& lambda, size_t max_threads, long max_runtime_ms=1000) {
void run_mini_benchmark(const Function& lambda, size_t max_threads, unsigned long max_runtime_ms=1000) {
using namespace std;
using namespace pls::internal::scheduling;
......
#ifndef PLS_UNIQUE_ID_H
#define PLS_UNIQUE_ID_H
#include <typeindex>
#include <tuple>
#include <stdint.h>
namespace pls {
namespace internal {
namespace helpers {
struct unique_id {
const uint32_t id_;
const std::type_info& type_;
bool operator==(const unique_id& other) const { return id_ == other.id_ && type_ == other.type_; }
static constexpr unique_id create(const uint32_t id) {
return unique_id(id, typeid(void));
}
template<typename ...T>
static constexpr unique_id create() {
return unique_id(UINT32_MAX, typeid(std::tuple<T...>));
}
private:
explicit constexpr unique_id(const uint32_t id, const std::type_info& type): id_{id}, type_{type} {};
};
}
}
}
#endif //PLS_UNIQUE_ID_H
......@@ -2,33 +2,23 @@
#ifndef PLS_ABSTRACT_TASK_H
#define PLS_ABSTRACT_TASK_H
#define PLS_UNIQUE_ID __COUNTER__
#include "pls/internal/base/spin_lock.h"
#include "pls/internal/helpers/unique_id.h"
namespace pls {
namespace internal {
namespace scheduling {
class abstract_task {
public:
struct id {
uint32_t id_;
bool auto_generated_;
explicit id(uint32_t id, bool auto_generated=false): id_{id}, auto_generated_{auto_generated} {};
bool operator==(const abstract_task::id& other) const {
return id_ == other.id_ && auto_generated_ == other.auto_generated_;
}
};
using id = helpers::unique_id;
private:
int depth_;
unsigned int depth_;
abstract_task::id unique_id_;
abstract_task* child_task_;
public:
abstract_task(const int depth, const abstract_task::id& unique_id):
abstract_task(const unsigned int depth, const abstract_task::id& unique_id):
depth_{depth},
unique_id_{unique_id},
child_task_{nullptr} {}
......@@ -37,8 +27,8 @@ namespace pls {
void set_child(abstract_task* child_task) { child_task_ = child_task; }
abstract_task* child() { return child_task_; }
void set_depth(int depth) { depth_ = depth; }
int depth() const { return depth_; }
void set_depth(unsigned int depth) { depth_ = depth; }
unsigned int depth() const { return depth_; }
id unique_id() const { return unique_id_; }
protected:
virtual bool internal_stealing(abstract_task* other_task) = 0;
......
......@@ -77,27 +77,9 @@ namespace pls {
bool split_task(base::spin_lock* /*lock*/) override;
public:
explicit fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id):
abstract_task{0, id},
root_task_{root_task},
currently_executing_{nullptr},
my_stack_{nullptr},
deque_{},
last_stolen_{nullptr} {};
void execute() override {
PROFILE_WORK_BLOCK("execute fork_join_task");
// Bind this instance to our OS thread
my_stack_ = base::this_thread::state<thread_state>()->task_stack_;
root_task_->tbb_task_ = this;
root_task_->stack_state_ = my_stack_->save_state();
// Execute it on our OS thread until it's finished
root_task_->execute();
}
fork_join_sub_task* currently_executing() const { return currently_executing_; }
explicit fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id);
void execute() override;
fork_join_sub_task* currently_executing() const;
};
template<typename T>
......
......@@ -17,12 +17,14 @@ namespace pls {
Function function_;
std::atomic_uint8_t finished_;
public:
static constexpr auto create_id = helpers::unique_id::create<root_task<Function>>;
explicit root_task(Function function):
abstract_task{0, id{0}},
abstract_task{0, create_id()},
function_{function},
finished_{0} {}
root_task(const root_task& other):
abstract_task{0, id{0}},
abstract_task{0, create_id()},
function_{other.function_},
finished_{0} {}
......@@ -50,8 +52,10 @@ namespace pls {
root_task<Function>* master_task_;
public:
static constexpr auto create_id = root_task<Function>::create_id;
explicit root_worker_task(root_task<Function>* master_task):
abstract_task{0, id{0}},
abstract_task{0, create_id()},
master_task_{master_task} {}
void execute() override {
......
......@@ -36,8 +36,10 @@ namespace pls {
return counter;
}
public:
static constexpr auto create_id = helpers::unique_id::create<run_on_n_threads_task<Function>>;
run_on_n_threads_task(Function function, int num_threads):
abstract_task{0, id{PLS_UNIQUE_ID, true}},
abstract_task{0, create_id()},
function_{function},
counter{num_threads - 1} {}
......@@ -65,8 +67,10 @@ namespace pls {
Function function_;
run_on_n_threads_task<Function>* root_;
public:
static constexpr auto create_id = helpers::unique_id::create<run_on_n_threads_task_worker<Function>>;
run_on_n_threads_task_worker(Function function, run_on_n_threads_task<Function>* root):
abstract_task{0, id{PLS_UNIQUE_ID, true}},
abstract_task{0, create_id()},
function_{function},
root_{root} {}
......
......@@ -34,76 +34,34 @@ namespace pls {
explicit scheduler(scheduler_memory* memory, unsigned int num_threads);
~scheduler();
/**
* Wakes up the thread pool.
* Code inside the Function lambda can invoke all parallel APIs.
*
* @param work_section generic function or lambda to be executed in the scheduler's context.
*/
template<typename Function>
void perform_work(Function work_section) {
PROFILE_WORK_BLOCK("scheduler::perform_work")
root_task<Function> master{work_section};
// Push root task on stacks
auto new_master = memory_->task_stack_for(0)->push(master);
memory_->thread_state_for(0)->root_task_ = new_master;
memory_->thread_state_for(0)->current_task_ = new_master;
for (unsigned int i = 1; i < num_threads_; i++) {
root_worker_task<Function> worker{new_master};
auto new_worker = memory_->task_stack_for(0)->push(worker);
memory_->thread_state_for(i)->root_task_ = new_worker;
memory_->thread_state_for(i)->current_task_ = new_worker;
}
// Perform and wait for work
sync_barrier_.wait(); // Trigger threads to wake up
sync_barrier_.wait(); // Wait for threads to finish
// Clean up stack
memory_->task_stack_for(0)->pop<typeof(master)>();
for (unsigned int i = 1; i < num_threads_; i++) {
root_worker_task<Function> worker{new_master};
memory_->task_stack_for(0)->pop<typeof(worker)>();
}
}
// TODO: See if we should place this differently (only for performance reasons)
void perform_work(Function work_section);
/**
* Executes a top-level-task (children of abstract_task) on this thread.
*
* @param task The task to be executed.
* @param depth Optional: depth of the new task, otherwise set implicitly.
*/
template<typename Task>
static void execute_task(Task& task, int depth=-1) {
static_assert(std::is_base_of<abstract_task, Task>::value, "Only pass abstract_task subclasses!");
auto my_state = base::this_thread::state<thread_state>();
abstract_task* old_task;
abstract_task* new_task;
// Init Task
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
old_task = my_state->current_task_;
new_task = my_state->task_stack_->push(task);
new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1);
my_state->current_task_ = new_task;
old_task->set_child(new_task);
}
// Run Task
new_task->execute();
// Teardown state back to before the task was executed
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
old_task->set_child(nullptr);
my_state->current_task_ = old_task;
my_state->task_stack_->pop<Task>();
}
}
static void execute_task(Task& task, int depth=-1);
static abstract_task* current_task() { return base::this_thread::state<thread_state>()->current_task_; }
void terminate(bool wait_for_workers=true);
unsigned int num_threads() const { return num_threads_; }
thread_state* thread_state_for(size_t id) { return memory_->thread_state_for(id); }
};
}
}
}
#include "scheduler_impl.h"
#endif //PLS_SCHEDULER_H
#ifndef PLS_SCHEDULER_IMPL_H
#define PLS_SCHEDULER_IMPL_H
namespace pls {
namespace internal {
namespace scheduling {
template<typename Function>
void scheduler::perform_work(Function work_section) {
PROFILE_WORK_BLOCK("scheduler::perform_work")
root_task<Function> master{work_section};
// Push root task on stacks
auto new_master = memory_->task_stack_for(0)->push(master);
memory_->thread_state_for(0)->root_task_ = new_master;
memory_->thread_state_for(0)->current_task_ = new_master;
for (unsigned int i = 1; i < num_threads_; i++) {
root_worker_task<Function> worker{new_master};
auto new_worker = memory_->task_stack_for(0)->push(worker);
memory_->thread_state_for(i)->root_task_ = new_worker;
memory_->thread_state_for(i)->current_task_ = new_worker;
}
// Perform and wait for work
sync_barrier_.wait(); // Trigger threads to wake up
sync_barrier_.wait(); // Wait for threads to finish
// Clean up stack
memory_->task_stack_for(0)->pop<typeof(master)>();
for (unsigned int i = 1; i < num_threads_; i++) {
root_worker_task<Function> worker{new_master};
memory_->task_stack_for(0)->pop<typeof(worker)>();
}
}
template<typename Task>
void scheduler::execute_task(Task& task, int depth) {
static_assert(std::is_base_of<abstract_task, Task>::value, "Only pass abstract_task subclasses!");
auto my_state = base::this_thread::state<thread_state>();
abstract_task* old_task;
abstract_task* new_task;
// Init Task
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
old_task = my_state->current_task_;
new_task = my_state->task_stack_->push(task);
new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1);
my_state->current_task_ = new_task;
old_task->set_child(new_task);
}
// Run Task
new_task->execute();
// Teardown state back to before the task was executed
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
old_task->set_child(nullptr);
my_state->current_task_ = old_task;
my_state->task_stack_->pop<Task>();
}
}
}
}
}
#endif //PLS_SCHEDULER_IMPL_H
......@@ -5,6 +5,7 @@
#include "pls/internal/scheduling/abstract_task.h"
#include "pls/internal/scheduling/fork_join_task.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/helpers/unique_id.h"
namespace pls {
using internal::scheduling::static_scheduler_memory;
......@@ -13,6 +14,8 @@ namespace pls {
using internal::scheduling::scheduler;
using task_id = internal::scheduling::abstract_task::id;
using unique_id = internal::helpers::unique_id;
using internal::scheduling::fork_join_sub_task;
using internal::scheduling::fork_join_task;
......
#include "pls/algorithms/invoke_parallel.h"
......@@ -33,6 +33,7 @@ namespace pls {
}
PROFILE_END_BLOCK
// Try to steal 'internal', e.g. fork_join_sub_tasks in a fork_join_task constellation
PROFILE_STEALING("Internal Steal")
if (current_task != nullptr) {
// See if it equals our type and depth of task
......@@ -52,7 +53,7 @@ namespace pls {
// Execute 'top level task steal' if possible
// (only try deeper tasks to keep depth restricted stealing)
// (only try deeper tasks to keep depth restricted stealing).
PROFILE_STEALING("Top Level Steal")
while (current_task != nullptr) {
auto lock = &target_state->lock_;
......
......@@ -107,6 +107,28 @@ namespace pls {
scheduler::execute_task(task, depth());
return true;
}
void fork_join_task::execute() {
PROFILE_WORK_BLOCK("execute fork_join_task");
// Bind this instance to our OS thread
my_stack_ = base::this_thread::state<thread_state>()->task_stack_;
root_task_->tbb_task_ = this;
root_task_->stack_state_ = my_stack_->save_state();
// Execute it on our OS thread until it's finished
root_task_->execute();
}
fork_join_sub_task* fork_join_task::currently_executing() const { return currently_executing_; }
fork_join_task::fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id):
abstract_task{0, id},
root_task_{root_task},
currently_executing_{nullptr},
my_stack_{nullptr},
deque_{},
last_stolen_{nullptr} {};
}
}
}
......@@ -58,7 +58,7 @@ TEST_CASE( "tbb task are scheduled correctly", "[internal/scheduling/fork_join_t
my_scheduler.perform_work([&] (){
once_sub_task sub_task{&counter, start_counter};
fork_join_task task{&sub_task, task_id{42}};
fork_join_task task{&sub_task, unique_id::create(42)};
scheduler::execute_task(task);
});
......@@ -71,7 +71,7 @@ TEST_CASE( "tbb task are scheduled correctly", "[internal/scheduling/fork_join_t
my_scheduler.perform_work([&] (){
std::atomic<int> dummy_parent{1}, overall_counter{8};
force_steal_sub_task sub_task{&dummy_parent, &overall_counter};
fork_join_task task{&sub_task, task_id{42}};
fork_join_task task{&sub_task, unique_id::create(42)};
scheduler::execute_task(task);
});
my_scheduler.terminate(true);
......