Commit 4a9ca21d by FritzFlorian

Rework threads to not be template based.

This allows us to more easily handle them and makes their interface closer to std::thread.
parent 65409e0a
Pipeline #1316 failed with stages
in 39 seconds
# Optionally compile with thread sanitizer enabled to find concurrency bugs # Optionally compile with thread sanitizer enabled to find memory bugs
# https://github.com/google/sanitizers/wiki/ThreadSanitizerCppManual # https://github.com/google/sanitizers/wiki/ThreadSanitizerCppManual
# Add optional sanitizer, off by default # Add optional sanitizer, off by default
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#include <functional> #include <functional>
#include <pthread.h> #include <pthread.h>
#include <atomic> #include <atomic>
#include <time.h> #include <ctime>
#include "system_details.h" #include "system_details.h"
...@@ -30,7 +30,6 @@ using thread_entrypoint = void(); ...@@ -30,7 +30,6 @@ using thread_entrypoint = void();
* Current implementation is based on pthreads. * Current implementation is based on pthreads.
*/ */
class this_thread { class this_thread {
template<typename Function, typename State>
friend friend
class thread; class thread;
#ifdef PLS_THREAD_SPECIFIC_PTHREAD #ifdef PLS_THREAD_SPECIFIC_PTHREAD
...@@ -73,40 +72,28 @@ class this_thread { ...@@ -73,40 +72,28 @@ class this_thread {
/** /**
* Abstraction for starting a function in a separate thread. * Abstraction for starting a function in a separate thread.
* * Offers only threading functionality needed in this project,
* @tparam Function Lambda being started on the new thread. * underlying implementation can be changed.
* @tparam State State type held for this thread. * Uses NO heap memory allocation.
*
* usage:
* T* state;
* auto thread = start_thread([] {
* // Run on new thread
* }, state);
* thread.join(); // Wait for it to finish
* *
* PORTABILITY: * PORTABILITY:
* Current implementation is based on pthreads. * Current implementation is based on pthreads.
*/ */
template<typename Function, typename State>
class thread { class thread {
friend class this_thread; friend class this_thread;
// Keep a copy of the function (lambda) in this object to make sure it is valid when called!
Function function_;
State *state_pointer_;
// We need to wait for the started function to read
// the function_ and state_pointer_ property before returning
// from the constructor, as the object might be moved after this.
std::atomic_flag *startup_flag_;
// Keep handle to native implementation // Keep handle to native implementation
pthread_t pthread_thread_; pthread_t pthread_thread_;
template<typename Function, typename State>
static void *start_pthread_internal(void *thread_pointer); static void *start_pthread_internal(void *thread_pointer);
public: public:
template<typename Function, typename State>
explicit thread(const Function &function, State *state_pointer); explicit thread(const Function &function, State *state_pointer);
template<typename Function>
explicit thread(const Function &function);
public: public:
void join(); void join();
...@@ -118,11 +105,6 @@ class thread { ...@@ -118,11 +105,6 @@ class thread {
thread &operator=(const thread &) = delete; thread &operator=(const thread &) = delete;
}; };
template<typename Function, typename State>
thread<Function, State> start_thread(const Function &function, State *state_pointer);
template<typename Function>
thread<Function, void> start_thread(const Function &function);
} }
} }
} }
......
...@@ -27,28 +27,32 @@ void this_thread::set_state(T *state_pointer) { ...@@ -27,28 +27,32 @@ void this_thread::set_state(T *state_pointer) {
} }
template<typename Function, typename State> template<typename Function, typename State>
void *thread<Function, State>::start_pthread_internal(void *thread_pointer) { struct thread_arguments {
auto my_thread = reinterpret_cast<thread *>(thread_pointer); Function function_;
Function my_function_copy = my_thread->function_; State *state_;
State *my_state_pointer_copy = my_thread->state_pointer_; std::atomic_flag *startup_flag_;
};
template<typename Function, typename State>
void *thread::start_pthread_internal(void *thread_pointer) {
// Actively copy all arguments into stack memory.
thread_arguments<Function, State>
arguments_copy = *reinterpret_cast<thread_arguments<Function, State> *>(thread_pointer);
// Now we have copies of everything we need on the stack. // Now we have copies of everything we need on the stack.
// The original thread object can be moved freely (no more // The original thread object can be moved freely (no more
// references to its memory location). // references to its memory location).
my_thread->startup_flag_->clear(); arguments_copy.startup_flag_->clear();
this_thread::set_state(my_state_pointer_copy); this_thread::set_state(arguments_copy.state_);
my_function_copy(); arguments_copy.function_();
// Finished executing the user function // Finished executing the user function
pthread_exit(nullptr); pthread_exit(nullptr);
} }
template<typename Function, typename State> template<typename Function, typename State>
thread<Function, State>::thread(const Function &function, State *state_pointer): thread::thread(const Function &function, State *state_pointer):
function_{function},
state_pointer_{state_pointer},
startup_flag_{nullptr},
pthread_thread_{} { pthread_thread_{} {
#ifdef PLS_THREAD_SPECIFIC_PTHREAD #ifdef PLS_THREAD_SPECIFIC_PTHREAD
...@@ -58,29 +62,20 @@ thread<Function, State>::thread(const Function &function, State *state_pointer): ...@@ -58,29 +62,20 @@ thread<Function, State>::thread(const Function &function, State *state_pointer):
} }
#endif #endif
// We only need this during startup, will be destroyed when out of scope // We need to wait for the started function to read
// the function_ and state_pointer_ property before returning
// from the constructor, as the object might be moved after this.
std::atomic_flag startup_flag{ATOMIC_FLAG_INIT}; std::atomic_flag startup_flag{ATOMIC_FLAG_INIT};
startup_flag_ = &startup_flag;
thread_arguments<Function, State> arguments{function, state_pointer, &startup_flag};
startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return
pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *) (this)); pthread_create(&pthread_thread_, nullptr, start_pthread_internal < Function, State > , (void *) (&arguments));
while (startup_flag.test_and_set()); // Busy waiting for the starting flag to clear while (startup_flag.test_and_set()); // Busy waiting for the starting flag to clear
} }
template<typename Function, typename State>
void thread<Function, State>::join() {
pthread_join(pthread_thread_, nullptr);
}
template<typename Function, typename State>
thread<Function, State> start_thread(const Function &function, State *state_pointer) {
return thread<Function, State>(function, state_pointer);
}
template<typename Function> template<typename Function>
thread<Function, void> start_thread(const Function &function) { thread::thread(const Function &function): thread{function, (void *) nullptr} {}
return thread<Function, void>(function, nullptr);
}
} }
} }
......
...@@ -20,8 +20,6 @@ namespace pls { ...@@ -20,8 +20,6 @@ namespace pls {
namespace internal { namespace internal {
namespace scheduling { namespace scheduling {
using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;
/** /**
* The scheduler is the central part of the dispatching-framework. * The scheduler is the central part of the dispatching-framework.
* It manages a pool of worker threads (creates, sleeps/wakes up, destroys) * It manages a pool of worker threads (creates, sleeps/wakes up, destroys)
......
...@@ -11,19 +11,18 @@ namespace internal { ...@@ -11,19 +11,18 @@ namespace internal {
namespace scheduling { namespace scheduling {
void worker_routine(); void worker_routine();
using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;
class scheduler_memory { class scheduler_memory {
private: private:
size_t max_threads_; size_t max_threads_;
thread_state **thread_states_; thread_state **thread_states_;
scheduler_thread **threads_; base::thread **threads_;
data_structures::aligned_stack **task_stacks_; data_structures::aligned_stack **task_stacks_;
protected: protected:
void init(size_t max_therads, void init(size_t max_therads,
thread_state **thread_states, thread_state **thread_states,
scheduler_thread **threads, base::thread **threads,
data_structures::aligned_stack **task_stacks) { data_structures::aligned_stack **task_stacks) {
max_threads_ = max_therads; max_threads_ = max_therads;
thread_states_ = thread_states; thread_states_ = thread_states;
...@@ -38,7 +37,7 @@ class scheduler_memory { ...@@ -38,7 +37,7 @@ class scheduler_memory {
thread_state *thread_state_for(size_t id) const { thread_state *thread_state_for(size_t id) const {
return thread_states_[id]; return thread_states_[id];
} }
scheduler_thread *thread_for(size_t id) const { base::thread *thread_for(size_t id) const {
return threads_[id]; return threads_[id];
} }
data_structures::aligned_stack *task_stack_for(size_t id) const { data_structures::aligned_stack *task_stack_for(size_t id) const {
...@@ -51,7 +50,7 @@ class static_scheduler_memory : public scheduler_memory { ...@@ -51,7 +50,7 @@ class static_scheduler_memory : public scheduler_memory {
// Every one of these types has to live on its own cache line, // Every one of these types has to live on its own cache line,
// as each thread uses one of them independently. // as each thread uses one of them independently.
// Therefore it would be a major performance hit if we shared cache lines on these. // Therefore it would be a major performance hit if we shared cache lines on these.
using aligned_thread = base::alignment::aligned_wrapper<scheduler_thread>; using aligned_thread = base::alignment::aligned_wrapper<base::thread>;
using aligned_thread_state = base::alignment::aligned_wrapper<thread_state>; using aligned_thread_state = base::alignment::aligned_wrapper<thread_state>;
using aligned_thread_stack = base::alignment::aligned_wrapper<std::array<char, TASK_STACK_SIZE>>; using aligned_thread_stack = base::alignment::aligned_wrapper<std::array<char, TASK_STACK_SIZE>>;
using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>; using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>;
...@@ -63,7 +62,7 @@ class static_scheduler_memory : public scheduler_memory { ...@@ -63,7 +62,7 @@ class static_scheduler_memory : public scheduler_memory {
std::array<aligned_aligned_stack, MAX_THREADS> task_stacks_; std::array<aligned_aligned_stack, MAX_THREADS> task_stacks_;
// References for parent // References for parent
std::array<scheduler_thread *, MAX_THREADS> thread_refs_; std::array<base::thread *, MAX_THREADS> thread_refs_;
std::array<thread_state *, MAX_THREADS> thread_state_refs_; std::array<thread_state *, MAX_THREADS> thread_state_refs_;
std::array<data_structures::aligned_stack *, MAX_THREADS> task_stack_refs_; std::array<data_structures::aligned_stack *, MAX_THREADS> task_stack_refs_;
...@@ -86,7 +85,7 @@ class malloc_scheduler_memory : public scheduler_memory { ...@@ -86,7 +85,7 @@ class malloc_scheduler_memory : public scheduler_memory {
// Every one of these types has to live on its own cache line, // Every one of these types has to live on its own cache line,
// as each thread uses one of them independently. // as each thread uses one of them independently.
// Therefore it would be a major performance hit if we shared cache lines on these. // Therefore it would be a major performance hit if we shared cache lines on these.
using aligned_thread = base::alignment::aligned_wrapper<scheduler_thread>; using aligned_thread = base::alignment::aligned_wrapper<base::thread>;
using aligned_thread_state = base::alignment::aligned_wrapper<thread_state>; using aligned_thread_state = base::alignment::aligned_wrapper<thread_state>;
using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>; using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>;
...@@ -99,7 +98,7 @@ class malloc_scheduler_memory : public scheduler_memory { ...@@ -99,7 +98,7 @@ class malloc_scheduler_memory : public scheduler_memory {
aligned_aligned_stack *task_stacks_; aligned_aligned_stack *task_stacks_;
// References for parent // References for parent
scheduler_thread **thread_refs_; base::thread **thread_refs_;
thread_state **thread_state_refs_; thread_state **thread_state_refs_;
data_structures::aligned_stack **task_stack_refs_; data_structures::aligned_stack **task_stack_refs_;
......
...@@ -25,14 +25,6 @@ struct thread_state { ...@@ -25,14 +25,6 @@ struct thread_state {
alignas(base::system_details::CACHE_LINE_SIZE) size_t id_; alignas(base::system_details::CACHE_LINE_SIZE) size_t id_;
alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_; alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
thread_state() :
scheduler_{nullptr},
current_task_{nullptr},
task_stack_{nullptr},
deque_{task_stack_},
id_{0},
random_{id_} {};
thread_state(scheduler *scheduler, data_structures::aligned_stack *task_stack, unsigned int id) : thread_state(scheduler *scheduler, data_structures::aligned_stack *task_stack, unsigned int id) :
scheduler_{scheduler}, scheduler_{scheduler},
current_task_{nullptr}, current_task_{nullptr},
......
...@@ -11,7 +11,10 @@ bool this_thread::local_storage_key_initialized_; ...@@ -11,7 +11,10 @@ bool this_thread::local_storage_key_initialized_;
#ifdef PLS_THREAD_SPECIFIC_COMPILER #ifdef PLS_THREAD_SPECIFIC_COMPILER
__thread void *this_thread::local_state_; __thread void *this_thread::local_state_;
#endif #endif
// implementation in header (C++ templating)
void thread::join() {
pthread_join(pthread_thread_, nullptr);
}
} }
} }
......
...@@ -26,8 +26,7 @@ scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads, b ...@@ -26,8 +26,7 @@ scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads, b
if (reuse_thread && i == 0) { if (reuse_thread && i == 0) {
continue; // Skip over first/main thread when re-using the user's thread, as this one will replace the first one. continue; // Skip over first/main thread when re-using the user's thread, as this one will replace the first one.
} }
new((void *) memory_->thread_for(i))base::thread<void (*)(), thread_state>(&scheduler::worker_routine, new((void *) memory_->thread_for(i))base::thread(&scheduler::worker_routine, memory_->thread_state_for(i));
memory_->thread_state_for(i));
} }
} }
......
...@@ -15,7 +15,7 @@ malloc_scheduler_memory::malloc_scheduler_memory(const size_t num_threads, const ...@@ -15,7 +15,7 @@ malloc_scheduler_memory::malloc_scheduler_memory(const size_t num_threads, const
num_threads * sizeof(aligned_aligned_stack))); num_threads * sizeof(aligned_aligned_stack)));
task_stacks_memory_ = reinterpret_cast<char **>(base::alignment::allocate_aligned(num_threads * sizeof(char *))); task_stacks_memory_ = reinterpret_cast<char **>(base::alignment::allocate_aligned(num_threads * sizeof(char *)));
thread_refs_ = static_cast<scheduler_thread **>(malloc(num_threads * sizeof(scheduler_thread *))); thread_refs_ = static_cast<base::thread **>(malloc(num_threads * sizeof(base::thread *)));
thread_state_refs_ = static_cast<thread_state **>(malloc(num_threads * sizeof(thread_state *))); thread_state_refs_ = static_cast<thread_state **>(malloc(num_threads * sizeof(thread_state *)));
task_stack_refs_ = task_stack_refs_ =
static_cast<data_structures::aligned_stack **>(malloc(num_threads * sizeof(data_structures::aligned_stack *))); static_cast<data_structures::aligned_stack **>(malloc(num_threads * sizeof(data_structures::aligned_stack *)));
......
add_executable(tests add_executable(tests
main.cpp main.cpp
data_structures_test.cpp data_structures_test.cpp
base_tests.cpp
scheduling_tests.cpp scheduling_tests.cpp
algorithm_test.cpp algorithm_test.cpp
dataflow_test.cpp) dataflow_test.cpp)
......
...@@ -15,7 +15,7 @@ static vector<int> base_tests_local_value_two; ...@@ -15,7 +15,7 @@ static vector<int> base_tests_local_value_two;
TEST_CASE("thread creation and joining", "[internal/data_structures/thread.h]") { TEST_CASE("thread creation and joining", "[internal/data_structures/thread.h]") {
base_tests_visited = false; base_tests_visited = false;
auto t1 = start_thread([]() { base_tests_visited = true; }); thread t1{[]() { base_tests_visited = true; }};
t1.join(); t1.join();
REQUIRE(base_tests_visited); REQUIRE(base_tests_visited);
...@@ -25,8 +25,8 @@ TEST_CASE("thread state", "[internal/data_structures/thread.h]") { ...@@ -25,8 +25,8 @@ TEST_CASE("thread state", "[internal/data_structures/thread.h]") {
int state_one = 1; int state_one = 1;
vector<int> state_two{1, 2}; vector<int> state_two{1, 2};
auto t1 = start_thread([]() { base_tests_local_value_one = *this_thread::state<int>(); }, &state_one); thread t1{[]() { base_tests_local_value_one = *this_thread::state<int>(); }, &state_one};
auto t2 = start_thread([]() { base_tests_local_value_two = *this_thread::state<vector<int>>(); }, &state_two); thread t2{[]() { base_tests_local_value_two = *this_thread::state<vector<int>>(); }, &state_two};
t1.join(); t1.join();
t2.join(); t2.join();
...@@ -42,20 +42,20 @@ TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spi ...@@ -42,20 +42,20 @@ TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spi
spin_lock lock{}; spin_lock lock{};
SECTION("lock can be used by itself") { SECTION("lock can be used by itself") {
auto t1 = start_thread([&]() { thread t1{[&]() {
for (int i = 0; i < num_iterations; i++) { for (int i = 0; i < num_iterations; i++) {
lock.lock(); lock.lock();
base_tests_shared_counter++; base_tests_shared_counter++;
lock.unlock(); lock.unlock();
} }
}); }};
auto t2 = start_thread([&]() { thread t2{[&]() {
for (int i = 0; i < num_iterations; i++) { for (int i = 0; i < num_iterations; i++) {
lock.lock(); lock.lock();
base_tests_shared_counter--; base_tests_shared_counter--;
lock.unlock(); lock.unlock();
} }
}); }};
t1.join(); t1.join();
t2.join(); t2.join();
...@@ -64,18 +64,18 @@ TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spi ...@@ -64,18 +64,18 @@ TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spi
} }
SECTION("lock can be used with std::lock_guard") { SECTION("lock can be used with std::lock_guard") {
auto t1 = start_thread([&]() { thread t1{[&]() {
for (int i = 0; i < num_iterations; i++) { for (int i = 0; i < num_iterations; i++) {
std::lock_guard<spin_lock> my_lock{lock}; std::lock_guard<spin_lock> my_lock{lock};
base_tests_shared_counter++; base_tests_shared_counter++;
} }
}); }};
auto t2 = start_thread([&]() { thread t2{[&]() {
for (int i = 0; i < num_iterations; i++) { for (int i = 0; i < num_iterations; i++) {
std::lock_guard<spin_lock> my_lock{lock}; std::lock_guard<spin_lock> my_lock{lock};
base_tests_shared_counter--; base_tests_shared_counter--;
} }
}); }};
t1.join(); t1.join();
t2.join(); t2.join();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment