Commit a8a35479 by Florian Fritz

Merge branch 'cache_align' into 'master'

Merge: Cache Align thread_state

See merge request !7
parents 310c33d2 65d91329
Pipeline #1147 passed with stages
in 3 minutes 35 seconds
......@@ -16,6 +16,7 @@ include(cmake/SetupThreadingSupport.cmake)
include(cmake/SetupThreadSanitizer.cmake)
include(cmake/SetupAddressSanitizer.cmake)
include(cmake/SetupEasyProfiler.cmake)
include(cmake/SetupDebugSymbols.cmake)
# make our internal cmake script collection avaliable in the build process.
list(APPEND CMAKE_PREFIX_PATH "${PROJECT_SOURCE_DIR}/cmake")
......
......@@ -4,6 +4,19 @@ A collection of stuff that we noticed during development.
Useful later on two write a project report and to go back
in time to find out why certain decisions where made.
## 09.02.2019 - Cache Alignment
Aligning the cache needs all parts (both data types with correct alignment
and base memory with correct alignment).
Our first tests show that the initial alignment (Commit 3535cbd8),
boostet the performance in the fft_benchmark from our library to
Intel TBB's speedup when running on up to 4 threads.
When crossing the boundary to hyper-threading this falls of.
We therefore think that contemption/cache misses are the reason for
bad performance above 4 threads, but have to investigate further to
pin down the issue.
## 08.04.2019 - Random Numbers
We decided to go for a simple linear random number generator
......
# Notes on performance measures during development
#### Commit 9c12addf
#### Commit 52fcb51f - Add basic random stealing
Slight improvement, needs further measurement after removing more important bottlenecks.
| | | | | | | | | | |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
......@@ -19,3 +21,9 @@ change | 100.39 %| 99.14 %| 98.46 %| 107.74 %| 100.17 %|
old | 1654.26 us| 969.12 us| 832.13 us| 680.69 us| 718.70 us| 750.80 us| 744.12 us| 775.24 us| 7125.07 us
new | 1637.04 us| 978.09 us| 799.93 us| 709.33 us| 746.42 us| 684.87 us| 822.30 us| 787.61 us| 7165.59 us
change | 98.96 %| 100.93 %| 96.13 %| 104.21 %| 103.86 %| 91.22 %| 110.51 %| 101.60 %| 100.57 %
#### Commit 3535cbd8 - Cache Align scheduler_memory
Big improvements of about 6% in our test. This seems like a little,
but 6% from the scheduler is a lot, as the 'main work' is the tasks
itself, not the scheduler.
......@@ -3,6 +3,70 @@
[![pipeline status](http://lab.las3.de/gitlab/las3/development/scheduling/predictable_parallel_patterns/badges/master/pipeline.svg)](http://lab.las3.de/gitlab/las3/development/scheduling/predictable_parallel_patterns/commits/master)
## Getting Started
This section will give a brief introduction on how to get a minimal
project setup that uses the PLS library.
### Installation
Clone the repository and open a terminal session in its folder.
Create a build folder using `mkdir cmake-build-release`
and switch into it `cd cmake-build-release`.
Setup the cmake project using `cmake ../ -DCMAKE_BUILD_TYPE=RELEASE`,
then install it as a system wide dependency using `sudo make install.pls`.
At this point the library is installed on your system.
To use it simply add it to your existing cmake project using
`find_package(pls REQUIRED)` and then link it to your project
using `target_link_libraries(your_target pls::pls)`.
### Basic Usage
```c++
#include <pls/pls.h>
#include <iostream>
long fib(long n);
int main() {
// All memory needed by the scheduler can be allocated in advance either on stack or using malloc.
const unsigned int num_threads = 8;
const unsigned int memory_per_thread = 2 << 14;
static pls::static_scheduler_memory<num_threads, memory_per_thread> memory;
// Create the scheduler instance (starts a thread pool).
pls::scheduler scheduler{&memory, num_threads};
// Wake up the thread pool and perform work.
scheduler.perform_work([&] {
long result = fib(20);
std::cout << "fib(20)=" << result << std::endl;
});
// At this point the thread pool sleeps.
// This can for example be used for periodic work.
}
long fib(long n) {
if (n == 0) {
return 0;
}
if (n == 1) {
return 1;
}
// Example for the high level API.
// Will run both functions in parallel as seperate tasks.
int left, right;
pls::invoke_parallel(
[&] { left = fib(n - 1); },
[&] { right = fib(n - 2); }
);
return left + right;
}
```
## Project Structure
......
......@@ -30,7 +30,7 @@ void combine(complex_vector::iterator data, int n) {
std::complex<double> odd = data[i + n / 2];
// w is the "twiddle-factor".
// this could be cached, but we run the same 'base' algorithm parallel/serial,
// this could be cached, but we run the same 'data_structures' algorithm parallel/serial,
// so it won't impact the performance comparison.
std::complex<double> w = exp(std::complex<double>(0, -2. * M_PI * i / n));
......
......@@ -34,7 +34,7 @@ long fib(long n) {
int main() {
PROFILE_ENABLE
pls::scheduler scheduler{&my_scheduler_memory, 8};
pls::scheduler scheduler{&my_scheduler_memory, 2};
long result;
scheduler.perform_work([&] {
......
......@@ -3,69 +3,17 @@
#include <functional>
#include <array>
#include <atomic>
#include <memory>
#include <pls/pls.h>
#include <pls/internal/helpers/prohibit_new.h>
#include <pls/internal/scheduling/thread_state.h>
using namespace pls;
// Example for static memory allocation (no malloc or free required)
static static_scheduler_memory<8, 2 << 12> my_scheduler_memory;
class fib: public fork_join_sub_task {
static constexpr int CUTOFF = 20;
int num_;
int* result_;
public:
fib(int num, int* result): num_{num}, result_{result} {}
private:
static int fib_serial(int num) {
if (num == 0) {
return 0;
}
if (num == 1) {
return 1;
}
return fib_serial(num - 1) + fib_serial(num - 2);
}
protected:
void execute_internal() override {
if (num_ <= CUTOFF) {
*result_ = fib_serial(num_);
return;
}
int left_result;
int right_result;
spawn_child(fib{num_ - 1, &left_result});
spawn_child(fib{num_ - 2, &right_result});
wait_for_all();
*result_ = left_result + right_result;
}
};
int main() {
scheduler my_scheduler{&my_scheduler_memory, 4};
auto start = std::chrono::high_resolution_clock::now();
my_scheduler.perform_work([] (){
int result;
fib fib_sub_task{45, &result};
fork_join_task tbb_task{&fib_sub_task, task_id{1}};
scheduler::execute_task(tbb_task);
std::cout << "Result: " << result << std::endl;
});
auto end = std::chrono::high_resolution_clock::now();
long time = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
std::cout << "Startup time in us: " << time << std::endl;
malloc_scheduler_memory sched_memory{8};
std::cout << (std::uintptr_t)sched_memory.thread_for(0) % 64 << ", " << (std::uintptr_t)sched_memory.thread_for(1) % 64 << ", " << (std::uintptr_t)sched_memory.thread_for(2) % 64 << ", " << std::endl;
std::cout << (std::uintptr_t)sched_memory.thread_state_for(0) % 64 << ", " << (std::uintptr_t)sched_memory.thread_state_for(1) % 64 << ", " << (std::uintptr_t)sched_memory.thread_state_for(2) % 64 << ", " << std::endl;
std::cout << (std::uintptr_t)sched_memory.task_stack_for(0) % 64 << ", " << (std::uintptr_t)sched_memory.task_stack_for(1) % 64 << ", " << (std::uintptr_t)sched_memory.task_stack_for(2) % 64 << ", " << std::endl;
}
option(DEBUG_SYMBOLS "Enable debug symbols" OFF)
if(DEBUG_SYMBOLS)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g")
endif()
message("-- Debug Symbols: ${DEBUG_SYMBOLS}")
# List all required files here (cmake best practice to NOT automate this step!)
add_library(pls STATIC
src/pls.cpp include/pls/pls.h
src/internal/base/spin_lock.cpp include/pls/internal/base/spin_lock.h
src/internal/base/thread.cpp include/pls/internal/base/thread.h
include/pls/internal/helpers/prohibit_new.h
src/internal/scheduling/abstract_task.cpp include/pls/internal/scheduling/abstract_task.h
src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler.h
src/internal/scheduling/thread_state.cpp include/pls/internal/scheduling/thread_state.h
src/internal/base/barrier.cpp include/pls/internal/base/barrier.h
src/internal/scheduling/root_task.cpp include/pls/internal/scheduling/root_task.h
src/internal/base/aligned_stack.cpp include/pls/internal/base/aligned_stack.h
include/pls/pls.h src/pls.cpp
include/pls/algorithms/invoke_parallel.h src/algorithms/invoke_parallel.cpp
include/pls/internal/base/spin_lock.h src/internal/base/spin_lock.cpp
include/pls/internal/base/thread.h src/internal/base/thread.cpp
include/pls/internal/base/barrier.h src/internal/base/barrier.cpp
include/pls/internal/base/system_details.h
src/internal/scheduling/run_on_n_threads_task.cpp include/pls/internal/scheduling/run_on_n_threads_task.h
src/internal/scheduling/fork_join_task.cpp include/pls/internal/scheduling/fork_join_task.h
src/internal/base/deque.cpp include/pls/internal/base/deque.h
src/algorithms/invoke_parallel.cpp include/pls/algorithms/invoke_parallel.h
include/pls/internal/base/error_handling.h
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
include/pls/internal/base/alignment.h src/internal/base/alignment.cpp
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp
include/pls/internal/helpers/prohibit_new.h
include/pls/internal/helpers/profiler.h
include/pls/internal/helpers/mini_benchmark.h)
include/pls/internal/helpers/mini_benchmark.h
include/pls/internal/scheduling/root_task.h src/internal/scheduling/root_task.cpp
include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp
include/pls/internal/scheduling/abstract_task.h src/internal/scheduling/abstract_task.cpp
include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
include/pls/internal/scheduling/run_on_n_threads_task.h src/internal/scheduling/run_on_n_threads_task.cpp
include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
......@@ -63,6 +70,13 @@ INSTALl(
FILES pls-config.cmake
DESTINATION lib/pls
)
# ...add a custom target that will only build the library when istalling.
# This can allow us to speed up the installation on embedded devices.
ADD_CUSTOM_TARGET(install.pls
${CMAKE_COMMAND}
-DBUILD_TYPE=${CMAKE_BUILD_TYPE}
-P ${CMAKE_BINARY_DIR}/cmake_install.cmake)
ADD_DEPENDENCIES(install.pls pls)
# Enable warnings/tidy code checking from our compiler
target_compile_options(pls PRIVATE
......
#ifndef PLS_ALIGNMENT_H
#define PLS_ALIGNMENT_H
#include <cstdint>
#include <cstdlib>
#include "system_details.h"
namespace pls {
namespace internal {
namespace base {
namespace alignment {
template<typename T>
struct aligned_wrapper {
alignas(system_details::CACHE_LINE_SIZE) unsigned char data[sizeof(T)];
T* pointer() { return reinterpret_cast<T*>(data); }
};
void* allocate_aligned(size_t size);
std::uintptr_t next_alignment(std::uintptr_t size);
char* next_alignment(char* pointer);
}
}
}
}
#endif //PLS_ALIGNMENT_H
......@@ -7,21 +7,22 @@
namespace pls {
namespace internal {
namespace base {
/**
* Provides standard barrier behaviour.
* `count` threads have to call `wait()` before any of the `wait()` calls returns,
* thus blocking all threads until everyone reached the barrier.
*
* PORTABILITY:
* Current implementation is based on pthreads.
*/
class barrier {
pthread_barrier_t barrier_;
public:
explicit barrier(const unsigned int count): barrier_{} {
pthread_barrier_init(&barrier_, nullptr, count);
}
~barrier() {
pthread_barrier_destroy(&barrier_);
}
explicit barrier(unsigned int count);
~barrier();
void wait() {
pthread_barrier_wait(&barrier_);
}
void wait();
};
}
}
......
......@@ -4,7 +4,12 @@
#include <iostream>
// TODO: Figure out proper exception handling
/**
* Called when there is an non-recoverable error/invariant in the scheduler.
* This SHOULD NOT HAPPEN AT ANY POINT in production, any instance of this is a bug!
* The implementation can be changed if for example no iostream is available on a system
* (or its inclusion adds too much overhead).
*/
#define PLS_ERROR(msg) std::cout << msg << std::endl; exit(1);
#endif //PLS_ERROR_HANDLING_H
......@@ -10,6 +10,12 @@
namespace pls {
namespace internal {
namespace base {
/**
* A simple set and test_and_set based spin lock implementation.
*
* PORTABILITY:
* Current implementation is based on C++ 11 atomic_flag.
*/
class spin_lock {
std::atomic_flag flag_;
int yield_at_tries_;
......
......@@ -7,7 +7,25 @@
namespace pls {
namespace internal {
namespace base {
/**
* Collection of system details, e.g. hardware cache line size.
*
* PORTABILITY:
* Currently sane default values for x86.
*/
namespace system_details {
/**
* Most processors have 64 byte cache lines
*/
constexpr std::uintptr_t CACHE_LINE_SIZE = 64;
/**
* Choose one of the following ways to store thread specific data.
* Try to choose the fastest available on this processor/system.
*/
// #define PLS_THREAD_SPECIFIC_PTHREAD
#define PLS_THREAD_SPECIFIC_COMPILER
}
}
}
}
......
......@@ -10,17 +10,33 @@
#include <pthread.h>
#include <atomic>
#include "system_details.h"
namespace pls {
namespace internal {
namespace base {
using thread_entrypoint = void();
/**
* Static methods than can be performed on the current thread.
*
* usage:
* this_thread::yield();
* T* state = this_thread::state<T>();
*
* PORTABILITY:
* Current implementation is based on pthreads.
*/
class this_thread {
template<typename Function, typename State>
friend class thread;
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
static pthread_key_t local_storage_key_;
static bool local_storage_key_initialized_;
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
static __thread void* local_state_;
#endif
public:
static void yield() {
pthread_yield();
......@@ -34,7 +50,12 @@ namespace pls {
*/
template<typename T>
static T* state() {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
return reinterpret_cast<T*>(pthread_getspecific(local_storage_key_));
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
return reinterpret_cast<T*>(local_state_);
#endif
}
/**
......@@ -47,10 +68,31 @@ namespace pls {
*/
template<typename T>
static void set_state(T* state_pointer) {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
pthread_setspecific(this_thread::local_storage_key_, (void*)state_pointer);
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
local_state_ = state_pointer;
#endif
}
};
/**
* Abstraction for starting a function in a sparate thread.
*
* @tparam Function Lambda being started on the new thread.
* @tparam State State type held for this thread.
*
* usage:
* T* state;
* auto thread = start_thread([] {
* // Run on new thread
* }, state);
* thread.join(); // Wait for it to finish
*
* PORTABILITY:
* Current implementation is based on pthreads.
*/
template<typename Function, typename State>
class thread {
friend class this_thread;
......@@ -92,10 +134,12 @@ namespace pls {
startup_flag_{nullptr},
pthread_thread_{} {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
if (!this_thread::local_storage_key_initialized_) {
pthread_key_create(&this_thread::local_storage_key_, nullptr);
this_thread::local_storage_key_initialized_ = true;
}
#endif
// We only need this during startup, will be destroyed when out of scope
std::atomic_flag startup_flag{ATOMIC_FLAG_INIT};
......
......@@ -6,10 +6,23 @@
#include <cstdlib>
#include "pls/internal/base/error_handling.h"
#include "pls/internal/base/alignment.h"
namespace pls {
namespace internal {
namespace base {
namespace data_structures {
/**
* Generic stack-like data structure that allows to allocate arbitrary objects in a given memory region.
* The objects will be stored aligned in the stack, making the storage cache friendly and very fast
* (as long as one can live with the stack restrictions).
*
* IMPORTANT: Does not call destructors on stored objects! Do not allocate resources in the objects!
*
* Usage:
* aligned_stack stack{pointer_to_memory, size_of_memory};
* T* pointer = stack.push(some_object); // Copy-Constrict the object on top of stack
* stack.pop<T>(); // Deconstruct the top object of type T
*/
class aligned_stack {
// Keep bounds of our memory block
char* memory_start_;
......@@ -17,22 +30,15 @@ namespace pls {
// Current head will always be aligned to cache lines
char* head_;
static std::uintptr_t next_alignment(std::uintptr_t size);
static char* next_alignment(char* pointer);
public:
typedef char* state;
aligned_stack(): memory_start_{nullptr}, memory_end_{nullptr}, head_{nullptr} {};
aligned_stack(char* memory_region, const std::size_t size):
memory_start_{memory_region},
memory_end_{memory_region + size},
head_{next_alignment(memory_start_)} {}
aligned_stack(char* memory_region, std::size_t size);
template<typename T>
T* push(const T& object) {
// Placement new into desired memory location
// Copy-Construct
return new ((void*)push<T>())T(object);
}
......@@ -41,7 +47,7 @@ namespace pls {
void* result = reinterpret_cast<T*>(head_);
// Move head to next aligned position after new object
head_ = next_alignment(head_ + sizeof(T));
head_ = base::alignment::next_alignment(head_ + sizeof(T));
if (head_ >= memory_end_) {
PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!");
}
......@@ -51,8 +57,7 @@ namespace pls {
template<typename T>
T pop() {
head_ = head_ - next_alignment(sizeof(T));
head_ = head_ - base::alignment::next_alignment(sizeof(T));
return *reinterpret_cast<T*>(head_);
}
......
......@@ -2,11 +2,14 @@
#ifndef PLS_DEQUE_H
#define PLS_DEQUE_H
#include "spin_lock.h"
#include "pls/internal/base/spin_lock.h"
namespace pls {
namespace internal {
namespace base {
namespace data_structures {
/**
* Turns any object into deque item when inheriting from this.
*/
class deque_item {
friend class deque_internal;
......@@ -20,13 +23,19 @@ namespace pls {
deque_item* head_;
deque_item* tail_;
spin_lock lock_;
base::spin_lock lock_;
deque_item* pop_head_internal();
deque_item* pop_tail_internal();
void push_tail_internal(deque_item *new_item);
};
/**
* A double linked list based deque.
* Storage is therefore only needed for the individual items.
*
* @tparam Item The type of items stored in this deque
*/
template<typename Item>
class deque: deque_internal {
public:
......
......@@ -4,8 +4,8 @@
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/aligned_stack.h"
#include "pls/internal/base/deque.h"
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/data_structures/deque.h"
#include "abstract_task.h"
#include "thread_state.h"
......@@ -14,7 +14,7 @@ namespace pls {
namespace internal {
namespace scheduling {
class fork_join_task;
class fork_join_sub_task: public base::deque_item {
class fork_join_sub_task: public data_structures::deque_item {
friend class fork_join_task;
// Coordinate finishing of sub_tasks
......@@ -25,7 +25,7 @@ namespace pls {
fork_join_task* tbb_task_;
// Stack Management (reset stack pointer after wait_for_all() calls)
base::aligned_stack::state stack_state_;
data_structures::aligned_stack::state stack_state_;
protected:
explicit fork_join_sub_task();
fork_join_sub_task(const fork_join_sub_task& other);
......@@ -62,10 +62,10 @@ namespace pls {
fork_join_sub_task* root_task_;
fork_join_sub_task* currently_executing_;
base::aligned_stack* my_stack_;
data_structures::aligned_stack* my_stack_;
// Double-Ended Queue management
base::deque<fork_join_sub_task> deque_;
data_structures::deque<fork_join_sub_task> deque_;
// Steal Management
fork_join_sub_task* last_stolen_;
......
......@@ -7,7 +7,8 @@
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/aligned_stack.h"
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/base/thread.h"
#include "pls/internal/base/barrier.h"
......
#include "pls/internal/base/aligned_stack.h"
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/base/thread.h"
#include "thread_state.h"
......@@ -14,47 +14,62 @@ namespace pls {
class scheduler_memory {
public:
virtual size_t max_threads() = 0;
virtual size_t max_threads() const = 0;
virtual thread_state* thread_state_for(size_t id) = 0;
virtual scheduler_thread* thread_for(size_t id) = 0;
virtual base::aligned_stack* task_stack_for(size_t id) = 0;
virtual data_structures::aligned_stack* task_stack_for(size_t id) = 0;
};
template<size_t MAX_THREADS, size_t TASK_STACK_SIZE>
class static_scheduler_memory: public scheduler_memory {
std::array<scheduler_thread, MAX_THREADS> threads_;
std::array<thread_state, MAX_THREADS> thread_states_;
std::array<std::array<char, TASK_STACK_SIZE>, MAX_THREADS> task_stacks_memory_;
std::array<base::aligned_stack, MAX_THREADS> task_stacks_;
// Everyone of these types has to live on its own cache line,
// as each thread uses one of them independently.
// Therefore it would be a major performance hit if we shared cache lines on these.
using aligned_thread = base::alignment::aligned_wrapper<scheduler_thread>;
using aligned_thread_state = base::alignment::aligned_wrapper<thread_state>;
using aligned_thread_stack = base::alignment::aligned_wrapper<std::array<char, TASK_STACK_SIZE>>;
using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>;
std::array<aligned_thread, MAX_THREADS> threads_;
std::array<aligned_thread_state, MAX_THREADS> thread_states_;
std::array<aligned_thread_stack, MAX_THREADS> task_stacks_memory_;
std::array<aligned_aligned_stack, MAX_THREADS> task_stacks_;
public:
static_scheduler_memory() {
for (size_t i = 0; i < MAX_THREADS; i++) {
task_stacks_[i] = base::aligned_stack(task_stacks_memory_[i].data(), TASK_STACK_SIZE);
new ((void*)task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i].pointer()->data(), TASK_STACK_SIZE);
}
}
size_t max_threads() override { return MAX_THREADS; }
thread_state* thread_state_for(size_t id) override { return &thread_states_[id]; }
scheduler_thread* thread_for(size_t id) override { return &threads_[id]; }
base::aligned_stack* task_stack_for(size_t id) override { return &task_stacks_[id]; }
size_t max_threads() const override { return MAX_THREADS; }
thread_state* thread_state_for(size_t id) override { return thread_states_[id].pointer(); }
scheduler_thread* thread_for(size_t id) override { return threads_[id].pointer(); }
data_structures::aligned_stack* task_stack_for(size_t id) override { return task_stacks_[id].pointer(); }
};
class malloc_scheduler_memory: public scheduler_memory {
size_t num_threads_;
// Everyone of these types has to live on its own cache line,
// as each thread uses one of them independently.
// Therefore it would be a major performance hit if we shared cache lines on these.
using aligned_thread = base::alignment::aligned_wrapper<scheduler_thread>;
using aligned_thread_state = base::alignment::aligned_wrapper<thread_state>;
using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>;
const size_t num_threads_;
scheduler_thread* threads_;
thread_state* thread_states_;
aligned_thread* threads_;
aligned_thread_state * thread_states_;
char** task_stacks_memory_;
base::aligned_stack* task_stacks_;
aligned_aligned_stack * task_stacks_;
public:
explicit malloc_scheduler_memory(size_t num_threads, size_t memory_per_stack = 2 << 16);
~malloc_scheduler_memory();
size_t max_threads() override { return num_threads_; }
thread_state* thread_state_for(size_t id) override { return &thread_states_[id]; }
scheduler_thread* thread_for(size_t id) override { return &threads_[id]; }
base::aligned_stack* task_stack_for(size_t id) override { return &task_stacks_[id]; }
size_t max_threads() const override { return num_threads_; }
thread_state* thread_state_for(size_t id) override { return thread_states_[id].pointer(); }
scheduler_thread* thread_for(size_t id) override { return threads_[id].pointer(); }
data_structures::aligned_stack* task_stack_for(size_t id) override { return task_stacks_[id].pointer(); }
};
}
}
......
......@@ -4,10 +4,9 @@
#include <random>
#include "pls/internal/data_structures/aligned_stack.h"
#include "abstract_task.h"
#include "pls/internal/base/aligned_stack.h"
namespace pls {
namespace internal {
namespace scheduling {
......@@ -18,7 +17,7 @@ namespace pls {
scheduler* scheduler_;
abstract_task* root_task_;
abstract_task* current_task_;
base::aligned_stack* task_stack_;
data_structures::aligned_stack* task_stack_;
size_t id_;
base::spin_lock lock_;
std::minstd_rand random_;
......@@ -31,7 +30,7 @@ namespace pls {
id_{0},
random_{id_} {};
thread_state(scheduler* scheduler, base::aligned_stack* task_stack, unsigned int id):
thread_state(scheduler* scheduler, data_structures::aligned_stack* task_stack, unsigned int id):
scheduler_{scheduler},
root_task_{nullptr},
current_task_{nullptr},
......
#include "pls/internal/base/aligned_stack.h"
#include "pls/internal/base/alignment.h"
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace base {
std::uintptr_t aligned_stack::next_alignment(std::uintptr_t size) {
std::uintptr_t miss_alignment = size % CACHE_LINE_SIZE;
namespace alignment {
void* allocate_aligned(size_t size) {
return aligned_alloc(system_details::CACHE_LINE_SIZE, size);
}
std::uintptr_t next_alignment(std::uintptr_t size) {
std::uintptr_t miss_alignment = size % base::system_details::CACHE_LINE_SIZE;
if (miss_alignment == 0) {
return size;
} else {
return size + (CACHE_LINE_SIZE - miss_alignment);
return size + (base::system_details::CACHE_LINE_SIZE - miss_alignment);
}
}
char* aligned_stack::next_alignment(char* pointer) {
char* next_alignment(char* pointer) {
return reinterpret_cast<char*>(next_alignment(reinterpret_cast<std::uintptr_t >(pointer)));
}
}
}
}
}
......@@ -3,7 +3,17 @@
namespace pls {
namespace internal {
namespace base {
barrier::barrier(const unsigned int count): barrier_{} {
pthread_barrier_init(&barrier_, nullptr, count);
}
barrier::~barrier() {
pthread_barrier_destroy(&barrier_);
}
void barrier::wait() {
pthread_barrier_wait(&barrier_);
}
}
}
}
......@@ -3,8 +3,13 @@
namespace pls {
namespace internal {
namespace base {
bool this_thread::local_storage_key_initialized_ = false;
pthread_key_t this_thread::local_storage_key_;
#ifdef PLS_THREAD_SPECIFIC_PTHREAD
pthread_key_t this_thread::local_storage_key_ = false;
bool this_thread::local_storage_key_initialized_;
#endif
#ifdef PLS_THREAD_SPECIFIC_COMPILER
__thread void* this_thread::local_state_;
#endif
// implementation in header (C++ templating)
}
}
......
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace data_structures {
aligned_stack::aligned_stack(char* memory_region, const std::size_t size):
memory_start_{memory_region},
memory_end_{memory_region + size},
head_{base::alignment::next_alignment(memory_start_)} {}
}
}
}
#include <mutex>
#include "pls/internal/base/deque.h"
#include "pls/internal/data_structures/deque.h"
namespace pls {
namespace internal {
namespace base {
namespace data_structures {
deque_item* deque_internal::pop_head_internal() {
std::lock_guard<spin_lock> lock{lock_};
std::lock_guard<base::spin_lock> lock{lock_};
if (head_ == nullptr) {
return nullptr;
......@@ -24,7 +24,7 @@ namespace pls {
}
deque_item* deque_internal::pop_tail_internal() {
std::lock_guard<spin_lock> lock{lock_};
std::lock_guard<base::spin_lock> lock{lock_};
if (tail_ == nullptr) {
return nullptr;
......@@ -42,7 +42,7 @@ namespace pls {
}
void deque_internal::push_tail_internal(deque_item *new_item) {
std::lock_guard<spin_lock> lock{lock_};
std::lock_guard<base::spin_lock> lock{lock_};
if (tail_ != nullptr) {
tail_->prev_ = new_item;
......
......@@ -7,14 +7,14 @@ namespace pls {
namespace internal {
namespace scheduling {
fork_join_sub_task::fork_join_sub_task():
base::deque_item{},
data_structures::deque_item{},
ref_count_{0},
parent_{nullptr},
tbb_task_{nullptr},
stack_state_{nullptr} {}
fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task& other):
base::deque_item(other),
data_structures::deque_item(other),
ref_count_{0},
parent_{nullptr},
tbb_task_{nullptr},
......
......@@ -5,14 +5,14 @@ namespace pls {
namespace scheduling {
malloc_scheduler_memory::malloc_scheduler_memory(const size_t num_threads, const size_t memory_per_stack):
num_threads_{num_threads} {
threads_ = reinterpret_cast<scheduler_thread*>(malloc(num_threads * sizeof(scheduler_thread)));
thread_states_ = reinterpret_cast<thread_state*>(malloc(num_threads * sizeof(thread_state)));
threads_ = reinterpret_cast<aligned_thread *>(base::alignment::allocate_aligned(num_threads * sizeof(aligned_thread)));
thread_states_ = reinterpret_cast<aligned_thread_state *>(base::alignment::allocate_aligned(num_threads * sizeof(aligned_thread_state)));
task_stacks_ = reinterpret_cast<base::aligned_stack*>(malloc(num_threads * sizeof(base::aligned_stack)));
task_stacks_memory_ = reinterpret_cast<char**>(malloc(num_threads * sizeof(char*)));
task_stacks_ = reinterpret_cast<aligned_aligned_stack *>(base::alignment::allocate_aligned(num_threads * sizeof(aligned_aligned_stack)));
task_stacks_memory_ = reinterpret_cast<char**>(base::alignment::allocate_aligned(num_threads * sizeof(char*)));
for (size_t i = 0; i < num_threads_; i++) {
task_stacks_memory_[i] = reinterpret_cast<char*>(malloc(memory_per_stack));
task_stacks_[i] = base::aligned_stack(task_stacks_memory_[i], memory_per_stack);
task_stacks_memory_[i] = reinterpret_cast<char*>(base::alignment::allocate_aligned(memory_per_stack));
new ((void*)task_stacks_[i].pointer()) data_structures::aligned_stack(task_stacks_memory_[i], memory_per_stack);
}
}
......
add_executable(tests
main.cpp
base_tests.cpp scheduling_tests.cpp)
base_tests.cpp scheduling_tests.cpp data_structures_test.cpp)
target_link_libraries(tests catch2 pls)
#include <catch.hpp>
#include <pls/internal/base/thread.h>
#include <pls/internal/base/spin_lock.h>
#include <pls/internal/base/aligned_stack.h>
#include <pls/internal/base/system_details.h>
#include <vector>
#include <mutex>
#include <pls/internal/base/deque.h>
using namespace pls::internal::base;
using namespace std;
......@@ -15,7 +13,7 @@ static bool base_tests_visited;
static int base_tests_local_value_one;
static vector<int> base_tests_local_value_two;
TEST_CASE( "thread creation and joining", "[internal/base/thread.h]") {
TEST_CASE( "thread creation and joining", "[internal/data_structures/thread.h]") {
base_tests_visited = false;
auto t1 = start_thread([]() { base_tests_visited = true; });
t1.join();
......@@ -23,7 +21,7 @@ TEST_CASE( "thread creation and joining", "[internal/base/thread.h]") {
REQUIRE(base_tests_visited);
}
TEST_CASE( "thread state", "[internal/base/thread.h]") {
TEST_CASE( "thread state", "[internal/data_structures/thread.h]") {
int state_one = 1;
vector<int> state_two{1, 2};
......@@ -38,7 +36,7 @@ TEST_CASE( "thread state", "[internal/base/thread.h]") {
int base_tests_shared_counter;
TEST_CASE( "spinlock protects concurrent counter", "[internal/base/spinlock.h]") {
TEST_CASE( "spinlock protects concurrent counter", "[internal/data_structures/spinlock.h]") {
constexpr int num_iterations = 1000000;
base_tests_shared_counter = 0;
spin_lock lock{};
......@@ -85,122 +83,3 @@ TEST_CASE( "spinlock protects concurrent counter", "[internal/base/spinlock.h]")
REQUIRE(base_tests_shared_counter == 0);
}
}
TEST_CASE( "aligned stack stores objects correctly", "[internal/base/aligned_stack.h]") {
constexpr long data_size = 1024;
char data[data_size];
aligned_stack stack{data, data_size};
SECTION( "stack correctly pushes sub linesize objects" ) {
std::array<char, 5> small_data_one{'a', 'b', 'c', 'd', 'e'};
std::array<char, 64> small_data_two{};
std::array<char, 1> small_data_three{'A'};
auto pointer_one = stack.push(small_data_one);
auto pointer_two = stack.push(small_data_two);
auto pointer_three = stack.push(small_data_three);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_one) % CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_two) % CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_three) % CACHE_LINE_SIZE == 0);
}
SECTION( "stack correctly pushes above linesize objects" ) {
std::array<char, 5> small_data_one{'a', 'b', 'c', 'd', 'e'};
std::array<char, CACHE_LINE_SIZE + 10> big_data_one{};
auto big_pointer_one = stack.push(big_data_one);
auto small_pointer_one = stack.push(small_data_one);
REQUIRE(reinterpret_cast<std::uintptr_t>(big_pointer_one) % CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(small_pointer_one) % CACHE_LINE_SIZE == 0);
}
SECTION( "stack correctly stores and retrieves objects" ) {
std::array<char, 5> data_one{'a', 'b', 'c', 'd', 'e'};
stack.push(data_one);
auto retrieved_data = stack.pop<std::array<char, 5>>();
REQUIRE(retrieved_data == std::array<char, 5>{'a', 'b', 'c', 'd', 'e'});
}
SECTION( "stack can push and pop multiple times with correct alignment" ) {
std::array<char, 5> small_data_one{'a', 'b', 'c', 'd', 'e'};
std::array<char, 64> small_data_two{};
std::array<char, 1> small_data_three{'A'};
auto pointer_one = stack.push(small_data_one);
auto pointer_two = stack.push(small_data_two);
auto pointer_three = stack.push(small_data_three);
stack.pop<typeof(small_data_three)>();
stack.pop<typeof(small_data_two)>();
auto pointer_four = stack.push(small_data_two);
auto pointer_five = stack.push(small_data_three);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_one) % CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_two) % CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_three) % CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_four) % CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_five) % CACHE_LINE_SIZE == 0);
REQUIRE(pointer_four == pointer_two);
REQUIRE(pointer_five == pointer_three);
}
}
TEST_CASE( "deque stores objects correctly", "[internal/base/deque.h]") {
class my_item: public deque_item {
};
deque<my_item> deque;
my_item one, two, three;
SECTION( "add and remove items form the tail" ) {
deque.push_tail(&one);
deque.push_tail(&two);
deque.push_tail(&three);
REQUIRE(deque.pop_tail() == &three);
REQUIRE(deque.pop_tail() == &two);
REQUIRE(deque.pop_tail() == &one);
}
SECTION( "handles getting empty by popping the tail correctly" ) {
deque.push_tail(&one);
REQUIRE(deque.pop_tail() == &one);
deque.push_tail(&two);
REQUIRE(deque.pop_tail() == &two);
}
SECTION( "remove items form the head" ) {
deque.push_tail(&one);
deque.push_tail(&two);
deque.push_tail(&three);
REQUIRE(deque.pop_head() == &one);
REQUIRE(deque.pop_head() == &two);
REQUIRE(deque.pop_head() == &three);
}
SECTION( "handles getting empty by popping the head correctly" ) {
deque.push_tail(&one);
REQUIRE(deque.pop_head() == &one);
deque.push_tail(&two);
REQUIRE(deque.pop_head() == &two);
}
SECTION( "handles getting empty by popping the head and tail correctly" ) {
deque.push_tail(&one);
REQUIRE(deque.pop_tail() == &one);
deque.push_tail(&two);
REQUIRE(deque.pop_head() == &two);
deque.push_tail(&three);
REQUIRE(deque.pop_tail() == &three);
}
}
#include <catch.hpp>
#include <pls/internal/base/system_details.h>
#include <pls/internal/data_structures/aligned_stack.h>
#include <pls/internal/data_structures/deque.h>
#include <vector>
#include <mutex>
using namespace pls::internal::data_structures;
using namespace pls::internal::base;
using namespace std;
TEST_CASE( "aligned stack stores objects correctly", "[internal/data_structures/aligned_stack.h]") {
constexpr long data_size = 1024;
char data[data_size];
aligned_stack stack{data, data_size};
SECTION( "stack correctly pushes sub linesize objects" ) {
std::array<char, 5> small_data_one{'a', 'b', 'c', 'd', 'e'};
std::array<char, 64> small_data_two{};
std::array<char, 1> small_data_three{'A'};
auto pointer_one = stack.push(small_data_one);
auto pointer_two = stack.push(small_data_two);
auto pointer_three = stack.push(small_data_three);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_one) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_two) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_three) % system_details::CACHE_LINE_SIZE == 0);
}
SECTION( "stack correctly pushes above linesize objects" ) {
std::array<char, 5> small_data_one{'a', 'b', 'c', 'd', 'e'};
std::array<char, system_details::CACHE_LINE_SIZE + 10> big_data_one{};
auto big_pointer_one = stack.push(big_data_one);
auto small_pointer_one = stack.push(small_data_one);
REQUIRE(reinterpret_cast<std::uintptr_t>(big_pointer_one) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(small_pointer_one) % system_details::CACHE_LINE_SIZE == 0);
}
SECTION( "stack correctly stores and retrieves objects" ) {
std::array<char, 5> data_one{'a', 'b', 'c', 'd', 'e'};
stack.push(data_one);
auto retrieved_data = stack.pop<std::array<char, 5>>();
REQUIRE(retrieved_data == std::array<char, 5>{'a', 'b', 'c', 'd', 'e'});
}
SECTION( "stack can push and pop multiple times with correct alignment" ) {
std::array<char, 5> small_data_one{'a', 'b', 'c', 'd', 'e'};
std::array<char, 64> small_data_two{};
std::array<char, 1> small_data_three{'A'};
auto pointer_one = stack.push(small_data_one);
auto pointer_two = stack.push(small_data_two);
auto pointer_three = stack.push(small_data_three);
stack.pop<typeof(small_data_three)>();
stack.pop<typeof(small_data_two)>();
auto pointer_four = stack.push(small_data_two);
auto pointer_five = stack.push(small_data_three);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_one) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_two) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_three) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_four) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(reinterpret_cast<std::uintptr_t>(pointer_five) % system_details::CACHE_LINE_SIZE == 0);
REQUIRE(pointer_four == pointer_two);
REQUIRE(pointer_five == pointer_three);
}
}
TEST_CASE( "deque stores objects correctly", "[internal/data_structures/deque.h]") {
class my_item: public deque_item {
};
deque<my_item> deque;
my_item one, two, three;
SECTION( "add and remove items form the tail" ) {
deque.push_tail(&one);
deque.push_tail(&two);
deque.push_tail(&three);
REQUIRE(deque.pop_tail() == &three);
REQUIRE(deque.pop_tail() == &two);
REQUIRE(deque.pop_tail() == &one);
}
SECTION( "handles getting empty by popping the tail correctly" ) {
deque.push_tail(&one);
REQUIRE(deque.pop_tail() == &one);
deque.push_tail(&two);
REQUIRE(deque.pop_tail() == &two);
}
SECTION( "remove items form the head" ) {
deque.push_tail(&one);
deque.push_tail(&two);
deque.push_tail(&three);
REQUIRE(deque.pop_head() == &one);
REQUIRE(deque.pop_head() == &two);
REQUIRE(deque.pop_head() == &three);
}
SECTION( "handles getting empty by popping the head correctly" ) {
deque.push_tail(&one);
REQUIRE(deque.pop_head() == &one);
deque.push_tail(&two);
REQUIRE(deque.pop_head() == &two);
}
SECTION( "handles getting empty by popping the head and tail correctly" ) {
deque.push_tail(&one);
REQUIRE(deque.pop_tail() == &one);
deque.push_tail(&two);
REQUIRE(deque.pop_head() == &two);
deque.push_tail(&three);
REQUIRE(deque.pop_tail() == &three);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment