diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 95f4b16..975ba41 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -22,3 +22,8 @@ run_thread_sanitizer: stage: sanitizer script: ./ci_scripts/run_thread_sanitizer.sh + +run_address_sanitizer: + stage: sanitizer + script: + ./ci_scripts/run_address_sanitizer.sh diff --git a/CMakeLists.txt b/CMakeLists.txt index 92b9eca..513e8a0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14,6 +14,7 @@ include(cmake/DisabelInSource.cmake) include(cmake/SetupOptimizationLevel.cmake) include(cmake/SetupThreadingSupport.cmake) include(cmake/SetupThreadSanitizer.cmake) +include(cmake/SetupAddressSanitizer.cmake) # make our internal cmake script collection avaliable in the build process. list(APPEND CMAKE_PREFIX_PATH "${PROJECT_SOURCE_DIR}/cmake") diff --git a/NOTES.md b/NOTES.md index f0ab970..d4b07ee 100644 --- a/NOTES.md +++ b/NOTES.md @@ -4,6 +4,13 @@ A collection of stuff that we noticed during development. Useful later on two write a project report and to go back in time to find out why certain decisions where made. +## 21.03.2018 - Allocation on stack/static memory + +We can use the [placement new](https://www.geeksforgeeks.org/placement-new-operator-cpp/) +operator for our tasks and other objects to manage memory. +This can allow the pure 'stack based' approach without any dynamic +memory management, as suggested by Mike. 
+ ## 20.03.2018 - Prohibit New We want to write this library without using any runtime memory diff --git a/README.md b/README.md index 6c5d6c9..daa680b 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # P³LS³ **P**redictable **P**arallel **P**atterns **L**ibrary for **S**calable **S**mart **S**ystems -Master Branch: [![pipeline status](http://lab.las3.de/gitlab/las3/development/scheduling/predictable_parallel_patterns/badges/master/pipeline.svg)](http://lab.las3.de/gitlab/las3/development/scheduling/predictable_parallel_patterns/commits/master) +[![pipeline status](http://lab.las3.de/gitlab/las3/development/scheduling/predictable_parallel_patterns/badges/master/pipeline.svg)](http://lab.las3.de/gitlab/las3/development/scheduling/predictable_parallel_patterns/commits/master) ## Project Structure diff --git a/ci_scripts/run_address_sanitizer.sh b/ci_scripts/run_address_sanitizer.sh new file mode 100755 index 0000000..ab095af --- /dev/null +++ b/ci_scripts/run_address_sanitizer.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +mkdir cmake-build-release-address-sanitizer +cd cmake-build-release-address-sanitizer +cmake .. -DCMAKE_BUILD_TYPE=RELEASE -DTHREAD_SANITIZER=OFF -DADDRESS_SANITIZER=ON +make + +# run the actual tests with sanitizer enabled, reporting the result +ASAN_OPTIONS="detect_stack_use_after_return=1 detect_leaks=1" ./bin/tests +STATUS_CODE=$? 
+ +exit $STATUS_CODE diff --git a/cmake/SetupAddressSanitizer.cmake b/cmake/SetupAddressSanitizer.cmake new file mode 100644 index 0000000..f2422a7 --- /dev/null +++ b/cmake/SetupAddressSanitizer.cmake @@ -0,0 +1,10 @@ +# Optionally compile with address sanitizer enabled to find memory errors (use-after-free, out-of-bounds, leaks) +# https://github.com/google/sanitizers/wiki/AddressSanitizer + +# Add optional sanitizer, off by default +option(ADDRESS_SANITIZER "Add address sanitizer" OFF) +if(ADDRESS_SANITIZER) + add_compile_options(-fsanitize=address -fno-omit-frame-pointer -fsanitize-address-use-after-scope -g) + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address") +endif() +message("-- Address Sanitizer: ${ADDRESS_SANITIZER}") diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 270c8a3..661d055 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -3,7 +3,15 @@ add_library(pls STATIC src/library.cpp include/pls/library.h src/internal/base/spin_lock.cpp include/pls/internal/base/spin_lock.h src/internal/base/thread.cpp include/pls/internal/base/thread.h - include/pls/internal/base/prohibit_new.h) + include/pls/internal/base/prohibit_new.h + src/internal/scheduling/abstract_task.cpp include/pls/internal/scheduling/abstract_task.h + src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler.h + src/internal/scheduling/thread_state.cpp include/pls/internal/scheduling/thread_state.h + src/internal/base/barrier.cpp include/pls/internal/base/barrier.h + src/internal/scheduling/root_master_task.cpp include/pls/internal/scheduling/root_master_task.h + src/internal/base/aligned_stack.cpp include/pls/internal/base/aligned_stack.h + include/pls/internal/base/system_details.h + src/internal/scheduling/run_on_n_threads_task.cpp include/pls/internal/scheduling/run_on_n_threads_task.h) # Add everything in `./include` to be in the include path of this project target_include_directories(pls diff --git 
a/lib/pls/include/pls/internal/base/aligned_stack.h b/lib/pls/include/pls/internal/base/aligned_stack.h new file mode 100644 index 0000000..9ff993c --- /dev/null +++ b/lib/pls/include/pls/internal/base/aligned_stack.h @@ -0,0 +1,54 @@ + +#ifndef PLS_ALIGNED_STACK_H +#define PLS_ALIGNED_STACK_H + +#include +#include + +namespace pls { + namespace internal { + namespace base { + class aligned_stack { + // Keep bounds of our memory block + char* memory_start_; + char* memory_end_; + + // Current head will always be aligned to cache lines + char* head_; + + static std::uintptr_t next_alignment(std::uintptr_t size); + static char* next_alignment(char* pointer); + public: + aligned_stack(): memory_start_{nullptr}, memory_end_{nullptr}, head_{nullptr} {}; + + aligned_stack(char* memory_region, const std::size_t size): + memory_start_{memory_region}, + memory_end_{memory_region + size}, + head_{next_alignment(memory_start_)} {} + + template + T* push(T object) { + T* result = reinterpret_cast(head_); + + // Move head to next aligned position after new object + head_ = next_alignment(head_ + sizeof(T)); + if (head_ >= memory_end_) { + exit(1); // TODO: Exception Handling + } + + *result = object; + return result; + } + + template + T pop() { + head_ = head_ - next_alignment(sizeof(T)); + + return *reinterpret_cast(head_); + } + }; + } + } +} + +#endif //PLS_ALIGNED_STACK_H diff --git a/lib/pls/include/pls/internal/base/barrier.h b/lib/pls/include/pls/internal/base/barrier.h new file mode 100644 index 0000000..f5ea58b --- /dev/null +++ b/lib/pls/include/pls/internal/base/barrier.h @@ -0,0 +1,30 @@ + +#ifndef PLS_BARRIER_H +#define PLS_BARRIER_H + +#include + +namespace pls { + namespace internal { + namespace base { + class barrier { + pthread_barrier_t barrier_; + + public: + explicit barrier(const unsigned int count): barrier_{} { + pthread_barrier_init(&barrier_, nullptr, count); + } + + ~barrier() { + pthread_barrier_destroy(&barrier_); + } + + void wait() { + 
pthread_barrier_wait(&barrier_); + } + }; + } + } +} + +#endif //PLS_BARRIER_H diff --git a/lib/pls/include/pls/internal/base/spin_lock.h b/lib/pls/include/pls/internal/base/spin_lock.h index 5236e9f..cfd3143 100644 --- a/lib/pls/include/pls/internal/base/spin_lock.h +++ b/lib/pls/include/pls/internal/base/spin_lock.h @@ -3,6 +3,7 @@ #define PLS_SPINLOCK_H #include +#include #include "pls/internal/base/thread.h" @@ -16,6 +17,9 @@ namespace pls { public: spin_lock(): flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{1024} {}; + spin_lock(const spin_lock& other): flag_{ATOMIC_FLAG_INIT}, yield_at_tries_{other.yield_at_tries_} { + std::cout << "Spinlock Moved!" << std::endl; + } void lock(); void unlock(); diff --git a/lib/pls/include/pls/internal/base/system_details.h b/lib/pls/include/pls/internal/base/system_details.h new file mode 100644 index 0000000..a8dfb72 --- /dev/null +++ b/lib/pls/include/pls/internal/base/system_details.h @@ -0,0 +1,15 @@ + +#ifndef PLS_SYSTEM_DETAILS_H +#define PLS_SYSTEM_DETAILS_H + +#include + +namespace pls { + namespace internal { + namespace base { + constexpr std::uintptr_t CACHE_LINE_SIZE = 64; + } + } +} + +#endif //PLS_SYSTEM_DETAILS_H diff --git a/lib/pls/include/pls/internal/base/thread.h b/lib/pls/include/pls/internal/base/thread.h index 265190e..f03be21 100644 --- a/lib/pls/include/pls/internal/base/thread.h +++ b/lib/pls/include/pls/internal/base/thread.h @@ -8,6 +8,7 @@ #include #include +#include namespace pls { namespace internal { @@ -57,20 +58,38 @@ namespace pls { Function function_; State* state_pointer_; + // Wee need to wait for the started function to read + // the function_ and state_pointer_ property before returning + // from the constructor, as the object might be moved after this. 
+ std::atomic_flag* startup_flag_; + // Keep handle to native implementation pthread_t pthread_thread_; static void* start_pthread_internal(void* thread_pointer) { auto my_thread = reinterpret_cast(thread_pointer); - this_thread::set_state(my_thread->state_pointer_); - my_thread->function_(); + Function my_function_copy = my_thread->function_; + State* my_state_pointer_copy = my_thread->state_pointer_; + + // Now we have copies of everything we need on the stack. + // The original thread object can be moved freely (no more + // references to its memory location). + my_thread->startup_flag_->clear(); + + this_thread::set_state(my_state_pointer_copy); + my_function_copy(); + + // Finished executing the user function pthread_exit(nullptr); } public: + thread(): function_{}, state_pointer_{nullptr}, startup_flag_{nullptr}, pthread_thread_{} {} + explicit thread(const Function& function, State* state_pointer): function_{function}, state_pointer_{state_pointer}, + startup_flag_{nullptr}, pthread_thread_{} { if (!this_thread::local_storage_key_initialized_) { @@ -78,7 +97,14 @@ namespace pls { this_thread::local_storage_key_initialized_ = true; } + // We only need this during startup, will be destroyed when out of scope + std::atomic_flag startup_flag{ATOMIC_FLAG_INIT}; + startup_flag_ = &startup_flag; + + startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *)(this)); + while (startup_flag.test_and_set()) + ; // Busy waiting for the starting flag to clear } public: void join() { diff --git a/lib/pls/include/pls/internal/scheduling/abstract_task.h b/lib/pls/include/pls/internal/scheduling/abstract_task.h new file mode 100644 index 0000000..d736127 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/abstract_task.h @@ -0,0 +1,39 @@ + +#ifndef PLS_ABSTRACT_TASK_H +#define PLS_ABSTRACT_TASK_H + +#define PLS_UNIQUE_ID __COUNTER__ + +#include 
"pls/internal/base/spin_lock.h" + +namespace pls { + namespace internal { + namespace scheduling { + class abstract_task { + int depth_; + int unique_id_; + abstract_task* child_task_; + + public: + abstract_task(int depth, int unique_id): + depth_{depth}, + unique_id_{unique_id}, + child_task_{nullptr} {} + + virtual void execute() = 0; + void set_child(abstract_task* child_task) { child_task_ = child_task; } + abstract_task* child() { return child_task_; } + + void set_depth(int depth) { depth_ = depth; } + int depth() { return depth_; } + protected: + virtual bool internal_stealing(abstract_task* other_task) = 0; + virtual bool split_task() = 0; + + bool steal_work(); + }; + } + } +} + +#endif //PLS_ABSTRACT_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/root_master_task.h b/lib/pls/include/pls/internal/scheduling/root_master_task.h new file mode 100644 index 0000000..ba46408 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/root_master_task.h @@ -0,0 +1,51 @@ + +#ifndef PLS_ROOT_MASTER_TASK_H +#define PLS_ROOT_MASTER_TASK_H + +#include + +#include "abstract_task.h" +#include "pls/internal/base/spin_lock.h" + +namespace pls { + namespace internal { + namespace scheduling { + template + class root_master_task : public abstract_task { + Function function_; + bool finished_; + + // Improvement: Remove lock and replace by atomic variable (performance) + base::spin_lock finished_lock_; + public: + explicit root_master_task(Function function): + abstract_task{0, 0}, + function_{function}, + finished_{false} {} + + bool finished() { + std::lock_guard lock{finished_lock_}; + return finished_; + } + + void execute() override { + function_(); + { + std::lock_guard lock{finished_lock_}; + finished_ = true; + } + } + + bool internal_stealing(abstract_task* /*other_task*/) override { + return false; + } + + bool split_task() override { + return false; + } + }; + } + } +} + +#endif //PLS_ROOT_MASTER_TASK_H diff --git 
a/lib/pls/include/pls/internal/scheduling/root_worker_task.h b/lib/pls/include/pls/internal/scheduling/root_worker_task.h new file mode 100644 index 0000000..ff67cbd --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/root_worker_task.h @@ -0,0 +1,37 @@ + +#ifndef PLS_ROOT_WORKER_TASK_H +#define PLS_ROOT_WORKER_TASK_H + +#include "root_master_task.h" + +namespace pls { + namespace internal { + namespace scheduling { + template + class root_worker_task : public abstract_task { + root_master_task* master_task_; + + public: + explicit root_worker_task(root_master_task* master_task): + abstract_task{0, 0}, + master_task_{master_task} {} + + void execute() override { + do { + steal_work(); + } while (!master_task_->finished()); + } + + bool internal_stealing(abstract_task* /*other_task*/) override { + return false; + } + + bool split_task() override { + return false; + } + }; + } + } +} + +#endif //PLS_ROOT_WORKER_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h b/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h new file mode 100644 index 0000000..d7e2a24 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h @@ -0,0 +1,111 @@ + +#ifndef PLS_RUN_ON_N_THREADS_TASK_H +#define PLS_RUN_ON_N_THREADS_TASK_H + +#include + +#include "pls/internal/base/spin_lock.h" +#include "pls/internal/base/thread.h" + +#include "abstract_task.h" +#include "thread_state.h" +#include "scheduler.h" + +namespace pls { + namespace internal { + namespace scheduling { + template + class run_on_n_threads_task : public abstract_task { + template + friend class run_on_n_threads_task_worker; + + Function function_; + + // Improvement: Remove lock and replace by atomic variable (performance) + int counter; + base::spin_lock counter_lock_; + + int decrement_counter() { + std::lock_guard lock{counter_lock_}; + counter--; + return counter; + } + + int get_counter() { + std::lock_guard lock{counter_lock_}; + return 
counter; + } + public: + run_on_n_threads_task(Function function, int num_threads): + abstract_task{PLS_UNIQUE_ID, 0}, + function_{function}, + counter{num_threads - 1} {} + + void execute() override { + // Execute our function ONCE + function_(); + + // Steal until we are finished (other threads executed) + do { + steal_work(); + } while (get_counter() > 0); + + std::cout << "Finished Master!" << std::endl; + } + + bool internal_stealing(abstract_task* /*other_task*/) override { + return false; + } + + bool split_task() override; + }; + + template + class run_on_n_threads_task_worker : public abstract_task { + Function function_; + run_on_n_threads_task* root_; + public: + run_on_n_threads_task_worker(Function function, run_on_n_threads_task* root): + abstract_task{PLS_UNIQUE_ID, 0}, + function_{function}, + root_{root} {} + + void execute() override { + if (root_->decrement_counter() >= 0) { + function_(); + std::cout << "Finished Worker!" << std::endl; + } else { + std::cout << "Abandoned Worker!" 
<< std::endl; + } + } + + bool internal_stealing(abstract_task* /*other_task*/) override { + return false; + } + + bool split_task() override { + return false; + } + }; + + template + bool run_on_n_threads_task::split_task() { + if (get_counter() <= 0) { + return false; + } + + auto scheduler = base::this_thread::state()->scheduler_; + auto task = run_on_n_threads_task_worker{function_, this}; + scheduler->execute_task(task, depth()); + return true; + } + + template + run_on_n_threads_task create_run_on_n_threads_task(Function function, int num_threads) { + return run_on_n_threads_task{function, num_threads}; + } + } + } +} + +#endif //PLS_RUN_ON_N_THREADS_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h new file mode 100644 index 0000000..b8f1f39 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/scheduler.h @@ -0,0 +1,119 @@ + +#ifndef PLS_SCHEDULER_H +#define PLS_SCHEDULER_H + +#include +#include + +#include "pls/internal/base/aligned_stack.h" +#include "pls/internal/base/thread.h" +#include "pls/internal/base/barrier.h" + +#include "thread_state.h" +#include "root_master_task.h" +#include "root_worker_task.h" + +namespace pls { + namespace internal { + namespace scheduling { + // Upper thread limit for static memory allocation. + // Could be moved to templating if needed. 
+ static constexpr int MAX_THREADS = 32; + + void worker_routine(); + using scheduler_thread = base::thread; + + class scheduler_memory { + public: + virtual size_t max_threads() = 0; + virtual thread_state* thread_state_for(size_t id) = 0; + virtual scheduler_thread* thread_for(size_t id) = 0; + virtual base::aligned_stack* task_stack_for(size_t id) = 0; + }; + + template + class static_scheduler_memory: public scheduler_memory { + std::array threads_; + std::array thread_states_; + std::array, MAX_THREADS> task_stacks_memory_; + std::array task_stacks_; + + public: + static_scheduler_memory() { + for (size_t i = 0; i < MAX_THREADS; i++) { + task_stacks_[i] = base::aligned_stack(reinterpret_cast(&task_stacks_memory_[i]), TASK_STACK_SIZE); + } + } + + size_t max_threads() override { return MAX_THREADS; } + thread_state* thread_state_for(size_t id) override { return &thread_states_[id]; } + scheduler_thread* thread_for(size_t id) override { return &threads_[id]; } + base::aligned_stack* task_stack_for(size_t id) override { return &task_stacks_[id]; } + }; + + class scheduler { + friend void worker_routine(); + + const unsigned int num_threads_; + scheduler_memory* memory_; + + base::barrier sync_barrier_; + bool terminated_; + public: + explicit scheduler(scheduler_memory* memory, unsigned int num_threads); + ~scheduler(); + + template + void perform_work(Function work_section) { + root_master_task master{work_section}; + root_worker_task worker{&master}; + + // Push root task on stacks + memory_->thread_state_for(0)->root_task_ = &master; + memory_->thread_state_for(0)->current_task_ = &master; + for (unsigned int i = 1; i < num_threads_; i++) { + memory_->thread_state_for(i)->root_task_ = &worker; + memory_->thread_state_for(i)->current_task_ = &worker; + } + + // Perform and wait for work + sync_barrier_.wait(); // Trigger threads to wake up + sync_barrier_.wait(); // Wait for threads to finish + } + + // TODO: See if we should place this differently (only for 
performance reasons) + template + static void execute_task(Task task, int depth=-1) { + static_assert(std::is_base_of::value, "Only pass abstract_task subclasses!"); + + auto my_state = base::this_thread::state(); + auto current_task = my_state->current_task_; + + // Init Task + { + std::lock_guard lock{my_state->lock_}; + task.set_depth(depth >= 0 ? depth : current_task->depth() + 1); + my_state->current_task_ = &task; + current_task->set_child(&task); + } + + // Run Task + task.execute(); + + // Teardown state back to before the task was executed + { + std::lock_guard lock{my_state->lock_}; + current_task->set_child(nullptr); + my_state->current_task_ = current_task; + } + } + + void terminate(bool wait_for_workers=true); + unsigned int num_threads() const { return num_threads_; } + thread_state* thread_state_for(size_t id) { return memory_->thread_state_for(id); } + }; + } + } +} + +#endif //PLS_SCHEDULER_H diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h new file mode 100644 index 0000000..58dcc9b --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/thread_state.h @@ -0,0 +1,57 @@ + +#ifndef PLS_THREAD_STATE_H +#define PLS_THREAD_STATE_H + +#include "abstract_task.h" +#include "pls/internal/base/aligned_stack.h" + +namespace pls { + namespace internal { + namespace scheduling { + // forward declaration + class scheduler; + + struct thread_state { + scheduler* scheduler_; + abstract_task* root_task_; + abstract_task* current_task_; + base::aligned_stack* task_stack_; + unsigned int id_; + base::spin_lock lock_; + + thread_state(): + scheduler_{nullptr}, + root_task_{nullptr}, + current_task_{nullptr}, + task_stack_{nullptr}, + id_{0} {}; + + thread_state(scheduler* scheduler, base::aligned_stack* task_stack, unsigned int id): + scheduler_{scheduler}, + root_task_{nullptr}, + current_task_{nullptr}, + task_stack_{task_stack}, + id_{id} {} + + thread_state(const thread_state& 
other): + scheduler_{other.scheduler_}, + root_task_{other.root_task_}, + current_task_{other.current_task_}, + task_stack_{other.task_stack_}, + id_{other.id_} {} + + thread_state& operator=(const thread_state& other) { + scheduler_ = other.scheduler_; + root_task_ = other.root_task_; + current_task_ = other.current_task_; + task_stack_ = other.task_stack_; + id_ = other.id_; + + return *this; + } + }; + } + } +} + +#endif //PLS_THREAD_STATE_H diff --git a/lib/pls/src/internal/base/aligned_stack.cpp b/lib/pls/src/internal/base/aligned_stack.cpp new file mode 100644 index 0000000..4efe681 --- /dev/null +++ b/lib/pls/src/internal/base/aligned_stack.cpp @@ -0,0 +1,21 @@ +#include "pls/internal/base/aligned_stack.h" +#include "pls/internal/base/system_details.h" + +namespace pls { + namespace internal { + namespace base { + std::uintptr_t aligned_stack::next_alignment(std::uintptr_t size) { + std::uintptr_t miss_alignment = size % CACHE_LINE_SIZE; + if (miss_alignment == 0) { + return size; + } else { + return size + (CACHE_LINE_SIZE - miss_alignment); + } + } + + char* aligned_stack::next_alignment(char* pointer) { + return reinterpret_cast(next_alignment(reinterpret_cast(pointer))); + } + } + } +} diff --git a/lib/pls/src/internal/base/barrier.cpp b/lib/pls/src/internal/base/barrier.cpp new file mode 100644 index 0000000..038e030 --- /dev/null +++ b/lib/pls/src/internal/base/barrier.cpp @@ -0,0 +1,9 @@ +#include "pls/internal/base/barrier.h" + +namespace pls { + namespace internal { + namespace base { + + } + } +} diff --git a/lib/pls/src/internal/base/spin_lock.cpp b/lib/pls/src/internal/base/spin_lock.cpp index 99a2c1d..4c61216 100644 --- a/lib/pls/src/internal/base/spin_lock.cpp +++ b/lib/pls/src/internal/base/spin_lock.cpp @@ -3,6 +3,9 @@ namespace pls { namespace internal { namespace base { + // TODO: Research/Measure how the memory fences/orders influence this. 
+ // For now we simply try to be safe by forcing this lock to + // also act as a strict memory fence. void spin_lock::lock() { int tries = 0; while (flag_.test_and_set(std::memory_order_acquire)) { diff --git a/lib/pls/src/internal/scheduling/abstract_task.cpp b/lib/pls/src/internal/scheduling/abstract_task.cpp new file mode 100644 index 0000000..ba69bcb --- /dev/null +++ b/lib/pls/src/internal/scheduling/abstract_task.cpp @@ -0,0 +1,56 @@ +#include "pls/internal/scheduling/thread_state.h" +#include "pls/internal/scheduling/abstract_task.h" +#include "pls/internal/scheduling/scheduler.h" + +namespace pls { + namespace internal { + namespace scheduling { + bool abstract_task::steal_work() { + auto my_state = base::this_thread::state(); + auto my_scheduler = my_state->scheduler_; + + int my_id = my_state->id_; + for (size_t i = 1; i < my_scheduler->num_threads(); i++) { + size_t target = (my_id + i) % my_scheduler->num_threads(); + auto target_state = my_scheduler->thread_state_for(target); + std::lock_guard lock{target_state->lock_}; + + // Dig down to our level + abstract_task* current_task = target_state->root_task_; + while (current_task != nullptr && current_task->depth() < depth()) { + current_task = current_task->child_task_; + } + + if (current_task != nullptr) { + // See if it equals our type and depth of task + if (current_task->unique_id_ == unique_id_ && + current_task->depth_ == depth_) { + if (internal_stealing(current_task)) { + // internal steal was a success, hand it back to the internal scheduler + return true; + } + + // No success, we need to steal work from a deeper level using 'top level task stealing' + current_task = current_task->child_task_; + } + } + + + // Execute 'top level task steal' if possible + // (only try deeper tasks to keep depth restricted stealing) + while (current_task != nullptr) { + if (current_task->split_task()) { + // internal steal was no success (we did a top level task steal) + return false; + } + + current_task = 
current_task->child_task_; + } + } + + // internal steal was no success + return false; + }; + } + } +} diff --git a/lib/pls/src/internal/scheduling/root_master_task.cpp b/lib/pls/src/internal/scheduling/root_master_task.cpp new file mode 100644 index 0000000..8b87c3b --- /dev/null +++ b/lib/pls/src/internal/scheduling/root_master_task.cpp @@ -0,0 +1,10 @@ +#include "pls/internal/scheduling/root_master_task.h" +#include "pls/internal/scheduling/root_worker_task.h" + +namespace pls { + namespace internal { + namespace scheduling { + + } + } +} diff --git a/lib/pls/src/internal/scheduling/root_worker_task.cpp b/lib/pls/src/internal/scheduling/root_worker_task.cpp new file mode 100644 index 0000000..8b87c3b --- /dev/null +++ b/lib/pls/src/internal/scheduling/root_worker_task.cpp @@ -0,0 +1,10 @@ +#include "pls/internal/scheduling/root_master_task.h" +#include "pls/internal/scheduling/root_worker_task.h" + +namespace pls { + namespace internal { + namespace scheduling { + + } + } +} diff --git a/lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp b/lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp new file mode 100644 index 0000000..e41571f --- /dev/null +++ b/lib/pls/src/internal/scheduling/run_on_n_threads_task.cpp @@ -0,0 +1,9 @@ +#include "pls/internal/scheduling/run_on_n_threads_task.h" + +namespace pls { + namespace internal { + namespace scheduling { + + } + } +} diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp new file mode 100644 index 0000000..8e55147 --- /dev/null +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -0,0 +1,59 @@ +#include "pls/internal/scheduling/scheduler.h" + +namespace pls { + namespace internal { + namespace scheduling { + scheduler::scheduler(scheduler_memory* memory, const unsigned int num_threads): + num_threads_{num_threads}, + memory_{memory}, + sync_barrier_{num_threads + 1}, + terminated_{false} { + if (num_threads > MAX_THREADS) { + exit(1); // TODO: 
Exception Handling + } + + for (unsigned int i = 0; i < num_threads; i++) { + *memory_->thread_state_for(i) = thread_state{this, memory_->task_stack_for(i), i}; + *memory_->thread_for(i) = base::start_thread(&worker_routine, memory_->thread_state_for(i)); + } + } + + scheduler::~scheduler() { + terminate(); + } + + void worker_routine() { + auto my_state = base::this_thread::state(); + + while (true) { + my_state->scheduler_->sync_barrier_.wait(); + if (my_state->scheduler_->terminated_) { + return; + } + + // The root task must only return when all work is done, + // because of this a simple call is enough to ensure the + // fork-join-section is done (logically joined back into our main thread). + my_state->root_task_->execute(); + + my_state->scheduler_->sync_barrier_.wait(); + } + } + + void scheduler::terminate(bool wait_for_workers) { + if (terminated_) { + return; + } + + terminated_ = true; + sync_barrier_.wait(); + + if (wait_for_workers) { + for (unsigned int i = 0; i < num_threads_; i++) { + memory_->thread_for(i)->join(); + } + } + } + } + } +} diff --git a/lib/pls/src/internal/scheduling/thread_state.cpp b/lib/pls/src/internal/scheduling/thread_state.cpp new file mode 100644 index 0000000..8d467ed --- /dev/null +++ b/lib/pls/src/internal/scheduling/thread_state.cpp @@ -0,0 +1,9 @@ +#include "pls/internal/scheduling/thread_state.h" + +namespace pls { + namespace internal { + namespace scheduling { + + } + } +} diff --git a/test/base_tests.cpp b/test/base_tests.cpp index d64614a..8780513 100644 --- a/test/base_tests.cpp +++ b/test/base_tests.cpp @@ -1,6 +1,8 @@ #include #include #include +#include +#include #include #include @@ -82,3 +84,66 @@ TEST_CASE( "spinlock protects concurrent counter", "[internal/base/spinlock.h]") REQUIRE(base_tests_shared_counter == 0); } } + +TEST_CASE( "aligned stack stores objects correctly", "[internal/base/aligned_stack.h]") { + constexpr long data_size = 1024; + char data[data_size]; + aligned_stack stack{data, data_size}; 
+ + SECTION( "stack correctly pushes sub linesize objects" ) { + std::array small_data_one{'a', 'b', 'c', 'd', 'e'}; + std::array small_data_two{}; + std::array small_data_three{'A'}; + + auto pointer_one = stack.push(small_data_one); + auto pointer_two = stack.push(small_data_two); + auto pointer_three = stack.push(small_data_three); + + REQUIRE(reinterpret_cast(pointer_one) % CACHE_LINE_SIZE == 0); + REQUIRE(reinterpret_cast(pointer_two) % CACHE_LINE_SIZE == 0); + REQUIRE(reinterpret_cast(pointer_three) % CACHE_LINE_SIZE == 0); + } + + SECTION( "stack correctly pushes above linesize objects" ) { + std::array small_data_one{'a', 'b', 'c', 'd', 'e'}; + std::array big_data_one{}; + + auto big_pointer_one = stack.push(big_data_one); + auto small_pointer_one = stack.push(small_data_one); + + REQUIRE(reinterpret_cast(big_pointer_one) % CACHE_LINE_SIZE == 0); + REQUIRE(reinterpret_cast(small_pointer_one) % CACHE_LINE_SIZE == 0); + } + + SECTION( "stack correctly stores and retrieves objects" ) { + std::array data_one{'a', 'b', 'c', 'd', 'e'}; + + stack.push(data_one); + auto retrieved_data = stack.pop>(); + + REQUIRE(retrieved_data == std::array{'a', 'b', 'c', 'd', 'e'}); + } + + SECTION( "stack can push and pop multiple times with correct alignment" ) { + std::array small_data_one{'a', 'b', 'c', 'd', 'e'}; + std::array small_data_two{}; + std::array small_data_three{'A'}; + + auto pointer_one = stack.push(small_data_one); + auto pointer_two = stack.push(small_data_two); + auto pointer_three = stack.push(small_data_three); + stack.pop(); + stack.pop(); + auto pointer_four = stack.push(small_data_two); + auto pointer_five = stack.push(small_data_three); + + REQUIRE(reinterpret_cast(pointer_one) % CACHE_LINE_SIZE == 0); + REQUIRE(reinterpret_cast(pointer_two) % CACHE_LINE_SIZE == 0); + REQUIRE(reinterpret_cast(pointer_three) % CACHE_LINE_SIZE == 0); + REQUIRE(reinterpret_cast(pointer_four) % CACHE_LINE_SIZE == 0); + REQUIRE(reinterpret_cast(pointer_five) % CACHE_LINE_SIZE 
== 0); + + REQUIRE(pointer_four == pointer_two); + REQUIRE(pointer_five == pointer_three); + } +}