Commit 69fd7e0c by FritzFlorian

WIP: Refactor memory manager to reduce redundancy.

It is still not working; however, the redundant code is gone, which should make debugging simpler.
parent 8668cad2
Pipeline #1338 failed with stages in 27 seconds
@@ -13,9 +13,11 @@ constexpr size_t NUM_TASKS = 64;
constexpr size_t MAX_TASK_STACK_SIZE = 0;
constexpr size_t NUM_CONTS = 64;
constexpr size_t MAX_CONT_SIZE = 128;
constexpr size_t MAX_CONT_SIZE = 256;
std::atomic<int> count{0};
scheduling::parallel_result<int> fib(int n) {
std::cout << "Fib(" << n << "): " << count++ << ", " << scheduling::thread_state::get().get_id() << std::endl;
if (n == 0) {
return 0;
}
@@ -27,12 +29,15 @@ scheduling::parallel_result<int> fib(int n) {
return fib(n - 1);
}, [=]() {
return fib(n - 2);
}).then([](int a, int b) {
return a + b;
}).then([=](int a, int b) {
scheduling::parallel_result<int> result{a + b};
std::cout << "Done Fib(" << n << "): " << (a + b) << ", " << scheduling::thread_state::get().get_id() << std::endl;
return result;
});
}
int fib_normal(int n) {
std::cout << "Fib(" << n << "): " << count++ << std::endl;
if (n == 0) {
return 0;
}
@@ -40,7 +45,9 @@ int fib_normal(int n) {
return 1;
}
return fib_normal(n - 1) + fib_normal(n - 2);
int result = fib_normal(n - 1) + fib_normal(n - 2);
std::cout << "Done Fib(" << n << "): " << result << std::endl;
return result;
}
int main() {
@@ -53,7 +60,7 @@ int main() {
scheduling::scheduler scheduler{static_scheduler_memory, NUM_THREADS};
auto start = std::chrono::steady_clock::now();
std::cout << "fib = " << fib_normal(39) << std::endl;
// std::cout << "fib = " << fib_normal(10) << std::endl;
auto end = std::chrono::steady_clock::now();
std::cout << "Normal: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
<< std::endl;
@@ -61,14 +68,7 @@ int main() {
start = std::chrono::steady_clock::now();
scheduler.perform_work([]() {
return scheduling::scheduler::par([]() {
return fib(39);
}, []() {
return scheduling::parallel_result<int>{0};
}).then([](int a, int b) {
std::cout << "fib = " << a + b << std::endl;
return a + b;
});
return fib(10);
});
end = std::chrono::steady_clock::now();
......
@@ -57,8 +57,8 @@ add_library(pls STATIC
include/pls/internal/scheduling/task_manager.h
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/cont_manager.h
include/pls/internal/scheduling/continuation.h
include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h)
include/pls/internal/scheduling/cont.h
include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/cont_manager_impl.h include/pls/internal/scheduling/thread_state_static.h)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
......
@@ -13,11 +13,19 @@ namespace internal {
namespace base {
namespace alignment {
system_details::pointer_t next_alignment(system_details::pointer_t size,
size_t alignment = system_details::CACHE_LINE_SIZE);
constexpr system_details::pointer_t next_alignment(system_details::pointer_t size,
size_t alignment = system_details::CACHE_LINE_SIZE) {
return (size % alignment) == 0 ?
size :
size + (alignment - (size % alignment));
}
system_details::pointer_t previous_alignment(system_details::pointer_t size,
size_t alignment = system_details::CACHE_LINE_SIZE);
constexpr system_details::pointer_t previous_alignment(system_details::pointer_t size,
size_t alignment = system_details::CACHE_LINE_SIZE) {
return (size % alignment) == 0 ?
size :
size - (size % alignment);
}
char *next_alignment(char *pointer, size_t alignment = system_details::CACHE_LINE_SIZE);
......
@@ -34,7 +34,7 @@ class bounded_ws_deque {
bounded_ws_deque(T *item_array, size_t size) : size_{size}, item_array_{item_array} {}
void push_bottom(T item) {
item_array_[bottom_] = item;
item_array_[local_bottom_] = item;
local_bottom_++;
bottom_.store(local_bottom_, std::memory_order_release);
}
......
@@ -43,7 +43,9 @@ class delayed_initialization {
template<typename ...ARGS>
void initialize(ARGS &&...args) {
PLS_ASSERT(!initialized_, "Can only initialize delayed wrapper object once!")
if (initialized_) {
PLS_ASSERT(!initialized_, "Can only initialize delayed wrapper object once!");
}
new((void *) memory_.data()) T(std::forward<ARGS>(args)...);
initialized_ = true;
@@ -57,7 +59,9 @@ }
}
T &object() {
if (!initialized_) {
PLS_ASSERT(initialized_, "Can not use an uninitialized delayed wrapper object!")
}
return *reinterpret_cast<T *>(memory_.data());
}
......
@@ -18,6 +18,14 @@ struct stamped_integer {
stamped_integer() : stamp{0}, value{0} {};
stamped_integer(member_t new_value) : stamp{0}, value{new_value} {};
stamped_integer(member_t new_stamp, member_t new_value) : stamp{new_stamp}, value{new_value} {};
bool operator==(const stamped_integer &other) const noexcept {
return stamp == other.stamp && value == other.value;
}
bool operator!=(const stamped_integer &other) const noexcept {
return !(*this == other);
}
};
}
......
#ifndef PLS_INTERNAL_SCHEDULING_CONT_H_
#define PLS_INTERNAL_SCHEDULING_CONT_H_
#include <type_traits>
#include <atomic>
#include <utility>
#include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/data_structures/delayed_initialization.h"
#include "pls/internal/base/alignment.h"
#include "parallel_result.h"
#include "memory_block.h"
namespace pls {
namespace internal {
namespace scheduling {
class base_cont {
protected:
// We plan to init the members of a continuation only on the slow path.
// If we can execute everything inline, we simply skip it, saving runtime overhead.
template<typename T>
using delayed_init = data_structures::delayed_initialization<T>;
public:
explicit base_cont(base_cont *parent, memory_block *memory_block, bool is_right_child)
: parent_{parent},
memory_block_{memory_block},
is_right_child_{is_right_child} {};
/**
* Execute the continuation itself.
* Make sure to only call it when all required results are in.
* Will store the result in its parent, but not touch any counters.
*/
virtual void execute() = 0;
/**
* Execute the right-hand-side task associated with the continuation.
* Will store the result in its parent, but not touch any counters.
*/
virtual void execute_task() = 0;
virtual void *get_right_result_pointer() = 0;
virtual void *get_left_result_pointer() = 0;
template<typename T>
void store_right_result(T &&result) {
using BASE_T = typename std::remove_cv<typename std::remove_reference<T>::type>::type;
reinterpret_cast<delayed_init<BASE_T> *>(get_right_result_pointer())->initialize(std::forward<T>(result));
}
template<typename T>
void store_left_result(T &&result) {
using BASE_T = typename std::remove_cv<typename std::remove_reference<T>::type>::type;
reinterpret_cast<delayed_init<BASE_T> *>(get_left_result_pointer())->initialize(std::forward<T>(result));
}
base_cont *get_parent() { return parent_; }
memory_block *get_memory_block() { return memory_block_; }
bool is_right_child() const { return is_right_child_; }
protected:
base_cont *parent_;
memory_block *memory_block_;
bool is_right_child_;
};
template<typename T2, typename R1, typename R2, typename F>
class cont : public base_cont {
private:
template<typename RES_TYPE>
struct result_runner {
// Strip off unwanted modifiers...
using BASE_RES_TYPE = typename std::remove_cv<typename std::remove_reference<RES_TYPE>::type>::type;
static void execute(cont &cont) {
parallel_result<BASE_RES_TYPE> result{cont.function_((*cont.result_1_).value(), (*cont.result_2_).value())};
if (result.fast_path() && cont.parent_ != nullptr) {
if (cont.is_right_child()) {
cont.parent_->store_right_result(std::move(result));
} else {
cont.parent_->store_left_result(std::move(result));
}
}
}
};
template<typename INNER_TYPE>
struct result_runner<parallel_result<INNER_TYPE>> {
static void execute(cont &cont) {
auto result = cont.function_((*cont.result_1_).value(), (*cont.result_2_).value());
if (result.fast_path() && cont.parent_) {
if (cont.is_right_child()) {
cont.parent_->store_right_result(std::move(result));
} else {
cont.parent_->store_left_result(std::move(result));
}
}
}
};
public:
template<typename FARG, typename ...T2ARGS>
explicit cont(base_cont *parent,
memory_block *memory_block,
bool is_right_child,
FARG &&function,
T2ARGS...task_2_args):
base_cont(parent, memory_block, is_right_child),
function_{std::forward<FARG>(function)},
task_{std::forward<T2ARGS>(task_2_args)..., this} {};
void execute() override {
using result_type = decltype(function_((*result_1_).value(), (*result_2_).value()));
result_runner<result_type>::execute(*this);
this->get_memory_block()->free_buffer();
this->~cont();
}
void execute_task() override {
task_.execute();
}
void *get_right_result_pointer() override {
return &result_1_;
}
void *get_left_result_pointer() override {
return &result_2_;
}
T2 *get_task() {
return &task_;
}
private:
// Initial data members. These slow down the fast path, try to init them lazy when possible.
F function_;
T2 task_;
// Some fields/actual values stay uninitialized (saves time on the fast path if we do not need them).
// Keeping more fields untouched on the fast path is good, but for ease of implementation we only keep some for now.
delayed_init<R1> result_1_;
delayed_init<R2> result_2_;
};
}
}
}
#endif //PLS_INTERNAL_SCHEDULING_CONT_H_
#ifndef PLS_INTERNAL_SCHEDULING_CONTINUATION_H_
#define PLS_INTERNAL_SCHEDULING_CONTINUATION_H_
#include <type_traits>
#include <atomic>
#include <utility>
#include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/data_structures/delayed_initialization.h"
#include "pls/internal/base/alignment.h"
#include "parallel_result.h"
namespace pls {
namespace internal {
namespace scheduling {
class continuation_node;
class base_continuation {
friend class cont_manager;
protected:
// We plan to init the members of a continuation only on the slow path.
// If we can execute everything inline, we simply skip it, saving runtime overhead.
template<typename T>
using delayed_init = data_structures::delayed_initialization<T>;
public:
explicit base_continuation(base_continuation *parent, continuation_node *node, unsigned int result_index = 0)
: parent_{parent},
node_{node},
result_index_{result_index} {}
virtual void execute() = 0;
virtual void execute_task() = 0;
virtual ~base_continuation() = default;
virtual void *get_result_pointer(unsigned short index) = 0;
template<typename T>
void store_result(unsigned short index, T &&result) {
using BASE_T = typename std::remove_cv<typename std::remove_reference<T>::type>::type;
reinterpret_cast<delayed_init<BASE_T> *>(get_result_pointer(index))->initialize(std::forward<T>(result));
}
base_continuation *get_parent() { return parent_; }
continuation_node *get_cont_node() { return node_; }
protected:
base_continuation *parent_;
continuation_node *node_;
unsigned int result_index_;
};
class continuation_node {
friend class cont_manager;
public:
continuation_node(char *memory, continuation_node *cont_chain_start, continuation_node *prev)
: cont_chain_start_{cont_chain_start},
prev_{prev},
memory_{memory} {}
// Management of the associated continuation
template<typename T, typename ...ARGS>
T *init_continuation(ARGS &&...args) {
PLS_ASSERT(continuation_ == nullptr, "Must only allocate one continuation at once per node.")
auto *result = new(memory_) T(std::forward<ARGS>(args)...);
continuation_ = result;
return result;
}
void destroy_continuation() {
// Destruct the continuation
continuation_->~base_continuation();
continuation_ = nullptr;
// Reset Associated counters
results_missing_.store(2);
offered_chain_.store(nullptr);
auto old_state = state_.load();
state_.store({old_state.stamp + 1, initialized});
}
template<typename T>
void destroy_continuation_fast() {
(*reinterpret_cast<T *>(continuation_)).~T();
continuation_ = nullptr;
}
base_continuation *get_continuation() {
return continuation_;
}
continuation_node *get_prev() {
return prev_;
}
bool is_end_of_cont_chain() {
return prev_ == continuation_->get_parent()->get_cont_node();
}
private:
// Linked-list property of continuations (continuation chains double as memory management).
// Each continuation knows its chain start to allow stealing a whole chain in O(1)
// without the need to traverse back to the chain start.
continuation_node *cont_chain_start_;
continuation_node *prev_, *next_{nullptr};
// When blocked on this continuation, we need to know what other chain we
// got offered by the stealing thread.
// For this we need only the head of the other chain (as each continuation is a
// self-describing entity for its chain up to the given node).
std::atomic<continuation_node *> offered_chain_{nullptr};
// Management for coordinating concurrent result writing and stealing.
// The result count decides atomically who gets to execute the continuation.
std::atomic<unsigned short> results_missing_{2};
// The flag is needed for an ongoing stealing request.
// Stealing threads need to offer their continuation chain before they
// 'fully' own the stolen task. As long as that has not happened, the continuation
// chain can abort the steal request so that it is not left blocked without a
// new, clean continuation chain to work with.
enum state { initialized, execute_local, stealing, stolen };
using stamped_state = data_structures::stamped_integer;
std::atomic<stamped_state> state_{{initialized}};
// Pointer to memory region reserved for the companion continuation.
// Must be a buffer big enough to hold any continuation encountered in the program.
// This memory is managed explicitly by the continuation manager and runtime system
// (they must make sure to always call destructors and never allocate two continuations at once).
char *memory_;
base_continuation *continuation_{nullptr};
};
template<typename T2, typename R1, typename R2, typename F>
class continuation : public base_continuation {
private:
template<typename RES_TYPE>
struct result_runner {
// Strip off unwanted modifiers...
using BASE_RES_TYPE = typename std::remove_cv<typename std::remove_reference<RES_TYPE>::type>::type;
static void execute(continuation &cont) {
parallel_result<BASE_RES_TYPE> result{cont.function_((*cont.result_1_).value(), (*cont.result_2_).value())};
if (result.fast_path()) {
cont.parent_->store_result(cont.result_index_, std::move(result));
}
}
};
template<typename INNER_TYPE>
struct result_runner<parallel_result<INNER_TYPE>> {
static void execute(continuation &cont) {
auto result = cont.function_((*cont.result_1_).value(), (*cont.result_2_).value());
if (result.fast_path()) {
cont.parent_->store_result(cont.result_index_, std::move(result));
}
}
};
public:
template<typename FARG, typename ...T2ARGS>
explicit continuation(base_continuation *parent,
continuation_node *node,
unsigned int result_index,
FARG &&function,
T2ARGS...task_2_args):
base_continuation(parent, node, result_index),
function_{std::forward<FARG>(function)},
task_{std::forward<T2ARGS>(task_2_args)...} {}
~continuation() override = default;
void execute() override {
using result_type = decltype(function_((*result_1_).value(), (*result_2_).value()));
result_runner<result_type>::execute(*this);
}
void execute_task() override {
task_.execute();
}
void *get_result_pointer(unsigned short index) override {
switch (index) {
case 0:return &result_1_;
case 1:return &result_2_;
default: PLS_ERROR("Unexpected Result Index!")
}
}
T2 *get_task() {
return &task_;
}
private:
// Initial data members. These slow down the fast path, try to init them lazy when possible.
F function_;
T2 task_;
// Some fields/actual values stay uninitialized (saves time on the fast path if we do not need them).
// Keeping more fields untouched on the fast path is good, but for ease of implementation we only keep some for now.
delayed_init<R1> result_1_;
delayed_init<R2> result_2_;
};
}
}
}
#endif //PLS_INTERNAL_SCHEDULING_CONTINUATION_H_
@@ -9,8 +9,11 @@ namespace pls {
namespace internal {
namespace scheduling {
// Used to enforce the use of parallel_result return types
class parallel_result_base {};
template<typename T>
class parallel_result {
class parallel_result : public parallel_result_base {
public:
using value_type = T;
......
@@ -3,7 +3,7 @@
#define PLS_SCHEDULER_IMPL_H
#include <utility>
#include "pls/internal/scheduling/continuation.h"
#include "pls/internal/scheduling/cont.h"
#include "pls/internal/scheduling/parallel_result.h"
#include "pls/internal/scheduling/task.h"
@@ -18,6 +18,12 @@ struct scheduler::starter {
using return_type_1 = decltype(function_1_());
using return_type_2 = decltype(function_2_());
// Enforce correct return types of lambdas (parallel_result)
static_assert(std::is_base_of<parallel_result_base, return_type_1>::value,
"Must only return parallel results in parallel code");
static_assert(std::is_base_of<parallel_result_base, return_type_2>::value,
"Must only return parallel results in parallel code");
template<typename F1ARG, typename F2ARG>
explicit starter(F1ARG &&function_1, F2ARG &&function_2) : function_1_{std::forward<F1ARG>(function_1)},
function_2_{std::forward<F2ARG>(function_2)} {};
@@ -26,64 +32,78 @@ struct scheduler::starter {
auto then(FCONT &&cont_function)
-> decltype(cont_function(std::declval<typename return_type_1::value_type>(),
std::declval<typename return_type_2::value_type>())) {
using continuation_type = continuation<task<F2>, return_type_1, return_type_2, FCONT>;
using continuation_type = cont<task<F2>, return_type_1, return_type_2, FCONT>;
using result_type = decltype(cont_function(std::declval<typename return_type_1::value_type>(),
std::declval<typename return_type_2::value_type>()));
auto &my_state = thread_state::get();
auto &cont_manager = my_state.get_cont_manager();
PLS_ASSERT(sizeof(continuation_type) <= cont_manager.get_max_cont_size(),
"Must stay within the size limit of the static memory configuration.");
// Select current continuation.
// Select current memory block.
// For now directly copy both the continuation function and the second task.
auto *current_cont_node = cont_manager.fast_path_get_next();
// TODO: Fix null pointers at very first spawn...
// In the fast path case we are always on the left side of the tree
// and our prev cont chain node always also holds our parent continuation.
base_continuation *parent_cont =
current_cont_node->get_prev() == nullptr ? nullptr : current_cont_node->get_prev()->get_continuation();
unsigned short int result_index = 0;
auto *current_cont = current_cont_node->init_continuation<continuation_type>(parent_cont,
current_cont_node,
result_index,
// (We might optimize this in the future to require fewer memory copies.)
auto *current_memory_block = cont_manager.get_next_memory_block();
// We set the correct side when invoking user code.
const bool is_right_cont = my_state.right_spawn_;
// We keep track of the last spawn to build up the parent_cont chain
base_cont *parent_cont = my_state.parent_cont_;
continuation_type *current_cont = current_memory_block->place_in_buffer<continuation_type>(parent_cont,
current_memory_block,
is_right_cont,
cont_function,
function_2_,
current_cont_node);
function_2_);
my_state.parent_cont_ = current_cont;
// Publish the second task.
my_state.get_task_manager().publish_task(current_cont->get_task());
// Call first function on fast path
my_state.right_spawn_ = false;
return_type_1 result_1 = function_1_();
if (cont_manager.falling_through()) {
return result_type{}; // Unwind stack...
// Unwind stack...
return result_type{};
}
// Try to call second function on fast path
if (my_state.get_task_manager().steal_local_task()) {
my_state.right_spawn_ = true;
return_type_2 result_2 = function_2_();
if (cont_manager.falling_through()) {
return result_type{}; // Unwind stack...
// The main scheduling loop is responsible for carrying the result over to the slow path...
current_cont->store_left_result(std::move(result_1));
cont_manager.fall_through_and_notify_cont(current_cont, false);
// Unwind stack...
return result_type{};
}
// We got all results, so inline as much as possible.
// This is the common case; branch prediction should be rather good here.
// Just leave the cont object unused and call the function directly.
current_cont_node->destroy_continuation_fast<continuation_type>();
my_state.get_cont_manager().fast_path_return();
current_cont->~continuation_type();
current_memory_block->free_buffer();
cont_manager.return_memory_block();
// The continuation runs in the same execution environment as the children had.
// We need this to allow spawns inside it.
my_state.parent_cont_ = parent_cont;
my_state.right_spawn_ = is_right_cont;
auto cont_result = cont_function(result_1.value(), result_2.value());
if (cont_manager.falling_through()) {
return result_type{}; // Unwind stack...
// Unwind stack...
return result_type{};
}
return cont_result;
}
cont_manager.fall_through();
// The main scheduling loop is responsible for carrying the result over to the slow path...
current_cont->store_left_result(std::move(result_1));
cont_manager.fall_through_and_notify_cont(current_cont, false);
// Unwind stack...
return result_type{};
};
};
@@ -101,24 +121,23 @@ scheduler::starter<F1, F2> scheduler::par(F1 &&function_1, F2 &&function_2) {
class scheduler::init_function {
public:
virtual parallel_result<int> run() = 0;
virtual void run() = 0;
};
template<typename F>
class scheduler::init_function_impl : public init_function {
public:
explicit init_function_impl(F &function) : function_{function} {}
parallel_result<int> run() override {
return scheduler::par([]() {
// No-op
void run() override {
scheduler::par([]() {
std::cout << "Dummy Strain, " << scheduling::thread_state::get().get_id() << std::endl;
return parallel_result<int>{0};
}, [=]() {
function_();
return parallel_result<int>{0};
}).then([](const int &, const int &) {
// Notify that work is done after finishing the last user continuation.
return function_();
}).then([=](int, int) {
thread_state::get().get_scheduler().work_section_done_ = true;
return parallel_result<int>{0};
});
}
private:
F &function_;
......
@@ -5,6 +5,7 @@
#include "pls/internal/base/thread.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/thread_state_static.h"
namespace pls {
namespace internal {
@@ -43,7 +44,7 @@ class static_scheduler_memory : public scheduler_memory {
}
private:
using thread_state_type = static_thread_state<NUM_TASKS, MAX_TASK_STACK_SIZE, NUM_CONTS, MAX_CONT_SIZE>;
using thread_state_type = thread_state_static<NUM_TASKS, MAX_TASK_STACK_SIZE, NUM_CONTS, MAX_CONT_SIZE>;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<base::thread, MAX_THREADS> threads_;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<thread_state_type, MAX_THREADS> thread_states_;
@@ -77,7 +78,7 @@ class heap_scheduler_memory : public scheduler_memory {
}
private:
using thread_state_type = static_thread_state<NUM_TASKS, MAX_TASK_STACK_SIZE, NUM_CONTS, MAX_CONT_SIZE>;
using thread_state_type = thread_state_static<NUM_TASKS, MAX_TASK_STACK_SIZE, NUM_CONTS, MAX_CONT_SIZE>;
// thread_state_type is aligned to the cache line and therefore overaligned (C++11 does not require
// the new operator to obey alignments bigger than 16; cache lines are usually 64 bytes).
// To allow this object to be allocated using 'new' (which the vector does internally),
......
#ifndef PLS_TASK_H
#define PLS_TASK_H
#include "pls/internal/scheduling/continuation.h"
#include "pls/internal/scheduling/cont_manager.h"
#include "pls/internal/scheduling/cont.h"
#include "pls/internal/scheduling/memory_block.h"
namespace pls {
namespace internal {
@@ -15,14 +15,6 @@ namespace scheduling {
* Override the execute_internal() method for your custom code.
*/
class base_task {
protected:
base_task() = default;
/**
* Overwrite this with the actual behaviour of concrete tasks.
*/
virtual void execute_internal() = 0;
public:
/**
* Executes the task and stores its result in the correct continuation.
@@ -31,22 +23,39 @@ class base_task {
void execute() {
execute_internal();
}
base_cont *get_cont() {
return cont_;
}
protected:
explicit base_task(base_cont *cont) : cont_{cont} {};
/**
* Overwrite this with the actual behaviour of concrete tasks.
*/
virtual void execute_internal() = 0;
base_cont *cont_;
};
template<typename F>
class task : public base_task {
public:
template<typename FARG>
explicit task(FARG &&function, continuation_node *continuation_node)
: base_task{}, function_{std::forward<FARG>(function)}, continuation_node_{continuation_node} {}
explicit task(FARG &&function, base_cont *cont)
: base_task{cont}, function_{std::forward<FARG>(function)} {}
void execute_internal() override {
continuation_node_->get_continuation()->store_result<decltype(function_())>(1, function_());
auto result = function_();
if (result.fast_path()) {
cont_->store_right_result(std::move(result));
}
}
private:
F function_;
continuation_node *continuation_node_;
};
}
......
@@ -8,6 +8,8 @@
#include <atomic>
#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/cont_manager.h"
#include "pls/internal/scheduling/memory_block.h"
#include "pls/internal/data_structures/bounded_ws_deque.h"
#include "pls/internal/data_structures/stamped_integer.h"
@@ -17,9 +19,15 @@ namespace scheduling {
struct task_handle {
public:
task_handle() : task_{nullptr} {};
explicit task_handle(base_task *task) : task_{task} {};
task_handle() : task_{nullptr}, task_memory_block_{nullptr} {};
explicit task_handle(base_task *task) : task_{task},
task_memory_block_{task->get_cont()->get_memory_block()} {};
base_task *task_;
// This seems redundant at first, but is needed for a race-free steal.
// The task's memory could be overwritten, making the pointer to its memory block invalid.
// We can do this more elegantly in the future.
memory_block *task_memory_block_;
};
/**
@@ -40,8 +48,47 @@ class task_manager {
// Try to steal a task from a remote task_manager instance. The stolen task must be stored locally.
// Returns a pair containing the actual task and if the steal was successful.
// TODO: Re-implement after fast path is done
// std::pair<task, bool> steal_remote_task(task_manager &other);
base_task *steal_remote_task(cont_manager &stealing_cont_manager) {
auto stolen_task_handle = task_deque_.pop_top();
if (stolen_task_handle) {
base_task *stolen_task = (*stolen_task_handle).task_;
auto &atomic_state = (*stolen_task_handle).task_memory_block_->get_state();
auto &atomic_offered_chain = (*stolen_task_handle).task_memory_block_->get_offered_chain();
auto offered_chain = stealing_cont_manager.get_node((*stolen_task_handle).task_memory_block_->get_depth());
auto last_state = atomic_state.load();
if (last_state.value != memory_block::initialized) {
return nullptr;
}
auto last_offered_chain = atomic_offered_chain.load();
memory_block::stamped_state loop_state = {last_state.stamp + 1, memory_block::stealing};
if (atomic_state.compare_exchange_strong(last_state, loop_state)) {
while (true) {
if (atomic_offered_chain.compare_exchange_strong(last_offered_chain, offered_chain)) {
break;
}
last_offered_chain = atomic_offered_chain.load();
last_state = atomic_state.load();
if (last_state != loop_state) {
return nullptr;
}
}
if (atomic_state.compare_exchange_strong(loop_state, {loop_state.stamp + 1, memory_block::stolen})) {
return stolen_task;
} else {
return nullptr;
}
} else {
return nullptr;
}
} else {
return nullptr;
}
}
explicit task_manager(data_structures::bounded_ws_deque<task_handle> &task_deque) : task_deque_{task_deque} {}
......
@@ -7,21 +7,27 @@
#include <array>
#include <chrono>
#include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/cont_manager.h"
namespace pls {
namespace internal {
namespace scheduling {
// forward declaration
class task_manager;
class cont_manager;
class scheduler;
class base_task;
class base_cont;
struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
scheduler *scheduler_;
size_t id_;
// Keep track of the last spawn state (needed to chain tasks/conts correctly)
bool right_spawn_;
base_cont *parent_cont_;
// TODO: Set this when spawning!
// See if we should move this to the cont manager...seems like a better fit!
task_manager &task_manager_;
cont_manager &cont_manager_;
@@ -33,6 +39,8 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
cont_manager &cont_manager) :
scheduler_{nullptr},
id_{0},
right_spawn_{false},
parent_cont_{nullptr},
task_manager_{task_manager},
cont_manager_{cont_manager},
current_task_{nullptr},
@@ -62,21 +70,6 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
thread_state &operator=(const thread_state &) = delete;
};
template<size_t NUM_TASKS, size_t MAX_TASK_STACK_SIZE, size_t NUM_CONTS, size_t MAX_CONT_SIZE>
struct static_thread_state {
public:
static_thread_state()
: static_task_manager_{},
static_cont_manager_{},
thread_state_{static_task_manager_.get_task_manager(), static_cont_manager_.get_cont_manager()} {}
thread_state &get_thread_state() { return thread_state_; }
private:
static_task_manager<NUM_TASKS, MAX_TASK_STACK_SIZE> static_task_manager_;
static_cont_manager<NUM_CONTS, MAX_CONT_SIZE> static_cont_manager_;
thread_state thread_state_;
};
}
}
}
......
@@ -5,20 +5,6 @@ namespace internal {
namespace base {
namespace alignment {
system_details::pointer_t next_alignment(system_details::pointer_t size,
size_t alignment) {
return (size % alignment) == 0 ?
size :
size + (alignment - (size % alignment));
}
system_details::pointer_t previous_alignment(system_details::pointer_t size,
size_t alignment) {
return (size % alignment) == 0 ?
size :
size - (size % alignment);
}
char *next_alignment(char *pointer, size_t alignment) {
return reinterpret_cast<char *>(next_alignment(reinterpret_cast<system_details::pointer_t >(pointer), alignment));
}
......
@@ -54,20 +54,44 @@ void scheduler::work_thread_main_loop() {
void scheduler::work_thread_work_section() {
auto &my_state = thread_state::get();
auto &my_cont_manager = my_state.get_cont_manager();
auto const num_threads = my_state.get_scheduler().num_threads();
auto const my_id = my_state.get_id();
if (my_state.get_id() == 0) {
// Main Thread, kick of by executing the user's main code block.
// Main Thread, kick off by executing the user's main code block.
main_thread_starter_function_->run();
}
do {
// TODO: Implement other threads, for now we are happy if it compiles and runs on one thread
// For now we can test without this, as the fast path should never hit this.
// 1) Try Steal
// 2) Copy Over
// 3) Finish Steal
// 4) Execute Local Copy
// Work off pending continuations we need to execute locally
while (my_cont_manager.falling_through()) {
my_cont_manager.execute_fall_through_code();
}
// Steal routine (runs continuously once there are no more fall-throughs).
// TODO: move into separate function
const size_t offset = my_state.random_() % num_threads;
const size_t max_tries = num_threads - 1;
for (size_t i = 0; i < max_tries; i++) {
size_t target = (offset + i) % num_threads;
// Skip ourselves when stealing (bump the target by one if it equals our own id)
target = ((target == my_id) + target) % num_threads;
auto &target_state = my_state.get_scheduler().thread_state_for(target);
auto *stolen_task = target_state.get_task_manager().steal_remote_task(my_cont_manager);
if (stolen_task != nullptr) {
my_state.parent_cont_ = stolen_task->get_cont();
my_state.right_spawn_ = true;
my_cont_manager.set_active_depth(stolen_task->get_cont()->get_memory_block()->get_depth() + 1);
stolen_task->execute();
}
}
} while (!work_section_done_);
}
void scheduler::terminate() {
......