Commit 4cf3848f by FritzFlorian

Sketch out idea for lock free trading deque.

parent 1b576824
Pipeline #1342 failed with stages
in 36 seconds
......@@ -58,7 +58,7 @@ add_library(pls STATIC
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/cont_manager.h
include/pls/internal/scheduling/cont.h
include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/thread_state_static.h src/internal/base/error_handling.cpp)
include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/thread_state_static.h src/internal/base/error_handling.cpp include/pls/internal/data_structures/bounded_trading_deque.h)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
......
#ifndef PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_TRADING_DEQUE_H_
#define PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_TRADING_DEQUE_H_
#include <atomic>
#include <pls/internal/base/system_details.h>
#include "pls/internal/base/error_handling.h"
#include "pls/internal/base/system_details.h"
#include "pls/internal/data_structures/optional.h"
#include "pls/internal/data_structures/stamped_integer.h"
namespace pls {
namespace internal {
namespace data_structures {
template<typename TradedType>
class traded_field {
// TODO: Replace unsigned long with a portable sized integer
// (some systems might have different pointer sizes to long sizes).
void fill_with_tag(unsigned long tag) {
pointer_ = (void *) ((tag << 1lu) | 0x1lu);
}
unsigned long get_tag() {
PLS_ASSERT(is_filled_with_tag(), "Must only read out the tag when the traded field contains one.");
return ((unsigned long) (pointer_)) >> 1lu;
}
bool is_filled_with_tag() {
return ((unsigned long) (pointer_)) & 0x1lu;
}
void fill_with_object(TradedType *object) {
PLS_ASSERT((object & 0x1lu) == 0,
"Must only store aligned objects in this data structure (last bits are needed for tag bit)");
pointer_ = object;
}
TradedType *get_object() {
PLS_ASSERT(is_filled_with_object(), "Must only read out the object when the traded field contains one.");
return pointer_;
}
bool is_filled_with_object() {
return !is_filled_with_tag() && pointer_ != nullptr;
}
private:
void *pointer_{nullptr};
};
template<typename EntryType, typename TradedType>
class alignas(base::system_details::CACHE_LINE_SIZE) trading_deque_entry {
/*
* Fill the slot with its initial values, making it ready for being stolen.
* Performs no synchronization/memory ordering constraints.
*
* Method is called to init a field on pushBot.
*/
void fill_slots(EntryType *entry_item, unsigned long tag) {
entry_slot_.store(entry_item, std::memory_order_relaxed);
// Relaxed is fine for this, as adding elements is synced over the bot pointer
auto old = trade_slot_.load(std::memory_order_relaxed);
old.fill_with_tag(tag);
trade_slot_.store(std::memory_order_relaxed);
}
/**
* Tries to atomically read out the object traded in by thieves.
* Either returns the traded in field (the slot was stolen) or no result (the slot is still owned locally).
*
* Method is used to pop a field on popBot.
*/
optional<TradedType *> acquire_traded_type() {
traded_field<TradedType> empty_field;
traded_field<TradedType> old_field_value = trade_slot_.exchange(empty_field, std::memory_order_acq_rel);
if (old_field_value.is_filled_with_tag()) {
return optional<TradedType *>();
} else {
return optional<TradedType *>(old_field_value.get_object());
}
}
EntryType *get_object() {
return entry_slot_;
}
optional<EntryType *> trade_object(TradedType *offered_object, unsigned long expected_tag) {
// Read our potential result
EntryType *result = entry_slot_.load(std::memory_order_relaxed);
// Try to get it by CAS with the expected field entry, giving up our offered_object for it
traded_field<TradedType> expected_field;
expected_field.fill_with_tag(expected_tag);
traded_field<TradedType> offered_field;
offered_field.fill_with_object(offered_object);
if (trade_slot_.compare_exchange_strong(expected_field, offered_field, std::memory_order_acq_rel)) {
return optional<EntryType *>(result);
} else {
return optional<EntryType *>(nullptr);
}
}
private:
std::atomic<EntryType *> entry_slot_{nullptr};
std::atomic<traded_field<TradedType>> trade_slot_{};
};
/**
* A work stealing deque (single produces/consumer at the end, multiple consumers at the start).
* A pointer to an OfferedType object can only be acquired by stealing consumers (from the start),
* when they also offer a pointer to a TradeType object.
*
* The exchange of 'goods' (OfferedType and TradedType) happens atomically at a linearization point.
* This means that the owning thread always gets a TradedType for each and every OfferedType that was
* successfully stolen.
*
* The owner of the deque must pop ALL elements, even the stolen ones (to get the traded goods instead).
*
* @tparam EntryType The type of objects stored in the deque
* @tparam TradedType The type of objects traded in for acquiring a deque element.
*/
template<typename EntryType, typename TradedType>
class bounded_trading_deque {
using deque_entry = trading_deque_entry<EntryType, TradedType>;
public:
bounded_trading_deque(deque_entry *entries, size_t num_entries) :
entries_{entries}, num_entries_{num_entries} {};
void push_bot(EntryType *offered_object) {
auto expected_stamp = bot_internal_.stamp;
auto *current_entry = entries_[bot_internal_.value];
current_entry->fill_slots(offered_object, expected_stamp);
bot_internal_.stamp++;
bot_internal_.value++;
bot_.store(bot_internal_.value, std::memory_order_release);
}
struct pop_result {
explicit pop_result(optional<EntryType *> entry, optional<TradedType *> traded) : entry_{entry},
traded_{traded} {};
pop_result() : entry_{}, traded_{} {};
optional<EntryType *> entry_;
optional<TradedType *> traded_;
};
pop_result pop_bot() {
if (bot_internal_.value == 0) {
return pop_result{}; // Empty, nothing to return...
}
// Go one step back
bot_internal_.value--;
auto *current_entry = entries_[bot_internal_.value];
optional<TradedType *> traded_object = current_entry->acquire_traded_type();
optional<EntryType *> queue_entry;
if (traded_object) {
// We do not return an entry, but the traded object
queue_entry = {};
} else {
// We still got it locally, grab the object
queue_entry = {current_entry->get_object()};
// Keep the tag up to date (we must re-use it, as the head is just increasing by steps of one from the beginning)
bot_internal_.stamp--;
}
bot_.store(bot_internal_.value, std::memory_order_relaxed);
return pop_result{queue_entry, traded_object};
}
optional<EntryType *> pop_top(TradedType *trade_offer) {
auto local_top = top_.load();
optional<EntryType *> entry = entries_[local_top.value].trade_object(trade_offer, local_top.stamp);
top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1});
return entry;
}
private:
deque_entry *entries_;
size_t num_entries_;
std::atomic<stamped_integer> top_{{0, 0}};
std::atomic<size_t> bot_{0};
stamped_integer bot_internal_{0, 0};
};
}
}
}
#endif //PLS_INTERNAL_DATA_STRUCTURES_BOUNDED_TRADING_DEQUE_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment