Commit d7107d27 by FritzFlorian

Add malloc implementation of scheduler memory.

This lead to some bug-fixes that covered move constructors not correctly assigning memory. We might clean this up further by only allowing in place creation of these types (placement new).
parent 72bffbb1
Pipeline #1130 failed with stages
in 24 seconds
...@@ -26,6 +26,23 @@ After this is done you can use normal `make` commands like ...@@ -26,6 +26,23 @@ After this is done you can use normal `make` commands like
`make` to build everything `make <target>` to build a target `make` to build everything `make <target>` to build a target
or `make install` to install the library globally. or `make install` to install the library globally.
Available Settings:
- `-DEASY_PROFILER=ON/OFF`
- default OFF
- Enabling will link the easy profiler library and enable its macros
- Enabling has a performance hit (do not use in releases)
- `-DADDRESS_SANITIZER=ON/OFF`
- default OFF
- Enables address sanitizer to be linked to the executable
- Only one sanitizer can be active at once
- Enabling has a performance hit (do not use in releases)
- `-DTHREAD_SANITIZER=ON/OFF`
- default OFF
- Enables thread/datarace sanitizer to be linked to the executable
- Only one sanitizer can be active at once
- Enabling has a performance hit (do not use in releases)
### Testing ### Testing
Testing is done using [Catch2](https://github.com/catchorg/Catch2/) Testing is done using [Catch2](https://github.com/catchorg/Catch2/)
......
...@@ -14,7 +14,9 @@ add_library(pls STATIC ...@@ -14,7 +14,9 @@ add_library(pls STATIC
src/internal/scheduling/run_on_n_threads_task.cpp include/pls/internal/scheduling/run_on_n_threads_task.h src/internal/scheduling/run_on_n_threads_task.cpp include/pls/internal/scheduling/run_on_n_threads_task.h
src/internal/scheduling/fork_join_task.cpp include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp include/pls/internal/scheduling/fork_join_task.h
src/internal/base/deque.cpp include/pls/internal/base/deque.h src/internal/base/deque.cpp include/pls/internal/base/deque.h
src/algorithms/invoke_parallel.cpp include/pls/algorithms/invoke_parallel.h include/pls/internal/base/error_handling.h) src/algorithms/invoke_parallel.cpp include/pls/algorithms/invoke_parallel.h
include/pls/internal/base/error_handling.h
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp)
# Add everything in `./include` to be in the include path of this project # Add everything in `./include` to be in the include path of this project
target_include_directories(pls target_include_directories(pls
......
...@@ -12,45 +12,14 @@ ...@@ -12,45 +12,14 @@
#include "thread_state.h" #include "thread_state.h"
#include "root_task.h" #include "root_task.h"
#include "scheduler_memory.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
namespace scheduling { namespace scheduling {
// Upper thread limit for static memory allocation.
// Could be moved to templating if needed.
static constexpr int MAX_THREADS = 32;
void worker_routine(); void worker_routine();
using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>; using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;
class scheduler_memory {
public:
virtual size_t max_threads() = 0;
virtual thread_state* thread_state_for(size_t id) = 0;
virtual scheduler_thread* thread_for(size_t id) = 0;
virtual base::aligned_stack* task_stack_for(size_t id) = 0;
};
template<size_t MAX_THREADS, size_t TASK_STACK_SIZE>
class static_scheduler_memory: public scheduler_memory {
std::array<scheduler_thread, MAX_THREADS> threads_;
std::array<thread_state, MAX_THREADS> thread_states_;
std::array<std::array<char, TASK_STACK_SIZE>, MAX_THREADS> task_stacks_memory_;
std::array<base::aligned_stack, MAX_THREADS> task_stacks_;
public:
static_scheduler_memory() {
for (size_t i = 0; i < MAX_THREADS; i++) {
task_stacks_[i] = base::aligned_stack(reinterpret_cast<char*>(&task_stacks_memory_[i]), TASK_STACK_SIZE);
}
}
size_t max_threads() override { return MAX_THREADS; }
thread_state* thread_state_for(size_t id) override { return &thread_states_[id]; }
scheduler_thread* thread_for(size_t id) override { return &threads_[id]; }
base::aligned_stack* task_stack_for(size_t id) override { return &task_stacks_[id]; }
};
class scheduler { class scheduler {
friend void worker_routine(); friend void worker_routine();
......
#include "pls/internal/base/aligned_stack.h"
#include "pls/internal/base/thread.h"
#include "thread_state.h"
#ifndef PLS_SCHEDULER_MEMORY_H
#define PLS_SCHEDULER_MEMORY_H
namespace pls {
namespace internal {
namespace scheduling {
void worker_routine();
using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;
class scheduler_memory {
public:
virtual size_t max_threads() = 0;
virtual thread_state* thread_state_for(size_t id) = 0;
virtual scheduler_thread* thread_for(size_t id) = 0;
virtual base::aligned_stack* task_stack_for(size_t id) = 0;
};
template<size_t MAX_THREADS, size_t TASK_STACK_SIZE>
class static_scheduler_memory: public scheduler_memory {
std::array<scheduler_thread, MAX_THREADS> threads_;
std::array<thread_state, MAX_THREADS> thread_states_;
std::array<std::array<char, TASK_STACK_SIZE>, MAX_THREADS> task_stacks_memory_;
std::array<base::aligned_stack, MAX_THREADS> task_stacks_;
public:
static_scheduler_memory() {
for (size_t i = 0; i < MAX_THREADS; i++) {
task_stacks_[i] = base::aligned_stack(task_stacks_memory_[i].data(), TASK_STACK_SIZE);
}
}
size_t max_threads() override { return MAX_THREADS; }
thread_state* thread_state_for(size_t id) override { return &thread_states_[id]; }
scheduler_thread* thread_for(size_t id) override { return &threads_[id]; }
base::aligned_stack* task_stack_for(size_t id) override { return &task_stacks_[id]; }
};
class malloc_scheduler_memory: public scheduler_memory {
size_t num_threads_;
scheduler_thread* threads_;
thread_state* thread_states_;
char** task_stacks_memory_;
base::aligned_stack* task_stacks_;
public:
explicit malloc_scheduler_memory(size_t num_threads, size_t memory_per_stack = 2 << 16);
~malloc_scheduler_memory();
size_t max_threads() override { return num_threads_; }
thread_state* thread_state_for(size_t id) override { return &thread_states_[id]; }
scheduler_thread* thread_for(size_t id) override { return &threads_[id]; }
base::aligned_stack* task_stack_for(size_t id) override { return &task_stacks_[id]; }
};
}
}
}
#endif //PLS_SCHEDULER_MEMORY_H
...@@ -7,8 +7,10 @@ ...@@ -7,8 +7,10 @@
#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h"
namespace pls { namespace pls {
using internal::scheduling::scheduler;
using internal::scheduling::static_scheduler_memory; using internal::scheduling::static_scheduler_memory;
using internal::scheduling::malloc_scheduler_memory;
using internal::scheduling::scheduler;
using task_id = internal::scheduling::abstract_task::id; using task_id = internal::scheduling::abstract_task::id;
using internal::scheduling::fork_join_sub_task; using internal::scheduling::fork_join_sub_task;
......
...@@ -13,9 +13,12 @@ namespace pls { ...@@ -13,9 +13,12 @@ namespace pls {
tbb_task_{nullptr}, tbb_task_{nullptr},
stack_state_{nullptr} {} stack_state_{nullptr} {}
fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task& other): base::deque_item(other) { fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task& other):
// Do Nothing, will be inited after this anyways base::deque_item(other),
} ref_count_{0},
parent_{nullptr},
tbb_task_{nullptr},
stack_state_{nullptr} {}
void fork_join_sub_task::execute() { void fork_join_sub_task::execute() {
EASY_BLOCK("execute sub_task", profiler::colors::Green); EASY_BLOCK("execute sub_task", profiler::colors::Green);
......
...@@ -9,12 +9,12 @@ namespace pls { ...@@ -9,12 +9,12 @@ namespace pls {
memory_{memory}, memory_{memory},
sync_barrier_{num_threads + 1}, sync_barrier_{num_threads + 1},
terminated_{false} { terminated_{false} {
if (num_threads > MAX_THREADS) { if (num_threads_ > memory_->max_threads()) {
PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory."); PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory.");
} }
for (unsigned int i = 0; i < num_threads; i++) { for (unsigned int i = 0; i < num_threads_; i++) {
*memory_->thread_state_for(i) = thread_state{this, memory_->task_stack_for(i), i}; new(memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i};
*memory_->thread_for(i) = base::start_thread(&worker_routine, memory_->thread_state_for(i)); *memory_->thread_for(i) = base::start_thread(&worker_routine, memory_->thread_state_for(i));
} }
} }
......
#include "pls/internal/scheduling/scheduler_memory.h"
namespace pls {
namespace internal {
namespace scheduling {
malloc_scheduler_memory::malloc_scheduler_memory(const size_t num_threads, const size_t memory_per_stack):
num_threads_{num_threads} {
threads_ = reinterpret_cast<scheduler_thread*>(malloc(num_threads * sizeof(scheduler_thread)));
thread_states_ = reinterpret_cast<thread_state*>(malloc(num_threads * sizeof(thread_state)));
task_stacks_ = reinterpret_cast<base::aligned_stack*>(malloc(num_threads * sizeof(base::aligned_stack)));
task_stacks_memory_ = reinterpret_cast<char**>(malloc(num_threads * sizeof(char*)));
for (size_t i = 0; i < num_threads_; i++) {
task_stacks_memory_[i] = reinterpret_cast<char*>(malloc(memory_per_stack));
task_stacks_[i] = base::aligned_stack(task_stacks_memory_[i], memory_per_stack);
}
}
malloc_scheduler_memory::~malloc_scheduler_memory() {
free(threads_);
free(thread_states_);
for (size_t i = 0; i < num_threads_; i++) {
free(task_stacks_memory_[i]);
}
free(task_stacks_);
free(task_stacks_memory_);
}
}
}
}
...@@ -48,7 +48,7 @@ public: ...@@ -48,7 +48,7 @@ public:
}; };
TEST_CASE( "tbb task are scheduled correctly", "[internal/scheduling/fork_join_task.h]") { TEST_CASE( "tbb task are scheduled correctly", "[internal/scheduling/fork_join_task.h]") {
static static_scheduler_memory<8, 2 << 12> my_scheduler_memory; malloc_scheduler_memory my_scheduler_memory{8, 2 << 12};
SECTION("tasks are executed exactly once") { SECTION("tasks are executed exactly once") {
scheduler my_scheduler{&my_scheduler_memory, 2}; scheduler my_scheduler{&my_scheduler_memory, 2};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment