From adf05e9a65a2334bac9973e98a38ba35499ed678 Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Fri, 29 Nov 2019 11:15:31 +0100 Subject: [PATCH] WIP: We plan to fully remove the start property from the cont manager. The start_chain property does not make sense, as chains are purely 'virtual', i.e. they only fully exist when walking through the computation (by patching them on important events). We initially added the property as a helper for better runtime and simpler implementation, but we think without it we will not get as much inconsistency in the runtime state. Performance can be 're-added' later on. --- app/playground/main.cpp | 9 ++++++--- lib/pls/CMakeLists.txt | 2 +- lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h | 2 ++ lib/pls/include/pls/internal/scheduling/cont.h | 1 + lib/pls/include/pls/internal/scheduling/cont_manager.h | 40 +++++++++++++++++++++++++++++++++++----- lib/pls/include/pls/internal/scheduling/memory_block.h | 17 +++++++++++++++-- lib/pls/include/pls/internal/scheduling/scheduler_impl.h | 6 ++++-- lib/pls/include/pls/internal/scheduling/task_manager.h | 18 +++++++++++++++++- lib/pls/src/internal/scheduling/scheduler.cpp | 4 ++++ test/scheduling_tests.cpp | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 132 insertions(+), 14 deletions(-) diff --git a/app/playground/main.cpp b/app/playground/main.cpp index 9fec0c6..f400ab1 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -4,10 +4,11 @@ #include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/parallel_result.h" #include "pls/internal/scheduling/scheduler_memory.h" +#include "pls/internal/base/thread.h" using namespace pls::internal; -constexpr size_t NUM_THREADS = 8; +constexpr size_t NUM_THREADS = 2; constexpr size_t NUM_TASKS = 64; constexpr size_t MAX_TASK_STACK_SIZE = 0; @@ -17,6 +18,7 @@ constexpr size_t MAX_CONT_SIZE = 256; std::atomic count{0}; scheduling::parallel_result fib(int n) { + base::this_thread::sleep(100); // std::cout << "Fib(" << n << "): " << count++ << ", " << scheduling::thread_state::get().get_id() << std::endl; if (n == 0) { return 0; @@ -31,6 +33,7 @@ scheduling::parallel_result fib(int n) { return fib(n - 2); }).then([=](int a, int b) { scheduling::parallel_result result{a + b}; + base::this_thread::sleep(100); // std::cout << "Done Fib(" << n << "): " << (a + b) << ", " << scheduling::thread_state::get().get_id() << std::endl; return result; }); @@ -60,7 +63,7 @@ int main() { scheduling::scheduler scheduler{static_scheduler_memory, NUM_THREADS}; auto start = std::chrono::steady_clock::now(); - std::cout << "fib = " << fib_normal(16) << std::endl; +// std::cout << "fib = " << fib_normal(10) << std::endl; auto end = std::chrono::steady_clock::now(); std::cout << "Normal: " << std::chrono::duration_cast(end - start).count() << std::endl; @@ -76,7 +79,7 @@ int main() { // std::cout << "fib = " << (b) << std::endl; // return scheduling::parallel_result{0}; // }); - return fib(16); + return fib(10); }); end = std::chrono::steady_clock::now(); diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 9ec640a..c286bce 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -58,7 +58,7 @@ add_library(pls STATIC include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/cont_manager.h include/pls/internal/scheduling/cont.h - include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/cont_manager_impl.h include/pls/internal/scheduling/thread_state_static.h) + include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/thread_state_static.h) # Add everything in `./include` to be in the include path of this project target_include_directories(pls diff --git a/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h b/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h index f3c20fb..194a94c 100644 --- a/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h +++ b/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h @@ -85,6 +85,8 @@ class bounded_ws_deque { } // The queue is empty and we lost the competition + local_bottom_ = 0; + bottom_.store(local_bottom_); top_.store({old_top.stamp + 1, 0}); return optional(); } diff --git a/lib/pls/include/pls/internal/scheduling/cont.h b/lib/pls/include/pls/internal/scheduling/cont.h index 3254746..134db43 100644 --- a/lib/pls/include/pls/internal/scheduling/cont.h +++ b/lib/pls/include/pls/internal/scheduling/cont.h @@ -116,6 +116,7 @@ class cont : public base_cont { using result_type = decltype(function_((*result_1_).value(), (*result_2_).value())); result_runner::execute(*this); this->get_memory_block()->free_buffer(); + this->get_memory_block()->reset_state(); this->~cont(); } diff --git a/lib/pls/include/pls/internal/scheduling/cont_manager.h b/lib/pls/include/pls/internal/scheduling/cont_manager.h index b42dc5c..c8797e6 100644 --- a/lib/pls/include/pls/internal/scheduling/cont_manager.h +++ b/lib/pls/include/pls/internal/scheduling/cont_manager.h @@ -22,7 +22,7 @@ class cont_manager { template explicit cont_manager(data_structures::aligned_stack &cont_storage, template_args) - : max_cont_size_{MAX_CONT_SIZE} { + : max_cont_size_{MAX_CONT_SIZE}, num_conts_{NUM_CONTS} { // First node is currently active and our local start start_node_ = active_node_ = init_memory_block(cont_storage, nullptr, nullptr, 0); @@ -66,6 +66,7 @@ class cont_manager { target_chain->set_next(our_next_node); start_node_ = target_chain->get_start(); + active_node_ = target_chain; } memory_block *get_node(unsigned int depth) { @@ -80,6 +81,21 @@ class cont_manager { return current; } + void check_clean_chain() { + memory_block *current = start_node_; + for (unsigned int i = 0; i < num_conts_; i++) { + bool buffer_used = current->is_buffer_used(); + auto state_value = current->get_state().load().value; + if (state_value != memory_block::initialized || buffer_used || current->get_depth() != i) { + PLS_ASSERT(false, + "Must always steal with a clean chain!"); + } + + current->set_start(start_node_); + current = current->get_next(); + } + } + void set_active_depth(unsigned int depth) { active_node_ = get_node(depth); } @@ -93,6 +109,9 @@ class cont_manager { auto *notified_cont = fall_through_cont_; bool notifier_is_right_child = fall_through_child_right; + std::cout << "Notifying Cont on core " << my_state.get_id() << " and depth " + << notified_cont->get_memory_block()->get_depth() << std::endl; + fall_through_cont_ = nullptr; fall_through_ = false; @@ -103,9 +122,10 @@ class cont_manager { if (!notifier_is_right_child) { // Check to execute right child directly... auto &atomic_state = notified_cont->get_memory_block()->get_state(); - memory_block::stamped_state target_state{atomic_state.load().stamp + 1, memory_block::state::execute_local}; - memory_block::stamped_state old_state = atomic_state.exchange(target_state); - if (old_state.value != memory_block::state::stolen) { + auto old_state = atomic_state.load(); + memory_block::stamped_state target_state{old_state.stamp + 1, memory_block::state::execute_local}; + memory_block::stamped_state exchanged_state = atomic_state.exchange(target_state); + if (exchanged_state.value != memory_block::state::stolen) { // We 'overruled' the stealing thread and execute the other task ourselfs. // We can be sure that only we where involved in executing the child tasks of the parent_continuation... notified_cont->get_memory_block()->get_results_missing().fetch_add(-1); @@ -130,19 +150,25 @@ class cont_manager { // Right side is 'fully' stolen. We can continue to inform the parent like we would do normally. } + // Keep the target chain before we execute, as this potentially frees the memory + auto *target_chain = notified_cont->get_memory_block()->get_offered_chain().load(); + // Notify the next continuation of finishing a child... if (notified_cont->get_memory_block()->get_results_missing().fetch_add(-1) == 1) { // ... we finished the continuation. // We are now in charge continuing to execute the above continuation chain. if (get_node(notified_cont->get_memory_block()->get_depth()) != notified_cont->get_memory_block()) { + my_state.cont_manager_.check_clean_chain(); // We do not own the thing we will execute. // Own it by swapping the chain belonging to it in. aquire_memory_chain(notified_cont->get_memory_block()); + std::cout << "Now in charge of memory chain on core " << my_state.get_id() << std::endl; } my_state.parent_cont_ = notified_cont->get_parent(); my_state.right_spawn_ = notified_cont->is_right_child(); active_node_ = notified_cont->get_memory_block(); + std::cout << "Execute cont on core " << my_state.get_id() << std::endl; notified_cont->execute(); if (!falling_through() && notified_cont->get_parent() != nullptr) { fall_through_and_notify_cont(notified_cont->get_parent(), notified_cont->is_right_child()); @@ -155,8 +181,11 @@ class cont_manager { if (get_node(notified_cont->get_memory_block()->get_depth()) == notified_cont->get_memory_block()) { // We own the thing we are not allowed to execute. // Get rid of the ownership by using the offered chain. - aquire_memory_chain(notified_cont->get_memory_block()->get_offered_chain().load()); + aquire_memory_chain(target_chain); + std::cout << "No longer in charge of chain above on core " << my_state.get_id() << std::endl; } + + my_state.cont_manager_.check_clean_chain(); // We are done here...nothing more to execute return; } @@ -178,6 +207,7 @@ class cont_manager { private: const size_t max_cont_size_; + const size_t num_conts_; /** * Managing the continuation chain. diff --git a/lib/pls/include/pls/internal/scheduling/memory_block.h b/lib/pls/include/pls/internal/scheduling/memory_block.h index b71715b..9ecc015 100644 --- a/lib/pls/include/pls/internal/scheduling/memory_block.h +++ b/lib/pls/include/pls/internal/scheduling/memory_block.h @@ -34,7 +34,10 @@ class memory_block { template T *place_in_buffer(ARGS &&...args) { - PLS_ASSERT(!memory_buffer_used_, "Must only allocate one continuation at once per node.") + if (memory_buffer_used_) { + pls::internal::base::this_thread::sleep(100000); + PLS_ASSERT(!memory_buffer_used_, "Must only allocate one continuation at once per node."); + } memory_buffer_used_ = true; return new(memory_buffer_) T(std::forward(args)...); @@ -43,6 +46,9 @@ class memory_block { PLS_ASSERT(memory_buffer_used_, "Can only free a memory spot when it is in use.") memory_buffer_used_ = false; } + bool is_buffer_used() { + return memory_buffer_used_; + } // TODO: Fit the reset somewhere!!! // // Reset Associated counters @@ -71,7 +77,7 @@ class memory_block { memory_chain_start_ = start; } - enum state { initialized, execute_local, stealing, stolen }; + enum state { initialized, execute_local, stealing, stolen, invalid }; using stamped_state = data_structures::stamped_integer; std::atomic &get_state() { @@ -90,6 +96,13 @@ class memory_block { return depth_; } + void reset_state() { + offered_chain_.store(nullptr); + auto old_state = state_.load(); + state_.store({old_state.stamp + 1, initialized}); + results_missing_.store(2); + } + private: // Linked list property of memory blocks (a complete list represents a threads currently owned memory). // Each block knows its chain start to allow stealing a whole chain in O(1) diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index 6d32b1f..fc10abd 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -44,11 +44,11 @@ struct scheduler::starter { // (We might optimize this in the future to require less memory copies.) auto *current_memory_block = cont_manager.get_next_memory_block(); - // We set the correct side when invoking user code. - const bool is_right_cont = my_state.right_spawn_; // We keep track of the last spawn to build up the parent_cont chain + const bool is_right_cont = my_state.right_spawn_; base_cont *parent_cont = my_state.parent_cont_; + continuation_type *current_cont = current_memory_block->place_in_buffer(parent_cont, current_memory_block, is_right_cont, @@ -74,6 +74,8 @@ struct scheduler::starter { if (cont_manager.falling_through()) { // Main scheduling loop is responsible for entering the result to the slow path... current_cont->store_left_result(std::move(result_1)); + auto old_state = current_cont->get_memory_block()->get_state().load(); + current_cont->get_memory_block()->get_state().store({old_state.stamp + 1, memory_block::invalid}); PLS_ASSERT(current_cont->get_memory_block()->get_results_missing().fetch_add(-1) != 1, "We fall through, meaning we 'block' an cont above, thus this can never happen!"); // Unwind stack... diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h index 41117f3..a65cc1b 100644 --- a/lib/pls/include/pls/internal/scheduling/task_manager.h +++ b/lib/pls/include/pls/internal/scheduling/task_manager.h @@ -5,14 +5,18 @@ #include #include #include +#include #include #include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/cont_manager.h" #include "pls/internal/scheduling/memory_block.h" + #include "pls/internal/data_structures/bounded_ws_deque.h" #include "pls/internal/data_structures/stamped_integer.h" +#include "pls/internal/base/spin_lock.h" + namespace pls { namespace internal { namespace scheduling { @@ -38,24 +42,33 @@ class task_manager { public: // Publishes a task on the stack, i.e. makes it visible for other threads to steal. void publish_task(base_task *task) { + std::lock_guard lock{lock_}; task_deque_.push_bottom(task_handle{task}); } // Try to pop a local task from this task managers stack. bool steal_local_task() { + std::lock_guard lock{lock_}; return task_deque_.pop_bottom(); } // Try to steal a task from a remote task_manager instance. The stolen task must be stored locally. // Returns a pair containing the actual task and if the steal was successful. base_task *steal_remote_task(cont_manager &stealing_cont_manager) { + std::lock_guard lock{lock_}; auto stolen_task_handle = task_deque_.pop_top(); if (stolen_task_handle) { base_task *stolen_task = (*stolen_task_handle).task_; + std::cout << "Nearly stole on core " << thread_state::get().get_id() << " task with depth " + << stolen_task->get_cont()->get_memory_block()->get_depth() << std::endl; auto &atomic_state = (*stolen_task_handle).task_memory_block_->get_state(); auto &atomic_offered_chain = (*stolen_task_handle).task_memory_block_->get_offered_chain(); auto offered_chain = stealing_cont_manager.get_node((*stolen_task_handle).task_memory_block_->get_depth()); + if (offered_chain == (*stolen_task_handle).task_memory_block_) { + PLS_ASSERT(false, "How would we offer our own chain? We only offer when stealing!"); + } + auto last_state = atomic_state.load(); if (last_state.value != memory_block::initialized) { return nullptr; @@ -78,6 +91,7 @@ class task_manager { } if (atomic_state.compare_exchange_strong(loop_state, {loop_state.stamp + 1, memory_block::stolen})) { + std::cout << "Steal!" << std::endl; return stolen_task; } else { return nullptr; @@ -90,10 +104,12 @@ class task_manager { } } - explicit task_manager(data_structures::bounded_ws_deque &task_deque) : task_deque_{task_deque} {} + explicit task_manager(data_structures::bounded_ws_deque &task_deque) : task_deque_{task_deque}, + lock_{} {} private: data_structures::bounded_ws_deque &task_deque_; + base::spin_lock lock_; }; template diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp index 4ed1d40..88b0799 100644 --- a/lib/pls/src/internal/scheduling/scheduler.cpp +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -69,6 +69,7 @@ void scheduler::work_thread_work_section() { while (my_cont_manager.falling_through()) { my_cont_manager.execute_fall_through_code(); } + my_cont_manager.check_clean_chain(); // Steal Routine (will be continuously executed when there are no more fall through's). // TODO: move into separate function @@ -82,16 +83,19 @@ void scheduler::work_thread_work_section() { auto &target_state = my_state.get_scheduler().thread_state_for(target); + my_cont_manager.check_clean_chain(); auto *stolen_task = target_state.get_task_manager().steal_remote_task(my_cont_manager); if (stolen_task != nullptr) { my_state.parent_cont_ = stolen_task->get_cont(); my_state.right_spawn_ = true; my_cont_manager.set_active_depth(stolen_task->get_cont()->get_memory_block()->get_depth() + 1); + my_cont_manager.check_clean_chain(); stolen_task->execute(); if (my_cont_manager.falling_through()) { break; } else { my_cont_manager.fall_through_and_notify_cont(stolen_task->get_cont(), true); + break; } } } diff --git a/test/scheduling_tests.cpp b/test/scheduling_tests.cpp index 349d63a..6707436 100644 --- a/test/scheduling_tests.cpp +++ b/test/scheduling_tests.cpp @@ -1,3 +1,50 @@ #include +#include +#include +#include + +#include "pls/internal/scheduling/scheduler.h" +#include "pls/internal/scheduling/cont.h" +#include "pls/internal/scheduling/cont_manager.h" +#include "pls/internal/scheduling/scheduler_memory.h" +#include "pls/internal/scheduling/parallel_result.h" + +using namespace pls::internal::scheduling; + // TODO: Introduce actual tests once multiple threads work... +TEST_CASE("continuation stealing", "[internal/scheduling/cont_manager.h]") { + const int NUM_THREADS = 2; + const int NUM_TASKS = 8; + const int MAX_TASK_STACK_SIZE = 8; + const int NUM_CONTS = 8; + const int MAX_CONT_SIZE = 256; + + static_scheduler_memory static_scheduler_memory; + + scheduler scheduler{static_scheduler_memory, NUM_THREADS}; + + // Coordinate progress to match OUR order + std::atomic progress{0}; + + // Order: + // 0) work on first task on main thread + // 1) second thread stole right task + + scheduler.perform_work([&]() { + return scheduler::par([&]() { + while (progress.load() != 1); + return parallel_result{0}; + }, [&]() { + progress.store(1); + return parallel_result{0}; + }).then([&](int, int) { + + return parallel_result{0}; + }); + }); +} -- libgit2 0.26.0