Commit 00355880 by FritzFlorian

Add basic thread pool scheduling loop.

Right now no work can be spawned; we simply have a proof of concept: we start up each thread, wait for the master task to finish, and then synchronize back to the main thread.
parent d8a7e11e
Pipeline #1103 passed with stages
in 1 minute 58 seconds
......@@ -4,6 +4,13 @@ A collection of stuff that we noticed during development.
Useful later on to write a project report and to go back
in time to find out why certain decisions were made.
## 21.03.2018 - Allocation on stack/static memory
We can use the [placement new](https://www.geeksforgeeks.org/placement-new-operator-cpp/)
operator for our tasks and other stuff to manage memory.
This can enable the pure 'stack based' approach suggested by Mike,
without any runtime memory management.
## 20.03.2018 - Prohibit New
We want to write this library without using any runtime memory
......
# P³LS³
**P**redictable **P**arallel **P**atterns **L**ibrary for **S**calable **S**mart **S**ystems
Master Branch: [![pipeline status](http://lab.las3.de/gitlab/las3/development/scheduling/predictable_parallel_patterns/badges/master/pipeline.svg)](http://lab.las3.de/gitlab/las3/development/scheduling/predictable_parallel_patterns/commits/master)
[![pipeline status](http://lab.las3.de/gitlab/las3/development/scheduling/predictable_parallel_patterns/badges/master/pipeline.svg)](http://lab.las3.de/gitlab/las3/development/scheduling/predictable_parallel_patterns/commits/master)
## Project Structure
......
......@@ -3,7 +3,11 @@ add_library(pls STATIC
src/library.cpp include/pls/library.h
src/internal/base/spin_lock.cpp include/pls/internal/base/spin_lock.h
src/internal/base/thread.cpp include/pls/internal/base/thread.h
include/pls/internal/base/prohibit_new.h)
include/pls/internal/base/prohibit_new.h
src/internal/scheduling/abstract_task.cpp include/pls/internal/scheduling/abstract_task.h
src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler.h
src/internal/scheduling/thread_state.cpp include/pls/internal/scheduling/thread_state.h
src/internal/base/barrier.cpp include/pls/internal/base/barrier.h include/pls/internal/scheduling/root_master_task.h src/internal/scheduling/root_master_task.cpp)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
......
#ifndef PLS_BARRIER_H
#define PLS_BARRIER_H

#include <pthread.h>

namespace pls {
namespace internal {
namespace base {
/**
 * Thin RAII wrapper around a POSIX barrier.
 * All participating threads (count of them, fixed at construction)
 * block in wait() until the last one arrives.
 */
class barrier {
    pthread_barrier_t barrier_;

public:
    explicit barrier(const unsigned int count): barrier_{} {
        // NOTE(review): init result is not checked; with count == 0 this
        // fails (EINVAL) and wait() would act on an uninitialized barrier.
        pthread_barrier_init(&barrier_, nullptr, count);
    }
    ~barrier() {
        pthread_barrier_destroy(&barrier_);
    }

    // A pthread_barrier_t must not be duplicated: a copy would lead to
    // pthread_barrier_destroy being run twice on the same native handle
    // (undefined behavior). Make the wrapper explicitly non-copyable.
    barrier(const barrier&) = delete;
    barrier& operator=(const barrier&) = delete;

    // Blocks until `count` threads have called wait() on this barrier.
    void wait() {
        pthread_barrier_wait(&barrier_);
    }
};
}
}
}
#endif //PLS_BARRIER_H
......@@ -8,6 +8,7 @@
#include <functional>
#include <pthread.h>
#include <atomic>
namespace pls {
namespace internal {
......@@ -57,20 +58,38 @@ namespace pls {
Function function_;
State* state_pointer_;
// Wee need to wait for the started function to read
// the function_ and state_pointer_ property before returning
// from the constructor, as the object might be moved after this.
std::atomic_flag* startup_flag_;
// Keep handle to native implementation
pthread_t pthread_thread_;
static void* start_pthread_internal(void* thread_pointer) {
auto my_thread = reinterpret_cast<thread*>(thread_pointer);
this_thread::set_state(my_thread->state_pointer_);
my_thread->function_();
Function my_function_copy = my_thread->function_;
State* my_state_pointer_copy = my_thread->state_pointer_;
// Now we have copies of everything we need on the stack.
// The original thread object can be moved freely (no more
// references to its memory location).
my_thread->startup_flag_->clear();
this_thread::set_state(my_state_pointer_copy);
my_function_copy();
// Finished executing the user function
pthread_exit(nullptr);
}
public:
thread(): function_{}, state_pointer_{nullptr}, startup_flag_{nullptr}, pthread_thread_{} {}
explicit thread(const Function& function, State* state_pointer):
function_{function},
state_pointer_{state_pointer},
startup_flag_{nullptr},
pthread_thread_{} {
if (!this_thread::local_storage_key_initialized_) {
......@@ -78,7 +97,14 @@ namespace pls {
this_thread::local_storage_key_initialized_ = true;
}
// We only need this during startup, will be destroyed when out of scope
std::atomic_flag startup_flag{ATOMIC_FLAG_INIT};
startup_flag_ = &startup_flag;
startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return
pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *)(this));
while (startup_flag.test_and_set())
; // Busy waiting for the starting flag to clear
}
public:
void join() {
......
#ifndef PLS_ABSTRACT_TASK_H
#define PLS_ABSTRACT_TASK_H

// Expands to a translation-unit-wide unique integer (one new value per use).
#define PLS_UNIQUE_ID __COUNTER__

#include "pls/internal/base/spin_lock.h"
namespace pls {
namespace internal {
namespace scheduling {
// Base class of every task handled by the scheduler.
// Concrete tasks implement execute() (run the task's work) and
// my_stealing() (task-type-specific steal attempt from another task).
class abstract_task {
int depth_;                  // nesting depth of this task in the fork-join tree
int unique_id_;              // id distinguishing task types (see PLS_UNIQUE_ID)
abstract_task* child_task_;  // currently active child task, nullptr if none
base::spin_lock spin_lock_;  // guards this task's internal state
public:
explicit abstract_task(int depth, int unique_id):
depth_{depth},
unique_id_{unique_id},
child_task_{nullptr},
spin_lock_{} {};
// Runs this task's work; must only return once the task is fully done.
virtual void execute() = 0;
const base::spin_lock& spin_lock() { return spin_lock_; }
protected:
// Try to steal work from other_task; returns true on success.
virtual bool my_stealing(abstract_task *other_task) = 0;
// Work-stealing loop — currently a stub (always fails); the outline
// below documents the intended algorithm.
bool steal_work() {
// get scheduler
// select victim
// try steal
// |-- see if same depth is available
// |-- see if equals depth + id
// |-- try user steal if matches (will return itself if it could steal)
// |-- try internal steal if deeper tasks are available
// |-- if internal steal worked, execute it
// return if the user steal was a success
return false;
};
};
}
}
}
#endif //PLS_ABSTRACT_TASK_H
#ifndef PLS_ROOT_MASTER_TASK_H
#define PLS_ROOT_MASTER_TASK_H

#include <mutex>

#include "abstract_task.h"
#include "pls/internal/base/spin_lock.h"

namespace pls {
namespace internal {
namespace scheduling {
/**
 * Root task of a fork-join section, run by exactly one thread.
 * Executes the user-supplied work section once and then raises a
 * 'finished' flag that the worker tasks poll to detect completion.
 */
template<typename Function>
class root_master_task : public abstract_task {
    Function function_;
    bool finished_;

    // Improvement: Remove lock and replace by atomic variable (performance)
    base::spin_lock finished_lock_;

public:
    explicit root_master_task(Function function):
            abstract_task{0, 0},
            function_{function},
            finished_{false} {}

    // True once the user work section has run to completion.
    bool finished() {
        std::lock_guard<base::spin_lock> guard{finished_lock_};
        return finished_;
    }

    void execute() override {
        // Run the user section first, then publish completion under the lock.
        function_();
        std::lock_guard<base::spin_lock> guard{finished_lock_};
        finished_ = true;
    }

    // The root master never steals from anyone.
    bool my_stealing(abstract_task* other_task) override {
        return false;
    }
};
}
}
}
#endif //PLS_ROOT_MASTER_TASK_H
#ifndef PLS_ROOT_WORKER_TASK_H
#define PLS_ROOT_WORKER_TASK_H

#include "root_master_task.h"

namespace pls {
namespace internal {
namespace scheduling {
/**
 * Root task run by every helper (worker) thread of a fork-join section.
 * Repeatedly attempts to steal work until the associated master task
 * reports that the whole section has finished.
 */
template<typename Function>
class root_worker_task : public abstract_task {
    root_master_task<Function>* master_task_;

public:
    explicit root_worker_task(root_master_task<Function>* master_task):
            abstract_task{0, 0},
            master_task_{master_task} {}

    void execute() override {
        // Always attempt at least one steal before checking for termination.
        while (true) {
            steal_work();
            if (master_task_->finished()) {
                break;
            }
        }
    }

    // Worker root tasks expose nothing to steal themselves.
    bool my_stealing(abstract_task* other_task) override {
        return false;
    }
};
}
}
}
#endif //PLS_ROOT_WORKER_TASK_H
#ifndef PLS_SCHEDULER_H
#define PLS_SCHEDULER_H

#include <array>
#include <iostream>

#include "pls/internal/base/thread.h"
#include "pls/internal/base/barrier.h"
#include "pls/internal/scheduling/thread_state.h"
#include "root_master_task.h"
#include "root_worker_task.h"
namespace pls {
namespace internal {
namespace scheduling {
// Upper thread limit for static memory allocation.
// Could be moved to templating if needed.
static constexpr int MAX_THREADS = 32;
// Statically-sized thread pool. Workers are spawned once in the
// constructor, rendezvous with the main thread at sync_barrier_
// (count == num_threads + 1, i.e. workers plus the main thread),
// and live until terminate() releases them.
class scheduler {
// Loop run by every pooled worker thread. Each iteration:
// wait to be released by perform_work()/terminate(), exit if we were
// terminated, otherwise run the root task assigned to this thread's
// state and rendezvous again to signal completion.
// NOTE(review): terminated_ is a plain bool; its visibility here relies
// on the barrier waits ordering the write before the read — confirm.
static void worker_routine() {
auto my_state = base::this_thread::state<thread_state>();
while (true) {
my_state->scheduler_->sync_barrier_.wait();
if (my_state->scheduler_->terminated_) {
return;
}
// The root task must only return when all work is done,
// because of this a simple call is enough to ensure the
// fork-join-section is done (logically joined back into our main thread).
my_state->root_task_->execute();
my_state->scheduler_->sync_barrier_.wait();
}
}
unsigned int num_threads_;                   // actual number of pooled threads (<= MAX_THREADS)
std::array<thread_state, MAX_THREADS> thread_states_;  // per-worker state (scheduler + root task)
std::array<base::thread<decltype(&worker_routine), thread_state>, MAX_THREADS> threads_;
base::barrier sync_barrier_;                 // rendezvous of all workers + main thread
bool terminated_;                            // set by terminate(); workers exit when they see it
public:
// Spawns num_threads workers; they immediately block in worker_routine
// on sync_barrier_ until perform_work() or terminate() is called.
scheduler(const unsigned int num_threads):
num_threads_{num_threads},
sync_barrier_{num_threads + 1},
terminated_{false} {
if (num_threads > MAX_THREADS) {
exit(1); // TODO: Exception Handling
}
for (unsigned int i = 0; i < num_threads; i++) {
thread_states_[i] = thread_state{this};
threads_[i] = base::start_thread(&worker_routine, &thread_states_[i]);
}
}
~scheduler() {
terminate();
}
// Runs one fork-join section: worker 0 executes the master task (the
// user's work_section), all other workers run stealing worker tasks
// until the master reports completion. The calling (main) thread only
// coordinates via the two barrier waits; it does not execute tasks.
template<typename Function>
void perform_work(Function work_section) {
root_master_task<Function> master{work_section};
root_worker_task<Function> worker{&master};
thread_states_[0].root_task_ = &master;
for (unsigned int i = 1; i < num_threads_; i++) {
thread_states_[i].root_task_ = &worker;
}
sync_barrier_.wait(); // Trigger threads to wake up
sync_barrier_.wait(); // Wait for threads to finish
}
// Shuts the pool down: releases the workers so they observe terminated_
// and exit, then (optionally) joins them. Idempotent.
void terminate(bool wait_for_workers=true) {
if (terminated_) {
return;
}
terminated_ = true;
sync_barrier_.wait();
if (wait_for_workers) {
for (unsigned int i = 0; i < num_threads_; i++) {
threads_[i].join();
}
}
}
};
}
}
}
#endif //PLS_SCHEDULER_H
#ifndef PLS_THREAD_STATE_H
#define PLS_THREAD_STATE_H

#include "abstract_task.h"

namespace pls {
namespace internal {
namespace scheduling {
// forward declaration
class scheduler;

// Per-worker-thread bookkeeping: the owning scheduler and the root task
// this thread is currently asked to execute (nullptr when idle).
struct thread_state {
    scheduler* scheduler_ = nullptr;
    abstract_task* root_task_ = nullptr;

    thread_state() = default;
    explicit thread_state(scheduler* scheduler): scheduler_{scheduler} {}
};
}
}
}
#endif //PLS_THREAD_STATE_H
#include "pls/internal/base/barrier.h"
namespace pls {
namespace internal {
namespace base {
}
}
}
......@@ -3,6 +3,9 @@
namespace pls {
namespace internal {
namespace base {
// TODO: Research/Measure how the memory fences/orders influence this.
// For now we simply try to be safe by forcing this lock to
// also act as a strict memory fence.
void spin_lock::lock() {
int tries = 0;
while (flag_.test_and_set(std::memory_order_acquire)) {
......
#include "pls/internal/scheduling/abstract_task.h"
namespace pls {
namespace internal {
namespace scheduling {
}
}
}
#include "pls/internal/scheduling/root_master_task.h"
#include "pls/internal/scheduling/root_worker_task.h"
namespace pls {
namespace internal {
namespace scheduling {
}
}
}
#include "pls/internal/scheduling/root_master_task.h"
#include "pls/internal/scheduling/root_worker_task.h"
namespace pls {
namespace internal {
namespace scheduling {
}
}
}
#include "pls/internal/scheduling/scheduler.h"
namespace pls {
namespace internal {
namespace scheduling {
}
}
}
#include "pls/internal/scheduling/thread_state.h"
namespace pls {
namespace internal {
namespace scheduling {
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment