diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt
index 24e0e9a..f199d07 100644
--- a/lib/pls/CMakeLists.txt
+++ b/lib/pls/CMakeLists.txt
@@ -35,7 +35,7 @@ add_library(pls STATIC
         include/pls/internal/base/error_handling.h
         include/pls/internal/base/alignment.h src/internal/base/alignment.cpp
-        include/pls/internal/scheduling/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
+        include/pls/internal/scheduling/data_structures/aligned_stack.h src/internal/scheduling/data_structures/aligned_stack.cpp
         include/pls/internal/scheduling/data_structures/aligned_stack_impl.h
         include/pls/internal/scheduling/data_structures/deque.h
         include/pls/internal/scheduling/data_structures/locking_deque.h
diff --git a/lib/pls/include/pls/internal/scheduling/data_structures/locking_deque.h b/lib/pls/include/pls/internal/scheduling/data_structures/locking_deque.h
index 3f798f6..e78acd0 100644
--- a/lib/pls/include/pls/internal/scheduling/data_structures/locking_deque.h
+++ b/lib/pls/include/pls/internal/scheduling/data_structures/locking_deque.h
@@ -15,19 +15,19 @@ namespace data_structures {
 using deque_offset = aligned_stack::stack_offset;
 
 /**
- * Wraps any object into a deque item.
+ * Wraps any object into a deque Task.
  */
-template<typename Item>
-struct locking_deque_item {
-  Item *item_;
+template<typename Task>
+struct locking_deque_task {
+  Task *item_;
 
-  locking_deque_item *prev_;
-  locking_deque_item *next_;
+  locking_deque_task *prev_;
+  locking_deque_task *next_;
 };
 
-template<typename Item, typename Content>
-struct locking_deque_container : public locking_deque_item<Item> {
+template<typename Task, typename Content>
+struct locking_deque_container : public locking_deque_task<Task> {
   Content content_;
 
  public:
@@ -39,16 +39,16 @@ struct locking_deque_container : public locking_deque_item<Item> {
  * A double linked list based deque.
  * Storage is therefore only needed for the individual items.
  *
- * @tparam Item The type of items stored in this deque
+ * @tparam Task The type of Tasks stored in this deque
  */
-template<typename Item>
+template<typename Task>
 class locking_deque {
   aligned_stack *stack_;
 
-  locking_deque_item<Item> *head_;
-  locking_deque_item<Item> *tail_;
+  locking_deque_task<Task> *head_;
+  locking_deque_task<Task> *tail_;
 
-  locking_deque_item<Item> *last_inserted_;
+  locking_deque_task<Task> *last_inserted_;
 
   base::spin_lock lock_;
@@ -63,8 +63,10 @@ class locking_deque {
   void *push_bytes(size_t size);
   void publish_last_task();
-  Item *pop_local_task();
-  Item *pop_external_task();
+  Task *pop_local_task(bool &cas_fail_out);
+  Task *pop_local_task();
+  Task *pop_external_task(bool &cas_fail_out);
+  Task *pop_external_task();
 
   void reset_offset(deque_offset state);
   deque_offset save_offset();
diff --git a/lib/pls/include/pls/internal/scheduling/data_structures/locking_deque_impl.h b/lib/pls/include/pls/internal/scheduling/data_structures/locking_deque_impl.h
index c683cb9..1723cd6 100644
--- a/lib/pls/include/pls/internal/scheduling/data_structures/locking_deque_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/data_structures/locking_deque_impl.h
@@ -14,7 +14,7 @@ T *locking_deque<Task>::push_task(ARGS &&...args) {
                 "Must only push types of onto work_stealing_deque");
 
   // Allocate object
-  auto deque_item = stack_->push < locking_deque_container < Task, T>>(std::forward<ARGS>(args)...);
+  auto deque_item = stack_->push<locking_deque_container<Task, T>>(std::forward<ARGS>(args)...);
   deque_item->item_ = &deque_item->content_;
 
   // Keep for later publishing
@@ -53,7 +53,14 @@ void locking_deque<Task>::publish_last_task() {
 
 template<typename Task>
 Task *locking_deque<Task>::pop_local_task() {
+  bool cas_fail_out;
+  return pop_local_task(cas_fail_out);
+}
+
+template<typename Task>
+Task *locking_deque<Task>::pop_local_task(bool &cas_fail_out) {
   std::lock_guard<base::spin_lock> lock{lock_};
+  cas_fail_out = false; // Can not fail CAS in locking implementation
 
   if (tail_ == nullptr) {
     return nullptr;
@@ -72,7 +79,14 @@ Task *locking_deque<Task>::pop_external_task() {
+  bool cas_fail_out;
+  return pop_external_task(cas_fail_out);
+}
+
+template<typename Task>
+Task *locking_deque<Task>::pop_external_task(bool &cas_fail_out) {
   std::lock_guard<base::spin_lock> lock{lock_};
+  cas_fail_out = false; // Can not fail CAS in locking implementation
 
   if (head_ == nullptr) {
     return nullptr;
diff --git a/lib/pls/include/pls/internal/scheduling/data_structures/work_stealing_deque.h b/lib/pls/include/pls/internal/scheduling/data_structures/work_stealing_deque.h
index 2266ad5..e874942 100644
--- a/lib/pls/include/pls/internal/scheduling/data_structures/work_stealing_deque.h
+++ b/lib/pls/include/pls/internal/scheduling/data_structures/work_stealing_deque.h
@@ -79,7 +79,9 @@ class work_stealing_deque {
   void *push_bytes(size_t size);
   void publish_last_task();
+  Task *pop_local_task(bool &cas_fail_out);
   Task *pop_local_task();
+  Task *pop_external_task(bool &cas_fail_out);
   Task *pop_external_task();
 
   void reset_offset(deque_offset offset);
diff --git a/lib/pls/include/pls/internal/scheduling/data_structures/work_stealing_deque_impl.h b/lib/pls/include/pls/internal/scheduling/data_structures/work_stealing_deque_impl.h
index b791bac..33046fd 100644
--- a/lib/pls/include/pls/internal/scheduling/data_structures/work_stealing_deque_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/data_structures/work_stealing_deque_impl.h
@@ -72,10 +72,17 @@ void work_stealing_deque<Task>::publish_last_task() {
 
 template<typename Task>
 Task *work_stealing_deque<Task>::pop_local_task() {
+  bool cas_fail_out;
+  return pop_local_task(cas_fail_out);
+}
+
+template<typename Task>
+Task *work_stealing_deque<Task>::pop_local_task(bool &cas_fail_out) {
   deque_offset local_tail = tail_;
   stamped_integer local_head = head_;
 
   if (local_tail <= local_head.value) {
+    cas_fail_out = false;
     return nullptr; // EMPTY
   }
@@ -89,6 +96,7 @@ Task *work_stealing_deque<Task>::pop_local_task() {
   local_head = head_; // Linearization point, outside knows list is empty
 
   if (local_head.value < new_tail) {
+    cas_fail_out = false;
     return previous_tail_item->data(); // Success, enough distance to other threads
   }
@@ -96,6 +104,7 @@ Task *work_stealing_deque<Task>::pop_local_task() {
   stamped_integer new_head = stamped_integer{local_head.stamp + 1, new_tail};
   // Try competing with consumers by updating the head's stamp value
   if (head_.compare_exchange_strong(local_head, new_head)) {
+    cas_fail_out = false;
     return previous_tail_item->data(); // SUCCESS, we won the competition with other threads
   }
 }
@@ -105,15 +114,23 @@ Task *work_stealing_deque<Task>::pop_local_task() {
   // Reset the queue into an empty state => head_ = tail_
   tail_ = local_head.value; // ...we give up to the other winning thread
 
+  cas_fail_out = false; // We failed the CAS race, but the queue is also empty for sure!
   return nullptr; // EMPTY, we lost the competition with other threads
 }
 
 template<typename Task>
 Task *work_stealing_deque<Task>::pop_external_task() {
+  bool cas_fail_out;
+  return pop_external_task(cas_fail_out);
+}
+
+template<typename Task>
+Task *work_stealing_deque<Task>::pop_external_task(bool &cas_fail_out) {
   stamped_integer local_head = head_;
   deque_offset local_tail = tail_;
 
   if (local_tail <= local_head.value) {
+    cas_fail_out = false;
     return nullptr; // EMPTY
   }
   // Load info on current deque item.
@@ -131,9 +148,11 @@ Task *work_stealing_deque<Task>::pop_external_task() {
   // 3) owning thread removed tail, we lose to this
   stamped_integer new_head = stamped_integer{local_head.stamp + 1, next_item_offset};
   if (head_.compare_exchange_strong(local_head, new_head)) {
+    cas_fail_out = false;
     return head_data_item; // SUCCESS, we won the competition
   }
 
+  cas_fail_out = true;
   return nullptr; // EMPTY, we lost the competition
 }
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h
index eb4dae9..0405600 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler.h
@@ -7,7 +7,7 @@
 
 #include "pls/internal/helpers/profiler.h"
 
-#include "pls/internal/data_structures/aligned_stack.h"
+#include "pls/internal/scheduling/data_structures/aligned_stack.h"
 #include "pls/internal/base/thread.h"
 #include "pls/internal/base/barrier.h"
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
index 57a53dc..81813a6 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
@@ -1,7 +1,7 @@
 #ifndef PLS_SCHEDULER_MEMORY_H
 #define PLS_SCHEDULER_MEMORY_H
 
-#include "pls/internal/data_structures/aligned_stack.h"
+#include "pls/internal/scheduling/data_structures/aligned_stack.h"
 #include "pls/internal/base/thread.h"
 
 #include "pls/internal/scheduling/thread_state.h"
diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h
index 73771f4..e8925cb 100644
--- a/lib/pls/include/pls/internal/scheduling/task.h
+++ b/lib/pls/include/pls/internal/scheduling/task.h
@@ -4,8 +4,8 @@
 
 #include "pls/internal/helpers/profiler.h"
 
-#include "pls/internal/data_structures/aligned_stack.h"
-#include "pls/internal/data_structures/deque.h"
+#include "pls/internal/scheduling/data_structures/aligned_stack.h"
+#include "pls/internal/scheduling/data_structures/deque.h"
 
 #include "pls/internal/scheduling/thread_state.h"
diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h
index 0787555..3d5ff63 100644
--- a/lib/pls/include/pls/internal/scheduling/thread_state.h
+++ b/lib/pls/include/pls/internal/scheduling/thread_state.h
@@ -6,8 +6,8 @@
 
 #include "pls/internal/base/thread.h"
 
-#include "pls/internal/data_structures/aligned_stack.h"
-#include "pls/internal/data_structures/deque.h"
+#include "pls/internal/scheduling/data_structures/aligned_stack.h"
+#include "pls/internal/scheduling/data_structures/deque.h"
 
 namespace pls {
 namespace internal {
diff --git a/lib/pls/src/internal/data_structures/aligned_stack.cpp b/lib/pls/src/internal/scheduling/data_structures/aligned_stack.cpp
similarity index 93%
rename from lib/pls/src/internal/data_structures/aligned_stack.cpp
rename to lib/pls/src/internal/scheduling/data_structures/aligned_stack.cpp
index 2719462..8ad0ead 100644
--- a/lib/pls/src/internal/data_structures/aligned_stack.cpp
+++ b/lib/pls/src/internal/scheduling/data_structures/aligned_stack.cpp
@@ -1,8 +1,9 @@
-#include "pls/internal/data_structures/aligned_stack.h"
+#include "pls/internal/scheduling/data_structures/aligned_stack.h"
 #include "pls/internal/base/system_details.h"
 
 namespace pls {
 namespace internal {
+namespace scheduling {
 namespace data_structures {
 
 aligned_stack::aligned_stack(pointer_t memory_region, const std::size_t size) :
@@ -37,3 +38,4 @@ void *aligned_stack::push_bytes(size_t size) {
 }
 }
 }
+}
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index 522908b..46a4e0a 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -1,6 +1,7 @@
 #include "pls/internal/scheduling/scheduler.h"
 #include "pls/internal/scheduling/thread_state.h"
 #include "pls/internal/scheduling/task.h"
+#include "pls/internal/scheduling/data_structures/deque.h"
 
 #include "pls/internal/base/error_handling.h"
 
@@ -79,7 +80,6 @@ void scheduler::terminate() {
   terminated_ = true;
   sync_barrier_.wait();
 
-
   for (unsigned int i = 0; i < num_threads_; i++) {
     if (reuse_thread_ && i == 0) {
       continue;
@@ -102,6 +102,7 @@ task *scheduler::steal_task() {
   const auto my_id = my_state->id_;
   const size_t offset = my_state->random_() % num_threads();
   const size_t max_tries = num_threads(); // TODO: Tune this value
+  bool any_cas_fails_occured = false;
 
   // Current strategy: random start, then round robin from there
   for (size_t i = 0; i < max_tries; i++) {
@@ -111,8 +112,10 @@ task *scheduler::steal_task() {
     target = ((target == my_id) + target) % num_threads();
 
     auto target_state = thread_state_for(target);
-    // TODO: See if we should re-try popping if it failed due to contention
-    auto result = target_state->deque_.pop_external_task();
+
+    bool cas_fail;
+    auto result = target_state->deque_.pop_external_task(cas_fail);
+    any_cas_fails_occured |= cas_fail;
     if (result != nullptr) {
       return result;
     }
@@ -120,7 +123,11 @@ task *scheduler::steal_task() {
     // TODO: See if we should backoff here (per missed steal)
   }
 
-  // TODO: See if we should backoff here (after a 'round' of missed steals)
+  if (!any_cas_fails_occured) {
+    // Went through every task and we did not find any work.
+    // Most likely there is none available right now, yield to other threads.
+    pls::internal::base::this_thread::yield();
+  }
   return nullptr;
 }
diff --git a/lib/pls/src/internal/scheduling/scheduler_memory.cpp b/lib/pls/src/internal/scheduling/scheduler_memory.cpp
index e59abd3..b76aaaf 100644
--- a/lib/pls/src/internal/scheduling/scheduler_memory.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler_memory.cpp
@@ -1,4 +1,5 @@
 #include "pls/internal/scheduling/scheduler_memory.h"
+#include "pls/internal/scheduling/data_structures/aligned_stack.h"
 
 namespace pls {
 namespace internal {
diff --git a/test/data_structures_test.cpp b/test/data_structures_test.cpp
index 02932e3..d515972 100644
--- a/test/data_structures_test.cpp
+++ b/test/data_structures_test.cpp
@@ -2,13 +2,13 @@
 
 #include "pls/internal/base/system_details.h"
 
-#include "pls/internal/data_structures/aligned_stack.h"
-#include "pls/internal/data_structures/locking_deque.h"
-#include "pls/internal/data_structures/work_stealing_deque.h"
+#include "pls/internal/scheduling/data_structures/aligned_stack.h"
+#include "pls/internal/scheduling/data_structures/locking_deque.h"
+#include "pls/internal/scheduling/data_structures/work_stealing_deque.h"
 
 #include
 
-using namespace pls::internal::data_structures;
+using namespace pls::internal::scheduling::data_structures;
 using namespace pls::internal::base;
 
 using namespace std;
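Note on the change above: the new pop_local_task(bool &cas_fail_out) and pop_external_task(bool &cas_fail_out) overloads let a caller tell "the deque is genuinely empty" apart from "the pop lost a compare-exchange race against another thread", and scheduler::steal_task() uses that signal to yield only after a full round of steal attempts that saw no contention. The following is a minimal, self-contained C++ sketch of that consumer-side pattern; the names task, deque_like and try_steal_round are illustrative stand-ins and not part of the pls API.

#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

// Stand-in task type; the real library stores scheduler tasks here.
struct task {
  int id;
};

// Tiny single-slot stand-in for a deque, only to exercise the signature:
// pop_external_task() returns a task or nullptr and reports via cas_fail_out
// whether nullptr was caused by losing a compare_exchange race (contention)
// rather than by the container being genuinely empty.
struct deque_like {
  std::atomic<task *> slot{nullptr};

  task *pop_external_task(bool &cas_fail_out) {
    task *expected = slot.load();
    if (expected == nullptr) {
      cas_fail_out = false; // genuinely empty
      return nullptr;
    }
    if (slot.compare_exchange_strong(expected, nullptr)) {
      cas_fail_out = false; // we won the race and now own the task
      return expected;
    }
    cas_fail_out = true; // another thread emptied the slot between load and CAS
    return nullptr;
  }
};

// One stealing round over all victims: yield only when no victim showed
// contention, mirroring the strategy added to scheduler::steal_task above.
task *try_steal_round(std::vector<deque_like> &victims) {
  bool any_cas_fail = false;
  for (deque_like &victim : victims) {
    bool cas_fail = false;
    if (task *stolen = victim.pop_external_task(cas_fail)) {
      return stolen;
    }
    any_cas_fail |= cas_fail;
  }
  if (!any_cas_fail) {
    std::this_thread::yield(); // every victim looked empty, back off
  }
  return nullptr;
}

int main() {
  std::vector<deque_like> victims(4);
  task t{42};
  victims[2].slot.store(&t);

  if (task *stolen = try_steal_round(victims)) {
    std::printf("stole task %d\n", stolen->id);
  }
  return 0;
}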