Commit cfffd161 by FritzFlorian

Finish pulling out deque/resource stacks from scheduler logic.

parent f9e6fc51
Pipeline #1441 failed with stages
in 3 minutes 31 seconds
......@@ -39,7 +39,7 @@ add_library(pls STATIC
include/pls/internal/scheduling/lock_free/task.h
include/pls/internal/scheduling/lock_free/task_manager.h src/internal/scheduling/lock_free/task_manager.cpp
include/pls/internal/scheduling/lock_free/external_trading_deque.h src/internal/scheduling/lock_free/external_trading_deque.cpp
include/pls/internal/scheduling/lock_free/traded_cas_field.h)
include/pls/internal/scheduling/lock_free/traded_cas_field.h src/internal/scheduling/lock_free/task.cpp)
# Dependencies for pls
target_link_libraries(pls Threads::Threads)
......
......@@ -8,12 +8,24 @@
namespace pls::internal::scheduling::lock_free {
/**
 * Lock free task variant.
 * Needs extra fields to manage the trading and resources
 * (a stack of spare task chains traded in by stealing threads).
 * RELIES on the fact that tasks have a unique depth and thread id,
 * allowing to refer to a task solely by this property!
 * (The resource stack root stores only `thread_id + 1`; the task object
 * is recovered through the scheduler lookup at the same depth.)
 */
struct task : public base_task {
task(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) :
base_task(stack_memory, stack_size, depth, thread_id) {}
// Additional info for lock-free stealing and resource trading.
std::atomic<traded_cas_field> external_trading_deque_cas_{};
// Lock-free stack of spare task chains at this task's depth.
// push/pop may race with other threads (CAS loop on resource_stack_root_);
// reset_task_chain performs a plain load/store and must only be called when
// no other thread can access the stack anymore.
void push_task_chain(task *spare_task_chain);
task *pop_task_chain();
void reset_task_chain();
private:
// Intrusive successor pointer within the resource stack:
// the top task points to the task below it, nullptr terminates the chain.
std::atomic<base_task *> resource_stack_next_{};
// Stack root: value == 0 means empty, otherwise (value - 1) is the thread id
// of the top task. The stamp is bumped on every modification, presumably as
// an ABA guard for the CAS loops — standard stamped-reference technique.
std::atomic<data_structures::stamped_integer> resource_stack_root_{{0, 0}};
};
......
......@@ -5,6 +5,7 @@
#include <memory>
#include <utility>
#include <array>
#include <tuple>
#include "pls/internal/base/stack_allocator.h"
......@@ -21,7 +22,8 @@ namespace pls::internal::scheduling::lock_free {
* Handles management of tasks in the system. Each thread has a local task manager,
* responsible for allocating, freeing and publishing tasks for stealing.
*
* All interaction for spawning, stealing and task trading are managed through this class.
* The task manager handles steal/sync details, the scheduler handles all other aspects.
* By doing this the task manager can be exchanged for a different implementation for testing purposes.
*/
class task_manager {
using stack_allocator = pls::internal::base::stack_allocator;
......@@ -40,15 +42,11 @@ class task_manager {
base_task *pop_local_task();
// Stealing work, automatically trades in another task
base_task *steal_task(thread_state &stealing_state);
std::tuple<base_task *, base_task *> steal_task(thread_state &stealing_state);
// Sync/memory management
base_task *pop_clean_task_chain(base_task *task);
private:
// Internal helpers for resource stack on tasks
void push_resource_on_task(task *target_task, task *spare_task_chain);
task *pop_resource_from_task(task *target_task);
std::shared_ptr<stack_allocator> stack_allocator_;
std::vector<std::unique_ptr<task>> tasks_;
......
#include "pls/internal/scheduling/base_task.h"
namespace pls {
namespace internal {
namespace scheduling {
}
}
namespace pls::internal::scheduling {
}
#include "pls/internal/scheduling/lock_free/task.h"
#include "pls/internal/scheduling/scheduler.h"
namespace pls::internal::scheduling::lock_free {
// Resolves the task object at the given depth that is owned by the given thread,
// going through the globally registered scheduler.
// TODO: this 'global' lookup requires a fully set-up scheduler; rework it for better testability.
static task *find_task(unsigned id, unsigned depth) {
  auto &owning_thread_state = thread_state::get().get_scheduler().thread_state_for(id);
  return owning_thread_state.get_task_manager().get_task(depth);
}
// Pushes a spare task chain on top of this task's lock-free resource stack.
// The chain must come from a different thread (a clean chain traded in by a
// thief) and must sit at the same depth as this task, since the stack root
// only stores a thread id and relies on the unique (thread_id, depth) pair.
void task::push_task_chain(task *spare_task_chain) {
PLS_ASSERT(this->thread_id_ != spare_task_chain->thread_id_,
"Makes no sense to push task onto itself, as it is not clean by definition.");
PLS_ASSERT(this->depth_ == spare_task_chain->depth_,
"Must only push tasks with correct depth.");
data_structures::stamped_integer current_root;
data_structures::stamped_integer target_root;
do {
// Re-read the root each attempt (compare_exchange_strong also refreshes
// current_root on failure, so the extra load is redundant but harmless).
current_root = this->resource_stack_root_.load();
// New root: bump the stamp (ABA guard) and point at the pushed chain.
target_root.stamp = current_root.stamp + 1;
target_root.value = spare_task_chain->thread_id_ + 1;
if (current_root.value == 0) {
// Empty, simply push in with no successor
spare_task_chain->resource_stack_next_.store(nullptr);
} else {
// Already an entry. Find its corresponding task and set it as our successor.
auto *current_root_task = find_task(current_root.value - 1, this->depth_);
spare_task_chain->resource_stack_next_.store(current_root_task);
}
} while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root));
}
// Pops the top spare task chain from this task's resource stack.
// Returns nullptr when the stack is empty. Concurrent pushers/poppers are
// handled by the stamped CAS loop on resource_stack_root_.
task *task::pop_task_chain() {
data_structures::stamped_integer current_root;
data_structures::stamped_integer target_root;
task *output_task;
do {
current_root = this->resource_stack_root_.load();
if (current_root.value == 0) {
// Empty...
return nullptr;
} else {
// Found something, try to pop it
auto *current_root_task = find_task(current_root.value - 1, this->depth_);
auto *next_stack_task = current_root_task->resource_stack_next_.load();
// New root points at the successor (or becomes empty) with a bumped stamp.
target_root.stamp = current_root.stamp + 1;
target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0;
output_task = current_root_task;
}
} while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root));
PLS_ASSERT(scheduler::check_task_chain_backward(*output_task), "Must only pop proper task chains.");
// Detach the popped task from the stack before handing it out.
output_task->resource_stack_next_.store(nullptr);
return output_task;
}
// Resets the resource stack to the empty state.
// NOTE(review): this is a plain load/store pair, not an atomic RMW, so it is
// only safe while no other thread can access this stack anymore — the caller
// must guarantee exclusivity. The stamp is still bumped so later CAS loops
// never mistake the reset root for a previously observed value.
void task::reset_task_chain() {
auto current_root = this->resource_stack_root_.load();
current_root.stamp++;
current_root.value = 0;
this->resource_stack_root_.store(current_root);
this->resource_stack_next_.store(nullptr);
}
}
......@@ -14,7 +14,7 @@ task_manager::task_manager(unsigned thread_id,
deque_{thread_id, num_tasks} {
tasks_.reserve(num_tasks);
for (size_t i = 0; i < num_tasks - 1; i++) {
for (size_t i = 0; i < num_tasks; i++) {
char *stack_memory = stack_allocator->allocate_stack(stack_size);
tasks_.emplace_back(std::make_unique<task>(stack_memory, stack_size, i, thread_id));
......@@ -31,10 +31,6 @@ task_manager::~task_manager() {
}
}
static task *find_task(unsigned id, unsigned depth) {
return thread_state::get().get_scheduler().thread_state_for(id).get_task_manager().get_task(depth);
}
void task_manager::push_local_task(base_task *pushed_task) {
deque_.push_bot(static_cast<task *>(pushed_task));
}
......@@ -48,21 +44,17 @@ base_task *task_manager::pop_local_task() {
}
}
base_task *task_manager::steal_task(thread_state &stealing_state) {
std::tuple<base_task *, base_task *> task_manager::steal_task(thread_state &stealing_state) {
PLS_ASSERT(stealing_state.get_active_task()->depth_ == 0, "Must only steal with clean task chain.");
PLS_ASSERT(scheduler::check_task_chain(*stealing_state.get_active_task()), "Must only steal with clean task chain.");
auto peek = deque_.peek_top();
if (peek.top_task_) {
// search for the task we want to trade in
task *stolen_task = static_cast<task *>(*peek.top_task_);
// get a suitable task to trade in
// TODO: opt. add debug marker to traded in tasks that we do not accidentally use them.
task *traded_task = static_cast<task *>(&scheduler::task_chain_at(stolen_task->depth_, stealing_state));
// keep a reference to the rest of the task chain that we keep
base_task *next_own_task = traded_task->next_;
// 'unchain' the traded tasks (to help us find bugs)
traded_task->next_ = nullptr;
// perform the actual pop operation
auto pop_result_task = deque_.pop_top(traded_task, peek);
if (pop_result_task) {
......@@ -71,15 +63,8 @@ base_task *task_manager::steal_task(thread_state &stealing_state) {
PLS_ASSERT(*pop_result_task == stolen_task,
"We must only steal the task that we peeked at!");
// TODO: the re-chaining should not be part of the task manager.
// The manager should only perform the steal + resource push.
// the steal was a success, link the chain so we own the stolen part
stolen_task->next_ = next_own_task;
next_own_task->prev_ = stolen_task;
// update the resource stack associated with the stolen task
push_resource_on_task(stolen_task, traded_task);
stolen_task->push_task_chain(traded_task);
auto optional_exchanged_task = external_trading_deque::get_trade_object(stolen_task);
if (optional_exchanged_task) {
......@@ -89,90 +74,34 @@ base_task *task_manager::steal_task(thread_state &stealing_state) {
} else {
// The last other active thread took it as its spare resource...
// ...remove our traded object from the stack again (it must be empty now and no one must access it anymore).
auto current_root = stolen_task->resource_stack_root_.load();
current_root.stamp++;
current_root.value = 0;
stolen_task->resource_stack_root_.store(current_root);
stolen_task->reset_task_chain();
}
return stolen_task;
return std::pair{stolen_task, traded_task};
} else {
// the steal failed, reset our chain to its old, clean state (re-link what we have broken)
traded_task->next_ = next_own_task;
return nullptr;
return std::pair{nullptr, nullptr};
}
} else {
return nullptr;
return std::pair{nullptr, nullptr};
}
}
base_task *task_manager::pop_clean_task_chain(base_task *base_task) {
task *popped_task = static_cast<task *>(base_task);
// Try to get a clean resource chain to go back to the main stealing loop
task *clean_chain = pop_resource_from_task(popped_task);
task *clean_chain = popped_task->pop_task_chain();
if (clean_chain == nullptr) {
// double-check if we are really last one or we only have unlucky timing
auto optional_cas_task = external_trading_deque::get_trade_object(popped_task);
if (optional_cas_task) {
clean_chain = *optional_cas_task;
} else {
clean_chain = pop_resource_from_task(popped_task);
clean_chain = popped_task->pop_task_chain();
}
}
return clean_chain;
}
// Pushes a spare task chain onto the lock-free resource stack of target_task.
// The chain must belong to a different thread and share target_task's depth,
// as the stack root only stores a thread id (resolved back through find_task).
void task_manager::push_resource_on_task(task *target_task, task *spare_task_chain) {
PLS_ASSERT(target_task->thread_id_ != spare_task_chain->thread_id_,
"Makes no sense to push task onto itself, as it is not clean by definition.");
PLS_ASSERT(target_task->depth_ == spare_task_chain->depth_,
"Must only push tasks with correct depth.");
data_structures::stamped_integer current_root;
data_structures::stamped_integer target_root;
do {
// Re-read the root each attempt; the new root bumps the stamp (ABA guard)
// and references the pushed chain by thread id + 1.
current_root = target_task->resource_stack_root_.load();
target_root.stamp = current_root.stamp + 1;
target_root.value = spare_task_chain->thread_id_ + 1;
if (current_root.value == 0) {
// Empty, simply push in with no successor
spare_task_chain->resource_stack_next_.store(nullptr);
} else {
// Already an entry. Find its corresponding task and set it as our successor.
auto *current_root_task = find_task(current_root.value - 1, target_task->depth_);
spare_task_chain->resource_stack_next_.store(current_root_task);
}
} while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
}
// Pops the top spare task chain from target_task's resource stack.
// Returns nullptr when the stack is empty; concurrent access is resolved by
// the stamped CAS loop on resource_stack_root_.
task *task_manager::pop_resource_from_task(task *target_task) {
data_structures::stamped_integer current_root;
data_structures::stamped_integer target_root;
task *output_task;
do {
current_root = target_task->resource_stack_root_.load();
if (current_root.value == 0) {
// Empty...
return nullptr;
} else {
// Found something, try to pop it
auto *current_root_task = find_task(current_root.value - 1, target_task->depth_);
auto *next_stack_task = current_root_task->resource_stack_next_.load();
// New root references the successor (or empty) with a bumped stamp.
target_root.stamp = current_root.stamp + 1;
target_root.value = next_stack_task != nullptr ? next_stack_task->thread_id_ + 1 : 0;
output_task = current_root_task;
}
} while (!target_task->resource_stack_root_.compare_exchange_strong(current_root, target_root));
PLS_ASSERT(scheduler::check_task_chain_backward(*output_task), "Must only pop proper task chains.");
// Detach the popped task from the stack before handing it out.
output_task->resource_stack_next_.store(nullptr);
return output_task;
}
}
......@@ -56,13 +56,14 @@ void scheduler::work_thread_work_section() {
} while (target == my_state.get_thread_id());
thread_state &target_state = my_state.get_scheduler().thread_state_for(target);
base_task *stolen_task = target_state.get_task_manager().steal_task(my_state);
auto[stolen_task, traded_task] = target_state.get_task_manager().steal_task(my_state);
if (stolen_task) {
// Keep task chain consistent. We want to appear as if we are working an a branch upwards of the stolen task.
base_task *next_own_task = traded_task->next_;
stolen_task->next_ = next_own_task;
next_own_task->prev_ = stolen_task;
my_state.set_active_task(stolen_task);
// TODO: Figure out how to model 'steal' interaction .
// The scheduler should decide on 'what to steal' and on how 'to manage the chains'.
// The task_manager should perform the act of actually performing the steal/trade.
// Maybe also give the chain management to the task_manager and associate resources with the traded tasks.
PLS_ASSERT(check_task_chain_forward(*my_state.get_active_task()),
"We are sole owner of this chain, it has to be valid!");
......
......@@ -4,5 +4,5 @@ add_executable(tests
base_tests.cpp
scheduling_tests.cpp
patterns_test.cpp
test_helpers.h)
test_helpers.h scheduling_lock_free_tests.cpp)
target_link_libraries(tests catch2 pls)
#include <catch.hpp>
#include <atomic>
#include "pls/pls.h"
#include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/thread_state.h"
#if PLS_DEQUE_VARIANT == PLS_DEQUE_LOCK_FREE
#include "pls/internal/scheduling/lock_free/traded_cas_field.h"
#include "pls/internal/scheduling/lock_free/external_trading_deque.h"
using namespace pls::internal::scheduling::lock_free;
// Verifies that traded_cas_field encodes its three mutually exclusive states
// correctly: empty, stamp + deque id, and trade-object pointer.
TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/lock_free/traded_cas_field]") {
// Default constructed field must be empty and nothing else.
traded_cas_field empty_field;
REQUIRE(empty_field.is_empty());
REQUIRE(!empty_field.is_filled_with_stamp());
REQUIRE(!empty_field.is_filled_with_object());
// Stamp + id fill must round-trip both values and report only that state.
const int stamp = 42;
const int ID = 10;
traded_cas_field tag_field;
tag_field.fill_with_stamp(stamp, ID);
REQUIRE(tag_field.is_filled_with_stamp());
REQUIRE(!tag_field.is_empty());
REQUIRE(!tag_field.is_filled_with_object());
REQUIRE(tag_field.get_stamp() == stamp);
REQUIRE(tag_field.get_deque_id() == ID);
// Object fill must report only the object state.
// NOTE(review): alignas(64) presumably frees the pointer's low bits for the
// tag encoding inside the CAS field — confirm against traded_cas_field.h.
alignas(64) task obj{nullptr, 0, 0, 0};
traded_cas_field obj_field;
obj_field.fill_with_trade_object(&obj);
REQUIRE(obj_field.is_filled_with_object());
REQUIRE(!obj_field.is_empty());
REQUIRE(!obj_field.is_filled_with_stamp());
}
// Exercises the per-task resource stack (push_task_chain/pop_task_chain)
// single-threaded: empty pop, single push/pop, and LIFO order.
TEST_CASE("task resource stack", "[internal/scheduling/lock_free/task]") {
// simulate scheduler with four threads and depth 1. We are thread 0.
pls::scheduler scheduler{4, 1, 4096, false};
pls::internal::scheduling::thread_state::set(&scheduler.thread_state_for(0));
// Depth-0 task of each of the four worker threads.
task *tasks[] = {scheduler.task_manager_for(0).get_task(0),
scheduler.task_manager_for(1).get_task(0),
scheduler.task_manager_for(2).get_task(0),
scheduler.task_manager_for(3).get_task(0)};
SECTION("simple push/pop") {
tasks[0]->push_task_chain(tasks[1]);
REQUIRE(tasks[0]->pop_task_chain() == tasks[1]);
REQUIRE(tasks[0]->pop_task_chain() == nullptr);
}
SECTION("multiple pushes") {
// The stack is LIFO: the most recently pushed chain pops first.
tasks[0]->push_task_chain(tasks[1]);
tasks[0]->push_task_chain(tasks[2]);
tasks[0]->push_task_chain(tasks[3]);
REQUIRE(tasks[0]->pop_task_chain() == tasks[3]);
REQUIRE(tasks[0]->pop_task_chain() == tasks[2]);
REQUIRE(tasks[0]->pop_task_chain() == tasks[1]);
REQUIRE(tasks[0]->pop_task_chain() == nullptr);
}
}
// Exercises the external trading deque single-threaded: local push/pop,
// external (thief) pop with trade-object exchange, FIFO/LIFO ordering, and
// stale-peek handling when peeks and pops interleave.
TEST_CASE("external trading deque", "[internal/scheduling/lock_free/external_trading_deque]") {
external_trading_deque deque_1{1, 16};
external_trading_deque deque_2{2, 16};
// Four dummy tasks at depths 0..3 used as deque entries and trade objects.
task tasks[4] = {{nullptr, 0, 0, 0},
{nullptr, 0, 1, 0},
{nullptr, 0, 2, 0},
{nullptr, 0, 3, 0}};
SECTION("basic operations") {
// Must start empty
REQUIRE(!deque_1.pop_bot());
REQUIRE(!deque_2.pop_bot());
// Local push/pop
deque_1.push_bot(&tasks[0]);
REQUIRE(*deque_1.pop_bot() == &tasks[0]);
REQUIRE(!deque_1.pop_bot());
// Local push, external pop
// The thief trades tasks[1] in for the stolen tasks[0]; the owner can then
// collect the traded-in task via get_trade_object.
deque_1.push_bot(&tasks[0]);
auto peek = deque_1.peek_top();
REQUIRE(*deque_1.pop_top(&tasks[1], peek) == &tasks[0]);
REQUIRE(*external_trading_deque::get_trade_object(&tasks[0]) == &tasks[1]);
// A second pop_top with the now-stale peek must fail.
REQUIRE(!deque_1.pop_top(&tasks[1], peek));
REQUIRE(!deque_1.pop_bot());
// Keeps push/pop order
// Bottom pops are LIFO...
deque_1.push_bot(&tasks[0]);
deque_1.push_bot(&tasks[1]);
REQUIRE(*deque_1.pop_bot() == &tasks[1]);
REQUIRE(*deque_1.pop_bot() == &tasks[0]);
REQUIRE(!deque_1.pop_bot());
// ...while top pops take the oldest entry first.
deque_1.push_bot(&tasks[0]);
deque_1.push_bot(&tasks[1]);
auto peek1 = deque_1.peek_top();
REQUIRE(*deque_1.pop_top(&tasks[2], peek1) == &tasks[0]);
auto peek2 = deque_1.peek_top();
REQUIRE(*deque_1.pop_top(&tasks[3], peek2) == &tasks[1]);
}
SECTION("Interwined execution #1") {
// Two top poppers
// Only the first pop against a shared peek may succeed.
deque_1.push_bot(&tasks[0]);
auto peek1 = deque_1.peek_top();
auto peek2 = deque_1.peek_top();
REQUIRE(*deque_1.pop_top(&tasks[1], peek1) == &tasks[0]);
REQUIRE(!deque_1.pop_top(&tasks[2], peek2));
}
SECTION("Interwined execution #2") {
// Top and bottom access
// A bottom pop invalidates an earlier top peek.
deque_1.push_bot(&tasks[0]);
auto peek1 = deque_1.peek_top();
REQUIRE(*deque_1.pop_bot() == &tasks[0]);
REQUIRE(!deque_1.pop_top(&tasks[2], peek1));
}
}
#endif // PLS_DEQUE_VARIANT == PLS_DEQUE_LOCK_FREE
#include <catch.hpp>
#include <atomic>
#include "pls/internal/scheduling/lock_free/traded_cas_field.h"
#include "pls/internal/scheduling/lock_free/external_trading_deque.h"
#include "pls/pls.h"
using namespace pls::internal::scheduling;
using namespace pls::internal::scheduling::lock_free;
constexpr int MAX_NUM_TASKS = 32;
constexpr int MAX_STACK_SIZE = 1024 * 8;
TEST_CASE("tasks distributed over workers (do not block)", "[internal/scheduling/scheduler.h]") {
TEST_CASE("scheduler correctly initializes", "[internal/scheduling/scheduler]") {
const unsigned num_tasks = 16;
const unsigned num_threads = 4;
pls::scheduler scheduler{num_threads, num_tasks, 4096, false};
SECTION("task chains are valid") {
for (unsigned i = 0; i < num_threads; i++) {
task_manager &manager = scheduler.task_manager_for(i);
for (unsigned j = 0; j < num_tasks; j++) {
REQUIRE(manager.get_task(j)->depth_ == j);
REQUIRE(manager.get_task(j)->thread_id_ == i);
if (j < num_tasks - 1) {
REQUIRE(manager.get_task(j)->next_ == manager.get_task(j + 1));
}
if (j > 0) {
REQUIRE(manager.get_task(j)->prev_ == manager.get_task(j - 1));
}
}
}
}
}
TEST_CASE("tasks distributed over workers (do not block)", "[internal/scheduling/scheduler]") {
scheduler scheduler{3, MAX_NUM_TASKS, MAX_STACK_SIZE};
std::atomic<int> num_run{0};
......@@ -34,87 +54,3 @@ TEST_CASE("tasks distributed over workers (do not block)", "[internal/scheduling
});
REQUIRE(num_run == 3);
}
TEST_CASE("traded cas field bitmaps correctly", "[internal/scheduling/traded_cas_field.h]") {
traded_cas_field empty_field;
REQUIRE(empty_field.is_empty());
REQUIRE(!empty_field.is_filled_with_stamp());
REQUIRE(!empty_field.is_filled_with_object());
const int stamp = 42;
const int ID = 10;
traded_cas_field tag_field;
tag_field.fill_with_stamp(stamp, ID);
REQUIRE(tag_field.is_filled_with_stamp());
REQUIRE(!tag_field.is_empty());
REQUIRE(!tag_field.is_filled_with_object());
REQUIRE(tag_field.get_stamp() == stamp);
REQUIRE(tag_field.get_deque_id() == ID);
alignas(64) task obj{nullptr, 0, 0, 0};
traded_cas_field obj_field;
obj_field.fill_with_trade_object(&obj);
REQUIRE(obj_field.is_filled_with_object());
REQUIRE(!obj_field.is_empty());
REQUIRE(!obj_field.is_filled_with_stamp());
}
TEST_CASE("external trading deque", "[internal/scheduling/external_trading_deque]") {
external_trading_deque deque_1{1, 16};
external_trading_deque deque_2{2, 16};
task tasks[4] = {{nullptr, 0, 0, 0},
{nullptr, 0, 1, 0},
{nullptr, 0, 2, 0},
{nullptr, 0, 3, 0}};
SECTION("basic operations") {
// Must start empty
REQUIRE(!deque_1.pop_bot());
REQUIRE(!deque_2.pop_bot());
// Local push/pop
deque_1.push_bot(&tasks[0]);
REQUIRE(*deque_1.pop_bot() == &tasks[0]);
REQUIRE(!deque_1.pop_bot());
// Local push, external pop
deque_1.push_bot(&tasks[0]);
auto peek = deque_1.peek_top();
REQUIRE(*deque_1.pop_top(&tasks[1], peek) == &tasks[0]);
REQUIRE(*external_trading_deque::get_trade_object(&tasks[0]) == &tasks[1]);
REQUIRE(!deque_1.pop_top(&tasks[1], peek));
REQUIRE(!deque_1.pop_bot());
// Keeps push/pop order
deque_1.push_bot(&tasks[0]);
deque_1.push_bot(&tasks[1]);
REQUIRE(*deque_1.pop_bot() == &tasks[1]);
REQUIRE(*deque_1.pop_bot() == &tasks[0]);
REQUIRE(!deque_1.pop_bot());
deque_1.push_bot(&tasks[0]);
deque_1.push_bot(&tasks[1]);
auto peek1 = deque_1.peek_top();
REQUIRE(*deque_1.pop_top(&tasks[2], peek1) == &tasks[0]);
auto peek2 = deque_1.peek_top();
REQUIRE(*deque_1.pop_top(&tasks[3], peek2) == &tasks[1]);
}
SECTION("Interwined execution #1") {
// Two top poppers
deque_1.push_bot(&tasks[0]);
auto peek1 = deque_1.peek_top();
auto peek2 = deque_1.peek_top();
REQUIRE(*deque_1.pop_top(&tasks[1], peek1) == &tasks[0]);
REQUIRE(!deque_1.pop_top(&tasks[2], peek2));
}
SECTION("Interwined execution #2") {
// Top and bottom access
deque_1.push_bot(&tasks[0]);
auto peek1 = deque_1.peek_top();
REQUIRE(*deque_1.pop_bot() == &tasks[0]);
REQUIRE(!deque_1.pop_top(&tasks[2], peek1));
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment