Commit eecbe38d by FritzFlorian

Sketch out deque sync in case of a steal.

parent 0141a57a
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/scheduling/static_scheduler_memory.h"
using namespace pls::internal::scheduling;
......
......@@ -61,7 +61,11 @@ add_library(pls STATIC
src/internal/base/error_handling.cpp
include/pls/internal/data_structures/bounded_trading_deque.h
include/pls/internal/scheduling/external_trading_deque.h
include/pls/internal/scheduling/traded_cas_field.h)
include/pls/internal/scheduling/traded_cas_field.h
include/pls/internal/scheduling/task_manager_impl.h
include/pls/internal/scheduling/static_scheduler_memory.h
include/pls/internal/scheduling/heap_scheduler_memory.h
src/internal/scheduling/task_manager.cpp)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
......
#ifndef PLS_HEAP_SCHEDULER_MEMORY_H
#define PLS_HEAP_SCHEDULER_MEMORY_H
#include <vector>
#include "pls/internal/base/thread.h"
#include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/thread_state_static.h"
namespace pls {
namespace internal {
namespace scheduling {
template<size_t NUM_TASKS, size_t STACK_SIZE>
class heap_scheduler_memory : public scheduler_memory {
public:
explicit heap_scheduler_memory(size_t max_threads) : max_threads_{max_threads},
thread_vector_{},
thread_state_vector_{},
thread_state_pointers_{} {
thread_vector_.reserve(max_threads);
thread_state_vector_.reserve(max_threads);
for (size_t i = 0; i < max_threads; i++) {
thread_vector_.emplace_back();
thread_state_vector_.emplace_back();
thread_state_pointers_.emplace_back(&thread_state_vector_[i].get_thread_state());
}
thread_states_array_ = thread_state_pointers_.data();
}
size_t max_threads() const override {
return max_threads_;
}
base::thread &thread_for(size_t id) override {
return thread_vector_[id];
}
private:
using thread_state_type = thread_state_static<NUM_TASKS, STACK_SIZE>;
// thread_state_type is aligned to the cache line and therefore over-aligned (C++11 does not require
// the new operator to obey alignments bigger than 16, while cache lines are usually 64 bytes).
// To allow this object to be allocated using 'new' (which the vector does internally),
// we need to wrap it in a non-aligned object.
using thread_state_wrapper = base::alignment::cache_alignment_wrapper<thread_state_type>;
size_t max_threads_;
std::vector<base::thread> thread_vector_;
std::vector<thread_state_wrapper> thread_state_vector_;
std::vector<thread_state *> thread_state_pointers_;
};
}
}
}
#endif // PLS_HEAP_SCHEDULER_MEMORY_H
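For context on the comment above: base::alignment::cache_alignment_wrapper itself is not part of this diff, so the following is only a rough sketch of what such a wrapper could look like; the class name, the object() accessor and the padding scheme are assumptions. The idea is to keep the wrapper itself un-aligned, reserve a cache-line-padded byte buffer inside it, and construct the wrapped object at an aligned offset in that buffer, so plain 'new' (as used internally by std::vector) never has to honour the over-alignment.

// Hypothetical sketch only, not the pls implementation.
#include <cstddef>
#include <memory>   // std::align
#include <new>      // placement new
#include <utility>

template<typename T, std::size_t ALIGNMENT = 64>
class aligned_wrapper {
 public:
  template<typename... ARGS>
  explicit aligned_wrapper(ARGS &&...args) {
    // Find the first ALIGNMENT-aligned slot inside the padded buffer...
    void *address = buffer_;
    std::size_t space = sizeof(buffer_);
    object_ = static_cast<T *>(std::align(ALIGNMENT, sizeof(T), address, space));
    // ...and construct the wrapped object there with placement new.
    new(object_) T(std::forward<ARGS>(args)...);
  }
  ~aligned_wrapper() { object_->~T(); }

  T &object() { return *object_; }

 private:
  // The extra ALIGNMENT bytes guarantee that an aligned slot exists in the buffer,
  // no matter where the wrapper object itself ends up in memory.
  char buffer_[sizeof(T) + ALIGNMENT];
  T *object_;
};

A real wrapper would also need sensible copy/move behaviour (re-constructing the object in the new buffer); that is omitted here because the vectors above reserve their full capacity up front and never reallocate.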
......@@ -2,8 +2,7 @@
#ifndef PLS_SCHEDULER_H
#define PLS_SCHEDULER_H
#include <array>
#include <iostream>
#include <atomic>
#include "pls/internal/helpers/profiler.h"
......@@ -12,18 +11,19 @@
#include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/task.h"
namespace pls {
namespace internal {
namespace scheduling {
struct task;
/**
* The scheduler is the central part of the dispatching-framework.
* It manages a pool of worker threads (creates, sleeps/wakes up, destroys)
* and allows the execution of parallel sections.
*
* It works in close rellation with the 'task' class for scheduling.
* It works in close relation with the 'task' class for scheduling.
*/
class scheduler {
public:
......@@ -54,9 +54,9 @@ class scheduler {
void perform_work(Function work_section);
template<typename Function>
static void spawn(Function &&lambda) {
thread_state::get().get_task_manager().spawn_child(std::forward<Function>(lambda));
}
static void spawn(Function &&lambda);
thread_state &thread_state_for(size_t id);
/**
* Explicitly terminate the worker threads. Scheduler must not be used after this.
......@@ -68,7 +68,6 @@ class scheduler {
private:
static void work_thread_main_loop();
void work_thread_work_section();
thread_state &thread_state_for(size_t id);
const unsigned int num_threads_;
const bool reuse_thread_;
......
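For orientation, this is roughly how the scheduler interface above is meant to be used. The snippet is only a sketch: the template parameter values, the two-argument constructor call and the terminate() name (taken from the comment in the header) are assumptions, since they are not fully shown in this diff.

#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/static_scheduler_memory.h"

using namespace pls::internal::scheduling;

// MAX_THREADS, NUM_TASKS, STACK_SIZE -- the values here are illustrative only.
static static_scheduler_memory<8, 16, 4096> global_memory;

int main() {
  scheduler sched{global_memory, 8};  // run the work section on up to 8 worker threads
  sched.perform_work([] {
    scheduler::spawn([] { /* ... first child task ... */ });
    scheduler::spawn([] { /* ... second child task ... */ });
    // Joining/syncing the spawned children is not part of this commit and is omitted here.
  });
  sched.terminate();  // must be the last call; the scheduler may not be used afterwards
  return 0;
}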
......@@ -7,7 +7,9 @@
#include "context_switcher/context_switcher.h"
#include "context_switcher/continuation.h"
#include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/task.h"
#include "pls/internal/helpers/profiler.h"
namespace pls {
......@@ -25,10 +27,12 @@ class scheduler::init_function_impl : public init_function {
void run() override {
auto &thread_state = thread_state::get();
thread_state.get_task_manager().get_active_task().run_as_task([&](context_switcher::continuation cont) {
thread_state.set_main_continuation(std::move(cont));
function_();
return std::move(cont);
thread_state.get_scheduler().work_section_done_.store(true);
return std::move(thread_state.get_main_continuation());
});
thread_state.get_scheduler().work_section_done_.store(true);
}
private:
F &function_;
......@@ -55,6 +59,11 @@ void scheduler::perform_work(Function work_section) {
}
}
template<typename Function>
void scheduler::spawn(Function &&lambda) {
thread_state::get().get_task_manager().spawn_child(std::forward<Function>(lambda));
}
}
}
}
......
#ifndef PLS_SCHEDULER_MEMORY_H
#define PLS_SCHEDULER_MEMORY_H
#include <vector>
#include "pls/internal/base/thread.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/thread_state_static.h"
namespace pls {
namespace internal {
namespace scheduling {
// TODO: This way to handle memory is kind of a mess. We reworked it once... maybe it needs a second visit,
// especially with the new 'staggered stack' (mmap allocations with page faults would be nice).
class scheduler_memory {
// Note: scheduler_memory is a pure interface and has no data.
// By not having an initialization routine we can do our 'static and heap specialization'
......@@ -27,70 +26,6 @@ class scheduler_memory {
}
};
template<size_t MAX_THREADS, size_t NUM_TASKS, size_t STACK_SIZE>
class static_scheduler_memory : public scheduler_memory {
public:
static_scheduler_memory() : scheduler_memory{} {
for (size_t i = 0; i < MAX_THREADS; i++) {
thread_state_pointers_[i] = &thread_states_[i].get_thread_state();
}
thread_states_array_ = thread_state_pointers_.data();
}
size_t max_threads() const override {
return MAX_THREADS;
}
base::thread &thread_for(size_t id) override {
return threads_[id];
}
private:
using thread_state_type = thread_state_static<NUM_TASKS, STACK_SIZE>;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<base::thread, MAX_THREADS> threads_;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<thread_state_type, MAX_THREADS> thread_states_;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<thread_state *, MAX_THREADS> thread_state_pointers_;
};
template<size_t NUM_TASKS, size_t STACK_SIZE>
class heap_scheduler_memory : public scheduler_memory {
public:
explicit heap_scheduler_memory(size_t max_threads) : max_threads_{max_threads},
thread_vector_{},
thread_state_vector_{},
thread_state_pointers_{} {
thread_vector_.reserve(max_threads);
thread_state_vector_.reserve(max_threads);
for (size_t i = 0; i < max_threads; i++) {
thread_vector_.emplace_back();
thread_state_vector_.emplace_back();
thread_state_pointers_.emplace_back(&thread_state_vector_[i].get_thread_state());
}
thread_states_array_ = thread_state_pointers_.data();
}
size_t max_threads() const override {
return max_threads_;
}
base::thread &thread_for(size_t id) override {
return thread_vector_[id];
}
private:
using thread_state_type = thread_state_static<NUM_TASKS, STACK_SIZE>;
// thread_state_type is aligned to the cache line and therefore over-aligned (C++11 does not require
// the new operator to obey alignments bigger than 16, while cache lines are usually 64 bytes).
// To allow this object to be allocated using 'new' (which the vector does internally),
// we need to wrap it in a non-aligned object.
using thread_state_wrapper = base::alignment::cache_alignment_wrapper<thread_state_type>;
size_t max_threads_;
std::vector<base::thread> thread_vector_;
std::vector<thread_state_wrapper> thread_state_vector_;
std::vector<thread_state *> thread_state_pointers_;
};
}
}
}
......
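Regarding the TODO at the top of this file: 'mmap allocs with page faults' presumably means reserving each task stack as a lazily committed anonymous mapping with a guard page, so physical memory is only paid for once a stack is actually touched, and an overflow faults instead of silently corrupting a neighbouring stack. A rough Linux/POSIX sketch of that idea (not part of this commit, all names hypothetical):

#include <sys/mman.h>
#include <unistd.h>
#include <cstddef>

// Reserve 'stack_size' usable bytes plus one inaccessible guard page below them.
char *allocate_task_stack(std::size_t stack_size) {
  const std::size_t page_size = static_cast<std::size_t>(sysconf(_SC_PAGESIZE));
  const std::size_t total_size = stack_size + page_size;

  // Anonymous mappings are only backed by physical pages once they are first touched,
  // so reserving a generous stack costs (almost) nothing until it is actually used.
  void *memory = mmap(nullptr, total_size, PROT_READ | PROT_WRITE,
                      MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0);
  if (memory == MAP_FAILED) {
    return nullptr;
  }

  // Protect the lowest page: running off the end of the stack now raises SIGSEGV
  // instead of trampling the neighbouring task's stack.
  if (mprotect(memory, page_size, PROT_NONE) != 0) {
    munmap(memory, total_size);
    return nullptr;
  }
  return static_cast<char *>(memory) + page_size;  // usable stack memory starts above the guard page
}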
#ifndef PLS_STATIC_SCHEDULER_MEMORY_H
#define PLS_STATIC_SCHEDULER_MEMORY_H
#include "pls/internal/base/thread.h"
#include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/thread_state_static.h"
namespace pls {
namespace internal {
namespace scheduling {
template<size_t MAX_THREADS, size_t NUM_TASKS, size_t STACK_SIZE>
class static_scheduler_memory : public scheduler_memory {
public:
static_scheduler_memory() : scheduler_memory{} {
for (size_t i = 0; i < MAX_THREADS; i++) {
thread_state_pointers_[i] = &thread_states_[i].get_thread_state();
}
thread_states_array_ = thread_state_pointers_.data();
}
size_t max_threads() const override {
return MAX_THREADS;
}
base::thread &thread_for(size_t id) override {
return threads_[id];
}
private:
using thread_state_type = thread_state_static<NUM_TASKS, STACK_SIZE>;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<base::thread, MAX_THREADS> threads_;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<thread_state_type, MAX_THREADS> thread_states_;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<thread_state *, MAX_THREADS> thread_state_pointers_;
};
}
}
}
#endif // PLS_STATIC_SCHEDULER_MEMORY_H
......@@ -8,7 +8,7 @@
#include "context_switcher/context_switcher.h"
#include "pls/internal/base/system_details.h"
#include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/scheduling/traded_cas_field.h"
namespace pls {
......@@ -36,34 +36,6 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task {
thread_id_ = thread_id;
}
unsigned get_thread_id() const {
return thread_id_;
}
void set_thread_id(unsigned thread_id) {
thread_id_ = thread_id;
}
task *get_prev() const {
return prev_;
}
void set_prev(task *prev) {
prev_ = prev;
}
task *get_next() const {
return next_;
}
void set_next(task *next) {
next_ = next;
}
task *get_parent_task() const {
return parent_task_;
}
void set_parent_task(task *parent_task) {
parent_task_ = parent_task;
}
context_switcher::continuation get_continuation() {
return std::move(continuation_);
}
......@@ -76,7 +48,7 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task {
return context_switcher::enter_context(stack_memory_, stack_size_, std::forward<F>(lambda));
}
// TODO: Proper access control
// TODO: Proper access control and split it up into responsibilities
// Stack/Continuation Management
char *stack_memory_;
size_t stack_size_;
......@@ -84,6 +56,8 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task {
// Work-Stealing
std::atomic<traded_cas_field> traded_field_{};
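// Other workers can hand spare task chains back to this task during a steal/sync;
// see push_resource_on_task / pop_resource_from_task in the task_manager (their .cpp is not in this diff).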
task *resource_stack_next_{};
std::atomic<data_structures::stamped_integer> resource_stack_root_{{0, 0}};
// Task Tree (we have a parent that we want to continue when we finish)
task *parent_task_;
......
......@@ -6,8 +6,6 @@
#include <utility>
#include <array>
#include "context_switcher/continuation.h"
#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/external_trading_deque.h"
......@@ -27,28 +25,18 @@ class task_manager {
data_structures::aligned_stack static_stack_space,
size_t num_tasks,
size_t stack_size,
external_trading_deque &deque) : num_tasks_{num_tasks},
this_thread_tasks_{tasks},
active_task_{&tasks[0]},
deque_{deque} {
for (size_t i = 0; i < num_tasks - 1; i++) {
tasks[i].init(static_stack_space.push_bytes(stack_size), stack_size, i, 0);
if (i > 0) {
tasks[i].set_prev(&tasks[i - 1]);
}
if (i < num_tasks - 2) {
tasks[i].set_next(&tasks[i + 1]);
}
}
}
external_trading_deque &deque);
task &get_this_thread_task(size_t depth) {
return this_thread_tasks_[depth];
void push_resource_on_task(task *target_task, task *spare_task_chain);
task *pop_resource_from_task(task *target_task);
task *get_this_thread_task(size_t depth) {
return &this_thread_tasks_[depth];
}
void set_thread_id(unsigned id) {
for (size_t i = 0; i < num_tasks_; i++) {
this_thread_tasks_[i].set_thread_id(id);
this_thread_tasks_[i].thread_id_ = id;
}
}
......@@ -57,30 +45,7 @@ class task_manager {
}
template<typename F>
void spawn_child(F &&lambda) {
// TODO: Here is some potential for optimization. We could try placing everything manually on the stack.
active_task_->get_next()->run_as_task([lambda, this](context_switcher::continuation cont) {
auto *last_task = active_task_;
auto *this_task = active_task_->get_next();
last_task->set_continuation(std::move(cont));
active_task_ = this_task;
traded_cas_field expected_cas_value = deque_.push_bot(active_task_);
traded_cas_field empty_cas;
lambda();
if (active_task_->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) {
active_task_ = last_task;
deque_.popped_bot();
return std::move(last_task->get_continuation());
} else {
deque_.empty_deque();
PLS_ERROR("Slow Path/Stealing not implemented!");
}
});
}
void spawn_child(F &&lambda);
private:
size_t num_tasks_;
......@@ -113,4 +78,6 @@ class static_task_manager {
}
}
}
#include "task_manager_impl.h"
#endif //PLS_TASK_MANAGER_H_
#ifndef PLS_TASK_MANAGER_IMPL_H_
#define PLS_TASK_MANAGER_IMPL_H_
#include <memory>
#include <utility>
#include <array>
#include "context_switcher/continuation.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/thread_state.h"
namespace pls {
namespace internal {
namespace scheduling {
template<typename F>
void task_manager::spawn_child(F &&lambda) {
auto continuation = active_task_->next_->run_as_task([lambda, this](context_switcher::continuation cont) {
auto *last_task = active_task_;
auto *this_task = active_task_->next_;
last_task->continuation_ = std::move(cont);
active_task_ = this_task;
traded_cas_field expected_cas_value = deque_.push_bot(active_task_);
traded_cas_field empty_cas;
lambda();
if (active_task_->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) {
// Fast path, simply continue execution where we left off before the spawn.
// This requires no coordination with the resource stack.
active_task_ = last_task;
deque_.popped_bot();
return std::move(last_task->continuation_);
} else {
// Slow path, the continuation was stolen.
// First empty our own deque (everything below must have been stolen already).
deque_.empty_deque();
// TODO: This whole process can be taken out into a function, no need to copy it in every lambda.
// Also split smaller functions out, this is a mess right now...
// Try to get a clean resource chain to go back to the main stealing loop
task *clean_chain = pop_resource_from_task(last_task);
if (clean_chain == nullptr) {
// Double-check if we are really the last one, or if we only had unlucky timing.
auto cas_field = last_task->traded_field_.load();
if (cas_field.is_filled_with_object()) {
traded_cas_field empty_target;
if (last_task->traded_field_.compare_exchange_strong(cas_field, empty_target)) {
clean_chain = cas_field.get_trade_object();
} else {
clean_chain = pop_resource_from_task(last_task);
}
}
}
if (clean_chain != nullptr) {
// We got a clean chain to continue working on.
PLS_ASSERT(active_task_->prev_->depth_ == clean_chain->depth_,
"Resources must only reside in the correct depth!");
active_task_->prev_ = clean_chain;
// Walk the chain back to make the first task the active one.
while (active_task_->prev_ != nullptr) {
active_task_ = active_task_->prev_;
}
// Jump back to the continuation in the main scheduling loop; time to steal some work.
return std::move(thread_state::get().get_main_continuation());
} else {
// We are the last one working on this task. Thus the sync must be finished, continue working.
active_task_ = last_task;
return std::move(last_task->continuation_);
}
}
PLS_ERROR("Slow Path/Stealing not implemented!");
});
if (continuation.valid()) {
// We jumped in here from the main loop, keep track!
thread_state::get().set_main_continuation(std::move(continuation));
}
}
}
}
}
#endif //PLS_TASK_MANAGER_IMPL_H_
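The bodies of push_resource_on_task and pop_resource_from_task live in the newly added src/internal/scheduling/task_manager.cpp and are not shown in this diff. The fields added to task (resource_stack_next_ and the stamped resource_stack_root_) suggest a per-task, stamped Treiber-style stack of spare task chains; the following is only a guess at that scheme, simplified to store a raw head pointer next to the stamp instead of packing an integer into data_structures::stamped_integer:

// Hypothetical sketch only, not the code in task_manager.cpp.
#include <atomic>
#include <cstdint>

struct task_sketch {
  struct stamped_head {
    std::uint64_t stamp;   // bumped on every update to defeat the ABA problem
    task_sketch *top;      // head of the intrusive stack (nullptr == empty)
  };

  task_sketch *resource_stack_next_{nullptr};
  std::atomic<stamped_head> resource_stack_root_{{0, nullptr}};
};

void push_resource_on_task(task_sketch *target_task, task_sketch *spare_task_chain) {
  auto current = target_task->resource_stack_root_.load();
  task_sketch::stamped_head desired;
  do {
    desired.stamp = current.stamp + 1;
    desired.top = spare_task_chain;                         // the spare chain becomes the new head
    spare_task_chain->resource_stack_next_ = current.top;   // and links to the previous head
  } while (!target_task->resource_stack_root_.compare_exchange_weak(current, desired));
}

task_sketch *pop_resource_from_task(task_sketch *target_task) {
  auto current = target_task->resource_stack_root_.load();
  task_sketch::stamped_head desired;
  do {
    if (current.top == nullptr) {
      return nullptr;                                       // nothing was deposited on this task
    }
    desired.stamp = current.stamp + 1;
    desired.top = current.top->resource_stack_next_;        // pop the head, keep the rest of the stack
  } while (!target_task->resource_stack_root_.compare_exchange_weak(current, desired));
  current.top->resource_stack_next_ = nullptr;
  return current.top;
}

The stamp makes the compare-exchange fail if another worker popped and re-pushed the same head in between, which a plain pointer comparison would miss.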
......@@ -4,14 +4,16 @@
#include <random>
#include <chrono>
#include <utility>
#include "pls/internal/scheduling/task_manager.h"
#include "context_switcher/continuation.h"
namespace pls {
namespace internal {
namespace scheduling {
class scheduler;
class task_manager;
struct task;
struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
......@@ -21,6 +23,7 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
task_manager &task_manager_;
alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_;
alignas(base::system_details::CACHE_LINE_SIZE) context_switcher::continuation main_loop_continuation_;
alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
public:
......@@ -43,7 +46,6 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
unsigned get_id() { return id_; }
void set_id(unsigned id) {
id_ = id;
task_manager_.set_thread_id(id);
}
task_manager &get_task_manager() { return task_manager_; }
scheduler &get_scheduler() { return *scheduler_; }
......@@ -54,6 +56,13 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
return random_();
}
void set_main_continuation(context_switcher::continuation &&continuation) {
main_loop_continuation_ = std::move(continuation);
}
context_switcher::continuation get_main_continuation() {
return std::move(main_loop_continuation_);
}
// Do not allow move/copy operations.
// State is a pure memory container with references/pointers into it from all over the code.
// It should be allocated, used and de-allocated, nothing more.
......
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/base/thread.h"
#include "pls/internal/base/error_handling.h"
#include "pls/internal/helpers/profiler.h"
namespace pls {
namespace internal {
......@@ -23,6 +24,7 @@ scheduler::scheduler(scheduler_memory &memory, const unsigned int num_threads, b
// Placement new is required, as the memory of `memory_` is not required to be initialized.
memory.thread_state_for(i).set_scheduler(this);
memory.thread_state_for(i).set_id(i);
memory.thread_state_for(i).get_task_manager().set_thread_id(i);
if (reuse_thread && i == 0) {
continue; // Skip over the first/main thread when re-using the user's thread, as this one will replace the first one.
......
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/thread_state.h"
namespace pls {
namespace internal {
......