Commit f8ab8e0a by FritzFlorian

Change tsan integration to not 'cache' fibers.

We fixed the bug in tsan causing it to crash after creating/deleting many fibers, because of that there is no need for this cache mechanism (you have to use the most recent clang build with the patch for it to work thought).
parent 1e1e08a9
Pipeline #1416 failed with stages
in 37 seconds
......@@ -10,9 +10,7 @@
#include "continuation.h"
#ifdef THREAD_SANITIZER
#include "tsan_support.h"
#include <mutex>
#include <unordered_map>
#include <sanitizer/tsan_interface.h>
#endif
/**
......@@ -149,41 +147,39 @@ continuation enter_context(stack_pointer_t stack_memory, size_t stack_size, F &&
// Please use with care and only for testing purposes.
#if defined(THREAD_SANITIZER)
continuation switch_context(continuation &&cont);
void context_loop(fcontext::transfer_t initial_transfer);
struct cached_fiber {
continuation_t entry_cont_;
void *tsan_fiber_;
};
extern std::mutex shared_cache_mtx;
extern std::unordered_map<char *, cached_fiber> shared_cached_fibers;
extern thread_local std::unordered_map<char *, cached_fiber> local_cached_fibers;
class lambda_capture_base {
public:
virtual continuation run(continuation_t cont) = 0;
virtual ~lambda_capture_base() = default;
};
template<typename F>
class lambda_capture : public lambda_capture_base {
public:
explicit lambda_capture(F lambda) : lambda_{lambda}, previous_tsan_fiber_{nullptr} {}
~lambda_capture() override = default;
continuation run(continuation_t cont) override {
return lambda_(continuation{cont, previous_tsan_fiber_});
}
struct lambda_capture {
template<typename FARG>
explicit lambda_capture(FARG &&lambda) : lambda_{std::forward<FARG>(lambda)} {}
void set_previous_tsan_fiber(void *tsan_fiber) {
previous_tsan_fiber_ = tsan_fiber;
continuation operator()(continuation &&cont) {
return lambda_(std::move(cont));
}
private:
F lambda_;
void *previous_tsan_fiber_;
};
template<typename T>
void lambda_capture_callback(fcontext::transfer_t transfer) {
fcontext::fiber_data received_fiber_data = *(fcontext::fiber_data *) transfer.data;
// Perform Call
T *lambda_capture = reinterpret_cast<T *>(received_fiber_data.data);
continuation cont = (*lambda_capture)({transfer.continuation, received_fiber_data.old_fiber});
// Free resources and switch to result_continuation (this execution thread is finished with the return)
lambda_capture->~T();
continuation_t cont_pointer = cont.consume();
void *last_fiber = __tsan_get_current_fiber();
fcontext::fiber_data send_fiber_data{last_fiber, nullptr, true};
__tsan_switch_to_fiber(cont.get_tsan_fiber(), 0);
fcontext::jump_fcontext(cont_pointer, &send_fiber_data);
}
template<typename F>
static lambda_capture<typename std::remove_reference<F>::type> *place_lambda_capture(F &&lambda, char *memory) {
return new(memory) lambda_capture<typename std::remove_reference<F>::type>(std::forward<F>(lambda));
......@@ -191,51 +187,28 @@ static lambda_capture<typename std::remove_reference<F>::type> *place_lambda_cap
template<typename F>
continuation enter_context(stack_pointer_t stack_memory, size_t stack_size, F &&lambda) {
stack_pointer_t lambda_memory = stack_memory;
stack_pointer_t stack_base = stack_memory + stack_size;
stack_pointer_t stack_limit = stack_memory;
stack_pointer_t lambda_memory = stack_memory + stack_size - sizeof(lambda_capture<F>);
auto *captured_lambda = place_lambda_capture(std::forward<F>(lambda), lambda_memory);
auto local_cache_result = local_cached_fibers.find(stack_memory);
if (local_cache_result == local_cached_fibers.end()) {
// No local entry...fill our local cache up
std::lock_guard<std::mutex> lock{shared_cache_mtx};
auto shared_cache_result = shared_cached_fibers.find(stack_memory);
if (shared_cache_result == shared_cached_fibers.end()) {
// No shared entry...we have a new fiber, create it
fcontext::callback_t callback = context_loop;
fcontext::continuation_t
initial_context = fcontext::make_fcontext(stack_base, stack_base - stack_limit, callback);
void *new_fiber = __tsan_create_fiber(0);
void *old_fiber = __tsan_get_current_fiber();
__tsan_switch_to_fiber(new_fiber, 0);
fcontext::transfer_t fiber_context = fcontext::jump_fcontext(initial_context, old_fiber);
shared_cached_fibers[stack_memory].tsan_fiber_ = new_fiber;
shared_cached_fibers[stack_memory].entry_cont_ = fiber_context.continuation;
}
stack_pointer_t stack_base = lambda_memory - 64;
stack_pointer_t stack_limit = stack_memory;
local_cached_fibers[stack_memory].tsan_fiber_ = shared_cached_fibers[stack_memory].tsan_fiber_;
local_cached_fibers[stack_memory].entry_cont_ = shared_cached_fibers[stack_memory].entry_cont_;
}
fcontext::callback_t callback = lambda_capture_callback<lambda_capture<F>>;
fcontext::continuation_t new_context = fcontext::make_fcontext(stack_base, stack_base - stack_limit, callback);
// We always filled up our local cache with the required fiber here.
// After 'warmup' we will have seen all fibers and we avoid holding the lock between
// fiber switches. This is less for performance and more to not create artificial
// 'happens before' relationships between unrelated fibers.
continuation_t fiber_cont = local_cached_fibers[stack_memory].entry_cont_;
void *fiber_tsan = local_cached_fibers[stack_memory].tsan_fiber_;
void *next_fiber = __tsan_create_fiber(0);
void *last_fiber = __tsan_get_current_fiber();
fcontext::fiber_data send_fiber_data{last_fiber, captured_lambda, false};
auto *captured_lambda = place_lambda_capture(lambda, lambda_memory);
captured_lambda->set_previous_tsan_fiber(__tsan_get_current_fiber());
__tsan_switch_to_fiber(fiber_tsan, 0);
fcontext::transfer_t transfer = fcontext::jump_fcontext(fiber_cont, captured_lambda);
__tsan_switch_to_fiber(next_fiber, 0);
fcontext::transfer_t transfer = fcontext::jump_fcontext(new_context, &send_fiber_data);
fcontext::fiber_data received_fiber_data = *(fcontext::fiber_data *) transfer.data;
if (transfer.data) {
return continuation{transfer.continuation, transfer.data};
} else {
if (received_fiber_data.finished) {
__tsan_destroy_fiber(received_fiber_data.old_fiber);
return continuation{nullptr, nullptr};
} else {
return continuation{transfer.continuation, received_fiber_data.old_fiber};
}
}
#endif
......
......@@ -21,6 +21,14 @@ struct transfer_t {
void *data;
};
#ifdef THREAD_SANITIZER
struct fiber_data {
void *old_fiber;
void *data;
bool finished;
};
#endif
using callback_t = void (*)(transfer_t);
extern "C" {
......
#ifndef CONTEXT_SWITCHER_TSAN_SUPPORT
#define CONTEXT_SWITCHER_TSAN_SUPPORT
extern "C" {
// Fiber switching API.
// - TSAN context for fiber can be created by __tsan_create_fiber
// and freed by __tsan_destroy_fiber.
// - TSAN context of current fiber or thread can be obtained
// by calling __tsan_get_current_fiber.
// - __tsan_switch_to_fiber should be called immediatly before switch
// to fiber, such as call of swapcontext.
// - Fiber name can be set by __tsan_set_fiber_name.
void *__tsan_get_current_fiber(void);
void *__tsan_create_fiber(unsigned flags);
void __tsan_destroy_fiber(void *fiber);
void __tsan_switch_to_fiber(void *fiber, unsigned flags);
void __tsan_set_fiber_name(void *fiber, const char *name);
};
#endif //CONTEXT_SWITCHER_TSAN_SUPPORT
#include "context_switcher/context_switcher.h"
#ifdef THREAD_SANITIZER
#include "context_switcher/tsan_support.h"
#include <sanitizer/tsan_interface.h>
#endif
namespace context_switcher {
......@@ -26,35 +26,21 @@ continuation switch_context(continuation &&cont) {
}
#endif
#ifdef THREAD_SANITIZER
std::mutex shared_cache_mtx{};
std::unordered_map<char *, cached_fiber> shared_cached_fibers{};
thread_local std::unordered_map<char *, cached_fiber> local_cached_fibers{};
void context_loop(fcontext::transfer_t initial_transfer) {
continuation successor = continuation{initial_transfer.continuation, initial_transfer.data};
do {
continuation_t cont_pointer = successor.consume();
__tsan_switch_to_fiber(successor.get_tsan_fiber(), 0);
fcontext::transfer_t transfer = fcontext::jump_fcontext(cont_pointer, nullptr);
auto *user_code = static_cast<lambda_capture_base *>(transfer.data);
continuation_t last_cont_pointer = transfer.continuation;
successor = user_code->run(last_cont_pointer);
user_code->~lambda_capture_base();
} while (successor.valid());
}
continuation switch_context(continuation &&cont) {
continuation_t cont_pointer = cont.consume();
void *next_fiber = cont.get_tsan_fiber();
void *last_fiber = __tsan_get_current_fiber();
fcontext::fiber_data send_fiber_data{last_fiber, nullptr, false};
__tsan_switch_to_fiber(next_fiber, 0);
fcontext::transfer_t transfer = fcontext::jump_fcontext(cont_pointer, last_fiber);
fcontext::transfer_t transfer = fcontext::jump_fcontext(cont_pointer, &send_fiber_data);
fcontext::fiber_data received_fiber_data = *(fcontext::fiber_data *) transfer.data;
if (transfer.data) {
return continuation{transfer.continuation, transfer.data};
} else {
if (received_fiber_data.finished) {
__tsan_destroy_fiber(received_fiber_data.old_fiber);
return continuation{nullptr, nullptr};
} else {
return continuation{transfer.continuation, received_fiber_data.old_fiber};
}
}
#endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment