Commit ea24f5c2 by Florian Fritz

Merge branch 'tbb_like_stealing' into 'master'

Merge: TBB like task stealing

See merge request !3
parents 251e228d 4da92c52
Pipeline #1120 passed with stages
in 3 minutes 16 seconds
...@@ -4,14 +4,68 @@ A collection of stuff that we noticed during development.
Useful later on to write a project report and to go back
in time to find out why certain decisions were made.
## 28.03.2019 - custom new operators
When initializing sub_tasks we want to place them on our custom
'stack like' data structure, one per thread. We looked at TBB's API
and noticed that it implicitly sets up the parent relationship
in the new operator. After further investigation we see that
initializing tasks in this manner is a 'hack' to avoid explicitly
passing references and counters around.
It can be found at the bottom of TBB's `task.h` file:
```C++
inline void *operator new( size_t bytes, const tbb::internal::allocate_child_proxy& p ) {
return &p.allocate(bytes);
}
inline void operator delete( void* task, const tbb::internal::allocate_child_proxy& p ) {
p.free( *static_cast<tbb::task*>(task) );
}
```
It simply constructs a temporary 'allocator type' that is passed as
the second argument to new. This allocator is then called inside new
and allocates the memory required for the task.
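A minimal sketch of the same idea (not TBB's actual implementation; `child_proxy`, `task_buffer` and `my_task` are made-up names for illustration): the proxy passed to the overloaded new operator can update the parent bookkeeping before the object is constructed in pre-reserved storage.
```C++
#include <cstddef>
#include <iostream>
#include <new>

// Hypothetical stand-ins for TBB's internal types.
struct task_buffer {
    alignas(std::max_align_t) char storage[256];
    int child_count = 0;
};

struct child_proxy {
    task_buffer* parent;

    void* allocate(std::size_t bytes) const {
        if (bytes > sizeof(parent->storage)) {
            throw std::bad_alloc{};
        }
        parent->child_count++;  // the 'implicit' parent bookkeeping happens here
        return parent->storage;
    }
};

// Custom new/delete pair: new forwards to the proxy, delete is only used
// if the constructor throws after allocation.
void* operator new(std::size_t bytes, const child_proxy& p) { return p.allocate(bytes); }
void operator delete(void* /*ptr*/, const child_proxy& /*p*/) noexcept {}

struct my_task {
    int num;
    explicit my_task(int num): num{num} {}
};

int main() {
    task_buffer parent;

    // 'new (proxy) T{...}' allocates through the proxy, then constructs in place.
    my_task* task = new (child_proxy{&parent}) my_task{42};
    std::cout << task->num << ", children: " << parent.child_count << std::endl;

    task->~my_task();  // storage belongs to task_buffer, so only run the destructor
}
```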
## 27.03.2019 - atomics
C++11 offers atomics, however these require careful usage
and are not always lock free. We plan on doing more research
on these operations when we try to transform our code from using
spin locks to more fine grained locking.
Resources can be found [here](https://www.justsoftwaresolutions.co.uk/files/ndc_oslo_2016_safety_off.pdf)
and [here](http://www.modernescpp.com/index.php/c-core-guidelines-the-remaining-rules-to-lock-free-programming).
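As a small reminder (a generic C++11 sketch, not pls code): whether an atomic is actually lock free can be queried at runtime, and the memory ordering has to be chosen consciously.
```C++
#include <atomic>
#include <cstdint>
#include <iostream>

int main() {
    std::atomic<std::uint32_t> counter{0};
    std::atomic<std::uint64_t> wide_counter{0};

    // Not guaranteed by the standard; on some (embedded) targets this prints 0.
    std::cout << "32 bit lock free: " << counter.is_lock_free() << std::endl;
    std::cout << "64 bit lock free: " << wide_counter.is_lock_free() << std::endl;

    // Relaxed ordering is enough for a pure event counter; acquire/release is
    // required as soon as the counter guards other shared memory.
    counter.fetch_add(1, std::memory_order_relaxed);

    std::cout << "counter: " << counter.load() << std::endl;
}
```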
## 27.03.2019 - variable sized lambdas
When working with lambdas one faces the problem of them having not
a fixed size because they can capture variables from the surrounding
scope.
To 'fix' this in normal C++ one would use a std::function,
wrapping the lambda by moving it onto the heap. This is of course
a problem when trying to prevent dynamic memory allocation.
When we want static allocation we have two options:
1) keep the lambda on the stack and only call into it while it is valid
2) use templating to create variable sized classes for each lambda used
Option 1) is preferable, as it does not create extra template code
(longer compile times, code can not be separated into CPP files). However
we can encounter situations where the lambda is no longer on the stack
when it is used, especially when working with sub-tasks. A small sketch
of option 2) is shown below.
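A rough sketch of option 2) (illustrative names, not part of pls): wrapping the lambda in a class template makes its size a compile-time constant of that instantiation, so it can be stored without heap allocation.
```C++
#include <cstdio>
#include <utility>

// One lambda_task<Lambda> type is generated per lambda type used.
template<typename Lambda>
class lambda_task {
    Lambda lambda_;  // the capture list determines sizeof(lambda_task)

public:
    explicit lambda_task(Lambda lambda): lambda_{std::move(lambda)} {}
    void execute() { lambda_(); }
};

// Helper so the (unnameable) lambda type is deduced automatically.
template<typename Lambda>
lambda_task<Lambda> make_lambda_task(Lambda lambda) {
    return lambda_task<Lambda>{std::move(lambda)};
}

int main() {
    int captured = 42;

    auto task = make_lambda_task([captured] { std::printf("captured: %d\n", captured); });
    std::printf("task size: %zu bytes\n", sizeof(task));

    task.execute();
}
```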
## 21.03.2019 - Allocation on stack/static memory
We can use the [placement new](https://www.geeksforgeeks.org/placement-new-operator-cpp/)
operator for our tasks and other stuff to manage memory.
This can allow the pure 'stack based' approach suggested by Mike,
without any dynamic memory management.
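For reference, a tiny placement-new sketch (generic C++, not pls code): the object is constructed into memory we already own, so no heap allocation happens and destruction must be triggered manually.
```C++
#include <cstdint>
#include <iostream>
#include <new>

struct task {
    std::uint32_t id;
};

// Static storage for exactly one task object.
alignas(task) static char task_storage[sizeof(task)];

int main() {
    task* my_task = new (task_storage) task{7};  // construct in place, no malloc
    std::cout << "task id: " << my_task->id << std::endl;

    my_task->~task();  // storage stays valid, only the object is destroyed
}
```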
## 20.03.2019 - Prohibit New
We want to write this library without using any runtime memory
allocation to better fit the needs of the embedded market.
...@@ -24,7 +78,7 @@ by using a new implementation with a break point in it.
That way we for example ruled out std::thread, as we found the dynamic
memory allocation used in it.
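The 'new implementation with a break point in it' could look roughly like this (a sketch only; the real check lives in `prohibit_new.h` and may differ): replacing the global new operator makes every hidden heap allocation visible immediately.
```C++
#include <cstdio>
#include <cstdlib>
#include <new>

// Global replacement: any heap allocation in the program ends up here.
void* operator new(std::size_t bytes) {
    std::fprintf(stderr, "unexpected heap allocation of %zu bytes\n", bytes);
    std::abort();  // or place a breakpoint here while debugging
}

void operator delete(void* ptr) noexcept {
    std::free(ptr);
}

int main() {
    // Anything that allocates behind our back (e.g. constructing a std::thread
    // with some standard libraries) now aborts instead of silently calling malloc.
    return 0;
}
```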
## 20.03.2019 - callable objects and memory allocation / why we use no std::thread
When working with any sort of functionality that can be passed
to an object or function it is usually passed as:
......
// Headers are available because we added the pls target
#include <iostream>
#include <functional>
#include <array>
#include <atomic>
#include <chrono> // needed for the timing code below
#include <pls/pls.h>
using namespace pls;
// Example for static memory allocation (no malloc or free required)
static static_scheduler_memory<8, 2 << 12> my_scheduler_memory;
class fib: public tbb_sub_task {
static constexpr int CUTOFF = 20;
int num_;
int* result_;
public:
fib(int num, int* result): num_{num}, result_{result} {}
private:
static int fib_serial(int num) {
if (num == 0) {
return 0;
}
if (num == 1) {
return 1;
}
return fib_serial(num - 1) + fib_serial(num - 2);
}
protected:
void execute_internal() override {
if (num_ <= CUTOFF) {
*result_ = fib_serial(num_);
return;
}
int left_result;
int right_result;
fib left_child{num_ - 1, &left_result};
spawn_child(left_child);
fib right_child{num_ - 2, &right_result};
spawn_child(right_child);
wait_for_all();
*result_ = left_result + right_result;
}
public:
void test() override {
std::cout << "Test Override" << std::endl;
}
};
int main() {
scheduler my_scheduler{&my_scheduler_memory, 1};
auto start = std::chrono::high_resolution_clock::now();
my_scheduler.perform_work([] (){
int result;
fib fib_sub_task{45, &result};
tbb_task tbb_task{&fib_sub_task};
scheduler::execute_task(tbb_task);
std::cout << "Result: " << result << std::endl;
});
auto end = std::chrono::high_resolution_clock::now();
long time = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
std::cout << "Startup time in ms: " << time << std::endl;
}
# List all required files here (cmake best practice to NOT automate this step!)
add_library(pls STATIC
src/pls.cpp include/pls/pls.h
src/internal/base/spin_lock.cpp include/pls/internal/base/spin_lock.h
src/internal/base/thread.cpp include/pls/internal/base/thread.h
include/pls/internal/base/prohibit_new.h
...@@ -8,10 +8,12 @@ add_library(pls STATIC
src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler.h
src/internal/scheduling/thread_state.cpp include/pls/internal/scheduling/thread_state.h
src/internal/base/barrier.cpp include/pls/internal/base/barrier.h
src/internal/scheduling/root_task.cpp include/pls/internal/scheduling/root_task.h
src/internal/base/aligned_stack.cpp include/pls/internal/base/aligned_stack.h
include/pls/internal/base/system_details.h
src/internal/scheduling/run_on_n_threads_task.cpp include/pls/internal/scheduling/run_on_n_threads_task.h
src/internal/scheduling/tbb_task.cpp include/pls/internal/scheduling/tbb_task.h
src/internal/base/deque.cpp include/pls/internal/base/deque.h)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
......
...@@ -19,6 +19,8 @@ namespace pls {
static std::uintptr_t next_alignment(std::uintptr_t size);
static char* next_alignment(char* pointer);
public:
typedef char* state;
aligned_stack(): memory_start_{nullptr}, memory_end_{nullptr}, head_{nullptr} {};
aligned_stack(char* memory_region, const std::size_t size):
...@@ -27,7 +29,13 @@ namespace pls {
head_{next_alignment(memory_start_)} {}
template<typename T>
T* push(const T& object) {
// Copy-Construct into desired memory location
return new (push<T>())T(object);
}
template<typename T>
T* push() {
T* result = reinterpret_cast<T*>(head_);
// Move head to next aligned position after new object
...@@ -36,7 +44,6 @@ namespace pls {
exit(1); // TODO: Exception Handling
}
return result;
}
...@@ -46,6 +53,14 @@ namespace pls {
return *reinterpret_cast<T*>(head_);
}
state save_state() {
return head_;
}
void reset_state(state new_state) {
head_ = new_state;
}
};
}
}
......
#ifndef PLS_DEQUE_H
#define PLS_DEQUE_H
#include "spin_lock.h"
namespace pls {
namespace internal {
namespace base {
class deque_item {
friend class deque_internal;
deque_item* prev_;
deque_item* next_;
};
class deque_internal {
protected:
deque_item* head_;
deque_item* tail_;
spin_lock lock_;
deque_item* pop_head_internal();
deque_item* pop_tail_internal();
void push_tail_internal(deque_item *new_item);
};
template<typename Item>
class deque: deque_internal {
public:
explicit deque(): deque_internal{} {}
inline Item* pop_head() {
return static_cast<Item*>(pop_head_internal());
}
inline Item* pop_tail() {
return static_cast<Item*>(pop_tail_internal());
}
inline void push_tail(Item* new_item) {
push_tail_internal(new_item);
}
};
}
}
}
#endif //PLS_DEQUE_H
...@@ -17,9 +17,7 @@ namespace pls {
public:
spin_lock(): flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{1024} {};
spin_lock(const spin_lock& other): flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{other.yield_at_tries_} {}
void lock();
void unlock();
......
...@@ -28,7 +28,7 @@ namespace pls {
int depth() { return depth_; }
protected:
virtual bool internal_stealing(abstract_task* other_task) = 0;
virtual bool split_task(base::spin_lock* lock) = 0;
bool steal_work();
};
......
...@@ -11,14 +11,14 @@ namespace pls {
namespace internal {
namespace scheduling {
template<typename Function>
class root_task : public abstract_task {
Function function_;
bool finished_;
// Improvement: Remove lock and replace by atomic variable (performance)
base::spin_lock finished_lock_;
public:
explicit root_task(Function function):
abstract_task{0, 0},
function_{function},
finished_{false} {}
...@@ -40,7 +40,31 @@ namespace pls {
return false;
}
bool split_task(base::spin_lock* /*lock*/) override {
return false;
}
};
template<typename Function>
class root_worker_task : public abstract_task {
root_task<Function>* master_task_;
public:
explicit root_worker_task(root_task<Function>* master_task):
abstract_task{0, 0},
master_task_{master_task} {}
void execute() override {
do {
steal_work();
} while (!master_task_->finished());
}
bool internal_stealing(abstract_task* /*other_task*/) override {
return false;
}
bool split_task(base::spin_lock* /*lock*/) override {
return false;
}
};
......
#ifndef PLS_ROOT_WORKER_TASK_H
#define PLS_ROOT_WORKER_TASK_H
#include "root_master_task.h"
namespace pls {
namespace internal {
namespace scheduling {
template<typename Function>
class root_worker_task : public abstract_task {
root_master_task<Function>* master_task_;
public:
explicit root_worker_task(root_master_task<Function>* master_task):
abstract_task{0, 0},
master_task_{master_task} {}
void execute() override {
do {
steal_work();
} while (!master_task_->finished());
}
bool internal_stealing(abstract_task* /*other_task*/) override {
return false;
}
bool split_task() override {
return false;
}
};
}
}
}
#endif //PLS_ROOT_WORKER_TASK_H
...@@ -57,7 +57,7 @@ namespace pls {
return false;
}
bool split_task(base::spin_lock* lock) override;
};
template<typename Function>
...@@ -83,16 +83,19 @@ namespace pls {
return false;
}
bool split_task(base::spin_lock* /*lock*/) override {
return false;
}
};
template<typename Function>
bool run_on_n_threads_task<Function>::split_task(base::spin_lock* lock) {
if (get_counter() <= 0) {
return false;
}
// In success case, unlock.
// TODO: this locking is complicated and error prone.
lock->unlock();
auto scheduler = base::this_thread::state<thread_state>()->scheduler_;
auto task = run_on_n_threads_task_worker<Function>{function_, this};
......
...@@ -10,8 +10,7 @@
#include "pls/internal/base/barrier.h"
#include "thread_state.h"
#include "root_task.h"
namespace pls {
namespace internal {
...@@ -65,46 +64,62 @@ namespace pls {
template<typename Function>
void perform_work(Function work_section) {
root_task<Function> master{work_section};
// Push root task on stacks
auto new_master = memory_->task_stack_for(0)->push(master);
memory_->thread_state_for(0)->root_task_ = new_master;
memory_->thread_state_for(0)->current_task_ = new_master;
for (unsigned int i = 1; i < num_threads_; i++) {
root_worker_task<Function> worker{new_master};
auto new_worker = memory_->task_stack_for(0)->push(worker);
memory_->thread_state_for(i)->root_task_ = new_worker;
memory_->thread_state_for(i)->current_task_ = new_worker;
}
// Perform and wait for work
sync_barrier_.wait(); // Trigger threads to wake up
sync_barrier_.wait(); // Wait for threads to finish
// Clean up stack
memory_->task_stack_for(0)->pop<typeof(master)>();
for (unsigned int i = 1; i < num_threads_; i++) {
root_worker_task<Function> worker{new_master};
memory_->task_stack_for(0)->pop<typeof(worker)>();
}
}
// TODO: See if we should place this differently (only for performance reasons)
template<typename Task>
static void execute_task(Task& task, int depth=-1) {
static_assert(std::is_base_of<abstract_task, Task>::value, "Only pass abstract_task subclasses!");
auto my_state = base::this_thread::state<thread_state>();
abstract_task* old_task;
abstract_task* new_task;
// Init Task
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
old_task = my_state->current_task_;
new_task = my_state->task_stack_->push(task);
new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1);
my_state->current_task_ = new_task;
old_task->set_child(new_task);
}
// Run Task
new_task->execute();
// Teardown state back to before the task was executed
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
old_task->set_child(nullptr);
my_state->current_task_ = old_task;
my_state->task_stack_->pop<Task>();
}
}
......
#ifndef PLS_TBB_LIKE_TASK_H
#define PLS_TBB_LIKE_TASK_H
#include "pls/internal/base/aligned_stack.h"
#include "pls/internal/base/deque.h"
#include "abstract_task.h"
#include "thread_state.h"
namespace pls {
namespace internal {
namespace scheduling {
class tbb_task;
class tbb_sub_task: public base::deque_item {
friend class tbb_task;
// Coordinate finishing of sub_tasks
std::atomic_uint32_t ref_count_;
tbb_sub_task* parent_;
// Access to TBB scheduling environment
tbb_task* tbb_task_;
// Stack Management (reset stack pointer after wait_for_all() calls)
base::aligned_stack::state stack_state_;
protected:
explicit tbb_sub_task();
tbb_sub_task(const tbb_sub_task& other);
virtual void execute_internal() = 0;
// SubClass Implementations:
// Do Work
// |-- Spawn Sub Task (new subtask; spawn(subtask);)
// |-- Spawn Sub task
// Do Work
// |-- Wait For All
// Do Work
// |-- Spawn Sub Task
template<typename T>
void spawn_child(const T& sub_task);
void wait_for_all();
private:
void spawn_child_internal(tbb_sub_task* sub_task);
void execute();
public:
virtual void test() {
std::cout << "Test" << std::endl;
}
};
class tbb_task: public abstract_task {
friend class tbb_sub_task;
tbb_sub_task* root_task_;
base::aligned_stack* my_stack_;
// Double-Ended Queue management
base::deque<tbb_sub_task> deque_;
// Steal Management
tbb_sub_task* last_stolen_;
tbb_sub_task* get_local_sub_task();
tbb_sub_task* get_stolen_sub_task();
bool internal_stealing(abstract_task* other_task) override;
bool split_task(base::spin_lock* /*lock*/) override;
public:
explicit tbb_task(tbb_sub_task* root_task):
abstract_task{0, 0},
root_task_{root_task},
deque_{},
last_stolen_{nullptr} {};
void execute() override {
// Bind this instance to our OS thread
my_stack_ = base::this_thread::state<thread_state>()->task_stack_;
root_task_->tbb_task_ = this;
root_task_->stack_state_ = my_stack_->save_state();
// Execute it on our OS thread until its finished
root_task_->execute();
}
};
template<typename T>
void tbb_sub_task::spawn_child(const T& task) {
static_assert(std::is_base_of<tbb_sub_task, T>::value, "Only pass tbb_sub_task subclasses!");
T* new_task = tbb_task_->my_stack_->push(task);
spawn_child_internal(new_task);
}
}
}
}
#endif //PLS_TBB_LIKE_TASK_H
#ifndef PLS_LIBRARY_H
#define PLS_LIBRARY_H
#include <pls/internal/scheduling/scheduler.h>
#include <pls/internal/scheduling/tbb_task.h>
namespace pls {
using internal::scheduling::scheduler;
using internal::scheduling::static_scheduler_memory;
using internal::scheduling::tbb_sub_task;
using internal::scheduling::tbb_task;
}
#endif
#include <mutex>
#include "pls/internal/base/deque.h"
namespace pls {
namespace internal {
namespace base {
deque_item* deque_internal::pop_head_internal() {
std::lock_guard<spin_lock> lock{lock_};
if (head_ == nullptr) {
return nullptr;
}
deque_item* result = head_;
head_ = head_->prev_;
if (head_ == nullptr) {
tail_ = nullptr;
} else {
head_->next_ = nullptr;
}
return result;
}
deque_item* deque_internal::pop_tail_internal() {
std::lock_guard<spin_lock> lock{lock_};
if (tail_ == nullptr) {
return nullptr;
}
deque_item* result = tail_;
tail_ = tail_->next_;
if (tail_ == nullptr) {
head_ = nullptr;
} else {
tail_->prev_ = nullptr;
}
return result;
}
void deque_internal::push_tail_internal(deque_item *new_item) {
std::lock_guard<spin_lock> lock{lock_};
if (tail_ != nullptr) {
tail_->prev_ = new_item;
} else {
head_ = new_item;
}
new_item->next_ = tail_;
new_item->prev_ = nullptr;
tail_ = new_item;
}
}
}
}
...@@ -8,18 +8,16 @@ namespace pls { ...@@ -8,18 +8,16 @@ namespace pls {
// also act as a strict memory fence. // also act as a strict memory fence.
void spin_lock::lock() { void spin_lock::lock() {
int tries = 0; int tries = 0;
while (flag_.test_and_set(std::memory_order_acquire)) { while (flag_.test_and_set(std::memory_order_seq_cst)) {
tries++; tries++;
if (tries % yield_at_tries_ == 0) { if (tries % yield_at_tries_ == 0) {
this_thread::yield(); this_thread::yield();
} }
} }
std::atomic_thread_fence(std::memory_order_seq_cst);
} }
void spin_lock::unlock() { void spin_lock::unlock() {
flag_.clear(std::memory_order_release); flag_.clear(std::memory_order_seq_cst);
std::atomic_thread_fence(std::memory_order_seq_cst);
} }
} }
} }
......
...@@ -13,7 +13,9 @@ namespace pls {
for (size_t i = 1; i < my_scheduler->num_threads(); i++) {
size_t target = (my_id + i) % my_scheduler->num_threads();
auto target_state = my_scheduler->thread_state_for(target);
// TODO: Cleaner Locking Using std::guarded_lock
target_state->lock_.lock();
// Dig down to our level
abstract_task* current_task = target_state->root_task_;
...@@ -27,6 +29,7 @@ namespace pls {
current_task->depth_ == depth_) {
if (internal_stealing(current_task)) {
// internal steal was a success, hand it back to the internal scheduler
target_state->lock_.unlock();
return true;
}
...@@ -39,13 +42,15 @@ namespace pls {
// Execute 'top level task steal' if possible
// (only try deeper tasks to keep depth restricted stealing)
while (current_task != nullptr) {
auto lock = &target_state->lock_;
if (current_task->split_task(lock)) {
// internal steal was no success (we did a top level task steal)
return false;
}
current_task = current_task->child_task_;
}
target_state->lock_.unlock();
}
// internal steal was no success
......
#include "pls/internal/scheduling/root_master_task.h" #include "pls/internal/scheduling/root_task.h"
#include "pls/internal/scheduling/root_worker_task.h"
namespace pls {
namespace internal {
......
#include "pls/internal/scheduling/root_master_task.h"
#include "pls/internal/scheduling/root_worker_task.h"
namespace pls {
namespace internal {
namespace scheduling {
}
}
}
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/tbb_task.h"
namespace pls {
namespace internal {
namespace scheduling {
tbb_sub_task::tbb_sub_task():
base::deque_item{},
ref_count_{0},
parent_{nullptr},
tbb_task_{nullptr},
stack_state_{nullptr} {}
tbb_sub_task::tbb_sub_task(const tbb_sub_task& other): base::deque_item(other) {
// Do Nothing, will be inited after this anyways
}
void tbb_sub_task::execute() {
execute_internal();
wait_for_all();
if (parent_ != nullptr) {
parent_->ref_count_--;
}
}
void tbb_sub_task::spawn_child_internal(tbb_sub_task* sub_task) {
// Keep our refcount up to date
ref_count_++;
// Assign forced values
sub_task->parent_ = this;
sub_task->tbb_task_ = tbb_task_;
sub_task->stack_state_ = tbb_task_->my_stack_->save_state();
tbb_task_->deque_.push_tail(sub_task);
}
void tbb_sub_task::wait_for_all() {
while (ref_count_ > 0) {
tbb_sub_task* local_task = tbb_task_->get_local_sub_task();
if (local_task != nullptr) {
local_task->execute();
} else {
// Try to steal work.
// External steal will be executed implicitly if success
if (tbb_task_->steal_work()) {
tbb_task_->last_stolen_->execute();
}
}
}
tbb_task_->my_stack_->reset_state(stack_state_);
}
tbb_sub_task* tbb_task::get_local_sub_task() {
return deque_.pop_tail();
}
tbb_sub_task* tbb_task::get_stolen_sub_task() {
return deque_.pop_head();
}
bool tbb_task::internal_stealing(abstract_task* other_task) {
auto cast_other_task = reinterpret_cast<tbb_task*>(other_task);
auto stolen_sub_task = cast_other_task->get_stolen_sub_task();
if (stolen_sub_task == nullptr) {
return false;
} else {
// Make sub-task belong to our tbb_task instance
stolen_sub_task->tbb_task_ = this;
stolen_sub_task->stack_state_ = my_stack_->save_state();
// We will execute this next without explicitly moving it onto our stack storage
last_stolen_ = stolen_sub_task;
return true;
}
}
bool tbb_task::split_task(base::spin_lock* lock) {
tbb_sub_task* stolen_sub_task = get_stolen_sub_task();
if (stolen_sub_task == nullptr) {
return false;
}
tbb_task task{stolen_sub_task};
// In success case, unlock.
// TODO: this locking is complicated and error prone.
lock->unlock();
scheduler::execute_task(task, depth());
return true;
}
}
}
}
#include "pls/library.h"
#include <iostream>
namespace pls {
void hello() {
std::cout << "Hello from PLS!" << std::endl;
}
int test_adder(int a, int b) {
return a + b;
}
}
\ No newline at end of file
#include "pls/pls.h"
namespace pls {
}
add_executable(tests
main.cpp
base_tests.cpp scheduling_tests.cpp)
target_link_libraries(tests catch2 pls)
...@@ -6,6 +6,7 @@
#include <vector>
#include <mutex>
#include <pls/internal/base/deque.h>
using namespace pls::internal::base;
using namespace std;
...@@ -147,3 +148,59 @@ TEST_CASE( "aligned stack stores objects correctly", "[internal/base/aligned_sta
REQUIRE(pointer_five == pointer_three);
}
}
TEST_CASE( "deque stores objects correctly", "[internal/base/deque.h]") {
class my_item: public deque_item {
};
deque<my_item> deque;
my_item one, two, three;
SECTION( "add and remove items form the tail" ) {
deque.push_tail(&one);
deque.push_tail(&two);
deque.push_tail(&three);
REQUIRE(deque.pop_tail() == &three);
REQUIRE(deque.pop_tail() == &two);
REQUIRE(deque.pop_tail() == &one);
}
SECTION( "handles getting empty by popping the tail correctly" ) {
deque.push_tail(&one);
REQUIRE(deque.pop_tail() == &one);
deque.push_tail(&two);
REQUIRE(deque.pop_tail() == &two);
}
SECTION( "remove items form the head" ) {
deque.push_tail(&one);
deque.push_tail(&two);
deque.push_tail(&three);
REQUIRE(deque.pop_head() == &one);
REQUIRE(deque.pop_head() == &two);
REQUIRE(deque.pop_head() == &three);
}
SECTION( "handles getting empty by popping the head correctly" ) {
deque.push_tail(&one);
REQUIRE(deque.pop_head() == &one);
deque.push_tail(&two);
REQUIRE(deque.pop_head() == &two);
}
SECTION( "handles getting empty by popping the head and tail correctly" ) {
deque.push_tail(&one);
REQUIRE(deque.pop_tail() == &one);
deque.push_tail(&two);
REQUIRE(deque.pop_head() == &two);
deque.push_tail(&three);
REQUIRE(deque.pop_tail() == &three);
}
}
#include <catch.hpp>
#include <pls/library.h>
// BDD style
SCENARIO( "the test_adder works", "[library]" ) {
GIVEN ("two numbers") {
WHEN ( "they are positive") {
THEN ( "the result is positive") {
REQUIRE(pls::test_adder(1, 2) >= 0);
}
THEN ( "the result is the sum of both") {
REQUIRE(pls::test_adder(1, 4) == 5);
}
}
}
}
// Normal Style
TEST_CASE( "the test_adder can sumup", "[libray]") {
}
\ No newline at end of file
#include <catch.hpp>
#include <pls/pls.h>
using namespace pls;
class once_sub_task: public tbb_sub_task {
std::atomic<int>* counter_;
int children_;
protected:
void execute_internal() override {
(*counter_)++;
for (int i = 0; i < children_; i++) {
spawn_child(once_sub_task(counter_, children_ - 1));
}
}
public:
explicit once_sub_task(std::atomic<int>* counter, int children):
tbb_sub_task(),
counter_{counter},
children_{children} {}
};
class force_steal_sub_task: public tbb_sub_task {
std::atomic<int>* parent_counter_;
std::atomic<int>* overall_counter_;
protected:
void execute_internal() override {
(*overall_counter_)--;
if (overall_counter_->load() > 0) {
std::atomic<int> counter{1};
spawn_child(force_steal_sub_task(&counter, overall_counter_));
while (counter.load() > 0)
; // Spin...
}
(*parent_counter_)--;
}
public:
explicit force_steal_sub_task(std::atomic<int>* parent_counter, std::atomic<int>* overall_counter):
tbb_sub_task(),
parent_counter_{parent_counter},
overall_counter_{overall_counter} {}
};
TEST_CASE( "tbb task are scheduled correctly", "[internal/scheduling/tbb_task.h]") {
static static_scheduler_memory<8, 2 << 12> my_scheduler_memory;
SECTION("tasks are executed exactly once") {
scheduler my_scheduler{&my_scheduler_memory, 2};
int start_counter = 4;
int total_tasks = 1 + 4 + 4 * 3 + 4 * 3 * 2 + 4 * 3 * 2 * 1;
std::atomic<int> counter{0};
my_scheduler.perform_work([&] (){
once_sub_task sub_task{&counter, start_counter};
tbb_task task{&sub_task};
scheduler::execute_task(task);
});
REQUIRE(counter.load() == total_tasks);
my_scheduler.terminate(true);
}
SECTION("tasks can be stolen") {
scheduler my_scheduler{&my_scheduler_memory, 8};
my_scheduler.perform_work([&] (){
std::atomic<int> dummy_parent{1}, overall_counter{8};
force_steal_sub_task sub_task{&dummy_parent, &overall_counter};
tbb_task task{&sub_task};
scheduler::execute_task(task);
});
my_scheduler.terminate(true);
}
}