From 731b47c504d173dab005d2f0a78f415e3414ead8 Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Wed, 5 Feb 2020 18:15:13 +0100 Subject: [PATCH] WIP: Remove unneeded attributes from scheduler. --- app/benchmark_fib/main.cpp | 3 +-- lib/context_switcher/include/context_switcher/context_switcher.h | 46 +++++++++++++++++++++++++--------------------- lib/context_switcher/src/context_switcher.cpp | 7 ++++--- lib/pls/include/pls/internal/base/error_handling.h | 2 +- lib/pls/include/pls/internal/scheduling/scheduler_impl.h | 1 - lib/pls/include/pls/internal/scheduling/task.h | 9 --------- lib/pls/include/pls/internal/scheduling/thread_state.h | 2 -- lib/pls/src/internal/scheduling/task_manager.cpp | 2 -- 8 files changed, 31 insertions(+), 41 deletions(-) diff --git a/app/benchmark_fib/main.cpp b/app/benchmark_fib/main.cpp index a4c2a99..e9bf411 100644 --- a/app/benchmark_fib/main.cpp +++ b/app/benchmark_fib/main.cpp @@ -33,7 +33,7 @@ int pls_fib(int n) { constexpr int MAX_NUM_THREADS = 8; constexpr int MAX_NUM_TASKS = 32; -constexpr int MAX_STACK_SIZE = 1024 * 64; +constexpr int MAX_STACK_SIZE = 1024 * 1; static_scheduler_memory cached_fibers; +extern std::mutex shared_cache_mtx; +extern std::unordered_map shared_cached_fibers; +extern thread_local std::unordered_map local_cached_fibers; class lambda_capture_base { public: virtual continuation run(continuation_t cont) = 0; - virtual void deconstruct() = 0; + virtual ~lambda_capture_base() = default; }; template class lambda_capture : public lambda_capture_base { public: explicit lambda_capture(F lambda) : lambda_{lambda}, previous_tsan_fiber_{nullptr} {} - - void deconstruct() override { - //TODO: re-add deconstructor call -// ~lambda_capture(); - } + ~lambda_capture() override = default; continuation run(continuation_t cont) override { return lambda_(continuation{cont, previous_tsan_fiber_}); @@ -195,15 +192,14 @@ continuation enter_context(stack_pointer_t stack_memory, size_t stack_size, F && stack_pointer_t stack_base = stack_memory + stack_size; stack_pointer_t stack_limit = stack_memory; - auto *captured_lambda = place_lambda_capture(lambda, lambda_memory); - continuation_t fiber_cont; - void *fiber_tsan; - { - std::lock_guard lock{cache_mtx}; - - auto cache_result = cached_fibers.find(stack_memory); - if (cache_result == cached_fibers.end()) { - // No entry found...create it + auto local_cache_result = local_cached_fibers.find(stack_memory); + if (local_cache_result == local_cached_fibers.end()) { + // No local entry...fill our local cache up + std::lock_guard lock{shared_cache_mtx}; + + auto shared_cache_result = shared_cached_fibers.find(stack_memory); + if (shared_cache_result == shared_cached_fibers.end()) { + // No shared entry...we have a new fiber, create it fcontext::callback_t callback = context_loop; fcontext::continuation_t initial_context = fcontext::make_fcontext(stack_base, stack_base - stack_limit, callback); @@ -213,14 +209,22 @@ continuation enter_context(stack_pointer_t stack_memory, size_t stack_size, F && __tsan_switch_to_fiber(new_fiber, 0); fcontext::transfer_t fiber_context = fcontext::jump_fcontext(initial_context, old_fiber); - cached_fibers[stack_memory].tsan_fiber_ = new_fiber; - cached_fibers[stack_memory].entry_cont_ = fiber_context.continuation; + shared_cached_fibers[stack_memory].tsan_fiber_ = new_fiber; + shared_cached_fibers[stack_memory].entry_cont_ = fiber_context.continuation; } - fiber_tsan = cached_fibers[stack_memory].tsan_fiber_; - fiber_cont = cached_fibers[stack_memory].entry_cont_; + local_cached_fibers[stack_memory].tsan_fiber_ = shared_cached_fibers[stack_memory].tsan_fiber_; + local_cached_fibers[stack_memory].entry_cont_ = shared_cached_fibers[stack_memory].entry_cont_; } + // We always filled up our local cache with the required fiber here. + // After 'warmup' we will have seen all fibers and we avoid holding the lock between + // fiber switches. This is less for performance and more to not create artificial + // 'happens before' relationships between unrelated fibers. + continuation_t fiber_cont = local_cached_fibers[stack_memory].entry_cont_; + void *fiber_tsan = local_cached_fibers[stack_memory].tsan_fiber_; + + auto *captured_lambda = place_lambda_capture(lambda, lambda_memory); captured_lambda->set_previous_tsan_fiber(__tsan_get_current_fiber()); __tsan_switch_to_fiber(fiber_tsan, 0); fcontext::transfer_t transfer = fcontext::jump_fcontext(fiber_cont, captured_lambda); diff --git a/lib/context_switcher/src/context_switcher.cpp b/lib/context_switcher/src/context_switcher.cpp index fcc19f8..fa34c0e 100644 --- a/lib/context_switcher/src/context_switcher.cpp +++ b/lib/context_switcher/src/context_switcher.cpp @@ -26,8 +26,9 @@ continuation switch_context(continuation &&cont) { } #endif #ifdef THREAD_SANITIZER -std::mutex cache_mtx{}; -std::unordered_map cached_fibers{}; +std::mutex shared_cache_mtx{}; +std::unordered_map shared_cached_fibers{}; +thread_local std::unordered_map local_cached_fibers{}; void context_loop(fcontext::transfer_t initial_transfer) { continuation successor = continuation{initial_transfer.continuation, initial_transfer.data}; @@ -39,7 +40,7 @@ void context_loop(fcontext::transfer_t initial_transfer) { auto *user_code = static_cast(transfer.data); continuation_t last_cont_pointer = transfer.continuation; successor = user_code->run(last_cont_pointer); - user_code->deconstruct(); + user_code->~lambda_capture_base(); } while (successor.valid()); } diff --git a/lib/pls/include/pls/internal/base/error_handling.h b/lib/pls/include/pls/internal/base/error_handling.h index d21f751..bdb6466 100644 --- a/lib/pls/include/pls/internal/base/error_handling.h +++ b/lib/pls/include/pls/internal/base/error_handling.h @@ -16,6 +16,6 @@ void pls_error(const char *msg); // TODO: Distinguish between debug/internal asserts and production asserts. -#define PLS_ASSERT(cond, msg) if (!(cond)) { pls_error(msg); } +#define PLS_ASSERT(cond, msg) // if (!(cond)) { pls_error(msg); } #endif //PLS_ERROR_HANDLING_H diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index 2d6d347..45abc65 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -26,7 +26,6 @@ class scheduler::init_function_impl : public init_function { explicit init_function_impl(F &function) : function_{function} {} void run() override { auto &root_task = thread_state::get().get_task_manager().get_active_task(); - root_task.clean_ = true; root_task.run_as_task([&](context_switcher::continuation cont) { thread_state::get().set_main_continuation(std::move(cont)); function_(); diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h index ae7680f..fa2977c 100644 --- a/lib/pls/include/pls/internal/scheduling/task.h +++ b/lib/pls/include/pls/internal/scheduling/task.h @@ -36,13 +36,6 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task { thread_id_ = thread_id; } - context_switcher::continuation get_continuation() { - return std::move(continuation_); - } - void set_continuation(context_switcher::continuation &&continuation) { - continuation_ = std::move(continuation); - } - template context_switcher::continuation run_as_task(F &&lambda) { return context_switcher::enter_context(stack_memory_, stack_size_, std::forward(lambda)); @@ -58,10 +51,8 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task { std::atomic external_trading_deque_cas_{}; std::atomic resource_stack_next_{}; std::atomic resource_stack_root_{{0, 0}}; - bool clean_; // Task Tree (we have a parent that we want to continue when we finish) - task *parent_task_; unsigned depth_; unsigned thread_id_; diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h index 82f425e..74fe5d9 100644 --- a/lib/pls/include/pls/internal/scheduling/thread_state.h +++ b/lib/pls/include/pls/internal/scheduling/thread_state.h @@ -22,7 +22,6 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state { unsigned id_; task_manager &task_manager_; - alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_; alignas(base::system_details::CACHE_LINE_SIZE) context_switcher::continuation main_loop_continuation_; alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_; @@ -31,7 +30,6 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state { scheduler_{nullptr}, id_{0}, task_manager_{task_manager}, - current_task_{nullptr}, random_{static_cast(std::chrono::steady_clock::now().time_since_epoch().count())} {}; /** diff --git a/lib/pls/src/internal/scheduling/task_manager.cpp b/lib/pls/src/internal/scheduling/task_manager.cpp index d7b35a2..2c73fe1 100644 --- a/lib/pls/src/internal/scheduling/task_manager.cpp +++ b/lib/pls/src/internal/scheduling/task_manager.cpp @@ -138,11 +138,9 @@ void task_manager::sync() { context_switcher::continuation result_cont; if (spawning_task_manager->try_clean_return(result_cont)) { // We return back to the main scheduling loop - active_task_->clean_ = true; return result_cont; } else { // We finish up the last task - active_task_->clean_ = false; return result_cont; } }); -- libgit2 0.26.0