Commit 82ccdc91 by FritzFlorian

Update: add spawn_and_sync to final version for benchmarks.

parent 9fa9296a
Pipeline #1547 canceled with stages in 32 minutes 33 seconds
@@ -18,10 +18,9 @@ void pls_conquer(fft::complex_vector::iterator data, fft::complex_vector::iterat
pls::spawn([data, n, swap_array]() {
pls_conquer(data, swap_array, n / 2);
});
-pls::spawn([data, n, swap_array]() {
+pls::spawn_and_sync([data, n, swap_array]() {
pls_conquer(data + n / 2, swap_array + n / 2, n / 2);
});
-pls::sync();
}
fft::combine(data, n);
......
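Note on the pattern: the trailing pls::spawn followed by pls::sync() in the fft kernel above (and again in the fib benchmark below) is fused into a single pls::spawn_and_sync call, which spawns the last child and immediately joins the outstanding children of the current task. A minimal sketch of the resulting fork-join shape, using only the pls::spawn / pls::spawn_and_sync / pls::sync API visible in this diff; the header path and the serial cutoff are illustrative assumptions:

```cpp
#include "pls/pls.h"

// Illustrative recursive sum in the same shape as the benchmark kernels above.
long parallel_sum(const long *data, long n) {
  if (n <= 1024) {  // serial cutoff, value chosen arbitrarily for the sketch
    long sum = 0;
    for (long i = 0; i < n; i++) sum += data[i];
    return sum;
  }
  long left, right;
  pls::spawn([&]() {
    left = parallel_sum(data, n / 2);
  });
  // Last child: spawn and join in one call, replacing
  //   pls::spawn([...]{ ... }); pls::sync();
  pls::spawn_and_sync([&]() {
    right = parallel_sum(data + n / 2, n - n / 2);
  });
  return left + right;
}
```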
#include "pls/pls.h"
#include "benchmark_runner.h"
#include "benchmark_base/fib.h"
using namespace comparison_benchmarks::base;
constexpr int MAX_NUM_TASKS = 32;
constexpr int MAX_STACK_SIZE = 4096 * 1;
int pls_fib(int n) {
@@ -20,10 +16,9 @@ int pls_fib(int n) {
pls::spawn([n, &a]() {
a = pls_fib(n - 1);
});
-pls::spawn([n, &b]() {
+pls::spawn_and_sync([n, &b]() {
b = pls_fib(n - 2);
});
-pls::sync();
return a + b;
}
@@ -35,7 +30,7 @@ int main(int argc, char **argv) {
string full_directory = settings.output_directory_ + "/PLS_v3/";
benchmark_runner runner{full_directory, test_name};
-pls::scheduler scheduler{(unsigned) settings.num_threads_, MAX_NUM_TASKS, MAX_STACK_SIZE};
+pls::scheduler scheduler{(unsigned) settings.num_threads_, settings.size_ + 2, MAX_STACK_SIZE};
volatile int res;
if (settings.type_ == benchmark_runner::benchmark_settings::ISOLATED) {
......
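The scheduler in the fib benchmark now derives its task bound from the input instead of the fixed MAX_NUM_TASKS = 32, presumably because the number of simultaneously live tasks grows with the recursion depth. A hedged sketch of the sizing, assuming the three-argument constructor shown above takes the worker-thread count, the maximum number of tasks, and the per-task stack size; num_threads and input_size are placeholders for settings.num_threads_ and settings.size_:

```cpp
#include "pls/pls.h"

constexpr int MAX_STACK_SIZE = 4096 * 1;

void setup_scheduler(unsigned num_threads, int input_size) {
  // Fixed bound before this commit: pls::scheduler{num_threads, 32, MAX_STACK_SIZE}.
  // Now the task pool scales with the benchmark input, matching settings.size_ + 2 above.
  pls::scheduler scheduler{num_threads, input_size + 2, MAX_STACK_SIZE};
  // ... set up and run the benchmark with this scheduler ...
}
```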
@@ -11,8 +11,7 @@ void invoke(Function1 &&function1, Function2 &&function2) {
using namespace ::pls::internal::scheduling;
scheduler::spawn(std::forward<Function1>(function1));
-scheduler::spawn(std::forward<Function2>(function2));
-scheduler::sync();
+scheduler::spawn_and_sync(std::forward<Function2>(function2));
}
template<typename Function1, typename Function2, typename Function3>
@@ -21,8 +20,7 @@ void invoke(Function1 &&function1, Function2 &&function2, Function3 &&function3)
scheduler::spawn(std::forward<Function1>(function1));
scheduler::spawn(std::forward<Function2>(function2));
-scheduler::spawn(std::forward<Function3>(function3));
-scheduler::sync();
+scheduler::spawn_and_sync(std::forward<Function3>(function3));
}
}
......
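Callers of invoke are unaffected by the change above: it still runs the passed callables as a parallel group and returns once all of them have finished; only the implementation now forwards the last callable to scheduler::spawn_and_sync instead of issuing a separate final spawn plus sync. A usage sketch, assuming the helper is exposed to users as pls::invoke (only the template bodies appear in this diff):

```cpp
#include "pls/pls.h"

// Illustrative use of the two-argument invoke from above.
int invoke_fib(int n) {
  if (n <= 1) return n;
  int a, b;
  pls::invoke(
      [&] { a = invoke_fib(n - 1); },   // forwarded to scheduler::spawn
      [&] { b = invoke_fib(n - 2); });  // forwarded to scheduler::spawn_and_sync
  return a + b;
}
```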
@@ -207,8 +207,8 @@ void scheduler::spawn_internal(Function &&lambda) {
spawning_state.get_task_manager().push_local_task(last_task);
#if PLS_SLEEP_WORKERS_ON_EMPTY
-// TODO: relax atomic operations on empty flag
-data_structures::stamped_integer queue_empty_flag = spawning_state.get_queue_empty_flag().load();
+data_structures::stamped_integer
+    queue_empty_flag = spawning_state.get_queue_empty_flag().load(std::memory_order_relaxed);
switch (queue_empty_flag.value_) {
case EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY: {
// The queue was not found empty, ignore it.
@@ -218,7 +218,8 @@ void scheduler::spawn_internal(Function &&lambda) {
// Someone tries to mark us empty and might be re-stealing right now.
data_structures::stamped_integer
queue_non_empty_flag{queue_empty_flag.stamp_++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
-auto actual_empty_flag = spawning_state.get_queue_empty_flag().exchange(queue_non_empty_flag);
+auto actual_empty_flag =
+    spawning_state.get_queue_empty_flag().exchange(queue_non_empty_flag, std::memory_order_acq_rel);
if (actual_empty_flag.value_ == EMPTY_QUEUE_STATE::QUEUE_EMPTY) {
spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake();
}
@@ -228,7 +229,7 @@ void scheduler::spawn_internal(Function &&lambda) {
// Someone already marked the queue empty, we must revert its action on the central queue.
data_structures::stamped_integer
queue_non_empty_flag{queue_empty_flag.stamp_++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
-spawning_state.get_queue_empty_flag().store(queue_non_empty_flag);
+spawning_state.get_queue_empty_flag().store(queue_non_empty_flag, std::memory_order_release);
spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake();
break;
}
......
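The spawn_internal hunks above relax the empty-flag atomics: the initial load of the flag becomes memory_order_relaxed, the exchange that reverts a QUEUE_MAYBE_EMPTY marking becomes memory_order_acq_rel, and the store that reverts a completed QUEUE_EMPTY marking becomes memory_order_release. Below is a minimal, self-contained model of that spawn-side protocol; stamped_flag, the QUEUE_* constants, the free-standing counter, and the exact stamp arithmetic are illustrative stand-ins, not the library's real types:

```cpp
#include <atomic>
#include <cstdint>
#include <cstdio>

struct stamped_flag {
  uint32_t stamp_;
  uint32_t value_;  // one of the QUEUE_* states below
};

constexpr uint32_t QUEUE_NON_EMPTY = 0;
constexpr uint32_t QUEUE_MAYBE_EMPTY = 1;
constexpr uint32_t QUEUE_EMPTY = 2;

std::atomic<stamped_flag> queue_empty_flag{stamped_flag{0, QUEUE_NON_EMPTY}};
std::atomic<int> sleeping_counter{0};  // stands in for the scheduler's central counter

// Spawn path: after pushing a task, undo any (maybe-)empty marking left by a thief.
void mark_queue_non_empty() {
  stamped_flag flag = queue_empty_flag.load(std::memory_order_relaxed);
  switch (flag.value_) {
    case QUEUE_NON_EMPTY:
      break;  // nobody marked us, nothing to revert
    case QUEUE_MAYBE_EMPTY: {
      // A thief might be completing the empty marking right now: publish
      // NON_EMPTY with a fresh stamp and inspect what was actually swapped out.
      stamped_flag non_empty{flag.stamp_ + 1, QUEUE_NON_EMPTY};
      stamped_flag previous = queue_empty_flag.exchange(non_empty, std::memory_order_acq_rel);
      if (previous.value_ == QUEUE_EMPTY) {
        sleeping_counter.fetch_sub(1);  // the thief got there first, revert its increment
      }
      break;
    }
    case QUEUE_EMPTY: {
      // The marking already completed, so revert it on the central counter as well.
      stamped_flag non_empty{flag.stamp_ + 1, QUEUE_NON_EMPTY};
      queue_empty_flag.store(non_empty, std::memory_order_release);
      sleeping_counter.fetch_sub(1);
      break;
    }
  }
}

int main() {
  mark_queue_non_empty();
  std::printf("sleeping counter: %d\n", sleeping_counter.load());
}
```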
@@ -114,8 +114,16 @@ task *external_trading_deque::pop_top(task *offered_task, peek_result peek_resul
if (forwarding_stamp != expected_top.stamp_) {
// ...we failed because the top tag lags behind...try to fix it.
// This means only updating the tag, as this location can still hold data we need.
-  top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value_}, std::memory_order_relaxed);
-  return nullptr;
+  data_structures::stamped_integer forwarded_top{forwarding_stamp, expected_top.value_};
+  if (top_.compare_exchange_strong(expected_top,
+                                   forwarded_top,
+                                   std::memory_order_relaxed)) {
+    // We were able to update the top tag as needed, continue the pop request.
+    expected_top = forwarded_top;
+  } else {
+    // We did not get to update the top tag, back off.
+    return nullptr;
+  }
}
// Try to get it by CAS with the expected field entry, giving up our offered_task for it
......
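The pop_top change no longer bails out just because the top tag lags behind: it first tries to CAS only the stamp forward and, when that succeeds, continues the pop with the corrected expected value; only losing that CAS makes it back off. A compressed sketch of the pattern on an illustrative stamped slot (a simplified stand-in, not the deque's real top_ field):

```cpp
#include <atomic>
#include <cstdint>

struct stamped_value {
  uint32_t stamp_;
  uint32_t value_;
};

std::atomic<stamped_value> top_{stamped_value{0, 0}};

// Returns true if the caller may continue its pop with the forwarded expectation,
// false if it lost the race and should back off (the original code returns nullptr).
bool forward_top_stamp(stamped_value &expected_top, uint32_t forwarding_stamp) {
  stamped_value forwarded_top{forwarding_stamp, expected_top.value_};
  if (top_.compare_exchange_strong(expected_top, forwarded_top, std::memory_order_relaxed)) {
    expected_top = forwarded_top;  // tag fixed, keep using the same data slot
    return true;
  }
  return false;  // another thread changed top_ in the meantime
}
```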
@@ -68,8 +68,8 @@ void scheduler::work_thread_work_section() {
thread_state &target_state = my_state.get_scheduler().thread_state_for(target);
#if PLS_SLEEP_WORKERS_ON_EMPTY
queue_empty_flag_retry_steal:
-// TODO: relax atomics for empty flag
-data_structures::stamped_integer target_queue_empty_flag = target_state.get_queue_empty_flag().load();
+data_structures::stamped_integer
+    target_queue_empty_flag = target_state.get_queue_empty_flag().load(std::memory_order_relaxed);
#endif
base_task *stolen_task;
@@ -93,7 +93,7 @@ void scheduler::work_thread_work_section() {
strain_local_resource::acquire_locally(stolen_resources, my_state.get_thread_id());
PLS_ASSERT_EXPENSIVE(check_task_chain_forward(*my_state.get_active_task()),
"We are sole owner of this chain, it has to be valid!");
"We are sole owner of this chain, it has to be valid!");
// Execute the stolen task by jumping to its continuation.
PLS_ASSERT(stolen_task->continuation_.valid(),
@@ -125,7 +125,8 @@ void scheduler::work_thread_work_section() {
data_structures::stamped_integer
maybe_empty_flag{target_queue_empty_flag.stamp_ + 1, EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY};
if (target_state.get_queue_empty_flag().compare_exchange_strong(target_queue_empty_flag,
-    maybe_empty_flag)) {
+    maybe_empty_flag,
+    std::memory_order_acq_rel)) {
goto queue_empty_flag_retry_steal;
}
break;
@@ -135,7 +136,9 @@ void scheduler::work_thread_work_section() {
// We can safely mark it empty and increment the central counter.
data_structures::stamped_integer
empty_flag{target_queue_empty_flag.stamp_ + 1, EMPTY_QUEUE_STATE::QUEUE_EMPTY};
-if (target_state.get_queue_empty_flag().compare_exchange_strong(target_queue_empty_flag, empty_flag)) {
+if (target_state.get_queue_empty_flag().compare_exchange_strong(target_queue_empty_flag,
+    empty_flag,
+    std::memory_order_acq_rel)) {
// We marked it empty, now it's our duty to modify the central counter
my_state.get_scheduler().empty_queue_increase_counter();
}
......
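The steal-path hunks are the thief side of the same empty-flag protocol, now also with explicit orderings: a failed steal first CASes the victim's flag from QUEUE_NON_EMPTY to QUEUE_MAYBE_EMPTY and retries the steal once, and only a second failure CASes QUEUE_MAYBE_EMPTY to QUEUE_EMPTY and bumps the central counter, both with memory_order_acq_rel. A sketch of that side, reusing the illustrative stamped_flag, QUEUE_* constants, and counter from the spawn-side model above:

```cpp
// Thief side: called after a steal attempt on this victim came back empty.
// Returns true if the caller should immediately retry the steal once more.
bool note_failed_steal(std::atomic<stamped_flag> &victim_flag,
                       std::atomic<int> &sleeping_counter) {
  stamped_flag flag = victim_flag.load(std::memory_order_relaxed);
  if (flag.value_ == QUEUE_NON_EMPTY) {
    // First failure: only mark the queue as "maybe empty" and give the steal one more try.
    stamped_flag maybe_empty{flag.stamp_ + 1, QUEUE_MAYBE_EMPTY};
    return victim_flag.compare_exchange_strong(flag, maybe_empty, std::memory_order_acq_rel);
  }
  if (flag.value_ == QUEUE_MAYBE_EMPTY) {
    // Second failure with no spawn in between: mark it empty and account for it centrally.
    stamped_flag empty{flag.stamp_ + 1, QUEUE_EMPTY};
    if (victim_flag.compare_exchange_strong(flag, empty, std::memory_order_acq_rel)) {
      sleeping_counter.fetch_add(1);
    }
  }
  return false;
}
```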