diff --git a/app/benchmark_fft/main.cpp b/app/benchmark_fft/main.cpp index 6cb316e..534e7d6 100644 --- a/app/benchmark_fft/main.cpp +++ b/app/benchmark_fft/main.cpp @@ -90,8 +90,8 @@ complex_vector prepare_input(int input_size) { return data; } -static constexpr int NUM_ITERATIONS = 1000; -constexpr size_t NUM_THREADS = 2; +static constexpr int NUM_ITERATIONS = 500; +constexpr size_t NUM_THREADS = 8; constexpr size_t NUM_TASKS = 128; constexpr size_t MAX_TASK_STACK_SIZE = 0; @@ -127,14 +127,14 @@ int main() { std::cout << "Framework: " << std::chrono::duration_cast(end - start).count() << std::endl; - start = std::chrono::steady_clock::now(); - for (int i = 0; i < NUM_ITERATIONS; i++) { - complex_vector input_1(initial_input); - fft_normal(input_1.begin(), INPUT_SIZE); - } - end = std::chrono::steady_clock::now(); - std::cout << "Normal: " << std::chrono::duration_cast(end - start).count() - << std::endl; +// start = std::chrono::steady_clock::now(); +// for (int i = 0; i < NUM_ITERATIONS; i++) { +// complex_vector input_1(initial_input); +// fft_normal(input_1.begin(), INPUT_SIZE); +// } +// end = std::chrono::steady_clock::now(); +// std::cout << "Normal: " << std::chrono::duration_cast(end - start).count() +// << std::endl; return 0; } diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index c286bce..e5ff31e 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -58,7 +58,7 @@ add_library(pls STATIC include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/cont_manager.h include/pls/internal/scheduling/cont.h - include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/thread_state_static.h) + include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/thread_state_static.h src/internal/base/error_handling.cpp) # Add everything in `./include` to be in the include path of this project target_include_directories(pls diff --git a/lib/pls/include/pls/internal/base/error_handling.h b/lib/pls/include/pls/internal/base/error_handling.h index 15a2df6..e3b7737 100644 --- a/lib/pls/include/pls/internal/base/error_handling.h +++ b/lib/pls/include/pls/internal/base/error_handling.h @@ -12,6 +12,9 @@ * (or its inclusion adds too much overhead). */ #define PLS_ERROR(msg) printf("%s\n", msg); exit(1); -#define PLS_ASSERT(cond, msg) if (!(cond)) { PLS_ERROR(msg) } + +void pls_error(const char *msg); + +#define PLS_ASSERT(cond, msg) if (!(cond)) { pls_error(msg); } #endif //PLS_ERROR_HANDLING_H diff --git a/lib/pls/include/pls/internal/data_structures/delayed_initialization.h b/lib/pls/include/pls/internal/data_structures/delayed_initialization.h index 03b0f51..ec815bd 100644 --- a/lib/pls/include/pls/internal/data_structures/delayed_initialization.h +++ b/lib/pls/include/pls/internal/data_structures/delayed_initialization.h @@ -43,9 +43,7 @@ class delayed_initialization { template void initialize(ARGS &&...args) { - if (initialized_) { - PLS_ASSERT(!initialized_, "Can only initialize delayed wrapper object once!"); - } + PLS_ASSERT(!initialized_, "Can only initialize delayed wrapper object once!"); new((void *) memory_.data()) T(std::forward(args)...); initialized_ = true; @@ -59,9 +57,8 @@ class delayed_initialization { } T &object() { - if (!initialized_) { - PLS_ASSERT(initialized_, "Can not use an uninitialized delayed wrapper object!") - } + PLS_ASSERT(initialized_, "Can not use an uninitialized delayed wrapper object!"); + return *reinterpret_cast(memory_.data()); } diff --git a/lib/pls/include/pls/internal/scheduling/cont.h b/lib/pls/include/pls/internal/scheduling/cont.h index 37f8a47..9606f71 100644 --- a/lib/pls/include/pls/internal/scheduling/cont.h +++ b/lib/pls/include/pls/internal/scheduling/cont.h @@ -9,6 +9,7 @@ #include "pls/internal/data_structures/stamped_integer.h" #include "pls/internal/data_structures/delayed_initialization.h" #include "pls/internal/base/alignment.h" +#include "pls/internal/base/error_handling.h" #include "parallel_result.h" #include "memory_block.h" @@ -28,7 +29,10 @@ class base_cont { explicit base_cont(base_cont *parent, memory_block *memory_block, bool is_right_child) : parent_{parent}, memory_block_{memory_block}, - is_right_child_{is_right_child} {}; + is_right_child_{is_right_child} { + PLS_ASSERT(parent_ == nullptr || parent_->memory_block_->get_depth() == memory_block_->get_depth() - 1, + "Must only build cont chains with matching depth!") + }; /** * Execute the continuation itself. @@ -77,7 +81,8 @@ class cont : public base_cont { using BASE_RES_TYPE = typename std::remove_cv::type>::type; static void execute(cont &cont) { - parallel_result result{cont.function_((*cont.left_result_).value(), (*cont.right_result_).value())}; + parallel_result + result{cont.function_((*cont.left_result_).value(), (*cont.right_result_).value())}; if (result.fast_path() && cont.parent_ != nullptr) { if (cont.is_right_child()) { cont.parent_->store_right_result(std::move(result)); diff --git a/lib/pls/include/pls/internal/scheduling/cont_manager.h b/lib/pls/include/pls/internal/scheduling/cont_manager.h index a4f065b..a4d9941 100644 --- a/lib/pls/include/pls/internal/scheduling/cont_manager.h +++ b/lib/pls/include/pls/internal/scheduling/cont_manager.h @@ -47,11 +47,11 @@ class cont_manager { } void move_active_node(int depth) { if (depth < 0) { - for (int i = 0; i < (depth * -1); i++) { + for (long i = 0; i < (depth * -1); i++) { active_node_ = active_node_->get_prev(); } } else { - for (int i = 0; i < depth; i++) { + for (long i = 0; i < depth; i++) { active_node_ = active_node_->get_next(); } } @@ -88,49 +88,10 @@ class cont_manager { // Copy fall through status and reset it (for potentially nested execution paths). auto *notified_cont = fall_through_cont_; - bool notifier_is_right_child = fall_through_child_right; - -// std::cout << "Notifying Cont on core " << my_state.get_id() << " and depth " -// << notified_cont->get_memory_block()->get_depth() << std::endl; fall_through_cont_ = nullptr; fall_through_ = false; - // Special case for lock free implementation. - // When we finish a 'left strain' it can happen that the 'right strain' - // is currently being stolen. We need to be sure that this steal is finished - // as we need the thieves memory blocks in case we get blocked by it. - if (!notifier_is_right_child) { - // Check to execute right child directly... - auto &atomic_state = notified_cont->get_memory_block()->get_state(); - auto old_state = atomic_state.load(); - memory_block::stamped_state target_state{old_state.stamp + 1, memory_block::state::execute_local}; - memory_block::stamped_state exchanged_state = atomic_state.exchange(target_state); - if (exchanged_state.value != memory_block::state::stolen) { - // We 'overruled' the stealing thread and execute the other task ourselfs. - // We can be sure that only we where involved in executing the child tasks of the parent_continuation... - notified_cont->get_memory_block()->get_results_missing().fetch_add(-1); - - my_state.parent_cont_ = notified_cont; - my_state.right_spawn_ = true; - notified_cont->execute_task(); - if (falling_through()) { - // ... if the second strain was interrupted we fall through without scheduling the parent_continuation - // (the currently pending/interrupted strain will do this itself). - return; - } else { - // ... we could finish the second strain. - // Register the parent continuation for being notified. - // (This is not the most efficient, as we could simply execute it. However, - // this way of doing it spares us from duplicating a lot of code). - fall_through_and_notify_cont(notified_cont, true); - return; - } - } - - // Right side is 'fully' stolen. We can continue to inform the parent like we would do normally. - } - // Keep the target chain before we execute, as this potentially frees the memory auto *target_chain = notified_cont->get_memory_block()->get_offered_chain().load(); @@ -145,12 +106,10 @@ class cont_manager { // We do not own the thing we will execute. // Own it by swapping the chain belonging to it in. aquire_memory_chain(notified_cont->get_memory_block()); -// std::cout << "Now in charge of memory chain on core " << my_state.get_id() << std::endl; } my_state.parent_cont_ = notified_cont->get_parent(); my_state.right_spawn_ = notified_cont->is_right_child(); active_node_ = notified_cont->get_memory_block(); -// std::cout << "Execute cont on core " << my_state.get_id() << std::endl; notified_cont->execute(); if (!falling_through() && notified_cont->get_parent() != nullptr) { fall_through_and_notify_cont(notified_cont->get_parent(), notified_cont->is_right_child()); @@ -166,7 +125,6 @@ class cont_manager { // We own the thing we are not allowed to execute. // Get rid of the ownership by using the offered chain. aquire_memory_chain(target_chain); -// std::cout << "No longer in charge of chain above on core " << my_state.get_id() << std::endl; } move_active_node_to_start(); diff --git a/lib/pls/include/pls/internal/scheduling/memory_block.h b/lib/pls/include/pls/internal/scheduling/memory_block.h index fbf806b..dbcfcd9 100644 --- a/lib/pls/include/pls/internal/scheduling/memory_block.h +++ b/lib/pls/include/pls/internal/scheduling/memory_block.h @@ -19,7 +19,7 @@ class memory_block { memory_block(char *memory_buffer, size_t memory_buffer_size, memory_block *prev, - unsigned int depth) + int depth) : prev_{prev}, next_{nullptr}, offered_chain_{nullptr}, @@ -32,10 +32,7 @@ class memory_block { template T *place_in_buffer(ARGS &&...args) { - if (memory_buffer_used_) { - pls::internal::base::this_thread::sleep(100000); - PLS_ASSERT(!memory_buffer_used_, "Must only allocate one continuation at once per node."); - } + PLS_ASSERT(!memory_buffer_used_, "Must only allocate one continuation at once per node."); memory_buffer_used_ = true; return new(memory_buffer_) T(std::forward(args)...); @@ -84,7 +81,7 @@ class memory_block { return results_missing_; } - unsigned int get_depth() const noexcept { + int get_depth() const noexcept { return depth_; } @@ -130,7 +127,7 @@ class memory_block { // Each element stays at a fixed depth for the entire application run. // Swapping parts of a memory chain will not reorder it, as always parts of // the same size are exchanged. - const unsigned int depth_; + const int depth_; }; } diff --git a/lib/pls/include/pls/internal/scheduling/task_manager.h b/lib/pls/include/pls/internal/scheduling/task_manager.h index 36add79..4e2b1b3 100644 --- a/lib/pls/include/pls/internal/scheduling/task_manager.h +++ b/lib/pls/include/pls/internal/scheduling/task_manager.h @@ -56,6 +56,9 @@ class task_manager { // Returns a pair containing the actual task and if the steal was successful. base_task *steal_remote_task(cont_manager &stealing_cont_manager) { std::lock_guard lock{lock_}; + + // TODO: See if we can somehow make this trade lock free (and still be correct) + auto stolen_task_handle = task_deque_.pop_top(); if (stolen_task_handle) { base_task *stolen_task = (*stolen_task_handle).task_; @@ -64,50 +67,14 @@ class task_manager { auto &atomic_state = stolen_task_memory->get_state(); auto &atomic_offered_chain = stolen_task_memory->get_offered_chain(); -// std::cout << "Nearly stole on core " << thread_state::get().get_id() << " task with depth " -// << stolen_task_depth << std::endl; - - // Move our chain forward for stealing... + // TODO: We ignore all we tried with lock free implementations here, just store the state how it is supposed to be stealing_cont_manager.move_active_node(stolen_task_depth); auto offered_chain = stealing_cont_manager.get_active_node(); + stealing_cont_manager.move_active_node(1); + atomic_offered_chain.store(offered_chain); + atomic_state.store(memory_block::stolen); - if (offered_chain == (*stolen_task_handle).task_memory_block_) { - PLS_ASSERT(false, "How would we offer our own chain? We only offer when stealing!"); - } - - auto last_state = atomic_state.load(); - if (last_state.value != memory_block::initialized) { - stealing_cont_manager.move_active_node(-stolen_task_depth); - return nullptr; - } - - auto last_offered_chain = atomic_offered_chain.load(); - memory_block::stamped_state loop_state = {last_state.stamp + 1, memory_block::stealing}; - - if (atomic_state.compare_exchange_strong(last_state, loop_state)) { - while (true) { - if (atomic_offered_chain.compare_exchange_strong(last_offered_chain, offered_chain)) { - break; - } - - last_offered_chain = atomic_offered_chain.load(); - last_state = atomic_state.load(); - if (last_state != loop_state) { - stealing_cont_manager.move_active_node(-stolen_task_depth); - return nullptr; - } - } - - if (atomic_state.compare_exchange_strong(loop_state, {loop_state.stamp + 1, memory_block::stolen})) { -// std::cout << "Steal!" << std::endl; - stealing_cont_manager.move_active_node(1); - return stolen_task; - } else { - return nullptr; - } - } else { - return nullptr; - } + return stolen_task; } else { return nullptr; } diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h index 5b665a0..85a4f0a 100644 --- a/lib/pls/include/pls/internal/scheduling/thread_state.h +++ b/lib/pls/include/pls/internal/scheduling/thread_state.h @@ -44,7 +44,12 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state { task_manager_{task_manager}, cont_manager_{cont_manager}, current_task_{nullptr}, - random_{static_cast(std::chrono::steady_clock::now().time_since_epoch().count())} {} + random_{static_cast(std::chrono::steady_clock::now().time_since_epoch().count())} {}; + + void reset() { + right_spawn_ = false; + parent_cont_ = nullptr; + } /** * Convenience helper to get the thread_state instance associated with this thread. diff --git a/lib/pls/src/internal/base/error_handling.cpp b/lib/pls/src/internal/base/error_handling.cpp new file mode 100644 index 0000000..4243a13 --- /dev/null +++ b/lib/pls/src/internal/base/error_handling.cpp @@ -0,0 +1,5 @@ +#include "pls/internal/base/error_handling.h" + +void pls_error(const char *msg) { + PLS_ERROR(msg); +} diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp index 19f7df8..595ebe0 100644 --- a/lib/pls/src/internal/scheduling/scheduler.cpp +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -54,6 +54,7 @@ void scheduler::work_thread_main_loop() { void scheduler::work_thread_work_section() { auto &my_state = thread_state::get(); + my_state.reset(); auto &my_cont_manager = my_state.get_cont_manager(); auto const num_threads = my_state.get_scheduler().num_threads();