Commit 1b576824 by FritzFlorian

First 'crash free' version.

This version runs through our initial fft and fib tests. However, it is not tested further in any way. Additionally, we added a locking deque, potentially hurting performance and moving away from our initial goal.
parent c6dd2fc0
Pipeline #1341 failed with stages
in 40 seconds
...@@ -90,8 +90,8 @@ complex_vector prepare_input(int input_size) { ...@@ -90,8 +90,8 @@ complex_vector prepare_input(int input_size) {
return data; return data;
} }
static constexpr int NUM_ITERATIONS = 1000; static constexpr int NUM_ITERATIONS = 500;
constexpr size_t NUM_THREADS = 2; constexpr size_t NUM_THREADS = 8;
constexpr size_t NUM_TASKS = 128; constexpr size_t NUM_TASKS = 128;
constexpr size_t MAX_TASK_STACK_SIZE = 0; constexpr size_t MAX_TASK_STACK_SIZE = 0;
...@@ -127,14 +127,14 @@ int main() { ...@@ -127,14 +127,14 @@ int main() {
std::cout << "Framework: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() std::cout << "Framework: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
<< std::endl; << std::endl;
start = std::chrono::steady_clock::now(); // start = std::chrono::steady_clock::now();
for (int i = 0; i < NUM_ITERATIONS; i++) { // for (int i = 0; i < NUM_ITERATIONS; i++) {
complex_vector input_1(initial_input); // complex_vector input_1(initial_input);
fft_normal(input_1.begin(), INPUT_SIZE); // fft_normal(input_1.begin(), INPUT_SIZE);
} // }
end = std::chrono::steady_clock::now(); // end = std::chrono::steady_clock::now();
std::cout << "Normal: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() // std::cout << "Normal: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
<< std::endl; // << std::endl;
return 0; return 0;
} }
...@@ -58,7 +58,7 @@ add_library(pls STATIC ...@@ -58,7 +58,7 @@ add_library(pls STATIC
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/cont_manager.h include/pls/internal/scheduling/cont_manager.h
include/pls/internal/scheduling/cont.h include/pls/internal/scheduling/cont.h
include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/thread_state_static.h) include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/thread_state_static.h src/internal/base/error_handling.cpp)
# Add everything in `./include` to be in the include path of this project # Add everything in `./include` to be in the include path of this project
target_include_directories(pls target_include_directories(pls
......
...@@ -12,6 +12,9 @@ ...@@ -12,6 +12,9 @@
* (or its inclusion adds too much overhead). * (or its inclusion adds too much overhead).
*/ */
#define PLS_ERROR(msg) printf("%s\n", msg); exit(1); #define PLS_ERROR(msg) printf("%s\n", msg); exit(1);
#define PLS_ASSERT(cond, msg) if (!(cond)) { PLS_ERROR(msg) }
void pls_error(const char *msg);
#define PLS_ASSERT(cond, msg) if (!(cond)) { pls_error(msg); }
#endif //PLS_ERROR_HANDLING_H #endif //PLS_ERROR_HANDLING_H
...@@ -43,9 +43,7 @@ class delayed_initialization { ...@@ -43,9 +43,7 @@ class delayed_initialization {
template<typename ...ARGS> template<typename ...ARGS>
void initialize(ARGS &&...args) { void initialize(ARGS &&...args) {
if (initialized_) {
PLS_ASSERT(!initialized_, "Can only initialize delayed wrapper object once!"); PLS_ASSERT(!initialized_, "Can only initialize delayed wrapper object once!");
}
new((void *) memory_.data()) T(std::forward<ARGS>(args)...); new((void *) memory_.data()) T(std::forward<ARGS>(args)...);
initialized_ = true; initialized_ = true;
...@@ -59,9 +57,8 @@ class delayed_initialization { ...@@ -59,9 +57,8 @@ class delayed_initialization {
} }
T &object() { T &object() {
if (!initialized_) { PLS_ASSERT(initialized_, "Can not use an uninitialized delayed wrapper object!");
PLS_ASSERT(initialized_, "Can not use an uninitialized delayed wrapper object!")
}
return *reinterpret_cast<T *>(memory_.data()); return *reinterpret_cast<T *>(memory_.data());
} }
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include "pls/internal/data_structures/stamped_integer.h" #include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/data_structures/delayed_initialization.h" #include "pls/internal/data_structures/delayed_initialization.h"
#include "pls/internal/base/alignment.h" #include "pls/internal/base/alignment.h"
#include "pls/internal/base/error_handling.h"
#include "parallel_result.h" #include "parallel_result.h"
#include "memory_block.h" #include "memory_block.h"
...@@ -28,7 +29,10 @@ class base_cont { ...@@ -28,7 +29,10 @@ class base_cont {
explicit base_cont(base_cont *parent, memory_block *memory_block, bool is_right_child) explicit base_cont(base_cont *parent, memory_block *memory_block, bool is_right_child)
: parent_{parent}, : parent_{parent},
memory_block_{memory_block}, memory_block_{memory_block},
is_right_child_{is_right_child} {}; is_right_child_{is_right_child} {
PLS_ASSERT(parent_ == nullptr || parent_->memory_block_->get_depth() == memory_block_->get_depth() - 1,
"Must only build cont chains with matching depth!")
};
/** /**
* Execute the continuation itself. * Execute the continuation itself.
...@@ -77,7 +81,8 @@ class cont : public base_cont { ...@@ -77,7 +81,8 @@ class cont : public base_cont {
using BASE_RES_TYPE = typename std::remove_cv<typename std::remove_reference<RES_TYPE>::type>::type; using BASE_RES_TYPE = typename std::remove_cv<typename std::remove_reference<RES_TYPE>::type>::type;
static void execute(cont &cont) { static void execute(cont &cont) {
parallel_result<BASE_RES_TYPE> result{cont.function_((*cont.left_result_).value(), (*cont.right_result_).value())}; parallel_result<BASE_RES_TYPE>
result{cont.function_((*cont.left_result_).value(), (*cont.right_result_).value())};
if (result.fast_path() && cont.parent_ != nullptr) { if (result.fast_path() && cont.parent_ != nullptr) {
if (cont.is_right_child()) { if (cont.is_right_child()) {
cont.parent_->store_right_result(std::move(result)); cont.parent_->store_right_result(std::move(result));
......
...@@ -47,11 +47,11 @@ class cont_manager { ...@@ -47,11 +47,11 @@ class cont_manager {
} }
void move_active_node(int depth) { void move_active_node(int depth) {
if (depth < 0) { if (depth < 0) {
for (int i = 0; i < (depth * -1); i++) { for (long i = 0; i < (depth * -1); i++) {
active_node_ = active_node_->get_prev(); active_node_ = active_node_->get_prev();
} }
} else { } else {
for (int i = 0; i < depth; i++) { for (long i = 0; i < depth; i++) {
active_node_ = active_node_->get_next(); active_node_ = active_node_->get_next();
} }
} }
...@@ -88,49 +88,10 @@ class cont_manager { ...@@ -88,49 +88,10 @@ class cont_manager {
// Copy fall through status and reset it (for potentially nested execution paths). // Copy fall through status and reset it (for potentially nested execution paths).
auto *notified_cont = fall_through_cont_; auto *notified_cont = fall_through_cont_;
bool notifier_is_right_child = fall_through_child_right;
// std::cout << "Notifying Cont on core " << my_state.get_id() << " and depth "
// << notified_cont->get_memory_block()->get_depth() << std::endl;
fall_through_cont_ = nullptr; fall_through_cont_ = nullptr;
fall_through_ = false; fall_through_ = false;
// Special case for lock free implementation.
// When we finish a 'left strain' it can happen that the 'right strain'
// is currently being stolen. We need to be sure that this steal is finished
// as we need the thieves memory blocks in case we get blocked by it.
if (!notifier_is_right_child) {
// Check to execute right child directly...
auto &atomic_state = notified_cont->get_memory_block()->get_state();
auto old_state = atomic_state.load();
memory_block::stamped_state target_state{old_state.stamp + 1, memory_block::state::execute_local};
memory_block::stamped_state exchanged_state = atomic_state.exchange(target_state);
if (exchanged_state.value != memory_block::state::stolen) {
// We 'overruled' the stealing thread and execute the other task ourselfs.
// We can be sure that only we where involved in executing the child tasks of the parent_continuation...
notified_cont->get_memory_block()->get_results_missing().fetch_add(-1);
my_state.parent_cont_ = notified_cont;
my_state.right_spawn_ = true;
notified_cont->execute_task();
if (falling_through()) {
// ... if the second strain was interrupted we fall through without scheduling the parent_continuation
// (the currently pending/interrupted strain will do this itself).
return;
} else {
// ... we could finish the second strain.
// Register the parent continuation for being notified.
// (This is not the most efficient, as we could simply execute it. However,
// this way of doing it spares us from duplicating a lot of code).
fall_through_and_notify_cont(notified_cont, true);
return;
}
}
// Right side is 'fully' stolen. We can continue to inform the parent like we would do normally.
}
// Keep the target chain before we execute, as this potentially frees the memory // Keep the target chain before we execute, as this potentially frees the memory
auto *target_chain = notified_cont->get_memory_block()->get_offered_chain().load(); auto *target_chain = notified_cont->get_memory_block()->get_offered_chain().load();
...@@ -145,12 +106,10 @@ class cont_manager { ...@@ -145,12 +106,10 @@ class cont_manager {
// We do not own the thing we will execute. // We do not own the thing we will execute.
// Own it by swapping the chain belonging to it in. // Own it by swapping the chain belonging to it in.
aquire_memory_chain(notified_cont->get_memory_block()); aquire_memory_chain(notified_cont->get_memory_block());
// std::cout << "Now in charge of memory chain on core " << my_state.get_id() << std::endl;
} }
my_state.parent_cont_ = notified_cont->get_parent(); my_state.parent_cont_ = notified_cont->get_parent();
my_state.right_spawn_ = notified_cont->is_right_child(); my_state.right_spawn_ = notified_cont->is_right_child();
active_node_ = notified_cont->get_memory_block(); active_node_ = notified_cont->get_memory_block();
// std::cout << "Execute cont on core " << my_state.get_id() << std::endl;
notified_cont->execute(); notified_cont->execute();
if (!falling_through() && notified_cont->get_parent() != nullptr) { if (!falling_through() && notified_cont->get_parent() != nullptr) {
fall_through_and_notify_cont(notified_cont->get_parent(), notified_cont->is_right_child()); fall_through_and_notify_cont(notified_cont->get_parent(), notified_cont->is_right_child());
...@@ -166,7 +125,6 @@ class cont_manager { ...@@ -166,7 +125,6 @@ class cont_manager {
// We own the thing we are not allowed to execute. // We own the thing we are not allowed to execute.
// Get rid of the ownership by using the offered chain. // Get rid of the ownership by using the offered chain.
aquire_memory_chain(target_chain); aquire_memory_chain(target_chain);
// std::cout << "No longer in charge of chain above on core " << my_state.get_id() << std::endl;
} }
move_active_node_to_start(); move_active_node_to_start();
......
...@@ -19,7 +19,7 @@ class memory_block { ...@@ -19,7 +19,7 @@ class memory_block {
memory_block(char *memory_buffer, memory_block(char *memory_buffer,
size_t memory_buffer_size, size_t memory_buffer_size,
memory_block *prev, memory_block *prev,
unsigned int depth) int depth)
: prev_{prev}, : prev_{prev},
next_{nullptr}, next_{nullptr},
offered_chain_{nullptr}, offered_chain_{nullptr},
...@@ -32,10 +32,7 @@ class memory_block { ...@@ -32,10 +32,7 @@ class memory_block {
template<typename T, typename ...ARGS> template<typename T, typename ...ARGS>
T *place_in_buffer(ARGS &&...args) { T *place_in_buffer(ARGS &&...args) {
if (memory_buffer_used_) {
pls::internal::base::this_thread::sleep(100000);
PLS_ASSERT(!memory_buffer_used_, "Must only allocate one continuation at once per node."); PLS_ASSERT(!memory_buffer_used_, "Must only allocate one continuation at once per node.");
}
memory_buffer_used_ = true; memory_buffer_used_ = true;
return new(memory_buffer_) T(std::forward<ARGS>(args)...); return new(memory_buffer_) T(std::forward<ARGS>(args)...);
...@@ -84,7 +81,7 @@ class memory_block { ...@@ -84,7 +81,7 @@ class memory_block {
return results_missing_; return results_missing_;
} }
unsigned int get_depth() const noexcept { int get_depth() const noexcept {
return depth_; return depth_;
} }
...@@ -130,7 +127,7 @@ class memory_block { ...@@ -130,7 +127,7 @@ class memory_block {
// Each element stays at a fixed depth for the entire application run. // Each element stays at a fixed depth for the entire application run.
// Swapping parts of a memory chain will not reorder it, as always parts of // Swapping parts of a memory chain will not reorder it, as always parts of
// the same size are exchanged. // the same size are exchanged.
const unsigned int depth_; const int depth_;
}; };
} }
......
...@@ -56,6 +56,9 @@ class task_manager { ...@@ -56,6 +56,9 @@ class task_manager {
// Returns a pair containing the actual task and if the steal was successful. // Returns a pair containing the actual task and if the steal was successful.
base_task *steal_remote_task(cont_manager &stealing_cont_manager) { base_task *steal_remote_task(cont_manager &stealing_cont_manager) {
std::lock_guard<base::spin_lock> lock{lock_}; std::lock_guard<base::spin_lock> lock{lock_};
// TODO: See if we can somehow make this trade lock free (and still be correct)
auto stolen_task_handle = task_deque_.pop_top(); auto stolen_task_handle = task_deque_.pop_top();
if (stolen_task_handle) { if (stolen_task_handle) {
base_task *stolen_task = (*stolen_task_handle).task_; base_task *stolen_task = (*stolen_task_handle).task_;
...@@ -64,53 +67,17 @@ class task_manager { ...@@ -64,53 +67,17 @@ class task_manager {
auto &atomic_state = stolen_task_memory->get_state(); auto &atomic_state = stolen_task_memory->get_state();
auto &atomic_offered_chain = stolen_task_memory->get_offered_chain(); auto &atomic_offered_chain = stolen_task_memory->get_offered_chain();
// std::cout << "Nearly stole on core " << thread_state::get().get_id() << " task with depth " // TODO: We ignore all we tried with lock free implementations here, just store the state how it is supposed to be
// << stolen_task_depth << std::endl;
// Move our chain forward for stealing...
stealing_cont_manager.move_active_node(stolen_task_depth); stealing_cont_manager.move_active_node(stolen_task_depth);
auto offered_chain = stealing_cont_manager.get_active_node(); auto offered_chain = stealing_cont_manager.get_active_node();
if (offered_chain == (*stolen_task_handle).task_memory_block_) {
PLS_ASSERT(false, "How would we offer our own chain? We only offer when stealing!");
}
auto last_state = atomic_state.load();
if (last_state.value != memory_block::initialized) {
stealing_cont_manager.move_active_node(-stolen_task_depth);
return nullptr;
}
auto last_offered_chain = atomic_offered_chain.load();
memory_block::stamped_state loop_state = {last_state.stamp + 1, memory_block::stealing};
if (atomic_state.compare_exchange_strong(last_state, loop_state)) {
while (true) {
if (atomic_offered_chain.compare_exchange_strong(last_offered_chain, offered_chain)) {
break;
}
last_offered_chain = atomic_offered_chain.load();
last_state = atomic_state.load();
if (last_state != loop_state) {
stealing_cont_manager.move_active_node(-stolen_task_depth);
return nullptr;
}
}
if (atomic_state.compare_exchange_strong(loop_state, {loop_state.stamp + 1, memory_block::stolen})) {
// std::cout << "Steal!" << std::endl;
stealing_cont_manager.move_active_node(1); stealing_cont_manager.move_active_node(1);
atomic_offered_chain.store(offered_chain);
atomic_state.store(memory_block::stolen);
return stolen_task; return stolen_task;
} else { } else {
return nullptr; return nullptr;
} }
} else {
return nullptr;
}
} else {
return nullptr;
}
} }
explicit task_manager(data_structures::bounded_ws_deque<task_handle> &task_deque) : task_deque_{task_deque}, explicit task_manager(data_structures::bounded_ws_deque<task_handle> &task_deque) : task_deque_{task_deque},
......
...@@ -44,7 +44,12 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state { ...@@ -44,7 +44,12 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state {
task_manager_{task_manager}, task_manager_{task_manager},
cont_manager_{cont_manager}, cont_manager_{cont_manager},
current_task_{nullptr}, current_task_{nullptr},
random_{static_cast<unsigned long>(std::chrono::steady_clock::now().time_since_epoch().count())} {} random_{static_cast<unsigned long>(std::chrono::steady_clock::now().time_since_epoch().count())} {};
void reset() {
right_spawn_ = false;
parent_cont_ = nullptr;
}
/** /**
* Convenience helper to get the thread_state instance associated with this thread. * Convenience helper to get the thread_state instance associated with this thread.
......
#include "pls/internal/base/error_handling.h"
void pls_error(const char *msg) {
PLS_ERROR(msg);
}
...@@ -54,6 +54,7 @@ void scheduler::work_thread_main_loop() { ...@@ -54,6 +54,7 @@ void scheduler::work_thread_main_loop() {
void scheduler::work_thread_work_section() { void scheduler::work_thread_work_section() {
auto &my_state = thread_state::get(); auto &my_state = thread_state::get();
my_state.reset();
auto &my_cont_manager = my_state.get_cont_manager(); auto &my_cont_manager = my_state.get_cont_manager();
auto const num_threads = my_state.get_scheduler().num_threads(); auto const num_threads = my_state.get_scheduler().num_threads();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment