diff --git a/app/benchmark_fib/main.cpp b/app/benchmark_fib/main.cpp
index 67492d3..44ea705 100644
--- a/app/benchmark_fib/main.cpp
+++ b/app/benchmark_fib/main.cpp
@@ -7,6 +7,9 @@
 
 using namespace comparison_benchmarks::base;
 
+constexpr int MAX_NUM_TASKS = 32;
+constexpr int MAX_STACK_SIZE = 4096 * 1;
+
 int pls_fib(int n) {
   if (n == 0) {
     return 0;
@@ -27,9 +30,6 @@ int pls_fib(int n) {
   return a + b;
 }
 
-constexpr int MAX_NUM_TASKS = 32;
-constexpr int MAX_STACK_SIZE = 4096 * 1;
-
 int main(int argc, char **argv) {
   int num_threads;
   string directory;
diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt
index 263b62d..4c42f70 100644
--- a/lib/pls/CMakeLists.txt
+++ b/lib/pls/CMakeLists.txt
@@ -39,6 +39,7 @@ add_library(pls STATIC
         include/pls/internal/scheduling/base_task.h src/internal/scheduling/base_task.cpp
         include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp
         include/pls/internal/scheduling/task_manager.h
+        include/pls/internal/scheduling/strain_local_resource.h src/internal/scheduling/strain_local_resource.cpp
         include/pls/internal/scheduling/lock_free/task.h
         include/pls/internal/scheduling/lock_free/task_manager.h src/internal/scheduling/lock_free/task_manager.cpp
@@ -46,8 +47,8 @@ add_library(pls STATIC
         include/pls/internal/scheduling/lock_free/traded_cas_field.h
         src/internal/scheduling/lock_free/task.cpp
         include/pls/internal/profiling/dag_node.h src/internal/profiling/dag_node.cpp
-        include/pls/internal/profiling/profiler.h
-        include/pls/internal/profiling/thread_stats.h src/internal/profiling/thread_stats.cpp src/internal/profiling/profiler.cpp)
+        include/pls/internal/profiling/profiler.h src/internal/profiling/profiler.cpp
+        include/pls/internal/profiling/thread_stats.h src/internal/profiling/thread_stats.cpp)
 
 # Dependencies for pls
 target_link_libraries(pls Threads::Threads)
diff --git a/lib/pls/include/pls/internal/scheduling/base_task.h b/lib/pls/include/pls/internal/scheduling/base_task.h
index 4f78fa9..8fab083 100644
--- a/lib/pls/include/pls/internal/scheduling/base_task.h
+++ b/lib/pls/include/pls/internal/scheduling/base_task.h
@@ -26,6 +26,7 @@ namespace pls::internal::scheduling {
  * This base_task can be extended by different trading/stealing implementations,
  * to add for example additional flags. The scheduler itself always works solely with this base version.
  */
+struct strain_resource;
 struct base_task {
   base_task(char *stack_memory, size_t stack_size, unsigned depth, unsigned thread_id) :
       depth_{depth},
@@ -34,7 +35,8 @@ struct base_task {
       stack_size_{stack_size},
       is_synchronized_{false},
       prev_{nullptr},
-      next_{nullptr} {}
+      next_{nullptr},
+      attached_resources_{nullptr} {}
 
   // Do not allow accidental copy/move operations.
   // The whole runtime relies on tasks never changing memory positions during execution.
@@ -63,6 +65,8 @@ struct base_task {
   base_task *prev_;
   base_task *next_;
 
+  std::atomic<strain_resource *> attached_resources_;
+
 #if PLS_PROFILING_ENABLED
   profiling::dag_node *profiling_node_;
 #endif
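Conceptually, a task's attached resources form an intrusive singly linked list whose head lives in the task itself; the spawn path later in this patch (scheduler_impl.h) simply copies that head into the child task. The following self-contained sketch models just this list handling — the names are illustrative, none of this is pls API:

    #include <atomic>
    #include <cassert>

    struct resource_node {
      std::atomic<resource_node *> next_{nullptr};
    };

    struct task_model {
      std::atomic<resource_node *> attached_resources_{nullptr};
    };

    // Push a node onto a task's resource list. Only the owning thread
    // touches the list at this point, so relaxed ordering suffices.
    void attach(task_model &task, resource_node &node) {
      node.next_.store(task.attached_resources_.load(std::memory_order_relaxed),
                       std::memory_order_relaxed);
      task.attached_resources_.store(&node, std::memory_order_relaxed);
    }

    int main() {
      task_model parent, child;
      resource_node a, b;
      attach(parent, a);
      attach(parent, b);
      // On spawn, the child inherits the parent's current list head.
      child.attached_resources_.store(
          parent.attached_resources_.load(std::memory_order_relaxed),
          std::memory_order_relaxed);
      assert(child.attached_resources_.load(std::memory_order_relaxed) == &b);
      return 0;
    }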
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h
index 41b33cb..a427b58 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler.h
@@ -104,7 +104,7 @@ class scheduler {
   void terminate();
 
   [[nodiscard]] unsigned int num_threads() const { return num_threads_; }
-  [[nodiscard]] static base_task &task_chain_at(unsigned int depth, thread_state &calling_state);
+  [[nodiscard]] static base_task *get_trade_task(base_task *stolen_task, thread_state &calling_state);
 
   static bool check_task_chain_forward(base_task &start_task);
   static bool check_task_chain_backward(base_task &start_task);
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index 19cbd88..2f55d79 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -69,6 +69,7 @@ class scheduler::init_function_impl : public init_function {
   explicit init_function_impl(F &function) : function_{function} {}
   void run() override {
     base_task *root_task = thread_state::get().get_active_task();
+    root_task->attached_resources_.store(nullptr, std::memory_order_relaxed);
 
 #if PLS_PROFILING_ENABLED
     thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(),
@@ -151,6 +152,9 @@ void scheduler::spawn_internal(Function &&lambda) {
     base_task *last_task = spawning_state.get_active_task();
     base_task *spawned_task = last_task->next_;
 
+    auto *attached_resources = last_task->attached_resources_.load(std::memory_order_relaxed);
+    spawned_task->attached_resources_.store(attached_resources, std::memory_order_relaxed);
+
 #if PLS_PROFILING_ENABLED
     spawning_state.get_scheduler().profiler_.task_prepare_stack_measure(spawning_state.get_thread_id(),
                                                                         spawned_task->stack_memory_,
diff --git a/lib/pls/include/pls/internal/scheduling/strain_local_resource.h b/lib/pls/include/pls/internal/scheduling/strain_local_resource.h
new file mode 100644
index 0000000..8d971d8
--- /dev/null
+++ b/lib/pls/include/pls/internal/scheduling/strain_local_resource.h
@@ -0,0 +1,80 @@
+
+#ifndef PLS_INTERNAL_SCHEDULING_STACK_RESOURCE_H_
+#define PLS_INTERNAL_SCHEDULING_STACK_RESOURCE_H_
+
+#include "pls/internal/base/system_details.h"
+
+#include <atomic>
+#include <memory>
+#include <vector>
+
+namespace pls::internal::scheduling {
+
+class strain_local_resource;
+struct PLS_CACHE_ALIGN strain_resource {
+  strain_resource(strain_local_resource *resource, unsigned index, unsigned depth) :
+      strain_local_resource_{resource}, index_{index}, depth_{depth} {}
+
+  strain_resource(const strain_resource &) = delete;
+  strain_resource &operator=(const strain_resource &) = delete;
+  strain_resource(strain_resource &&) = delete;
+  strain_resource &operator=(strain_resource &&) = delete;
+
+  strain_local_resource *const strain_local_resource_;
+  unsigned const index_;
+  unsigned const depth_;
+
+  bool used_{false};
+  std::atomic<strain_resource *> next_{nullptr};
+};
+
+class strain_local_resource {
+  // Handle to a stack-like resource (holds its index).
+  // MUST be handled in a RAII-like manner on the stack
+  // and must not be copied or moved for this to work.
+  class item_handle {
+   public:
+    item_handle(const item_handle &) = delete;
+    item_handle &operator=(const item_handle &) = delete;
+    item_handle(item_handle &&) = delete;
+    item_handle &operator=(item_handle &&) = delete;
+
+    explicit item_handle(strain_resource *resource) : resource_{resource} {}
+    ~item_handle();
+
+   private:
+    strain_resource *resource_;
+  };
+
+  struct local_item {
+    local_item(strain_local_resource *strain_local_resource, unsigned index, unsigned depth) :
+        initial_resource_{std::make_unique<strain_resource>(strain_local_resource, index, depth)},
+        resource_{initial_resource_.get()} {}
+
+    std::unique_ptr<strain_resource> initial_resource_;
+    strain_resource *resource_;
+  };
+
+ public:
+  strain_local_resource(unsigned num_threads,
+                        unsigned depth) : local_items_() {
+    local_items_.resize(num_threads);
+    for (unsigned thread_id = 0; thread_id < num_threads; thread_id++) {
+      local_items_[thread_id].reserve(depth);
+      for (unsigned i = 0; i < depth; i++) {
+        // Each thread starts out owning its local items.
+        local_items_[thread_id].emplace_back(this, thread_id, i);
+      }
+    }
+  }
+
+  item_handle get_item(unsigned depth);
+  static strain_resource *get_local_copy(strain_resource *other_resources, unsigned thread_id);
+  static void acquire_locally(strain_resource *other_resources, unsigned thread_id);
+
+ private:
+  std::vector<std::vector<local_item>> local_items_;
+};
+
+} // namespace pls::internal::scheduling
+
+#endif //PLS_INTERNAL_SCHEDULING_STACK_RESOURCE_H_
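The new header introduces a per-thread, per-depth pool of resource instances handed out through a RAII handle. Intended usage, sketched below under assumptions (a running pls scheduler, code executing inside a task; num_threads, max_depth and current_depth are illustrative placeholders): the handle must live on the stack, and since item_handle is a private nested type, callers bind it with auto.

    // Sketch only: assumes a running pls scheduler and execution inside a task.
    strain_local_resource stack_resources{num_threads, max_depth};

    // ... inside a task ...
    {
      auto handle = stack_resources.get_item(current_depth);  // marks the item used_
      // ... work with the strain-local instance while the handle is alive ...
    }  // ~item_handle(): unlinks it from the task and returns it to this thread's slots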
diff --git a/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp b/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp
index d37aaff..7951d72 100644
--- a/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp
+++ b/lib/pls/src/internal/scheduling/lock_free/task_manager.cpp
@@ -50,8 +50,7 @@ std::tuple<base_task *, base_task *> task_manager::steal_task(thread_state
   if (peek.top_task_) {
     task *stolen_task = peek.top_task_;
     // get a suitable task to trade in
-    // TODO: opt. add debug marker to traded in tasks that we do not accidentally use them.
-    task *traded_task = static_cast<task *>(&scheduler::task_chain_at(stolen_task->depth_, stealing_state));
+    task *traded_task = static_cast<task *>(scheduler::get_trade_task(stolen_task, stealing_state));
     base_task *chain_after_stolen_task = traded_task->next_;
 
     // perform the actual pop operation
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index d5fc96b..8f896a5 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -2,6 +2,7 @@
 
 #include "context_switcher/context_switcher.h"
 
+#include "pls/internal/scheduling/strain_local_resource.h"
 #include "pls/internal/build_flavour.h"
 #include "pls/internal/base/futex_wrapper.h"
 #include "pls/internal/base/error_handling.h"
@@ -85,6 +86,9 @@ void scheduler::work_thread_work_section() {
         stolen_task->next_ = chain_after_stolen_task;
         chain_after_stolen_task->prev_ = stolen_task;
         my_state.set_active_task(stolen_task);
+        // Keep locally owned resources consistent.
+        auto *stolen_resources = stolen_task->attached_resources_.load(std::memory_order_relaxed);
+        strain_local_resource::acquire_locally(stolen_resources, my_state.get_thread_id());
 
         PLS_ASSERT(check_task_chain_forward(*my_state.get_active_task()),
                    "We are sole owner of this chain, it has to be valid!");
@@ -223,6 +227,10 @@ context_switcher::continuation scheduler::slow_return(thread_state &calling_stat
   this_task->prev_ = clean_chain;
   clean_chain->next_ = this_task;
 
+  // Keep locally owned resources consistent.
+  auto *clean_resources = clean_chain->attached_resources_.load(std::memory_order_relaxed);
+  strain_local_resource::acquire_locally(clean_resources, calling_state.get_thread_id());
+
   base_task *active_task = clean_chain;
   while (active_task->depth_ > 0) {
     active_task = active_task->prev_;
@@ -248,17 +256,22 @@ context_switcher::continuation scheduler::slow_return(thread_state &calling_stat
   }
 }
 
-base_task &scheduler::task_chain_at(unsigned int depth, thread_state &calling_state) {
-  // TODO: possible optimize with cache array at steal events
+base_task *scheduler::get_trade_task(base_task *stolen_task, thread_state &calling_state) {
+  // Get the trade task itself.
   base_task *result = calling_state.get_active_task();
-  while (result->depth_ > depth) {
+  while (result->depth_ > stolen_task->depth_) {
     result = result->prev_;
   }
-  while (result->depth_ < depth) {
+  while (result->depth_ < stolen_task->depth_) {
     result = result->next_;
   }
 
-  return *result;
+  // Attach the resources we need to trade along with it.
+  auto *stolen_resources = stolen_task->attached_resources_.load(std::memory_order_relaxed);
+  auto *traded_resources = strain_local_resource::get_local_copy(stolen_resources, calling_state.get_thread_id());
+  result->attached_resources_.store(traded_resources, std::memory_order_relaxed);
+
+  return result;
 }
 
 void scheduler::terminate() {
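get_trade_task relies on the thief's active task chain being doubly linked and strictly ordered by depth_, so the matching-depth task can be found by stepping prev_/next_. A standalone model of that walk (illustrative, not pls API):

    #include <cassert>

    struct task_model {
      unsigned depth_;
      task_model *prev_{nullptr};
      task_model *next_{nullptr};
    };

    // Step backwards or forwards along the chain until we sit at `depth`.
    task_model *chain_at(task_model *active, unsigned depth) {
      task_model *result = active;
      while (result->depth_ > depth) result = result->prev_;
      while (result->depth_ < depth) result = result->next_;
      return result;
    }

    int main() {
      task_model t0{0}, t1{1}, t2{2};
      t0.next_ = &t1; t1.prev_ = &t0;
      t1.next_ = &t2; t2.prev_ = &t1;
      assert(chain_at(&t2, 0) == &t0);  // walk backwards
      assert(chain_at(&t0, 2) == &t2);  // walk forwards
      return 0;
    }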
diff --git a/lib/pls/src/internal/scheduling/strain_local_resource.cpp b/lib/pls/src/internal/scheduling/strain_local_resource.cpp
new file mode 100644
index 0000000..e1efec3
--- /dev/null
+++ b/lib/pls/src/internal/scheduling/strain_local_resource.cpp
@@ -0,0 +1,84 @@
+#include "pls/internal/scheduling/strain_local_resource.h"
+
+#include "pls/internal/base/error_handling.h"
+#include "pls/internal/scheduling/thread_state.h"
+#include "pls/internal/scheduling/scheduler.h"
+
+namespace pls::internal::scheduling {
+
+// Get an item from the locally owned items.
+strain_local_resource::item_handle strain_local_resource::get_item(unsigned depth) {
+  // Only change our resource usage when synced.
+  // TODO: maybe relax this requirement
+  scheduler::sync();
+  thread_state &my_state = thread_state::get();
+  auto *active_task = my_state.get_active_task();
+
+  // Find the item to use and acquire it from our local storage.
+  auto &local_resource = local_items_[my_state.get_thread_id()][depth];
+  strain_resource *result = local_resource.resource_;
+
+  // Add it to the task's attached resources.
+  result->next_.store(active_task->attached_resources_.load(std::memory_order_relaxed), std::memory_order_relaxed);
+  active_task->attached_resources_.store(result, std::memory_order_relaxed);
+
+  // Wrap it for RAII usage on the stack.
+  PLS_ASSERT(!result->used_, "Must not try to allocate an already used resource!");
+  result->used_ = true;
+  return strain_local_resource::item_handle{result};
+}
+
+// Return the item to the locally owned items.
+strain_local_resource::item_handle::~item_handle() {
+  // Only change our resource usage when synced.
+  // TODO: maybe relax this requirement
+  scheduler::sync();
+  thread_state &my_state = thread_state::get();
+  auto *active_task = my_state.get_active_task();
+
+  // Remove it from the task's attached resources.
+  strain_resource *previous_resource = nullptr;
+  strain_resource *current_resource = active_task->attached_resources_.load(std::memory_order_relaxed);
+  while (current_resource != resource_) {
+    previous_resource = current_resource;
+    current_resource = current_resource->next_.load(std::memory_order_relaxed);
+  }
+
+  auto *next_resource = current_resource->next_.load(std::memory_order_relaxed);
+  if (previous_resource) {
+    previous_resource->next_.store(next_resource, std::memory_order_relaxed);
+  } else {
+    active_task->attached_resources_.store(next_resource, std::memory_order_relaxed);
+  }
+
+  // Give the resource handle back to our local resource array.
+  auto &local_resource = resource_->strain_local_resource_->local_items_[my_state.get_thread_id()][resource_->depth_];
+  local_resource.resource_ = resource_;
+  PLS_ASSERT(resource_->used_, "Must only release used resources!");
+  resource_->used_ = false;
+}
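The destructor's unlink step is a plain singly-linked-list removal against the task's attached-resources head: walk to the node, then splice it out via either the predecessor's next_ or the list head. Modeled standalone (illustrative names, not pls API):

    #include <atomic>
    #include <cassert>

    struct node {
      std::atomic<node *> next_{nullptr};
    };

    // Remove `target` from the list rooted at `head`.
    // The caller guarantees that `target` is actually in the list.
    void unlink(std::atomic<node *> &head, node *target) {
      node *prev = nullptr;
      node *cur = head.load(std::memory_order_relaxed);
      while (cur != target) {
        prev = cur;
        cur = cur->next_.load(std::memory_order_relaxed);
      }
      node *next = cur->next_.load(std::memory_order_relaxed);
      if (prev) {
        prev->next_.store(next, std::memory_order_relaxed);
      } else {
        head.store(next, std::memory_order_relaxed);
      }
    }

    int main() {
      std::atomic<node *> head{nullptr};
      node a, b;
      b.next_.store(&a, std::memory_order_relaxed);  // list: b -> a
      head.store(&b, std::memory_order_relaxed);
      unlink(head, &a);                              // remove the tail
      assert(head.load() == &b && b.next_.load() == nullptr);
      return 0;
    }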
+
+strain_resource *strain_local_resource::get_local_copy(strain_resource *other_resources, unsigned thread_id) {
+  strain_resource *result = nullptr;
+
+  while (other_resources != nullptr) {
+    local_item &local = other_resources->strain_local_resource_->local_items_[thread_id][other_resources->depth_];
+    local.resource_->next_.store(result, std::memory_order_relaxed);
+    result = local.resource_;
+
+    other_resources = other_resources->next_.load(std::memory_order_relaxed);
+  }
+
+  return result;
+}
+
+void strain_local_resource::acquire_locally(strain_resource *other_resources, unsigned thread_id) {
+  while (other_resources != nullptr) {
+    local_item &local = other_resources->strain_local_resource_->local_items_[thread_id][other_resources->depth_];
+    local.resource_ = other_resources;
+
+    other_resources = other_resources->next_.load(std::memory_order_relaxed);
+  }
+}
+
+} // namespace pls::internal::scheduling
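Taken together, get_local_copy and acquire_locally implement an ownership swap that preserves the one-instance-per-(thread, depth)-slot invariant: the thief hands its own instances over with the traded task and adopts the instances that arrived with the stolen one. Note that get_local_copy prepends, so the copied list comes out reversed; this is harmless, as the list is only ever treated as a set. A compact, runnable model of that swap (illustrative, not pls API):

    #include <cassert>
    #include <vector>

    struct res { unsigned depth_; res *next_{nullptr}; };
    using slots = std::vector<res *>;  // per-thread: slot index == depth

    // Build the trade list out of the thief's current slot instances.
    res *get_local_copy(res *other, slots &my_slots) {
      res *result = nullptr;
      for (; other != nullptr; other = other->next_) {
        res *mine = my_slots[other->depth_];
        mine->next_ = result;  // prepend -> resulting list is reversed
        result = mine;
      }
      return result;
    }

    // Repoint our slots at the instances that arrived with a chain.
    void acquire_locally(res *other, slots &my_slots) {
      for (; other != nullptr; other = other->next_) my_slots[other->depth_] = other;
    }

    int main() {
      res a{0}, b{1}, c{0}, d{1};
      slots victim{&a, &b}, thief{&c, &d};
      // The victim's task has a and b attached (b -> a).
      b.next_ = &a; res *stolen_list = &b;
      // The thief builds the trade list from its own instances: c -> d.
      res *traded_list = get_local_copy(stolen_list, thief);
      assert(traded_list == &c && c.next_ == &d);
      // After the successful steal the thief owns the stolen instances ...
      acquire_locally(stolen_list, thief);
      assert(thief[0] == &a && thief[1] == &b);
      // ... and the victim, once it picks the traded chain back up, owns the others.
      acquire_locally(traded_list, victim);
      assert(victim[0] == &c && victim[1] == &d);
      return 0;
    }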