diff --git a/CMakeLists.txt b/CMakeLists.txt index 513e8a0..8178a05 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,6 +30,7 @@ add_subdirectory(lib/pls) # Include examples add_subdirectory(app/playground) add_subdirectory(app/test_for_new) +add_subdirectory(app/invoke_parallel) # Add optional tests option(PACKAGE_TESTS "Build the tests" ON) diff --git a/app/invoke_parallel/CMakeLists.txt b/app/invoke_parallel/CMakeLists.txt new file mode 100644 index 0000000..435b2d3 --- /dev/null +++ b/app/invoke_parallel/CMakeLists.txt @@ -0,0 +1,2 @@ +add_executable(invoke_parallel main.cpp) +target_link_libraries(invoke_parallel pls) diff --git a/app/invoke_parallel/main.cpp b/app/invoke_parallel/main.cpp new file mode 100644 index 0000000..9a17139 --- /dev/null +++ b/app/invoke_parallel/main.cpp @@ -0,0 +1,49 @@ +#include +#include + +static pls::static_scheduler_memory<8, 2 << 10> my_scheduler_memory; + +static constexpr int CUTOFF = 20; + +long fib_serial(long n) { + if (n == 0) { + return 0; + } + if (n == 1) { + return 1; + } + + return fib_serial(n - 1) + fib_serial(n - 2); +} + +long fib(long n) { + if (n <= CUTOFF) { + return fib_serial(n); + } + + // Actual 'invoke_parallel' logic/code + int left, right; + pls::invoke_parallel( + [&] { left = fib(n - 1); }, + [&] { right = fib(n - 2); } + ); + return left + right; +} + +int main() { + pls::scheduler scheduler{&my_scheduler_memory, 8}; + + scheduler.perform_work([] { + auto start = std::chrono::high_resolution_clock::now(); + + // Call looks just the same, only requirement is + // the enclosure in the perform_work lambda. + long result = fib(30); + + auto end = std::chrono::high_resolution_clock::now(); + long time = std::chrono::duration_cast(end - start).count(); + + std::cout << "Fib(30)=" << result << std::endl; + std::cout << "Execution time in us: " << time << std::endl; + }); +} diff --git a/app/playground/main.cpp b/app/playground/main.cpp index 4d585cd..e9aee51 100644 --- a/app/playground/main.cpp +++ b/app/playground/main.cpp @@ -49,10 +49,6 @@ protected: wait_for_all(); *result_ = left_result + right_result; } -public: - void test() override { - std::cout << "Test Override" << std::endl; - } }; diff --git a/lib/pls/include/pls/algorithms/invoke_parallel.h b/lib/pls/include/pls/algorithms/invoke_parallel.h index 3809034..aa82aa8 100644 --- a/lib/pls/include/pls/algorithms/invoke_parallel.h +++ b/lib/pls/include/pls/algorithms/invoke_parallel.h @@ -2,14 +2,48 @@ #ifndef PLS_PARALLEL_INVOKE_H #define PLS_PARALLEL_INVOKE_H +#include "pls/internal/scheduling/fork_join_task.h" +#include "pls/internal/scheduling/scheduler.h" + namespace pls { namespace algorithm { -// template -// void invoke_parallel(Function1 function1, Function2 function2) { -// if (already_this_invoke_parallel_instance) { -// -// } -// } + namespace internal { + using namespace ::pls::internal::scheduling; + + template + inline void run_body(const Body& internal_body, const abstract_task::id& id) { + // Make sure we are in the context of this invoke_parallel instance, + // if not we will spawn it as a new 'fork-join-style' task. + auto current_task = scheduler::current_task(); + if (current_task->unique_id() == id) { + auto current_sub_task = reinterpret_cast(current_task)->currently_executing(); + internal_body(current_sub_task); + } else { + fork_join_lambda root_body(&internal_body); + fork_join_task root_task{&root_body, id}; + scheduler::execute_task(root_task); + } + } + } + + template + void invoke_parallel(const Function1& function1, const Function2& function2) { + using namespace ::pls::internal::scheduling; + static abstract_task::id id{PLS_UNIQUE_ID, true}; + + auto internal_body = [&] (fork_join_sub_task* this_task){ + auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); }; + fork_join_lambda sub_task_1(&sub_task_body_1); + + this_task->spawn_child(sub_task_1); + function2(); // Execute last function 'inline' without spawning a sub_task object + this_task->wait_for_all(); + }; + + internal::run_body(internal_body, id); + } + + // ...and so on, add more if we decide to keep this design } } diff --git a/lib/pls/include/pls/internal/scheduling/fork_join_task.h b/lib/pls/include/pls/internal/scheduling/fork_join_task.h index eb08fcd..0248d4e 100644 --- a/lib/pls/include/pls/internal/scheduling/fork_join_task.h +++ b/lib/pls/include/pls/internal/scheduling/fork_join_task.h @@ -28,26 +28,30 @@ namespace pls { explicit fork_join_sub_task(); fork_join_sub_task(const fork_join_sub_task& other); + // Overwritten with behaviour of child tasks virtual void execute_internal() = 0; - // SubClass Implementations: - // Do Work - // |-- Spawn Sub Task (new subtask; spawn(subtask);) - // |-- Spawn Sub task - // Do Work - // |-- Wait For All - // Do Work - // |-- Spawn Sub Task + public: + // Only use them when actually executing this sub_task (only public for simpler API design) template void spawn_child(const T& sub_task); void wait_for_all(); + private: void spawn_child_internal(fork_join_sub_task* sub_task); void execute(); + }; + + template + class fork_join_lambda: public fork_join_sub_task { + const Function* function_; public: - virtual void test() { - std::cout << "Test" << std::endl; + explicit fork_join_lambda(const Function* function): function_{function} {}; + + protected: + void execute_internal() override { + (*function_)(this); } }; @@ -55,6 +59,7 @@ namespace pls { friend class fork_join_sub_task; fork_join_sub_task* root_task_; + fork_join_sub_task* currently_executing_; base::aligned_stack* my_stack_; // Double-Ended Queue management @@ -73,6 +78,7 @@ namespace pls { explicit fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id): abstract_task{0, id}, root_task_{root_task}, + currently_executing_{nullptr}, my_stack_{nullptr}, deque_{}, last_stolen_{nullptr} {}; @@ -86,6 +92,8 @@ namespace pls { // Execute it on our OS thread until its finished root_task_->execute(); } + + fork_join_sub_task* currently_executing() const { return currently_executing_; } }; template diff --git a/lib/pls/include/pls/internal/scheduling/root_task.h b/lib/pls/include/pls/internal/scheduling/root_task.h index c96d624..57a4dca 100644 --- a/lib/pls/include/pls/internal/scheduling/root_task.h +++ b/lib/pls/include/pls/internal/scheduling/root_task.h @@ -19,7 +19,7 @@ namespace pls { base::spin_lock finished_lock_; public: explicit root_task(Function function): - abstract_task{0, id{0, true}}, + abstract_task{0, id{0}}, function_{function}, finished_{false} {} @@ -51,7 +51,7 @@ namespace pls { public: explicit root_worker_task(root_task* master_task): - abstract_task{0, id{0, true}}, + abstract_task{0, id{0}}, master_task_{master_task} {} void execute() override { diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h index 164eaff..59f9440 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler.h @@ -123,6 +123,8 @@ namespace pls { } } + static abstract_task* current_task() { return base::this_thread::state()->current_task_; } + void terminate(bool wait_for_workers=true); unsigned int num_threads() const { return num_threads_; } thread_state* thread_state_for(size_t id) { return memory_->thread_state_for(id); } diff --git a/lib/pls/include/pls/pls.h b/lib/pls/include/pls/pls.h index 6b7f81b..a6354cd 100644 --- a/lib/pls/include/pls/pls.h +++ b/lib/pls/include/pls/pls.h @@ -1,9 +1,10 @@ #ifndef PLS_LIBRARY_H #define PLS_LIBRARY_H -#include "pls/internal/scheduling/scheduler.h" -#include "pls/internal/scheduling/fork_join_task.h" +#include "pls/algorithms/invoke_parallel.h" #include "pls/internal/scheduling/abstract_task.h" +#include "pls/internal/scheduling/fork_join_task.h" +#include "pls/internal/scheduling/scheduler.h" namespace pls { using internal::scheduling::scheduler; @@ -12,6 +13,8 @@ namespace pls { using internal::scheduling::fork_join_sub_task; using internal::scheduling::fork_join_task; + + using algorithm::invoke_parallel; } #endif diff --git a/lib/pls/src/internal/scheduling/fork_join_task.cpp b/lib/pls/src/internal/scheduling/fork_join_task.cpp index cf16d53..b0bba90 100644 --- a/lib/pls/src/internal/scheduling/fork_join_task.cpp +++ b/lib/pls/src/internal/scheduling/fork_join_task.cpp @@ -16,7 +16,9 @@ namespace pls { } void fork_join_sub_task::execute() { + tbb_task_->currently_executing_ = this; execute_internal(); + tbb_task_->currently_executing_ = nullptr; wait_for_all(); if (parent_ != nullptr) {