Commit 54dae64f by FritzFlorian

Add proper build flavour system and fix some worker-sleep issues.

parent a26720bd
Pipeline #1491 passed with stages
in 3 minutes 48 seconds
......@@ -102,6 +102,15 @@ After this is done you can use normal `make` commands like
or `make install` to install the library globally.
Available Settings:
- `-DPLS_PROFILER=ON/OFF`
- default OFF
- Enabling it will record execution DAGs with memory and runtime stats
- Enabling it incurs a BIG performance hit (use only for development)
- `-DSLEEP_WORKERS=ON/OFF`
- default OFF
- Enabling it will make workers keep a central 'all workers empty flag'
- Workers try to sleep if there is no work in the system
- Has a performance impact on isolated runs, but can benefit multiprogrammed systems
- `-DEASY_PROFILER=ON/OFF`
- default OFF
- Enabling will link the easy profiler library and enable its macros
......
......@@ -14,9 +14,6 @@ void pls_conquer(fft::complex_vector::iterator data, fft::complex_vector::iterat
fft::divide(data, swap_array, n);
if (n <= fft::RECURSIVE_CUTOFF) {
FILE* file = fopen("test.text", "w");
fprintf(file, "test %d", n);
fclose(file);
fft::conquer(data, swap_array, n / 2);
fft::conquer(data + n / 2, swap_array + n / 2, n / 2);
} else {
......@@ -50,13 +47,18 @@ int main(int argc, char **argv) {
scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
runner.run_iterations(fft::NUM_ITERATIONS, [&]() {
// scheduler.get_profiler().disable_memory_measure();
runner.run_iterations(10, [&]() {
scheduler.perform_work([&]() {
pls_conquer(data.begin(), swap_array.begin(), fft::SIZE);;
});
}, fft::NUM_WARMUP_ITERATIONS, [&]() {
// scheduler.get_profiler().current_run().print_stats();
}, 1, [&]() {
fft::fill_input(data); // Reset data before each run
});
// scheduler.get_profiler().current_run().print_dag(std::cout);
// scheduler.get_profiler().current_run().print_stats();
runner.commit_results(true);
return 0;
......
......@@ -44,14 +44,14 @@ int main(int argc, char **argv) {
scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
volatile int res;
scheduler.get_profiler().disable_memory_measure();
// scheduler.get_profiler().disable_memory_measure();
runner.run_iterations(fib::NUM_ITERATIONS, [&]() {
scheduler.perform_work([&]() {
res = pls_fib(fib::INPUT_N);
});
}, fib::NUM_WARMUP_ITERATIONS);
scheduler.get_profiler().current_run().print_dag(std::cout);
scheduler.get_profiler().current_run().print_stats();
// scheduler.get_profiler().current_run().print_dag(std::cout);
// scheduler.get_profiler().current_run().print_stats();
runner.commit_results(true);
......
......@@ -38,14 +38,14 @@ int main(int argc, char **argv) {
scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
scheduler.get_profiler().disable_memory_measure();
// scheduler.get_profiler().disable_memory_measure();
runner.run_iterations(matrix::NUM_ITERATIONS, [&]() {
scheduler.perform_work([&]() {
result.multiply(a, b);
});
}, matrix::WARMUP_ITERATIONS);
scheduler.get_profiler().current_run().print_dag(std::cout);
scheduler.get_profiler().current_run().print_stats();
// scheduler.get_profiler().current_run().print_dag(std::cout);
// scheduler.get_profiler().current_run().print_stats();
runner.commit_results(true);
}
......@@ -45,14 +45,17 @@ int main(int argc, char **argv) {
scheduler scheduler{(unsigned) num_threads, MAX_NUM_TASKS, MAX_STACK_SIZE};
runner.run_iterations(unbalanced::NUM_ITERATIONS, [&]() {
// scheduler.get_profiler().disable_memory_measure();
runner.run_iterations(1, [&]() {
scheduler.perform_work([&]() {
unbalanced_tree_search(unbalanced::SEED,
unbalanced::ROOT_CHILDREN,
unbalanced::Q,
unbalanced::NORMAL_CHILDREN);
});
}, unbalanced::WARMUP_ITERATIONS);
}, 0);
// scheduler.get_profiler().current_run().print_dag(std::cout);
// scheduler.get_profiler().current_run().print_stats();
runner.commit_results(true);
}
......
# List all required files here (cmake best practice to NOT automate this step!)
add_library(pls STATIC
include/pls/pls.h
include/pls/internal/build_flavour.h
include/pls/algorithms/loop_partition_strategy.h
include/pls/algorithms/for_each.h include/pls/algorithms/for_each_impl.h
include/pls/algorithms/invoke.h include/pls/algorithms/invoke_impl.h
......@@ -15,6 +18,7 @@ add_library(pls STATIC
include/pls/internal/base/error_handling.h src/internal/base/error_handling.cpp
include/pls/internal/base/alignment.h src/internal/base/alignment.cpp
include/pls/internal/base/stack_allocator.h src/internal/base/stack_allocator.cpp
include/pls/internal/base/futex_wrapper.h
include/pls/internal/data_structures/aligned_stack.h src/internal/data_structures/aligned_stack.cpp
include/pls/internal/data_structures/aligned_stack_impl.h
......@@ -43,7 +47,7 @@ add_library(pls STATIC
include/pls/internal/profiling/dag_node.h src/internal/profiling/dag_node.cpp
include/pls/internal/profiling/profiler.h
include/pls/internal/profiling/thread_stats.h src/internal/profiling/thread_stats.cpp src/internal/profiling/profiler.cpp include/pls/internal/build_flavour.h include/pls/internal/base/futex_wrapper.h)
include/pls/internal/profiling/thread_stats.h src/internal/profiling/thread_stats.cpp src/internal/profiling/profiler.cpp)
# Dependencies for pls
target_link_libraries(pls Threads::Threads)
......@@ -52,13 +56,39 @@ if (EASY_PROFILER)
target_link_libraries(pls easy_profiler)
endif ()
# Generate defines needed
if (EASY_PROFILER)
set(PLS_EASY_PROFILER_ENABLED true)
else ()
set(PLS_EASY_PROFILER_ENABLED false)
endif ()
option(PLS_PROFILER "Enable the internal DAG profiler" OFF)
if (PLS_PROFILER)
set(PLS_PROFILING_ENABLED true)
else ()
set(PLS_PROFILING_ENABLED false)
endif ()
option(SLEEP_WORKERS "Enable sleeping workers if queues are empty" OFF)
if (SLEEP_WORKERS)
set(PLS_SLEEP_WORKERS_ON_EMPTY true)
else ()
set(PLS_SLEEP_WORKERS_ON_EMPTY false)
endif ()
configure_file(include/pls/internal/build_flavour.h.in include/pls/internal/build_flavour.h)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
PUBLIC
$<INSTALL_INTERFACE:include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/src # TODO: Set this up when we require private headers
${CMAKE_CURRENT_SOURCE_DIR}/src
)
target_include_directories(pls
PUBLIC
$<INSTALL_INTERFACE:include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}/include>
)
# Rules for installing the library on a system
......@@ -74,6 +104,12 @@ INSTALL(
DESTINATION include
FILES_MATCHING PATTERN "*.h*"
)
# Install the headers generated by configure_file (e.g. build_flavour.h).
# configure_file resolves a relative output path against CMAKE_CURRENT_BINARY_DIR,
# so reference that directory directly instead of hard-coding this directory's
# location ('lib/pls') below the top-level project's binary dir.
INSTALL(
        DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/include/pls
        DESTINATION include
        FILES_MATCHING PATTERN "*.h*"
)
# ...allow our project to be a cmake dependency
install(
EXPORT pls-targets
......
......@@ -2,6 +2,8 @@
#ifndef PLS_INTERNAL_BUILD_FLAVOUR_H_
#define PLS_INTERNAL_BUILD_FLAVOUR_H_
#define PLS_SLEEP_WORKERS_ON_EMPTY true
#define PLS_PROFILING_ENABLED @PLS_PROFILING_ENABLED@
#define PLS_SLEEP_WORKERS_ON_EMPTY @PLS_SLEEP_WORKERS_ON_EMPTY@
#define PLS_EASY_PROFILER_ENABLED @PLS_EASY_PROFILER_ENABLED@
#endif //PLS_INTERNAL_BUILD_FLAVOUR_H_
#include "pls/internal/build_flavour.h"
#ifndef PLS_PROFILER_H
#define PLS_PROFILER_H
#ifdef ENABLE_EASY_PROFILER
#if PLS_EASY_PROFILER_ENABLED
#include <easy/profiler.h>
#include <easy/arbitrary_value.h>
......
......@@ -2,16 +2,14 @@
#ifndef PLS_INTERNAL_PROFILING_PROFILER_H_
#define PLS_INTERNAL_PROFILING_PROFILER_H_
#ifndef PLS_PROFILING_ENABLED
#define PLS_PROFILING_ENABLED true
#endif
#include <memory>
#include <chrono>
#include <vector>
#include <iostream>
#include "pls/internal/build_flavour.h"
#include "dag_node.h"
#include "thread_stats.h"
......@@ -51,6 +49,9 @@ class profiler {
unsigned long steals_cas_ops_;
unsigned long steals_time_;
unsigned long sleeps_;
unsigned long sleeps_time_;
unsigned long max_memory_per_stack_;
unsigned long spawn_depth_;
......@@ -82,6 +83,9 @@ class profiler {
void stealing_end(unsigned thread_id, bool success);
void stealing_cas_op(unsigned thread_id);
void sleep_start(unsigned thread_id);
void sleep_stop(unsigned thread_id);
dag_node *task_spawn_child(unsigned thread_id, dag_node *parent);
dag_node *task_sync(unsigned thread_id, dag_node *synced);
void task_start_running(unsigned thread_id, dag_node *in_node);
......
......@@ -24,7 +24,11 @@ struct PLS_CACHE_ALIGN thread_stats {
unsigned long total_time_stealing_{0};
unsigned long total_sleeps_{0};
unsigned long total_time_sleeping_{0};
clock::time_point stealing_start_time_;
clock::time_point sleeping_start_time_;
clock::time_point task_run_start_time_;
};
......
......@@ -132,18 +132,18 @@ class scheduler {
std::shared_ptr<base::stack_allocator> stack_allocator_;
#if PLS_PROFILING_ENABLED
profiling::profiler profiler_;
#endif
#if PLS_SLEEP_WORKERS_ON_EMPTY
PLS_CACHE_ALIGN std::atomic<int32_t> empty_queue_counter_{0};
PLS_CACHE_ALIGN std::atomic<int32_t> threads_sleeping_{0};
std::atomic<int32_t> threads_sleeping_{0};
void empty_queue_try_sleep_worker();
void empty_queue_increase_counter();
void empty_queue_decrease_counter_and_wake();
#endif
#if PLS_PROFILING_ENABLED
profiling::profiler profiler_;
#endif
};
}
......
......@@ -14,11 +14,17 @@ void profiler::profiler_run::calculate_stats() {
steals_successful_ = 0;
steals_cas_ops_ = 0;
steals_time_ = 0;
sleeps_ = 0;
sleeps_time_ = 0;
for (auto &thread_stats : per_thread_stats_) {
steals_failed_ += thread_stats.failed_steals_;
steals_successful_ += thread_stats.successful_steals_;
steals_cas_ops_ += thread_stats.steal_cas_ops_;
steals_time_ += thread_stats.total_time_stealing_;
sleeps_ += thread_stats.total_sleeps_;
sleeps_time_ += thread_stats.total_time_sleeping_;
}
max_memory_per_stack_ = root_node_->dag_max_memory();
......@@ -39,7 +45,10 @@ void profiler::profiler_run::print_stats() const {
<< ", Failed: " << steals_failed_
<< ", CAS: " << steals_cas_ops_ << ")" << std::endl;
unsigned long total_measured = steals_time_ + t_1_;
std::cout << "SLEEPS: (Time " << m_to_d(sleeps_time_)
<< ", Total: " << sleeps_ << ")" << std::endl;
unsigned long total_measured = steals_time_ + t_1_ + sleeps_time_;
unsigned long total_wall = wall_time_ * num_threads_;
std::cout << "Wall Time vs. Measured: " << 100.0 * total_measured / total_wall << std::endl;
......@@ -82,8 +91,7 @@ void profiler::stealing_end(unsigned thread_id, bool success) {
if (capture_time_) {
auto end_time = clock::now();
auto
steal_duration =
auto steal_duration =
std::chrono::duration_cast<measurement_resolution>(end_time - thread_stats.stealing_start_time_).count();
thread_stats.total_time_stealing_ += steal_duration;
}
......@@ -96,6 +104,27 @@ void profiler::stealing_cas_op(unsigned thread_id) {
});
}
void profiler::sleep_start(unsigned thread_id) {
run_on_stack(thread_id, [&](thread_stats &thread_stats) {
thread_stats.total_sleeps_++;
if (capture_time_) {
thread_stats.sleeping_start_time_ = clock::now();
}
});
}
void profiler::sleep_stop(unsigned thread_id) {
run_on_stack(thread_id, [&](thread_stats &thread_stats) {
if (capture_time_) {
auto end_time = clock::now();
auto sleep_duration =
std::chrono::duration_cast<measurement_resolution>(end_time - thread_stats.sleeping_start_time_).count();
thread_stats.total_time_sleeping_ += sleep_duration;
}
});
}
dag_node *profiler::task_spawn_child(unsigned thread_id, dag_node *parent) {
dag_node *result;
run_on_stack(thread_id, [&](thread_stats &) {
......
......@@ -301,26 +301,27 @@ bool scheduler::check_task_chain(base_task &start_task) {
void scheduler::empty_queue_try_sleep_worker() {
int32_t counter_value = empty_queue_counter_.load();
// printf("try sleep thread %d...\n", counter_value);
if (counter_value == num_threads() - 1) {
// printf("sleeping thread...\n");
if (counter_value == num_threads()) {
#if PLS_PROFILING_ENABLED
get_profiler().sleep_start(thread_state::get().get_thread_id());
#endif
threads_sleeping_++;
base::futex_wait((int32_t *) &empty_queue_counter_, num_threads() - 1);
base::futex_wakeup((int32_t *) &empty_queue_counter_, 1);
base::futex_wait((int32_t *) &empty_queue_counter_, num_threads());
threads_sleeping_--;
// printf("waked thread...\n");
base::futex_wakeup((int32_t *) &empty_queue_counter_, 1);
#if PLS_PROFILING_ENABLED
get_profiler().sleep_stop(thread_state::get().get_thread_id());
#endif
}
}
void scheduler::empty_queue_increase_counter() {
int32_t old_value = empty_queue_counter_.fetch_add(1);
// printf("increase counter from %d", old_value);
empty_queue_counter_.fetch_add(1);
}
// Atomically decrements empty_queue_counter_ and, if threads_sleeping_ reports
// at least one sleeper, issues a futex wakeup on the counter's address.
// NOTE(review): presumably called when a worker's queue stops being empty — confirm against callers.
void scheduler::empty_queue_decrease_counter_and_wake() {
// The counter is changed before the sleeper check; sleepers wait on this same counter word.
empty_queue_counter_.fetch_sub(1);
if (threads_sleeping_.load() > 0) {
// Threads could be sleeping, we MUST wake them up
// The second argument (1) is passed through to futex_wakeup; presumably the number of waiters to wake — TODO confirm.
base::futex_wakeup((int32_t *) &empty_queue_counter_, 1);
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment