Commit adf05e9a by FritzFlorian

WIP: We plan to fully remove the start property from the cont manager.

The start_chain property does not make sense, as chains are purely 'virtual': they only fully exist while walking through the computation (they are patched up on important events). We initially added the property as a helper for better runtime performance and a simpler implementation, but we expect that removing it will leave less room for inconsistency in the runtime state. The performance optimization can be 're-added' later on.
parent 21733e4c
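
To make the 'virtual chain' idea concrete, here is a minimal, self-contained C++ sketch. It mirrors the spirit of the new cont_manager::check_clean_chain() in the diff below; block and its fields are simplified stand-ins for memory_block, not the real API.

#include <cassert>
#include <vector>

// Simplified stand-in for memory_block: only the fields the chain walk touches.
struct block {
  block *next = nullptr;
  block *start = nullptr;  // chain start, patched lazily during walks
  unsigned depth = 0;
  bool buffer_used = false;
};

// Assert every block is pristine (buffer free, consecutive depths) and
// re-patch the start pointers: the chain only becomes consistent by walking it.
void check_clean_chain(block *chain_start, unsigned num_blocks) {
  block *current = chain_start;
  for (unsigned i = 0; i < num_blocks; i++) {
    assert(!current->buffer_used && current->depth == i);
    current->start = chain_start;
    current = current->next;
  }
}

int main() {
  std::vector<block> blocks(4);
  for (unsigned i = 0; i < blocks.size(); i++) {
    blocks[i].depth = i;
    if (i + 1 < blocks.size()) blocks[i].next = &blocks[i + 1];
  }
  check_clean_chain(&blocks[0], static_cast<unsigned>(blocks.size()));  // a clean chain passes
  return 0;
}

No separate chain bookkeeping is kept up to date between events; the walk itself re-establishes the invariants, which is why a cached start property invites stale state.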
@@ -4,10 +4,11 @@
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/parallel_result.h"
#include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/base/thread.h"
using namespace pls::internal;
constexpr size_t NUM_THREADS = 8;
constexpr size_t NUM_THREADS = 2;
constexpr size_t NUM_TASKS = 64;
constexpr size_t MAX_TASK_STACK_SIZE = 0;
@@ -17,6 +18,7 @@ constexpr size_t MAX_CONT_SIZE = 256;
std::atomic<int> count{0};
scheduling::parallel_result<int> fib(int n) {
base::this_thread::sleep(100);
// std::cout << "Fib(" << n << "): " << count++ << ", " << scheduling::thread_state::get().get_id() << std::endl;
if (n == 0) {
return 0;
@@ -31,6 +33,7 @@ scheduling::parallel_result<int> fib(int n) {
return fib(n - 2);
}).then([=](int a, int b) {
scheduling::parallel_result<int> result{a + b};
base::this_thread::sleep(100);
// std::cout << "Done Fib(" << n << "): " << (a + b) << ", " << scheduling::thread_state::get().get_id() << std::endl;
return result;
});
@@ -60,7 +63,7 @@ int main() {
scheduling::scheduler scheduler{static_scheduler_memory, NUM_THREADS};
auto start = std::chrono::steady_clock::now();
std::cout << "fib = " << fib_normal(16) << std::endl;
// std::cout << "fib = " << fib_normal(10) << std::endl;
auto end = std::chrono::steady_clock::now();
std::cout << "Normal: " << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count()
<< std::endl;
@@ -76,7 +79,7 @@ int main() {
// std::cout << "fib = " << (b) << std::endl;
// return scheduling::parallel_result<int>{0};
// });
return fib(16);
return fib(10);
});
end = std::chrono::steady_clock::now();
@@ -58,7 +58,7 @@ add_library(pls STATIC
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/cont_manager.h
include/pls/internal/scheduling/cont.h
include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/cont_manager_impl.h include/pls/internal/scheduling/thread_state_static.h)
include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/optional.h include/pls/internal/scheduling/memory_block.h include/pls/internal/scheduling/thread_state_static.h)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
@@ -85,6 +85,8 @@ class bounded_ws_deque {
}
// The queue is empty and we lost the competition
local_bottom_ = 0;
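// New in this change: publish the reset bottom and bump the top stamp, presumably so a steal still racing on the old top fails its CAS.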
bottom_.store(local_bottom_);
top_.store({old_top.stamp + 1, 0});
return optional<T>();
}
@@ -116,6 +116,7 @@ class cont : public base_cont {
using result_type = decltype(function_((*result_1_).value(), (*result_2_).value()));
result_runner<result_type>::execute(*this);
this->get_memory_block()->free_buffer();
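// reset_state() (added in this commit) re-arms the block for reuse: offered chain cleared, state stamped back to 'initialized', results counter reset (see memory_block::reset_state).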
this->get_memory_block()->reset_state();
this->~cont();
}
@@ -22,7 +22,7 @@ class cont_manager {
template<size_t NUM_CONTS, size_t MAX_CONT_SIZE>
explicit cont_manager(data_structures::aligned_stack &cont_storage, template_args<NUM_CONTS, MAX_CONT_SIZE>)
: max_cont_size_{MAX_CONT_SIZE} {
: max_cont_size_{MAX_CONT_SIZE}, num_conts_{NUM_CONTS} {
// First node is currently active and our local start
start_node_ = active_node_ = init_memory_block<MAX_CONT_SIZE>(cont_storage, nullptr, nullptr, 0);
@@ -66,6 +66,7 @@ class cont_manager {
target_chain->set_next(our_next_node);
start_node_ = target_chain->get_start();
active_node_ = target_chain;
}
memory_block *get_node(unsigned int depth) {
@@ -80,6 +81,21 @@
return current;
}
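// New in this commit: verify the whole chain is pristine (initialized, buffer free, consecutive depths) and re-patch each block's start pointer before relying on it.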
void check_clean_chain() {
memory_block *current = start_node_;
for (unsigned int i = 0; i < num_conts_; i++) {
bool buffer_used = current->is_buffer_used();
auto state_value = current->get_state().load().value;
if (state_value != memory_block::initialized || buffer_used || current->get_depth() != i) {
PLS_ASSERT(false,
"Must always steal with a clean chain!");
}
current->set_start(start_node_);
current = current->get_next();
}
}
void set_active_depth(unsigned int depth) {
active_node_ = get_node(depth);
}
@@ -93,6 +109,9 @@ class cont_manager {
auto *notified_cont = fall_through_cont_;
bool notifier_is_right_child = fall_through_child_right;
std::cout << "Notifying Cont on core " << my_state.get_id() << " and depth "
<< notified_cont->get_memory_block()->get_depth() << std::endl;
fall_through_cont_ = nullptr;
fall_through_ = false;
@@ -103,9 +122,10 @@ class cont_manager {
if (!notifier_is_right_child) {
// Check to execute right child directly...
auto &atomic_state = notified_cont->get_memory_block()->get_state();
memory_block::stamped_state target_state{atomic_state.load().stamp + 1, memory_block::state::execute_local};
memory_block::stamped_state old_state = atomic_state.exchange(target_state);
if (old_state.value != memory_block::state::stolen) {
auto old_state = atomic_state.load();
memory_block::stamped_state target_state{old_state.stamp + 1, memory_block::state::execute_local};
memory_block::stamped_state exchanged_state = atomic_state.exchange(target_state);
if (exchanged_state.value != memory_block::state::stolen) {
// We 'overruled' the stealing thread and execute the other task ourselves.
// We can be sure that only we were involved in executing the child tasks of the parent_continuation...
notified_cont->get_memory_block()->get_results_missing().fetch_add(-1);
@@ -130,19 +150,25 @@ class cont_manager {
// Right side is 'fully' stolen. We can continue to inform the parent as we normally would.
}
// Keep the target chain before we execute, as this potentially frees the memory
auto *target_chain = notified_cont->get_memory_block()->get_offered_chain().load();
// Notify the next continuation of finishing a child...
if (notified_cont->get_memory_block()->get_results_missing().fetch_add(-1) == 1) {
// ... we finished the continuation.
// We are now in charge of continuing to execute the above continuation chain.
if (get_node(notified_cont->get_memory_block()->get_depth()) != notified_cont->get_memory_block()) {
my_state.cont_manager_.check_clean_chain();
// We do not own the thing we will execute.
// Take ownership by swapping in the chain belonging to it.
aquire_memory_chain(notified_cont->get_memory_block());
std::cout << "Now in charge of memory chain on core " << my_state.get_id() << std::endl;
}
my_state.parent_cont_ = notified_cont->get_parent();
my_state.right_spawn_ = notified_cont->is_right_child();
active_node_ = notified_cont->get_memory_block();
std::cout << "Execute cont on core " << my_state.get_id() << std::endl;
notified_cont->execute();
if (!falling_through() && notified_cont->get_parent() != nullptr) {
fall_through_and_notify_cont(notified_cont->get_parent(), notified_cont->is_right_child());
@@ -155,8 +181,11 @@ class cont_manager {
if (get_node(notified_cont->get_memory_block()->get_depth()) == notified_cont->get_memory_block()) {
// We own the thing we are not allowed to execute.
// Get rid of the ownership by using the offered chain.
aquire_memory_chain(notified_cont->get_memory_block()->get_offered_chain().load());
aquire_memory_chain(target_chain);
std::cout << "No longer in charge of chain above on core " << my_state.get_id() << std::endl;
}
my_state.cont_manager_.check_clean_chain();
// We are done here...nothing more to execute
return;
}
@@ -178,6 +207,7 @@ class cont_manager {
private:
const size_t max_cont_size_;
const size_t num_conts_;
/**
* Managing the continuation chain.
@@ -34,7 +34,10 @@ class memory_block {
template<typename T, typename ...ARGS>
T *place_in_buffer(ARGS &&...args) {
PLS_ASSERT(!memory_buffer_used_, "Must only allocate one continuation at once per node.")
if (memory_buffer_used_) {
pls::internal::base::this_thread::sleep(100000);
PLS_ASSERT(!memory_buffer_used_, "Must only allocate one continuation at once per node.");
}
memory_buffer_used_ = true;
return new(memory_buffer_) T(std::forward<ARGS>(args)...);
@@ -43,6 +46,9 @@ class memory_block {
PLS_ASSERT(memory_buffer_used_, "Can only free a memory spot when it is in use.")
memory_buffer_used_ = false;
}
bool is_buffer_used() {
return memory_buffer_used_;
}
// TODO: Fit the reset somewhere!!!
// // Reset Associated counters
@@ -71,7 +77,7 @@ class memory_block {
memory_chain_start_ = start;
}
enum state { initialized, execute_local, stealing, stolen };
enum state { initialized, execute_local, stealing, stolen, invalid };
using stamped_state = data_structures::stamped_integer;
std::atomic<stamped_state> &get_state() {
@@ -90,6 +96,13 @@ class memory_block {
return depth_;
}
void reset_state() {
offered_chain_.store(nullptr);
auto old_state = state_.load();
state_.store({old_state.stamp + 1, initialized});
results_missing_.store(2);
}
private:
// Linked list property of memory blocks (a complete list represents a thread's currently owned memory).
// Each block knows its chain start to allow stealing a whole chain in O(1)
@@ -44,11 +44,11 @@ struct scheduler::starter {
// (We might optimize this in the future to require fewer memory copies.)
auto *current_memory_block = cont_manager.get_next_memory_block();
// We set the correct side when invoking user code.
const bool is_right_cont = my_state.right_spawn_;
// We keep track of the last spawn to build up the parent_cont chain
const bool is_right_cont = my_state.right_spawn_;
base_cont *parent_cont = my_state.parent_cont_;
continuation_type *current_cont = current_memory_block->place_in_buffer<continuation_type>(parent_cont,
current_memory_block,
is_right_cont,
@@ -74,6 +74,8 @@ struct scheduler::starter {
if (cont_manager.falling_through()) {
// The main scheduling loop is responsible for passing the result on to the slow path...
current_cont->store_left_result(std::move(result_1));
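// New here: stamp the block 'invalid' while the left result is handed to the slow path, presumably so accidental reuse of the block trips the state checks.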
auto old_state = current_cont->get_memory_block()->get_state().load();
current_cont->get_memory_block()->get_state().store({old_state.stamp + 1, memory_block::invalid});
PLS_ASSERT(current_cont->get_memory_block()->get_results_missing().fetch_add(-1) != 1,
"We fall through, meaning we 'block' an cont above, thus this can never happen!");
// Unwind stack...
@@ -5,14 +5,18 @@
#include <memory>
#include <utility>
#include <array>
#include <mutex>
#include <atomic>
#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/cont_manager.h"
#include "pls/internal/scheduling/memory_block.h"
#include "pls/internal/data_structures/bounded_ws_deque.h"
#include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/base/spin_lock.h"
namespace pls {
namespace internal {
namespace scheduling {
@@ -38,24 +42,33 @@ class task_manager {
public:
// Publishes a task on the stack, i.e. makes it visible for other threads to steal.
void publish_task(base_task *task) {
std::lock_guard<base::spin_lock> lock{lock_};
task_deque_.push_bottom(task_handle{task});
}
// Try to pop a local task from this task manager's stack.
bool steal_local_task() {
std::lock_guard<base::spin_lock> lock{lock_};
return task_deque_.pop_bottom();
}
// Try to steal a task from a remote task_manager instance. The stolen task must be stored locally.
// Returns the stolen task on success, nullptr otherwise.
base_task *steal_remote_task(cont_manager &stealing_cont_manager) {
std::lock_guard<base::spin_lock> lock{lock_};
auto stolen_task_handle = task_deque_.pop_top();
if (stolen_task_handle) {
base_task *stolen_task = (*stolen_task_handle).task_;
std::cout << "Nearly stole on core " << thread_state::get().get_id() << " task with depth "
<< stolen_task->get_cont()->get_memory_block()->get_depth() << std::endl;
auto &atomic_state = (*stolen_task_handle).task_memory_block_->get_state();
auto &atomic_offered_chain = (*stolen_task_handle).task_memory_block_->get_offered_chain();
auto offered_chain = stealing_cont_manager.get_node((*stolen_task_handle).task_memory_block_->get_depth());
if (offered_chain == (*stolen_task_handle).task_memory_block_) {
PLS_ASSERT(false, "How would we offer our own chain? We only offer when stealing!");
}
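// Stamped-state steal protocol: only a block still in 'initialized' may be stolen; the CAS below bumps the stamp, so a racing owner exchange (or a competing thief) makes this attempt bail out with nullptr.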
auto last_state = atomic_state.load();
if (last_state.value != memory_block::initialized) {
return nullptr;
@@ -78,6 +91,7 @@ class task_manager {
}
if (atomic_state.compare_exchange_strong(loop_state, {loop_state.stamp + 1, memory_block::stolen})) {
std::cout << "Steal!" << std::endl;
return stolen_task;
} else {
return nullptr;
@@ -90,10 +104,12 @@ class task_manager {
}
}
explicit task_manager(data_structures::bounded_ws_deque<task_handle> &task_deque) : task_deque_{task_deque} {}
explicit task_manager(data_structures::bounded_ws_deque<task_handle> &task_deque) : task_deque_{task_deque},
lock_{} {}
private:
data_structures::bounded_ws_deque<task_handle> &task_deque_;
base::spin_lock lock_;
};
template<size_t NUM_TASKS, size_t MAX_STACK_SIZE>
@@ -69,6 +69,7 @@ void scheduler::work_thread_work_section() {
while (my_cont_manager.falling_through()) {
my_cont_manager.execute_fall_through_code();
}
my_cont_manager.check_clean_chain();
// Steal Routine (will be continuously executed when there are no more fall through's).
// TODO: move into separate function
@@ -82,16 +83,19 @@ void scheduler::work_thread_work_section() {
auto &target_state = my_state.get_scheduler().thread_state_for(target);
my_cont_manager.check_clean_chain();
auto *stolen_task = target_state.get_task_manager().steal_remote_task(my_cont_manager);
if (stolen_task != nullptr) {
my_state.parent_cont_ = stolen_task->get_cont();
my_state.right_spawn_ = true;
my_cont_manager.set_active_depth(stolen_task->get_cont()->get_memory_block()->get_depth() + 1);
my_cont_manager.check_clean_chain();
stolen_task->execute();
if (my_cont_manager.falling_through()) {
break;
} else {
my_cont_manager.fall_through_and_notify_cont(stolen_task->get_cont(), true);
break;
}
}
}
#include <catch.hpp>
#include <atomic>
#include <thread>
#include <mutex>
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/cont.h"
#include "pls/internal/scheduling/cont_manager.h"
#include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/scheduling/parallel_result.h"
using namespace pls::internal::scheduling;
// TODO: Introduce actual tests once multiple threads work...
TEST_CASE("continuation stealing", "[internal/scheduling/cont_manager.h]") {
const int NUM_THREADS = 2;
const int NUM_TASKS = 8;
const int MAX_TASK_STACK_SIZE = 8;
const int NUM_CONTS = 8;
const int MAX_CONT_SIZE = 256;
static_scheduler_memory<NUM_THREADS,
NUM_TASKS,
MAX_TASK_STACK_SIZE,
NUM_CONTS,
MAX_CONT_SIZE> static_scheduler_memory;
scheduler scheduler{static_scheduler_memory, NUM_THREADS};
// Coordinate progress to match OUR order
std::atomic<int> progress{0};
// Order:
// 0) work on first task on main thread
// 1) second thread stole right task
scheduler.perform_work([&]() {
return scheduler::par([&]() {
while (progress.load() != 1);
return parallel_result<int>{0};
}, [&]() {
progress.store(1);
return parallel_result<int>{0};
}).then([&](int, int) {
return parallel_result<int>{0};
});
});
}
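
A side note on the stamped-state pattern this change leans on (in fall_through_and_notify_cont and steal_remote_task): below is a minimal, self-contained sketch of the owner/thief race. stamped_state, owner_overrules_thief and the enum values are simplified stand-ins for stamped_integer and memory_block's state, not the real API.

#include <atomic>
#include <cstdint>
#include <iostream>

enum state_value : uint16_t { initialized, execute_local, stealing, stolen, invalid };

// Stand-in for data_structures::stamped_integer: a value plus an ABA-guard stamp.
struct stamped_state {
  uint16_t stamp;
  uint16_t value;
};

// The owner unconditionally exchanges the state to execute_local with a bumped
// stamp; only if the previous value was already 'stolen' has the thief won.
bool owner_overrules_thief(std::atomic<stamped_state> &state) {
  stamped_state old_state = state.load();
  stamped_state target{static_cast<uint16_t>(old_state.stamp + 1), execute_local};
  return state.exchange(target).value != stolen;
}

int main() {
  std::atomic<stamped_state> state{{0, initialized}};
  std::cout << (owner_overrules_thief(state) ? "owner wins" : "thief wins") << std::endl;
  return 0;
}

Because every transition bumps the stamp, a thief that read an old stamp cannot complete a compare_exchange against the current state, which is what makes the unconditional exchange on the owner side safe.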