From 4cf3848f2a2fafe8556f6718dccfa1b9dc544f25 Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Mon, 2 Dec 2019 18:56:00 +0100 Subject: [PATCH] Sketch out idea for lock free trading deque. --- lib/pls/CMakeLists.txt | 2 +- lib/pls/include/pls/internal/data_structures/bounded_trading_deque.h | 200 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 201 insertions(+), 1 deletion(-) create mode 100644 lib/pls/include/pls/internal/data_structures/bounded_trading_deque.h diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index e5ff31e..b1e4333 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -58,7 +58,7 @@ add_library(pls STATIC include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/cont_manager.h include/pls/internal/scheduling/cont.h - include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/thread_state_static.h src/internal/base/error_handling.cpp) + include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/thread_state_static.h src/internal/base/error_handling.cpp include/pls/internal/data_structures/bounded_trading_deque.h) # Add everything in `./include` to be in the include path of this project target_include_directories(pls diff --git a/lib/pls/include/pls/internal/data_structures/bounded_trading_deque.h b/lib/pls/include/pls/internal/data_structures/bounded_trading_deque.h new file mode 100644 index 0000000..1829a46 --- /dev/null +++ b/lib/pls/include/pls/internal/data_structures/bounded_trading_deque.h @@ -0,0 +1,200 @@ + +#ifndef PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_TRADING_DEQUE_H_ +#define PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_TRADING_DEQUE_H_ + +#include +#include + +#include "pls/internal/base/error_handling.h" +#include "pls/internal/base/system_details.h" + +#include "pls/internal/data_structures/optional.h" +#include "pls/internal/data_structures/stamped_integer.h" + +namespace pls { +namespace internal { +namespace data_structures { + +template +class traded_field { + // TODO: Replace unsigned long with a portable sized integer + // (some systems might have different pointer sizes to long sizes). + + void fill_with_tag(unsigned long tag) { + pointer_ = (void *) ((tag << 1lu) | 0x1lu); + } + unsigned long get_tag() { + PLS_ASSERT(is_filled_with_tag(), "Must only read out the tag when the traded field contains one."); + return ((unsigned long) (pointer_)) >> 1lu; + } + bool is_filled_with_tag() { + return ((unsigned long) (pointer_)) & 0x1lu; + } + + void fill_with_object(TradedType *object) { + PLS_ASSERT((object & 0x1lu) == 0, + "Must only store aligned objects in this data structure (last bits are needed for tag bit)"); + pointer_ = object; + } + TradedType *get_object() { + PLS_ASSERT(is_filled_with_object(), "Must only read out the object when the traded field contains one."); + return pointer_; + } + bool is_filled_with_object() { + return !is_filled_with_tag() && pointer_ != nullptr; + } + + private: + void *pointer_{nullptr}; +}; + +template +class alignas(base::system_details::CACHE_LINE_SIZE) trading_deque_entry { + /* + * Fill the slot with its initial values, making it ready for being stolen. + * Performs no synchronization/memory ordering constraints. + * + * Method is called to init a field on pushBot. + */ + void fill_slots(EntryType *entry_item, unsigned long tag) { + entry_slot_.store(entry_item, std::memory_order_relaxed); + + // Relaxed is fine for this, as adding elements is synced over the bot pointer + auto old = trade_slot_.load(std::memory_order_relaxed); + old.fill_with_tag(tag); + trade_slot_.store(std::memory_order_relaxed); + } + + /** + * Tries to atomically read out the object traded in by thieves. + * Either returns the traded in field (the slot was stolen) or no result (the slot is still owned locally). + * + * Method is used to pop a field on popBot. + */ + optional acquire_traded_type() { + traded_field empty_field; + traded_field old_field_value = trade_slot_.exchange(empty_field, std::memory_order_acq_rel); + + if (old_field_value.is_filled_with_tag()) { + return optional(); + } else { + return optional(old_field_value.get_object()); + } + } + + EntryType *get_object() { + return entry_slot_; + } + + optional trade_object(TradedType *offered_object, unsigned long expected_tag) { + // Read our potential result + EntryType *result = entry_slot_.load(std::memory_order_relaxed); + + // Try to get it by CAS with the expected field entry, giving up our offered_object for it + traded_field expected_field; + expected_field.fill_with_tag(expected_tag); + traded_field offered_field; + offered_field.fill_with_object(offered_object); + + if (trade_slot_.compare_exchange_strong(expected_field, offered_field, std::memory_order_acq_rel)) { + return optional(result); + } else { + return optional(nullptr); + } + } + + private: + std::atomic entry_slot_{nullptr}; + std::atomic> trade_slot_{}; +}; + +/** + * A work stealing deque (single produces/consumer at the end, multiple consumers at the start). + * A pointer to an OfferedType object can only be acquired by stealing consumers (from the start), + * when they also offer a pointer to a TradeType object. + * + * The exchange of 'goods' (OfferedType and TradedType) happens atomically at a linearization point. + * This means that the owning thread always gets a TradedType for each and every OfferedType that was + * successfully stolen. + * + * The owner of the deque must pop ALL elements, even the stolen ones (to get the traded goods instead). + * + * @tparam EntryType The type of objects stored in the deque + * @tparam TradedType The type of objects traded in for acquiring a deque element. + */ +template +class bounded_trading_deque { + using deque_entry = trading_deque_entry; + + public: + bounded_trading_deque(deque_entry *entries, size_t num_entries) : + entries_{entries}, num_entries_{num_entries} {}; + + void push_bot(EntryType *offered_object) { + auto expected_stamp = bot_internal_.stamp; + auto *current_entry = entries_[bot_internal_.value]; + + current_entry->fill_slots(offered_object, expected_stamp); + bot_internal_.stamp++; + bot_internal_.value++; + + bot_.store(bot_internal_.value, std::memory_order_release); + } + + struct pop_result { + explicit pop_result(optional entry, optional traded) : entry_{entry}, + traded_{traded} {}; + pop_result() : entry_{}, traded_{} {}; + + optional entry_; + optional traded_; + }; + pop_result pop_bot() { + if (bot_internal_.value == 0) { + return pop_result{}; // Empty, nothing to return... + } + + // Go one step back + bot_internal_.value--; + + auto *current_entry = entries_[bot_internal_.value]; + optional traded_object = current_entry->acquire_traded_type(); + optional queue_entry; + if (traded_object) { + // We do not return an entry, but the traded object + queue_entry = {}; + } else { + // We still got it locally, grab the object + queue_entry = {current_entry->get_object()}; + // Keep the tag up to date (we must re-use it, as the head is just increasing by steps of one from the beginning) + bot_internal_.stamp--; + } + + bot_.store(bot_internal_.value, std::memory_order_relaxed); + + return pop_result{queue_entry, traded_object}; + } + + optional pop_top(TradedType *trade_offer) { + auto local_top = top_.load(); + optional entry = entries_[local_top.value].trade_object(trade_offer, local_top.stamp); + top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1}); + + return entry; + } + + private: + deque_entry *entries_; + size_t num_entries_; + + std::atomic top_{{0, 0}}; + + std::atomic bot_{0}; + stamped_integer bot_internal_{0, 0}; +}; + +} +} +} + +#endif //PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_TRADING_DEQUE_H_ -- libgit2 0.26.0