From 6394b203ecf11222efdd414757ce86d808b9ea19 Mon Sep 17 00:00:00 2001
From: FritzFlorian
Date: Wed, 3 Jun 2020 14:53:58 +0200
Subject: [PATCH] Add prototype support for other, strain_local resources.

The strain_local resources can be used to get unique IDs for resources
that would, in a serial execution, only be used once in the call chain.

This is a first prototype implementation and needs further testing.
The current implementation adds linear overhead to each steal,
proportional to the number of used resources. We might be able to
reduce this, but it is fine for a first version.
---
 app/benchmark_fib/main.cpp                                      |  6 +++---
 lib/pls/CMakeLists.txt                                          |  5 +++--
 lib/pls/include/pls/internal/scheduling/base_task.h             |  6 +++++-
 lib/pls/include/pls/internal/scheduling/scheduler.h             |  2 +-
 lib/pls/include/pls/internal/scheduling/scheduler_impl.h        |  4 ++++
 lib/pls/include/pls/internal/scheduling/strain_local_resource.h | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 lib/pls/src/internal/scheduling/lock_free/task_manager.cpp     |  3 +--
 lib/pls/src/internal/scheduling/scheduler.cpp                  | 23 ++++++++++++++++++-----
 lib/pls/src/internal/scheduling/strain_local_resource.cpp      | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 9 files changed, 199 insertions(+), 14 deletions(-)
 create mode 100644 lib/pls/include/pls/internal/scheduling/strain_local_resource.h
 create mode 100644 lib/pls/src/internal/scheduling/strain_local_resource.cpp

diff --git a/app/benchmark_fib/main.cpp b/app/benchmark_fib/main.cpp
index 67492d3..44ea705 100644
--- a/app/benchmark_fib/main.cpp
+++ b/app/benchmark_fib/main.cpp
@@ -7,6 +7,9 @@
 
 using namespace comparison_benchmarks::base;
 
+constexpr int MAX_NUM_TASKS = 32;
+constexpr int MAX_STACK_SIZE = 4096 * 1;
+
 int pls_fib(int n) {
   if (n == 0) {
     return 0;
@@ -27,9 +30,6 @@ int pls_fib(int n) {
   return a + b;
 }
 
-constexpr int MAX_NUM_TASKS = 32;
-constexpr int MAX_STACK_SIZE = 4096 * 1;
-
 int main(int argc, char **argv) {
   int num_threads;
   string directory;
diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt
index 263b62d..4c42f70 100644
--- a/lib/pls/CMakeLists.txt
+++ b/lib/pls/CMakeLists.txt
@@ -39,6 +39,7 @@ add_library(pls STATIC
         include/pls/internal/scheduling/base_task.h src/internal/scheduling/base_task.cpp
         include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp
         include/pls/internal/scheduling/task_manager.h
+        include/pls/internal/scheduling/strain_local_resource.h src/internal/scheduling/strain_local_resource.cpp
         include/pls/internal/scheduling/lock_free/task.h
         include/pls/internal/scheduling/lock_free/task_manager.h src/internal/scheduling/lock_free/task_manager.cpp
@@ -46,8 +47,8 @@ add_library(pls STATIC
         include/pls/internal/scheduling/lock_free/traded_cas_field.h src/internal/scheduling/lock_free/task.cpp
         include/pls/internal/profiling/dag_node.h src/internal/profiling/dag_node.cpp
-        include/pls/internal/profiling/profiler.h
-        include/pls/internal/profiling/thread_stats.h src/internal/profiling/thread_stats.cpp src/internal/profiling/profiler.cpp)
+        include/pls/internal/profiling/profiler.h src/internal/profiling/profiler.cpp
+        include/pls/internal/profiling/thread_stats.h src/internal/profiling/thread_stats.cpp)
 
 # Dependencies for pls
 target_link_libraries(pls Threads::Threads)
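As a usage illustration of what the commit message describes, the sketch below
shows how the strain_local_resource added later in this patch might hand out
per-strain unique IDs. The sizes and the unique_ids/worker_at names are made up
for illustration, and get_item must be called from inside a running pls
scheduler; this is a sketch of the intended API, not code from the patch.

    #include "pls/internal/scheduling/strain_local_resource.h"

    using namespace pls::internal::scheduling;

    // One resource slot per worker thread and per recursion depth.
    strain_local_resource unique_ids{/*num_threads=*/8, /*depth=*/32};

    void worker_at(unsigned depth) {
      // Acquire the slot for this depth. Within one serial call chain the
      // slot is used at most once at a time, so it can serve as a unique ID.
      auto handle = unique_ids.get_item(depth);
      // ... use the resource; it is handed back when handle leaves scope ...
    }
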
diff --git a/lib/pls/include/pls/internal/scheduling/base_task.h b/lib/pls/include/pls/internal/scheduling/base_task.h
index 4f78fa9..8fab083 100644
--- a/lib/pls/include/pls/internal/scheduling/base_task.h
+++ b/lib/pls/include/pls/internal/scheduling/base_task.h
@@ -26,6 +26,7 @@ namespace pls::internal::scheduling {
  * This base_task can be extended by different trading/stealing implementations,
  * to add for example additional flags. The scheduler itself always works solely with this base version.
  */
+struct strain_resource;
 struct base_task {
   base_task(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) :
       depth_{depth},
@@ -34,7 +35,8 @@ struct base_task {
       stack_size_{stack_size},
       is_synchronized_{false},
       prev_{nullptr},
-      next_{nullptr} {}
+      next_{nullptr},
+      attached_resources_{nullptr} {}
 
   // Do not allow accidental copy/move operations.
   // The whole runtime relies on tasks never changing memory positions during execution.
@@ -63,6 +65,8 @@ struct base_task {
   base_task *prev_;
   base_task *next_;
 
+  std::atomic<strain_resource *> attached_resources_;
+
 #if PLS_PROFILING_ENABLED
   profiling::dag_node *profiling_node_;
 #endif
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h
index 41b33cb..a427b58 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler.h
@@ -104,7 +104,7 @@ class scheduler {
   void terminate();
 
   [[nodiscard]] unsigned int num_threads() const { return num_threads_; }
-  [[nodiscard]] static base_task &task_chain_at(unsigned int depth, thread_state &calling_state);
+  [[nodiscard]] static base_task *get_trade_task(base_task *stolen_task, thread_state &calling_state);
 
   static bool check_task_chain_forward(base_task &start_task);
   static bool check_task_chain_backward(base_task &start_task);
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index 19cbd88..2f55d79 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -69,6 +69,7 @@ class scheduler::init_function_impl : public init_function {
   explicit init_function_impl(F &function) : function_{function} {}
   void run() override {
     base_task *root_task = thread_state::get().get_active_task();
+    root_task->attached_resources_.store(nullptr, std::memory_order_relaxed);
 
 #if PLS_PROFILING_ENABLED
     thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(),
@@ -151,6 +152,9 @@ void scheduler::spawn_internal(Function &&lambda) {
     base_task *last_task = spawning_state.get_active_task();
     base_task *spawned_task = last_task->next_;
 
+    auto *attached_resources = last_task->attached_resources_.load(std::memory_order_relaxed);
+    spawned_task->attached_resources_.store(attached_resources, std::memory_order_relaxed);
+
 #if PLS_PROFILING_ENABLED
     spawning_state.get_scheduler().profiler_.task_prepare_stack_measure(spawning_state.get_thread_id(),
                                                                         spawned_task->stack_memory_,
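The three changes above thread one piece of state through the runtime:
attached_resources_ is the head of an intrusive, singly linked list of
strain_resource nodes shared by the tasks of a chain, and spawn_internal
copies the head so spawned children see the same list. A minimal traversal
sketch (not part of the patch) of the invariant this maintains:

    // Walk all resources currently attached to a task's strain.
    void for_each_attached(base_task *task) {
      for (auto *r = task->attached_resources_.load(std::memory_order_relaxed);
           r != nullptr;
           r = r->next_.load(std::memory_order_relaxed)) {
        // r->depth_ records where in the task chain the resource was taken,
        // r->index_ which thread's local slot the node belongs to.
      }
    }
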
  strain_resource(strain_local_resource *resource, unsigned index, unsigned depth) :
+      strain_local_resource_{resource}, index_{index}, depth_{depth} {}
+
+  strain_resource(const strain_resource &) = delete;
+  strain_resource &operator=(const strain_resource &) = delete;
+  strain_resource(strain_resource &&) = delete;
+  strain_resource &operator=(strain_resource &&) = delete;
+
+  strain_local_resource *const strain_local_resource_;
+  unsigned const index_;
+  unsigned const depth_;
+
+  bool used_{false};
+  std::atomic<strain_resource *> next_{nullptr};
+};
+
+class strain_local_resource {
+  // Handle to a stack-like resource (index of it).
+  // MUST be handled in a RAII like manner on the stack
+  // and must not be copied or moved for this to work.
+  class item_handle {
+   public:
+    item_handle(const item_handle &) = delete;
+    item_handle &operator=(const item_handle &) = delete;
+    item_handle(item_handle &&) = delete;
+    item_handle &operator=(item_handle &&) = delete;
+
+    explicit item_handle(strain_resource *resource) : resource_{resource} {}
+    ~item_handle();
+
+   private:
+    strain_resource *resource_;
+  };
+
+  struct local_item {
+    local_item(strain_local_resource *strain_local_resource, unsigned index, unsigned depth) :
+        initial_resource_{std::make_unique<strain_resource>(strain_local_resource, index, depth)},
+        resource_{initial_resource_.get()} {}
+
+    std::unique_ptr<strain_resource> initial_resource_;
+    strain_resource *resource_;
+  };
+ public:
+  strain_local_resource(unsigned num_threads,
+                        unsigned depth) : local_items_() {
+    local_items_.resize(num_threads);
+    for (unsigned thread_id = 0; thread_id < num_threads; thread_id++) {
+      local_items_[thread_id].reserve(depth);
+      for (unsigned i = 0; i < depth; i++) {
+        // Start by each thread owning its local items
+        local_items_[thread_id].emplace_back(this, thread_id, i);
+      }
+    }
+  }
+
+  item_handle get_item(unsigned depth);
+  static strain_resource *get_local_copy(strain_resource *other_resources, unsigned thread_id);
+  static void acquire_locally(strain_resource *other_resources, unsigned thread_id);
+
+ private:
+  std::vector<std::vector<local_item>> local_items_;
+};
+
+}
+
+#endif //PLS_INTERNAL_SCHEDULING_STACK_RESOURCE_H_
diff --git a/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp b/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp
index d37aaff..7951d72 100644
--- a/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp
+++ b/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp
@@ -50,8 +50,7 @@ std::tuple task_manager::steal_task(thread_state
   if (peek.top_task_) {
     task *stolen_task = peek.top_task_;
     // get a suitable task to trade in
-    // TODO: opt. add debug marker to traded in tasks that we do not accidentally use them.
-    task *traded_task = static_cast<task *>(&scheduler::task_chain_at(stolen_task->depth_, stealing_state));
+    task *traded_task = static_cast<task *>(scheduler::get_trade_task(stolen_task, stealing_state));
     base_task *chain_after_stolen_task = traded_task->next_;
 
     // perform the actual pop operation
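The call to get_trade_task above is where resource trading hooks into the
steal: besides picking the task to trade in, it attaches local equivalents of
the stolen task's resources to it. A worked example with hypothetical thread
IDs, depths, and a hypothetical list head named stolen:

    // Suppose thread 0 owns the chain and has attached items at depths 1 and 3:
    //   stolen : R(thread 0, depth 3) -> R(thread 0, depth 1)
    // The thief (thread 1) builds its own equivalents for the trade:
    strain_resource *traded = strain_local_resource::get_local_copy(stolen, /*thread_id=*/1);
    //   traded : R(thread 1, depth 1) -> R(thread 1, depth 3)   (order reversed)
    // Both sides end up holding exactly one usable item per depth.
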
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index d5fc96b..8f896a5 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -2,6 +2,7 @@
 
 #include "context_switcher/context_switcher.h"
 
+#include "pls/internal/scheduling/strain_local_resource.h"
 #include "pls/internal/build_flavour.h"
 #include "pls/internal/base/futex_wrapper.h"
 #include "pls/internal/base/error_handling.h"
@@ -85,6 +86,9 @@ void scheduler::work_thread_work_section() {
           stolen_task->next_ = chain_after_stolen_task;
           chain_after_stolen_task->prev_ = stolen_task;
           my_state.set_active_task(stolen_task);
+          // Keep locally owned resources consistent.
+          auto *stolen_resources = stolen_task->attached_resources_.load(std::memory_order_relaxed);
+          strain_local_resource::acquire_locally(stolen_resources, my_state.get_thread_id());
 
           PLS_ASSERT(check_task_chain_forward(*my_state.get_active_task()),
                      "We are sole owner of this chain, it has to be valid!");
@@ -223,6 +227,10 @@ context_switcher::continuation scheduler::slow_return(thread_state &calling_stat
     this_task->prev_ = clean_chain;
     clean_chain->next_ = this_task;
 
+    // Keep locally owned resources consistent.
+    auto *clean_resources = clean_chain->attached_resources_.load(std::memory_order_relaxed);
+    strain_local_resource::acquire_locally(clean_resources, calling_state.get_thread_id());
+
     base_task *active_task = clean_chain;
     while (active_task->depth_ > 0) {
       active_task = active_task->prev_;
@@ -248,17 +256,22 @@ context_switcher::continuation scheduler::slow_return(thread_state &calling_stat
   }
 }
 
-base_task &scheduler::task_chain_at(unsigned int depth, thread_state &calling_state) {
-  // TODO: possible optimization: cache array at steal events
+base_task *scheduler::get_trade_task(base_task *stolen_task, thread_state &calling_state) {
+  // Get the task itself
   base_task *result = calling_state.get_active_task();
-  while (result->depth_ > depth) {
+  while (result->depth_ > stolen_task->depth_) {
     result = result->prev_;
   }
-  while (result->depth_ < depth) {
+  while (result->depth_ < stolen_task->depth_) {
     result = result->next_;
   }
 
-  return *result;
+  // Attach the other resources we need to trade to it
+  auto *stolen_resources = stolen_task->attached_resources_.load(std::memory_order_relaxed);
+  auto *traded_resources = strain_local_resource::get_local_copy(stolen_resources, calling_state.get_thread_id());
+  result->attached_resources_.store(traded_resources, std::memory_order_relaxed);
+
+  return result;
 }
 
 void scheduler::terminate() {
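The acquire_locally calls and the get_local_copy call above each walk the
attached list exactly once, which is the linear per-steal overhead the commit
message mentions; a sketch of the cost in terms of the list length (the
attached_count helper is illustrative, not part of the patch):

    // Each steal pays O(k) extra work, k = number of attached resources.
    unsigned attached_count(strain_resource *head) {
      unsigned k = 0;
      for (auto *r = head; r != nullptr; r = r->next_.load(std::memory_order_relaxed)) {
        ++k;
      }
      return k;
    }
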
diff --git a/lib/pls/src/internal/scheduling/strain_local_resource.cpp b/lib/pls/src/internal/scheduling/strain_local_resource.cpp
new file mode 100644
index 0000000..e1efec3
--- /dev/null
+++ b/lib/pls/src/internal/scheduling/strain_local_resource.cpp
@@ -0,0 +1,84 @@
+#include "pls/internal/scheduling/strain_local_resource.h"
+
+#include "pls/internal/base/error_handling.h"
+#include "pls/internal/scheduling/thread_state.h"
+#include "pls/internal/scheduling/scheduler.h"
+
+namespace pls::internal::scheduling {
+
+// Get item from locally owned items
+strain_local_resource::item_handle strain_local_resource::get_item(unsigned depth) {
+  // Only change our resource usage when synced.
+  // TODO: maybe relax this requirement
+  scheduler::sync();
+  thread_state &my_state = thread_state::get();
+  auto *active_task = my_state.get_active_task();
+
+  // Find the item to use and acquire it from our local storage
+  auto &local_resource = local_items_[my_state.get_thread_id()][depth];
+  strain_resource *result = local_resource.resource_;
+
+  // Add it to the task's used resources
+  result->next_.store(active_task->attached_resources_.load(std::memory_order_relaxed), std::memory_order_relaxed);
+  active_task->attached_resources_.store(result, std::memory_order_relaxed);
+
+  // Wrap it for RAII usage on the stack
+  PLS_ASSERT(!result->used_, "Must not try to allocate an already used resource!");
+  result->used_ = true;
+  return strain_local_resource::item_handle{result};
+}
+
+// Return item to locally owned items
+strain_local_resource::item_handle::~item_handle() {
+  // Only change our resource usage when synced.
+  // TODO: maybe relax this requirement
+  scheduler::sync();
+  thread_state &my_state = thread_state::get();
+  auto *active_task = my_state.get_active_task();
+
+  // Remove from the task's used resources
+  strain_resource *previous_resource = nullptr;
+  strain_resource *current_resource = active_task->attached_resources_.load(std::memory_order_relaxed);
+  while (current_resource != resource_) {
+    previous_resource = current_resource;
+    current_resource = current_resource->next_.load(std::memory_order_relaxed);
+  }
+
+  auto *next_resource = current_resource->next_.load(std::memory_order_relaxed);
+  if (previous_resource) {
+    previous_resource->next_.store(next_resource, std::memory_order_relaxed);
+  } else {
+    active_task->attached_resources_.store(next_resource, std::memory_order_relaxed);
+  }
+
+  // Give the resource handle back to our local resource array
+  auto &local_resource = resource_->strain_local_resource_->local_items_[my_state.get_thread_id()][resource_->depth_];
+  local_resource.resource_ = resource_;
+  PLS_ASSERT(resource_->used_, "Must only release used resources!");
+  resource_->used_ = false;
+}
+
+strain_resource *strain_local_resource::get_local_copy(strain_resource *other_resources, unsigned thread_id) {
+  strain_resource *result = nullptr;
+
+  while (other_resources != nullptr) {
+    local_item &local = other_resources->strain_local_resource_->local_items_[thread_id][other_resources->depth_];
+    local.resource_->next_.store(result, std::memory_order_relaxed);
+    result = local.resource_;
+
+    other_resources = other_resources->next_.load(std::memory_order_relaxed);
+  }
+
+  return result;
+}
+
+void strain_local_resource::acquire_locally(strain_resource *other_resources, unsigned thread_id) {
+  while (other_resources != nullptr) {
+    local_item &local = other_resources->strain_local_resource_->local_items_[thread_id][other_resources->depth_];
+    local.resource_ = other_resources;
+
+    other_resources = other_resources->next_.load(std::memory_order_relaxed);
+  }
+}
+
+}
-- 
libgit2 0.26.0
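
Two behaviors of the new file worth keeping in mind when testing: get_local_copy
builds the traded list in reverse order of its input (harmless, since consumers
always walk the whole list), and ~item_handle unlinks its node from anywhere in
the active task's list, so handles need not be released in strict LIFO order.
A nesting sketch, with ids standing in for some strain_local_resource instance:

    {
      auto outer = ids.get_item(/*depth=*/0);
      {
        auto inner = ids.get_item(/*depth=*/1);
      }  // ~item_handle: inner unlinked, returned to its local_items_ slot
    }    // ~item_handle: outer returned likewise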