From 0035588016e34c5fafcb161e5c546a3d2192ae09 Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Fri, 22 Mar 2019 16:13:05 +0100 Subject: [PATCH] Add basic thread pool scheduling loop. Right now no work can be spawned, we simply have a proove of concept that we start up each thread, work for the master to finish and then synchronize back to the main thread. --- NOTES.md | 7 +++++++ README.md | 2 +- lib/pls/CMakeLists.txt | 6 +++++- lib/pls/include/pls/internal/base/barrier.h | 30 ++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/base/thread.h | 30 ++++++++++++++++++++++++++++-- lib/pls/include/pls/internal/scheduling/abstract_task.h | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/scheduling/root_master_task.h | 47 +++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/scheduling/root_worker_task.h | 33 +++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/scheduling/scheduler.h | 98 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/scheduling/thread_state.h | 24 ++++++++++++++++++++++++ lib/pls/src/internal/base/barrier.cpp | 9 +++++++++ lib/pls/src/internal/base/spin_lock.cpp | 3 +++ lib/pls/src/internal/scheduling/abstract_task.cpp | 9 +++++++++ lib/pls/src/internal/scheduling/root_master_task.cpp | 10 ++++++++++ lib/pls/src/internal/scheduling/root_worker_task.cpp | 10 ++++++++++ lib/pls/src/internal/scheduling/scheduler.cpp | 9 +++++++++ lib/pls/src/internal/scheduling/thread_state.cpp | 9 +++++++++ 17 files changed, 380 insertions(+), 4 deletions(-) create mode 100644 lib/pls/include/pls/internal/base/barrier.h create mode 100644 lib/pls/include/pls/internal/scheduling/abstract_task.h create mode 100644 lib/pls/include/pls/internal/scheduling/root_master_task.h create mode 100644 lib/pls/include/pls/internal/scheduling/root_worker_task.h create mode 100644 lib/pls/include/pls/internal/scheduling/scheduler.h create mode 100644 lib/pls/include/pls/internal/scheduling/thread_state.h create mode 100644 lib/pls/src/internal/base/barrier.cpp create mode 100644 lib/pls/src/internal/scheduling/abstract_task.cpp create mode 100644 lib/pls/src/internal/scheduling/root_master_task.cpp create mode 100644 lib/pls/src/internal/scheduling/root_worker_task.cpp create mode 100644 lib/pls/src/internal/scheduling/scheduler.cpp create mode 100644 lib/pls/src/internal/scheduling/thread_state.cpp diff --git a/NOTES.md b/NOTES.md index f0ab970..d4b07ee 100644 --- a/NOTES.md +++ b/NOTES.md @@ -4,6 +4,13 @@ A collection of stuff that we noticed during development. Useful later on two write a project report and to go back in time to find out why certain decisions where made. +## 21.03.2018 - Allocation on stack/static memory + +We can use the [placement new](https://www.geeksforgeeks.org/placement-new-operator-cpp/) +operator for our tasks and other stuff to manage memory. +This can allow the pure 'stack based' approach without any memory +management suggested by mike. + ## 20.03.2018 - Prohibit New We want to write this library without using any runtime memory diff --git a/README.md b/README.md index 6c5d6c9..daa680b 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # P³LS³ **P**redictable **P**arallel **P**atterns **L**ibrary for **S**calable **S**mart **S**ystems -Master Branch: [![pipeline status](http://lab.las3.de/gitlab/las3/development/scheduling/predictable_parallel_patterns/badges/master/pipeline.svg)](http://lab.las3.de/gitlab/las3/development/scheduling/predictable_parallel_patterns/commits/master) +[![pipeline status](http://lab.las3.de/gitlab/las3/development/scheduling/predictable_parallel_patterns/badges/master/pipeline.svg)](http://lab.las3.de/gitlab/las3/development/scheduling/predictable_parallel_patterns/commits/master) ## Project Structure diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 270c8a3..a3c9989 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -3,7 +3,11 @@ add_library(pls STATIC src/library.cpp include/pls/library.h src/internal/base/spin_lock.cpp include/pls/internal/base/spin_lock.h src/internal/base/thread.cpp include/pls/internal/base/thread.h - include/pls/internal/base/prohibit_new.h) + include/pls/internal/base/prohibit_new.h + src/internal/scheduling/abstract_task.cpp include/pls/internal/scheduling/abstract_task.h + src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler.h + src/internal/scheduling/thread_state.cpp include/pls/internal/scheduling/thread_state.h + src/internal/base/barrier.cpp include/pls/internal/base/barrier.h include/pls/internal/scheduling/root_master_task.h src/internal/scheduling/root_master_task.cpp) # Add everything in `./include` to be in the include path of this project target_include_directories(pls diff --git a/lib/pls/include/pls/internal/base/barrier.h b/lib/pls/include/pls/internal/base/barrier.h new file mode 100644 index 0000000..f5ea58b --- /dev/null +++ b/lib/pls/include/pls/internal/base/barrier.h @@ -0,0 +1,30 @@ + +#ifndef PLS_BARRIER_H +#define PLS_BARRIER_H + +#include + +namespace pls { + namespace internal { + namespace base { + class barrier { + pthread_barrier_t barrier_; + + public: + explicit barrier(const unsigned int count): barrier_{} { + pthread_barrier_init(&barrier_, nullptr, count); + } + + ~barrier() { + pthread_barrier_destroy(&barrier_); + } + + void wait() { + pthread_barrier_wait(&barrier_); + } + }; + } + } +} + +#endif //PLS_BARRIER_H diff --git a/lib/pls/include/pls/internal/base/thread.h b/lib/pls/include/pls/internal/base/thread.h index 265190e..f03be21 100644 --- a/lib/pls/include/pls/internal/base/thread.h +++ b/lib/pls/include/pls/internal/base/thread.h @@ -8,6 +8,7 @@ #include #include +#include namespace pls { namespace internal { @@ -57,20 +58,38 @@ namespace pls { Function function_; State* state_pointer_; + // Wee need to wait for the started function to read + // the function_ and state_pointer_ property before returning + // from the constructor, as the object might be moved after this. + std::atomic_flag* startup_flag_; + // Keep handle to native implementation pthread_t pthread_thread_; static void* start_pthread_internal(void* thread_pointer) { auto my_thread = reinterpret_cast(thread_pointer); - this_thread::set_state(my_thread->state_pointer_); - my_thread->function_(); + Function my_function_copy = my_thread->function_; + State* my_state_pointer_copy = my_thread->state_pointer_; + + // Now we have copies of everything we need on the stack. + // The original thread object can be moved freely (no more + // references to its memory location). + my_thread->startup_flag_->clear(); + + this_thread::set_state(my_state_pointer_copy); + my_function_copy(); + + // Finished executing the user function pthread_exit(nullptr); } public: + thread(): function_{}, state_pointer_{nullptr}, startup_flag_{nullptr}, pthread_thread_{} {} + explicit thread(const Function& function, State* state_pointer): function_{function}, state_pointer_{state_pointer}, + startup_flag_{nullptr}, pthread_thread_{} { if (!this_thread::local_storage_key_initialized_) { @@ -78,7 +97,14 @@ namespace pls { this_thread::local_storage_key_initialized_ = true; } + // We only need this during startup, will be destroyed when out of scope + std::atomic_flag startup_flag{ATOMIC_FLAG_INIT}; + startup_flag_ = &startup_flag; + + startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *)(this)); + while (startup_flag.test_and_set()) + ; // Busy waiting for the starting flag to clear } public: void join() { diff --git a/lib/pls/include/pls/internal/scheduling/abstract_task.h b/lib/pls/include/pls/internal/scheduling/abstract_task.h new file mode 100644 index 0000000..ceb4943 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/abstract_task.h @@ -0,0 +1,48 @@ + +#ifndef PLS_ABSTRACT_TASK_H +#define PLS_ABSTRACT_TASK_H + +#define PLS_UNIQUE_ID __COUNTER__ + +#include "pls/internal/base/spin_lock.h" + +namespace pls { + namespace internal { + namespace scheduling { + class abstract_task { + int depth_; + int unique_id_; + abstract_task* child_task_; + base::spin_lock spin_lock_; + + public: + explicit abstract_task(int depth, int unique_id): + depth_{depth}, + unique_id_{unique_id}, + child_task_{nullptr}, + spin_lock_{} {}; + + virtual void execute() = 0; + const base::spin_lock& spin_lock() { return spin_lock_; } + + protected: + virtual bool my_stealing(abstract_task *other_task) = 0; + + bool steal_work() { + // get scheduler + // select victim + // try steal + // |-- see if same depth is available + // |-- see if equals depth + id + // |-- try user steal if matches (will return itself if it could steal) + // |-- try internal steal if deeper tasks are available + // |-- if internal steal worked, execute it + // return if the user steal was a success + return false; + }; + }; + } + } +} + +#endif //PLS_ABSTRACT_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/root_master_task.h b/lib/pls/include/pls/internal/scheduling/root_master_task.h new file mode 100644 index 0000000..316ee82 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/root_master_task.h @@ -0,0 +1,47 @@ + +#ifndef PLS_ROOT_MASTER_TASK_H +#define PLS_ROOT_MASTER_TASK_H + +#include + +#include "abstract_task.h" +#include "pls/internal/base/spin_lock.h" + +namespace pls { + namespace internal { + namespace scheduling { + template + class root_master_task : public abstract_task { + Function function_; + bool finished_; + + // Improvement: Remove lock and replace by atomic variable (performance) + base::spin_lock finished_lock_; + public: + explicit root_master_task(Function function): + abstract_task{0, 0}, + function_{function}, + finished_{false} {} + + bool finished() { + std::lock_guard lock{finished_lock_}; + return finished_; + } + + void execute() override { + function_(); + { + std::lock_guard lock{finished_lock_}; + finished_ = true; + } + } + + bool my_stealing(abstract_task *other_task) override { + return false; + } + }; + } + } +} + +#endif //PLS_ROOT_MASTER_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/root_worker_task.h b/lib/pls/include/pls/internal/scheduling/root_worker_task.h new file mode 100644 index 0000000..68cf508 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/root_worker_task.h @@ -0,0 +1,33 @@ + +#ifndef PLS_ROOT_WORKER_TASK_H +#define PLS_ROOT_WORKER_TASK_H + +#include "root_master_task.h" + +namespace pls { + namespace internal { + namespace scheduling { + template + class root_worker_task : public abstract_task { + root_master_task* master_task_; + + public: + explicit root_worker_task(root_master_task* master_task): + abstract_task{0, 0}, + master_task_{master_task} {} + + void execute() override { + do { + steal_work(); + } while (!master_task_->finished()); + } + + bool my_stealing(abstract_task *other_task) override { + return false; + } + }; + } + } +} + +#endif //PLS_ROOT_WORKER_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h new file mode 100644 index 0000000..7c7ab06 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/scheduler.h @@ -0,0 +1,98 @@ + +#ifndef PLS_SCHEDULER_H +#define PLS_SCHEDULER_H + +#include +#include + +#include "pls/internal/base/thread.h" +#include "pls/internal/base/barrier.h" +#include "pls/internal/scheduling/thread_state.h" +#include "root_master_task.h" +#include "root_worker_task.h" + +namespace pls { + namespace internal { + namespace scheduling { + // Upper thread limit for static memory allocation. + // Could be moved to templating if needed. + static constexpr int MAX_THREADS = 32; + + class scheduler { + static void worker_routine() { + auto my_state = base::this_thread::state(); + + while (true) { + my_state->scheduler_->sync_barrier_.wait(); + if (my_state->scheduler_->terminated_) { + return; + } + + // The root task must only return when all work is done, + // because of this a simple call is enough to ensure the + // fork-join-section is done (logically joined back into our main thread). + my_state->root_task_->execute(); + + my_state->scheduler_->sync_barrier_.wait(); + } + } + + unsigned int num_threads_; + std::array thread_states_; + std::array, MAX_THREADS> threads_; + + base::barrier sync_barrier_; + bool terminated_; + public: + scheduler(const unsigned int num_threads): + num_threads_{num_threads}, + sync_barrier_{num_threads + 1}, + terminated_{false} { + if (num_threads > MAX_THREADS) { + exit(1); // TODO: Exception Handling + } + + for (unsigned int i = 0; i < num_threads; i++) { + thread_states_[i] = thread_state{this}; + threads_[i] = base::start_thread(&worker_routine, &thread_states_[i]); + } + } + + ~scheduler() { + terminate(); + } + + template + void perform_work(Function work_section) { + root_master_task master{work_section}; + root_worker_task worker{&master}; + + thread_states_[0].root_task_ = &master; + for (unsigned int i = 1; i < num_threads_; i++) { + thread_states_[i].root_task_ = &worker; + } + + sync_barrier_.wait(); // Trigger threads to wake up + sync_barrier_.wait(); // Wait for threads to finish + } + + void terminate(bool wait_for_workers=true) { + if (terminated_) { + return; + } + + terminated_ = true; + sync_barrier_.wait(); + + if (wait_for_workers) { + for (unsigned int i = 0; i < num_threads_; i++) { + threads_[i].join(); + } + } + } + }; + } + } +} + +#endif //PLS_SCHEDULER_H diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h new file mode 100644 index 0000000..e28e270 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/thread_state.h @@ -0,0 +1,24 @@ + +#ifndef PLS_THREAD_STATE_H +#define PLS_THREAD_STATE_H + +#include "abstract_task.h" + +namespace pls { + namespace internal { + namespace scheduling { + // forward declaration + class scheduler; + + struct thread_state { + thread_state(): scheduler_{nullptr}, root_task_{nullptr} {}; + explicit thread_state(scheduler* scheduler): scheduler_{scheduler}, root_task_{nullptr} {} + + scheduler* scheduler_; + abstract_task* root_task_; + }; + } + } +} + +#endif //PLS_THREAD_STATE_H diff --git a/lib/pls/src/internal/base/barrier.cpp b/lib/pls/src/internal/base/barrier.cpp new file mode 100644 index 0000000..038e030 --- /dev/null +++ b/lib/pls/src/internal/base/barrier.cpp @@ -0,0 +1,9 @@ +#include "pls/internal/base/barrier.h" + +namespace pls { + namespace internal { + namespace base { + + } + } +} diff --git a/lib/pls/src/internal/base/spin_lock.cpp b/lib/pls/src/internal/base/spin_lock.cpp index 99a2c1d..4c61216 100644 --- a/lib/pls/src/internal/base/spin_lock.cpp +++ b/lib/pls/src/internal/base/spin_lock.cpp @@ -3,6 +3,9 @@ namespace pls { namespace internal { namespace base { + // TODO: Research/Measure how the memory fences/orders influence this. + // For now we simply try to be safe by forcing this lock to + // also act as a strict memory fence. void spin_lock::lock() { int tries = 0; while (flag_.test_and_set(std::memory_order_acquire)) { diff --git a/lib/pls/src/internal/scheduling/abstract_task.cpp b/lib/pls/src/internal/scheduling/abstract_task.cpp new file mode 100644 index 0000000..314dc68 --- /dev/null +++ b/lib/pls/src/internal/scheduling/abstract_task.cpp @@ -0,0 +1,9 @@ +#include "pls/internal/scheduling/abstract_task.h" + +namespace pls { + namespace internal { + namespace scheduling { + + } + } +} diff --git a/lib/pls/src/internal/scheduling/root_master_task.cpp b/lib/pls/src/internal/scheduling/root_master_task.cpp new file mode 100644 index 0000000..8b87c3b --- /dev/null +++ b/lib/pls/src/internal/scheduling/root_master_task.cpp @@ -0,0 +1,10 @@ +#include "pls/internal/scheduling/root_master_task.h" +#include "pls/internal/scheduling/root_worker_task.h" + +namespace pls { + namespace internal { + namespace scheduling { + + } + } +} diff --git a/lib/pls/src/internal/scheduling/root_worker_task.cpp b/lib/pls/src/internal/scheduling/root_worker_task.cpp new file mode 100644 index 0000000..8b87c3b --- /dev/null +++ b/lib/pls/src/internal/scheduling/root_worker_task.cpp @@ -0,0 +1,10 @@ +#include "pls/internal/scheduling/root_master_task.h" +#include "pls/internal/scheduling/root_worker_task.h" + +namespace pls { + namespace internal { + namespace scheduling { + + } + } +} diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp new file mode 100644 index 0000000..0f2f287 --- /dev/null +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -0,0 +1,9 @@ +#include "pls/internal/scheduling/scheduler.h" + +namespace pls { + namespace internal { + namespace scheduling { + + } + } +} diff --git a/lib/pls/src/internal/scheduling/thread_state.cpp b/lib/pls/src/internal/scheduling/thread_state.cpp new file mode 100644 index 0000000..8d467ed --- /dev/null +++ b/lib/pls/src/internal/scheduling/thread_state.cpp @@ -0,0 +1,9 @@ +#include "pls/internal/scheduling/thread_state.h" + +namespace pls { + namespace internal { + namespace scheduling { + + } + } +} -- libgit2 0.26.0