Commit f01c5436 by FritzFlorian

Fix bug in scheduler and tasks.

Threads must be joined on scheduler termination and tasks must be pushed onto the stack to allow better memory management.
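A minimal standalone sketch of the two behaviours this commit enforces (an illustrative model using plain std::thread and a toy bump allocator; names below are not the pls implementation itself): worker threads are always joined inside terminate(), and per-task memory is reserved on a stack-like allocator during construction only.

// Illustrative model only - mirrors the commit's intent, not the pls code.
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

struct task_stack {                       // toy stand-in for the task stack
  std::vector<char> memory_;
  size_t top_ = 0;
  explicit task_stack(size_t bytes) : memory_(bytes) {}
  void *push_bytes(size_t bytes) {        // bump-allocate a block (alignment ignored)
    void *result = memory_.data() + top_;
    top_ += bytes;
    assert(top_ <= memory_.size());
    return result;
  }
};

struct task {
  bool finished_construction_ = false;
  task_stack *stack_;
  explicit task(task_stack *stack) : stack_(stack) {}
  void *allocate_memory(size_t bytes) {
    // allocation is only legal while the task is still being constructed
    assert(!finished_construction_ && "allocate only during construction");
    return stack_->push_bytes(bytes);
  }
};

struct scheduler {
  std::vector<std::thread> workers_;
  bool terminated_ = false;
  explicit scheduler(unsigned threads) {
    for (unsigned i = 0; i < threads; i++) {
      workers_.emplace_back([] { /* worker loop elided */ });
    }
  }
  void terminate() {                      // always join, matching the new terminate()
    if (terminated_) return;
    terminated_ = true;
    for (auto &worker : workers_) worker.join();
  }
  ~scheduler() { terminate(); }
};

int main() {
  task_stack stack{1024};
  task t{&stack};
  void *scratch = t.allocate_memory(128); // allowed: construction phase
  (void) scratch;
  t.finished_construction_ = true;        // after this, allocate_memory would assert

  scheduler s{4};
  s.terminate();                          // returns only after workers are joined
}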
parent 779978e2
Pipeline #1268 failed with stages in 36 seconds
@@ -6,7 +6,7 @@
 #include <vector>
 #include <functional>
 
-static constexpr int INPUT_SIZE = 1000000;
+static constexpr int INPUT_SIZE = 100;
 
 int main() {
   PROFILE_ENABLE
@@ -23,3 +23,25 @@ int main() {
   PROFILE_SAVE("test_profile.prof")
 }
+
+//int main() {
+//  PROFILE_ENABLE
+//  pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18};
+//  pls::scheduler scheduler{&my_scheduler_memory, 8};
+//
+//  std::vector<double> vec(INPUT_SIZE, 1);
+//  std::vector<double> out(INPUT_SIZE);
+//
+//  for (int i = 0; i < INPUT_SIZE; i++) {
+//    vec[i] = 1;
+//  }
+//
+//  scheduler.perform_work([&] {
+//    PROFILE_MAIN_THREAD
+//    for (int i = 0; i < 100; i++) {
+//      pls::scan(vec.begin(), vec.end(), out.begin(), std::plus<double>(), 0.0);
+//    }
+//  });
+//
+//  PROFILE_SAVE("test_profile.prof")
+//}
@@ -24,6 +24,7 @@ void run_mini_benchmark(const Function &lambda, size_t max_threads, unsigned lon
   chrono::high_resolution_clock::time_point start_time;
   chrono::high_resolution_clock::time_point end_time;
+  long max_local_time = 0;
   unsigned long iterations = 0;
   local_scheduler.perform_work([&] {
     start_time = chrono::high_resolution_clock::now();
@@ -31,7 +32,11 @@ void run_mini_benchmark(const Function &lambda, size_t max_threads, unsigned lon
     chrono::high_resolution_clock::time_point planned_end_time = start_time + chrono::milliseconds(max_runtime_ms);
     while (end_time < planned_end_time) {
+      auto local_start_time = chrono::high_resolution_clock::now();
       lambda();
+      auto local_end_time = chrono::high_resolution_clock::now();
+      long local_time = chrono::duration_cast<chrono::microseconds>(local_end_time - local_start_time).count();
+      max_local_time = std::max(local_time, max_local_time);
       end_time = chrono::high_resolution_clock::now();
       iterations++;
     }
@@ -40,9 +45,9 @@ void run_mini_benchmark(const Function &lambda, size_t max_threads, unsigned lon
   long time = chrono::duration_cast<chrono::microseconds>(end_time - start_time).count();
   double time_per_iteration = (double) time / iterations;
 
-  std::cout << time_per_iteration;
+  std::cout << (long) time_per_iteration << " (" << max_local_time << ")";
   if (num_threads < max_threads) {
-    std::cout << ",";
+    std::cout << "\t\t";
   }
 }
 std::cout << std::endl;
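For context, the benchmark change above reports the slowest single iteration next to the average, which makes outliers visible. A standalone sketch of that measurement pattern (illustrative only, not the benchmark harness itself; the inner loop stands in for the benchmarked lambda):

#include <algorithm>
#include <chrono>
#include <iostream>

int main() {
  using clock = std::chrono::high_resolution_clock;

  auto start_time = clock::now();
  auto planned_end_time = start_time + std::chrono::milliseconds(100);
  auto end_time = start_time;

  long max_local_time = 0;            // slowest single iteration (microseconds)
  unsigned long iterations = 0;

  while (end_time < planned_end_time) {
    auto local_start_time = clock::now();
    volatile double sink = 0;         // stand-in for the benchmarked workload
    for (int i = 0; i < 100000; i++) sink = sink + i;
    auto local_end_time = clock::now();

    long local_time = std::chrono::duration_cast<std::chrono::microseconds>(
        local_end_time - local_start_time).count();
    max_local_time = std::max(local_time, max_local_time);

    end_time = clock::now();
    iterations++;
  }

  long total = std::chrono::duration_cast<std::chrono::microseconds>(
      end_time - start_time).count();
  double time_per_iteration = (double) total / iterations;

  // average per iteration with the worst single iteration in parentheses,
  // matching the updated benchmark output format
  std::cout << (long) time_per_iteration << " (" << max_local_time << ")" << std::endl;
}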
@@ -70,10 +70,8 @@ class scheduler {
   /**
    * Explicitly terminate the worker threads. Scheduler must not be used after this.
-   *
-   * @param wait_for_workers Set to true if you wish to return from this method only after the workers are shut down.
    */
-  void terminate(bool wait_for_workers = true);
+  void terminate();
 
   /**
    * Helper to spawn a child on the currently running task.
@@ -96,16 +94,6 @@ class scheduler {
   static void spawn_child_and_wait(ARGS &&... args);
 
   /**
-   * Allocates some memory on the task-stack.
-   * It's usage is restricted to the function scope, as this enforces correct memory management.
-   *
-   * @param bytes Number of bytes to allocate
-   * @param function The function in which you can access the allocated memory
-   */
-  template<typename Function>
-  static void allocate_on_stack(size_t bytes, Function function);
-
-  /**
    * Helper to wait for all children of the currently executing task.
    */
   static void wait_for_all();
@@ -50,19 +50,6 @@ void scheduler::spawn_child_and_wait(ARGS &&... args) {
   thread_state::get()->current_task_->spawn_child_and_wait<T>(std::forward<ARGS>(args)...);
 }
 
-// TODO: Make this 'more pretty' with type-safety etc.
-template<typename Function>
-void scheduler::allocate_on_stack(size_t bytes, Function function) {
-  auto my_state = thread_state::get();
-
-  void *allocated_memory = my_state->task_stack_->push_bytes(bytes);
-  auto old_deque_state = my_state->current_task_->deque_state_;
-  my_state->current_task_->deque_state_ = my_state->task_stack_->save_state();
-
-  function(allocated_memory);
-
-  my_state->current_task_->deque_state_ = old_deque_state;
-}
-
 }
 }
 }
@@ -16,6 +16,9 @@ namespace scheduling {
 class task {
   friend class scheduler;
 
+  // Memory-Management (allow to allocate memory blocks in constructor)
+  bool finished_construction_;
+
   // Coordinate finishing of sub_tasks
   std::atomic<unsigned int> ref_count_;
   task *parent_;
@@ -24,9 +27,22 @@ class task {
   data_structures::deque<task>::state deque_state_;
 
  protected:
+  /*
+   * Must call the parent constructor.
+   */
   explicit task();
 
   /**
+   * Allow to allocate extra memory during run-time for this task.
+   * Memory will be pushed onto the stack (in aligned memory, thus avoid many small chunks).
+   * MUST be called in constructor, never afterwards.
+   *
+   * @param size Number of bytes to be allocated
+   * @return The allocated memory region
+   */
+  void *allocate_memory(long size);
+
+  /**
    * Overwrite this with the actual behaviour of concrete tasks.
    */
   virtual void execute_internal() = 0;
@@ -53,23 +69,17 @@ void task::spawn_child(ARGS &&... args) {
   thread_state::get()->deque_.push_tail_cb<T>([this](T *item) {
     // Assign forced values (for stack and parent management)
     item->parent_ = this;
+    item->finished_construction_ = true;
     item->deque_state_ = thread_state::get()->deque_.save_state();
   }, std::forward<ARGS>(args)...);
 }
 
 template<typename T, typename ...ARGS>
 void task::spawn_child_and_wait(ARGS &&... args) {
-  PROFILE_FORK_JOIN_STEALING("spawn_child_wait")
   static_assert(std::is_base_of<task, typename std::remove_reference<T>::type>::value, "Only pass task subclasses!");
 
-  // Assign forced values (for stack and parent management)
-  // TODO: Move this after construction
-  T sub_task{std::forward<ARGS>(args)...};
-  sub_task.parent_ = nullptr;
-  sub_task.deque_state_ = thread_state::get()->deque_.save_state();
-
-  PROFILE_END_BLOCK
-  sub_task.execute();
+  // TODO: See if we can inline this (avoid counters/deque) while maintaining memory management
+  spawn_child<T>(std::forward<ARGS>(args)...);
   wait_for_all();
 }
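A hypothetical subclass showing the intended calling pattern for the new allocate_memory hook (class name, member layout, and the fully qualified pls::internal::scheduling::task base are illustrative assumptions, not code from the repository): the scratch buffer is reserved while the constructor runs, i.e. before spawn_child flips finished_construction_.

// Hypothetical concrete task; reserves its scratch buffer during construction.
class scan_chunk_task : public pls::internal::scheduling::task {
  long size_;
  double *scratch_;

 public:
  explicit scan_chunk_task(long size)
      : size_{size},
        scratch_{static_cast<double *>(allocate_memory(size * sizeof(double)))} {}

 protected:
  void execute_internal() override {
    for (long i = 0; i < size_; i++) {
      scratch_[i] = 0.0;  // uses the block that lives on the scheduler's task stack
    }
  }
};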
@@ -71,7 +71,7 @@ void scheduler::worker_routine() {
   }
 }
 
-void scheduler::terminate(bool wait_for_workers) {
+void scheduler::terminate() {
   if (terminated_) {
     return;
   }
@@ -79,14 +79,13 @@ void scheduler::terminate(bool wait_for_workers) {
   terminated_ = true;
   sync_barrier_.wait();
 
-  if (wait_for_workers) {
   for (unsigned int i = 0; i < num_threads_; i++) {
     if (reuse_thread_ && i == 0) {
       continue;
     }
     memory_->thread_for(i)->join();
   }
-  }
 }
 
 task *scheduler::get_local_task() {
@@ -9,10 +9,18 @@ namespace internal {
 namespace scheduling {
 
 task::task() :
+    finished_construction_{false},
     ref_count_{0},
     parent_{nullptr},
     deque_state_{0} {}
 
+void *task::allocate_memory(long size) {
+  if (finished_construction_) {
+    PLS_ERROR("Must not allocate dynamic task memory after it's construction.")
+  }
+  return thread_state::get()->task_stack_->push_bytes(size);
+}
+
 void task::execute() {
   PROFILE_WORK_BLOCK("execute task")
   auto last_executing = thread_state::get()->current_task_;
@@ -7,7 +7,7 @@ using namespace pls;
 TEST_CASE("for_each functions correctly", "[algorithms/for_each.h]") {
   malloc_scheduler_memory my_scheduler_memory{8, 2 << 12};
-  scheduler my_scheduler{&my_scheduler_memory, 2};
+  scheduler my_scheduler{&my_scheduler_memory, 8};
 
   my_scheduler.perform_work([]() {
     constexpr int SIZE = 1000;
     std::array<int, SIZE> result_array{};
@@ -46,9 +46,9 @@ TEST_CASE("for_each functions correctly", "[algorithms/for_each.h]") {
 TEST_CASE("scan functions correctly", "[algorithms/scan.h]") {
   malloc_scheduler_memory my_scheduler_memory{8, 2 << 12};
-  scheduler my_scheduler{&my_scheduler_memory, 2};
+  scheduler my_scheduler{&my_scheduler_memory, 8};
 
   my_scheduler.perform_work([]() {
-    constexpr int SIZE = 1000;
+    constexpr int SIZE = 10000;
     std::array<int, SIZE> input_array{}, result_array{};
     input_array.fill(1);
@@ -78,11 +78,11 @@ long fib(long n) {
 }
 
 TEST_CASE("invoke functions correctly", "[algorithms/invoke.h]") {
-  constexpr long fib_40 = 102334155;
+  constexpr long fib_30 = 832040;
 
-  malloc_scheduler_memory my_scheduler_memory{8, 2 << 12};
-  scheduler my_scheduler{&my_scheduler_memory, 2};
+  malloc_scheduler_memory my_scheduler_memory{8, 2u << 14};
+  scheduler my_scheduler{&my_scheduler_memory, 8};
 
   my_scheduler.perform_work([=]() {
-    REQUIRE(fib(40) == fib_40);
+    REQUIRE(fib(30) == fib_30);
   });
 }
@@ -60,7 +60,6 @@ TEST_CASE("tbb task are scheduled correctly", "[internal/scheduling/fork_join_ta
   });
 
   REQUIRE(counter.load() == total_tasks);
-  my_scheduler.terminate(true);
 }
 
 SECTION("tasks can be stolen") {
@@ -72,6 +71,5 @@ TEST_CASE("tbb task are scheduled correctly", "[internal/scheduling/fork_join_ta
     // Required, as child operates on our stack's memory!!!
     scheduler::wait_for_all();
   });
-  my_scheduler.terminate(true);
 }
 }