Commit 842b518f by FritzFlorian

WIP: First implementation of serial/fast path.

This showcases the expected performance when a task executes a sub-tree without interference from other threads. Our target is to stay within about 6x of a plain function call.
parent 39d2fbd8
Pipeline #1335 failed with stages
in 28 seconds
add_executable(playground main.cpp) add_executable(playground main.cpp)
# Example for adding the library to your app (as a cmake project dependency) # Example for adding the library to your app (as a cmake project dependency)
target_link_libraries(playground pls) target_link_libraries(playground pls)
// Headers are available because we added the pls target #include <iostream>
const long NUM_THREADS = 8; #include <chrono>
const long MEMORY_PER_THREAD = 2u << 12u;
#include "pls/pls.h" #include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/parallel_result.h"
#include "pls/internal/scheduling/scheduler_memory.h"
pls::static_scheduler_memory<NUM_THREADS, MEMORY_PER_THREAD> memory; using namespace pls::internal;
constexpr size_t NUM_THREADS = 1;
constexpr size_t NUM_TASKS = 64;
constexpr size_t MAX_TASK_STACK_SIZE = 0;
constexpr size_t NUM_CONTS = 64;
constexpr size_t MAX_CONT_SIZE = 128;
// Parallel Fibonacci using the scheduler's fork/join (par/then) API.
// Both recursive calls are offered as a parallel pair; the continuation
// receives the two partial results and combines them.
scheduling::parallel_result<int> fib(int n) {
  // Base cases: fib(0) == 0 and fib(1) == 1, i.e. simply n itself.
  if (n == 0 || n == 1) {
    return n;
  }

  return scheduling::scheduler::par([=]() {
    return fib(n - 1);
  }, [=]() {
    return fib(n - 2);
  }).then([](int left, int right) {
    return left + right;
  });
}
// Plain, sequential Fibonacci. Serves as the baseline measurement that the
// framework's fast path is benchmarked against in main().
int fib_normal(int n) {
  // fib(0) == 0 and fib(1) == 1, so the base cases collapse to returning n.
  if (n == 0 || n == 1) {
    return n;
  }
  return fib_normal(n - 1) + fib_normal(n - 2);
}
int main() { int main() {
pls::scheduler scheduler{&memory, NUM_THREADS}; scheduling::static_scheduler_memory<NUM_THREADS,
NUM_TASKS,
MAX_TASK_STACK_SIZE,
NUM_CONTS,
MAX_CONT_SIZE> static_scheduler_memory;
scheduler.perform_work([]() { scheduling::scheduler scheduler{static_scheduler_memory, NUM_THREADS};
auto lambda = []() {
// Do work
};
using lambda_task = pls::lambda_task_by_value<decltype(lambda)>;
pls::scheduler::spawn_child<lambda_task>(lambda); auto start = std::chrono::steady_clock::now();
pls::scheduler::spawn_child<lambda_task>(lambda); std::cout << "fib = " << fib_normal(41) << std::endl;
auto end = std::chrono::steady_clock::now();
std::cout << "Normal: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
<< std::endl;
pls::scheduler::wait_for_all(); start = std::chrono::steady_clock::now();
scheduler.perform_work([]() {
return scheduling::scheduler::par([]() {
return fib(41);
}, []() {
return scheduling::parallel_result<int>{0};
}).then([](int a, int b) {
std::cout << "fib = " << a << std::endl;
});
}); });
scheduler.terminate(); end = std::chrono::steady_clock::now();
std::cout << "Framework: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << std::endl;
return 0; return 0;
} }
# List all required files here (cmake best practice to NOT automate this step!) # List all required files here (cmake best practice to NOT automate this step!)
add_library(pls STATIC add_library(pls STATIC
include/pls/pls.h src/pls.cpp # include/pls/pls.h src/pls.cpp
#
include/pls/algorithms/invoke.h # include/pls/algorithms/invoke.h
include/pls/algorithms/invoke_impl.h # include/pls/algorithms/invoke_impl.h
include/pls/algorithms/for_each.h # include/pls/algorithms/for_each.h
include/pls/algorithms/for_each_impl.h # include/pls/algorithms/for_each_impl.h
include/pls/algorithms/scan.h # include/pls/algorithms/scan.h
include/pls/algorithms/scan_impl.h # include/pls/algorithms/scan_impl.h
#
include/pls/dataflow/dataflow.h # include/pls/dataflow/dataflow.h
include/pls/dataflow/internal/inputs.h # include/pls/dataflow/internal/inputs.h
include/pls/dataflow/internal/outputs.h # include/pls/dataflow/internal/outputs.h
include/pls/dataflow/internal/token.h # include/pls/dataflow/internal/token.h
include/pls/dataflow/internal/in_port.h # include/pls/dataflow/internal/in_port.h
include/pls/dataflow/internal/out_port.h # include/pls/dataflow/internal/out_port.h
include/pls/dataflow/internal/function_node.h # include/pls/dataflow/internal/function_node.h
include/pls/dataflow/internal/node.h # include/pls/dataflow/internal/node.h
include/pls/dataflow/internal/graph.h # include/pls/dataflow/internal/graph.h
include/pls/dataflow/internal/build_state.h # include/pls/dataflow/internal/build_state.h
include/pls/dataflow/internal/function_node_impl.h # include/pls/dataflow/internal/function_node_impl.h
include/pls/dataflow/internal/graph_impl.h # include/pls/dataflow/internal/graph_impl.h
include/pls/dataflow/internal/switch_node.h # include/pls/dataflow/internal/switch_node.h
include/pls/dataflow/internal/merge_node.h # include/pls/dataflow/internal/merge_node.h
include/pls/dataflow/internal/split_node.h # include/pls/dataflow/internal/split_node.h
include/pls/internal/base/spin_lock.h include/pls/internal/base/spin_lock.h
include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp
...@@ -34,7 +34,7 @@ add_library(pls STATIC ...@@ -34,7 +34,7 @@ add_library(pls STATIC
include/pls/internal/base/barrier.h src/internal/base/barrier.cpp include/pls/internal/base/barrier.h src/internal/base/barrier.cpp
include/pls/internal/base/system_details.h include/pls/internal/base/system_details.h
include/pls/internal/base/error_handling.h include/pls/internal/base/error_handling.h
include/pls/internal/base/alignment.h include/pls/internal/base/alignment_impl.h include/pls/internal/base/alignment.h
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h include/pls/internal/data_structures/aligned_stack_impl.h
...@@ -56,7 +56,7 @@ add_library(pls STATIC ...@@ -56,7 +56,7 @@ add_library(pls STATIC
include/pls/internal/scheduling/task_manager.h include/pls/internal/scheduling/task_manager.h
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/cont_manager.h include/pls/internal/scheduling/cont_manager.h
include/pls/internal/scheduling/continuation.h) include/pls/internal/scheduling/continuation.h include/pls/internal/scheduling/parallel_result.h src/internal/base/alignment.cpp)
# Add everything in `./include` to be in the include path of this project # Add everything in `./include` to be in the include path of this project
target_include_directories(pls target_include_directories(pls
......
...@@ -13,8 +13,8 @@ namespace internal { ...@@ -13,8 +13,8 @@ namespace internal {
namespace base { namespace base {
namespace alignment { namespace alignment {
constexpr system_details::pointer_t next_alignment(system_details::pointer_t size); system_details::pointer_t next_alignment(system_details::pointer_t size);
constexpr system_details::pointer_t previous_alignment(system_details::pointer_t size); system_details::pointer_t previous_alignment(system_details::pointer_t size);
char *next_alignment(char *pointer); char *next_alignment(char *pointer);
/** /**
...@@ -43,9 +43,6 @@ struct alignas(system_details::CACHE_LINE_SIZE) cache_alignment_wrapper { ...@@ -43,9 +43,6 @@ struct alignas(system_details::CACHE_LINE_SIZE) cache_alignment_wrapper {
T *pointer() { return reinterpret_cast<T *>(data_); } T *pointer() { return reinterpret_cast<T *>(data_); }
}; };
void *allocate_aligned(size_t size);
#include "alignment_impl.h"
} }
} }
} }
......
#ifndef PLS_INTERNAL_BASE_ALIGNMENT_IMPL_H_
#define PLS_INTERNAL_BASE_ALIGNMENT_IMPL_H_

#include <cstddef>
#include <cstdlib>

namespace pls {
namespace internal {
namespace base {
namespace alignment {

// Allocates 'size' bytes aligned to a cache line boundary.
// 'inline' is required: this is a function *definition* in a header that is
// pulled into alignment.h, so without it every translation unit including the
// header would emit its own symbol and the link would fail with duplicates.
// NOTE(review): aligned_alloc requires 'size' to be a multiple of the
// alignment; callers should round up with next_alignment(size) first — verify
// the call sites.
inline void *allocate_aligned(size_t size) {
  return aligned_alloc(system_details::CACHE_LINE_SIZE, size);
}

// Rounds 'size' up to the next multiple of the cache line size
// (values already on a boundary are returned unchanged).
constexpr system_details::pointer_t next_alignment(system_details::pointer_t size) {
  return (size % system_details::CACHE_LINE_SIZE) == 0 ?
         size :
         size + (system_details::CACHE_LINE_SIZE - (size % system_details::CACHE_LINE_SIZE));
}

// Rounds 'size' down to the previous multiple of the cache line size.
constexpr system_details::pointer_t previous_alignment(system_details::pointer_t size) {
  return (size % system_details::CACHE_LINE_SIZE) == 0 ?
         size :
         size - (size % system_details::CACHE_LINE_SIZE);
}

// Pointer overload: rounds the address up to the next cache line boundary.
// Must also be 'inline' for the same single-definition reason as above.
inline char *next_alignment(char *pointer) {
  return reinterpret_cast<char *>(next_alignment(reinterpret_cast<system_details::pointer_t>(pointer)));
}

}
}
}
}

#endif //PLS_INTERNAL_BASE_ALIGNMENT_IMPL_H_
...@@ -41,13 +41,13 @@ class aligned_stack { ...@@ -41,13 +41,13 @@ class aligned_stack {
template<typename T, typename ...ARGS> template<typename T, typename ...ARGS>
T *push(ARGS &&... args); T *push(ARGS &&... args);
template<typename T> template<typename T>
void *push_bytes(); char *push_bytes();
void *push_bytes(size_t size); char *push_bytes(size_t size);
template<typename T> template<typename T>
void pop(); void pop();
void *memory_at_offset(stack_offset offset) const; char *memory_at_offset(stack_offset offset) const;
stack_offset save_offset() const { return current_offset_; } stack_offset save_offset() const { return current_offset_; }
void reset_offset(stack_offset new_offset) { current_offset_ = new_offset; } void reset_offset(stack_offset new_offset) { current_offset_ = new_offset; }
...@@ -74,14 +74,21 @@ class static_aligned_stack { ...@@ -74,14 +74,21 @@ class static_aligned_stack {
class heap_aligned_stack { class heap_aligned_stack {
public: public:
explicit heap_aligned_stack(size_t size); explicit heap_aligned_stack(size_t size) :
~heap_aligned_stack(); unaligned_memory_size_{base::alignment::next_alignment(size)},
unaligned_memory_pointer_{new char[unaligned_memory_size_]},
aligned_stack_{unaligned_memory_pointer_, size, unaligned_memory_size_} {}
~heap_aligned_stack() {
delete[] unaligned_memory_pointer_;
}
aligned_stack &get_stack() { return aligned_stack_; } aligned_stack &get_stack() { return aligned_stack_; }
private: private:
aligned_stack aligned_stack_; size_t unaligned_memory_size_;
char *unaligned_memory_pointer_; char *unaligned_memory_pointer_;
aligned_stack aligned_stack_;
}; };
} }
......
...@@ -14,7 +14,7 @@ T *aligned_stack::push(ARGS &&... args) { ...@@ -14,7 +14,7 @@ T *aligned_stack::push(ARGS &&... args) {
} }
template<typename T> template<typename T>
void *aligned_stack::push_bytes() { char *aligned_stack::push_bytes() {
return push_bytes(sizeof(T)); return push_bytes(sizeof(T));
} }
...@@ -28,15 +28,7 @@ void aligned_stack::pop() { ...@@ -28,15 +28,7 @@ void aligned_stack::pop() {
} }
template<size_t SIZE> template<size_t SIZE>
static_aligned_stack<SIZE>::static_aligned_stack(): memory_{}, aligned_stack_{memory_.data()} {}; static_aligned_stack<SIZE>::static_aligned_stack(): memory_{}, aligned_stack_{memory_.data(), SIZE} {};
heap_aligned_stack::heap_aligned_stack(size_t size) :
unaligned_memory_pointer_{new char[base::alignment::next_alignment(size)]},
aligned_stack_{unaligned_memory_pointer_, size, base::alignment::next_alignment(size)} {}
heap_aligned_stack::~heap_aligned_stack() {
delete[] unaligned_memory_pointer_;
}
} }
} }
......
...@@ -22,14 +22,22 @@ template<typename T> ...@@ -22,14 +22,22 @@ template<typename T>
class delayed_initialization_wrapper { class delayed_initialization_wrapper {
public: public:
delayed_initialization_wrapper() : memory_{}, initialized_{false} {} delayed_initialization_wrapper() : memory_{}, initialized_{false} {}
delayed_initialization_wrapper(delayed_initialization_wrapper &&other) noexcept {
initialized_ = other.initialized_;
if (other.initialized_) {
object() = std::move(other.object());
other.initialized_ = false;
}
}
template<typename ...ARGS> template<typename ...ARGS>
explicit delayed_initialization_wrapper(ARGS &&...args): memory_{}, initialized_{true} { explicit delayed_initialization_wrapper(ARGS &&...args): memory_{}, initialized_{true} {
new(memory_) T(std::forward<ARGS>(args)...); new(memory_.data()) T(std::forward<ARGS>(args)...);
} }
~delayed_initialization_wrapper() { ~delayed_initialization_wrapper() {
if (initialized_) { if (initialized_) {
memory_->~T(); object().~T();
} }
} }
...@@ -37,22 +45,24 @@ class delayed_initialization_wrapper { ...@@ -37,22 +45,24 @@ class delayed_initialization_wrapper {
void initialize(ARGS &&...args) { void initialize(ARGS &&...args) {
PLS_ASSERT(!initialized_, "Can only initialize delayed wrapper object once!") PLS_ASSERT(!initialized_, "Can only initialize delayed wrapper object once!")
new(memory_) T(std::forward<ARGS>(args)...); new(memory_.data()) T(std::forward<ARGS>(args)...);
initialized_ = true; initialized_ = true;
} }
void destroy() { void destroy() {
PLS_ASSERT(initialized_, "Can only destroy initialized objects!") PLS_ASSERT(initialized_, "Can only destroy initialized objects!")
memory_->~T(); object().~T();
initialized_ = false; initialized_ = false;
} }
T &object() { T &object() {
PLS_ASSERT(initialized_, "Can not use an uninitialized delayed wrapper object!") PLS_ASSERT(initialized_, "Can not use an uninitialized delayed wrapper object!")
return *reinterpret_cast<T *>(memory_); return *reinterpret_cast<T *>(memory_.data());
} }
bool initialized() const { return initialized_; }
private: private:
std::array<char, sizeof(T)> memory_; std::array<char, sizeof(T)> memory_;
bool initialized_; bool initialized_;
......
...@@ -15,8 +15,13 @@ namespace scheduling { ...@@ -15,8 +15,13 @@ namespace scheduling {
class cont_manager { class cont_manager {
public: public:
// Helper to pass the compile time constants to the constructor.
template<size_t NUM_CONTS, size_t MAX_CONT_SIZE> template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
explicit cont_manager(data_structures::aligned_stack &cont_storage) { struct template_args {};
template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
explicit cont_manager(data_structures::aligned_stack &cont_storage, template_args<NUM_CONTS, MAX_CONT_SIZE>) {
// First node is currently active and our local start // First node is currently active and our local start
start_node_ = active_node_ = init_cont_node<MAX_CONT_SIZE>(cont_storage, nullptr, nullptr); start_node_ = active_node_ = init_cont_node<MAX_CONT_SIZE>(cont_storage, nullptr, nullptr);
...@@ -24,7 +29,8 @@ class cont_manager { ...@@ -24,7 +29,8 @@ class cont_manager {
continuation_node *current_node = start_node_; continuation_node *current_node = start_node_;
for (size_t i = 1; i < NUM_CONTS; i++) { for (size_t i = 1; i < NUM_CONTS; i++) {
continuation_node *next_node = init_cont_node<MAX_CONT_SIZE>(cont_storage, start_node_, current_node); continuation_node *next_node = init_cont_node<MAX_CONT_SIZE>(cont_storage, start_node_, current_node);
current_node->set_prev(next_node); current_node->set_next(next_node);
current_node = next_node;
} }
}; };
...@@ -42,8 +48,8 @@ class cont_manager { ...@@ -42,8 +48,8 @@ class cont_manager {
continuation_node *cont_chain_start, continuation_node *cont_chain_start,
continuation_node *prev) { continuation_node *prev) {
// Represents one cont node and its corresponding memory buffer (as one continuous block of memory). // Represents one cont node and its corresponding memory buffer (as one continuous block of memory).
using cont_node_memory_pair = std::pair<continuation_node, constexpr size_t buffer_size = MAX_CONT_SIZE - sizeof(continuation_node);
std::array<char, MAX_CONT_SIZE - sizeof(continuation_node)>>; using cont_node_memory_pair = std::pair<continuation_node, std::array<char, buffer_size>>;
char *pair_memory = cont_storage.push_bytes<cont_node_memory_pair>(); char *pair_memory = cont_storage.push_bytes<cont_node_memory_pair>();
char *cont_node_address = pair_memory; char *cont_node_address = pair_memory;
char *cont_node_memory_address = pair_memory + sizeof(continuation_node); char *cont_node_memory_address = pair_memory + sizeof(continuation_node);
...@@ -59,7 +65,9 @@ class cont_manager { ...@@ -59,7 +65,9 @@ class cont_manager {
template<size_t NUM_CONTS, size_t MAX_CONT_SIZE> template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
class static_cont_manager { class static_cont_manager {
public: public:
static_cont_manager() : static_cont_storage_{}, cont_manager_{NUM_CONTS, MAX_CONT_SIZE, static_cont_storage_} {} static_cont_manager()
: static_cont_storage_{},
cont_manager_(static_cont_storage_.get_stack(), cont_manager::template_args<NUM_CONTS, MAX_CONT_SIZE>{}) {}
cont_manager &get_cont_manager() { return cont_manager_; } cont_manager &get_cont_manager() { return cont_manager_; }
private: private:
......
...@@ -7,8 +7,6 @@ ...@@ -7,8 +7,6 @@
#include "pls/internal/helpers/profiler.h" #include "pls/internal/helpers/profiler.h"
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/base/thread.h" #include "pls/internal/base/thread.h"
#include "pls/internal/base/barrier.h" #include "pls/internal/base/barrier.h"
...@@ -28,17 +26,6 @@ namespace scheduling { ...@@ -28,17 +26,6 @@ namespace scheduling {
* It works in close rellation with the 'task' class for scheduling. * It works in close rellation with the 'task' class for scheduling.
*/ */
class scheduler { class scheduler {
friend class task;
const unsigned int num_threads_;
const bool reuse_thread_;
scheduler_memory *memory_;
base::barrier sync_barrier_;
task *main_thread_root_task_;
std::atomic<bool> work_section_done_;
bool terminated_;
public: public:
/** /**
* Initializes a scheduler instance with the given number of threads. * Initializes a scheduler instance with the given number of threads.
...@@ -48,7 +35,7 @@ class scheduler { ...@@ -48,7 +35,7 @@ class scheduler {
* @param memory All memory is allocated statically, thus the user is required to provide the memory instance. * @param memory All memory is allocated statically, thus the user is required to provide the memory instance.
* @param num_threads The number of worker threads to be created. * @param num_threads The number of worker threads to be created.
*/ */
explicit scheduler(scheduler_memory *memory, unsigned int num_threads, bool reuse_thread = true); explicit scheduler(scheduler_memory &memory, unsigned int num_threads, bool reuse_thread = true);
/** /**
* The scheduler is implicitly terminated as soon as it leaves the scope. * The scheduler is implicitly terminated as soon as it leaves the scope.
...@@ -72,41 +59,39 @@ class scheduler { ...@@ -72,41 +59,39 @@ class scheduler {
void terminate(); void terminate();
/** /**
* Helper to spawn a child on the currently running task. * Temporary object used for the parallel(...).then(...) API.
*
* @tparam T type of the new task
* @tparam ARGS Constructor argument types
* @param args constructor arguments
*/ */
template<typename T, typename ...ARGS> template<typename F1, typename F2>
static void spawn_child(ARGS &&... args); struct starter;
/** template<typename F1, typename F2>
* Helper to spawn a child on the currently running task and waiting for it (skipping over the task-deque). starter<F1, F2> invoke_parallel(F1 &&function_1, F2 &&function_2);
*
* @tparam T type of the new task
* @tparam ARGS Constructor argument types
* @param args constructor arguments
*/
template<typename T, typename ...ARGS>
static void spawn_child_and_wait(ARGS &&... args);
/** template<typename F1, typename F2>
* Helper to wait for all children of the currently executing task. static starter<F1, F2> par(F1 &&function_1, F2 &&function_2);
*/
static void wait_for_all();
unsigned int num_threads() const { return num_threads_; } unsigned int num_threads() const { return num_threads_; }
private: private:
static void worker_routine(); static void work_thread_main_loop();
thread_state *thread_state_for(size_t id); void work_thread_work_section();
thread_state &thread_state_for(size_t id);
task *get_local_task(); friend class base_task;
task *steal_task(); const unsigned int num_threads_;
const bool reuse_thread_;
scheduler_memory &memory_;
bool try_execute_local(); base::barrier sync_barrier_;
bool try_execute_stolen();
class init_function;
template<typename F>
class init_function_impl;
init_function *main_thread_starter_function_;
std::atomic<bool> work_section_done_;
bool terminated_;
}; };
} }
......
...@@ -2,36 +2,105 @@ ...@@ -2,36 +2,105 @@
#ifndef PLS_SCHEDULER_IMPL_H #ifndef PLS_SCHEDULER_IMPL_H
#define PLS_SCHEDULER_IMPL_H #define PLS_SCHEDULER_IMPL_H
#include "pls/internal/scheduling/lambda_task.h" #include <utility>
#include "pls/internal/scheduling/continuation.h"
#include "pls/internal/scheduling/parallel_result.h"
#include "pls/internal/scheduling/task.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
namespace scheduling { namespace scheduling {
// TODO: generally look into the performance implications of using many thread_state::get() calls template<typename F1, typename F2>
// Temporary helper returned by par(): stores the two branch callables and
// executes the fork/join when .then(...) is invoked. Currently only the
// serial fast path (no steal by another worker) is implemented.
struct scheduler::starter {
F1 function_1_;
F2 function_2_;

// Both callables must return a parallel_result-like type exposing value_type.
using return_type_1 = decltype(function_1_());
using return_type_2 = decltype(function_2_());

template<typename F1ARG, typename F2ARG>
explicit starter(F1ARG &&function_1, F2ARG &&function_2) : function_1_{std::forward<F1ARG>(function_1)},
function_2_{std::forward<F2ARG>(function_2)} {};

// Publishes function_2_ as a stealable task, runs function_1_ inline, and —
// if the task was not stolen (fast path) — runs function_2_ inline as well
// and feeds both unwrapped results to cont_function.
template<typename F>
auto then(F &&cont_function)
-> decltype(cont_function(std::declval<typename return_type_1::value_type>(),
std::declval<typename return_type_2::value_type>())) {
// NOTE(review): the first template argument repeats return_type_2, yet the
// matching task below is parameterized <F2, return_type_1, return_type_2, F>.
// This looks like it should be continuation<return_type_1, return_type_2, F>
// — verify (continuation_type is not used on the fast path shown here).
using continuation_type = continuation<return_type_2, return_type_2, F>;
auto &my_state = thread_state::get();

// Select current continuation
auto *current_cont = my_state.get_cont_manager().fast_path_get_next();

// Move function_2 directly onto the task stack (should in theory be constructed on there).
// Consider only copying it later on, as this could allow a more efficient fast path with inlining.
task<F2, return_type_1, return_type_2, F> task_2{function_2_, current_cont};
my_state.get_task_manager().publish_task(task_2);

// Call first function
auto result_1 = function_1_();

// Try to pop from stack
if (my_state.get_task_manager().steal_local_task()) {
// Nobody stole task_2: cheap fast-path return, then execute it inline.
my_state.get_cont_manager().fast_path_return();
auto result_2 = function_2_();
return cont_function(result_1.value(), result_2.value());
} else {
// task_2 was stolen by another worker; waiting for its continuation
// (the slow path) is not implemented yet in this WIP commit.
PLS_ERROR("Slow Path Not Implemented!!!")
}
};
};
template<typename F1, typename F2>
scheduler::starter<F1, F2> scheduler::invoke_parallel(F1 &&function_1, F2 &&function_2) {
return scheduler::starter<F1, F2>{std::forward<F1>(function_1), std::forward<F2>(function_2)};
}
template<typename F1, typename F2>
scheduler::starter<F1, F2> scheduler::par(F1 &&function_1, F2 &&function_2) {
return thread_state::get().get_scheduler().invoke_parallel(std::forward<F1>(function_1),
std::forward<F2>(function_2));
}
// Type-erased runner for the user-supplied work section (see perform_work).
// This is a polymorphic interface, so it needs a virtual destructor: without
// one, destroying an implementation through an init_function pointer would be
// undefined behavior.
class scheduler::init_function {
 public:
  virtual ~init_function() = default;
  virtual parallel_result<int> run() = 0;
};
// Wraps the user's work-section callable in the init_function interface.
// run() enters the scheduler through the same par(...).then(...) path as user
// code: the first branch is a no-op, the second invokes the stored callable,
// and the continuation marks the scheduler's work section as done.
template<typename F>
class scheduler::init_function_impl : public init_function {
public:
// Stores only a reference; the callable must outlive this wrapper
// (perform_work keeps both on its stack, so the lifetimes line up there).
explicit init_function_impl(F &function) : function_{function} {}
parallel_result<int> run() override {
return scheduler::par([]() {
// No-op
return parallel_result<int>{0};
}, [=]() {
// Execute the user's work section; its own result is discarded here.
function_();
return parallel_result<int>{0};
}).then([](const int &, const int &) {
// Notify that work is done after finishing the last user continuation.
thread_state::get().get_scheduler().work_section_done_ = true;
return parallel_result<int>{0};
});
}
private:
F &function_;
};
template<typename Function> template<typename Function>
void scheduler::perform_work(Function work_section) { void scheduler::perform_work(Function work_section) {
PROFILE_WORK_BLOCK("scheduler::perform_work") PROFILE_WORK_BLOCK("scheduler::perform_work")
// Prepare main root task // Prepare main root task
lambda_task_by_reference<Function> root_task{work_section}; init_function_impl<Function> starter_function{work_section};
main_thread_root_task_ = &root_task; main_thread_starter_function_ = &starter_function;
work_section_done_ = false;
work_section_done_ = false;
if (reuse_thread_) { if (reuse_thread_) {
// TODO: See if we should change thread-states to not make our state override the current thread state auto &my_state = memory_.thread_state_for(0);
auto my_state = memory_->thread_state_for(0); base::this_thread::set_state(&my_state); // Make THIS THREAD become the main worker
base::this_thread::set_state(my_state); // Make THIS THREAD become the main worker
sync_barrier_.wait(); // Trigger threads to wake up sync_barrier_.wait(); // Trigger threads to wake up
work_thread_work_section(); // Simply also perform the work section on the main loop
// Do work (see if we can remove this duplicated code)
root_task.parent_ = nullptr;
root_task.deque_offset_ = my_state->deque_.save_offset();
root_task.execute();
work_section_done_ = true;
sync_barrier_.wait(); // Wait for threads to finish sync_barrier_.wait(); // Wait for threads to finish
} else { } else {
// Simply trigger the others to do the work, this thread will sleep/wait for the time being // Simply trigger the others to do the work, this thread will sleep/wait for the time being
...@@ -40,16 +109,6 @@ void scheduler::perform_work(Function work_section) { ...@@ -40,16 +109,6 @@ void scheduler::perform_work(Function work_section) {
} }
} }
template<typename T, typename ...ARGS>
void scheduler::spawn_child(ARGS &&... args) {
thread_state::get()->current_task_->spawn_child<T>(std::forward<ARGS>(args)...);
}
template<typename T, typename ...ARGS>
void scheduler::spawn_child_and_wait(ARGS &&... args) {
thread_state::get()->current_task_->spawn_child_and_wait<T>(std::forward<ARGS>(args)...);
}
} }
} }
} }
......
...@@ -23,8 +23,8 @@ class scheduler_memory { ...@@ -23,8 +23,8 @@ class scheduler_memory {
// this should not add too much overhead. // this should not add too much overhead.
public: public:
virtual size_t max_threads() const = 0; virtual size_t max_threads() const = 0;
virtual base::thread &thread_for(size_t id) const = 0; virtual base::thread &thread_for(size_t id) = 0;
virtual thread_state &thread_state_for(size_t id) const = 0; virtual thread_state &thread_state_for(size_t id) = 0;
}; };
template<size_t MAX_THREADS, size_t NUM_TASKS, size_t MAX_TASK_STACK_SIZE, size_t NUM_CONTS, size_t MAX_CONT_SIZE> template<size_t MAX_THREADS, size_t NUM_TASKS, size_t MAX_TASK_STACK_SIZE, size_t NUM_CONTS, size_t MAX_CONT_SIZE>
...@@ -34,11 +34,11 @@ class static_scheduler_memory : public scheduler_memory { ...@@ -34,11 +34,11 @@ class static_scheduler_memory : public scheduler_memory {
return MAX_THREADS; return MAX_THREADS;
} }
base::thread &thread_for(size_t id) const override { base::thread &thread_for(size_t id) override {
return threads_[id]; return threads_[id];
} }
thread_state &thread_state_for(size_t id) const override { thread_state &thread_state_for(size_t id) override {
return thread_states_[id].get_thread_state(); return thread_states_[id].get_thread_state();
} }
...@@ -68,11 +68,11 @@ class heap_scheduler_memory : public scheduler_memory { ...@@ -68,11 +68,11 @@ class heap_scheduler_memory : public scheduler_memory {
return max_threads_; return max_threads_;
} }
base::thread &thread_for(size_t id) const override { base::thread &thread_for(size_t id) override {
return thread_vector_[id]; return thread_vector_[id];
} }
thread_state &thread_state_for(size_t id) const override { thread_state &thread_state_for(size_t id) override {
return thread_state_vector_[id].object().get_thread_state(); return thread_state_vector_[id].object().get_thread_state();
} }
......
#ifndef PLS_TASK_H #ifndef PLS_TASK_H
#define PLS_TASK_H #define PLS_TASK_H
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/continuation.h" #include "pls/internal/scheduling/continuation.h"
#include "pls/internal/scheduling/cont_manager.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
...@@ -16,8 +15,6 @@ namespace scheduling { ...@@ -16,8 +15,6 @@ namespace scheduling {
* Override the execute_internal() method for your custom code. * Override the execute_internal() method for your custom code.
*/ */
class base_task { class base_task {
friend class scheduler;
protected: protected:
base_task() = default; base_task() = default;
...@@ -26,7 +23,7 @@ class base_task { ...@@ -26,7 +23,7 @@ class base_task {
*/ */
virtual void execute_internal() = 0; virtual void execute_internal() = 0;
private: public:
void execute() { void execute() {
// TODO: Figure out slow path execution // TODO: Figure out slow path execution
execute_internal(); execute_internal();
...@@ -35,19 +32,22 @@ class base_task { ...@@ -35,19 +32,22 @@ class base_task {
template<typename F, typename R1, typename R2, typename CF> template<typename F, typename R1, typename R2, typename CF>
class task : public base_task { class task : public base_task {
using continutation_type = continuation<R1, R2, CF>;
public: public:
template<typename FARG> template<typename FARG>
explicit task(FARG &&function, continuation<R1, R2, CF> *continuation) explicit task(FARG &&function, continuation_node *continuation_node)
: base_task{}, function_{std::forward<FARG>(function)}, continuation_{continuation} {} : base_task{}, function_{std::forward<FARG>(function)}, continuation_node_{continuation_node} {}
void execute_internal() override { void execute_internal() override {
continuation_->store_result_2(function_()); auto *continuation = dynamic_cast<continutation_type *>(continuation_node_->continuation());
continuation->store_result_2(function_());
// TODO: Properly notify continuation on slow path // TODO: Properly notify continuation on slow path
} }
private: private:
F function_; F function_;
continuation<R1, R2, CF> *continuation_; continuation_node *continuation_node_;
}; };
} }
......
...@@ -9,7 +9,6 @@ ...@@ -9,7 +9,6 @@
#include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/task.h"
#include "pls/internal/data_structures/stamped_integer.h" #include "pls/internal/data_structures/stamped_integer.h"
#include "task.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
...@@ -79,7 +78,7 @@ class task_manager { ...@@ -79,7 +78,7 @@ class task_manager {
private: private:
task_handle *task_handle_stack_; task_handle *task_handle_stack_;
alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<std::atomic<data_structures::stamped_integer>> head_; alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<data_structures::stamped_integer> head_;
alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<unsigned int> tail_; alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<unsigned int> tail_;
alignas(base::system_details::CACHE_LINE_SIZE) unsigned int tail_internal_, stamp_internal_; alignas(base::system_details::CACHE_LINE_SIZE) unsigned int tail_internal_, stamp_internal_;
}; };
......
...@@ -7,7 +7,6 @@ ...@@ -7,7 +7,6 @@
#include <array> #include <array>
#include <chrono> #include <chrono>
#include "pls/internal/base/thread.h"
#include "pls/internal/scheduling/task_manager.h" #include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/cont_manager.h" #include "pls/internal/scheduling/cont_manager.h"
...@@ -17,7 +16,7 @@ namespace scheduling { ...@@ -17,7 +16,7 @@ namespace scheduling {
// forward declaration // forward declaration
class scheduler; class scheduler;
class task; class base_task;
struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state { struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
scheduler *scheduler_; scheduler *scheduler_;
...@@ -26,7 +25,7 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state { ...@@ -26,7 +25,7 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
task_manager &task_manager_; task_manager &task_manager_;
cont_manager &cont_manager_; cont_manager &cont_manager_;
alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_; alignas(base::system_details::CACHE_LINE_SIZE) base_task *current_task_;
alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_; alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
public: public:
...@@ -46,7 +45,12 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state { ...@@ -46,7 +45,12 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
* *
* @return The thread_state of this thread. * @return The thread_state of this thread.
*/ */
static thread_state *get() { return base::this_thread::state<thread_state>(); } static thread_state &get() { return *base::this_thread::state<thread_state>(); }
size_t get_id() { return id_; }
task_manager &get_task_manager() { return task_manager_; }
cont_manager &get_cont_manager() { return cont_manager_; }
scheduler &get_scheduler() { return *scheduler_; }
// Do not allow move/copy operations. // Do not allow move/copy operations.
// State is a pure memory container with references/pointers into it from all over the code. // State is a pure memory container with references/pointers into it from all over the code.
......
...@@ -10,7 +10,7 @@ aligned_stack::aligned_stack(char *memory_pointer, size_t size) : ...@@ -10,7 +10,7 @@ aligned_stack::aligned_stack(char *memory_pointer, size_t size) :
memory_pointer_{memory_pointer}, // MUST be aligned memory_pointer_{memory_pointer}, // MUST be aligned
max_offset_{size / base::system_details::CACHE_LINE_SIZE}, max_offset_{size / base::system_details::CACHE_LINE_SIZE},
current_offset_{0} { current_offset_{0} {
PLS_ASSERT((pointer_t) memory_pointer_ % base::system_details::CACHE_LINE_SIZE != 0, PLS_ASSERT((pointer_t) memory_pointer_ % base::system_details::CACHE_LINE_SIZE == 0,
"Must initialize an aligned_stack with a properly aligned memory region!") "Must initialize an aligned_stack with a properly aligned memory region!")
} }
...@@ -23,20 +23,20 @@ aligned_stack::aligned_stack(char *unaligned_memory_pointer, size_t size, size_t ...@@ -23,20 +23,20 @@ aligned_stack::aligned_stack(char *unaligned_memory_pointer, size_t size, size_t
"Initialized aligned stack with invalid memory configuration!") "Initialized aligned stack with invalid memory configuration!")
} }
void *aligned_stack::memory_at_offset(stack_offset offset) const { char *aligned_stack::memory_at_offset(stack_offset offset) const {
const auto byte_offset = offset * base::system_details::CACHE_LINE_SIZE; const auto byte_offset = offset * base::system_details::CACHE_LINE_SIZE;
return reinterpret_cast<void *>(memory_pointer_ + byte_offset); return reinterpret_cast<char *>(memory_pointer_ + byte_offset);
} }
void *aligned_stack::push_bytes(size_t size) { char *aligned_stack::push_bytes(size_t size) {
size_t round_up_size = base::alignment::next_alignment(size); size_t round_up_size = base::alignment::next_alignment(size);
size_t num_cache_lines = round_up_size / base::system_details::CACHE_LINE_SIZE; size_t num_cache_lines = round_up_size / base::system_details::CACHE_LINE_SIZE;
void *result = memory_at_offset(current_offset_); char *result = memory_at_offset(current_offset_);
// Move head to next aligned position after new object // Move head to next aligned position after new object
current_offset_ += num_cache_lines; current_offset_ += num_cache_lines;
PLS_ASSERT(current_offset_ > max_offset_, PLS_ASSERT(current_offset_ <= max_offset_,
"Tried to allocate object on alligned_stack without sufficient memory!"); "Tried to allocate object on alligned_stack without sufficient memory!");
return result; return result;
......
...@@ -8,26 +8,25 @@ namespace pls { ...@@ -8,26 +8,25 @@ namespace pls {
namespace internal { namespace internal {
namespace scheduling { namespace scheduling {
scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads, bool reuse_thread) : scheduler::scheduler(scheduler_memory &memory, const unsigned int num_threads, bool reuse_thread) :
num_threads_{num_threads}, num_threads_{num_threads},
reuse_thread_{reuse_thread}, reuse_thread_{reuse_thread},
memory_{memory}, memory_{memory},
sync_barrier_{num_threads + 1 - reuse_thread}, sync_barrier_{num_threads + 1 - reuse_thread},
terminated_{false} { terminated_{false} {
if (num_threads_ > memory_->max_threads()) { if (num_threads_ > memory.max_threads()) {
PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory."); PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory.");
} }
for (unsigned int i = 0; i < num_threads_; i++) { for (unsigned int i = 0; i < num_threads_; i++) {
// Placement new is required, as the memory of `memory_` is not required to be initialized. // Placement new is required, as the memory of `memory_` is not required to be initialized.
new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), memory.thread_state_for(i).scheduler_ = this;
memory_->cont_stack_for(i), i}; memory.thread_state_for(i).id_ = i;
if (reuse_thread && i == 0) { if (reuse_thread && i == 0) {
continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one. continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one.
} }
new((void *) memory_->thread_for(i))base::thread(&scheduler::worker_routine, memory_->thread_state_for(i)); memory.thread_for(i) = base::thread(&scheduler::work_thread_main_loop, &memory_.thread_state_for(i));
} }
} }
...@@ -35,40 +34,40 @@ scheduler::~scheduler() { ...@@ -35,40 +34,40 @@ scheduler::~scheduler() {
terminate(); terminate();
} }
void scheduler::worker_routine() { void scheduler::work_thread_main_loop() {
auto my_state = thread_state::get(); auto &scheduler = thread_state::get().get_scheduler();
auto scheduler = my_state->scheduler_;
while (true) { while (true) {
// Wait to be triggered // Wait to be triggered
scheduler->sync_barrier_.wait(); scheduler.sync_barrier_.wait();
// Check for shutdown // Check for shutdown
if (scheduler->terminated_) { if (scheduler.terminated_) {
return; return;
} }
// Execute work scheduler.work_thread_work_section();
if (my_state->id_ == 0) {
// Main Thread
auto root_task = scheduler->main_thread_root_task_;
root_task->parent_ = nullptr;
root_task->deque_offset_ = my_state->deque_.save_offset();
root_task->execute();
scheduler->work_section_done_ = true;
} else {
// Worker Threads
while (!scheduler->work_section_done_) {
if (!scheduler->try_execute_local()) {
scheduler->try_execute_stolen();
}
}
}
// Sync back with main thread // Sync back with main thread
my_state->scheduler_->sync_barrier_.wait(); scheduler.sync_barrier_.wait();
}
}
void scheduler::work_thread_work_section() {
auto &my_state = thread_state::get();
if (my_state.get_id() == 0) {
// Main Thread, kick of by executing the user's main code block.
main_thread_starter_function_->run();
} }
do {
// TODO: Implement other threads, for now we are happy if it compiles and runs on one thread
// For now we can test without this, as the fast path should never hit this.
// 1) Try Steal
// 2) Copy Over
// 3) Finish Steal
// 4) Execute Local Copy
} while (!work_section_done_);
} }
void scheduler::terminate() { void scheduler::terminate() {
...@@ -83,79 +82,11 @@ void scheduler::terminate() { ...@@ -83,79 +82,11 @@ void scheduler::terminate() {
if (reuse_thread_ && i == 0) { if (reuse_thread_ && i == 0) {
continue; continue;
} }
memory_->thread_for(i)->join(); memory_.thread_for(i).join();
}
}
task *scheduler::get_local_task() {
PROFILE_STEALING("Get Local Task")
return thread_state::get()->deque_.pop_local_task();
}
task *scheduler::steal_task() {
PROFILE_STEALING("Steal Task")
// Data for victim selection
const auto my_state = thread_state::get();
const auto my_id = my_state->id_;
const size_t offset = my_state->random_() % num_threads();
const size_t max_tries = num_threads(); // TODO: Tune this value
bool any_cas_fails_occured = false;
// Current strategy: random start, then round robin from there
for (size_t i = 0; i < max_tries; i++) {
size_t target = (offset + i) % num_threads();
// Skip our self for stealing
target = ((target == my_id) + target) % num_threads();
auto target_state = thread_state_for(target);
bool cas_fail;
auto result = target_state->deque_.pop_external_task(cas_fail);
any_cas_fails_occured |= cas_fail;
if (result != nullptr) {
return result;
}
// TODO: See if we should backoff here (per missed steal)
}
if (!any_cas_fails_occured) {
// Went through every task and we did not find any work.
// Most likely there is non available right now, yield to other threads.
pls::internal::base::this_thread::yield();
}
return nullptr;
}
bool scheduler::try_execute_local() {
task *local_task = get_local_task();
if (local_task != nullptr) {
local_task->execute();
return true;
} else {
return false;
}
}
bool scheduler::try_execute_stolen() {
task *stolen_task = steal_task();
if (stolen_task != nullptr) {
stolen_task->deque_offset_ = thread_state::get()->deque_.save_offset();
stolen_task->execute();
return true;
} }
return false;
}
void scheduler::wait_for_all() {
thread_state::get()->current_task_->wait_for_all();
} }
thread_state *scheduler::thread_state_for(size_t id) { return memory_->thread_state_for(id); } thread_state &scheduler::thread_state_for(size_t id) { return memory_.thread_state_for(id); }
} }
} }
......
...@@ -2,9 +2,7 @@ ...@@ -2,9 +2,7 @@
#include "pls/internal/base/system_details.h" #include "pls/internal/base/system_details.h"
#include "pls/internal/scheduling/data_structures/aligned_stack.h" #include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/scheduling/data_structures/locking_deque.h"
#include "pls/internal/scheduling/data_structures/work_stealing_deque.h"
#include <mutex> #include <mutex>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment