diff --git a/lib/context_switcher/CMakeLists.txt b/lib/context_switcher/CMakeLists.txt index eebcf0f..354cb26 100644 --- a/lib/context_switcher/CMakeLists.txt +++ b/lib/context_switcher/CMakeLists.txt @@ -50,10 +50,12 @@ add_library(context_switcher STATIC include/context_switcher/cscontext.h include/context_switcher/fcontext.h include/context_switcher/continuation.h - include/context_switcher/lambda_capture.h include/context_switcher/context.h) + include/context_switcher/context.h) if (CS_USE_CSCONTEXT) target_compile_definitions(context_switcher PUBLIC CS_USE_CSCONTEXT) +else () + target_compile_definitions(context_switcher PUBLIC CS_USE_FCONTEXT) endif () # Add everything in `./include` to be in the include path of this project diff --git a/lib/context_switcher/asm/cscontext/enter_context_x86_64_sysv_elf.s b/lib/context_switcher/asm/cscontext/enter_context_x86_64_sysv_elf.s index d76704a..3bd5315 100644 --- a/lib/context_switcher/asm/cscontext/enter_context_x86_64_sysv_elf.s +++ b/lib/context_switcher/asm/cscontext/enter_context_x86_64_sysv_elf.s @@ -5,6 +5,8 @@ .align 16 __cs_enter_context: + .cfi_startproc + .cfi_undefined rip # Parameter List (in order) # rdi = new stack pointer # rsi = first parameter to callback @@ -87,3 +89,4 @@ __cs_finish: # exit application call _exit@PLT hlt + .cfi_endproc diff --git a/lib/context_switcher/include/context_switcher/context.h b/lib/context_switcher/include/context_switcher/context.h index d35eac8..5f637af 100644 --- a/lib/context_switcher/include/context_switcher/context.h +++ b/lib/context_switcher/include/context_switcher/context.h @@ -8,6 +8,10 @@ #include "fcontext.h" #endif +/** + * Picks the correct context switch implementation + * selected by the build system/configuration. + */ namespace context_switcher { #ifdef CS_USE_CSCONTEXT diff --git a/lib/context_switcher/include/context_switcher/context_switcher.h b/lib/context_switcher/include/context_switcher/context_switcher.h index 57c6bf6..78dd01f 100644 --- a/lib/context_switcher/include/context_switcher/context_switcher.h +++ b/lib/context_switcher/include/context_switcher/context_switcher.h @@ -7,19 +7,66 @@ #include #include "context.h" - #include "continuation.h" -#include "lambda_capture.h" #ifdef THREAD_SANITIZER -#include "tsan_fiber_api.h" +#include "tsan_support.h" +#include +#include #endif +/** + * Exposes main interface to start a stackful coroutine and to + * switch between different context. + * + * Both cscontext and fcontext implementations require no global state + * (everything is passed over in the context transfer routines). + * + * Thread sanitizer support uses global state to cache coroutine handles, + * as recycling them frequently causes problems. To do this coroutines + * started with the same stack frame are 'recycled'. For the end user this + * means that the stack memory used by the coroutines MUST NOT be used by anything + * else between usages (you can pool your stack memory as long as you always use + * the same stacks to create coroutines). + */ namespace context_switcher { - +// cscontext implementation. +// Creates coroutines on the fly while entering them, making it very fast for +// extremely short lived routines (e.g. to implement a cactus stack like construct). +#ifdef CS_USE_CSCONTEXT continuation switch_context(continuation &&cont); template +struct lambda_capture { + template + explicit lambda_capture(FARG &&lambda) : lambda_{std::forward(lambda)} {} + + continuation operator()(continuation_t cont) { + return lambda_(continuation{cont}); + } + + private: + F lambda_; +}; + +template +static lambda_capture::type> *place_lambda_capture(F &&lambda, char *memory) { + return new(memory) lambda_capture::type>(std::forward(lambda)); +} + +template +cscontext::continuation_t lambda_capture_callback(cscontext::continuation_t continuation_pointer, + void *lambda_capture_param) { + // Perform Call + T *lambda_capture = reinterpret_cast(lambda_capture_param); + continuation cont = (*lambda_capture)(continuation_pointer); + + // Free resources and switch to result_continuation (this execution thread is finished with the return) + lambda_capture->~T(); + return cont.consume(); +} + +template continuation enter_context(stack_pointer_t stack_memory, size_t stack_size, F &&lambda) { stack_pointer_t lambda_memory = stack_memory + stack_size - sizeof(lambda_capture); auto *captured_lambda = place_lambda_capture(std::forward(lambda), lambda_memory); @@ -27,36 +74,164 @@ continuation enter_context(stack_pointer_t stack_memory, size_t stack_size, F && stack_pointer_t stack_base = lambda_memory - 64; stack_pointer_t stack_limit = stack_memory; -#ifdef CS_USE_CSCONTEXT cscontext::callback_t callback = lambda_capture_callback>; void *result = cscontext::__cs_enter_context(stack_base, captured_lambda, callback, stack_limit); return continuation{result}; -#else // CS_USE_CSCONTEXT +} +#endif + +// fcontext implementation (no thread sanitizer). +// Creates a new coroutine, then jumps control to it. +// This creation followed by a jump is slightly slower than the cscontext direct +// creation and jump, but the boost.context assembly is available for more plaforms. +#if defined(CS_USE_FCONTEXT) and not defined(THREAD_SANITIZER) +continuation switch_context(continuation &&cont); + +template +struct lambda_capture { + template + explicit lambda_capture(FARG &&lambda) : lambda_{std::forward(lambda)} {} + + continuation operator()(continuation_t cont) { + return lambda_(continuation{cont}); + } + + private: + F lambda_; +}; + +template +void lambda_capture_callback(fcontext::transfer_t transfer) { + // Perform Call + T *lambda_capture = reinterpret_cast(transfer.data); + continuation cont = (*lambda_capture)(transfer.continuation); + + // Free resources and switch to result_continuation (this execution thread is finished with the return) + lambda_capture->~T(); + + continuation_t cont_pointer = cont.consume(); + fcontext::jump_fcontext(cont_pointer, (void *) 0); +} + +template +static lambda_capture::type> *place_lambda_capture(F &&lambda, char *memory) { + return new(memory) lambda_capture::type>(std::forward(lambda)); +} + +template +continuation enter_context(stack_pointer_t stack_memory, size_t stack_size, F &&lambda) { + stack_pointer_t lambda_memory = stack_memory + stack_size - sizeof(lambda_capture); + auto *captured_lambda = place_lambda_capture(std::forward(lambda), lambda_memory); + + stack_pointer_t stack_base = lambda_memory - 64; + stack_pointer_t stack_limit = stack_memory; + fcontext::callback_t callback = lambda_capture_callback>; + fcontext::continuation_t new_context = fcontext::make_fcontext(stack_base, stack_base - stack_limit, callback); -#ifdef THREAD_SANITIZER - void *new_fiber = __tsan_create_fiber(0); - void *old_fiber = __tsan_get_current_fiber(); - captured_lambda->my_tsan_fiber_ = new_fiber; - captured_lambda->previous_tsan_fiber_ = old_fiber; - __tsan_switch_to_fiber(new_fiber, 0); fcontext::transfer_t transfer = fcontext::jump_fcontext(new_context, captured_lambda); if (transfer.data) { - return continuation{transfer.continuation, transfer.data}; + return continuation{transfer.continuation}; } else { - return continuation{nullptr, nullptr}; + return continuation{nullptr}; } -#else - fcontext::transfer_t transfer = fcontext::jump_fcontext(new_context, captured_lambda); +} +#endif + +// thread sanitizer implementation. +// Thread sanitizer, while having fiber support since mid 2019, has trouble +// with the creation/destruction of a high amount of short lived coroutines. +// To circumvent this we create one coroutine per stack base address and cache them in global variables. +// Please use with care and only for testing purposes. +#if defined(THREAD_SANITIZER) +continuation switch_context(continuation &&cont); +void context_loop(fcontext::transfer_t initial_transfer); + +struct cached_fiber { + continuation_t entry_cont_; + void *tsan_fiber_; +}; +extern std::mutex cache_mtx; +extern std::unordered_map cached_fibers; + +class lambda_capture_base { + public: + virtual continuation run(continuation_t cont) = 0; + virtual void deconstruct() = 0; +}; + +template +class lambda_capture : public lambda_capture_base { + public: + explicit lambda_capture(F lambda) : lambda_{lambda}, previous_tsan_fiber_{nullptr} {} + + void deconstruct() override { + //TODO: re-add deconstructor call +// ~lambda_capture(); + } + + continuation run(continuation_t cont) override { + return lambda_(continuation{cont, previous_tsan_fiber_}); + } + + void set_previous_tsan_fiber(void *tsan_fiber) { + previous_tsan_fiber_ = tsan_fiber; + } + + private: + F lambda_; + void *previous_tsan_fiber_; +}; + +template +static lambda_capture::type> *place_lambda_capture(F &&lambda, char *memory) { + return new(memory) lambda_capture::type>(std::forward(lambda)); +} + +template +continuation enter_context(stack_pointer_t stack_memory, size_t stack_size, F &&lambda) { + stack_pointer_t lambda_memory = stack_memory; + stack_pointer_t stack_base = stack_memory + stack_size; + stack_pointer_t stack_limit = stack_memory; + + auto *captured_lambda = place_lambda_capture(lambda, lambda_memory); + continuation_t fiber_cont; + void *fiber_tsan; + { + std::lock_guard lock{cache_mtx}; + + auto cache_result = cached_fibers.find(stack_memory); + if (cache_result == cached_fibers.end()) { + // No entry found...create it + fcontext::callback_t callback = context_loop; + fcontext::continuation_t + initial_context = fcontext::make_fcontext(stack_base, stack_base - stack_limit, callback); + void *new_fiber = __tsan_create_fiber(0); + void *old_fiber = __tsan_get_current_fiber(); + + __tsan_switch_to_fiber(new_fiber, 0); + fcontext::transfer_t fiber_context = fcontext::jump_fcontext(initial_context, old_fiber); + + cached_fibers[stack_memory].tsan_fiber_ = new_fiber; + cached_fibers[stack_memory].entry_cont_ = fiber_context.continuation; + } + + fiber_tsan = cached_fibers[stack_memory].tsan_fiber_; + fiber_cont = cached_fibers[stack_memory].entry_cont_; + } + + captured_lambda->set_previous_tsan_fiber(__tsan_get_current_fiber()); + __tsan_switch_to_fiber(fiber_tsan, 0); + fcontext::transfer_t transfer = fcontext::jump_fcontext(fiber_cont, captured_lambda); + if (transfer.data) { - return continuation{transfer.continuation}; + return continuation{transfer.continuation, transfer.data}; } else { - return continuation{nullptr}; + return continuation{nullptr, nullptr}; } -#endif // THREAD_SANITIZER -#endif // CS_USE_CSCONTEXT } +#endif } diff --git a/lib/context_switcher/include/context_switcher/lambda_capture.h b/lib/context_switcher/include/context_switcher/lambda_capture.h deleted file mode 100644 index 072a4b4..0000000 --- a/lib/context_switcher/include/context_switcher/lambda_capture.h +++ /dev/null @@ -1,86 +0,0 @@ - -#ifndef CONTEXT_SWITCHER_LAMBDA_CAPTURE_H_ -#define CONTEXT_SWITCHER_LAMBDA_CAPTURE_H_ - -#include -#include -#include - -#include "context.h" -#include "continuation.h" - -#include "tsan_fiber_api.h" - -/** - * Helpers to more easily use a lambda expression as the code run on a context switch. - * Captures lambdas by placing them in a memory region and offering a conversion form - * the assembly_bindings callback API to the user's lambda. - */ - -namespace context_switcher { - -template -struct lambda_capture { - template - explicit lambda_capture(FARG &&lambda) : lambda_{std::forward(lambda)} {} - -#ifdef THREAD_SANITIZER - continuation operator()(continuation_t cont) { - return lambda_(continuation{cont, previous_tsan_fiber_}); - } - void *previous_tsan_fiber_; - void *my_tsan_fiber_; -#else - continuation operator()(continuation_t cont) { - return lambda_(continuation{cont}); - } -#endif - - private: - F lambda_; -}; - -#ifdef CS_USE_CSCONTEXT -template -cscontext::continuation_t lambda_capture_callback(cscontext::continuation_t continuation_pointer, - void *lambda_capture_param) { - // Perform Call - T *lambda_capture = reinterpret_cast(lambda_capture_param); - continuation cont = (*lambda_capture)(continuation_pointer); - - // Free resources and switch to result_continuation (this execution thread is finished with the return) - lambda_capture->~T(); - return cont.consume(); -} -#else // CS_USE_CSCONTEXT -continuation switch_context(continuation &&cont); -template -void lambda_capture_callback(fcontext::transfer_t transfer) { - // Perform Call - T *lambda_capture = reinterpret_cast(transfer.data); - continuation cont = (*lambda_capture)(transfer.continuation); - - // Free resources and switch to result_continuation (this execution thread is finished with the return) - lambda_capture->~T(); - -#ifdef THREAD_SANITIZER - continuation_t cont_pointer = cont.consume(); - void *next_fiber = cont.get_tsan_fiber(); - __tsan_switch_to_fiber(next_fiber, 0); - __tsan_destroy_fiber(lambda_capture->my_tsan_fiber_); - fcontext::jump_fcontext(cont_pointer, (void*)0); -#else // THREAD_SANITIZER - continuation_t cont_pointer = cont.consume(); - fcontext::jump_fcontext(cont_pointer, (void *) 0); -#endif // THREAD_SANITIZER -} -#endif // CS_USE_CSCONTEXT - -template -static lambda_capture::type> *place_lambda_capture(F &&lambda, char *memory) { - return new(memory) lambda_capture::type>(std::forward(lambda)); -} - -} - -#endif //CONTEXT_SWITCHER_LAMBDA_CAPTURE_H_ diff --git a/lib/context_switcher/include/context_switcher/tsan_fiber_api.h b/lib/context_switcher/include/context_switcher/tsan_support.h similarity index 76% rename from lib/context_switcher/include/context_switcher/tsan_fiber_api.h rename to lib/context_switcher/include/context_switcher/tsan_support.h index 04df6f0..a5f60a0 100644 --- a/lib/context_switcher/include/context_switcher/tsan_fiber_api.h +++ b/lib/context_switcher/include/context_switcher/tsan_support.h @@ -1,6 +1,6 @@ -#ifndef PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_HELPERS_TSAN_FIBER_API_H_ -#define PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_HELPERS_TSAN_FIBER_API_H_ +#ifndef CONTEXT_SWITCHER_TSAN_SUPPORT +#define CONTEXT_SWITCHER_TSAN_SUPPORT extern "C" { // Fiber switching API. @@ -18,4 +18,4 @@ void __tsan_switch_to_fiber(void *fiber, unsigned flags); void __tsan_set_fiber_name(void *fiber, const char *name); }; -#endif //PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_HELPERS_TSAN_FIBER_API_H_ +#endif //CONTEXT_SWITCHER_TSAN_SUPPORT diff --git a/lib/context_switcher/src/context_switcher.cpp b/lib/context_switcher/src/context_switcher.cpp index 93be111..fcc19f8 100644 --- a/lib/context_switcher/src/context_switcher.cpp +++ b/lib/context_switcher/src/context_switcher.cpp @@ -1,7 +1,7 @@ #include "context_switcher/context_switcher.h" #ifdef THREAD_SANITIZER -#include "context_switcher/tsan_fiber_api.h" +#include "context_switcher/tsan_support.h" #endif namespace context_switcher { @@ -12,31 +12,50 @@ continuation switch_context(continuation &&cont) { continuation_t result = cscontext::__cs_switch_context(cont_pointer); return continuation{result}; } -#else -#ifdef THREAD_SANITIZER +#endif +#if defined(CS_USE_FCONTEXT) and not defined(THREAD_SANITIZER) continuation switch_context(continuation &&cont) { continuation_t cont_pointer = cont.consume(); - void *next_fiber = cont.get_tsan_fiber(); - void *old_fiber = __tsan_get_current_fiber(); - __tsan_switch_to_fiber(next_fiber, 0); - fcontext::transfer_t transfer = fcontext::jump_fcontext(cont_pointer, old_fiber); + fcontext::transfer_t transfer = fcontext::jump_fcontext(cont_pointer, (void *) 1); + if (transfer.data) { - return continuation{transfer.continuation, transfer.data}; + return continuation{transfer.continuation}; } else { - return continuation{nullptr, 0}; + return continuation{nullptr}; } } -#else +#endif +#ifdef THREAD_SANITIZER +std::mutex cache_mtx{}; +std::unordered_map cached_fibers{}; + +void context_loop(fcontext::transfer_t initial_transfer) { + continuation successor = continuation{initial_transfer.continuation, initial_transfer.data}; + do { + continuation_t cont_pointer = successor.consume(); + __tsan_switch_to_fiber(successor.get_tsan_fiber(), 0); + fcontext::transfer_t transfer = fcontext::jump_fcontext(cont_pointer, nullptr); + + auto *user_code = static_cast(transfer.data); + continuation_t last_cont_pointer = transfer.continuation; + successor = user_code->run(last_cont_pointer); + user_code->deconstruct(); + } while (successor.valid()); +} + continuation switch_context(continuation &&cont) { continuation_t cont_pointer = cont.consume(); - fcontext::transfer_t transfer = fcontext::jump_fcontext(cont_pointer, (void *) 1); + void *next_fiber = cont.get_tsan_fiber(); + void *last_fiber = __tsan_get_current_fiber(); + __tsan_switch_to_fiber(next_fiber, 0); + fcontext::transfer_t transfer = fcontext::jump_fcontext(cont_pointer, last_fiber); + if (transfer.data) { - return continuation{transfer.continuation}; + return continuation{transfer.continuation, transfer.data}; } else { - return continuation{nullptr}; + return continuation{nullptr, nullptr}; } } #endif -#endif }