Commit 3687a591 by FritzFlorian

Fix: race-condition on traded in task-chains.

parent cfffd161
Pipeline #1442 passed with stages
in 3 minutes 39 seconds
...@@ -66,6 +66,7 @@ class scheduler::init_function_impl : public init_function { ...@@ -66,6 +66,7 @@ class scheduler::init_function_impl : public init_function {
void run() override { void run() override {
base_task *root_task = thread_state::get().get_active_task(); base_task *root_task = thread_state::get().get_active_task();
root_task->run_as_task([root_task, this](::context_switcher::continuation cont) { root_task->run_as_task([root_task, this](::context_switcher::continuation cont) {
root_task->is_synchronized_ = true;
thread_state::get().main_continuation() = std::move(cont); thread_state::get().main_continuation() = std::move(cont);
function_(); function_();
thread_state::get().get_scheduler().work_section_done_.store(true); thread_state::get().get_scheduler().work_section_done_.store(true);
......
...@@ -54,6 +54,7 @@ std::tuple<base_task *, base_task *> task_manager::steal_task(thread_state &stea ...@@ -54,6 +54,7 @@ std::tuple<base_task *, base_task *> task_manager::steal_task(thread_state &stea
// get a suitable task to trade in // get a suitable task to trade in
// TODO: opt. add debug marker to traded in tasks that we do not accidentally use them. // TODO: opt. add debug marker to traded in tasks that we do not accidentally use them.
task *traded_task = static_cast<task *>(&scheduler::task_chain_at(stolen_task->depth_, stealing_state)); task *traded_task = static_cast<task *>(&scheduler::task_chain_at(stolen_task->depth_, stealing_state));
base_task *chain_after_stolen_task = traded_task->next_;
// perform the actual pop operation // perform the actual pop operation
auto pop_result_task = deque_.pop_top(traded_task, peek); auto pop_result_task = deque_.pop_top(traded_task, peek);
...@@ -77,7 +78,7 @@ std::tuple<base_task *, base_task *> task_manager::steal_task(thread_state &stea ...@@ -77,7 +78,7 @@ std::tuple<base_task *, base_task *> task_manager::steal_task(thread_state &stea
stolen_task->reset_task_chain(); stolen_task->reset_task_chain();
} }
return std::pair{stolen_task, traded_task}; return std::pair{stolen_task, chain_after_stolen_task};
} else { } else {
return std::pair{nullptr, nullptr}; return std::pair{nullptr, nullptr};
} }
...@@ -103,5 +104,4 @@ base_task *task_manager::pop_clean_task_chain(base_task *base_task) { ...@@ -103,5 +104,4 @@ base_task *task_manager::pop_clean_task_chain(base_task *base_task) {
return clean_chain; return clean_chain;
} }
} }
...@@ -56,12 +56,11 @@ void scheduler::work_thread_work_section() { ...@@ -56,12 +56,11 @@ void scheduler::work_thread_work_section() {
} while (target == my_state.get_thread_id()); } while (target == my_state.get_thread_id());
thread_state &target_state = my_state.get_scheduler().thread_state_for(target); thread_state &target_state = my_state.get_scheduler().thread_state_for(target);
auto[stolen_task, traded_task] = target_state.get_task_manager().steal_task(my_state); auto[stolen_task, chain_after_stolen_task] = target_state.get_task_manager().steal_task(my_state);
if (stolen_task) { if (stolen_task) {
// Keep task chain consistent. We want to appear as if we are working an a branch upwards of the stolen task. // Keep task chain consistent. We want to appear as if we are working an a branch upwards of the stolen task.
base_task *next_own_task = traded_task->next_; stolen_task->next_ = chain_after_stolen_task;
stolen_task->next_ = next_own_task; chain_after_stolen_task->prev_ = stolen_task;
next_own_task->prev_ = stolen_task;
my_state.set_active_task(stolen_task); my_state.set_active_task(stolen_task);
PLS_ASSERT(check_task_chain_forward(*my_state.get_active_task()), PLS_ASSERT(check_task_chain_forward(*my_state.get_active_task()),
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include "pls/pls.h" #include "pls/pls.h"
using namespace pls::internal::scheduling; using namespace pls::internal::scheduling;
using namespace pls::internal::scheduling::lock_free;
constexpr int MAX_NUM_TASKS = 32; constexpr int MAX_NUM_TASKS = 32;
constexpr int MAX_STACK_SIZE = 1024 * 8; constexpr int MAX_STACK_SIZE = 1024 * 8;
...@@ -32,7 +33,7 @@ TEST_CASE("scheduler correctly initializes", "[internal/scheduling/scheduler]") ...@@ -32,7 +33,7 @@ TEST_CASE("scheduler correctly initializes", "[internal/scheduling/scheduler]")
} }
} }
TEST_CASE("tasks distributed over workers (do not block)", "[internal/scheduling/scheduler]") { TEST_CASE("tasks distributed over workers (do not block)", "[internal/scheduling/scheduler.h]") {
scheduler scheduler{3, MAX_NUM_TASKS, MAX_STACK_SIZE}; scheduler scheduler{3, MAX_NUM_TASKS, MAX_STACK_SIZE};
std::atomic<int> num_run{0}; std::atomic<int> num_run{0};
...@@ -54,3 +55,32 @@ TEST_CASE("tasks distributed over workers (do not block)", "[internal/scheduling ...@@ -54,3 +55,32 @@ TEST_CASE("tasks distributed over workers (do not block)", "[internal/scheduling
}); });
REQUIRE(num_run == 3); REQUIRE(num_run == 3);
} }
unsigned fib_serial(unsigned n) {
if (n <= 1) {
return n;
}
return fib_serial(n - 1) + fib_serial(n - 2);
}
unsigned fib_pls(unsigned n) {
if (n <= 1) {
return n;
}
unsigned a, b;
pls::invoke(
[&a, n] { a = fib_pls(n - 1); },
[&b, n] { b = fib_pls(n - 2); }
);
return a + b;
}
TEST_CASE("simple fib", "[internal/scheduling/scheduler]") {
scheduler scheduler{3, MAX_NUM_TASKS, MAX_STACK_SIZE};
scheduler.perform_work([&] {
REQUIRE(fib_serial(28) == fib_pls(28));
});
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment