diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 66ed5b1..77edb49 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -1,65 +1,66 @@ # List all required files here (cmake best practice to NOT automate this step!) add_library(pls STATIC - include/pls/pls.h src/pls.cpp + include/pls/pls.h src/pls.cpp include/pls/algorithms/invoke_parallel.h include/pls/algorithms/invoke_parallel_impl.h include/pls/internal/base/spin_lock.h - include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp - include/pls/internal/base/ttas_spin_lock.h src/internal/base/ttas_spin_lock.cpp - include/pls/internal/base/thread.h src/internal/base/thread.cpp + include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp + include/pls/internal/base/ttas_spin_lock.h src/internal/base/ttas_spin_lock.cpp + include/pls/internal/base/swmr_spin_lock.h src/internal/base/swmr_spin_lock.cpp + include/pls/internal/base/thread.h src/internal/base/thread.cpp include/pls/internal/base/thread_impl.h - include/pls/internal/base/barrier.h src/internal/base/barrier.cpp + include/pls/internal/base/barrier.h src/internal/base/barrier.cpp include/pls/internal/base/system_details.h include/pls/internal/base/error_handling.h - include/pls/internal/base/alignment.h src/internal/base/alignment.cpp + include/pls/internal/base/alignment.h src/internal/base/alignment.cpp - include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp + include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp include/pls/internal/data_structures/aligned_stack_impl.h - include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp + include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp include/pls/internal/helpers/prohibit_new.h include/pls/internal/helpers/profiler.h include/pls/internal/helpers/mini_benchmark.h 
include/pls/internal/helpers/unique_id.h - include/pls/internal/scheduling/root_task.h src/internal/scheduling/root_task.cpp - include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp - include/pls/internal/scheduling/abstract_task.h src/internal/scheduling/abstract_task.cpp - include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp + include/pls/internal/scheduling/root_task.h src/internal/scheduling/root_task.cpp + include/pls/internal/scheduling/thread_state.h src/internal/scheduling/thread_state.cpp + include/pls/internal/scheduling/abstract_task.h src/internal/scheduling/abstract_task.cpp + include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/run_on_n_threads_task.h src/internal/scheduling/run_on_n_threads_task.cpp - include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp - include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp -) + include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp + include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp + ) # Add everything in `./include` to be in the include path of this project target_include_directories(pls PUBLIC - $ - $ + $ + $ PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR}/src # TODO: Set this up when we require private headers -) + ${CMAKE_CURRENT_SOURCE_DIR}/src # TODO: Set this up when we require private headers + ) # Add cmake dependencies here if needed target_link_libraries(pls Threads::Threads # pthread support -) -if(EASY_PROFILER) + ) +if (EASY_PROFILER) target_link_libraries(pls easy_profiler) -endif() +endif () # Rules for istalling the library on a system # ...binaries INSTALL(TARGETS pls EXPORT pls-targets LIBRARY - DESTINATION lib/pls + DESTINATION lib/pls ARCHIVE - DESTINATION lib/pls -) + 
DESTINATION lib/pls + ) # ...all headers in `include` INSTALL( DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include/pls diff --git a/lib/pls/include/pls/algorithms/invoke_parallel_impl.h b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h index 44cbc74..2fd05ce 100644 --- a/lib/pls/include/pls/algorithms/invoke_parallel_impl.h +++ b/lib/pls/include/pls/algorithms/invoke_parallel_impl.h @@ -5,6 +5,7 @@ #include "pls/internal/scheduling/fork_join_task.h" #include "pls/internal/scheduling/scheduler.h" #include "pls/internal/helpers/unique_id.h" +#include "pls/internal/base/alignment.h" namespace pls { namespace algorithm { @@ -21,7 +22,7 @@ inline void run_body(const Body &internal_body, const abstract_task::id &id) { auto current_sub_task = reinterpret_cast(current_task)->currently_executing(); internal_body(current_sub_task); } else { - fork_join_lambda root_body(&internal_body); + fork_join_lambda_by_reference root_body(&internal_body); fork_join_task root_task{&root_body, id}; scheduler::execute_task(root_task); } @@ -32,14 +33,15 @@ template void invoke_parallel(const Function1 &function1, const Function2 &function2) { using namespace ::pls::internal::scheduling; using namespace ::pls::internal::helpers; + using namespace ::pls::internal::base; static abstract_task::id id = unique_id::create(); auto internal_body = [&](fork_join_sub_task *this_task) { - auto sub_task_body_1 = [&](fork_join_sub_task *) { function1(); }; - auto sub_task_1 = fork_join_lambda(&sub_task_body_1); + auto sub_task_body_2 = [&](fork_join_sub_task *) { function2(); }; + auto sub_task_2 = fork_join_lambda_by_reference(&sub_task_body_2); - this_task->spawn_child(sub_task_1); - function2(); // Execute last function 'inline' without spawning a sub_task object + this_task->spawn_child(sub_task_2); + function1(); // Execute first function 'inline' without spawning a sub_task object this_task->wait_for_all(); }; @@ -53,14 +55,14 @@ void invoke_parallel(const Function1 &function1, const Function2 
&function2, con static abstract_task::id id = unique_id::create(); auto internal_body = [&](fork_join_sub_task *this_task) { - auto sub_task_body_1 = [&](fork_join_sub_task *) { function1(); }; - auto sub_task_1 = fork_join_lambda(&sub_task_body_1); auto sub_task_body_2 = [&](fork_join_sub_task *) { function2(); }; - auto sub_task_2 = fork_join_lambda(&sub_task_body_2); + auto sub_task_2 = fork_join_lambda_by_reference(&sub_task_body_2); + auto sub_task_body_3 = [&](fork_join_sub_task *) { function3(); }; + auto sub_task_3 = fork_join_lambda_by_reference(&sub_task_body_3); - this_task->spawn_child(sub_task_1); this_task->spawn_child(sub_task_2); - function3(); // Execute last function 'inline' without spawning a sub_task object + this_task->spawn_child(sub_task_3); + function1(); // Execute first function 'inline' without spawning a sub_task object this_task->wait_for_all(); }; diff --git a/lib/pls/include/pls/internal/base/swmr_spin_lock.h b/lib/pls/include/pls/internal/base/swmr_spin_lock.h new file mode 100644 index 0000000..bfe9284 --- /dev/null +++ b/lib/pls/include/pls/internal/base/swmr_spin_lock.h @@ -0,0 +1,38 @@ + +#ifndef PLS_SWMR_SPIN_LOCK_LOCK_H_ +#define PLS_SWMR_SPIN_LOCK_LOCK_H_ + +#include <atomic> + +#include "pls/internal/helpers/profiler.h" + +namespace pls { +namespace internal { +namespace base { + +/** + * Single writer, multiple reader spin lock. + * The writer is required to be the same thread all the time (single writer), + * while multiple threads can read. + * Readers fail to lock when the writer requests the lock, + * the writer acquires the lock after all remaining readers left the critical section. 
+ */ +class swmr_spin_lock { + std::atomic<int32_t> readers_; + std::atomic<int32_t> write_request_; + + public: + explicit swmr_spin_lock() : readers_{0}, write_request_{0} {} + + bool reader_try_lock(); + void reader_unlock(); + + void writer_lock(); + void writer_unlock(); +}; + +} +} +} + +#endif //PLS_SWMR_SPIN_LOCK_LOCK_H_ diff --git a/lib/pls/include/pls/internal/scheduling/abstract_task.h b/lib/pls/include/pls/internal/scheduling/abstract_task.h index 45e5490..868ad24 100644 --- a/lib/pls/include/pls/internal/scheduling/abstract_task.h +++ b/lib/pls/include/pls/internal/scheduling/abstract_task.h @@ -2,7 +2,7 @@ #ifndef PLS_ABSTRACT_TASK_H #define PLS_ABSTRACT_TASK_H -#include "pls/internal/base/spin_lock.h" +#include "pls/internal/base/swmr_spin_lock.h" #include "pls/internal/helpers/unique_id.h" namespace pls { @@ -33,7 +33,7 @@ class abstract_task { id unique_id() const { return unique_id_; } protected: virtual bool internal_stealing(abstract_task *other_task) = 0; - virtual bool split_task(base::spin_lock *lock) = 0; + virtual bool split_task(base::swmr_spin_lock *lock) = 0; bool steal_work(); }; diff --git a/lib/pls/include/pls/internal/scheduling/fork_join_task.h b/lib/pls/include/pls/internal/scheduling/fork_join_task.h index ffa98a5..ffa28b6 100644 --- a/lib/pls/include/pls/internal/scheduling/fork_join_task.h +++ b/lib/pls/include/pls/internal/scheduling/fork_join_task.h @@ -46,11 +46,11 @@ class fork_join_sub_task : public data_structures::deque_item { }; template<typename Function> -class fork_join_lambda : public fork_join_sub_task { +class fork_join_lambda_by_reference : public fork_join_sub_task { const Function *function_; public: - explicit fork_join_lambda(const Function *function) : function_{function} {}; + explicit fork_join_lambda_by_reference(const Function *function) : function_{function} {}; protected: void execute_internal() override { @@ -58,6 +58,19 @@ class fork_join_lambda : public fork_join_sub_task { } }; +template<typename Function> +class fork_join_lambda_by_value : public 
fork_join_sub_task { + const Function function_; + + public: + explicit fork_join_lambda_by_value(const Function &function) : function_{function} {}; + + protected: + void execute_internal() override { + function_(this); + } +}; + class fork_join_task : public abstract_task { friend class fork_join_sub_task; @@ -75,7 +88,7 @@ class fork_join_task : public abstract_task { fork_join_sub_task *get_stolen_sub_task(); bool internal_stealing(abstract_task *other_task) override; - bool split_task(base::spin_lock * /*lock*/) override; + bool split_task(base::swmr_spin_lock * /*lock*/) override; public: explicit fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id); diff --git a/lib/pls/include/pls/internal/scheduling/root_task.h b/lib/pls/include/pls/internal/scheduling/root_task.h index eff93e5..32a6ea2 100644 --- a/lib/pls/include/pls/internal/scheduling/root_task.h +++ b/lib/pls/include/pls/internal/scheduling/root_task.h @@ -5,7 +5,7 @@ #include #include "pls/internal/helpers/profiler.h" -#include "pls/internal/base/spin_lock.h" +#include "pls/internal/base/swmr_spin_lock.h" #include "abstract_task.h" @@ -43,7 +43,7 @@ class root_task : public abstract_task { return false; } - bool split_task(base::spin_lock * /*lock*/) override { + bool split_task(base::swmr_spin_lock * /*lock*/) override { return false; } }; @@ -70,7 +70,7 @@ class root_worker_task : public abstract_task { return false; } - bool split_task(base::spin_lock * /*lock*/) override { + bool split_task(base::swmr_spin_lock * /*lock*/) override { return false; } }; diff --git a/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h b/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h index 0a708ff..f412cbd 100644 --- a/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h +++ b/lib/pls/include/pls/internal/scheduling/run_on_n_threads_task.h @@ -61,7 +61,7 @@ class run_on_n_threads_task : public abstract_task { return false; } - bool 
split_task(base::spin_lock *lock) override; + bool split_task(base::swmr_spin_lock *lock) override; }; template @@ -89,19 +89,18 @@ class run_on_n_threads_task_worker : public abstract_task { return false; } - bool split_task(base::spin_lock * /*lock*/) override { + bool split_task(base::swmr_spin_lock * /*lock*/) override { return false; } }; template -bool run_on_n_threads_task::split_task(base::spin_lock *lock) { +bool run_on_n_threads_task::split_task(base::swmr_spin_lock *lock) { if (get_counter() <= 0) { return false; } // In success case, unlock. - // TODO: this locking is complicated and error prone. - lock->unlock(); + lock->reader_unlock(); auto scheduler = base::this_thread::state()->scheduler_; auto task = run_on_n_threads_task_worker{function_, this}; diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index fcfc1dd..0af46c7 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -43,27 +43,29 @@ void scheduler::execute_task(Task &task, int depth) { abstract_task *new_task; // Init Task - { - std::lock_guard lock{my_state->lock_}; - old_task = my_state->current_task_; - new_task = my_state->task_stack_->push(task); + old_task = my_state->current_task_; + new_task = my_state->task_stack_->push(task); + + new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1); - new_task->set_depth(depth >= 0 ? 
depth : old_task->depth() + 1); + { + my_state->lock_.writer_lock(); my_state->current_task_ = new_task; old_task->set_child(new_task); + my_state->lock_.writer_unlock(); } // Run Task new_task->execute(); // Teardown state back to before the task was executed - { - std::lock_guard lock{my_state->lock_}; + my_state->task_stack_->pop(); + { + my_state->lock_.writer_lock(); old_task->set_child(nullptr); my_state->current_task_ = old_task; - - my_state->task_stack_->pop(); + my_state->lock_.writer_unlock(); } } diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h index 22154f8..0efbf78 100644 --- a/lib/pls/include/pls/internal/scheduling/thread_state.h +++ b/lib/pls/include/pls/internal/scheduling/thread_state.h @@ -5,6 +5,7 @@ #include #include "pls/internal/data_structures/aligned_stack.h" +#include "pls/internal/base/swmr_spin_lock.h" #include "abstract_task.h" namespace pls { @@ -15,13 +16,13 @@ namespace scheduling { class scheduler; struct thread_state { - scheduler *scheduler_; - abstract_task *root_task_; - abstract_task *current_task_; - data_structures::aligned_stack *task_stack_; - size_t id_; - base::spin_lock lock_; - std::minstd_rand random_; + alignas(base::system_details::CACHE_LINE_SIZE) scheduler *scheduler_; + alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *root_task_; + alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *current_task_; + alignas(base::system_details::CACHE_LINE_SIZE) data_structures::aligned_stack *task_stack_; + alignas(base::system_details::CACHE_LINE_SIZE) size_t id_; + alignas(base::system_details::CACHE_LINE_SIZE) base::swmr_spin_lock lock_; + alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_; thread_state() : scheduler_{nullptr}, @@ -29,6 +30,7 @@ struct thread_state { current_task_{nullptr}, task_stack_{nullptr}, id_{0}, + lock_{}, random_{id_} {}; thread_state(scheduler *scheduler, 
data_structures::aligned_stack *task_stack, unsigned int id) : @@ -37,6 +39,7 @@ struct thread_state { current_task_{nullptr}, task_stack_{task_stack}, id_{id}, + lock_{}, random_{id_} {} }; diff --git a/lib/pls/src/internal/base/swmr_spin_lock.cpp b/lib/pls/src/internal/base/swmr_spin_lock.cpp new file mode 100644 index 0000000..7bc5f7a --- /dev/null +++ b/lib/pls/src/internal/base/swmr_spin_lock.cpp @@ -0,0 +1,45 @@ +#include "pls/internal/base/swmr_spin_lock.h" +#include "pls/internal/base/system_details.h" + +namespace pls { +namespace internal { +namespace base { + +bool swmr_spin_lock::reader_try_lock() { + PROFILE_LOCK("Try Acquire Read Lock") + if (write_request_.load(std::memory_order_relaxed) == 1) { + return false; + } + // We think we can enter the region + readers_++; + if (write_request_.load() == 1) { + // Whoops, the writer acquires the lock, so we back off again + readers_--; + return false; + } + + return true; +} + +void swmr_spin_lock::reader_unlock() { + PROFILE_LOCK("Release Read Lock") + readers_--; +} + +void swmr_spin_lock::writer_lock() { + PROFILE_LOCK("Acquire Write Lock") + // Tell the readers that we would like to write + write_request_ = 1; + // Wait for all of them to exit the critical section + while (readers_.load(std::memory_order_relaxed) > 0) + system_details::relax_cpu(); // Spin, not expensive as relaxed load +} + +void swmr_spin_lock::writer_unlock() { + PROFILE_LOCK("Release Write Lock") + write_request_ = 0; +} + +} +} +} diff --git a/lib/pls/src/internal/scheduling/abstract_task.cpp b/lib/pls/src/internal/scheduling/abstract_task.cpp index 50a7003..b82b452 100644 --- a/lib/pls/src/internal/scheduling/abstract_task.cpp +++ b/lib/pls/src/internal/scheduling/abstract_task.cpp @@ -15,7 +15,7 @@ bool abstract_task::steal_work() { const size_t my_id = my_state->id_; const size_t offset = my_state->random_() % my_scheduler->num_threads(); - const size_t max_tries = 1; // my_scheduler->num_threads(); TODO: Tune this value + const 
size_t max_tries = my_scheduler->num_threads(); // TODO: Tune this value for (size_t i = 0; i < max_tries; i++) { size_t target = (offset + i) % my_scheduler->num_threads(); if (target == my_id) { @@ -23,8 +23,9 @@ bool abstract_task::steal_work() { } auto target_state = my_scheduler->thread_state_for(target); - // TODO: Cleaner Locking Using std::guarded_lock - target_state->lock_.lock(); + if (!target_state->lock_.reader_try_lock()) { + continue; + } // Dig down to our level PROFILE_STEALING("Go to our level") @@ -42,7 +43,7 @@ bool abstract_task::steal_work() { current_task->depth_ == depth_) { if (internal_stealing(current_task)) { // internal steal was a success, hand it back to the internal scheduler - target_state->lock_.unlock(); + target_state->lock_.reader_unlock(); return true; } @@ -59,14 +60,14 @@ bool abstract_task::steal_work() { while (current_task != nullptr) { auto lock = &target_state->lock_; if (current_task->split_task(lock)) { - // internal steal was no success (we did a top level task steal) + // top level steal was a success (we did a top level task steal) return false; } current_task = current_task->child_task_; } PROFILE_END_BLOCK; - target_state->lock_.unlock(); + target_state->lock_.reader_unlock(); } // internal steal was no success diff --git a/lib/pls/src/internal/scheduling/fork_join_task.cpp b/lib/pls/src/internal/scheduling/fork_join_task.cpp index ed30ee9..cba2bc7 100644 --- a/lib/pls/src/internal/scheduling/fork_join_task.cpp +++ b/lib/pls/src/internal/scheduling/fork_join_task.cpp @@ -93,7 +93,7 @@ bool fork_join_task::internal_stealing(abstract_task *other_task) { } } -bool fork_join_task::split_task(base::spin_lock *lock) { +bool fork_join_task::split_task(base::swmr_spin_lock *lock) { PROFILE_STEALING("fork_join_task::split_task") fork_join_sub_task *stolen_sub_task = get_stolen_sub_task(); if (stolen_sub_task == nullptr) { @@ -102,8 +102,7 @@ bool fork_join_task::split_task(base::spin_lock *lock) { fork_join_task 
task{stolen_sub_task, this->unique_id()}; // In success case, unlock. - // TODO: this locking is complicated and error prone. - lock->unlock(); + lock->reader_unlock(); scheduler::execute_task(task, depth()); return true;