diff --git a/app/playground/CMakeLists.txt b/app/playground/CMakeLists.txt
index c1b57ee..3ea2ce1 100644
--- a/app/playground/CMakeLists.txt
+++ b/app/playground/CMakeLists.txt
@@ -1,4 +1,4 @@
 add_executable(playground main.cpp)
 
 # Example for adding the library to your app (as a cmake project dependency)
-target_link_libraries(playground pls)
+target_link_libraries(playground pls)
diff --git a/app/playground/main.cpp b/app/playground/main.cpp
index 03b3bbf..eb815c7 100644
--- a/app/playground/main.cpp
+++ b/app/playground/main.cpp
@@ -1,26 +1,77 @@
-// Headers are available because we added the pls target
-const long NUM_THREADS = 8;
-const long MEMORY_PER_THREAD = 2u << 12u;
+#include <iostream>
+#include <chrono>
 
-#include "pls/pls.h"
+#include "pls/internal/scheduling/scheduler.h"
+#include "pls/internal/scheduling/parallel_result.h"
+#include "pls/internal/scheduling/scheduler_memory.h"
 
-pls::static_scheduler_memory memory;
+using namespace pls::internal;
+
+constexpr size_t NUM_THREADS = 1;
+
+constexpr size_t NUM_TASKS = 64;
+constexpr size_t MAX_TASK_STACK_SIZE = 0;
+
+constexpr size_t NUM_CONTS = 64;
+constexpr size_t MAX_CONT_SIZE = 128;
+
+scheduling::parallel_result<int> fib(int n) {
+  if (n == 0) {
+    return 0;
+  }
+  if (n == 1) {
+    return 1;
+  }
+
+  return scheduling::scheduler::par([=]() {
+    return fib(n - 1);
+  }, [=]() {
+    return fib(n - 2);
+  }).then([](int a, int b) {
+    return a + b;
+  });
+}
+
+int fib_normal(int n) {
+  if (n == 0) {
+    return 0;
+  }
+  if (n == 1) {
+    return 1;
+  }
+
+  return fib_normal(n - 1) + fib_normal(n - 2);
+}
 
 int main() {
-  pls::scheduler scheduler{&memory, NUM_THREADS};
+  scheduling::static_scheduler_memory<NUM_THREADS, NUM_TASKS, MAX_TASK_STACK_SIZE, NUM_CONTS, MAX_CONT_SIZE>
+      static_scheduler_memory;
 
-  scheduler.perform_work([]() {
-    auto lambda = []() {
-      // Do work
-    };
-    using lambda_task = pls::lambda_task_by_value;
+  scheduling::scheduler scheduler{static_scheduler_memory, NUM_THREADS};
 
-    pls::scheduler::spawn_child(lambda);
-    pls::scheduler::spawn_child(lambda);
+  auto start = std::chrono::steady_clock::now();
+  std::cout << "fib = " << fib_normal(41) << std::endl;
+  auto end = std::chrono::steady_clock::now();
+  std::cout << "Normal: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
+            << std::endl;
 
-    pls::scheduler::wait_for_all();
+  start = std::chrono::steady_clock::now();
+
+  scheduler.perform_work([]() {
+    return scheduling::scheduler::par([]() {
+      return fib(41);
+    }, []() {
+      return scheduling::parallel_result<int>{0};
+    }).then([](int a, int b) {
+      std::cout << "fib = " << a << std::endl;
+    });
   });
-  scheduler.terminate();
+  end = std::chrono::steady_clock::now();
+  std::cout << "Framework: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << std::endl;
+
   return 0;
 }
diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt
index 4b70932..7c6176b 100644
--- a/lib/pls/CMakeLists.txt
+++ b/lib/pls/CMakeLists.txt
@@ -1,29 +1,29 @@
 # List all required files here (cmake best practice to NOT automate this step!)
 add_library(pls STATIC
-        include/pls/pls.h src/pls.cpp
-
-        include/pls/algorithms/invoke.h
-        include/pls/algorithms/invoke_impl.h
-        include/pls/algorithms/for_each.h
-        include/pls/algorithms/for_each_impl.h
-        include/pls/algorithms/scan.h
-        include/pls/algorithms/scan_impl.h
-
-        include/pls/dataflow/dataflow.h
-        include/pls/dataflow/internal/inputs.h
-        include/pls/dataflow/internal/outputs.h
-        include/pls/dataflow/internal/token.h
-        include/pls/dataflow/internal/in_port.h
-        include/pls/dataflow/internal/out_port.h
-        include/pls/dataflow/internal/function_node.h
-        include/pls/dataflow/internal/node.h
-        include/pls/dataflow/internal/graph.h
-        include/pls/dataflow/internal/build_state.h
-        include/pls/dataflow/internal/function_node_impl.h
-        include/pls/dataflow/internal/graph_impl.h
-        include/pls/dataflow/internal/switch_node.h
-        include/pls/dataflow/internal/merge_node.h
-        include/pls/dataflow/internal/split_node.h
+#        include/pls/pls.h src/pls.cpp
+        #
+        # include/pls/algorithms/invoke.h
+        # include/pls/algorithms/invoke_impl.h
+        # include/pls/algorithms/for_each.h
+        # include/pls/algorithms/for_each_impl.h
+        # include/pls/algorithms/scan.h
+        # include/pls/algorithms/scan_impl.h
+        #
+        # include/pls/dataflow/dataflow.h
+        # include/pls/dataflow/internal/inputs.h
+        # include/pls/dataflow/internal/outputs.h
+        # include/pls/dataflow/internal/token.h
+        # include/pls/dataflow/internal/in_port.h
+        # include/pls/dataflow/internal/out_port.h
+        # include/pls/dataflow/internal/function_node.h
+        # include/pls/dataflow/internal/node.h
+        # include/pls/dataflow/internal/graph.h
+        # include/pls/dataflow/internal/build_state.h
+        # include/pls/dataflow/internal/function_node_impl.h
+        # include/pls/dataflow/internal/graph_impl.h
+        # include/pls/dataflow/internal/switch_node.h
+        # include/pls/dataflow/internal/merge_node.h
+        # include/pls/dataflow/internal/split_node.h
 
         include/pls/internal/base/spin_lock.h
         include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp
@@ -34,7 +34,7 @@ add_library(pls STATIC
         include/pls/internal/base/barrier.h src/internal/base/barrier.cpp
         include/pls/internal/base/system_details.h
         include/pls/internal/base/error_handling.h
-        include/pls/internal/base/alignment.h include/pls/internal/base/alignment_impl.h
+        include/pls/internal/base/alignment.h
 
         include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
         include/pls/internal/data_structures/aligned_stack_impl.h
@@ -56,7 +56,7 @@ add_library(pls STATIC
         include/pls/internal/scheduling/task_manager.h
         include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
         include/pls/internal/scheduling/cont_manager.h
-        include/pls/internal/scheduling/continuation.h)
+        include/pls/internal/scheduling/continuation.h include/pls/internal/scheduling/parallel_result.h src/internal/base/alignment.cpp)
 
 # Add everything in `./include` to be in the include path of this project
 target_include_directories(pls
diff --git a/lib/pls/include/pls/internal/base/alignment.h b/lib/pls/include/pls/internal/base/alignment.h
index 71aa7a0..26c7762 100644
--- a/lib/pls/include/pls/internal/base/alignment.h
+++ b/lib/pls/include/pls/internal/base/alignment.h
@@ -13,8 +13,8 @@ namespace internal {
 namespace base {
 namespace alignment {
 
-constexpr system_details::pointer_t next_alignment(system_details::pointer_t size);
-constexpr system_details::pointer_t previous_alignment(system_details::pointer_t size);
+system_details::pointer_t next_alignment(system_details::pointer_t size);
+system_details::pointer_t previous_alignment(system_details::pointer_t size);
 
 char *next_alignment(char *pointer);
 
 /**
@@ -43,9 +43,6 @@ struct alignas(system_details::CACHE_LINE_SIZE) cache_alignment_wrapper {
   T *pointer() { return reinterpret_cast<T *>(data_); }
 };
 
-void *allocate_aligned(size_t size);
-
-#include "alignment_impl.h"
 }
 }
 }
diff --git a/lib/pls/include/pls/internal/base/alignment_impl.h b/lib/pls/include/pls/internal/base/alignment_impl.h
deleted file mode 100644
index a734a40..0000000
--- a/lib/pls/include/pls/internal/base/alignment_impl.h
+++ /dev/null
@@ -1,35 +0,0 @@
-
-#ifndef PLS_INTERNAL_BASE_ALIGNMENT_IMPL_H_
-#define PLS_INTERNAL_BASE_ALIGNMENT_IMPL_H_
-
-namespace pls {
-namespace internal {
-namespace base {
-namespace alignment {
-
-void *allocate_aligned(size_t size) {
-  return aligned_alloc(system_details::CACHE_LINE_SIZE, size);
-}
-
-constexpr system_details::pointer_t next_alignment(system_details::pointer_t size) {
-  return (size % system_details::CACHE_LINE_SIZE) == 0 ?
-         size :
-         size + (system_details::CACHE_LINE_SIZE - (size % system_details::CACHE_LINE_SIZE));
-}
-
-constexpr system_details::pointer_t previous_alignment(system_details::pointer_t size) {
-  return (size % system_details::CACHE_LINE_SIZE) == 0 ?
-         size :
-         size - (size % system_details::CACHE_LINE_SIZE);
-}
-
-char *next_alignment(char *pointer) {
-  return reinterpret_cast<char *>(next_alignment(reinterpret_cast<system_details::pointer_t>(pointer)));
-}
-
-}
-}
-}
-}
-
-#endif //PLS_INTERNAL_BASE_ALIGNMENT_IMPL_H_
diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack.h b/lib/pls/include/pls/internal/data_structures/aligned_stack.h
index b4de92c..cbbadbc 100644
--- a/lib/pls/include/pls/internal/data_structures/aligned_stack.h
+++ b/lib/pls/include/pls/internal/data_structures/aligned_stack.h
@@ -41,13 +41,13 @@ class aligned_stack {
   template<typename T, typename ...ARGS>
   T *push(ARGS &&... args);
   template<typename T>
-  void *push_bytes();
-  void *push_bytes(size_t size);
+  char *push_bytes();
+  char *push_bytes(size_t size);
 
   template<typename T>
   void pop();
 
-  void *memory_at_offset(stack_offset offset) const;
+  char *memory_at_offset(stack_offset offset) const;
 
   stack_offset save_offset() const { return current_offset_; }
   void reset_offset(stack_offset new_offset) { current_offset_ = new_offset; }
@@ -74,14 +74,21 @@ class static_aligned_stack {
 
 class heap_aligned_stack {
  public:
-  explicit heap_aligned_stack(size_t size);
-  ~heap_aligned_stack();
+  explicit heap_aligned_stack(size_t size) :
+      unaligned_memory_size_{base::alignment::next_alignment(size)},
+      unaligned_memory_pointer_{new char[unaligned_memory_size_]},
+      aligned_stack_{unaligned_memory_pointer_, size, unaligned_memory_size_} {}
+
+  ~heap_aligned_stack() {
+    delete[] unaligned_memory_pointer_;
+  }
 
   aligned_stack &get_stack() { return aligned_stack_; }
 
  private:
-  aligned_stack aligned_stack_;
+  size_t unaligned_memory_size_;
   char *unaligned_memory_pointer_;
+  aligned_stack aligned_stack_;
 };
 
 }
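The heap_aligned_stack rework above relies on C++ member initialization order: members are initialized in the order they are declared, not in the order they appear in the constructor's initializer list, so the size and pointer members must be declared before the aligned_stack that consumes them. A minimal standalone sketch of the same rule (illustrative names, not code from the patch):

#include <cstddef>

struct buffer_holder {
  std::size_t size_;  // declared (and therefore initialized) first
  char *data_;        // initialized second, may safely use size_
  char *end_;         // mirrors aligned_stack_: depends on the members above

  explicit buffer_holder(std::size_t size)
      : size_{size},
        data_{new char[size_]},  // ok: size_ is already initialized
        end_{data_ + size_} {}   // ok: data_ and size_ are already initialized

  ~buffer_holder() { delete[] data_; }
};

int main() {
  buffer_holder holder{1024};
  return 0;
}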
diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h
index e5de0d6..41f6bf1 100644
--- a/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h
+++ b/lib/pls/include/pls/internal/data_structures/aligned_stack_impl.h
@@ -14,7 +14,7 @@ T *aligned_stack::push(ARGS &&... args) {
 }
 
 template<typename T>
-void *aligned_stack::push_bytes() {
+char *aligned_stack::push_bytes() {
   return push_bytes(sizeof(T));
 }
 
@@ -28,15 +28,7 @@ void aligned_stack::pop() {
 }
 
 template<size_t SIZE>
-static_aligned_stack<SIZE>::static_aligned_stack(): memory_{}, aligned_stack_{memory_.data()} {};
-
-heap_aligned_stack::heap_aligned_stack(size_t size) :
-    unaligned_memory_pointer_{new char[base::alignment::next_alignment(size)]},
-    aligned_stack_{unaligned_memory_pointer_, size, base::alignment::next_alignment(size)} {}
-
-heap_aligned_stack::~heap_aligned_stack() {
-  delete[] unaligned_memory_pointer_;
-}
+static_aligned_stack<SIZE>::static_aligned_stack(): memory_{}, aligned_stack_{memory_.data(), SIZE} {};
 
 }
 }
diff --git a/lib/pls/include/pls/internal/data_structures/delayed_initialization_wrapper.h b/lib/pls/include/pls/internal/data_structures/delayed_initialization_wrapper.h
index d02f94e..ac31ea0 100644
--- a/lib/pls/include/pls/internal/data_structures/delayed_initialization_wrapper.h
+++ b/lib/pls/include/pls/internal/data_structures/delayed_initialization_wrapper.h
@@ -22,14 +22,22 @@ template
 class delayed_initialization_wrapper {
  public:
   delayed_initialization_wrapper() : memory_{}, initialized_{false} {}
+  delayed_initialization_wrapper(delayed_initialization_wrapper &&other) noexcept {
+    initialized_ = other.initialized_;
+    if (other.initialized_) {
+      object() = std::move(other.object());
+      other.initialized_ = false;
+    }
+  }
+
   template<typename ...ARGS>
   explicit delayed_initialization_wrapper(ARGS &&...args): memory_{}, initialized_{true} {
-    new(memory_) T(std::forward<ARGS>(args)...);
+    new(memory_.data()) T(std::forward<ARGS>(args)...);
   }
 
   ~delayed_initialization_wrapper() {
     if (initialized_) {
-      memory_->~T();
+      object().~T();
     }
   }
 
@@ -37,22 +45,24 @@ class delayed_initialization_wrapper {
   void initialize(ARGS &&...args) {
     PLS_ASSERT(!initialized_, "Can only initialize delayed wrapper object once!")
 
-    new(memory_) T(std::forward<ARGS>(args)...);
+    new(memory_.data()) T(std::forward<ARGS>(args)...);
     initialized_ = true;
   }
 
   void destroy() {
     PLS_ASSERT(initialized_, "Can only destroy initialized objects!")
 
-    memory_->~T();
+    object().~T();
     initialized_ = false;
   }
 
   T &object() {
     PLS_ASSERT(initialized_, "Can not use an uninitialized delayed wrapper object!")
 
-    return *reinterpret_cast<T *>(memory_);
+    return *reinterpret_cast<T *>(memory_.data());
   }
 
+  bool initialized() const { return initialized_; }
+
  private:
   std::array memory_;
   bool initialized_;
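The wrapper above is the usual delayed-initialization idiom: raw byte storage, placement new on demand, and an explicit destructor call. A simplified standalone sketch of that idiom, using nothing beyond the standard library (not the pls wrapper itself):

#include <array>
#include <new>
#include <string>
#include <utility>

// Sketch: reserve raw storage, construct the object later with placement new,
// and destroy it manually, mirroring initialize()/destroy() above.
template <typename T>
class delayed {
 public:
  template <typename... Args>
  void initialize(Args &&...args) {
    new (memory_.data()) T(std::forward<Args>(args)...);  // construct in-place
    initialized_ = true;
  }

  T &object() { return *reinterpret_cast<T *>(memory_.data()); }

  ~delayed() {
    if (initialized_) {
      object().~T();  // manual destructor call
    }
  }

 private:
  alignas(T) std::array<char, sizeof(T)> memory_{};
  bool initialized_{false};
};

int main() {
  delayed<std::string> wrapper;
  wrapper.initialize("constructed later");
  return wrapper.object().empty() ? 1 : 0;
}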
diff --git a/lib/pls/include/pls/internal/scheduling/cont_manager.h b/lib/pls/include/pls/internal/scheduling/cont_manager.h
index b7f33f2..dbc787d 100644
--- a/lib/pls/include/pls/internal/scheduling/cont_manager.h
+++ b/lib/pls/include/pls/internal/scheduling/cont_manager.h
@@ -15,8 +15,13 @@ namespace scheduling {
 
 class cont_manager {
  public:
+  // Helper to pass the compile time constants to the constructor.
   template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
-  explicit cont_manager(data_structures::aligned_stack &cont_storage) {
+  struct template_args {};
+
+  template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
+  explicit cont_manager(data_structures::aligned_stack &cont_storage, template_args<NUM_CONTS, MAX_CONT_SIZE>) {
+    // First node is currently active and our local start
     start_node_ = active_node_ = init_cont_node<MAX_CONT_SIZE>(cont_storage, nullptr, nullptr);
 
@@ -24,7 +29,8 @@ class cont_manager {
     continuation_node *current_node = start_node_;
     for (size_t i = 1; i < NUM_CONTS; i++) {
       continuation_node *next_node = init_cont_node<MAX_CONT_SIZE>(cont_storage, start_node_, current_node);
-      current_node->set_prev(next_node);
+      current_node->set_next(next_node);
+      current_node = next_node;
     }
   };
 
@@ -42,8 +48,8 @@ class cont_manager {
                                            continuation_node *cont_chain_start,
                                            continuation_node *prev) {
     // Represents one cont node and its corresponding memory buffer (as one continuous block of memory).
-    using cont_node_memory_pair = std::pair<continuation_node, std::array<char, MAX_CONT_SIZE - sizeof(continuation_node)>>;
+    constexpr size_t buffer_size = MAX_CONT_SIZE - sizeof(continuation_node);
+    using cont_node_memory_pair = std::pair<continuation_node, std::array<char, buffer_size>>;
     char *pair_memory = cont_storage.push_bytes<cont_node_memory_pair>();
     char *cont_node_address = pair_memory;
     char *cont_node_memory_address = pair_memory + sizeof(continuation_node);
@@ -59,7 +65,9 @@ class cont_manager {
 template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
 class static_cont_manager {
  public:
-  static_cont_manager() : static_cont_storage_{}, cont_manager_{NUM_CONTS, MAX_CONT_SIZE, static_cont_storage_} {}
+  static_cont_manager()
+      : static_cont_storage_{},
+        cont_manager_(static_cont_storage_.get_stack(), cont_manager::template_args<NUM_CONTS, MAX_CONT_SIZE>{}) {}
   cont_manager &get_cont_manager() { return cont_manager_; }
 
  private:
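cont_manager itself is no longer templated on NUM_CONTS and MAX_CONT_SIZE; the empty template_args struct only exists to carry those compile-time constants into the templated constructor. A small standalone sketch of this tag-type trick, with invented names that are not part of the patch:

#include <cstddef>
#include <iostream>

// Sketch: a non-templated class whose constructor still receives
// compile-time constants through an empty tag parameter.
class ring_builder {
 public:
  template <std::size_t NUM_NODES, std::size_t NODE_SIZE>
  struct config {};  // empty tag carrying the compile-time constants

  template <std::size_t NUM_NODES, std::size_t NODE_SIZE>
  explicit ring_builder(config<NUM_NODES, NODE_SIZE>)
      : num_nodes_{NUM_NODES}, node_size_{NODE_SIZE} {}

  std::size_t num_nodes_;
  std::size_t node_size_;
};

int main() {
  ring_builder builder{ring_builder::config<64, 128>{}};
  std::cout << builder.num_nodes_ << " x " << builder.node_size_ << "\n";
  return 0;
}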
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h
index 32e2a63..33081d0 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler.h
@@ -7,8 +7,6 @@
 
 #include "pls/internal/helpers/profiler.h"
 
-#include "pls/internal/data_structures/aligned_stack.h"
-
 #include "pls/internal/base/thread.h"
 #include "pls/internal/base/barrier.h"
 
@@ -28,17 +26,6 @@ namespace scheduling {
  * It works in close relation with the 'task' class for scheduling.
  */
 class scheduler {
-  friend class task;
-  const unsigned int num_threads_;
-  const bool reuse_thread_;
-  scheduler_memory *memory_;
-
-  base::barrier sync_barrier_;
-
-  task *main_thread_root_task_;
-  std::atomic<bool> work_section_done_;
-
-  bool terminated_;
  public:
   /**
    * Initializes a scheduler instance with the given number of threads.
    * @param memory All memory is allocated statically, thus the user is required to provide the memory instance.
    * @param num_threads The number of worker threads to be created.
    */
-  explicit scheduler(scheduler_memory *memory, unsigned int num_threads, bool reuse_thread = true);
+  explicit scheduler(scheduler_memory &memory, unsigned int num_threads, bool reuse_thread = true);
 
   /**
    * The scheduler is implicitly terminated as soon as it leaves the scope.
@@ -72,41 +59,39 @@ class scheduler {
   void terminate();
 
   /**
-   * Helper to spawn a child on the currently running task.
-   *
-   * @tparam T type of the new task
-   * @tparam ARGS Constructor argument types
-   * @param args constructor arguments
+   * Temporary object used for the parallel(...).then(...) API.
    */
-  template<typename T, typename ...ARGS>
-  static void spawn_child(ARGS &&... args);
+  template<typename F1, typename F2>
+  struct starter;
 
-  /**
-   * Helper to spawn a child on the currently running task and waiting for it (skipping over the task-deque).
-   *
-   * @tparam T type of the new task
-   * @tparam ARGS Constructor argument types
-   * @param args constructor arguments
-   */
-  template<typename T, typename ...ARGS>
-  static void spawn_child_and_wait(ARGS &&... args);
+  template<typename F1, typename F2>
+  starter<F1, F2> invoke_parallel(F1 &&function_1, F2 &&function_2);
 
-  /**
-   * Helper to wait for all children of the currently executing task.
-   */
-  static void wait_for_all();
+  template<typename F1, typename F2>
+  static starter<F1, F2> par(F1 &&function_1, F2 &&function_2);
 
   unsigned int num_threads() const { return num_threads_; }
 
 private:
-  static void worker_routine();
-  thread_state *thread_state_for(size_t id);
+  static void work_thread_main_loop();
+  void work_thread_work_section();
+  thread_state &thread_state_for(size_t id);
 
-  task *get_local_task();
-  task *steal_task();
+  friend class base_task;
+  const unsigned int num_threads_;
+  const bool reuse_thread_;
+  scheduler_memory &memory_;
 
-  bool try_execute_local();
-  bool try_execute_stolen();
+  base::barrier sync_barrier_;
+
+  class init_function;
+  template<typename F>
+  class init_function_impl;
+
+  init_function *main_thread_starter_function_;
+  std::atomic<bool> work_section_done_;
+
+  bool terminated_;
 };
 
 }
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index 386a32b..664adc0 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -2,36 +2,105 @@
 #ifndef PLS_SCHEDULER_IMPL_H
 #define PLS_SCHEDULER_IMPL_H
 
-#include "pls/internal/scheduling/lambda_task.h"
+#include
+#include "pls/internal/scheduling/continuation.h"
+#include "pls/internal/scheduling/parallel_result.h"
+#include "pls/internal/scheduling/task.h"
 
 namespace pls {
 namespace internal {
 namespace scheduling {
 
-// TODO: generally look into the performance implications of using many thread_state::get() calls
+template<typename F1, typename F2>
+struct scheduler::starter {
+  F1 function_1_;
+  F2 function_2_;
+  using return_type_1 = decltype(function_1_());
+  using return_type_2 = decltype(function_2_());
+
+  template<typename F1ARG, typename F2ARG>
+  explicit starter(F1ARG &&function_1, F2ARG &&function_2) : function_1_{std::forward<F1ARG>(function_1)},
+                                                             function_2_{std::forward<F2ARG>(function_2)} {};
+
+  template<typename F>
+  auto then(F &&cont_function)
+  -> decltype(cont_function(std::declval(),
+                            std::declval())) {
+    using continuation_type = continuation;
+    auto &my_state = thread_state::get();
+
+    // Select current continuation
+    auto *current_cont = my_state.get_cont_manager().fast_path_get_next();
+
+    // Move function_2 directly onto the task stack (should in theory be constructed on there).
+    // Consider only copying it later on, as this could allow a more efficient fast path with inlining.
+    task<F2> task_2{function_2_, current_cont};
+    my_state.get_task_manager().publish_task(task_2);
+
+    // Call first function
+    auto result_1 = function_1_();
+    // Try to pop from stack
+    if (my_state.get_task_manager().steal_local_task()) {
+      my_state.get_cont_manager().fast_path_return();
+      auto result_2 = function_2_();
+      return cont_function(result_1.value(), result_2.value());
+    } else {
+      PLS_ERROR("Slow Path Not Implemented!!!")
+    }
+  };
+};
+
+template<typename F1, typename F2>
+scheduler::starter<F1, F2> scheduler::invoke_parallel(F1 &&function_1, F2 &&function_2) {
+  return scheduler::starter<F1, F2>{std::forward<F1>(function_1), std::forward<F2>(function_2)};
+}
+
+template<typename F1, typename F2>
+scheduler::starter<F1, F2> scheduler::par(F1 &&function_1, F2 &&function_2) {
+  return thread_state::get().get_scheduler().invoke_parallel(std::forward<F1>(function_1),
+                                                             std::forward<F2>(function_2));
+}
+
+class scheduler::init_function {
+ public:
+  virtual parallel_result<int> run() = 0;
+};
+template<typename F>
+class scheduler::init_function_impl : public init_function {
+ public:
+  explicit init_function_impl(F &function) : function_{function} {}
+  parallel_result<int> run() override {
+    return scheduler::par([]() {
+      // No-op
+      return parallel_result<int>{0};
+    }, [=]() {
+      function_();
+      return parallel_result<int>{0};
+    }).then([](const int &, const int &) {
+      // Notify that work is done after finishing the last user continuation.
+      thread_state::get().get_scheduler().work_section_done_ = true;
+      return parallel_result<int>{0};
+    });
+  }
+ private:
+  F &function_;
+};
 
 template<typename Function>
 void scheduler::perform_work(Function work_section) {
   PROFILE_WORK_BLOCK("scheduler::perform_work")
 
   // Prepare main root task
-  lambda_task_by_reference<Function> root_task{work_section};
-  main_thread_root_task_ = &root_task;
-  work_section_done_ = false;
+  init_function_impl<Function> starter_function{work_section};
+  main_thread_starter_function_ = &starter_function;
+  work_section_done_ = false;
 
   if (reuse_thread_) {
-    // TODO: See if we should change thread-states to not make our state override the current thread state
-    auto my_state = memory_->thread_state_for(0);
-    base::this_thread::set_state(my_state); // Make THIS THREAD become the main worker
+    auto &my_state = memory_.thread_state_for(0);
+    base::this_thread::set_state(&my_state); // Make THIS THREAD become the main worker
 
     sync_barrier_.wait(); // Trigger threads to wake up
-
-    // Do work (see if we can remove this duplicated code)
-    root_task.parent_ = nullptr;
-    root_task.deque_offset_ = my_state->deque_.save_offset();
-    root_task.execute();
-    work_section_done_ = true;
-
+    work_thread_work_section(); // Simply also perform the work section on the main loop
     sync_barrier_.wait(); // Wait for threads to finish
   } else {
     // Simply trigger the others to do the work, this thread will sleep/wait for the time being
@@ -40,16 +109,6 @@ void scheduler::perform_work(Function work_section) {
   }
 }
 
-template<typename T, typename ...ARGS>
-void scheduler::spawn_child(ARGS &&... args) {
-  thread_state::get()->current_task_->spawn_child<T>(std::forward<ARGS>(args)...);
-}
-
-template<typename T, typename ...ARGS>
-void scheduler::spawn_child_and_wait(ARGS &&... args) {
-  thread_state::get()->current_task_->spawn_child_and_wait<T>(std::forward<ARGS>(args)...);
-}
-
 }
 }
 }
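starter::then() above only implements the fast path: publish the second branch, run the first branch inline, try to take the second branch back, run it, and feed both results to the user continuation; the stolen/slow path is still a PLS_ERROR placeholder. A rough single-threaded stand-in for that control flow, with invented helper names and none of the real task/continuation machinery:

#include <functional>
#include <iostream>
#include <optional>

// Sketch of the fast path only: "publish" the second branch, run the first
// inline, then "steal it back" and run it inline too. The real stolen path
// is intentionally left out, matching the placeholder in the patch.
template <typename F1, typename F2, typename Cont>
auto fast_path_par(F1 &&f1, F2 &&f2, Cont &&cont) {
  std::optional<std::function<int()>> published = std::forward<F2>(f2);  // "publish_task"

  auto result_1 = f1();  // run the first branch inline

  // "steal_local_task": nobody stole it, so take it back and run it inline.
  auto taken_back = std::move(*published);
  published.reset();
  auto result_2 = taken_back();

  return cont(result_1, result_2);  // user continuation combines both results
}

int main() {
  int sum = fast_path_par([]() { return 20; },
                          []() { return 22; },
                          [](int a, int b) { return a + b; });
  std::cout << sum << "\n";  // prints 42
  return 0;
}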
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
index f7e88d4..9fa177a 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
@@ -23,8 +23,8 @@ class scheduler_memory {
   // this should not add too much overhead.
  public:
   virtual size_t max_threads() const = 0;
-  virtual base::thread &thread_for(size_t id) const = 0;
-  virtual thread_state &thread_state_for(size_t id) const = 0;
+  virtual base::thread &thread_for(size_t id) = 0;
+  virtual thread_state &thread_state_for(size_t id) = 0;
 };
 
 template
@@ -34,11 +34,11 @@ class static_scheduler_memory : public scheduler_memory {
     return MAX_THREADS;
   }
 
-  base::thread &thread_for(size_t id) const override {
+  base::thread &thread_for(size_t id) override {
     return threads_[id];
   }
 
-  thread_state &thread_state_for(size_t id) const override {
+  thread_state &thread_state_for(size_t id) override {
     return thread_states_[id].get_thread_state();
   }
 
@@ -68,11 +68,11 @@ class heap_scheduler_memory : public scheduler_memory {
     return max_threads_;
   }
 
-  base::thread &thread_for(size_t id) const override {
+  base::thread &thread_for(size_t id) override {
    return thread_vector_[id];
   }
 
-  thread_state &thread_state_for(size_t id) const override {
+  thread_state &thread_state_for(size_t id) override {
     return thread_state_vector_[id].object().get_thread_state();
   }
diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h
index f5c351c..45abb03 100644
--- a/lib/pls/include/pls/internal/scheduling/task.h
+++ b/lib/pls/include/pls/internal/scheduling/task.h
@@ -1,9 +1,8 @@
-
 #ifndef PLS_TASK_H
 #define PLS_TASK_H
 
-#include "pls/internal/scheduling/thread_state.h"
 #include "pls/internal/scheduling/continuation.h"
+#include "pls/internal/scheduling/cont_manager.h"
 
 namespace pls {
 namespace internal {
@@ -16,8 +15,6 @@ namespace scheduling {
  * Override the execute_internal() method for your custom code.
  */
 class base_task {
-  friend class scheduler;
-
  protected:
  base_task() = default;
 
  /**
  */
  virtual void execute_internal() = 0;
 
- private:
+ public:
  void execute() {
    // TODO: Figure out slow path execution
    execute_internal();
@@ -35,19 +32,22 @@
 }
 };
 
 template
 class task : public base_task {
+  using continuation_type = continuation;
+
 public:
  template<typename FARG>
-  explicit task(FARG &&function, continuation *continuation)
-      : base_task{}, function_{std::forward<FARG>(function)}, continuation_{continuation} {}
+  explicit task(FARG &&function, continuation_node *continuation_node)
+      : base_task{}, function_{std::forward<FARG>(function)}, continuation_node_{continuation_node} {}
 
  void execute_internal() override {
-    continuation_->store_result_2(function_());
+    auto *continuation = dynamic_cast<continuation_type *>(continuation_node_->continuation());
+    continuation->store_result_2(function_());
    // TODO: Properly notify continuation on slow path
  }
 
 private:
  F function_;
-  continuation *continuation_;
+  continuation_node *continuation_node_;
 };
 
 }
diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h
index d31a2a8..4b1d46d 100644
--- a/lib/pls/include/pls/internal/scheduling/task_manager.h
+++ b/lib/pls/include/pls/internal/scheduling/task_manager.h
@@ -9,7 +9,6 @@
 #include "pls/internal/scheduling/task.h"
 
 #include "pls/internal/data_structures/stamped_integer.h"
-#include "task.h"
 
 namespace pls {
 namespace internal {
@@ -79,7 +78,7 @@ class task_manager {
 private:
  task_handle *task_handle_stack_;
-  alignas(base::system_details::CACHE_LINE_SIZE) std::atomic> head_;
+  alignas(base::system_details::CACHE_LINE_SIZE) std::atomic head_;
  alignas(base::system_details::CACHE_LINE_SIZE) std::atomic tail_;
  alignas(base::system_details::CACHE_LINE_SIZE) unsigned int tail_internal_,
      stamp_internal_;
 };
diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h
index eaba7f3..b1b3867 100644
--- a/lib/pls/include/pls/internal/scheduling/thread_state.h
+++ b/lib/pls/include/pls/internal/scheduling/thread_state.h
@@ -7,7 +7,6 @@
 #include
 #include
 
-#include "pls/internal/base/thread.h"
 #include "pls/internal/scheduling/task_manager.h"
 #include "pls/internal/scheduling/cont_manager.h"
 
@@ -17,7 +16,7 @@ namespace scheduling {
 
 // forward declaration
 class scheduler;
-class task;
+class base_task;
 
 struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
   scheduler *scheduler_;
@@ -26,7 +25,7 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
   task_manager &task_manager_;
   cont_manager &cont_manager_;
 
-  alignas(base::system_details::CACHE_LINE_SIZE) task *current_task_;
+  alignas(base::system_details::CACHE_LINE_SIZE) base_task *current_task_;
   alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
 
  public:
@@ -46,7 +45,12 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
   *
   * @return The thread_state of this thread.
   */
-  static thread_state *get() { return base::this_thread::state<thread_state>(); }
+  static thread_state &get() { return *base::this_thread::state<thread_state>(); }
+
+  size_t get_id() { return id_; }
+  task_manager &get_task_manager() { return task_manager_; }
+  cont_manager &get_cont_manager() { return cont_manager_; }
+  scheduler &get_scheduler() { return *scheduler_; }
 
   // Do not allow move/copy operations.
   // State is a pure memory container with references/pointers into it from all over the code.
diff --git a/lib/pls/src/internal/data_structures/aligned_stack.cpp b/lib/pls/src/internal/data_structures/aligned_stack.cpp
index 12fab84..991778b 100644
--- a/lib/pls/src/internal/data_structures/aligned_stack.cpp
+++ b/lib/pls/src/internal/data_structures/aligned_stack.cpp
@@ -10,7 +10,7 @@ aligned_stack::aligned_stack(char *memory_pointer, size_t size) :
     memory_pointer_{memory_pointer}, // MUST be aligned
     max_offset_{size / base::system_details::CACHE_LINE_SIZE},
     current_offset_{0} {
-  PLS_ASSERT((pointer_t) memory_pointer_ % base::system_details::CACHE_LINE_SIZE != 0,
+  PLS_ASSERT((pointer_t) memory_pointer_ % base::system_details::CACHE_LINE_SIZE == 0,
              "Must initialize an aligned_stack with a properly aligned memory region!")
 }
 
@@ -23,20 +23,20 @@ aligned_stack::aligned_stack(char *unaligned_memory_pointer, size_t size, size_t
              "Initialized aligned stack with invalid memory configuration!")
 }
 
-void *aligned_stack::memory_at_offset(stack_offset offset) const {
+char *aligned_stack::memory_at_offset(stack_offset offset) const {
   const auto byte_offset = offset * base::system_details::CACHE_LINE_SIZE;
-  return reinterpret_cast<void *>(memory_pointer_ + byte_offset);
+  return reinterpret_cast<char *>(memory_pointer_ + byte_offset);
 }
 
-void *aligned_stack::push_bytes(size_t size) {
+char *aligned_stack::push_bytes(size_t size) {
   size_t round_up_size = base::alignment::next_alignment(size);
   size_t num_cache_lines = round_up_size / base::system_details::CACHE_LINE_SIZE;
 
-  void *result = memory_at_offset(current_offset_);
+  char *result = memory_at_offset(current_offset_);
 
   // Move head to next aligned position after new object
   current_offset_ += num_cache_lines;
-  PLS_ASSERT(current_offset_ > max_offset_,
+  PLS_ASSERT(current_offset_ <= max_offset_,
              "Tried to allocate object on alligned_stack without sufficient memory!");
 
   return result;
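push_bytes() above always advances the stack in whole cache lines via next_alignment(). A worked example of that rounding, assuming the usual 64-byte cache line (the real constant comes from system_details.h):

#include <cstddef>
#include <cstdio>

constexpr std::size_t kCacheLine = 64;  // assumed value of CACHE_LINE_SIZE

// Same rounding formula as next_alignment() in the patch.
constexpr std::size_t next_alignment(std::size_t size) {
  return (size % kCacheLine) == 0 ? size : size + (kCacheLine - (size % kCacheLine));
}

static_assert(next_alignment(1) == 64, "1 byte still occupies a full cache line");
static_assert(next_alignment(64) == 64, "already aligned sizes stay unchanged");
static_assert(next_alignment(100) == 128, "100 bytes round up to two cache lines");

int main() {
  // push_bytes(100) therefore advances the stack offset by 128 / 64 = 2 lines.
  std::printf("%zu\n", next_alignment(100) / kCacheLine);
  return 0;
}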
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index 1d205ac..9344a8b 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -8,26 +8,25 @@ namespace pls {
 namespace internal {
 namespace scheduling {
 
-scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads, bool reuse_thread) :
+scheduler::scheduler(scheduler_memory &memory, const unsigned int num_threads, bool reuse_thread) :
     num_threads_{num_threads},
     reuse_thread_{reuse_thread},
     memory_{memory},
     sync_barrier_{num_threads + 1 - reuse_thread},
     terminated_{false} {
-  if (num_threads_ > memory_->max_threads()) {
+  if (num_threads_ > memory.max_threads()) {
     PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory.");
   }
 
   for (unsigned int i = 0; i < num_threads_; i++) {
     // Placement new is required, as the memory of `memory_` is not required to be initialized.
-    new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i),
-                                                            memory_->cont_stack_for(i), i};
+    memory.thread_state_for(i).scheduler_ = this;
+    memory.thread_state_for(i).id_ = i;
 
     if (reuse_thread && i == 0) {
       continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one.
     }
 
-    new((void *) memory_->thread_for(i))base::thread(&scheduler::worker_routine, memory_->thread_state_for(i));
-
+    memory.thread_for(i) = base::thread(&scheduler::work_thread_main_loop, &memory_.thread_state_for(i));
   }
 }
 
@@ -35,40 +34,40 @@ scheduler::~scheduler() {
   terminate();
 }
 
-void scheduler::worker_routine() {
-  auto my_state = thread_state::get();
-  auto scheduler = my_state->scheduler_;
-
+void scheduler::work_thread_main_loop() {
+  auto &scheduler = thread_state::get().get_scheduler();
   while (true) {
     // Wait to be triggered
-    scheduler->sync_barrier_.wait();
+    scheduler.sync_barrier_.wait();
 
     // Check for shutdown
-    if (scheduler->terminated_) {
+    if (scheduler.terminated_) {
       return;
     }
 
-    // Execute work
-    if (my_state->id_ == 0) {
-      // Main Thread
-      auto root_task = scheduler->main_thread_root_task_;
-      root_task->parent_ = nullptr;
-      root_task->deque_offset_ = my_state->deque_.save_offset();
-
-      root_task->execute();
-      scheduler->work_section_done_ = true;
-    } else {
-      // Worker Threads
-      while (!scheduler->work_section_done_) {
-        if (!scheduler->try_execute_local()) {
-          scheduler->try_execute_stolen();
-        }
-      }
-    }
+    scheduler.work_thread_work_section();
 
     // Sync back with main thread
-    my_state->scheduler_->sync_barrier_.wait();
+    scheduler.sync_barrier_.wait();
+  }
+}
+
+void scheduler::work_thread_work_section() {
+  auto &my_state = thread_state::get();
+
+  if (my_state.get_id() == 0) {
+    // Main Thread, kick off by executing the user's main code block.
+    main_thread_starter_function_->run();
   }
+
+  do {
+    // TODO: Implement other threads, for now we are happy if it compiles and runs on one thread
+    // For now we can test without this, as the fast path should never hit this.
+    // 1) Try Steal
+    // 2) Copy Over
+    // 3) Finish Steal
+    // 4) Execute Local Copy
+  } while (!work_section_done_);
 }
 
 void scheduler::terminate() {
@@ -83,79 +82,11 @@ void scheduler::terminate() {
     if (reuse_thread_ && i == 0) {
       continue;
     }
-    memory_->thread_for(i)->join();
-  }
-}
-
-task *scheduler::get_local_task() {
-  PROFILE_STEALING("Get Local Task")
-  return thread_state::get()->deque_.pop_local_task();
-}
-
-task *scheduler::steal_task() {
-  PROFILE_STEALING("Steal Task")
-
-  // Data for victim selection
-  const auto my_state = thread_state::get();
-
-  const auto my_id = my_state->id_;
-  const size_t offset = my_state->random_() % num_threads();
-  const size_t max_tries = num_threads(); // TODO: Tune this value
-  bool any_cas_fails_occured = false;
-
-  // Current strategy: random start, then round robin from there
-  for (size_t i = 0; i < max_tries; i++) {
-    size_t target = (offset + i) % num_threads();
-
-    // Skip our self for stealing
-    target = ((target == my_id) + target) % num_threads();
-
-    auto target_state = thread_state_for(target);
-
-    bool cas_fail;
-    auto result = target_state->deque_.pop_external_task(cas_fail);
-    any_cas_fails_occured |= cas_fail;
-    if (result != nullptr) {
-      return result;
-    }
-
-    // TODO: See if we should backoff here (per missed steal)
-  }
-
-  if (!any_cas_fails_occured) {
-    // Went through every task and we did not find any work.
-    // Most likely there is non available right now, yield to other threads.
-    pls::internal::base::this_thread::yield();
-  }
-  return nullptr;
-}
-
-bool scheduler::try_execute_local() {
-  task *local_task = get_local_task();
-  if (local_task != nullptr) {
-    local_task->execute();
-    return true;
-  } else {
-    return false;
-  }
-}
-
-bool scheduler::try_execute_stolen() {
-  task *stolen_task = steal_task();
-  if (stolen_task != nullptr) {
-    stolen_task->deque_offset_ = thread_state::get()->deque_.save_offset();
-    stolen_task->execute();
-    return true;
+    memory_.thread_for(i).join();
   }
-
-  return false;
-}
-
-void scheduler::wait_for_all() {
-  thread_state::get()->current_task_->wait_for_all();
 }
 
-thread_state *scheduler::thread_state_for(size_t id) { return memory_->thread_state_for(id); }
+thread_state &scheduler::thread_state_for(size_t id) { return memory_.thread_state_for(id); }
 
 }
 }
diff --git a/test/data_structures_test.cpp b/test/data_structures_test.cpp
index d515972..aa7f29c 100644
--- a/test/data_structures_test.cpp
+++ b/test/data_structures_test.cpp
@@ -2,9 +2,7 @@
 
 #include "pls/internal/base/system_details.h"
 
-#include "pls/internal/scheduling/data_structures/aligned_stack.h"
-#include "pls/internal/scheduling/data_structures/locking_deque.h"
-#include "pls/internal/scheduling/data_structures/work_stealing_deque.h"
+#include "pls/internal/data_structures/aligned_stack.h"
 
 #include