Commit 731b47c5 by FritzFlorian

WIP: Remove unneeded attributes from scheduler.

parent 7c8d7a83
Pipeline #1402 failed with stages in 40 seconds
@@ -33,7 +33,7 @@ int pls_fib(int n) {
 constexpr int MAX_NUM_THREADS = 8;
 constexpr int MAX_NUM_TASKS = 32;
-constexpr int MAX_STACK_SIZE = 1024 * 64;
+constexpr int MAX_STACK_SIZE = 1024 * 1;
 static_scheduler_memory<MAX_NUM_THREADS,
                         MAX_NUM_TASKS,
@@ -51,7 +51,6 @@ int main(int argc, char **argv) {
   scheduler scheduler{global_scheduler_memory, (unsigned) num_threads};
   volatile int res;
   scheduler.perform_work([&]() {
     for (int i = 0; i < fib::NUM_WARMUP_ITERATIONS; i++) {
       res = pls_fib(fib::INPUT_N);
...
@@ -152,24 +152,21 @@ struct cached_fiber {
   continuation_t entry_cont_;
   void *tsan_fiber_;
 };
-extern std::mutex cache_mtx;
-extern std::unordered_map<char *, cached_fiber> cached_fibers;
+extern std::mutex shared_cache_mtx;
+extern std::unordered_map<char *, cached_fiber> shared_cached_fibers;
+extern thread_local std::unordered_map<char *, cached_fiber> local_cached_fibers;
 class lambda_capture_base {
  public:
   virtual continuation run(continuation_t cont) = 0;
-  virtual void deconstruct() = 0;
+  virtual ~lambda_capture_base() = default;
 };
 template<typename F>
 class lambda_capture : public lambda_capture_base {
  public:
   explicit lambda_capture(F lambda) : lambda_{lambda}, previous_tsan_fiber_{nullptr} {}
+  ~lambda_capture() override = default;
-  void deconstruct() override {
-    //TODO: re-add deconstructor call
-    // ~lambda_capture<F>();
-  }
   continuation run(continuation_t cont) override {
     return lambda_(continuation{cont, previous_tsan_fiber_});
@@ -195,15 +192,14 @@ continuation enter_context(stack_pointer_t stack_memory, size_t stack_size, F &&
   stack_pointer_t stack_base = stack_memory + stack_size;
   stack_pointer_t stack_limit = stack_memory;
-  auto *captured_lambda = place_lambda_capture(lambda, lambda_memory);
-  continuation_t fiber_cont;
-  void *fiber_tsan;
-  {
-    std::lock_guard<std::mutex> lock{cache_mtx};
-    auto cache_result = cached_fibers.find(stack_memory);
-    if (cache_result == cached_fibers.end()) {
-      // No entry found...create it
+  auto local_cache_result = local_cached_fibers.find(stack_memory);
+  if (local_cache_result == local_cached_fibers.end()) {
+    // No local entry...fill our local cache up
+    std::lock_guard<std::mutex> lock{shared_cache_mtx};
+    auto shared_cache_result = shared_cached_fibers.find(stack_memory);
+    if (shared_cache_result == shared_cached_fibers.end()) {
+      // No shared entry...we have a new fiber, create it
       fcontext::callback_t callback = context_loop;
       fcontext::continuation_t
           initial_context = fcontext::make_fcontext(stack_base, stack_base - stack_limit, callback);
@@ -213,14 +209,22 @@ continuation enter_context(stack_pointer_t stack_memory, size_t stack_size, F &&
       __tsan_switch_to_fiber(new_fiber, 0);
       fcontext::transfer_t fiber_context = fcontext::jump_fcontext(initial_context, old_fiber);
-      cached_fibers[stack_memory].tsan_fiber_ = new_fiber;
-      cached_fibers[stack_memory].entry_cont_ = fiber_context.continuation;
+      shared_cached_fibers[stack_memory].tsan_fiber_ = new_fiber;
+      shared_cached_fibers[stack_memory].entry_cont_ = fiber_context.continuation;
     }
-    fiber_tsan = cached_fibers[stack_memory].tsan_fiber_;
-    fiber_cont = cached_fibers[stack_memory].entry_cont_;
+    local_cached_fibers[stack_memory].tsan_fiber_ = shared_cached_fibers[stack_memory].tsan_fiber_;
+    local_cached_fibers[stack_memory].entry_cont_ = shared_cached_fibers[stack_memory].entry_cont_;
   }
+  // We always filled up our local cache with the required fiber here.
+  // After 'warmup' we will have seen all fibers and we avoid holding the lock between
+  // fiber switches. This is less for performance and more to not create artificial
+  // 'happens before' relationships between unrelated fibers.
+  continuation_t fiber_cont = local_cached_fibers[stack_memory].entry_cont_;
+  void *fiber_tsan = local_cached_fibers[stack_memory].tsan_fiber_;
+  auto *captured_lambda = place_lambda_capture(lambda, lambda_memory);
   captured_lambda->set_previous_tsan_fiber(__tsan_get_current_fiber());
   __tsan_switch_to_fiber(fiber_tsan, 0);
   fcontext::transfer_t transfer = fcontext::jump_fcontext(fiber_cont, captured_lambda);
...
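The reworked lookup above consults a thread-local cache first and only falls back to the mutex-protected shared map on a miss, so after warm-up a fiber switch never takes the lock or creates synchronization edges between unrelated fibers. Below is a minimal standalone sketch of that two-level pattern; the simplified cached_fiber and the lookup_fiber helper are illustrative stand-ins, not code from this repository.

// Standalone sketch of the two-level fiber cache lookup, assuming a
// simplified cached_fiber with just the two cached fields.
#include <mutex>
#include <unordered_map>

struct cached_fiber {
  void *entry_cont_;   // stand-in for continuation_t
  void *tsan_fiber_;
};

std::mutex shared_cache_mtx;
std::unordered_map<char *, cached_fiber> shared_cached_fibers;
thread_local std::unordered_map<char *, cached_fiber> local_cached_fibers;

// Returns the cached fiber for a stack, touching the shared map (and the
// lock) only when this thread has never seen the stack before.
cached_fiber lookup_fiber(char *stack_memory) {
  auto local_it = local_cached_fibers.find(stack_memory);
  if (local_it == local_cached_fibers.end()) {
    std::lock_guard<std::mutex> lock{shared_cache_mtx};
    // In the real code a missing shared entry triggers fiber creation here;
    // the sketch simply copies whatever the shared map holds.
    local_cached_fibers[stack_memory] = shared_cached_fibers[stack_memory];
    local_it = local_cached_fibers.find(stack_memory);
  }
  return local_it->second;
}

int main() {
  static char fake_stack[64];
  cached_fiber f = lookup_fiber(fake_stack);  // first call fills both caches
  (void) f;
  return 0;
}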
@@ -26,8 +26,9 @@ continuation switch_context(continuation &&cont) {
 }
 #endif
 #ifdef THREAD_SANITIZER
-std::mutex cache_mtx{};
-std::unordered_map<char *, cached_fiber> cached_fibers{};
+std::mutex shared_cache_mtx{};
+std::unordered_map<char *, cached_fiber> shared_cached_fibers{};
+thread_local std::unordered_map<char *, cached_fiber> local_cached_fibers{};
 void context_loop(fcontext::transfer_t initial_transfer) {
   continuation successor = continuation{initial_transfer.continuation, initial_transfer.data};
@@ -39,7 +40,7 @@ void context_loop(fcontext::transfer_t initial_transfer) {
     auto *user_code = static_cast<lambda_capture_base *>(transfer.data);
     continuation_t last_cont_pointer = transfer.continuation;
     successor = user_code->run(last_cont_pointer);
-    user_code->deconstruct();
+    user_code->~lambda_capture_base();
   } while (successor.valid());
 }
...
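Swapping the deconstruct() hook for an explicit destructor call works because lambda_capture_base now has a virtual destructor: calling ~lambda_capture_base() through the base pointer dispatches to the derived lambda_capture<F> destructor while leaving the placement-allocated storage untouched. A small sketch of that mechanism, using illustrative Base/Derived types rather than the library's classes:

// Sketch: an explicit destructor call through a base pointer with a virtual
// destructor runs the most-derived destructor, which is exactly what is
// needed for objects constructed via placement new into caller-owned memory.
#include <new>

struct Base {
  virtual ~Base() = default;
};

struct Derived : Base {
  ~Derived() override { /* release captured resources */ }
};

int main() {
  alignas(Derived) unsigned char storage[sizeof(Derived)];
  Base *obj = new (storage) Derived{};  // placed into caller-owned memory
  obj->~Base();                         // virtually dispatches to ~Derived()
  return 0;
}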
@@ -16,6 +16,6 @@
 void pls_error(const char *msg);
 // TODO: Distinguish between debug/internal asserts and production asserts.
-#define PLS_ASSERT(cond, msg) if (!(cond)) { pls_error(msg); }
+#define PLS_ASSERT(cond, msg) // if (!(cond)) { pls_error(msg); }
 #endif //PLS_ERROR_HANDLING_H
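Commenting out the macro body makes PLS_ASSERT expand to nothing, so every assertion is silently disabled while this change is WIP. One possible way to tackle the TODO above is to gate the check behind a build flag; this is only a sketch, and the PLS_DEBUG_ASSERTS name is hypothetical, not part of this commit:

// Hypothetical sketch: keep the check compilable but opt-in via a build flag.
#ifdef PLS_DEBUG_ASSERTS
#define PLS_ASSERT(cond, msg) \
  do { if (!(cond)) { pls_error(msg); } } while (0)
#else
#define PLS_ASSERT(cond, msg) ((void) 0)
#endif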
@@ -26,7 +26,6 @@ class scheduler::init_function_impl : public init_function {
   explicit init_function_impl(F &function) : function_{function} {}
   void run() override {
     auto &root_task = thread_state::get().get_task_manager().get_active_task();
-    root_task.clean_ = true;
     root_task.run_as_task([&](context_switcher::continuation cont) {
       thread_state::get().set_main_continuation(std::move(cont));
       function_();
...
@@ -36,13 +36,6 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task {
     thread_id_ = thread_id;
   }
-  context_switcher::continuation get_continuation() {
-    return std::move(continuation_);
-  }
-  void set_continuation(context_switcher::continuation &&continuation) {
-    continuation_ = std::move(continuation);
-  }
   template<typename F>
   context_switcher::continuation run_as_task(F &&lambda) {
     return context_switcher::enter_context(stack_memory_, stack_size_, std::forward<F>(lambda));
@@ -58,10 +51,8 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task {
   std::atomic<traded_cas_field> external_trading_deque_cas_{};
   std::atomic<task *> resource_stack_next_{};
   std::atomic<data_structures::stamped_integer> resource_stack_root_{{0, 0}};
-  bool clean_;
   // Task Tree (we have a parent that we want to continue when we finish)
-  task *parent_task_;
   unsigned depth_;
   unsigned thread_id_;
...
@@ -22,7 +22,6 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
   unsigned id_;
   task_manager &task_manager_;
-  alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_;
   alignas(base::system_details::CACHE_LINE_SIZE) context_switcher::continuation main_loop_continuation_;
   alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
@@ -31,7 +30,6 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
         scheduler_{nullptr},
         id_{0},
         task_manager_{task_manager},
-        current_task_{nullptr},
         random_{static_cast<unsigned long>(std::chrono::steady_clock::now().time_since_epoch().count())} {};
   /**
...
@@ -138,11 +138,9 @@ void task_manager::sync() {
     context_switcher::continuation result_cont;
     if (spawning_task_manager->try_clean_return(result_cont)) {
       // We return back to the main scheduling loop
-      active_task_->clean_ = true;
       return result_cont;
     } else {
       // We finish up the last task
-      active_task_->clean_ = false;
       return result_cont;
     }
   });
...