diff --git a/CMakeLists.txt b/CMakeLists.txt index 0a66e63..bc061e3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -35,6 +35,7 @@ add_subdirectory(app/test_for_new) add_subdirectory(app/invoke_parallel) add_subdirectory(app/benchmark_fft) add_subdirectory(app/benchmark_unbalanced) +add_subdirectory(app/benchmark_matrix) # Add optional tests option(PACKAGE_TESTS "Build the tests" ON) diff --git a/PERFORMANCE.md b/PERFORMANCE.md index f9cc49e..343c880 100644 --- a/PERFORMANCE.md +++ b/PERFORMANCE.md @@ -3,6 +3,10 @@ #### Commit 52fcb51f - Add basic random stealing Slight improvement, needs further measurement after removing more important bottlenecks. +Below are three individual measurements of the difference. +Overall the trend (sum of all numbers/last number) +goes down (98.7%, 96.9% and 100.6%), but with the one measurement +above 100% we think the improvements are minor. | | | | | | | | | | | | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | @@ -28,13 +32,45 @@ Big improvements of about 6% in our test. This seems like a little, but 6% from the scheduler is a lot, as the 'main work' is the tasks themselves, not the scheduler. +This change unsurprisingly yields the biggest improvement yet. + +#### Commit b9bb90a4 - Try to figure out the 'high thread bottleneck' + +We are currently seeing good performance on low core counts +(up to 1/2 of the machine's cores), but after that performance +plummets: + +Banana-Pi Best-Case: + + + +Banana-Pi Average-Case: + + + +Laptop Best-Case: + + + +Laptop Average-Case: + + + + +As we can see, on average the performance of PLS starts getting +way worse than TBB and EMBB after 4 cores. We suspect this is due +to contention, but could not resolve it with any combination +of `tas_spinlock` vs `ttas_spinlock` and `lock` vs `try_lock`. + +This issue clearly needs further investigation. + ### Commit aa27064 - Performance with ttas spinlocks (and 'full blocking' top level) - + ### Commit d16ad3e - Performance with rw-lock and backoff - + ### Commit 18b2d744 - Performance with lock-free deque @@ -45,7 +81,7 @@ locks we even saw a slight drop in performance). Currently the FFT benchmark shows the following results (average): - + We want to positively note that the overall trend of 'performance drops' at the hyperthreading mark is not really bad anymore, it rather @@ -58,7 +94,7 @@ This is discouraging after many tests. To see where the overhead lies we also implemented the unbalanced tree search benchmark, resulting in the following, surprisingly good, results (average): - + The main difference between the two benchmarks is that the second one has more work and the work is relatively independent. @@ -77,12 +113,63 @@ down to our level and solely use internal stealing. Average results FFT: - + Average results Unbalanced: - + There seems to be only a minor performance difference between the two, suggesting that our two-level approach is not the part causing our weaker performance. + +### Commit afd0331b - Some notes on scaling problems + +After tweaking individual values and parameters we still cannot find +the main cause of our slowdown on multiple processors. +We also use Intel's VTune Amplifier to measure performance on our runs +and find that we always spend way too much time 'waiting for work', +e.g. in the backoff mechanism when enabled or in the locks for stealing +work when backoff is disabled.
This leads us to believe that our problems +might be connected to some issue with work distribution in the FFT case, +as the unbalanced tree search (with a lot of 'local' work) performs well. + +To get more data, we add benchmarks on matrix multiplication implemented +in two fashions: once with a 'native' array stealing task and once with +a fork-join task. Both implementations use the same minimum array +sub-size of 4 elements, so we can hopefully see if they have any +performance differences. + +Best case fork-join: + + + +Average case fork-join: + + + +Best case Native: + + + +Average case Native: + + + +What we find very interesting is that the best-case times of our +pls library are very fast (as good as TBB), but the average times +drop badly. We currently do not know why this is the case. + +### Commit afd0331b - Intel VTune Amplifier + +We did several measurements with Intel's VTune Amplifier profiling +tool. The main thing that we notice is that the cycles per instruction +for our useful work blocks increase, thus requiring more CPU time +for the actual useful work. + +We also measured an implementation using TBB and found no significant +difference, i.e. TBB also has a higher CPI with 8 threads. +Our conclusion after this long hunt for performance is that we +might just be bound by some general performance issues in our code. +The next step will therefore be to read the other frameworks' code and our +own carefully, trying to find potential issues. diff --git a/README.md b/README.md index c42781f..75d5d8a 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,10 @@ project setup that uses the PLS library. Further [general notes](NOTES.md) and [performance notes](PERFORMANCE.md) can be found in their respective files. +Further notes on [performance](PERFORMANCE.md) and general +[notes](NOTES.md) on the development progress can be found in +the linked documents. + ### Installation Clone the repository and open a terminal session in its folder.
diff --git a/app/benchmark_fft/main.cpp b/app/benchmark_fft/main.cpp index f6ed20e..cbb8445 100644 --- a/app/benchmark_fft/main.cpp +++ b/app/benchmark_fft/main.cpp @@ -80,7 +80,7 @@ int main() { pls::internal::helpers::run_mini_benchmark([&] { complex_vector input = initial_input; fft(input.begin(), input.size()); - }, 8, 4000); + }, 8, 1000); PROFILE_SAVE("test_profile.prof") } diff --git a/app/benchmark_matrix/CMakeLists.txt b/app/benchmark_matrix/CMakeLists.txt new file mode 100644 index 0000000..0245a5b --- /dev/null +++ b/app/benchmark_matrix/CMakeLists.txt @@ -0,0 +1,5 @@ +add_executable(benchmark_matrix main.cpp) +target_link_libraries(benchmark_matrix pls) +if (EASY_PROFILER) + target_link_libraries(benchmark_matrix easy_profiler) +endif () diff --git a/app/benchmark_matrix/main.cpp b/app/benchmark_matrix/main.cpp new file mode 100644 index 0000000..c633820 --- /dev/null +++ b/app/benchmark_matrix/main.cpp @@ -0,0 +1,82 @@ +#include +#include +#include + +#include + +const int MATRIX_SIZE = 128; + +template +class matrix { + public: + T data[SIZE][SIZE]; + + matrix(T i = 1) { + std::fill(&data[0][0], &data[0][0] + SIZE * SIZE, i); + } + + void multiply(const matrix &a, const matrix &b) { + auto range = boost::irange(0, SIZE); + pls::algorithm::parallel_for(range.begin(), range.end(), [&](int i) { + this->multiply_column(i, a, b); + }); + } + + private: + void multiply_column(int i, const matrix &a, const matrix &b) { + for (int j = 0; j < SIZE; ++j) { + data[i][j] = 0; + } + for (int k = 0; k < SIZE; ++k) { + for (int j = 0; j < SIZE; ++j) { + data[i][j] += a.data[i][k] * b.data[k][j]; + } + } + } +}; + +void fill_with_data(matrix &a, matrix &b) { + // Fill in some data... + for (int i = 0; i < MATRIX_SIZE; i++) { + for (int j = 0; j < MATRIX_SIZE; j++) { + a.data[i][j] = i; + b.data[i][j] = j; + } + } +} + +int main() { + PROFILE_ENABLE + matrix a; + matrix b; + matrix result; + fill_with_data(a, b); + + pls::internal::helpers::run_mini_benchmark([&] { + result.multiply(a, b); + }, 8, 1000); + + PROFILE_SAVE("test_profile.prof") +} + +//int main() { +// PROFILE_ENABLE +// pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18}; +// pls::scheduler scheduler{&my_scheduler_memory, 8}; +// +// matrix a; +// matrix b; +// matrix result; +// fill_with_data(a, b); +// +// scheduler.perform_work([&] { +// PROFILE_MAIN_THREAD +// for (int i = 0; i < 10; i++) { +// PROFILE_WORK_BLOCK("Top Level") +// result.multiply(a, b); +// } +// }); +// +// PROFILE_SAVE("test_profile.prof") +//} + diff --git a/lib/pls/CMakeLists.txt b/lib/pls/CMakeLists.txt index 52c5710..d834402 100644 --- a/lib/pls/CMakeLists.txt +++ b/lib/pls/CMakeLists.txt @@ -4,6 +4,8 @@ add_library(pls STATIC include/pls/algorithms/invoke_parallel.h include/pls/algorithms/invoke_parallel_impl.h + include/pls/algorithms/parallel_for.h + include/pls/algorithms/parallel_for_impl.h include/pls/internal/base/spin_lock.h include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp @@ -15,12 +17,12 @@ add_library(pls STATIC include/pls/internal/base/system_details.h include/pls/internal/base/error_handling.h include/pls/internal/base/alignment.h src/internal/base/alignment.cpp - include/pls/internal/base/backoff.h include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp include/pls/internal/data_structures/aligned_stack_impl.h include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp - include/pls/internal/data_structures/work_stealing_deque.h + 
include/pls/internal/data_structures/work_stealing_deque.h include/pls/internal/data_structures/work_stealing_deque_impl.h + include/pls/internal/data_structures/stamped_integer.h include/pls/internal/helpers/prohibit_new.h include/pls/internal/helpers/profiler.h @@ -35,7 +37,8 @@ add_library(pls STATIC include/pls/internal/scheduling/run_on_n_threads_task.h src/internal/scheduling/run_on_n_threads_task.cpp include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp - ) + include/pls/internal/scheduling/parallel_iterator_task.h include/pls/internal/scheduling/parallel_iterator_task_impl.h + src/internal/scheduling/parallel_iterator_task.cpp) # Add everything in `./include` to be in the include path of this project target_include_directories(pls diff --git a/lib/pls/include/pls/algorithms/parallel_for.h b/lib/pls/include/pls/algorithms/parallel_for.h new file mode 100644 index 0000000..58334fe --- /dev/null +++ b/lib/pls/include/pls/algorithms/parallel_for.h @@ -0,0 +1,18 @@ + +#ifndef PLS_PARALLEL_FOR_H +#define PLS_PARALLEL_FOR_H + +namespace pls { +namespace algorithm { + +template +void parallel_for(RandomIt first, RandomIt last, const Function &function); + +template +void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &function); + +} +} +#include "parallel_for_impl.h" + +#endif //PLS_PARALLEL_FOR_H diff --git a/lib/pls/include/pls/algorithms/parallel_for_impl.h b/lib/pls/include/pls/algorithms/parallel_for_impl.h new file mode 100644 index 0000000..5b79468 --- /dev/null +++ b/lib/pls/include/pls/algorithms/parallel_for_impl.h @@ -0,0 +1,69 @@ + +#ifndef PLS_PARALLEL_FOR_IMPL_H +#define PLS_PARALLEL_FOR_IMPL_H + +#include "pls/internal/scheduling/fork_join_task.h" +#include "pls/internal/scheduling/parallel_iterator_task.h" +#include "pls/internal/scheduling/scheduler.h" +#include "pls/internal/scheduling/scheduler.h" + +#include "pls/internal/helpers/unique_id.h" + +namespace pls { +namespace algorithm { +namespace internal { +template +void parallel_for(RandomIt first, RandomIt last, const Function &function) { + using namespace ::pls::internal::scheduling; + using namespace ::pls::internal::helpers; + using namespace ::pls::internal::base; + constexpr long min_elements = 4; + + long num_elements = std::distance(first, last); + if (num_elements <= min_elements) { + // calculate last elements in loop to avoid overhead + for (auto current = first; current != last; current++) { + function(*current); + } + } else { + // Cut in half recursively + long middle_index = num_elements / 2; + + auto body = [=] { internal::parallel_for(first + middle_index, last, function); }; + fork_join_lambda_by_reference second_half_task(body); + fork_join_sub_task::current()->spawn_child(second_half_task); + + parallel_for(first, first + middle_index, function); + fork_join_sub_task::current()->wait_for_all(); + } +} +} + +template +void parallel_for(RandomIt first, RandomIt last, const Function &function) { + using namespace ::pls::internal::scheduling; + using namespace ::pls::internal::helpers; + using namespace ::pls::internal::base; + static abstract_task::id id = unique_id::create(); + + parallel_iterator_task iterator_task{first, last, function, id}; + scheduler::execute_task(iterator_task); +} + +template +void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &function) { + using namespace ::pls::internal::scheduling; + using 
namespace ::pls::internal::helpers; + using namespace ::pls::internal::base; + static abstract_task::id id = unique_id::create(); + + auto body = [=] { internal::parallel_for(first, last, function); }; + fork_join_lambda_by_reference root_body(body); + fork_join_task root_task{&root_body, id}; + scheduler::execute_task(root_task); +} + +} +} + +#endif //PLS_INVOKE_PARALLEL_IMPL_H diff --git a/lib/pls/include/pls/internal/base/alignment.h b/lib/pls/include/pls/internal/base/alignment.h index 5797932..2d05e07 100644 --- a/lib/pls/include/pls/internal/base/alignment.h +++ b/lib/pls/include/pls/internal/base/alignment.h @@ -25,26 +25,6 @@ char *next_alignment(char *pointer); } -template -struct aligned_aba_pointer { - const system_details::pointer_t pointer_; - - explicit aligned_aba_pointer(T *pointer, unsigned int aba = 0) : pointer_{ - reinterpret_cast(pointer) + aba} {} - - T *pointer() const { - return reinterpret_cast(pointer_ & system_details::CACHE_LINE_ADDRESS_USED_BITS); - } - - unsigned int aba() const { - return pointer_ & system_details::CACHE_LINE_ADDRESS_UNUSED_BITS; - } - - aligned_aba_pointer set_aba(unsigned int aba) const { - return aligned_aba_pointer(pointer(), aba); - } -}; - } } } diff --git a/lib/pls/include/pls/internal/base/backoff.h b/lib/pls/include/pls/internal/base/backoff.h index 9e7cbf5..45ca814 100644 --- a/lib/pls/include/pls/internal/base/backoff.h +++ b/lib/pls/include/pls/internal/base/backoff.h @@ -14,7 +14,7 @@ namespace internal { namespace base { class backoff { - const unsigned long INITIAL_SPIN_ITERS = 2u << 4u; + const unsigned long INITIAL_SPIN_ITERS = 2u << 1u; const unsigned long MAX_SPIN_ITERS = 2u << 8u; const unsigned long MAX_ITERS = 2u << 10u; const unsigned long YELD_ITERS = 2u << 10u; @@ -36,7 +36,7 @@ class backoff { if (current_ >= YELD_ITERS) { PROFILE_LOCK("Yield") - this_thread::yield(); + this_thread::sleep(5); } current_ = std::min(current_ * 2, MAX_ITERS); diff --git a/lib/pls/include/pls/internal/base/system_details.h b/lib/pls/include/pls/internal/base/system_details.h index 92e4c65..17b51d7 100644 --- a/lib/pls/include/pls/internal/base/system_details.h +++ b/lib/pls/include/pls/internal/base/system_details.h @@ -24,26 +24,18 @@ namespace system_details { * pointer_t should be an integer type capable of holding ANY pointer value. */ using pointer_t = std::uintptr_t; -constexpr pointer_t ZERO_POINTER = 0; -constexpr pointer_t MAX_POINTER = ~ZERO_POINTER; /** * Biggest type that supports atomic CAS operations. * Usually it is sane to assume a pointer can be swapped in a single CAS operation. */ -using cas_integer = pointer_t; -constexpr cas_integer MIN_CAS_INTEGER = 0; -constexpr cas_integer MAX_CAS_INTEGER = ~MIN_CAS_INTEGER; -constexpr cas_integer FIRST_HALF_CAS_INTEGER = MAX_CAS_INTEGER << ((sizeof(cas_integer) / 2) * 8); -constexpr cas_integer SECOND_HALF_CAS_INTEGER = ~FIRST_HALF_CAS_INTEGER; +using cas_integer = std::uintptr_t; +constexpr unsigned long CAS_SIZE = sizeof(cas_integer); /** * Most processors have 64 byte cache lines (last 6 bit of the address are zero at line beginnings). */ -constexpr unsigned int CACHE_LINE_ADDRESS_BITS = 6; -constexpr pointer_t CACHE_LINE_SIZE = 2u << (CACHE_LINE_ADDRESS_BITS - 1); -constexpr pointer_t CACHE_LINE_ADDRESS_USED_BITS = MAX_POINTER << CACHE_LINE_ADDRESS_BITS; -constexpr pointer_t CACHE_LINE_ADDRESS_UNUSED_BITS = ~CACHE_LINE_ADDRESS_USED_BITS; +constexpr pointer_t CACHE_LINE_SIZE = 64; /** * Choose one of the following ways to store thread specific data. 
@@ -60,15 +52,9 @@ constexpr pointer_t CACHE_LINE_ADDRESS_UNUSED_BITS = ~CACHE_LINE_ADDRESS_USED_BI * * Choose the implementation appropriate for your compiler-cpu combination. */ -#if (COMPILER == MVCC) -inline void relax_cpu() { - _mm_pause(); -} -#elif (COMPILER == GCC || COMPILER == LLVM) inline void relax_cpu() { - asm("pause"); + asm volatile("pause":: : "memory"); } -#endif } } diff --git a/lib/pls/include/pls/internal/data_structures/stamped_integer.h b/lib/pls/include/pls/internal/data_structures/stamped_integer.h new file mode 100644 index 0000000..a24bcfa --- /dev/null +++ b/lib/pls/include/pls/internal/data_structures/stamped_integer.h @@ -0,0 +1,27 @@ + +#ifndef PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_DATA_STRUCTURES_STAMPED_INTEGER_H_ +#define PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_DATA_STRUCTURES_STAMPED_INTEGER_H_ + +#include "pls/internal/base/system_details.h" + +namespace pls { +namespace internal { +namespace data_structures { + +constexpr unsigned long HALF_CACHE_LINE = base::system_details::CACHE_LINE_SIZE / 2; +struct stamped_integer { + using member_t = base::system_details::cas_integer; + + member_t stamp:HALF_CACHE_LINE; + member_t value:HALF_CACHE_LINE; + + stamped_integer() : stamp{0}, value{0} {}; + stamped_integer(member_t new_value) : stamp{0}, value{new_value} {}; + stamped_integer(member_t new_stamp, member_t new_value) : stamp{new_stamp}, value{new_value} {}; +}; + +} +} +} + +#endif //PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_DATA_STRUCTURES_STAMPED_INTEGER_H_ diff --git a/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h index 23f734d..5c8ce86 100644 --- a/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h +++ b/lib/pls/include/pls/internal/data_structures/work_stealing_deque.h @@ -3,12 +3,10 @@ #define PLS_WORK_STEALING_DEQUE_H_ #include -#include -#include -#include "pls/internal/base/system_details.h" -#include "pls/internal/base/spin_lock.h" #include "pls/internal/base/error_handling.h" +#include "pls/internal/data_structures/stamped_integer.h" +#include "pls/internal/scheduling/thread_state.h" #include "aligned_stack.h" @@ -16,30 +14,22 @@ namespace pls { namespace internal { namespace data_structures { -using cas_integer = base::system_details::cas_integer; -using pointer_t = base::system_details::pointer_t; -static cas_integer get_stamp(cas_integer n) { - return (n & base::system_details::FIRST_HALF_CAS_INTEGER) >> ((sizeof(cas_integer) / 2) * 8); -} -static cas_integer get_offset(cas_integer n) { - return n & base::system_details::SECOND_HALF_CAS_INTEGER; -} -static cas_integer set_stamp(cas_integer n, cas_integer new_value) { - return (new_value << ((sizeof(cas_integer) / 2) * 8)) | (n & base::system_details::SECOND_HALF_CAS_INTEGER); -} -//static cas_integer set_offset(cas_integer n, cas_integer new_value) { -// return new_value | (n & base::system_details::FIRST_HALF_CAS_INTEGER); -//} +using base::system_details::pointer_t; + +// Integer split into two halfs, can be used in CAS operations +using data_structures::stamped_integer; +using offset_t = stamped_integer::member_t; +// Single Item in the deque class work_stealing_deque_item { // Pointer to the actual data pointer_t data_; // Index (relative to stack base) to the next and previous element - cas_integer next_item_; - cas_integer previous_item_; + offset_t next_item_; + offset_t previous_item_; public: - work_stealing_deque_item() : 
data_{0}, next_item_{0}, previous_item_{0} {} + work_stealing_deque_item() : data_{0}, next_item_{}, previous_item_{} {} template Item *data() { @@ -51,18 +41,11 @@ class work_stealing_deque_item { data_ = reinterpret_cast(data); } - cas_integer next_item() { - return next_item_; - } - void set_next_item(cas_integer next_item) { - next_item_ = next_item; - } - cas_integer previous_item() { - return previous_item_; - } - void set_previous_item(cas_integer previous_item) { - previous_item_ = previous_item; - } + offset_t next_item() const { return next_item_; } + void set_next_item(offset_t next_item) { next_item_ = next_item; } + + offset_t previous_item() const { return previous_item_; } + void set_previous_item(offset_t previous_item) { previous_item_ = previous_item; } }; static_assert(sizeof(work_stealing_deque_item) < base::system_details::CACHE_LINE_SIZE, "Work stealing deque relies on memory layout and requires cache lines to be longer than one 'work_stealing_deque_item' instance!"); @@ -74,19 +57,16 @@ class work_stealing_deque { aligned_stack *stack_; pointer_t base_pointer_; - std::atomic head_; - std::atomic tail_; - cas_integer previous_tail_; - - base::spin_lock lock_{}; // TODO: Remove after debugging - + std::atomic head_; + std::atomic tail_; + offset_t previous_tail_; public: using state = aligned_stack::state; explicit work_stealing_deque(aligned_stack *stack) : stack_{stack}, base_pointer_{0}, - head_{0}, + head_{stamped_integer{0, 0}}, tail_{0}, previous_tail_{0} { reset_base_pointer(); @@ -97,144 +77,25 @@ class work_stealing_deque { tail_{other.tail_.load()}, previous_tail_{other.previous_tail_} {} - void reset_base_pointer() { - base_pointer_ = reinterpret_cast(stack_->save_state()); // Keep the base of our region in the stack - } - - work_stealing_deque_item *item_at(cas_integer position) { - return reinterpret_cast(base_pointer_ - + (base::system_details::CACHE_LINE_SIZE * position)); - } - - cas_integer current_stack_offset() { - return (stack_->save_state() - base_pointer_) / base::system_details::CACHE_LINE_SIZE; - } + void reset_base_pointer(); + work_stealing_deque_item *item_at(offset_t offset); + offset_t current_stack_offset(); template - std::pair *allocate_item(const T &new_item) { - // 'Union' type to push both on stack - using pair_t = std::pair; - // Allocate space on stack - auto new_pair = reinterpret_cast(stack_->push()); - // Initialize memory on stack - new((void *) &(new_pair->first)) work_stealing_deque_item(); - new((void *) &(new_pair->second)) T(new_item); - - return new_pair; - } + std::pair *allocate_item(const T &new_item); template - Item *push_tail(const T &new_item) { - cas_integer local_tail = tail_; - - auto new_pair = allocate_item(new_item); - // Prepare current tail to point to correct next items - auto tail_deque_item = item_at(local_tail); - tail_deque_item->set_data(&(new_pair->second)); - tail_deque_item->set_next_item(current_stack_offset()); - tail_deque_item->set_previous_item(previous_tail_); - previous_tail_ = local_tail; - - // Linearization point, item appears after this write - cas_integer new_tail = current_stack_offset(); - tail_ = new_tail; - - return &(new_pair->second); - } - - Item *pop_tail() { - cas_integer local_tail = tail_; - cas_integer local_head = head_; - - if (local_tail <= get_offset(local_head)) { - return nullptr; // EMPTY - } - - work_stealing_deque_item *previous_tail_item = item_at(previous_tail_); - cas_integer new_tail = previous_tail_; - previous_tail_ = previous_tail_item->previous_item(); - - 
// Publish our wish to set the tail back - tail_ = new_tail; - // Get the state of local head AFTER we published our wish - local_head = head_; // Linearization point, outside knows list is empty - - if (get_offset(local_head) < new_tail) { - return previous_tail_item->data(); // Success, enough distance to other threads - } - - if (get_offset(local_head) == new_tail) { - cas_integer new_head = set_stamp(new_tail, get_stamp(local_head) + 1); - // Try competing with consumers by updating the head's stamp value - if (head_.compare_exchange_strong(local_head, new_head)) { - return previous_tail_item->data(); // SUCCESS, we won the competition with other threads - } - } - - // Some other thread either won the competition or it already set the head further than we are - // before we even tried to compete with it. - // Reset the queue into an empty state => head_ = tail_ - tail_ = get_offset(local_head); // ...we give up to the other winning thread - - return nullptr; // EMPTY, we lost the competition with other threads - } - - Item *pop_head() { - cas_integer local_head = head_; - cas_integer local_tail = tail_; - - if (local_tail <= get_offset(local_head)) { - return nullptr; // EMPTY - } - // Load info on current deque item. - // In case we have a race with a new (aba) overwritten item at this position, - // there has to be a competition over the tail -> the stamp increased and our next - // operation will fail anyways! - work_stealing_deque_item *head_deque_item = item_at(get_offset(local_head)); - cas_integer next_item_offset = head_deque_item->next_item(); - Item *head_data_item = head_deque_item->data(); - - // We try to set the head to this new position. - // Possible outcomes: - // 1) no one interrupted us, we win this competition - // 2) other thread took the head, we lose to this - // 3) owning thread removed tail, we lose to this - cas_integer new_head = set_stamp(next_item_offset, get_stamp(local_head) + 1); - if (head_.compare_exchange_strong(local_head, new_head)) { - return head_data_item; // SUCCESS, we won the competition - } - - return nullptr; // EMPTY, we lost the competition - - } - - void release_memory_until(state state) { - cas_integer item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE; + T *push_tail(const T &new_item); + Item *pop_tail(); + Item *pop_head(); - cas_integer local_head = head_; - cas_integer local_tail = tail_; - - stack_->reset_state(state); - - if (item_offset < local_tail) { - tail_ = item_offset; - if (get_offset(local_head) >= local_tail) { - head_ = set_stamp(item_offset, get_stamp(local_head) + 1); - } - } - } - - void release_memory_until(Item *item) { - release_memory_until(reinterpret_cast(item)); - } - - state save_state() { - return stack_->save_state(); - } + void release_memory_until(state state); + state save_state(); }; } } } +#include "work_stealing_deque_impl.h" #endif //PLS_WORK_STEALING_DEQUE_H_ diff --git a/lib/pls/include/pls/internal/data_structures/work_stealing_deque_impl.h b/lib/pls/include/pls/internal/data_structures/work_stealing_deque_impl.h new file mode 100644 index 0000000..95987a5 --- /dev/null +++ b/lib/pls/include/pls/internal/data_structures/work_stealing_deque_impl.h @@ -0,0 +1,155 @@ + +#ifndef PLS_WORK_STEALING_DEQUE_IMPL_H_ +#define PLS_WORK_STEALING_DEQUE_IMPL_H_ + +namespace pls { +namespace internal { +namespace data_structures { + +template +void work_stealing_deque::reset_base_pointer() { + base_pointer_ = reinterpret_cast(stack_->save_state()); // Keep the base of our region in the 
stack +} + +template +work_stealing_deque_item *work_stealing_deque::item_at(offset_t offset) { + return reinterpret_cast(base_pointer_ + + (base::system_details::CACHE_LINE_SIZE * offset)); +} + +template +offset_t work_stealing_deque::current_stack_offset() { + return (stack_->save_state() - base_pointer_) / base::system_details::CACHE_LINE_SIZE; +} + +template +template +std::pair *work_stealing_deque::allocate_item(const T &new_item) { + // 'Union' type to push both on stack + using pair_t = std::pair; + // Allocate space on stack + auto new_pair = reinterpret_cast(stack_->push()); + // Initialize memory on stack + new((void *) &(new_pair->first)) work_stealing_deque_item(); + new((void *) &(new_pair->second)) T(new_item); + + return new_pair; +} + +template +template +T *work_stealing_deque::push_tail(const T &new_item) { + static_assert(std::is_same::value || std::is_base_of::value, + "Must only push types of onto work_stealing_deque"); + + offset_t local_tail = tail_; + + auto new_pair = allocate_item(new_item); + // Prepare current tail to point to correct next items + auto tail_deque_item = item_at(local_tail); + tail_deque_item->set_data(&(new_pair->second)); + tail_deque_item->set_next_item(current_stack_offset()); + tail_deque_item->set_previous_item(previous_tail_); + previous_tail_ = local_tail; + + // Linearization point, item appears after this write + offset_t new_tail = current_stack_offset(); + tail_ = new_tail; + + return &(new_pair->second); +} + +template +Item *work_stealing_deque::pop_tail() { + offset_t local_tail = tail_; + stamped_integer local_head = head_; + + if (local_tail <= local_head.value) { + return nullptr; // EMPTY + } + + work_stealing_deque_item *previous_tail_item = item_at(previous_tail_); + offset_t new_tail = previous_tail_; + previous_tail_ = previous_tail_item->previous_item(); + + // Publish our wish to set the tail back + tail_ = new_tail; + // Get the state of local head AFTER we published our wish + local_head = head_; // Linearization point, outside knows list is empty + + if (local_head.value < new_tail) { + return previous_tail_item->data(); // Success, enough distance to other threads + } + + if (local_head.value == new_tail) { + stamped_integer new_head = stamped_integer{local_head.stamp + 1, new_tail}; + // Try competing with consumers by updating the head's stamp value + if (head_.compare_exchange_strong(local_head, new_head)) { + return previous_tail_item->data(); // SUCCESS, we won the competition with other threads + } + } + + // Some other thread either won the competition or it already set the head further than we are + // before we even tried to compete with it. + // Reset the queue into an empty state => head_ = tail_ + tail_ = local_head.value; // ...we give up to the other winning thread + + return nullptr; // EMPTY, we lost the competition with other threads +} + +template +Item *work_stealing_deque::pop_head() { + stamped_integer local_head = head_; + offset_t local_tail = tail_; + + if (local_tail <= local_head.value) { + return nullptr; // EMPTY + } + // Load info on current deque item. + // In case we have a race with a new (aba) overwritten item at this position, + // there has to be a competition over the tail -> the stamp increased and our next + // operation will fail anyways! + work_stealing_deque_item *head_deque_item = item_at(local_head.value); + offset_t next_item_offset = head_deque_item->next_item(); + Item *head_data_item = head_deque_item->data(); + + // We try to set the head to this new position. 
+ // Possible outcomes: + // 1) no one interrupted us, we win this competition + // 2) other thread took the head, we lose to this + // 3) owning thread removed tail, we lose to this + stamped_integer new_head = stamped_integer{local_head.stamp + 1, next_item_offset}; + if (head_.compare_exchange_strong(local_head, new_head)) { + return head_data_item; // SUCCESS, we won the competition + } + + return nullptr; // EMPTY, we lost the competition +} + +template +void work_stealing_deque::release_memory_until(state state) { + unsigned long item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE; + + stamped_integer local_head = head_; + offset_t local_tail = tail_; + + stack_->reset_state(state); + + if (item_offset < local_tail) { + tail_ = item_offset; + if (local_head.value >= local_tail) { + head_ = stamped_integer{local_head.stamp + 1, item_offset}; + } + } +} + +template +typename work_stealing_deque::state work_stealing_deque::save_state() { + return stack_->save_state(); +} + +} +} +} + +#endif //PLS_WORK_STEALING_DEQUE_IMPL_H_ diff --git a/lib/pls/include/pls/internal/scheduling/parallel_iterator_task.h b/lib/pls/include/pls/internal/scheduling/parallel_iterator_task.h new file mode 100644 index 0000000..304df79 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/parallel_iterator_task.h @@ -0,0 +1,45 @@ + +#ifndef PLS_PARALLEL_ITERATOR_TASK_H +#define PLS_PARALLEL_ITERATOR_TASK_H + +#include "pls/internal/data_structures/stamped_integer.h" +#include "abstract_task.h" + +namespace pls { +namespace internal { +namespace scheduling { + +using data_structures::stamped_integer; + +template +class parallel_iterator_task : public abstract_task { + alignas(64) const int step = 8; + + alignas(64) RandomIt first_, last_; + alignas(64) Function function_; + + // External stealing + alignas(64) std::atomic first_index_; + alignas(64) std::atomic to_be_processed_; + alignas(64) std::atomic last_index_; + + alignas(64) parallel_iterator_task *parent_; + + bool steal_front(size_t &stolen_max_index); + bool steal_back(size_t &stolen_first_index, size_t &stolen_last_index); + + protected: + bool internal_stealing(abstract_task *other_task) override; + bool split_task(base::swmr_spin_lock * /*lock*/) override; + + public: + explicit parallel_iterator_task(RandomIt first, RandomIt last, Function function, const abstract_task::id &id); + parallel_iterator_task(const parallel_iterator_task &other); + void execute() override; +}; +} +} +} +#include "parallel_iterator_task_impl.h" + +#endif //PLS_PARALLEL_ITERATOR_TASK_H diff --git a/lib/pls/include/pls/internal/scheduling/parallel_iterator_task_impl.h b/lib/pls/include/pls/internal/scheduling/parallel_iterator_task_impl.h new file mode 100644 index 0000000..f3a2026 --- /dev/null +++ b/lib/pls/include/pls/internal/scheduling/parallel_iterator_task_impl.h @@ -0,0 +1,144 @@ + +#ifndef PLS_PARALLEL_ITERATOR_TASK_IMPL_H +#define PLS_PARALLEL_ITERATOR_TASK_IMPL_H + +#include "scheduler.h" +namespace pls { +namespace internal { +namespace scheduling { +template +parallel_iterator_task::parallel_iterator_task + (RandomIt first, RandomIt last, Function function, const abstract_task::id &id): + abstract_task(0, id), + first_{first}, + last_{last}, + function_{function}, + first_index_{0}, + to_be_processed_{std::distance(first, last)}, + last_index_{stamped_integer{0, std::distance(first, last)}}, + parent_{nullptr} {} + +template +parallel_iterator_task::parallel_iterator_task(const pls::internal::scheduling::parallel_iterator_task< + 
RandomIt, + Function> &other): + abstract_task{other.depth(), other.unique_id()}, + first_{other.first_}, + last_{other.last_}, + function_{other.function_}, + first_index_{other.first_index_.load()}, + to_be_processed_{other.to_be_processed_.load()}, + last_index_{other.last_index_.load()}, + parent_{other.parent_} {} + +template +void parallel_iterator_task::execute() { + // Start processing at beginning of our data + size_t current_index = 0; + auto current_iterator = first_; + + // Keep going as long as we have data + while (true) { + // Claim next chunk of data for us + size_t local_max_index; + if (!steal_front(local_max_index)) { + break; + } + + // Process Chunk + for (; current_index != local_max_index; current_index++) { + function_(*(current_iterator++)); + } + } + + to_be_processed_ -= current_index; + while (to_be_processed_.load() > 0) + steal_work(); + if (parent_ != nullptr) { + parent_->to_be_processed_ -= std::distance(first_, last_); + } +} + +template +bool parallel_iterator_task::steal_front(size_t &stolen_max) { + auto local_first_index = first_index_.load(); + auto local_last_index = last_index_.load(); + + if (local_first_index >= local_last_index.value) { + return false; + } + + // Proceed the first index == take part of the work for us + auto new_first_index = std::min(local_first_index + step, local_last_index.value); + first_index_ = new_first_index; + // Reload last index + local_last_index = last_index_.load(); + // Enough distance + if (new_first_index < local_last_index.value) { + stolen_max = new_first_index; + return true; + } + + // Fight over last element + if (new_first_index == local_last_index.value) { + auto new_last_index = stamped_integer{local_last_index.stamp + 1, local_last_index.value}; + if (last_index_.compare_exchange_strong(local_last_index, new_last_index)) { + stolen_max = new_first_index; + return true; + } + } + + // All iterator elements are assigned to some executor + return false; +} + +template +bool parallel_iterator_task::steal_back(size_t &stolen_first_index, size_t &stolen_last_index) { + auto local_first_index = first_index_.load(); + auto local_last_index = last_index_.load(); + + if (local_first_index >= local_last_index.value) { + return false; + } + + // Try to steal using cas + auto target_last_index = std::max(local_last_index.value - step, local_first_index); + auto new_last_index = stamped_integer{local_last_index.stamp + 1, target_last_index}; + if (last_index_.compare_exchange_strong(local_last_index, new_last_index)) { + stolen_first_index = new_last_index.value; + stolen_last_index = local_last_index.value; + return true; + } + + return false; +} + +template +bool parallel_iterator_task::split_task(base::swmr_spin_lock *lock) { + auto depth = this->depth(); + auto id = this->unique_id(); + size_t stolen_first_index, stolen_last_index; + if (!steal_back(stolen_first_index, stolen_last_index)) { + lock->reader_unlock(); + return false; + } + + lock->reader_unlock(); + parallel_iterator_task new_task{first_ + stolen_first_index, first_ + stolen_last_index, function_, id}; + new_task.parent_ = this; + scheduler::execute_task(new_task, depth); + + return true; +} + +template +bool parallel_iterator_task::internal_stealing(abstract_task */*other_task*/) { + // Do not allow for now, eases up on ABA problem + return false; +} +} +} +} + +#endif //PLS_PARALLEL_ITERATOR_TASK_IMPL_H diff --git a/lib/pls/include/pls/pls.h b/lib/pls/include/pls/pls.h index b7a218b..accbcc3 100644 --- a/lib/pls/include/pls/pls.h +++ 
b/lib/pls/include/pls/pls.h @@ -2,6 +2,7 @@ #define PLS_LIBRARY_H #include "pls/algorithms/invoke_parallel.h" +#include "pls/algorithms/parallel_for.h" #include "pls/internal/scheduling/abstract_task.h" #include "pls/internal/scheduling/fork_join_task.h" #include "pls/internal/scheduling/scheduler.h" @@ -23,6 +24,8 @@ using internal::scheduling::fork_join_lambda_by_value; using internal::scheduling::fork_join_task; using algorithm::invoke_parallel; +using algorithm::parallel_for_fork_join; +using algorithm::parallel_for; } diff --git a/lib/pls/src/internal/scheduling/abstract_task.cpp b/lib/pls/src/internal/scheduling/abstract_task.cpp index aeccb1d..85e2c36 100644 --- a/lib/pls/src/internal/scheduling/abstract_task.cpp +++ b/lib/pls/src/internal/scheduling/abstract_task.cpp @@ -10,7 +10,7 @@ namespace internal { namespace scheduling { bool abstract_task::steal_work() { - thread_local static base::backoff backoff{}; +// thread_local static base::backoff backoff{}; PROFILE_STEALING("abstract_task::steal_work") const auto my_state = base::this_thread::state(); @@ -22,7 +22,7 @@ bool abstract_task::steal_work() { for (size_t i = 0; i < max_tries; i++) { size_t target = (offset + i) % my_scheduler->num_threads(); if (target == my_id) { - target = (target + 1) % my_scheduler->num_threads(); + continue; } auto target_state = my_scheduler->thread_state_for(target); @@ -47,7 +47,7 @@ bool abstract_task::steal_work() { if (internal_stealing(current_task)) { // internal steal was a success, hand it back to the internal scheduler target_state->lock_.reader_unlock(); - backoff.reset(); +// backoff.reset(); return true; } @@ -65,7 +65,7 @@ bool abstract_task::steal_work() { auto lock = &target_state->lock_; if (current_task->split_task(lock)) { // top level steal was a success (we did a top level task steal) - backoff.reset(); +// backoff.reset(); return false; } @@ -76,7 +76,8 @@ bool abstract_task::steal_work() { } // internal steal was no success - backoff.do_backoff(); +// backoff.do_backoff(); +// base::this_thread::sleep(5); return false; } diff --git a/lib/pls/src/internal/scheduling/parallel_iterator_task.cpp b/lib/pls/src/internal/scheduling/parallel_iterator_task.cpp new file mode 100644 index 0000000..4b22133 --- /dev/null +++ b/lib/pls/src/internal/scheduling/parallel_iterator_task.cpp @@ -0,0 +1 @@ +#include "pls/internal/scheduling/parallel_iterator_task.h" diff --git a/media/afd0331b_matrix_average_case_fork.png b/media/afd0331b_matrix_average_case_fork.png new file mode 100644 index 0000000..2cd6eb5 Binary files /dev/null and b/media/afd0331b_matrix_average_case_fork.png differ diff --git a/media/afd0331b_matrix_average_case_native.png b/media/afd0331b_matrix_average_case_native.png new file mode 100644 index 0000000..68ca0e5 Binary files /dev/null and b/media/afd0331b_matrix_average_case_native.png differ diff --git a/media/afd0331b_matrix_best_case_fork.png b/media/afd0331b_matrix_best_case_fork.png new file mode 100644 index 0000000..db4c9cf Binary files /dev/null and b/media/afd0331b_matrix_best_case_fork.png differ diff --git a/media/afd0331b_matrix_best_case_native.png b/media/afd0331b_matrix_best_case_native.png new file mode 100644 index 0000000..22ece98 Binary files /dev/null and b/media/afd0331b_matrix_best_case_native.png differ diff --git a/media/b9bb90a4-banana-pi-average-case.png b/media/b9bb90a4-banana-pi-average-case.png new file mode 100644 index 0000000..0d414cb Binary files /dev/null and b/media/b9bb90a4-banana-pi-average-case.png differ diff --git 
a/media/b9bb90a4-banana-pi-best-case.png b/media/b9bb90a4-banana-pi-best-case.png new file mode 100644 index 0000000..b090449 Binary files /dev/null and b/media/b9bb90a4-banana-pi-best-case.png differ diff --git a/media/b9bb90a4-laptop-average-case.png b/media/b9bb90a4-laptop-average-case.png new file mode 100644 index 0000000..8153ac1 Binary files /dev/null and b/media/b9bb90a4-laptop-average-case.png differ diff --git a/media/b9bb90a4-laptop-best-case.png b/media/b9bb90a4-laptop-best-case.png new file mode 100644 index 0000000..57bb039 Binary files /dev/null and b/media/b9bb90a4-laptop-best-case.png differ diff --git a/test/data_structures_test.cpp b/test/data_structures_test.cpp index 56e4524..97f91ca 100644 --- a/test/data_structures_test.cpp +++ b/test/data_structures_test.cpp @@ -201,11 +201,11 @@ TEST_CASE("work stealing deque stores objects correctly", "[internal/data_struct SECTION("handles stack reset 1 correctly when emptied by tail") { deque.push_tail(one); + auto state = deque.save_state(); deque.push_tail(two); - auto tmp_result = deque.pop_tail(); - REQUIRE(*tmp_result == two); + REQUIRE(*deque.pop_tail() == two); - deque.release_memory_until(tmp_result); + deque.release_memory_until(state); REQUIRE(*deque.pop_tail() == one); deque.push_tail(three);
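For reference, a minimal usage sketch of the two `parallel_for` variants introduced in this change. The scheduler setup mirrors the commented-out benchmark main above (8 threads, `2u << 18` bytes of task memory per thread); the include paths, the example vector and the lambda bodies are assumptions made for illustration only:

```cpp
#include <vector>

#include <boost/range/irange.hpp>

#include "pls/pls.h"  // assumed include path of the library's umbrella header

int main() {
  // Scheduler setup as in the (commented-out) matrix benchmark main above.
  pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18};
  pls::scheduler scheduler{&my_scheduler_memory, 8};

  std::vector<int> data(1024, 1);

  scheduler.perform_work([&] {
    auto range = boost::irange(0, (int) data.size());

    // 'Native' variant, backed by the new parallel_iterator_task.
    pls::algorithm::parallel_for(range.begin(), range.end(), [&](int i) {
      data[i] *= 2;
    });

    // Fork-join variant, built on recursive splitting with fork_join_task.
    pls::algorithm::parallel_for_fork_join(range.begin(), range.end(), [&](int i) {
      data[i] += 1;
    });
  });
}
```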