diff --git a/app/benchmark_fib/main.cpp b/app/benchmark_fib/main.cpp index 37e12f4..8180fa3 100644 --- a/app/benchmark_fib/main.cpp +++ b/app/benchmark_fib/main.cpp @@ -30,7 +30,7 @@ int pls_fib(int n) { constexpr int MAX_NUM_THREADS = 1; constexpr int MAX_NUM_TASKS = 64; -constexpr int MAX_STACK_SIZE = 256; +constexpr int MAX_STACK_SIZE = 1024; int main(int argc, char **argv) { int num_threads; diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 7ec8d16..81e80df 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -49,21 +49,19 @@ add_library(pls STATIC include/pls/internal/helpers/seqence.h include/pls/internal/helpers/member_function.h - include/pls/internal/scheduling/parallel_result.h include/pls/internal/scheduling/thread_state.h include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/scheduler_memory.h include/pls/internal/scheduling/task_manager.h include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp - include/pls/internal/scheduling/cont_manager.h - include/pls/internal/scheduling/cont.h include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h - include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/thread_state_static.h src/internal/base/error_handling.cpp - include/pls/internal/data_structures/bounded_trading_deque.h) + include/pls/internal/data_structures/bounded_trading_deque.h + include/pls/internal/scheduling/external_trading_deque.h + include/pls/internal/scheduling/traded_cas_field.h) # Add everything in `./include` to be in the include path of this project target_include_directories(pls diff --git a/lib/pls/include/pls/internal/base/error_handling.h b/lib/pls/include/pls/internal/base/error_handling.h index 89fa220..8704cc8 100644 --- a/lib/pls/include/pls/internal/base/error_handling.h +++ 
b/lib/pls/include/pls/internal/base/error_handling.h @@ -16,7 +16,6 @@ void pls_error(const char *msg); // TODO: Distinguish between debug/internal asserts and production asserts. -// TODO: Re-Enable Asserts -#define PLS_ASSERT(cond, msg) //if (!(cond)) { pls_error(msg); } +#define PLS_ASSERT(cond, msg) if (!(cond)) { pls_error(msg); } #endif //PLS_ERROR_HANDLING_H diff --git a/lib/pls/include/pls/internal/base/system_details.h b/lib/pls/include/pls/internal/base/system_details.h index 01543c0..165ed3e 100644 --- a/lib/pls/include/pls/internal/base/system_details.h +++ b/lib/pls/include/pls/internal/base/system_details.h @@ -39,7 +39,7 @@ using pointer_t = std::uintptr_t; * Usually it is sane to assume a pointer can be swapped in a single CAS operation. */ using cas_integer = std::uintptr_t; -constexpr unsigned long CAS_SIZE = sizeof(cas_integer); +constexpr unsigned long CAS_SIZE = sizeof(cas_integer) * 8; /** * Most processors have 64 byte cache lines (last 6 bit of the address are zero at line beginnings). 
diff --git a/lib/pls/include/pls/internal/scheduling/external_trading_deque.h b/lib/pls/include/pls/internal/scheduling/external_trading_deque.h
new file mode 100644
index 0000000..31cd38e
--- /dev/null
+++ b/lib/pls/include/pls/internal/scheduling/external_trading_deque.h
@@ -0,0 +1,196 @@
+
+#ifndef PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_
+#define PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_
+
+#include <array>
+#include <atomic>
+#include <tuple>
+
+#include "pls/internal/base/error_handling.h"
+#include "pls/internal/base/system_details.h"
+
+#include "pls/internal/data_structures/optional.h"
+#include "pls/internal/data_structures/stamped_integer.h"
+
+#include "pls/internal/scheduling/traded_cas_field.h"
+#include "pls/internal/scheduling/task.h"
+
+namespace pls {
+namespace internal {
+namespace scheduling {
+
+/**
+ * One slot of the deque: the task published in it and the stamp it was published with.
+ */
+struct trading_deque_entry {
+  std::atomic<task *> traded_task_{nullptr};
+  std::atomic<unsigned long> forwarding_stamp_{};
+};
+
+/**
+ * A work stealing deque (single producer/consumer at the end, multiple consumers at the start).
+ * A task object can only be acquired by stealing consumers (from the start),
+ * when they also offer a task to trade in for it.
+ *
+ * The exchange of 'goods' (here tasks) happens atomically at a linearization point.
+ * This means that the owning thread always gets a task for each and every task that was
+ * successfully stolen.
+ *
+ * As each task is associated with memory this suffices to exchange memory blocks needed for execution.
+ */
+class external_trading_deque {
+ public:
+  external_trading_deque(trading_deque_entry *entries, size_t num_entries) :
+      entries_{entries}, num_entries_{num_entries} {};
+
+  /**
+   * Pushes a task on the bottom of the deque.
+   * The task itself will be filled with the unique, synchronizing cas word.
+   *
+   * @param published_task The task to publish on the bottom of the deque.
+   * @return The content of the cas word, can be later used to check if it changed.
+   */
+  traded_cas_field push_bot(task *published_task) {
+    // Writing past the backing array would silently corrupt memory, so fail loudly instead.
+    PLS_ASSERT(bot_internal_.value < num_entries_, "Must not push more tasks than the deque has entries.");
+
+    auto expected_stamp = bot_internal_.stamp;
+    auto &current_entry = entries_[bot_internal_.value];
+
+    // Store stealing information in the task and deque.
+    // Relaxed is fine for this, as adding elements is synced over the bot pointer.
+    current_entry.forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed);
+
+    traded_cas_field new_cas_field;
+    new_cas_field.fill_with_stamp(expected_stamp, deque_id_);
+    published_task->traded_field_.store(new_cas_field, std::memory_order_relaxed);
+    current_entry.traded_task_.store(published_task, std::memory_order_relaxed);
+
+    // Advance the bot pointer. Linearization point for making the task public.
+    bot_internal_.stamp++;
+    bot_internal_.value++;
+    bot_.store(bot_internal_.value, std::memory_order_release);
+
+    return new_cas_field;
+  }
+
+  /**
+   * Moves the bot pointer back by one entry.
+   * Once the deque is empty, top is reset to index 0 with a fresh stamp.
+   */
+  void popped_bot() {
+    bot_internal_.value--;
+
+    bot_.store(bot_internal_.value, std::memory_order_relaxed);
+    if (bot_internal_.value == 0) {
+      bot_internal_.stamp++;
+      top_.store({bot_internal_.stamp, 0}, std::memory_order_release);
+    }
+  }
+
+  /**
+   * Drops all entries: resets bot and top to index 0 and advances the stamp.
+   */
+  void empty_deque() {
+    bot_internal_.value = 0;
+    bot_internal_.stamp++;
+
+    // TODO: We might be able to relax memory orderings...
+    bot_.store(bot_internal_.value);
+    top_.store(bot_internal_);
+  }
+
+  /**
+   * Looks at the top of the deque without modifying it.
+   *
+   * @return The task currently at the top (empty optional if top >= bot) and the top value that was read.
+   */
+  std::tuple<data_structures::optional<task *>, data_structures::stamped_integer> peek_top() {
+    auto local_top = top_.load();
+    auto local_bot = bot_.load();
+
+    if (local_top.value >= local_bot) {
+      return std::make_tuple(data_structures::optional<task *>{}, local_top);
+    } else {
+      return std::make_tuple(data_structures::optional<task *>{entries_[local_top.value].traded_task_}, local_top);
+    }
+  }
+
+  /**
+   * Tries to steal the task at the top by trading in trade_offer for it.
+   *
+   * @param trade_offer The task written into the stolen task's cas word in exchange for it.
+   * @param local_top The top value the caller observed, e.g. as returned by peek_top().
+   * @return The stolen task, or an empty optional if the deque looked empty or the CAS on the task failed.
+   */
+  data_structures::optional<task *> pop_top(task *trade_offer, data_structures::stamped_integer local_top) {
+    auto local_bot = bot_.load();
+    if (local_top.value >= local_bot) {
+      return data_structures::optional<task *>{};
+    }
+
+    unsigned long expected_top_stamp = local_top.stamp;
+    auto &target_entry = entries_[local_top.value];
+
+    // Read our potential result
+    task *result = target_entry.traded_task_.load(std::memory_order_relaxed);
+    unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load(std::memory_order_relaxed);
+
+    // Try to get it by CAS with the expected field entry, giving up our offered_object for it
+    traded_cas_field expected_field;
+    expected_field.fill_with_stamp(expected_top_stamp, deque_id_);
+    traded_cas_field offered_field;
+    offered_field.fill_with_trade_object(trade_offer);
+
+    if (result->traded_field_.compare_exchange_strong(expected_field, offered_field, std::memory_order_acq_rel)) {
+      // We got it, for sure move the top pointer forward.
+      top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1});
+      // Return the stolen task
+      return data_structures::optional<task *>{result};
+    } else {
+      // We did not get it...help forwarding the top pointer anyway.
+      if (expected_top_stamp == forwarding_stamp) {
+        // ...move the pointer forward if someone else put a valid trade object in there.
+        top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1});
+      } else {
+        // ...we failed because the top tag lags behind...try to fix it.
+        // This means only updating the tag, as this location can still hold data we need.
+        top_.compare_exchange_strong(local_top, {forwarding_stamp, local_top.value});
+      }
+      return data_structures::optional<task *>{};
+    }
+  }
+
+ private:
+  trading_deque_entry *entries_;
+  size_t num_entries_;
+
+  // Id mixed into every published cas word. Nothing assigns it yet, so it is
+  // value-initialized to keep push_bot()/pop_top() from reading an indeterminate value.
+  unsigned deque_id_{0};
+
+  alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<data_structures::stamped_integer> top_{{0, 0}};
+  alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<unsigned> bot_{0};
+
+  data_structures::stamped_integer bot_internal_{0, 0};
+};
+
+/**
+ * Deque bundled with statically sized storage for SIZE entries.
+ */
+template<size_t SIZE>
+class static_external_trading_deque {
+ public:
+  static_external_trading_deque() : items_{}, deque_{items_.data(), SIZE} {}
+
+  external_trading_deque &get_deque() { return deque_; }
+ private:
+  std::array<trading_deque_entry, SIZE> items_;
+  external_trading_deque deque_;
+};
+
+}
+}
+}
+
+#endif //PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_
diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h index 5c734b6..8be6867 100644 --- a/lib/pls/include/pls/internal/scheduling/task.h +++ b/lib/pls/include/pls/internal/scheduling/task.h @@ -9,6 +9,8 @@ #include "pls/internal/base/system_details.h" +#include "pls/internal/scheduling/traded_cas_field.h" + namespace pls { namespace internal { namespace scheduling { @@ -74,16 +76,15 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task { return context_switcher::enter_context(stack_memory_, stack_size_, std::forward(lambda)); } - // TODO: Remove and add proper version - // Simulate 'fast' syncronization - std::atomic flag_{0}; - - private: + // TODO: Proper access control // Stack/Continuation Management char *stack_memory_; - size_t stack_size_; // TODO: We do not need this in every single task... 
+ size_t stack_size_; context_switcher::continuation continuation_; + // Work-Stealing + std::atomic traded_field_{}; + // Task Tree (we have a parent that we want to continue when we finish) task *parent_task_; unsigned depth_; diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h index a49090e..85c1558 100644 --- a/lib/pls/include/pls/internal/scheduling/task_manager.h +++ b/lib/pls/include/pls/internal/scheduling/task_manager.h @@ -9,8 +9,8 @@ #include "context_switcher/continuation.h" #include "pls/internal/scheduling/task.h" +#include "pls/internal/scheduling/external_trading_deque.h" -#include "pls/internal/data_structures/bounded_trading_deque.h" #include "pls/internal/data_structures/aligned_stack.h" namespace pls { @@ -26,7 +26,11 @@ class task_manager { explicit task_manager(task *tasks, data_structures::aligned_stack static_stack_space, size_t num_tasks, - size_t stack_size) { + size_t stack_size, + external_trading_deque &deque) : num_tasks_{num_tasks}, + this_thread_tasks_{tasks}, + active_task_{&tasks[0]}, + deque_{deque} { for (size_t i = 0; i < num_tasks - 1; i++) { tasks[i].init(static_stack_space.push_bytes(stack_size), stack_size, i, 0); if (i > 0) { @@ -36,10 +40,6 @@ class task_manager { tasks[i].set_next(&tasks[i + 1]); } } - - num_tasks_ = num_tasks; - this_thread_tasks_ = tasks; - active_task_ = &tasks[0]; } task &get_this_thread_task(size_t depth) { @@ -66,18 +66,19 @@ class task_manager { last_task->set_continuation(std::move(cont)); active_task_ = this_task; - // TODO: Publish last task on deque (do this properly, but this simulates the fastest possible impl) -// last_task->flag_.store(1, std::memory_order_seq_cst); + traded_cas_field expected_cas_value = deque_.push_bot(active_task_); + traded_cas_field empty_cas; lambda(); - // TODO: Check if task was stolen from deque (do this properly, but this simulates the fastest possible impl) -// if 
(last_task->flag_.exchange(0, std::memory_order_seq_cst) == 1) { - active_task_ = last_task; - return std::move(last_task->get_continuation()); -// } else { -// return context_switcher::continuation{nullptr}; -// } + if (active_task_->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) { + active_task_ = last_task; + deque_.popped_bot(); + return std::move(last_task->get_continuation()); + } else { + deque_.empty_deque(); + PLS_ERROR("Slow Path/Stealing not implemented!"); + } }); } @@ -86,6 +87,8 @@ class task_manager { task *this_thread_tasks_; task *active_task_; + + external_trading_deque &deque_; }; template @@ -94,12 +97,15 @@ class static_task_manager { static_task_manager() : tasks_{}, static_stack_storage_{}, - task_manager_{tasks_.data(), static_stack_storage_.get_stack(), NUM_TASKS, STACK_SIZE} {}; + static_external_trading_deque_{}, + task_manager_{tasks_.data(), static_stack_storage_.get_stack(), NUM_TASKS, STACK_SIZE, + static_external_trading_deque_.get_deque()} {}; task_manager &get_task_manager() { return task_manager_; } private: std::array tasks_; data_structures::static_aligned_stack static_stack_storage_; + static_external_trading_deque static_external_trading_deque_; task_manager task_manager_; };
diff --git a/lib/pls/include/pls/internal/scheduling/traded_cas_field.h b/lib/pls/include/pls/internal/scheduling/traded_cas_field.h
new file mode 100644
index 0000000..0985d79
--- /dev/null
+++ b/lib/pls/include/pls/internal/scheduling/traded_cas_field.h
@@ -0,0 +1,100 @@
+
+#ifndef PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_
+#define PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_
+
+#include <atomic>
+
+#include "pls/internal/base/error_handling.h"
+#include "pls/internal/base/system_details.h"
+
+namespace pls {
+namespace internal {
+namespace scheduling {
+
+struct task;
+/**
+ * A single CAS-sized word that holds either nothing, a (stamp, deque id) pair or a pointer to a traded task.
+ * The lowest TAG_SIZE bits tell which of the three is currently stored.
+ */
+struct traded_cas_field {
+  static_assert(base::system_details::CACHE_LINE_SIZE >= 4,
+                "Traded objects must not use their last address bits, as we use them for status flags."
+                "As traded objects are usually cache aligned, we need big enough cache lines.");
+
+  // Base size of our CAS integer/pointer (in bits)
+  static constexpr unsigned long CAS_SIZE = base::system_details::CAS_SIZE;
+
+  // States of the integer (tag indicating current content)
+  static constexpr unsigned long EMPTY_TAG = 0x0lu;
+  static constexpr unsigned long STAMP_TAG = 0x1lu;
+  static constexpr unsigned long TRADE_TAG = 0x2lu;
+
+  // Bitmasks and shifts for cas_integer_, two variants:
+  // cas_integer_ = traded object | tag
+  // cas_integer_ = stamp | id | tag
+  static constexpr unsigned long TAG_SIZE = 2ul;
+  static constexpr unsigned long TAG_BITS = ~((~0x0ul) << TAG_SIZE);
+
+  static constexpr unsigned long TRADED_OBJECT_SIZE = CAS_SIZE - TAG_SIZE;
+  static constexpr unsigned long TRADED_OBJECT_SHIFT = TAG_SIZE;
+  static constexpr unsigned long TRADE_OBJECT_BITS = ~((~0x0ul) << TRADED_OBJECT_SIZE) << TRADED_OBJECT_SHIFT;
+
+  static constexpr unsigned long ID_SIZE = 10ul; // Up to 1024 cores
+  static constexpr unsigned long ID_SHIFT = TAG_SIZE;
+  static constexpr unsigned long ID_BITS = ~((~0x0ul) << ID_SIZE) << ID_SHIFT;
+
+  static constexpr unsigned long STAMP_SIZE = CAS_SIZE - TAG_SIZE - ID_SIZE;
+  static constexpr unsigned long STAMP_SHIFT = TAG_SIZE + ID_SIZE;
+  static constexpr unsigned long STAMP_BITS = ~((~0x0ul) << STAMP_SIZE) << STAMP_SHIFT;
+
+ public:
+  /**
+   * Stores a stamp together with the id of the deque that published it.
+   *
+   * @param stamp Stamp value, truncated to STAMP_SIZE bits.
+   * @param deque_id Id of the publishing deque, truncated to ID_SIZE bits.
+   */
+  void fill_with_stamp(unsigned long stamp, unsigned long deque_id) {
+    cas_integer_ = (((stamp << STAMP_SHIFT) & STAMP_BITS) | ((deque_id << ID_SHIFT) & ID_BITS) | STAMP_TAG);
+  }
+  unsigned long get_stamp() const {
+    PLS_ASSERT(is_filled_with_tag(), "Must only read out the tag when the traded field contains one.");
+    return (static_cast<unsigned long>(cas_integer_) & STAMP_BITS) >> STAMP_SHIFT;
+  }
+  unsigned long get_deque_id() const {
+    PLS_ASSERT(is_filled_with_tag(), "Must only read out the tag when the traded field contains one.");
+    return (static_cast<unsigned long>(cas_integer_) & ID_BITS) >> ID_SHIFT;
+  }
+  bool is_filled_with_tag() const {
+    return (static_cast<unsigned long>(cas_integer_) & TAG_BITS) == STAMP_TAG;
+  }
+
+  /**
+   * Stores a pointer to a traded task; its lowest TAG_SIZE bits must be zero.
+   */
+  void fill_with_trade_object(task *new_task) {
+    PLS_ASSERT((reinterpret_cast<base::system_details::cas_integer>(new_task) & TAG_BITS) == 0,
+               "Must only store aligned objects in this data structure (last bits are needed for tag bit)");
+    cas_integer_ = (reinterpret_cast<base::system_details::cas_integer>(new_task) | TRADE_TAG);
+  }
+  task *get_trade_object() const {
+    PLS_ASSERT(is_filled_with_object(), "Must only read out the object when the traded field contains one.");
+    return reinterpret_cast<task *>(static_cast<unsigned long>(cas_integer_) & TRADE_OBJECT_BITS);
+  }
+  bool is_filled_with_object() const {
+    return (static_cast<unsigned long>(cas_integer_) & TAG_BITS) == TRADE_TAG;
+  }
+
+  bool is_empty() const {
+    return (static_cast<unsigned long>(cas_integer_) & TAG_BITS) == EMPTY_TAG;
+  }
+
+ private:
+  base::system_details::cas_integer cas_integer_{};
+};
+
+}
+}
+}
+
+#endif //PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_