Commit e34ea267 by FritzFlorian

Working version of our trading-deque

parent 4cf3848f
Pipeline #1363 failed with stages
in 38 seconds
......@@ -90,11 +90,10 @@ complex_vector prepare_input(int input_size) {
return data;
}
static constexpr int NUM_ITERATIONS = 500;
constexpr size_t NUM_THREADS = 8;
static constexpr int NUM_ITERATIONS = 1000;
constexpr size_t NUM_THREADS = 5;
constexpr size_t NUM_TASKS = 128;
constexpr size_t MAX_TASK_STACK_SIZE = 0;
constexpr size_t NUM_CONTS = 128;
constexpr size_t MAX_CONT_SIZE = 512;
......@@ -104,7 +103,6 @@ int main() {
static_scheduler_memory<NUM_THREADS,
NUM_TASKS,
MAX_TASK_STACK_SIZE,
NUM_CONTS,
MAX_CONT_SIZE> static_scheduler_memory;
......@@ -127,14 +125,14 @@ int main() {
std::cout << "Framework: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
<< std::endl;
// start = std::chrono::steady_clock::now();
// for (int i = 0; i < NUM_ITERATIONS; i++) {
// complex_vector input_1(initial_input);
// fft_normal(input_1.begin(), INPUT_SIZE);
// }
// end = std::chrono::steady_clock::now();
// std::cout << "Normal: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
// << std::endl;
start = std::chrono::steady_clock::now();
for (int i = 0; i < NUM_ITERATIONS; i++) {
complex_vector input_1(initial_input);
fft_normal(input_1.begin(), INPUT_SIZE);
}
end = std::chrono::steady_clock::now();
std::cout << "Normal: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
<< std::endl;
return 0;
}
......@@ -4,14 +4,14 @@
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/parallel_result.h"
#include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/base/thread.h"
#include "pls/internal/data_structures/bounded_trading_deque.h"
using namespace pls::internal;
constexpr size_t NUM_THREADS = 1;
constexpr size_t NUM_TASKS = 128;
constexpr size_t MAX_TASK_STACK_SIZE = 0;
static constexpr int NUM_ITERATIONS = 100;
constexpr size_t NUM_CONTS = 128;
constexpr size_t MAX_CONT_SIZE = 256;
......@@ -45,33 +45,37 @@ scheduling::parallel_result<int> fib(int n) {
});
}
static volatile int result;
int main() {
scheduling::static_scheduler_memory<NUM_THREADS,
NUM_TASKS,
MAX_TASK_STACK_SIZE,
NUM_CONTS,
MAX_CONT_SIZE> static_scheduler_memory;
scheduling::scheduler scheduler{static_scheduler_memory, NUM_THREADS};
auto start = std::chrono::steady_clock::now();
std::cout << "fib = " << fib_normal(30) << std::endl;
for (int i = 0; i < NUM_ITERATIONS; i++) {
result = fib_normal(30);
}
auto end = std::chrono::steady_clock::now();
std::cout << "Normal: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
<< std::endl;
start = std::chrono::steady_clock::now();
for (int i = 0; i < NUM_ITERATIONS; i++) {
scheduler.perform_work([]() {
return scheduling::scheduler::par([]() {
return scheduling::parallel_result<int>(0);
}, []() {
return fib(30);
}).then([](int, int b) {
std::cout << "fib = " << b << std::endl;
result = b;
return scheduling::parallel_result<int>{0};
});
});
}
end = std::chrono::steady_clock::now();
std::cout << "Framework: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << std::endl;
......
......@@ -15,6 +15,8 @@
void pls_error(const char *msg);
#define PLS_ASSERT(cond, msg) if (!(cond)) { pls_error(msg); }
// TODO: Distinguish between debug/internal asserts and production asserts.
// TODO: Re-Enable Asserts
#define PLS_ASSERT(cond, msg) //if (!(cond)) { pls_error(msg); }
#endif //PLS_ERROR_HANDLING_H
......@@ -3,7 +3,7 @@
#define PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_TRADING_DEQUE_H_
#include <atomic>
#include <pls/internal/base/system_details.h>
#include <tuple>
#include "pls/internal/base/error_handling.h"
#include "pls/internal/base/system_details.h"
......@@ -17,31 +17,45 @@ namespace data_structures {
template<typename TradedType>
class traded_field {
static_assert(base::system_details::CACHE_LINE_SIZE >= 4,
"Traded objects must not use their last address bits, as we use them for status flags."
"As traded objects are usually cache aligned, we need big enough cache lines.");
// TODO: Replace unsigned long with a portable sized integer
// (some systems might have different pointer sizes to long sizes).
static constexpr unsigned long SHIFT = 0x2lu;
static constexpr unsigned long TAG_BITS = 0x3lu;
static constexpr unsigned long RELEVANT_BITS = ~TAG_BITS;
static constexpr unsigned long EMPTY_TAG = 0x0lu;
static constexpr unsigned long STAMP_TAG = 0x1lu;
static constexpr unsigned long TRADE_TAG = 0x2lu;
void fill_with_tag(unsigned long tag) {
pointer_ = (void *) ((tag << 1lu) | 0x1lu);
public:
void fill_with_stamp(unsigned long stamp) {
pointer_ = (void *) ((stamp << SHIFT) | STAMP_TAG);
}
unsigned long get_tag() {
unsigned long get_stamp() {
PLS_ASSERT(is_filled_with_tag(), "Must only read out the tag when the traded field contains one.");
return ((unsigned long) (pointer_)) >> 1lu;
return ((unsigned long) pointer_) >> SHIFT;
}
bool is_filled_with_tag() {
return ((unsigned long) (pointer_)) & 0x1lu;
return (((unsigned long) pointer_) & TAG_BITS) == STAMP_TAG;
}
void fill_with_object(TradedType *object) {
PLS_ASSERT((object & 0x1lu) == 0,
void fill_with_trade_object(TradedType *trade_object) {
PLS_ASSERT((((unsigned long) trade_object) & TAG_BITS) == 0,
"Must only store aligned objects in this data structure (last bits are needed for tag bit)");
pointer_ = object;
pointer_ = reinterpret_cast<void *>(((unsigned long) trade_object) | TRADE_TAG);
}
TradedType *get_object() {
TradedType *get_trade_object() {
PLS_ASSERT(is_filled_with_object(), "Must only read out the object when the traded field contains one.");
return pointer_;
return reinterpret_cast<TradedType *>(((unsigned long) pointer_) & RELEVANT_BITS);
}
bool is_filled_with_object() {
return !is_filled_with_tag() && pointer_ != nullptr;
return (((unsigned long) pointer_) & TAG_BITS) == TRADE_TAG;
}
bool is_empty() {
return (((unsigned long) pointer_) & TAG_BITS) == EMPTY_TAG;
}
private:
......@@ -50,19 +64,21 @@ class traded_field {
template<typename EntryType, typename TradedType>
class alignas(base::system_details::CACHE_LINE_SIZE) trading_deque_entry {
public:
/*
* Fill the slot with its initial values, making it ready for being stolen.
* Performs no synchronization/memory ordering constraints.
*
* Method is called to init a field on pushBot.
*/
void fill_slots(EntryType *entry_item, unsigned long tag) {
void fill_slots(EntryType *entry_item, unsigned long expected_stamp) {
entry_slot_.store(entry_item, std::memory_order_relaxed);
forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed);
// Relaxed is fine for this, as adding elements is synced over the bot pointer
auto old = trade_slot_.load(std::memory_order_relaxed);
old.fill_with_tag(tag);
trade_slot_.store(std::memory_order_relaxed);
old.fill_with_stamp(expected_stamp);
trade_slot_.store(old, std::memory_order_relaxed);
}
/**
......@@ -78,7 +94,7 @@ class alignas(base::system_details::CACHE_LINE_SIZE) trading_deque_entry {
if (old_field_value.is_filled_with_tag()) {
return optional<TradedType *>();
} else {
return optional<TradedType *>(old_field_value.get_object());
return optional<TradedType *>(old_field_value.get_trade_object());
}
}
......@@ -86,25 +102,34 @@ class alignas(base::system_details::CACHE_LINE_SIZE) trading_deque_entry {
return entry_slot_;
}
optional<EntryType *> trade_object(TradedType *offered_object, unsigned long expected_tag) {
bool is_empty() {
return trade_slot_.load(std::memory_order_seq_cst).is_empty();
}
optional<EntryType *> trade_object(TradedType *offered_object, unsigned long &expected_stamp) {
// Read our potential result
EntryType *result = entry_slot_.load(std::memory_order_relaxed);
unsigned long forwarding_stamp = forwarding_stamp_.load(std::memory_order_relaxed);
// Try to get it by CAS with the expected field entry, giving up our offered_object for it
traded_field<TradedType> expected_field;
expected_field.fill_with_tag(expected_tag);
expected_field.fill_with_stamp(expected_stamp);
traded_field<TradedType> offered_field;
offered_field.fill_with_object(offered_object);
offered_field.fill_with_trade_object(offered_object);
if (trade_slot_.compare_exchange_strong(expected_field, offered_field, std::memory_order_acq_rel)) {
return optional<EntryType *>(result);
return optional<EntryType *>{result};
} else {
return optional<EntryType *>(nullptr);
if (expected_field.is_empty()) {
expected_stamp = forwarding_stamp;
}
return optional<EntryType *>{};
}
}
private:
std::atomic<EntryType *> entry_slot_{nullptr};
std::atomic<unsigned long> forwarding_stamp_{};
std::atomic<traded_field<TradedType>> trade_slot_{};
};
......@@ -132,9 +157,9 @@ class bounded_trading_deque {
void push_bot(EntryType *offered_object) {
auto expected_stamp = bot_internal_.stamp;
auto *current_entry = entries_[bot_internal_.value];
auto &current_entry = entries_[bot_internal_.value];
current_entry->fill_slots(offered_object, expected_stamp);
current_entry.fill_slots(offered_object, expected_stamp);
bot_internal_.stamp++;
bot_internal_.value++;
......@@ -157,28 +182,63 @@ class bounded_trading_deque {
// Go one step back
bot_internal_.value--;
auto *current_entry = entries_[bot_internal_.value];
optional<TradedType *> traded_object = current_entry->acquire_traded_type();
auto &current_entry = entries_[bot_internal_.value];
optional<TradedType *> traded_object = current_entry.acquire_traded_type();
optional<EntryType *> queue_entry;
if (traded_object) {
// We do not return an entry, but the traded object
queue_entry = {};
queue_entry = optional<EntryType *>{};
} else {
// We still got it locally, grab the object
queue_entry = {current_entry->get_object()};
// Keep the tag up to date (we must re-use it, as the head is just increasing by steps of one from the beginning)
bot_internal_.stamp--;
queue_entry = optional<EntryType *>{current_entry.get_object()};
}
bot_.store(bot_internal_.value, std::memory_order_relaxed);
if (bot_internal_.value == 0) {
bot_internal_.stamp++;
top_.store({bot_internal_.stamp, 0}, std::memory_order_release);
}
return pop_result{queue_entry, traded_object};
}
std::tuple<optional<EntryType *>, stamped_integer> peek_top() {
auto local_top = top_.load();
auto local_bot = bot_.load();
if (local_top.value >= local_bot) {
return std::make_tuple(optional<EntryType *>{}, local_top);
} else {
return std::make_tuple(optional<EntryType *>{entries_[local_top.value].get_object()}, local_top);
}
}
optional<EntryType *> pop_top(TradedType *trade_offer) {
auto local_top = top_.load();
optional<EntryType *> entry = entries_[local_top.value].trade_object(trade_offer, local_top.stamp);
return pop_top(trade_offer, local_top);
}
optional<EntryType *> pop_top(TradedType *trade_offer, stamped_integer local_top) {
auto local_bot = bot_.load();
if (local_top.value >= local_bot) {
return optional<EntryType *>{};
}
unsigned long expected_top_stamp = local_top.stamp;
optional<EntryType *> entry = entries_[local_top.value].trade_object(trade_offer, expected_top_stamp);
if (entry) {
// We got it, for sure move the top pointer forward.
top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1});
} else {
// We did not get it....
if (entries_[local_top.value].is_empty()) {
// ...update the top stamp, so the next call can get it (we still make system progress, as the owner
// must have popped off the element)
top_.compare_exchange_strong(local_top, {expected_top_stamp, local_top.value});
} else {
// ...move the pointer forward if someone else put a valid trade object in there.
top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1});
}
}
return entry;
}
......@@ -193,6 +253,17 @@ class bounded_trading_deque {
stamped_integer bot_internal_{0, 0};
};
template<typename EntryType, typename TradedType, size_t SIZE>
class static_bounded_trading_deque {
public:
static_bounded_trading_deque() : items_{}, deque_{items_.data(), SIZE} {}
bounded_trading_deque<EntryType, TradedType> &get_deque() { return deque_; }
private:
std::array<trading_deque_entry<EntryType, TradedType>, SIZE> items_;
bounded_trading_deque<EntryType, TradedType> deque_;
};
}
}
}
......
......@@ -19,15 +19,38 @@ namespace data_structures {
 * Takes care of the de-construction of the contained object if one is active.
*/
template<typename T>
class delayed_initialization {
class alignas(alignof(T)) delayed_initialization {
public:
delayed_initialization() : memory_{}, initialized_{false} {}
delayed_initialization(const delayed_initialization &) = delete;
delayed_initialization(delayed_initialization &&other) noexcept {
initialized_ = other.initialized_;
if (other.initialized_) {
if (other.initialized()) {
new((void *) memory_.data()) T(std::move(other.object()));
other.initialized_ = false;
initialized_ = true;
}
}
delayed_initialization &operator=(const delayed_initialization &) = delete;
delayed_initialization &operator=(delayed_initialization &&other) noexcept {
if (&other == this) {
return *this;
}
if (initialized() && other.initialized()) {
object() = std::move(other.object());
other.initialized_ = false;
initialized_ = true;
return *this;
}
if (!initialized() && other.initialized_) {
new((void *) memory_.data()) T(std::move(other.object()));
other.initialized_ = false;
initialized_ = true;
return *this;
}
return *this;
}
template<typename ...ARGS>
......@@ -62,14 +85,24 @@ class delayed_initialization {
return *reinterpret_cast<T *>(memory_.data());
}
const T &object() const {
PLS_ASSERT(initialized_, "Can not use an uninitialized delayed wrapper object!");
return *reinterpret_cast<const T *>(memory_.data());
}
T &operator*() {
return object();
}
const T &operator*() const {
return object();
}
bool initialized() const { return initialized_; }
private:
std::array<char, sizeof(T)> memory_;
alignas(alignof(T)) std::array<char, sizeof(T)> memory_;
bool initialized_;
};
......
......@@ -3,6 +3,7 @@
#define PLS_INTERNAL_DATA_STRUCTURES_OPTIONAL_H_
#include <utility>
#include <type_traits>
#include "pls/internal/data_structures/delayed_initialization.h"
......@@ -14,6 +15,38 @@ template<typename T>
class optional {
public:
optional() = default;
optional(optional &other) noexcept : optional(const_cast<const optional &>(other)) {};
optional(const optional &other) noexcept {
if (other) {
data_.initialize(other.data_.object());
}
}
optional(optional &&other) noexcept {
data_ = std::move(other.data_);
}
optional &operator=(const optional &other) {
if (&other == this) {
return *this;
}
if (data_.initialized()) {
data_.destroy();
}
if (other) {
data_.initialize(other.data_.object());
}
return *this;
}
optional &operator=(optional &&other) noexcept {
if (&other == this) {
return *this;
}
data_ = std::move(other.data_);
return *this;
}
template<typename ...ARGS>
explicit optional(ARGS &&...args): data_{std::forward<ARGS>(args)...} {}
......
......@@ -46,6 +46,7 @@ class base_cont {
* Will store the result in it's parent, but not mess with any counters.
*/
virtual void execute_task() = 0;
virtual base_task *get_task() = 0;
virtual void *get_right_result_pointer() = 0;
virtual void *get_left_result_pointer() = 0;
......@@ -120,14 +121,19 @@ class cont : public base_cont {
void execute() override {
using result_type = decltype(function_((*left_result_).value(), (*right_result_).value()));
result_runner<result_type>::execute(*this);
this->get_memory_block()->free_buffer();
this->get_memory_block()->reset_state();
this->~cont();
auto *memory_block = this->get_memory_block();
memory_block->free_buffer();
memory_block->reset_state();
}
void execute_task() override {
task_.execute();
}
base_task *get_task() override {
return &task_;
}
void *get_left_result_pointer() override {
return &left_result_;
......@@ -136,10 +142,6 @@ class cont : public base_cont {
return &right_result_;
}
T2 *get_task() {
return &task_;
}
private:
// Initial data members. These slow down the fast path, try to init them lazy when possible.
F function_;
......
......@@ -63,6 +63,25 @@ class cont_manager {
return active_node_;
}
bool is_clean() {
if (get_active_node()->get_depth() == 0) {
memory_block *current_node = active_node_;
for (size_t i = 1; i < num_conts_; i++) {
if (current_node->get_prev() != nullptr && current_node->get_prev()->get_next() != current_node) {
return false;
}
if (current_node->is_buffer_used()) {
return false;
}
current_node = current_node->get_next();
}
} else {
return false;
}
return true;
}
// Manage the fall through behaviour/slow path behaviour
bool falling_through() const {
return fall_through_;
......@@ -93,23 +112,24 @@ class cont_manager {
fall_through_ = false;
// Keep the target chain before we execute, as this potentially frees the memory
auto *target_chain = notified_cont->get_memory_block()->get_offered_chain().load();
auto *target_memory_block = notified_cont->get_memory_block();
auto *target_chain = target_memory_block->get_offered_chain().load();
// Notify the next continuation of finishing a child...
if (notified_cont->get_memory_block()->get_results_missing().fetch_add(-1) == 1) {
if (target_memory_block->get_results_missing().fetch_add(-1) == 1) {
// ... we finished the continuation.
// We are now in charge continuing to execute the above continuation chain.
PLS_ASSERT(active_node_->get_prev()->get_depth() == notified_cont->get_memory_block()->get_depth(),
PLS_ASSERT(active_node_->get_prev()->get_depth() == target_memory_block->get_depth(),
"We must hold the system invariant to be in the correct depth.")
if (active_node_->get_prev() != notified_cont->get_memory_block()) {
if (active_node_->get_prev() != target_memory_block) {
// We do not own the thing we will execute.
// Own it by swapping the chain belonging to it in.
aquire_memory_chain(notified_cont->get_memory_block());
aquire_memory_chain(target_memory_block);
}
my_state.parent_cont_ = notified_cont->get_parent();
my_state.right_spawn_ = notified_cont->is_right_child();
active_node_ = notified_cont->get_memory_block();
active_node_ = target_memory_block;
notified_cont->execute();
if (!falling_through() && notified_cont->get_parent() != nullptr) {
fall_through_and_notify_cont(notified_cont->get_parent(), notified_cont->is_right_child());
......@@ -119,9 +139,9 @@ class cont_manager {
// ... we did not finish the last continuation.
// We are no longer in charge of executing the above continuation chain.
PLS_ASSERT(active_node_->get_prev()->get_depth() == notified_cont->get_memory_block()->get_depth(),
PLS_ASSERT(active_node_->get_prev()->get_depth() == target_memory_block->get_depth(),
"We must hold the system invariant to be in the correct depth.")
if (active_node_->get_prev() == notified_cont->get_memory_block()) {
if (active_node_->get_prev() == target_memory_block) {
// We own the thing we are not allowed to execute.
// Get rid of the ownership by using the offered chain.
aquire_memory_chain(target_chain);
......
......@@ -23,19 +23,22 @@ class memory_block {
: prev_{prev},
next_{nullptr},
offered_chain_{nullptr},
state_{{initialized}},
results_missing_{2},
memory_buffer_{memory_buffer},
memory_buffer_size_{memory_buffer_size},
memory_buffer_used_{false},
depth_{depth} {};
depth_{depth},
owner_{0} {};
template<typename T, typename ...ARGS>
T *place_in_buffer(ARGS &&...args) {
PLS_ASSERT(!memory_buffer_used_, "Must only allocate one continuation at once per node.");
memory_buffer_used_ = true;
return new(memory_buffer_) T(std::forward<ARGS>(args)...);
auto *result = new(memory_buffer_) T(std::forward<ARGS>(args)...);
continuation_ = result;
return result;
}
void free_buffer() {
PLS_ASSERT(memory_buffer_used_, "Can only free a memory spot when it is in use.")
......@@ -44,14 +47,10 @@ class memory_block {
bool is_buffer_used() {
return memory_buffer_used_;
}
// TODO: Fit the reset somewhere!!!
// // Reset Associated counters
// results_missing_.store(2);
// offered_chain_.store(nullptr);
// auto old_state = state_.load();
// state_.store({old_state.stamp + 1, initialized});
base_cont *get_cont() {
PLS_ASSERT(is_buffer_used(), "Can only read initialized buffer!");
return continuation_;
}
memory_block *get_prev() {
return prev_;
......@@ -66,13 +65,6 @@ class memory_block {
next_ = next;
}
enum state { initialized, execute_local, stealing, stolen, invalid };
using stamped_state = data_structures::stamped_integer;
std::atomic<stamped_state> &get_state() {
return state_;
}
std::atomic<memory_block *> &get_offered_chain() {
return offered_chain_;
}
......@@ -87,11 +79,16 @@ class memory_block {
void reset_state() {
offered_chain_.store(nullptr);
auto old_state = state_.load();
state_.store({old_state.stamp + 1, initialized});
results_missing_.store(2);
}
void set_owner(int owner) {
owner_ = owner;
}
int get_owner() {
return owner_;
}
private:
// Linked list property of memory blocks (a complete list represents a threads currently owned memory).
// Each block knows its chain start to allow stealing a whole chain in O(1)
......@@ -103,13 +100,6 @@ class memory_block {
// For this we need the offered chain's element up to the point we can steal.
std::atomic<memory_block *> offered_chain_;
// The flag is needed for an ongoing stealing request.
// Stealing threads need to offer their memory block chain before they
// 'fully' own the stolen task. As long as that is not done the memory block
// chain can abort the steal request in order to be not blocked without a
// new, clean memory block chain to work with.
std::atomic<stamped_state> state_;
// Management for coordinating concurrent result writing and stealing.
// The result count decides atomically who gets to execute the continuation
// and who therefore gets to own this memory block chain.
......@@ -120,6 +110,8 @@ class memory_block {
// This memory is managed explicitly by the continuation manager and runtime system
// (they need to make sure to always call de-constructors and never allocate two continuations).
char *memory_buffer_;
base_cont *continuation_;
// These two are only helper properties helping with bugs during development.
size_t memory_buffer_size_;
bool memory_buffer_used_;
......@@ -128,6 +120,9 @@ class memory_block {
// Swapping parts of a memory chain will not reorder it, as always parts of
// the same size are exchanged.
const int depth_;
// TODO: Remove, debug only
int owner_;
};
}
......
......@@ -49,6 +49,7 @@ struct scheduler::starter {
const bool is_right_cont = my_state.right_spawn_;
base_cont *parent_cont = my_state.parent_cont_;
current_memory_block->set_owner(my_state.get_id());
continuation_type *current_cont = current_memory_block->place_in_buffer<continuation_type>(parent_cont,
current_memory_block,
is_right_cont,
......@@ -63,19 +64,32 @@ struct scheduler::starter {
my_state.right_spawn_ = false;
return_type_1 result_1 = function_1_();
if (cont_manager.falling_through()) {
// Get our replacement from the task stack and store it for later use when we are actually blocked.
auto traded_memory = my_state.get_task_manager().try_pop_local();
current_cont->get_memory_block()->get_offered_chain().store(*traded_memory);
// Unwind stack...
return result_type{};
}
// Try to call second function on fast path
if (my_state.get_task_manager().steal_local_task()) {
auto traded_memory = my_state.get_task_manager().try_pop_local();
if (traded_memory) {
// The task got stolen...
// ...but we got a memory block that can be used if we block on this one.
current_cont->get_memory_block()->get_offered_chain().store(*traded_memory);
// Main scheduling loop is responsible for entering the result to the slow path...
current_cont->store_left_result(std::move(result_1));
cont_manager.fall_through_and_notify_cont(current_cont, false);
// Unwind stack...
return result_type{};
} else {
my_state.right_spawn_ = true;
return_type_2 result_2 = function_2_();
if (cont_manager.falling_through()) {
// Main scheduling loop is responsible for entering the result to the slow path...
current_cont->store_left_result(std::move(result_1));
auto old_state = current_cont->get_memory_block()->get_state().load();
current_cont->get_memory_block()->get_state().store({old_state.stamp + 1, memory_block::invalid});
current_cont->get_memory_block()->get_results_missing().fetch_add(-1);
// Unwind stack...
return result_type{};
......@@ -101,12 +115,6 @@ struct scheduler::starter {
}
return cont_result;
}
// Main scheduling loop is responsible for entering the result to the slow path...
current_cont->store_left_result(std::move(result_1));
cont_manager.fall_through_and_notify_cont(current_cont, false);
// Unwind stack...
return result_type{};
};
};
......
......@@ -28,7 +28,7 @@ class scheduler_memory {
virtual thread_state &thread_state_for(size_t id) = 0;
};
template<size_t MAX_THREADS, size_t NUM_TASKS, size_t MAX_TASK_STACK_SIZE, size_t NUM_CONTS, size_t MAX_CONT_SIZE>
template<size_t MAX_THREADS, size_t NUM_TASKS, size_t NUM_CONTS, size_t MAX_CONT_SIZE>
class static_scheduler_memory : public scheduler_memory {
public:
size_t max_threads() const override {
......@@ -44,7 +44,7 @@ class static_scheduler_memory : public scheduler_memory {
}
private:
using thread_state_type = thread_state_static<NUM_TASKS, MAX_TASK_STACK_SIZE, NUM_CONTS, MAX_CONT_SIZE>;
using thread_state_type = thread_state_static<NUM_TASKS, NUM_CONTS, MAX_CONT_SIZE>;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<base::thread, MAX_THREADS> threads_;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<thread_state_type, MAX_THREADS> thread_states_;
......@@ -78,7 +78,7 @@ class heap_scheduler_memory : public scheduler_memory {
}
private:
using thread_state_type = thread_state_static<NUM_TASKS, MAX_TASK_STACK_SIZE, NUM_CONTS, MAX_CONT_SIZE>;
using thread_state_type = thread_state_static<NUM_TASKS, NUM_CONTS, MAX_CONT_SIZE>;
// thread_state_type is aligned at the cache line and therefore overaligned (C++ 11 does not require
// the new operator to obey alignments bigger than 16, cache lines are usually 64).
// To allow this object to be allocated using 'new' (which the vector does internally),
......
......@@ -12,8 +12,9 @@
#include "pls/internal/scheduling/cont_manager.h"
#include "pls/internal/scheduling/memory_block.h"
#include "pls/internal/data_structures/bounded_ws_deque.h"
#include "pls/internal/data_structures/bounded_trading_deque.h"
#include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/data_structures/optional.h"
#include "pls/internal/base/spin_lock.h"
......@@ -21,19 +22,6 @@ namespace pls {
namespace internal {
namespace scheduling {
struct task_handle {
public:
task_handle() : task_{nullptr}, task_memory_block_{nullptr} {};
explicit task_handle(base_task *task) : task_{task},
task_memory_block_{task->get_cont()->get_memory_block()} {};
base_task *task_;
// This seems redundant at first, but is needed for a race-free steal.
// It could happen that the task's memory is overwritten and the pointer to its memory block becomes invalid.
// We can do this more elegantly in the future.
memory_block *task_memory_block_;
};
/**
* Handles management of tasks in the system. Each thread has a local task manager,
* responsible for allocating, freeing and publishing tasks for stealing.
......@@ -42,60 +30,60 @@ class task_manager {
public:
// Publishes a task on the stack, i.e. makes it visible for other threads to steal.
void publish_task(base_task *task) {
std::lock_guard<base::spin_lock> lock{lock_};
task_deque_.push_bottom(task_handle{task});
// std::lock_guard<base::spin_lock> lock{lock_};
task_deque_.push_bot(task->get_cont()->get_memory_block());
}
// Try to pop a local task from this task managers stack.
bool steal_local_task() {
std::lock_guard<base::spin_lock> lock{lock_};
return task_deque_.pop_bottom();
data_structures::optional<memory_block *> try_pop_local() {
// std::lock_guard<base::spin_lock> lock{lock_};
return task_deque_.pop_bot().traded_;
}
// Try to steal a task from a remote task_manager instance. The stolen task must be stored locally.
// Returns a pair containing the actual task and if the steal was successful.
base_task *steal_remote_task(cont_manager &stealing_cont_manager) {
std::lock_guard<base::spin_lock> lock{lock_};
// TODO: See if we can somehow make this trade lock free (and still be correct)
// std::lock_guard<base::spin_lock> lock{lock_};
auto peek = task_deque_.peek_top();
auto stolen_task_handle = task_deque_.pop_top();
if (stolen_task_handle) {
base_task *stolen_task = (*stolen_task_handle).task_;
memory_block *stolen_task_memory = (*stolen_task_handle).task_memory_block_;
auto stolen_task_depth = stolen_task_memory->get_depth();
auto &atomic_state = stolen_task_memory->get_state();
auto &atomic_offered_chain = stolen_task_memory->get_offered_chain();
if (std::get<0>(peek)) {
memory_block *peeked_memory_block = (*std::get<0>(peek));
auto peeked_depth = peeked_memory_block->get_depth();
// TODO: We ignore all we tried with lock free implementations here, just store the state how it is supposed to be
stealing_cont_manager.move_active_node(stolen_task_depth);
stealing_cont_manager.move_active_node(peeked_depth);
auto offered_chain = stealing_cont_manager.get_active_node();
stealing_cont_manager.move_active_node(1);
atomic_offered_chain.store(offered_chain);
atomic_state.store(memory_block::stolen);
return stolen_task;
auto stolen_memory_block = task_deque_.pop_top(offered_chain, std::get<1>(peek));
if (stolen_memory_block) {
PLS_ASSERT(*stolen_memory_block == peeked_memory_block, "Steal must only work if it is equal!");
return (*stolen_memory_block)->get_cont()->get_task();
} else {
stealing_cont_manager.move_active_node(-(peeked_depth + 1));
return nullptr;
}
}
explicit task_manager(data_structures::bounded_ws_deque<task_handle> &task_deque) : task_deque_{task_deque},
lock_{} {}
return nullptr;
}
explicit task_manager(data_structures::bounded_trading_deque<memory_block, memory_block> &task_deque) :
task_deque_{task_deque} {}
private:
data_structures::bounded_ws_deque<task_handle> &task_deque_;
base::spin_lock lock_;
data_structures::bounded_trading_deque<memory_block, memory_block> &task_deque_;
base::spin_lock lock_{};
};
template<size_t NUM_TASKS, size_t MAX_STACK_SIZE>
template<size_t NUM_TASKS>
class static_task_manager {
public:
static_task_manager() : task_deque_{}, task_manager_{task_deque_.get_deque()} {};
task_manager &get_task_manager() { return task_manager_; }
private:
data_structures::static_bounded_ws_deque<task_handle, NUM_TASKS> task_deque_;
data_structures::static_bounded_trading_deque<memory_block, memory_block, NUM_TASKS> task_deque_;
task_manager task_manager_;
};
......
......@@ -11,7 +11,7 @@ namespace pls {
namespace internal {
namespace scheduling {
template<size_t NUM_TASKS, size_t MAX_TASK_STACK_SIZE, size_t NUM_CONTS, size_t MAX_CONT_SIZE>
template<size_t NUM_TASKS, size_t NUM_CONTS, size_t MAX_CONT_SIZE>
struct thread_state_static {
public:
thread_state_static()
......@@ -21,7 +21,7 @@ struct thread_state_static {
thread_state &get_thread_state() { return thread_state_; }
private:
static_task_manager<NUM_TASKS, MAX_TASK_STACK_SIZE> static_task_manager_;
static_task_manager<NUM_TASKS> static_task_manager_;
static_cont_manager<NUM_CONTS, MAX_CONT_SIZE> static_cont_manager_;
thread_state thread_state_;
};
......
......@@ -83,7 +83,7 @@ void scheduler::work_thread_work_section() {
auto &target_state = my_state.get_scheduler().thread_state_for(target);
PLS_ASSERT(my_cont_manager.get_active_node()->get_depth() == 0, "Only steal with clean chain!");
PLS_ASSERT(my_cont_manager.is_clean(), "Only steal with clean chain!");
auto *stolen_task = target_state.get_task_manager().steal_remote_task(my_cont_manager);
if (stolen_task != nullptr) {
my_state.parent_cont_ = stolen_task->get_cont();
......@@ -99,6 +99,7 @@ void scheduler::work_thread_work_section() {
}
} while (!work_section_done_);
PLS_ASSERT(my_cont_manager.is_clean(), "Only finish work section with clean chain!");
}
void scheduler::terminate() {
......
......@@ -3,6 +3,7 @@
#include "pls/internal/base/system_details.h"
#include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/data_structures/bounded_trading_deque.h"
using namespace pls::internal::data_structures;
using namespace pls::internal::base;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment