Commit 251e228d by Florian Fritz

Merge branch 'basic_taskmodel' into 'master'

Merge first example of a Basic Task Model

See merge request !2
parents d8a7e11e b4a6c2b1
Pipeline #1109 passed with stages in 3 minutes 10 seconds
...
@@ -22,3 +22,8 @@ run_thread_sanitizer:
  stage: sanitizer
  script:
    - ./ci_scripts/run_thread_sanitizer.sh

run_address_sanitizer:
  stage: sanitizer
  script:
    - ./ci_scripts/run_address_sanitizer.sh
...
@@ -14,6 +14,7 @@ include(cmake/DisabelInSource.cmake)
include(cmake/SetupOptimizationLevel.cmake)
include(cmake/SetupThreadingSupport.cmake)
include(cmake/SetupThreadSanitizer.cmake)
include(cmake/SetupAddressSanitizer.cmake)
# make our internal cmake script collection available in the build process.
list(APPEND CMAKE_PREFIX_PATH "${PROJECT_SOURCE_DIR}/cmake")
...
...
@@ -4,6 +4,13 @@ A collection of stuff that we noticed during development.
Useful later on to write a project report and to go back
in time to find out why certain decisions were made.
## 21.03.2018 - Allocation on stack/static memory
We can use the [placement new](https://www.geeksforgeeks.org/placement-new-operator-cpp/)
operator to construct our tasks and other objects in pre-allocated memory.
This allows the pure 'stack based' approach suggested by Mike,
without any runtime memory management.
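
A minimal sketch of the idea (illustrative only; `buffer` and `my_task` are not part of the library):

#include <new> // placement new

struct my_task {
    int id;
    explicit my_task(int id): id{id} {}
};

int main() {
    // Storage on the stack (could equally be static memory), no heap involved.
    alignas(my_task) char buffer[sizeof(my_task)];
    my_task* task = new (buffer) my_task{42}; // construct in pre-allocated memory
    // ... use task ...
    task->~my_task(); // placement new requires a manual destructor call
    return 0;
}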
## 20.03.2018 - Prohibit New
We want to write this library without using any runtime memory
...
# P³LS³
**P**redictable **P**arallel **P**atterns **L**ibrary for **S**calable **S**mart **S**ystems

Master Branch: [![pipeline status](http://lab.las3.de/gitlab/las3/development/scheduling/predictable_parallel_patterns/badges/master/pipeline.svg)](http://lab.las3.de/gitlab/las3/development/scheduling/predictable_parallel_patterns/commits/master)

## Project Structure
...
#!/usr/bin/env bash
mkdir cmake-build-release-address-sanitizer
cd cmake-build-release-address-sanitizer
cmake .. -DCMAKE_BUILD_TYPE=RELEASE -DTHREAD_SANITIZER=OFF -DADDRESS_SANITIZER=ON
make
# run the actual tests with sanitizer enabled, reporting the result
ASAN_OPTIONS="detect_stack_use_after_return=1 detect_leaks=1" ./bin/tests
STATUS_CODE=$?
exit $STATUS_CODE
# Optionally compile with address sanitizer enabled to find memory errors
# https://github.com/google/sanitizers/wiki/AddressSanitizer
# Add optional sanitizer, off by default
option(ADDRESS_SANITIZER "Add address sanitizer" OFF)
if(ADDRESS_SANITIZER)
add_compile_options(-fsanitize=address -fno-omit-frame-pointer -fsanitize-address-use-after-scope -g)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address")
endif()
message("-- Address Sanitizer: ${ADDRESS_SANITIZER}")
...
@@ -3,7 +3,15 @@ add_library(pls STATIC
src/library.cpp include/pls/library.h
src/internal/base/spin_lock.cpp include/pls/internal/base/spin_lock.h
src/internal/base/thread.cpp include/pls/internal/base/thread.h
include/pls/internal/base/prohibit_new.h
src/internal/scheduling/abstract_task.cpp include/pls/internal/scheduling/abstract_task.h
src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler.h
src/internal/scheduling/thread_state.cpp include/pls/internal/scheduling/thread_state.h
src/internal/base/barrier.cpp include/pls/internal/base/barrier.h
src/internal/scheduling/root_master_task.cpp include/pls/internal/scheduling/root_master_task.h
src/internal/base/aligned_stack.cpp include/pls/internal/base/aligned_stack.h
include/pls/internal/base/system_details.h
src/internal/scheduling/run_on_n_threads_task.cpp include/pls/internal/scheduling/run_on_n_threads_task.h)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
...
#ifndef PLS_ALIGNED_STACK_H
#define PLS_ALIGNED_STACK_H
#include <cstdint>
#include <cstdlib>
namespace pls {
namespace internal {
namespace base {
class aligned_stack {
// Keep bounds of our memory block
char* memory_start_;
char* memory_end_;
// Current head will always be aligned to cache lines
char* head_;
static std::uintptr_t next_alignment(std::uintptr_t size);
static char* next_alignment(char* pointer);
public:
aligned_stack(): memory_start_{nullptr}, memory_end_{nullptr}, head_{nullptr} {};
aligned_stack(char* memory_region, const std::size_t size):
memory_start_{memory_region},
memory_end_{memory_region + size},
head_{next_alignment(memory_start_)} {}
template<typename T>
T* push(T object) {
T* result = reinterpret_cast<T*>(head_);
// Move head to next aligned position after new object
head_ = next_alignment(head_ + sizeof(T));
if (head_ >= memory_end_) {
exit(1); // TODO: Exception Handling
}
*result = object;
return result;
}
template<typename T>
T pop() {
head_ = head_ - next_alignment(sizeof(T));
return *reinterpret_cast<T*>(head_);
}
};
}
}
}
#endif //PLS_ALIGNED_STACK_H
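
A minimal usage sketch for the aligned stack (illustrative only; see also the unit tests at the end of this merge request):

#include "pls/internal/base/aligned_stack.h"

using namespace pls::internal::base;

int main() {
    char memory[1024];
    aligned_stack stack{memory, 1024};

    int* pushed = stack.push(42);  // copied to a cache line aligned address
    int popped = stack.pop<int>(); // LIFO pop returns the 42 from above
    return popped == *pushed ? 0 : 1;
}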
#ifndef PLS_BARRIER_H
#define PLS_BARRIER_H
#include <pthread.h>
namespace pls {
namespace internal {
namespace base {
class barrier {
pthread_barrier_t barrier_;
public:
explicit barrier(const unsigned int count): barrier_{} {
pthread_barrier_init(&barrier_, nullptr, count);
}
~barrier() {
pthread_barrier_destroy(&barrier_);
}
void wait() {
pthread_barrier_wait(&barrier_);
}
};
}
}
}
#endif //PLS_BARRIER_H
...
@@ -3,6 +3,7 @@
#define PLS_SPINLOCK_H
#include <atomic>
#include <iostream>
#include "pls/internal/base/thread.h" #include "pls/internal/base/thread.h"
...@@ -16,6 +17,9 @@ namespace pls { ...@@ -16,6 +17,9 @@ namespace pls {
public: public:
spin_lock(): flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{1024} {}; spin_lock(): flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{1024} {};
spin_lock(const spin_lock& other): flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{other.yield_at_tries_} {
std::cout << "Spinlock Moved!" << std::endl;
}
void lock();
void unlock();
...
#ifndef PLS_SYSTEM_DETAILS_H
#define PLS_SYSTEM_DETAILS_H
#include <cstdint>
namespace pls {
namespace internal {
namespace base {
constexpr std::uintptr_t CACHE_LINE_SIZE = 64;
}
}
}
#endif //PLS_SYSTEM_DETAILS_H
...
@@ -8,6 +8,7 @@
#include <functional>
#include <pthread.h>
#include <atomic>
namespace pls {
namespace internal {
...
@@ -57,20 +58,38 @@ namespace pls {
Function function_;
State* state_pointer_;
// We need to wait for the started thread to read
// the function_ and state_pointer_ members before returning
// from the constructor, as the object might be moved after this.
std::atomic_flag* startup_flag_;
// Keep handle to native implementation
pthread_t pthread_thread_;

static void* start_pthread_internal(void* thread_pointer) {
auto my_thread = reinterpret_cast<thread*>(thread_pointer);
Function my_function_copy = my_thread->function_;
State* my_state_pointer_copy = my_thread->state_pointer_;
// Now we have copies of everything we need on the stack.
// The original thread object can be moved freely (no more
// references to its memory location).
my_thread->startup_flag_->clear();
this_thread::set_state(my_state_pointer_copy);
my_function_copy();
// Finished executing the user function
pthread_exit(nullptr);
}

public:
thread(): function_{}, state_pointer_{nullptr}, startup_flag_{nullptr}, pthread_thread_{} {}
explicit thread(const Function& function, State* state_pointer):
function_{function},
state_pointer_{state_pointer},
startup_flag_{nullptr},
pthread_thread_{} {
if (!this_thread::local_storage_key_initialized_) {
...
@@ -78,7 +97,14 @@
this_thread::local_storage_key_initialized_ = true;
}
// We only need this during startup, will be destroyed when out of scope
std::atomic_flag startup_flag{ATOMIC_FLAG_INIT};
startup_flag_ = &startup_flag;
startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return
pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *)(this));
while (startup_flag.test_and_set())
; // Busy waiting for the starting flag to clear
}

public:
void join() {
...
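
Reduced to a standalone sketch, the startup handshake above works like this (hypothetical names, illustration only; compile with -pthread):

#include <atomic>
#include <pthread.h>

// The creator spins on an atomic_flag until the spawned thread has copied
// everything it needs, so the creator-side object may be moved afterwards.
struct startup_data {
    int payload;
    std::atomic_flag* startup_flag;
};

void* worker(void* arg) {
    auto data = static_cast<startup_data*>(arg);
    int payload_copy = data->payload; // copy before signalling
    data->startup_flag->clear();      // now the creator may return/move on
    (void)payload_copy;               // ... real work would use the copy ...
    return nullptr;
}

int main() {
    std::atomic_flag startup_flag = ATOMIC_FLAG_INIT;
    startup_flag.test_and_set(); // set; the worker clears it when done reading
    startup_data data{42, &startup_flag};
    pthread_t thread;
    pthread_create(&thread, nullptr, worker, &data);
    while (startup_flag.test_and_set())
        ; // busy wait, mirroring the thread constructor above
    pthread_join(thread, nullptr);
    return 0;
}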
#ifndef PLS_ABSTRACT_TASK_H
#define PLS_ABSTRACT_TASK_H
#define PLS_UNIQUE_ID __COUNTER__
#include "pls/internal/base/spin_lock.h"
namespace pls {
namespace internal {
namespace scheduling {
class abstract_task {
int depth_;
int unique_id_;
abstract_task* child_task_;
public:
abstract_task(int depth, int unique_id):
depth_{depth},
unique_id_{unique_id},
child_task_{nullptr} {}
virtual void execute() = 0;
void set_child(abstract_task* child_task) { child_task_ = child_task; }
abstract_task* child() { return child_task_; }
void set_depth(int depth) { depth_ = depth; }
int depth() { return depth_; }
protected:
virtual bool internal_stealing(abstract_task* other_task) = 0;
virtual bool split_task() = 0;
bool steal_work();
};
}
}
}
#endif //PLS_ABSTRACT_TASK_H
#ifndef PLS_ROOT_MASTER_TASK_H
#define PLS_ROOT_MASTER_TASK_H
#include <mutex>
#include "abstract_task.h"
#include "pls/internal/base/spin_lock.h"
namespace pls {
namespace internal {
namespace scheduling {
template<typename Function>
class root_master_task : public abstract_task {
Function function_;
bool finished_;
// Improvement: Remove lock and replace by atomic variable (performance)
base::spin_lock finished_lock_;
public:
explicit root_master_task(Function function):
abstract_task{0, 0},
function_{function},
finished_{false} {}
bool finished() {
std::lock_guard<base::spin_lock> lock{finished_lock_};
return finished_;
}
void execute() override {
function_();
{
std::lock_guard<base::spin_lock> lock{finished_lock_};
finished_ = true;
}
}
bool internal_stealing(abstract_task* /*other_task*/) override {
return false;
}
bool split_task() override {
return false;
}
};
}
}
}
#endif //PLS_ROOT_MASTER_TASK_H
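
A sketch of the lock-free alternative hinted at by the 'Improvement' comment above (an assumption, not part of this commit):

#include <atomic>

// finished_ as an atomic bool instead of a spin_lock protected bool;
// the release/acquire pair replaces the lock's ordering guarantees.
class finished_state {
    std::atomic<bool> finished_{false};
public:
    bool finished() const { return finished_.load(std::memory_order_acquire); }
    void mark_finished() { finished_.store(true, std::memory_order_release); }
};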
#ifndef PLS_ROOT_WORKER_TASK_H
#define PLS_ROOT_WORKER_TASK_H
#include "root_master_task.h"
namespace pls {
namespace internal {
namespace scheduling {
template<typename Function>
class root_worker_task : public abstract_task {
root_master_task<Function>* master_task_;
public:
explicit root_worker_task(root_master_task<Function>* master_task):
abstract_task{0, 0},
master_task_{master_task} {}
void execute() override {
do {
steal_work();
} while (!master_task_->finished());
}
bool internal_stealing(abstract_task* /*other_task*/) override {
return false;
}
bool split_task() override {
return false;
}
};
}
}
}
#endif //PLS_ROOT_WORKER_TASK_H
#ifndef PLS_RUN_ON_N_THREADS_TASK_H
#define PLS_RUN_ON_N_THREADS_TASK_H
#include <iostream>
#include <mutex>
#include "pls/internal/base/spin_lock.h"
#include "pls/internal/base/thread.h"
#include "abstract_task.h"
#include "thread_state.h"
#include "scheduler.h"
namespace pls {
namespace internal {
namespace scheduling {
template<typename Function>
class run_on_n_threads_task : public abstract_task {
template<typename F>
friend class run_on_n_threads_task_worker;
Function function_;
// Improvement: Remove lock and replace by atomic variable (performance)
int counter;
base::spin_lock counter_lock_;
int decrement_counter() {
std::lock_guard<base::spin_lock> lock{counter_lock_};
counter--;
return counter;
}
int get_counter() {
std::lock_guard<base::spin_lock> lock{counter_lock_};
return counter;
}
public:
run_on_n_threads_task(Function function, int num_threads):
abstract_task{0, PLS_UNIQUE_ID}, // (depth, unique_id), matching the abstract_task constructor
function_{function},
counter{num_threads - 1} {}
void execute() override {
// Execute our function ONCE
function_();
// Steal until we are finished (other threads executed)
do {
steal_work();
} while (get_counter() > 0);
std::cout << "Finished Master!" << std::endl;
}
bool internal_stealing(abstract_task* /*other_task*/) override {
return false;
}
bool split_task() override;
};
template<typename Function>
class run_on_n_threads_task_worker : public abstract_task {
Function function_;
run_on_n_threads_task<Function>* root_;
public:
run_on_n_threads_task_worker(Function function, run_on_n_threads_task<Function>* root):
abstract_task{0, PLS_UNIQUE_ID}, // (depth, unique_id), matching the abstract_task constructor
function_{function},
root_{root} {}
void execute() override {
if (root_->decrement_counter() >= 0) {
function_();
std::cout << "Finished Worker!" << std::endl;
} else {
std::cout << "Abandoned Worker!" << std::endl;
}
}
bool internal_stealing(abstract_task* /*other_task*/) override {
return false;
}
bool split_task() override {
return false;
}
};
template<typename Function>
bool run_on_n_threads_task<Function>::split_task() {
if (get_counter() <= 0) {
return false;
}
auto scheduler = base::this_thread::state<thread_state>()->scheduler_;
auto task = run_on_n_threads_task_worker<Function>{function_, this};
scheduler->execute_task(task, depth());
return true;
}
template<typename Function>
run_on_n_threads_task<Function> create_run_on_n_threads_task(Function function, int num_threads) {
return run_on_n_threads_task<Function>{function, num_threads};
}
}
}
}
#endif //PLS_RUN_ON_N_THREADS_TASK_H
#ifndef PLS_SCHEDULER_H
#define PLS_SCHEDULER_H
#include <array>
#include <iostream>
#include "pls/internal/base/aligned_stack.h"
#include "pls/internal/base/thread.h"
#include "pls/internal/base/barrier.h"
#include "thread_state.h"
#include "root_master_task.h"
#include "root_worker_task.h"
namespace pls {
namespace internal {
namespace scheduling {
// Upper thread limit for static memory allocation.
// Could be moved to templating if needed.
static constexpr int MAX_THREADS = 32;
void worker_routine();
using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;
class scheduler_memory {
public:
virtual size_t max_threads() = 0;
virtual thread_state* thread_state_for(size_t id) = 0;
virtual scheduler_thread* thread_for(size_t id) = 0;
virtual base::aligned_stack* task_stack_for(size_t id) = 0;
};
template<size_t MAX_THREADS, size_t TASK_STACK_SIZE>
class static_scheduler_memory: public scheduler_memory {
std::array<scheduler_thread, MAX_THREADS> threads_;
std::array<thread_state, MAX_THREADS> thread_states_;
std::array<std::array<char, TASK_STACK_SIZE>, MAX_THREADS> task_stacks_memory_;
std::array<base::aligned_stack, MAX_THREADS> task_stacks_;
public:
static_scheduler_memory() {
for (size_t i = 0; i < MAX_THREADS; i++) {
task_stacks_[i] = base::aligned_stack(reinterpret_cast<char*>(&task_stacks_memory_[i]), TASK_STACK_SIZE);
}
}
size_t max_threads() override { return MAX_THREADS; }
thread_state* thread_state_for(size_t id) override { return &thread_states_[id]; }
scheduler_thread* thread_for(size_t id) override { return &threads_[id]; }
base::aligned_stack* task_stack_for(size_t id) override { return &task_stacks_[id]; }
};
class scheduler {
friend void worker_routine();
const unsigned int num_threads_;
scheduler_memory* memory_;
base::barrier sync_barrier_;
bool terminated_;
public:
explicit scheduler(scheduler_memory* memory, unsigned int num_threads);
~scheduler();
template<typename Function>
void perform_work(Function work_section) {
root_master_task<Function> master{work_section};
root_worker_task<Function> worker{&master};
// Push root task on stacks
memory_->thread_state_for(0)->root_task_ = &master;
memory_->thread_state_for(0)->current_task_ = &master;
for (unsigned int i = 1; i < num_threads_; i++) {
memory_->thread_state_for(i)->root_task_ = &worker;
memory_->thread_state_for(i)->current_task_ = &worker;
}
// Perform and wait for work
sync_barrier_.wait(); // Trigger threads to wake up
sync_barrier_.wait(); // Wait for threads to finish
}
// TODO: See if we should place this differently (only for performance reasons)
template<typename Task>
static void execute_task(Task task, int depth=-1) {
static_assert(std::is_base_of<abstract_task, Task>::value, "Only pass abstract_task subclasses!");
auto my_state = base::this_thread::state<thread_state>();
auto current_task = my_state->current_task_;
// Init Task
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
task.set_depth(depth >= 0 ? depth : current_task->depth() + 1);
my_state->current_task_ = &task;
current_task->set_child(&task);
}
// Run Task
task.execute();
// Teardown state back to before the task was executed
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
current_task->set_child(nullptr);
my_state->current_task_ = current_task;
}
}
void terminate(bool wait_for_workers=true);
unsigned int num_threads() const { return num_threads_; }
thread_state* thread_state_for(size_t id) { return memory_->thread_state_for(id); }
};
}
}
}
#endif //PLS_SCHEDULER_H
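
A hypothetical end-to-end usage sketch of the scheduler API above (names and sizes are illustrative, not part of this merge request):

#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/run_on_n_threads_task.h"

using namespace pls::internal::scheduling;

// All memory is reserved statically up front: 4 threads, 4 KiB task stack each.
static static_scheduler_memory<4, 4096> my_scheduler_memory;

int main() {
    scheduler my_scheduler{&my_scheduler_memory, 4};
    my_scheduler.perform_work([] {
        // Run a lambda on (up to) 2 of the worker threads.
        auto task = create_run_on_n_threads_task([] { /* parallel work */ }, 2);
        scheduler::execute_task(task);
    });
    return 0;
} // ~scheduler() calls terminate() and joins the workers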
#ifndef PLS_THREAD_STATE_H
#define PLS_THREAD_STATE_H
#include "abstract_task.h"
#include "pls/internal/base/aligned_stack.h"
namespace pls {
namespace internal {
namespace scheduling {
// forward declaration
class scheduler;
struct thread_state {
scheduler* scheduler_;
abstract_task* root_task_;
abstract_task* current_task_;
base::aligned_stack* task_stack_;
unsigned int id_;
base::spin_lock lock_;
thread_state():
scheduler_{nullptr},
root_task_{nullptr},
current_task_{nullptr},
task_stack_{nullptr},
id_{0} {};
thread_state(scheduler* scheduler, base::aligned_stack* task_stack, unsigned int id):
scheduler_{scheduler},
root_task_{nullptr},
current_task_{nullptr},
task_stack_{task_stack},
id_{id} {}
thread_state(const thread_state& other):
scheduler_{other.scheduler_},
root_task_{other.root_task_},
current_task_{other.current_task_},
task_stack_{other.task_stack_},
id_{other.id_} {}
thread_state& operator=(const thread_state& other) {
scheduler_ = other.scheduler_;
root_task_ = other.root_task_;
current_task_ = other.current_task_;
task_stack_ = other.task_stack_;
id_ = other.id_;
return *this;
}
};
}
}
}
#endif //PLS_THREAD_STATE_H
#include "pls/internal/base/aligned_stack.h"
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace base {
std::uintptr_t aligned_stack::next_alignment(std::uintptr_t size) {
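// Round the size up to the next multiple of CACHE_LINE_SIZE,
// e.g. with 64 byte cache lines: 0 -> 0, 1 -> 64, 64 -> 64, 65 -> 128.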
std::uintptr_t miss_alignment = size % CACHE_LINE_SIZE;
if (miss_alignment == 0) {
return size;
} else {
return size + (CACHE_LINE_SIZE - miss_alignment);
}
}
char* aligned_stack::next_alignment(char* pointer) {
return reinterpret_cast<char*>(next_alignment(reinterpret_cast<std::uintptr_t>(pointer)));
}
}
}
}
#include "pls/internal/base/barrier.h"
namespace pls {
namespace internal {
namespace base {
}
}
}
...
@@ -3,6 +3,9 @@
namespace pls {
namespace internal {
namespace base {
// TODO: Research/Measure how the memory fences/orders influence this.
// For now we simply try to be safe by forcing this lock to
// also act as a strict memory fence.
void spin_lock::lock() {
int tries = 0;
while (flag_.test_and_set(std::memory_order_acquire)) {
...
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/abstract_task.h"
#include "pls/internal/scheduling/scheduler.h"
namespace pls {
namespace internal {
namespace scheduling {
bool abstract_task::steal_work() {
auto my_state = base::this_thread::state<thread_state>();
auto my_scheduler = my_state->scheduler_;
int my_id = my_state->id_;
for (size_t i = 1; i < my_scheduler->num_threads(); i++) {
size_t target = (my_id + i) % my_scheduler->num_threads();
auto target_state = my_scheduler->thread_state_for(target);
std::lock_guard<base::spin_lock> lock{target_state->lock_};
// Dig down to our level
abstract_task* current_task = target_state->root_task_;
while (current_task != nullptr && current_task->depth() < depth()) {
current_task = current_task->child_task_;
}
if (current_task != nullptr) {
// See if it equals our type and depth of task
if (current_task->unique_id_ == unique_id_ &&
current_task->depth_ == depth_) {
if (internal_stealing(current_task)) {
// internal steal was a success, hand it back to the internal scheduler
return true;
}
// No success, we need to steal work from a deeper level using 'top level task stealing'
current_task = current_task->child_task_;
}
}
// Execute 'top level task steal' if possible
// (only try deeper tasks to keep depth restricted stealing)
while (current_task != nullptr) {
if (current_task->split_task()) {
// internal steal was no success (we did a top level task steal)
return false;
}
current_task = current_task->child_task_;
}
}
// internal steal was no success
return false;
}
}
}
}
#include "pls/internal/scheduling/root_master_task.h"
#include "pls/internal/scheduling/root_worker_task.h"
namespace pls {
namespace internal {
namespace scheduling {
}
}
}
#include "pls/internal/scheduling/root_master_task.h"
#include "pls/internal/scheduling/root_worker_task.h"
namespace pls {
namespace internal {
namespace scheduling {
}
}
}
#include "pls/internal/scheduling/run_on_n_threads_task.h"
namespace pls {
namespace internal {
namespace scheduling {
}
}
}
#include "pls/internal/scheduling/scheduler.h"
namespace pls {
namespace internal {
namespace scheduling {
scheduler::scheduler(scheduler_memory* memory, const unsigned int num_threads):
num_threads_{num_threads},
memory_{memory},
sync_barrier_{num_threads + 1},
terminated_{false} {
if (num_threads > MAX_THREADS) {
exit(1); // TODO: Exception Handling
}
for (unsigned int i = 0; i < num_threads; i++) {
*memory_->thread_state_for(i) = thread_state{this, memory_->task_stack_for(i), i};
*memory_->thread_for(i) = base::start_thread(&worker_routine, memory_->thread_state_for(i));
}
}
scheduler::~scheduler() {
terminate();
}
void worker_routine() {
auto my_state = base::this_thread::state<thread_state>();
while (true) {
my_state->scheduler_->sync_barrier_.wait();
if (my_state->scheduler_->terminated_) {
return;
}
// The root task only returns once all work is done;
// therefore a single call is enough to ensure the whole
// fork-join section has finished (logically joined back into our main thread).
my_state->root_task_->execute();
my_state->scheduler_->sync_barrier_.wait();
}
}
void scheduler::terminate(bool wait_for_workers) {
if (terminated_) {
return;
}
terminated_ = true;
sync_barrier_.wait();
if (wait_for_workers) {
for (unsigned int i = 0; i < num_threads_; i++) {
memory_->thread_for(i)->join();
}
}
}
}
}
}
#include "pls/internal/scheduling/thread_state.h"
namespace pls {
namespace internal {
namespace scheduling {
}
}
}
#include <catch.hpp>
#include <pls/internal/base/thread.h>
#include <pls/internal/base/spin_lock.h>
#include <pls/internal/base/aligned_stack.h>
#include <pls/internal/base/system_details.h>
#include <array> // std::array is used by the new test cases below
#include <vector>
#include <mutex>
...
@@ -82,3 +84,66 @@ TEST_CASE( "spinlock protects concurrent counter", "[internal/base/spinlock.h]")
REQUIRE(base_tests_shared_counter == 0);
}
}
TEST_CASE( "aligned stack stores objects correctly", "[internal/base/aligned_stack.h]") {
constexpr long data_size = 1024;
char data[data_size];
aligned_stack stack{data, data_size};
SECTION( "stack correctly pushes sub linesize objects" ) {
std::array<char, 5> small_data_one{'a', 'b', 'c', 'd', 'e'};
std::array<char, 64> small_data_two{};
std::array<char, 1> small_data_three{'A'};
auto pointer_one = stack.push(small_data_one);
auto pointer_two = stack.push(small_data_two);
auto pointer_three = stack.push(small_data_three);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_one) % CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_two) % CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_three) % CACHE_LINE_SIZE == 0);
}
SECTION( "stack correctly pushes above linesize objects" ) {
std::array<char, 5> small_data_one{'a', 'b', 'c', 'd', 'e'};
std::array<char, CACHE_LINE_SIZE + 10> big_data_one{};
auto big_pointer_one = stack.push(big_data_one);
auto small_pointer_one = stack.push(small_data_one);
REQUIRE(reinterpret_cast<std::uintptr_t>(big_pointer_one) % CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(small_pointer_one) % CACHE_LINE_SIZE == 0);
}
SECTION( "stack correctly stores and retrieves objects" ) {
std::array<char, 5> data_one{'a', 'b', 'c', 'd', 'e'};
stack.push(data_one);
auto retrieved_data = stack.pop<std::array<char, 5>>();
REQUIRE(retrieved_data == std::array<char, 5>{'a', 'b', 'c', 'd', 'e'});
}
SECTION( "stack can push and pop multiple times with correct alignment" ) {
std::array<char, 5> small_data_one{'a', 'b', 'c', 'd', 'e'};
std::array<char, 64> small_data_two{};
std::array<char, 1> small_data_three{'A'};
auto pointer_one = stack.push(small_data_one);
auto pointer_two = stack.push(small_data_two);
auto pointer_three = stack.push(small_data_three);
stack.pop<decltype(small_data_three)>();
stack.pop<decltype(small_data_two)>();
auto pointer_four = stack.push(small_data_two);
auto pointer_five = stack.push(small_data_three);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_one) % CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_two) % CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_three) % CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_four) % CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_five) % CACHE_LINE_SIZE == 0);
REQUIRE(pointer_four == pointer_two);
REQUIRE(pointer_five == pointer_three);
}
}