Commit 1d1b5185 by FritzFlorian

Stable version of Re-Work resource-trading implementation to fix race condition.

parent 2784f786
...@@ -21,8 +21,10 @@ struct task : public base_task { ...@@ -21,8 +21,10 @@ struct task : public base_task {
// Additional info for lock-free stealing and resource trading. // Additional info for lock-free stealing and resource trading.
std::atomic<traded_cas_field> external_trading_deque_cas_{{}}; std::atomic<traded_cas_field> external_trading_deque_cas_{{}};
void push_task_chain(task *spare_task_chain); void prepare_for_push(unsigned pushing_thread_id);
bool push_task_chain(task *spare_task_chain, unsigned pushing_thread_id);
task *pop_task_chain(); task *pop_task_chain();
void reset_task_chain(task *expected_content); void reset_task_chain(task *expected_content);
static task *find_task(unsigned id, unsigned depth); static task *find_task(unsigned id, unsigned depth);
...@@ -30,7 +32,10 @@ struct task : public base_task { ...@@ -30,7 +32,10 @@ struct task : public base_task {
private: private:
std::atomic<int> num_resources_{}; std::atomic<int> num_resources_{};
std::atomic<base_task *> resource_stack_next_{}; // STAMP = thread id of 'owning' thread before task was inserted into stack.
// VALUE = next item in stack, indicated by thread ID.
std::atomic<data_structures::stamped_integer> resource_stack_next_{{0, 0}};
// STAMP = CAS stamp, half CAS length (16 or 32 Bit) // STAMP = CAS stamp, half CAS length (16 or 32 Bit)
// VALUE = Root of the actual stack, indicated by thread ID (16 or 32 Bit) // VALUE = Root of the actual stack, indicated by thread ID (16 or 32 Bit)
std::atomic<data_structures::stamped_integer> resource_stack_root_{{0, 0}}; std::atomic<data_structures::stamped_integer> resource_stack_root_{{0, 0}};
......
...@@ -47,19 +47,6 @@ void external_trading_deque::push_bot(task *published_task) { ...@@ -47,19 +47,6 @@ void external_trading_deque::push_bot(task *published_task) {
} }
void external_trading_deque::reset_bot_and_top() { void external_trading_deque::reset_bot_and_top() {
for (int i = bot_internal_.value_; i >= 0; i--) {
auto &current_entry = entries_[i];
auto *task = current_entry.traded_task_.load(std::memory_order_relaxed);
auto task_cas = task->external_trading_deque_cas_.load();
if (task_cas.is_filled_with_trade_request() && task_cas.get_trade_request_thread_id() == thread_id_) {
PLS_ASSERT(false, "Must not have 'non stolen' tasks left in own task chain!");
}
current_entry.traded_task_.store(nullptr, std::memory_order_relaxed);
current_entry.forwarding_stamp_.store(0, std::memory_order_relaxed);
}
bot_internal_.value_ = 0; bot_internal_.value_ = 0;
bot_internal_.stamp_++; bot_internal_.stamp_++;
...@@ -88,9 +75,6 @@ task *external_trading_deque::pop_bot() { ...@@ -88,9 +75,6 @@ task *external_trading_deque::pop_bot() {
if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field,
empty_cas_field, empty_cas_field,
std::memory_order_acq_rel)) { std::memory_order_acq_rel)) {
current_entry.traded_task_.store(nullptr, std::memory_order_relaxed);
current_entry.forwarding_stamp_.store(0, std::memory_order_relaxed);
return popped_task; return popped_task;
} else { } else {
reset_bot_and_top(); reset_bot_and_top();
...@@ -144,7 +128,7 @@ task *external_trading_deque::pop_top(task *offered_task, peek_result peek_resul ...@@ -144,7 +128,7 @@ task *external_trading_deque::pop_top(task *offered_task, peek_result peek_resul
top_.compare_exchange_strong(expected_top, {expected_top.stamp_ + 1, expected_top.value_ + 1}); top_.compare_exchange_strong(expected_top, {expected_top.stamp_ + 1, expected_top.value_ + 1});
return result; return result;
} else { } else {
// TODO: Re-Check this condition for forwading the stamp! Should only happen if another top-stealer took the // TODO: Re-Check this condition for forwarding the stamp! Should only happen if another top-stealer took the
// slot that we where interested in! // slot that we where interested in!
if (expected_sync_cas_field.is_filled_with_object() && expected_sync_cas_field.get_stamp() == expected_top.stamp_ if (expected_sync_cas_field.is_filled_with_object() && expected_sync_cas_field.get_stamp() == expected_top.stamp_
&& expected_sync_cas_field.get_trade_request_thread_id() == thread_id_) { && expected_sync_cas_field.get_trade_request_thread_id() == thread_id_) {
......
...@@ -7,7 +7,14 @@ task *task::find_task(unsigned id, unsigned depth) { ...@@ -7,7 +7,14 @@ task *task::find_task(unsigned id, unsigned depth) {
return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_task(depth); return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_task(depth);
} }
void task::push_task_chain(task *spare_task_chain) { void task::prepare_for_push(unsigned int pushing_thread_id) {
data_structures::stamped_integer target_next;
target_next.stamp_ = pushing_thread_id + 1;
target_next.value_ = 0;
resource_stack_next_.store(target_next, std::memory_order_relaxed);
}
bool task::push_task_chain(task *spare_task_chain, unsigned pushing_thread_id) {
num_resources_++; num_resources_++;
PLS_ASSERT(this->thread_id_ != spare_task_chain->thread_id_, PLS_ASSERT(this->thread_id_ != spare_task_chain->thread_id_,
...@@ -17,6 +24,12 @@ void task::push_task_chain(task *spare_task_chain) { ...@@ -17,6 +24,12 @@ void task::push_task_chain(task *spare_task_chain) {
data_structures::stamped_integer current_root; data_structures::stamped_integer current_root;
data_structures::stamped_integer target_root; data_structures::stamped_integer target_root;
data_structures::stamped_integer expected_next_field;
data_structures::stamped_integer target_next_field;
expected_next_field.stamp_ = pushing_thread_id + 1;
expected_next_field.value_ = 0;
int iteration = 0; int iteration = 0;
do { do {
iteration++; iteration++;
...@@ -24,25 +37,29 @@ void task::push_task_chain(task *spare_task_chain) { ...@@ -24,25 +37,29 @@ void task::push_task_chain(task *spare_task_chain) {
target_root.stamp_ = current_root.stamp_ + 1; target_root.stamp_ = current_root.stamp_ + 1;
target_root.value_ = spare_task_chain->thread_id_ + 1; target_root.value_ = spare_task_chain->thread_id_ + 1;
// TODO: Setting the resource stack next AFTER publishing the task to the CAS field // Prepare the target_next_field as required
// is a race, as the resource stack next field can be tampered with.
if (current_root.value_ == 0) { if (current_root.value_ == 0) {
// Empty, simply push in with no successor. // Empty, simply push in with no successor.
// We are sure that a spare_task_chain is not in any stack when pushing it. target_next_field.stamp_ = pushing_thread_id + 1;
// Thus, its resource_stack_next_ field must be nullptr. target_next_field.value_ = 0;
// TODO: this should be our race?
// TODO: add more checks (see if this is violated AND cas succeeds)
auto *old_value = spare_task_chain->resource_stack_next_.exchange(nullptr);
if (old_value != nullptr) {
printf("Why would this invariant happen?\n");
}
} else { } else {
// Already an entry. Find it's corresponding task and set it as our successor. // Already an entry. Find it's corresponding task and set it as our successor.
auto *current_root_task = find_task(current_root.value_ - 1, this->depth_); auto *current_root_task = find_task(current_root.value_ - 1, this->depth_);
spare_task_chain->resource_stack_next_.store(current_root_task);
target_next_field.stamp_ = pushing_thread_id + 1;
target_next_field.value_ = current_root_task->thread_id_ + 1;
}
if (!spare_task_chain->resource_stack_next_.compare_exchange_strong(expected_next_field, target_next_field)) {
num_resources_--;
return false;
} else {
expected_next_field = target_next_field;
} }
} while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root)); } while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root));
return true;
} }
void task::reset_task_chain(task *expected_content) { void task::reset_task_chain(task *expected_content) {
...@@ -52,11 +69,6 @@ void task::reset_task_chain(task *expected_content) { ...@@ -52,11 +69,6 @@ void task::reset_task_chain(task *expected_content) {
PLS_ASSERT(current_root.value_ == expected_content->thread_id_ + 1, PLS_ASSERT(current_root.value_ == expected_content->thread_id_ + 1,
"Must only reset the task chain if we exactly know its state! (current_root.value_)"); "Must only reset the task chain if we exactly know its state! (current_root.value_)");
auto *current_root_task = find_task(current_root.value_ - 1, this->depth_);
if (current_root_task->resource_stack_next_.load(std::memory_order_relaxed) != nullptr) {
printf("This could have been the bug...\n");
}
data_structures::stamped_integer target_root; data_structures::stamped_integer target_root;
target_root.stamp_ = current_root.stamp_ + 1; target_root.stamp_ = current_root.stamp_ + 1;
bool success = this->resource_stack_root_.compare_exchange_strong(current_root, target_root); bool success = this->resource_stack_root_.compare_exchange_strong(current_root, target_root);
...@@ -75,10 +87,10 @@ task *task::pop_task_chain() { ...@@ -75,10 +87,10 @@ task *task::pop_task_chain() {
} else { } else {
// Found something, try to pop it // Found something, try to pop it
auto *current_root_task = find_task(current_root.value_ - 1, this->depth_); auto *current_root_task = find_task(current_root.value_ - 1, this->depth_);
auto *next_stack_task = current_root_task->resource_stack_next_.load(); auto next_stack_cas = current_root_task->resource_stack_next_.load();
target_root.stamp_ = current_root.stamp_ + 1; target_root.stamp_ = current_root.stamp_ + 1;
target_root.value_ = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0; target_root.value_ = next_stack_cas.value_;
output_task = current_root_task; output_task = current_root_task;
} }
...@@ -86,7 +98,7 @@ task *task::pop_task_chain() { ...@@ -86,7 +98,7 @@ task *task::pop_task_chain() {
PLS_ASSERT(num_resources_.fetch_add(-1) > 0, "Must only return an task from the chain if there are items!"); PLS_ASSERT(num_resources_.fetch_add(-1) > 0, "Must only return an task from the chain if there are items!");
output_task->resource_stack_next_.store(nullptr); output_task->resource_stack_next_.store({0, 0});
return output_task; return output_task;
} }
......
...@@ -53,7 +53,8 @@ std::tuple<base_task *, base_task *, bool> task_manager::steal_task(thread_state ...@@ -53,7 +53,8 @@ std::tuple<base_task *, base_task *, bool> task_manager::steal_task(thread_state
task *traded_task = static_cast<task *>(scheduler::get_trade_task(stolen_task, stealing_state)); task *traded_task = static_cast<task *>(scheduler::get_trade_task(stolen_task, stealing_state));
base_task *chain_after_stolen_task = traded_task->next_; base_task *chain_after_stolen_task = traded_task->next_;
// TODO: traded task resource_stack_next_ field is now marked as mine // mark that we would like to push the traded in task
traded_task->prepare_for_push(stealing_state.get_thread_id());
// perform the actual pop operation // perform the actual pop operation
task *pop_result_task = deque_.pop_top(traded_task, peek); task *pop_result_task = deque_.pop_top(traded_task, peek);
...@@ -64,23 +65,21 @@ std::tuple<base_task *, base_task *, bool> task_manager::steal_task(thread_state ...@@ -64,23 +65,21 @@ std::tuple<base_task *, base_task *, bool> task_manager::steal_task(thread_state
"We must only steal the task that we peeked at!"); "We must only steal the task that we peeked at!");
// Update the resource stack associated with the stolen task. // Update the resource stack associated with the stolen task.
// TODO: push only onto task chain if the resource_stack_next_ was still mine bool push_success = stolen_task->push_task_chain(traded_task, stealing_state.get_thread_id());
// (otherwise the CAS could have been stolen).
// This makes sure, that we never 'destroy' a task that we do not own by our stack push routine.
stolen_task->push_task_chain(traded_task);
auto peeked_traded_object = external_trading_deque::peek_traded_object(stolen_task); auto peeked_traded_object = external_trading_deque::peek_traded_object(stolen_task);
task *optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task, peeked_traded_object); task *optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task, peeked_traded_object);
if (optional_exchanged_task) { if (optional_exchanged_task) {
PLS_ASSERT(optional_exchanged_task == traded_task, PLS_ASSERT(optional_exchanged_task == traded_task,
"We are currently executing this, no one else can put another task in this field!"); "We are currently executing this, no one else can put another task in this field!");
// TODO: we should also assert that the push worked in this case! PLS_ASSERT(push_success,
"Push must only be interrupted if someone took the task we tried to push!");
} else { } else {
// Someone explicitly took the traded task from us, remove it from the stack. // Someone explicitly took the traded task from us, remove it from the stack if we pushed it.
// TODO: if the push failed, we do not need to reset anything. if (push_success) {
// Otherwise the normal invariant that we seek holds.
stolen_task->reset_task_chain(traded_task); stolen_task->reset_task_chain(traded_task);
} }
}
return std::tuple{stolen_task, chain_after_stolen_task, true}; return std::tuple{stolen_task, chain_after_stolen_task, true};
} else { } else {
...@@ -109,12 +108,10 @@ base_task *task_manager::pop_clean_task_chain(base_task *base_task) { ...@@ -109,12 +108,10 @@ base_task *task_manager::pop_clean_task_chain(base_task *base_task) {
continue; continue;
} }
if (peeked_task_cas_after.is_empty() || peeked_task_cas_after.is_filled_with_trade_request()) { PLS_ASSERT(!peeked_task_cas_after.is_filled_with_trade_request(),
if (peeked_task_cas_after.is_filled_with_trade_request()) { "The resource stack must never be empty while the task is up for being stolen.");
printf("what happened! (%d)\n", base_task->thread_id_);
continue;
}
if (peeked_task_cas_after.is_empty()) {
// The task was 'stable' during our pop from the stack. // The task was 'stable' during our pop from the stack.
// Or in other words: no other thread operated on the task. // Or in other words: no other thread operated on the task.
// We are therefore the last child and do not get a clean task chain. // We are therefore the last child and do not get a clean task chain.
...@@ -123,11 +120,11 @@ base_task *task_manager::pop_clean_task_chain(base_task *base_task) { ...@@ -123,11 +120,11 @@ base_task *task_manager::pop_clean_task_chain(base_task *base_task) {
// The task was stable, but has a potential resource attached in its cas field. // The task was stable, but has a potential resource attached in its cas field.
// Try to get it to not be blocked by the other preempted task. // Try to get it to not be blocked by the other preempted task.
// task *optional_cas_task = external_trading_deque::get_trade_object(target_task, peeked_task_cas_after); task *optional_cas_task = external_trading_deque::get_trade_object(target_task, peeked_task_cas_after);
// if (optional_cas_task) { if (optional_cas_task) {
// // We got it, thus the other thread has not got it and will remove it from the queue. // We got it, thus the other thread has not got it and will remove it from the queue.
// return optional_cas_task; return optional_cas_task;
// } }
} }
} }
......
...@@ -47,43 +47,36 @@ TEST_CASE("task resource stack", "[internal/scheduling/lock_free/task]") { ...@@ -47,43 +47,36 @@ TEST_CASE("task resource stack", "[internal/scheduling/lock_free/task]") {
scheduler.task_manager_for(3).get_task(0)}; scheduler.task_manager_for(3).get_task(0)};
SECTION("simple push/pop") { SECTION("simple push/pop") {
tasks[0]->push_task_chain(tasks[1]); tasks[1]->prepare_for_push(0);
tasks[0]->push_task_chain(tasks[1], 0);
REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); REQUIRE(tasks[0]->pop_task_chain() == tasks[1]);
REQUIRE(tasks[0]->pop_task_chain() == nullptr); REQUIRE(tasks[0]->pop_task_chain() == nullptr);
} }
SECTION("empty pop and multi push") {
tasks[1]->prepare_for_push(0);
SECTION("propose intertwined normal ops") { tasks[0]->push_task_chain(tasks[1], 0);
tasks[0]->push_task_chain(tasks[1]); tasks[2]->prepare_for_push(0);
tasks[0]->push_task_chain(tasks[2]); tasks[0]->push_task_chain(tasks[2], 0);
REQUIRE(tasks[0]->pop_task_chain() == tasks[2]);
REQUIRE(tasks[0]->pop_task_chain() == tasks[1]);
tasks[0]->push_task_chain(tasks[1]);
tasks[0]->push_task_chain(tasks[2]);
REQUIRE(tasks[0]->pop_task_chain() == tasks[2]); REQUIRE(tasks[0]->pop_task_chain() == tasks[2]);
REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); REQUIRE(tasks[0]->pop_task_chain() == tasks[1]);
tasks[0]->push_task_chain(tasks[1]); tasks[1]->prepare_for_push(0);
tasks[0]->push_task_chain(tasks[1], 0);
REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); REQUIRE(tasks[0]->pop_task_chain() == tasks[1]);
REQUIRE(tasks[0]->pop_task_chain() == nullptr); REQUIRE(tasks[0]->pop_task_chain() == nullptr);
tasks[0]->push_task_chain(tasks[1]);
tasks[0]->push_task_chain(tasks[2]);
REQUIRE(tasks[0]->pop_task_chain() == tasks[2]);
REQUIRE(tasks[0]->pop_task_chain() == tasks[1]);
} }
SECTION("multiple pushes") { SECTION("multiple pushes") {
tasks[0]->push_task_chain(tasks[1]); tasks[1]->prepare_for_push(0);
tasks[0]->push_task_chain(tasks[2]); tasks[0]->push_task_chain(tasks[1], 0);
tasks[0]->push_task_chain(tasks[3]); tasks[2]->prepare_for_push(0);
tasks[0]->push_task_chain(tasks[2], 0);
tasks[3]->prepare_for_push(0);
tasks[0]->push_task_chain(tasks[3], 0);
REQUIRE(tasks[0]->pop_task_chain() == tasks[3]); REQUIRE(tasks[0]->pop_task_chain() == tasks[3]);
REQUIRE(tasks[0]->pop_task_chain() == tasks[2]); REQUIRE(tasks[0]->pop_task_chain() == tasks[2]);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment