From 0141a57ab49c248e7e35b774511eb3cdef67235c Mon Sep 17 00:00:00 2001
From: FritzFlorian
Date: Fri, 24 Jan 2020 17:39:56 +0100
Subject: [PATCH] Sketch 'externally trading task deque'.

The deque trades tasks when stealing. Right now only the fast local
path is tested and implemented. For the next step to work we also need
to add the resource stack and resource trading to the system.
---
 app/benchmark_fib/main.cpp                                        |   2 +-
 lib/pls/CMakeLists.txt                                            |   8 +++-----
 lib/pls/include/pls/internal/base/error_handling.h                |   3 +--
 lib/pls/include/pls/internal/base/system_details.h                |   2 +-
 lib/pls/include/pls/internal/scheduling/external_trading_deque.h  | 165 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 lib/pls/include/pls/internal/scheduling/task.h                    |  13 +++++++------
 lib/pls/include/pls/internal/scheduling/task_manager.h            |  38 ++++++++++++++++++++++----------------
 lib/pls/include/pls/internal/scheduling/traded_cas_field.h        |  87 ++++++++++++++++++++++++++++++++++++++++++++++++
 8 files changed, 287 insertions(+), 31 deletions(-)
 create mode 100644 lib/pls/include/pls/internal/scheduling/external_trading_deque.h
 create mode 100644 lib/pls/include/pls/internal/scheduling/traded_cas_field.h

diff --git a/app/benchmark_fib/main.cpp b/app/benchmark_fib/main.cpp
index 37e12f4..8180fa3 100644
--- a/app/benchmark_fib/main.cpp
+++ b/app/benchmark_fib/main.cpp
@@ -30,7 +30,7 @@ int pls_fib(int n) {
 
 constexpr int MAX_NUM_THREADS = 1;
 constexpr int MAX_NUM_TASKS = 64;
-constexpr int MAX_STACK_SIZE = 256;
+constexpr int MAX_STACK_SIZE = 1024;
 
 int main(int argc, char **argv) {
   int num_threads;
diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt
index 7ec8d16..81e80df 100644
--- a/lib/pls/CMakeLists.txt
+++ b/lib/pls/CMakeLists.txt
@@ -49,21 +49,19 @@ add_library(pls STATIC
         include/pls/internal/helpers/seqence.h
         include/pls/internal/helpers/member_function.h
 
-        include/pls/internal/scheduling/parallel_result.h
         include/pls/internal/scheduling/thread_state.h
         include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
         include/pls/internal/scheduling/scheduler_impl.h
         include/pls/internal/scheduling/scheduler_memory.h
         include/pls/internal/scheduling/task_manager.h
         include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
-        include/pls/internal/scheduling/cont_manager.h
-        include/pls/internal/scheduling/cont.h
         include/pls/internal/data_structures/bounded_ws_deque.h
         include/pls/internal/data_structures/optional.h
-        include/pls/internal/scheduling/memory_block.h
         include/pls/internal/scheduling/thread_state_static.h
         src/internal/base/error_handling.cpp
-        include/pls/internal/data_structures/bounded_trading_deque.h)
+        include/pls/internal/data_structures/bounded_trading_deque.h
+        include/pls/internal/scheduling/external_trading_deque.h
+        include/pls/internal/scheduling/traded_cas_field.h)
 
 # Add everything in `./include` to be in the include path of this project
 target_include_directories(pls
diff --git a/lib/pls/include/pls/internal/base/error_handling.h b/lib/pls/include/pls/internal/base/error_handling.h
index 89fa220..8704cc8 100644
--- a/lib/pls/include/pls/internal/base/error_handling.h
+++ b/lib/pls/include/pls/internal/base/error_handling.h
@@ -16,7 +16,6 @@
 void pls_error(const char *msg);
 
 // TODO: Distinguish between debug/internal asserts and production asserts.
-// TODO: Re-Enable Asserts
-#define PLS_ASSERT(cond, msg) //if (!(cond)) { pls_error(msg); }
+#define PLS_ASSERT(cond, msg) if (!(cond)) { pls_error(msg); }
 
 #endif //PLS_ERROR_HANDLING_H
diff --git a/lib/pls/include/pls/internal/base/system_details.h b/lib/pls/include/pls/internal/base/system_details.h
index 01543c0..165ed3e 100644
--- a/lib/pls/include/pls/internal/base/system_details.h
+++ b/lib/pls/include/pls/internal/base/system_details.h
@@ -39,7 +39,7 @@ using pointer_t = std::uintptr_t;
  * Usually it is sane to assume a pointer can be swapped in a single CAS operation.
  */
 using cas_integer = std::uintptr_t;
-constexpr unsigned long CAS_SIZE = sizeof(cas_integer);
+constexpr unsigned long CAS_SIZE = sizeof(cas_integer) * 8;
 
 /**
  * Most processors have 64 byte cache lines (last 6 bit of the address are zero at line beginnings).
diff --git a/lib/pls/include/pls/internal/scheduling/external_trading_deque.h b/lib/pls/include/pls/internal/scheduling/external_trading_deque.h
new file mode 100644
index 0000000..31cd38e
--- /dev/null
+++ b/lib/pls/include/pls/internal/scheduling/external_trading_deque.h
@@ -0,0 +1,165 @@
+
+#ifndef PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_
+#define PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_
+
+#include <atomic>
+#include <tuple>
+
+#include "pls/internal/base/error_handling.h"
+#include "pls/internal/base/system_details.h"
+
+#include "pls/internal/data_structures/optional.h"
+#include "pls/internal/data_structures/stamped_integer.h"
+
+#include "pls/internal/scheduling/traded_cas_field.h"
+#include "pls/internal/scheduling/task.h"
+
+namespace pls {
+namespace internal {
+namespace scheduling {
+
+struct trading_deque_entry {
+  std::atomic<task *> traded_task_{nullptr};
+  std::atomic<unsigned long> forwarding_stamp_{};
+};
+
+/**
+ * A work stealing deque (single producer/consumer at the bottom, multiple stealing consumers at the top).
+ * A task object can only be acquired by stealing consumers (from the top)
+ * when they also offer a task to trade in for it.
+ *
+ * The exchange of 'goods' (here tasks) happens atomically at a linearization point.
+ * This means that the owning thread always gets a task in exchange for each and every task that was
+ * successfully stolen.
+ *
+ * As each task is associated with memory, this suffices to exchange the memory blocks needed for execution.
+ */
+class external_trading_deque {
+ public:
+  external_trading_deque(trading_deque_entry *entries, size_t num_entries) :
+      entries_{entries}, num_entries_{num_entries} {};
+
+  /**
+   * Pushes a task on the bottom of the deque.
+   * The task itself will be filled with the unique, synchronizing cas word.
+   *
+   * @param published_task The task to publish on the bottom of the deque.
+   * @return The content of the cas word, can later be used to check if it changed.
+   */
+  traded_cas_field push_bot(task *published_task) {
+    auto expected_stamp = bot_internal_.stamp;
+    auto &current_entry = entries_[bot_internal_.value];
+
+    // Store stealing information in the task and deque.
+    // Relaxed is fine for this, as adding elements is synced over the bot pointer.
+    current_entry.forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed);
+
+    traded_cas_field new_cas_field;
+    new_cas_field.fill_with_stamp(expected_stamp, deque_id_);
+    published_task->traded_field_.store(new_cas_field, std::memory_order_relaxed);
+    current_entry.traded_task_.store(published_task, std::memory_order_relaxed);
+
+    // Advance the bot pointer. Linearization point for making the task public.
+    bot_internal_.stamp++;
+    bot_internal_.value++;
+    bot_.store(bot_internal_.value, std::memory_order_release);
+
+    return new_cas_field;
+  }
+
+  void popped_bot() {
+    bot_internal_.value--;
+
+    bot_.store(bot_internal_.value, std::memory_order_relaxed);
+    if (bot_internal_.value == 0) {
+      bot_internal_.stamp++;
+      top_.store({bot_internal_.stamp, 0}, std::memory_order_release);
+    }
+  }
+
+  void empty_deque() {
+    bot_internal_.value = 0;
+    bot_internal_.stamp++;
+
+    // TODO: We might be able to relax memory orderings...
+    bot_.store(bot_internal_.value);
+    top_.store(bot_internal_);
+  }
+
+  std::tuple<data_structures::optional<task *>, data_structures::stamped_integer> peek_top() {
+    auto local_top = top_.load();
+    auto local_bot = bot_.load();
+
+    if (local_top.value >= local_bot) {
+      return std::make_tuple(data_structures::optional<task *>{}, local_top);
+    } else {
+      return std::make_tuple(data_structures::optional<task *>{entries_[local_top.value].traded_task_}, local_top);
+    }
+  }
+
+  data_structures::optional<task *> pop_top(task *trade_offer, data_structures::stamped_integer local_top) {
+    auto local_bot = bot_.load();
+    if (local_top.value >= local_bot) {
+      return data_structures::optional<task *>{};
+    }
+
+    unsigned long expected_top_stamp = local_top.stamp;
+    auto &target_entry = entries_[local_top.value];
+
+    // Read our potential result
+    task *result = target_entry.traded_task_.load(std::memory_order_relaxed);
+    unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load(std::memory_order_relaxed);
+
+    // Try to get it by CAS with the expected field entry, giving up our offered_object for it
+    traded_cas_field expected_field;
+    expected_field.fill_with_stamp(expected_top_stamp, deque_id_);
+    traded_cas_field offered_field;
+    offered_field.fill_with_trade_object(trade_offer);
+
+    if (result->traded_field_.compare_exchange_strong(expected_field, offered_field, std::memory_order_acq_rel)) {
+      // We got it, for sure move the top pointer forward.
+      top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1});
+      // Return the stolen task
+      return data_structures::optional<task *>{result};
+    } else {
+      // We did not get it...help forwarding the top pointer anyway.
+      if (expected_top_stamp == forwarding_stamp) {
+        // ...move the pointer forward if someone else put a valid trade object in there.
+        top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1});
+      } else {
+        // ...we failed because the top tag lags behind...try to fix it.
+        // This means only updating the tag, as this location can still hold data we need.
+        top_.compare_exchange_strong(local_top, {forwarding_stamp, local_top.value});
+      }
+      return data_structures::optional<task *>{};
+    }
+  }
+
+ private:
+  trading_deque_entry *entries_;
+  size_t num_entries_;
+
+  unsigned deque_id_;
+
+  alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<data_structures::stamped_integer> top_{{0, 0}};
+  alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<unsigned long> bot_{0};
+
+  data_structures::stamped_integer bot_internal_{0, 0};
+};
+
+template<size_t SIZE>
+class static_external_trading_deque {
+ public:
+  static_external_trading_deque() : items_{}, deque_{items_.data(), SIZE} {}
+
+  external_trading_deque &get_deque() { return deque_; }
+ private:
+  std::array<trading_deque_entry, SIZE> items_;
+  external_trading_deque deque_;
+};
+
+}
+}
+}
+
+#endif //PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_
diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h
index 5c734b6..8be6867 100644
--- a/lib/pls/include/pls/internal/scheduling/task.h
+++ b/lib/pls/include/pls/internal/scheduling/task.h
@@ -9,6 +9,8 @@
 
 #include "pls/internal/base/system_details.h"
 
+#include "pls/internal/scheduling/traded_cas_field.h"
+
 namespace pls {
 namespace internal {
 namespace scheduling {
@@ -74,16 +76,15 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task {
     return context_switcher::enter_context(stack_memory_, stack_size_, std::forward<F>(lambda));
   }
 
-  // TODO: Remove and add proper version
-  // Simulate 'fast' syncronization
-  std::atomic<int> flag_{0};
-
- private:
+  // TODO: Proper access control
   // Stack/Continuation Management
   char *stack_memory_;
-  size_t stack_size_; // TODO: We do not need this in every single task...
+  size_t stack_size_;
   context_switcher::continuation continuation_;
 
+  // Work-Stealing
+  std::atomic<traded_cas_field> traded_field_{};
+
   // Task Tree (we have a parent that we want to continue when we finish)
   task *parent_task_;
   unsigned depth_;
diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h
index a49090e..85c1558 100644
--- a/lib/pls/include/pls/internal/scheduling/task_manager.h
+++ b/lib/pls/include/pls/internal/scheduling/task_manager.h
@@ -9,8 +9,8 @@
 #include "context_switcher/continuation.h"
 
 #include "pls/internal/scheduling/task.h"
+#include "pls/internal/scheduling/external_trading_deque.h"
 
-#include "pls/internal/data_structures/bounded_trading_deque.h"
 #include "pls/internal/data_structures/aligned_stack.h"
 
 namespace pls {
@@ -26,7 +26,11 @@ class task_manager {
   explicit task_manager(task *tasks,
                         data_structures::aligned_stack static_stack_space,
                         size_t num_tasks,
-                        size_t stack_size) {
+                        size_t stack_size,
+                        external_trading_deque &deque) : num_tasks_{num_tasks},
+                                                         this_thread_tasks_{tasks},
+                                                         active_task_{&tasks[0]},
+                                                         deque_{deque} {
     for (size_t i = 0; i < num_tasks - 1; i++) {
       tasks[i].init(static_stack_space.push_bytes(stack_size), stack_size, i, 0);
       if (i > 0) {
@@ -36,10 +40,6 @@ class task_manager {
         tasks[i].set_next(&tasks[i + 1]);
       }
     }
-
-    num_tasks_ = num_tasks;
-    this_thread_tasks_ = tasks;
-    active_task_ = &tasks[0];
   }
 
   task &get_this_thread_task(size_t depth) {
@@ -66,18 +66,19 @@ class task_manager {
       last_task->set_continuation(std::move(cont));
       active_task_ = this_task;
 
-      // TODO: Publish last task on deque (do this properly, but this simulates the fastest possible impl)
-//      last_task->flag_.store(1, std::memory_order_seq_cst);
+      traded_cas_field expected_cas_value = deque_.push_bot(active_task_);
+      traded_cas_field empty_cas;
 
       lambda();
 
-      // TODO: Check if task was stolen from deque (do this properly, but this simulates the fastest possible impl)
-//      if (last_task->flag_.exchange(0, std::memory_order_seq_cst) == 1) {
-      active_task_ = last_task;
-      return std::move(last_task->get_continuation());
-//      } else {
-//        return context_switcher::continuation{nullptr};
-//      }
+      if (active_task_->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) {
+        active_task_ = last_task;
+        deque_.popped_bot();
+        return std::move(last_task->get_continuation());
+      } else {
+        deque_.empty_deque();
+        PLS_ERROR("Slow Path/Stealing not implemented!");
+      }
     });
   }
 
@@ -86,6 +87,8 @@ class task_manager {
   task *this_thread_tasks_;
   task *active_task_;
+
+  external_trading_deque &deque_;
 };
 
 template<size_t NUM_TASKS, size_t STACK_SIZE>
@@ -94,12 +97,15 @@ class static_task_manager {
   static_task_manager() : tasks_{},
                           static_stack_storage_{},
-                          task_manager_{tasks_.data(), static_stack_storage_.get_stack(), NUM_TASKS, STACK_SIZE} {};
+                          static_external_trading_deque_{},
+                          task_manager_{tasks_.data(), static_stack_storage_.get_stack(), NUM_TASKS, STACK_SIZE,
+                                        static_external_trading_deque_.get_deque()} {};
 
   task_manager &get_task_manager() { return task_manager_; }
 
  private:
   std::array<task, NUM_TASKS> tasks_;
   data_structures::static_aligned_stack<NUM_TASKS * STACK_SIZE> static_stack_storage_;
+  static_external_trading_deque<NUM_TASKS> static_external_trading_deque_;
   task_manager task_manager_;
 };
 
diff --git a/lib/pls/include/pls/internal/scheduling/traded_cas_field.h b/lib/pls/include/pls/internal/scheduling/traded_cas_field.h
new file mode 100644
index 0000000..0985d79
--- /dev/null
+++ b/lib/pls/include/pls/internal/scheduling/traded_cas_field.h
@@ -0,0 +1,87 @@
+
+#ifndef PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_
+#define PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_
+
+#include <atomic>
+
+#include "pls/internal/base/error_handling.h"
+#include "pls/internal/base/system_details.h"
+
+namespace pls {
+namespace internal {
+namespace scheduling {
+
+struct task;
+struct traded_cas_field {
+  static_assert(base::system_details::CACHE_LINE_SIZE >= 4,
+                "Traded objects must not use their last address bits, as we use them for status flags."
+ "As traded objects are usually cache aligned, we need big enough cache lines."); + + // Base size of our CAS integer/pointer + static constexpr unsigned long CAS_SIZE = base::system_details::CAS_SIZE; + + // States of the integer (tag indicating current content) + static constexpr unsigned long EMPTY_TAG = 0x0lu; + static constexpr unsigned long STAMP_TAG = 0x1lu; + static constexpr unsigned long TRADE_TAG = 0x2lu; + + // Bitmasks and shifts for cas_integer_, two variants: + // cas_integer_ = traded object | tag + // cas_integer_ = stamp | id | tag + static constexpr unsigned long TAG_SIZE = 2ul; + static constexpr unsigned long TAG_BITS = ~((~0x0ul) << TAG_SIZE); + + static constexpr unsigned long TRADED_OBJECT_SIZE = CAS_SIZE - TAG_SIZE; + static constexpr unsigned long TRADED_OBJECT_SHIFT = TAG_SIZE; + static constexpr unsigned long TRADE_OBJECT_BITS = ~((~0x0ul) << TRADED_OBJECT_SIZE) << TRADED_OBJECT_SHIFT; + + static constexpr unsigned long ID_SIZE = 10ul; // Up to 1024 cores + static constexpr unsigned long ID_SHIFT = TAG_SIZE; + static constexpr unsigned long ID_BITS = ~((~0x0ul) << ID_SIZE) << ID_SHIFT; + + static constexpr unsigned long STAMP_SIZE = CAS_SIZE - TAG_SIZE - ID_SIZE; + static constexpr unsigned long STAMP_SHIFT = TAG_SIZE + ID_SIZE; + static constexpr unsigned long STAMP_BITS = ~((~0x0ul) << STAMP_SIZE) << STAMP_SHIFT; + + public: + void fill_with_stamp(unsigned long stamp, unsigned long deque_id) { + cas_integer_ = (((stamp << STAMP_SHIFT) & STAMP_BITS) | ((stamp << ID_SHIFT) & ID_BITS) | STAMP_TAG); + } + unsigned long get_stamp() { + PLS_ASSERT(is_filled_with_tag(), "Must only read out the tag when the traded field contains one."); + return (((unsigned long) cas_integer_) & STAMP_BITS) >> STAMP_SHIFT; + } + unsigned long get_deque_id() { + PLS_ASSERT(is_filled_with_tag(), "Must only read out the tag when the traded field contains one."); + return (((unsigned long) cas_integer_) & ID_BITS) >> ID_SHIFT; + } + bool is_filled_with_tag() { + return (((unsigned long) cas_integer_) & TAG_BITS) == STAMP_TAG; + } + + void fill_with_trade_object(task *new_task) { + PLS_ASSERT((((unsigned long) new_task) & TAG_BITS) == 0, + "Must only store aligned objects in this data structure (last bits are needed for tag bit)"); + cas_integer_ = (((unsigned long) new_task) | TRADE_TAG); + } + task *get_trade_object() { + PLS_ASSERT(is_filled_with_object(), "Must only read out the object when the traded field contains one."); + return reinterpret_cast(((unsigned long) cas_integer_) & TRADE_OBJECT_BITS); + } + bool is_filled_with_object() { + return (((unsigned long) cas_integer_) & TAG_BITS) == TRADE_TAG; + } + + bool is_empty() { + return (((unsigned long) cas_integer_) & TAG_BITS) == EMPTY_TAG; + } + + private: + base::system_details::cas_integer cas_integer_{}; +}; + +} +} +} + +#endif //PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_ -- libgit2 0.26.0