Commit 6148bd1e by FritzFlorian

Split profiling code into cpp files.

parent aeb55d12
Pipeline #1475 passed with stages
in 3 minutes 47 seconds
......@@ -39,7 +39,11 @@ add_library(pls STATIC
include/pls/internal/scheduling/lock_free/task.h
include/pls/internal/scheduling/lock_free/task_manager.h src/internal/scheduling/lock_free/task_manager.cpp
include/pls/internal/scheduling/lock_free/external_trading_deque.h src/internal/scheduling/lock_free/external_trading_deque.cpp
include/pls/internal/scheduling/lock_free/traded_cas_field.h src/internal/scheduling/lock_free/task.cpp include/pls/internal/profiling/dag_node.h include/pls/internal/profiling/profiler.h include/pls/internal/profiling/thread_stats.h)
include/pls/internal/scheduling/lock_free/traded_cas_field.h src/internal/scheduling/lock_free/task.cpp
include/pls/internal/profiling/dag_node.h src/internal/profiling/dag_node.cpp
include/pls/internal/profiling/profiler.h
include/pls/internal/profiling/thread_stats.h src/internal/profiling/thread_stats.cpp src/internal/profiling/profiler.cpp)
# Dependencies for pls
target_link_libraries(pls Threads::Threads)
......
......@@ -5,7 +5,6 @@
#include <memory>
#include <list>
#include <algorithm>
#include <iostream>
namespace pls::internal::profiling {
......@@ -24,69 +23,11 @@ struct dag_node {
return reinterpret_cast<base::system_details::pointer_t >(this);
}
void dag_compact() {
while (next_node_ && next_node_->child_nodes_.empty()) {
max_memory_ = std::max(max_memory_, next_node_->max_memory_);
total_runtime_ += next_node_->total_runtime_;
next_node_ = std::move(next_node_->next_node_);
}
if (next_node_) {
next_node_->dag_compact();
}
for (auto &child : child_nodes_) {
child.dag_compact();
}
}
void dag_print(std::ostream &stream, unsigned rank) {
stream << node_print_id() << " [label=" << spawning_thread_id_ << ", rank=" << rank << "];" << std::endl;
for (auto &child : child_nodes_) {
child.dag_print(stream, rank + 1);
stream << node_print_id() << " -> " << child.node_print_id() << ";" << std::endl;
}
if (next_node_) {
next_node_->dag_print(stream, rank);
stream << node_print_id() << " -> " << next_node_->node_print_id() << ";" << std::endl;
}
}
unsigned dag_max_memory() {
unsigned max = max_memory_;
if (next_node_) {
max = std::max(max, next_node_->dag_max_memory());
}
for (auto &child : child_nodes_) {
max = std::max(max, child.dag_max_memory());
}
return max;
}
unsigned long dag_total_user_time() {
unsigned long total_user_time = total_runtime_;
if (next_node_) {
total_user_time += next_node_->dag_total_user_time();
}
for (auto &child : child_nodes_) {
total_user_time += child.dag_total_user_time();
}
return total_user_time;
}
unsigned long dag_critical_path() {
unsigned long critical_path = total_runtime_;
if (next_node_) {
critical_path += next_node_->dag_critical_path();
}
unsigned long slowest_child = 0;
for (auto &child : child_nodes_) {
slowest_child = std::max(slowest_child, child.dag_critical_path());
}
critical_path += slowest_child;
return critical_path;
}
void dag_compact();
void dag_print(std::ostream &stream, unsigned rank);
unsigned dag_max_memory();
unsigned long dag_total_user_time();
unsigned long dag_critical_path();
};
}
......
......@@ -2,7 +2,9 @@
#ifndef PLS_INTERNAL_PROFILING_PROFILER_H_
#define PLS_INTERNAL_PROFILING_PROFILER_H_
#ifndef PLS_PROFILING_ENABLED
#define PLS_PROFILING_ENABLED false
#endif
#include <memory>
#include <chrono>
......@@ -35,169 +37,32 @@ class profiler {
return std::chrono::duration_cast<display_resolution>(measurement_duration).count();
}
void print_stats(unsigned num_threads) const {
root_node_->dag_compact();
auto run_duration = std::chrono::duration_cast<measurement_resolution>(end_time_ - start_time_).count();
std::cout << "===========================" << std::endl;
std::cout << "WALL TIME: " << m_to_d(run_duration) << std::endl;
unsigned long total_user_time = root_node_->dag_total_user_time();
std::cout << "USER TIME: " << m_to_d(total_user_time) << std::endl;
unsigned long critical_path_time = root_node_->dag_critical_path();
std::cout << "CRITICAL TIME: " << m_to_d(critical_path_time) << std::endl;
std::cout << "MAX SPEEDUP:" << (double) total_user_time / (double) critical_path_time << std::endl;
unsigned long total_failed_steals = 0;
unsigned long total_successful_steals = 0;
unsigned long total_steal_time = 0;
for (auto &thread_stats : per_thread_stats_) {
total_failed_steals += thread_stats.failed_steals_;
total_successful_steals += thread_stats.successful_steals_;
total_steal_time += thread_stats.total_time_stealing_;
}
std::cout << "STEALS: (Time " << m_to_d(total_steal_time)
<< ", Total: " << (total_successful_steals + total_failed_steals)
<< ", Success: " << total_successful_steals
<< ", Failed: " << total_failed_steals << ")" << std::endl;
unsigned long total_measured = total_steal_time + total_user_time;
unsigned long total_wall = run_duration * num_threads;
std::cout << "Wall Time vs. Measured: " << 100.0 * total_measured / total_wall << std::endl;
std::cout << "MEMORY: " << root_node_->dag_max_memory() << " bytes per stack" << std::endl;
// TODO: re-add graph printing
// std::cout << "digraph {" << std::endl;
// root_node_->dag_print(std::cout, 0);
// std::cout << "}" << std::endl;
}
void print_stats(unsigned num_threads) const;
};
public:
profiler(unsigned num_threads) : num_threads_{num_threads},
profiler_runs_() {}
dag_node *start_profiler_run() {
profiler_run &current_run = profiler_runs_.emplace_back(num_threads_);
current_run.start_time_ = clock::now();
return current_run.root_node_.get();
}
void stop_profiler_run() {
current_run().end_time_ = clock::now();
current_run().print_stats(num_threads_);
}
void stealing_start(unsigned thread_id) {
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
thread_stats.stealing_start_time_ = clock::now();
thread_stats.total_steals_++;
});
}
void stealing_end(unsigned thread_id, bool success) {
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
thread_stats.failed_steals_ += !success;
thread_stats.successful_steals_ += success;
auto end_time = clock::now();
auto
steal_duration =
std::chrono::duration_cast<measurement_resolution>(end_time - thread_stats.stealing_start_time_).count();
thread_stats.total_time_stealing_ += steal_duration;
});
}
void stealing_cas_op(unsigned thread_id) {
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
thread_stats.steal_cas_ops_++;
});
}
dag_node *task_spawn_child(unsigned thread_id, dag_node *parent) {
auto &thread_stats = thread_stats_for(thread_id);
dag_node *result;
thread_stats.run_on_stack([&] {
result = &parent->child_nodes_.emplace_back(thread_id);
});
return result;
}
dag_node *task_sync(unsigned thread_id, dag_node *synced) {
auto &thread_stats = thread_stats_for(thread_id);
dag_node *start_profiler_run();
void stop_profiler_run();
dag_node *result;
thread_stats.run_on_stack([&] {
synced->next_node_ = std::make_unique<dag_node>(thread_id);
result = synced->next_node_.get();
});
return result;
}
void task_start_running(unsigned thread_id, dag_node *in_node) {
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
thread_stats.task_run_start_time = clock::now();
});
}
void task_stop_running(unsigned thread_id, dag_node *in_node) {
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
auto end_time = clock::now();
auto user_code_duration =
std::chrono::duration_cast<measurement_resolution>(end_time - thread_stats.task_run_start_time).count();
in_node->total_runtime_ += user_code_duration;
});
}
static constexpr char MAGIC_BYTES[] = {'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B'};
void task_prepare_stack_measure(unsigned thread_id, char *stack_memory, size_t stack_size) {
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
for (size_t i = 0; i < stack_size - sizeof(MAGIC_BYTES); i += sizeof(MAGIC_BYTES)) {
for (size_t j = 0; j < sizeof(MAGIC_BYTES); j++) {
stack_memory[i + j] = MAGIC_BYTES[j];
}
}
});
}
void task_finish_stack_measure(unsigned thread_id, char *stack_memory, size_t stack_size, dag_node *in_node) {
auto &thread_stats = thread_stats_for(thread_id);
void stealing_start(unsigned thread_id);
void stealing_end(unsigned thread_id, bool success);
void stealing_cas_op(unsigned thread_id);
thread_stats.run_on_stack([&] {
for (size_t i = 0; i < stack_size - sizeof(MAGIC_BYTES); i += sizeof(MAGIC_BYTES)) {
bool section_clean = true;
for (size_t j = 0; j < sizeof(MAGIC_BYTES); j++) {
if (stack_memory[i + j] != MAGIC_BYTES[j]) {
section_clean = false;
}
}
in_node->max_memory_ = stack_size - i + sizeof(MAGIC_BYTES);
if (!section_clean) {
return;
}
}
});
}
dag_node *task_spawn_child(unsigned thread_id, dag_node *parent);
dag_node *task_sync(unsigned thread_id, dag_node *synced);
void task_start_running(unsigned thread_id, dag_node *in_node);
void task_stop_running(unsigned thread_id, dag_node *in_node);
void task_prepare_stack_measure(unsigned thread_id, char *stack_memory, size_t stack_size);
void task_finish_stack_measure(unsigned thread_id, char *stack_memory, size_t stack_size, dag_node *in_node);
private:
static constexpr char MAGIC_BYTES[] = {'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B'};
profiler_run &current_run() {
return profiler_runs_[profiler_runs_.size() - 1];
}
thread_stats &thread_stats_for(unsigned thread_id) {
return current_run().per_thread_stats_[thread_id];
}
......
......@@ -26,7 +26,7 @@ struct PLS_CACHE_ALIGN thread_stats {
void run_on_stack(const Function function) {
context_switcher::enter_context(stack_, STACK_SIZE, [function](auto cont) {
function();
return std::move(cont);
return cont;
});
}
......
......@@ -31,7 +31,7 @@ scheduler::scheduler(unsigned int num_threads,
terminated_{false},
stack_allocator_{std::make_shared<ALLOC>(std::forward<ALLOC>(stack_allocator))}
#if PLS_PROFILING_ENABLED
, profiler_{num_threads}
, profiler_{num_threads}
#endif
{
......@@ -200,7 +200,7 @@ void scheduler::spawn(Function &&lambda) {
syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
spawned_task->profiling_node_);
#endif
return std::move(continuation);
return continuation;
}
});
......
#include "pls/internal/profiling/dag_node.h"
#include <algorithm>
namespace pls::internal::profiling {
void dag_node::dag_compact() {
while (next_node_ && next_node_->child_nodes_.empty()) {
max_memory_ = std::max(max_memory_, next_node_->max_memory_);
total_runtime_ += next_node_->total_runtime_;
next_node_ = std::move(next_node_->next_node_);
}
if (next_node_) {
next_node_->dag_compact();
}
for (auto &child : child_nodes_) {
child.dag_compact();
}
}
void dag_node::dag_print(std::ostream &stream, unsigned rank) {
stream << node_print_id() << " [label=" << spawning_thread_id_ << ", rank=" << rank << "];" << std::endl;
for (auto &child : child_nodes_) {
child.dag_print(stream, rank + 1);
stream << node_print_id() << " -> " << child.node_print_id() << ";" << std::endl;
}
if (next_node_) {
next_node_->dag_print(stream, rank);
stream << node_print_id() << " -> " << next_node_->node_print_id() << ";" << std::endl;
}
}
unsigned dag_node::dag_max_memory() {
unsigned max = max_memory_;
if (next_node_) {
max = std::max(max, next_node_->dag_max_memory());
}
for (auto &child : child_nodes_) {
max = std::max(max, child.dag_max_memory());
}
return max;
}
unsigned long dag_node::dag_total_user_time() {
unsigned long total_user_time = total_runtime_;
if (next_node_) {
total_user_time += next_node_->dag_total_user_time();
}
for (auto &child : child_nodes_) {
total_user_time += child.dag_total_user_time();
}
return total_user_time;
}
unsigned long dag_node::dag_critical_path() {
unsigned long critical_path = total_runtime_;
if (next_node_) {
critical_path += next_node_->dag_critical_path();
}
unsigned long slowest_child = 0;
for (auto &child : child_nodes_) {
slowest_child = std::max(slowest_child, child.dag_critical_path());
}
critical_path += slowest_child;
return critical_path;
}
}
#include "pls/internal/profiling/profiler.h"
namespace pls::internal::profiling {
void profiler::profiler_run::print_stats(unsigned num_threads) const {
root_node_->dag_compact();
auto run_duration = std::chrono::duration_cast<measurement_resolution>(end_time_ - start_time_).count();
std::cout << "===========================" << std::endl;
std::cout << "WALL TIME: " << m_to_d(run_duration) << std::endl;
unsigned long total_user_time = root_node_->dag_total_user_time();
std::cout << "USER TIME: " << m_to_d(total_user_time) << std::endl;
unsigned long critical_path_time = root_node_->dag_critical_path();
std::cout << "CRITICAL TIME: " << m_to_d(critical_path_time) << std::endl;
std::cout << "MAX SPEEDUP:" << (double) total_user_time / (double) critical_path_time << std::endl;
unsigned long total_failed_steals = 0;
unsigned long total_successful_steals = 0;
unsigned long total_steal_time = 0;
for (auto &thread_stats : per_thread_stats_) {
total_failed_steals += thread_stats.failed_steals_;
total_successful_steals += thread_stats.successful_steals_;
total_steal_time += thread_stats.total_time_stealing_;
}
std::cout << "STEALS: (Time " << m_to_d(total_steal_time)
<< ", Total: " << (total_successful_steals + total_failed_steals)
<< ", Success: " << total_successful_steals
<< ", Failed: " << total_failed_steals << ")" << std::endl;
unsigned long total_measured = total_steal_time + total_user_time;
unsigned long total_wall = run_duration * num_threads;
std::cout << "Wall Time vs. Measured: " << 100.0 * total_measured / total_wall << std::endl;
std::cout << "MEMORY: " << root_node_->dag_max_memory() << " bytes per stack" << std::endl;
// TODO: re-add graph printing
// std::cout << "digraph {" << std::endl;
// root_node_->dag_print(std::cout, 0);
// std::cout << "}" << std::endl;
}
dag_node *profiler::start_profiler_run() {
profiler_run &current_run = profiler_runs_.emplace_back(num_threads_);
current_run.start_time_ = clock::now();
return current_run.root_node_.get();
}
void profiler::stop_profiler_run() {
current_run().end_time_ = clock::now();
current_run().print_stats(num_threads_);
}
void profiler::stealing_start(unsigned thread_id) {
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
thread_stats.stealing_start_time_ = clock::now();
thread_stats.total_steals_++;
});
}
void profiler::stealing_end(unsigned thread_id, bool success) {
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
thread_stats.failed_steals_ += !success;
thread_stats.successful_steals_ += success;
auto end_time = clock::now();
auto
steal_duration =
std::chrono::duration_cast<measurement_resolution>(end_time - thread_stats.stealing_start_time_).count();
thread_stats.total_time_stealing_ += steal_duration;
});
}
void profiler::stealing_cas_op(unsigned thread_id) {
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
thread_stats.steal_cas_ops_++;
});
}
dag_node *profiler::task_spawn_child(unsigned thread_id, dag_node *parent) {
auto &thread_stats = thread_stats_for(thread_id);
dag_node *result;
thread_stats.run_on_stack([&] {
result = &parent->child_nodes_.emplace_back(thread_id);
});
return result;
}
dag_node *profiler::task_sync(unsigned thread_id, dag_node *synced) {
auto &thread_stats = thread_stats_for(thread_id);
dag_node *result;
thread_stats.run_on_stack([&] {
synced->next_node_ = std::make_unique<dag_node>(thread_id);
result = synced->next_node_.get();
});
return result;
}
void profiler::task_start_running(unsigned thread_id, dag_node *in_node) {
(void) in_node; // unused, keep for 'symmetric' API
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
thread_stats.task_run_start_time = clock::now();
});
}
void profiler::task_stop_running(unsigned thread_id, dag_node *in_node) {
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
auto end_time = clock::now();
auto user_code_duration =
std::chrono::duration_cast<measurement_resolution>(end_time - thread_stats.task_run_start_time).count();
in_node->total_runtime_ += user_code_duration;
});
}
void profiler::task_prepare_stack_measure(unsigned thread_id, char *stack_memory, size_t stack_size) {
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
for (size_t i = 0; i < stack_size - sizeof(MAGIC_BYTES); i += sizeof(MAGIC_BYTES)) {
for (size_t j = 0; j < sizeof(MAGIC_BYTES); j++) {
stack_memory[i + j] = MAGIC_BYTES[j];
}
}
});
}
void profiler::task_finish_stack_measure(unsigned thread_id, char *stack_memory, size_t stack_size, dag_node *in_node) {
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
for (size_t i = 0; i < stack_size - sizeof(MAGIC_BYTES); i += sizeof(MAGIC_BYTES)) {
bool section_clean = true;
for (size_t j = 0; j < sizeof(MAGIC_BYTES); j++) {
if (stack_memory[i + j] != MAGIC_BYTES[j]) {
section_clean = false;
}
}
in_node->max_memory_ = stack_size - i + sizeof(MAGIC_BYTES);
if (!section_clean) {
return;
}
}
});
}
}
#include "pls/internal/profiling/thread_stats.h"
......@@ -182,7 +182,7 @@ context_switcher::continuation scheduler::slow_return(thread_state &calling_stat
// Jump back to the continuation in main scheduling loop.
context_switcher::continuation result_cont = std::move(thread_state::get().main_continuation());
PLS_ASSERT(result_cont.valid(), "Must return a valid continuation.");
return std::move(result_cont);
return result_cont;
} else {
// Make sure that we are owner of this full continuation/task chain.
last_task->next_ = this_task;
......@@ -194,7 +194,7 @@ context_switcher::continuation scheduler::slow_return(thread_state &calling_stat
// Jump to parent task and continue working on it.
context_switcher::continuation result_cont = std::move(last_task->continuation_);
PLS_ASSERT(result_cont.valid(), "Must return a valid continuation.");
return std::move(result_cont);
return result_cont;
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment