Commit 2f539691 by FritzFlorian

Minor changes for profiling and add more alignment.

The idea is to exclude as many sources as possible that could lead to issues with contention and cache misses. After some experimentation, we think that hyperthreading is simply not working very well with our kind of workload. In the future we might simply test on other hardware.
parent e34ea267
Pipeline #1364 failed with stages
in 41 seconds
# Notes on performance measures during development
#### Commit e34ea267 - 05.12.2019 - First Version of new Algorithm - Scaling Problems
The first version of our memory trading work stealing algorithm works. It still shows scaling issues over
the hyperthreading mark, very similar to what we have seen in version 1. This indicates some sort of
contention between the threads when running the FFT algorithm.
Analyzing the current version we find issue with the frequent call to `thread_state_for(id)` in
the stealing loop.
![](./media/e34ea267_thread_state_for.png)
It is obvious that the method takes some amount of runtime, as FFT has a structure that tends to only
work on the continuations in the end of the computation (the critical path of FFT can only be executed
after most parallel tasks are done).
![](./media/e34ea267_fft_execution_pattern.png)
What we can see here is the long tail of continuations running at the end of the computation. During
this time the non working threads constantly steal, thus requiring the `thread_state_for(id)`
virtual method, potentially hindering other threads from doing their work properly.
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/parallel_result.h"
#include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/helpers/profiler.h"
using namespace pls::internal::scheduling;
#include <iostream>
......@@ -9,7 +10,7 @@ using namespace pls::internal::scheduling;
#include <atomic>
static constexpr int CUTOFF = 16;
static constexpr int INPUT_SIZE = 8192;
static constexpr int INPUT_SIZE = 16384;
typedef std::vector<std::complex<double>> complex_vector;
void divide(complex_vector::iterator data, int n) {
......@@ -90,8 +91,8 @@ complex_vector prepare_input(int input_size) {
return data;
}
static constexpr int NUM_ITERATIONS = 1000;
constexpr size_t NUM_THREADS = 5;
static constexpr int NUM_ITERATIONS = 500;
constexpr size_t NUM_THREADS = 2;
constexpr size_t NUM_TASKS = 128;
......@@ -99,6 +100,7 @@ constexpr size_t NUM_CONTS = 128;
constexpr size_t MAX_CONT_SIZE = 512;
int main() {
PROFILE_ENABLE;
complex_vector initial_input = prepare_input(INPUT_SIZE);
static_scheduler_memory<NUM_THREADS,
......@@ -112,6 +114,7 @@ int main() {
for (int i = 0; i < NUM_ITERATIONS; i++) {
complex_vector input_2(initial_input);
scheduler.perform_work([&]() {
PROFILE_MAIN_THREAD;
return scheduler::par([&]() {
return fft(input_2.begin(), INPUT_SIZE);
}, []() {
......@@ -120,10 +123,12 @@ int main() {
return parallel_result<int>{0};
});
});
PROFILE_LOCK("DONE");
}
auto end = std::chrono::steady_clock::now();
std::cout << "Framework: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
<< std::endl;
PROFILE_SAVE("test_profile.prof");
start = std::chrono::steady_clock::now();
for (int i = 0; i < NUM_ITERATIONS; i++) {
......
#include <pls/pls.h>
#include <pls/internal/helpers/profiler.h>
#include <pls/internal/helpers/mini_benchmark.h>
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/parallel_result.h"
#include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/algorithms/for_each.h"
using namespace pls::internal::scheduling;
#include <chrono>
......@@ -15,8 +17,8 @@ class matrix {
std::fill(&data[0][0], &data[0][0] + SIZE * SIZE, i);
}
void multiply(const matrix<T, SIZE> &a, const matrix<T, SIZE> &b) {
pls::for_each_range(0, SIZE, [&](int i) {
parallel_result<int> multiply(const matrix<T, SIZE> &a, const matrix<T, SIZE> &b) {
return pls::algorithm::for_each_range(0, SIZE, [&](int i) {
this->multiply_column(i, a, b);
});
}
......@@ -44,6 +46,14 @@ void fill_with_data(matrix<double, MATRIX_SIZE> &a, matrix<double, MATRIX_SIZE>
}
}
static constexpr int NUM_ITERATIONS = 1000;
constexpr size_t NUM_THREADS = 3;
constexpr size_t NUM_TASKS = 128;
constexpr size_t NUM_CONTS = 128;
constexpr size_t MAX_CONT_SIZE = 512;
int main() {
PROFILE_ENABLE
matrix<double, MATRIX_SIZE> a;
......@@ -51,11 +61,29 @@ int main() {
matrix<double, MATRIX_SIZE> result;
fill_with_data(a, b);
pls::internal::helpers::run_mini_benchmark([&] {
result.multiply(a, b);
}, 8, 1000);
static_scheduler_memory<NUM_THREADS,
NUM_TASKS,
NUM_CONTS,
MAX_CONT_SIZE> static_scheduler_memory;
scheduler scheduler{static_scheduler_memory, NUM_THREADS};
PROFILE_SAVE("test_profile.prof")
auto start = std::chrono::steady_clock::now();
for (int i = 0; i < NUM_ITERATIONS; i++) {
scheduler.perform_work([&]() {
PROFILE_MAIN_THREAD;
return scheduler::par([&]() {
return result.multiply(a, b);
}, []() {
return parallel_result<int>{0};
}).then([](int, int) {
return parallel_result<int>{0};
});
});
}
auto end = std::chrono::steady_clock::now();
std::cout << "Framework: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
<< std::endl;
}
//int main() {
......
#include <pls/pls.h>
#include <pls/internal/helpers/profiler.h>
#include <pls/internal/helpers/mini_benchmark.h>
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/parallel_result.h"
#include "pls/internal/scheduling/scheduler_memory.h"
using namespace pls::internal::scheduling;
#include "node.h"
......@@ -11,7 +12,7 @@ const int NORMAL_CHILDREN = 8;
const int NUM_NODES = 71069;
int count_child_nodes(uts::node &node) {
parallel_result<int> count_child_nodes(uts::node &node) {
int child_count = 1;
std::vector<uts::node> children = node.spawn_child_nodes();
......@@ -36,7 +37,7 @@ int count_child_nodes(uts::node &node) {
return child_count;
}
int unbalanced_tree_search(int seed, int root_children, double q, int normal_children) {
parallel_result<int> unbalanced_tree_search(int seed, int root_children, double q, int normal_children) {
int result;
auto lambda = [&] {
......@@ -50,11 +51,33 @@ int unbalanced_tree_search(int seed, int root_children, double q, int normal_chi
return result;
}
constexpr size_t NUM_THREADS = 5;
constexpr size_t NUM_TASKS = 128;
constexpr size_t NUM_CONTS = 128;
constexpr size_t MAX_CONT_SIZE = 512;
volatile int result;
int main() {
PROFILE_ENABLE
pls::internal::helpers::run_mini_benchmark([&] {
unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN);
}, 8, 2000);
static_scheduler_memory<NUM_THREADS,
NUM_TASKS,
NUM_CONTS,
MAX_CONT_SIZE> static_scheduler_memory;
scheduler scheduler{static_scheduler_memory, NUM_THREADS};
scheduler.perform_work([&]() {
return scheduler::par([&]() {
return unbalanced_tree_search(SEED, ROOT_CHILDREN, Q, NORMAL_CHILDREN);
}, []() {
return parallel_result<int>{0};
}).then([](int a, int) {
result = a;
return parallel_result<int>{0};
});
});
PROFILE_SAVE("test_profile.prof")
}
......
......@@ -8,7 +8,7 @@
using namespace pls::internal;
constexpr size_t NUM_THREADS = 1;
constexpr size_t NUM_THREADS = 4;
constexpr size_t NUM_TASKS = 128;
static constexpr int NUM_ITERATIONS = 100;
......@@ -29,11 +29,8 @@ int fib_normal(int n) {
}
scheduling::parallel_result<int> fib(int n) {
if (n == 0) {
return 0;
}
if (n == 1) {
return 1;
if (n <= 10) {
return fib_normal(n);
}
return scheduling::scheduler::par([=]() {
......@@ -47,6 +44,7 @@ scheduling::parallel_result<int> fib(int n) {
static volatile int result;
int main() {
PROFILE_ENABLE;
scheduling::static_scheduler_memory<NUM_THREADS,
NUM_TASKS,
NUM_CONTS,
......@@ -56,7 +54,7 @@ int main() {
auto start = std::chrono::steady_clock::now();
for (int i = 0; i < NUM_ITERATIONS; i++) {
result = fib_normal(30);
result = fib_normal(35);
}
auto end = std::chrono::steady_clock::now();
std::cout << "Normal: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
......@@ -66,16 +64,20 @@ int main() {
for (int i = 0; i < NUM_ITERATIONS; i++) {
scheduler.perform_work([]() {
PROFILE_MAIN_THREAD;
return scheduling::scheduler::par([]() {
return scheduling::parallel_result<int>(0);
}, []() {
return fib(30);
return fib(35);
}).then([](int, int b) {
result = b;
PROFILE_LOCK("DONE");
return scheduling::parallel_result<int>{0};
});
});
PROFILE_LOCK("DONE");
}
PROFILE_SAVE("test_profile.prof");
end = std::chrono::steady_clock::now();
std::cout << "Framework: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << std::endl;
......
......@@ -2,6 +2,8 @@
#ifndef PLS_PARALLEL_FOR_H
#define PLS_PARALLEL_FOR_H
#include "pls/internal/scheduling/parallel_result.h"
namespace pls {
namespace algorithm {
......@@ -9,19 +11,26 @@ class fixed_strategy;
class dynamic_strategy;
template<typename Function, typename ExecutionStrategy>
void for_each_range(unsigned long first,
pls::internal::scheduling::parallel_result<int> for_each_range(unsigned long first,
unsigned long last,
const Function &function,
ExecutionStrategy &execution_strategy);
template<typename Function>
void for_each_range(unsigned long first, unsigned long last, const Function &function);
pls::internal::scheduling::parallel_result<int> for_each_range(unsigned long first,
unsigned long last,
const Function &function);
template<typename RandomIt, typename Function, typename ExecutionStrategy>
void for_each(RandomIt first, RandomIt last, const Function &function, ExecutionStrategy execution_strategy);
pls::internal::scheduling::parallel_result<int> for_each(RandomIt first,
RandomIt last,
const Function &function,
ExecutionStrategy execution_strategy);
template<typename RandomIt, typename Function>
void for_each(RandomIt first, RandomIt last, const Function &function);
pls::internal::scheduling::parallel_result<int> for_each(RandomIt first,
RandomIt last,
const Function &function);
}
}
......
......@@ -2,11 +2,8 @@
#ifndef PLS_PARALLEL_FOR_IMPL_H
#define PLS_PARALLEL_FOR_IMPL_H
#include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/helpers/unique_id.h"
#include "pls/internal/helpers/range.h"
namespace pls {
......@@ -14,7 +11,10 @@ namespace algorithm {
namespace internal {
template<typename RandomIt, typename Function>
void for_each(const RandomIt first, const RandomIt last, const Function function, const long min_elements) {
pls::internal::scheduling::parallel_result<int> for_each(const RandomIt first,
const RandomIt last,
const Function function,
const long min_elements) {
using namespace ::pls::internal::scheduling;
const long num_elements = std::distance(first, last);
......@@ -23,29 +23,25 @@ void for_each(const RandomIt first, const RandomIt last, const Function function
for (auto current = first; current != last; current++) {
function(*current);
}
return parallel_result<int>{0};
} else {
// Cut in half recursively
const long middle_index = num_elements / 2;
auto second_half_body =
[first, middle_index, last, &function, min_elements] {
internal::for_each(first + middle_index,
last,
return scheduler::par([first, middle_index, last, &function, min_elements] {
return internal::for_each(first,
first + middle_index,
function,
min_elements);
};
using second_half_t = lambda_task_by_reference<decltype(second_half_body)>;
scheduler::spawn_child<second_half_t>(std::move(second_half_body));
auto first_half_body =
[first, middle_index, last, &function, min_elements] {
internal::for_each(first,
first + middle_index,
}, [first, middle_index, last, &function, min_elements] {
return internal::for_each(first + middle_index,
last,
function,
min_elements);
};
using first_half_t = lambda_task_by_reference<decltype(first_half_body)>;
scheduler::spawn_child_and_wait<first_half_t>(std::move(first_half_body));
}).then([](int, int) {
return parallel_result<int>{0};
});
}
}
......@@ -56,7 +52,7 @@ class dynamic_strategy {
explicit dynamic_strategy(const unsigned int tasks_per_thread = 4) : tasks_per_thread_{tasks_per_thread} {};
long calculate_min_elements(long num_elements) const {
const long num_threads = pls::internal::scheduling::thread_state::get()->scheduler_->num_threads();
const long num_threads = pls::internal::scheduling::thread_state::get().scheduler_->num_threads();
return num_elements / (num_threads * tasks_per_thread_);
}
private:
......@@ -75,29 +71,34 @@ class fixed_strategy {
};
template<typename RandomIt, typename Function, typename ExecutionStrategy>
void for_each(RandomIt first, RandomIt last, const Function &function, ExecutionStrategy execution_strategy) {
pls::internal::scheduling::parallel_result<int> for_each(RandomIt first,
RandomIt last,
const Function &function,
ExecutionStrategy execution_strategy) {
long num_elements = std::distance(first, last);
internal::for_each(first, last, function, execution_strategy.calculate_min_elements(num_elements));
return internal::for_each(first, last, function, execution_strategy.calculate_min_elements(num_elements));
}
template<typename RandomIt, typename Function>
void for_each(RandomIt first, RandomIt last, const Function &function) {
for_each(first, last, function, dynamic_strategy{4});
pls::internal::scheduling::parallel_result<int> for_each(RandomIt first, RandomIt last, const Function &function) {
return for_each(first, last, function, dynamic_strategy{4});
}
template<typename Function, typename ExecutionStrategy>
void for_each_range(unsigned long first,
pls::internal::scheduling::parallel_result<int> for_each_range(unsigned long first,
unsigned long last,
const Function &function,
ExecutionStrategy execution_strategy) {
auto range = pls::internal::helpers::range(first, last);
for_each(range.begin(), range.end(), function, execution_strategy);
return for_each(range.begin(), range.end(), function, execution_strategy);
}
template<typename Function>
void for_each_range(unsigned long first, unsigned long last, const Function &function) {
pls::internal::scheduling::parallel_result<int> for_each_range(unsigned long first,
unsigned long last,
const Function &function) {
auto range = pls::internal::helpers::range(first, last);
for_each(range.begin(), range.end(), function);
return for_each(range.begin(), range.end(), function);
}
}
......
......@@ -247,9 +247,9 @@ class bounded_trading_deque {
deque_entry *entries_;
size_t num_entries_;
std::atomic<stamped_integer> top_{{0, 0}};
alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<stamped_integer> top_{{0, 0}};
alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<size_t> bot_{0};
std::atomic<size_t> bot_{0};
stamped_integer bot_internal_{0, 0};
};
......
......@@ -6,9 +6,10 @@
#include <easy/profiler.h>
#include <easy/arbitrary_value.h>
#define PROFILE_WORK_BLOCK(msg) EASY_BLOCK(msg, profiler::colors::LightGreen)
#define PROFILE_FORK_JOIN_STEALING(msg) EASY_BLOCK(msg, profiler::colors::LightBlue)
#define PROFILE_STEALING(msg) EASY_BLOCK(msg, profiler::colors::Blue)
#define PROFILE_TASK(msg) EASY_BLOCK(msg, profiler::colors::LightBlue)
#define PROFILE_CONTINUATION(msg) EASY_BLOCK(msg, profiler::colors::LightBlue)
#define PROFILE_FAST_PATH(msg) EASY_BLOCK(msg, profiler::colors::Green)
#define PROFILE_STEALING(msg) EASY_BLOCK(msg, profiler::colors::Orange)
#define PROFILE_LOCK(msg) EASY_BLOCK(msg, profiler::colors::Red)
#define PROFILE_END_BLOCK EASY_END_BLOCK
......@@ -21,8 +22,9 @@
#else //ENABLE_EASY_PROFILER
#define PROFILE_WORK_BLOCK(msg)
#define PROFILE_FORK_JOIN_STEALING(msg)
#define PROFILE_TASK(msg)
#define PROFILE_CONTINUATION(msg)
#define PROFILE_FAST_PATH(msg)
#define PROFILE_STEALING(msg)
#define PROFILE_LOCK(msg)
......
......@@ -11,6 +11,8 @@
#include "pls/internal/base/alignment.h"
#include "pls/internal/base/error_handling.h"
#include "pls/internal/helpers/profiler.h"
#include "parallel_result.h"
#include "memory_block.h"
......@@ -119,6 +121,7 @@ class cont : public base_cont {
task_{std::forward<T2ARGS>(task_2_args)..., this} {};
void execute() override {
PROFILE_CONTINUATION("execute_cont");
using result_type = decltype(function_((*left_result_).value(), (*right_result_).value()));
result_runner<result_type>::execute(*this);
......
......@@ -27,8 +27,7 @@ class memory_block {
memory_buffer_{memory_buffer},
memory_buffer_size_{memory_buffer_size},
memory_buffer_used_{false},
depth_{depth},
owner_{0} {};
depth_{depth} {};
template<typename T, typename ...ARGS>
T *place_in_buffer(ARGS &&...args) {
......@@ -82,13 +81,6 @@ class memory_block {
results_missing_.store(2);
}
void set_owner(int owner) {
owner_ = owner;
}
int get_owner() {
return owner_;
}
private:
// Linked list property of memory blocks (a complete list represents a threads currently owned memory).
// Each block knows its chain start to allow stealing a whole chain in O(1)
......@@ -120,9 +112,6 @@ class memory_block {
// Swapping parts of a memory chain will not reorder it, as always parts of
// the same size are exchanged.
const int depth_;
// TODO: Remove, debug only
int owner_;
};
}
......
......@@ -7,6 +7,8 @@
#include "pls/internal/scheduling/parallel_result.h"
#include "pls/internal/scheduling/task.h"
#include "pls/internal/helpers/profiler.h"
namespace pls {
namespace internal {
namespace scheduling {
......@@ -32,6 +34,7 @@ struct scheduler::starter {
auto then(FCONT &&cont_function)
-> decltype(cont_function(std::declval<typename return_type_1::value_type>(),
std::declval<typename return_type_2::value_type>())) {
PROFILE_FAST_PATH("then");
using continuation_type = cont<task<F2>, return_type_1, return_type_2, FCONT>;
using result_type = decltype(cont_function(std::declval<typename return_type_1::value_type>(),
std::declval<typename return_type_2::value_type>()));
......@@ -49,7 +52,6 @@ struct scheduler::starter {
const bool is_right_cont = my_state.right_spawn_;
base_cont *parent_cont = my_state.parent_cont_;
current_memory_block->set_owner(my_state.get_id());
continuation_type *current_cont = current_memory_block->place_in_buffer<continuation_type>(parent_cont,
current_memory_block,
is_right_cont,
......@@ -63,7 +65,7 @@ struct scheduler::starter {
// Call first function on fast path
my_state.right_spawn_ = false;
return_type_1 result_1 = function_1_();
if (cont_manager.falling_through()) {
if (!result_1.fast_path()) {
// Get our replacement from the task stack and store it for later use when we are actually blocked.
auto traded_memory = my_state.get_task_manager().try_pop_local();
current_cont->get_memory_block()->get_offered_chain().store(*traded_memory);
......@@ -87,7 +89,7 @@ struct scheduler::starter {
} else {
my_state.right_spawn_ = true;
return_type_2 result_2 = function_2_();
if (cont_manager.falling_through()) {
if (!result_2.fast_path()) {
// Main scheduling loop is responsible for entering the result to the slow path...
current_cont->store_left_result(std::move(result_1));
current_cont->get_memory_block()->get_results_missing().fetch_add(-1);
......@@ -109,7 +111,7 @@ struct scheduler::starter {
my_state.right_spawn_ = is_right_cont;
auto cont_result = cont_function(result_1.value(), result_2.value());
if (cont_manager.falling_through()) {
if (!cont_result.fast_path()) {
// Unwind stack...
return result_type{};
}
......@@ -154,8 +156,6 @@ class scheduler::init_function_impl : public init_function {
template<typename Function>
void scheduler::perform_work(Function work_section) {
PROFILE_WORK_BLOCK("scheduler::perform_work")
// Prepare main root task
init_function_impl<Function> starter_function{work_section};
main_thread_starter_function_ = &starter_function;
......
......@@ -18,19 +18,27 @@ class scheduler_memory {
// By not having an initialization routine we can do our 'static and heap specialization'
// without running into any ordering problems in the initialization sequence.
// We first worried about performance of this being virtual.
// However, we decided that only thread_state_for is used during the
// runtime and that only when stealing. As stealing is expensive anyways,
// this should not add too much overhead.
protected:
thread_state **thread_states_array_{nullptr};
public:
virtual size_t max_threads() const = 0;
virtual base::thread &thread_for(size_t id) = 0;
virtual thread_state &thread_state_for(size_t id) = 0;
thread_state &thread_state_for(size_t id) {
return *thread_states_array_[id];
}
};
template<size_t MAX_THREADS, size_t NUM_TASKS, size_t NUM_CONTS, size_t MAX_CONT_SIZE>
class static_scheduler_memory : public scheduler_memory {
public:
static_scheduler_memory() : scheduler_memory{} {
for (size_t i = 0; i < MAX_THREADS; i++) {
thread_state_pointers_[i] = &thread_states_[i].get_thread_state();
}
thread_states_array_ = thread_state_pointers_.data();
}
size_t max_threads() const override {
return MAX_THREADS;
}
......@@ -38,16 +46,12 @@ class static_scheduler_memory : public scheduler_memory {
base::thread &thread_for(size_t id) override {
return threads_[id];
}
thread_state &thread_state_for(size_t id) override {
return thread_states_[id].get_thread_state();
}
private:
using thread_state_type = thread_state_static<NUM_TASKS, NUM_CONTS, MAX_CONT_SIZE>;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<base::thread, MAX_THREADS> threads_;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<thread_state_type, MAX_THREADS> thread_states_;
alignas(base::system_details::CACHE_LINE_SIZE) std::array<thread_state *, MAX_THREADS> thread_state_pointers_;
};
template<size_t NUM_TASKS, size_t MAX_TASK_STACK_SIZE, size_t NUM_CONTS, size_t MAX_CONT_SIZE>
......@@ -55,14 +59,17 @@ class heap_scheduler_memory : public scheduler_memory {
public:
explicit heap_scheduler_memory(size_t max_threads) : max_threads_{max_threads},
thread_vector_{},
thread_state_vector_{} {
thread_state_vector_{},
thread_state_pointers_{} {
thread_vector_.reserve(max_threads);
thread_state_vector_.reserve(max_threads);
for (size_t i = 0; i < max_threads; i++) {
thread_vector_.emplace_back();
thread_state_vector_.emplace_back();
thread_state_pointers_.emplace_back(&thread_state_vector_[i].get_thread_state());
}
thread_states_array_ = thread_state_pointers_.data();
}
size_t max_threads() const override {
......@@ -72,11 +79,6 @@ class heap_scheduler_memory : public scheduler_memory {
base::thread &thread_for(size_t id) override {
return thread_vector_[id];
}
thread_state &thread_state_for(size_t id) override {
return thread_state_vector_[id].object().get_thread_state();
}
private:
using thread_state_type = thread_state_static<NUM_TASKS, NUM_CONTS, MAX_CONT_SIZE>;
// thread_state_type is aligned at the cache line and therefore overaligned (C++ 11 does not require
......@@ -88,6 +90,7 @@ class heap_scheduler_memory : public scheduler_memory {
size_t max_threads_;
std::vector<base::thread> thread_vector_;
std::vector<thread_state_wrapper> thread_state_vector_;
std::vector<thread_state *> thread_state_pointers_;
};
}
......
......@@ -4,6 +4,8 @@
#include "pls/internal/scheduling/cont.h"
#include "pls/internal/scheduling/memory_block.h"
#include "pls/internal/helpers/profiler.h"
namespace pls {
namespace internal {
namespace scheduling {
......@@ -47,6 +49,7 @@ class task : public base_task {
: base_task{cont}, function_{std::forward<FARG>(function)} {}
void execute_internal() override {
PROFILE_TASK("execute_internal")
auto result = function_();
if (result.fast_path()) {
cont_->store_right_result(std::move(result));
......
......@@ -30,20 +30,17 @@ class task_manager {
public:
// Publishes a task on the stack, i.e. makes it visible for other threads to steal.
void publish_task(base_task *task) {
// std::lock_guard<base::spin_lock> lock{lock_};
task_deque_.push_bot(task->get_cont()->get_memory_block());
}
// Try to pop a local task from this task managers stack.
data_structures::optional<memory_block *> try_pop_local() {
// std::lock_guard<base::spin_lock> lock{lock_};
return task_deque_.pop_bot().traded_;
}
// Try to steal a task from a remote task_manager instance. The stolen task must be stored locally.
// Returns a pair containing the actual task and if the steal was successful.
base_task *steal_remote_task(cont_manager &stealing_cont_manager) {
// std::lock_guard<base::spin_lock> lock{lock_};
auto peek = task_deque_.peek_top();
if (std::get<0>(peek)) {
......@@ -73,7 +70,6 @@ class task_manager {
private:
data_structures::bounded_trading_deque<memory_block, memory_block> &task_deque_;
base::spin_lock lock_{};
};
template<size_t NUM_TASKS>
......
......@@ -5,6 +5,8 @@
#include "pls/internal/scheduling/task_manager.h"
#include "pls/internal/scheduling/cont_manager.h"
#include "pls/internal/base/system_details.h"
#include "thread_state.h"
namespace pls {
......@@ -12,7 +14,7 @@ namespace internal {
namespace scheduling {
template<size_t NUM_TASKS, size_t NUM_CONTS, size_t MAX_CONT_SIZE>
struct thread_state_static {
struct alignas(base::system_details::CACHE_LINE_SIZE) thread_state_static {
public:
thread_state_static()
: static_task_manager_{},
......@@ -21,9 +23,9 @@ struct thread_state_static {
thread_state &get_thread_state() { return thread_state_; }
private:
static_task_manager<NUM_TASKS> static_task_manager_;
static_cont_manager<NUM_CONTS, MAX_CONT_SIZE> static_cont_manager_;
thread_state thread_state_;
alignas(base::system_details::CACHE_LINE_SIZE) static_task_manager<NUM_TASKS> static_task_manager_;
alignas(base::system_details::CACHE_LINE_SIZE) static_cont_manager<NUM_CONTS, MAX_CONT_SIZE> static_cont_manager_;
alignas(base::system_details::CACHE_LINE_SIZE) thread_state thread_state_;
};
}
......
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/task.h"
#include "pls/internal/base/thread.h"
#include "pls/internal/base/error_handling.h"
#include "pls/internal/helpers/profiler.h"
namespace pls {
namespace internal {
......@@ -74,17 +75,15 @@ void scheduler::work_thread_work_section() {
// Steal Routine (will be continuously executed when there are no more fall through's).
// TODO: move into separate function
const size_t offset = my_state.random_() % num_threads;
const size_t max_tries = num_threads - 1;
const size_t max_tries = num_threads;
for (size_t i = 0; i < max_tries; i++) {
size_t target = (offset + i) % num_threads;
// Skip our self for stealing
target = ((target == my_id) + target) % num_threads;
auto &target_state = my_state.get_scheduler().thread_state_for(target);
PLS_ASSERT(my_cont_manager.is_clean(), "Only steal with clean chain!");
PROFILE_STEALING("steal")
auto *stolen_task = target_state.get_task_manager().steal_remote_task(my_cont_manager);
PROFILE_END_BLOCK;
if (stolen_task != nullptr) {
my_state.parent_cont_ = stolen_task->get_cont();
my_state.right_spawn_ = true;
......@@ -97,6 +96,9 @@ void scheduler::work_thread_work_section() {
}
}
}
// if (!my_cont_manager.falling_through()) {
// base::this_thread::sleep(5);
// }
} while (!work_section_done_);
PLS_ASSERT(my_cont_manager.is_clean(), "Only finish work section with clean chain!");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment