From 4a9ca21d8a6fa00c3d484543e336470454ab73a9 Mon Sep 17 00:00:00 2001
From: FritzFlorian
Date: Mon, 16 Sep 2019 11:03:35 +0200
Subject: [PATCH] Rework threads to not be template based.

This allows us to more easily handle them and makes their interface
closer to std::thread.
---
 cmake/SetupAddressSanitizer.cmake                          |  2 +-
 lib/pls/include/pls/internal/base/thread.h                 | 36 +++++++++---------------------------
 lib/pls/include/pls/internal/base/thread_impl.h            | 49 ++++++++++++++++++++++---------------------------
 lib/pls/include/pls/internal/scheduling/scheduler.h        |  2 --
 lib/pls/include/pls/internal/scheduling/scheduler_memory.h | 15 +++++++--------
 lib/pls/include/pls/internal/scheduling/thread_state.h     |  8 --------
 lib/pls/src/internal/base/thread.cpp                       |  5 ++++-
 lib/pls/src/internal/scheduling/scheduler.cpp              |  3 +--
 lib/pls/src/internal/scheduling/scheduler_memory.cpp       |  2 +-
 test/CMakeLists.txt                                        |  1 +
 test/base_tests.cpp                                        | 22 +++++++++++-----------
 11 files changed, 57 insertions(+), 88 deletions(-)

diff --git a/cmake/SetupAddressSanitizer.cmake b/cmake/SetupAddressSanitizer.cmake
index f2422a7..c75a194 100644
--- a/cmake/SetupAddressSanitizer.cmake
+++ b/cmake/SetupAddressSanitizer.cmake
@@ -1,4 +1,4 @@
-# Optionally compile with thread sanitizer enabled to find concurrency bugs
+# Optionally compile with thread sanitizer enabled to find memory bugs
 # https://github.com/google/sanitizers/wiki/ThreadSanitizerCppManual
 
 # Add optional sanitizer, off by default
diff --git a/lib/pls/include/pls/internal/base/thread.h b/lib/pls/include/pls/internal/base/thread.h
index 3ffdf88..74e38ec 100644
--- a/lib/pls/include/pls/internal/base/thread.h
+++ b/lib/pls/include/pls/internal/base/thread.h
@@ -9,7 +9,7 @@
 #include 
 #include 
 #include 
-#include 
+#include 
 
 #include "system_details.h"
@@ -30,7 +30,6 @@ using thread_entrypoint = void();
  * Current implementation is based on pthreads.
  */
 class this_thread {
-  template<typename Function, typename State>
   friend class thread;
 
 #ifdef PLS_THREAD_SPECIFIC_PTHREAD
@@ -73,40 +72,28 @@
 
 /**
  * Abstraction for starting a function in a separate thread.
- *
- * @tparam Function Lambda being started on the new thread.
- * @tparam State State type held for this thread.
- *
- * usage:
- * T* state;
- * auto thread = start_thread([] {
- *   // Run on new thread
- * }, state);
- * thread.join(); // Wait for it to finish
+ * Offers only threading functionality needed in this project,
+ * underlying implementation can be changed.
+ * Uses NO heap memory allocation.
  *
  * PORTABILITY:
  * Current implementation is based on pthreads.
  */
-template<typename Function, typename State = void>
 class thread {
   friend class this_thread;
-  // Keep a copy of the function (lambda) in this object to make sure it is valid when called!
-  Function function_;
-  State *state_pointer_;
-
-  // Wee need to wait for the started function to read
-  // the function_ and state_pointer_ property before returning
-  // from the constructor, as the object might be moved after this.
-  std::atomic_flag *startup_flag_;
-
   // Keep handle to native implementation
   pthread_t pthread_thread_;
 
+  template<typename Function, typename State>
   static void *start_pthread_internal(void *thread_pointer);
 
  public:
+  template<typename Function, typename State>
   explicit thread(const Function &function, State *state_pointer);
+  template<typename Function>
+  explicit thread(const Function &function);
+
  public:
   void join();
@@ -118,11 +105,6 @@ class thread {
   thread &operator=(const thread &) = delete;
 };
 
-template<typename Function, typename State>
-thread<Function, State> start_thread(const Function &function, State *state_pointer);
-template<typename Function>
-thread<Function> start_thread(const Function &function);
-
 }
 }
 }
diff --git a/lib/pls/include/pls/internal/base/thread_impl.h b/lib/pls/include/pls/internal/base/thread_impl.h
index 64320b6..498ed2b 100644
--- a/lib/pls/include/pls/internal/base/thread_impl.h
+++ b/lib/pls/include/pls/internal/base/thread_impl.h
@@ -27,28 +27,32 @@ void this_thread::set_state(T *state_pointer) {
 }
 
 template<typename Function, typename State>
-void *thread<Function, State>::start_pthread_internal(void *thread_pointer) {
-  auto my_thread = reinterpret_cast<thread<Function, State> *>(thread_pointer);
-  Function my_function_copy = my_thread->function_;
-  State *my_state_pointer_copy = my_thread->state_pointer_;
+struct thread_arguments {
+  Function function_;
+  State *state_;
+  std::atomic_flag *startup_flag_;
+};
+
+template<typename Function, typename State>
+void *thread::start_pthread_internal(void *thread_pointer) {
+  // Actively copy all arguments into stack memory.
+  thread_arguments<Function, State>
+      arguments_copy = *reinterpret_cast<thread_arguments<Function, State> *>(thread_pointer);
 
   // Now we have copies of everything we need on the stack.
   // The original thread object can be moved freely (no more
   // references to its memory location).
-  my_thread->startup_flag_->clear();
+  arguments_copy.startup_flag_->clear();
 
-  this_thread::set_state(my_state_pointer_copy);
-  my_function_copy();
+  this_thread::set_state(arguments_copy.state_);
+  arguments_copy.function_();
 
   // Finished executing the user function
   pthread_exit(nullptr);
 }
 
 template<typename Function, typename State>
-thread<Function, State>::thread(const Function &function, State *state_pointer):
-    function_{function},
-    state_pointer_{state_pointer},
-    startup_flag_{nullptr},
+thread::thread(const Function &function, State *state_pointer):
     pthread_thread_{} {
 
 #ifdef PLS_THREAD_SPECIFIC_PTHREAD
@@ -58,29 +62,20 @@ thread::thread(const Function &function, State *state_pointer):
   }
 #endif
 
-  // We only need this during startup, will be destroyed when out of scope
+  // We need to wait for the started function to read
+  // the function_ and state_pointer_ property before returning
+  // from the constructor, as the object might be moved after this.
   std::atomic_flag startup_flag{ATOMIC_FLAG_INIT};
-  startup_flag_ = &startup_flag;
+
+  thread_arguments<Function, State> arguments{function, state_pointer, &startup_flag};
 
   startup_flag.test_and_set(); // Set the flag, pthread will clear it when it is safe to return
-  pthread_create(&pthread_thread_, nullptr, start_pthread_internal, (void *) (this));
+  pthread_create(&pthread_thread_, nullptr, start_pthread_internal < Function, State > , (void *) (&arguments));
   while (startup_flag.test_and_set()); // Busy waiting for the starting flag to clear
 }
 
-template<typename Function, typename State>
-void thread<Function, State>::join() {
-  pthread_join(pthread_thread_, nullptr);
-}
-
-template<typename Function, typename State>
-thread<Function, State> start_thread(const Function &function, State *state_pointer) {
-  return thread<Function, State>(function, state_pointer);
-}
-
 template<typename Function>
-thread<Function> start_thread(const Function &function) {
-  return thread<Function>(function, nullptr);
-}
+thread::thread(const Function &function): thread{function, (void *) nullptr} {}
 
 }
 }
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h
index 0405600..f93bab4 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler.h
@@ -20,8 +20,6 @@ namespace pls {
 namespace internal {
 namespace scheduling {
 
-using scheduler_thread = base::thread;
-
 /**
  * The scheduler is the central part of the dispatching-framework.
  * It manages a pool of worker threads (creates, sleeps/wakes up, destroys)
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
index 81813a6..6552ad1 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_memory.h
@@ -11,19 +11,18 @@ namespace internal {
 namespace scheduling {
 
 void worker_routine();
-using scheduler_thread = base::thread;
 
 class scheduler_memory {
  private:
   size_t max_threads_;
   thread_state **thread_states_;
-  scheduler_thread **threads_;
+  base::thread **threads_;
   data_structures::aligned_stack **task_stacks_;
 
 protected:
  void init(size_t max_therads,
            thread_state **thread_states,
-           scheduler_thread **threads,
+           base::thread **threads,
            data_structures::aligned_stack **task_stacks) {
    max_threads_ = max_therads;
    thread_states_ = thread_states;
@@ -38,7 +37,7 @@ class scheduler_memory {
   thread_state *thread_state_for(size_t id) const {
     return thread_states_[id];
   }
-  scheduler_thread *thread_for(size_t id) const {
+  base::thread *thread_for(size_t id) const {
     return threads_[id];
   }
   data_structures::aligned_stack *task_stack_for(size_t id) const {
@@ -51,7 +50,7 @@ class static_scheduler_memory : public scheduler_memory {
   // Everyone of these types has to live on its own cache line,
   // as each thread uses one of them independently.
   // Therefore it would be a major performance hit if we shared cache lines on these.
-  using aligned_thread = base::alignment::aligned_wrapper<scheduler_thread>;
+  using aligned_thread = base::alignment::aligned_wrapper<base::thread>;
   using aligned_thread_state = base::alignment::aligned_wrapper<thread_state>;
   using aligned_thread_stack = base::alignment::aligned_wrapper>;
   using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>;
@@ -63,7 +62,7 @@ class static_scheduler_memory : public scheduler_memory {
   std::array task_stacks_;
 
   // References for parent
-  std::array thread_refs_;
+  std::array thread_refs_;
   std::array thread_state_refs_;
   std::array task_stack_refs_;
 
@@ -86,7 +85,7 @@ class malloc_scheduler_memory : public scheduler_memory {
   // Everyone of these types has to live on its own cache line,
   // as each thread uses one of them independently.
   // Therefore it would be a major performance hit if we shared cache lines on these.
-  using aligned_thread = base::alignment::aligned_wrapper<scheduler_thread>;
+  using aligned_thread = base::alignment::aligned_wrapper<base::thread>;
   using aligned_thread_state = base::alignment::aligned_wrapper<thread_state>;
   using aligned_aligned_stack = base::alignment::aligned_wrapper<data_structures::aligned_stack>;
 
@@ -99,7 +98,7 @@ class malloc_scheduler_memory : public scheduler_memory {
   aligned_aligned_stack *task_stacks_;
 
   // References for parent
-  scheduler_thread **thread_refs_;
+  base::thread **thread_refs_;
   thread_state **thread_state_refs_;
   data_structures::aligned_stack **task_stack_refs_;
 
diff --git a/lib/pls/include/pls/internal/scheduling/thread_state.h b/lib/pls/include/pls/internal/scheduling/thread_state.h
index 3d5ff63..48a7b29 100644
--- a/lib/pls/include/pls/internal/scheduling/thread_state.h
+++ b/lib/pls/include/pls/internal/scheduling/thread_state.h
@@ -25,14 +25,6 @@ struct thread_state {
   alignas(base::system_details::CACHE_LINE_SIZE) size_t id_;
   alignas(base::system_details::CACHE_LINE_SIZE) std::minstd_rand random_;
 
-  thread_state() :
-      scheduler_{nullptr},
-      current_task_{nullptr},
-      task_stack_{nullptr},
-      deque_{task_stack_},
-      id_{0},
-      random_{id_} {};
-
   thread_state(scheduler *scheduler, data_structures::aligned_stack *task_stack, unsigned int id) :
       scheduler_{scheduler},
       current_task_{nullptr},
diff --git a/lib/pls/src/internal/base/thread.cpp b/lib/pls/src/internal/base/thread.cpp
index 8c47060..4991952 100644
--- a/lib/pls/src/internal/base/thread.cpp
+++ b/lib/pls/src/internal/base/thread.cpp
@@ -11,7 +11,10 @@ bool this_thread::local_storage_key_initialized_;
 #ifdef PLS_THREAD_SPECIFIC_COMPILER
 __thread void *this_thread::local_state_;
 #endif
-// implementation in header (C++ templating)
+
+void thread::join() {
+  pthread_join(pthread_thread_, nullptr);
+}
 
 }
 }
diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp
index 46a4e0a..3280c39 100644
--- a/lib/pls/src/internal/scheduling/scheduler.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler.cpp
@@ -26,8 +26,7 @@ scheduler::scheduler(scheduler_memory *memory, const unsigned int num_threads, b
     if (reuse_thread && i == 0) {
       continue; // Skip over first/main thread when re-using the users thread, as this one will replace the first one.
     }
-    new((void *) memory_->thread_for(i))base::thread(&scheduler::worker_routine,
-                                                     memory_->thread_state_for(i));
+    new((void *) memory_->thread_for(i))base::thread(&scheduler::worker_routine, memory_->thread_state_for(i));
   }
 }
 
diff --git a/lib/pls/src/internal/scheduling/scheduler_memory.cpp b/lib/pls/src/internal/scheduling/scheduler_memory.cpp
index b76aaaf..d2764d7 100644
--- a/lib/pls/src/internal/scheduling/scheduler_memory.cpp
+++ b/lib/pls/src/internal/scheduling/scheduler_memory.cpp
@@ -15,7 +15,7 @@ malloc_scheduler_memory::malloc_scheduler_memory(const size_t num_threads, const
                                                                       num_threads * sizeof(aligned_aligned_stack)));
   task_stacks_memory_ = reinterpret_cast(base::alignment::allocate_aligned(num_threads * sizeof(char *)));
 
-  thread_refs_ = static_cast<scheduler_thread **>(malloc(num_threads * sizeof(scheduler_thread *)));
+  thread_refs_ = static_cast<base::thread **>(malloc(num_threads * sizeof(base::thread *)));
   thread_state_refs_ = static_cast<thread_state **>(malloc(num_threads * sizeof(thread_state *)));
   task_stack_refs_ = static_cast<data_structures::aligned_stack **>(malloc(num_threads * sizeof(data_structures::aligned_stack *)));
 
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 9023079..501e622 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -1,6 +1,7 @@
 add_executable(tests
         main.cpp
         data_structures_test.cpp
+        base_tests.cpp
         scheduling_tests.cpp
         algorithm_test.cpp
         dataflow_test.cpp)
diff --git a/test/base_tests.cpp b/test/base_tests.cpp
index ba3789e..dafdfca 100644
--- a/test/base_tests.cpp
+++ b/test/base_tests.cpp
@@ -15,7 +15,7 @@ static vector base_tests_local_value_two;
 
 TEST_CASE("thread creation and joining", "[internal/data_structures/thread.h]") {
   base_tests_visited = false;
-  auto t1 = start_thread([]() { base_tests_visited = true; });
+  thread t1{[]() { base_tests_visited = true; }};
   t1.join();
 
   REQUIRE(base_tests_visited);
@@ -25,8 +25,8 @@ TEST_CASE("thread state", "[internal/data_structures/thread.h]") {
   int state_one = 1;
   vector<int> state_two{1, 2};
 
-  auto t1 = start_thread([]() { base_tests_local_value_one = *this_thread::state<int>(); }, &state_one);
-  auto t2 = start_thread([]() { base_tests_local_value_two = *this_thread::state<vector<int>>(); }, &state_two);
+  thread t1{[]() { base_tests_local_value_one = *this_thread::state<int>(); }, &state_one};
+  thread t2{[]() { base_tests_local_value_two = *this_thread::state<vector<int>>(); }, &state_two};
 
   t1.join();
   t2.join();
@@ -42,20 +42,20 @@ TEST_CASE("spinlock protects concurrent counter", "[internal/data_structures/spi
   spin_lock lock{};
 
   SECTION("lock can be used by itself") {
-    auto t1 = start_thread([&]() {
+    thread t1{[&]() {
       for (int i = 0; i < num_iterations; i++) {
         lock.lock();
         base_tests_shared_counter++;
         lock.unlock();
       }
-    });
-    auto t2 = start_thread([&]() {
+    }};
+    thread t2{[&]() {
       for (int i = 0; i < num_iterations; i++) {
         lock.lock();
         base_tests_shared_counter--;
         lock.unlock();
       }
-    });
+    }};
 
     t1.join();
     t2.join();
@@ -64,18 +64,18 @@
     REQUIRE(base_tests_shared_counter == 0);
   }
   SECTION("lock can be used with std::lock_guard") {
-    auto t1 = start_thread([&]() {
+    thread t1{[&]() {
       for (int i = 0; i < num_iterations; i++) {
         std::lock_guard<spin_lock> my_lock{lock};
         base_tests_shared_counter++;
       }
-    });
-    auto t2 = start_thread([&]() {
+    }};
+    thread t2{[&]() {
       for (int i = 0; i < num_iterations; i++) {
         std::lock_guard<spin_lock> my_lock{lock};
         base_tests_shared_counter--;
       }
-    });
+    }};
 
     t1.join();
     t2.join();
--
libgit2 0.26.0
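
Below is a minimal usage sketch of the reworked, non-template thread interface that this patch introduces. It is not part of the patch itself: the include path assumes the public include root lib/pls/include, the namespace nesting pls::internal::base is inferred from the file paths, and the worker_state struct plus the main() harness are purely illustrative. The constructor call, this_thread::state<T>() accessor and join() are taken from the diff and the tests above.

#include "pls/internal/base/thread.h"   // assumed include path (root: lib/pls/include)

using pls::internal::base::thread;
using pls::internal::base::this_thread;

// Illustrative state type, not part of the patch.
struct worker_state {
  int counter;
};

int main() {
  worker_state state{0};

  // The constructor starts the pthread right away. The function object and the
  // state pointer are copied onto the new thread's stack (thread_arguments) and
  // the constructor busy-waits on startup_flag until that copy has happened, so
  // the thread object can safely be moved afterwards.
  thread worker{[]() {
    // The second constructor argument is published as thread-local state.
    worker_state *my_state = this_thread::state<worker_state>();
    my_state->counter = 42;
  }, &state};

  worker.join();  // Blocks until the worker function returns, like std::thread::join().
  return 0;
}

As the new doc comment in thread.h states, the wrapper only covers the threading functionality this project needs, performs no heap allocation, and keeps pthreads as the underlying implementation, so the interface above should stay stable even if the backend changes.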