From e34ea267e61d2bb3a6f9906d08d38b156f0a9f1e Mon Sep 17 00:00:00 2001
From: FritzFlorian
Date: Wed, 4 Dec 2019 17:07:28 +0100
Subject: [PATCH] Working version of our trading-deque

---
 app/benchmark_fft/main.cpp                                            |  22 ++++++++++------------
 app/playground/main.cpp                                               |  30 +++++++++++++++++-------------
 lib/pls/include/pls/internal/base/error_handling.h                    |   4 +++-
 lib/pls/include/pls/internal/data_structures/bounded_trading_deque.h  | 133 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------
 lib/pls/include/pls/internal/data_structures/delayed_initialization.h |  41 +++++++++++++++++++++++++++++++++++++----
 lib/pls/include/pls/internal/data_structures/optional.h               |  33 +++++++++++++++++++++++++++++++++
 lib/pls/include/pls/internal/scheduling/cont.h                        |  14 ++++++++------
 lib/pls/include/pls/internal/scheduling/cont_manager.h                |  36 ++++++++++++++++++++++++++++--------
 lib/pls/include/pls/internal/scheduling/memory_block.h                |  49 ++++++++++++++++++++++--------------------------
 lib/pls/include/pls/internal/scheduling/scheduler_impl.h              |  26 +++++++++++++++++---------
 lib/pls/include/pls/internal/scheduling/scheduler_memory.h            |   6 +++---
 lib/pls/include/pls/internal/scheduling/task_manager.h                |  72 ++++++++++++++++++++++++++++------------------------------------------
 lib/pls/include/pls/internal/scheduling/thread_state_static.h         |   4 ++--
 lib/pls/src/internal/scheduling/scheduler.cpp                         |   3 ++-
 test/data_structures_test.cpp                                         |   1 +
 15 files changed, 315 insertions(+), 159 deletions(-)

diff --git a/app/benchmark_fft/main.cpp b/app/benchmark_fft/main.cpp
index 534e7d6..e46370d 100644
--- a/app/benchmark_fft/main.cpp
+++ b/app/benchmark_fft/main.cpp
@@ -90,11 +90,10 @@ complex_vector prepare_input(int input_size) {
   return data;
 }
 
-static constexpr int NUM_ITERATIONS = 500;
-constexpr size_t NUM_THREADS = 8;
+static constexpr int NUM_ITERATIONS = 1000;
+constexpr size_t NUM_THREADS = 5;
 constexpr size_t NUM_TASKS = 128;
-constexpr size_t MAX_TASK_STACK_SIZE = 0;
 
 constexpr size_t NUM_CONTS = 128;
 constexpr size_t MAX_CONT_SIZE = 512;
@@ -104,7 +103,6 @@ int main() {
   static_scheduler_memory static_scheduler_memory;
@@ -127,14 +125,14 @@ int main() {
   std::cout << "Framework: " << std::chrono::duration_cast(end - start).count()
             << std::endl;
 
-// start = std::chrono::steady_clock::now();
-// for (int i = 0; i < NUM_ITERATIONS; i++) {
-// complex_vector input_1(initial_input);
-// fft_normal(input_1.begin(), INPUT_SIZE);
-// }
-// end = std::chrono::steady_clock::now();
-// std::cout << "Normal: " << std::chrono::duration_cast(end - start).count()
-// << std::endl;
+  start = std::chrono::steady_clock::now();
+  for (int i = 0; i < NUM_ITERATIONS; i++) {
+    complex_vector input_1(initial_input);
+    fft_normal(input_1.begin(), INPUT_SIZE);
+  }
+  end = std::chrono::steady_clock::now();
+  std::cout << "Normal: " << std::chrono::duration_cast(end - start).count()
+            << std::endl;
 
   return 0;
 }
diff --git a/app/playground/main.cpp b/app/playground/main.cpp
index fe090bc..4db3a7b 100644
--- a/app/playground/main.cpp
+++ b/app/playground/main.cpp
@@ -4,14 +4,14 @@
 #include "pls/internal/scheduling/scheduler.h"
 #include "pls/internal/scheduling/parallel_result.h"
 #include "pls/internal/scheduling/scheduler_memory.h"
-#include "pls/internal/base/thread.h"
+#include "pls/internal/data_structures/bounded_trading_deque.h"
 
 using namespace pls::internal;
 
 constexpr size_t NUM_THREADS = 1;
 constexpr size_t NUM_TASKS = 128;
-constexpr size_t MAX_TASK_STACK_SIZE = 0;
+static constexpr int NUM_ITERATIONS = 100;
 
 constexpr size_t NUM_CONTS = 128;
 constexpr size_t MAX_CONT_SIZE = 256;
@@ -45,33 +45,37 @@ scheduling::parallel_result fib(int n) {
   });
 }
 
+static volatile int result;
 int main() {
   scheduling::static_scheduler_memory static_scheduler_memory;
 
   scheduling::scheduler scheduler{static_scheduler_memory, NUM_THREADS};
 
   auto start = std::chrono::steady_clock::now();
-  std::cout << "fib = " << fib_normal(30) << std::endl;
+  for (int i = 0; i < NUM_ITERATIONS; i++) {
+    result = fib_normal(30);
+  }
   auto end = std::chrono::steady_clock::now();
   std::cout << "Normal: " << std::chrono::duration_cast(end - start).count()
             << std::endl;
 
   start = std::chrono::steady_clock::now();
-  scheduler.perform_work([]() {
-    return scheduling::scheduler::par([]() {
-      return scheduling::parallel_result(0);
-    }, []() {
-      return fib(30);
-    }).then([](int, int b) {
-      std::cout << "fib = " << b << std::endl;
-      return scheduling::parallel_result{0};
+  for (int i = 0; i < NUM_ITERATIONS; i++) {
+    scheduler.perform_work([]() {
+      return scheduling::scheduler::par([]() {
+        return scheduling::parallel_result(0);
+      }, []() {
+        return fib(30);
+      }).then([](int, int b) {
+        result = b;
+        return scheduling::parallel_result{0};
+      });
     });
-  });
+  }
   end = std::chrono::steady_clock::now();
   std::cout << "Framework: " << std::chrono::duration_cast(end - start).count()
             << std::endl;
diff --git a/lib/pls/include/pls/internal/base/error_handling.h b/lib/pls/include/pls/internal/base/error_handling.h
index e3b7737..89fa220 100644
--- a/lib/pls/include/pls/internal/base/error_handling.h
+++ b/lib/pls/include/pls/internal/base/error_handling.h
@@ -15,6 +15,8 @@
 void pls_error(const char *msg);
 
-#define PLS_ASSERT(cond, msg) if (!(cond)) { pls_error(msg); }
+// TODO: Distinguish between debug/internal asserts and production asserts.
+// TODO: Re-Enable Asserts
+#define PLS_ASSERT(cond, msg) //if (!(cond)) { pls_error(msg); }
 
 #endif //PLS_ERROR_HANDLING_H
diff --git a/lib/pls/include/pls/internal/data_structures/bounded_trading_deque.h b/lib/pls/include/pls/internal/data_structures/bounded_trading_deque.h
index 1829a46..6f77d39 100644
--- a/lib/pls/include/pls/internal/data_structures/bounded_trading_deque.h
+++ b/lib/pls/include/pls/internal/data_structures/bounded_trading_deque.h
@@ -3,7 +3,7 @@
 #define PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_TRADING_DEQUE_H_
 
 #include
-#include
+#include
 
 #include "pls/internal/base/error_handling.h"
 #include "pls/internal/base/system_details.h"
@@ -17,31 +17,45 @@ namespace data_structures {
 
 template
 class traded_field {
+  static_assert(base::system_details::CACHE_LINE_SIZE >= 4,
+                "Traded objects must not use their last address bits, as we use them for status flags."
+                "As traded objects are usually cache aligned, we need big enough cache lines.");
   // TODO: Replace unsigned long with a portable sized integer
   // (some systems might have different pointer sizes to long sizes).
+  static constexpr unsigned long SHIFT = 0x2lu;
+  static constexpr unsigned long TAG_BITS = 0x3lu;
+  static constexpr unsigned long RELEVANT_BITS = ~TAG_BITS;
+  static constexpr unsigned long EMPTY_TAG = 0x0lu;
+  static constexpr unsigned long STAMP_TAG = 0x1lu;
+  static constexpr unsigned long TRADE_TAG = 0x2lu;
 
-  void fill_with_tag(unsigned long tag) {
-    pointer_ = (void *) ((tag << 1lu) | 0x1lu);
+ public:
+  void fill_with_stamp(unsigned long stamp) {
+    pointer_ = (void *) ((stamp << SHIFT) | STAMP_TAG);
   }
 
-  unsigned long get_tag() {
+  unsigned long get_stamp() {
     PLS_ASSERT(is_filled_with_tag(), "Must only read out the tag when the traded field contains one.");
-    return ((unsigned long) (pointer_)) >> 1lu;
+    return ((unsigned long) pointer_) >> SHIFT;
   }
 
   bool is_filled_with_tag() {
-    return ((unsigned long) (pointer_)) & 0x1lu;
+    return (((unsigned long) pointer_) & TAG_BITS) == STAMP_TAG;
   }
 
-  void fill_with_object(TradedType *object) {
-    PLS_ASSERT((object & 0x1lu) == 0,
+  void fill_with_trade_object(TradedType *trade_object) {
+    PLS_ASSERT((((unsigned long) trade_object) & TAG_BITS) == 0,
               "Must only store aligned objects in this data structure (last bits are needed for tag bit)");
-    pointer_ = object;
+    pointer_ = reinterpret_cast(((unsigned long) trade_object) | TRADE_TAG);
   }
 
-  TradedType *get_object() {
+  TradedType *get_trade_object() {
     PLS_ASSERT(is_filled_with_object(), "Must only read out the object when the traded field contains one.");
-    return pointer_;
+    return reinterpret_cast(((unsigned long) pointer_) & RELEVANT_BITS);
   }
 
   bool is_filled_with_object() {
-    return !is_filled_with_tag() && pointer_ != nullptr;
+    return (((unsigned long) pointer_) & TAG_BITS) == TRADE_TAG;
+  }
+
+  bool is_empty() {
+    return (((unsigned long) pointer_) & TAG_BITS) == EMPTY_TAG;
   }
 
  private:
@@ -50,19 +64,21 @@ class traded_field {
 
 template
 class alignas(base::system_details::CACHE_LINE_SIZE) trading_deque_entry {
+ public:
   /*
    * Fill the slot with its initial values, making it ready for being stolen.
    * Performs no synchronization/memory ordering constraints.
    *
    * Method is called to init a field on pushBot.
   */
-  void fill_slots(EntryType *entry_item, unsigned long tag) {
+  void fill_slots(EntryType *entry_item, unsigned long expected_stamp) {
     entry_slot_.store(entry_item, std::memory_order_relaxed);
+    forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed);
 
     // Relaxed is fine for this, as adding elements is synced over the bot pointer
     auto old = trade_slot_.load(std::memory_order_relaxed);
-    old.fill_with_tag(tag);
-    trade_slot_.store(std::memory_order_relaxed);
+    old.fill_with_stamp(expected_stamp);
+    trade_slot_.store(old, std::memory_order_relaxed);
   }
 
   /**
@@ -78,7 +94,7 @@ class alignas(base::system_details::CACHE_LINE_SIZE) trading_deque_entry {
     if (old_field_value.is_filled_with_tag()) {
       return optional();
     } else {
-      return optional(old_field_value.get_object());
+      return optional(old_field_value.get_trade_object());
     }
   }
@@ -86,25 +102,34 @@ class alignas(base::system_details::CACHE_LINE_SIZE) trading_deque_entry {
     return entry_slot_;
   }
 
-  optional trade_object(TradedType *offered_object, unsigned long expected_tag) {
+  bool is_empty() {
+    return trade_slot_.load(std::memory_order_seq_cst).is_empty();
+  }
+
+  optional trade_object(TradedType *offered_object, unsigned long &expected_stamp) {
     // Read our potential result
     EntryType *result = entry_slot_.load(std::memory_order_relaxed);
+    unsigned long forwarding_stamp = forwarding_stamp_.load(std::memory_order_relaxed);
 
     // Try to get it by CAS with the expected field entry, giving up our offered_object for it
     traded_field expected_field;
-    expected_field.fill_with_tag(expected_tag);
+    expected_field.fill_with_stamp(expected_stamp);
 
     traded_field offered_field;
-    offered_field.fill_with_object(offered_object);
+    offered_field.fill_with_trade_object(offered_object);
 
     if (trade_slot_.compare_exchange_strong(expected_field, offered_field, std::memory_order_acq_rel)) {
-      return optional(result);
+      return optional{result};
     } else {
-      return optional(nullptr);
+      if (expected_field.is_empty()) {
+        expected_stamp = forwarding_stamp;
+      }
+      return optional{};
     }
   }
 
 private:
   std::atomic entry_slot_{nullptr};
+  std::atomic forwarding_stamp_{};
   std::atomic> trade_slot_{};
 };
@@ -132,9 +157,9 @@ class bounded_trading_deque {
 
   void push_bot(EntryType *offered_object) {
     auto expected_stamp = bot_internal_.stamp;
-    auto *current_entry = entries_[bot_internal_.value];
+    auto &current_entry = entries_[bot_internal_.value];
 
-    current_entry->fill_slots(offered_object, expected_stamp);
+    current_entry.fill_slots(offered_object, expected_stamp);
 
     bot_internal_.stamp++;
     bot_internal_.value++;
@@ -157,28 +182,63 @@
     // Go one step back
     bot_internal_.value--;
 
-    auto *current_entry = entries_[bot_internal_.value];
-    optional traded_object = current_entry->acquire_traded_type();
+    auto &current_entry = entries_[bot_internal_.value];
+    optional traded_object = current_entry.acquire_traded_type();
 
     optional queue_entry;
     if (traded_object) {
       // We do not return an entry, but the traded object
-      queue_entry = {};
+      queue_entry = optional{};
     } else {
       // We still got it locally, grab the object
-      queue_entry = {current_entry->get_object()};
-      // Keep the tag up to date (we must re-use it, as the head is just increasing by steps of one from the beginning)
-      bot_internal_.stamp--;
+      queue_entry = optional{current_entry.get_object()};
     }
 
     bot_.store(bot_internal_.value, std::memory_order_relaxed);
+    if (bot_internal_.value == 0) {
+      bot_internal_.stamp++;
+      top_.store({bot_internal_.stamp, 0}, std::memory_order_release);
+    }
 
     return pop_result{queue_entry,
                      traded_object};
   }
 
+  std::tuple, stamped_integer> peek_top() {
+    auto local_top = top_.load();
+    auto local_bot = bot_.load();
+    if (local_top.value >= local_bot) {
+      return std::make_tuple(optional{}, local_top);
+    } else {
+      return std::make_tuple(optional{entries_[local_top.value].get_object()}, local_top);
+    }
+  }
+
   optional pop_top(TradedType *trade_offer) {
     auto local_top = top_.load();
-    optional entry = entries_[local_top.value].trade_object(trade_offer, local_top.stamp);
-    top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1});
+    return pop_top(trade_offer, local_top);
+  }
+
+  optional pop_top(TradedType *trade_offer, stamped_integer local_top) {
+    auto local_bot = bot_.load();
+    if (local_top.value >= local_bot) {
+      return optional{};
+    }
+
+    unsigned long expected_top_stamp = local_top.stamp;
+    optional entry = entries_[local_top.value].trade_object(trade_offer, expected_top_stamp);
+    if (entry) {
+      // We got it, for sure move the top pointer forward.
+      top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1});
+    } else {
+      // We did not get it....
+      if (entries_[local_top.value].is_empty()) {
+        // ...update the top stamp, so the next call can get it (we still make system progress, as the owner
+        // must have popped off the element)
+        top_.compare_exchange_strong(local_top, {expected_top_stamp, local_top.value});
+      } else {
+        // ...move the pointer forward if someone else put a valid trade object in there.
+        top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1});
+      }
+    }
 
     return entry;
   }
@@ -193,6 +253,17 @@ class bounded_trading_deque {
   stamped_integer bot_internal_{0, 0};
 };
 
+template
+class static_bounded_trading_deque {
+ public:
+  static_bounded_trading_deque() : items_{}, deque_{items_.data(), SIZE} {}
+
+  bounded_trading_deque &get_deque() { return deque_; }
+ private:
+  std::array, SIZE> items_;
+  bounded_trading_deque deque_;
+};
+
 }
 }
 }
diff --git a/lib/pls/include/pls/internal/data_structures/delayed_initialization.h b/lib/pls/include/pls/internal/data_structures/delayed_initialization.h
index ec815bd..4dc1eaf 100644
--- a/lib/pls/include/pls/internal/data_structures/delayed_initialization.h
+++ b/lib/pls/include/pls/internal/data_structures/delayed_initialization.h
@@ -19,15 +19,38 @@ namespace data_structures {
  * Takes care of the de-construction the contained object if one is active.
  */
 template
-class delayed_initialization {
+class alignas(alignof(T)) delayed_initialization {
  public:
   delayed_initialization() : memory_{}, initialized_{false} {}
+  delayed_initialization(const delayed_initialization &) = delete;
   delayed_initialization(delayed_initialization &&other) noexcept {
-    initialized_ = other.initialized_;
-    if (other.initialized_) {
+    if (other.initialized()) {
+      new((void *) memory_.data()) T(std::move(other.object()));
+      other.initialized_ = false;
+      initialized_ = true;
+    }
+  }
+  delayed_initialization &operator=(const delayed_initialization &) = delete;
+  delayed_initialization &operator=(delayed_initialization &&other) noexcept {
+    if (&other == this) {
+      return *this;
+    }
+
+    if (initialized() && other.initialized()) {
       object() = std::move(other.object());
       other.initialized_ = false;
+      initialized_ = true;
+      return *this;
+    }
+
+    if (!initialized() && other.initialized_) {
+      new((void *) memory_.data()) T(std::move(other.object()));
+      other.initialized_ = false;
+      initialized_ = true;
+      return *this;
     }
+
+    return *this;
   }
 
   template
@@ -62,14 +85,24 @@ class delayed_initialization {
     return *reinterpret_cast(memory_.data());
   }
 
+  const T &object() const {
+    PLS_ASSERT(initialized_, "Can not use an uninitialized delayed wrapper object!");
+
+    return *reinterpret_cast(memory_.data());
+  }
+
   T &operator*() {
     return object();
   }
 
+  const T &operator*() const {
+    return object();
+  }
+
   bool initialized() const {
     return initialized_;
   }
 
 private:
-  std::array memory_;
+  alignas(alignof(T)) std::array memory_;
   bool initialized_;
 };
diff --git a/lib/pls/include/pls/internal/data_structures/optional.h b/lib/pls/include/pls/internal/data_structures/optional.h
index 22f9eca..2a5d287 100644
--- a/lib/pls/include/pls/internal/data_structures/optional.h
+++ b/lib/pls/include/pls/internal/data_structures/optional.h
@@ -3,6 +3,7 @@
 #define PLS_INTERNAL_DATA_STRUCTURES_OPTIONAL_H_
 
 #include
+#include
 
 #include "pls/internal/data_structures/delayed_initialization.h"
 
@@ -14,6 +15,38 @@ template
 class optional {
  public:
   optional() = default;
+  optional(optional &other) noexcept : optional(const_cast(other)) {};
+  optional(const optional &other) noexcept {
+    if (other) {
+      data_.initialize(other.data_.object());
+    }
+  }
+  optional(optional &&other) noexcept {
+    data_ = std::move(other.data_);
+  }
+  optional &operator=(const optional &other) {
+    if (&other == this) {
+      return *this;
+    }
+
+    if (data_.initialized()) {
+      data_.destroy();
+    }
+    if (other) {
+      data_.initialize(other.data_.object());
+    }
+
+    return *this;
+  }
+  optional &operator=(optional &&other) noexcept {
+    if (&other == this) {
+      return *this;
+    }
+
+    data_ = std::move(other.data_);
+
+    return *this;
+  }
 
   template
   explicit optional(ARGS &&...args): data_{std::forward(args)...} {}
diff --git a/lib/pls/include/pls/internal/scheduling/cont.h b/lib/pls/include/pls/internal/scheduling/cont.h
index 9606f71..dd05411 100644
--- a/lib/pls/include/pls/internal/scheduling/cont.h
+++ b/lib/pls/include/pls/internal/scheduling/cont.h
@@ -46,6 +46,7 @@ class base_cont {
    * Will store the result in it's parent, but not mess with any counters.
    */
   virtual void execute_task() = 0;
+  virtual base_task *get_task() = 0;
 
   virtual void *get_right_result_pointer() = 0;
   virtual void *get_left_result_pointer() = 0;
@@ -120,14 +121,19 @@ class cont : public base_cont {
   void execute() override {
     using result_type = decltype(function_((*left_result_).value(), (*right_result_).value()));
     result_runner::execute(*this);
-    this->get_memory_block()->free_buffer();
-    this->get_memory_block()->reset_state();
+    this->~cont();
+    auto *memory_block = this->get_memory_block();
+    memory_block->free_buffer();
+    memory_block->reset_state();
   }
 
   void execute_task() override {
     task_.execute();
   }
+  base_task *get_task() override {
+    return &task_;
+  }
 
   void *get_left_result_pointer() override {
     return &left_result_;
@@ -136,10 +142,6 @@ class cont : public base_cont {
     return &right_result_;
   }
 
-  T2 *get_task() {
-    return &task_;
-  }
-
 private:
   // Initial data members. These slow down the fast path, try to init them lazy when possible.
   F function_;
diff --git a/lib/pls/include/pls/internal/scheduling/cont_manager.h b/lib/pls/include/pls/internal/scheduling/cont_manager.h
index a4d9941..3fa279c 100644
--- a/lib/pls/include/pls/internal/scheduling/cont_manager.h
+++ b/lib/pls/include/pls/internal/scheduling/cont_manager.h
@@ -63,6 +63,25 @@ class cont_manager {
     return active_node_;
   }
 
+  bool is_clean() {
+    if (get_active_node()->get_depth() == 0) {
+      memory_block *current_node = active_node_;
+      for (size_t i = 1; i < num_conts_; i++) {
+        if (current_node->get_prev() != nullptr && current_node->get_prev()->get_next() != current_node) {
+          return false;
+        }
+        if (current_node->is_buffer_used()) {
+          return false;
+        }
+        current_node = current_node->get_next();
+      }
+    } else {
+      return false;
+    }
+
+    return true;
+  }
+
   // Manage the fall through behaviour/slow path behaviour
   bool falling_through() const {
     return fall_through_;
@@ -93,23 +112,24 @@ class cont_manager {
       fall_through_ = false;
 
       // Keep the target chain before we execute, as this potentially frees the memory
-      auto *target_chain = notified_cont->get_memory_block()->get_offered_chain().load();
+      auto *target_memory_block = notified_cont->get_memory_block();
+      auto *target_chain = target_memory_block->get_offered_chain().load();
 
       // Notify the next continuation of finishing a child...
-      if (notified_cont->get_memory_block()->get_results_missing().fetch_add(-1) == 1) {
+      if (target_memory_block->get_results_missing().fetch_add(-1) == 1) {
        // ... we finished the continuation.
        // We are now in charge continuing to execute the above continuation chain.
 
-        PLS_ASSERT(active_node_->get_prev()->get_depth() == notified_cont->get_memory_block()->get_depth(),
+        PLS_ASSERT(active_node_->get_prev()->get_depth() == target_memory_block->get_depth(),
                   "We must hold the system invariant to be in the correct depth.")
-        if (active_node_->get_prev() != notified_cont->get_memory_block()) {
+        if (active_node_->get_prev() != target_memory_block) {
          // We do not own the thing we will execute.
          // Own it by swapping the chain belonging to it in.
-          aquire_memory_chain(notified_cont->get_memory_block());
+          aquire_memory_chain(target_memory_block);
        }
 
        my_state.parent_cont_ = notified_cont->get_parent();
        my_state.right_spawn_ = notified_cont->is_right_child();
-        active_node_ = notified_cont->get_memory_block();
+        active_node_ = target_memory_block;
        notified_cont->execute();
        if (!falling_through() && notified_cont->get_parent() != nullptr) {
          fall_through_and_notify_cont(notified_cont->get_parent(), notified_cont->is_right_child());
@@ -119,9 +139,9 @@ class cont_manager {
        // ... we did not finish the last continuation.
        // We are no longer in charge of executing the above continuation chain.
 
-        PLS_ASSERT(active_node_->get_prev()->get_depth() == notified_cont->get_memory_block()->get_depth(),
+        PLS_ASSERT(active_node_->get_prev()->get_depth() == target_memory_block->get_depth(),
                   "We must hold the system invariant to be in the correct depth.")
-        if (active_node_->get_prev() == notified_cont->get_memory_block()) {
+        if (active_node_->get_prev() == target_memory_block) {
          // We own the thing we are not allowed to execute.
          // Get rid of the ownership by using the offered chain.
          aquire_memory_chain(target_chain);
diff --git a/lib/pls/include/pls/internal/scheduling/memory_block.h b/lib/pls/include/pls/internal/scheduling/memory_block.h
index dbcfcd9..42ad2c0 100644
--- a/lib/pls/include/pls/internal/scheduling/memory_block.h
+++ b/lib/pls/include/pls/internal/scheduling/memory_block.h
@@ -23,19 +23,22 @@ class memory_block {
       : prev_{prev},
         next_{nullptr},
         offered_chain_{nullptr},
-        state_{{initialized}},
         results_missing_{2},
         memory_buffer_{memory_buffer},
         memory_buffer_size_{memory_buffer_size},
         memory_buffer_used_{false},
-        depth_{depth} {};
+        depth_{depth},
+        owner_{0} {};
 
   template
   T *place_in_buffer(ARGS &&...args) {
     PLS_ASSERT(!memory_buffer_used_, "Must only allocate one continuation at once per node.");
     memory_buffer_used_ = true;
 
-    return new(memory_buffer_) T(std::forward(args)...);
+    auto *result = new(memory_buffer_) T(std::forward(args)...);
+    continuation_ = result;
+
+    return result;
   }
   void free_buffer() {
     PLS_ASSERT(memory_buffer_used_, "Can only free a memory spot when it is in use.")
@@ -44,14 +47,10 @@ class memory_block {
   bool is_buffer_used() { return memory_buffer_used_; }
-
-  // TODO: Fit the reset somewhere!!!
-//    // Reset Associated counters
-//    results_missing_.store(2);
-//    offered_chain_.store(nullptr);
-//    auto old_state = state_.load();
-//    state_.store({old_state.stamp + 1, initialized});
-
+  base_cont *get_cont() {
+    PLS_ASSERT(is_buffer_used(), "Can only read initialized buffer!");
+    return continuation_;
+  }
 
   memory_block *get_prev() {
     return prev_;
@@ -66,13 +65,6 @@ class memory_block {
     next_ = next;
   }
 
-  enum state { initialized, execute_local, stealing, stolen, invalid };
-  using stamped_state = data_structures::stamped_integer;
-
-  std::atomic &get_state() {
-    return state_;
-  }
-
   std::atomic &get_offered_chain() {
     return offered_chain_;
   }
@@ -87,11 +79,16 @@ class memory_block {
   void reset_state() {
     offered_chain_.store(nullptr);
-    auto old_state = state_.load();
-    state_.store({old_state.stamp + 1, initialized});
     results_missing_.store(2);
   }
+  void set_owner(int owner) {
+    owner_ = owner;
+  }
+  int get_owner() {
+    return owner_;
+  }
+
 private:
   // Linked list property of memory blocks (a complete list represents a threads currently owned memory).
   // Each block knows its chain start to allow stealing a whole chain in O(1)
@@ -103,13 +100,6 @@ class memory_block {
   // For this we need the offered chain's element up to the point we can steal.
   std::atomic offered_chain_;
 
-  // The flag is needed for an ongoing stealing request.
-  // Stealing threads need to offer their memory block chain before the
-  // 'fully' own the stolen task. As long as that is not done the memory block
-  // chain can abort the steal request in order to be not blocked without a
-  // new, clean memory block chain to work with.
-  std::atomic state_;
-
   // Management for coordinating concurrent result writing and stealing.
   // The result count decides atomically who gets to execute the continuation
   // and who therefore get's to own this memory block chain.
@@ -120,6 +110,8 @@ class memory_block {
   // This memory is managed explicitly by the continuation manager and runtime system
   // (they need to make sure to always call de-constructors and never allocate two continuations).
   char *memory_buffer_;
+  base_cont *continuation_;
+
   // These two are only helper properties helping with bugs during development.
   size_t memory_buffer_size_;
   bool memory_buffer_used_;
@@ -128,6 +120,9 @@ class memory_block {
   // Swapping parts of a memory chain will not reorder it, as always parts of
   // the same size are exchanged.
   const int depth_;
+
+  // TODO: Remove, debug only
+  int owner_;
 };
 
 }
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index ab2a39a..f28b844 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -49,6 +49,7 @@ struct scheduler::starter {
     const bool is_right_cont = my_state.right_spawn_;
     base_cont *parent_cont = my_state.parent_cont_;
 
+    current_memory_block->set_owner(my_state.get_id());
     continuation_type *current_cont = current_memory_block->place_in_buffer(parent_cont,
                                                                             current_memory_block,
                                                                             is_right_cont,
@@ -63,19 +64,32 @@ struct scheduler::starter {
     my_state.right_spawn_ = false;
     return_type_1 result_1 = function_1_();
     if (cont_manager.falling_through()) {
+      // Get our replacement from the task stack and store it for later use when we are actually blocked.
+      auto traded_memory = my_state.get_task_manager().try_pop_local();
+      current_cont->get_memory_block()->get_offered_chain().store(*traded_memory);
+
       // Unwind stack...
       return result_type{};
     }
 
     // Try to call second function on fast path
-    if (my_state.get_task_manager().steal_local_task()) {
+    auto traded_memory = my_state.get_task_manager().try_pop_local();
+    if (traded_memory) {
+      // The task got stolen...get_memory_block
+      // ...but we got a memory block that can be used if we block on this one.
+      current_cont->get_memory_block()->get_offered_chain().store(*traded_memory);
+
+      // Main scheduling loop is responsible for entering the result to the slow path...
+      current_cont->store_left_result(std::move(result_1));
+      cont_manager.fall_through_and_notify_cont(current_cont, false);
+      // Unwind stack...
+      return result_type{};
+    } else {
       my_state.right_spawn_ = true;
       return_type_2 result_2 = function_2_();
       if (cont_manager.falling_through()) {
        // Main scheduling loop is responsible for entering the result to the slow path...
        current_cont->store_left_result(std::move(result_1));
-        auto old_state = current_cont->get_memory_block()->get_state().load();
-        current_cont->get_memory_block()->get_state().store({old_state.stamp + 1, memory_block::invalid});
        current_cont->get_memory_block()->get_results_missing().fetch_add(-1);
        // Unwind stack...
        return result_type{};
@@ -101,12 +115,6 @@ struct scheduler::starter {
      }
      return cont_result;
    }
-
-    // Main scheduling loop is responsible for entering the result to the slow path...
-    current_cont->store_left_result(std::move(result_1));
-    cont_manager.fall_through_and_notify_cont(current_cont, false);
-    // Unwind stack...
-    return result_type{};
   };
 };
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
index d6878dd..935c499 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
@@ -28,7 +28,7 @@ class scheduler_memory {
   virtual thread_state &thread_state_for(size_t id) = 0;
 };
 
-template
+template
 class static_scheduler_memory : public scheduler_memory {
 public:
   size_t max_threads() const override {
@@ -44,7 +44,7 @@ class static_scheduler_memory : public scheduler_memory {
   }
 
 private:
-  using thread_state_type = thread_state_static;
+  using thread_state_type = thread_state_static;
 
   alignas(base::system_details::CACHE_LINE_SIZE) std::array threads_;
   alignas(base::system_details::CACHE_LINE_SIZE) std::array thread_states_;
@@ -78,7 +78,7 @@ class heap_scheduler_memory : public scheduler_memory {
   }
 
 private:
-  using thread_state_type = thread_state_static;
+  using thread_state_type = thread_state_static;
   // thread_state_type is aligned at the cache line and therefore overaligned (C++ 11 does not require
   // the new operator to obey alignments bigger than 16, cache lines are usually 64).
   // To allow this object to be allocated using 'new' (which the vector does internally),
diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h
index 4e2b1b3..961cf1b 100644
--- a/lib/pls/include/pls/internal/scheduling/task_manager.h
+++ b/lib/pls/include/pls/internal/scheduling/task_manager.h
@@ -12,8 +12,9 @@
 #include "pls/internal/scheduling/cont_manager.h"
 #include "pls/internal/scheduling/memory_block.h"
 
-#include "pls/internal/data_structures/bounded_ws_deque.h"
+#include "pls/internal/data_structures/bounded_trading_deque.h"
 #include "pls/internal/data_structures/stamped_integer.h"
+#include "pls/internal/data_structures/optional.h"
 
 #include "pls/internal/base/spin_lock.h"
 
@@ -21,19 +22,6 @@
 namespace pls {
 namespace internal {
 namespace scheduling {
 
-struct task_handle {
- public:
-  task_handle() : task_{nullptr}, task_memory_block_{nullptr} {};
-  explicit task_handle(base_task *task) : task_{task},
-                                          task_memory_block_{task->get_cont()->get_memory_block()} {};
-
-  base_task *task_;
-  // This seems redundant first, but is needed for a race-free steal.
-  // It could happen that the task's memory is overwritten and the pointer to it's memory block gets invalid.
-  // We can do this more elegantly in the future.
-  memory_block *task_memory_block_;
-};
-
 /**
  * Handles management of tasks in the system. Each thread has a local task manager,
 * responsible for allocating, freeing and publishing tasks for stealing.
@@ -42,60 +30,60 @@ class task_manager {
 public:
   // Publishes a task on the stack, i.e. makes it visible for other threads to steal.
   void publish_task(base_task *task) {
-    std::lock_guard lock{lock_};
-    task_deque_.push_bottom(task_handle{task});
+//    std::lock_guard lock{lock_};
+    task_deque_.push_bot(task->get_cont()->get_memory_block());
   }
 
   // Try to pop a local task from this task managers stack.
-  bool steal_local_task() {
-    std::lock_guard lock{lock_};
-    return task_deque_.pop_bottom();
+  data_structures::optional try_pop_local() {
+//    std::lock_guard lock{lock_};
+    return task_deque_.pop_bot().traded_;
   }
 
   // Try to steal a task from a remote task_manager instance. The stolen task must be stored locally.
   // Returns a pair containing the actual task and if the steal was successful.
   base_task *steal_remote_task(cont_manager &stealing_cont_manager) {
-    std::lock_guard lock{lock_};
+//    std::lock_guard lock{lock_};
+    auto peek = task_deque_.peek_top();
 
-    // TODO: See if we can somehow make this trade lock free (and still be correct)
+    if (std::get<0>(peek)) {
+      memory_block *peeked_memory_block = (*std::get<0>(peek));
+      auto peeked_depth = peeked_memory_block->get_depth();
 
-    auto stolen_task_handle = task_deque_.pop_top();
-    if (stolen_task_handle) {
-      base_task *stolen_task = (*stolen_task_handle).task_;
-      memory_block *stolen_task_memory = (*stolen_task_handle).task_memory_block_;
-      auto stolen_task_depth = stolen_task_memory->get_depth();
-      auto &atomic_state = stolen_task_memory->get_state();
-      auto &atomic_offered_chain = stolen_task_memory->get_offered_chain();
-
-      // TODO: We ignore all we tried with lock free implementations here, just store the state how it is supposed to be
-      stealing_cont_manager.move_active_node(stolen_task_depth);
+      stealing_cont_manager.move_active_node(peeked_depth);
      auto offered_chain = stealing_cont_manager.get_active_node();
      stealing_cont_manager.move_active_node(1);
 
-      atomic_offered_chain.store(offered_chain);
-      atomic_state.store(memory_block::stolen);
-
-      return stolen_task;
-    } else {
-      return nullptr;
+      auto stolen_memory_block = task_deque_.pop_top(offered_chain, std::get<1>(peek));
+      if (stolen_memory_block) {
+        PLS_ASSERT(*stolen_memory_block == peeked_memory_block, "Steal must only work if it is equal!");
+
+        return (*stolen_memory_block)->get_cont()->get_task();
+      } else {
+        stealing_cont_manager.move_active_node(-(peeked_depth + 1));
+        return nullptr;
+      }
    }
+
+    return nullptr;
  }
 
-  explicit task_manager(data_structures::bounded_ws_deque &task_deque) : task_deque_{task_deque},
-                                                                         lock_{} {}
+  explicit task_manager(data_structures::bounded_trading_deque &task_deque) :
+      task_deque_{task_deque} {}
 
 private:
-  data_structures::bounded_ws_deque &task_deque_;
-  base::spin_lock lock_;
+  data_structures::bounded_trading_deque &task_deque_;
+  base::spin_lock lock_{};
 };
 
-template
+template
 class static_task_manager {
 public:
  static_task_manager() : task_deque_{}, task_manager_{task_deque_.get_deque()} {};
 
  task_manager &get_task_manager() { return task_manager_; }
 private:
-  data_structures::static_bounded_ws_deque task_deque_;
+  data_structures::static_bounded_trading_deque task_deque_;
  task_manager task_manager_;
 };
diff --git a/lib/pls/include/pls/internal/scheduling/thread_state_static.h b/lib/pls/include/pls/internal/scheduling/thread_state_static.h
index 1a55456..39b1815 100644
--- a/lib/pls/include/pls/internal/scheduling/thread_state_static.h
+++ b/lib/pls/include/pls/internal/scheduling/thread_state_static.h
@@ -11,7 +11,7 @@
 namespace pls {
 namespace internal {
 namespace scheduling {
 
-template
+template
 struct thread_state_static {
 public:
  thread_state_static()
@@ -21,7 +21,7 @@ struct thread_state_static {
  thread_state &get_thread_state() { return thread_state_; }
 private:
-  static_task_manager static_task_manager_;
+  static_task_manager static_task_manager_;
  static_cont_manager static_cont_manager_;
  thread_state thread_state_;
 };
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index 595ebe0..3a3c9c0 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -83,7 +83,7 @@ void scheduler::work_thread_work_section() {
        auto &target_state = my_state.get_scheduler().thread_state_for(target);
 
-        PLS_ASSERT(my_cont_manager.get_active_node()->get_depth() == 0, "Only steal with clean chain!");
+        PLS_ASSERT(my_cont_manager.is_clean(), "Only steal with clean chain!");
        auto *stolen_task = target_state.get_task_manager().steal_remote_task(my_cont_manager);
        if (stolen_task != nullptr) {
          my_state.parent_cont_ = stolen_task->get_cont();
@@ -99,6 +99,7 @@ void scheduler::work_thread_work_section() {
    }
  } while (!work_section_done_);
+  PLS_ASSERT(my_cont_manager.is_clean(), "Only finish work section with clean chain!");
 }
 
 void scheduler::terminate() {
diff --git a/test/data_structures_test.cpp b/test/data_structures_test.cpp
index 6e12861..ff5b203 100644
--- a/test/data_structures_test.cpp
+++ b/test/data_structures_test.cpp
@@ -3,6 +3,7 @@
 #include "pls/internal/base/system_details.h"
 
 #include "pls/internal/data_structures/aligned_stack.h"
+#include "pls/internal/data_structures/bounded_trading_deque.h"
 
 using namespace pls::internal::data_structures;
 using namespace pls::internal::base;
-- 
libgit2 0.26.0
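
Editorial note, appended after the patch and not part of it: the central mechanism above is that each deque entry carries one atomic word (traded_field) whose two lowest bits are a status tag - empty, stamped by the owning thread, or holding a pointer traded in by a thief - while the remaining bits hold either the stamp or the cache-aligned pointer itself. The sketch below reproduces only that tagging idea in standalone form, using std::uintptr_t instead of unsigned long (in the spirit of the patch's TODO about portable integer sizes). It is a minimal illustration under those assumptions; none of the names in it exist in the PLS sources.

// Standalone sketch of the low-bit tagging used by traded_field (illustrative only).
#include <cassert>
#include <cstdint>
#include <iostream>

// One word encodes either nothing (EMPTY), a stamp shifted past the tag bits (STAMP),
// or an aligned pointer whose free low bits carry the tag (TRADE).
constexpr std::uintptr_t SHIFT = 2u;
constexpr std::uintptr_t TAG_BITS = 0x3u;
constexpr std::uintptr_t EMPTY_TAG = 0x0u;
constexpr std::uintptr_t STAMP_TAG = 0x1u;
constexpr std::uintptr_t TRADE_TAG = 0x2u;

std::uintptr_t encode_stamp(std::uintptr_t stamp) {
  return (stamp << SHIFT) | STAMP_TAG;
}

std::uintptr_t encode_object(void *object) {
  auto bits = reinterpret_cast<std::uintptr_t>(object);
  assert((bits & TAG_BITS) == 0 && "objects must be at least 4-byte aligned");
  return bits | TRADE_TAG;
}

bool is_stamp(std::uintptr_t word) { return (word & TAG_BITS) == STAMP_TAG; }
bool is_object(std::uintptr_t word) { return (word & TAG_BITS) == TRADE_TAG; }
bool is_empty(std::uintptr_t word) { return (word & TAG_BITS) == EMPTY_TAG; }

std::uintptr_t decode_stamp(std::uintptr_t word) { return word >> SHIFT; }
void *decode_object(std::uintptr_t word) {
  return reinterpret_cast<void *>(word & ~TAG_BITS);
}

int main() {
  alignas(64) static int some_task;  // stands in for a cache aligned task/memory block

  std::uintptr_t slot = encode_stamp(42);  // owner publishes entry with stamp 42
  std::cout << is_stamp(slot) << " " << decode_stamp(slot) << "\n";  // prints: 1 42

  slot = encode_object(&some_task);        // thief trades its own block into the slot
  std::cout << is_object(slot) << " " << (decode_object(slot) == &some_task) << "\n";  // prints: 1 1
  return 0;
}

In the patch itself this is what makes the trade atomic: push_bot stores a stamp-tagged word into the entry, and a stealing thread performs a single compare_exchange_strong in trade_object that swaps in its own trade-tagged memory-block pointer and takes the task only if the stamp still matches. If the owner has meanwhile emptied the slot via pop_bot, the thief instead reads the forwarding stamp so a later pop_top attempt can retry with the correct expected value.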