Commit 740ae661 by FritzFlorian

WIP: Sketch continuation and task class.

This first sketch of the classes captures what we think is needed in terms of the general interface; it is still very much WIP.
parent 693d4e9b
# Notes on Continuation/Parent Stealing Implementation

A collection of things that we noticed during development. Useful later on to write a project report and to go back in time to find out why certain decisions were made. The new version of pls uses a more complicated/less user-friendly API in favor of performance and memory guarantees. For the old version refer to the second half of this document.
# 05.11.2019 - Memory Allocation, 'Fat Objects'
We change our memory allocation for all memory the scheduler requires
from allocating buffers (char* arrays) separate from the actual data
structures to 'fat data structures' that use templating to create
an object that actually holds all the data. This allows us to more
simply add fields to manage tasks and continuations, as we do not
need to change the scheduler_memory (adding additional buffers);
we only have to add the fields directly to the container objects.
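As a minimal sketch of the pattern (the names and the `(char*, size)` manager constructor are illustrative, not the real pls types):

```cpp
#include <array>
#include <cstddef>

// Illustrative 'fat' container: it owns the raw memory block *and* the
// manager constructed over it, instead of receiving a separately
// allocated char* buffer from scheduler_memory.
template <typename Manager, std::size_t SIZE>
class fat_container {
 public:
  fat_container() : memory_{}, manager_{memory_.data(), SIZE} {}
  Manager &get() { return manager_; }

 private:
  std::array<char, SIZE> memory_;  // storage embedded in the object itself
  Manager manager_;                // declared second, so it can use memory_
};
```

Adding a new per-thread field now only means adding a member here instead of threading another buffer through scheduler_memory.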
# 04.11.2019 - Memory Allocation and Initialization
Our framework tries to be explicit about how and where memory is allocated.
In any production build of the framework we will only use fixed-size
memory pools/blocks to manage all data structures required by the
scheduler, as this property is our main research goal.
Nevertheless, we want to offer different choices of where to allocate
these fixed pools. Some people might prefer to store them on the stack,
some in heap memory, and others might want to place them
into memory managed by custom allocators. Currently we support a
stack-based 'fat' object and a heap-based memory object that stores each
thread's state in a vector (this could be changed to lists in the future
to avoid the one big memory block allocated by the vector).
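A rough usage sketch of the two flavors, based on the aligned_stack interface further down in this diff (the 4096-byte size is just an example):

```cpp
#include "pls/internal/data_structures/aligned_stack.h"

using namespace pls::internal::data_structures;

void allocation_flavors() {
  // Stack-based 'fat' object: the pool lives inside the object itself.
  static_aligned_stack<4096> stack_backed;
  aligned_stack &a = stack_backed.get_stack();

  // Heap-based object: the pool is new[]-allocated and freed on destruction.
  heap_aligned_stack heap_backed{4096};
  aligned_stack &b = heap_backed.get_stack();

  (void) a;
  (void) b;  // both expose the same aligned_stack interface
}
```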
# Notes on Blocking/Child Stealing Implementation
Notes on the child stealing implementation of pls.
This corresponds to tag v0.1.
## 02.08.2019 - Ideas for sleeping threads when no work is available
......
@@ -76,12 +76,11 @@ long fib(long n) {
```
## Project Structure
The project uses [CMake](https://cmake.org/) as its build system;
the recommended IDE is either a simple text editor or [CLion](https://www.jetbrains.com/clion/).
We divide the project into sub-targets to separate the library
itself, testing and example code. The library itself can be found in
`lib/pls`, testing related code is in `test`, example and playground
apps are in `app`.
@@ -114,11 +113,16 @@ Available Settings:
- Enables the thread/data race sanitizer to be linked to the executable
- Only one sanitizer can be active at once
- Enabling has a performance hit (do not use in releases)
- `-DDEBUG_SYMBOLS=ON/OFF`
  - default OFF
  - Enables the build with debug symbols
  - Use e.g. for profiling the release build
Note that these settings are persistent for one CMake build folder.
If you e.g. set a flag in the debug build it will not influence
the release build, but it will persist in the debug build folder
until you explicitly change it back.
### Testing
Testing is done using [Catch2](https://github.com/catchorg/Catch2/)
@@ -167,4 +171,8 @@ For detailed profiling of small performance hotspots we prefer
to use [Intel's VTune Amplifier](https://software.intel.com/en-us/vtune).
It gives insights into detailed microarchitecture usage and performance
hotspots. Follow the instructions by Intel for using it.
Make sure to enable debug symbols (`-DDEBUG_SYMBOLS=ON`) in the
analyzed build and that all optimizations are turned on
(by choosing the release build).
option(ASSEMBLY_OUTPUT "Enable output of assembly files when building" OFF)
if (ASSEMBLY_OUTPUT)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -save-temps -Wa,-ahldn=assembly.asm -fverbose-asm -g")
endif ()
message("-- Assembly Output: ${ASSEMBLY_OUTPUT}")
@@ -13,12 +13,12 @@ message("-- Using Build Type: " ${CMAKE_BUILD_TYPE})
# Enable optimizations in release builds
if (CMAKE_BUILD_TYPE STREQUAL "Release")
# Link time optimization
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra")
# -O2 is often seen as 'the most speed',
# but inlining functions and SIMD/Vectorization is
# only enabled by -O3, thus it's way faster in some
# array calculations.
set(CMAKE_CXX_FLAGS_RELEASE "-O2 -march=native")
set(CMAKE_INTERPROCEDURAL_OPTIMIZATION TRUE)
else ()
set(CMAKE_CXX_FLAGS_DEBUG "-g -O0")
......
@@ -39,6 +39,7 @@ add_library(pls STATIC
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h
include/pls/internal/data_structures/stamped_integer.h
include/pls/internal/data_structures/delayed_initialization_wrapper.h
include/pls/internal/helpers/prohibit_new.h
include/pls/internal/helpers/profiler.h
@@ -51,11 +52,11 @@ add_library(pls STATIC
include/pls/internal/scheduling/thread_state.h
include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
include/pls/internal/scheduling/scheduler_impl.h
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/scheduler_memory.h
include/pls/internal/scheduling/lambda_task.h
include/pls/internal/scheduling/task_manager.h
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/cont_manager.h
include/pls/internal/scheduling/continuation.h)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
......
@@ -35,11 +35,9 @@ class aligned_stack {
public:
typedef size_t stack_offset;
protected:
aligned_stack(char *memory_pointer, size_t size);
aligned_stack(char *memory_pointer, size_t size, size_t original_size);
public:
template<typename T, typename ...ARGS>
T *push(ARGS &&... args);
template<typename T>
@@ -64,18 +62,26 @@ class aligned_stack {
};
template<size_t SIZE>
class static_aligned_stack {
public:
static_aligned_stack();
aligned_stack &get_stack() { return aligned_stack_; }
private:
// Declared before aligned_stack_ so it is initialized first.
alignas(base::system_details::CACHE_LINE_SIZE) std::array<char, SIZE> memory_;
aligned_stack aligned_stack_;
};
class heap_aligned_stack {
public:
explicit heap_aligned_stack(size_t size);
~heap_aligned_stack();
aligned_stack &get_stack() { return aligned_stack_; }
private:
// Declared before aligned_stack_ so the allocation happens first.
char *unaligned_memory_pointer_;
aligned_stack aligned_stack_;
};
}
......
@@ -28,10 +28,11 @@ void aligned_stack::pop() {
}
template<size_t SIZE>
static_aligned_stack<SIZE>::static_aligned_stack(): memory_{}, aligned_stack_{memory_.data(), SIZE} {};
heap_aligned_stack::heap_aligned_stack(size_t size) :
unaligned_memory_pointer_{new char[base::alignment::next_alignment(size)]},
aligned_stack_{unaligned_memory_pointer_, size, base::alignment::next_alignment(size)} {}
heap_aligned_stack::~heap_aligned_stack() {
delete[] unaligned_memory_pointer_;
......
#ifndef PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_WRAPPER_H_
#define PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_WRAPPER_H_
#include <array>
#include <utility>
#include "pls/internal/base/error_handling.h"
namespace pls {
namespace internal {
namespace data_structures {
/**
* Allows to reserve space for an uninitialized member variable.
* The member must be initialized before usage using the provided
* perfect forwarding constructor method.
*
* Makes sure to call the wrapped object's destructor if one was constructed.
*/
template<typename T>
class delayed_initialization_wrapper {
public:
delayed_initialization_wrapper() : memory_{}, initialized_{false} {}
template<typename ...ARGS>
explicit delayed_initialization_wrapper(ARGS &&...args): memory_{}, initialized_{true} {
new (memory_.data()) T(std::forward<ARGS>(args)...);
}
~delayed_initialization_wrapper() {
if (initialized_) {
reinterpret_cast<T *>(memory_.data())->~T();
}
}
template<typename ...ARGS>
void initialize(ARGS &&...args) {
PLS_ASSERT(!initialized_, "Can only initialize delayed wrapper object once!")
new (memory_.data()) T(std::forward<ARGS>(args)...);
initialized_ = true;
}
T &object() {
PLS_ASSERT(initialized_, "Can not use an uninitialized delayed wrapper object!")
return *reinterpret_cast<T *>(memory_.data());
}
private:
alignas(T) std::array<char, sizeof(T)> memory_;  // aligned so the placement new is valid
bool initialized_;
};
}
}
}
#endif // PLS_INTERNAL_DATA_STRUCTURES_DELAYED_INITIALIZATION_WRAPPER_H_
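A short usage sketch of the wrapper (the wrapped std::string and the values are made up for illustration):

```cpp
#include <string>
#include "pls/internal/data_structures/delayed_initialization_wrapper.h"

using pls::internal::data_structures::delayed_initialization_wrapper;

int main() {
  // Reserve the storage without constructing the member yet.
  delayed_initialization_wrapper<std::string> slot;

  // Later, e.g. only on the slow path, construct the object in place.
  slot.initialize("hello continuation");

  // Access is only valid after initialize() ran.
  return slot.object().empty() ? 1 : 0;
}
```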
@@ -5,6 +5,7 @@
#include <memory>
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/scheduling/continuation.h"
namespace pls {
namespace internal {
@@ -12,15 +13,25 @@ namespace scheduling {
class cont_manager {
public:
explicit cont_manager(size_t num_conts, size_t max_cont_size, data_structures::aligned_stack &cont_storage)
: num_conts_{num_conts}, max_cont_size_{max_cont_size}, cont_storage_{cont_storage} {
// TODO: Init linked-list-like structure
};
private:
const size_t num_conts_, max_cont_size_;
data_structures::aligned_stack &cont_storage_;
};
template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
class static_cont_manager {
public:
static_cont_manager() : static_cont_storage_{}, cont_manager_{NUM_CONTS, MAX_CONT_SIZE, static_cont_storage_.get_stack()} {}
cont_manager &get_cont_manager() { return cont_manager_; }
private:
data_structures::static_aligned_stack<NUM_CONTS * MAX_CONT_SIZE> static_cont_storage_;
cont_manager cont_manager_;
};
}
......
#ifndef PLS_INTERNAL_SCHEDULING_CONTINUATION_H_
#define PLS_INTERNAL_SCHEDULING_CONTINUATION_H_
#include <type_traits>
#include <atomic>
#include <utility>
#include "pls/internal/data_structures/delayed_initialization_wrapper.h"
namespace pls {
namespace internal {
namespace scheduling {
class base_continuation {
public:
virtual void run() = 0;
virtual ~base_continuation() = default;
};
class continuation_node {
public:
private:
// Pointer to memory region reserved for the companion continuation.
// Must be a buffer big enough to hold any continuation encountered in the program.
base_continuation *continuation_;
// Linked list property of continuations (continuation chains as memory management).
// Each continuation knows its chain start to allow stealing a whole chain in O(1)
// without the need to traverse back to the chain start.
continuation_node *cont_chain_start_;
continuation_node *prev_, *next_;
// When blocked on this continuation, we need to know what other chain we
// got offered by the stealing thread.
// For this we need only the head of the other chain (as each continuation is a
// self describing entity for its chain up to the given node).
continuation_node *offered_chain_;
};
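A self-contained toy illustrating the O(1) chain-steal property described above (the public fields and the steal function are illustration only, not the planned API):

```cpp
// Toy stand-in for continuation_node; the real members are private.
struct toy_node {
  toy_node *chain_start;  // every node records its chain's first node
  toy_node *prev, *next;
};

// Stealing a whole chain is O(1): 'node' already carries chain_start,
// so no backwards traversal over the prev pointers is required.
toy_node *steal_chain(toy_node *node) {
  toy_node *start = node->chain_start;
  // ... detach the range [start, node] from the victim via prev/next ...
  return start;
}
```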
template<typename R1, typename R2, typename F>
class continuation : public base_continuation {
public:
void run() override {
// TODO: integrate this better into the runtime system.
// E.g. handle passing the result to the parent continuation
function_.object()(result_1_.object(), result_2_.object());
}
~continuation() override = default;
template<typename R1ARG>
void store_result_1(R1ARG &&result_1) {
static_assert(std::is_same<R1, typename std::decay<R1ARG>::type>::value,
"must only copy/move objects in, not construct them");
result_1_.initialize(std::forward<R1ARG>(result_1));
}
template<typename R2ARG>
void store_result_2(R2ARG &&result_2) {
static_assert(std::is_same<R2, typename std::decay<R2ARG>::type>::value,
"must only copy/move objects in, not construct them");
result_2_.initialize(std::forward<R2ARG>(result_2));
}
template<typename FARG>
void store_function(FARG &&function) {
static_assert(std::is_same<F, typename std::decay<FARG>::type>::value,
"must only copy/move objects in, not construct them");
function_.initialize(std::forward<FARG>(function));
}
private:
// We plan to only init the members for a continuation on the slow path.
// If we can execute everything inline we simply skip it saving runtime overhead.
template<typename T>
using delayed_init = data_structures::delayed_initialization_wrapper<T>;
delayed_init<R1> result_1_;
delayed_init<R2> result_2_;
delayed_init<F> function_;
// Also uninitialized at first, only take the atomic write on the slow path.
// The stealer will init it to 2 while stealing, the 'stolen' sync will then make sure
// everyone sees the value in correct order.
std::atomic<unsigned short> results_missing_{};
};
}
}
}
#endif //PLS_INTERNAL_SCHEDULING_CONTINUATION_H_
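A hedged sketch of the intended slow-path flow, pieced together from the comments above (driving the template directly like this is an illustration; the scheduler will own this logic):

```cpp
#include "pls/internal/scheduling/continuation.h"

using pls::internal::scheduling::continuation;

void slow_path_illustration() {
  auto add = [](int a, int b) { return a + b; };

  // On the fast path all members stay uninitialized; only when blocked
  // do we pay for storing the function and the partial results.
  continuation<int, int, decltype(add)> cont;
  cont.store_function(add);
  cont.store_result_1(1);  // e.g. produced by the local thread
  cont.store_result_2(2);  // e.g. produced by the thief
  cont.run();              // invokes add(1, 2) on the stored objects
}
```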
#ifndef PLS_LAMBDA_TASK_H_
#define PLS_LAMBDA_TASK_H_
#include "pls/internal/scheduling/task.h"
namespace pls {
namespace internal {
namespace scheduling {
template<typename Function>
class lambda_task_by_reference : public task {
const Function &function_;
public:
explicit lambda_task_by_reference(const Function &function) : task{}, function_{function} {};
protected:
void execute_internal() override {
function_();
wait_for_all();
this->~lambda_task_by_reference<Function>();
}
};
template<typename Function>
class lambda_task_by_value : public task {
const Function function_;
public:
explicit lambda_task_by_value(const Function &function) : task{}, function_{function} {};
protected:
void execute_internal() override {
function_();
wait_for_all();
this->~lambda_task_by_value<Function>();
}
};
}
}
}
#endif //PLS_LAMBDA_TASK_H_
@@ -13,6 +13,10 @@ namespace scheduling {
void worker_routine();
class scheduler_memory {
// Note: scheduler_memory is a pure interface and has no data.
// By not having an initialization routine we can do our 'static and heap specialization'
// without running into any ordering problems in the initialization sequence.
// We first worried about performance of this being virtual.
// However, we decided that only thread_state_for is used during the
// runtime and that only when stealing. As stealing is expensive anyways,
@@ -35,7 +39,7 @@ class static_scheduler_memory : public scheduler_memory {
}
thread_state &thread_state_for(size_t id) const override {
return thread_states_[id].get_thread_state();
}
private:
@@ -69,7 +73,7 @@ class heap_scheduler_memory : public scheduler_memory {
}
thread_state &thread_state_for(size_t id) const override {
return thread_state_vector_[id].object().get_thread_state();
}
private:
......
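For reference, a hedged reconstruction of the interface shape implied by the overrides above (only thread_state_for appears in this diff; the forward declaration, the sketch's class name, and any further accessors are assumptions):

```cpp
#include <cstddef>

class thread_state;  // stand-in forward declaration for this sketch

// Assumed shape: a pure interface without data members, so the static and
// heap specializations can own their storage without initialization-order issues.
class scheduler_memory_sketch {
 public:
  virtual ~scheduler_memory_sketch() = default;
  virtual thread_state &thread_state_for(std::size_t id) const = 0;
};
```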
@@ -2,9 +2,7 @@
#ifndef PLS_TASK_H
#define PLS_TASK_H
#include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/scheduling/thread_state.h"
namespace pls {
@@ -16,42 +14,21 @@ namespace scheduling {
* Tasks are guaranteed to be executed exactly once.
*
* Override the execute_internal() method for your custom code.
*
* IMPORTANT:
* Tasks memory is re-used without calling the destructor.
* You must call it yourself at the end of execute_internal().
* This is done to not introduce any overhead of virtual function calls
* if no clean up is required.
*/
class task {
friend class scheduler;
// TODO: Add ref to continuation
task_manager::task_manager_state task_manager_state_;
// Coordinate finishing of sub_tasks
std::atomic<unsigned int> ref_count_;
task *parent_;
// Stack Management (reset stack pointer after wait_for_all() calls)
data_structures::deque_offset deque_offset_;
protected:
/*
* Must call the parent constructor.
*
* IMPORTANT:
* Tasks memory is re-used without calling the destructor.
* You must call it yourself at the end of execute_internal().
*/
explicit task();
/**
* Allow to allocate extra memory during run-time for this task.
* Memory will be pushed onto the stack (in aligned memory, thus avoiding many small chunks).
* MUST be called in constructor, never afterwards.
*
* Memory is fully self managed. Calling e.g. destructors when objects are not needed
* anymore is the user's responsibility (memory is simply re-used after the lifetime of the task ends).
*
* @param size Number of bytes to be allocated
@@ -64,58 +41,10 @@ */
*/
virtual void execute_internal() = 0;
template<typename T, typename ...ARGS>
void spawn_child(ARGS &&... args);
template<typename T, typename ...ARGS>
void spawn_child_and_wait(ARGS &&... args);
void wait_for_all();
private:
void execute();
};
template<typename T, typename ...ARGS>
void task::spawn_child(ARGS &&... args) {
PROFILE_FORK_JOIN_STEALING("spawn_child")
static_assert(std::is_base_of<task, typename std::remove_reference<T>::type>::value, "Only pass task subclasses!");
// Keep our refcount up to date
ref_count_++;
// Push on our deque
auto item = thread_state::get()->deque_.push_task<T>(std::forward<ARGS>(args)...);
// Assign forced values (for stack and parent management)
item->parent_ = this;
item->finished_construction_ = true;
item->deque_offset_ = thread_state::get()->deque_.save_offset();
// Make new task visible to others
thread_state::get()->deque_.publish_last_task();
}
template<typename T, typename ...ARGS>
void task::spawn_child_and_wait(ARGS &&... args) {
static_assert(std::is_base_of<task, typename std::remove_reference<T>::type>::value, "Only pass task subclasses!");
spawn_child<T>(std::forward<ARGS>(args)...);
// TODO: Check why 'direct spawn' (even when pushing it onto the task queue) seems to be slower
// (Also check if it even is slower or if it only appears so on our laptop)
// // Push on our deque
// auto task = thread_state::get()->deque_.push_task<T>(std::forward<ARGS>(args)...);
//
// // Assign forced values (for stack and parent management)
// task->parent_ = nullptr; // ...do not assign this to a parent => it will not notify our reference counter
// task->finished_construction_ = true;
// task->deque_offset_ = thread_state::get()->deque_.save_offset();
//
// // Execute it
// task->execute();
// Wait for the rest of the tasks
wait_for_all();
}
}
}
}
......
@@ -10,8 +10,40 @@ namespace pls {
namespace internal {
namespace scheduling {
// TODO: Remove forward references
class task;
/**
* Handles management of tasks in the system. Each thread has a local task manager,
* responsible for allocating, freeing and publishing tasks for stealing.
*
* The manager therefore acts as the deque found in work stealing, as well as the memory
* management for the tasks (as both are deeply intertwined in our implementation to
* integrate the memory management into the stealing procedure).
*/
class task_manager {
using task_manager_offset = data_structures::aligned_stack::stack_offset;
public:
// Data each task needs to store to enable the 'return_task' functionality.
using task_manager_state = task_manager_offset;
// Construct a task onto the stack. Stores the previous offset in the newly constructed task.
template<class T, typename ...ARGS>
T *push_task(ARGS ...args);
// Publishes a task on the stack, i.e. makes it visible for other threads to steal.
void publish_task(task *task);
// Return a no longer needed task to the stack. Must be the current most top task (will reset the stack pointer).
void return_task(task *task);
// Try to pop a local task from this task managers stack.
task *pop_local_task();
// Try to steal a task from a remote task_manager instance.
// The returned task pointer is valid during the lifetime of the task.
// The returned task pointer must be returned to this task_manager instance.
// (This is because we can either decide to just steal a remote task pointer or to copy the whole task)
task *pop_remote_task(task_manager &other);
explicit task_manager(data_structures::aligned_stack &task_stack) : task_stack_{task_stack} {}
private:
@@ -19,12 +51,14 @@ class task_manager {
};
template<size_t NUM_TASKS, size_t MAX_STACK_SIZE>
class static_task_manager {
public:
static_task_manager() : static_task_stack_{}, task_manager_{static_task_stack_.get_stack()} {};
task_manager &get_task_manager() { return task_manager_; }
private:
data_structures::static_aligned_stack<MAX_STACK_SIZE> static_task_stack_;
task_manager task_manager_;
};
}
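A hedged sketch of how a worker loop might drive this interface (the control flow and the 'run' step are assumptions; the real scheduler is a friend of task and calls its private execute()):

```cpp
#include "pls/internal/scheduling/task_manager.h"

using namespace pls::internal::scheduling;

// Hypothetical worker step: prefer local tasks, fall back to stealing.
void worker_step(task_manager &mine, task_manager &victim) {
  if (task *local = mine.pop_local_task()) {
    // ... run the task here ...
    mine.return_task(local);   // topmost task goes back, resetting the stack
  } else if (task *stolen = mine.pop_remote_task(victim)) {
    // ... run the stolen task here ...
    mine.return_task(stolen);  // per the comment above: returned to *this* manager
  }
}
```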
......
@@ -29,7 +29,7 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_;
alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
public:
thread_state(task_manager &task_manager,
cont_manager &cont_manager) :
scheduler_{nullptr},
@@ -39,7 +39,6 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
current_task_{nullptr},
random_{static_cast<unsigned long>(std::chrono::steady_clock::now().time_since_epoch().count())} {}
public:
/**
* Convenience helper to get the thread_state instance associated with this thread.
* Must only be called on threads that are associated with a thread_state,
@@ -60,13 +59,18 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
};
template<size_t NUM_TASKS, size_t MAX_TASK_STACK_SIZE, size_t NUM_CONTS, size_t MAX_CONT_SIZE>
struct static_thread_state {
public:
static_thread_state()
: static_task_manager_{},
static_cont_manager_{},
thread_state_{static_task_manager_.get_task_manager(), static_cont_manager_.get_cont_manager()} {}
thread_state &get_thread_state() { return thread_state_; }
private:
static_task_manager<NUM_TASKS, MAX_TASK_STACK_SIZE> static_task_manager_;
static_cont_manager<NUM_CONTS, MAX_CONT_SIZE> static_cont_manager_;
thread_state thread_state_;
};
}
......
@@ -6,6 +6,7 @@ namespace internal {
namespace data_structures {
aligned_stack::aligned_stack(char *memory_pointer, size_t size) :
unaligned_memory_pointer_{memory_pointer},
memory_pointer_{memory_pointer}, // MUST be aligned
max_offset_{size / base::system_details::CACHE_LINE_SIZE},
current_offset_{0} {
@@ -16,7 +17,8 @@ aligned_stack::aligned_stack(char *memory_pointer, size_t size) :
aligned_stack::aligned_stack(char *unaligned_memory_pointer, size_t size, size_t unaligned_size) :
unaligned_memory_pointer_{unaligned_memory_pointer},
memory_pointer_{base::alignment::next_alignment(unaligned_memory_pointer)},
max_offset_{unaligned_size / base::system_details::CACHE_LINE_SIZE},
current_offset_{0} {
PLS_ASSERT(size == base::alignment::previous_alignment(unaligned_size),
"Initialized aligned stack with invalid memory configuration!")
}
......
@@ -8,46 +8,6 @@ namespace pls {
namespace internal {
namespace scheduling {
task::task() :
finished_construction_{false},
ref_count_{0},
parent_{nullptr},
deque_offset_{0} {}
void *task::allocate_memory(long size) {
if (finished_construction_) {
PLS_ERROR("Must not allocate dynamic task memory after its construction.")
}
return thread_state::get()->deque_.push_bytes(size);
}
void task::execute() {
PROFILE_WORK_BLOCK("execute task")
auto last_executing = thread_state::get()->current_task_;
thread_state::get()->current_task_ = this;
execute_internal();
PROFILE_END_BLOCK
wait_for_all();
thread_state::get()->current_task_ = last_executing;
if (parent_ != nullptr) {
parent_->ref_count_--;
}
}
void task::wait_for_all() {
auto scheduler = thread_state::get()->scheduler_;
while (ref_count_ > 0) {
if (!scheduler->try_execute_local()) {
scheduler->try_execute_stolen();
}
}
thread_state::get()->deque_.reset_offset(deque_offset_);
}
}
}
}