diff --git a/app/playground/main.cpp b/app/playground/main.cpp
index 9fec0c6..f400ab1 100644
--- a/app/playground/main.cpp
+++ b/app/playground/main.cpp
@@ -4,10 +4,11 @@
 #include "pls/internal/scheduling/scheduler.h"
 #include "pls/internal/scheduling/parallel_result.h"
 #include "pls/internal/scheduling/scheduler_memory.h"
+#include "pls/internal/base/thread.h"
 
 using namespace pls::internal;
 
-constexpr size_t NUM_THREADS = 8;
+constexpr size_t NUM_THREADS = 2;
 constexpr size_t NUM_TASKS = 64;
 constexpr size_t MAX_TASK_STACK_SIZE = 0;
@@ -17,6 +18,7 @@ constexpr size_t MAX_CONT_SIZE = 256;
 std::atomic<int> count{0};
 
 scheduling::parallel_result<int> fib(int n) {
+  base::this_thread::sleep(100);
   // std::cout << "Fib(" << n << "): " << count++ << ", " << scheduling::thread_state::get().get_id() << std::endl;
   if (n == 0) {
     return 0;
@@ -31,6 +33,7 @@ scheduling::parallel_result<int> fib(int n) {
     return fib(n - 2);
   }).then([=](int a, int b) {
     scheduling::parallel_result<int> result{a + b};
+    base::this_thread::sleep(100);
     // std::cout << "Done Fib(" << n << "): " << (a + b) << ", " << scheduling::thread_state::get().get_id() << std::endl;
     return result;
   });
@@ -60,7 +63,7 @@ int main() {
   scheduling::scheduler scheduler{static_scheduler_memory, NUM_THREADS};
 
   auto start = std::chrono::steady_clock::now();
-  std::cout << "fib = " << fib_normal(16) << std::endl;
+//  std::cout << "fib = " << fib_normal(10) << std::endl;
   auto end = std::chrono::steady_clock::now();
   std::cout << "Normal: " << std::chrono::duration_cast(end - start).count() << std::endl;
@@ -76,7 +79,7 @@ int main() {
   //   std::cout << "fib = " << (b) << std::endl;
   //   return scheduling::parallel_result<int>{0};
   // });
-    return fib(16);
+    return fib(10);
   });
 
   end = std::chrono::steady_clock::now();
diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt
index 9ec640a..c286bce 100644
--- a/lib/pls/CMakeLists.txt
+++ b/lib/pls/CMakeLists.txt
@@ -58,7 +58,7 @@ add_library(pls STATIC
         include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
         include/pls/internal/scheduling/cont_manager.h include/pls/internal/scheduling/cont.h
-        include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/cont_manager_impl.h include/pls/internal/scheduling/thread_state_static.h)
+        include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/thread_state_static.h)
 
 # Add everything in `./include` to be in the include path of this project
 target_include_directories(pls
diff --git a/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h b/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h
index f3c20fb..194a94c 100644
--- a/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h
+++ b/lib/pls/include/pls/internal/data_structures/bounded_ws_deque.h
@@ -85,6 +85,8 @@ class bounded_ws_deque {
     }
 
     // The queue is empty and we lost the competition
+    local_bottom_ = 0;
+    bottom_.store(local_bottom_);
     top_.store({old_top.stamp + 1, 0});
     return optional<T>();
   }
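The two added lines above handle the case where the owner loses the final element of its bounded deque to a thief: both ends are rewound to index 0 (with a bumped stamp) so the fixed-size buffer is reused from its start instead of creeping towards its end. A minimal, self-contained sketch of that owner/thief protocol, assuming a stamped top word like the library's stamped_integer; names (stamped, bounded_deque) are illustrative and std::optional stands in for the library's own optional:

#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>

// One word holding both a version stamp and an index, mirroring the
// library's data_structures::stamped_integer.
struct stamped {
  uint32_t stamp;
  uint32_t value;
};

template <typename T, std::size_t SIZE>
class bounded_deque {
  T items_[SIZE];
  std::atomic<stamped> top_{stamped{0, 0}};
  std::atomic<uint32_t> bottom_{0};
  uint32_t local_bottom_{0};  // owner-local mirror of bottom_

 public:
  void push_bottom(T item) {  // owner only
    assert(local_bottom_ < SIZE);
    items_[local_bottom_++] = item;
    bottom_.store(local_bottom_);
  }

  std::optional<T> pop_bottom() {  // owner only
    if (local_bottom_ == 0) return std::nullopt;
    local_bottom_--;
    bottom_.store(local_bottom_);

    stamped old_top = top_.load();
    if (local_bottom_ > old_top.value) {
      return items_[local_bottom_];  // more than one element left: no race
    }
    if (local_bottom_ == old_top.value &&
        top_.compare_exchange_strong(old_top, stamped{old_top.stamp + 1, 0})) {
      // We won the race for the last element; the deque is empty again.
      local_bottom_ = 0;
      bottom_.store(0);
      return items_[old_top.value];
    }
    // The queue is empty and we lost the competition: rewind bottom as well
    // as top (the two lines the patch adds), so the buffer restarts at 0.
    local_bottom_ = 0;
    bottom_.store(0);
    top_.store(stamped{old_top.stamp + 1, 0});
    return std::nullopt;
  }

  std::optional<T> pop_top() {  // thieves
    stamped old_top = top_.load();
    if (bottom_.load() <= old_top.value) return std::nullopt;  // looks empty
    T item = items_[old_top.value];
    if (top_.compare_exchange_strong(old_top, stamped{old_top.stamp + 1, old_top.value + 1})) {
      return item;
    }
    return std::nullopt;  // lost the race
  }
};

int main() {
  bounded_deque<int, 4> deque;
  deque.push_bottom(1);
  assert(deque.pop_bottom().value() == 1);  // last-element path rewinds to 0
  deque.push_bottom(2);                     // slot 0 is reused
  assert(deque.pop_top().value() == 2);
}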
diff --git a/lib/pls/include/pls/internal/scheduling/cont.h b/lib/pls/include/pls/internal/scheduling/cont.h
index 3254746..134db43 100644
--- a/lib/pls/include/pls/internal/scheduling/cont.h
+++ b/lib/pls/include/pls/internal/scheduling/cont.h
@@ -116,6 +116,7 @@ class cont : public base_cont {
     using result_type = decltype(function_((*result_1_).value(), (*result_2_).value()));
     result_runner<result_type>::execute(*this);
     this->get_memory_block()->free_buffer();
+    this->get_memory_block()->reset_state();
     this->~cont();
   }
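With the added line, a finished continuation not only frees its memory block's buffer but also re-arms the block, so the chain it belongs to is clean for the next spawn or steal. A simplified stand-alone sketch of that buffer discipline (placement new into a single inline buffer plus an in-use flag); the real reset_state() additionally bumps the block's stamped state back to initialized and clears the offered chain, which is omitted here:

#include <atomic>
#include <cassert>
#include <cstddef>
#include <new>
#include <utility>

class block {
  alignas(std::max_align_t) unsigned char memory_buffer_[256];
  bool memory_buffer_used_{false};
  std::atomic<int> results_missing_{2};

 public:
  template <typename T, typename... ARGS>
  T *place_in_buffer(ARGS &&...args) {
    assert(!memory_buffer_used_ && "only one continuation per node at a time");
    memory_buffer_used_ = true;
    return new (memory_buffer_) T(std::forward<ARGS>(args)...);  // construct in place
  }

  void free_buffer() {
    assert(memory_buffer_used_);
    memory_buffer_used_ = false;
  }

  bool is_buffer_used() const { return memory_buffer_used_; }

  // Re-arm the block for reuse: both child results outstanding again.
  void reset_state() { results_missing_.store(2); }

  std::atomic<int> &get_results_missing() { return results_missing_; }
};

int main() {
  block b;
  int *value = b.place_in_buffer<int>(42);
  assert(*value == 42);
  b.free_buffer();   // trivial type: no destructor call needed
  b.reset_state();   // clean node, as check_clean_chain() expects
  assert(!b.is_buffer_used() && b.get_results_missing().load() == 2);
}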
diff --git a/lib/pls/include/pls/internal/scheduling/cont_manager.h b/lib/pls/include/pls/internal/scheduling/cont_manager.h
index b42dc5c..c8797e6 100644
--- a/lib/pls/include/pls/internal/scheduling/cont_manager.h
+++ b/lib/pls/include/pls/internal/scheduling/cont_manager.h
@@ -22,7 +22,7 @@ class cont_manager {
 
   template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
   explicit cont_manager(data_structures::aligned_stack &cont_storage, template_args<NUM_CONTS, MAX_CONT_SIZE>)
-      : max_cont_size_{MAX_CONT_SIZE} {
+      : max_cont_size_{MAX_CONT_SIZE}, num_conts_{NUM_CONTS} {
 
     // First node is currently active and our local start
     start_node_ = active_node_ = init_memory_block(cont_storage, nullptr, nullptr, 0);
@@ -66,6 +66,7 @@ class cont_manager {
     target_chain->set_next(our_next_node);
 
     start_node_ = target_chain->get_start();
+    active_node_ = target_chain;
   }
 
   memory_block *get_node(unsigned int depth) {
@@ -80,6 +81,21 @@ class cont_manager {
     return current;
   }
 
+  void check_clean_chain() {
+    memory_block *current = start_node_;
+    for (unsigned int i = 0; i < num_conts_; i++) {
+      bool buffer_used = current->is_buffer_used();
+      auto state_value = current->get_state().load().value;
+      if (state_value != memory_block::initialized || buffer_used || current->get_depth() != i) {
+        PLS_ASSERT(false,
+                   "Must always steal with a clean chain!");
+      }
+
+      current->set_start(start_node_);
+      current = current->get_next();
+    }
+  }
+
   void set_active_depth(unsigned int depth) {
     active_node_ = get_node(depth);
   }
@@ -93,6 +109,9 @@ class cont_manager {
     auto *notified_cont = fall_through_cont_;
     bool notifier_is_right_child = fall_through_child_right;
 
+    std::cout << "Notifying Cont on core " << my_state.get_id() << " and depth "
+              << notified_cont->get_memory_block()->get_depth() << std::endl;
+
     fall_through_cont_ = nullptr;
     fall_through_ = false;
 
@@ -103,9 +122,10 @@ class cont_manager {
     if (!notifier_is_right_child) {
       // Check to execute right child directly...
       auto &atomic_state = notified_cont->get_memory_block()->get_state();
-      memory_block::stamped_state target_state{atomic_state.load().stamp + 1, memory_block::state::execute_local};
-      memory_block::stamped_state old_state = atomic_state.exchange(target_state);
-      if (old_state.value != memory_block::state::stolen) {
+      auto old_state = atomic_state.load();
+      memory_block::stamped_state target_state{old_state.stamp + 1, memory_block::state::execute_local};
+      memory_block::stamped_state exchanged_state = atomic_state.exchange(target_state);
+      if (exchanged_state.value != memory_block::state::stolen) {
         // We 'overruled' the stealing thread and execute the other task ourselves.
         // We can be sure that only we were involved in executing the child tasks of the parent_continuation...
         notified_cont->get_memory_block()->get_results_missing().fetch_add(-1);
@@ -130,19 +150,25 @@ class cont_manager {
       // Right side is 'fully' stolen. We can continue to inform the parent like we would do normally.
     }
 
+    // Keep the target chain before we execute, as this potentially frees the memory
+    auto *target_chain = notified_cont->get_memory_block()->get_offered_chain().load();
+
     // Notify the next continuation of finishing a child...
     if (notified_cont->get_memory_block()->get_results_missing().fetch_add(-1) == 1) {
       // ... we finished the continuation.
      // We are now in charge of continuing to execute the continuation chain above.
      if (get_node(notified_cont->get_memory_block()->get_depth()) != notified_cont->get_memory_block()) {
+        my_state.cont_manager_.check_clean_chain();
        // We do not own the thing we will execute.
        // Own it by swapping the chain belonging to it in.
        aquire_memory_chain(notified_cont->get_memory_block());
+        std::cout << "Now in charge of memory chain on core " << my_state.get_id() << std::endl;
      }
      my_state.parent_cont_ = notified_cont->get_parent();
      my_state.right_spawn_ = notified_cont->is_right_child();
      active_node_ = notified_cont->get_memory_block();
+      std::cout << "Execute cont on core " << my_state.get_id() << std::endl;
      notified_cont->execute();
      if (!falling_through() && notified_cont->get_parent() != nullptr) {
        fall_through_and_notify_cont(notified_cont->get_parent(), notified_cont->is_right_child());
@@ -155,8 +181,11 @@ class cont_manager {
       if (get_node(notified_cont->get_memory_block()->get_depth()) == notified_cont->get_memory_block()) {
         // We own the thing we are not allowed to execute.
         // Get rid of the ownership by using the offered chain.
-        aquire_memory_chain(notified_cont->get_memory_block()->get_offered_chain().load());
+        aquire_memory_chain(target_chain);
+        std::cout << "No longer in charge of chain above on core " << my_state.get_id() << std::endl;
       }
+
+      my_state.cont_manager_.check_clean_chain();
 
       // We are done here...nothing more to execute
       return;
     }
@@ -178,6 +207,7 @@ class cont_manager {
 
 private:
   const size_t max_cont_size_;
+  const size_t num_conts_;
 
   /**
    * Managing the continuation chain.
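The reworked notify code above resolves who runs the right child through a single atomic exchange on the block's stamped state: the finishing left child moves the state to execute_local and only yields if the value it displaced was already stolen. An isolated, runnable sketch of that owner/thief handshake, with stand-in types (the library's stamped_state is its stamped_integer, and the thief side corresponds to steal_remote_task further down):

#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>

enum state : uint32_t { initialized, execute_local, stealing, stolen };
struct stamped_state {
  uint32_t stamp;
  uint32_t value;
};

int main() {
  std::atomic<stamped_state> block_state{stamped_state{0, initialized}};

  std::thread thief([&] {
    stamped_state expected = block_state.load();
    // A thief only wins by moving initialized -> stolen before the owner acts.
    if (expected.value == initialized &&
        block_state.compare_exchange_strong(expected, {expected.stamp + 1, stolen})) {
      std::cout << "thief: stole the right task" << std::endl;
    }
  });

  // Owner finished the left child: claim the right task unless it is stolen.
  stamped_state old_state = block_state.load();
  stamped_state exchanged_state =
      block_state.exchange({old_state.stamp + 1, execute_local});
  if (exchanged_state.value != stolen) {
    std::cout << "owner: overruled the thief, running right task locally" << std::endl;
  } else {
    std::cout << "owner: right task already stolen, notify parent instead" << std::endl;
  }

  thief.join();
}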
diff --git a/lib/pls/include/pls/internal/scheduling/memory_block.h b/lib/pls/include/pls/internal/scheduling/memory_block.h
index b71715b..9ecc015 100644
--- a/lib/pls/include/pls/internal/scheduling/memory_block.h
+++ b/lib/pls/include/pls/internal/scheduling/memory_block.h
@@ -34,7 +34,10 @@ class memory_block {
   template<typename T, typename ...ARGS>
   T *place_in_buffer(ARGS &&...args) {
-    PLS_ASSERT(!memory_buffer_used_, "Must only allocate one continuation at once per node.")
+    if (memory_buffer_used_) {
+      pls::internal::base::this_thread::sleep(100000);
+      PLS_ASSERT(!memory_buffer_used_, "Must only allocate one continuation at once per node.");
+    }
 
     memory_buffer_used_ = true;
     return new(memory_buffer_) T(std::forward<ARGS>(args)...);
@@ -43,6 +46,9 @@ class memory_block {
     PLS_ASSERT(memory_buffer_used_, "Can only free a memory spot when it is in use.")
     memory_buffer_used_ = false;
   }
+  bool is_buffer_used() {
+    return memory_buffer_used_;
+  }
 
   // TODO: Fit the reset somewhere!!!
//  // Reset Associated counters
@@ -71,7 +77,7 @@ class memory_block {
     memory_chain_start_ = start;
   }
 
-  enum state { initialized, execute_local, stealing, stolen };
+  enum state { initialized, execute_local, stealing, stolen, invalid };
   using stamped_state = data_structures::stamped_integer;
 
   std::atomic<stamped_state> &get_state() {
@@ -90,6 +96,13 @@ class memory_block {
     return depth_;
   }
 
+  void reset_state() {
+    offered_chain_.store(nullptr);
+    auto old_state = state_.load();
+    state_.store({old_state.stamp + 1, initialized});
+    results_missing_.store(2);
+  }
+
 private:
   // Linked list property of memory blocks (a complete list represents a thread's currently owned memory).
  // Each block knows its chain start to allow stealing a whole chain in O(1)
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index 6d32b1f..fc10abd 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -44,11 +44,11 @@ struct scheduler::starter {
     // (We might optimize this in the future to require less memory copies.)
     auto *current_memory_block = cont_manager.get_next_memory_block();
 
-    // We set the correct side when invoking user code.
-    const bool is_right_cont = my_state.right_spawn_;
     // We keep track of the last spawn to build up the parent_cont chain
+    const bool is_right_cont = my_state.right_spawn_;
     base_cont *parent_cont = my_state.parent_cont_;
+
     continuation_type *current_cont = current_memory_block->place_in_buffer<continuation_type>(parent_cont,
                                                                                                current_memory_block,
                                                                                                is_right_cont,
@@ -74,6 +74,8 @@ struct scheduler::starter {
     if (cont_manager.falling_through()) {
       // Main scheduling loop is responsible for entering the result to the slow path...
       current_cont->store_left_result(std::move(result_1));
+      auto old_state = current_cont->get_memory_block()->get_state().load();
+      current_cont->get_memory_block()->get_state().store({old_state.stamp + 1, memory_block::invalid});
       PLS_ASSERT(current_cont->get_memory_block()->get_results_missing().fetch_add(-1) != 1,
                  "We fall through, meaning we 'block' an cont above, thus this can never happen!");
       // Unwind stack...
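scheduler_impl.h now stamps a block invalid when a spawn falls through to the slow path. Because every transition bumps the stamp, a thief holding a stale snapshot fails its compare_exchange even if the plain enum value alone were to repeat later; a small sketch of that ABA guard, with the same illustrative stand-in types as above:

#include <atomic>
#include <cassert>
#include <cstdint>

enum state : uint32_t { initialized, execute_local, stealing, stolen, invalid };
struct stamped_state {
  uint32_t stamp;
  uint32_t value;
};

void mark_invalid(std::atomic<stamped_state> &atomic_state) {
  auto old_state = atomic_state.load();
  // The bumped stamp invalidates any pending CAS taken against the old value.
  atomic_state.store({old_state.stamp + 1, invalid});
}

int main() {
  std::atomic<stamped_state> s{stamped_state{7, initialized}};
  stamped_state snapshot = s.load();  // a thief's stale snapshot
  mark_invalid(s);                    // owner falls through: now {8, invalid}
  bool stole = s.compare_exchange_strong(snapshot, {snapshot.stamp + 1, stolen});
  assert(!stole);                     // steal attempt correctly rejected
}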
diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h
index 41117f3..a65cc1b 100644
--- a/lib/pls/include/pls/internal/scheduling/task_manager.h
+++ b/lib/pls/include/pls/internal/scheduling/task_manager.h
@@ -5,14 +5,18 @@
 
 #include
 #include
 #include
+#include
 
 #include "pls/internal/scheduling/task.h"
 #include "pls/internal/scheduling/cont_manager.h"
 #include "pls/internal/scheduling/memory_block.h"
+
 #include "pls/internal/data_structures/bounded_ws_deque.h"
 #include "pls/internal/data_structures/stamped_integer.h"
 
+#include "pls/internal/base/spin_lock.h"
+
 namespace pls {
 namespace internal {
 namespace scheduling {
@@ -38,24 +42,33 @@ class task_manager {
 public:
  // Publishes a task on the stack, i.e. makes it visible for other threads to steal.
  void publish_task(base_task *task) {
+    std::lock_guard<base::spin_lock> lock{lock_};
    task_deque_.push_bottom(task_handle{task});
  }
 
  // Try to pop a local task from this task manager's stack.
  bool steal_local_task() {
+    std::lock_guard<base::spin_lock> lock{lock_};
    return task_deque_.pop_bottom();
  }
 
  // Try to steal a task from a remote task_manager instance. The stolen task must be stored locally.
  // Returns a pair containing the actual task and whether the steal was successful.
  base_task *steal_remote_task(cont_manager &stealing_cont_manager) {
+    std::lock_guard<base::spin_lock> lock{lock_};
    auto stolen_task_handle = task_deque_.pop_top();
    if (stolen_task_handle) {
      base_task *stolen_task = (*stolen_task_handle).task_;
+      std::cout << "Nearly stole on core " << thread_state::get().get_id() << " task with depth "
+                << stolen_task->get_cont()->get_memory_block()->get_depth() << std::endl;
      auto &atomic_state = (*stolen_task_handle).task_memory_block_->get_state();
      auto &atomic_offered_chain = (*stolen_task_handle).task_memory_block_->get_offered_chain();
      auto offered_chain = stealing_cont_manager.get_node((*stolen_task_handle).task_memory_block_->get_depth());
+      if (offered_chain == (*stolen_task_handle).task_memory_block_) {
+        PLS_ASSERT(false, "How would we offer our own chain? We only offer when stealing!");
+      }
+
      auto last_state = atomic_state.load();
      if (last_state.value != memory_block::initialized) {
        return nullptr;
      }
@@ -78,6 +91,7 @@ class task_manager {
       }
 
       if (atomic_state.compare_exchange_strong(loop_state, {loop_state.stamp + 1, memory_block::stolen})) {
+        std::cout << "Steal!" << std::endl;
         return stolen_task;
       } else {
         return nullptr;
@@ -90,10 +104,12 @@ class task_manager {
     }
   }
 
-  explicit task_manager(data_structures::bounded_ws_deque<task_handle> &task_deque) : task_deque_{task_deque} {}
+  explicit task_manager(data_structures::bounded_ws_deque<task_handle> &task_deque) : task_deque_{task_deque},
+                                                                                      lock_{} {}
 
 private:
   data_structures::bounded_ws_deque<task_handle> &task_deque_;
+  base::spin_lock lock_;
 };
 
 template
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index 4ed1d40..88b0799 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -69,6 +69,7 @@ void scheduler::work_thread_work_section() {
     while (my_cont_manager.falling_through()) {
       my_cont_manager.execute_fall_through_code();
     }
+    my_cont_manager.check_clean_chain();
 
     // Steal Routine (will be continuously executed when there are no more fall-throughs).
     // TODO: move into separate function
@@ -82,16 +83,19 @@ void scheduler::work_thread_work_section() {
       auto &target_state = my_state.get_scheduler().thread_state_for(target);
 
+      my_cont_manager.check_clean_chain();
       auto *stolen_task = target_state.get_task_manager().steal_remote_task(my_cont_manager);
       if (stolen_task != nullptr) {
         my_state.parent_cont_ = stolen_task->get_cont();
         my_state.right_spawn_ = true;
         my_cont_manager.set_active_depth(stolen_task->get_cont()->get_memory_block()->get_depth() + 1);
+        my_cont_manager.check_clean_chain();
         stolen_task->execute();
         if (my_cont_manager.falling_through()) {
           break;
         } else {
           my_cont_manager.fall_through_and_notify_cont(stolen_task->get_cont(), true);
+          break;
         }
       }
     }
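The task_manager changes above serialize every deque operation behind a base::spin_lock, trading the lock-free fast path for simpler reasoning while the stealing protocol is debugged. The same pattern in isolation, with std::atomic_flag standing in for the library's spin lock and a plain std::deque standing in for the work-stealing deque:

#include <atomic>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>

class spin_lock {
  std::atomic_flag flag_ = ATOMIC_FLAG_INIT;

 public:
  void lock() {
    while (flag_.test_and_set(std::memory_order_acquire)) { /* spin */ }
  }
  void unlock() { flag_.clear(std::memory_order_release); }
};

int main() {
  spin_lock lock;
  std::deque<int> tasks;

  // Owner publishes at the bottom, thief steals at the top; every operation
  // takes the lock, exactly like the patched task_manager methods.
  std::thread owner([&] {
    for (int i = 0; i < 1000; i++) {
      std::lock_guard<spin_lock> guard{lock};
      tasks.push_back(i);  // publish_task
    }
  });
  std::thread thief([&] {
    int stolen = 0;
    for (int i = 0; i < 1000; i++) {
      std::lock_guard<spin_lock> guard{lock};
      if (!tasks.empty()) {
        tasks.pop_front();  // steal_remote_task
        stolen++;
      }
    }
    std::cout << "thief stole " << stolen << " tasks" << std::endl;
  });

  owner.join();
  thief.join();
}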
+TEST_CASE("continuation stealing", "[internal/scheduling/cont_manager.h]") { + const int NUM_THREADS = 2; + const int NUM_TASKS = 8; + const int MAX_TASK_STACK_SIZE = 8; + const int NUM_CONTS = 8; + const int MAX_CONT_SIZE = 256; + + static_scheduler_memory static_scheduler_memory; + + scheduler scheduler{static_scheduler_memory, NUM_THREADS}; + + // Coordinate progress to match OUR order + std::atomic progress{0}; + + // Order: + // 0) work on first task on main thread + // 1) second thread stole right task + + scheduler.perform_work([&]() { + return scheduler::par([&]() { + while (progress.load() != 1); + return parallel_result{0}; + }, [&]() { + progress.store(1); + return parallel_result{0}; + }).then([&](int, int) { + + return parallel_result{0}; + }); + }); +}
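The new test pins down one specific interleaving by gating both tasks on an atomic progress counter: the left task spins until the right task (stolen by the second worker) has run. The same coordination pattern in isolation, using plain std::thread instead of scheduler workers:

#include <atomic>
#include <iostream>
#include <thread>

int main() {
  std::atomic<int> progress{0};

  std::thread a([&] {
    while (progress.load() != 1) { /* spin until b has run */ }
    std::cout << "a runs second" << std::endl;
  });
  std::thread b([&] {
    std::cout << "b runs first" << std::endl;
    progress.store(1);
  });

  a.join();
  b.join();
}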