Commit 351ced3d by Florian Fritz

Merge branch 'parallel_invoke' into 'master'

Merge: Parallel invoke

See merge request !4
parents ea24f5c2 2448ea79
Pipeline #1124 passed with stages in 3 minutes 22 seconds
......@@ -30,6 +30,7 @@ add_subdirectory(lib/pls)
# Include examples
add_subdirectory(app/playground)
add_subdirectory(app/test_for_new)
add_subdirectory(app/invoke_parallel)
# Add optional tests
option(PACKAGE_TESTS "Build the tests" ON)
......
add_executable(invoke_parallel main.cpp)
target_link_libraries(invoke_parallel pls)
#include <pls/pls.h>
#include <iostream>
#include <chrono>
static pls::static_scheduler_memory<8, 2 << 10> my_scheduler_memory;
static constexpr int CUTOFF = 20;
long fib_serial(long n) {
if (n == 0) {
return 0;
}
if (n == 1) {
return 1;
}
return fib_serial(n - 1) + fib_serial(n - 2);
}
long fib(long n) {
if (n <= CUTOFF) {
return fib_serial(n);
}
// Actual 'invoke_parallel' logic/code
long left, right; // match fib's return type to avoid narrowing
pls::invoke_parallel(
[&] { left = fib(n - 1); },
[&] { right = fib(n - 2); }
);
return left + right;
}
int main() {
pls::scheduler scheduler{&my_scheduler_memory, 8};
scheduler.perform_work([] {
auto start = std::chrono::high_resolution_clock::now();
// The call looks just the same; the only requirement is
// that it is enclosed in the perform_work lambda.
long result = fib(30);
auto end = std::chrono::high_resolution_clock::now();
long time = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
std::cout << "Fib(30)=" << result << std::endl;
std::cout << "Execution time in us: " << time << std::endl;
});
}
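invoke_parallel itself returns no value, so results travel through captured references, as left and right do above. A hypothetical convenience wrapper (not part of this merge) that packages the same pattern for value-returning callables, assuming default-constructible result types:

#include <pls/pls.h>
#include <utility>

// Hypothetical helper, not in the library: run two value-returning
// callables in parallel and return both results. Internally this is the
// exact captured-reference pattern used by fib() above.
template<typename F1, typename F2>
auto invoke_parallel_results(const F1& f1, const F2& f2)
        -> std::pair<decltype(f1()), decltype(f2())> {
    decltype(f1()) left;
    decltype(f2()) right;
    pls::invoke_parallel(
        [&] { left = f1(); },
        [&] { right = f2(); }
    );
    return {left, right};
}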
......@@ -5,13 +5,14 @@
#include <atomic>
#include <pls/pls.h>
#include <pls/internal/base/prohibit_new.h>
using namespace pls;
// Example for static memory allocation (no malloc or free required)
static static_scheduler_memory<8, 2 << 12> my_scheduler_memory;
class fib: public tbb_sub_task {
class fib: public fork_join_sub_task {
static constexpr int CUTOFF = 20;
int num_;
......@@ -42,30 +43,24 @@ protected:
int left_result;
int right_result;
fib left_child{num_ - 1, &left_result};
spawn_child(left_child);
fib right_child{num_ - 2, &right_result};
spawn_child(right_child);
spawn_child(fib{num_ - 1, &left_result});
spawn_child(fib{num_ - 2, &right_result});
wait_for_all();
*result_ = left_result + right_result;
}
public:
void test() override {
std::cout << "Test Override" << std::endl;
}
};
int main() {
scheduler my_scheduler{&my_scheduler_memory, 1};
scheduler my_scheduler{&my_scheduler_memory, 4};
auto start = std::chrono::high_resolution_clock::now();
my_scheduler.perform_work([] (){
int result;
fib fib_sub_task{45, &result};
tbb_task tbb_task{&fib_sub_task};
fork_join_task tbb_task{&fib_sub_task, task_id{1}};
scheduler::execute_task(tbb_task);
std::cout << "Result: " << result << std::endl;
......
......@@ -12,8 +12,9 @@ add_library(pls STATIC
src/internal/base/aligned_stack.cpp include/pls/internal/base/aligned_stack.h
include/pls/internal/base/system_details.h
src/internal/scheduling/run_on_n_threads_task.cpp include/pls/internal/scheduling/run_on_n_threads_task.h
src/internal/scheduling/tbb_task.cpp include/pls/internal/scheduling/tbb_task.h
src/internal/base/deque.cpp include/pls/internal/base/deque.h)
src/internal/scheduling/fork_join_task.cpp include/pls/internal/scheduling/fork_join_task.h
src/internal/base/deque.cpp include/pls/internal/base/deque.h
src/algorithms/invoke_parallel.cpp include/pls/algorithms/invoke_parallel.h include/pls/internal/base/error_handling.h)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
......@@ -31,18 +32,30 @@ target_link_libraries(pls
# Rules for installing the library on a system
# ...binaries
install(TARGETS pls
INSTALL(TARGETS pls
EXPORT pls-targets
LIBRARY
DESTINATION lib
DESTINATION lib/pls
ARCHIVE
DESTINATION lib
DESTINATION lib/pls
)
# ...all headers in `include`
INSTALL (
DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include/
INSTALL(
DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include/pls
DESTINATION include
FILES_MATCHING PATTERN "*.h*"
)
# ...allow our project to be used as a CMake dependency
install(
EXPORT pls-targets
FILE plsTargets.cmake
NAMESPACE pls::
DESTINATION lib/pls
)
install(
FILES pls-config.cmake
DESTINATION lib/pls
)
# Enable warnings/tidy code checking from our compiler
target_compile_options(pls PRIVATE
......
#ifndef PLS_PARALLEL_INVOKE_H
#define PLS_PARALLEL_INVOKE_H
#include "pls/internal/scheduling/fork_join_task.h"
#include "pls/internal/scheduling/scheduler.h"
namespace pls {
namespace algorithm {
namespace internal {
using namespace ::pls::internal::scheduling;
template<typename Body>
inline void run_body(const Body& internal_body, const abstract_task::id& id) {
// Make sure we are in the context of this invoke_parallel instance;
// if not, spawn it as a new 'fork-join-style' task.
auto current_task = scheduler::current_task();
if (current_task->unique_id() == id) {
auto current_sub_task = reinterpret_cast<fork_join_task*>(current_task)->currently_executing();
internal_body(current_sub_task);
} else {
fork_join_lambda<Body> root_body(&internal_body);
fork_join_task root_task{&root_body, id};
scheduler::execute_task(root_task);
}
}
}
template<typename Function1, typename Function2>
void invoke_parallel(const Function1& function1, const Function2& function2) {
using namespace ::pls::internal::scheduling;
static abstract_task::id id{PLS_UNIQUE_ID, true};
auto internal_body = [&] (fork_join_sub_task* this_task){
auto sub_task_body_1 = [&] (fork_join_sub_task*){ function1(); };
fork_join_lambda<decltype(sub_task_body_1)> sub_task_1(&sub_task_body_1);
this_task->spawn_child(sub_task_1);
function2(); // Execute last function 'inline' without spawning a sub_task object
this_task->wait_for_all();
};
internal::run_body(internal_body, id);
}
// ...and so on, add more if we decide to keep this design
}
}
#endif //PLS_PARALLEL_INVOKE_H
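So far only the two-function overload is defined (the comment above suggests more arities may follow). Until then a wider split can be expressed by nesting; a minimal sketch, with invoke_parallel_3 as a hypothetical helper that is not part of this merge:

#include <pls/pls.h>

// Hypothetical helper: a three-way split built from the two-argument
// overload. f1 runs in parallel with the pair (f2, f3).
template<typename F1, typename F2, typename F3>
void invoke_parallel_3(const F1& f1, const F2& f2, const F3& f3) {
    pls::invoke_parallel(
        [&] { f1(); },
        [&] { pls::invoke_parallel([&] { f2(); }, [&] { f3(); }); }
    );
}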
......@@ -5,6 +5,8 @@
#include <cstdint>
#include <cstdlib>
#include "pls/internal/base/error_handling.h"
namespace pls {
namespace internal {
namespace base {
......@@ -41,7 +43,7 @@ namespace pls {
// Move head to next aligned position after new object
head_ = next_alignment(head_ + sizeof(T));
if (head_ >= memory_end_) {
exit(1); // TODO: Exception Handling
PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!");
}
return result;
......
#ifndef PLS_ERROR_HANDLING_H
#define PLS_ERROR_HANDLING_H
#include <iostream>
#include <cstdlib>
// TODO: Figure out proper exception handling
// do { ... } while (false) makes the two-statement macro behave like a
// single statement inside unbraced if/else branches.
#define PLS_ERROR(msg) do { std::cerr << (msg) << std::endl; exit(1); } while (false)
#endif //PLS_ERROR_HANDLING_H
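The do { ... } while (false) wrapper matters because the macro body consists of two statements; without it, use inside an unbraced if silently misbehaves. A standalone illustration, where BROKEN_ERROR is a deliberately wrong variant for demonstration only:

#include <cstdlib>
#include <iostream>

// Deliberately unwrapped variant: expands to two separate statements.
#define BROKEN_ERROR(msg) std::cout << msg << std::endl; exit(1);

int main() {
    bool out_of_memory = false;
    if (out_of_memory)
        BROKEN_ERROR("oom");  // only the stream statement is guarded...
    // ...so exit(1) above runs unconditionally; this line is unreachable.
    std::cout << "never printed" << std::endl;
}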
......@@ -10,12 +10,25 @@ namespace pls {
namespace internal {
namespace scheduling {
class abstract_task {
public:
struct id {
uint32_t id_;
bool auto_generated_;
explicit id(uint32_t id, bool auto_generated=false): id_{id}, auto_generated_{auto_generated} {};
bool operator==(const abstract_task::id& other) const {
return id_ == other.id_ && auto_generated_ == other.auto_generated_;
}
};
private:
int depth_;
int unique_id_;
abstract_task::id unique_id_;
abstract_task* child_task_;
public:
abstract_task(int depth, int unique_id):
abstract_task(const int depth, const abstract_task::id& unique_id):
depth_{depth},
unique_id_{unique_id},
child_task_{nullptr} {}
......@@ -25,7 +38,8 @@ namespace pls {
abstract_task* child() { return child_task_; }
void set_depth(int depth) { depth_ = depth; }
int depth() { return depth_; }
int depth() const { return depth_; }
id unique_id() const { return unique_id_; }
protected:
virtual bool internal_stealing(abstract_task* other_task) = 0;
virtual bool split_task(base::spin_lock* lock) = 0;
......
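Because auto_generated_ takes part in operator==, an explicitly chosen id can never collide with an auto-generated one that happens to share the numeric value. A standalone check (the struct is copied from the header above purely for illustration):

#include <cstdint>
#include <cassert>

// Copy of abstract_task::id from above, standalone for illustration.
struct id {
    uint32_t id_;
    bool auto_generated_;
    explicit id(uint32_t id, bool auto_generated=false): id_{id}, auto_generated_{auto_generated} {}
    bool operator==(const id& other) const {
        return id_ == other.id_ && auto_generated_ == other.auto_generated_;
    }
};

int main() {
    assert(id{1} == id{1});          // equal explicit ids compare equal
    assert(!(id{1} == id{1, true})); // explicit vs. auto-generated: distinct
}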
......@@ -11,68 +11,75 @@
namespace pls {
namespace internal {
namespace scheduling {
class tbb_task;
class tbb_sub_task: public base::deque_item {
friend class tbb_task;
class fork_join_task;
class fork_join_sub_task: public base::deque_item {
friend class fork_join_task;
// Coordinate finishing of sub_tasks
std::atomic_uint32_t ref_count_;
tbb_sub_task* parent_;
fork_join_sub_task* parent_;
// Access to the enclosing fork_join_task scheduling environment
tbb_task* tbb_task_;
fork_join_task* tbb_task_;
// Stack Management (reset stack pointer after wait_for_all() calls)
base::aligned_stack::state stack_state_;
protected:
explicit tbb_sub_task();
tbb_sub_task(const tbb_sub_task& other);
explicit fork_join_sub_task();
fork_join_sub_task(const fork_join_sub_task& other);
// Overridden with the behaviour of child tasks
virtual void execute_internal() = 0;
// SubClass Implementations:
// Do Work
// |-- Spawn Sub Task (new subtask; spawn(subtask);)
// |-- Spawn Sub Task
// Do Work
// |-- Wait For All
// Do Work
// |-- Spawn Sub Task
public:
// Only use them when actually executing this sub_task (only public for simpler API design)
template<typename T>
void spawn_child(const T& sub_task);
void wait_for_all();
private:
void spawn_child_internal(tbb_sub_task* sub_task);
void spawn_child_internal(fork_join_sub_task* sub_task);
void execute();
};
template<typename Function>
class fork_join_lambda: public fork_join_sub_task {
const Function* function_;
public:
virtual void test() {
std::cout << "Test" << std::endl;
explicit fork_join_lambda(const Function* function): function_{function} {};
protected:
void execute_internal() override {
(*function_)(this);
}
};
class tbb_task: public abstract_task {
friend class tbb_sub_task;
class fork_join_task: public abstract_task {
friend class fork_join_sub_task;
tbb_sub_task* root_task_;
fork_join_sub_task* root_task_;
fork_join_sub_task* currently_executing_;
base::aligned_stack* my_stack_;
// Double-Ended Queue management
base::deque<tbb_sub_task> deque_;
base::deque<fork_join_sub_task> deque_;
// Steal Management
tbb_sub_task* last_stolen_;
fork_join_sub_task* last_stolen_;
tbb_sub_task* get_local_sub_task();
tbb_sub_task* get_stolen_sub_task();
fork_join_sub_task* get_local_sub_task();
fork_join_sub_task* get_stolen_sub_task();
bool internal_stealing(abstract_task* other_task) override;
bool split_task(base::spin_lock* /*lock*/) override;
public:
explicit tbb_task(tbb_sub_task* root_task):
abstract_task{0, 0},
explicit fork_join_task(fork_join_sub_task* root_task, const abstract_task::id& id):
abstract_task{0, id},
root_task_{root_task},
currently_executing_{nullptr},
my_stack_{nullptr},
deque_{},
last_stolen_{nullptr} {};
......@@ -85,11 +92,13 @@ namespace pls {
// Execute it on our OS thread until its finished
root_task_->execute();
}
fork_join_sub_task* currently_executing() const { return currently_executing_; }
};
template<typename T>
void tbb_sub_task::spawn_child(const T& task) {
static_assert(std::is_base_of<tbb_sub_task, T>::value, "Only pass tbb_sub_task subclasses!");
void fork_join_sub_task::spawn_child(const T& task) {
static_assert(std::is_base_of<fork_join_sub_task, T>::value, "Only pass fork_join_sub_task subclasses!");
T* new_task = tbb_task_->my_stack_->push(task);
spawn_child_internal(new_task);
......
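fork_join_lambda is what lets invoke_parallel spawn plain lambdas without a dedicated subclass per call site. A minimal sketch of the pattern, assuming 'current' is the sub_task already executing on this worker (as obtained via currently_executing() in invoke_parallel.h); spawn_lambda_child is a hypothetical free function:

#include "pls/internal/scheduling/fork_join_task.h"

using namespace pls::internal::scheduling;

// Hypothetical example: spawn a lambda as a child of the currently
// executing sub_task, then join before reading the result.
void spawn_lambda_child(fork_join_sub_task* current, int* out) {
    auto body = [=](fork_join_sub_task*) { *out = 42; };
    fork_join_lambda<decltype(body)> child{&body};
    current->spawn_child(child);  // copied onto the task's aligned_stack
    current->wait_for_all();      // join before *out is read by the caller
}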
......@@ -19,7 +19,7 @@ namespace pls {
base::spin_lock finished_lock_;
public:
explicit root_task(Function function):
abstract_task{0, 0},
abstract_task{0, id{0}},
function_{function},
finished_{false} {}
......@@ -51,7 +51,7 @@ namespace pls {
public:
explicit root_worker_task(root_task<Function>* master_task):
abstract_task{0, 0},
abstract_task{0, id{0}},
master_task_{master_task} {}
void execute() override {
......
......@@ -37,7 +37,7 @@ namespace pls {
}
public:
run_on_n_threads_task(Function function, int num_threads):
abstract_task{PLS_UNIQUE_ID, 0},
abstract_task{0, id{PLS_UNIQUE_ID, true}},
function_{function},
counter{num_threads - 1} {}
......@@ -66,7 +66,7 @@ namespace pls {
run_on_n_threads_task<Function>* root_;
public:
run_on_n_threads_task_worker(Function function, run_on_n_threads_task<Function>* root):
abstract_task{PLS_UNIQUE_ID, 0},
abstract_task{0, id{PLS_UNIQUE_ID, true}},
function_{function},
root_{root} {}
......
......@@ -123,6 +123,8 @@ namespace pls {
}
}
static abstract_task* current_task() { return base::this_thread::state<thread_state>()->current_task_; }
void terminate(bool wait_for_workers=true);
unsigned int num_threads() const { return num_threads_; }
thread_state* thread_state_for(size_t id) { return memory_->thread_state_for(id); }
......
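terminate() gives explicit control over shutdown; the tests at the bottom of this diff call it the same way. A minimal lifecycle sketch using only the API shown in this merge:

#include <pls/pls.h>

static pls::static_scheduler_memory<8, 2 << 12> my_scheduler_memory;

int main() {
    pls::scheduler my_scheduler{&my_scheduler_memory, 4};
    my_scheduler.perform_work([] { /* parallel work goes here */ });
    // wait_for_workers=true blocks until all worker threads have joined.
    my_scheduler.terminate(true);
}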
#ifndef PLS_LIBRARY_H
#define PLS_LIBRARY_H
#include <pls/internal/scheduling/scheduler.h>
#include <pls/internal/scheduling/tbb_task.h>
#include "pls/algorithms/invoke_parallel.h"
#include "pls/internal/scheduling/abstract_task.h"
#include "pls/internal/scheduling/fork_join_task.h"
#include "pls/internal/scheduling/scheduler.h"
namespace pls {
using internal::scheduling::scheduler;
using internal::scheduling::static_scheduler_memory;
using task_id = internal::scheduling::abstract_task::id;
using internal::scheduling::tbb_sub_task;
using internal::scheduling::tbb_task;
using internal::scheduling::fork_join_sub_task;
using internal::scheduling::fork_join_task;
using algorithm::invoke_parallel;
}
#endif
get_filename_component(SELF_DIR "${CMAKE_CURRENT_LIST_FILE}" PATH)
include(${SELF_DIR}/plsTargets.cmake)
#include "pls/algorithms/invoke_parallel.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/tbb_task.h"
#include "pls/internal/scheduling/fork_join_task.h"
namespace pls {
namespace internal {
namespace scheduling {
tbb_sub_task::tbb_sub_task():
fork_join_sub_task::fork_join_sub_task():
base::deque_item{},
ref_count_{0},
parent_{nullptr},
tbb_task_{nullptr},
stack_state_{nullptr} {}
tbb_sub_task::tbb_sub_task(const tbb_sub_task& other): base::deque_item(other) {
fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task& other): base::deque_item(other) {
// Do nothing; fields will be initialized right after this anyway
}
void tbb_sub_task::execute() {
void fork_join_sub_task::execute() {
tbb_task_->currently_executing_ = this;
execute_internal();
tbb_task_->currently_executing_ = nullptr;
wait_for_all();
if (parent_ != nullptr) {
......@@ -24,7 +26,7 @@ namespace pls {
}
}
void tbb_sub_task::spawn_child_internal(tbb_sub_task* sub_task) {
void fork_join_sub_task::spawn_child_internal(fork_join_sub_task* sub_task) {
// Keep our refcount up to date
ref_count_++;
......@@ -36,9 +38,9 @@ namespace pls {
tbb_task_->deque_.push_tail(sub_task);
}
void tbb_sub_task::wait_for_all() {
void fork_join_sub_task::wait_for_all() {
while (ref_count_ > 0) {
tbb_sub_task* local_task = tbb_task_->get_local_sub_task();
fork_join_sub_task* local_task = tbb_task_->get_local_sub_task();
if (local_task != nullptr) {
local_task->execute();
} else {
......@@ -52,22 +54,22 @@ namespace pls {
tbb_task_->my_stack_->reset_state(stack_state_);
}
tbb_sub_task* tbb_task::get_local_sub_task() {
fork_join_sub_task* fork_join_task::get_local_sub_task() {
return deque_.pop_tail();
}
tbb_sub_task* tbb_task::get_stolen_sub_task() {
fork_join_sub_task* fork_join_task::get_stolen_sub_task() {
return deque_.pop_head();
}
bool tbb_task::internal_stealing(abstract_task* other_task) {
auto cast_other_task = reinterpret_cast<tbb_task*>(other_task);
bool fork_join_task::internal_stealing(abstract_task* other_task) {
auto cast_other_task = reinterpret_cast<fork_join_task*>(other_task);
auto stolen_sub_task = cast_other_task->get_stolen_sub_task();
if (stolen_sub_task == nullptr) {
return false;
} else {
// Make sub-task belong to our tbb_task instance
// Make sub-task belong to our fork_join_task instance
stolen_sub_task->tbb_task_ = this;
stolen_sub_task->stack_state_ = my_stack_->save_state();
// We will execute this next without explicitly moving it onto our stack storage
......@@ -77,12 +79,12 @@ namespace pls {
}
}
bool tbb_task::split_task(base::spin_lock* lock) {
tbb_sub_task* stolen_sub_task = get_stolen_sub_task();
bool fork_join_task::split_task(base::spin_lock* lock) {
fork_join_sub_task* stolen_sub_task = get_stolen_sub_task();
if (stolen_sub_task == nullptr) {
return false;
}
tbb_task task{stolen_sub_task};
fork_join_task task{stolen_sub_task, this->unique_id()};
// In success case, unlock.
// TODO: this locking is complicated and error-prone.
......
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/base/error_handling.h"
namespace pls {
namespace internal {
......@@ -9,7 +10,7 @@ namespace pls {
sync_barrier_{num_threads + 1},
terminated_{false} {
if (num_threads > MAX_THREADS) {
exit(1); // TODO: Exception Handling
PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory.");
}
for (unsigned int i = 0; i < num_threads; i++) {
......
......@@ -4,7 +4,7 @@
using namespace pls;
class once_sub_task: public tbb_sub_task {
class once_sub_task: public fork_join_sub_task {
std::atomic<int>* counter_;
int children_;
......@@ -18,12 +18,12 @@ protected:
public:
explicit once_sub_task(std::atomic<int>* counter, int children):
tbb_sub_task(),
fork_join_sub_task(),
counter_{counter},
children_{children} {}
};
class force_steal_sub_task: public tbb_sub_task {
class force_steal_sub_task: public fork_join_sub_task {
std::atomic<int>* parent_counter_;
std::atomic<int>* overall_counter_;
......@@ -42,12 +42,12 @@ protected:
public:
explicit force_steal_sub_task(std::atomic<int>* parent_counter, std::atomic<int>* overall_counter):
tbb_sub_task(),
fork_join_sub_task(),
parent_counter_{parent_counter},
overall_counter_{overall_counter} {}
};
TEST_CASE( "tbb task are scheduled correctly", "[internal/scheduling/tbb_task.h]") {
TEST_CASE( "tbb task are scheduled correctly", "[internal/scheduling/fork_join_task.h]") {
static static_scheduler_memory<8, 2 << 12> my_scheduler_memory;
SECTION("tasks are executed exactly once") {
......@@ -58,7 +58,7 @@ TEST_CASE( "tbb task are scheduled correctly", "[internal/scheduling/tbb_task.h]
my_scheduler.perform_work([&] (){
once_sub_task sub_task{&counter, start_counter};
tbb_task task{&sub_task};
fork_join_task task{&sub_task, task_id{42}};
scheduler::execute_task(task);
});
......@@ -71,7 +71,7 @@ TEST_CASE( "tbb task are scheduled correctly", "[internal/scheduling/tbb_task.h]
my_scheduler.perform_work([&] (){
std::atomic<int> dummy_parent{1}, overall_counter{8};
force_steal_sub_task sub_task{&dummy_parent, &overall_counter};
tbb_task task{&sub_task};
fork_join_task task{&sub_task, task_id{42}};
scheduler::execute_task(task);
});
my_scheduler.terminate(true);
......