From 21733e4c3497f433dc17856698f5c647a05aff5c Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Wed, 27 Nov 2019 23:42:40 +0100 Subject: [PATCH] WIP: Major flaws fixed. Edge cases at beginning missing and cleanup for conts missing. --- app/playground/main.cpp | 26 +++++++++++++++++--------- lib/pls/include/pls/internal/scheduling/cont_manager.h | 4 ++-- lib/pls/include/pls/internal/scheduling/memory_block.h | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pls/include/pls/internal/scheduling/scheduler_impl.h | 7 ++++--- lib/pls/include/pls/internal/scheduling/thread_state_static.h | 33 +++++++++++++++++++++++++++++++++ lib/pls/src/internal/scheduling/scheduler.cpp | 5 +++++ 6 files changed, 197 insertions(+), 14 deletions(-) create mode 100644 lib/pls/include/pls/internal/scheduling/memory_block.h create mode 100644 lib/pls/include/pls/internal/scheduling/thread_state_static.h diff --git a/app/playground/main.cpp b/app/playground/main.cpp index 0578201..9fec0c6 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -7,7 +7,7 @@ using namespace pls::internal; -constexpr size_t NUM_THREADS = 1; +constexpr size_t NUM_THREADS = 8; constexpr size_t NUM_TASKS = 64; constexpr size_t MAX_TASK_STACK_SIZE = 0; @@ -17,7 +17,7 @@ constexpr size_t MAX_CONT_SIZE = 256; std::atomic count{0}; scheduling::parallel_result fib(int n) { - std::cout << "Fib(" << n << "): " << count++ << ", " << scheduling::thread_state::get().get_id() << std::endl; +// std::cout << "Fib(" << n << "): " << count++ << ", " << scheduling::thread_state::get().get_id() << std::endl; if (n == 0) { return 0; } @@ -31,13 +31,13 @@ scheduling::parallel_result fib(int n) { return fib(n - 2); }).then([=](int a, int b) { scheduling::parallel_result result{a + b}; - std::cout << "Done Fib(" << n << "): " << (a + b) << ", " << scheduling::thread_state::get().get_id() << std::endl; +// std::cout << "Done Fib(" << n << "): " << (a + b) << ", " << scheduling::thread_state::get().get_id() << std::endl; return result; }); } int fib_normal(int n) { - std::cout << "Fib(" << n << "): " << count++ << std::endl; +// std::cout << "Fib(" << n << "): " << count++ << std::endl; if (n == 0) { return 0; } @@ -46,7 +46,7 @@ int fib_normal(int n) { } int result = fib_normal(n - 1) + fib_normal(n - 2); - std::cout << "Done Fib(" << n << "): " << result << std::endl; +// std::cout << "Done Fib(" << n << "): " << result << std::endl; return result; } @@ -60,19 +60,27 @@ int main() { scheduling::scheduler scheduler{static_scheduler_memory, NUM_THREADS}; auto start = std::chrono::steady_clock::now(); -// std::cout << "fib = " << fib_normal(10) << std::endl; + std::cout << "fib = " << fib_normal(16) << std::endl; auto end = std::chrono::steady_clock::now(); - std::cout << "Normal: " << std::chrono::duration_cast(end - start).count() + std::cout << "Normal: " << std::chrono::duration_cast(end - start).count() << std::endl; start = std::chrono::steady_clock::now(); scheduler.perform_work([]() { - return fib(10); +// return scheduling::scheduler::par([]() { +// return scheduling::parallel_result(0); +// }, []() { +// return fib(16); +// }).then([](int, int b) { +// std::cout << "fib = " << (b) << std::endl; +// return scheduling::parallel_result{0}; +// }); + return fib(16); }); end = std::chrono::steady_clock::now(); - std::cout << "Framework: " << std::chrono::duration_cast(end - start).count() << std::endl; + std::cout << "Framework: " << std::chrono::duration_cast(end - start).count() << std::endl; return 0; } diff --git a/lib/pls/include/pls/internal/scheduling/cont_manager.h b/lib/pls/include/pls/internal/scheduling/cont_manager.h index 792593e..b42dc5c 100644 --- a/lib/pls/include/pls/internal/scheduling/cont_manager.h +++ b/lib/pls/include/pls/internal/scheduling/cont_manager.h @@ -104,8 +104,8 @@ class cont_manager { // Check to execute right child directly... auto &atomic_state = notified_cont->get_memory_block()->get_state(); memory_block::stamped_state target_state{atomic_state.load().stamp + 1, memory_block::state::execute_local}; - atomic_state.exchange(target_state); - if (target_state.value != memory_block::state::stolen) { + memory_block::stamped_state old_state = atomic_state.exchange(target_state); + if (old_state.value != memory_block::state::stolen) { // We 'overruled' the stealing thread and execute the other task ourselfs. // We can be sure that only we where involved in executing the child tasks of the parent_continuation... notified_cont->get_memory_block()->get_results_missing().fetch_add(-1); diff --git a/lib/pls/include/pls/internal/scheduling/memory_block.h b/lib/pls/include/pls/internal/scheduling/memory_block.h new file mode 100644 index 0000000..b71715b --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/memory_block.h @@ -0,0 +1,136 @@ + +#ifndef PLS_INTERNAL_SCHEDULING_CONT_NODE_H_ +#define PLS_INTERNAL_SCHEDULING_CONT_NODE_H_ + +namespace pls { +namespace internal { +namespace scheduling { + +/** + * A block of memory that can be used to store tasks and continuations. + * Threads trade these blocks while executing and stealing tasks. + * + * Each block has an associated, raw memory buffer. The user can place his object + * in this memory region as needed. He is responsible for calling deconstructors of the + * placed objects. + */ +class memory_block { + public: + memory_block(char *memory_buffer, + size_t memory_buffer_size, + memory_block *memory_chain_start, + memory_block *prev, + unsigned int depth) + : memory_chain_start_{memory_chain_start}, + prev_{prev}, + next_{nullptr}, + offered_chain_{nullptr}, + state_{{initialized}}, + results_missing_{2}, + memory_buffer_{memory_buffer}, + memory_buffer_size_{memory_buffer_size}, + memory_buffer_used_{false}, + depth_{depth} {}; + + template + T *place_in_buffer(ARGS &&...args) { + PLS_ASSERT(!memory_buffer_used_, "Must only allocate one continuation at once per node.") + + memory_buffer_used_ = true; + return new(memory_buffer_) T(std::forward(args)...); + } + void free_buffer() { + PLS_ASSERT(memory_buffer_used_, "Can only free a memory spot when it is in use.") + memory_buffer_used_ = false; + } + + // TODO: Fit the reset somewhere!!! +// // Reset Associated counters +// results_missing_.store(2); +// offered_chain_.store(nullptr); +// auto old_state = state_.load(); +// state_.store({old_state.stamp + 1, initialized}); + + + memory_block *get_prev() { + return prev_; + } + void set_prev(memory_block *prev) { + prev_ = prev; + } + memory_block *get_next() { + return next_; + } + void set_next(memory_block *next) { + next_ = next; + } + memory_block *get_start() { + return memory_chain_start_; + } + void set_start(memory_block *start) { + memory_chain_start_ = start; + } + + enum state { initialized, execute_local, stealing, stolen }; + using stamped_state = data_structures::stamped_integer; + + std::atomic &get_state() { + return state_; + } + + std::atomic &get_offered_chain() { + return offered_chain_; + } + + std::atomic &get_results_missing() { + return results_missing_; + } + + unsigned int get_depth() const noexcept { + return depth_; + } + + private: + // Linked list property of memory blocks (a complete list represents a threads currently owned memory). + // Each block knows its chain start to allow stealing a whole chain in O(1) + // without the need to traverse back to the chain start. + memory_block *memory_chain_start_; + memory_block *prev_, *next_; + + // When blocked on this chain element, we need to know what other chain of memory we + // got offered by the stealing thread. + // For this we need the offered chain's element up to the point we can steal. + std::atomic offered_chain_; + + // The flag is needed for an ongoing stealing request. + // Stealing threads need to offer their memory block chain before the + // 'fully' own the stolen task. As long as that is not done the memory block + // chain can abort the steal request in order to be not blocked without a + // new, clean memory block chain to work with. + std::atomic state_; + + // Management for coordinating concurrent result writing and stealing. + // The result count decides atomically who gets to execute the continuation + // and who therefore get's to own this memory block chain. + std::atomic results_missing_; + + // Pointer to memory region reserved for the companion continuation. + // Must be a buffer big enough to hold any continuation encountered in the program. + // This memory is managed explicitly by the continuation manager and runtime system + // (they need to make sure to always call de-constructors and never allocate two continuations). + char *memory_buffer_; + // These two are only helper properties helping with bugs during development. + size_t memory_buffer_size_; + bool memory_buffer_used_; + + // Each element stays at a fixed depth for the entire application run. + // Swapping parts of a memory chain will not reorder it, as always parts of + // the same size are exchanged. + const unsigned int depth_; +}; + +} +} +} + +#endif //PLS_INTERNAL_SCHEDULING_CONT_NODE_H_ diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index 8a6dfc9..6d32b1f 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -74,7 +74,8 @@ struct scheduler::starter { if (cont_manager.falling_through()) { // Main scheduling loop is responsible for entering the result to the slow path... current_cont->store_left_result(std::move(result_1)); - cont_manager.fall_through_and_notify_cont(current_cont, false); + PLS_ASSERT(current_cont->get_memory_block()->get_results_missing().fetch_add(-1) != 1, + "We fall through, meaning we 'block' an cont above, thus this can never happen!"); // Unwind stack... return result_type{}; } @@ -133,9 +134,9 @@ class scheduler::init_function_impl : public init_function { return parallel_result{0}; }, [=]() { return function_(); - }).then([=](int, int) { + }).then([=](int, int b) { thread_state::get().get_scheduler().work_section_done_ = true; - return parallel_result{0}; + return parallel_result{b}; }); } diff --git a/lib/pls/include/pls/internal/scheduling/thread_state_static.h b/lib/pls/include/pls/internal/scheduling/thread_state_static.h new file mode 100644 index 0000000..1a55456 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/thread_state_static.h @@ -0,0 +1,33 @@ + +#ifndef PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_ +#define PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_ + +#include "pls/internal/scheduling/task_manager.h" +#include "pls/internal/scheduling/cont_manager.h" + +#include "thread_state.h" + +namespace pls { +namespace internal { +namespace scheduling { + +template +struct thread_state_static { + public: + thread_state_static() + : static_task_manager_{}, + static_cont_manager_{}, + thread_state_{static_task_manager_.get_task_manager(), static_cont_manager_.get_cont_manager()} {} + thread_state &get_thread_state() { return thread_state_; } + + private: + static_task_manager static_task_manager_; + static_cont_manager static_cont_manager_; + thread_state thread_state_; +}; + +} +} +} + +#endif //PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_ diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp index 78d2719..4ed1d40 100644 --- a/lib/pls/src/internal/scheduling/scheduler.cpp +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -88,6 +88,11 @@ void scheduler::work_thread_work_section() { my_state.right_spawn_ = true; my_cont_manager.set_active_depth(stolen_task->get_cont()->get_memory_block()->get_depth() + 1); stolen_task->execute(); + if (my_cont_manager.falling_through()) { + break; + } else { + my_cont_manager.fall_through_and_notify_cont(stolen_task->get_cont(), true); + } } } } while (!work_section_done_); -- libgit2 0.26.0