Commit 21733e4c by FritzFlorian

WIP: Major flaws fixed. Edge cases at the beginning and cleanup for conts are still missing.

parent 69fd7e0c
Pipeline #1339 failed with stages in 25 seconds
@@ -7,7 +7,7 @@
using namespace pls::internal;
-constexpr size_t NUM_THREADS = 1;
+constexpr size_t NUM_THREADS = 8;
constexpr size_t NUM_TASKS = 64;
constexpr size_t MAX_TASK_STACK_SIZE = 0;
@@ -17,7 +17,7 @@ constexpr size_t MAX_CONT_SIZE = 256;
std::atomic<int> count{0};
scheduling::parallel_result<int> fib(int n) {
std::cout << "Fib(" << n << "): " << count++ << ", " << scheduling::thread_state::get().get_id() << std::endl;
// std::cout << "Fib(" << n << "): " << count++ << ", " << scheduling::thread_state::get().get_id() << std::endl;
if (n == 0) {
return 0;
}
@@ -31,13 +31,13 @@ scheduling::parallel_result<int> fib(int n) {
return fib(n - 2);
}).then([=](int a, int b) {
scheduling::parallel_result<int> result{a + b};
std::cout << "Done Fib(" << n << "): " << (a + b) << ", " << scheduling::thread_state::get().get_id() << std::endl;
// std::cout << "Done Fib(" << n << "): " << (a + b) << ", " << scheduling::thread_state::get().get_id() << std::endl;
return result;
});
}
int fib_normal(int n) {
std::cout << "Fib(" << n << "): " << count++ << std::endl;
// std::cout << "Fib(" << n << "): " << count++ << std::endl;
if (n == 0) {
return 0;
}
@@ -46,7 +46,7 @@ int fib_normal(int n) {
}
int result = fib_normal(n - 1) + fib_normal(n - 2);
std::cout << "Done Fib(" << n << "): " << result << std::endl;
// std::cout << "Done Fib(" << n << "): " << result << std::endl;
return result;
}
@@ -60,19 +60,27 @@ int main() {
scheduling::scheduler scheduler{static_scheduler_memory, NUM_THREADS};
auto start = std::chrono::steady_clock::now();
// std::cout << "fib = " << fib_normal(10) << std::endl;
std::cout << "fib = " << fib_normal(16) << std::endl;
auto end = std::chrono::steady_clock::now();
std::cout << "Normal: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
std::cout << "Normal: " << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count()
<< std::endl;
start = std::chrono::steady_clock::now();
scheduler.perform_work([]() {
-return fib(10);
+// return scheduling::scheduler::par([]() {
+//   return scheduling::parallel_result<int>(0);
+// }, []() {
+//   return fib(16);
+// }).then([](int, int b) {
+//   std::cout << "fib = " << (b) << std::endl;
+//   return scheduling::parallel_result<int>{0};
+// });
+return fib(16);
});
end = std::chrono::steady_clock::now();
std::cout << "Framework: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << std::endl;
std::cout << "Framework: " << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() << std::endl;
return 0;
}
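
For context, the fork-join pattern this benchmark exercises, written out as a minimal sketch against the scheduling::scheduler::par / .then() API used above (the split into fib(15) and fib(14) is purely illustrative, not part of the commit):

// Sketch only: fork two fib calls and join their results.
scheduler.perform_work([]() {
  return scheduling::scheduler::par([]() {
    return fib(15);  // left branch, typically run by the current thread
  }, []() {
    return fib(14);  // right branch, may be stolen by another worker
  }).then([](int a, int b) {
    // The join runs once both branch results are available.
    return scheduling::parallel_result<int>{a + b};
  });
});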
@@ -104,8 +104,8 @@ class cont_manager {
// Check to execute right child directly...
auto &atomic_state = notified_cont->get_memory_block()->get_state();
memory_block::stamped_state target_state{atomic_state.load().stamp + 1, memory_block::state::execute_local};
-atomic_state.exchange(target_state);
-if (target_state.value != memory_block::state::stolen) {
+memory_block::stamped_state old_state = atomic_state.exchange(target_state);
+if (old_state.value != memory_block::state::stolen) {
// We 'overruled' the stealing thread and execute the other task ourselves.
// We can be sure that only we were involved in executing the child tasks of the parent_continuation...
notified_cont->get_memory_block()->get_results_missing().fetch_add(-1);
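
The fix above hinges on the semantics of atomic exchange: it returns the value stored before the call, not the value just written. The old code tested target_state, i.e. the argument, so the 'stolen' case could never be observed. A standalone sketch of the corrected pattern, reduced to a plain std::atomic for illustration:

#include <atomic>
#include <cassert>

enum class state { initialized, execute_local, stealing, stolen };

int main() {
  std::atomic<state> atomic_state{state::stolen};  // a thief marked the block first

  // exchange() installs execute_local and hands back the PREVIOUS value.
  state old_state = atomic_state.exchange(state::execute_local);

  assert(old_state == state::stolen);               // we lost the race to the thief
  assert(atomic_state.load() == state::execute_local);
  return 0;
}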
......
#ifndef PLS_INTERNAL_SCHEDULING_CONT_NODE_H_
#define PLS_INTERNAL_SCHEDULING_CONT_NODE_H_
namespace pls {
namespace internal {
namespace scheduling {
/**
* A block of memory that can be used to store tasks and continuations.
* Threads trade these blocks while executing and stealing tasks.
*
* Each block has an associated raw memory buffer. The user can place their objects
* in this memory region as needed and is responsible for calling the destructors of the
* placed objects.
*/
class memory_block {
public:
memory_block(char *memory_buffer,
size_t memory_buffer_size,
memory_block *memory_chain_start,
memory_block *prev,
unsigned int depth)
: memory_chain_start_{memory_chain_start},
prev_{prev},
next_{nullptr},
offered_chain_{nullptr},
state_{{initialized}},
results_missing_{2},
memory_buffer_{memory_buffer},
memory_buffer_size_{memory_buffer_size},
memory_buffer_used_{false},
depth_{depth} {};
template<typename T, typename ...ARGS>
T *place_in_buffer(ARGS &&...args) {
PLS_ASSERT(!memory_buffer_used_, "Must only allocate one continuation at a time per node.")
memory_buffer_used_ = true;
return new(memory_buffer_) T(std::forward<ARGS>(args)...);
}
void free_buffer() {
PLS_ASSERT(memory_buffer_used_, "Can only free a memory spot when it is in use.")
memory_buffer_used_ = false;
}
// TODO: Fit the reset somewhere!!!
// // Reset Associated counters
// results_missing_.store(2);
// offered_chain_.store(nullptr);
// auto old_state = state_.load();
// state_.store({old_state.stamp + 1, initialized});
memory_block *get_prev() {
return prev_;
}
void set_prev(memory_block *prev) {
prev_ = prev;
}
memory_block *get_next() {
return next_;
}
void set_next(memory_block *next) {
next_ = next;
}
memory_block *get_start() {
return memory_chain_start_;
}
void set_start(memory_block *start) {
memory_chain_start_ = start;
}
enum state { initialized, execute_local, stealing, stolen };
using stamped_state = data_structures::stamped_integer;
std::atomic<stamped_state> &get_state() {
return state_;
}
std::atomic<memory_block *> &get_offered_chain() {
return offered_chain_;
}
std::atomic<unsigned short> &get_results_missing() {
return results_missing_;
}
unsigned int get_depth() const noexcept {
return depth_;
}
private:
// Linked list property of memory blocks (a complete list represents a thread's currently owned memory).
// Each block knows its chain start to allow stealing a whole chain in O(1)
// without the need to traverse back to the chain start.
memory_block *memory_chain_start_;
memory_block *prev_, *next_;
// When blocked on this chain element, we need to know which other memory chain we
// were offered by the stealing thread.
// For this we store the offered chain's element up to the point we can steal.
std::atomic<memory_block *> offered_chain_;
// This state is needed for an ongoing stealing request.
// Stealing threads need to offer their memory block chain before they
// 'fully' own the stolen task. As long as that has not happened, the steal
// request can still be aborted, so that the victim is never left blocked
// without a new, clean memory block chain to work with.
std::atomic<stamped_state> state_;
// Management for coordinating concurrent result writing and stealing.
// The result count decides atomically who gets to execute the continuation
// and who therefore gets to own this memory block chain.
std::atomic<unsigned short> results_missing_;
// Pointer to the memory region reserved for the companion continuation.
// Must be a buffer big enough to hold any continuation encountered in the program.
// This memory is managed explicitly by the continuation manager and runtime system
// (they need to make sure to always call destructors and never allocate two continuations at once).
char *memory_buffer_;
// These two are helper properties that only exist to catch bugs during development.
size_t memory_buffer_size_;
bool memory_buffer_used_;
// Each element stays at a fixed depth for the entire application run.
// Swapping parts of a memory chain will not reorder it, as parts of
// the same size are always exchanged.
const unsigned int depth_;
};
}
}
}
#endif //PLS_INTERNAL_SCHEDULING_CONT_NODE_H_
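
To illustrate the buffer contract described in the class comment, a hedged usage sketch follows; my_continuation is a hypothetical payload type, the buffer size is arbitrary, and this header (with its PLS_ASSERT dependency) plus <cstddef> are assumed to be included. The key points are that place_in_buffer() uses placement new and that the caller must invoke the destructor before free_buffer():

struct my_continuation {  // hypothetical payload type
  int value;
  explicit my_continuation(int v) : value{v} {}
};

void buffer_usage_sketch() {
  alignas(std::max_align_t) static char buffer[256];
  memory_block block{buffer, sizeof(buffer), /*memory_chain_start=*/&block,
                     /*prev=*/nullptr, /*depth=*/0};

  auto *cont = block.place_in_buffer<my_continuation>(42);
  // ... use *cont ...
  cont->~my_continuation();  // explicit destructor call, as the class comment demands
  block.free_buffer();
}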
@@ -74,7 +74,8 @@ struct scheduler::starter {
if (cont_manager.falling_through()) {
// The main scheduling loop is responsible for passing the result to the slow path...
current_cont->store_left_result(std::move(result_1));
-cont_manager.fall_through_and_notify_cont(current_cont, false);
+PLS_ASSERT(current_cont->get_memory_block()->get_results_missing().fetch_add(-1) != 1,
+           "We fall through, meaning we 'block' a cont above, thus this can never happen!");
// Unwind stack...
return result_type{};
}
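
The assertion added here relies on the results_missing counter from memory_block: both children decrement it, and since fetch_add(-1) returns the previous value, exactly one writer, the last one, observes 1 and gets to execute the continuation. A standalone sketch of that protocol:

#include <atomic>
#include <cstdio>

std::atomic<unsigned short> results_missing{2};  // two child results expected

void on_result_written(const char *who) {
  // fetch_add returns the value BEFORE the decrement.
  if (results_missing.fetch_add(-1) == 1) {
    std::printf("%s wrote last: execute the continuation\n", who);
  }
}

int main() {
  on_result_written("left child");   // counter 2 -> 1, not last
  on_result_written("right child");  // counter 1 -> 0, runs the continuation
  return 0;
}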
@@ -133,9 +134,9 @@ class scheduler::init_function_impl : public init_function {
return parallel_result<int>{0};
}, [=]() {
return function_();
-}).then([=](int, int) {
+}).then([=](int, int b) {
thread_state::get().get_scheduler().work_section_done_ = true;
-return parallel_result<int>{0};
+return parallel_result<int>{b};
});
}
......
#ifndef PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_
#define PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_
#include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/cont_manager.h"
#include "thread_state.h"
namespace pls {
namespace internal {
namespace scheduling {
template<size_t NUM_TASKS, size_t MAX_TASK_STACK_SIZE, size_t NUM_CONTS, size_t MAX_CONT_SIZE>
struct thread_state_static {
public:
thread_state_static()
: static_task_manager_{},
static_cont_manager_{},
thread_state_{static_task_manager_.get_task_manager(), static_cont_manager_.get_cont_manager()} {}
thread_state &get_thread_state() { return thread_state_; }
private:
static_task_manager<NUM_TASKS, MAX_TASK_STACK_SIZE> static_task_manager_;
static_cont_manager<NUM_CONTS, MAX_CONT_SIZE> static_cont_manager_;
thread_state thread_state_;
};
}
}
}
#endif //PLS_INTERNAL_SCHEDULING_THREAD_STATE_STATIC_H_
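
For completeness, a brief instantiation sketch; NUM_CONTS does not appear in the visible hunks, so the value below is illustrative, while the other constants mirror the benchmark at the top of this commit:

// One statically sized thread state per worker thread (sketch).
thread_state_static<NUM_TASKS, MAX_TASK_STACK_SIZE, /*NUM_CONTS=*/64, MAX_CONT_SIZE>
    my_thread_state;
thread_state &my_state = my_thread_state.get_thread_state();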
@@ -88,6 +88,11 @@ void scheduler::work_thread_work_section() {
my_state.right_spawn_ = true;
my_cont_manager.set_active_depth(stolen_task->get_cont()->get_memory_block()->get_depth() + 1);
stolen_task->execute();
+if (my_cont_manager.falling_through()) {
+  break;
+} else {
+  my_cont_manager.fall_through_and_notify_cont(stolen_task->get_cont(), true);
+}
}
}
} while (!work_section_done_);
......