Commit f87b1536 by FritzFlorian

Add basic invoke_parallel construct.

There are several ways we could optimize the calls, but for now this should be enough for first tests.
parent c6538504
Pipeline #1121 failed with stages
in 1 minute 12 seconds
...@@ -30,6 +30,7 @@ add_subdirectory(lib/pls) ...@@ -30,6 +30,7 @@ add_subdirectory(lib/pls)
# Include examples # Include examples
add_subdirectory(app/playground) add_subdirectory(app/playground)
add_subdirectory(app/test_for_new) add_subdirectory(app/test_for_new)
add_subdirectory(app/invoke_parallel)
# Add optional tests # Add optional tests
option(PACKAGE_TESTS "Build the tests" ON) option(PACKAGE_TESTS "Build the tests" ON)
......
# Build the 'invoke_parallel' example as a standalone executable from its single source file.
add_executable(invoke_parallel main.cpp)
# Link against the project's pls scheduling library target.
target_link_libraries(invoke_parallel pls)
#include <chrono>
#include <iostream>

#include <pls/pls.h>
// Statically allocated scheduler memory. Template arguments are <8, 2 << 10>;
// presumably 8 worker threads with 2 KiB of task storage each — TODO confirm
// against static_scheduler_memory's declaration.
static pls::static_scheduler_memory<8, 2 << 10> my_scheduler_memory;
// Below this n, fib() switches to the serial implementation so that
// task-spawning overhead does not dominate the tiny sub-problems.
static constexpr int CUTOFF = 20;
// Plain recursive Fibonacci. Intentionally the naive exponential-time
// version: it serves as the sequential workload executed below the
// parallel cutoff, so it must not be "optimized" away.
long fib_serial(long n) {
  switch (n) {
    case 0:
      return 0;
    case 1:
      return 1;
    default:
      return fib_serial(n - 2) + fib_serial(n - 1);
  }
}
// Parallel Fibonacci. For small n the serial version is used directly;
// above the cutoff the two recursive calls are handed to invoke_parallel
// so they may run concurrently.
long fib(long n) {
  if (n <= CUTOFF) {
    return fib_serial(n);
  }

  // Results of the two parallel branches. Must be long (not int): fib
  // returns long, and an int here would silently truncate large results.
  // Zero-initialized for safety, though both lambdas assign before use.
  long left = 0;
  long right = 0;
  pls::invoke_parallel(
      [&] { left = fib(n - 1); },
      [&] { right = fib(n - 2); }
  );
  return left + right;
}
// Entry point: sets up a scheduler on the static memory block and times
// one parallel fib computation inside perform_work.
int main() {
  // NOTE(review): the thread count 8 mirrors the first template argument of
  // my_scheduler_memory above — keep the two in sync.
  pls::scheduler scheduler{&my_scheduler_memory, 8};
  scheduler.perform_work([] {
    // Single definition of the benchmarked input so the call site and the
    // printed label cannot drift apart.
    constexpr long n = 30;
    auto start = std::chrono::high_resolution_clock::now();
    // The call looks just the same as a serial call; the only requirement
    // is the enclosure in the perform_work lambda.
    long result = fib(n);
    auto end = std::chrono::high_resolution_clock::now();
    long time = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
    std::cout << "Fib(" << n << ")=" << result << std::endl;
    std::cout << "Execution time in us: " << time << std::endl;
  });
}
...@@ -49,10 +49,6 @@ protected: ...@@ -49,10 +49,6 @@ protected:
wait_for_all(); wait_for_all();
*result_ = left_result + right_result; *result_ = left_result + right_result;
} }
public:
void test() override {
std::cout << "Test Override" << std::endl;
}
}; };
......
...@@ -2,14 +2,48 @@ ...@@ -2,14 +2,48 @@
#ifndef PLS_PARALLEL_INVOKE_H #ifndef PLS_PARALLEL_INVOKE_H
#define PLS_PARALLEL_INVOKE_H #define PLS_PARALLEL_INVOKE_H
#include "pls/internal/scheduling/fork_join_task.h"
#include "pls/internal/scheduling/scheduler.h"
namespace pls { namespace pls {
namespace algorithm { namespace algorithm {
// template<typename Function1, typename Function2> namespace internal {
// void invoke_parallel(Function1 function1, Function2 function2) { using namespace ::pls::internal::scheduling;
// if (already_this_invoke_parallel_instance) {
// template<typename Body>
// } inline void run_body(const Body& internal_body, const abstract_task::id& id) {
// } // Make sure we are in the context of this invoke_parallel instance,
// if not we will spawn it as a new 'fork-join-style' task.
auto current_task = scheduler::current_task();
if (current_task->unique_id() == id) {
auto current_sub_task = reinterpret_cast<fork_join_task*>(current_task)->currently_executing();
internal_body(current_sub_task);
} else {
fork_join_lambda<Body> root_body(&internal_body);
fork_join_task root_task{&root_body, id};
scheduler::execute_task(root_task);
}
}
}
template<typename Function1, typename Function2>
void invoke_parallel(const Function1& function1, const Function2& function2) {
using namespace ::pls::internal::scheduling;
static abstract_task::id id{PLS_UNIQUE_ID, true};
auto internal_body = [&] (fork_join_sub_task* this_task){
auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); };
fork_join_lambda<decltype(sub_task_body_1)> sub_task_1(&sub_task_body_1);
this_task->spawn_child(sub_task_1);
function2(); // Execute last function 'inline' without spawning a sub_task object
this_task->wait_for_all();
};
internal::run_body(internal_body, id);
}
// ...and so on, add more if we decide to keep this design
} }
} }
......
...@@ -28,26 +28,30 @@ namespace pls { ...@@ -28,26 +28,30 @@ namespace pls {
explicit fork_join_sub_task(); explicit fork_join_sub_task();
fork_join_sub_task(const fork_join_sub_task& other); fork_join_sub_task(const fork_join_sub_task& other);
// Overwritten with behaviour of child tasks
virtual void execute_internal() = 0; virtual void execute_internal() = 0;
// SubClass Implementations:
// Do Work
// |-- Spawn Sub Task (new subtask; spawn(subtask);)
// |-- Spawn Sub task
// Do Work
// |-- Wait For All
// Do Work
// |-- Spawn Sub Task
public:
// Only use them when actually executing this sub_task (only public for simpler API design)
template<typename T> template<typename T>
void spawn_child(const T& sub_task); void spawn_child(const T& sub_task);
void wait_for_all(); void wait_for_all();
private: private:
void spawn_child_internal(fork_join_sub_task* sub_task); void spawn_child_internal(fork_join_sub_task* sub_task);
void execute(); void execute();
};
template<typename Function>
class fork_join_lambda: public fork_join_sub_task {
const Function* function_;
public: public:
virtual void test() { explicit fork_join_lambda(const Function* function): function_{function} {};
std::cout << "Test" << std::endl;
protected:
void execute_internal() override {
(*function_)(this);
} }
}; };
...@@ -55,6 +59,7 @@ namespace pls { ...@@ -55,6 +59,7 @@ namespace pls {
friend class fork_join_sub_task; friend class fork_join_sub_task;
fork_join_sub_task* root_task_; fork_join_sub_task* root_task_;
fork_join_sub_task* currently_executing_;
base::aligned_stack* my_stack_; base::aligned_stack* my_stack_;
// Double-Ended Queue management // Double-Ended Queue management
...@@ -73,6 +78,7 @@ namespace pls { ...@@ -73,6 +78,7 @@ namespace pls {
explicit fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id): explicit fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id):
abstract_task{0, id}, abstract_task{0, id},
root_task_{root_task}, root_task_{root_task},
currently_executing_{nullptr},
my_stack_{nullptr}, my_stack_{nullptr},
deque_{}, deque_{},
last_stolen_{nullptr} {}; last_stolen_{nullptr} {};
...@@ -86,6 +92,8 @@ namespace pls { ...@@ -86,6 +92,8 @@ namespace pls {
// Execute it on our OS thread until its finished // Execute it on our OS thread until its finished
root_task_->execute(); root_task_->execute();
} }
fork_join_sub_task* currently_executing() const { return currently_executing_; }
}; };
template<typename T> template<typename T>
......
...@@ -19,7 +19,7 @@ namespace pls { ...@@ -19,7 +19,7 @@ namespace pls {
base::spin_lock finished_lock_; base::spin_lock finished_lock_;
public: public:
explicit root_task(Function function): explicit root_task(Function function):
abstract_task{0, id{0, true}}, abstract_task{0, id{0}},
function_{function}, function_{function},
finished_{false} {} finished_{false} {}
...@@ -51,7 +51,7 @@ namespace pls { ...@@ -51,7 +51,7 @@ namespace pls {
public: public:
explicit root_worker_task(root_task<Function>* master_task): explicit root_worker_task(root_task<Function>* master_task):
abstract_task{0, id{0, true}}, abstract_task{0, id{0}},
master_task_{master_task} {} master_task_{master_task} {}
void execute() override { void execute() override {
......
...@@ -123,6 +123,8 @@ namespace pls { ...@@ -123,6 +123,8 @@ namespace pls {
} }
} }
static abstract_task* current_task() { return base::this_thread::state<thread_state>()->current_task_; }
void terminate(bool wait_for_workers=true); void terminate(bool wait_for_workers=true);
unsigned int num_threads() const { return num_threads_; } unsigned int num_threads() const { return num_threads_; }
thread_state* thread_state_for(size_t id) { return memory_->thread_state_for(id); } thread_state* thread_state_for(size_t id) { return memory_->thread_state_for(id); }
......
#ifndef PLS_LIBRARY_H #ifndef PLS_LIBRARY_H
#define PLS_LIBRARY_H #define PLS_LIBRARY_H
#include "pls/internal/scheduling/scheduler.h" #include "pls/algorithms/invoke_parallel.h"
#include "pls/internal/scheduling/fork_join_task.h"
#include "pls/internal/scheduling/abstract_task.h" #include "pls/internal/scheduling/abstract_task.h"
#include "pls/internal/scheduling/fork_join_task.h"
#include "pls/internal/scheduling/scheduler.h"
namespace pls { namespace pls {
using internal::scheduling::scheduler; using internal::scheduling::scheduler;
...@@ -12,6 +13,8 @@ namespace pls { ...@@ -12,6 +13,8 @@ namespace pls {
using internal::scheduling::fork_join_sub_task; using internal::scheduling::fork_join_sub_task;
using internal::scheduling::fork_join_task; using internal::scheduling::fork_join_task;
using algorithm::invoke_parallel;
} }
#endif #endif
...@@ -16,7 +16,9 @@ namespace pls { ...@@ -16,7 +16,9 @@ namespace pls {
} }
void fork_join_sub_task::execute() { void fork_join_sub_task::execute() {
tbb_task_->currently_executing_ = this;
execute_internal(); execute_internal();
tbb_task_->currently_executing_ = nullptr;
wait_for_all(); wait_for_all();
if (parent_ != nullptr) { if (parent_ != nullptr) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment