Commit f347849f by FritzFlorian

WIP: Add workaround for tsan short lived fiber support.

Tsan does not cope well with rapidely destroyed/created fibers. As it is currently too much effort to fully investigate the tsan issue we work around it by caching the shourt lived fibers based on their stack base address. This allows us to use thread sanitizer for now.
parent c25e6134
Pipeline #1400 failed with stages
in 39 seconds
...@@ -50,10 +50,12 @@ add_library(context_switcher STATIC ...@@ -50,10 +50,12 @@ add_library(context_switcher STATIC
include/context_switcher/cscontext.h include/context_switcher/cscontext.h
include/context_switcher/fcontext.h include/context_switcher/fcontext.h
include/context_switcher/continuation.h include/context_switcher/continuation.h
include/context_switcher/lambda_capture.h include/context_switcher/context.h) include/context_switcher/context.h)
if (CS_USE_CSCONTEXT) if (CS_USE_CSCONTEXT)
target_compile_definitions(context_switcher PUBLIC CS_USE_CSCONTEXT) target_compile_definitions(context_switcher PUBLIC CS_USE_CSCONTEXT)
else ()
target_compile_definitions(context_switcher PUBLIC CS_USE_FCONTEXT)
endif () endif ()
# Add everything in `./include` to be in the include path of this project # Add everything in `./include` to be in the include path of this project
......
...@@ -5,6 +5,8 @@ ...@@ -5,6 +5,8 @@
.align 16 .align 16
__cs_enter_context: __cs_enter_context:
.cfi_startproc
.cfi_undefined rip
# Parameter List (in order) # Parameter List (in order)
# rdi = new stack pointer # rdi = new stack pointer
# rsi = first parameter to callback # rsi = first parameter to callback
...@@ -87,3 +89,4 @@ __cs_finish: ...@@ -87,3 +89,4 @@ __cs_finish:
# exit application # exit application
call _exit@PLT call _exit@PLT
hlt hlt
.cfi_endproc
...@@ -8,6 +8,10 @@ ...@@ -8,6 +8,10 @@
#include "fcontext.h" #include "fcontext.h"
#endif #endif
/**
* Picks the correct context switch implementation
* selected by the build system/configuration.
*/
namespace context_switcher { namespace context_switcher {
#ifdef CS_USE_CSCONTEXT #ifdef CS_USE_CSCONTEXT
......
...@@ -7,19 +7,66 @@ ...@@ -7,19 +7,66 @@
#include <new> #include <new>
#include "context.h" #include "context.h"
#include "continuation.h" #include "continuation.h"
#include "lambda_capture.h"
#ifdef THREAD_SANITIZER #ifdef THREAD_SANITIZER
#include "tsan_fiber_api.h" #include "tsan_support.h"
#include <mutex>
#include <unordered_map>
#endif #endif
/**
* Exposes main interface to start a stackful coroutine and to
* switch between different context.
*
* Both cscontext and fcontext implementations require no global state
* (everything is passed over in the context transfer routines).
*
* Thread sanitizer support uses global state to cache coroutine handles,
* as recycling them frequently causes problems. To do this coroutines
* started with the same stack frame are 'recycled'. For the end user this
* means that the stack memory used by the coroutines MUST NOT be used by anything
* else between usages (you can pool your stack memory as long as you always use
* the same stacks to create coroutines).
*/
namespace context_switcher { namespace context_switcher {
// cscontext implementation.
// Creates coroutines on the fly while entering them, making it very fast for
// extremely short lived routines (e.g. to implement a cactus stack like construct).
#ifdef CS_USE_CSCONTEXT
continuation switch_context(continuation &&cont); continuation switch_context(continuation &&cont);
template<typename F> template<typename F>
struct lambda_capture {
template<typename FARG>
explicit lambda_capture(FARG &&lambda) : lambda_{std::forward<FARG>(lambda)} {}
continuation operator()(continuation_t cont) {
return lambda_(continuation{cont});
}
private:
F lambda_;
};
template<typename F>
static lambda_capture<typename std::remove_reference<F>::type> *place_lambda_capture(F &&lambda, char *memory) {
return new(memory) lambda_capture<typename std::remove_reference<F>::type>(std::forward<F>(lambda));
}
template<typename T>
cscontext::continuation_t lambda_capture_callback(cscontext::continuation_t continuation_pointer,
void *lambda_capture_param) {
// Perform Call
T *lambda_capture = reinterpret_cast<T *>(lambda_capture_param);
continuation cont = (*lambda_capture)(continuation_pointer);
// Free resources and switch to result_continuation (this execution thread is finished with the return)
lambda_capture->~T();
return cont.consume();
}
template<typename F>
continuation enter_context(stack_pointer_t stack_memory, size_t stack_size, F &&lambda) { continuation enter_context(stack_pointer_t stack_memory, size_t stack_size, F &&lambda) {
stack_pointer_t lambda_memory = stack_memory + stack_size - sizeof(lambda_capture<F>); stack_pointer_t lambda_memory = stack_memory + stack_size - sizeof(lambda_capture<F>);
auto *captured_lambda = place_lambda_capture(std::forward<F>(lambda), lambda_memory); auto *captured_lambda = place_lambda_capture(std::forward<F>(lambda), lambda_memory);
...@@ -27,36 +74,164 @@ continuation enter_context(stack_pointer_t stack_memory, size_t stack_size, F && ...@@ -27,36 +74,164 @@ continuation enter_context(stack_pointer_t stack_memory, size_t stack_size, F &&
stack_pointer_t stack_base = lambda_memory - 64; stack_pointer_t stack_base = lambda_memory - 64;
stack_pointer_t stack_limit = stack_memory; stack_pointer_t stack_limit = stack_memory;
#ifdef CS_USE_CSCONTEXT
cscontext::callback_t callback = lambda_capture_callback<lambda_capture<F>>; cscontext::callback_t callback = lambda_capture_callback<lambda_capture<F>>;
void *result = cscontext::__cs_enter_context(stack_base, captured_lambda, callback, stack_limit); void *result = cscontext::__cs_enter_context(stack_base, captured_lambda, callback, stack_limit);
return continuation{result}; return continuation{result};
#else // CS_USE_CSCONTEXT }
#endif
// fcontext implementation (no thread sanitizer).
// Creates a new coroutine, then jumps control to it.
// This creation followed by a jump is slightly slower than the cscontext direct
// creation and jump, but the boost.context assembly is available for more plaforms.
#if defined(CS_USE_FCONTEXT) and not defined(THREAD_SANITIZER)
continuation switch_context(continuation &&cont);
template<typename F>
struct lambda_capture {
template<typename FARG>
explicit lambda_capture(FARG &&lambda) : lambda_{std::forward<FARG>(lambda)} {}
continuation operator()(continuation_t cont) {
return lambda_(continuation{cont});
}
private:
F lambda_;
};
template<typename T>
void lambda_capture_callback(fcontext::transfer_t transfer) {
// Perform Call
T *lambda_capture = reinterpret_cast<T *>(transfer.data);
continuation cont = (*lambda_capture)(transfer.continuation);
// Free resources and switch to result_continuation (this execution thread is finished with the return)
lambda_capture->~T();
continuation_t cont_pointer = cont.consume();
fcontext::jump_fcontext(cont_pointer, (void *) 0);
}
template<typename F>
static lambda_capture<typename std::remove_reference<F>::type> *place_lambda_capture(F &&lambda, char *memory) {
return new(memory) lambda_capture<typename std::remove_reference<F>::type>(std::forward<F>(lambda));
}
template<typename F>
continuation enter_context(stack_pointer_t stack_memory, size_t stack_size, F &&lambda) {
stack_pointer_t lambda_memory = stack_memory + stack_size - sizeof(lambda_capture<F>);
auto *captured_lambda = place_lambda_capture(std::forward<F>(lambda), lambda_memory);
stack_pointer_t stack_base = lambda_memory - 64;
stack_pointer_t stack_limit = stack_memory;
fcontext::callback_t callback = lambda_capture_callback<lambda_capture<F>>; fcontext::callback_t callback = lambda_capture_callback<lambda_capture<F>>;
fcontext::continuation_t new_context = fcontext::make_fcontext(stack_base, stack_base - stack_limit, callback); fcontext::continuation_t new_context = fcontext::make_fcontext(stack_base, stack_base - stack_limit, callback);
#ifdef THREAD_SANITIZER
void *new_fiber = __tsan_create_fiber(0);
void *old_fiber = __tsan_get_current_fiber();
captured_lambda->my_tsan_fiber_ = new_fiber;
captured_lambda->previous_tsan_fiber_ = old_fiber;
__tsan_switch_to_fiber(new_fiber, 0);
fcontext::transfer_t transfer = fcontext::jump_fcontext(new_context, captured_lambda); fcontext::transfer_t transfer = fcontext::jump_fcontext(new_context, captured_lambda);
if (transfer.data) { if (transfer.data) {
return continuation{transfer.continuation, transfer.data}; return continuation{transfer.continuation};
} else { } else {
return continuation{nullptr, nullptr}; return continuation{nullptr};
} }
#else }
fcontext::transfer_t transfer = fcontext::jump_fcontext(new_context, captured_lambda); #endif
// thread sanitizer implementation.
// Thread sanitizer, while having fiber support since mid 2019, has trouble
// with the creation/destruction of a high amount of short lived coroutines.
// To circumvent this we create one coroutine per stack base address and cache them in global variables.
// Please use with care and only for testing purposes.
#if defined(THREAD_SANITIZER)
continuation switch_context(continuation &&cont);
void context_loop(fcontext::transfer_t initial_transfer);
struct cached_fiber {
continuation_t entry_cont_;
void *tsan_fiber_;
};
extern std::mutex cache_mtx;
extern std::unordered_map<char *, cached_fiber> cached_fibers;
class lambda_capture_base {
public:
virtual continuation run(continuation_t cont) = 0;
virtual void deconstruct() = 0;
};
template<typename F>
class lambda_capture : public lambda_capture_base {
public:
explicit lambda_capture(F lambda) : lambda_{lambda}, previous_tsan_fiber_{nullptr} {}
void deconstruct() override {
//TODO: re-add deconstructor call
// ~lambda_capture<F>();
}
continuation run(continuation_t cont) override {
return lambda_(continuation{cont, previous_tsan_fiber_});
}
void set_previous_tsan_fiber(void *tsan_fiber) {
previous_tsan_fiber_ = tsan_fiber;
}
private:
F lambda_;
void *previous_tsan_fiber_;
};
template<typename F>
static lambda_capture<typename std::remove_reference<F>::type> *place_lambda_capture(F &&lambda, char *memory) {
return new(memory) lambda_capture<typename std::remove_reference<F>::type>(std::forward<F>(lambda));
}
template<typename F>
continuation enter_context(stack_pointer_t stack_memory, size_t stack_size, F &&lambda) {
stack_pointer_t lambda_memory = stack_memory;
stack_pointer_t stack_base = stack_memory + stack_size;
stack_pointer_t stack_limit = stack_memory;
auto *captured_lambda = place_lambda_capture(lambda, lambda_memory);
continuation_t fiber_cont;
void *fiber_tsan;
{
std::lock_guard<std::mutex> lock{cache_mtx};
auto cache_result = cached_fibers.find(stack_memory);
if (cache_result == cached_fibers.end()) {
// No entry found...create it
fcontext::callback_t callback = context_loop;
fcontext::continuation_t
initial_context = fcontext::make_fcontext(stack_base, stack_base - stack_limit, callback);
void *new_fiber = __tsan_create_fiber(0);
void *old_fiber = __tsan_get_current_fiber();
__tsan_switch_to_fiber(new_fiber, 0);
fcontext::transfer_t fiber_context = fcontext::jump_fcontext(initial_context, old_fiber);
cached_fibers[stack_memory].tsan_fiber_ = new_fiber;
cached_fibers[stack_memory].entry_cont_ = fiber_context.continuation;
}
fiber_tsan = cached_fibers[stack_memory].tsan_fiber_;
fiber_cont = cached_fibers[stack_memory].entry_cont_;
}
captured_lambda->set_previous_tsan_fiber(__tsan_get_current_fiber());
__tsan_switch_to_fiber(fiber_tsan, 0);
fcontext::transfer_t transfer = fcontext::jump_fcontext(fiber_cont, captured_lambda);
if (transfer.data) { if (transfer.data) {
return continuation{transfer.continuation}; return continuation{transfer.continuation, transfer.data};
} else { } else {
return continuation{nullptr}; return continuation{nullptr, nullptr};
} }
#endif // THREAD_SANITIZER
#endif // CS_USE_CSCONTEXT
} }
#endif
} }
......
#ifndef CONTEXT_SWITCHER_LAMBDA_CAPTURE_H_
#define CONTEXT_SWITCHER_LAMBDA_CAPTURE_H_
#include <utility>
#include <new>
#include <type_traits>
#include "context.h"
#include "continuation.h"
#include "tsan_fiber_api.h"
/**
* Helpers to more easily use a lambda expression as the code run on a context switch.
* Captures lambdas by placing them in a memory region and offering a conversion form
* the assembly_bindings callback API to the user's lambda.
*/
namespace context_switcher {
template<typename F>
struct lambda_capture {
template<typename FARG>
explicit lambda_capture(FARG &&lambda) : lambda_{std::forward<FARG>(lambda)} {}
#ifdef THREAD_SANITIZER
continuation operator()(continuation_t cont) {
return lambda_(continuation{cont, previous_tsan_fiber_});
}
void *previous_tsan_fiber_;
void *my_tsan_fiber_;
#else
continuation operator()(continuation_t cont) {
return lambda_(continuation{cont});
}
#endif
private:
F lambda_;
};
#ifdef CS_USE_CSCONTEXT
template<typename T>
cscontext::continuation_t lambda_capture_callback(cscontext::continuation_t continuation_pointer,
void *lambda_capture_param) {
// Perform Call
T *lambda_capture = reinterpret_cast<T *>(lambda_capture_param);
continuation cont = (*lambda_capture)(continuation_pointer);
// Free resources and switch to result_continuation (this execution thread is finished with the return)
lambda_capture->~T();
return cont.consume();
}
#else // CS_USE_CSCONTEXT
continuation switch_context(continuation &&cont);
template<typename T>
void lambda_capture_callback(fcontext::transfer_t transfer) {
// Perform Call
T *lambda_capture = reinterpret_cast<T *>(transfer.data);
continuation cont = (*lambda_capture)(transfer.continuation);
// Free resources and switch to result_continuation (this execution thread is finished with the return)
lambda_capture->~T();
#ifdef THREAD_SANITIZER
continuation_t cont_pointer = cont.consume();
void *next_fiber = cont.get_tsan_fiber();
__tsan_switch_to_fiber(next_fiber, 0);
__tsan_destroy_fiber(lambda_capture->my_tsan_fiber_);
fcontext::jump_fcontext(cont_pointer, (void*)0);
#else // THREAD_SANITIZER
continuation_t cont_pointer = cont.consume();
fcontext::jump_fcontext(cont_pointer, (void *) 0);
#endif // THREAD_SANITIZER
}
#endif // CS_USE_CSCONTEXT
template<typename F>
static lambda_capture<typename std::remove_reference<F>::type> *place_lambda_capture(F &&lambda, char *memory) {
return new(memory) lambda_capture<typename std::remove_reference<F>::type>(std::forward<F>(lambda));
}
}
#endif //CONTEXT_SWITCHER_LAMBDA_CAPTURE_H_
#ifndef PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_HELPERS_TSAN_FIBER_API_H_ #ifndef CONTEXT_SWITCHER_TSAN_SUPPORT
#define PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_HELPERS_TSAN_FIBER_API_H_ #define CONTEXT_SWITCHER_TSAN_SUPPORT
extern "C" { extern "C" {
// Fiber switching API. // Fiber switching API.
...@@ -18,4 +18,4 @@ void __tsan_switch_to_fiber(void *fiber, unsigned flags); ...@@ -18,4 +18,4 @@ void __tsan_switch_to_fiber(void *fiber, unsigned flags);
void __tsan_set_fiber_name(void *fiber, const char *name); void __tsan_set_fiber_name(void *fiber, const char *name);
}; };
#endif //PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_HELPERS_TSAN_FIBER_API_H_ #endif //CONTEXT_SWITCHER_TSAN_SUPPORT
#include "context_switcher/context_switcher.h" #include "context_switcher/context_switcher.h"
#ifdef THREAD_SANITIZER #ifdef THREAD_SANITIZER
#include "context_switcher/tsan_fiber_api.h" #include "context_switcher/tsan_support.h"
#endif #endif
namespace context_switcher { namespace context_switcher {
...@@ -12,31 +12,50 @@ continuation switch_context(continuation &&cont) { ...@@ -12,31 +12,50 @@ continuation switch_context(continuation &&cont) {
continuation_t result = cscontext::__cs_switch_context(cont_pointer); continuation_t result = cscontext::__cs_switch_context(cont_pointer);
return continuation{result}; return continuation{result};
} }
#else #endif
#ifdef THREAD_SANITIZER #if defined(CS_USE_FCONTEXT) and not defined(THREAD_SANITIZER)
continuation switch_context(continuation &&cont) { continuation switch_context(continuation &&cont) {
continuation_t cont_pointer = cont.consume(); continuation_t cont_pointer = cont.consume();
void *next_fiber = cont.get_tsan_fiber(); fcontext::transfer_t transfer = fcontext::jump_fcontext(cont_pointer, (void *) 1);
void *old_fiber = __tsan_get_current_fiber();
__tsan_switch_to_fiber(next_fiber, 0);
fcontext::transfer_t transfer = fcontext::jump_fcontext(cont_pointer, old_fiber);
if (transfer.data) { if (transfer.data) {
return continuation{transfer.continuation, transfer.data}; return continuation{transfer.continuation};
} else { } else {
return continuation{nullptr, 0}; return continuation{nullptr};
} }
} }
#else #endif
#ifdef THREAD_SANITIZER
std::mutex cache_mtx{};
std::unordered_map<char *, cached_fiber> cached_fibers{};
void context_loop(fcontext::transfer_t initial_transfer) {
continuation successor = continuation{initial_transfer.continuation, initial_transfer.data};
do {
continuation_t cont_pointer = successor.consume();
__tsan_switch_to_fiber(successor.get_tsan_fiber(), 0);
fcontext::transfer_t transfer = fcontext::jump_fcontext(cont_pointer, nullptr);
auto *user_code = static_cast<lambda_capture_base *>(transfer.data);
continuation_t last_cont_pointer = transfer.continuation;
successor = user_code->run(last_cont_pointer);
user_code->deconstruct();
} while (successor.valid());
}
continuation switch_context(continuation &&cont) { continuation switch_context(continuation &&cont) {
continuation_t cont_pointer = cont.consume(); continuation_t cont_pointer = cont.consume();
fcontext::transfer_t transfer = fcontext::jump_fcontext(cont_pointer, (void *) 1); void *next_fiber = cont.get_tsan_fiber();
void *last_fiber = __tsan_get_current_fiber();
__tsan_switch_to_fiber(next_fiber, 0);
fcontext::transfer_t transfer = fcontext::jump_fcontext(cont_pointer, last_fiber);
if (transfer.data) { if (transfer.data) {
return continuation{transfer.continuation}; return continuation{transfer.continuation, transfer.data};
} else { } else {
return continuation{nullptr}; return continuation{nullptr, nullptr};
} }
} }
#endif #endif
#endif
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment