Commit 116cf4af by Florian Fritz

Merge branch 'parallel_for' into 'master'

Add Parallel For.

See merge request !10
parents 70f72790 0a8b9ebb
Pipeline #1232 passed with stages
in 3 minutes 53 seconds
......@@ -35,6 +35,7 @@ add_subdirectory(app/test_for_new)
add_subdirectory(app/invoke_parallel)
add_subdirectory(app/benchmark_fft)
add_subdirectory(app/benchmark_unbalanced)
add_subdirectory(app/benchmark_matrix)
# Add optional tests
option(PACKAGE_TESTS "Build the tests" ON)
......
......@@ -3,6 +3,10 @@
#### Commit 52fcb51f - Add basic random stealing
Slight improvement, needs further measurement after removing more important bottlenecks.
Below are three individual measurements of the difference.
Overall the trend (sum of all numbers/last number),
go down (98.7%, 96.9% and 100.6%), but with the one measurement
above 100% we think the improvements are minor.
| | | | | | | | | | |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
......@@ -28,13 +32,45 @@ Big improvements of about 6% in our test. This seems like a little,
but 6% from the scheduler is a lot, as the 'main work' is the tasks
itself, not the scheduler.
This change unsurprisingly yields the biggest improvement yet.
#### Commit b9bb90a4 - Try to figure out the 'high thread bottleneck'
We are currently seeing good performance on low core counts
(up to 1/2 of the machines cores), but after that performance
plumishes:
Bana-Pi Best-Case:
<img src="./media/b9bb90a4-banana-pi-best-case.png" width="400"/>
Bana-Pi Average-Case:
<img src="./media/b9bb90a4-banana-pi-average-case.png" width="400"/>
Laptop Best-Case:
<img src="./media/b9bb90a4-laptop-best-case.png" width="400"/>
Laptop Average-Case:
<img src="./media/b9bb90a4-laptop-average-case.png" width="400"/>
As we can see, in average the performance of PLS starts getting
way worse than TBB and EMBB after 4 cores. We suspect this is due
to contemption, but could not resolve it with any combination
of `tas_spinlock` vs `ttas_spinlock` and `lock` vs `try_lock`.
This issue clearly needs further investigation.
### Commit aa27064 - Performance with ttsa spinlocks (and 'full blocking' top level)
<img src="media/aa27064_fft_average.png" width="600"/>
<img src="media/aa27064_fft_average.png" width="400"/>
### Commit d16ad3e - Performance with rw-lock and backoff
<img src="media/d16ad3e_fft_average.png" width="600"/>
<img src="media/d16ad3e_fft_average.png" width="400"/>
### Commit 18b2d744 - Performance with lock-free deque
......@@ -45,7 +81,7 @@ locks we even saw a slight drop in performance).
Currently the FFT benchmark shows the following results (average):
<img src="media/18b2d744_fft_average.png" width="600"/>
<img src="media/18b2d744_fft_average.png" width="400"/>
We want to positively note that the overall trend of 'performance drops'
at the hyperthreading mark is not really bad anymore, it rather
......@@ -58,7 +94,7 @@ This is discouraging after many tests. To see where the overhead lies
we also implemented the unbalanced tree search benchmark,
resulting in the following, suprisingly good, results (average):
<img src="media/18b2d744_unbalanced_average.png" width="600"/>
<img src="media/18b2d744_unbalanced_average.png" width="400"/>
The main difference between the two benchmarks is, that the second
one has more work and the work is relatively independent.
......@@ -77,12 +113,63 @@ down to our level and solely use internal stealing.
Average results FFT:
<img src="media/cf056856_fft_average.png" width="600"/>
<img src="media/cf056856_fft_average.png" width="400"/>
Average results Unbalanced:
<img src="media/cf056856_unbalanced_average.png" width="600"/>
<img src="media/cf056856_unbalanced_average.png" width="400"/>
There seems to be only a minor performance difference between the two,
suggesting tha our two-level approach is not the part causing our
weaker performance.
### Commit afd0331b - Some notes on scaling problems
After tweaking individual values and parameters we can still not find
the main cause for our slowdown on multiple processors.
We also use intel's vtune amplifier to measure performance on our run
and find that we always spend way too much time 'waiting for work',
e.g. in the backoff mechanism when enabled or in the locks for stealing
work when backoff is disabled. This leads us to believe that our problems
might be connected to some issue with work distribution on the FFT case,
as the unbalanced tree search (with a lot 'local' work) performs good.
To get more data in we add benchmarks on matrix multiplication implemented
in two fashions: once with a 'native' array stealing task and once with
a fork-join task. Both implementations use the same minimum array
sub-size of 4 elements and we can hopefully see if they have any
performance differences.
Best case fork-join:
<img src="media/afd0331b_matrix_best_case_fork.png" width="400"/>
Average case fork-join:
<img src="media/afd0331b_matrix_average_case_fork.png" width="400"/>
Best case Native:
<img src="media/afd0331b_matrix_best_case_native.png" width="400"/>
Average case Native:
<img src="media/afd0331b_matrix_average_case_native.png" width="400"/>
What we find very interesting is, that the best case times of our
pls library are very fast (as good as TBB), but the average times
drop badly. We currently do not know why this is the case.
### Commit afd0331b - Intel VTune Amplifier
We did serval measurements with intel's VTune Amplifier profiling
tool. The main thing that we notice is, that the cycles per instruction
for our useful work blocks increase, thus requiring more CPU time
for the acutal useful work.
We also measured an implementation using TBB and found no significante
difference, e.g. TBB also has a higher CPI with 8 threads.
Our conclusion after this long hunting for performance is, that we
might just be bound by some general performance issues with our code.
The next step will therefore be to read the other frameworks and our
code carefully, trying to find potential issues.
......@@ -10,6 +10,10 @@ project setup that uses the PLS library.
Further [general notes](NOTES.md) and [performance notes](PERFORMANCE.md) can be found in
their respective files.
Further notes on [performance](PERFORMANCE.md) and general
[notes](NOTES.md) on the development progress can be found in
the linked documents.
### Installation
Clone the repository and open a terminal session in its folder.
......
......@@ -80,7 +80,7 @@ int main() {
pls::internal::helpers::run_mini_benchmark([&] {
complex_vector input = initial_input;
fft(input.begin(), input.size());
}, 8, 4000);
}, 8, 1000);
PROFILE_SAVE("test_profile.prof")
}
add_executable(benchmark_matrix main.cpp)
target_link_libraries(benchmark_matrix pls)
if (EASY_PROFILER)
target_link_libraries(benchmark_matrix easy_profiler)
endif ()
#include <pls/pls.h>
#include <pls/internal/helpers/profiler.h>
#include <pls/internal/helpers/mini_benchmark.h>
#include <boost/range/irange.hpp>
const int MATRIX_SIZE = 128;
template<typename T, int SIZE>
class matrix {
public:
T data[SIZE][SIZE];
matrix(T i = 1) {
std::fill(&data[0][0], &data[0][0] + SIZE * SIZE, i);
}
void multiply(const matrix<T, SIZE> &a, const matrix<T, SIZE> &b) {
auto range = boost::irange(0, SIZE);
pls::algorithm::parallel_for(range.begin(), range.end(), [&](int i) {
this->multiply_column(i, a, b);
});
}
private:
void multiply_column(int i, const matrix<T, SIZE> &a, const matrix<T, SIZE> &b) {
for (int j = 0; j < SIZE; ++j) {
data[i][j] = 0;
}
for (int k = 0; k < SIZE; ++k) {
for (int j = 0; j < SIZE; ++j) {
data[i][j] += a.data[i][k] * b.data[k][j];
}
}
}
};
void fill_with_data(matrix<double, MATRIX_SIZE> &a, matrix<double, MATRIX_SIZE> &b) {
// Fill in some data...
for (int i = 0; i < MATRIX_SIZE; i++) {
for (int j = 0; j < MATRIX_SIZE; j++) {
a.data[i][j] = i;
b.data[i][j] = j;
}
}
}
int main() {
PROFILE_ENABLE
matrix<double, MATRIX_SIZE> a;
matrix<double, MATRIX_SIZE> b;
matrix<double, MATRIX_SIZE> result;
fill_with_data(a, b);
pls::internal::helpers::run_mini_benchmark([&] {
result.multiply(a, b);
}, 8, 1000);
PROFILE_SAVE("test_profile.prof")
}
//int main() {
// PROFILE_ENABLE
// pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18};
// pls::scheduler scheduler{&my_scheduler_memory, 8};
//
// matrix<double, MATRIX_SIZE> a;
// matrix<double, MATRIX_SIZE> b;
// matrix<double, MATRIX_SIZE> result;
// fill_with_data(a, b);
//
// scheduler.perform_work([&] {
// PROFILE_MAIN_THREAD
// for (int i = 0; i < 10; i++) {
// PROFILE_WORK_BLOCK("Top Level")
// result.multiply(a, b);
// }
// });
//
// PROFILE_SAVE("test_profile.prof")
//}
......@@ -4,6 +4,8 @@ add_library(pls STATIC
include/pls/algorithms/invoke_parallel.h
include/pls/algorithms/invoke_parallel_impl.h
include/pls/algorithms/parallel_for.h
include/pls/algorithms/parallel_for_impl.h
include/pls/internal/base/spin_lock.h
include/pls/internal/base/tas_spin_lock.h src/internal/base/tas_spin_lock.cpp
......@@ -15,12 +17,12 @@ add_library(pls STATIC
include/pls/internal/base/system_details.h
include/pls/internal/base/error_handling.h
include/pls/internal/base/alignment.h src/internal/base/alignment.cpp
include/pls/internal/base/backoff.h
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h
include/pls/internal/data_structures/deque.h src/internal/data_structures/deque.cpp
include/pls/internal/data_structures/work_stealing_deque.h
include/pls/internal/data_structures/work_stealing_deque.h include/pls/internal/data_structures/work_stealing_deque_impl.h
include/pls/internal/data_structures/stamped_integer.h
include/pls/internal/helpers/prohibit_new.h
include/pls/internal/helpers/profiler.h
......@@ -35,7 +37,8 @@ add_library(pls STATIC
include/pls/internal/scheduling/run_on_n_threads_task.h src/internal/scheduling/run_on_n_threads_task.cpp
include/pls/internal/scheduling/fork_join_task.h src/internal/scheduling/fork_join_task.cpp
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
)
include/pls/internal/scheduling/parallel_iterator_task.h include/pls/internal/scheduling/parallel_iterator_task_impl.h
src/internal/scheduling/parallel_iterator_task.cpp)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
......
#ifndef PLS_PARALLEL_FOR_H
#define PLS_PARALLEL_FOR_H
namespace pls {
namespace algorithm {
template<typename RandomIt, typename Function>
void parallel_for(RandomIt first, RandomIt last, const Function &function);
template<typename RandomIt, typename Function>
void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &function);
}
}
#include "parallel_for_impl.h"
#endif //PLS_PARALLEL_FOR_H
#ifndef PLS_PARALLEL_FOR_IMPL_H
#define PLS_PARALLEL_FOR_IMPL_H
#include "pls/internal/scheduling/fork_join_task.h"
#include "pls/internal/scheduling/parallel_iterator_task.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/helpers/unique_id.h"
namespace pls {
namespace algorithm {
namespace internal {
template<typename RandomIt, typename Function>
void parallel_for(RandomIt first, RandomIt last, const Function &function) {
using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
using namespace ::pls::internal::base;
constexpr long min_elements = 4;
long num_elements = std::distance(first, last);
if (num_elements <= min_elements) {
// calculate last elements in loop to avoid overhead
for (auto current = first; current != last; current++) {
function(*current);
}
} else {
// Cut in half recursively
long middle_index = num_elements / 2;
auto body = [=] { internal::parallel_for(first + middle_index, last, function); };
fork_join_lambda_by_reference<decltype(body)> second_half_task(body);
fork_join_sub_task::current()->spawn_child(second_half_task);
parallel_for(first, first + middle_index, function);
fork_join_sub_task::current()->wait_for_all();
}
}
}
template<typename RandomIt, typename Function>
void parallel_for(RandomIt first, RandomIt last, const Function &function) {
using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
using namespace ::pls::internal::base;
static abstract_task::id id = unique_id::create<RandomIt, Function>();
parallel_iterator_task<RandomIt, Function> iterator_task{first, last, function, id};
scheduler::execute_task(iterator_task);
}
template<typename RandomIt, typename Function>
void parallel_for_fork_join(RandomIt first, RandomIt last, const Function &function) {
using namespace ::pls::internal::scheduling;
using namespace ::pls::internal::helpers;
using namespace ::pls::internal::base;
static abstract_task::id id = unique_id::create<RandomIt, Function>();
auto body = [=] { internal::parallel_for(first, last, function); };
fork_join_lambda_by_reference<decltype(body)> root_body(body);
fork_join_task root_task{&root_body, id};
scheduler::execute_task(root_task);
}
}
}
#endif //PLS_INVOKE_PARALLEL_IMPL_H
......@@ -25,26 +25,6 @@ char *next_alignment(char *pointer);
}
template<typename T>
struct aligned_aba_pointer {
const system_details::pointer_t pointer_;
explicit aligned_aba_pointer(T *pointer, unsigned int aba = 0) : pointer_{
reinterpret_cast<system_details::pointer_t >(pointer) + aba} {}
T *pointer() const {
return reinterpret_cast<T *>(pointer_ & system_details::CACHE_LINE_ADDRESS_USED_BITS);
}
unsigned int aba() const {
return pointer_ & system_details::CACHE_LINE_ADDRESS_UNUSED_BITS;
}
aligned_aba_pointer set_aba(unsigned int aba) const {
return aligned_aba_pointer(pointer(), aba);
}
};
}
}
}
......
......@@ -14,7 +14,7 @@ namespace internal {
namespace base {
class backoff {
const unsigned long INITIAL_SPIN_ITERS = 2u << 4u;
const unsigned long INITIAL_SPIN_ITERS = 2u << 1u;
const unsigned long MAX_SPIN_ITERS = 2u << 8u;
const unsigned long MAX_ITERS = 2u << 10u;
const unsigned long YELD_ITERS = 2u << 10u;
......@@ -36,7 +36,7 @@ class backoff {
if (current_ >= YELD_ITERS) {
PROFILE_LOCK("Yield")
this_thread::yield();
this_thread::sleep(5);
}
current_ = std::min(current_ * 2, MAX_ITERS);
......
......@@ -24,26 +24,18 @@ namespace system_details {
* pointer_t should be an integer type capable of holding ANY pointer value.
*/
using pointer_t = std::uintptr_t;
constexpr pointer_t ZERO_POINTER = 0;
constexpr pointer_t MAX_POINTER = ~ZERO_POINTER;
/**
* Biggest type that supports atomic CAS operations.
* Usually it is sane to assume a pointer can be swapped in a single CAS operation.
*/
using cas_integer = pointer_t;
constexpr cas_integer MIN_CAS_INTEGER = 0;
constexpr cas_integer MAX_CAS_INTEGER = ~MIN_CAS_INTEGER;
constexpr cas_integer FIRST_HALF_CAS_INTEGER = MAX_CAS_INTEGER << ((sizeof(cas_integer) / 2) * 8);
constexpr cas_integer SECOND_HALF_CAS_INTEGER = ~FIRST_HALF_CAS_INTEGER;
using cas_integer = std::uintptr_t;
constexpr unsigned long CAS_SIZE = sizeof(cas_integer);
/**
* Most processors have 64 byte cache lines (last 6 bit of the address are zero at line beginnings).
*/
constexpr unsigned int CACHE_LINE_ADDRESS_BITS = 6;
constexpr pointer_t CACHE_LINE_SIZE = 2u << (CACHE_LINE_ADDRESS_BITS - 1);
constexpr pointer_t CACHE_LINE_ADDRESS_USED_BITS = MAX_POINTER << CACHE_LINE_ADDRESS_BITS;
constexpr pointer_t CACHE_LINE_ADDRESS_UNUSED_BITS = ~CACHE_LINE_ADDRESS_USED_BITS;
constexpr pointer_t CACHE_LINE_SIZE = 64;
/**
* Choose one of the following ways to store thread specific data.
......@@ -60,15 +52,9 @@ constexpr pointer_t CACHE_LINE_ADDRESS_UNUSED_BITS = ~CACHE_LINE_ADDRESS_USED_BI
*
* Choose the implementation appropriate for your compiler-cpu combination.
*/
#if (COMPILER == MVCC)
inline void relax_cpu() {
_mm_pause();
}
#elif (COMPILER == GCC || COMPILER == LLVM)
inline void relax_cpu() {
asm("pause");
asm volatile("pause":: : "memory");
}
#endif
}
}
......
#ifndef PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_DATA_STRUCTURES_STAMPED_INTEGER_H_
#define PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_DATA_STRUCTURES_STAMPED_INTEGER_H_
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace data_structures {
constexpr unsigned long HALF_CACHE_LINE = base::system_details::CACHE_LINE_SIZE / 2;
struct stamped_integer {
using member_t = base::system_details::cas_integer;
member_t stamp:HALF_CACHE_LINE;
member_t value:HALF_CACHE_LINE;
stamped_integer() : stamp{0}, value{0} {};
stamped_integer(member_t new_value) : stamp{0}, value{new_value} {};
stamped_integer(member_t new_stamp, member_t new_value) : stamp{new_stamp}, value{new_value} {};
};
}
}
}
#endif //PREDICTABLE_PARALLEL_PATTERNS_LIB_PLS_INCLUDE_PLS_INTERNAL_DATA_STRUCTURES_STAMPED_INTEGER_H_
......@@ -3,12 +3,10 @@
#define PLS_WORK_STEALING_DEQUE_H_
#include <atomic>
#include <mutex>
#include <pls/internal/scheduling/thread_state.h>
#include "pls/internal/base/system_details.h"
#include "pls/internal/base/spin_lock.h"
#include "pls/internal/base/error_handling.h"
#include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/scheduling/thread_state.h"
#include "aligned_stack.h"
......@@ -16,30 +14,22 @@ namespace pls {
namespace internal {
namespace data_structures {
using cas_integer = base::system_details::cas_integer;
using pointer_t = base::system_details::pointer_t;
static cas_integer get_stamp(cas_integer n) {
return (n & base::system_details::FIRST_HALF_CAS_INTEGER) >> ((sizeof(cas_integer) / 2) * 8);
}
static cas_integer get_offset(cas_integer n) {
return n & base::system_details::SECOND_HALF_CAS_INTEGER;
}
static cas_integer set_stamp(cas_integer n, cas_integer new_value) {
return (new_value << ((sizeof(cas_integer) / 2) * 8)) | (n & base::system_details::SECOND_HALF_CAS_INTEGER);
}
//static cas_integer set_offset(cas_integer n, cas_integer new_value) {
// return new_value | (n & base::system_details::FIRST_HALF_CAS_INTEGER);
//}
using base::system_details::pointer_t;
// Integer split into two halfs, can be used in CAS operations
using data_structures::stamped_integer;
using offset_t = stamped_integer::member_t;
// Single Item in the deque
class work_stealing_deque_item {
// Pointer to the actual data
pointer_t data_;
// Index (relative to stack base) to the next and previous element
cas_integer next_item_;
cas_integer previous_item_;
offset_t next_item_;
offset_t previous_item_;
public:
work_stealing_deque_item() : data_{0}, next_item_{0}, previous_item_{0} {}
work_stealing_deque_item() : data_{0}, next_item_{}, previous_item_{} {}
template<typename Item>
Item *data() {
......@@ -51,18 +41,11 @@ class work_stealing_deque_item {
data_ = reinterpret_cast<pointer_t >(data);
}
cas_integer next_item() {
return next_item_;
}
void set_next_item(cas_integer next_item) {
next_item_ = next_item;
}
cas_integer previous_item() {
return previous_item_;
}
void set_previous_item(cas_integer previous_item) {
previous_item_ = previous_item;
}
offset_t next_item() const { return next_item_; }
void set_next_item(offset_t next_item) { next_item_ = next_item; }
offset_t previous_item() const { return previous_item_; }
void set_previous_item(offset_t previous_item) { previous_item_ = previous_item; }
};
static_assert(sizeof(work_stealing_deque_item) < base::system_details::CACHE_LINE_SIZE,
"Work stealing deque relies on memory layout and requires cache lines to be longer than one 'work_stealing_deque_item' instance!");
......@@ -74,19 +57,16 @@ class work_stealing_deque {
aligned_stack *stack_;
pointer_t base_pointer_;
std::atomic<cas_integer> head_;
std::atomic<cas_integer> tail_;
cas_integer previous_tail_;
base::spin_lock lock_{}; // TODO: Remove after debugging
std::atomic<stamped_integer> head_;
std::atomic<offset_t> tail_;
offset_t previous_tail_;
public:
using state = aligned_stack::state;
explicit work_stealing_deque(aligned_stack *stack) : stack_{stack},
base_pointer_{0},
head_{0},
head_{stamped_integer{0, 0}},
tail_{0},
previous_tail_{0} {
reset_base_pointer();
......@@ -97,144 +77,25 @@ class work_stealing_deque {
tail_{other.tail_.load()},
previous_tail_{other.previous_tail_} {}
void reset_base_pointer() {
base_pointer_ = reinterpret_cast<pointer_t >(stack_->save_state()); // Keep the base of our region in the stack
}
work_stealing_deque_item *item_at(cas_integer position) {
return reinterpret_cast<work_stealing_deque_item *>(base_pointer_
+ (base::system_details::CACHE_LINE_SIZE * position));
}
cas_integer current_stack_offset() {
return (stack_->save_state() - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
}
void reset_base_pointer();
work_stealing_deque_item *item_at(offset_t offset);
offset_t current_stack_offset();
template<typename T>
std::pair<work_stealing_deque_item, T> *allocate_item(const T &new_item) {
// 'Union' type to push both on stack
using pair_t = std::pair<work_stealing_deque_item, T>;
// Allocate space on stack
auto new_pair = reinterpret_cast<pair_t *>(stack_->push<pair_t>());
// Initialize memory on stack
new((void *) &(new_pair->first)) work_stealing_deque_item();
new((void *) &(new_pair->second)) T(new_item);
return new_pair;
}
std::pair<work_stealing_deque_item, T> *allocate_item(const T &new_item);
template<typename T>
Item *push_tail(const T &new_item) {
cas_integer local_tail = tail_;
auto new_pair = allocate_item(new_item);
// Prepare current tail to point to correct next items
auto tail_deque_item = item_at(local_tail);
tail_deque_item->set_data(&(new_pair->second));
tail_deque_item->set_next_item(current_stack_offset());
tail_deque_item->set_previous_item(previous_tail_);
previous_tail_ = local_tail;
// Linearization point, item appears after this write
cas_integer new_tail = current_stack_offset();
tail_ = new_tail;
return &(new_pair->second);
}
Item *pop_tail() {
cas_integer local_tail = tail_;
cas_integer local_head = head_;
if (local_tail <= get_offset(local_head)) {
return nullptr; // EMPTY
}
work_stealing_deque_item *previous_tail_item = item_at(previous_tail_);
cas_integer new_tail = previous_tail_;
previous_tail_ = previous_tail_item->previous_item();
// Publish our wish to set the tail back
tail_ = new_tail;
// Get the state of local head AFTER we published our wish
local_head = head_; // Linearization point, outside knows list is empty
if (get_offset(local_head) < new_tail) {
return previous_tail_item->data<Item>(); // Success, enough distance to other threads
}
if (get_offset(local_head) == new_tail) {
cas_integer new_head = set_stamp(new_tail, get_stamp(local_head) + 1);
// Try competing with consumers by updating the head's stamp value
if (head_.compare_exchange_strong(local_head, new_head)) {
return previous_tail_item->data<Item>(); // SUCCESS, we won the competition with other threads
}
}
// Some other thread either won the competition or it already set the head further than we are
// before we even tried to compete with it.
// Reset the queue into an empty state => head_ = tail_
tail_ = get_offset(local_head); // ...we give up to the other winning thread
return nullptr; // EMPTY, we lost the competition with other threads
}
Item *pop_head() {
cas_integer local_head = head_;
cas_integer local_tail = tail_;
if (local_tail <= get_offset(local_head)) {
return nullptr; // EMPTY
}
// Load info on current deque item.
// In case we have a race with a new (aba) overwritten item at this position,
// there has to be a competition over the tail -> the stamp increased and our next
// operation will fail anyways!
work_stealing_deque_item *head_deque_item = item_at(get_offset(local_head));
cas_integer next_item_offset = head_deque_item->next_item();
Item *head_data_item = head_deque_item->data<Item>();
// We try to set the head to this new position.
// Possible outcomes:
// 1) no one interrupted us, we win this competition
// 2) other thread took the head, we lose to this
// 3) owning thread removed tail, we lose to this
cas_integer new_head = set_stamp(next_item_offset, get_stamp(local_head) + 1);
if (head_.compare_exchange_strong(local_head, new_head)) {
return head_data_item; // SUCCESS, we won the competition
}
return nullptr; // EMPTY, we lost the competition
}
void release_memory_until(state state) {
cas_integer item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
T *push_tail(const T &new_item);
Item *pop_tail();
Item *pop_head();
cas_integer local_head = head_;
cas_integer local_tail = tail_;
stack_->reset_state(state);
if (item_offset < local_tail) {
tail_ = item_offset;
if (get_offset(local_head) >= local_tail) {
head_ = set_stamp(item_offset, get_stamp(local_head) + 1);
}
}
}
void release_memory_until(Item *item) {
release_memory_until(reinterpret_cast<pointer_t >(item));
}
state save_state() {
return stack_->save_state();
}
void release_memory_until(state state);
state save_state();
};
}
}
}
#include "work_stealing_deque_impl.h"
#endif //PLS_WORK_STEALING_DEQUE_H_
#ifndef PLS_WORK_STEALING_DEQUE_IMPL_H_
#define PLS_WORK_STEALING_DEQUE_IMPL_H_
namespace pls {
namespace internal {
namespace data_structures {
template<typename Item>
void work_stealing_deque<Item>::reset_base_pointer() {
base_pointer_ = reinterpret_cast<pointer_t >(stack_->save_state()); // Keep the base of our region in the stack
}
template<typename Item>
work_stealing_deque_item *work_stealing_deque<Item>::item_at(offset_t offset) {
return reinterpret_cast<work_stealing_deque_item *>(base_pointer_
+ (base::system_details::CACHE_LINE_SIZE * offset));
}
template<typename Item>
offset_t work_stealing_deque<Item>::current_stack_offset() {
return (stack_->save_state() - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
}
template<typename Item>
template<typename T>
std::pair<work_stealing_deque_item, T> *work_stealing_deque<Item>::allocate_item(const T &new_item) {
// 'Union' type to push both on stack
using pair_t = std::pair<work_stealing_deque_item, T>;
// Allocate space on stack
auto new_pair = reinterpret_cast<pair_t *>(stack_->push<pair_t>());
// Initialize memory on stack
new((void *) &(new_pair->first)) work_stealing_deque_item();
new((void *) &(new_pair->second)) T(new_item);
return new_pair;
}
template<typename Item>
template<typename T>
T *work_stealing_deque<Item>::push_tail(const T &new_item) {
static_assert(std::is_same<Item, T>::value || std::is_base_of<Item, T>::value,
"Must only push types of <Item> onto work_stealing_deque<Item>");
offset_t local_tail = tail_;
auto new_pair = allocate_item(new_item);
// Prepare current tail to point to correct next items
auto tail_deque_item = item_at(local_tail);
tail_deque_item->set_data(&(new_pair->second));
tail_deque_item->set_next_item(current_stack_offset());
tail_deque_item->set_previous_item(previous_tail_);
previous_tail_ = local_tail;
// Linearization point, item appears after this write
offset_t new_tail = current_stack_offset();
tail_ = new_tail;
return &(new_pair->second);
}
template<typename Item>
Item *work_stealing_deque<Item>::pop_tail() {
offset_t local_tail = tail_;
stamped_integer local_head = head_;
if (local_tail <= local_head.value) {
return nullptr; // EMPTY
}
work_stealing_deque_item *previous_tail_item = item_at(previous_tail_);
offset_t new_tail = previous_tail_;
previous_tail_ = previous_tail_item->previous_item();
// Publish our wish to set the tail back
tail_ = new_tail;
// Get the state of local head AFTER we published our wish
local_head = head_; // Linearization point, outside knows list is empty
if (local_head.value < new_tail) {
return previous_tail_item->data<Item>(); // Success, enough distance to other threads
}
if (local_head.value == new_tail) {
stamped_integer new_head = stamped_integer{local_head.stamp + 1, new_tail};
// Try competing with consumers by updating the head's stamp value
if (head_.compare_exchange_strong(local_head, new_head)) {
return previous_tail_item->data<Item>(); // SUCCESS, we won the competition with other threads
}
}
// Some other thread either won the competition or it already set the head further than we are
// before we even tried to compete with it.
// Reset the queue into an empty state => head_ = tail_
tail_ = local_head.value; // ...we give up to the other winning thread
return nullptr; // EMPTY, we lost the competition with other threads
}
template<typename Item>
Item *work_stealing_deque<Item>::pop_head() {
stamped_integer local_head = head_;
offset_t local_tail = tail_;
if (local_tail <= local_head.value) {
return nullptr; // EMPTY
}
// Load info on current deque item.
// In case we have a race with a new (aba) overwritten item at this position,
// there has to be a competition over the tail -> the stamp increased and our next
// operation will fail anyways!
work_stealing_deque_item *head_deque_item = item_at(local_head.value);
offset_t next_item_offset = head_deque_item->next_item();
Item *head_data_item = head_deque_item->data<Item>();
// We try to set the head to this new position.
// Possible outcomes:
// 1) no one interrupted us, we win this competition
// 2) other thread took the head, we lose to this
// 3) owning thread removed tail, we lose to this
stamped_integer new_head = stamped_integer{local_head.stamp + 1, next_item_offset};
if (head_.compare_exchange_strong(local_head, new_head)) {
return head_data_item; // SUCCESS, we won the competition
}
return nullptr; // EMPTY, we lost the competition
}
template<typename Item>
void work_stealing_deque<Item>::release_memory_until(state state) {
unsigned long item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
stamped_integer local_head = head_;
offset_t local_tail = tail_;
stack_->reset_state(state);
if (item_offset < local_tail) {
tail_ = item_offset;
if (local_head.value >= local_tail) {
head_ = stamped_integer{local_head.stamp + 1, item_offset};
}
}
}
template<typename Item>
typename work_stealing_deque<Item>::state work_stealing_deque<Item>::save_state() {
return stack_->save_state();
}
}
}
}
#endif //PLS_WORK_STEALING_DEQUE_IMPL_H_
#ifndef PLS_PARALLEL_ITERATOR_TASK_H
#define PLS_PARALLEL_ITERATOR_TASK_H
#include "pls/internal/data_structures/stamped_integer.h"
#include "abstract_task.h"
namespace pls {
namespace internal {
namespace scheduling {
using data_structures::stamped_integer;
template<typename RandomIt, typename Function>
class parallel_iterator_task : public abstract_task {
alignas(64) const int step = 8;
alignas(64) RandomIt first_, last_;
alignas(64) Function function_;
// External stealing
alignas(64) std::atomic<size_t> first_index_;
alignas(64) std::atomic<size_t> to_be_processed_;
alignas(64) std::atomic<stamped_integer> last_index_;
alignas(64) parallel_iterator_task *parent_;
bool steal_front(size_t &stolen_max_index);
bool steal_back(size_t &stolen_first_index, size_t &stolen_last_index);
protected:
bool internal_stealing(abstract_task *other_task) override;
bool split_task(base::swmr_spin_lock * /*lock*/) override;
public:
explicit parallel_iterator_task(RandomIt first, RandomIt last, Function function, const abstract_task::id &id);
parallel_iterator_task(const parallel_iterator_task &other);
void execute() override;
};
}
}
}
#include "parallel_iterator_task_impl.h"
#endif //PLS_PARALLEL_ITERATOR_TASK_H
#ifndef PLS_PARALLEL_ITERATOR_TASK_IMPL_H
#define PLS_PARALLEL_ITERATOR_TASK_IMPL_H
#include "scheduler.h"
namespace pls {
namespace internal {
namespace scheduling {
template<typename RandomIt, typename Function>
parallel_iterator_task<RandomIt, Function>::parallel_iterator_task
(RandomIt first, RandomIt last, Function function, const abstract_task::id &id):
abstract_task(0, id),
first_{first},
last_{last},
function_{function},
first_index_{0},
to_be_processed_{std::distance(first, last)},
last_index_{stamped_integer{0, std::distance(first, last)}},
parent_{nullptr} {}
template<typename RandomIt, typename Function>
parallel_iterator_task<RandomIt,
Function>::parallel_iterator_task(const pls::internal::scheduling::parallel_iterator_task<
RandomIt,
Function> &other):
abstract_task{other.depth(), other.unique_id()},
first_{other.first_},
last_{other.last_},
function_{other.function_},
first_index_{other.first_index_.load()},
to_be_processed_{other.to_be_processed_.load()},
last_index_{other.last_index_.load()},
parent_{other.parent_} {}
template<typename RandomIt, typename Function>
void parallel_iterator_task<RandomIt, Function>::execute() {
// Start processing at beginning of our data
size_t current_index = 0;
auto current_iterator = first_;
// Keep going as long as we have data
while (true) {
// Claim next chunk of data for us
size_t local_max_index;
if (!steal_front(local_max_index)) {
break;
}
// Process Chunk
for (; current_index != local_max_index; current_index++) {
function_(*(current_iterator++));
}
}
to_be_processed_ -= current_index;
while (to_be_processed_.load() > 0)
steal_work();
if (parent_ != nullptr) {
parent_->to_be_processed_ -= std::distance(first_, last_);
}
}
template<typename RandomIt, typename Function>
bool parallel_iterator_task<RandomIt, Function>::steal_front(size_t &stolen_max) {
auto local_first_index = first_index_.load();
auto local_last_index = last_index_.load();
if (local_first_index >= local_last_index.value) {
return false;
}
// Proceed the first index == take part of the work for us
auto new_first_index = std::min(local_first_index + step, local_last_index.value);
first_index_ = new_first_index;
// Reload last index
local_last_index = last_index_.load();
// Enough distance
if (new_first_index < local_last_index.value) {
stolen_max = new_first_index;
return true;
}
// Fight over last element
if (new_first_index == local_last_index.value) {
auto new_last_index = stamped_integer{local_last_index.stamp + 1, local_last_index.value};
if (last_index_.compare_exchange_strong(local_last_index, new_last_index)) {
stolen_max = new_first_index;
return true;
}
}
// All iterator elements are assigned to some executor
return false;
}
template<typename RandomIt, typename Function>
bool parallel_iterator_task<RandomIt, Function>::steal_back(size_t &stolen_first_index, size_t &stolen_last_index) {
auto local_first_index = first_index_.load();
auto local_last_index = last_index_.load();
if (local_first_index >= local_last_index.value) {
return false;
}
// Try to steal using cas
auto target_last_index = std::max(local_last_index.value - step, local_first_index);
auto new_last_index = stamped_integer{local_last_index.stamp + 1, target_last_index};
if (last_index_.compare_exchange_strong(local_last_index, new_last_index)) {
stolen_first_index = new_last_index.value;
stolen_last_index = local_last_index.value;
return true;
}
return false;
}
template<typename RandomIt, typename Function>
bool parallel_iterator_task<RandomIt, Function>::split_task(base::swmr_spin_lock *lock) {
auto depth = this->depth();
auto id = this->unique_id();
size_t stolen_first_index, stolen_last_index;
if (!steal_back(stolen_first_index, stolen_last_index)) {
lock->reader_unlock();
return false;
}
lock->reader_unlock();
parallel_iterator_task new_task{first_ + stolen_first_index, first_ + stolen_last_index, function_, id};
new_task.parent_ = this;
scheduler::execute_task(new_task, depth);
return true;
}
template<typename RandomIt, typename Function>
bool parallel_iterator_task<RandomIt, Function>::internal_stealing(abstract_task */*other_task*/) {
// Do not allow for now, eases up on ABA problem
return false;
}
}
}
}
#endif //PLS_PARALLEL_ITERATOR_TASK_IMPL_H
......@@ -2,6 +2,7 @@
#define PLS_LIBRARY_H
#include "pls/algorithms/invoke_parallel.h"
#include "pls/algorithms/parallel_for.h"
#include "pls/internal/scheduling/abstract_task.h"
#include "pls/internal/scheduling/fork_join_task.h"
#include "pls/internal/scheduling/scheduler.h"
......@@ -23,6 +24,8 @@ using internal::scheduling::fork_join_lambda_by_value;
using internal::scheduling::fork_join_task;
using algorithm::invoke_parallel;
using algorithm::parallel_for_fork_join;
using algorithm::parallel_for;
}
......
......@@ -10,7 +10,7 @@ namespace internal {
namespace scheduling {
bool abstract_task::steal_work() {
thread_local static base::backoff backoff{};
// thread_local static base::backoff backoff{};
PROFILE_STEALING("abstract_task::steal_work")
const auto my_state = base::this_thread::state<thread_state>();
......@@ -22,7 +22,7 @@ bool abstract_task::steal_work() {
for (size_t i = 0; i < max_tries; i++) {
size_t target = (offset + i) % my_scheduler->num_threads();
if (target == my_id) {
target = (target + 1) % my_scheduler->num_threads();
continue;
}
auto target_state = my_scheduler->thread_state_for(target);
......@@ -47,7 +47,7 @@ bool abstract_task::steal_work() {
if (internal_stealing(current_task)) {
// internal steal was a success, hand it back to the internal scheduler
target_state->lock_.reader_unlock();
backoff.reset();
// backoff.reset();
return true;
}
......@@ -65,7 +65,7 @@ bool abstract_task::steal_work() {
auto lock = &target_state->lock_;
if (current_task->split_task(lock)) {
// top level steal was a success (we did a top level task steal)
backoff.reset();
// backoff.reset();
return false;
}
......@@ -76,7 +76,8 @@ bool abstract_task::steal_work() {
}
// internal steal was no success
backoff.do_backoff();
// backoff.do_backoff();
// base::this_thread::sleep(5);
return false;
}
......
#include "pls/internal/scheduling/parallel_iterator_task.h"
......@@ -201,11 +201,11 @@ TEST_CASE("work stealing deque stores objects correctly", "[internal/data_struct
SECTION("handles stack reset 1 correctly when emptied by tail") {
deque.push_tail(one);
auto state = deque.save_state();
deque.push_tail(two);
auto tmp_result = deque.pop_tail();
REQUIRE(*tmp_result == two);
REQUIRE(*deque.pop_tail() == two);
deque.release_memory_until(tmp_result);
deque.release_memory_until(state);
REQUIRE(*deque.pop_tail() == one);
deque.push_tail(three);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment