Commit 83c6e622 by FritzFlorian

Draft of new context switching tasks.

parent 5e0ce1f5
Pipeline #1384 failed with stages
in 31 seconds
#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/parallel_result.h"
#include "pls/internal/scheduling/scheduler_memory.h" #include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/helpers/profiler.h"
using namespace pls::internal::scheduling; using namespace pls::internal::scheduling;
...@@ -14,24 +12,20 @@ using namespace pls::internal::scheduling; ...@@ -14,24 +12,20 @@ using namespace pls::internal::scheduling;
using namespace comparison_benchmarks::base; using namespace comparison_benchmarks::base;
parallel_result<int> pls_fib(int n) { int pls_fib(int n) {
if (n <= 1) { if (n <= 1) {
return parallel_result<int>{1}; return 1;
} }
return scheduler::par([=]() { int a = pls_fib(n - 1);
return pls_fib(n - 1); int b = pls_fib(n - 2);
}, [=]() {
return pls_fib(n - 2); return a + b;
}).then([=](int a, int b) {
return parallel_result<int>{a + b};
});
} }
constexpr int MAX_NUM_THREADS = 8; constexpr int MAX_NUM_THREADS = 1;
constexpr int MAX_NUM_TASKS = 64; constexpr int MAX_NUM_TASKS = 64;
constexpr int MAX_NUM_CONTS = 64; constexpr int MAX_STACK_SIZE = 128;
constexpr int MAX_CONT_SIZE = 256;
int main(int argc, char **argv) { int main(int argc, char **argv) {
int num_threads; int num_threads;
...@@ -39,43 +33,27 @@ int main(int argc, char **argv) { ...@@ -39,43 +33,27 @@ int main(int argc, char **argv) {
benchmark_runner::read_args(argc, argv, num_threads, directory); benchmark_runner::read_args(argc, argv, num_threads, directory);
string test_name = to_string(num_threads) + ".csv"; string test_name = to_string(num_threads) + ".csv";
string full_directory = directory + "/PLS_v2/"; string full_directory = directory + "/PLS_v3/";
benchmark_runner runner{full_directory, test_name}; benchmark_runner runner{full_directory, test_name};
static_scheduler_memory<MAX_NUM_THREADS, static_scheduler_memory<MAX_NUM_THREADS,
MAX_NUM_TASKS, MAX_NUM_TASKS,
MAX_NUM_CONTS, MAX_STACK_SIZE> static_scheduler_memory;
MAX_CONT_SIZE> static_scheduler_memory;
scheduler scheduler{static_scheduler_memory, (unsigned int) num_threads}; scheduler scheduler{static_scheduler_memory, (unsigned) num_threads};
volatile int res; volatile int res;
for (int i = 0; i < fib::NUM_WARMUP_ITERATIONS; i++) { for (int i = 0; i < fib::NUM_WARMUP_ITERATIONS; i++) {
scheduler.perform_work([&]() { scheduler.perform_work([&]() {
return scheduler::par([&]() { res = pls_fib(fib::INPUT_N);
return pls_fib(fib::INPUT_N);
}, []() {
return parallel_result<int>{0};
}).then([&](int result, int) {
res = result;
return parallel_result<int>{0};
});
}); });
} }
for (int i = 0; i < fib::NUM_ITERATIONS; i++) { for (int i = 0; i < fib::NUM_ITERATIONS; i++) {
scheduler.perform_work([&]() { scheduler.perform_work([&]() {
runner.start_iteration(); runner.start_iteration();
res = pls_fib(fib::INPUT_N);
return scheduler::par([&]() {
return pls_fib(fib::INPUT_N);
}, []() {
return parallel_result<int>{0};
}).then([&](int result, int) {
res = result;
runner.end_iteration(); runner.end_iteration();
return parallel_result<int>{0};
});
}); });
} }
runner.commit_results(true); runner.commit_results(true);
......
...@@ -24,11 +24,7 @@ continuation enter_context(assembly_bindings::stack_pointer_t stack_memory, size ...@@ -24,11 +24,7 @@ continuation enter_context(assembly_bindings::stack_pointer_t stack_memory, size
return continuation{assembly_bindings::__cs_enter_context(stack_base, captured_lambda, callback, stack_limit)}; return continuation{assembly_bindings::__cs_enter_context(stack_base, captured_lambda, callback, stack_limit)};
} }
continuation switch_context(continuation &&cont) { continuation switch_context(continuation &&cont);
assembly_bindings::continuation_t cont_pointer = cont.consume();
return continuation{assembly_bindings::__cs_switch_context(cont_pointer)};
}
} }
......
...@@ -19,6 +19,7 @@ namespace context_switcher { ...@@ -19,6 +19,7 @@ namespace context_switcher {
template<typename F> template<typename F>
struct lambda_capture { struct lambda_capture {
// TODO: Check if we need an extra template here to perform the move
explicit lambda_capture(F &&lambda) : lambda_{std::forward<F>(lambda)} {} explicit lambda_capture(F &&lambda) : lambda_{std::forward<F>(lambda)} {}
assembly_bindings::continuation_t operator()(assembly_bindings::continuation_t continuation_pointer) { assembly_bindings::continuation_t operator()(assembly_bindings::continuation_t continuation_pointer) {
......
#include "context_switcher/context_switcher.h"
namespace context_switcher {
continuation switch_context(continuation &&cont) {
assembly_bindings::continuation_t cont_pointer = cont.consume();
return continuation{assembly_bindings::__cs_switch_context(cont_pointer)};
}
}
...@@ -58,7 +58,12 @@ add_library(pls STATIC ...@@ -58,7 +58,12 @@ add_library(pls STATIC
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/cont_manager.h include/pls/internal/scheduling/cont_manager.h
include/pls/internal/scheduling/cont.h include/pls/internal/scheduling/cont.h
include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/thread_state_static.h src/internal/base/error_handling.cpp include/pls/internal/data_structures/bounded_trading_deque.h ../context_switcher/src/context_switcher.cpp) include/pls/internal/data_structures/bounded_ws_deque.h
include/pls/internal/data_structures/optional.h
include/pls/internal/scheduling/memory_block.h
include/pls/internal/scheduling/thread_state_static.h
src/internal/base/error_handling.cpp
include/pls/internal/data_structures/bounded_trading_deque.h)
# Add everything in `./include` to be in the include path of this project # Add everything in `./include` to be in the include path of this project
target_include_directories(pls target_include_directories(pls
...@@ -72,6 +77,7 @@ target_include_directories(pls ...@@ -72,6 +77,7 @@ target_include_directories(pls
# Add cmake dependencies here if needed # Add cmake dependencies here if needed
target_link_libraries(pls target_link_libraries(pls
Threads::Threads # pthread support Threads::Threads # pthread support
context_switcher # coroutine support
) )
if (EASY_PROFILER) if (EASY_PROFILER)
target_link_libraries(pls easy_profiler) target_link_libraries(pls easy_profiler)
...@@ -79,7 +85,7 @@ endif () ...@@ -79,7 +85,7 @@ endif ()
# Rules for istalling the library on a system # Rules for istalling the library on a system
# ...binaries # ...binaries
INSTALL(TARGETS pls INSTALL(TARGETS pls context_switcher
EXPORT pls-targets EXPORT pls-targets
LIBRARY LIBRARY
DESTINATION lib/pls DESTINATION lib/pls
......
#ifndef PLS_INTERNAL_SCHEDULING_CONT_H_
#define PLS_INTERNAL_SCHEDULING_CONT_H_
#include <type_traits>
#include <atomic>
#include <utility>
#include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/data_structures/delayed_initialization.h"
#include "pls/internal/base/alignment.h"
#include "pls/internal/base/error_handling.h"
#include "pls/internal/helpers/profiler.h"
#include "parallel_result.h"
#include "memory_block.h"
namespace pls {
namespace internal {
namespace scheduling {
class base_cont {
protected:
// We plan to only init the members for a continuation on the slow path.
// If we can execute everything inline we simply skip it saving runtime overhead.
template<typename T>
using delayed_init = data_structures::delayed_initialization<T>;
public:
explicit base_cont(base_cont *parent, memory_block *memory_block, bool is_right_child)
: parent_{parent},
memory_block_{memory_block},
is_right_child_{is_right_child} {
PLS_ASSERT(parent_ == nullptr || parent_->memory_block_->get_depth() == memory_block_->get_depth() - 1,
"Must only build cont chains with matching depth!")
};
/**
* Execute the continuation itself.
* Make sure to only call when all required results are in.
* Will store the result in it's parent, but not mess with any counters.
*/
virtual void execute() = 0;
/**
* Execute the right hand side task associated with the continuation.
* Will store the result in it's parent, but not mess with any counters.
*/
virtual void execute_task() = 0;
virtual base_task *get_task() = 0;
virtual void *get_right_result_pointer() = 0;
virtual void *get_left_result_pointer() = 0;
template<typename T>
void store_right_result(T &&result) {
using BASE_T = typename std::remove_cv<typename std::remove_reference<T>::type>::type;
reinterpret_cast<delayed_init<BASE_T> *>(get_right_result_pointer())->initialize(std::forward<T>(result));
}
template<typename T>
void store_left_result(T &&result) {
using BASE_T = typename std::remove_cv<typename std::remove_reference<T>::type>::type;
reinterpret_cast<delayed_init<BASE_T> *>(get_left_result_pointer())->initialize(std::forward<T>(result));
}
base_cont *get_parent() { return parent_; }
memory_block *get_memory_block() { return memory_block_; }
bool is_right_child() const { return is_right_child_; }
protected:
base_cont *parent_;
memory_block *memory_block_;
bool is_right_child_;
};
template<typename T2, typename R1, typename R2, typename F>
class cont : public base_cont {
private:
template<typename RES_TYPE>
struct result_runner {
// Strip off unwanted modifiers...
using BASE_RES_TYPE = typename std::remove_cv<typename std::remove_reference<RES_TYPE>::type>::type;
static void execute(cont &cont) {
parallel_result<BASE_RES_TYPE>
result{cont.function_((*cont.left_result_).value(), (*cont.right_result_).value())};
if (result.fast_path() && cont.parent_ != nullptr) {
if (cont.is_right_child()) {
cont.parent_->store_right_result(std::move(result));
} else {
cont.parent_->store_left_result(std::move(result));
}
}
}
};
template<typename INNER_TYPE>
struct result_runner<parallel_result<INNER_TYPE>> {
static void execute(cont &cont) {
auto result = cont.function_((*cont.left_result_).value(), (*cont.right_result_).value());
if (result.fast_path() && cont.parent_) {
if (cont.is_right_child()) {
cont.parent_->store_right_result(std::move(result));
} else {
cont.parent_->store_left_result(std::move(result));
}
}
}
};
public:
template<typename FARG, typename ...T2ARGS>
explicit cont(base_cont *parent,
memory_block *memory_block,
bool is_right_child,
FARG &&function,
T2ARGS...task_2_args):
base_cont(parent, memory_block, is_right_child),
function_{std::forward<FARG>(function)},
task_{std::forward<T2ARGS>(task_2_args)..., this} {};
void execute() override {
PROFILE_CONTINUATION("execute_cont");
using result_type = decltype(function_((*left_result_).value(), (*right_result_).value()));
result_runner<result_type>::execute(*this);
this->~cont();
auto *memory_block = this->get_memory_block();
memory_block->free_buffer();
memory_block->reset_state();
}
void execute_task() override {
task_.execute();
}
base_task *get_task() override {
return &task_;
}
void *get_left_result_pointer() override {
return &left_result_;
}
void *get_right_result_pointer() override {
return &right_result_;
}
private:
// Initial data members. These slow down the fast path, try to init them lazy when possible.
F function_;
T2 task_;
// Some fields/actual values stay uninitialized (save time on the fast path if we don not need them).
// More fields untouched on the fast path is good, but for ease of an implementation we only keep some for now.
delayed_init<R1> left_result_;
delayed_init<R2> right_result_;
};
}
}
}
#endif //PLS_INTERNAL_SCHEDULING_CONT_H_
#ifndef PLS_CONT_MANAGER_H_
#define PLS_CONT_MANAGER_H_
#include <memory>
#include <utility>
#include <array>
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/scheduling/cont.h"
#include "pls/internal/scheduling/thread_state.h"
namespace pls {
namespace internal {
namespace scheduling {
class cont_manager {
public:
// Helper to pass the compile time constants to the constructor.
template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
struct template_args {};
template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
explicit cont_manager(data_structures::aligned_stack &cont_storage, template_args<NUM_CONTS, MAX_CONT_SIZE>)
: max_cont_size_{MAX_CONT_SIZE}, num_conts_{NUM_CONTS} {
// First node is currently active and our local start
active_node_ = init_memory_block<MAX_CONT_SIZE>(cont_storage, nullptr, 0);
// Build up chain after it
memory_block *current_node = active_node_;
for (size_t i = 1; i < NUM_CONTS; i++) {
memory_block *next_node = init_memory_block<MAX_CONT_SIZE>(cont_storage, current_node, i);
current_node->set_next(next_node);
current_node = next_node;
}
};
// Aquire and release memory blocks...
memory_block *get_next_memory_block() {
auto result = active_node_;
active_node_ = active_node_->get_next();
return result;
}
void return_memory_block() {
active_node_ = active_node_->get_prev();
}
void move_active_node(int depth) {
if (depth < 0) {
for (long i = 0; i < (depth * -1); i++) {
active_node_ = active_node_->get_prev();
}
} else {
for (long i = 0; i < depth; i++) {
active_node_ = active_node_->get_next();
}
}
}
void move_active_node_to_start() {
move_active_node(-1 * active_node_->get_depth());
}
memory_block *get_active_node() {
return active_node_;
}
bool is_clean() {
if (get_active_node()->get_depth() == 0) {
memory_block *current_node = active_node_;
for (size_t i = 1; i < num_conts_; i++) {
if (current_node->get_prev() != nullptr && current_node->get_prev()->get_next() != current_node) {
return false;
}
if (current_node->is_buffer_used()) {
return false;
}
current_node = current_node->get_next();
}
} else {
return false;
}
return true;
}
// Manage the fall through behaviour/slow path behaviour
bool falling_through() const {
return fall_through_;
}
void fall_through_and_notify_cont(base_cont *notified_cont, bool is_child_right) {
fall_through_ = true;
fall_through_cont_ = notified_cont;
fall_through_child_right = is_child_right;
}
void aquire_memory_chain(memory_block *target_chain) {
PLS_ASSERT(active_node_->get_depth() == target_chain->get_depth() + 1,
"Can only steal aquire chain parts with correct depth.");
active_node_->set_prev(target_chain);
target_chain->set_next(active_node_);
}
void execute_fall_through_code() {
PLS_ASSERT(falling_through(), "Must be falling through to execute the associated code.")
auto &my_state = thread_state::get();
// Copy fall through status and reset it (for potentially nested execution paths).
auto *notified_cont = fall_through_cont_;
fall_through_cont_ = nullptr;
fall_through_ = false;
// Keep the target chain before we execute, as this potentially frees the memory
auto *target_memory_block = notified_cont->get_memory_block();
auto *target_chain = target_memory_block->get_offered_chain().load();
// Notify the next continuation of finishing a child...
if (target_memory_block->get_results_missing().fetch_add(-1) == 1) {
// ... we finished the continuation.
// We are now in charge continuing to execute the above continuation chain.
PLS_ASSERT(active_node_->get_prev()->get_depth() == target_memory_block->get_depth(),
"We must hold the system invariant to be in the correct depth.")
if (active_node_->get_prev() != target_memory_block) {
// We do not own the thing we will execute.
// Own it by swapping the chain belonging to it in.
aquire_memory_chain(target_memory_block);
}
my_state.parent_cont_ = notified_cont->get_parent();
my_state.right_spawn_ = notified_cont->is_right_child();
active_node_ = target_memory_block;
notified_cont->execute();
if (!falling_through() && notified_cont->get_parent() != nullptr) {
fall_through_and_notify_cont(notified_cont->get_parent(), notified_cont->is_right_child());
}
return;
} else {
// ... we did not finish the last continuation.
// We are no longer in charge of executing the above continuation chain.
PLS_ASSERT(active_node_->get_prev()->get_depth() == target_memory_block->get_depth(),
"We must hold the system invariant to be in the correct depth.")
if (active_node_->get_prev() == target_memory_block) {
// We own the thing we are not allowed to execute.
// Get rid of the ownership by using the offered chain.
aquire_memory_chain(target_chain);
}
move_active_node_to_start();
// We are done here...nothing more to execute
return;
}
}
private:
template<size_t MAX_CONT_SIZE>
static memory_block *init_memory_block(data_structures::aligned_stack &cont_storage,
memory_block *prev,
unsigned long depth) {
// Represents one cont_node and its corresponding memory buffer (as one continuous block of memory).
constexpr size_t buffer_size = MAX_CONT_SIZE - base::alignment::next_alignment(sizeof(memory_block));
char *memory_block_ptr = cont_storage.push_bytes<memory_block>();
char *memory_block_buffer_ptr = cont_storage.push_bytes(buffer_size);
return new(memory_block_ptr) memory_block(memory_block_buffer_ptr, buffer_size, prev, depth);
}
private:
const size_t max_cont_size_;
const size_t num_conts_;
/**
* Managing the continuation chain.
*/
memory_block *active_node_;
/**
* Managing falling through back to the scheduler.
*/
bool fall_through_{false};
bool fall_through_child_right{false};
base_cont *fall_through_cont_{nullptr};
};
template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
class static_cont_manager {
public:
static_cont_manager()
: static_cont_storage_{},
cont_manager_(static_cont_storage_.get_stack(), cont_manager::template_args<NUM_CONTS, MAX_CONT_SIZE>{}) {}
cont_manager &get_cont_manager() { return cont_manager_; }
private:
data_structures::static_aligned_stack<NUM_CONTS * MAX_CONT_SIZE> static_cont_storage_;
cont_manager cont_manager_;
};
}
}
}
#endif //PLS_CONT_MANAGER_H_
#ifndef PLS_INTERNAL_SCHEDULING_CONT_NODE_H_
#define PLS_INTERNAL_SCHEDULING_CONT_NODE_H_
namespace pls {
namespace internal {
namespace scheduling {
/**
* A block of memory that can be used to store tasks and continuations.
* Threads trade these blocks while executing and stealing tasks.
*
* Each block has an associated, raw memory buffer. The user can place his object
* in this memory region as needed. He is responsible for calling deconstructors of the
* placed objects.
*/
class memory_block {
public:
memory_block(char *memory_buffer,
size_t memory_buffer_size,
memory_block *prev,
int depth)
: prev_{prev},
next_{nullptr},
offered_chain_{nullptr},
results_missing_{2},
memory_buffer_{memory_buffer},
memory_buffer_size_{memory_buffer_size},
memory_buffer_used_{false},
depth_{depth} {};
template<typename T, typename ...ARGS>
T *place_in_buffer(ARGS &&...args) {
PLS_ASSERT(!memory_buffer_used_, "Must only allocate one continuation at once per node.");
memory_buffer_used_ = true;
auto *result = new(memory_buffer_) T(std::forward<ARGS>(args)...);
continuation_ = result;
return result;
}
void free_buffer() {
PLS_ASSERT(memory_buffer_used_, "Can only free a memory spot when it is in use.")
memory_buffer_used_ = false;
}
bool is_buffer_used() {
return memory_buffer_used_;
}
base_cont *get_cont() {
PLS_ASSERT(is_buffer_used(), "Can only read initialized buffer!");
return continuation_;
}
memory_block *get_prev() {
return prev_;
}
void set_prev(memory_block *prev) {
prev_ = prev;
}
memory_block *get_next() {
return next_;
}
void set_next(memory_block *next) {
next_ = next;
}
std::atomic<memory_block *> &get_offered_chain() {
return offered_chain_;
}
std::atomic<unsigned short> &get_results_missing() {
return results_missing_;
}
int get_depth() const noexcept {
return depth_;
}
void reset_state() {
offered_chain_.store(nullptr);
results_missing_.store(2);
}
private:
// Linked list property of memory blocks (a complete list represents a threads currently owned memory).
// Each block knows its chain start to allow stealing a whole chain in O(1)
// without the need to traverse back to the chain start.
memory_block *prev_, *next_;
// When blocked on this chain element, we need to know what other chain of memory we
// got offered by the stealing thread.
// For this we need the offered chain's element up to the point we can steal.
std::atomic<memory_block *> offered_chain_;
// Management for coordinating concurrent result writing and stealing.
// The result count decides atomically who gets to execute the continuation
// and who therefore get's to own this memory block chain.
std::atomic<unsigned short> results_missing_;
// Pointer to memory region reserved for the companion continuation.
// Must be a buffer big enough to hold any continuation encountered in the program.
// This memory is managed explicitly by the continuation manager and runtime system
// (they need to make sure to always call de-constructors and never allocate two continuations).
char *memory_buffer_;
base_cont *continuation_;
// These two are only helper properties helping with bugs during development.
size_t memory_buffer_size_;
bool memory_buffer_used_;
// Each element stays at a fixed depth for the entire application run.
// Swapping parts of a memory chain will not reorder it, as always parts of
// the same size are exchanged.
const int depth_;
};
}
}
}
#endif //PLS_INTERNAL_SCHEDULING_CONT_NODE_H_
#ifndef PLS_INTERNAL_SCHEDULING_PARALLEL_RESULT_H_
#define PLS_INTERNAL_SCHEDULING_PARALLEL_RESULT_H_
#include <utility>
#include "pls/internal/data_structures/delayed_initialization.h"
namespace pls {
namespace internal {
namespace scheduling {
// Used to more enforce the use of parallel_results
class parallel_result_base {};
template<typename T>
class parallel_result : public parallel_result_base {
public:
using value_type = T;
parallel_result() = default;
parallel_result(parallel_result &&other) noexcept : val_{std::move(other.val_)} {}
parallel_result(const parallel_result &other) noexcept : val_{other.val_} {}
parallel_result(T val) : val_{std::move(val)} {}
bool fast_path() { return val_.initialized(); }
T &value() { return val_.object(); }
private:
data_structures::delayed_initialization<T> val_;
};
}
}
}
#endif //PLS_INTERNAL_SCHEDULING_PARALLEL_RESULT_H_
...@@ -53,23 +53,20 @@ class scheduler { ...@@ -53,23 +53,20 @@ class scheduler {
template<typename Function> template<typename Function>
void perform_work(Function work_section); void perform_work(Function work_section);
template<typename Function>
void spawn(Function &&lambda) {
// TODO: place function on next active
// TODO: capture continuation in current active
// TODO: advance current active
// TODO: after finish, return to last active (if not stolen)
// TODO: revert current active
}
/** /**
* Explicitly terminate the worker threads. Scheduler must not be used after this. * Explicitly terminate the worker threads. Scheduler must not be used after this.
*/ */
void terminate(); void terminate();
/**
* Temporary object used for the parallel(...).then(...) API.
*/
template<typename F1, typename F2>
struct starter;
template<typename F1, typename F2>
starter<F1, F2> invoke_parallel(F1 &&function_1, F2 &&function_2);
template<typename F1, typename F2>
static starter<F1, F2> par(F1 &&function_1, F2 &&function_2);
unsigned int num_threads() const { return num_threads_; } unsigned int num_threads() const { return num_threads_; }
private: private:
...@@ -77,7 +74,6 @@ class scheduler { ...@@ -77,7 +74,6 @@ class scheduler {
void work_thread_work_section(); void work_thread_work_section();
thread_state &thread_state_for(size_t id); thread_state &thread_state_for(size_t id);
friend class base_task;
const unsigned int num_threads_; const unsigned int num_threads_;
const bool reuse_thread_; const bool reuse_thread_;
scheduler_memory &memory_; scheduler_memory &memory_;
......
...@@ -3,134 +3,17 @@ ...@@ -3,134 +3,17 @@
#define PLS_SCHEDULER_IMPL_H #define PLS_SCHEDULER_IMPL_H
#include <utility> #include <utility>
#include "pls/internal/scheduling/cont.h"
#include "pls/internal/scheduling/parallel_result.h"
#include "pls/internal/scheduling/task.h"
#include "context_switcher/context_switcher.h"
#include "context_switcher/continuation.h"
#include "pls/internal/scheduling/task.h"
#include "pls/internal/helpers/profiler.h" #include "pls/internal/helpers/profiler.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
namespace scheduling { namespace scheduling {
template<typename F1, typename F2>
struct scheduler::starter {
F1 function_1_;
F2 function_2_;
using return_type_1 = decltype(function_1_());
using return_type_2 = decltype(function_2_());
// Enforce correct return types of lambdas (parallel_result)
static_assert(std::is_base_of<parallel_result_base, return_type_1>::value,
"Must only return parallel results in parallel code");
static_assert(std::is_base_of<parallel_result_base, return_type_2>::value,
"Must only return parallel results in parallel code");
template<typename F1ARG, typename F2ARG>
explicit starter(F1ARG &&function_1, F2ARG &&function_2) : function_1_{std::forward<F1ARG>(function_1)},
function_2_{std::forward<F2ARG>(function_2)} {};
template<typename FCONT>
auto then(FCONT &&cont_function)
-> decltype(cont_function(std::declval<typename return_type_1::value_type>(),
std::declval<typename return_type_2::value_type>())) {
PROFILE_FAST_PATH("then");
using continuation_type = cont<task<F2>, return_type_1, return_type_2, FCONT>;
using result_type = decltype(cont_function(std::declval<typename return_type_1::value_type>(),
std::declval<typename return_type_2::value_type>()));
auto &my_state = thread_state::get();
auto &cont_manager = my_state.get_cont_manager();
// Select current memory block.
// For now directly copy both the continuation function and the second task.
// (We might optimize this in the future to require less memory copies.)
auto *current_memory_block = cont_manager.get_next_memory_block();
// We keep track of the last spawn to build up the parent_cont chain
const bool is_right_cont = my_state.right_spawn_;
base_cont *parent_cont = my_state.parent_cont_;
continuation_type *current_cont = current_memory_block->place_in_buffer<continuation_type>(parent_cont,
current_memory_block,
is_right_cont,
cont_function,
function_2_);
my_state.parent_cont_ = current_cont;
// Publish the second task.
my_state.get_task_manager().publish_task(current_cont->get_task());
// Call first function on fast path
my_state.right_spawn_ = false;
return_type_1 result_1 = function_1_();
if (!result_1.fast_path()) {
// Get our replacement from the task stack and store it for later use when we are actually blocked.
auto traded_memory = my_state.get_task_manager().try_pop_local();
current_cont->get_memory_block()->get_offered_chain().store(*traded_memory);
// Unwind stack...
return result_type{};
}
// Try to call second function on fast path
auto traded_memory = my_state.get_task_manager().try_pop_local();
if (traded_memory) {
// The task got stolen...get_memory_block
// ...but we got a memory block that can be used if we block on this one.
current_cont->get_memory_block()->get_offered_chain().store(*traded_memory);
// Main scheduling loop is responsible for entering the result to the slow path...
current_cont->store_left_result(std::move(result_1));
cont_manager.fall_through_and_notify_cont(current_cont, false);
// Unwind stack...
return result_type{};
} else {
my_state.right_spawn_ = true;
return_type_2 result_2 = function_2_();
if (!result_2.fast_path()) {
// Main scheduling loop is responsible for entering the result to the slow path...
current_cont->store_left_result(std::move(result_1));
current_cont->get_memory_block()->get_results_missing().fetch_add(-1);
// Unwind stack...
return result_type{};
}
// We fully got all results, inline as good as possible.
// This is the common case, branch prediction should be rather good here.
// Just return the cont object unused and directly call the function.
current_cont->~continuation_type();
current_memory_block->free_buffer();
cont_manager.return_memory_block();
// The continuation has the same execution environment as we had for the children.
// We need this to allow spawns in there.
my_state.parent_cont_ = parent_cont;
my_state.right_spawn_ = is_right_cont;
auto cont_result = cont_function(result_1.value(), result_2.value());
if (!cont_result.fast_path()) {
// Unwind stack...
return result_type{};
}
return cont_result;
}
};
};
template<typename F1, typename F2>
scheduler::starter<F1, F2> scheduler::invoke_parallel(F1 &&function_1, F2 &&function_2) {
return scheduler::starter<F1, F2>{std::forward<F1>(function_1), std::forward<F2>(function_2)};
}
template<typename F1, typename F2>
scheduler::starter<F1, F2> scheduler::par(F1 &&function_1, F2 &&function_2) {
return thread_state::get().get_scheduler().invoke_parallel(std::forward<F1>(function_1),
std::forward<F2>(function_2));
}
class scheduler::init_function { class scheduler::init_function {
public: public:
virtual void run() = 0; virtual void run() = 0;
...@@ -140,15 +23,12 @@ class scheduler::init_function_impl : public init_function { ...@@ -140,15 +23,12 @@ class scheduler::init_function_impl : public init_function {
public: public:
explicit init_function_impl(F &function) : function_{function} {} explicit init_function_impl(F &function) : function_{function} {}
void run() override { void run() override {
scheduler::par([]() { auto &thread_state = thread_state::get();
return parallel_result<int>{0}; thread_state.get_task_manager().get_active_task().run_as_task([&](context_switcher::continuation cont) {
}, [=]() { function_();
return function_(); return std::move(cont);
}).then([=](int, int b) {
thread_state::get().get_scheduler().work_section_done_ = true;
return parallel_result<int>{b};
}); });
thread_state.get_scheduler().work_section_done_.store(true);
} }
private: private:
F &function_; F &function_;
......
...@@ -11,8 +11,6 @@ namespace pls { ...@@ -11,8 +11,6 @@ namespace pls {
namespace internal { namespace internal {
namespace scheduling { namespace scheduling {
void worker_routine();
class scheduler_memory { class scheduler_memory {
// Note: scheduler_memory is a pure interface and has no data. // Note: scheduler_memory is a pure interface and has no data.
// By not having an initialization routine we can do our 'static and heap specialization' // By not having an initialization routine we can do our 'static and heap specialization'
...@@ -29,7 +27,7 @@ class scheduler_memory { ...@@ -29,7 +27,7 @@ class scheduler_memory {
} }
}; };
template<size_t MAX_THREADS, size_t NUM_TASKS, size_t NUM_CONTS, size_t MAX_CONT_SIZE> template<size_t MAX_THREADS, size_t NUM_TASKS, size_t STACK_SIZE>
class static_scheduler_memory : public scheduler_memory { class static_scheduler_memory : public scheduler_memory {
public: public:
static_scheduler_memory() : scheduler_memory{} { static_scheduler_memory() : scheduler_memory{} {
...@@ -47,14 +45,14 @@ class static_scheduler_memory : public scheduler_memory { ...@@ -47,14 +45,14 @@ class static_scheduler_memory : public scheduler_memory {
return threads_[id]; return threads_[id];
} }
private: private:
using thread_state_type = thread_state_static<NUM_TASKS, NUM_CONTS, MAX_CONT_SIZE>; using thread_state_type = thread_state_static<NUM_TASKS, STACK_SIZE>;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<base::thread, MAX_THREADS> threads_; alignas(base::system_details::CACHE_LINE_SIZE) std::array<base::thread, MAX_THREADS> threads_;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<thread_state_type, MAX_THREADS> thread_states_; alignas(base::system_details::CACHE_LINE_SIZE) std::array<thread_state_type, MAX_THREADS> thread_states_;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<thread_state *, MAX_THREADS> thread_state_pointers_; alignas(base::system_details::CACHE_LINE_SIZE) std::array<thread_state *, MAX_THREADS> thread_state_pointers_;
}; };
template<size_t NUM_TASKS, size_t MAX_TASK_STACK_SIZE, size_t NUM_CONTS, size_t MAX_CONT_SIZE> template<size_t NUM_TASKS, size_t STACK_SIZE>
class heap_scheduler_memory : public scheduler_memory { class heap_scheduler_memory : public scheduler_memory {
public: public:
explicit heap_scheduler_memory(size_t max_threads) : max_threads_{max_threads}, explicit heap_scheduler_memory(size_t max_threads) : max_threads_{max_threads},
...@@ -80,7 +78,7 @@ class heap_scheduler_memory : public scheduler_memory { ...@@ -80,7 +78,7 @@ class heap_scheduler_memory : public scheduler_memory {
return thread_vector_[id]; return thread_vector_[id];
} }
private: private:
using thread_state_type = thread_state_static<NUM_TASKS, NUM_CONTS, MAX_CONT_SIZE>; using thread_state_type = thread_state_static<NUM_TASKS, STACK_SIZE>;
// thread_state_type is aligned at the cache line and therefore overaligned (C++ 11 does not require // thread_state_type is aligned at the cache line and therefore overaligned (C++ 11 does not require
// the new operator to obey alignments bigger than 16, cache lines are usually 64). // the new operator to obey alignments bigger than 16, cache lines are usually 64).
// To allow this object to be allocated using 'new' (which the vector does internally), // To allow this object to be allocated using 'new' (which the vector does internally),
......
#ifndef PLS_TASK_H #ifndef PLS_TASK_H
#define PLS_TASK_H #define PLS_TASK_H
#include "pls/internal/scheduling/cont.h" #include <utility>
#include "pls/internal/scheduling/memory_block.h"
#include "pls/internal/helpers/profiler.h" #include "context_switcher/continuation.h"
#include "context_switcher/context_switcher.h"
#include "pls/internal/base/system_details.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
namespace scheduling { namespace scheduling {
/** /**
* A task to be executed by the runtime system. * A task is the smallest unit of execution seen by the runtime system.
* Tasks are guaranteed to be executed exactly once.
* *
* Override the execute_internal() method for your custom code. * Tasks represent a action dispatched by a potentially parallel call.
*/ * Tasks have their own execution context (stack and register state), making them stackefull coroutines.
class base_task { * Tasks can be suspended and resumed (stealing happens by resuming a task).
public: *
/** * Being coroutines tasks go through a very deliberate state machine:
* Executes the task and stores its result in the correct continuation. * - initialized (no execution state)
* The caller must handle finishing the continuation/informing it that task two was finished. * - running (currently executing user code)
* - suspended (suspended by switching to a different task).
*/ */
void execute() { struct alignas(base::system_details::CACHE_LINE_SIZE) task {
execute_internal(); void init(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) {
} stack_memory_ = stack_memory;
stack_size_ = stack_size;
base_cont *get_cont() { depth_ = depth;
return cont_; thread_id_ = thread_id;
} }
protected: unsigned get_thread_id() const {
explicit base_task(base_cont *cont) : cont_{cont} {}; return thread_id_;
}
void set_thread_id(unsigned thread_id) {
thread_id_ = thread_id;
}
/** task *get_prev() const {
* Overwrite this with the actual behaviour of concrete tasks. return prev_;
*/ }
virtual void execute_internal() = 0; void set_prev(task *prev) {
prev_ = prev;
}
base_cont *cont_; task *get_next() const {
}; return next_;
}
void set_next(task *next) {
next_ = next;
}
template<typename F> task *get_parent_task() const {
class task : public base_task { return parent_task_;
public:
template<typename FARG>
explicit task(FARG &&function, base_cont *cont)
: base_task{cont}, function_{std::forward<FARG>(function)} {}
void execute_internal() override {
PROFILE_TASK("execute_internal")
auto result = function_();
if (result.fast_path()) {
cont_->store_right_result(std::move(result));
} }
void set_parent_task(task *parent_task) {
parent_task_ = parent_task;
}
template<typename F>
context_switcher::continuation run_as_task(F &&lambda) {
return context_switcher::enter_context(stack_memory_, stack_size_, std::forward<F>(lambda));
} }
private: private:
F function_; // Stack/Continuation Management
char *stack_memory_;
size_t stack_size_; // TODO: We do not need this in every single task...
context_switcher::continuation continuation_;
// Task Tree (we have a parent that we want to continue when we finish)
task *parent_task_;
unsigned depth_;
unsigned thread_id_;
// Memory Linked List
task *prev_;
task *next_;
}; };
} }
......
...@@ -5,18 +5,11 @@ ...@@ -5,18 +5,11 @@
#include <memory> #include <memory>
#include <utility> #include <utility>
#include <array> #include <array>
#include <mutex>
#include <atomic>
#include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/cont_manager.h"
#include "pls/internal/scheduling/memory_block.h"
#include "pls/internal/data_structures/bounded_trading_deque.h" #include "pls/internal/data_structures/bounded_trading_deque.h"
#include "pls/internal/data_structures/stamped_integer.h" #include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/data_structures/optional.h"
#include "pls/internal/base/spin_lock.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
...@@ -28,58 +21,59 @@ namespace scheduling { ...@@ -28,58 +21,59 @@ namespace scheduling {
*/ */
class task_manager { class task_manager {
public: public:
// Publishes a task on the stack, i.e. makes it visible for other threads to steal. explicit task_manager(task *tasks,
void publish_task(base_task *task) { data_structures::aligned_stack static_stack_space,
task_deque_.push_bot(task->get_cont()->get_memory_block()); size_t num_tasks,
size_t stack_size) {
for (size_t i = 0; i < num_tasks - 1; i++) {
tasks[i].init(static_stack_space.push_bytes(stack_size), stack_size, i, 0);
if (i > 0) {
tasks[i].set_prev(&tasks[i - 1]);
}
if (i < num_tasks - 2) {
tasks[i].set_next(&tasks[i + 1]);
} }
// Try to pop a local task from this task managers stack.
data_structures::optional<memory_block *> try_pop_local() {
return task_deque_.pop_bot().traded_;
} }
// Try to steal a task from a remote task_manager instance. The stolen task must be stored locally. num_tasks_ = num_tasks;
// Returns a pair containing the actual task and if the steal was successful. this_thread_tasks_ = tasks;
base_task *steal_remote_task(cont_manager &stealing_cont_manager) { active_task_ = &tasks[0];
auto peek = task_deque_.peek_top(); }
if (std::get<0>(peek)) {
memory_block *peeked_memory_block = (*std::get<0>(peek));
auto peeked_depth = peeked_memory_block->get_depth();
stealing_cont_manager.move_active_node(peeked_depth);
auto offered_chain = stealing_cont_manager.get_active_node();
stealing_cont_manager.move_active_node(1);
auto stolen_memory_block = task_deque_.pop_top(offered_chain, std::get<1>(peek)); task &get_this_thread_task(size_t depth) {
if (stolen_memory_block) { return this_thread_tasks_[depth];
PLS_ASSERT(*stolen_memory_block == peeked_memory_block, "Steal must only work if it is equal!"); }
return (*stolen_memory_block)->get_cont()->get_task(); void set_thread_id(unsigned id) {
} else { for (size_t i = 0; i < num_tasks_; i++) {
stealing_cont_manager.move_active_node(-(peeked_depth + 1)); this_thread_tasks_[i].set_thread_id(id);
return nullptr;
} }
} }
return nullptr; task &get_active_task() {
return *active_task_;
} }
explicit task_manager(data_structures::bounded_trading_deque<memory_block, memory_block> &task_deque) :
task_deque_{task_deque} {}
private: private:
data_structures::bounded_trading_deque<memory_block, memory_block> &task_deque_; size_t num_tasks_;
task *this_thread_tasks_;
task *active_task_;
}; };
template<size_t NUM_TASKS> template<size_t NUM_TASKS, size_t STACK_SIZE>
class static_task_manager { class static_task_manager {
public: public:
static_task_manager() : task_deque_{}, task_manager_{task_deque_.get_deque()} {}; static_task_manager()
: tasks_{},
static_stack_storage_{},
task_manager_{tasks_.data(), static_stack_storage_.get_stack(), NUM_TASKS, STACK_SIZE} {};
task_manager &get_task_manager() { return task_manager_; } task_manager &get_task_manager() { return task_manager_; }
private: private:
data_structures::static_bounded_trading_deque<memory_block, memory_block, NUM_TASKS> task_deque_; std::array<task, NUM_TASKS> tasks_;
data_structures::static_aligned_stack<NUM_TASKS * STACK_SIZE> static_stack_storage_;
task_manager task_manager_; task_manager task_manager_;
}; };
......
...@@ -3,54 +3,34 @@ ...@@ -3,54 +3,34 @@
#define PLS_THREAD_STATE_H #define PLS_THREAD_STATE_H
#include <random> #include <random>
#include <memory>
#include <array>
#include <chrono> #include <chrono>
#include "pls/internal/scheduling/task_manager.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
namespace scheduling { namespace scheduling {
// forward declaration
class task_manager;
class cont_manager;
class scheduler; class scheduler;
class base_task; struct task;
class base_cont;
struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state { struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
private:
scheduler *scheduler_; scheduler *scheduler_;
size_t id_; unsigned id_;
// Keep track of the last spawn state (needed to chain tasks/conts correctly)
bool right_spawn_;
base_cont *parent_cont_;
// TODO: Set this when spawning!
// See if we should move this to the cont manager...seems like a better fit!
task_manager &task_manager_; task_manager &task_manager_;
cont_manager &cont_manager_;
alignas(base::system_details::CACHE_LINE_SIZE) base_task *current_task_; alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_;
alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_; alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
public: public:
thread_state(task_manager &task_manager, explicit thread_state(task_manager &task_manager) :
cont_manager &cont_manager) :
scheduler_{nullptr}, scheduler_{nullptr},
id_{0}, id_{0},
right_spawn_{false},
parent_cont_{nullptr},
task_manager_{task_manager}, task_manager_{task_manager},
cont_manager_{cont_manager},
current_task_{nullptr}, current_task_{nullptr},
random_{static_cast<unsigned long>(std::chrono::steady_clock::now().time_since_epoch().count())} {}; random_{static_cast<unsigned long>(std::chrono::steady_clock::now().time_since_epoch().count())} {};
void reset() {
right_spawn_ = false;
parent_cont_ = nullptr;
}
/** /**
* Convenience helper to get the thread_state instance associated with this thread. * Convenience helper to get the thread_state instance associated with this thread.
* Must only be called on threads that are associated with a thread_state, * Must only be called on threads that are associated with a thread_state,
...@@ -60,10 +40,19 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state { ...@@ -60,10 +40,19 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
*/ */
static thread_state &get() { return *base::this_thread::state<thread_state>(); } static thread_state &get() { return *base::this_thread::state<thread_state>(); }
size_t get_id() { return id_; } unsigned get_id() { return id_; }
void set_id(unsigned id) {
id_ = id;
task_manager_.set_thread_id(id);
}
task_manager &get_task_manager() { return task_manager_; } task_manager &get_task_manager() { return task_manager_; }
cont_manager &get_cont_manager() { return cont_manager_; }
scheduler &get_scheduler() { return *scheduler_; } scheduler &get_scheduler() { return *scheduler_; }
void set_scheduler(scheduler *scheduler) {
scheduler_ = scheduler;
}
long get_rand() {
return random_();
}
// Do not allow move/copy operations. // Do not allow move/copy operations.
// State is a pure memory container with references/pointers into it from all over the code. // State is a pure memory container with references/pointers into it from all over the code.
...@@ -73,6 +62,7 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state { ...@@ -73,6 +62,7 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
thread_state(const thread_state &) = delete; thread_state(const thread_state &) = delete;
thread_state &operator=(const thread_state &) = delete; thread_state &operator=(const thread_state &) = delete;
}; };
} }
......
...@@ -3,8 +3,6 @@ ...@@ -3,8 +3,6 @@
#define PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_ #define PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_
#include "pls/internal/scheduling/task_manager.h" #include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/cont_manager.h"
#include "pls/internal/base/system_details.h" #include "pls/internal/base/system_details.h"
#include "thread_state.h" #include "thread_state.h"
...@@ -13,18 +11,16 @@ namespace pls { ...@@ -13,18 +11,16 @@ namespace pls {
namespace internal { namespace internal {
namespace scheduling { namespace scheduling {
template<size_t NUM_TASKS, size_t NUM_CONTS, size_t MAX_CONT_SIZE> template<size_t NUM_TASKS, size_t STACK_SIZE>
struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state_static { struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state_static {
public: public:
thread_state_static() thread_state_static()
: static_task_manager_{}, : static_task_manager_{},
static_cont_manager_{}, thread_state_{static_task_manager_.get_task_manager()} {}
thread_state_{static_task_manager_.get_task_manager(), static_cont_manager_.get_cont_manager()} {}
thread_state &get_thread_state() { return thread_state_; } thread_state &get_thread_state() { return thread_state_; }
private: private:
alignas(base::system_details::CACHE_LINE_SIZE) static_task_manager<NUM_TASKS> static_task_manager_; alignas(base::system_details::CACHE_LINE_SIZE) static_task_manager<NUM_TASKS, STACK_SIZE> static_task_manager_;
alignas(base::system_details::CACHE_LINE_SIZE) static_cont_manager<NUM_CONTS, MAX_CONT_SIZE> static_cont_manager_;
alignas(base::system_details::CACHE_LINE_SIZE) thread_state thread_state_; alignas(base::system_details::CACHE_LINE_SIZE) thread_state thread_state_;
}; };
......
...@@ -21,8 +21,8 @@ scheduler::scheduler(scheduler_memory &memory, const unsigned int num_threads, b ...@@ -21,8 +21,8 @@ scheduler::scheduler(scheduler_memory &memory, const unsigned int num_threads, b
for (unsigned int i = 0; i < num_threads_; i++) { for (unsigned int i = 0; i < num_threads_; i++) {
// Placement new is required, as the memory of `memory_` is not required to be initialized. // Placement new is required, as the memory of `memory_` is not required to be initialized.
memory.thread_state_for(i).scheduler_ = this; memory.thread_state_for(i).set_scheduler(this);
memory.thread_state_for(i).id_ = i; memory.thread_state_for(i).set_id(i);
if (reuse_thread && i == 0) { if (reuse_thread && i == 0) {
continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one. continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one.
...@@ -55,8 +55,7 @@ void scheduler::work_thread_main_loop() { ...@@ -55,8 +55,7 @@ void scheduler::work_thread_main_loop() {
void scheduler::work_thread_work_section() { void scheduler::work_thread_work_section() {
auto &my_state = thread_state::get(); auto &my_state = thread_state::get();
my_state.reset(); auto &my_task_manager = my_state.get_task_manager();
auto &my_cont_manager = my_state.get_cont_manager();
auto const num_threads = my_state.get_scheduler().num_threads(); auto const num_threads = my_state.get_scheduler().num_threads();
auto const my_id = my_state.get_id(); auto const my_id = my_state.get_id();
...@@ -67,41 +66,23 @@ void scheduler::work_thread_work_section() { ...@@ -67,41 +66,23 @@ void scheduler::work_thread_work_section() {
} }
do { do {
// Work off pending continuations we need to execute locally
while (my_cont_manager.falling_through()) {
my_cont_manager.execute_fall_through_code();
}
// Steal Routine (will be continuously executed when there are no more fall through's). // Steal Routine (will be continuously executed when there are no more fall through's).
// TODO: move into separate function // TODO: move into separate function
const size_t offset = my_state.random_() % num_threads; // const size_t offset = my_state.get_rand() % num_threads;
const size_t max_tries = num_threads; // const size_t max_tries = num_threads;
for (size_t i = 0; i < max_tries; i++) { // for (size_t i = 0; i < max_tries; i++) {
size_t target = (offset + i) % num_threads; // size_t target = (offset + i) % num_threads;
auto &target_state = my_state.get_scheduler().thread_state_for(target); // auto &target_state = my_state.get_scheduler().thread_state_for(target);
//
PLS_ASSERT(my_cont_manager.is_clean(), "Only steal with clean chain!"); // auto *stolen_task = target_state.get_task_manager().steal_remote_task(my_cont_manager);
PROFILE_STEALING("steal") // if (stolen_task != nullptr) {
auto *stolen_task = target_state.get_task_manager().steal_remote_task(my_cont_manager); // stolen_task->execute();
PROFILE_END_BLOCK; // }
if (stolen_task != nullptr) { // }
my_state.parent_cont_ = stolen_task->get_cont();
my_state.right_spawn_ = true;
stolen_task->execute();
if (my_cont_manager.falling_through()) {
break;
} else {
my_cont_manager.fall_through_and_notify_cont(stolen_task->get_cont(), true);
break;
}
}
}
// if (!my_cont_manager.falling_through()) { // if (!my_cont_manager.falling_through()) {
// base::this_thread::sleep(5); // base::this_thread::sleep(5);
// } // }
} while (!work_section_done_); } while (!work_section_done_);
PLS_ASSERT(my_cont_manager.is_clean(), "Only finish work section with clean chain!");
} }
void scheduler::terminate() { void scheduler::terminate() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment