Commit 2c3a1c9f by FritzFlorian

WIP: Re-Work resource-trading implementation to fix race condition.

parent 86333a60
Pipeline #1516 passed with stages
in 4 minutes 36 seconds
......@@ -23,6 +23,7 @@ add_library(pls STATIC
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h
include/pls/internal/data_structures/stamped_integer.h
include/pls/internal/data_structures/stamped_split_integer.h
include/pls/internal/data_structures/delayed_initialization.h
include/pls/internal/data_structures/bounded_trading_deque.h
include/pls/internal/data_structures/bounded_ws_deque.h
......
......@@ -156,14 +156,14 @@ class bounded_trading_deque {
entries_{entries}, num_entries_{num_entries} {};
void push_bot(EntryType *offered_object) {
auto expected_stamp = bot_internal_.stamp;
auto &current_entry = entries_[bot_internal_.value];
auto expected_stamp = bot_internal_.stamp_;
auto &current_entry = entries_[bot_internal_.value_];
current_entry.fill_slots(offered_object, expected_stamp);
bot_internal_.stamp++;
bot_internal_.value++;
bot_internal_.stamp_++;
bot_internal_.value_++;
bot_.store(bot_internal_.value, std::memory_order_release);
bot_.store(bot_internal_.value_, std::memory_order_release);
}
struct pop_result {
......@@ -175,14 +175,14 @@ class bounded_trading_deque {
optional<TradedType *> traded_;
};
pop_result pop_bot() {
if (bot_internal_.value == 0) {
if (bot_internal_.value_ == 0) {
return pop_result{}; // Empty, nothing to return...
}
// Go one step back
bot_internal_.value--;
bot_internal_.value_--;
auto &current_entry = entries_[bot_internal_.value];
auto &current_entry = entries_[bot_internal_.value_];
optional<TradedType *> traded_object = current_entry.acquire_traded_type();
optional<EntryType *> queue_entry;
if (traded_object) {
......@@ -193,10 +193,10 @@ class bounded_trading_deque {
queue_entry = optional<EntryType *>{current_entry.get_object()};
}
bot_.store(bot_internal_.value, std::memory_order_relaxed);
if (bot_internal_.value == 0) {
bot_internal_.stamp++;
top_.store({bot_internal_.stamp, 0}, std::memory_order_release);
bot_.store(bot_internal_.value_, std::memory_order_relaxed);
if (bot_internal_.value_ == 0) {
bot_internal_.stamp_++;
top_.store({bot_internal_.stamp_, 0}, std::memory_order_release);
}
return pop_result{queue_entry, traded_object};
......@@ -205,10 +205,10 @@ class bounded_trading_deque {
std::tuple<optional<EntryType *>, stamped_integer> peek_top() {
auto local_top = top_.load();
auto local_bot = bot_.load();
if (local_top.value >= local_bot) {
if (local_top.value_ >= local_bot) {
return std::make_tuple(optional<EntryType *>{}, local_top);
} else {
return std::make_tuple(optional<EntryType *>{entries_[local_top.value].get_object()}, local_top);
return std::make_tuple(optional<EntryType *>{entries_[local_top.value_].get_object()}, local_top);
}
}
......@@ -219,24 +219,24 @@ class bounded_trading_deque {
optional<EntryType *> pop_top(TradedType *trade_offer, stamped_integer local_top) {
auto local_bot = bot_.load();
if (local_top.value >= local_bot) {
if (local_top.value_ >= local_bot) {
return optional<EntryType *>{};
}
unsigned long expected_top_stamp = local_top.stamp;
optional<EntryType *> entry = entries_[local_top.value].trade_object(trade_offer, expected_top_stamp);
unsigned long expected_top_stamp = local_top.stamp_;
optional<EntryType *> entry = entries_[local_top.value_].trade_object(trade_offer, expected_top_stamp);
if (entry) {
// We got it, for sure move the top pointer forward.
top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1});
top_.compare_exchange_strong(local_top, {local_top.stamp_ + 1, local_top.value_ + 1});
} else {
// We did not get it....
if (entries_[local_top.value].is_empty()) {
if (entries_[local_top.value_].is_empty()) {
// ...update the top stamp, so the next call can get it (we still make system progress, as the owner
// must have popped off the element)
top_.compare_exchange_strong(local_top, {expected_top_stamp, local_top.value});
top_.compare_exchange_strong(local_top, {expected_top_stamp, local_top.value_});
} else {
// ...move the pointer forward if someone else put a valid trade object in there.
top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1});
top_.compare_exchange_strong(local_top, {local_top.stamp_ + 1, local_top.value_ + 1});
}
}
......
......@@ -40,19 +40,19 @@ class bounded_ws_deque {
}
/**
 * Reports whether the deque currently holds no elements.
 *
 * The deque counts as empty once the bottom index is at or below the top index, which is
 * the same emptiness test pop_top() applies before it tries to take an element.
 * Top and bottom are read by two separate loads, so the result is only a snapshot.
 *
 * @return true if no element was available when the indices were read, false otherwise.
 */
bool is_empty() {
  const auto current_top = top_.load();
  // Empty means 'bottom <= top'. The former 'top < bottom' comparison was inverted
  // and reported a filled deque as empty (and an empty one as filled).
  return bottom_.load() <= current_top.value_;
}
optional<T> pop_top() {
stamped_integer old_top = top_.load();
unsigned int new_stamp = old_top.stamp + 1;
unsigned int new_value = old_top.value + 1;
unsigned int new_stamp = old_top.stamp_ + 1;
unsigned int new_value = old_top.value_ + 1;
if (bottom_.load() <= old_top.value) {
if (bottom_.load() <= old_top.value_) {
return optional<T>();
}
optional<T> result(item_array_[old_top.value]);
optional<T> result(item_array_[old_top.value_]);
if (top_.compare_exchange_strong(old_top, {new_stamp, new_value})) {
return result;
}
......@@ -71,14 +71,14 @@ class bounded_ws_deque {
optional<T> result(item_array_[local_bottom_]);
stamped_integer old_top = top_.load(std::memory_order_acquire);
if (local_bottom_ > old_top.value) {
if (local_bottom_ > old_top.value_) {
// Enough distance to just return the value
return result;
}
if (local_bottom_ == old_top.value) {
if (local_bottom_ == old_top.value_) {
local_bottom_ = 0;
bottom_.store(local_bottom_);
if (top_.compare_exchange_strong(old_top, {old_top.stamp + 1, 0})) {
if (top_.compare_exchange_strong(old_top, {old_top.stamp_ + 1, 0})) {
// We won the competition and the queue is empty
return result;
}
......@@ -87,7 +87,7 @@ class bounded_ws_deque {
// The queue is empty and we lost the competition
local_bottom_ = 0;
bottom_.store(local_bottom_);
top_.store({old_top.stamp + 1, 0});
top_.store({old_top.stamp_ + 1, 0});
return optional<T>();
}
......
......@@ -4,23 +4,20 @@
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace data_structures {
namespace pls::internal::data_structures {
constexpr unsigned long HALF_CACHE_LINE = base::system_details::CACHE_LINE_SIZE / 2;
struct stamped_integer {
using member_t = base::system_details::cas_integer;
member_t stamp:HALF_CACHE_LINE;
member_t value:HALF_CACHE_LINE;
member_t stamp_: base::system_details::CAS_SIZE / 2;
member_t value_: base::system_details::CAS_SIZE / 2;
stamped_integer() : stamp{0}, value{0} {};
stamped_integer(member_t new_value) : stamp{0}, value{new_value} {};
stamped_integer(member_t new_stamp, member_t new_value) : stamp{new_stamp}, value{new_value} {};
stamped_integer() : stamp_{0}, value_{0} {};
stamped_integer(member_t new_value) : stamp_{0}, value_{new_value} {};
stamped_integer(member_t new_stamp, member_t new_value) : stamp_{new_stamp}, value_{new_value} {};
bool operator==(const stamped_integer &other) const noexcept {
return stamp == other.stamp && value == other.value;
return stamp_ == other.stamp_ && value_ == other.value_;
}
bool operator!=(const stamped_integer &other) const noexcept {
......@@ -29,7 +26,5 @@ struct stamped_integer {
};
}
}
}
#endif //PLS_STAMPED_INTEGER_H_
#ifndef PLS_STAMPED_SPLIT_INTEGER_H_
#define PLS_STAMPED_SPLIT_INTEGER_H_
#include "pls/internal/base/system_details.h"
namespace pls::internal::data_structures {
struct stamped_split_integer {
using member_t = base::system_details::cas_integer;
member_t stamp_: base::system_details::CAS_SIZE / 2;
member_t value_1_: base::system_details::CAS_SIZE / 4;
member_t value_2_: base::system_details::CAS_SIZE / 4;
stamped_split_integer() : stamp_{0}, value_1_{0}, value_2_{0} {};
stamped_split_integer(member_t value_1, member_t value_2) : stamp_{0}, value_1_{value_1}, value_2_{value_2} {};
stamped_split_integer(member_t new_stamp, member_t value_1, member_t value_2) : stamp_{new_stamp},
value_1_{value_1},
value_2_{value_2} {};
bool operator==(const stamped_split_integer &other) const noexcept {
return stamp_ == other.stamp_ && value_1_ == other.value_1_ && value_2_ == other.value_2_;
}
bool operator!=(const stamped_split_integer &other) const noexcept {
return !(*this == other);
}
};
}
#endif //PLS_STAMPED_SPLIT_INTEGER_H_
......@@ -33,7 +33,7 @@ struct dag_node {
}
void dag_compact();
void dag_print(std::ostream &stream, unsigned rank);
void dag_print(std::ostream &stream, unsigned rank, bool capture_memory, bool capture_time);
unsigned dag_max_memory();
unsigned long dag_total_user_time();
unsigned long dag_critical_path();
......
......@@ -24,11 +24,14 @@ class profiler {
}
struct profiler_run {
profiler_run(unsigned num_threads) : start_time_{},
end_time_{},
root_node_{std::make_unique<dag_node>(0)},
per_thread_stats_(num_threads),
num_threads_{num_threads} {}
profiler_run(profiler &profiler) : profiler_{profiler},
start_time_{},
end_time_{},
root_node_{std::make_unique<dag_node>(0)},
per_thread_stats_(profiler.num_threads_),
num_threads_{profiler.num_threads_} {}
profiler &profiler_;
// Runtime stats
clock::time_point start_time_;
......
......@@ -25,6 +25,9 @@ namespace pls::internal::scheduling {
*
* This base_task can be extended by different trading/stealing implementations,
* to add for example additional flags. The scheduler itself always works solely with this base version.
*
* Currently, only the 'lock_free' stealing implementation is present and extends the task in this package.
* The scheduler only uses/operates on this base_task, as the stealing/resource trading is not its domain.
*/
struct strain_resource;
struct base_task {
......
......@@ -37,8 +37,8 @@ class external_trading_deque {
public:
external_trading_deque(unsigned thread_id, size_t num_entries) : thread_id_(thread_id), entries_(num_entries) {}
static task *peek_traded_object(task *target_task);
static task *get_trade_object(task *target_task);
static traded_cas_field peek_traded_object(task *target_task);
static task *get_trade_object(task *target_task, traded_cas_field peeked_cas, external_trading_deque &other_deque);
/**
* Pushes a task on the bottom of the deque.
......
......@@ -3,7 +3,7 @@
#define PLS_LOCK_FREE_TASK_H_
#include "pls/internal/scheduling/base_task.h"
#include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/data_structures/stamped_split_integer.h"
#include "pls/internal/scheduling/lock_free/traded_cas_field.h"
namespace pls::internal::scheduling::lock_free {
......@@ -22,12 +22,17 @@ struct task : public base_task {
std::atomic<traded_cas_field> external_trading_deque_cas_{};
void push_task_chain(task *spare_task_chain);
void propose_push_task_chain(task *spare_task_chain);
bool accept_proposed();
bool decline_proposed();
task *pop_task_chain();
void reset_task_chain();
private:
std::atomic<base_task *> resource_stack_next_{};
std::atomic<data_structures::stamped_integer> resource_stack_root_{{0, 0}};
// STAMP = CAS stamp, half CAS length (16 or 32 Bit)
// VALUE_1 = Root of the actual stack, indicated by thread ID (8 or 16 Bit)
// VALUE_2 = Proposed element in queue, indicated by thread ID (8 or 16 Bit)
std::atomic<data_structures::stamped_split_integer> resource_stack_root_{{0, 0, 0}};
};
}
......
......@@ -34,27 +34,32 @@ struct traded_cas_field {
static constexpr base::system_details::cas_integer
TRADE_OBJECT_BITS = ~((~0x0ul) << TRADED_OBJECT_SIZE) << TRADED_OBJECT_SHIFT;
static constexpr base::system_details::cas_integer ID_SIZE = 10ul; // Up to 1024 cores
static constexpr base::system_details::cas_integer ID_SIZE = (CAS_SIZE / 2) - TAG_SIZE; // Half the CAS for the ID
static constexpr base::system_details::cas_integer ID_SHIFT = TAG_SIZE;
static constexpr base::system_details::cas_integer ID_BITS = ~((~0x0ul) << ID_SIZE) << ID_SHIFT;
static constexpr base::system_details::cas_integer STAMP_SIZE = CAS_SIZE - TAG_SIZE - ID_SIZE;
static constexpr base::system_details::cas_integer STAMP_SIZE = (CAS_SIZE / 2); // Half the CAS for the STAMP
static constexpr base::system_details::cas_integer STAMP_SHIFT = TAG_SIZE + ID_SIZE;
static constexpr base::system_details::cas_integer STAMP_BITS = ~((~0x0ul) << STAMP_SIZE) << STAMP_SHIFT;
public:
void fill_with_stamp(base::system_details::cas_integer stamp, base::system_details::cas_integer deque_id) {
void fill_with_stamp_and_deque(base::system_details::cas_integer stamp, base::system_details::cas_integer deque_id) {
cas_integer_ = (((stamp << STAMP_SHIFT) & STAMP_BITS) | ((deque_id << ID_SHIFT) & ID_BITS) | STAMP_TAG);
}
base::system_details::cas_integer get_stamp() {
/**
 * Overwrites the field with the given stamp and deque id and tags it as EMPTY.
 *
 * Stamp and id are shifted into their bit ranges and masked, so bits that do not fit are dropped.
 *
 * @param stamp    Stamp to store in the stamp bits.
 * @param deque_id Deque id to store in the id bits.
 */
void fill_with_stamp_and_empty(base::system_details::cas_integer stamp, base::system_details::cas_integer deque_id) {
  const auto stamp_part = (stamp << STAMP_SHIFT) & STAMP_BITS;
  const auto id_part = (deque_id << ID_SHIFT) & ID_BITS;
  cas_integer_ = (stamp_part | id_part | EMPTY_TAG);
  PLS_ASSERT(is_empty(), "Must be empty after filling it empty...");
}
/**
 * Extracts the stamp stored in the field.
 * Asserts that the field is currently tagged as holding a stamp.
 *
 * @return The stamp bits, shifted down to start at bit 0.
 */
[[nodiscard]] base::system_details::cas_integer get_stamp() const {
  PLS_ASSERT(is_filled_with_stamp(), "Must only read out the tag when the traded field contains one.");
  const auto raw_bits = static_cast<base::system_details::cas_integer>(cas_integer_);
  const auto masked_stamp = raw_bits & STAMP_BITS;
  return masked_stamp >> STAMP_SHIFT;
}
base::system_details::cas_integer get_deque_id() {
/**
 * Extracts the deque id stored in the field.
 * Asserts that the field is currently tagged as holding a stamp.
 *
 * @return The id bits, shifted down to start at bit 0.
 */
[[nodiscard]] base::system_details::cas_integer get_deque_id() const {
  PLS_ASSERT(is_filled_with_stamp(), "Must only read out the tag when the traded field contains one.");
  const auto raw_bits = static_cast<base::system_details::cas_integer>(cas_integer_);
  const auto masked_id = raw_bits & ID_BITS;
  return masked_id >> ID_SHIFT;
}
bool is_filled_with_stamp() {
[[nodiscard]] bool is_filled_with_stamp() const {
return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == STAMP_TAG;
}
......@@ -63,18 +68,25 @@ struct traded_cas_field {
"Must only store aligned objects in this data structure (last bits are needed for tag bit)");
cas_integer_ = (((base::system_details::cas_integer) new_task) | TRADE_TAG);
}
task *get_trade_object() {
/**
 * Reads the task pointer stored in the field.
 * Asserts that the field is currently tagged as holding a trade object.
 *
 * @return The pointer formed from the object bits of the field (tag bits masked out).
 */
[[nodiscard]] task *get_trade_object() const {
  PLS_ASSERT(is_filled_with_object(), "Must only read out the object when the traded field contains one.");
  const auto object_bits = static_cast<base::system_details::cas_integer>(cas_integer_) & TRADE_OBJECT_BITS;
  return reinterpret_cast<task *>(object_bits);
}
bool is_filled_with_object() {
[[nodiscard]] bool is_filled_with_object() const {
return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == TRADE_TAG;
}
bool is_empty() {
[[nodiscard]] bool is_empty() const {
return (((base::system_details::cas_integer) cas_integer_) & TAG_BITS) == EMPTY_TAG;
}
/** Two fields are equal if their complete raw CAS integers (tag, id, stamp / object bits) match. */
bool operator==(const traded_cas_field &other) const {
  return cas_integer_ == other.cas_integer_;
}
/** Negation of operator==. */
bool operator!=(const traded_cas_field &other) const {
  return !operator==(other);
}
private:
base::system_details::cas_integer cas_integer_{};
};
......
......@@ -142,7 +142,7 @@ class scheduler {
template<typename Function>
static void serial_internal(Function &&lambda);
static context_switcher::continuation slow_return(thread_state &calling_state);
static context_switcher::continuation slow_return(thread_state &calling_state, bool in_sync);
static void work_thread_main_loop();
void work_thread_work_section();
......
......@@ -33,7 +33,7 @@ scheduler::scheduler(unsigned int num_threads,
stack_allocator_{std::make_shared<ALLOC>(std::forward<ALLOC>(stack_allocator))},
serial_stack_size_{serial_stack_size}
#if PLS_PROFILING_ENABLED
, profiler_{num_threads}
, profiler_{num_threads}
#endif
{
......@@ -269,7 +269,7 @@ void scheduler::spawn_internal(Function &&lambda) {
syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
spawned_task->profiling_node_);
#endif
auto continuation = slow_return(syncing_state);
auto continuation = slow_return(syncing_state, false);
return continuation;
}
});
......
......@@ -19,18 +19,23 @@ void dag_node::dag_compact() {
}
}
void dag_node::dag_print(std::ostream &stream, unsigned rank) {
void dag_node::dag_print(std::ostream &stream, unsigned rank, bool capture_memory, bool capture_time) {
stream << node_print_id()
<< " [label=\"" << spawning_thread_id_ << "\n"
<< max_memory_ << " bytes\n"
<< m_to_d(total_runtime_) << " us\""
<< " ,rank=" << rank << "];" << std::endl;
<< " [label=\"" << spawning_thread_id_ << "\n";
if (capture_memory) {
stream << max_memory_ << " bytes\n";
}
if (capture_time) {
stream << m_to_d(total_runtime_) << " us\"";
}
stream << " ,rank=" << rank << "];" << std::endl;
for (auto &child : child_nodes_) {
child.dag_print(stream, rank + 1);
child.dag_print(stream, rank + 1, capture_memory, capture_time);
stream << node_print_id() << " -> " << child.node_print_id() << ";" << std::endl;
}
if (next_node_) {
next_node_->dag_print(stream, rank);
next_node_->dag_print(stream, rank, capture_memory, capture_time);
stream << node_print_id() << " -> " << next_node_->node_print_id() << ";" << std::endl;
}
}
......
......@@ -58,12 +58,12 @@ void profiler::profiler_run::print_stats() const {
/**
 * Writes the recorded task DAG of this run to the given stream as a graphviz 'digraph'.
 *
 * Which label details are printed is taken from the owning profiler's
 * capture_memory_ / capture_time_ settings.
 *
 * @param stream Stream that receives the complete graph description.
 */
void profiler::profiler_run::print_dag(std::ostream &stream) {
  stream << "digraph {" << std::endl;
  // Print the nodes into the caller's stream. They used to go to std::cout, so any other
  // target stream only received the empty 'digraph { }' frame.
  root_node_->dag_print(stream, 0, profiler_.capture_memory_, profiler_.capture_time_);
  stream << "}" << std::endl;
}
dag_node *profiler::start_profiler_run() {
profiler_run &current_run = profiler_runs_.emplace_back(num_threads_);
profiler_run &current_run = profiler_runs_.emplace_back(*this);
current_run.start_time_ = clock::now();
return current_run.root_node_.get();
}
......
......@@ -3,20 +3,19 @@
namespace pls::internal::scheduling::lock_free {
task *external_trading_deque::peek_traded_object(task *target_task) {
traded_cas_field external_trading_deque::peek_traded_object(task *target_task) {
traded_cas_field current_cas = target_task->external_trading_deque_cas_.load();
if (current_cas.is_filled_with_object()) {
return current_cas.get_trade_object();
} else {
return nullptr;
}
return current_cas;
}
task *external_trading_deque::get_trade_object(task *target_task) {
traded_cas_field current_cas = target_task->external_trading_deque_cas_.load();
task *external_trading_deque::get_trade_object(task *target_task,
traded_cas_field peeked_cas,
external_trading_deque &other_deque) {
traded_cas_field current_cas = peeked_cas;
if (current_cas.is_filled_with_object()) {
task *result = current_cas.get_trade_object();
traded_cas_field empty_cas;
empty_cas.fill_with_stamp_and_empty(other_deque.bot_internal_.stamp_, other_deque.thread_id_);
if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas, empty_cas)) {
return result;
}
......@@ -26,8 +25,8 @@ task *external_trading_deque::get_trade_object(task *target_task) {
}
void external_trading_deque::push_bot(task *published_task) {
auto expected_stamp = bot_internal_.stamp;
auto &current_entry = entries_[bot_internal_.value];
auto expected_stamp = bot_internal_.stamp_;
auto &current_entry = entries_[bot_internal_.value_];
// Publish the prepared task in the deque.
current_entry.forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed);
......@@ -36,36 +35,37 @@ void external_trading_deque::push_bot(task *published_task) {
// Field that all threads synchronize on.
// This happens not in the deque itself, but in the published task.
traded_cas_field sync_cas_field;
sync_cas_field.fill_with_stamp(expected_stamp, thread_id_);
sync_cas_field.fill_with_stamp_and_deque(expected_stamp, thread_id_);
published_task->external_trading_deque_cas_.store(sync_cas_field, std::memory_order_release);
// Advance the bot pointer. Linearization point for making the task public.
bot_internal_.stamp++;
bot_internal_.value++;
bot_.store(bot_internal_.value, std::memory_order_release);
bot_internal_.stamp_++;
bot_internal_.value_++;
bot_.store(bot_internal_.value_, std::memory_order_release);
}
void external_trading_deque::reset_bot_and_top() {
bot_internal_.value = 0;
bot_internal_.stamp++;
bot_internal_.value_ = 0;
bot_internal_.stamp_++;
bot_.store(0);
top_.store({bot_internal_.stamp, 0});
top_.store({bot_internal_.stamp_, 0});
}
task *external_trading_deque::pop_bot() {
if (bot_internal_.value > 0) {
bot_internal_.value--;
bot_.store(bot_internal_.value, std::memory_order_relaxed);
if (bot_internal_.value_ > 0) {
bot_internal_.value_--;
bot_.store(bot_internal_.value_, std::memory_order_relaxed);
auto &current_entry = entries_[bot_internal_.value];
auto &current_entry = entries_[bot_internal_.value_];
auto *popped_task = current_entry.traded_task_.load(std::memory_order_relaxed);
auto expected_stamp = current_entry.forwarding_stamp_.load(std::memory_order_relaxed);
// We know what value must be in the cas field if no other thread stole it.
traded_cas_field expected_sync_cas_field;
expected_sync_cas_field.fill_with_stamp(expected_stamp, thread_id_);
expected_sync_cas_field.fill_with_stamp_and_deque(expected_stamp, thread_id_);
traded_cas_field empty_cas_field;
empty_cas_field.fill_with_stamp_and_empty(expected_stamp, thread_id_);
if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field,
empty_cas_field,
......@@ -82,8 +82,8 @@ external_trading_deque::peek_result external_trading_deque::peek_top() {
auto local_top = top_.load();
auto local_bot = bot_.load();
if (local_top.value < local_bot) {
return peek_result{entries_[local_top.value].traded_task_, local_top};
if (local_top.value_ < local_bot) {
return peek_result{entries_[local_top.value_].traded_task_, local_top};
} else {
return peek_result{nullptr, local_top};
}
......@@ -92,38 +92,38 @@ external_trading_deque::peek_result external_trading_deque::peek_top() {
task *external_trading_deque::pop_top(task *offered_task, peek_result peek_result) {
stamped_integer expected_top = peek_result.top_pointer_;
auto local_bot = bot_.load();
if (expected_top.value >= local_bot) {
if (expected_top.value_ >= local_bot) {
return nullptr;
}
auto &target_entry = entries_[expected_top.value];
auto &target_entry = entries_[expected_top.value_];
// Read our potential result
task *result = target_entry.traded_task_.load();
unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load();
// Try to get it by CAS with the expected field entry, giving up our offered_task for it
traded_cas_field expected_sync_cas_field;
expected_sync_cas_field.fill_with_stamp(expected_top.stamp, thread_id_);
expected_sync_cas_field.fill_with_stamp_and_deque(expected_top.stamp_, thread_id_);
traded_cas_field offered_field;
offered_field.fill_with_trade_object(offered_task);
if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, offered_field)) {
// We got it, for sure move the top pointer forward.
top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1});
// Return the stolen task
top_.compare_exchange_strong(expected_top, {expected_top.stamp_ + 1, expected_top.value_ + 1});
return result;
} else {
// We did not get it...help forwarding the top pointer anyway.
if (expected_top.stamp == forwarding_stamp) {
// ...move the pointer forward if someone else put a valid trade object in there.
top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1});
} else {
if (forwarding_stamp != expected_top.stamp_) {
// ...we failed because the top tag lags behind...try to fix it.
// This means only updating the tag, as this location can still hold data we need.
top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value});
top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value_});
}
// TODO: Figure out how other tasks can put the stamp forward without race conditions.
// This must be here to ensure the lock-free property.
// For now first leave it out, as it makes fixing the other non-blocking interaction
// on the resource stack harder.
return nullptr;
}
}
......
......@@ -8,63 +8,116 @@ static task *find_task(unsigned id, unsigned depth) {
return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_task(depth);
}
void task::push_task_chain(task *spare_task_chain) {
void task::propose_push_task_chain(task *spare_task_chain) {
PLS_ASSERT(this->thread_id_ != spare_task_chain->thread_id_,
"Makes no sense to push task onto itself, as it is not clean by definition.");
PLS_ASSERT(this->depth_ == spare_task_chain->depth_,
"Must only push tasks with correct depth.");
data_structures::stamped_integer current_root;
data_structures::stamped_integer target_root;
data_structures::stamped_split_integer current_root;
data_structures::stamped_split_integer target_root;
do {
current_root = this->resource_stack_root_.load();
target_root.stamp = current_root.stamp + 1;
target_root.value = spare_task_chain->thread_id_ + 1;
PLS_ASSERT(current_root.value_2_ == 0, "Must only propose one push at a time!");
if (current_root.value == 0) {
// Empty, simply push in with no successor
spare_task_chain->resource_stack_next_.store(nullptr);
} else {
// Already an entry. Find it's corresponding task and set it as our successor.
auto *current_root_task = find_task(current_root.value - 1, this->depth_);
spare_task_chain->resource_stack_next_.store(current_root_task);
// Add it to the current stack state by chaining its next stack task to the current base.
// Popping threads will see this proposed task as if it is the first item in the stack and pop it first,
// making sure that the chain is valid without any further modification when accepting the proposed task.
auto *current_root_task = current_root.value_1_ == 0 ? nullptr : find_task(current_root.value_1_ - 1, this->depth_);
spare_task_chain->resource_stack_next_.store(current_root_task);
target_root.stamp_ = current_root.stamp_ + 1;
target_root.value_1_ = current_root.value_1_;
target_root.value_2_ = spare_task_chain->thread_id_ + 1;
} while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root));
}
bool task::accept_proposed() {
data_structures::stamped_split_integer current_root;
data_structures::stamped_split_integer target_root;
do {
current_root = this->resource_stack_root_.load();
if (current_root.value_2_ == 0) {
return false; // We are done, nothing to accept!
}
target_root.stamp_ = current_root.stamp_ + 1;
target_root.value_1_ = current_root.value_2_;
target_root.value_2_ = 0;
} while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root));
return true;
}
bool task::decline_proposed() {
bool proposed_still_there;
data_structures::stamped_split_integer current_root;
data_structures::stamped_split_integer target_root;
do {
current_root = this->resource_stack_root_.load();
proposed_still_there = current_root.value_2_ != 0;
// No need to fetch anything, just delete the proposed item.
target_root.stamp_ = current_root.stamp_ + 1;
target_root.value_1_ = current_root.value_1_;
target_root.value_2_ = 0;
} while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root));
return proposed_still_there;
}
/**
 * Pushes the given task chain onto this task's resource stack.
 *
 * Implemented as the two-step protocol in one go: the chain is first proposed
 * and then immediately accepted.
 *
 * @param spare_task_chain Task to push; asserted to belong to a different thread
 *                         and to have the same depth as this task.
 */
void task::push_task_chain(task *spare_task_chain) {
PLS_ASSERT(this->thread_id_ != spare_task_chain->thread_id_,
"Makes no sense to push task onto itself, as it is not clean by definition.");
PLS_ASSERT(this->depth_ == spare_task_chain->depth_,
"Must only push tasks with correct depth.");
// Step 1: put the chain into the proposed slot of the stack root.
propose_push_task_chain(spare_task_chain);
// Step 2: promote the proposed entry to be the root (the returned flag is not evaluated here).
accept_proposed();
}
task *task::pop_task_chain() {
data_structures::stamped_integer current_root;
data_structures::stamped_integer target_root;
data_structures::stamped_split_integer current_root;
data_structures::stamped_split_integer target_root;
task *output_task;
do {
current_root = this->resource_stack_root_.load();
if (current_root.value == 0) {
// Empty...
return nullptr;
if (current_root.value_2_ == 0) {
if (current_root.value_1_ == 0) {
// No main chain and no proposed element.
return nullptr;
} else {
// No entry in the proposed slot, but found
// elements in the primary stack; try to pop from there.
auto *current_root_task = find_task(current_root.value_1_ - 1, this->depth_);
auto *next_stack_task = current_root_task->resource_stack_next_.load();
target_root.stamp_ = current_root.stamp_ + 1;
target_root.value_1_ = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0;
target_root.value_2_ = 0;
output_task = current_root_task;
}
} else {
// Found something, try to pop it
auto *current_root_task = find_task(current_root.value - 1, this->depth_);
auto *next_stack_task = current_root_task->resource_stack_next_.load();
// We got a proposed element. Treat it as beginning of resource stack by
// popping it instead of the main element chain.
auto *proposed_task = find_task(current_root.value_2_ - 1, this->depth_);
target_root.stamp = current_root.stamp + 1;
target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0;
// Empty out the proposed slot
target_root.stamp_ = current_root.stamp_ + 1;
target_root.value_1_ = current_root.value_1_;
target_root.value_2_ = 0;
output_task = current_root_task;
output_task = proposed_task;
}
} while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root));
PLS_ASSERT(scheduler::check_task_chain_backward(*output_task), "Must only pop proper task chains.");
output_task->resource_stack_next_.store(nullptr);
return output_task;
}
void task::reset_task_chain() {
auto current_root = this->resource_stack_root_.load();
current_root.stamp++;
current_root.value = 0;
this->resource_stack_root_.store(current_root);
this->resource_stack_next_.store(nullptr);
}
}
......@@ -61,18 +61,28 @@ std::tuple<base_task *, base_task *, bool> task_manager::steal_task(thread_state
PLS_ASSERT(pop_result_task == stolen_task,
"We must only steal the task that we peeked at!");
// update the resource stack associated with the stolen task
// Update the resource stack associated with the stolen task.
// We first propose the traded task, in case someone took our traded in field directly.
// stolen_task->propose_push_task_chain(traded_task);
stolen_task->push_task_chain(traded_task);
task *optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task);
auto peeked_traded_object = external_trading_deque::peek_traded_object(stolen_task);
task *optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task,
peeked_traded_object,
stealing_state.get_task_manager().deque_);
if (optional_exchanged_task) {
// All good, we pushed the task over to the stack, nothing more to do
PLS_ASSERT(optional_exchanged_task == traded_task,
"We are currently executing this, no one else can put another task in this field!");
// No one took the traded task, push it finally into the stack.
// stolen_task->accept_proposed();
} else {
// The last other active thread took it as its spare resource...
// ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
stolen_task->reset_task_chain();
// Someone explicitly took the traded task from us, remove it from the stack.
// bool proposed_still_in_stack = stolen_task->decline_proposed();
// PLS_ASSERT(proposed_still_in_stack,
// "The proposed task was stolen over the CAS field. It is not possible for another consumer to take the proposed task!");
base_task *popped_task = stolen_task->pop_task_chain();
PLS_ASSERT(popped_task == traded_task, "Other task must only steal CAS task if deque was empty!");
}
return std::tuple{stolen_task, chain_after_stolen_task, true};
......@@ -85,20 +95,41 @@ std::tuple<base_task *, base_task *, bool> task_manager::steal_task(thread_state
}
// Tries to obtain a clean task chain from the resource stack of the given task.
// Returns the obtained chain, or nullptr if none could be obtained.
//
// NOTE(review): this listing interleaves two variants of the body — a
// single-shot variant (popped_task / clean_chain) and a retry-loop variant
// (target_task). The comments below describe each variant where it appears.
base_task *task_manager::pop_clean_task_chain(base_task *base_task) {
// --- single-shot variant ---
task *popped_task = static_cast<task *>(base_task);
// Try to get a clean resource chain to go back to the main stealing loop
task *clean_chain = popped_task->pop_task_chain();
if (clean_chain == nullptr) {
// Double-check whether we are really the last one or just had unlucky timing:
// first look at the trade object of the task, then retry the pop once.
task *optional_cas_task = external_trading_deque::get_trade_object(popped_task);
if (optional_cas_task) {
clean_chain = optional_cas_task;
} else {
clean_chain = popped_task->pop_task_chain();
// --- retry-loop variant ---
task *target_task = static_cast<task *>(base_task);
while (true) {
// Try to get a clean resource chain to go back to the main stealing loop
// The trade field is peeked before and after the pop so that a change
// during the pop can be detected.
auto peeked_task_cas_before = external_trading_deque::peek_traded_object(target_task);
task *pop_result = target_task->pop_task_chain();
if (pop_result) {
return pop_result; // Got something, so we are simply done here
}
auto peeked_task_cas_after = external_trading_deque::peek_traded_object(target_task);
// The trade field changed while we popped: the observation is not stable, retry.
if (peeked_task_cas_before != peeked_task_cas_after) {
continue;
}
}
return clean_chain;
if (peeked_task_cas_after.is_empty() || peeked_task_cas_after.is_filled_with_stamp()) {
if (peeked_task_cas_after.is_filled_with_stamp()) {
// NOTE(review): debug output; a stamp-filled field is treated as unexpected
// here and the whole attempt is retried.
printf("what happened!\n"); // TODO: issue to 80% because the stealing threads skip parts of the deque.
continue;
}
// The task was 'stable' during our pop from the stack.
// Or in other words: no other thread operated on the task.
// We are therefore the last child and do not get a clean task chain.
return nullptr;
}
// The task was stable, but has a potential resource attached in its cas field.
// Try to get it to not be blocked by the other preempted task.
// NOTE(review): the acquisition below is commented out, so this path currently
// falls through to the end of the loop body and simply retries.
// task *optional_cas_task =
// external_trading_deque::get_trade_object(target_task, peeked_task_cas_after, this->deque_);
// if (optional_cas_task) {
// // We got it, thus the other thread has not got it and will remove it from the queue.
// return optional_cas_task;
// }
}
}
}
......@@ -184,7 +184,7 @@ void scheduler::sync_internal() {
spawned_task->run_as_task([active_task, spawned_task, &syncing_state](context_switcher::continuation cont) {
active_task->continuation_ = std::move(cont);
syncing_state.set_active_task(spawned_task);
return slow_return(syncing_state);
return slow_return(syncing_state, true);
});
PLS_ASSERT(!continuation.valid(),
......@@ -202,7 +202,7 @@ void scheduler::sync_internal() {
}
}
context_switcher::continuation scheduler::slow_return(thread_state &calling_state) {
context_switcher::continuation scheduler::slow_return(thread_state &calling_state, bool in_sync) {
base_task *this_task = calling_state.get_active_task();
PLS_ASSERT(this_task->depth_ > 0,
"Must never try to return from a task at level 0 (no last task), as we must have a target to return to.");
......@@ -240,7 +240,7 @@ context_switcher::continuation scheduler::slow_return(thread_state &calling_stat
// Jump back to the continuation in main scheduling loop.
context_switcher::continuation result_cont = std::move(thread_state::get().main_continuation());
PLS_ASSERT(result_cont.valid(), "Must return a valid continuation.");
PLS_ASSERT(result_cont.valid(), "Must return a valid continuation (to main).");
return result_cont;
} else {
// Make sure that we are owner of this full continuation/task chain.
......@@ -251,8 +251,13 @@ context_switcher::continuation scheduler::slow_return(thread_state &calling_stat
last_task->is_synchronized_ = true;
// Jump to parent task and continue working on it.
if (in_sync) {
PLS_ASSERT(last_task->continuation_.valid(), "Must return a valid continuation (to last task) in sync.");
} else {
PLS_ASSERT(last_task->continuation_.valid(), "Must return a valid continuation (to last task) in spawn.");
}
context_switcher::continuation result_cont = std::move(last_task->continuation_);
PLS_ASSERT(result_cont.valid(), "Must return a valid continuation.");
return result_cont;
}
}
......
......@@ -21,7 +21,7 @@ TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/lock_free/
const int stamp = 42;
const int ID = 10;
traded_cas_field tag_field;
tag_field.fill_with_stamp(stamp, ID);
tag_field.fill_with_stamp_and_deque(stamp, ID);
REQUIRE(tag_field.is_filled_with_stamp());
REQUIRE(!tag_field.is_empty());
REQUIRE(!tag_field.is_filled_with_object());
......@@ -53,6 +53,58 @@ TEST_CASE("task resource stack", "[internal/scheduling/lock_free/task]") {
REQUIRE(tasks[0]->pop_task_chain() == nullptr);
}
// A proposed chain can be taken by a plain pop; afterwards the stack is empty.
SECTION("propose/pop") {
  auto &stack_owner = tasks[0];
  auto &proposed = tasks[1];

  stack_owner->propose_push_task_chain(proposed);

  REQUIRE(stack_owner->pop_task_chain() == proposed);
  REQUIRE(stack_owner->pop_task_chain() == nullptr);
}
// An accepted proposal is popped exactly once; afterwards the stack is empty.
SECTION("propose/accept/pop") {
  auto &stack_owner = tasks[0];
  auto &proposed = tasks[1];

  stack_owner->propose_push_task_chain(proposed);
  stack_owner->accept_proposed();

  REQUIRE(stack_owner->pop_task_chain() == proposed);
  REQUIRE(stack_owner->pop_task_chain() == nullptr);
}
// A declined proposal leaves nothing behind: every following pop yields nullptr.
SECTION("propose/decline/pop") {
  auto &stack_owner = tasks[0];
  auto &proposed = tasks[1];

  stack_owner->propose_push_task_chain(proposed);
  stack_owner->decline_proposed();

  REQUIRE(stack_owner->pop_task_chain() == nullptr);
  REQUIRE(stack_owner->pop_task_chain() == nullptr);
}
// Mixes regular pushes with proposals and checks the resulting pop order for
// each combination of pop / accept / decline.
SECTION("propose intertwined normal ops") {
  auto &stack_owner = tasks[0];
  auto &pushed = tasks[1];
  auto &proposed = tasks[2];

  // Proposal on top of a regular push: the proposal is popped first.
  stack_owner->push_task_chain(pushed);
  stack_owner->propose_push_task_chain(proposed);
  REQUIRE(stack_owner->pop_task_chain() == proposed);
  REQUIRE(stack_owner->pop_task_chain() == pushed);

  // Accepting after the proposal was already popped keeps the regular push.
  stack_owner->push_task_chain(pushed);
  stack_owner->propose_push_task_chain(proposed);
  REQUIRE(stack_owner->pop_task_chain() == proposed);
  stack_owner->accept_proposed();
  REQUIRE(stack_owner->pop_task_chain() == pushed);

  // Declining removes only the proposal; the regular push stays.
  stack_owner->push_task_chain(pushed);
  stack_owner->propose_push_task_chain(proposed);
  stack_owner->decline_proposed();
  REQUIRE(stack_owner->pop_task_chain() == pushed);
  REQUIRE(stack_owner->pop_task_chain() == nullptr);

  // Accepting keeps the proposal on top of the regular push.
  stack_owner->push_task_chain(pushed);
  stack_owner->propose_push_task_chain(proposed);
  stack_owner->accept_proposed();
  REQUIRE(stack_owner->pop_task_chain() == proposed);
  REQUIRE(stack_owner->pop_task_chain() == pushed);
}
SECTION("multiple pushes") {
tasks[0]->push_task_chain(tasks[1]);
tasks[0]->push_task_chain(tasks[2]);
......@@ -88,7 +140,8 @@ TEST_CASE("external trading deque", "[internal/scheduling/lock_free/external_tra
deque_1.push_bot(&tasks[0]);
auto peek = deque_1.peek_top();
REQUIRE(deque_1.pop_top(&tasks[1], peek) == &tasks[0]);
REQUIRE(external_trading_deque::get_trade_object(&tasks[0]) == &tasks[1]);
auto trade_peek = deque_1.peek_traded_object(&tasks[0]);
REQUIRE(external_trading_deque::get_trade_object(&tasks[0], trade_peek, deque_1) == &tasks[1]);
REQUIRE(!deque_1.pop_top(&tasks[1], peek));
REQUIRE(!deque_1.pop_bot());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment