Commit 65409e0a by FritzFlorian

Yield thread if there is (probably) no more work.

The scheduler yields if it failed to steal any work due to the task
lists being empty. This should improve performance on multiprogrammed
systems, as it potentially makes room for other worker threads which
still have work to perform.
parent 68af3068
Pipeline #1308 passed with stages
in 4 minutes 6 seconds
...@@ -35,7 +35,7 @@ add_library(pls STATIC ...@@ -35,7 +35,7 @@ add_library(pls STATIC
include/pls/internal/base/error_handling.h include/pls/internal/base/error_handling.h
include/pls/internal/base/alignment.h src/internal/base/alignment.cpp include/pls/internal/base/alignment.h src/internal/base/alignment.cpp
include/pls/internal/scheduling/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp include/pls/internal/scheduling/data_structures/aligned_stack.h src/internal/scheduling/data_structures/aligned_stack.cpp
include/pls/internal/scheduling/data_structures/aligned_stack_impl.h include/pls/internal/scheduling/data_structures/aligned_stack_impl.h
include/pls/internal/scheduling/data_structures/deque.h include/pls/internal/scheduling/data_structures/deque.h
include/pls/internal/scheduling/data_structures/locking_deque.h include/pls/internal/scheduling/data_structures/locking_deque.h
......
...@@ -15,19 +15,19 @@ namespace data_structures { ...@@ -15,19 +15,19 @@ namespace data_structures {
using deque_offset = aligned_stack::stack_offset; using deque_offset = aligned_stack::stack_offset;
/** /**
* Wraps any object into a deque item. * Wraps any object into a deque Task.
*/ */
template<typename Item> template<typename Task>
struct locking_deque_item { struct locking_deque_task {
Item *item_; Task *item_;
locking_deque_item *prev_; locking_deque_task *prev_;
locking_deque_item *next_; locking_deque_task *next_;
}; };
template<typename Item, typename Content> template<typename Task, typename Content>
struct locking_deque_container : public locking_deque_item<Item> { struct locking_deque_container : public locking_deque_task<Task> {
Content content_; Content content_;
public: public:
...@@ -39,16 +39,16 @@ struct locking_deque_container : public locking_deque_item<Item> { ...@@ -39,16 +39,16 @@ struct locking_deque_container : public locking_deque_item<Item> {
* A double linked list based deque. * A double linked list based deque.
* Storage is therefore only needed for the individual items. * Storage is therefore only needed for the individual items.
* *
* @tparam Item The type of items stored in this deque * @tparam Task The type of Tasks stored in this deque
*/ */
template<typename Item> template<typename Task>
class locking_deque { class locking_deque {
aligned_stack *stack_; aligned_stack *stack_;
locking_deque_item<Item> *head_; locking_deque_task<Task> *head_;
locking_deque_item<Item> *tail_; locking_deque_task<Task> *tail_;
locking_deque_item<Item> *last_inserted_; locking_deque_task<Task> *last_inserted_;
base::spin_lock lock_; base::spin_lock lock_;
...@@ -63,8 +63,10 @@ class locking_deque { ...@@ -63,8 +63,10 @@ class locking_deque {
void *push_bytes(size_t size); void *push_bytes(size_t size);
void publish_last_task(); void publish_last_task();
Item *pop_local_task(); Task *pop_local_task(bool &cas_fail_out);
Item *pop_external_task(); Task *pop_local_task();
Task *pop_external_task(bool &cas_fail_out);
Task *pop_external_task();
void reset_offset(deque_offset state); void reset_offset(deque_offset state);
deque_offset save_offset(); deque_offset save_offset();
......
...@@ -14,7 +14,7 @@ T *locking_deque<Task>::push_task(ARGS &&...args) { ...@@ -14,7 +14,7 @@ T *locking_deque<Task>::push_task(ARGS &&...args) {
"Must only push types of <Item> onto work_stealing_deque<Item>"); "Must only push types of <Item> onto work_stealing_deque<Item>");
// Allocate object // Allocate object
auto deque_item = stack_->push < locking_deque_container < Task, T>>(std::forward<ARGS>(args)...); auto deque_item = stack_->push<locking_deque_container<Task, T>>(std::forward<ARGS>(args)...);
deque_item->item_ = &deque_item->content_; deque_item->item_ = &deque_item->content_;
// Keep for later publishing // Keep for later publishing
...@@ -53,7 +53,14 @@ void locking_deque<Task>::publish_last_task() { ...@@ -53,7 +53,14 @@ void locking_deque<Task>::publish_last_task() {
template<typename Task> template<typename Task>
Task *locking_deque<Task>::pop_local_task() { Task *locking_deque<Task>::pop_local_task() {
bool cas_fail_out;
return pop_local_task(cas_fail_out);
}
template<typename Task>
Task *locking_deque<Task>::pop_local_task(bool &cas_fail_out) {
std::lock_guard<base::spin_lock> lock{lock_}; std::lock_guard<base::spin_lock> lock{lock_};
cas_fail_out = false; // Can not fail CAS in locking implementation
if (tail_ == nullptr) { if (tail_ == nullptr) {
return nullptr; return nullptr;
...@@ -72,7 +79,14 @@ Task *locking_deque<Task>::pop_local_task() { ...@@ -72,7 +79,14 @@ Task *locking_deque<Task>::pop_local_task() {
template<typename Task> template<typename Task>
Task *locking_deque<Task>::pop_external_task() { Task *locking_deque<Task>::pop_external_task() {
bool cas_fail_out;
return pop_external_task(cas_fail_out);
}
template<typename Task>
Task *locking_deque<Task>::pop_external_task(bool &cas_fail_out) {
std::lock_guard<base::spin_lock> lock{lock_}; std::lock_guard<base::spin_lock> lock{lock_};
cas_fail_out = false; // Can not fail CAS in locking implementation
if (head_ == nullptr) { if (head_ == nullptr) {
return nullptr; return nullptr;
......
...@@ -79,7 +79,9 @@ class work_stealing_deque { ...@@ -79,7 +79,9 @@ class work_stealing_deque {
void *push_bytes(size_t size); void *push_bytes(size_t size);
void publish_last_task(); void publish_last_task();
Task *pop_local_task(bool &cas_fail_out);
Task *pop_local_task(); Task *pop_local_task();
Task *pop_external_task(bool &cas_fail_out);
Task *pop_external_task(); Task *pop_external_task();
void reset_offset(deque_offset offset); void reset_offset(deque_offset offset);
......
...@@ -72,10 +72,17 @@ void work_stealing_deque<Task>::publish_last_task() { ...@@ -72,10 +72,17 @@ void work_stealing_deque<Task>::publish_last_task() {
template<typename Task> template<typename Task>
Task *work_stealing_deque<Task>::pop_local_task() { Task *work_stealing_deque<Task>::pop_local_task() {
bool cas_fail_out;
return pop_local_task(cas_fail_out);
}
template<typename Task>
Task *work_stealing_deque<Task>::pop_local_task(bool &cas_fail_out) {
deque_offset local_tail = tail_; deque_offset local_tail = tail_;
stamped_integer local_head = head_; stamped_integer local_head = head_;
if (local_tail <= local_head.value) { if (local_tail <= local_head.value) {
cas_fail_out = false;
return nullptr; // EMPTY return nullptr; // EMPTY
} }
...@@ -89,6 +96,7 @@ Task *work_stealing_deque<Task>::pop_local_task() { ...@@ -89,6 +96,7 @@ Task *work_stealing_deque<Task>::pop_local_task() {
local_head = head_; // Linearization point, outside knows list is empty local_head = head_; // Linearization point, outside knows list is empty
if (local_head.value < new_tail) { if (local_head.value < new_tail) {
cas_fail_out = false;
return previous_tail_item->data<Task>(); // Success, enough distance to other threads return previous_tail_item->data<Task>(); // Success, enough distance to other threads
} }
...@@ -96,6 +104,7 @@ Task *work_stealing_deque<Task>::pop_local_task() { ...@@ -96,6 +104,7 @@ Task *work_stealing_deque<Task>::pop_local_task() {
stamped_integer new_head = stamped_integer{local_head.stamp + 1, new_tail}; stamped_integer new_head = stamped_integer{local_head.stamp + 1, new_tail};
// Try competing with consumers by updating the head's stamp value // Try competing with consumers by updating the head's stamp value
if (head_.compare_exchange_strong(local_head, new_head)) { if (head_.compare_exchange_strong(local_head, new_head)) {
cas_fail_out = false;
return previous_tail_item->data<Task>(); // SUCCESS, we won the competition with other threads return previous_tail_item->data<Task>(); // SUCCESS, we won the competition with other threads
} }
} }
...@@ -105,15 +114,23 @@ Task *work_stealing_deque<Task>::pop_local_task() { ...@@ -105,15 +114,23 @@ Task *work_stealing_deque<Task>::pop_local_task() {
// Reset the queue into an empty state => head_ = tail_ // Reset the queue into an empty state => head_ = tail_
tail_ = local_head.value; // ...we give up to the other winning thread tail_ = local_head.value; // ...we give up to the other winning thread
cas_fail_out = false; // We failed the CAS race, but the queue is also empty for sure!
return nullptr; // EMPTY, we lost the competition with other threads return nullptr; // EMPTY, we lost the competition with other threads
} }
template<typename Task> template<typename Task>
Task *work_stealing_deque<Task>::pop_external_task() { Task *work_stealing_deque<Task>::pop_external_task() {
bool cas_fail_out;
return pop_external_task(cas_fail_out);
}
template<typename Task>
Task *work_stealing_deque<Task>::pop_external_task(bool &cas_fail_out) {
stamped_integer local_head = head_; stamped_integer local_head = head_;
deque_offset local_tail = tail_; deque_offset local_tail = tail_;
if (local_tail <= local_head.value) { if (local_tail <= local_head.value) {
cas_fail_out = false;
return nullptr; // EMPTY return nullptr; // EMPTY
} }
// Load info on current deque item. // Load info on current deque item.
...@@ -131,9 +148,11 @@ Task *work_stealing_deque<Task>::pop_external_task() { ...@@ -131,9 +148,11 @@ Task *work_stealing_deque<Task>::pop_external_task() {
// 3) owning thread removed tail, we lose to this // 3) owning thread removed tail, we lose to this
stamped_integer new_head = stamped_integer{local_head.stamp + 1, next_item_offset}; stamped_integer new_head = stamped_integer{local_head.stamp + 1, next_item_offset};
if (head_.compare_exchange_strong(local_head, new_head)) { if (head_.compare_exchange_strong(local_head, new_head)) {
cas_fail_out = false;
return head_data_item; // SUCCESS, we won the competition return head_data_item; // SUCCESS, we won the competition
} }
cas_fail_out = true;
return nullptr; // EMPTY, we lost the competition return nullptr; // EMPTY, we lost the competition
} }
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
#include "pls/internal/helpers/profiler.h" #include "pls/internal/helpers/profiler.h"
#include "pls/internal/data_structures/aligned_stack.h" #include "pls/internal/scheduling/data_structures/aligned_stack.h"
#include "pls/internal/base/thread.h" #include "pls/internal/base/thread.h"
#include "pls/internal/base/barrier.h" #include "pls/internal/base/barrier.h"
......
#ifndef PLS_SCHEDULER_MEMORY_H #ifndef PLS_SCHEDULER_MEMORY_H
#define PLS_SCHEDULER_MEMORY_H #define PLS_SCHEDULER_MEMORY_H
#include "pls/internal/data_structures/aligned_stack.h" #include "pls/internal/scheduling/data_structures/aligned_stack.h"
#include "pls/internal/base/thread.h" #include "pls/internal/base/thread.h"
#include "pls/internal/scheduling/thread_state.h" #include "pls/internal/scheduling/thread_state.h"
......
...@@ -4,8 +4,8 @@ ...@@ -4,8 +4,8 @@
#include "pls/internal/helpers/profiler.h" #include "pls/internal/helpers/profiler.h"
#include "pls/internal/data_structures/aligned_stack.h" #include "pls/internal/scheduling/data_structures/aligned_stack.h"
#include "pls/internal/data_structures/deque.h" #include "pls/internal/scheduling/data_structures/deque.h"
#include "pls/internal/scheduling/thread_state.h" #include "pls/internal/scheduling/thread_state.h"
......
...@@ -6,8 +6,8 @@ ...@@ -6,8 +6,8 @@
#include "pls/internal/base/thread.h" #include "pls/internal/base/thread.h"
#include "pls/internal/data_structures/aligned_stack.h" #include "pls/internal/scheduling/data_structures/aligned_stack.h"
#include "pls/internal/data_structures/deque.h" #include "pls/internal/scheduling/data_structures/deque.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
......
#include "pls/internal/data_structures/aligned_stack.h" #include "pls/internal/scheduling/data_structures/aligned_stack.h"
#include "pls/internal/base/system_details.h" #include "pls/internal/base/system_details.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
namespace scheduling {
namespace data_structures { namespace data_structures {
aligned_stack::aligned_stack(pointer_t memory_region, const std::size_t size) : aligned_stack::aligned_stack(pointer_t memory_region, const std::size_t size) :
...@@ -37,3 +38,4 @@ void *aligned_stack::push_bytes(size_t size) { ...@@ -37,3 +38,4 @@ void *aligned_stack::push_bytes(size_t size) {
} }
} }
} }
}
#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/thread_state.h" #include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/data_structures/deque.h"
#include "pls/internal/base/error_handling.h" #include "pls/internal/base/error_handling.h"
...@@ -79,7 +80,6 @@ void scheduler::terminate() { ...@@ -79,7 +80,6 @@ void scheduler::terminate() {
terminated_ = true; terminated_ = true;
sync_barrier_.wait(); sync_barrier_.wait();
for (unsigned int i = 0; i < num_threads_; i++) { for (unsigned int i = 0; i < num_threads_; i++) {
if (reuse_thread_ && i == 0) { if (reuse_thread_ && i == 0) {
continue; continue;
...@@ -102,6 +102,7 @@ task *scheduler::steal_task() { ...@@ -102,6 +102,7 @@ task *scheduler::steal_task() {
const auto my_id = my_state->id_; const auto my_id = my_state->id_;
const size_t offset = my_state->random_() % num_threads(); const size_t offset = my_state->random_() % num_threads();
const size_t max_tries = num_threads(); // TODO: Tune this value const size_t max_tries = num_threads(); // TODO: Tune this value
bool any_cas_fails_occured = false;
// Current strategy: random start, then round robin from there // Current strategy: random start, then round robin from there
for (size_t i = 0; i < max_tries; i++) { for (size_t i = 0; i < max_tries; i++) {
...@@ -111,8 +112,10 @@ task *scheduler::steal_task() { ...@@ -111,8 +112,10 @@ task *scheduler::steal_task() {
target = ((target == my_id) + target) % num_threads(); target = ((target == my_id) + target) % num_threads();
auto target_state = thread_state_for(target); auto target_state = thread_state_for(target);
// TODO: See if we should re-try popping if it failed due to contention
auto result = target_state->deque_.pop_external_task(); bool cas_fail;
auto result = target_state->deque_.pop_external_task(cas_fail);
any_cas_fails_occured |= cas_fail;
if (result != nullptr) { if (result != nullptr) {
return result; return result;
} }
...@@ -120,7 +123,11 @@ task *scheduler::steal_task() { ...@@ -120,7 +123,11 @@ task *scheduler::steal_task() {
// TODO: See if we should backoff here (per missed steal) // TODO: See if we should backoff here (per missed steal)
} }
// TODO: See if we should backoff here (after a 'round' of missed steals) if (!any_cas_fails_occured) {
// Went through every target deque and we did not find any work.
// Most likely there is none available right now, so yield to other threads.
pls::internal::base::this_thread::yield();
}
return nullptr; return nullptr;
} }
......
#include "pls/internal/scheduling/scheduler_memory.h" #include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/scheduling/data_structures/aligned_stack.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
......
...@@ -2,13 +2,13 @@ ...@@ -2,13 +2,13 @@
#include "pls/internal/base/system_details.h" #include "pls/internal/base/system_details.h"
#include "pls/internal/data_structures/aligned_stack.h" #include "pls/internal/scheduling/data_structures/aligned_stack.h"
#include "pls/internal/data_structures/locking_deque.h" #include "pls/internal/scheduling/data_structures/locking_deque.h"
#include "pls/internal/data_structures/work_stealing_deque.h" #include "pls/internal/scheduling/data_structures/work_stealing_deque.h"
#include <mutex> #include <mutex>
using namespace pls::internal::data_structures; using namespace pls::internal::scheduling::data_structures;
using namespace pls::internal::base; using namespace pls::internal::base;
using namespace std; using namespace std;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment