Commit 2784f786 by FritzFlorian

WIP: Re-Work resource-trading implementation to fix race condition.

parent 2c3a1c9f
Pipeline #1517 passed with stages
in 4 minutes 39 seconds
...@@ -38,7 +38,7 @@ class external_trading_deque { ...@@ -38,7 +38,7 @@ class external_trading_deque {
external_trading_deque(unsigned thread_id, size_t num_entries) : thread_id_(thread_id), entries_(num_entries) {} external_trading_deque(unsigned thread_id, size_t num_entries) : thread_id_(thread_id), entries_(num_entries) {}
static traded_cas_field peek_traded_object(task *target_task); static traded_cas_field peek_traded_object(task *target_task);
static task *get_trade_object(task *target_task, traded_cas_field peeked_cas, external_trading_deque &other_deque); static task *get_trade_object(task *target_task, traded_cas_field peeked_cas);
/** /**
* Pushes a task on the bottom of the deque. * Pushes a task on the bottom of the deque.
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
#define PLS_LOCK_FREE_TASK_H_ #define PLS_LOCK_FREE_TASK_H_
#include "pls/internal/scheduling/base_task.h" #include "pls/internal/scheduling/base_task.h"
#include "pls/internal/data_structures/stamped_split_integer.h" #include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/scheduling/lock_free/traded_cas_field.h" #include "pls/internal/scheduling/lock_free/traded_cas_field.h"
namespace pls::internal::scheduling::lock_free { namespace pls::internal::scheduling::lock_free {
...@@ -19,20 +19,21 @@ struct task : public base_task { ...@@ -19,20 +19,21 @@ struct task : public base_task {
base_task(stack_memory, stack_size, depth, thread_id) {} base_task(stack_memory, stack_size, depth, thread_id) {}
// Additional info for lock-free stealing and resource trading. // Additional info for lock-free stealing and resource trading.
std::atomic<traded_cas_field> external_trading_deque_cas_{}; std::atomic<traded_cas_field> external_trading_deque_cas_{{}};
void push_task_chain(task *spare_task_chain); void push_task_chain(task *spare_task_chain);
void propose_push_task_chain(task *spare_task_chain);
bool accept_proposed();
bool decline_proposed();
task *pop_task_chain(); task *pop_task_chain();
void reset_task_chain(task *expected_content);
static task *find_task(unsigned id, unsigned depth);
private: private:
std::atomic<int> num_resources_{};
std::atomic<base_task *> resource_stack_next_{}; std::atomic<base_task *> resource_stack_next_{};
// STAMP = CAS stamp, half CAS length (16 or 32 Bit) // STAMP = CAS stamp, half CAS length (16 or 32 Bit)
// VALUE_1 = Root of the actual stack, indicated by thread ID (8 or 16 Bit) // VALUE = Root of the actual stack, indicated by thread ID (16 or 32 Bit)
// VALUE_2 = Proposed element in queue, indicated by thread ID (8 or 16 Bit) std::atomic<data_structures::stamped_integer> resource_stack_root_{{0, 0}};
std::atomic<data_structures::stamped_split_integer> resource_stack_root_{{0, 0, 0}};
}; };
} }
......
...@@ -4,92 +4,84 @@ ...@@ -4,92 +4,84 @@
#include <atomic> #include <atomic>
#include "pls/internal/base/error_handling.h" #include "pls/internal/data_structures/stamped_split_integer.h"
#include "pls/internal/base/system_details.h" #include "pls/internal/base/system_details.h"
namespace pls::internal::scheduling::lock_free { namespace pls::internal::scheduling::lock_free {
struct task;
struct traded_cas_field { struct traded_cas_field {
static_assert(base::system_details::CACHE_LINE_SIZE >= 4,
"Traded objects must not use their last address bits, as we use them for status flags."
"As traded objects are usually cache aligned, we need big enough cache lines.");
// Base size of our CAS integer/pointer
static constexpr base::system_details::cas_integer CAS_SIZE = base::system_details::CAS_SIZE;
// States of the integer (tag indicating current content)
static constexpr base::system_details::cas_integer EMPTY_TAG = 0x0lu;
static constexpr base::system_details::cas_integer STAMP_TAG = 0x1lu;
static constexpr base::system_details::cas_integer TRADE_TAG = 0x2lu;
// Bitmasks and shifts for cas_integer_, two variants:
// cas_integer_ = traded object | tag
// cas_integer_ = stamp | id | tag
static constexpr base::system_details::cas_integer TAG_SIZE = 2ul;
static constexpr base::system_details::cas_integer TAG_BITS = ~((~0x0ul) << TAG_SIZE);
static constexpr base::system_details::cas_integer TRADED_OBJECT_SIZE = CAS_SIZE - TAG_SIZE;
static constexpr base::system_details::cas_integer TRADED_OBJECT_SHIFT = TAG_SIZE;
static constexpr base::system_details::cas_integer
TRADE_OBJECT_BITS = ~((~0x0ul) << TRADED_OBJECT_SIZE) << TRADED_OBJECT_SHIFT;
static constexpr base::system_details::cas_integer ID_SIZE = (CAS_SIZE / 2) - TAG_SIZE; // Half the CAS for the ID
static constexpr base::system_details::cas_integer ID_SHIFT = TAG_SIZE;
static constexpr base::system_details::cas_integer ID_BITS = ~((~0x0ul) << ID_SIZE) << ID_SHIFT;
static constexpr base::system_details::cas_integer STAMP_SIZE = (CAS_SIZE / 2); // Half the CAS for the STAMP
static constexpr base::system_details::cas_integer STAMP_SHIFT = TAG_SIZE + ID_SIZE;
static constexpr base::system_details::cas_integer STAMP_BITS = ~((~0x0ul) << STAMP_SIZE) << STAMP_SHIFT;
public: public:
void fill_with_stamp_and_deque(base::system_details::cas_integer stamp, base::system_details::cas_integer deque_id) { // A traded cas field should always go through a specific state space.
cas_integer_ = (((stamp << STAMP_SHIFT) & STAMP_BITS) | ((deque_id << ID_SHIFT) & ID_BITS) | STAMP_TAG); // The field is initially filled with a request to trade the task.
// If this request is not met, the field is emptied out, keeping the stamp.
// If this request is met, the field is filled with an traded in task.
// Next, the traded in task is taken out of the field, leaving it empty, keeping the stamp.
//
// After this, the field can be re-used for trading at a different point.
traded_cas_field() : stamp_{0}, trade_request_thread_id_{0}, traded_task_id_{0}, non_empty_flag_{0} {};
void fill_with_trade_request(unsigned long stamp, unsigned long trading_thread_id) {
PLS_ASSERT(is_empty(),
"traded_cas_field must follow state transitions (to trade request)");
non_empty_flag_ = 1;
stamp_ = stamp;
trade_request_thread_id_ = trading_thread_id + 1;
traded_task_id_ = 0;
} }
void fill_with_stamp_and_empty(base::system_details::cas_integer stamp, base::system_details::cas_integer deque_id) { void make_empty() {
cas_integer_ = (((stamp << STAMP_SHIFT) & STAMP_BITS) | ((deque_id << ID_SHIFT) & ID_BITS) | EMPTY_TAG); PLS_ASSERT(is_filled_with_object() || is_filled_with_trade_request(),
PLS_ASSERT(is_empty(), "Must be empty after filling it empty..."); "traded_cas_field must follow state transitions (to empty)");
non_empty_flag_ = 0;
} }
void fill_with_task(unsigned long task_id) {
PLS_ASSERT(is_filled_with_trade_request(),
"traded_cas_field must follow state transitions (to task)");
[[nodiscard]] base::system_details::cas_integer get_stamp() const { traded_task_id_ = task_id + 1;
PLS_ASSERT(is_filled_with_stamp(), "Must only read out the tag when the traded field contains one.");
return (((base::system_details::cas_integer) cas_integer_) & STAMP_BITS) >> STAMP_SHIFT;
} }
[[nodiscard]] base::system_details::cas_integer get_deque_id() const {
PLS_ASSERT(is_filled_with_stamp(), "Must only read out the tag when the traded field contains one."); [[nodiscard]] unsigned long get_stamp() const {
return (((base::system_details::cas_integer) cas_integer_) & ID_BITS) >> ID_SHIFT; return stamp_;
} }
[[nodiscard]] bool is_filled_with_stamp() const { [[nodiscard]] unsigned long get_trade_request_thread_id() const {
return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == STAMP_TAG; PLS_ASSERT(is_filled_with_trade_request() || is_filled_with_object(),
"Must only read out the tag when the traded field contains one.");
return trade_request_thread_id_ - 1;
} }
void fill_with_trade_object(task *new_task) { [[nodiscard]] unsigned long get_task_id() const {
PLS_ASSERT((((base::system_details::cas_integer) new_task) & TAG_BITS) == 0,
"Must only store aligned objects in this data structure (last bits are needed for tag bit)");
cas_integer_ = (((base::system_details::cas_integer) new_task) | TRADE_TAG);
}
[[nodiscard]] task *get_trade_object() const {
PLS_ASSERT(is_filled_with_object(), "Must only read out the object when the traded field contains one."); PLS_ASSERT(is_filled_with_object(), "Must only read out the object when the traded field contains one.");
return reinterpret_cast<task *>(((base::system_details::cas_integer) cas_integer_) & TRADE_OBJECT_BITS); return traded_task_id_ - 1;
}
[[nodiscard]] bool is_filled_with_trade_request() const {
return non_empty_flag_ && trade_request_thread_id_ && !traded_task_id_;
} }
[[nodiscard]] bool is_filled_with_object() const { [[nodiscard]] bool is_filled_with_object() const {
return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == TRADE_TAG; return non_empty_flag_ && traded_task_id_;
} }
[[nodiscard]] bool is_empty() const { [[nodiscard]] bool is_empty() const {
return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == EMPTY_TAG; return !non_empty_flag_;
} }
bool operator==(const traded_cas_field &other) const { bool operator==(const traded_cas_field &other) const {
return this->cas_integer_ == other.cas_integer_; return this->stamp_ == other.stamp_ && this->traded_task_id_ == other.traded_task_id_
&& this->trade_request_thread_id_ == other.trade_request_thread_id_
&& this->non_empty_flag_ == other.non_empty_flag_;
} }
bool operator!=(const traded_cas_field &other) const { bool operator!=(const traded_cas_field &other) const {
return !((*this) == other); return !((*this) == other);
} }
private: private:
base::system_details::cas_integer cas_integer_{}; base::system_details::cas_integer stamp_: base::system_details::CAS_SIZE / 2;
base::system_details::cas_integer trade_request_thread_id_: base::system_details::CAS_SIZE / 4;
base::system_details::cas_integer traded_task_id_: base::system_details::CAS_SIZE / 4 - 1;
base::system_details::cas_integer non_empty_flag_: 1;
}; };
static_assert(sizeof(traded_cas_field) * 8 == base::system_details::CAS_SIZE, "Must only have lock free CAS objects");
} }
......
#include "pls/internal/base/error_handling.h" #include "pls/internal/base/error_handling.h"
#include <thread>
#include <chrono>
void pls_error(const char *msg) { void pls_error(const char *msg) {
using namespace std::chrono_literals;
std::this_thread::sleep_for(5s);
PLS_ERROR(msg); PLS_ERROR(msg);
} }
#include "pls/internal/scheduling/lock_free/external_trading_deque.h" #include "pls/internal/scheduling/lock_free/external_trading_deque.h"
#include "pls/internal/scheduling/lock_free/traded_cas_field.h" #include "pls/internal/scheduling/lock_free/traded_cas_field.h"
#include "pls/internal/scheduling/lock_free/task.h"
namespace pls::internal::scheduling::lock_free { namespace pls::internal::scheduling::lock_free {
...@@ -9,14 +10,15 @@ traded_cas_field external_trading_deque::peek_traded_object(task *target_task) { ...@@ -9,14 +10,15 @@ traded_cas_field external_trading_deque::peek_traded_object(task *target_task) {
} }
task *external_trading_deque::get_trade_object(task *target_task, task *external_trading_deque::get_trade_object(task *target_task,
traded_cas_field peeked_cas, traded_cas_field peeked_cas) {
external_trading_deque &other_deque) {
traded_cas_field current_cas = peeked_cas; traded_cas_field current_cas = peeked_cas;
if (current_cas.is_filled_with_object()) { if (current_cas.is_filled_with_object()) {
task *result = current_cas.get_trade_object(); auto result_id = current_cas.get_task_id();
traded_cas_field empty_cas;
empty_cas.fill_with_stamp_and_empty(other_deque.bot_internal_.stamp_, other_deque.thread_id_); traded_cas_field empty_cas = peeked_cas;
empty_cas.make_empty();
if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas, empty_cas)) { if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas, empty_cas)) {
task *result = task::find_task(result_id, target_task->depth_);
return result; return result;
} }
} }
...@@ -35,7 +37,7 @@ void external_trading_deque::push_bot(task *published_task) { ...@@ -35,7 +37,7 @@ void external_trading_deque::push_bot(task *published_task) {
// Field that all threads synchronize on. // Field that all threads synchronize on.
// This happens not in the deque itself, but in the published task. // This happens not in the deque itself, but in the published task.
traded_cas_field sync_cas_field; traded_cas_field sync_cas_field;
sync_cas_field.fill_with_stamp_and_deque(expected_stamp, thread_id_); sync_cas_field.fill_with_trade_request(expected_stamp, thread_id_);
published_task->external_trading_deque_cas_.store(sync_cas_field, std::memory_order_release); published_task->external_trading_deque_cas_.store(sync_cas_field, std::memory_order_release);
// Advance the bot pointer. Linearization point for making the task public. // Advance the bot pointer. Linearization point for making the task public.
...@@ -45,6 +47,19 @@ void external_trading_deque::push_bot(task *published_task) { ...@@ -45,6 +47,19 @@ void external_trading_deque::push_bot(task *published_task) {
} }
void external_trading_deque::reset_bot_and_top() { void external_trading_deque::reset_bot_and_top() {
for (int i = bot_internal_.value_; i >= 0; i--) {
auto &current_entry = entries_[i];
auto *task = current_entry.traded_task_.load(std::memory_order_relaxed);
auto task_cas = task->external_trading_deque_cas_.load();
if (task_cas.is_filled_with_trade_request() && task_cas.get_trade_request_thread_id() == thread_id_) {
PLS_ASSERT(false, "Must not have 'non stolen' tasks left in own task chain!");
}
current_entry.traded_task_.store(nullptr, std::memory_order_relaxed);
current_entry.forwarding_stamp_.store(0, std::memory_order_relaxed);
}
bot_internal_.value_ = 0; bot_internal_.value_ = 0;
bot_internal_.stamp_++; bot_internal_.stamp_++;
...@@ -53,7 +68,10 @@ void external_trading_deque::reset_bot_and_top() { ...@@ -53,7 +68,10 @@ void external_trading_deque::reset_bot_and_top() {
} }
task *external_trading_deque::pop_bot() { task *external_trading_deque::pop_bot() {
if (bot_internal_.value_ > 0) { if (bot_internal_.value_ == 0) {
return nullptr;
}
bot_internal_.value_--; bot_internal_.value_--;
bot_.store(bot_internal_.value_, std::memory_order_relaxed); bot_.store(bot_internal_.value_, std::memory_order_relaxed);
...@@ -63,19 +81,21 @@ task *external_trading_deque::pop_bot() { ...@@ -63,19 +81,21 @@ task *external_trading_deque::pop_bot() {
// We know what value must be in the cas field if no other thread stole it. // We know what value must be in the cas field if no other thread stole it.
traded_cas_field expected_sync_cas_field; traded_cas_field expected_sync_cas_field;
expected_sync_cas_field.fill_with_stamp_and_deque(expected_stamp, thread_id_); expected_sync_cas_field.fill_with_trade_request(expected_stamp, thread_id_);
traded_cas_field empty_cas_field; traded_cas_field empty_cas_field = expected_sync_cas_field;
empty_cas_field.fill_with_stamp_and_empty(expected_stamp, thread_id_); empty_cas_field.make_empty();
if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field,
empty_cas_field, empty_cas_field,
std::memory_order_acq_rel)) { std::memory_order_acq_rel)) {
return popped_task; current_entry.traded_task_.store(nullptr, std::memory_order_relaxed);
} current_entry.forwarding_stamp_.store(0, std::memory_order_relaxed);
}
return popped_task;
} else {
reset_bot_and_top(); reset_bot_and_top();
return nullptr; return nullptr;
}
} }
external_trading_deque::peek_result external_trading_deque::peek_top() { external_trading_deque::peek_result external_trading_deque::peek_top() {
...@@ -102,28 +122,34 @@ task *external_trading_deque::pop_top(task *offered_task, peek_result peek_resul ...@@ -102,28 +122,34 @@ task *external_trading_deque::pop_top(task *offered_task, peek_result peek_resul
task *result = target_entry.traded_task_.load(); task *result = target_entry.traded_task_.load();
unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load(); unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load();
if (result == nullptr) {
return nullptr;
}
if (forwarding_stamp != expected_top.stamp_) {
// ...we failed because the top tag lags behind...try to fix it.
// This means only updating the tag, as this location can still hold data we need.
top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value_});
return nullptr;
}
// Try to get it by CAS with the expected field entry, giving up our offered_task for it // Try to get it by CAS with the expected field entry, giving up our offered_task for it
traded_cas_field expected_sync_cas_field; traded_cas_field expected_sync_cas_field;
expected_sync_cas_field.fill_with_stamp_and_deque(expected_top.stamp_, thread_id_); expected_sync_cas_field.fill_with_trade_request(expected_top.stamp_, thread_id_);
traded_cas_field offered_field; traded_cas_field offered_field = expected_sync_cas_field;
offered_field.fill_with_trade_object(offered_task); offered_field.fill_with_task(offered_task->thread_id_);
if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, offered_field)) { if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, offered_field)) {
// We got it, for sure move the top pointer forward. // We got it, for sure move the top pointer forward.
top_.compare_exchange_strong(expected_top, {expected_top.stamp_ + 1, expected_top.value_ + 1}); top_.compare_exchange_strong(expected_top, {expected_top.stamp_ + 1, expected_top.value_ + 1});
return result; return result;
} else { } else {
if (forwarding_stamp != expected_top.stamp_) { // TODO: Re-Check this condition for forwading the stamp! Should only happen if another top-stealer took the
// ...we failed because the top tag lags behind...try to fix it. // slot that we where interested in!
// This means only updating the tag, as this location can still hold data we need. if (expected_sync_cas_field.is_filled_with_object() && expected_sync_cas_field.get_stamp() == expected_top.stamp_
top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value_}); && expected_sync_cas_field.get_trade_request_thread_id() == thread_id_) {
top_.compare_exchange_strong(expected_top, {expected_top.stamp_ + 1, expected_top.value_ + 1});
} }
// TODO: Figure out how other tasks can put the stamp forward without race conditions.
// This must be here to ensure the lock-free property.
// For now first leave it out, as it makes fixing the other non-blocking interaction
// on the resource stack harder.
return nullptr; return nullptr;
} }
} }
......
...@@ -3,118 +3,88 @@ ...@@ -3,118 +3,88 @@
namespace pls::internal::scheduling::lock_free { namespace pls::internal::scheduling::lock_free {
// TODO: this 'global' lookup hardly bound to the full scheduler to be setup could be reworked for better testing. task *task::find_task(unsigned id, unsigned depth) {
static task *find_task(unsigned id, unsigned depth) {
return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_task(depth); return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_task(depth);
} }
void task::propose_push_task_chain(task *spare_task_chain) { void task::push_task_chain(task *spare_task_chain) {
num_resources_++;
PLS_ASSERT(this->thread_id_ != spare_task_chain->thread_id_, PLS_ASSERT(this->thread_id_ != spare_task_chain->thread_id_,
"Makes no sense to push task onto itself, as it is not clean by definition."); "Makes no sense to push task onto itself, as it is not clean by definition.");
PLS_ASSERT(this->depth_ == spare_task_chain->depth_, PLS_ASSERT(this->depth_ == spare_task_chain->depth_,
"Must only push tasks with correct depth."); "Must only push tasks with correct depth.");
data_structures::stamped_split_integer current_root; data_structures::stamped_integer current_root;
data_structures::stamped_split_integer target_root; data_structures::stamped_integer target_root;
int iteration = 0;
do { do {
iteration++;
current_root = this->resource_stack_root_.load(); current_root = this->resource_stack_root_.load();
PLS_ASSERT(current_root.value_2_ == 0, "Must only propose one push at a time!");
// Add it to the current stack state by chaining its next stack task to the current base.
// Popping threads will see this proposed task as if it is the first item in the stack and pop it first,
// making sure that the chain is valid without any further modification when accepting the proposed task.
auto *current_root_task = current_root.value_1_ == 0 ? nullptr : find_task(current_root.value_1_ - 1, this->depth_);
spare_task_chain->resource_stack_next_.store(current_root_task);
target_root.stamp_ = current_root.stamp_ + 1; target_root.stamp_ = current_root.stamp_ + 1;
target_root.value_1_ = current_root.value_1_; target_root.value_ = spare_task_chain->thread_id_ + 1;
target_root.value_2_ = spare_task_chain->thread_id_ + 1;
} while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root)); // TODO: Setting the resource stack next AFTER publishing the task to the CAS field
} // is a race, as the resource stack next field can be tampered with.
if (current_root.value_ == 0) {
bool task::accept_proposed() { // Empty, simply push in with no successor.
data_structures::stamped_split_integer current_root; // We are sure that a spare_task_chain is not in any stack when pushing it.
data_structures::stamped_split_integer target_root; // Thus, its resource_stack_next_ field must be nullptr.
do { // TODO: this should be our race?
current_root = this->resource_stack_root_.load(); // TODO: add more checks (see if this is violated AND cas succeeds)
if (current_root.value_2_ == 0) { auto *old_value = spare_task_chain->resource_stack_next_.exchange(nullptr);
return false; // We are done, nothing to accept! if (old_value != nullptr) {
printf("Why would this invariant happen?\n");
}
} else {
// Already an entry. Find it's corresponding task and set it as our successor.
auto *current_root_task = find_task(current_root.value_ - 1, this->depth_);
spare_task_chain->resource_stack_next_.store(current_root_task);
} }
target_root.stamp_ = current_root.stamp_ + 1;
target_root.value_1_ = current_root.value_2_;
target_root.value_2_ = 0;
} while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root)); } while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root));
return true;
} }
bool task::decline_proposed() { void task::reset_task_chain(task *expected_content) {
bool proposed_still_there; num_resources_--;
data_structures::stamped_split_integer current_root; data_structures::stamped_integer current_root = this->resource_stack_root_.load();
data_structures::stamped_split_integer target_root; PLS_ASSERT(current_root.value_ == expected_content->thread_id_ + 1,
do { "Must only reset the task chain if we exactly know its state! (current_root.value_)");
current_root = this->resource_stack_root_.load();
proposed_still_there = current_root.value_2_ != 0;
// No need to fetch anything, just delete the proposed item. auto *current_root_task = find_task(current_root.value_ - 1, this->depth_);
target_root.stamp_ = current_root.stamp_ + 1; if (current_root_task->resource_stack_next_.load(std::memory_order_relaxed) != nullptr) {
target_root.value_1_ = current_root.value_1_; printf("This could have been the bug...\n");
target_root.value_2_ = 0; }
} while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root));
return proposed_still_there;
}
void task::push_task_chain(task *spare_task_chain) { data_structures::stamped_integer target_root;
PLS_ASSERT(this->thread_id_ != spare_task_chain->thread_id_, target_root.stamp_ = current_root.stamp_ + 1;
"Makes no sense to push task onto itself, as it is not clean by definition."); bool success = this->resource_stack_root_.compare_exchange_strong(current_root, target_root);
PLS_ASSERT(this->depth_ == spare_task_chain->depth_, PLS_ASSERT(success, "Must always succeed in resetting the chain, as we must be the sole one operating on it!");
"Must only push tasks with correct depth.");
propose_push_task_chain(spare_task_chain);
accept_proposed();
} }
task *task::pop_task_chain() { task *task::pop_task_chain() {
data_structures::stamped_split_integer current_root; data_structures::stamped_integer current_root;
data_structures::stamped_split_integer target_root; data_structures::stamped_integer target_root;
task *output_task; task *output_task;
do { do {
current_root = this->resource_stack_root_.load(); current_root = this->resource_stack_root_.load();
if (current_root.value_ == 0) {
if (current_root.value_2_ == 0) { // Empty...
if (current_root.value_1_ == 0) {
// No main chain and no proposed element.
return nullptr; return nullptr;
} else { } else {
// No entries in the proposed slot, but fond // Found something, try to pop it
// elements in primary stack, try to pop from there. auto *current_root_task = find_task(current_root.value_ - 1, this->depth_);
auto *current_root_task = find_task(current_root.value_1_ - 1, this->depth_);
auto *next_stack_task = current_root_task->resource_stack_next_.load(); auto *next_stack_task = current_root_task->resource_stack_next_.load();
target_root.stamp_ = current_root.stamp_ + 1; target_root.stamp_ = current_root.stamp_ + 1;
target_root.value_1_ = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0; target_root.value_ = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0;
target_root.value_2_ = 0;
output_task = current_root_task; output_task = current_root_task;
} }
} else {
// We got a proposed element. Treat it as beginning of resource stack by
// popping it instead of the main element chain.
auto *proposed_task = find_task(current_root.value_2_ - 1, this->depth_);
// Empty out the proposed slot
target_root.stamp_ = current_root.stamp_ + 1;
target_root.value_1_ = current_root.value_1_;
target_root.value_2_ = 0;
output_task = proposed_task;
}
} while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root)); } while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root));
PLS_ASSERT(scheduler::check_task_chain_backward(*output_task), "Must only pop proper task chains."); PLS_ASSERT(num_resources_.fetch_add(-1) > 0, "Must only return an task from the chain if there are items!");
output_task->resource_stack_next_.store(nullptr); output_task->resource_stack_next_.store(nullptr);
return output_task; return output_task;
......
...@@ -53,6 +53,8 @@ std::tuple<base_task *, base_task *, bool> task_manager::steal_task(thread_state ...@@ -53,6 +53,8 @@ std::tuple<base_task *, base_task *, bool> task_manager::steal_task(thread_state
task *traded_task = static_cast<task *>(scheduler::get_trade_task(stolen_task, stealing_state)); task *traded_task = static_cast<task *>(scheduler::get_trade_task(stolen_task, stealing_state));
base_task *chain_after_stolen_task = traded_task->next_; base_task *chain_after_stolen_task = traded_task->next_;
// TODO: traded task resource_stack_next_ field is now marked as mine
// perform the actual pop operation // perform the actual pop operation
task *pop_result_task = deque_.pop_top(traded_task, peek); task *pop_result_task = deque_.pop_top(traded_task, peek);
if (pop_result_task) { if (pop_result_task) {
...@@ -62,31 +64,27 @@ std::tuple<base_task *, base_task *, bool> task_manager::steal_task(thread_state ...@@ -62,31 +64,27 @@ std::tuple<base_task *, base_task *, bool> task_manager::steal_task(thread_state
"We must only steal the task that we peeked at!"); "We must only steal the task that we peeked at!");
// Update the resource stack associated with the stolen task. // Update the resource stack associated with the stolen task.
// We first propose the traded task, in case someone took our traded in field directly. // TODO: push only onto task chain if the resource_stack_next_ was still mine
// stolen_task->propose_push_task_chain(traded_task); // (otherwise the CAS could have been stolen).
// This makes sure, that we never 'destroy' a task that we do not own by our stack push routine.
stolen_task->push_task_chain(traded_task); stolen_task->push_task_chain(traded_task);
auto peeked_traded_object = external_trading_deque::peek_traded_object(stolen_task); auto peeked_traded_object = external_trading_deque::peek_traded_object(stolen_task);
task *optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task, task *optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task, peeked_traded_object);
peeked_traded_object,
stealing_state.get_task_manager().deque_);
if (optional_exchanged_task) { if (optional_exchanged_task) {
PLS_ASSERT(optional_exchanged_task == traded_task, PLS_ASSERT(optional_exchanged_task == traded_task,
"We are currently executing this, no one else can put another task in this field!"); "We are currently executing this, no one else can put another task in this field!");
// TODO: we should also assert that the push worked in this case!
// No one took the traded task, push it finally into the stack.
// stolen_task->accept_proposed();
} else { } else {
// Someone explicitly took the traded task from us, remove it from the stack. // Someone explicitly took the traded task from us, remove it from the stack.
// bool proposed_still_in_stack = stolen_task->decline_proposed(); // TODO: if the push failed, we do not need to reset anything.
// PLS_ASSERT(proposed_still_in_stack, // Otherwise the normal invariant that we seek holds.
// "The proposed task was stolen over the CAS field. It is not possible for another consumer to take the proposed task!"); stolen_task->reset_task_chain(traded_task);
base_task *popped_task = stolen_task->pop_task_chain();
PLS_ASSERT(popped_task == traded_task, "Other task must only steal CAS task if deque was empty!");
} }
return std::tuple{stolen_task, chain_after_stolen_task, true}; return std::tuple{stolen_task, chain_after_stolen_task, true};
} else { } else {
// TODO: traded task resource_stack_next_ field is de-marked from being mine
return std::tuple{nullptr, nullptr, false}; return std::tuple{nullptr, nullptr, false};
} }
} else { } else {
...@@ -102,6 +100,7 @@ base_task *task_manager::pop_clean_task_chain(base_task *base_task) { ...@@ -102,6 +100,7 @@ base_task *task_manager::pop_clean_task_chain(base_task *base_task) {
auto peeked_task_cas_before = external_trading_deque::peek_traded_object(target_task); auto peeked_task_cas_before = external_trading_deque::peek_traded_object(target_task);
task *pop_result = target_task->pop_task_chain(); task *pop_result = target_task->pop_task_chain();
if (pop_result) { if (pop_result) {
PLS_ASSERT(scheduler::check_task_chain_backward(*pop_result), "Must only pop proper task chains.");
return pop_result; // Got something, so we are simply done here return pop_result; // Got something, so we are simply done here
} }
auto peeked_task_cas_after = external_trading_deque::peek_traded_object(target_task); auto peeked_task_cas_after = external_trading_deque::peek_traded_object(target_task);
...@@ -110,11 +109,12 @@ base_task *task_manager::pop_clean_task_chain(base_task *base_task) { ...@@ -110,11 +109,12 @@ base_task *task_manager::pop_clean_task_chain(base_task *base_task) {
continue; continue;
} }
if (peeked_task_cas_after.is_empty() || peeked_task_cas_after.is_filled_with_stamp()) { if (peeked_task_cas_after.is_empty() || peeked_task_cas_after.is_filled_with_trade_request()) {
if (peeked_task_cas_after.is_filled_with_stamp()) { if (peeked_task_cas_after.is_filled_with_trade_request()) {
printf("what happened!\n"); // TODO: issue to 80% because the stealing threads skip parts of the deque. printf("what happened! (%d)\n", base_task->thread_id_);
continue; continue;
} }
// The task was 'stable' during our pop from the stack. // The task was 'stable' during our pop from the stack.
// Or in other words: no other thread operated on the task. // Or in other words: no other thread operated on the task.
// We are therefore the last child and do not get a clean task chain. // We are therefore the last child and do not get a clean task chain.
...@@ -123,8 +123,7 @@ base_task *task_manager::pop_clean_task_chain(base_task *base_task) { ...@@ -123,8 +123,7 @@ base_task *task_manager::pop_clean_task_chain(base_task *base_task) {
// The task was stable, but has a potential resource attached in its cas field. // The task was stable, but has a potential resource attached in its cas field.
// Try to get it to not be blocked by the other preempted task. // Try to get it to not be blocked by the other preempted task.
// task *optional_cas_task = // task *optional_cas_task = external_trading_deque::get_trade_object(target_task, peeked_task_cas_after);
// external_trading_deque::get_trade_object(target_task, peeked_task_cas_after, this->deque_);
// if (optional_cas_task) { // if (optional_cas_task) {
// // We got it, thus the other thread has not got it and will remove it from the queue. // // We got it, thus the other thread has not got it and will remove it from the queue.
// return optional_cas_task; // return optional_cas_task;
......
...@@ -15,25 +15,25 @@ using namespace pls::internal::scheduling::lock_free; ...@@ -15,25 +15,25 @@ using namespace pls::internal::scheduling::lock_free;
TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/lock_free/traded_cas_field]") { TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/lock_free/traded_cas_field]") {
traded_cas_field empty_field; traded_cas_field empty_field;
REQUIRE(empty_field.is_empty()); REQUIRE(empty_field.is_empty());
REQUIRE(!empty_field.is_filled_with_stamp()); REQUIRE(!empty_field.is_filled_with_trade_request());
REQUIRE(!empty_field.is_filled_with_object()); REQUIRE(!empty_field.is_filled_with_object());
const int stamp = 42; const int stamp = 42;
const int ID = 10; const int ID = 10;
traded_cas_field tag_field; traded_cas_field tag_field;
tag_field.fill_with_stamp_and_deque(stamp, ID); tag_field.fill_with_trade_request(stamp, ID);
REQUIRE(tag_field.is_filled_with_stamp()); REQUIRE(tag_field.is_filled_with_trade_request());
REQUIRE(!tag_field.is_empty()); REQUIRE(!tag_field.is_empty());
REQUIRE(!tag_field.is_filled_with_object()); REQUIRE(!tag_field.is_filled_with_object());
REQUIRE(tag_field.get_stamp() == stamp); REQUIRE(tag_field.get_stamp() == stamp);
REQUIRE(tag_field.get_deque_id() == ID); REQUIRE(tag_field.get_trade_request_thread_id() == ID);
alignas(64) task obj{nullptr, 0, 0, 0}; alignas(64) task obj{nullptr, 0, 0, 0};
traded_cas_field obj_field; traded_cas_field obj_field = tag_field;
obj_field.fill_with_trade_object(&obj); obj_field.fill_with_task(obj.thread_id_);
REQUIRE(obj_field.is_filled_with_object()); REQUIRE(obj_field.is_filled_with_object());
REQUIRE(!obj_field.is_empty()); REQUIRE(!obj_field.is_empty());
REQUIRE(!obj_field.is_filled_with_stamp()); REQUIRE(!obj_field.is_filled_with_trade_request());
} }
TEST_CASE("task resource stack", "[internal/scheduling/lock_free/task]") { TEST_CASE("task resource stack", "[internal/scheduling/lock_free/task]") {
...@@ -53,53 +53,28 @@ TEST_CASE("task resource stack", "[internal/scheduling/lock_free/task]") { ...@@ -53,53 +53,28 @@ TEST_CASE("task resource stack", "[internal/scheduling/lock_free/task]") {
REQUIRE(tasks[0]->pop_task_chain() == nullptr); REQUIRE(tasks[0]->pop_task_chain() == nullptr);
} }
SECTION("propose/pop") {
tasks[0]->propose_push_task_chain(tasks[1]);
REQUIRE(tasks[0]->pop_task_chain() == tasks[1]);
REQUIRE(tasks[0]->pop_task_chain() == nullptr);
}
SECTION("propose/accept/pop") {
tasks[0]->propose_push_task_chain(tasks[1]);
tasks[0]->accept_proposed();
REQUIRE(tasks[0]->pop_task_chain() == tasks[1]);
REQUIRE(tasks[0]->pop_task_chain() == nullptr);
}
SECTION("propose/decline/pop") {
tasks[0]->propose_push_task_chain(tasks[1]);
tasks[0]->decline_proposed();
REQUIRE(tasks[0]->pop_task_chain() == nullptr);
REQUIRE(tasks[0]->pop_task_chain() == nullptr);
}
SECTION("propose intertwined normal ops") { SECTION("propose intertwined normal ops") {
tasks[0]->push_task_chain(tasks[1]); tasks[0]->push_task_chain(tasks[1]);
tasks[0]->propose_push_task_chain(tasks[2]); tasks[0]->push_task_chain(tasks[2]);
REQUIRE(tasks[0]->pop_task_chain() == tasks[2]); REQUIRE(tasks[0]->pop_task_chain() == tasks[2]);
REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); REQUIRE(tasks[0]->pop_task_chain() == tasks[1]);
tasks[0]->push_task_chain(tasks[1]); tasks[0]->push_task_chain(tasks[1]);
tasks[0]->propose_push_task_chain(tasks[2]); tasks[0]->push_task_chain(tasks[2]);
REQUIRE(tasks[0]->pop_task_chain() == tasks[2]); REQUIRE(tasks[0]->pop_task_chain() == tasks[2]);
tasks[0]->accept_proposed();
REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); REQUIRE(tasks[0]->pop_task_chain() == tasks[1]);
tasks[0]->push_task_chain(tasks[1]); tasks[0]->push_task_chain(tasks[1]);
tasks[0]->propose_push_task_chain(tasks[2]);
tasks[0]->decline_proposed();
REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); REQUIRE(tasks[0]->pop_task_chain() == tasks[1]);
REQUIRE(tasks[0]->pop_task_chain() == nullptr); REQUIRE(tasks[0]->pop_task_chain() == nullptr);
tasks[0]->push_task_chain(tasks[1]); tasks[0]->push_task_chain(tasks[1]);
tasks[0]->propose_push_task_chain(tasks[2]); tasks[0]->push_task_chain(tasks[2]);
tasks[0]->accept_proposed();
REQUIRE(tasks[0]->pop_task_chain() == tasks[2]); REQUIRE(tasks[0]->pop_task_chain() == tasks[2]);
REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); REQUIRE(tasks[0]->pop_task_chain() == tasks[1]);
...@@ -118,63 +93,69 @@ TEST_CASE("task resource stack", "[internal/scheduling/lock_free/task]") { ...@@ -118,63 +93,69 @@ TEST_CASE("task resource stack", "[internal/scheduling/lock_free/task]") {
} }
TEST_CASE("external trading deque", "[internal/scheduling/lock_free/external_trading_deque]") { TEST_CASE("external trading deque", "[internal/scheduling/lock_free/external_trading_deque]") {
external_trading_deque deque_1{1, 16}; // simulate scheduler with four threads and depth 1. We are thread 0.
external_trading_deque deque_2{2, 16}; pls::scheduler scheduler{4, 4, 4096, false};
pls::internal::scheduling::thread_state::set(&scheduler.thread_state_for(0));
task tasks[4] = {{nullptr, 0, 0, 0},
{nullptr, 0, 1, 0}, task *tasks_1[] = {scheduler.task_manager_for(0).get_task(0),
{nullptr, 0, 2, 0}, scheduler.task_manager_for(0).get_task(1),
{nullptr, 0, 3, 0}}; scheduler.task_manager_for(0).get_task(2),
scheduler.task_manager_for(0).get_task(3)};
SECTION("basic operations") { task *tasks_2[] = {scheduler.task_manager_for(1).get_task(0),
// Must start empty scheduler.task_manager_for(1).get_task(1),
REQUIRE(!deque_1.pop_bot()); scheduler.task_manager_for(1).get_task(2),
REQUIRE(!deque_2.pop_bot()); scheduler.task_manager_for(1).get_task(3)};
// Local push/pop auto &thread_state_1 = scheduler.thread_state_for(0);
deque_1.push_bot(&tasks[0]); auto &task_manager_1 = scheduler.thread_state_for(0).get_task_manager();
REQUIRE(deque_1.pop_bot() == &tasks[0]); auto &thread_state_2 = scheduler.thread_state_for(1);
REQUIRE(!deque_1.pop_bot()); auto &task_manager_2 = scheduler.thread_state_for(1).get_task_manager();
// Local push, external pop SECTION("Must start empty") {
deque_1.push_bot(&tasks[0]); REQUIRE(!task_manager_1.pop_local_task());
auto peek = deque_1.peek_top(); REQUIRE(!task_manager_1.pop_local_task());
REQUIRE(deque_1.pop_top(&tasks[1], peek) == &tasks[0]); }
auto trade_peek = deque_1.peek_traded_object(&tasks[0]);
REQUIRE(external_trading_deque::get_trade_object(&tasks[0], trade_peek, deque_1) == &tasks[1]); SECTION("Local push/pop") {
REQUIRE(!deque_1.pop_top(&tasks[1], peek)); task_manager_1.push_local_task(tasks_1[0]);
REQUIRE(!deque_1.pop_bot()); REQUIRE(task_manager_1.pop_local_task() == tasks_1[0]);
REQUIRE(!task_manager_1.pop_local_task());
// Keeps push/pop order }
deque_1.push_bot(&tasks[0]);
deque_1.push_bot(&tasks[1]); SECTION("Local push, external pop") {
REQUIRE(deque_1.pop_bot() == &tasks[1]); task_manager_1.push_local_task(tasks_1[0]);
REQUIRE(deque_1.pop_bot() == &tasks[0]); REQUIRE(std::get<0>(task_manager_1.steal_task(thread_state_2)) == tasks_1[0]);
REQUIRE(!deque_1.pop_bot()); REQUIRE(task_manager_2.pop_clean_task_chain(tasks_1[0]) == tasks_2[0]);
REQUIRE(task_manager_1.pop_local_task() == nullptr);
deque_1.push_bot(&tasks[0]); }
deque_1.push_bot(&tasks[1]);
auto peek1 = deque_1.peek_top(); SECTION("Keeps push/pop order #1") {
REQUIRE(deque_1.pop_top(&tasks[2], peek1) == &tasks[0]); task_manager_1.push_local_task(tasks_1[0]);
auto peek2 = deque_1.peek_top(); task_manager_1.push_local_task(tasks_1[1]);
REQUIRE(deque_1.pop_top(&tasks[3], peek2) == &tasks[1]); REQUIRE(task_manager_1.pop_local_task() == tasks_1[1]);
REQUIRE(task_manager_1.pop_local_task() == tasks_1[0]);
REQUIRE(!task_manager_1.pop_local_task());
}
SECTION("Keeps push/pop order #2") {
task_manager_1.push_local_task(tasks_1[0]);
task_manager_1.push_local_task(tasks_1[1]);
REQUIRE(std::get<0>(task_manager_1.steal_task(thread_state_2)) == tasks_1[0]);
REQUIRE(std::get<0>(task_manager_1.steal_task(thread_state_2)) == tasks_1[1]);
} }
SECTION("Interwined execution #1") { SECTION("Interwined execution #1") {
// Two top poppers // Two top poppers
deque_1.push_bot(&tasks[0]); task_manager_1.push_local_task(tasks_1[0]);
auto peek1 = deque_1.peek_top(); REQUIRE(std::get<0>(task_manager_1.steal_task(thread_state_2)) == tasks_1[0]);
auto peek2 = deque_1.peek_top(); REQUIRE(std::get<0>(task_manager_1.steal_task(thread_state_2)) == nullptr);
REQUIRE(deque_1.pop_top(&tasks[1], peek1) == &tasks[0]);
REQUIRE(!deque_1.pop_top(&tasks[2], peek2));
} }
SECTION("Interwined execution #2") { SECTION("Interwined execution #2") {
// Top and bottom access // Top and bottom access
deque_1.push_bot(&tasks[0]); task_manager_1.push_local_task(tasks_1[0]);
auto peek1 = deque_1.peek_top(); REQUIRE(task_manager_1.pop_local_task() == tasks_1[0]);
REQUIRE(deque_1.pop_bot() == &tasks[0]); REQUIRE(std::get<0>(task_manager_1.steal_task(thread_state_2)) == nullptr);
REQUIRE(!deque_1.pop_top(&tasks[2], peek1));
} }
} }
#endif // PLS_DEQUE_VARIANT == PLS_DEQUE_LOCK_FREE #endif // PLS_DEQUE_VARIANT == PLS_DEQUE_LOCK_FREE
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment