Commit 5bdca02f by FritzFlorian

Add API for starting top level tasks.

parent 88b25f22
......@@ -19,7 +19,9 @@ namespace pls {
static std::uintptr_t next_alignment(std::uintptr_t size);
static char* next_alignment(char* pointer);
public:
explicit aligned_stack(char* memory_region, const std::size_t size):
// Default constructor: creates a stack without backing memory; the start, end
// and head pointers are all null.
// NOTE(review): presumably such an instance must be assigned a real region before
// anything is pushed onto it — confirm against push()/pop().
aligned_stack(): memory_start_{nullptr}, memory_end_{nullptr}, head_{nullptr} {};
// Creates a stack over the caller-supplied region
// [memory_region, memory_region + size).
//
// memory_region: first byte of the region.
// size:          length of the region in bytes.
//
// The head is initialized with next_alignment(memory_start_) rather than with
// memory_region itself (presumably the first suitably aligned address in the region).
// NOTE(review): head_ is computed from memory_start_, so this relies on
// memory_start_ being declared before head_ in the class — confirm.
aligned_stack(char* memory_region, const std::size_t size):
memory_start_{memory_region},
memory_end_{memory_region + size},
head_{next_alignment(memory_start_)} {}
......
......@@ -5,9 +5,11 @@
#include <array>
#include <iostream>
#include "pls/internal/base/aligned_stack.h"
#include "pls/internal/base/thread.h"
#include "pls/internal/base/barrier.h"
#include "pls/internal/scheduling/thread_state.h"
#include "root_master_task.h"
#include "root_worker_task.h"
......@@ -18,78 +20,87 @@ namespace pls {
// Could be moved to templating if needed.
static constexpr int MAX_THREADS = 32;
class scheduler {
static void worker_routine() {
auto my_state = base::this_thread::state<thread_state>();
void worker_routine();
using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;
while (true) {
my_state->scheduler_->sync_barrier_.wait();
if (my_state->scheduler_->terminated_) {
return;
}
// Interface through which a scheduler obtains its per-thread resources
// (thread state, thread handle and task stack), addressed by thread index.
class scheduler_memory {
public:
    // Virtual destructor so implementations are destroyed correctly when they
    // are deleted through a scheduler_memory pointer.
    virtual ~scheduler_memory() = default;

    // Number of thread slots this memory provides.
    virtual size_t max_threads() = 0;
    // State object of the thread with the given index.
    virtual thread_state* thread_state_for(size_t id) = 0;
    // Thread handle slot of the thread with the given index.
    virtual scheduler_thread* thread_for(size_t id) = 0;
    // Task stack of the thread with the given index.
    virtual base::aligned_stack* task_stack_for(size_t id) = 0;
};
// The root task must only return when all work is done,
// because of this a simple call is enough to ensure the
// fork-join-section is done (logically joined back into our main thread).
my_state->root_task_->execute();
template<size_t MAX_THREADS, size_t TASK_STACK_SIZE>
class static_scheduler_memory: public scheduler_memory {
std::array<scheduler_thread, MAX_THREADS> threads_;
std::array<thread_state, MAX_THREADS> thread_states_;
std::array<std::array<char, TASK_STACK_SIZE>, MAX_THREADS> task_stacks_memory_;
std::array<base::aligned_stack, MAX_THREADS> task_stacks_;
my_state->scheduler_->sync_barrier_.wait();
public:
// Sets up one aligned_stack per thread slot, each backed by that slot's
// TASK_STACK_SIZE-byte buffer in task_stacks_memory_.
static_scheduler_memory() {
    for (size_t i = 0; i < MAX_THREADS; i++) {
        // std::array::data() already yields the buffer as char*, so no
        // reinterpret_cast of the array object's address is needed.
        task_stacks_[i] = base::aligned_stack(task_stacks_memory_[i].data(), TASK_STACK_SIZE);
    }
}
// scheduler_memory implementation: every accessor returns a pointer into one of
// the std::array members of this object, selected by thread index.
// NOTE(review): `id` is not range-checked; callers presumably keep id < MAX_THREADS.

// Reports the MAX_THREADS template argument.
size_t max_threads() override { return MAX_THREADS; }
// Address of the id-th entry of thread_states_.
thread_state* thread_state_for(size_t id) override { return &thread_states_[id]; }
// Address of the id-th entry of threads_.
scheduler_thread* thread_for(size_t id) override { return &threads_[id]; }
// Address of the id-th entry of task_stacks_.
base::aligned_stack* task_stack_for(size_t id) override { return &task_stacks_[id]; }
};
class scheduler {
friend void worker_routine();
unsigned int num_threads_;
std::array<thread_state, MAX_THREADS> thread_states_;
std::array<base::thread<decltype(&worker_routine), thread_state>, MAX_THREADS> threads_;
scheduler_memory* memory_;
base::barrier sync_barrier_;
bool terminated_;
public:
explicit scheduler(const unsigned int num_threads):
num_threads_{num_threads},
sync_barrier_{num_threads + 1},
terminated_{false} {
if (num_threads > MAX_THREADS) {
exit(1); // TODO: Exception Handling
}
for (unsigned int i = 0; i < num_threads; i++) {
thread_states_[i] = thread_state{this};
threads_[i] = base::start_thread(&worker_routine, &thread_states_[i]);
}
}
~scheduler() {
terminate();
}
explicit scheduler(scheduler_memory* memory, unsigned int num_threads);
~scheduler();
template<typename Function>
void perform_work(Function work_section) {
root_master_task<Function> master{work_section};
root_worker_task<Function> worker{&master};
thread_states_[0].root_task_ = &master;
// Push root task on stacks
memory_->thread_state_for(0)->root_task_ = memory_->task_stack_for(0)->push(master);
for (unsigned int i = 1; i < num_threads_; i++) {
thread_states_[i].root_task_ = &worker;
memory_->thread_state_for(i)->root_task_ = memory_->task_stack_for(i)->push(worker);
}
// Perform and wait for work
sync_barrier_.wait(); // Trigger threads to wake up
sync_barrier_.wait(); // Wait for threads to finish
}
void terminate(bool wait_for_workers=true) {
if (terminated_) {
return;
// Remove root task from stacks: thread 0 holds the master task, every other
// thread holds a worker task (mirrors the pushes done before the barriers).
// decltype is used instead of `typeof`, which is a GNU extension and not
// available in strict ISO C++ mode or on other compilers.
memory_->task_stack_for(0)->pop<decltype(master)>();
for (unsigned int i = 1; i < num_threads_; i++) {
    memory_->task_stack_for(i)->pop<decltype(worker)>();
}
}
terminated_ = true;
sync_barrier_.wait();
// TODO: See if we should place this differently (only for performance reasons)
template<typename Task>
void execute_task(Task& task) {
static_assert(std::is_base_of<abstract_task, Task>::value, "Only pass abstract_task subclasses!");
if (wait_for_workers) {
for (unsigned int i = 0; i < num_threads_; i++) {
threads_[i].join();
}
}
// Fetch the calling thread's state and the task stack registered for it.
auto my_state = base::this_thread::state<thread_state>();
auto task_stack = my_state->task_stack_;

// TODO: Check whether 'top level' tasks have to be placed on the task stack at
//       all or whether we can simply keep them on the call stack.
// push() returns a pointer (perform_work stores the same return value in an
// abstract_task*), so the task has to be invoked through '->'.
auto my_task = task_stack->push(task);
my_task->execute();
// Remove the task from the stack again once it has run.
task_stack->pop<Task>();
}
void terminate(bool wait_for_workers=true);
};
}
}
......
......@@ -11,11 +11,15 @@ namespace pls {
class scheduler;
struct thread_state {
thread_state(): scheduler_{nullptr}, root_task_{nullptr} {};
explicit thread_state(scheduler* scheduler): scheduler_{scheduler}, root_task_{nullptr} {}
thread_state(): scheduler_{nullptr}, root_task_{nullptr}, task_stack_{nullptr} {};
explicit thread_state(scheduler* scheduler, base::aligned_stack* task_stack):
scheduler_{scheduler},
root_task_{nullptr},
task_stack_{task_stack} {}
scheduler* scheduler_;
abstract_task* root_task_;
base::aligned_stack* task_stack_;
};
}
}
......
......@@ -3,7 +3,57 @@
namespace pls {
namespace internal {
namespace scheduling {
// Creates a scheduler that runs `num_threads` worker threads on the resources
// handed out by `memory`.
//
// memory:      provider of the per-thread state, thread handle and task stack;
//              it is dereferenced here and stored for later use.
// num_threads: number of worker threads to start.
//
// Calls exit(1) if `memory` has fewer than `num_threads` thread slots.
scheduler::scheduler(scheduler_memory* memory, const unsigned int num_threads):
    num_threads_{num_threads},
    memory_{memory},
    // +1: besides the workers, the thread calling perform_work()/terminate()
    // also waits on this barrier.
    sync_barrier_{num_threads + 1},
    terminated_{false} {
    // All per-thread resources are resolved through memory_, so the limit that
    // matters is the one of the supplied memory, not the global MAX_THREADS.
    if (num_threads_ > memory_->max_threads()) {
        exit(1); // TODO: Exception Handling
    }

    for (unsigned int i = 0; i < num_threads_; i++) {
        // The state must be fully set up before the thread that reads it is started.
        *memory_->thread_state_for(i) = thread_state{this, memory_->task_stack_for(i)};
        *memory_->thread_for(i) = base::start_thread(&worker_routine, memory_->thread_state_for(i));
    }
}
// Shuts the scheduler down on destruction by calling terminate() with its
// default argument (declared as wait_for_workers=true in the class), so the
// worker threads are joined. terminate() returns immediately if it already ran.
scheduler::~scheduler() {
terminate();
}
// Main loop of a scheduler-owned thread. Each round consists of:
//   1. waiting on the scheduler's barrier until it is released,
//   2. leaving the loop if the scheduler has been terminated in the meantime,
//   3. running the root task currently stored in this thread's state,
//   4. waiting on the barrier a second time to report completion.
void worker_routine() {
    auto my_state = base::this_thread::state<thread_state>();
    auto owner = my_state->scheduler_;

    for (;;) {
        owner->sync_barrier_.wait();
        if (owner->terminated_) {
            break;
        }

        // A root task only returns once all work is finished, so a plain call
        // is sufficient to know the fork-join section is complete (logically
        // joined back into the main thread).
        my_state->root_task_->execute();

        owner->sync_barrier_.wait();
    }
}
// Stops the scheduler. The first call sets the termination flag and passes the
// barrier once so the workers can observe the flag; later calls do nothing.
//
// wait_for_workers: when true, every worker thread is joined before returning.
void scheduler::terminate(bool wait_for_workers) {
    if (terminated_) {
        return;
    }

    // Set the flag before releasing the barrier: workers check it right after
    // their own barrier wait returns.
    terminated_ = true;
    sync_barrier_.wait();

    if (!wait_for_workers) {
        return;
    }

    unsigned int worker = 0;
    while (worker < num_threads_) {
        memory_->thread_for(worker)->join();
        ++worker;
    }
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment