Project 'las3/development/scheduling/predictable_parallel_patterns' was moved to 'las3_pub/predictable_parallel_patterns'. Please update any links and bookmarks that may still have the old path.
Commit 9c12addf by Florian Fritz

Merge branch 'timing_helpers' into 'master'

Merge: Timing Helpers

See merge request !5
parents 72bffbb1 76f1622e
Pipeline #1140 passed with stages
in 3 minutes 25 seconds
......@@ -32,6 +32,7 @@ add_subdirectory(lib/pls)
add_subdirectory(app/playground)
add_subdirectory(app/test_for_new)
add_subdirectory(app/invoke_parallel)
add_subdirectory(app/benchmark_fft)
# Add optional tests
option(PACKAGE_TESTS "Build the tests" ON)
......
......@@ -26,6 +26,22 @@ After this is done you can use normal `make` commands like
`make` to build everything `make <target>` to build a target
or `make install` to install the library globally.
Available Settings:
- `-DEASY_PROFILER=ON/OFF`
- default OFF
- Enabling will link the easy profiler library and enable its macros
- Enabling has a performance hit (do not use in releases)
- `-DADDRESS_SANITIZER=ON/OFF`
- default OFF
- Enables address sanitizer to be linked to the executable
- Only one sanitizer can be active at once
- Enabling has a performance hit (do not use in releases)
- `-DTHREAD_SANITIZER=ON/OFF`
- default OFF
- Enables thread/datarace sanitizer to be linked to the executable
- Only one sanitizer can be active at once
- Enabling has a performance hit (do not use in releases)
### Testing
Testing is done using [Catch2](https://github.com/catchorg/Catch2/)
......
add_executable(benchmark_fft main.cpp)
target_link_libraries(benchmark_fft pls)
if(EASY_PROFILER)
target_link_libraries(benchmark_fft easy_profiler)
endif()
#include <pls/pls.h>
#include <pls/internal/helpers/profiler.h>
#include <pls/internal/helpers/mini_benchmark.h>
#include <iostream>
#include <complex>
#include <vector>
static constexpr int CUTOFF = 10;
static constexpr int NUM_ITERATIONS = 1000;
static constexpr int INPUT_SIZE = 2064;
typedef std::vector<std::complex<double>> complex_vector;
void divide(complex_vector::iterator data, int n) {
complex_vector tmp_odd_elements(n / 2);
for (int i = 0; i < n / 2; i++) {
tmp_odd_elements[i] = data[i * 2 + 1];
}
for (int i = 0; i < n / 2; i++) {
data[i] = data[i * 2];
}
for (int i = 0; i < n / 2; i++) {
data[i + n / 2] = tmp_odd_elements[i];
}
}
void combine(complex_vector::iterator data, int n) {
for (int i = 0; i < n / 2; i++) {
std::complex<double> even = data[i];
std::complex<double> odd = data[i + n / 2];
// w is the "twiddle-factor".
// this could be cached, but we run the same 'base' algorithm parallel/serial,
// so it won't impact the performance comparison.
std::complex<double> w = exp(std::complex<double>(0, -2. * M_PI * i / n));
data[i] = even + w * odd;
data[i + n / 2] = even - w * odd;
}
}
void fft(complex_vector::iterator data, int n) {
if (n < 2) {
return;
}
divide(data, n);
if (n <= CUTOFF) {
fft(data, n / 2);
fft(data + n / 2, n / 2);
} else {
pls::invoke_parallel(
[&] { fft(data, n / 2); },
[&] { fft(data + n / 2, n / 2); }
);
}
combine(data, n);
}
complex_vector prepare_input(int input_size) {
std::vector<double> known_frequencies{2, 11, 52, 88, 256};
complex_vector data(input_size);
// Set our input data to match a time series of the known_frequencies.
// When applying fft to this time-series we should find these frequencies.
for (int i = 0; i < input_size; i++) {
data[i] = std::complex<double>(0.0, 0.0);
for (auto frequencie : known_frequencies) {
data[i] += sin(2 * M_PI * frequencie * i / input_size);
}
}
return data;
}
int main() {
PROFILE_ENABLE
complex_vector initial_input = prepare_input(INPUT_SIZE);
pls::internal::helpers::run_mini_benchmark([&] {
complex_vector input = initial_input;
fft(input.begin(), input.size());
}, 8, 4000);
PROFILE_SAVE("test_profile.prof")
}
add_executable(invoke_parallel main.cpp)
target_link_libraries(invoke_parallel pls easy_profiler)
target_link_libraries(invoke_parallel pls)
if(EASY_PROFILER)
target_link_libraries(invoke_parallel easy_profiler)
endif()
#include <pls/pls.h>
#include <iostream>
#include <pls/internal/helpers/profiler.h>
#include <easy/profiler.h>
#include <iostream>
static pls::static_scheduler_memory<8, 2 << 14> my_scheduler_memory;
......@@ -33,17 +33,19 @@ long fib(long n) {
}
int main() {
EASY_PROFILER_ENABLE;
PROFILE_ENABLE
pls::scheduler scheduler{&my_scheduler_memory, 8};
long result;
scheduler.perform_work([&] {
EASY_MAIN_THREAD;
PROFILE_MAIN_THREAD
// Call looks just the same, only requirement is
// the enclosure in the perform_work lambda.
result = fib(30);
for (int i = 0; i < 10; i++) {
result = fib(30);
std::cout << "Fib(30)=" << result << std::endl;
}
});
std::cout << "Fib(30)=" << result << std::endl;
profiler::dumpBlocksToFile("test_profile.prof");
PROFILE_SAVE("test_profile.prof")
}
......@@ -5,7 +5,7 @@
#include <atomic>
#include <pls/pls.h>
#include <pls/internal/base/prohibit_new.h>
#include <pls/internal/helpers/prohibit_new.h>
using namespace pls;
......
#include <pls/internal/base/thread.h>
#include <pls/internal/base/prohibit_new.h>
#include <pls/internal/helpers/prohibit_new.h>
using namespace pls::internal::base;
......
# Optional external dependencies
find_package(easy_profiler)
option(EASY_PROFILER "Enable the profiler" OFF)
if(EASY_PROFILER)
if(easy_profiler_FOUND)
# Optional external dependencies
find_package(easy_profiler)
if(easy_profiler_FOUND)
# Do nothing, add definitions below
else()
message(WARNING "EasyProfiler dependency not found on system, DISABLING it!")
set(EASY_PROFILER OFF)
endif()
endif()
if(NOT EASY_PROFILER)
if(EASY_PROFILER)
add_definitions(-DENABLE_EASY_PROFILER)
else()
add_definitions(-DDISABLE_EASY_PROFILER)
endif()
......
#!bin/python3
import sys
import os
if len(sys.argv) < 2:
print("Please pass the name of the benchmark target as an argument!")
exit(1)
target = sys.argv[1]
print('Comparing current modifications for benchmark target ' + target)
print('Executing current version...')
print(os.popen('cd cmake-build-release; make ' + target).read())
current = os.popen('chrt -rr 99 ./cmake-build-release/bin/' + target).read()
print('Executing old version...')
print(os.popen('git stash push').read())
print(os.popen('cd cmake-build-release; make ' + target).read())
before = os.popen('chrt -rr 99 ./cmake-build-release/bin/' + target).read()
print(os.popen('git stash pop').read())
print('=======================================================')
current = [float(value) for value in current.split(',')]
before = [float(value) for value in before.split(',')]
def formate_change(change):
if change > 1.05:
color = '31'
elif change < 0.95:
color = '32'
else:
color = '30'
return '\033[1;' + color + ';40m %8.2f' % (change * 100) + ' %'
format_string = ' '.join(['%10.2f us'] * len(current))
print('old: ' + format_string % tuple(before))
print('new: ' + format_string % tuple(current))
print('=' * 55)
change = [c / b for b, c in zip(before, current)]
formated_change = ''.join(list(map(formate_change, change)))
print(formated_change)
......@@ -3,7 +3,7 @@ add_library(pls STATIC
src/pls.cpp include/pls/pls.h
src/internal/base/spin_lock.cpp include/pls/internal/base/spin_lock.h
src/internal/base/thread.cpp include/pls/internal/base/thread.h
include/pls/internal/base/prohibit_new.h
include/pls/internal/helpers/prohibit_new.h
src/internal/scheduling/abstract_task.cpp include/pls/internal/scheduling/abstract_task.h
src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler.h
src/internal/scheduling/thread_state.cpp include/pls/internal/scheduling/thread_state.h
......@@ -14,7 +14,11 @@ add_library(pls STATIC
src/internal/scheduling/run_on_n_threads_task.cpp include/pls/internal/scheduling/run_on_n_threads_task.h
src/internal/scheduling/fork_join_task.cpp include/pls/internal/scheduling/fork_join_task.h
src/internal/base/deque.cpp include/pls/internal/base/deque.h
src/algorithms/invoke_parallel.cpp include/pls/algorithms/invoke_parallel.h include/pls/internal/base/error_handling.h)
src/algorithms/invoke_parallel.cpp include/pls/algorithms/invoke_parallel.h
include/pls/internal/base/error_handling.h
include/pls/internal/scheduling/scheduler_memory.h src/internal/scheduling/scheduler_memory.cpp
include/pls/internal/helpers/profiler.h
include/pls/internal/helpers/mini_benchmark.h)
# Add everything in `./include` to be in the include path of this project
target_include_directories(pls
......
......@@ -32,13 +32,13 @@ namespace pls {
template<typename T>
T* push(const T& object) {
// Copy-Construct into desired memory location
return new (push<T>())T(object);
// Placement new into desired memory location
return new ((void*)push<T>())T(object);
}
template<typename T>
T* push() {
T* result = reinterpret_cast<T*>(head_);
void* push() {
void* result = reinterpret_cast<T*>(head_);
// Move head to next aligned position after new object
head_ = next_alignment(head_ + sizeof(T));
......
#ifndef PLS_MINI_BENCHMARK_H
#define PLS_MINI_BENCHMARK_H
#include "pls/internal/scheduling/scheduler_memory.h"
#include "pls/internal/scheduling/scheduler.h"
#include <chrono>
#include <iostream>
namespace pls {
namespace internal {
namespace helpers {
// TODO: Clean up (separate into small functions and .cpp file)
template<typename Function>
void run_mini_benchmark(const Function& lambda, size_t max_threads, long max_runtime_ms=1000) {
using namespace std;
using namespace pls::internal::scheduling;
malloc_scheduler_memory scheduler_memory{max_threads};
for (unsigned int num_threads = 1; num_threads <= max_threads; num_threads++) {
scheduler local_scheduler{&scheduler_memory, num_threads};
chrono::high_resolution_clock::time_point start_time;
chrono::high_resolution_clock::time_point end_time;
unsigned long iterations = 0;
local_scheduler.perform_work([&] {
start_time = chrono::high_resolution_clock::now();
end_time = start_time;
chrono::high_resolution_clock::time_point planned_end_time = start_time + chrono::milliseconds(max_runtime_ms);
while (end_time < planned_end_time) {
lambda();
end_time = chrono::high_resolution_clock::now();
iterations++;
}
});
long time = chrono::duration_cast<chrono::microseconds>(end_time - start_time).count();
double time_per_iteration = (double)time / iterations;
std::cout << time_per_iteration;
if (num_threads < max_threads) {
std::cout << ",";
}
}
std::cout << std::endl;
}
}
}
}
#endif //PLS_MINI_BENCHMARK_H
#ifndef PLS_PROFILER_H
#define PLS_PROFILER_H
#ifdef ENABLE_EASY_PROFILER
#include <easy/profiler.h>
#define PROFILE_WORK_BLOCK(msg) EASY_BLOCK(msg, profiler::colors::LightGreen)
#define PROFILE_FORK_JOIN_STEALING(msg) EASY_BLOCK(msg, profiler::colors::LightBlue)
#define PROFILE_STEALING(msg) EASY_BLOCK(msg, profiler::colors::Blue)
#define PROFILE_LOCK(msg) EASY_BLOCK(msg, profiler::colors::Red)
#define PROFILE_END_BLOCK EASY_END_BLOCK
#define PROFILE_SAVE(filename) profiler::dumpBlocksToFile(filename);
#define PROFILE_ENABLE EASY_PROFILER_ENABLE
#define PROFILE_MAIN_THREAD EASY_MAIN_THREAD
#else //ENABLE_EASY_PROFILER
#define PROFILE_WORK_BLOCK(msg)
#define PROFILE_FORK_JOIN_STEALING(msg)
#define PROFILE_STEALING(msg)
#define PROFILE_LOCK(msg)
#define PROFILE_END_BLOCK
#define PROFILE_SAVE(filename)
#define PROFILE_ENABLE
#define PROFILE_MAIN_THREAD
#endif //ENABLE_EASY_PROFILER
#endif //PLS_PROFILER_H
......@@ -2,7 +2,7 @@
#ifndef PLS_TBB_LIKE_TASK_H
#define PLS_TBB_LIKE_TASK_H
#include <easy/profiler.h>
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/aligned_stack.h"
#include "pls/internal/base/deque.h"
......@@ -86,7 +86,7 @@ namespace pls {
last_stolen_{nullptr} {};
void execute() override {
EASY_BLOCK("execute fork_join_task", profiler::colors::LightGreen);
PROFILE_WORK_BLOCK("execute fork_join_task");
// Bind this instance to our OS thread
my_stack_ = base::this_thread::state<thread_state>()->task_stack_;
......@@ -102,7 +102,7 @@ namespace pls {
template<typename T>
void fork_join_sub_task::spawn_child(const T& task) {
EASY_FUNCTION(profiler::colors::Blue)
PROFILE_FORK_JOIN_STEALING("spawn_child")
static_assert(std::is_base_of<fork_join_sub_task, T>::value, "Only pass fork_join_sub_task subclasses!");
T* new_task = tbb_task_->my_stack_->push(task);
......
......@@ -2,12 +2,13 @@
#ifndef PLS_ROOT_MASTER_TASK_H
#define PLS_ROOT_MASTER_TASK_H
#include <easy/profiler.h>
#include <mutex>
#include "abstract_task.h"
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/spin_lock.h"
#include "abstract_task.h"
namespace pls {
namespace internal {
namespace scheduling {
......@@ -30,7 +31,7 @@ namespace pls {
}
void execute() override {
EASY_BLOCK("execute root_task", profiler::colors::LightGreen);
PROFILE_WORK_BLOCK("execute root_task");
function_();
finished_ = 1;
}
......@@ -54,7 +55,7 @@ namespace pls {
master_task_{master_task} {}
void execute() override {
EASY_BLOCK("execute root_task", profiler::colors::LightGreen);
PROFILE_WORK_BLOCK("execute root_task");
do {
steal_work();
} while (!master_task_->finished());
......
......@@ -2,55 +2,25 @@
#ifndef PLS_SCHEDULER_H
#define PLS_SCHEDULER_H
#include <easy/profiler.h>
#include <array>
#include <iostream>
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/base/aligned_stack.h"
#include "pls/internal/base/thread.h"
#include "pls/internal/base/barrier.h"
#include "thread_state.h"
#include "root_task.h"
#include "scheduler_memory.h"
namespace pls {
namespace internal {
namespace scheduling {
// Upper thread limit for static memory allocation.
// Could be moved to templating if needed.
static constexpr int MAX_THREADS = 32;
void worker_routine();
using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;
class scheduler_memory {
public:
virtual size_t max_threads() = 0;
virtual thread_state* thread_state_for(size_t id) = 0;
virtual scheduler_thread* thread_for(size_t id) = 0;
virtual base::aligned_stack* task_stack_for(size_t id) = 0;
};
template<size_t MAX_THREADS, size_t TASK_STACK_SIZE>
class static_scheduler_memory: public scheduler_memory {
std::array<scheduler_thread, MAX_THREADS> threads_;
std::array<thread_state, MAX_THREADS> thread_states_;
std::array<std::array<char, TASK_STACK_SIZE>, MAX_THREADS> task_stacks_memory_;
std::array<base::aligned_stack, MAX_THREADS> task_stacks_;
public:
static_scheduler_memory() {
for (size_t i = 0; i < MAX_THREADS; i++) {
task_stacks_[i] = base::aligned_stack(reinterpret_cast<char*>(&task_stacks_memory_[i]), TASK_STACK_SIZE);
}
}
size_t max_threads() override { return MAX_THREADS; }
thread_state* thread_state_for(size_t id) override { return &thread_states_[id]; }
scheduler_thread* thread_for(size_t id) override { return &threads_[id]; }
base::aligned_stack* task_stack_for(size_t id) override { return &task_stacks_[id]; }
};
class scheduler {
friend void worker_routine();
......@@ -65,7 +35,7 @@ namespace pls {
template<typename Function>
void perform_work(Function work_section) {
EASY_FUNCTION();
PROFILE_WORK_BLOCK("scheduler::perform_work")
root_task<Function> master{work_section};
// Push root task on stacks
......
#include "pls/internal/base/aligned_stack.h"
#include "pls/internal/base/thread.h"
#include "thread_state.h"
#ifndef PLS_SCHEDULER_MEMORY_H
#define PLS_SCHEDULER_MEMORY_H
namespace pls {
namespace internal {
namespace scheduling {
void worker_routine();
using scheduler_thread = base::thread<decltype(&worker_routine), thread_state>;
class scheduler_memory {
public:
virtual size_t max_threads() = 0;
virtual thread_state* thread_state_for(size_t id) = 0;
virtual scheduler_thread* thread_for(size_t id) = 0;
virtual base::aligned_stack* task_stack_for(size_t id) = 0;
};
template<size_t MAX_THREADS, size_t TASK_STACK_SIZE>
class static_scheduler_memory: public scheduler_memory {
std::array<scheduler_thread, MAX_THREADS> threads_;
std::array<thread_state, MAX_THREADS> thread_states_;
std::array<std::array<char, TASK_STACK_SIZE>, MAX_THREADS> task_stacks_memory_;
std::array<base::aligned_stack, MAX_THREADS> task_stacks_;
public:
static_scheduler_memory() {
for (size_t i = 0; i < MAX_THREADS; i++) {
task_stacks_[i] = base::aligned_stack(task_stacks_memory_[i].data(), TASK_STACK_SIZE);
}
}
size_t max_threads() override { return MAX_THREADS; }
thread_state* thread_state_for(size_t id) override { return &thread_states_[id]; }
scheduler_thread* thread_for(size_t id) override { return &threads_[id]; }
base::aligned_stack* task_stack_for(size_t id) override { return &task_stacks_[id]; }
};
class malloc_scheduler_memory: public scheduler_memory {
size_t num_threads_;
scheduler_thread* threads_;
thread_state* thread_states_;
char** task_stacks_memory_;
base::aligned_stack* task_stacks_;
public:
explicit malloc_scheduler_memory(size_t num_threads, size_t memory_per_stack = 2 << 16);
~malloc_scheduler_memory();
size_t max_threads() override { return num_threads_; }
thread_state* thread_state_for(size_t id) override { return &thread_states_[id]; }
scheduler_thread* thread_for(size_t id) override { return &threads_[id]; }
base::aligned_stack* task_stack_for(size_t id) override { return &task_stacks_[id]; }
};
}
}
}
#endif //PLS_SCHEDULER_MEMORY_H
......@@ -32,23 +32,6 @@ namespace pls {
current_task_{nullptr},
task_stack_{task_stack},
id_{id} {}
thread_state(const thread_state& other):
scheduler_{other.scheduler_},
root_task_{other.root_task_},
current_task_{other.current_task_},
task_stack_{other.task_stack_},
id_{other.id_} {}
thread_state& operator=(const thread_state& other) {
scheduler_ = other.scheduler_;
root_task_ = other.root_task_;
current_task_ = other.current_task_;
task_stack_ = other.task_stack_;
id_ = other.id_;
return *this;
}
};
}
}
......
......@@ -7,8 +7,10 @@
#include "pls/internal/scheduling/scheduler.h"
namespace pls {
using internal::scheduling::scheduler;
using internal::scheduling::static_scheduler_memory;
using internal::scheduling::malloc_scheduler_memory;
using internal::scheduling::scheduler;
using task_id = internal::scheduling::abstract_task::id;
using internal::scheduling::fork_join_sub_task;
......
#include <easy/profiler.h>
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/scheduling/thread_state.h"
#include "pls/internal/scheduling/abstract_task.h"
......@@ -8,7 +8,7 @@ namespace pls {
namespace internal {
namespace scheduling {
bool abstract_task::steal_work() {
EASY_FUNCTION(profiler::colors::Orange);
PROFILE_STEALING("abstract_task::steal_work")
auto my_state = base::this_thread::state<thread_state>();
auto my_scheduler = my_state->scheduler_;
......@@ -18,19 +18,19 @@ namespace pls {
auto target_state = my_scheduler->thread_state_for(target);
// TODO: Cleaner Locking Using std::guarded_lock
EASY_BLOCK("Acquire Thread Lock", profiler::colors::Red)
PROFILE_LOCK("Acquire Thread Lock")
target_state->lock_.lock();
EASY_END_BLOCK;
PROFILE_END_BLOCK
// Dig down to our level
EASY_BLOCK("Go to our level")
PROFILE_STEALING("Go to our level")
abstract_task* current_task = target_state->root_task_;
while (current_task != nullptr && current_task->depth() < depth()) {
current_task = current_task->child_task_;
}
EASY_END_BLOCK;
PROFILE_END_BLOCK
EASY_BLOCK("Internal Steal")
PROFILE_STEALING("Internal Steal")
if (current_task != nullptr) {
// See if it equals our type and depth of task
if (current_task->unique_id_ == unique_id_ &&
......@@ -45,12 +45,12 @@ namespace pls {
current_task = current_task->child_task_;
}
}
EASY_END_BLOCK;
PROFILE_END_BLOCK;
// Execute 'top level task steal' if possible
// (only try deeper tasks to keep depth restricted stealing)
EASY_BLOCK("Top Level Steal")
PROFILE_STEALING("Top Level Steal")
while (current_task != nullptr) {
auto lock = &target_state->lock_;
if (current_task->split_task(lock)) {
......@@ -60,7 +60,7 @@ namespace pls {
current_task = current_task->child_task_;
}
EASY_END_BLOCK;
PROFILE_END_BLOCK;
target_state->lock_.unlock();
}
......
#include <easy/profiler.h>
#include "pls/internal/helpers/profiler.h"
#include "pls/internal/scheduling/scheduler.h"
#include "pls/internal/scheduling/fork_join_task.h"
......@@ -13,16 +13,19 @@ namespace pls {
tbb_task_{nullptr},
stack_state_{nullptr} {}
fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task& other): base::deque_item(other) {
// Do Nothing, will be inited after this anyways
}
fork_join_sub_task::fork_join_sub_task(const fork_join_sub_task& other):
base::deque_item(other),
ref_count_{0},
parent_{nullptr},
tbb_task_{nullptr},
stack_state_{nullptr} {}
void fork_join_sub_task::execute() {
EASY_BLOCK("execute sub_task", profiler::colors::Green);
PROFILE_WORK_BLOCK("execute sub_task")
tbb_task_->currently_executing_ = this;
execute_internal();
tbb_task_->currently_executing_ = nullptr;
EASY_END_BLOCK;
PROFILE_END_BLOCK
wait_for_all();
if (parent_ != nullptr) {
......@@ -44,17 +47,17 @@ namespace pls {
void fork_join_sub_task::wait_for_all() {
while (ref_count_ > 0) {
EASY_BLOCK("get local sub task", profiler::colors::Blue)
PROFILE_STEALING("get local sub task")
fork_join_sub_task* local_task = tbb_task_->get_local_sub_task();
EASY_END_BLOCK
PROFILE_END_BLOCK
if (local_task != nullptr) {
local_task->execute();
} else {
// Try to steal work.
// External steal will be executed implicitly if success
EASY_BLOCK("steal work", profiler::colors::Blue)
PROFILE_STEALING("steal work")
bool internal_steal_success = tbb_task_->steal_work();
EASY_END_BLOCK
PROFILE_END_BLOCK
if (internal_steal_success) {
tbb_task_->last_stolen_->execute();
}
......@@ -72,7 +75,7 @@ namespace pls {
}
bool fork_join_task::internal_stealing(abstract_task* other_task) {
EASY_FUNCTION(profiler::colors::Blue);
PROFILE_STEALING("fork_join_task::internal_stealin")
auto cast_other_task = reinterpret_cast<fork_join_task*>(other_task);
auto stolen_sub_task = cast_other_task->get_stolen_sub_task();
......@@ -90,7 +93,7 @@ namespace pls {
}
bool fork_join_task::split_task(base::spin_lock* lock) {
EASY_FUNCTION(profiler::colors::Blue);
PROFILE_STEALING("fork_join_task::split_task")
fork_join_sub_task* stolen_sub_task = get_stolen_sub_task();
if (stolen_sub_task == nullptr) {
return false;
......
......@@ -9,13 +9,14 @@ namespace pls {
memory_{memory},
sync_barrier_{num_threads + 1},
terminated_{false} {
if (num_threads > MAX_THREADS) {
if (num_threads_ > memory_->max_threads()) {
PLS_ERROR("Tried to create scheduler with more OS threads than pre-allocated memory.");
}
for (unsigned int i = 0; i < num_threads; i++) {
*memory_->thread_state_for(i) = thread_state{this, memory_->task_stack_for(i), i};
*memory_->thread_for(i) = base::start_thread(&worker_routine, memory_->thread_state_for(i));
for (unsigned int i = 0; i < num_threads_; i++) {
// Placement new is required, as the memory of `memory_` is not required to be initialized.
new((void*)memory_->thread_state_for(i)) thread_state{this, memory_->task_stack_for(i), i};
new ((void*)memory_->thread_for(i))base::thread<void(*)(), thread_state>(&worker_routine, memory_->thread_state_for(i));
}
}
......
#include "pls/internal/scheduling/scheduler_memory.h"
namespace pls {
namespace internal {
namespace scheduling {
malloc_scheduler_memory::malloc_scheduler_memory(const size_t num_threads, const size_t memory_per_stack):
num_threads_{num_threads} {
threads_ = reinterpret_cast<scheduler_thread*>(malloc(num_threads * sizeof(scheduler_thread)));
thread_states_ = reinterpret_cast<thread_state*>(malloc(num_threads * sizeof(thread_state)));
task_stacks_ = reinterpret_cast<base::aligned_stack*>(malloc(num_threads * sizeof(base::aligned_stack)));
task_stacks_memory_ = reinterpret_cast<char**>(malloc(num_threads * sizeof(char*)));
for (size_t i = 0; i < num_threads_; i++) {
task_stacks_memory_[i] = reinterpret_cast<char*>(malloc(memory_per_stack));
task_stacks_[i] = base::aligned_stack(task_stacks_memory_[i], memory_per_stack);
}
}
malloc_scheduler_memory::~malloc_scheduler_memory() {
free(threads_);
free(thread_states_);
for (size_t i = 0; i < num_threads_; i++) {
free(task_stacks_memory_[i]);
}
free(task_stacks_);
free(task_stacks_memory_);
}
}
}
}
......@@ -48,7 +48,7 @@ public:
};
TEST_CASE( "tbb task are scheduled correctly", "[internal/scheduling/fork_join_task.h]") {
static static_scheduler_memory<8, 2 << 12> my_scheduler_memory;
malloc_scheduler_memory my_scheduler_memory{8, 2 << 12};
SECTION("tasks are executed exactly once") {
scheduler my_scheduler{&my_scheduler_memory, 2};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment