Commit 0228aa92 by FritzFlorian

Allow to re-use main thread as worker.

parent 18cd7cf7
Pipeline #1259 failed with stages
in 60 minutes 47 seconds
...@@ -32,6 +32,7 @@ using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>; ...@@ -32,6 +32,7 @@ using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;
class scheduler { class scheduler {
friend class task; friend class task;
const unsigned int num_threads_; const unsigned int num_threads_;
const bool reuse_thread_;
scheduler_memory *memory_; scheduler_memory *memory_;
base::barrier sync_barrier_; base::barrier sync_barrier_;
...@@ -49,7 +50,7 @@ class scheduler { ...@@ -49,7 +50,7 @@ class scheduler {
* @param memory All memory is allocated statically, thus the user is required to provide the memory instance. * @param memory All memory is allocated statically, thus the user is required to provide the memory instance.
* @param num_threads The number of worker threads to be created. * @param num_threads The number of worker threads to be created.
*/ */
explicit scheduler(scheduler_memory *memory, unsigned int num_threads); explicit scheduler(scheduler_memory *memory, unsigned int num_threads, bool reuse_thread = true);
/** /**
* The scheduler is implicitly terminated as soon as it leaves the scope. * The scheduler is implicitly terminated as soon as it leaves the scope.
......
...@@ -14,19 +14,30 @@ template<typename Function> ...@@ -14,19 +14,30 @@ template<typename Function>
void scheduler::perform_work(Function work_section) { void scheduler::perform_work(Function work_section) {
PROFILE_WORK_BLOCK("scheduler::perform_work") PROFILE_WORK_BLOCK("scheduler::perform_work")
// if (execute_main_thread) { // Prepare main root task
// work_section();
//
// sync_barrier_.wait(); // Trigger threads to wake up
// sync_barrier_.wait(); // Wait for threads to finish
// } else {
lambda_task_by_reference<Function> root_task{work_section}; lambda_task_by_reference<Function> root_task{work_section};
main_thread_root_task_ = &root_task; main_thread_root_task_ = &root_task;
work_section_done_ = false; work_section_done_ = false;
sync_barrier_.wait(); // Trigger threads to wake up if (reuse_thread_) {
sync_barrier_.wait(); // Wait for threads to finish // TODO: See if we should change thread-states to not make our state override the current thread state
// } auto my_state = memory_->thread_state_for(0);
base::this_thread::set_state(my_state); // Make THIS THREAD become the main worker
sync_barrier_.wait(); // Trigger threads to wake up
// Do work (see if we can remove this duplicated code)
root_task.parent_ = nullptr;
root_task.deque_state_ = my_state->deque_.save_state();
root_task.execute();
work_section_done_ = true;
sync_barrier_.wait(); // Wait for threads to finish
} else {
// Simply trigger the others to do the work, this thread will sleep/wait for the time being
sync_barrier_.wait(); // Trigger threads to wake up
sync_barrier_.wait(); // Wait for threads to finish
}
} }
template<typename T> template<typename T>
......
...@@ -62,12 +62,13 @@ void task::spawn_child(T &&sub_task) { ...@@ -62,12 +62,13 @@ void task::spawn_child(T &&sub_task) {
template<typename T> template<typename T>
void task::spawn_child_and_wait(T &&sub_task) { void task::spawn_child_and_wait(T &&sub_task) {
PROFILE_FORK_JOIN_STEALING("spawn_child") PROFILE_FORK_JOIN_STEALING("spawn_child_wait")
static_assert(std::is_base_of<task, typename std::remove_reference<T>::type>::value, "Only pass task subclasses!"); static_assert(std::is_base_of<task, typename std::remove_reference<T>::type>::value, "Only pass task subclasses!");
// Assign forced values (for stack and parent management) // Assign forced values (for stack and parent management)
sub_task.parent_ = nullptr; sub_task.parent_ = nullptr;
sub_task.deque_state_ = thread_state::get()->deque_.save_state(); sub_task.deque_state_ = thread_state::get()->deque_.save_state();
PROFILE_END_BLOCK
sub_task.execute(); sub_task.execute();
wait_for_all(); wait_for_all();
......
...@@ -8,10 +8,11 @@ namespace pls { ...@@ -8,10 +8,11 @@ namespace pls {
namespace internal { namespace internal {
namespace scheduling { namespace scheduling {
scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads) : scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads, bool reuse_thread) :
num_threads_{num_threads}, num_threads_{num_threads},
reuse_thread_{reuse_thread},
memory_{memory}, memory_{memory},
sync_barrier_{num_threads + 1}, sync_barrier_{num_threads + 1 - reuse_thread},
terminated_{false} { terminated_{false} {
if (num_threads_ > memory_->max_threads()) { if (num_threads_ > memory_->max_threads()) {
PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory."); PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory.");
...@@ -20,8 +21,13 @@ scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads) : ...@@ -20,8 +21,13 @@ scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads) :
for (unsigned int i = 0; i < num_threads_; i++) { for (unsigned int i = 0; i < num_threads_; i++) {
// Placement new is required, as the memory of `memory_` is not required to be initialized. // Placement new is required, as the memory of `memory_` is not required to be initialized.
new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i}; new((void *) memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i};
if (reuse_thread && i == 0) {
continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one.
}
new((void *) memory_->thread_for(i))base::thread<void (*)(), thread_state>(&scheduler::worker_routine, new((void *) memory_->thread_for(i))base::thread<void (*)(), thread_state>(&scheduler::worker_routine,
memory_->thread_state_for(i)); memory_->thread_state_for(i));
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment