From 1d1b51854328250c97a5188899d5d8d05026d303 Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Tue, 23 Jun 2020 17:38:21 +0200 Subject: [PATCH] Stable version of Re-Work resource-trading implementation to fix race condition. --- lib/pls/include/pls/internal/scheduling/lock_free/task.h | 9 +++++++-- lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp | 18 +----------------- lib/pls/src/internal/scheduling/lock_free/task.cpp | 52 ++++++++++++++++++++++++++++++++-------------------- lib/pls/src/internal/scheduling/lock_free/task_manager.cpp | 37 +++++++++++++++++-------------------- test/scheduling_lock_free_tests.cpp | 37 +++++++++++++++---------------------- 5 files changed, 72 insertions(+), 81 deletions(-) diff --git a/lib/pls/include/pls/internal/scheduling/lock_free/task.h b/lib/pls/include/pls/internal/scheduling/lock_free/task.h index 2d0e3a9..723a067 100644 --- a/lib/pls/include/pls/internal/scheduling/lock_free/task.h +++ b/lib/pls/include/pls/internal/scheduling/lock_free/task.h @@ -21,8 +21,10 @@ struct task : public base_task { // Additional info for lock-free stealing and resource trading. std::atomic external_trading_deque_cas_{{}}; - void push_task_chain(task *spare_task_chain); + void prepare_for_push(unsigned pushing_thread_id); + bool push_task_chain(task *spare_task_chain, unsigned pushing_thread_id); task *pop_task_chain(); + void reset_task_chain(task *expected_content); static task *find_task(unsigned id, unsigned depth); @@ -30,7 +32,10 @@ struct task : public base_task { private: std::atomic num_resources_{}; - std::atomic resource_stack_next_{}; + // STAMP = thread id of 'owning' thread before task was inserted into stack. + // VALUE = next item in stack, indicated by thread ID. + std::atomic resource_stack_next_{{0, 0}}; + // STAMP = CAS stamp, half CAS length (16 or 32 Bit) // VALUE = Root of the actual stack, indicated by thread ID (16 or 32 Bit) std::atomic resource_stack_root_{{0, 0}}; diff --git a/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp b/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp index baba64c..f57c6b6 100644 --- a/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp +++ b/lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp @@ -47,19 +47,6 @@ void external_trading_deque::push_bot(task *published_task) { } void external_trading_deque::reset_bot_and_top() { - for (int i = bot_internal_.value_; i >= 0; i--) { - auto ¤t_entry = entries_[i]; - - auto *task = current_entry.traded_task_.load(std::memory_order_relaxed); - auto task_cas = task->external_trading_deque_cas_.load(); - if (task_cas.is_filled_with_trade_request() && task_cas.get_trade_request_thread_id() == thread_id_) { - PLS_ASSERT(false, "Must not have 'non stolen' tasks left in own task chain!"); - } - - current_entry.traded_task_.store(nullptr, std::memory_order_relaxed); - current_entry.forwarding_stamp_.store(0, std::memory_order_relaxed); - } - bot_internal_.value_ = 0; bot_internal_.stamp_++; @@ -88,9 +75,6 @@ task *external_trading_deque::pop_bot() { if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, empty_cas_field, std::memory_order_acq_rel)) { - current_entry.traded_task_.store(nullptr, std::memory_order_relaxed); - current_entry.forwarding_stamp_.store(0, std::memory_order_relaxed); - return popped_task; } else { reset_bot_and_top(); @@ -144,7 +128,7 @@ task *external_trading_deque::pop_top(task *offered_task, peek_result peek_resul top_.compare_exchange_strong(expected_top, {expected_top.stamp_ + 1, expected_top.value_ + 1}); return result; } else { - // TODO: Re-Check this condition for forwading the stamp! Should only happen if another top-stealer took the + // TODO: Re-Check this condition for forwarding the stamp! Should only happen if another top-stealer took the // slot that we where interested in! if (expected_sync_cas_field.is_filled_with_object() && expected_sync_cas_field.get_stamp() == expected_top.stamp_ && expected_sync_cas_field.get_trade_request_thread_id() == thread_id_) { diff --git a/lib/pls/src/internal/scheduling/lock_free/task.cpp b/lib/pls/src/internal/scheduling/lock_free/task.cpp index c3bf52b..7100b46 100644 --- a/lib/pls/src/internal/scheduling/lock_free/task.cpp +++ b/lib/pls/src/internal/scheduling/lock_free/task.cpp @@ -7,7 +7,14 @@ task *task::find_task(unsigned id, unsigned depth) { return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_task(depth); } -void task::push_task_chain(task *spare_task_chain) { +void task::prepare_for_push(unsigned int pushing_thread_id) { + data_structures::stamped_integer target_next; + target_next.stamp_ = pushing_thread_id + 1; + target_next.value_ = 0; + resource_stack_next_.store(target_next, std::memory_order_relaxed); +} + +bool task::push_task_chain(task *spare_task_chain, unsigned pushing_thread_id) { num_resources_++; PLS_ASSERT(this->thread_id_ != spare_task_chain->thread_id_, @@ -17,6 +24,12 @@ void task::push_task_chain(task *spare_task_chain) { data_structures::stamped_integer current_root; data_structures::stamped_integer target_root; + + data_structures::stamped_integer expected_next_field; + data_structures::stamped_integer target_next_field; + expected_next_field.stamp_ = pushing_thread_id + 1; + expected_next_field.value_ = 0; + int iteration = 0; do { iteration++; @@ -24,25 +37,29 @@ void task::push_task_chain(task *spare_task_chain) { target_root.stamp_ = current_root.stamp_ + 1; target_root.value_ = spare_task_chain->thread_id_ + 1; - // TODO: Setting the resource stack next AFTER publishing the task to the CAS field - // is a race, as the resource stack next field can be tampered with. + // Prepare the target_next_field as required if (current_root.value_ == 0) { // Empty, simply push in with no successor. - // We are sure that a spare_task_chain is not in any stack when pushing it. - // Thus, its resource_stack_next_ field must be nullptr. - // TODO: this should be our race? - // TODO: add more checks (see if this is violated AND cas succeeds) - auto *old_value = spare_task_chain->resource_stack_next_.exchange(nullptr); - if (old_value != nullptr) { - printf("Why would this invariant happen?\n"); - } + target_next_field.stamp_ = pushing_thread_id + 1; + target_next_field.value_ = 0; } else { // Already an entry. Find it's corresponding task and set it as our successor. auto *current_root_task = find_task(current_root.value_ - 1, this->depth_); - spare_task_chain->resource_stack_next_.store(current_root_task); + + target_next_field.stamp_ = pushing_thread_id + 1; + target_next_field.value_ = current_root_task->thread_id_ + 1; + } + + if (!spare_task_chain->resource_stack_next_.compare_exchange_strong(expected_next_field, target_next_field)) { + num_resources_--; + return false; + } else { + expected_next_field = target_next_field; } } while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root)); + + return true; } void task::reset_task_chain(task *expected_content) { @@ -52,11 +69,6 @@ void task::reset_task_chain(task *expected_content) { PLS_ASSERT(current_root.value_ == expected_content->thread_id_ + 1, "Must only reset the task chain if we exactly know its state! (current_root.value_)"); - auto *current_root_task = find_task(current_root.value_ - 1, this->depth_); - if (current_root_task->resource_stack_next_.load(std::memory_order_relaxed) != nullptr) { - printf("This could have been the bug...\n"); - } - data_structures::stamped_integer target_root; target_root.stamp_ = current_root.stamp_ + 1; bool success = this->resource_stack_root_.compare_exchange_strong(current_root, target_root); @@ -75,10 +87,10 @@ task *task::pop_task_chain() { } else { // Found something, try to pop it auto *current_root_task = find_task(current_root.value_ - 1, this->depth_); - auto *next_stack_task = current_root_task->resource_stack_next_.load(); + auto next_stack_cas = current_root_task->resource_stack_next_.load(); target_root.stamp_ = current_root.stamp_ + 1; - target_root.value_ = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0; + target_root.value_ = next_stack_cas.value_; output_task = current_root_task; } @@ -86,7 +98,7 @@ task *task::pop_task_chain() { PLS_ASSERT(num_resources_.fetch_add(-1) > 0, "Must only return an task from the chain if there are items!"); - output_task->resource_stack_next_.store(nullptr); + output_task->resource_stack_next_.store({0, 0}); return output_task; } diff --git a/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp b/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp index e95d817..e1e2da9 100644 --- a/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp +++ b/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp @@ -53,7 +53,8 @@ std::tuple task_manager::steal_task(thread_state task *traded_task = static_cast(scheduler::get_trade_task(stolen_task, stealing_state)); base_task *chain_after_stolen_task = traded_task->next_; - // TODO: traded task resource_stack_next_ field is now marked as mine + // mark that we would like to push the traded in task + traded_task->prepare_for_push(stealing_state.get_thread_id()); // perform the actual pop operation task *pop_result_task = deque_.pop_top(traded_task, peek); @@ -64,22 +65,20 @@ std::tuple task_manager::steal_task(thread_state "We must only steal the task that we peeked at!"); // Update the resource stack associated with the stolen task. - // TODO: push only onto task chain if the resource_stack_next_ was still mine - // (otherwise the CAS could have been stolen). - // This makes sure, that we never 'destroy' a task that we do not own by our stack push routine. - stolen_task->push_task_chain(traded_task); + bool push_success = stolen_task->push_task_chain(traded_task, stealing_state.get_thread_id()); auto peeked_traded_object = external_trading_deque::peek_traded_object(stolen_task); task *optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task, peeked_traded_object); if (optional_exchanged_task) { PLS_ASSERT(optional_exchanged_task == traded_task, "We are currently executing this, no one else can put another task in this field!"); - // TODO: we should also assert that the push worked in this case! + PLS_ASSERT(push_success, + "Push must only be interrupted if someone took the task we tried to push!"); } else { - // Someone explicitly took the traded task from us, remove it from the stack. - // TODO: if the push failed, we do not need to reset anything. - // Otherwise the normal invariant that we seek holds. - stolen_task->reset_task_chain(traded_task); + // Someone explicitly took the traded task from us, remove it from the stack if we pushed it. + if (push_success) { + stolen_task->reset_task_chain(traded_task); + } } return std::tuple{stolen_task, chain_after_stolen_task, true}; @@ -109,12 +108,10 @@ base_task *task_manager::pop_clean_task_chain(base_task *base_task) { continue; } - if (peeked_task_cas_after.is_empty() || peeked_task_cas_after.is_filled_with_trade_request()) { - if (peeked_task_cas_after.is_filled_with_trade_request()) { - printf("what happened! (%d)\n", base_task->thread_id_); - continue; - } + PLS_ASSERT(!peeked_task_cas_after.is_filled_with_trade_request(), + "The resource stack must never be empty while the task is up for being stolen."); + if (peeked_task_cas_after.is_empty()) { // The task was 'stable' during our pop from the stack. // Or in other words: no other thread operated on the task. // We are therefore the last child and do not get a clean task chain. @@ -123,11 +120,11 @@ base_task *task_manager::pop_clean_task_chain(base_task *base_task) { // The task was stable, but has a potential resource attached in its cas field. // Try to get it to not be blocked by the other preempted task. -// task *optional_cas_task = external_trading_deque::get_trade_object(target_task, peeked_task_cas_after); -// if (optional_cas_task) { -// // We got it, thus the other thread has not got it and will remove it from the queue. -// return optional_cas_task; -// } + task *optional_cas_task = external_trading_deque::get_trade_object(target_task, peeked_task_cas_after); + if (optional_cas_task) { + // We got it, thus the other thread has not got it and will remove it from the queue. + return optional_cas_task; + } } } diff --git a/test/scheduling_lock_free_tests.cpp b/test/scheduling_lock_free_tests.cpp index ad42a31..4c73310 100644 --- a/test/scheduling_lock_free_tests.cpp +++ b/test/scheduling_lock_free_tests.cpp @@ -47,43 +47,36 @@ TEST_CASE("task resource stack", "[internal/scheduling/lock_free/task]") { scheduler.task_manager_for(3).get_task(0)}; SECTION("simple push/pop") { - tasks[0]->push_task_chain(tasks[1]); + tasks[1]->prepare_for_push(0); + tasks[0]->push_task_chain(tasks[1], 0); REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); REQUIRE(tasks[0]->pop_task_chain() == nullptr); } - - - SECTION("propose intertwined normal ops") { - tasks[0]->push_task_chain(tasks[1]); - tasks[0]->push_task_chain(tasks[2]); - - REQUIRE(tasks[0]->pop_task_chain() == tasks[2]); - REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); - - tasks[0]->push_task_chain(tasks[1]); - tasks[0]->push_task_chain(tasks[2]); + SECTION("empty pop and multi push") { + tasks[1]->prepare_for_push(0); + tasks[0]->push_task_chain(tasks[1], 0); + tasks[2]->prepare_for_push(0); + tasks[0]->push_task_chain(tasks[2], 0); REQUIRE(tasks[0]->pop_task_chain() == tasks[2]); REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); - tasks[0]->push_task_chain(tasks[1]); + tasks[1]->prepare_for_push(0); + tasks[0]->push_task_chain(tasks[1], 0); REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); REQUIRE(tasks[0]->pop_task_chain() == nullptr); - - tasks[0]->push_task_chain(tasks[1]); - tasks[0]->push_task_chain(tasks[2]); - - REQUIRE(tasks[0]->pop_task_chain() == tasks[2]); - REQUIRE(tasks[0]->pop_task_chain() == tasks[1]); } SECTION("multiple pushes") { - tasks[0]->push_task_chain(tasks[1]); - tasks[0]->push_task_chain(tasks[2]); - tasks[0]->push_task_chain(tasks[3]); + tasks[1]->prepare_for_push(0); + tasks[0]->push_task_chain(tasks[1], 0); + tasks[2]->prepare_for_push(0); + tasks[0]->push_task_chain(tasks[2], 0); + tasks[3]->prepare_for_push(0); + tasks[0]->push_task_chain(tasks[3], 0); REQUIRE(tasks[0]->pop_task_chain() == tasks[3]); REQUIRE(tasks[0]->pop_task_chain() == tasks[2]); -- libgit2 0.26.0