Commit 872c1a72 by FritzFlorian

Add different stack allocators.

It is now possible to use a memory mapped stack that raises a SIGSEGV if the coroutine stacks are exhausted.
parent 927d8ac5
Pipeline #1421 failed in 29 seconds
......@@ -38,7 +38,6 @@ add_subdirectory(lib/pls)
# Include examples
add_subdirectory(app/playground)
add_subdirectory(app/test_for_new)
add_subdirectory(app/invoke_parallel)
add_subdirectory(app/benchmark_fft)
add_subdirectory(app/benchmark_unbalanced)
add_subdirectory(app/benchmark_matrix)
......
......@@ -4,6 +4,40 @@ The new version of pls uses a more complicated/less user friendly
API in favor of performance and memory guarantees.
For the old version refer to the second half of this document.
# 24.03.2020 - mmap stack
We added different means of allocating stacks for our coroutines.
The biggest difference is that we now support mmap with guard pages
for the stacks. This leads to SIGSEGVs if the stack space is actually exhausted
at runtime, which is better than undefined behaviour.
The minimum page size is usually 4kB, which is plenty in release builds.
Other solutions to the cactus stack problem (memory mapped cactus stacks)
share this 'minimum resource per depth' limitation. They argue that they only need
the physical memory in the worst case, whereas we need it in every execution schedule.
As we aim for worst-case bounds, this seems not to be a big issue. Using any smaller stack size
requires giving up the guard pages, but is possible with heap allocated stacks.
See [mmap](http://man7.org/linux/man-pages/man2/mmap.2.html), [mprotect](https://linux.die.net/man/2/mprotect)
and [sysconf](http://man7.org/linux/man-pages/man3/sysconf.3.html) for implementation details.
The general approach of mmap'ing the stack and then protecting a guard page at the end the stack grows towards should be straightforward.
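In essence, allocation boils down to the following sketch (a hypothetical stand-alone helper, simplified from the implementation in this commit; error handling omitted):

#include <sys/mman.h>
#include <unistd.h>
#include <cstddef>

char *allocate_guarded_stack(size_t pages) {
  const size_t page_size = (size_t) sysconf(_SC_PAGESIZE);
  char *memory = (char *) mmap(nullptr, (pages + 1) * page_size,
                               PROT_READ | PROT_WRITE,
                               MAP_SHARED | MAP_ANONYMOUS, -1, 0);
  // Revoke access to the lowest page: the stack grows down towards it,
  // so exhausting the stack faults here instead of corrupting memory.
  mprotect(memory, page_size, PROT_NONE);
  return memory + page_size; // usable stack block above the guard page
}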
# 24.03.2020 - Bug in tsan
While implementing the coroutines required for the current parent-stealing scheduler
we found issues when running with thread sanitizer. At first we thought the issues
were related to the way we handle context switching, but after some investigation we found
that they were due to a memory leak in tsan.
Specifically, tsan leaked a memory mapping for each created/destroyed fiber,
which ran into [linux limits](https://stackoverflow.com/questions/22779556/linux-error-from-munmap)
preventing the application from performing more mappings, so the
application hangs or crashes after some time.
We addressed the issue in a [patch request](https://reviews.llvm.org/D76073) to the
LLVM/Clang tool suite. Once the patch is merged we need to set up our test-runner to use the
current version of Clang and add the information to the documentation.
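For reference, a loop of roughly the following shape illustrates the pattern that triggered
the leak (a hedged sketch against tsan's fiber API from <sanitizer/tsan_interface.h>, built
with -fsanitize=thread; real coroutine code would additionally switch contexts):

#include <sanitizer/tsan_interface.h>

int main() {
  // Affected tsan versions leaked one memory mapping per fiber
  // create/destroy cycle, so a loop like this eventually makes the
  // kernel refuse further mappings and the process hangs or crashes.
  for (int i = 0; i < 1000000; i++) {
    void *fiber = __tsan_create_fiber(0);
    __tsan_destroy_fiber(fiber);
  }
  return 0;
}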
# 18.03.2020 - C++17 benefit, overaligned new
We have many cache line aligned types. Before C++17 they can
......
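A small sketch of the C++17 behavior the entry above refers to (cache_line_type is a hypothetical example type):

struct alignas(64) cache_line_type { char payload[64]; };

// Since C++17 a plain new-expression honors extended alignment by calling
// operator new(std::size_t, std::align_val_t). Before C++17, new only
// guaranteed alignof(std::max_align_t), so over-aligned types needed
// posix_memalign or manual padding.
cache_line_type *make_aligned() { return new cache_line_type(); }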
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/static_scheduler_memory.h"
using namespace pls::internal::scheduling;
......@@ -15,6 +14,9 @@ void pls_conquer(fft::complex_vector::iterator data, fft::complex_vector::iterat
fft::divide(data, swap_array, n);
if (n <= fft::RECURSIVE_CUTOFF) {
FILE* file = fopen("test.text", "w");
fprintf(file, "test %d", n);
fclose(file);
fft::conquer(data, swap_array, n / 2);
fft::conquer(data + n / 2, swap_array + n / 2, n / 2);
} else {
......@@ -30,9 +32,8 @@ void pls_conquer(fft::complex_vector::iterator data, fft::complex_vector::iterat
fft::combine(data, n);
}
constexpr int MAX_NUM_THREADS = 8;
constexpr int MAX_NUM_TASKS = 32;
constexpr int MAX_STACK_SIZE = 1024 * 8;
constexpr int MAX_STACK_SIZE = 1024 * 4;
int main(int argc, char **argv) {
int num_threads;
......@@ -47,10 +48,7 @@ int main(int argc, char **argv) {
fft::complex_vector swap_array(fft::SIZE);
fft::fill_input(data);
static_scheduler_memory<MAX_NUM_THREADS,
MAX_NUM_TASKS,
MAX_STACK_SIZE> global_scheduler_memory;
scheduler scheduler{global_scheduler_memory, (unsigned) num_threads};
scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
runner.run_iterations(fft::NUM_ITERATIONS, [&]() {
scheduler.perform_work([&]() {
......
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/static_scheduler_memory.h"
using namespace pls::internal::scheduling;
......@@ -31,14 +30,9 @@ int pls_fib(int n) {
return a + b;
}
constexpr int MAX_NUM_THREADS = 8;
constexpr int MAX_NUM_TASKS = 32;
constexpr int MAX_STACK_SIZE = 1024 * 4;
static_scheduler_memory<MAX_NUM_THREADS,
MAX_NUM_TASKS,
MAX_STACK_SIZE> global_scheduler_memory;
int main(int argc, char **argv) {
int num_threads;
string directory;
......@@ -48,10 +42,9 @@ int main(int argc, char **argv) {
string full_directory = directory + "/PLS_v3/";
benchmark_runner runner{full_directory, test_name};
scheduler scheduler{global_scheduler_memory, (unsigned) num_threads};
scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
volatile int res;
runner.run_iterations(fib::NUM_ITERATIONS, [&]() {
scheduler.perform_work([&]() {
res = pls_fib(fib::INPUT_N);
......
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/static_scheduler_memory.h"
#include "pls/algorithms/for_each.h"
using namespace pls::internal::scheduling;
......@@ -21,14 +20,9 @@ class pls_matrix : public matrix::matrix<T, SIZE> {
}
};
constexpr int MAX_NUM_THREADS = 8;
constexpr int MAX_NUM_TASKS = 32;
constexpr int MAX_STACK_SIZE = 1024 * 1;
static_scheduler_memory<MAX_NUM_THREADS,
MAX_NUM_TASKS,
MAX_STACK_SIZE> global_scheduler_memory;
int main(int argc, char **argv) {
int num_threads;
string directory;
......@@ -42,7 +36,7 @@ int main(int argc, char **argv) {
pls_matrix<double, matrix::MATRIX_SIZE> b;
pls_matrix<double, matrix::MATRIX_SIZE> result;
scheduler scheduler{global_scheduler_memory, (unsigned) num_threads};
scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
runner.run_iterations(matrix::NUM_ITERATIONS, [&]() {
scheduler.perform_work([&]() {
......
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/static_scheduler_memory.h"
using namespace pls::internal::scheduling;
......@@ -32,14 +31,9 @@ int unbalanced_tree_search(int seed, int root_children, double q, int normal_chi
return count_child_nodes(root);
}
constexpr int MAX_NUM_THREADS = 8;
constexpr int MAX_NUM_TASKS = 256;
constexpr int MAX_STACK_SIZE = 1024 * 2;
static_scheduler_memory<MAX_NUM_THREADS,
MAX_NUM_TASKS,
MAX_STACK_SIZE> global_scheduler_memory;
int main(int argc, char **argv) {
int num_threads;
string directory;
......@@ -49,7 +43,7 @@ int main(int argc, char **argv) {
string full_directory = directory + "/PLS_v3/";
benchmark_runner runner{full_directory, test_name};
scheduler scheduler{global_scheduler_memory, (unsigned) num_threads};
scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
runner.run_iterations(unbalanced::NUM_ITERATIONS, [&]() {
scheduler.perform_work([&]() {
......
add_executable(invoke_parallel main.cpp)
target_link_libraries(invoke_parallel pls)
if(EASY_PROFILER)
target_link_libraries(invoke_parallel easy_profiler)
endif()
#include <pls/pls.h>
#include <pls/internal/helpers/profiler.h>
#include <iostream>
#include <complex>
#include <vector>
static constexpr int CUTOFF = 16;
static constexpr int INPUT_SIZE = 8192;
typedef std::vector<std::complex<double>> complex_vector;
void divide(complex_vector::iterator data, int n) {
complex_vector tmp_odd_elements(n / 2);
for (int i = 0; i < n / 2; i++) {
tmp_odd_elements[i] = data[i * 2 + 1];
}
for (int i = 0; i < n / 2; i++) {
data[i] = data[i * 2];
}
for (int i = 0; i < n / 2; i++) {
data[i + n / 2] = tmp_odd_elements[i];
}
}
void combine(complex_vector::iterator data, int n) {
for (int i = 0; i < n / 2; i++) {
std::complex<double> even = data[i];
std::complex<double> odd = data[i + n / 2];
// w is the "twiddle-factor".
// this could be cached, but we run the same 'data_structures' algorithm parallel/serial,
// so it won't impact the performance comparison.
std::complex<double> w = exp(std::complex<double>(0, -2. * M_PI * i / n));
data[i] = even + w * odd;
data[i + n / 2] = even - w * odd;
}
}
void fft(complex_vector::iterator data, int n) {
if (n < 2) {
return;
}
PROFILE_WORK_BLOCK("Divide")
divide(data, n);
PROFILE_END_BLOCK
PROFILE_WORK_BLOCK("Invoke Parallel")
if (n == CUTOFF) {
PROFILE_WORK_BLOCK("FFT Serial")
fft(data, n / 2);
fft(data + n / 2, n / 2);
} else if (n <= CUTOFF) {
fft(data, n / 2);
fft(data + n / 2, n / 2);
} else {
pls::invoke(
[n, &data] { fft(data, n / 2); },
[n, &data] { fft(data + n / 2, n / 2); }
);
}
PROFILE_END_BLOCK
PROFILE_WORK_BLOCK("Combine")
combine(data, n);
PROFILE_END_BLOCK
}
complex_vector prepare_input(int input_size) {
std::vector<double> known_frequencies{2, 11, 52, 88, 256};
complex_vector data(input_size);
// Set our input data to match a time series of the known_frequencies.
// When applying fft to this time-series we should find these frequencies.
for (int i = 0; i < input_size; i++) {
data[i] = std::complex<double>(0.0, 0.0);
for (auto frequencie : known_frequencies) {
data[i] += sin(2 * M_PI * frequencie * i / input_size);
}
}
return data;
}
int main() {
PROFILE_ENABLE
pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 14};
pls::scheduler scheduler{&my_scheduler_memory, 8};
complex_vector initial_input = prepare_input(INPUT_SIZE);
scheduler.perform_work([&] {
PROFILE_MAIN_THREAD
// Call looks just the same, only requirement is
// the enclosure in the perform_work lambda.
for (int i = 0; i < 10; i++) {
PROFILE_WORK_BLOCK("Top Level FFT")
complex_vector input = initial_input;
fft(input.begin(), input.size());
}
});
PROFILE_SAVE("test_profile.prof")
}
......@@ -5,9 +5,10 @@ add_library(pls STATIC
include/pls/internal/base/ttas_spin_lock.h src/internal/base/ttas_spin_lock.cpp
include/pls/internal/base/swmr_spin_lock.h src/internal/base/swmr_spin_lock.cpp
include/pls/internal/base/barrier.h src/internal/base/barrier.cpp
include/pls/internal/base/system_details.h
include/pls/internal/base/system_details.h src/internal/base/system_details.cpp
include/pls/internal/base/error_handling.h src/internal/base/error_handling.cpp
include/pls/internal/base/alignment.h src/internal/base/alignment.cpp
include/pls/internal/base/stack_allocator.h src/internal/base/stack_allocator.cpp
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h
......@@ -26,13 +27,11 @@ add_library(pls STATIC
include/pls/internal/helpers/member_function.h
include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp
include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
include/pls/internal/scheduling/scheduler_impl.h
include/pls/internal/scheduling/task_manager.h src/internal/scheduling/task_manager.cpp
include/pls/internal/scheduling/scheduler.h include/pls/internal/scheduling/scheduler_impl.h src/internal/scheduling/scheduler.cpp
include/pls/internal/scheduling/task_manager.h include/pls/internal/scheduling/task_manager_impl.h src/internal/scheduling/task_manager.cpp
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/external_trading_deque.h src/internal/scheduling/external_trading_deque.cpp
include/pls/internal/scheduling/traded_cas_field.h
include/pls/internal/scheduling/task_manager_impl.h)
include/pls/internal/scheduling/traded_cas_field.h)
# Dependencies for pls
target_link_libraries(pls Threads::Threads)
......
......@@ -8,10 +8,7 @@
#include "system_details.h"
namespace pls {
namespace internal {
namespace base {
namespace alignment {
namespace pls::internal::base::alignment {
constexpr system_details::pointer_t next_alignment(system_details::pointer_t size,
size_t alignment = system_details::CACHE_LINE_SIZE) {
......@@ -70,9 +67,7 @@ struct alignment_wrapper {
template<typename T>
using cache_alignment_wrapper = alignment_wrapper<T, system_details::CACHE_LINE_SIZE>;
}
}
}
}
#endif //PLS_ALIGNMENT_H
......@@ -9,9 +9,7 @@
#include <random>
#include <math.h>
namespace pls {
namespace internal {
namespace base {
namespace pls::internal::base {
class backoff {
const unsigned long INITIAL_SPIN_ITERS = 2u << 1u;
......@@ -48,7 +46,5 @@ class backoff {
};
}
}
}
#endif //PLS_BACKOFF_H_
#ifndef PLS_INCLUDE_PLS_INTERNAL_BASE_STACK_ALLOCATOR_H_
#define PLS_INCLUDE_PLS_INTERNAL_BASE_STACK_ALLOCATOR_H_
#include <cstddef>
namespace pls::internal::base {
/**
 * Abstract interface for allocating and freeing coroutine stacks.
 * Allows swapping the allocation strategy (plain heap vs. mmap with guard pages).
 */
class stack_allocator {
 public:
  virtual char *allocate_stack(size_t size) = 0;
  virtual void free_stack(size_t size, char *stack) = 0;
};

// Heap backed allocator: no guard pages, but also no minimum size per stack.
class heap_stack_allocator : public stack_allocator {
 public:
  char *allocate_stack(size_t size) override { return new char[size]; }
  void free_stack(size_t, char *stack) override { delete[] stack; }
};

// mmap backed allocator: places a protected guard page below each stack,
// so exhausting the stack raises SIGSEGV instead of undefined behaviour.
class mmap_stack_allocator : public stack_allocator {
  char *allocate_stack(size_t size) override;
  void free_stack(size_t, char *stack) override;
};
}
#endif //PLS_INCLUDE_PLS_INTERNAL_BASE_STACK_ALLOCATOR_H_
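Any allocation strategy can be plugged in by implementing the two virtual methods. For instance, a hypothetical debugging allocator (not part of this commit) could zero-fill its stacks:

// Hypothetical example built against the interface above: zero-fills
// stacks so that reads of uninitialized stack memory are deterministic.
class zeroing_stack_allocator : public pls::internal::base::stack_allocator {
 public:
  char *allocate_stack(size_t size) override { return new char[size](); }
  void free_stack(size_t, char *stack) override { delete[] stack; }
};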
......@@ -14,17 +14,17 @@
#endif
#endif
#include <cstdint>
#include "error_handling.h"
namespace pls::internal::base {
#include <cstdint>
/**
* Collection of system details, e.g. hardware cache line size.
*
* PORTABILITY:
* Currently sane default values for x86.
* Currently sane default values for x86. Must be changed for any port.
*/
namespace system_details {
namespace pls::internal::base::system_details {
/**
* Pointer Types needed for ABA protection mixed into addresses.
......@@ -42,7 +42,7 @@ constexpr unsigned long CAS_SIZE = sizeof(cas_integer) * 8;
/**
* Most processors have 64 byte cache lines (last 6 bit of the address are zero at line beginnings).
*/
constexpr pointer_t CACHE_LINE_SIZE = 64;
constexpr size_t CACHE_LINE_SIZE = 64;
/**
* Helper to align types/values on cache lines.
......@@ -50,11 +50,40 @@ constexpr pointer_t CACHE_LINE_SIZE = 64;
#define PLS_CACHE_ALIGN alignas(base::system_details::CACHE_LINE_SIZE)
/**
* Choose one of the following ways to store thread specific data.
* Try to choose the fastest available on this processor/system.
* Helper to find the mmap page size. Either set a constant or rely on a system-specific getter function.
*/
//#define PLS_THREAD_SPECIFIC_PTHREAD
#define PLS_THREAD_SPECIFIC_COMPILER
size_t get_page_size();
/**
* Wrapper to create a new memory mapping.
* Currently implemented for Linux/POSIX systems.
*
* @param size The page_size aligned size of the new mapping
* @return The newly created mapping or the error returned by the system call
*/
void *memory_map_range(size_t size);
/**
* Helper to revert a previously made memory mapping.
* Currently implemented for Linux/POSIX systems.
*
* @param addr The start address of the mapping.
* @param size The size of the mapping.
*
* @return status code from system call
*/
int memory_unmap_range(void *addr, size_t size);
/**
* Helper to protect a specific address range (must be mapped in the application)
* from any access. Later accesses to this address range result in a SIGSEGV.
*
* @param addr The start address of the to be protected block.
* @param size The size of the protected block.
*
* @return status code from system call
*/
int memory_protect_range(void *addr, size_t size);
/**
* When spinning one wants to 'relax' the CPU from some task,
......@@ -89,6 +118,5 @@ inline void relax_cpu() {
#endif
}
}
#endif //PLS_SYSTEM_DETAILS_H
......@@ -7,7 +7,10 @@
#include <vector>
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/barrier.h"
#include "pls/internal/base/stack_allocator.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/task_manager.h"
......@@ -103,7 +106,7 @@ class scheduler {
bool terminated_;
// TODO: move this into a public wrapper class with templating
heap_stack_allocator stack_allocator_{};
base::mmap_stack_allocator stack_allocator_{};
};
}
......
......@@ -9,27 +9,19 @@
#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/external_trading_deque.h"
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/base/stack_allocator.h"
namespace pls::internal::scheduling {
class stack_allocator {
public:
virtual char *allocate_stack(size_t size) = 0;
virtual void free_stack(size_t size, char *stack) = 0;
};
class heap_stack_allocator : public stack_allocator {
public:
char *allocate_stack(size_t size) override { return new char[size]; }
void free_stack(size_t, char *stack) override { delete[] stack; }
};
/**
* Handles management of tasks in the system. Each thread has a local task manager,
* responsible for allocating, freeing and publishing tasks for stealing.
*
* All interaction for spawning, stealing and task trading are managed through this class.
*/
class task_manager {
using stack_allocator = pls::internal::base::stack_allocator;
public:
explicit task_manager(unsigned thread_id,
size_t num_tasks,
......
......@@ -58,11 +58,11 @@ void task_manager::spawn_child(F &&lambda) {
if (syncing_task_manager->try_clean_return(result_cont)) {
// We return back to the main scheduling loop
PLS_ASSERT(result_cont.valid(), "Must only return valid continuations...");
return std::move(result_cont);
return result_cont;
} else {
// We finish up the last task and are the sole owner again
PLS_ASSERT(result_cont.valid(), "Must only return valid continuations...");
return std::move(result_cont);
return result_cont;
}
}
});
......
#include "pls/internal/base/stack_allocator.h"
#include "pls/internal/base/alignment.h"
#include "pls/internal/base/system_details.h"
namespace pls::internal::base {
char *mmap_stack_allocator::allocate_stack(size_t size) {
  const size_t page_size = system_details::get_page_size();
  // Round the requested size up to whole pages and add one extra page
  // that serves as the guard below the usable stack block.
  const size_t stack_size = alignment::next_alignment(size, page_size);
  const size_t guard_size = page_size;
  const size_t mmap_size = stack_size + guard_size;

  char *const memory_range = reinterpret_cast<char *>(system_details::memory_map_range(mmap_size));
  char *const stack_block = memory_range + guard_size;
  char *const guard_block = memory_range;

  // Revoke all access to the guard page: stacks grow downwards on x86,
  // so running past the stack's end faults here immediately.
  system_details::memory_protect_range(guard_block, guard_size);
  return stack_block;
}
void mmap_stack_allocator::free_stack(size_t size, char *stack) {
  const size_t page_size = system_details::get_page_size();
  // Mirror the rounding done in allocate_stack, so the unmapped region
  // matches the region mapped earlier (unaligned sizes would otherwise
  // trip the page-alignment assertion in memory_unmap_range).
  const size_t stack_size = alignment::next_alignment(size, page_size);
  const size_t guard_size = page_size;
  const size_t mmap_size = stack_size + guard_size;

  // The guard page sits below the handed-out stack block; step back to
  // the start of the original mapping before unmapping.
  char *const memory_range = stack - guard_size;
  system_details::memory_unmap_range(memory_range, mmap_size);
}
}
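Usage then looks like the following (a hypothetical sketch, not part of the commit):

// The request is rounded up to whole pages internally; any access below
// the returned block hits the protected guard page and raises SIGSEGV.
pls::internal::base::mmap_stack_allocator allocator;
char *stack = allocator.allocate_stack(8 * 1024);
// ... run a coroutine on the returned stack block ...
allocator.free_stack(8 * 1024, stack);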
#include "pls/internal/base/system_details.h"
#include <unistd.h>
#include <sys/mman.h>
namespace pls::internal::base::system_details {
size_t get_page_size() {
return sysconf(_SC_PAGESIZE);
}
void *memory_map_range(size_t size) {
PLS_ASSERT(size % get_page_size() == 0, "Must only map memory regions in page_size chunks.");
return mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
}
int memory_unmap_range(void *addr, size_t size) {
PLS_ASSERT((pointer_t) addr % get_page_size() == 0, "Must only unmap memory page_size aligned memory regions.");
PLS_ASSERT(size % get_page_size() == 0, "Must only unmap memory regions in page_size chunks.");
return munmap(addr, size);
}
int memory_protect_range(void *addr, size_t size) {
return mprotect(addr, size, PROT_NONE);
}
}