Commit 6394b203 by FritzFlorian

Add prototype support for other, strain_local resources.

The strain_local resources can be used to get unique IDs for resources that would, in a serial execution, only be used once in the call chain. This is a first prototype implementation and needs further tesitng. The current implementation adds linear overhead to each steal proportional to the number of used  resources. We might be able to reduce it, but for a first version it is fine.
parent 08283a37
Pipeline #1498 passed with stages
in 3 minutes 56 seconds
......@@ -7,6 +7,9 @@
using namespace comparison_benchmarks::base;
constexpr int MAX_NUM_TASKS = 32;
constexpr int MAX_STACK_SIZE = 4096 * 1;
int pls_fib(int n) {
if (n == 0) {
return 0;
......@@ -27,9 +30,6 @@ int pls_fib(int n) {
return a + b;
}
constexpr int MAX_NUM_TASKS = 32;
constexpr int MAX_STACK_SIZE = 4096 * 1;
int main(int argc, char **argv) {
int num_threads;
string directory;
......
......@@ -39,6 +39,7 @@ add_library(pls STATIC
include/pls/internal/scheduling/base_task.h src/internal/scheduling/base_task.cpp
include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp
include/pls/internal/scheduling/task_manager.h
include/pls/internal/scheduling/strain_local_resource.h src/internal/scheduling/strain_local_resource.cpp
include/pls/internal/scheduling/lock_free/task.h
include/pls/internal/scheduling/lock_free/task_manager.h src/internal/scheduling/lock_free/task_manager.cpp
......@@ -46,8 +47,8 @@ add_library(pls STATIC
include/pls/internal/scheduling/lock_free/traded_cas_field.h src/internal/scheduling/lock_free/task.cpp
include/pls/internal/profiling/dag_node.h src/internal/profiling/dag_node.cpp
include/pls/internal/profiling/profiler.h
include/pls/internal/profiling/thread_stats.h src/internal/profiling/thread_stats.cpp src/internal/profiling/profiler.cpp)
include/pls/internal/profiling/profiler.h src/internal/profiling/profiler.cpp
include/pls/internal/profiling/thread_stats.h src/internal/profiling/thread_stats.cpp)
# Dependencies for pls
target_link_libraries(pls Threads::Threads)
......
......@@ -26,6 +26,7 @@ namespace pls::internal::scheduling {
* This base_task can be extended by different trading/stealing implementations,
* to add for example additional flags. The scheduler itself always works solely with this base version.
*/
struct strain_resource;
struct base_task {
base_task(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) :
depth_{depth},
......@@ -34,7 +35,8 @@ struct base_task {
stack_size_{stack_size},
is_synchronized_{false},
prev_{nullptr},
next_{nullptr} {}
next_{nullptr},
attached_resources_{nullptr} {}
// Do not allow accidental copy/move operations.
// The whole runtime relies on tasks never changing memory positions during execution.
......@@ -63,6 +65,8 @@ struct base_task {
base_task *prev_;
base_task *next_;
std::atomic<strain_resource *> attached_resources_;
#if PLS_PROFILING_ENABLED
profiling::dag_node *profiling_node_;
#endif
......
......@@ -104,7 +104,7 @@ class scheduler {
void terminate();
[[nodiscard]] unsigned int num_threads() const { return num_threads_; }
[[nodiscard]] static base_task &task_chain_at(unsigned int depth, thread_state &calling_state);
[[nodiscard]] static base_task *get_trade_task(base_task *stolen_task, thread_state &calling_state);
static bool check_task_chain_forward(base_task &start_task);
static bool check_task_chain_backward(base_task &start_task);
......
......@@ -69,6 +69,7 @@ class scheduler::init_function_impl : public init_function {
explicit init_function_impl(F &function) : function_{function} {}
void run() override {
base_task *root_task = thread_state::get().get_active_task();
root_task->attached_resources_.store(nullptr, std::memory_order_relaxed);
#if PLS_PROFILING_ENABLED
thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(),
......@@ -151,6 +152,9 @@ void scheduler::spawn_internal(Function &&lambda) {
base_task *last_task = spawning_state.get_active_task();
base_task *spawned_task = last_task->next_;
auto *attached_resources = last_task->attached_resources_.load(std::memory_order_relaxed);
spawned_task->attached_resources_.store(attached_resources, std::memory_order_relaxed);
#if PLS_PROFILING_ENABLED
spawning_state.get_scheduler().profiler_.task_prepare_stack_measure(spawning_state.get_thread_id(),
spawned_task->stack_memory_,
......
#ifndef PLS_INTERNAL_SCHEDULING_STACK_RESOURCE_H_
#define PLS_INTERNAL_SCHEDULING_STACK_RESOURCE_H_
#include "pls/internal/base/system_details.h"
#include <vector>
#include <atomic>
#include <memory>
namespace pls::internal::scheduling {
class strain_local_resource;
struct PLS_CACHE_ALIGN strain_resource {
strain_resource(strain_local_resource *resource, unsigned index, unsigned depth) :
strain_local_resource_{resource}, index_{index}, depth_{depth} {}
strain_resource(const strain_resource &) = delete;
strain_resource &operator=(const strain_resource &) = delete;
strain_resource(strain_resource &&) = delete;
strain_resource &operator=(strain_resource &&) = delete;
strain_local_resource *const strain_local_resource_;
unsigned const index_;
unsigned const depth_;
bool used_{false};
std::atomic<strain_resource *> next_{nullptr};
};
class strain_local_resource {
// Handle to a stack-like resource (index of it).
// MUST be handled in a RAII like manner on the stack
// and must not be copied or moved for this to work.
class item_handle {
public:
item_handle(const item_handle &) = delete;
item_handle &operator=(const item_handle &) = delete;
item_handle(item_handle &&) = delete;
item_handle &operator=(item_handle &&) = delete;
explicit item_handle(strain_resource *resource) : resource_{resource} {}
~item_handle();
private:
strain_resource *resource_;
};
struct local_item {
local_item(strain_local_resource *strain_local_resource, unsigned index, unsigned depth) :
initial_resource_{std::make_unique<strain_resource>(strain_local_resource, index, depth)},
resource_{initial_resource_.get()} {}
std::unique_ptr<strain_resource> initial_resource_;
strain_resource *resource_;
};
public:
strain_local_resource(unsigned num_threads,
unsigned depth) : local_items_() {
local_items_.reserve(num_threads);
for (unsigned thread_id = 0; thread_id < num_threads; thread_id++) {
local_items_[thread_id].reserve(depth);
for (unsigned i = 0; i < depth; i++) {
// Start by each thread owning its local items
local_items_[thread_id].emplace_back(this, thread_id, i);
}
}
}
item_handle get_item(unsigned depth);
static strain_resource *get_local_copy(strain_resource *other_resources, unsigned thread_id);
static void acquire_locally(strain_resource *other_resources, unsigned thread_id);
private:
std::vector<std::vector<local_item>> local_items_;
};
}
#endif //PLS_INTERNAL_SCHEDULING_STACK_RESOURCE_H_
......@@ -50,8 +50,7 @@ std::tuple<base_task *, base_task *, bool> task_manager::steal_task(thread_state
if (peek.top_task_) {
task *stolen_task = peek.top_task_;
// get a suitable task to trade in
// TODO: opt. add debug marker to traded in tasks that we do not accidentally use them.
task *traded_task = static_cast<task *>(&scheduler::task_chain_at(stolen_task->depth_, stealing_state));
task *traded_task = static_cast<task *>(scheduler::get_trade_task(stolen_task, stealing_state));
base_task *chain_after_stolen_task = traded_task->next_;
// perform the actual pop operation
......
......@@ -2,6 +2,7 @@
#include "context_switcher/context_switcher.h"
#include "pls/internal/scheduling/strain_local_resource.h"
#include "pls/internal/build_flavour.h"
#include "pls/internal/base/futex_wrapper.h"
#include "pls/internal/base/error_handling.h"
......@@ -85,6 +86,9 @@ void scheduler::work_thread_work_section() {
stolen_task->next_ = chain_after_stolen_task;
chain_after_stolen_task->prev_ = stolen_task;
my_state.set_active_task(stolen_task);
// Keep locally owned resources consistent.
auto *stolen_resources = stolen_task->attached_resources_.load(std::memory_order_relaxed);
strain_local_resource::acquire_locally(stolen_resources, my_state.get_thread_id());
PLS_ASSERT(check_task_chain_forward(*my_state.get_active_task()),
"We are sole owner of this chain, it has to be valid!");
......@@ -223,6 +227,10 @@ context_switcher::continuation scheduler::slow_return(thread_state &calling_stat
this_task->prev_ = clean_chain;
clean_chain->next_ = this_task;
// Keep locally owned resources consistent.
auto *clean_resources = clean_chain->attached_resources_.load(std::memory_order_relaxed);
strain_local_resource::acquire_locally(clean_resources, calling_state.get_thread_id());
base_task *active_task = clean_chain;
while (active_task->depth_ > 0) {
active_task = active_task->prev_;
......@@ -248,17 +256,22 @@ context_switcher::continuation scheduler::slow_return(thread_state &calling_stat
}
}
base_task &scheduler::task_chain_at(unsigned int depth, thread_state &calling_state) {
// TODO: possible optimize with cache array at steal events
base_task *scheduler::get_trade_task(base_task *stolen_task, thread_state &calling_state) {
// Get task itself
base_task *result = calling_state.get_active_task();
while (result->depth_ > depth) {
while (result->depth_ > stolen_task->depth_) {
result = result->prev_;
}
while (result->depth_ < depth) {
while (result->depth_ < stolen_task->depth_) {
result = result->next_;
}
return *result;
// Attach other resources we need to trade to it
auto *stolen_resources = stolen_task->attached_resources_.load(std::memory_order_relaxed);
auto *traded_resources = strain_local_resource::get_local_copy(stolen_resources, calling_state.get_thread_id());
result->attached_resources_.store(traded_resources, std::memory_order_relaxed);
return result;
}
void scheduler::terminate() {
......
#include "pls/internal/scheduling/strain_local_resource.h"
#include "pls/internal/base/error_handling.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/scheduler.h"
namespace pls::internal::scheduling {
// Get item from locally owned items
strain_local_resource::item_handle strain_local_resource::get_item(unsigned depth) {
// Only change our resource usage when synced.
// TODO: maybe relax this requirement
scheduler::sync();
thread_state &my_state = thread_state::get();
auto *active_task = my_state.get_active_task();
// Find the item to use and acquire it from our local storage
auto &local_resource = local_items_[my_state.get_thread_id()][depth];
strain_resource *result = local_resource.resource_;
// Add it to the task used resources
result->next_.store(active_task->attached_resources_.load(std::memory_order_relaxed), std::memory_order_relaxed);
active_task->attached_resources_.store(result, std::memory_order_relaxed);
// Wrap it for RAII usage on stack
PLS_ASSERT(!result->used_, "Must not try to allocate an already used resource!");
result->used_ = true;
return strain_local_resource::item_handle{result};
}
// Return item to locally owned items
strain_local_resource::item_handle::~item_handle() {
// Only change our resource usage when synced.
// TODO: maybe relax this requirement
scheduler::sync();
thread_state &my_state = thread_state::get();
auto *active_task = my_state.get_active_task();
// Remove from task used resources
strain_resource *previous_resource = nullptr;
strain_resource *current_resource = active_task->attached_resources_.load(std::memory_order_relaxed);
while (current_resource != resource_) {
previous_resource = current_resource;
current_resource = current_resource->next_.load(std::memory_order_relaxed);
}
auto *next_resource = current_resource->next_.load(std::memory_order_relaxed);
if (previous_resource) {
previous_resource->next_.store(next_resource, std::memory_order_relaxed);
} else {
active_task->attached_resources_.store(next_resource, std::memory_order_relaxed);
}
// Give the resource handle back to our local resource array
auto &local_resource = resource_->strain_local_resource_->local_items_[my_state.get_thread_id()][resource_->depth_];
local_resource.resource_ = resource_;
PLS_ASSERT(resource_->used_, "Must only release used resources!");
resource_->used_ = false;
}
strain_resource *strain_local_resource::get_local_copy(strain_resource *other_resources, unsigned thread_id) {
strain_resource *result = nullptr;
while (other_resources != nullptr) {
local_item &local = other_resources->strain_local_resource_->local_items_[thread_id][other_resources->depth_];
local.resource_->next_.store(result, std::memory_order_relaxed);
result = local.resource_;
other_resources = other_resources->next_.load(std::memory_order_relaxed);
}
return result;
}
void strain_local_resource::acquire_locally(strain_resource *other_resources, unsigned thread_id) {
while (other_resources != nullptr) {
local_item &local = other_resources->strain_local_resource_->local_items_[thread_id][other_resources->depth_];
local.resource_ = other_resources;
other_resources = other_resources->next_.load(std::memory_order_relaxed);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment