Commit 3202323c by FritzFlorian

Finish up DAG profiler.

parent 6148bd1e
Pipeline #1476 passed with stages
in 4 minutes 40 seconds
......@@ -44,11 +44,15 @@ int main(int argc, char **argv) {
scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
volatile int res;
scheduler.get_profiler().disable_memory_measure();
runner.run_iterations(fib::NUM_ITERATIONS, [&]() {
scheduler.perform_work([&]() {
res = pls_fib(fib::INPUT_N);
});
}, fib::NUM_WARMUP_ITERATIONS);
scheduler.get_profiler().current_run().print_dag(std::cout);
scheduler.get_profiler().current_run().print_stats();
runner.commit_results(true);
return 0;
......
......@@ -4,6 +4,7 @@ using namespace pls;
#include "benchmark_runner.h"
#include "benchmark_base/matrix.h"
#include <iostream>
using namespace comparison_benchmarks::base;
......@@ -37,10 +38,14 @@ int main(int argc, char **argv) {
scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
scheduler.get_profiler().disable_memory_measure();
runner.run_iterations(matrix::NUM_ITERATIONS, [&]() {
scheduler.perform_work([&]() {
result.multiply(a, b);
});
}, matrix::WARMUP_ITERATIONS);
scheduler.get_profiler().current_run().print_dag(std::cout);
scheduler.get_profiler().current_run().print_stats();
runner.commit_results(true);
}
......@@ -6,10 +6,19 @@
#include <memory>
#include <list>
#include <iostream>
#include <chrono>
namespace pls::internal::profiling {
struct dag_node {
using clock = std::chrono::steady_clock;
using measurement_resolution = std::chrono::nanoseconds;
using display_resolution = std::chrono::microseconds;
static unsigned long m_to_d(unsigned long duration) {
measurement_resolution measurement_duration{duration};
return std::chrono::duration_cast<display_resolution>(measurement_duration).count();
}
dag_node(unsigned spawning_thread_id) : spawning_thread_id_{spawning_thread_id} {};
unsigned spawning_thread_id_;
......@@ -28,6 +37,7 @@ struct dag_node {
unsigned dag_max_memory();
unsigned long dag_total_user_time();
unsigned long dag_critical_path();
unsigned long dag_depth();
};
}
......
......@@ -3,7 +3,7 @@
#define PLS_INTERNAL_PROFILING_PROFILER_H_
#ifndef PLS_PROFILING_ENABLED
#define PLS_PROFILING_ENABLED false
#define PLS_PROFILING_ENABLED true
#endif
#include <memory>
......@@ -20,29 +20,60 @@ class profiler {
using clock = std::chrono::steady_clock;
using measurement_resolution = std::chrono::nanoseconds;
using display_resolution = std::chrono::microseconds;
static unsigned long m_to_d(unsigned long duration) {
measurement_resolution measurement_duration{duration};
return std::chrono::duration_cast<display_resolution>(measurement_duration).count();
}
struct profiler_run {
profiler_run(unsigned num_threads) : start_time_{},
end_time_{},
root_node_{std::make_unique<dag_node>(0)},
per_thread_stats_(num_threads) {}
per_thread_stats_(num_threads),
num_threads_{num_threads} {}
// Runtime stats
clock::time_point start_time_;
clock::time_point end_time_;
std::unique_ptr<dag_node> root_node_;
std::vector<thread_stats> per_thread_stats_;
static unsigned long m_to_d(unsigned long duration) {
measurement_resolution measurement_duration{duration};
return std::chrono::duration_cast<display_resolution>(measurement_duration).count();
}
// Collective stats
unsigned num_threads_;
unsigned long wall_time_;
unsigned long t_1_;
unsigned long t_inf_;
unsigned long steals_failed_;
unsigned long steals_successful_;
unsigned long steals_cas_ops_;
unsigned long steals_time_;
void print_stats(unsigned num_threads) const;
unsigned long max_memory_per_stack_;
unsigned long spawn_depth_;
void calculate_stats();
void print_stats() const;
void print_dag(std::ostream &stream);
};
public:
profiler(unsigned num_threads) : num_threads_{num_threads},
profiler_runs_() {}
explicit profiler(unsigned num_threads) : num_threads_{num_threads},
capture_memory_{true},
capture_time_{true},
profiler_runs_() {
for (unsigned i = 0; i < num_threads_; i++) {
stacks_.push_back(stack_allocator_.allocate_stack(STACK_SIZE));
}
}
~profiler() {
for (unsigned i = 0; i < num_threads_; i++) {
stack_allocator_.free_stack(STACK_SIZE, stacks_[i]);
}
}
dag_node *start_profiler_run();
void stop_profiler_run();
......@@ -58,8 +89,6 @@ class profiler {
void task_prepare_stack_measure(unsigned thread_id, char *stack_memory, size_t stack_size);
void task_finish_stack_measure(unsigned thread_id, char *stack_memory, size_t stack_size, dag_node *in_node);
private:
static constexpr char MAGIC_BYTES[] = {'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B'};
profiler_run &current_run() {
return profiler_runs_[profiler_runs_.size() - 1];
}
......@@ -67,8 +96,39 @@ class profiler {
return current_run().per_thread_stats_[thread_id];
}
void disable_time_measure() {
capture_time_ = false;
}
void enable_time_measure() {
capture_time_ = true;
}
void disable_memory_measure() {
capture_memory_ = false;
}
void enable_memory_measure() {
capture_memory_ = true;
}
private:
static constexpr char MAGIC_BYTES[] = {'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B'};
unsigned num_threads_;
bool capture_memory_;
bool capture_time_;
std::vector<profiler_run> profiler_runs_;
// Stacks to run the profiler code to not influence profiled stacks.
static constexpr size_t STACK_SIZE = 4096 * 4;
base::mmap_stack_allocator stack_allocator_;
std::vector<char *> stacks_;
template<typename Function>
void run_on_stack(unsigned thread_id, const Function function) {
context_switcher::enter_context(stacks_[thread_id], STACK_SIZE, [this, thread_id, function](auto cont) {
function(thread_stats_for(thread_id));
return cont;
});
}
};
}
......
......@@ -10,26 +10,11 @@
#include "pls/internal/base/system_details.h"
#include "pls/internal/base/stack_allocator.h"
#include "pls/internal/profiling/dag_node.h"
namespace pls::internal::profiling {
struct PLS_CACHE_ALIGN thread_stats {
static constexpr size_t STACK_SIZE = 4096 * 4;
thread_stats() {
stack_ = stack_allocator_.allocate_stack(STACK_SIZE);
}
~thread_stats() {
stack_allocator_.free_stack(STACK_SIZE, stack_);
}
template<typename Function>
void run_on_stack(const Function function) {
context_switcher::enter_context(stack_, STACK_SIZE, [function](auto cont) {
function();
return cont;
});
}
using clock = std::chrono::steady_clock;
unsigned long total_steals_{0};
......@@ -40,10 +25,7 @@ struct PLS_CACHE_ALIGN thread_stats {
unsigned long total_time_stealing_{0};
clock::time_point stealing_start_time_;
clock::time_point task_run_start_time;
base::mmap_stack_allocator stack_allocator_;
char *stack_;
clock::time_point task_run_start_time_;
};
}
......
......@@ -99,6 +99,12 @@ class scheduler {
thread_state &thread_state_for(unsigned int thread_id) { return *thread_states_[thread_id]; }
task_manager &task_manager_for(unsigned int thread_id) { return *task_managers_[thread_id]; }
#if PLS_PROFILING_ENABLED
profiling::profiler &get_profiler() {
return profiler_;
}
#endif
private:
static context_switcher::continuation slow_return(thread_state &calling_state);
......
......@@ -31,7 +31,7 @@ scheduler::scheduler(unsigned int num_threads,
terminated_{false},
stack_allocator_{std::make_shared<ALLOC>(std::forward<ALLOC>(stack_allocator))}
#if PLS_PROFILING_ENABLED
, profiler_{num_threads}
, profiler_{num_threads}
#endif
{
......@@ -87,18 +87,16 @@ class scheduler::init_function_impl : public init_function {
PLS_ASSERT(thread_state::get().main_continuation().valid(), "Must return valid continuation from main task.");
#if PLS_PROFILING_ENABLED
thread_state::get().get_scheduler().profiler_.task_stop_running(thread_state::get().get_thread_id(),
root_task->profiling_node_);
#endif
return std::move(thread_state::get().main_continuation());
});
#if PLS_PROFILING_ENABLED
thread_state::get().get_scheduler().profiler_.task_finish_stack_measure(thread_state::get().get_thread_id(),
root_task->stack_memory_,
root_task->stack_size_,
root_task->profiling_node_);
thread_state::get().get_scheduler().profiler_.task_stop_running(thread_state::get().get_thread_id(),
root_task->profiling_node_);
#endif
return std::move(thread_state::get().main_continuation());
});
}
private:
F &function_;
......@@ -141,7 +139,6 @@ void scheduler::spawn(Function &&lambda) {
base_task *spawned_task = last_task->next_;
#if PLS_PROFILING_ENABLED
// Memory and DAG nodes
spawning_state.get_scheduler().profiler_.task_prepare_stack_measure(spawning_state.get_thread_id(),
spawned_task->stack_memory_,
spawned_task->stack_size_);
......@@ -189,24 +186,32 @@ void scheduler::spawn(Function &&lambda) {
syncing_state.set_active_task(last_task);
#if PLS_PROFILING_ENABLED
syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
spawned_task->stack_memory_,
spawned_task->stack_size_,
spawned_task->profiling_node_);
syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
spawned_task->profiling_node_);
#endif
return std::move(last_task->continuation_);
} else {
// Slow path, the last task was stolen. This path is common to sync() events.
auto continuation = slow_return(syncing_state);
#if PLS_PROFILING_ENABLED
syncing_state.get_scheduler().profiler_.task_finish_stack_measure(syncing_state.get_thread_id(),
spawned_task->stack_memory_,
spawned_task->stack_size_,
spawned_task->profiling_node_);
syncing_state.get_scheduler().profiler_.task_stop_running(syncing_state.get_thread_id(),
spawned_task->profiling_node_);
#endif
auto continuation = slow_return(syncing_state);
return continuation;
}
});
#if PLS_PROFILING_ENABLED
thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(),
thread_state::get().get_active_task()->profiling_node_);
last_task->profiling_node_);
#endif
if (continuation.valid()) {
......
......@@ -20,7 +20,11 @@ void dag_node::dag_compact() {
}
void dag_node::dag_print(std::ostream &stream, unsigned rank) {
stream << node_print_id() << " [label=" << spawning_thread_id_ << ", rank=" << rank << "];" << std::endl;
stream << node_print_id()
<< " [label=\"" << spawning_thread_id_ << "\n"
<< max_memory_ << " bytes\n"
<< m_to_d(total_runtime_) << " us\""
<< " ,rank=" << rank << "];" << std::endl;
for (auto &child : child_nodes_) {
child.dag_print(stream, rank + 1);
stream << node_print_id() << " -> " << child.node_print_id() << ";" << std::endl;
......@@ -68,4 +72,18 @@ unsigned long dag_node::dag_critical_path() {
return critical_path;
}
unsigned long dag_node::dag_depth() {
unsigned long deepest_at_level = 1;
if (next_node_) {
deepest_at_level = std::max(deepest_at_level, next_node_->dag_depth());
}
unsigned long deepest_child = 0;
for (auto &child : child_nodes_) {
deepest_child = std::max(deepest_child, child.dag_depth());
}
return std::max(deepest_child + 1, deepest_at_level);
}
}
#include "pls/internal/profiling/profiler.h"
namespace pls::internal::profiling {
void profiler::profiler_run::print_stats(unsigned num_threads) const {
void profiler::profiler_run::calculate_stats() {
root_node_->dag_compact();
auto run_duration = std::chrono::duration_cast<measurement_resolution>(end_time_ - start_time_).count();
std::cout << "===========================" << std::endl;
std::cout << "WALL TIME: " << m_to_d(run_duration) << std::endl;
unsigned long total_user_time = root_node_->dag_total_user_time();
std::cout << "USER TIME: " << m_to_d(total_user_time) << std::endl;
unsigned long critical_path_time = root_node_->dag_critical_path();
std::cout << "CRITICAL TIME: " << m_to_d(critical_path_time) << std::endl;
std::cout << "MAX SPEEDUP:" << (double) total_user_time / (double) critical_path_time << std::endl;
unsigned long total_failed_steals = 0;
unsigned long total_successful_steals = 0;
unsigned long total_steal_time = 0;
wall_time_ = std::chrono::duration_cast<measurement_resolution>(end_time_ - start_time_).count();
t_1_ = root_node_->dag_total_user_time();
t_inf_ = root_node_->dag_critical_path();
steals_failed_ = 0;
steals_successful_ = 0;
steals_cas_ops_ = 0;
steals_time_ = 0;
for (auto &thread_stats : per_thread_stats_) {
total_failed_steals += thread_stats.failed_steals_;
total_successful_steals += thread_stats.successful_steals_;
total_steal_time += thread_stats.total_time_stealing_;
steals_failed_ += thread_stats.failed_steals_;
steals_successful_ += thread_stats.successful_steals_;
steals_cas_ops_ += thread_stats.steal_cas_ops_;
steals_time_ += thread_stats.total_time_stealing_;
}
std::cout << "STEALS: (Time " << m_to_d(total_steal_time)
<< ", Total: " << (total_successful_steals + total_failed_steals)
<< ", Success: " << total_successful_steals
<< ", Failed: " << total_failed_steals << ")" << std::endl;
unsigned long total_measured = total_steal_time + total_user_time;
unsigned long total_wall = run_duration * num_threads;
std::cout << "Wall Time vs. Measured: " << 100.0 * total_measured / total_wall << std::endl;
max_memory_per_stack_ = root_node_->dag_max_memory();
spawn_depth_ = root_node_->dag_depth();
}
std::cout << "MEMORY: " << root_node_->dag_max_memory() << " bytes per stack" << std::endl;
void profiler::profiler_run::print_stats() const {
// TODO: re-add graph printing
std::cout << "===========================" << std::endl;
std::cout << "WALL TIME: " << m_to_d(wall_time_) << std::endl;
std::cout << "USER TIME: " << m_to_d(t_1_) << std::endl;
std::cout << "CRITICAL TIME: " << m_to_d(t_inf_) << std::endl;
std::cout << "MAX SPEEDUP:" << (double) t_1_ / (double) t_inf_ << std::endl;
std::cout << "STEALS: (Time " << m_to_d(steals_time_)
<< ", Total: " << (steals_failed_ + steals_successful_)
<< ", Success: " << steals_successful_
<< ", Failed: " << steals_failed_
<< ", CAS: " << steals_cas_ops_ << ")" << std::endl;
unsigned long total_measured = steals_time_ + t_1_;
unsigned long total_wall = wall_time_ * num_threads_;
std::cout << "Wall Time vs. Measured: " << 100.0 * total_measured / total_wall << std::endl;
std::cout << "MEMORY: " << max_memory_per_stack_ << " bytes per stack" << std::endl;
std::cout << "DEPTH:" << spawn_depth_ << std::endl;
}
// std::cout << "digraph {" << std::endl;
// root_node_->dag_print(std::cout, 0);
// std::cout << "}" << std::endl;
void profiler::profiler_run::print_dag(std::ostream &stream) {
stream << "digraph {" << std::endl;
root_node_->dag_print(std::cout, 0);
stream << "}" << std::endl;
}
dag_node *profiler::start_profiler_run() {
......@@ -47,56 +61,52 @@ dag_node *profiler::start_profiler_run() {
void profiler::stop_profiler_run() {
current_run().end_time_ = clock::now();
current_run().print_stats(num_threads_);
current_run().root_node_->dag_compact();
current_run().calculate_stats();
}
void profiler::stealing_start(unsigned thread_id) {
auto &thread_stats = thread_stats_for(thread_id);
run_on_stack(thread_id, [&](thread_stats &thread_stats) {
thread_stats.total_steals_++;
thread_stats.run_on_stack([&] {
if (capture_time_) {
thread_stats.stealing_start_time_ = clock::now();
thread_stats.total_steals_++;
}
});
}
void profiler::stealing_end(unsigned thread_id, bool success) {
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
run_on_stack(thread_id, [&](thread_stats &thread_stats) {
thread_stats.failed_steals_ += !success;
thread_stats.successful_steals_ += success;
if (capture_time_) {
auto end_time = clock::now();
auto
steal_duration =
std::chrono::duration_cast<measurement_resolution>(end_time - thread_stats.stealing_start_time_).count();
thread_stats.total_time_stealing_ += steal_duration;
}
});
}
void profiler::stealing_cas_op(unsigned thread_id) {
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
run_on_stack(thread_id, [&](thread_stats &thread_stats) {
thread_stats.steal_cas_ops_++;
});
}
dag_node *profiler::task_spawn_child(unsigned thread_id, dag_node *parent) {
auto &thread_stats = thread_stats_for(thread_id);
dag_node *result;
thread_stats.run_on_stack([&] {
run_on_stack(thread_id, [&](thread_stats &) {
result = &parent->child_nodes_.emplace_back(thread_id);
});
return result;
}
dag_node *profiler::task_sync(unsigned thread_id, dag_node *synced) {
auto &thread_stats = thread_stats_for(thread_id);
dag_node *result;
thread_stats.run_on_stack([&] {
run_on_stack(thread_id, [&](thread_stats &) {
synced->next_node_ = std::make_unique<dag_node>(thread_id);
result = synced->next_node_.get();
});
......@@ -104,41 +114,40 @@ dag_node *profiler::task_sync(unsigned thread_id, dag_node *synced) {
}
void profiler::task_start_running(unsigned thread_id, dag_node *in_node) {
if (capture_time_) {
(void) in_node; // unused, keep for 'symmetric' API
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
thread_stats.task_run_start_time = clock::now();
run_on_stack(thread_id, [&](thread_stats &thread_stats) {
thread_stats.task_run_start_time_ = clock::now();
});
}
}
void profiler::task_stop_running(unsigned thread_id, dag_node *in_node) {
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
if (capture_time_) {
run_on_stack(thread_id, [&](thread_stats &thread_stats) {
auto end_time = clock::now();
auto user_code_duration =
std::chrono::duration_cast<measurement_resolution>(end_time - thread_stats.task_run_start_time).count();
std::chrono::duration_cast<measurement_resolution>(end_time - thread_stats.task_run_start_time_).count();
in_node->total_runtime_ += user_code_duration;
});
}
}
void profiler::task_prepare_stack_measure(unsigned thread_id, char *stack_memory, size_t stack_size) {
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
if (capture_memory_) {
run_on_stack(thread_id, [&](thread_stats &) {
for (size_t i = 0; i < stack_size - sizeof(MAGIC_BYTES); i += sizeof(MAGIC_BYTES)) {
for (size_t j = 0; j < sizeof(MAGIC_BYTES); j++) {
stack_memory[i + j] = MAGIC_BYTES[j];
}
}
});
}
}
void profiler::task_finish_stack_measure(unsigned thread_id, char *stack_memory, size_t stack_size, dag_node *in_node) {
auto &thread_stats = thread_stats_for(thread_id);
thread_stats.run_on_stack([&] {
if (capture_memory_) {
run_on_stack(thread_id, [&](thread_stats &) {
for (size_t i = 0; i < stack_size - sizeof(MAGIC_BYTES); i += sizeof(MAGIC_BYTES)) {
bool section_clean = true;
for (size_t j = 0; j < sizeof(MAGIC_BYTES); j++) {
......@@ -153,6 +162,7 @@ void profiler::task_finish_stack_measure(unsigned thread_id, char *stack_memory,
}
}
});
}
}
}
......@@ -124,8 +124,8 @@ void scheduler::sync() {
if (active_task->is_synchronized_) {
#if PLS_PROFILING_ENABLED
syncing_state.get_scheduler().profiler_.task_start_running(syncing_state.get_thread_id(),
active_task->profiling_node_);
thread_state::get().get_scheduler().profiler_.task_start_running(thread_state::get().get_thread_id(),
thread_state::get().get_active_task()->profiling_node_);
#endif
return; // We are already the sole owner of last_task
} else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment