Commit 0548562e by FritzFlorian

Replace top-level-task-stack lock by swmr lock.

parent d38f584b
...@@ -8,6 +8,7 @@ add_library(pls STATIC ...@@ -8,6 +8,7 @@ add_library(pls STATIC
include/pls/internal/base/spin_lock.h include/pls/internal/base/spin_lock.h
include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp
include/pls/internal/base/ttas_spin_lock.h src/internal/base/ttas_spin_lock.cpp include/pls/internal/base/ttas_spin_lock.h src/internal/base/ttas_spin_lock.cpp
include/pls/internal/base/swmr_spin_lock.h src/internal/base/swmr_spin_lock.cpp
include/pls/internal/base/thread.h src/internal/base/thread.cpp include/pls/internal/base/thread.h src/internal/base/thread.cpp
include/pls/internal/base/thread_impl.h include/pls/internal/base/thread_impl.h
include/pls/internal/base/barrier.h src/internal/base/barrier.cpp include/pls/internal/base/barrier.h src/internal/base/barrier.cpp
...@@ -32,7 +33,7 @@ add_library(pls STATIC ...@@ -32,7 +33,7 @@ add_library(pls STATIC
include/pls/internal/scheduling/run_on_n_threads_task.h src/internal/scheduling/run_on_n_threads_task.cpp include/pls/internal/scheduling/run_on_n_threads_task.h src/internal/scheduling/run_on_n_threads_task.cpp
include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
) )
# Add everything in `./include` to be in the include path of this project # Add everything in `./include` to be in the include path of this project
target_include_directories(pls target_include_directories(pls
...@@ -41,15 +42,15 @@ target_include_directories(pls ...@@ -41,15 +42,15 @@ target_include_directories(pls
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
PRIVATE PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/src # TODO: Set this up when we require private headers ${CMAKE_CURRENT_SOURCE_DIR}/src # TODO: Set this up when we require private headers
) )
# Add cmake dependencies here if needed # Add cmake dependencies here if needed
target_link_libraries(pls target_link_libraries(pls
Threads::Threads # pthread support Threads::Threads # pthread support
) )
if(EASY_PROFILER) if (EASY_PROFILER)
target_link_libraries(pls easy_profiler) target_link_libraries(pls easy_profiler)
endif() endif ()
# Rules for installing the library on a system # Rules for installing the library on a system
# ...binaries # ...binaries
...@@ -59,7 +60,7 @@ INSTALL(TARGETS pls ...@@ -59,7 +60,7 @@ INSTALL(TARGETS pls
DESTINATION lib/pls DESTINATION lib/pls
ARCHIVE ARCHIVE
DESTINATION lib/pls DESTINATION lib/pls
) )
# ...all headers in `include` # ...all headers in `include`
INSTALL( INSTALL(
DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include/pls DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include/pls
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include "pls/internal/scheduling/fork_join_task.h" #include "pls/internal/scheduling/fork_join_task.h"
#include "pls/internal/scheduling/scheduler.h" #include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/helpers/unique_id.h" #include "pls/internal/helpers/unique_id.h"
#include "pls/internal/base/alignment.h"
namespace pls { namespace pls {
namespace algorithm { namespace algorithm {
...@@ -21,7 +22,7 @@ inline void run_body(const Body &internal_body, const abstract_task::id &id) { ...@@ -21,7 +22,7 @@ inline void run_body(const Body &internal_body, const abstract_task::id &id) {
auto current_sub_task = reinterpret_cast<fork_join_task *>(current_task)->currently_executing(); auto current_sub_task = reinterpret_cast<fork_join_task *>(current_task)->currently_executing();
internal_body(current_sub_task); internal_body(current_sub_task);
} else { } else {
fork_join_lambda<Body> root_body(&internal_body); fork_join_lambda_by_reference<Body> root_body(&internal_body);
fork_join_task root_task{&root_body, id}; fork_join_task root_task{&root_body, id};
scheduler::execute_task(root_task); scheduler::execute_task(root_task);
} }
...@@ -32,14 +33,15 @@ template<typename Function1, typename Function2> ...@@ -32,14 +33,15 @@ template<typename Function1, typename Function2>
void invoke_parallel(const Function1 &function1, const Function2 &function2) { void invoke_parallel(const Function1 &function1, const Function2 &function2) {
using namespace ::pls::internal::scheduling; using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers; using namespace ::pls::internal::helpers;
using namespace ::pls::internal::base;
static abstract_task::id id = unique_id::create<Function1, Function2>(); static abstract_task::id id = unique_id::create<Function1, Function2>();
auto internal_body = [&](fork_join_sub_task *this_task) { auto internal_body = [&](fork_join_sub_task *this_task) {
auto sub_task_body_1 = [&](fork_join_sub_task *) { function1(); }; auto sub_task_body_2 = [&](fork_join_sub_task *) { function2(); };
auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1); auto sub_task_2 = fork_join_lambda_by_reference<decltype(sub_task_body_2)>(&sub_task_body_2);
this_task->spawn_child(sub_task_1); this_task->spawn_child(sub_task_2);
function2(); // Execute last function 'inline' without spawning a sub_task object function1(); // Execute first function 'inline' without spawning a sub_task object
this_task->wait_for_all(); this_task->wait_for_all();
}; };
...@@ -53,14 +55,14 @@ void invoke_parallel(const Function1 &function1, const Function2 &function2, con ...@@ -53,14 +55,14 @@ void invoke_parallel(const Function1 &function1, const Function2 &function2, con
static abstract_task::id id = unique_id::create<Function1, Function2, Function3>(); static abstract_task::id id = unique_id::create<Function1, Function2, Function3>();
auto internal_body = [&](fork_join_sub_task *this_task) { auto internal_body = [&](fork_join_sub_task *this_task) {
auto sub_task_body_1 = [&](fork_join_sub_task *) { function1(); };
auto sub_task_1 = fork_join_lambda<decltype(sub_task_body_1)>(&sub_task_body_1);
auto sub_task_body_2 = [&](fork_join_sub_task *) { function2(); }; auto sub_task_body_2 = [&](fork_join_sub_task *) { function2(); };
auto sub_task_2 = fork_join_lambda<decltype(sub_task_body_2)>(&sub_task_body_2); auto sub_task_2 = fork_join_lambda_by_reference<decltype(sub_task_body_2)>(&sub_task_body_2);
auto sub_task_body_3 = [&](fork_join_sub_task *) { function3(); };
auto sub_task_3 = fork_join_lambda_by_reference<decltype(sub_task_body_3)>(&sub_task_body_3);
this_task->spawn_child(sub_task_1);
this_task->spawn_child(sub_task_2); this_task->spawn_child(sub_task_2);
function3(); // Execute last function 'inline' without spawning a sub_task object this_task->spawn_child(sub_task_3);
function1(); // Execute first function 'inline' without spawning a sub_task object
this_task->wait_for_all(); this_task->wait_for_all();
}; };
......
#ifndef PLS_SWMR_SPIN_LOCK_LOCK_H_
#define PLS_SWMR_SPIN_LOCK_LOCK_H_
#include <atomic>
#include "pls/internal/helpers/profiler.h"
namespace pls {
namespace internal {
namespace base {
/**
* Single writer, multiple reader spin lock.
* The writer is required to be the same thread all the time (single writer),
* while multiple threads can read.
* Readers fail to lock when the writer requests the lock,
* the writer then acquires the lock after all remaining readers have left the critical section.
*/
class swmr_spin_lock {
  // Number of readers currently inside the critical section.
  std::atomic<int> readers_;
  // Set to 1 while the (single) writer requests or holds the lock.
  std::atomic<int> write_request_;

 public:
  explicit swmr_spin_lock() : readers_{0}, write_request_{0} {}

  // Tries to enter the critical section as a reader.
  // Returns false without blocking when the writer has requested the lock.
  bool reader_try_lock();
  // Leaves the reader critical section; must only follow a successful reader_try_lock().
  void reader_unlock();
  // Blocks until all readers have left, then holds the lock exclusively.
  // Must only ever be called from the single designated writer thread.
  void writer_lock();
  // Releases the write lock, letting readers enter again.
  void writer_unlock();
};
}
}
}
#endif //PLS_SWMR_SPIN_LOCK_LOCK_H_
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
#ifndef PLS_ABSTRACT_TASK_H #ifndef PLS_ABSTRACT_TASK_H
#define PLS_ABSTRACT_TASK_H #define PLS_ABSTRACT_TASK_H
#include "pls/internal/base/spin_lock.h" #include "pls/internal/base/swmr_spin_lock.h"
#include "pls/internal/helpers/unique_id.h" #include "pls/internal/helpers/unique_id.h"
namespace pls { namespace pls {
...@@ -33,7 +33,7 @@ class abstract_task { ...@@ -33,7 +33,7 @@ class abstract_task {
id unique_id() const { return unique_id_; } id unique_id() const { return unique_id_; }
protected: protected:
virtual bool internal_stealing(abstract_task *other_task) = 0; virtual bool internal_stealing(abstract_task *other_task) = 0;
virtual bool split_task(base::spin_lock *lock) = 0; virtual bool split_task(base::swmr_spin_lock *lock) = 0;
bool steal_work(); bool steal_work();
}; };
......
...@@ -46,11 +46,11 @@ class fork_join_sub_task : public data_structures::deque_item { ...@@ -46,11 +46,11 @@ class fork_join_sub_task : public data_structures::deque_item {
}; };
template<typename Function> template<typename Function>
class fork_join_lambda : public fork_join_sub_task { class fork_join_lambda_by_reference : public fork_join_sub_task {
const Function *function_; const Function *function_;
public: public:
explicit fork_join_lambda(const Function *function) : function_{function} {}; explicit fork_join_lambda_by_reference(const Function *function) : function_{function} {};
protected: protected:
void execute_internal() override { void execute_internal() override {
...@@ -58,6 +58,19 @@ class fork_join_lambda : public fork_join_sub_task { ...@@ -58,6 +58,19 @@ class fork_join_lambda : public fork_join_sub_task {
} }
}; };
template<typename Function>
// Sub task that OWNS a copy of the given functor, so it stays valid even
// when the caller's original functor has gone out of scope (contrast with
// fork_join_lambda_by_reference, which only stores a pointer).
class fork_join_lambda_by_value : public fork_join_sub_task {
  const Function function_;

 public:
  explicit fork_join_lambda_by_value(const Function &function) : function_{function} {};

 protected:
  // Runs the copied functor, handing it this sub task so the body can
  // spawn further children / wait on them.
  void execute_internal() override {
    function_(this);
  }
};
class fork_join_task : public abstract_task { class fork_join_task : public abstract_task {
friend class fork_join_sub_task; friend class fork_join_sub_task;
...@@ -75,7 +88,7 @@ class fork_join_task : public abstract_task { ...@@ -75,7 +88,7 @@ class fork_join_task : public abstract_task {
fork_join_sub_task *get_stolen_sub_task(); fork_join_sub_task *get_stolen_sub_task();
bool internal_stealing(abstract_task *other_task) override; bool internal_stealing(abstract_task *other_task) override;
bool split_task(base::spin_lock * /*lock*/) override; bool split_task(base::swmr_spin_lock * /*lock*/) override;
public: public:
explicit fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id); explicit fork_join_task(fork_join_sub_task *root_task, const abstract_task::id &id);
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
#include <mutex> #include <mutex>
#include "pls/internal/helpers/profiler.h" #include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/spin_lock.h" #include "pls/internal/base/swmr_spin_lock.h"
#include "abstract_task.h" #include "abstract_task.h"
...@@ -43,7 +43,7 @@ class root_task : public abstract_task { ...@@ -43,7 +43,7 @@ class root_task : public abstract_task {
return false; return false;
} }
bool split_task(base::spin_lock * /*lock*/) override { bool split_task(base::swmr_spin_lock * /*lock*/) override {
return false; return false;
} }
}; };
...@@ -70,7 +70,7 @@ class root_worker_task : public abstract_task { ...@@ -70,7 +70,7 @@ class root_worker_task : public abstract_task {
return false; return false;
} }
bool split_task(base::spin_lock * /*lock*/) override { bool split_task(base::swmr_spin_lock * /*lock*/) override {
return false; return false;
} }
}; };
......
...@@ -61,7 +61,7 @@ class run_on_n_threads_task : public abstract_task { ...@@ -61,7 +61,7 @@ class run_on_n_threads_task : public abstract_task {
return false; return false;
} }
bool split_task(base::spin_lock *lock) override; bool split_task(base::swmr_spin_lock *lock) override;
}; };
template<typename Function> template<typename Function>
...@@ -89,19 +89,18 @@ class run_on_n_threads_task_worker : public abstract_task { ...@@ -89,19 +89,18 @@ class run_on_n_threads_task_worker : public abstract_task {
return false; return false;
} }
bool split_task(base::spin_lock * /*lock*/) override { bool split_task(base::swmr_spin_lock * /*lock*/) override {
return false; return false;
} }
}; };
template<typename Function> template<typename Function>
bool run_on_n_threads_task<Function>::split_task(base::spin_lock *lock) { bool run_on_n_threads_task<Function>::split_task(base::swmr_spin_lock *lock) {
if (get_counter() <= 0) { if (get_counter() <= 0) {
return false; return false;
} }
// In success case, unlock. // In success case, unlock.
// TODO: this locking is complicated and error prone. lock->reader_unlock();
lock->unlock();
auto scheduler = base::this_thread::state<thread_state>()->scheduler_; auto scheduler = base::this_thread::state<thread_state>()->scheduler_;
auto task = run_on_n_threads_task_worker<Function>{function_, this}; auto task = run_on_n_threads_task_worker<Function>{function_, this};
......
...@@ -43,27 +43,29 @@ void scheduler::execute_task(Task &task, int depth) { ...@@ -43,27 +43,29 @@ void scheduler::execute_task(Task &task, int depth) {
abstract_task *new_task; abstract_task *new_task;
// Init Task // Init Task
{
std::lock_guard<base::spin_lock> lock{my_state->lock_};
old_task = my_state->current_task_; old_task = my_state->current_task_;
new_task = my_state->task_stack_->push(task); new_task = my_state->task_stack_->push(task);
new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1); new_task->set_depth(depth >= 0 ? depth : old_task->depth() + 1);
{
my_state->lock_.writer_lock();
my_state->current_task_ = new_task; my_state->current_task_ = new_task;
old_task->set_child(new_task); old_task->set_child(new_task);
my_state->lock_.writer_unlock();
} }
// Run Task // Run Task
new_task->execute(); new_task->execute();
// Teardown state back to before the task was executed // Teardown state back to before the task was executed
{ my_state->task_stack_->pop<Task>();
std::lock_guard<base::spin_lock> lock{my_state->lock_};
{
my_state->lock_.writer_lock();
old_task->set_child(nullptr); old_task->set_child(nullptr);
my_state->current_task_ = old_task; my_state->current_task_ = old_task;
my_state->lock_.writer_unlock();
my_state->task_stack_->pop<Task>();
} }
} }
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <random> #include <random>
#include "pls/internal/data_structures/aligned_stack.h" #include "pls/internal/data_structures/aligned_stack.h"
#include "pls/internal/base/swmr_spin_lock.h"
#include "abstract_task.h" #include "abstract_task.h"
namespace pls { namespace pls {
...@@ -15,13 +16,13 @@ namespace scheduling { ...@@ -15,13 +16,13 @@ namespace scheduling {
class scheduler; class scheduler;
struct thread_state { struct thread_state {
scheduler *scheduler_; alignas(base::system_details::CACHE_LINE_SIZE) scheduler *scheduler_;
abstract_task *root_task_; alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *root_task_;
abstract_task *current_task_; alignas(base::system_details::CACHE_LINE_SIZE) abstract_task *current_task_;
data_structures::aligned_stack *task_stack_; alignas(base::system_details::CACHE_LINE_SIZE) data_structures::aligned_stack *task_stack_;
size_t id_; alignas(base::system_details::CACHE_LINE_SIZE) size_t id_;
base::spin_lock lock_; alignas(base::system_details::CACHE_LINE_SIZE) base::swmr_spin_lock lock_;
std::minstd_rand random_; alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
thread_state() : thread_state() :
scheduler_{nullptr}, scheduler_{nullptr},
...@@ -29,6 +30,7 @@ struct thread_state { ...@@ -29,6 +30,7 @@ struct thread_state {
current_task_{nullptr}, current_task_{nullptr},
task_stack_{nullptr}, task_stack_{nullptr},
id_{0}, id_{0},
lock_{},
random_{id_} {}; random_{id_} {};
thread_state(scheduler *scheduler, data_structures::aligned_stack *task_stack, unsigned int id) : thread_state(scheduler *scheduler, data_structures::aligned_stack *task_stack, unsigned int id) :
...@@ -37,6 +39,7 @@ struct thread_state { ...@@ -37,6 +39,7 @@ struct thread_state {
current_task_{nullptr}, current_task_{nullptr},
task_stack_{task_stack}, task_stack_{task_stack},
id_{id}, id_{id},
lock_{},
random_{id_} {} random_{id_} {}
}; };
......
#include "pls/internal/base/swmr_spin_lock.h"
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace base {
// Tries to enter the critical section as a reader.
// Returns false (no spinning) if the writer currently requests the lock.
bool swmr_spin_lock::reader_try_lock() {
  PROFILE_LOCK("Try Acquire Read Lock")
  // Fast-path bail out: a stale relaxed read only costs one failed attempt,
  // never correctness, since we re-check below after announcing ourselves.
  if (write_request_.load(std::memory_order_relaxed) == 1) {
    return false;
  }
  // We think we can enter the region
  readers_++;
  // Re-check with the default seq_cst load: the increment above and this
  // load must NOT be reordered (store/load protocol paired with
  // writer_lock's store/load), otherwise reader and writer could both enter.
  if (write_request_.load() == 1) {
    // Whoops, the writer acquired the lock in the meantime, so we back off again
    readers_--;
    return false;
  }

  return true;
}
// Leaves the reader critical section entered via reader_try_lock().
void swmr_spin_lock::reader_unlock() {
  PROFILE_LOCK("Release Read Lock")
  // Sequentially consistent read-modify-write, equivalent to `readers_--`.
  readers_.fetch_sub(1);
}
// Blocks until all readers have left the critical section, then holds the
// lock exclusively. Must only be called from the single writer thread.
void swmr_spin_lock::writer_lock() {
  PROFILE_LOCK("Acquire Write Lock")
  // Tell the readers that we would like to write
  // (default seq_cst store, paired with the re-check in reader_try_lock()).
  write_request_ = 1;

  // Wait for all of them to exit the critical section.
  // The load needs (at least) acquire semantics so it synchronizes with the
  // readers' decrements in reader_unlock(): without it the writer's
  // critical-section accesses could be reordered before the last reader
  // has actually left. On x86 an acquire load is as cheap as a relaxed one.
  while (readers_.load(std::memory_order_acquire) > 0)
    system_details::relax_cpu(); // Spin while we wait
}
// Releases the write lock so that readers can successfully try_lock again.
void swmr_spin_lock::writer_unlock() {
  PROFILE_LOCK("Release Write Lock")
  // Explicit seq_cst store, equivalent to `write_request_ = 0`.
  write_request_.store(0);
}
}
}
}
...@@ -15,7 +15,7 @@ bool abstract_task::steal_work() { ...@@ -15,7 +15,7 @@ bool abstract_task::steal_work() {
const size_t my_id = my_state->id_; const size_t my_id = my_state->id_;
const size_t offset = my_state->random_() % my_scheduler->num_threads(); const size_t offset = my_state->random_() % my_scheduler->num_threads();
const size_t max_tries = 1; // my_scheduler->num_threads(); TODO: Tune this value const size_t max_tries = my_scheduler->num_threads(); // TODO: Tune this value
for (size_t i = 0; i < max_tries; i++) { for (size_t i = 0; i < max_tries; i++) {
size_t target = (offset + i) % my_scheduler->num_threads(); size_t target = (offset + i) % my_scheduler->num_threads();
if (target == my_id) { if (target == my_id) {
...@@ -23,8 +23,9 @@ bool abstract_task::steal_work() { ...@@ -23,8 +23,9 @@ bool abstract_task::steal_work() {
} }
auto target_state = my_scheduler->thread_state_for(target); auto target_state = my_scheduler->thread_state_for(target);
// TODO: Cleaner Locking Using std::guarded_lock if (!target_state->lock_.reader_try_lock()) {
target_state->lock_.lock(); continue;
}
// Dig down to our level // Dig down to our level
PROFILE_STEALING("Go to our level") PROFILE_STEALING("Go to our level")
...@@ -42,7 +43,7 @@ bool abstract_task::steal_work() { ...@@ -42,7 +43,7 @@ bool abstract_task::steal_work() {
current_task->depth_ == depth_) { current_task->depth_ == depth_) {
if (internal_stealing(current_task)) { if (internal_stealing(current_task)) {
// internal steal was a success, hand it back to the internal scheduler // internal steal was a success, hand it back to the internal scheduler
target_state->lock_.unlock(); target_state->lock_.reader_unlock();
return true; return true;
} }
...@@ -59,14 +60,14 @@ bool abstract_task::steal_work() { ...@@ -59,14 +60,14 @@ bool abstract_task::steal_work() {
while (current_task != nullptr) { while (current_task != nullptr) {
auto lock = &target_state->lock_; auto lock = &target_state->lock_;
if (current_task->split_task(lock)) { if (current_task->split_task(lock)) {
// internal steal was no success (we did a top level task steal) // top level steal was a success (we did a top level task steal)
return false; return false;
} }
current_task = current_task->child_task_; current_task = current_task->child_task_;
} }
PROFILE_END_BLOCK; PROFILE_END_BLOCK;
target_state->lock_.unlock(); target_state->lock_.reader_unlock();
} }
// internal steal was no success // internal steal was no success
......
...@@ -93,7 +93,7 @@ bool fork_join_task::internal_stealing(abstract_task *other_task) { ...@@ -93,7 +93,7 @@ bool fork_join_task::internal_stealing(abstract_task *other_task) {
} }
} }
bool fork_join_task::split_task(base::spin_lock *lock) { bool fork_join_task::split_task(base::swmr_spin_lock *lock) {
PROFILE_STEALING("fork_join_task::split_task") PROFILE_STEALING("fork_join_task::split_task")
fork_join_sub_task *stolen_sub_task = get_stolen_sub_task(); fork_join_sub_task *stolen_sub_task = get_stolen_sub_task();
if (stolen_sub_task == nullptr) { if (stolen_sub_task == nullptr) {
...@@ -102,8 +102,7 @@ bool fork_join_task::split_task(base::spin_lock *lock) { ...@@ -102,8 +102,7 @@ bool fork_join_task::split_task(base::spin_lock *lock) {
fork_join_task task{stolen_sub_task, this->unique_id()}; fork_join_task task{stolen_sub_task, this->unique_id()};
// In success case, unlock. // In success case, unlock.
// TODO: this locking is complicated and error prone. lock->reader_unlock();
lock->unlock();
scheduler::execute_task(task, depth()); scheduler::execute_task(task, depth());
return true; return true;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment