From 2adb2d1656cf161cc469ba0fe20a9b9287862f22 Mon Sep 17 00:00:00 2001
From: FritzFlorian
Date: Wed, 29 Jan 2020 17:03:59 +0100
Subject: [PATCH] WIP: Add simple external trading deque test.

The current version has race conditions and is hard to debug (especially
because of the fibers: if the wrong thread executes on a fiber we get
segfaults very fast). To combat this mess we now refactor the code bit by
bit, while also adding tests where it can be done with reasonable effort.
---
 app/benchmark_fib/main.cpp                                        |   8 ++++----
 app/playground/main.cpp                                           |  51 ++++++++++++++++++++++-----------------------------
 lib/context_switcher/asm/enter_context_x86_64.s                   |  22 ++++++++++++++--------
 lib/context_switcher/asm/switch_context_x86_64.s                  |  12 ++++++------
 lib/context_switcher/include/context_switcher/continuation.h      |   7 +++++++
 lib/pls/include/pls/internal/scheduling/external_trading_deque.h  | 150 ++++++++++++++++++++++++++++++++++++----------------------------------------
 lib/pls/include/pls/internal/scheduling/task.h                    |   2 +-
 lib/pls/include/pls/internal/scheduling/task_manager.h            |   1 +
 lib/pls/include/pls/internal/scheduling/task_manager_impl.h       |  25 ++++++++++++++-----------
 lib/pls/include/pls/internal/scheduling/traded_cas_field.h        |   8 ++++----
 lib/pls/src/internal/scheduling/scheduler.cpp                     |   7 +++----
 lib/pls/src/internal/scheduling/task_manager.cpp                  |  22 +++++++++++-----------
 test/scheduling_tests.cpp                                         | 131 ++++++++++++++++++++++++++++++++++++++++++-----------------------------------
 13 files changed, 279 insertions(+), 167 deletions(-)

diff --git a/app/benchmark_fib/main.cpp b/app/benchmark_fib/main.cpp
index 7e3976b..5ea5f82 100644
--- a/app/benchmark_fib/main.cpp
+++ b/app/benchmark_fib/main.cpp
@@ -32,8 +32,8 @@ int pls_fib(int n) {
 }
 constexpr int MAX_NUM_THREADS = 4;
-constexpr int MAX_NUM_TASKS = 64;
-constexpr int MAX_STACK_SIZE = 4096 * 8;
+constexpr int MAX_NUM_TASKS = 32;
+constexpr int MAX_STACK_SIZE = 1024 * 32;
 static_scheduler_memory
 #include
-#include
 #include "context_switcher/context_switcher.h"
 using namespace context_switcher;
 using namespace std;
-const size_t NUM_RUNS = 1000;
-
 // Memory for custom stack and continuation semantics
-const size_t STACK_SIZE = 512 * 1;
-const size_t NUM_STACKS = 64;
+const size_t STACK_SIZE = 512 * 16;
+const size_t NUM_STACKS = 4;
 char custom_stacks[NUM_STACKS][STACK_SIZE];
-int fib(int n) {
-  if (n <= 1) {
-    return 1;
-  }
+volatile int result;
-  int a, b;
-  enter_context(custom_stacks[n], STACK_SIZE, [n, &a](continuation &&cont) {
-    a = fib(n - 1);
-    return std::move(cont);
-  });
-  enter_context(custom_stacks[n], STACK_SIZE, [n, &b](continuation &&cont) {
-    b = fib(n - 2);
-    return std::move(cont);
-  });
+int main() {
+  context_switcher::continuation outer_cont = enter_context(custom_stacks[0], STACK_SIZE, [](continuation &&main_cont) {
+    enter_context(custom_stacks[1], STACK_SIZE, [&main_cont](continuation &&middle_cont) {
+      enter_context(custom_stacks[2], STACK_SIZE, [&main_cont](continuation &&inner_cont) {
+        for (int i = 0; i < 10; i++) {
+          printf("Inner %d\n", i);
+          main_cont = context_switcher::switch_context(std::move(main_cont));
+        }
-  return a + b;
-}
+        return std::move(inner_cont);
+      });
-volatile int result;
-int main() {
-  auto start_time = chrono::steady_clock::now();
-  for (unsigned int i = 0; i < NUM_RUNS; i++) {
-    result = fib(18);
-  }
-  auto end_time = chrono::steady_clock::now();
-  auto time =
chrono::duration_cast(end_time - start_time).count(); + return std::move(middle_cont); + }); - printf("%f", (float) time / NUM_RUNS); + return std::move(main_cont); + }); + + for (int i = 0; i < 10; i++) { + printf("Outer %d\n", i); + outer_cont = context_switcher::switch_context(std::move(outer_cont)); + } return 0; } diff --git a/lib/context_switcher/asm/enter_context_x86_64.s b/lib/context_switcher/asm/enter_context_x86_64.s index 58afc40..44b29d3 100644 --- a/lib/context_switcher/asm/enter_context_x86_64.s +++ b/lib/context_switcher/asm/enter_context_x86_64.s @@ -17,10 +17,11 @@ __cs_enter_context: # Variables # r12 = temporary for the old stack pointer - ############### Save State ############### - # Make space for all register state we will store. - leaq -0x38(%rsp), %rsp + pushq %rbp + movq %rsp, %rbp + subq $0x38, %rsp + ############### Save State ############### # Store calee saved general registers. movq %r12, 0x00(%rsp) movq %r13, 0x08(%rsp) @@ -42,6 +43,13 @@ __cs_enter_context: # Switch to new stack pointer. movq %rdi, %rsp + # Init the new stack (in our case we want the stack trace to 'stick' to where it was created. + # This will not necessary be valid all the time (thus returning into it is not ok), but + # we only use it for debugging as we explicitly state throwing exceptions beyond the fiber is not ok. + # pushq 0x8(%rbp) + # pushq 0x0(%rbp) + # movq %rsp, %rbp + # Perform actual function call, this will now be on the new stack # rdi = first parametor to callback (continuation) # rsi = second parameter to callback (arbetary pointer) @@ -65,14 +73,12 @@ __cs_enter_context: ldmxcsr 0x30(%rsp) # restore x87 control-word fldcw 0x34(%rsp) - - # Free space for restored state - leaq 0x38(%rsp), %rsp ############ Restore State ############ - # TODO: Maybe look into a 'cleanup' hook for freeing the stack space here. - # Just return back from the call. # This is the end of a fiber, so we have no continuation. xor %rax, %rax + + movq %rbp, %rsp + popq %rbp ret diff --git a/lib/context_switcher/asm/switch_context_x86_64.s b/lib/context_switcher/asm/switch_context_x86_64.s index 10d536e..bb4d9ab 100644 --- a/lib/context_switcher/asm/switch_context_x86_64.s +++ b/lib/context_switcher/asm/switch_context_x86_64.s @@ -11,10 +11,11 @@ __cs_switch_context: # Return # rax = continuation that returned control back to the caller (null if fallthrough) - ############### Save State ############### - # Make space for all register state we will store. - leaq -0x38(%rsp), %rsp + pushq %rbp + movq %rsp, %rbp + subq $0x38, %rsp + ############### Save State ############### # Store calee saved general registers. movq %r12, 0x00(%rsp) movq %r13, 0x08(%rsp) @@ -46,11 +47,10 @@ __cs_switch_context: ldmxcsr 0x30(%rsp) # restore x87 control-word fldcw 0x34(%rsp) - - # Free space for restored state - leaq 0x38(%rsp), %rsp ############ Restore State ############ # Return the context we came from as a continuation. 
# rax has already the correct value + movq %rbp, %rsp + popq %rbp ret diff --git a/lib/context_switcher/include/context_switcher/continuation.h b/lib/context_switcher/include/context_switcher/continuation.h index 06de42b..63bc7e1 100644 --- a/lib/context_switcher/include/context_switcher/continuation.h +++ b/lib/context_switcher/include/context_switcher/continuation.h @@ -4,6 +4,9 @@ #include "assembly_bindings.h" +#include +#include + namespace context_switcher { /** @@ -38,6 +41,10 @@ struct continuation { } assembly_bindings::continuation_t consume() { + if (cont_pointer_ == nullptr) { + printf("Error!\n"); + } + auto tmp = cont_pointer_; cont_pointer_ = nullptr; return tmp; diff --git a/lib/pls/include/pls/internal/scheduling/external_trading_deque.h b/lib/pls/include/pls/internal/scheduling/external_trading_deque.h index 31cd38e..0192452 100644 --- a/lib/pls/include/pls/internal/scheduling/external_trading_deque.h +++ b/lib/pls/include/pls/internal/scheduling/external_trading_deque.h @@ -3,6 +3,7 @@ #define PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_ #include +#include #include #include "pls/internal/base/error_handling.h" @@ -18,6 +19,8 @@ namespace pls { namespace internal { namespace scheduling { +using namespace data_structures; + struct trading_deque_entry { std::atomic traded_task_{nullptr}; std::atomic forwarding_stamp_{}; @@ -35,116 +38,171 @@ struct trading_deque_entry { * As each task is associated with memory this suffices to exchange memory blocks needed for execution. */ class external_trading_deque { + public: external_trading_deque(trading_deque_entry *entries, size_t num_entries) : entries_{entries}, num_entries_{num_entries} {}; + void set_thread_id(unsigned id) { + thread_id_ = id; + } + + static optional get_trade_object(task *target_task) { + traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(); + if (current_cas.is_filled_with_object()) { + task *result = current_cas.get_trade_object(); + traded_cas_field empty_cas; + if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas, empty_cas)) { + return optional{result}; + } + } + + return optional{}; + } /** * Pushes a task on the bottom of the deque. * The task itself wil be filled with the unique, synchronizing cas word. * * @param published_task The task to publish on the bottom of the deque. - * @return The content of the cas word, can be later used to check if it changed. */ - traded_cas_field push_bot(task *published_task) { + void push_bot(task *published_task) { auto expected_stamp = bot_internal_.stamp; auto ¤t_entry = entries_[bot_internal_.value]; - // Store stealing information in the task and deque. - // Relaxed is fine for this, as adding elements is synced over the bot pointer. - current_entry.forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed); + // Field that all threads synchronize on. + // This happens not in the deque itself, but in the published task. + traded_cas_field sync_cas_field; + sync_cas_field.fill_with_stamp(expected_stamp, thread_id_); + published_task->external_trading_deque_cas_.store(sync_cas_field); - traded_cas_field new_cas_field; - new_cas_field.fill_with_stamp(expected_stamp, deque_id_); - published_task->traded_field_.store(new_cas_field, std::memory_order_relaxed); - current_entry.traded_task_.store(published_task, std::memory_order_relaxed); + // Publish the prepared task in the deque. 
+ current_entry.forwarding_stamp_.store(expected_stamp); + current_entry.traded_task_.store(published_task); // Advance the bot pointer. Linearization point for making the task public. bot_internal_.stamp++; bot_internal_.value++; - bot_.store(bot_internal_.value, std::memory_order_release); + bot_.store(bot_internal_.value); + } + + void reset_bot_and_top() { + bot_internal_.value = 0; + bot_internal_.stamp++; - return new_cas_field; + bot_.store(0); + top_.store({bot_internal_.stamp, 0}); } - void popped_bot() { + void decrease_bot() { bot_internal_.value--; + bot_.store(bot_internal_.value); + } - bot_.store(bot_internal_.value, std::memory_order_relaxed); + /** + * Tries to pop the last task on the deque. + * + * @return optional holding the popped task if successful. + */ + optional pop_bot() { if (bot_internal_.value == 0) { - bot_internal_.stamp++; - top_.store({bot_internal_.stamp, 0}, std::memory_order_release); + reset_bot_and_top(); + return optional{}; } - } + decrease_bot(); - void empty_deque() { - bot_internal_.value = 0; - bot_internal_.stamp++; + auto ¤t_entry = entries_[bot_internal_.value]; + auto *popped_task = current_entry.traded_task_.load(); + auto expected_stamp = current_entry.forwarding_stamp_.load(); - // TODO: We might be able to relax memory orderings... - bot_.store(bot_internal_.value); - top_.store(bot_internal_); + // We know what value must be in the cas field if no other thread stole it. + traded_cas_field expected_sync_cas_field; + expected_sync_cas_field.fill_with_stamp(expected_stamp, thread_id_); + traded_cas_field empty_cas_field; + + if (popped_task->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, empty_cas_field)) { + return optional{popped_task}; + } else { + reset_bot_and_top(); + return optional{}; + } } - std::tuple, data_structures::stamped_integer> peek_top() { + struct peek_result { + peek_result(optional top_task, stamped_integer top_pointer) : top_task_{std::move(top_task)}, + top_pointer_{top_pointer} {}; + optional top_task_; + stamped_integer top_pointer_; + }; + + /** + * Peek at the current task on top of the deque. + * This is required, as we need to look at the task to figure out what we trade in for it. + * (Note: we could go without this by doing some tricks with top/bot pointers, but this + * is simpler and also more flexible if the traded objects are not as trivial as currently). + * + * @return a peek result containing the optional top task (if present) and the current head pointer. 
+ */ + peek_result peek_top() { auto local_top = top_.load(); auto local_bot = bot_.load(); - if (local_top.value >= local_bot) { - return std::make_tuple(data_structures::optional{}, local_top); + if (local_top.value < local_bot) { + return peek_result{optional{entries_[local_top.value].traded_task_}, local_top}; } else { - return std::make_tuple(data_structures::optional{entries_[local_top.value].traded_task_}, local_top); + return peek_result{optional{}, local_top}; } } - data_structures::optional pop_top(task *trade_offer, data_structures::stamped_integer local_top) { + optional pop_top(task *offered_task, stamped_integer expected_top) { auto local_bot = bot_.load(); - if (local_top.value >= local_bot) { + if (expected_top.value >= local_bot) { return data_structures::optional{}; } - unsigned long expected_top_stamp = local_top.stamp; - auto &target_entry = entries_[local_top.value]; + auto &target_entry = entries_[expected_top.value]; // Read our potential result - task *result = target_entry.traded_task_.load(std::memory_order_relaxed); - unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load(std::memory_order_relaxed); + task *result = target_entry.traded_task_.load(); + unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load(); + + // Try to get it by CAS with the expected field entry, giving up our offered_task for it + traded_cas_field expected_sync_cas_field; + expected_sync_cas_field.fill_with_stamp(expected_top.stamp, thread_id_); - // Try to get it by CAS with the expected field entry, giving up our offered_object for it - traded_cas_field expected_field; - expected_field.fill_with_stamp(expected_top_stamp, deque_id_); traded_cas_field offered_field; - offered_field.fill_with_trade_object(trade_offer); + offered_field.fill_with_trade_object(offered_task); - if (result->traded_field_.compare_exchange_strong(expected_field, offered_field, std::memory_order_acq_rel)) { + if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, offered_field)) { // We got it, for sure move the top pointer forward. - top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1}); + top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1}); // Return the stolen task return data_structures::optional{result}; } else { // We did not get it...help forwarding the top pointer anyway. - if (expected_top_stamp == forwarding_stamp) { + if (expected_top.stamp == forwarding_stamp) { // ...move the pointer forward if someone else put a valid trade object in there. - top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1}); + top_.compare_exchange_strong(expected_top, {expected_top.stamp + 1, expected_top.value + 1}); } else { // ...we failed because the top tag lags behind...try to fix it. // This means only updating the tag, as this location can still hold data we need. 
- top_.compare_exchange_strong(local_top, {forwarding_stamp, local_top.value}); + top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value}); } return data_structures::optional{}; } } private: - trading_deque_entry *entries_; - size_t num_entries_; + // info on this deque + trading_deque_entry *const entries_; + const size_t num_entries_; + unsigned thread_id_; - unsigned deque_id_; + // fields for stealing/interacting + stamped_integer bot_internal_{0, 0}; - alignas(base::system_details::CACHE_LINE_SIZE) std::atomic top_{{0, 0}}; + alignas(base::system_details::CACHE_LINE_SIZE) std::atomic top_{{0, 0}}; alignas(base::system_details::CACHE_LINE_SIZE) std::atomic bot_{0}; - data_structures::stamped_integer bot_internal_{0, 0}; }; template diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h index 4dc5881..8342113 100644 --- a/lib/pls/include/pls/internal/scheduling/task.h +++ b/lib/pls/include/pls/internal/scheduling/task.h @@ -55,7 +55,7 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task { context_switcher::continuation continuation_; // Work-Stealing - std::atomic traded_field_{}; + std::atomic external_trading_deque_cas_{}; task *resource_stack_next_{}; std::atomic resource_stack_root_{{0, 0}}; bool clean_; diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h index b4de8d7..7a1f3d6 100644 --- a/lib/pls/include/pls/internal/scheduling/task_manager.h +++ b/lib/pls/include/pls/internal/scheduling/task_manager.h @@ -38,6 +38,7 @@ class task_manager { for (size_t i = 0; i < num_tasks_; i++) { this_thread_tasks_[i].thread_id_ = id; } + deque_.set_thread_id(id); } task &get_active_task() { diff --git a/lib/pls/include/pls/internal/scheduling/task_manager_impl.h b/lib/pls/include/pls/internal/scheduling/task_manager_impl.h index 84ce60e..14d4d2b 100644 --- a/lib/pls/include/pls/internal/scheduling/task_manager_impl.h +++ b/lib/pls/include/pls/internal/scheduling/task_manager_impl.h @@ -27,30 +27,33 @@ void task_manager::spawn_child(F &&lambda) { last_task->continuation_ = std::move(cont); spawning_task_manager->active_task_ = this_task; - traded_cas_field expected_cas_value = spawning_task_manager->deque_.push_bot(last_task); - traded_cas_field empty_cas; - + spawning_task_manager->deque_.push_bot(last_task); lambda(); auto *syncing_task_manager = &thread_state::get().get_task_manager(); - if (last_task->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) { + auto pop_result = syncing_task_manager->deque_.pop_bot(); + if (pop_result) { // Fast path, simply continue execution where we left of before spawn. - // This requires no coordination with the resource stack. + PLS_ASSERT(*pop_result == last_task, + "Fast path, nothing can have changed until here."); + PLS_ASSERT(spawning_task_manager == syncing_task_manager, + "Fast path, nothing can have changed here."); + PLS_ASSERT(last_task->continuation_.valid(), + "Fast path, no one can have continued working on the last task."); + syncing_task_manager->active_task_ = last_task; - syncing_task_manager->deque_.popped_bot(); return std::move(last_task->continuation_); } else { // Slow path, the continuation was stolen. - // First empty our own deque (everything below must have been stolen already). 
- syncing_task_manager->deque_.empty_deque(); - context_switcher::continuation result_cont; if (syncing_task_manager->try_clean_return(result_cont)) { // We return back to the main scheduling loop - return result_cont; + PLS_ASSERT(result_cont.valid(), "Must only return valid continuations..."); + return std::move(result_cont); } else { // We finish up the last task and are the sole owner again - return result_cont; + PLS_ASSERT(result_cont.valid(), "Must only return valid continuations..."); + return std::move(result_cont); } } }); diff --git a/lib/pls/include/pls/internal/scheduling/traded_cas_field.h b/lib/pls/include/pls/internal/scheduling/traded_cas_field.h index 0985d79..d94c8d4 100644 --- a/lib/pls/include/pls/internal/scheduling/traded_cas_field.h +++ b/lib/pls/include/pls/internal/scheduling/traded_cas_field.h @@ -45,17 +45,17 @@ struct traded_cas_field { public: void fill_with_stamp(unsigned long stamp, unsigned long deque_id) { - cas_integer_ = (((stamp << STAMP_SHIFT) & STAMP_BITS) | ((stamp << ID_SHIFT) & ID_BITS) | STAMP_TAG); + cas_integer_ = (((stamp << STAMP_SHIFT) & STAMP_BITS) | ((deque_id << ID_SHIFT) & ID_BITS) | STAMP_TAG); } unsigned long get_stamp() { - PLS_ASSERT(is_filled_with_tag(), "Must only read out the tag when the traded field contains one."); + PLS_ASSERT(is_filled_with_stamp(), "Must only read out the tag when the traded field contains one."); return (((unsigned long) cas_integer_) & STAMP_BITS) >> STAMP_SHIFT; } unsigned long get_deque_id() { - PLS_ASSERT(is_filled_with_tag(), "Must only read out the tag when the traded field contains one."); + PLS_ASSERT(is_filled_with_stamp(), "Must only read out the tag when the traded field contains one."); return (((unsigned long) cas_integer_) & ID_BITS) >> ID_SHIFT; } - bool is_filled_with_tag() { + bool is_filled_with_stamp() { return (((unsigned long) cas_integer_) & TAG_BITS) == STAMP_TAG; } diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp index ac85e6e..e579eff 100644 --- a/lib/pls/src/internal/scheduling/scheduler.cpp +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -90,17 +90,16 @@ void scheduler::work_thread_work_section() { // Move the traded in resource of this active task over to the stack of resources. auto *stolen_task = &my_task_manager.get_active_task(); - traded_cas_field stolen_task_cas = stolen_task->traded_field_.load(); - + traded_cas_field stolen_task_cas = stolen_task->external_trading_deque_cas_.load(); if (stolen_task_cas.is_filled_with_object()) { // Push the traded in resource on the resource stack to clear the traded_field for later steals/spawns. auto *exchanged_task = stolen_task_cas.get_trade_object(); - my_task_manager.push_resource_on_task(stolen_task, exchanged_task); + traded_cas_field empty_field; traded_cas_field expected_field; expected_field.fill_with_trade_object(exchanged_task); - if (stolen_task->traded_field_.compare_exchange_strong(expected_field, empty_field)) { + if (stolen_task->external_trading_deque_cas_.compare_exchange_strong(expected_field, empty_field)) { // All good, nothing more to do } else { // The last other active thread took it as its spare resource... 
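// Editor's sketch (not part of the patch): the traded_cas_field fix above --
// packing deque_id rather than stamp into the ID bits, plus the rename to
// is_filled_with_stamp() -- is easier to follow with the bit packing written
// out. The shift/tag values below are hypothetical stand-ins; the real
// STAMP_SHIFT/ID_SHIFT/TAG constants live in traded_cas_field.h and are not
// shown in this diff.
#include <cstdint>
#include <cassert>

using cas_word = std::uintptr_t;

constexpr cas_word TAG_BITS = 0x3;    // low two bits select what the word holds
constexpr cas_word STAMP_TAG = 0x1;
constexpr cas_word OBJECT_TAG = 0x2;
constexpr unsigned ID_SHIFT = 2;      // assumed: 8 bits of deque/thread id
constexpr unsigned STAMP_SHIFT = 10;  // assumed: stamp in the remaining bits

cas_word fill_with_stamp(cas_word stamp, cas_word deque_id) {
  // The deque id (not the stamp) goes into the ID bits -- the bug fixed above.
  return (stamp << STAMP_SHIFT) | (deque_id << ID_SHIFT) | STAMP_TAG;
}

cas_word fill_with_trade_object(void *task_pointer) {
  // Task objects are cache-line aligned, so their low bits are free for the tag.
  return reinterpret_cast<cas_word>(task_pointer) | OBJECT_TAG;
}

int main() {
  cas_word stamped = fill_with_stamp(42, 10);
  assert((stamped & TAG_BITS) == STAMP_TAG);     // is_filled_with_stamp()
  assert((stamped >> STAMP_SHIFT) == 42);        // get_stamp()
  assert(((stamped >> ID_SHIFT) & 0xFF) == 10);  // get_deque_id()

  alignas(64) static char dummy_task[64];
  cas_word traded = fill_with_trade_object(dummy_task);
  assert((traded & TAG_BITS) == OBJECT_TAG);     // is_filled_with_object()
  return 0;
}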
diff --git a/lib/pls/src/internal/scheduling/task_manager.cpp b/lib/pls/src/internal/scheduling/task_manager.cpp index f538ce7..e49ce47 100644 --- a/lib/pls/src/internal/scheduling/task_manager.cpp +++ b/lib/pls/src/internal/scheduling/task_manager.cpp @@ -36,8 +36,8 @@ bool task_manager::steal_task(task_manager &stealing_task_manager) { PLS_ASSERT(stealing_task_manager.active_task_->depth_ == 0, "Must only steal with clean task chain."); auto peek = deque_.peek_top(); - auto optional_target_task = std::get<0>(peek); - auto target_top = std::get<1>(peek); + auto optional_target_task = peek.top_task_; + auto target_top = peek.top_pointer_; if (optional_target_task) { PLS_ASSERT(stealing_task_manager.check_task_chain(), "We are stealing, must not have a bad chain here!"); @@ -56,7 +56,10 @@ bool task_manager::steal_task(task_manager &stealing_task_manager) { auto optional_result_task = deque_.pop_top(traded_task, target_top); if (optional_result_task) { - PLS_ASSERT(*optional_result_task == target_task, "We must only steal the task that we peeked at!"); + PLS_ASSERT(target_task->thread_id_ != traded_task->thread_id_, + "It is impossible to steal an task we already own!"); + PLS_ASSERT(*optional_result_task == target_task, + "We must only steal the task that we peeked at!"); // the steal was a success, link the chain so we own the stolen part target_task->next_ = next_own_task; next_own_task->prev_ = target_task; @@ -164,14 +167,11 @@ bool task_manager::try_clean_return(context_switcher::continuation &result_cont) task *clean_chain = pop_resource_from_task(last_task); if (clean_chain == nullptr) { // double-check if we are really last one or we only have unlucky timing - auto cas_field = last_task->traded_field_.load(); - if (cas_field.is_filled_with_object()) { - traded_cas_field empty_target; - if (last_task->traded_field_.compare_exchange_strong(cas_field, empty_target)) { - clean_chain = cas_field.get_trade_object(); - } else { - clean_chain = pop_resource_from_task(last_task); - } + auto optional_cas_task = external_trading_deque::get_trade_object(last_task); + if (optional_cas_task) { + clean_chain = *optional_cas_task; + } else { + clean_chain = pop_resource_from_task(last_task); } } diff --git a/test/scheduling_tests.cpp b/test/scheduling_tests.cpp index 6707436..ce82c9d 100644 --- a/test/scheduling_tests.cpp +++ b/test/scheduling_tests.cpp @@ -1,50 +1,95 @@ #include -#include -#include -#include +#include -#include "pls/internal/scheduling/scheduler.h" -#include "pls/internal/scheduling/cont.h" -#include "pls/internal/scheduling/cont_manager.h" -#include "pls/internal/scheduling/scheduler_memory.h" -#include "pls/internal/scheduling/parallel_result.h" +#include "pls/internal/scheduling/traded_cas_field.h" +#include "pls/internal/scheduling/task.h" +#include "pls/internal/scheduling/external_trading_deque.h" using namespace pls::internal::scheduling; -// TODO: Introduce actual tests once multiple threads work... 
-TEST_CASE("continuation stealing", "[internal/scheduling/cont_manager.h]") { - const int NUM_THREADS = 2; - const int NUM_TASKS = 8; - const int MAX_TASK_STACK_SIZE = 8; - const int NUM_CONTS = 8; - const int MAX_CONT_SIZE = 256; - - static_scheduler_memory static_scheduler_memory; - - scheduler scheduler{static_scheduler_memory, NUM_THREADS}; - - // Coordinate progress to match OUR order - std::atomic progress{0}; - - // Order: - // 0) work on first task on main thread - // 1) second thread stole right task - - scheduler.perform_work([&]() { - return scheduler::par([&]() { - while (progress.load() != 1); - return parallel_result{0}; - }, [&]() { - progress.store(1); - return parallel_result{0}; - }).then([&](int, int) { - - return parallel_result{0}; - }); - }); +TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/traded_cas_field.h]") { + traded_cas_field empty_field; + REQUIRE(empty_field.is_empty()); + REQUIRE(!empty_field.is_filled_with_stamp()); + REQUIRE(!empty_field.is_filled_with_object()); + + const int stamp = 42; + const int ID = 10; + traded_cas_field tag_field; + tag_field.fill_with_stamp(stamp, ID); + REQUIRE(tag_field.is_filled_with_stamp()); + REQUIRE(!tag_field.is_empty()); + REQUIRE(!tag_field.is_filled_with_object()); + REQUIRE(tag_field.get_stamp() == stamp); + REQUIRE(tag_field.get_deque_id() == ID); + + alignas(64) task obj; + traded_cas_field obj_field; + obj_field.fill_with_trade_object(&obj); + REQUIRE(obj_field.is_filled_with_object()); + REQUIRE(!obj_field.is_empty()); + REQUIRE(!obj_field.is_filled_with_stamp()); +} + +TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque]") { + static_external_trading_deque<16> static_external_trading_deque_1; + external_trading_deque &deque_1 = static_external_trading_deque_1.get_deque(); + deque_1.set_thread_id(1); + + static_external_trading_deque<16> static_external_trading_deque_2; + external_trading_deque &deque_2 = static_external_trading_deque_2.get_deque(); + deque_2.set_thread_id(2); + + std::vector tasks(16); + + SECTION("basic operations") { + // Must start empty + REQUIRE(!deque_1.pop_bot()); + REQUIRE(!deque_2.pop_bot()); + + // Local push/pop + deque_1.push_bot(&tasks[0]); + REQUIRE(*deque_1.pop_bot() == &tasks[0]); + REQUIRE(!deque_1.pop_bot()); + + // Local push, external pop + deque_1.push_bot(&tasks[0]); + auto peek = deque_1.peek_top(); + REQUIRE(*deque_1.pop_top(&tasks[1], peek.top_pointer_) == &tasks[0]); + REQUIRE(*external_trading_deque::get_trade_object(&tasks[0]) == &tasks[1]); + REQUIRE(!deque_1.pop_top(&tasks[1], peek.top_pointer_)); + REQUIRE(!deque_1.pop_bot()); + + // Keeps push/pop order + deque_1.push_bot(&tasks[0]); + deque_1.push_bot(&tasks[1]); + REQUIRE(*deque_1.pop_bot() == &tasks[1]); + REQUIRE(*deque_1.pop_bot() == &tasks[0]); + REQUIRE(!deque_1.pop_bot()); + + deque_1.push_bot(&tasks[0]); + deque_1.push_bot(&tasks[1]); + auto peek1 = deque_1.peek_top(); + REQUIRE(*deque_1.pop_top(&tasks[2], peek1.top_pointer_) == &tasks[0]); + auto peek2 = deque_1.peek_top(); + REQUIRE(*deque_1.pop_top(&tasks[3], peek2.top_pointer_) == &tasks[1]); + } + + SECTION("Interwined execution #1") { + // Two top poppers + deque_1.push_bot(&tasks[0]); + auto peek1 = deque_1.peek_top(); + auto peek2 = deque_1.peek_top(); + REQUIRE(*deque_1.pop_top(&tasks[1], peek1.top_pointer_) == &tasks[0]); + REQUIRE(!deque_1.pop_top(&tasks[2], peek2.top_pointer_)); + } + + SECTION("Interwined execution #2") { + // Top and bottom access + deque_1.push_bot(&tasks[0]); + auto peek1 = 
deque_1.peek_top(); + REQUIRE(*deque_1.pop_bot() == &tasks[0]); + REQUIRE(!deque_1.pop_top(&tasks[2], peek1.top_pointer_)); + } } -- libgit2 0.26.0
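For reference, the trade protocol that the new unit test exercises, collapsed
into one linear sequence. This is only a sketch: it uses the API exactly as the
test above does (push_bot, peek_top, pop_top, pop_bot, get_trade_object), but it
runs the owner and thief side on a single thread, whereas in the scheduler the
pop_top side executes on the stealing thread.

#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/external_trading_deque.h"

using namespace pls::internal::scheduling;

int main() {
  static_external_trading_deque<16> deque_memory;
  external_trading_deque &owner_deque = deque_memory.get_deque();
  owner_deque.set_thread_id(1);

  task published, offered;

  // Owner: publish a task so it can be stolen.
  owner_deque.push_bot(&published);

  // Thief: peek at the top task, then trade `offered` in for it.
  auto peek = owner_deque.peek_top();
  if (peek.top_task_) {
    auto stolen = owner_deque.pop_top(&offered, peek.top_pointer_);
    // On success, *stolen == &published; the owner can later collect
    // &offered through external_trading_deque::get_trade_object(&published).
    (void) stolen;
  }

  // Owner: pop_bot() now returns an empty optional, which is how the owner
  // learns that the task was stolen and a traded task waits in its CAS field.
  auto popped = owner_deque.pop_bot();
  return popped ? 1 : 0;
}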