diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index a580128..8065e84 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -23,6 +23,7 @@ add_library(pls STATIC include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp include/pls/internal/data_structures/aligned_stack_impl.h include/pls/internal/data_structures/stamped_integer.h + include/pls/internal/data_structures/stamped_split_integer.h include/pls/internal/data_structures/delayed_initialization.h include/pls/internal/data_structures/bounded_trading_deque.h include/pls/internal/data_structures/bounded_ws_deque.h diff --git a/lib/pls/include/pls/internal/data_structures/bounded_trading_deque.h b/lib/pls/include/pls/internal/data_structures/bounded_trading_deque.h index e54d9d7..6c40ca5 100644 --- a/lib/pls/include/pls/internal/data_structures/bounded_trading_deque.h +++ b/lib/pls/include/pls/internal/data_structures/bounded_trading_deque.h @@ -156,14 +156,14 @@ class bounded_trading_deque { entries_{entries}, num_entries_{num_entries} {}; void push_bot(EntryType *offered_object) { - auto expected_stamp = bot_internal_.stamp; - auto ¤t_entry = entries_[bot_internal_.value]; + auto expected_stamp = bot_internal_.stamp_; + auto ¤t_entry = entries_[bot_internal_.value_]; current_entry.fill_slots(offered_object, expected_stamp); - bot_internal_.stamp++; - bot_internal_.value++; + bot_internal_.stamp_++; + bot_internal_.value_++; - bot_.store(bot_internal_.value, std::memory_order_release); + bot_.store(bot_internal_.value_, std::memory_order_release); } struct pop_result { @@ -175,14 +175,14 @@ class bounded_trading_deque { optional traded_; }; pop_result pop_bot() { - if (bot_internal_.value == 0) { + if (bot_internal_.value_ == 0) { return pop_result{}; // Empty, nothing to return... } // Go one step back - bot_internal_.value--; + bot_internal_.value_--; - auto ¤t_entry = entries_[bot_internal_.value]; + auto ¤t_entry = entries_[bot_internal_.value_]; optional traded_object = current_entry.acquire_traded_type(); optional queue_entry; if (traded_object) { @@ -193,10 +193,10 @@ class bounded_trading_deque { queue_entry = optional{current_entry.get_object()}; } - bot_.store(bot_internal_.value, std::memory_order_relaxed); - if (bot_internal_.value == 0) { - bot_internal_.stamp++; - top_.store({bot_internal_.stamp, 0}, std::memory_order_release); + bot_.store(bot_internal_.value_, std::memory_order_relaxed); + if (bot_internal_.value_ == 0) { + bot_internal_.stamp_++; + top_.store({bot_internal_.stamp_, 0}, std::memory_order_release); } return pop_result{queue_entry, traded_object}; @@ -205,10 +205,10 @@ class bounded_trading_deque { std::tuple, stamped_integer> peek_top() { auto local_top = top_.load(); auto local_bot = bot_.load(); - if (local_top.value >= local_bot) { + if (local_top.value_ >= local_bot) { return std::make_tuple(optional{}, local_top); } else { - return std::make_tuple(optional{entries_[local_top.value].get_object()}, local_top); + return std::make_tuple(optional{entries_[local_top.value_].get_object()}, local_top); } } @@ -219,24 +219,24 @@ class bounded_trading_deque { optional pop_top(TradedType *trade_offer, stamped_integer local_top) { auto local_bot = bot_.load(); - if (local_top.value >= local_bot) { + if (local_top.value_ >= local_bot) { return optional{}; } - unsigned long expected_top_stamp = local_top.stamp; - optional entry = entries_[local_top.value].trade_object(trade_offer, expected_top_stamp); + unsigned long expected_top_stamp = local_top.stamp_; + optional entry = entries_[local_top.value_].trade_object(trade_offer, expected_top_stamp); if (entry) { // We got it, for sure move the top pointer forward. - top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1}); + top_.compare_exchange_strong(local_top, {local_top.stamp_ + 1, local_top.value_ + 1}); } else { // We did not get it.... - if (entries_[local_top.value].is_empty()) { + if (entries_[local_top.value_].is_empty()) { // ...update the top stamp, so the next call can get it (we still make system progress, as the owner // must have popped off the element) - top_.compare_exchange_strong(local_top, {expected_top_stamp, local_top.value}); + top_.compare_exchange_strong(local_top, {expected_top_stamp, local_top.value_}); } else { // ...move the pointer forward if someone else put a valid trade object in there. - top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1}); + top_.compare_exchange_strong(local_top, {local_top.stamp_ + 1, local_top.value_ + 1}); } } diff --git a/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h b/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h index 194a94c..cfaa49f 100644 --- a/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h +++ b/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h @@ -40,19 +40,19 @@ class bounded_ws_deque { } bool is_empty() { - return top_.load().value < bottom_.load(); + return top_.load().value_ < bottom_.load(); } optional pop_top() { stamped_integer old_top = top_.load(); - unsigned int new_stamp = old_top.stamp + 1; - unsigned int new_value = old_top.value + 1; + unsigned int new_stamp = old_top.stamp_ + 1; + unsigned int new_value = old_top.value_ + 1; - if (bottom_.load() <= old_top.value) { + if (bottom_.load() <= old_top.value_) { return optional(); } - optional result(item_array_[old_top.value]); + optional result(item_array_[old_top.value_]); if (top_.compare_exchange_strong(old_top, {new_stamp, new_value})) { return result; } @@ -71,14 +71,14 @@ class bounded_ws_deque { optional result(item_array_[local_bottom_]); stamped_integer old_top = top_.load(std::memory_order_acquire); - if (local_bottom_ > old_top.value) { + if (local_bottom_ > old_top.value_) { // Enough distance to just return the value return result; } - if (local_bottom_ == old_top.value) { + if (local_bottom_ == old_top.value_) { local_bottom_ = 0; bottom_.store(local_bottom_); - if (top_.compare_exchange_strong(old_top, {old_top.stamp + 1, 0})) { + if (top_.compare_exchange_strong(old_top, {old_top.stamp_ + 1, 0})) { // We won the competition and the queue is empty return result; } @@ -87,7 +87,7 @@ class bounded_ws_deque { // The queue is empty and we lost the competition local_bottom_ = 0; bottom_.store(local_bottom_); - top_.store({old_top.stamp + 1, 0}); + top_.store({old_top.stamp_ + 1, 0}); return optional(); } diff --git a/lib/pls/include/pls/internal/data_structures/stamped_integer.h b/lib/pls/include/pls/internal/data_structures/stamped_integer.h index 511f774..c7f4418 100644 --- a/lib/pls/include/pls/internal/data_structures/stamped_integer.h +++ b/lib/pls/include/pls/internal/data_structures/stamped_integer.h @@ -4,23 +4,20 @@ #include "pls/internal/base/system_details.h" -namespace pls { -namespace internal { -namespace data_structures { +namespace pls::internal::data_structures { -constexpr unsigned long HALF_CACHE_LINE = base::system_details::CACHE_LINE_SIZE / 2; struct stamped_integer { using member_t = base::system_details::cas_integer; - member_t stamp:HALF_CACHE_LINE; - member_t value:HALF_CACHE_LINE; + member_t stamp_: base::system_details::CAS_SIZE / 2; + member_t value_: base::system_details::CAS_SIZE / 2; - stamped_integer() : stamp{0}, value{0} {}; - stamped_integer(member_t new_value) : stamp{0}, value{new_value} {}; - stamped_integer(member_t new_stamp, member_t new_value) : stamp{new_stamp}, value{new_value} {}; + stamped_integer() : stamp_{0}, value_{0} {}; + stamped_integer(member_t new_value) : stamp_{0}, value_{new_value} {}; + stamped_integer(member_t new_stamp, member_t new_value) : stamp_{new_stamp}, value_{new_value} {}; bool operator==(const stamped_integer &other) const noexcept { - return stamp == other.stamp && value == other.value; + return stamp_ == other.stamp_ && value_ == other.value_; } bool operator!=(const stamped_integer &other) const noexcept { @@ -29,7 +26,5 @@ struct stamped_integer { }; } -} -} #endif //PLS_STAMPED_INTEGER_H_ diff --git a/lib/pls/include/pls/internal/data_structures/stamped_split_integer.h b/lib/pls/include/pls/internal/data_structures/stamped_split_integer.h new file mode 100644 index 0000000..e65ace0 --- /dev/null +++ b/lib/pls/include/pls/internal/data_structures/stamped_split_integer.h @@ -0,0 +1,33 @@ + +#ifndef PLS_STAMPED_SPLIT_INTEGER_H_ +#define PLS_STAMPED_SPLIT_INTEGER_H_ + +#include "pls/internal/base/system_details.h" + +namespace pls::internal::data_structures { + +struct stamped_split_integer { + using member_t = base::system_details::cas_integer; + + member_t stamp_: base::system_details::CAS_SIZE / 2; + member_t value_1_: base::system_details::CAS_SIZE / 4; + member_t value_2_: base::system_details::CAS_SIZE / 4; + + stamped_split_integer() : stamp_{0}, value_1_{0}, value_2_{0} {}; + stamped_split_integer(member_t value_1, member_t value_2) : stamp_{0}, value_1_{value_1}, value_2_{value_2} {}; + stamped_split_integer(member_t new_stamp, member_t value_1, member_t value_2) : stamp_{new_stamp}, + value_1_{value_1}, + value_2_{value_2} {}; + + bool operator==(const stamped_split_integer &other) const noexcept { + return stamp_ == other.stamp_ && value_1_ == other.value_1_ && value_2_ == other.value_2_; + } + + bool operator!=(const stamped_split_integer &other) const noexcept { + return !(*this == other); + } +}; + +} + +#endif //PLS_STAMPED_SPLIT_INTEGER_H_ diff --git a/lib/pls/include/pls/internal/profiling/dag_node.h b/lib/pls/include/pls/internal/profiling/dag_node.h index 6edbbb7..4377802 100644 --- a/lib/pls/include/pls/internal/profiling/dag_node.h +++ b/lib/pls/include/pls/internal/profiling/dag_node.h @@ -33,7 +33,7 @@ struct dag_node { } void dag_compact(); - void dag_print(std::ostream &stream, unsigned rank); + void dag_print(std::ostream &stream, unsigned rank, bool capture_memory, bool capture_time); unsigned dag_max_memory(); unsigned long dag_total_user_time(); unsigned long dag_critical_path(); diff --git a/lib/pls/include/pls/internal/profiling/profiler.h b/lib/pls/include/pls/internal/profiling/profiler.h index c48d11e..75a14db 100644 --- a/lib/pls/include/pls/internal/profiling/profiler.h +++ b/lib/pls/include/pls/internal/profiling/profiler.h @@ -24,11 +24,14 @@ class profiler { } struct profiler_run { - profiler_run(unsigned num_threads) : start_time_{}, - end_time_{}, - root_node_{std::make_unique(0)}, - per_thread_stats_(num_threads), - num_threads_{num_threads} {} + profiler_run(profiler &profiler) : profiler_{profiler}, + start_time_{}, + end_time_{}, + root_node_{std::make_unique(0)}, + per_thread_stats_(profiler.num_threads_), + num_threads_{profiler.num_threads_} {} + + profiler &profiler_; // Runtime stats clock::time_point start_time_; diff --git a/lib/pls/include/pls/internal/scheduling/base_task.h b/lib/pls/include/pls/internal/scheduling/base_task.h index 2631028..231c402 100644 --- a/lib/pls/include/pls/internal/scheduling/base_task.h +++ b/lib/pls/include/pls/internal/scheduling/base_task.h @@ -25,6 +25,9 @@ namespace pls::internal::scheduling { * * This base_task can be extended by different trading/stealing implementations, * to add for example additional flags. The scheduler itself always works solely with this base version. + * + * Currently, only the 'lock_free' stealing implementation is present and extends the task in this package. + * The scheduler only uses/operates on this base_task, as the staling/resource trading is not its domain. */ struct strain_resource; struct base_task { diff --git a/lib/pls/include/pls/internal/scheduling/lock_free/external_trading_deque.h b/lib/pls/include/pls/internal/scheduling/lock_free/external_trading_deque.h index c032de0..6ed238c 100644 --- a/lib/pls/include/pls/internal/scheduling/lock_free/external_trading_deque.h +++ b/lib/pls/include/pls/internal/scheduling/lock_free/external_trading_deque.h @@ -37,8 +37,8 @@ class external_trading_deque { public: external_trading_deque(unsigned thread_id, size_t num_entries) : thread_id_(thread_id), entries_(num_entries) {} - static task *peek_traded_object(task *target_task); - static task *get_trade_object(task *target_task); + static traded_cas_field peek_traded_object(task *target_task); + static task *get_trade_object(task *target_task, traded_cas_field peeked_cas, external_trading_deque &other_deque); /** * Pushes a task on the bottom of the deque. diff --git a/lib/pls/include/pls/internal/scheduling/lock_free/task.h b/lib/pls/include/pls/internal/scheduling/lock_free/task.h index a78da3e..3f7c7ac 100644 --- a/lib/pls/include/pls/internal/scheduling/lock_free/task.h +++ b/lib/pls/include/pls/internal/scheduling/lock_free/task.h @@ -3,7 +3,7 @@ #define PLS_LOCK_FREE_TASK_H_ #include "pls/internal/scheduling/base_task.h" -#include "pls/internal/data_structures/stamped_integer.h" +#include "pls/internal/data_structures/stamped_split_integer.h" #include "pls/internal/scheduling/lock_free/traded_cas_field.h" namespace pls::internal::scheduling::lock_free { @@ -22,12 +22,17 @@ struct task : public base_task { std::atomic external_trading_deque_cas_{}; void push_task_chain(task *spare_task_chain); + void propose_push_task_chain(task *spare_task_chain); + bool accept_proposed(); + bool decline_proposed(); task *pop_task_chain(); - void reset_task_chain(); private: std::atomic resource_stack_next_{}; - std::atomic resource_stack_root_{{0, 0}}; + // STAMP = CAS stamp, half CAS length (16 or 32 Bit) + // VALUE_1 = Root of the actual stack, indicated by thread ID (8 or 16 Bit) + // VALUE_2 = Proposed element in queue, indicated by thread ID (8 or 16 Bit) + std::atomic resource_stack_root_{{0, 0, 0}}; }; } diff --git a/lib/pls/include/pls/internal/scheduling/lock_free/traded_cas_field.h b/lib/pls/include/pls/internal/scheduling/lock_free/traded_cas_field.h index d61a28d..da17b94 100644 --- a/lib/pls/include/pls/internal/scheduling/lock_free/traded_cas_field.h +++ b/lib/pls/include/pls/internal/scheduling/lock_free/traded_cas_field.h @@ -34,27 +34,32 @@ struct traded_cas_field { static constexpr base::system_details::cas_integer TRADE_OBJECT_BITS = ~((~0x0ul) << TRADED_OBJECT_SIZE) << TRADED_OBJECT_SHIFT; - static constexpr base::system_details::cas_integer ID_SIZE = 10ul; // Up to 1024 cores + static constexpr base::system_details::cas_integer ID_SIZE = (CAS_SIZE / 2) - TAG_SIZE; // Half the CAS for the ID static constexpr base::system_details::cas_integer ID_SHIFT = TAG_SIZE; static constexpr base::system_details::cas_integer ID_BITS = ~((~0x0ul) << ID_SIZE) << ID_SHIFT; - static constexpr base::system_details::cas_integer STAMP_SIZE = CAS_SIZE - TAG_SIZE - ID_SIZE; + static constexpr base::system_details::cas_integer STAMP_SIZE = (CAS_SIZE / 2); // Half the CAS for the STAMP static constexpr base::system_details::cas_integer STAMP_SHIFT = TAG_SIZE + ID_SIZE; static constexpr base::system_details::cas_integer STAMP_BITS = ~((~0x0ul) << STAMP_SIZE) << STAMP_SHIFT; public: - void fill_with_stamp(base::system_details::cas_integer stamp, base::system_details::cas_integer deque_id) { + void fill_with_stamp_and_deque(base::system_details::cas_integer stamp, base::system_details::cas_integer deque_id) { cas_integer_ = (((stamp << STAMP_SHIFT) & STAMP_BITS) | ((deque_id << ID_SHIFT) & ID_BITS) | STAMP_TAG); } - base::system_details::cas_integer get_stamp() { + void fill_with_stamp_and_empty(base::system_details::cas_integer stamp, base::system_details::cas_integer deque_id) { + cas_integer_ = (((stamp << STAMP_SHIFT) & STAMP_BITS) | ((deque_id << ID_SHIFT) & ID_BITS) | EMPTY_TAG); + PLS_ASSERT(is_empty(), "Must be empty after filling it empty..."); + } + + [[nodiscard]] base::system_details::cas_integer get_stamp() const { PLS_ASSERT(is_filled_with_stamp(), "Must only read out the tag when the traded field contains one."); return (((base::system_details::cas_integer) cas_integer_) & STAMP_BITS) >> STAMP_SHIFT; } - base::system_details::cas_integer get_deque_id() { + [[nodiscard]] base::system_details::cas_integer get_deque_id() const { PLS_ASSERT(is_filled_with_stamp(), "Must only read out the tag when the traded field contains one."); return (((base::system_details::cas_integer) cas_integer_) & ID_BITS) >> ID_SHIFT; } - bool is_filled_with_stamp() { + [[nodiscard]] bool is_filled_with_stamp() const { return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == STAMP_TAG; } @@ -63,18 +68,25 @@ struct traded_cas_field { "Must only store aligned objects in this data structure (last bits are needed for tag bit)"); cas_integer_ = (((base::system_details::cas_integer) new_task) | TRADE_TAG); } - task *get_trade_object() { + [[nodiscard]] task *get_trade_object() const { PLS_ASSERT(is_filled_with_object(), "Must only read out the object when the traded field contains one."); return reinterpret_cast(((base::system_details::cas_integer) cas_integer_) & TRADE_OBJECT_BITS); } - bool is_filled_with_object() { + [[nodiscard]] bool is_filled_with_object() const { return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == TRADE_TAG; } - bool is_empty() { + [[nodiscard]] bool is_empty() const { return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == EMPTY_TAG; } + bool operator==(const traded_cas_field &other) const { + return this->cas_integer_ == other.cas_integer_; + } + bool operator!=(const traded_cas_field &other) const { + return !((*this) == other); + } + private: base::system_details::cas_integer cas_integer_{}; }; diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h index c68d452..542d42c 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler.h @@ -142,7 +142,7 @@ class scheduler { template static void serial_internal(Function &&lambda); - static context_switcher::continuation slow_return(thread_state &calling_state); + static context_switcher::continuation slow_return(thread_state &calling_state, bool in_sync); static void work_thread_main_loop(); void work_thread_work_section(); diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index fa9cf9c..a53b70e 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -33,7 +33,7 @@ scheduler::scheduler(unsigned int num_threads, stack_allocator_{std::make_shared(std::forward(stack_allocator))}, serial_stack_size_{serial_stack_size} #if PLS_PROFILING_ENABLED -, profiler_{num_threads} + , profiler_{num_threads} #endif { @@ -269,7 +269,7 @@ void scheduler::spawn_internal(Function &&lambda) { syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(), spawned_task->profiling_node_); #endif - auto continuation = slow_return(syncing_state); + auto continuation = slow_return(syncing_state, false); return continuation; } }); diff --git a/lib/pls/src/internal/profiling/dag_node.cpp b/lib/pls/src/internal/profiling/dag_node.cpp index 0c3d611..16b721f 100644 --- a/lib/pls/src/internal/profiling/dag_node.cpp +++ b/lib/pls/src/internal/profiling/dag_node.cpp @@ -19,18 +19,23 @@ void dag_node::dag_compact() { } } -void dag_node::dag_print(std::ostream &stream, unsigned rank) { +void dag_node::dag_print(std::ostream &stream, unsigned rank, bool capture_memory, bool capture_time) { stream << node_print_id() - << " [label=\"" << spawning_thread_id_ << "\n" - << max_memory_ << " bytes\n" - << m_to_d(total_runtime_) << " us\"" - << " ,rank=" << rank << "];" << std::endl; + << " [label=\"" << spawning_thread_id_ << "\n"; + if (capture_memory) { + stream << max_memory_ << " bytes\n"; + } + if (capture_time) { + stream << m_to_d(total_runtime_) << " us\""; + } + stream << " ,rank=" << rank << "];" << std::endl; + for (auto &child : child_nodes_) { - child.dag_print(stream, rank + 1); + child.dag_print(stream, rank + 1, capture_memory, capture_time); stream << node_print_id() << " -> " << child.node_print_id() << ";" << std::endl; } if (next_node_) { - next_node_->dag_print(stream, rank); + next_node_->dag_print(stream, rank, capture_memory, capture_time); stream << node_print_id() << " -> " << next_node_->node_print_id() << ";" << std::endl; } } diff --git a/lib/pls/src/internal/profiling/profiler.cpp b/lib/pls/src/internal/profiling/profiler.cpp index 024644d..a69e946 100644 --- a/lib/pls/src/internal/profiling/profiler.cpp +++ b/lib/pls/src/internal/profiling/profiler.cpp @@ -58,12 +58,12 @@ void profiler::profiler_run::print_stats() const { void profiler::profiler_run::print_dag(std::ostream &stream) { stream << "digraph {" << std::endl; - root_node_->dag_print(std::cout, 0); + root_node_->dag_print(std::cout, 0, profiler_.capture_memory_, profiler_.capture_time_); stream << "}" << std::endl; } dag_node *profiler::start_profiler_run() { - profiler_run ¤t_run = profiler_runs_.emplace_back(num_threads_); + profiler_run ¤t_run = profiler_runs_.emplace_back(*this); current_run.start_time_ = clock::now(); return current_run.root_node_.get(); } diff --git a/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp b/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp index c17ee23..f5a9a3d 100644 --- a/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp +++ b/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp @@ -3,20 +3,19 @@ namespace pls::internal::scheduling::lock_free { -task *external_trading_deque::peek_traded_object(task *target_task) { +traded_cas_field external_trading_deque::peek_traded_object(task *target_task) { traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(); - if (current_cas.is_filled_with_object()) { - return current_cas.get_trade_object(); - } else { - return nullptr; - } + return current_cas; } -task *external_trading_deque::get_trade_object(task *target_task) { - traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(); +task *external_trading_deque::get_trade_object(task *target_task, + traded_cas_field peeked_cas, + external_trading_deque &other_deque) { + traded_cas_field current_cas = peeked_cas; if (current_cas.is_filled_with_object()) { task *result = current_cas.get_trade_object(); traded_cas_field empty_cas; + empty_cas.fill_with_stamp_and_empty(other_deque.bot_internal_.stamp_, other_deque.thread_id_); if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas, empty_cas)) { return result; } @@ -26,8 +25,8 @@ task *external_trading_deque::get_trade_object(task *target_task) { } void external_trading_deque::push_bot(task *published_task) { - auto expected_stamp = bot_internal_.stamp; - auto ¤t_entry = entries_[bot_internal_.value]; + auto expected_stamp = bot_internal_.stamp_; + auto ¤t_entry = entries_[bot_internal_.value_]; // Publish the prepared task in the deque. current_entry.forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed); @@ -36,36 +35,37 @@ void external_trading_deque::push_bot(task *published_task) { // Field that all threads synchronize on. // This happens not in the deque itself, but in the published task. traded_cas_field sync_cas_field; - sync_cas_field.fill_with_stamp(expected_stamp, thread_id_); + sync_cas_field.fill_with_stamp_and_deque(expected_stamp, thread_id_); published_task->external_trading_deque_cas_.store(sync_cas_field, std::memory_order_release); // Advance the bot pointer. Linearization point for making the task public. - bot_internal_.stamp++; - bot_internal_.value++; - bot_.store(bot_internal_.value, std::memory_order_release); + bot_internal_.stamp_++; + bot_internal_.value_++; + bot_.store(bot_internal_.value_, std::memory_order_release); } void external_trading_deque::reset_bot_and_top() { - bot_internal_.value = 0; - bot_internal_.stamp++; + bot_internal_.value_ = 0; + bot_internal_.stamp_++; bot_.store(0); - top_.store({bot_internal_.stamp, 0}); + top_.store({bot_internal_.stamp_, 0}); } task *external_trading_deque::pop_bot() { - if (bot_internal_.value > 0) { - bot_internal_.value--; - bot_.store(bot_internal_.value, std::memory_order_relaxed); + if (bot_internal_.value_ > 0) { + bot_internal_.value_--; + bot_.store(bot_internal_.value_, std::memory_order_relaxed); - auto ¤t_entry = entries_[bot_internal_.value]; + auto ¤t_entry = entries_[bot_internal_.value_]; auto *popped_task = current_entry.traded_task_.load(std::memory_order_relaxed); auto expected_stamp = current_entry.forwarding_stamp_.load(std::memory_order_relaxed); // We know what value must be in the cas field if no other thread stole it. traded_cas_field expected_sync_cas_field; - expected_sync_cas_field.fill_with_stamp(expected_stamp, thread_id_); + expected_sync_cas_field.fill_with_stamp_and_deque(expected_stamp, thread_id_); traded_cas_field empty_cas_field; + empty_cas_field.fill_with_stamp_and_empty(expected_stamp, thread_id_); if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, empty_cas_field, @@ -82,8 +82,8 @@ external_trading_deque::peek_result external_trading_deque::peek_top() { auto local_top = top_.load(); auto local_bot = bot_.load(); - if (local_top.value < local_bot) { - return peek_result{entries_[local_top.value].traded_task_, local_top}; + if (local_top.value_ < local_bot) { + return peek_result{entries_[local_top.value_].traded_task_, local_top}; } else { return peek_result{nullptr, local_top}; } @@ -92,38 +92,38 @@ external_trading_deque::peek_result external_trading_deque::peek_top() { task *external_trading_deque::pop_top(task *offered_task, peek_result peek_result) { stamped_integer expected_top = peek_result.top_pointer_; auto local_bot = bot_.load(); - if (expected_top.value >= local_bot) { + if (expected_top.value_ >= local_bot) { return nullptr; } - auto &target_entry = entries_[expected_top.value]; + auto &target_entry = entries_[expected_top.value_]; // Read our potential result task *result = target_entry.traded_task_.load(); unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load(); + // Try to get it by CAS with the expected field entry, giving up our offered_task for it traded_cas_field expected_sync_cas_field; - expected_sync_cas_field.fill_with_stamp(expected_top.stamp, thread_id_); + expected_sync_cas_field.fill_with_stamp_and_deque(expected_top.stamp_, thread_id_); traded_cas_field offered_field; offered_field.fill_with_trade_object(offered_task); if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, offered_field)) { // We got it, for sure move the top pointer forward. - top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1}); - // Return the stolen task + top_.compare_exchange_strong(expected_top, {expected_top.stamp_ + 1, expected_top.value_ + 1}); return result; } else { - // We did not get it...help forwarding the top pointer anyway. - if (expected_top.stamp == forwarding_stamp) { - // ...move the pointer forward if someone else put a valid trade object in there. - top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1}); - } else { + if (forwarding_stamp != expected_top.stamp_) { // ...we failed because the top tag lags behind...try to fix it. // This means only updating the tag, as this location can still hold data we need. - top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value}); + top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value_}); } + // TODO: Figure out how other tasks can put the stamp forward without race conditions. + // This must be here to ensure the lock-free property. + // For now first leave it out, as it makes fixing the other non-blocking interaction + // on the resource stack harder. return nullptr; } } diff --git a/lib/pls/src/internal/scheduling/lock_free/task.cpp b/lib/pls/src/internal/scheduling/lock_free/task.cpp index 7ee5d74..883b18e 100644 --- a/lib/pls/src/internal/scheduling/lock_free/task.cpp +++ b/lib/pls/src/internal/scheduling/lock_free/task.cpp @@ -8,63 +8,116 @@ static task *find_task(unsigned id, unsigned depth) { return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_task(depth); } -void task::push_task_chain(task *spare_task_chain) { +void task::propose_push_task_chain(task *spare_task_chain) { PLS_ASSERT(this->thread_id_ != spare_task_chain->thread_id_, "Makes no sense to push task onto itself, as it is not clean by definition."); PLS_ASSERT(this->depth_ == spare_task_chain->depth_, "Must only push tasks with correct depth."); - data_structures::stamped_integer current_root; - data_structures::stamped_integer target_root; + data_structures::stamped_split_integer current_root; + data_structures::stamped_split_integer target_root; do { current_root = this->resource_stack_root_.load(); - target_root.stamp = current_root.stamp + 1; - target_root.value = spare_task_chain->thread_id_ + 1; + PLS_ASSERT(current_root.value_2_ == 0, "Must only propose one push at a time!"); - if (current_root.value == 0) { - // Empty, simply push in with no successor - spare_task_chain->resource_stack_next_.store(nullptr); - } else { - // Already an entry. Find it's corresponding task and set it as our successor. - auto *current_root_task = find_task(current_root.value - 1, this->depth_); - spare_task_chain->resource_stack_next_.store(current_root_task); + // Add it to the current stack state by chaining its next stack task to the current base. + // Popping threads will see this proposed task as if it is the first item in the stack and pop it first, + // making sure that the chain is valid without any further modification when accepting the proposed task. + auto *current_root_task = current_root.value_1_ == 0 ? nullptr : find_task(current_root.value_1_ - 1, this->depth_); + spare_task_chain->resource_stack_next_.store(current_root_task); + + target_root.stamp_ = current_root.stamp_ + 1; + target_root.value_1_ = current_root.value_1_; + target_root.value_2_ = spare_task_chain->thread_id_ + 1; + } while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root)); +} + +bool task::accept_proposed() { + data_structures::stamped_split_integer current_root; + data_structures::stamped_split_integer target_root; + do { + current_root = this->resource_stack_root_.load(); + if (current_root.value_2_ == 0) { + return false; // We are done, nothing to accept! } + target_root.stamp_ = current_root.stamp_ + 1; + target_root.value_1_ = current_root.value_2_; + target_root.value_2_ = 0; } while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root)); + + return true; +} + +bool task::decline_proposed() { + bool proposed_still_there; + + data_structures::stamped_split_integer current_root; + data_structures::stamped_split_integer target_root; + do { + current_root = this->resource_stack_root_.load(); + proposed_still_there = current_root.value_2_ != 0; + + // No need to fetch anything, just delete the proposed item. + target_root.stamp_ = current_root.stamp_ + 1; + target_root.value_1_ = current_root.value_1_; + target_root.value_2_ = 0; + } while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root)); + + return proposed_still_there; +} + +void task::push_task_chain(task *spare_task_chain) { + PLS_ASSERT(this->thread_id_ != spare_task_chain->thread_id_, + "Makes no sense to push task onto itself, as it is not clean by definition."); + PLS_ASSERT(this->depth_ == spare_task_chain->depth_, + "Must only push tasks with correct depth."); + propose_push_task_chain(spare_task_chain); + accept_proposed(); } task *task::pop_task_chain() { - data_structures::stamped_integer current_root; - data_structures::stamped_integer target_root; + data_structures::stamped_split_integer current_root; + data_structures::stamped_split_integer target_root; + task *output_task; do { current_root = this->resource_stack_root_.load(); - if (current_root.value == 0) { - // Empty... - return nullptr; + + if (current_root.value_2_ == 0) { + if (current_root.value_1_ == 0) { + // No main chain and no proposed element. + return nullptr; + } else { + // No entries in the proposed slot, but fond + // elements in primary stack, try to pop from there. + auto *current_root_task = find_task(current_root.value_1_ - 1, this->depth_); + auto *next_stack_task = current_root_task->resource_stack_next_.load(); + + target_root.stamp_ = current_root.stamp_ + 1; + target_root.value_1_ = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0; + target_root.value_2_ = 0; + + output_task = current_root_task; + } } else { - // Found something, try to pop it - auto *current_root_task = find_task(current_root.value - 1, this->depth_); - auto *next_stack_task = current_root_task->resource_stack_next_.load(); + // We got a proposed element. Treat it as beginning of resource stack by + // popping it instead of the main element chain. + auto *proposed_task = find_task(current_root.value_2_ - 1, this->depth_); - target_root.stamp = current_root.stamp + 1; - target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0; + // Empty out the proposed slot + target_root.stamp_ = current_root.stamp_ + 1; + target_root.value_1_ = current_root.value_1_; + target_root.value_2_ = 0; - output_task = current_root_task; + output_task = proposed_task; } } while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root)); PLS_ASSERT(scheduler::check_task_chain_backward(*output_task), "Must only pop proper task chains."); + output_task->resource_stack_next_.store(nullptr); return output_task; } -void task::reset_task_chain() { - auto current_root = this->resource_stack_root_.load(); - current_root.stamp++; - current_root.value = 0; - this->resource_stack_root_.store(current_root); - this->resource_stack_next_.store(nullptr); -} - } diff --git a/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp b/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp index 7951d72..cffba80 100644 --- a/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp +++ b/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp @@ -61,18 +61,28 @@ std::tuple task_manager::steal_task(thread_state PLS_ASSERT(pop_result_task == stolen_task, "We must only steal the task that we peeked at!"); - // update the resource stack associated with the stolen task + // Update the resource stack associated with the stolen task. + // We first propose the traded task, in case someone took our traded in field directly. +// stolen_task->propose_push_task_chain(traded_task); stolen_task->push_task_chain(traded_task); - task *optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task); + auto peeked_traded_object = external_trading_deque::peek_traded_object(stolen_task); + task *optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task, + peeked_traded_object, + stealing_state.get_task_manager().deque_); if (optional_exchanged_task) { - // All good, we pushed the task over to the stack, nothing more to do PLS_ASSERT(optional_exchanged_task == traded_task, "We are currently executing this, no one else can put another task in this field!"); + + // No one took the traded task, push it finally into the stack. +// stolen_task->accept_proposed(); } else { - // The last other active thread took it as its spare resource... - // ...remove our traded object from the stack again (it must be empty now and no one must access it anymore). - stolen_task->reset_task_chain(); + // Someone explicitly took the traded task from us, remove it from the stack. +// bool proposed_still_in_stack = stolen_task->decline_proposed(); +// PLS_ASSERT(proposed_still_in_stack, +// "The proposed task was stolen over the CAS field. It is not possible for another consumer to take the proposed task!"); + base_task *popped_task = stolen_task->pop_task_chain(); + PLS_ASSERT(popped_task == traded_task, "Other task must only steal CAS task if deque was empty!"); } return std::tuple{stolen_task, chain_after_stolen_task, true}; @@ -85,20 +95,41 @@ std::tuple task_manager::steal_task(thread_state } base_task *task_manager::pop_clean_task_chain(base_task *base_task) { - task *popped_task = static_cast(base_task); - // Try to get a clean resource chain to go back to the main stealing loop - task *clean_chain = popped_task->pop_task_chain(); - if (clean_chain == nullptr) { - // double-check if we are really last one or we only have unlucky timing - task *optional_cas_task = external_trading_deque::get_trade_object(popped_task); - if (optional_cas_task) { - clean_chain = optional_cas_task; - } else { - clean_chain = popped_task->pop_task_chain(); + task *target_task = static_cast(base_task); + + while (true) { + // Try to get a clean resource chain to go back to the main stealing loop + auto peeked_task_cas_before = external_trading_deque::peek_traded_object(target_task); + task *pop_result = target_task->pop_task_chain(); + if (pop_result) { + return pop_result; // Got something, so we are simply done here + } + auto peeked_task_cas_after = external_trading_deque::peek_traded_object(target_task); + + if (peeked_task_cas_before != peeked_task_cas_after) { + continue; } - } - return clean_chain; + if (peeked_task_cas_after.is_empty() || peeked_task_cas_after.is_filled_with_stamp()) { + if (peeked_task_cas_after.is_filled_with_stamp()) { + printf("what happened!\n"); // TODO: issue to 80% because the stealing threads skip parts of the deque. + continue; + } + // The task was 'stable' during our pop from the stack. + // Or in other words: no other thread operated on the task. + // We are therefore the last child and do not get a clean task chain. + return nullptr; + } + + // The task was stable, but has a potential resource attached in its cas field. + // Try to get it to not be blocked by the other preempted task. +// task *optional_cas_task = +// external_trading_deque::get_trade_object(target_task, peeked_task_cas_after, this->deque_); +// if (optional_cas_task) { +// // We got it, thus the other thread has not got it and will remove it from the queue. +// return optional_cas_task; +// } + } } } diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp index b797131..cb95032 100644 --- a/lib/pls/src/internal/scheduling/scheduler.cpp +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -184,7 +184,7 @@ void scheduler::sync_internal() { spawned_task->run_as_task([active_task, spawned_task, &syncing_state](context_switcher::continuation cont) { active_task->continuation_ = std::move(cont); syncing_state.set_active_task(spawned_task); - return slow_return(syncing_state); + return slow_return(syncing_state, true); }); PLS_ASSERT(!continuation.valid(), @@ -202,7 +202,7 @@ void scheduler::sync_internal() { } } -context_switcher::continuation scheduler::slow_return(thread_state &calling_state) { +context_switcher::continuation scheduler::slow_return(thread_state &calling_state, bool in_sync) { base_task *this_task = calling_state.get_active_task(); PLS_ASSERT(this_task->depth_ > 0, "Must never try to return from a task at level 0 (no last task), as we must have a target to return to."); @@ -240,7 +240,7 @@ context_switcher::continuation scheduler::slow_return(thread_state &calling_stat // Jump back to the continuation in main scheduling loop. context_switcher::continuation result_cont = std::move(thread_state::get().main_continuation()); - PLS_ASSERT(result_cont.valid(), "Must return a valid continuation."); + PLS_ASSERT(result_cont.valid(), "Must return a valid continuation (to main)."); return result_cont; } else { // Make sure that we are owner of this full continuation/task chain. @@ -251,8 +251,13 @@ context_switcher::continuation scheduler::slow_return(thread_state &calling_stat last_task->is_synchronized_ = true; // Jump to parent task and continue working on it. + if (in_sync) { + PLS_ASSERT(last_task->continuation_.valid(), "Must return a valid continuation (to last task) in sync."); + } else { + PLS_ASSERT(last_task->continuation_.valid(), "Must return a valid continuation (to last task) in spawn."); + } + context_switcher::continuation result_cont = std::move(last_task->continuation_); - PLS_ASSERT(result_cont.valid(), "Must return a valid continuation."); return result_cont; } } diff --git a/test/scheduling_lock_free_tests.cpp b/test/scheduling_lock_free_tests.cpp index 987553f..b91e0ed 100644 --- a/test/scheduling_lock_free_tests.cpp +++ b/test/scheduling_lock_free_tests.cpp @@ -21,7 +21,7 @@ TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/lock_free/ const int stamp = 42; const int ID = 10; traded_cas_field tag_field; - tag_field.fill_with_stamp(stamp, ID); + tag_field.fill_with_stamp_and_deque(stamp, ID); REQUIRE(tag_field.is_filled_with_stamp()); REQUIRE(!tag_field.is_empty()); REQUIRE(!tag_field.is_filled_with_object()); @@ -53,6 +53,58 @@ TEST_CASE("task resource stack", "[internal/scheduling/lock_free/task]") { REQUIRE(tasks[0]->pop_task_chain() == nullptr); } + SECTION("propose/pop") { + tasks[0]->propose_push_task_chain(tasks[1]); + + REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); + REQUIRE(tasks[0]->pop_task_chain() == nullptr); + } + + SECTION("propose/accept/pop") { + tasks[0]->propose_push_task_chain(tasks[1]); + tasks[0]->accept_proposed(); + + REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); + REQUIRE(tasks[0]->pop_task_chain() == nullptr); + } + + SECTION("propose/decline/pop") { + tasks[0]->propose_push_task_chain(tasks[1]); + tasks[0]->decline_proposed(); + + REQUIRE(tasks[0]->pop_task_chain() == nullptr); + REQUIRE(tasks[0]->pop_task_chain() == nullptr); + } + + SECTION("propose intertwined normal ops") { + tasks[0]->push_task_chain(tasks[1]); + tasks[0]->propose_push_task_chain(tasks[2]); + + REQUIRE(tasks[0]->pop_task_chain() == tasks[2]); + REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); + + tasks[0]->push_task_chain(tasks[1]); + tasks[0]->propose_push_task_chain(tasks[2]); + + REQUIRE(tasks[0]->pop_task_chain() == tasks[2]); + tasks[0]->accept_proposed(); + REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); + + tasks[0]->push_task_chain(tasks[1]); + tasks[0]->propose_push_task_chain(tasks[2]); + tasks[0]->decline_proposed(); + + REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); + REQUIRE(tasks[0]->pop_task_chain() == nullptr); + + tasks[0]->push_task_chain(tasks[1]); + tasks[0]->propose_push_task_chain(tasks[2]); + tasks[0]->accept_proposed(); + + REQUIRE(tasks[0]->pop_task_chain() == tasks[2]); + REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); + } + SECTION("multiple pushes") { tasks[0]->push_task_chain(tasks[1]); tasks[0]->push_task_chain(tasks[2]); @@ -88,7 +140,8 @@ TEST_CASE("external trading deque", "[internal/scheduling/lock_free/external_tra deque_1.push_bot(&tasks[0]); auto peek = deque_1.peek_top(); REQUIRE(deque_1.pop_top(&tasks[1], peek) == &tasks[0]); - REQUIRE(external_trading_deque::get_trade_object(&tasks[0]) == &tasks[1]); + auto trade_peek = deque_1.peek_traded_object(&tasks[0]); + REQUIRE(external_trading_deque::get_trade_object(&tasks[0], trade_peek, deque_1) == &tasks[1]); REQUIRE(!deque_1.pop_top(&tasks[1], peek)); REQUIRE(!deque_1.pop_bot());