From c6dd2fc0c9d49c57d316190b02d9dc447d0b5cdf Mon Sep 17 00:00:00 2001
From: FritzFlorian
Date: Fri, 29 Nov 2019 13:49:47 +0100
Subject: [PATCH] WIP: Partly functional version. Stealing and continuation
 trading work 'most' of the time.

The main remaining issue seems to be that we still use a lock-free protocol
in which a steal can be left pending. We plan to address this next by
introducing a protocol that completes each steal with a single atomic update.

---
 app/benchmark_fft/main.cpp                                | 24 +++++++++---------------
 app/playground/main.cpp                                   | 59 +++++++++++++++++++++++++++--------------------------------
 lib/pls/include/pls/internal/scheduling/cont.h            | 19 +++++++++----------
 lib/pls/include/pls/internal/scheduling/cont_manager.h    | 92 +++++++++++++++++++++++++++++++++++++-------------------------------------------------------
 lib/pls/include/pls/internal/scheduling/memory_block.h    | 11 +----------
 lib/pls/include/pls/internal/scheduling/scheduler_impl.h  |  4 +---
 lib/pls/include/pls/internal/scheduling/task_manager.h    | 21 +++++++++++++++------
 lib/pls/src/internal/scheduling/scheduler.cpp             |  5 +----
 8 files changed, 98 insertions(+), 137 deletions(-)
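
For reference, a minimal sketch of the planned single-atomic-update protocol.
The state names follow the enum in memory_block.h; packing stamp and state
into one atomic word and the exact transitions are assumptions about the
upcoming change, not something this patch implements:

    #include <atomic>
    #include <cstdint>

    // States as in memory_block.h; the 64-bit packing is this sketch's assumption.
    enum block_state : uint32_t { initialized, execute_local, stealing, stolen, invalid };

    constexpr uint64_t pack(uint32_t stamp, uint32_t state) {
      return (static_cast<uint64_t>(stamp) << 32u) | state;
    }

    // One atomic word per memory block: stamp in the upper half, state in the lower.
    std::atomic<uint64_t> state_word{pack(0, initialized)};

    // Thief side: the steal either fully succeeds or fully fails in one CAS,
    // so the owner can never observe a half-done ('pending') steal.
    bool try_steal(uint32_t expected_stamp) {
      uint64_t expected = pack(expected_stamp, initialized);
      return state_word.compare_exchange_strong(expected, pack(expected_stamp + 1, stolen));
    }

    // Owner side: claiming the block for local execution races on the same
    // single word, so exactly one of the two CAS operations can win.
    bool try_execute_local(uint32_t expected_stamp) {
      uint64_t expected = pack(expected_stamp, initialized);
      return state_word.compare_exchange_strong(expected, pack(expected_stamp + 1, execute_local));
    }
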
diff --git a/app/benchmark_fft/main.cpp b/app/benchmark_fft/main.cpp
index e890d9e..6cb316e 100644
--- a/app/benchmark_fft/main.cpp
+++ b/app/benchmark_fft/main.cpp
@@ -8,8 +8,6 @@ using namespace pls::internal::scheduling;
 #include <complex>
 #include <vector>
 
-std::atomic<int> count;
-
 static constexpr int CUTOFF = 16;
 static constexpr int INPUT_SIZE = 8192;
 typedef std::vector<std::complex<double>> complex_vector;
@@ -44,7 +42,6 @@ void combine(complex_vector::iterator data, int n) {
 
 void fft_normal(complex_vector::iterator data, int n) {
   if (n < 2) {
-//    count++;
     return;
   }
 
@@ -56,14 +53,15 @@ void fft_normal(complex_vector::iterator data, int n) {
 
 parallel_result<int> fft(complex_vector::iterator data, int n) {
   if (n < 2) {
-    return 0;
+    return parallel_result<int>{0};
   }
 
   divide(data, n);
   if (n <= CUTOFF) {
     fft_normal(data, n / 2);
     fft_normal(data + n / 2, n / 2);
-    return 0;
+    combine(data, n);
+    return parallel_result<int>{0};
   } else {
     return scheduler::par([=]() {
       return fft(data, n / 2);
@@ -71,7 +69,7 @@ parallel_result<int> fft(complex_vector::iterator data, int n) {
     }, [=]() {
       return fft(data + n / 2, n / 2);
     }).then([=](int, int) {
       combine(data, n);
-      return 0;
+      return parallel_result<int>{0};
     });
   }
 }
@@ -93,13 +91,13 @@ complex_vector prepare_input(int input_size) {
 }
 
 static constexpr int NUM_ITERATIONS = 1000;
-constexpr size_t NUM_THREADS = 1;
+constexpr size_t NUM_THREADS = 2;
 
-constexpr size_t NUM_TASKS = 64;
+constexpr size_t NUM_TASKS = 128;
 constexpr size_t MAX_TASK_STACK_SIZE = 0;
 
-constexpr size_t NUM_CONTS = 64;
-constexpr size_t MAX_CONT_SIZE = 192;
+constexpr size_t NUM_CONTS = 128;
+constexpr size_t MAX_CONT_SIZE = 512;
 
 int main() {
   complex_vector initial_input = prepare_input(INPUT_SIZE);
@@ -112,7 +110,6 @@ int main() {
   scheduler scheduler{static_scheduler_memory, NUM_THREADS};
 
-  count.store(0);
   auto start = std::chrono::steady_clock::now();
   for (int i = 0; i < NUM_ITERATIONS; i++) {
     complex_vector input_2(initial_input);
@@ -122,23 +119,20 @@ int main() {
       }, []() {
         return parallel_result<int>{0};
       }).then([](int, int) {
-        return 0;
+        return parallel_result<int>{0};
       });
     });
   }
   auto end = std::chrono::steady_clock::now();
-  std::cout << "Count: " << count.load() << std::endl;
   std::cout << "Framework: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << std::endl;
 
-  count.store(0);
   start = std::chrono::steady_clock::now();
   for (int i = 0; i < NUM_ITERATIONS; i++) {
     complex_vector input_1(initial_input);
     fft_normal(input_1.begin(), INPUT_SIZE);
   }
   end = std::chrono::steady_clock::now();
-  std::cout << "Count: " << count.load() << std::endl;
   std::cout << "Normal: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << std::endl;
 
diff --git a/app/playground/main.cpp b/app/playground/main.cpp
index f400ab1..fe090bc 100644
--- a/app/playground/main.cpp
+++ b/app/playground/main.cpp
@@ -8,18 +8,15 @@
 
 using namespace pls::internal;
 
-constexpr size_t NUM_THREADS = 2;
+constexpr size_t NUM_THREADS = 1;
 
-constexpr size_t NUM_TASKS = 64;
+constexpr size_t NUM_TASKS = 128;
 constexpr size_t MAX_TASK_STACK_SIZE = 0;
 
-constexpr size_t NUM_CONTS = 64;
+constexpr size_t NUM_CONTS = 128;
 constexpr size_t MAX_CONT_SIZE = 256;
 
-std::atomic<int> count{0};
-scheduling::parallel_result<int> fib(int n) {
-  base::this_thread::sleep(100);
-//  std::cout << "Fib(" << n << "): " << count++ << ", " << scheduling::thread_state::get().get_id() << std::endl;
+int fib_normal(int n) {
   if (n == 0) {
     return 0;
   }
@@ -27,20 +24,11 @@ scheduling::parallel_result<int> fib(int n) {
     return 1;
   }
 
-  return scheduling::scheduler::par([=]() {
-    return fib(n - 1);
-  }, [=]() {
-    return fib(n - 2);
-  }).then([=](int a, int b) {
-    scheduling::parallel_result<int> result{a + b};
-    base::this_thread::sleep(100);
-//    std::cout << "Done Fib(" << n << "): " << (a + b) << ", " << scheduling::thread_state::get().get_id() << std::endl;
-    return result;
-  });
+  int result = fib_normal(n - 1) + fib_normal(n - 2);
+  return result;
 }
 
-int fib_normal(int n) {
-//  std::cout << "Fib(" << n << "): " << count++ << std::endl;
+scheduling::parallel_result<int> fib(int n) {
   if (n == 0) {
     return 0;
   }
@@ -48,9 +36,13 @@ int fib_normal(int n) {
     return 1;
   }
 
-  int result = fib_normal(n - 1) + fib_normal(n - 2);
-//  std::cout << "Done Fib(" << n << "): " << result << std::endl;
-  return result;
+  return scheduling::scheduler::par([=]() {
+    return fib(n - 1);
+  }, [=]() {
+    return fib(n - 2);
+  }).then([=](int a, int b) {
+    return scheduling::parallel_result<int>{a + b};
+  });
 }
 
 int main() {
@@ -63,27 +55,26 @@ int main() {
   scheduling::scheduler scheduler{static_scheduler_memory, NUM_THREADS};
 
   auto start = std::chrono::steady_clock::now();
-//  std::cout << "fib = " << fib_normal(10) << std::endl;
+  std::cout << "fib = " << fib_normal(30) << std::endl;
   auto end = std::chrono::steady_clock::now();
-  std::cout << "Normal: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
+  std::cout << "Normal: " << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count()
             << std::endl;
 
   start = std::chrono::steady_clock::now();
   scheduler.perform_work([]() {
-//    return scheduling::scheduler::par([]() {
-//      return scheduling::parallel_result<int>(0);
-//    }, []() {
-//      return fib(16);
-//    }).then([](int, int b) {
-//      std::cout << "fib = " << (b) << std::endl;
-//      return scheduling::parallel_result<int>{0};
-//    });
-    return fib(10);
+    return scheduling::scheduler::par([]() {
+      return scheduling::parallel_result<int>(0);
+    }, []() {
+      return fib(30);
+    }).then([](int, int b) {
+      std::cout << "fib = " << b << std::endl;
+      return scheduling::parallel_result<int>{0};
+    });
   });
   end = std::chrono::steady_clock::now();
-  std::cout << "Framework: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << std::endl;
+  std::cout << "Framework: " << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() << std::endl;
 
   return 0;
 }
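
Both benchmarks now wrap every returned value in an explicit
parallel_result<int>, on the cutoff path as well as in the continuation.
The shape in isolation, mirroring the fib code above (sum_of_two is a
made-up example name, not part of the patch):

    scheduling::parallel_result<int> sum_of_two(int a, int b) {
      return scheduling::scheduler::par([=]() {
        return scheduling::parallel_result<int>{a};       // left branch
      }, [=]() {
        return scheduling::parallel_result<int>{b};       // right branch
      }).then([](int left, int right) {
        // The continuation also returns a parallel_result, never a bare int.
        return scheduling::parallel_result<int>{left + right};
      });
    }
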
diff --git a/lib/pls/include/pls/internal/scheduling/cont.h b/lib/pls/include/pls/internal/scheduling/cont.h
index 134db43..37f8a47 100644
--- a/lib/pls/include/pls/internal/scheduling/cont.h
+++ b/lib/pls/include/pls/internal/scheduling/cont.h
@@ -77,7 +77,7 @@ class cont : public base_cont {
     using BASE_RES_TYPE = typename std::remove_cv<typename std::remove_reference<RES_TYPE>::type>::type;
 
     static void execute(cont &cont) {
-      parallel_result<BASE_RES_TYPE> result{cont.function_((*cont.result_1_).value(), (*cont.result_2_).value())};
+      parallel_result<BASE_RES_TYPE> result{cont.function_((*cont.left_result_).value(), (*cont.right_result_).value())};
       if (result.fast_path() && cont.parent_ != nullptr) {
         if (cont.is_right_child()) {
           cont.parent_->store_right_result(std::move(result));
@@ -90,7 +90,7 @@ class cont : public base_cont {
   template<typename R>
   struct result_runner<parallel_result<R>> {
     static void execute(cont &cont) {
-      auto result = cont.function_((*cont.result_1_).value(), (*cont.result_2_).value());
+      auto result = cont.function_((*cont.left_result_).value(), (*cont.right_result_).value());
       if (result.fast_path() && cont.parent_) {
         if (cont.is_right_child()) {
           cont.parent_->store_right_result(std::move(result));
@@ -113,7 +113,7 @@ class cont : public base_cont {
         task_{std::forward<T2ARGS>(task_2_args)..., this} {};
 
   void execute() override {
-    using result_type = decltype(function_((*result_1_).value(), (*result_2_).value()));
+    using result_type = decltype(function_((*left_result_).value(), (*right_result_).value()));
     result_runner<result_type>::execute(*this);
     this->get_memory_block()->free_buffer();
     this->get_memory_block()->reset_state();
@@ -124,12 +124,11 @@ class cont : public base_cont {
     task_.execute();
   }
 
-  void *get_right_result_pointer() override {
-    return &result_1_;
-  }
-
   void *get_left_result_pointer() override {
-    return &result_2_;
+    return &left_result_;
+  }
+  void *get_right_result_pointer() override {
+    return &right_result_;
   }
 
   T2 *get_task() {
@@ -143,8 +142,8 @@ class cont : public base_cont {
 
   // Some fields/actual values stay uninitialized (save time on the fast path if we do not need them).
   // More fields untouched on the fast path is good, but for ease of implementation we only keep some for now.
-  delayed_init<R1> result_1_;
-  delayed_init<R2> result_2_;
+  delayed_init<R1> left_result_;
+  delayed_init<R2> right_result_;
 };
 
 }
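
The renamed left_result_/right_result_ members stay raw storage until a
branch actually finishes, which is what keeps the fast path cheap. The real
delayed_init lives in pls' data structures; the stand-in below only
illustrates the semantics the code above relies on (construct explicitly at
most once, then read through operator*), and is not the library's actual
implementation:

    #include <new>
    #include <utility>

    // Stand-in for delayed_init: aligned raw storage, constructed on demand.
    template<typename T>
    class delayed_init_sketch {
      alignas(T) unsigned char storage_[sizeof(T)];
     public:
      template<typename... ARGS>
      void initialize(ARGS &&... args) {
        new(static_cast<void *>(storage_)) T(std::forward<ARGS>(args)...);  // construct in place
      }
      T &operator*() { return *reinterpret_cast<T *>(storage_); }  // valid only after initialize()
    };
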
diff --git a/lib/pls/include/pls/internal/scheduling/cont_manager.h b/lib/pls/include/pls/internal/scheduling/cont_manager.h
index c8797e6..a4f065b 100644
--- a/lib/pls/include/pls/internal/scheduling/cont_manager.h
+++ b/lib/pls/include/pls/internal/scheduling/cont_manager.h
@@ -25,12 +25,12 @@ class cont_manager {
       : max_cont_size_{MAX_CONT_SIZE}, num_conts_{NUM_CONTS} {
 
     // First node is currently active and our local start
-    start_node_ = active_node_ = init_memory_block<MAX_CONT_SIZE>(cont_storage, nullptr, nullptr, 0);
+    active_node_ = init_memory_block<MAX_CONT_SIZE>(cont_storage, nullptr, 0);
 
     // Build up chain after it
-    memory_block *current_node = start_node_;
+    memory_block *current_node = active_node_;
     for (size_t i = 1; i < NUM_CONTS; i++) {
-      memory_block *next_node = init_memory_block<MAX_CONT_SIZE>(cont_storage, start_node_, current_node, i);
+      memory_block *next_node = init_memory_block<MAX_CONT_SIZE>(cont_storage, current_node, i);
       current_node->set_next(next_node);
       current_node = next_node;
     }
   }
@@ -38,16 +38,30 @@ class cont_manager {
 
   // Acquire and release memory blocks...
   memory_block *get_next_memory_block() {
-    active_node_->set_start(start_node_);
     auto result = active_node_;
-    active_node_ = active_node_->get_next();
-
     return result;
   }
   void return_memory_block() {
     active_node_ = active_node_->get_prev();
   }
+  void move_active_node(int depth) {
+    if (depth < 0) {
+      for (int i = 0; i < (depth * -1); i++) {
+        active_node_ = active_node_->get_prev();
+      }
+    } else {
+      for (int i = 0; i < depth; i++) {
+        active_node_ = active_node_->get_next();
+      }
+    }
+  }
+  void move_active_node_to_start() {
+    move_active_node(-1 * active_node_->get_depth());
+  }
+  memory_block *get_active_node() {
+    return active_node_;
+  }
 
   // Manage the fall through behaviour/slow path behaviour
   bool falling_through() const {
@@ -60,44 +74,11 @@ class cont_manager {
   }
 
   void aquire_memory_chain(memory_block *target_chain) {
-    auto *our_next_node = get_node(target_chain->get_depth() + 1);
-
-    our_next_node->set_prev(target_chain);
-    target_chain->set_next(our_next_node);
-
-    start_node_ = target_chain->get_start();
-    active_node_ = target_chain;
-  }
-
-  memory_block *get_node(unsigned int depth) {
-    // TODO: Remove this O(n) factor to avoid the
-    // T_1/P + (T_lim + D) stealing time bound.
-    memory_block *current = start_node_;
-    for (unsigned int i = 0; i < depth; i++) {
-      current->set_start(start_node_);
-      current = current->get_next();
-    }
-
-    return current;
-  }
-
-  void check_clean_chain() {
-    memory_block *current = start_node_;
-    for (unsigned int i = 0; i < num_conts_; i++) {
-      bool buffer_used = current->is_buffer_used();
-      auto state_value = current->get_state().load().value;
-      if (state_value != memory_block::initialized || buffer_used || current->get_depth() != i) {
-        PLS_ASSERT(false,
-                   "Must always steal with a clean chain!");
-      }
-
-      current->set_start(start_node_);
-      current = current->get_next();
-    }
-  }
+    PLS_ASSERT(active_node_->get_depth() == target_chain->get_depth() + 1,
+               "Can only acquire chain parts with the correct depth.");
 
-  void set_active_depth(unsigned int depth) {
-    active_node_ = get_node(depth);
+    active_node_->set_prev(target_chain);
+    target_chain->set_next(active_node_);
   }
 
   void execute_fall_through_code() {
@@ -109,8 +90,8 @@ class cont_manager {
       auto *notified_cont = fall_through_cont_;
       bool notifier_is_right_child = fall_through_child_right;
 
-      std::cout << "Notifying Cont on core " << my_state.get_id() << " and depth "
-                << notified_cont->get_memory_block()->get_depth() << std::endl;
+//      std::cout << "Notifying Cont on core " << my_state.get_id() << " and depth "
+//                << notified_cont->get_memory_block()->get_depth() << std::endl;
 
       fall_through_cont_ = nullptr;
       fall_through_ = false;
@@ -158,17 +139,18 @@ class cont_manager {
         // ... we finished the continuation.
        // We are now in charge of continuing to execute the above continuation chain.
 
-        if (get_node(notified_cont->get_memory_block()->get_depth()) != notified_cont->get_memory_block()) {
-          my_state.cont_manager_.check_clean_chain();
+        PLS_ASSERT(active_node_->get_prev()->get_depth() == notified_cont->get_memory_block()->get_depth(),
+                   "We must hold the system invariant to be at the correct depth.");
+        if (active_node_->get_prev() != notified_cont->get_memory_block()) {
           // We do not own the thing we will execute.
           // Own it by swapping the chain belonging to it in.
           aquire_memory_chain(notified_cont->get_memory_block());
-          std::cout << "Now in charge of memory chain on core " << my_state.get_id() << std::endl;
+//          std::cout << "Now in charge of memory chain on core " << my_state.get_id() << std::endl;
         }
         my_state.parent_cont_ = notified_cont->get_parent();
         my_state.right_spawn_ = notified_cont->is_right_child();
         active_node_ = notified_cont->get_memory_block();
-        std::cout << "Execute cont on core " << my_state.get_id() << std::endl;
+//        std::cout << "Execute cont on core " << my_state.get_id() << std::endl;
         notified_cont->execute();
         if (!falling_through() && notified_cont->get_parent() != nullptr) {
           fall_through_and_notify_cont(notified_cont->get_parent(), notified_cont->is_right_child());
@@ -178,14 +160,16 @@ class cont_manager {
         // ... we did not finish the last continuation.
         // We are no longer in charge of executing the above continuation chain.
 
-        if (get_node(notified_cont->get_memory_block()->get_depth()) == notified_cont->get_memory_block()) {
+        PLS_ASSERT(active_node_->get_prev()->get_depth() == notified_cont->get_memory_block()->get_depth(),
+                   "We must hold the system invariant to be at the correct depth.");
+        if (active_node_->get_prev() == notified_cont->get_memory_block()) {
           // We own the thing we are not allowed to execute.
           // Get rid of the ownership by using the offered chain.
           aquire_memory_chain(target_chain);
-          std::cout << "No longer in charge of chain above on core " << my_state.get_id() << std::endl;
+//          std::cout << "No longer in charge of chain above on core " << my_state.get_id() << std::endl;
         }
 
-        my_state.cont_manager_.check_clean_chain();
+        move_active_node_to_start();
 
         // We are done here...nothing more to execute
         return;
@@ -194,7 +178,6 @@ class cont_manager {
  private:
  template<size_t buffer_size>
  static memory_block *init_memory_block(data_structures::aligned_stack &cont_storage,
-                                        memory_block *memory_chain_start,
                                         memory_block *prev,
                                         unsigned long depth) {
    // Represents one cont_node and its corresponding memory buffer (as one continuous block of memory).
@@ -202,7 +185,7 @@ class cont_manager {
    char *memory_block_ptr = cont_storage.push_bytes<memory_block>();
    char *memory_block_buffer_ptr = cont_storage.push_bytes(buffer_size);
 
-    return new(memory_block_ptr) memory_block(memory_block_buffer_ptr, buffer_size, memory_chain_start, prev, depth);
+    return new(memory_block_ptr) memory_block(memory_block_buffer_ptr, buffer_size, prev, depth);
  }
 
  private:
@@ -212,7 +195,6 @@ class cont_manager {
   /**
    * Managing the continuation chain.
    */
-  memory_block *start_node_;
   memory_block *active_node_;
 
   /**
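
The removed get_node() walked from the chain start on every call (the O(n)
factor the deleted TODO complained about); move_active_node() instead walks
relative to the current position, so positioning for a steal costs only
O(|depth|) and can be undone by walking back. A self-contained model of that
pointer arithmetic (node and move are illustrative names):

    #include <cassert>

    struct node { node *prev{nullptr}; node *next{nullptr}; int depth{0}; };

    // Mirrors cont_manager::move_active_node: negative depths walk towards
    // the chain start via prev, positive depths walk deeper via next.
    node *move(node *active, int depth) {
      for (; depth < 0; depth++) active = active->prev;
      for (; depth > 0; depth--) active = active->next;
      return active;
    }

    int main() {
      node a, b, c;  // a <-> b <-> c at depths 0, 1, 2
      a.next = &b; b.prev = &a; b.depth = 1;
      b.next = &c; c.prev = &b; c.depth = 2;

      node *active = move(&a, 2);              // position for a steal at depth 2
      assert(active == &c);
      active = move(active, -active->depth);   // move_active_node_to_start()
      assert(active == &a);
    }
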
diff --git a/lib/pls/include/pls/internal/scheduling/memory_block.h b/lib/pls/include/pls/internal/scheduling/memory_block.h
index 9ecc015..fbf806b 100644
--- a/lib/pls/include/pls/internal/scheduling/memory_block.h
+++ b/lib/pls/include/pls/internal/scheduling/memory_block.h
@@ -18,11 +18,9 @@ class memory_block {
  public:
   memory_block(char *memory_buffer,
                size_t memory_buffer_size,
-               memory_block *memory_chain_start,
                memory_block *prev,
                unsigned int depth)
-      : memory_chain_start_{memory_chain_start},
-        prev_{prev},
+      : prev_{prev},
         next_{nullptr},
         offered_chain_{nullptr},
         state_{{initialized}},
@@ -70,12 +68,6 @@ class memory_block {
   void set_next(memory_block *next) {
     next_ = next;
   }
-  memory_block *get_start() {
-    return memory_chain_start_;
-  }
-  void set_start(memory_block *start) {
-    memory_chain_start_ = start;
-  }
 
   enum state { initialized, execute_local, stealing, stolen, invalid };
   using stamped_state = data_structures::stamped_integer;
@@ -107,7 +99,6 @@ class memory_block {
   // Linked list property of memory blocks (a complete list represents a thread's currently owned memory).
   // Each block knows its chain start to allow stealing a whole chain in O(1)
   // without the need to traverse back to the chain start.
-  memory_block *memory_chain_start_;
   memory_block *prev_, *next_;
 
   // When blocked on this chain element, we need to know what other chain of memory we
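
memory_block keeps its state behind data_structures::stamped_integer rather
than a plain enum. The reason: a block that is finished and reset back to
'initialized' is bitwise identical to a fresh block, so a slow thief could
succeed with a CAS based on stale information (the classic ABA problem).
Bumping a stamp on every transition makes such a stale CAS fail. A
from-scratch illustration of the idea, not the repo's stamped_integer:

    #include <atomic>
    #include <cstdint>

    struct stamped {          // fits in one 64-bit word, no padding
      uint32_t stamp;
      uint32_t value;
    };

    std::atomic<stamped> state{stamped{0, 0 /* initialized */}};

    // A transition only succeeds against the exact stamp we read earlier;
    // any reset/reuse in between bumped the stamp, so the stale CAS fails safely.
    bool try_transition(stamped expected, uint32_t new_value) {
      return state.compare_exchange_strong(expected, stamped{expected.stamp + 1, new_value});
    }
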
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index fc10abd..ab2a39a 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -76,8 +76,7 @@ struct scheduler::starter {
       current_cont->store_left_result(std::move(result_1));
       auto old_state = current_cont->get_memory_block()->get_state().load();
       current_cont->get_memory_block()->get_state().store({old_state.stamp + 1, memory_block::invalid});
-      PLS_ASSERT(current_cont->get_memory_block()->get_results_missing().fetch_add(-1) != 1,
-                 "We fall through, meaning we 'block' an cont above, thus this can never happen!");
+      current_cont->get_memory_block()->get_results_missing().fetch_add(-1);
 
       // Unwind stack...
       return result_type{};
     }
@@ -132,7 +131,6 @@ class scheduler::init_function_impl : public init_function {
   explicit init_function_impl(F &function) : function_{function} {}
   void run() override {
     scheduler::par([]() {
-      std::cout << "Dummy Strain, " << scheduling::thread_state::get().get_id() << std::endl;
       return parallel_result<int>{0};
     }, [=]() {
       return function_();
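
The assert around get_results_missing().fetch_add(-1) is gone because the
counter's previous value now legitimately matters: results_missing acts as a
join counter that starts at two, each arriving branch result decrements it,
and whichever thread moves it from 1 to 0 supplied the last result and must
continue the continuation. The rule in isolation (a sketch; the variable
name follows the accessor above):

    #include <atomic>

    std::atomic<int> results_missing{2};  // one per branch of scheduler::par

    // fetch_add(-1) returns the previous value: seeing 1 means we just
    // delivered the last missing result, so running the cont is our job.
    bool must_run_continuation() {
      return results_missing.fetch_add(-1) == 1;
    }
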
diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h
index a65cc1b..36add79 100644
--- a/lib/pls/include/pls/internal/scheduling/task_manager.h
+++ b/lib/pls/include/pls/internal/scheduling/task_manager.h
@@ -59,11 +59,17 @@ class task_manager {
     auto stolen_task_handle = task_deque_.pop_top();
     if (stolen_task_handle) {
       base_task *stolen_task = (*stolen_task_handle).task_;
-      std::cout << "Nearly stole on core " << thread_state::get().get_id() << " task with depth "
-                << stolen_task->get_cont()->get_memory_block()->get_depth() << std::endl;
-      auto &atomic_state = (*stolen_task_handle).task_memory_block_->get_state();
-      auto &atomic_offered_chain = (*stolen_task_handle).task_memory_block_->get_offered_chain();
-      auto offered_chain = stealing_cont_manager.get_node((*stolen_task_handle).task_memory_block_->get_depth());
+      memory_block *stolen_task_memory = (*stolen_task_handle).task_memory_block_;
+      auto stolen_task_depth = stolen_task_memory->get_depth();
+      auto &atomic_state = stolen_task_memory->get_state();
+      auto &atomic_offered_chain = stolen_task_memory->get_offered_chain();
+
+//      std::cout << "Nearly stole on core " << thread_state::get().get_id() << " task with depth "
+//                << stolen_task_depth << std::endl;
+
+      // Move our chain forward for stealing...
+      stealing_cont_manager.move_active_node(stolen_task_depth);
+      auto offered_chain = stealing_cont_manager.get_active_node();
 
       if (offered_chain == (*stolen_task_handle).task_memory_block_) {
         PLS_ASSERT(false, "How would we offer our own chain? We only offer when stealing!");
@@ -71,6 +77,7 @@ class task_manager {
 
       auto last_state = atomic_state.load();
       if (last_state.value != memory_block::initialized) {
+        stealing_cont_manager.move_active_node(-stolen_task_depth);
         return nullptr;
       }
 
@@ -86,12 +93,14 @@ class task_manager {
         last_offered_chain = atomic_offered_chain.load();
         last_state = atomic_state.load();
         if (last_state != loop_state) {
+          stealing_cont_manager.move_active_node(-stolen_task_depth);
           return nullptr;
         }
       }
 
       if (atomic_state.compare_exchange_strong(loop_state, {loop_state.stamp + 1, memory_block::stolen})) {
-        std::cout << "Steal!" << std::endl;
+//        std::cout << "Steal!" << std::endl;
+        stealing_cont_manager.move_active_node(1);
         return stolen_task;
       } else {
         return nullptr;
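
steal_remote_task now moves the thief's own chain forward to the stolen
task's depth before touching the victim's state, and walks it back on every
early-out, so a failed steal leaves the chain exactly where it started; only
a successful CAS keeps the position and advances one block further. A
self-contained model of that move/rollback/advance discipline (single block,
illustrative names; the real code additionally negotiates the offered chain):

    #include <atomic>
    #include <cstdint>

    enum state : uint32_t { initialized, stolen };
    std::atomic<uint32_t> block_state{initialized};
    int active_depth = 0;  // the thief's position in its own chain

    bool try_steal(int stolen_task_depth) {
      active_depth += stolen_task_depth;        // move_active_node(stolen_task_depth)
      uint32_t expected = initialized;
      if (!block_state.compare_exchange_strong(expected, stolen)) {
        active_depth -= stolen_task_depth;      // roll back: chain is untouched again
        return false;
      }
      active_depth += 1;                        // move_active_node(1): keep one block for the cont
      return true;
    }
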
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index 88b0799..19f7df8 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -69,7 +69,6 @@ void scheduler::work_thread_work_section() {
       while (my_cont_manager.falling_through()) {
         my_cont_manager.execute_fall_through_code();
       }
-      my_cont_manager.check_clean_chain();
 
       // Steal Routine (will be continuously executed when there are no more fall-throughs).
       // TODO: move into separate function
@@ -83,13 +82,11 @@ void scheduler::work_thread_work_section() {
 
         auto &target_state = my_state.get_scheduler().thread_state_for(target);
 
-        my_cont_manager.check_clean_chain();
+        PLS_ASSERT(my_cont_manager.get_active_node()->get_depth() == 0, "Only steal with a clean chain!");
        auto *stolen_task = target_state.get_task_manager().steal_remote_task(my_cont_manager);
         if (stolen_task != nullptr) {
           my_state.parent_cont_ = stolen_task->get_cont();
           my_state.right_spawn_ = true;
-          my_cont_manager.set_active_depth(stolen_task->get_cont()->get_memory_block()->get_depth() + 1);
-          my_cont_manager.check_clean_chain();
           stolen_task->execute();
           if (my_cont_manager.falling_through()) {
             break;
-- 
libgit2 0.26.0