From f01c5436c23e7de6f3891d18a2378155813fbdeb Mon Sep 17 00:00:00 2001 From: FritzFlorian Date: Tue, 18 Jun 2019 14:19:51 +0200 Subject: [PATCH] Fix bug in scheduler and tasks. Threads must be joined on scheduler termination and tasks must be pushed onto the stack to allow better memory management. --- app/benchmark_prefix/main.cpp | 24 +++++++++++++++++++++++- lib/pls/include/pls/internal/helpers/mini_benchmark.h | 9 +++++++-- lib/pls/include/pls/internal/scheduling/scheduler.h | 14 +------------- lib/pls/include/pls/internal/scheduling/scheduler_impl.h | 13 ------------- lib/pls/include/pls/internal/scheduling/task.h | 28 +++++++++++++++++++--------- lib/pls/src/internal/scheduling/scheduler.cpp | 13 ++++++------- lib/pls/src/internal/scheduling/task.cpp | 8 ++++++++ test/algorithm_test.cpp | 14 +++++++------- test/scheduling_tests.cpp | 2 -- 9 files changed, 71 insertions(+), 54 deletions(-) diff --git a/app/benchmark_prefix/main.cpp b/app/benchmark_prefix/main.cpp index 4667c1d..d7e3bdb 100644 --- a/app/benchmark_prefix/main.cpp +++ b/app/benchmark_prefix/main.cpp @@ -6,7 +6,7 @@ #include #include -static constexpr int INPUT_SIZE = 1000000; +static constexpr int INPUT_SIZE = 100; int main() { PROFILE_ENABLE @@ -23,3 +23,25 @@ int main() { PROFILE_SAVE("test_profile.prof") } + +//int main() { +// PROFILE_ENABLE +// pls::malloc_scheduler_memory my_scheduler_memory{8, 2u << 18}; +// pls::scheduler scheduler{&my_scheduler_memory, 8}; +// +// std::vector vec(INPUT_SIZE, 1); +// std::vector out(INPUT_SIZE); +// +// for (int i = 0; i < INPUT_SIZE; i++) { +// vec[i] = 1; +// } +// +// scheduler.perform_work([&] { +// PROFILE_MAIN_THREAD +// for (int i = 0; i < 100; i++) { +// pls::scan(vec.begin(), vec.end(), out.begin(), std::plus(), 0.0); +// } +// }); +// +// PROFILE_SAVE("test_profile.prof") +//} diff --git a/lib/pls/include/pls/internal/helpers/mini_benchmark.h b/lib/pls/include/pls/internal/helpers/mini_benchmark.h index 59409cb..5a5ecc9 100644 --- a/lib/pls/include/pls/internal/helpers/mini_benchmark.h +++ b/lib/pls/include/pls/internal/helpers/mini_benchmark.h @@ -24,6 +24,7 @@ void run_mini_benchmark(const Function &lambda, size_t max_threads, unsigned lon chrono::high_resolution_clock::time_point start_time; chrono::high_resolution_clock::time_point end_time; + long max_local_time = 0; unsigned long iterations = 0; local_scheduler.perform_work([&] { start_time = chrono::high_resolution_clock::now(); @@ -31,7 +32,11 @@ void run_mini_benchmark(const Function &lambda, size_t max_threads, unsigned lon chrono::high_resolution_clock::time_point planned_end_time = start_time + chrono::milliseconds(max_runtime_ms); while (end_time < planned_end_time) { + auto local_start_time = chrono::high_resolution_clock::now(); lambda(); + auto local_end_time = chrono::high_resolution_clock::now(); + long local_time = chrono::duration_cast(local_end_time - local_start_time).count(); + max_local_time = std::max(local_time, max_local_time); end_time = chrono::high_resolution_clock::now(); iterations++; } @@ -40,9 +45,9 @@ void run_mini_benchmark(const Function &lambda, size_t max_threads, unsigned lon long time = chrono::duration_cast(end_time - start_time).count(); double time_per_iteration = (double) time / iterations; - std::cout << time_per_iteration; + std::cout << (long) time_per_iteration << " (" << max_local_time << ")"; if (num_threads < max_threads) { - std::cout << ","; + std::cout << "\t\t"; } } std::cout << std::endl; diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h index 4b144c3..eb4dae9 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler.h @@ -70,10 +70,8 @@ class scheduler { /** * Explicitly terminate the worker threads. Scheduler must not be used after this. - * - * @param wait_for_workers Set to true if you wish to return from this method only after the workers are shut down. */ - void terminate(bool wait_for_workers = true); + void terminate(); /** * Helper to spawn a child on the currently running task. @@ -96,16 +94,6 @@ class scheduler { static void spawn_child_and_wait(ARGS &&... args); /** - * Allocates some memory on the task-stack. - * It's usage is restricted to the function scope, as this enforces correct memory management. - * - * @param bytes Number of bytes to allocate - * @param function The function in which you can access the allocated memory - */ - template - static void allocate_on_stack(size_t bytes, Function function); - - /** * Helper to wait for all children of the currently executing task. */ static void wait_for_all(); diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h index 2bf15b9..1eb8c95 100644 --- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h +++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h @@ -50,19 +50,6 @@ void scheduler::spawn_child_and_wait(ARGS &&... args) { thread_state::get()->current_task_->spawn_child_and_wait(std::forward(args)...); } -// TODO: Make this 'more pretty' with type-safety etc. -template -void scheduler::allocate_on_stack(size_t bytes, Function function) { - auto my_state = thread_state::get(); - - void *allocated_memory = my_state->task_stack_->push_bytes(bytes); - - auto old_deque_state = my_state->current_task_->deque_state_; - my_state->current_task_->deque_state_ = my_state->task_stack_->save_state(); - function(allocated_memory); - my_state->current_task_->deque_state_ = old_deque_state; -} - } } } diff --git a/lib/pls/include/pls/internal/scheduling/task.h b/lib/pls/include/pls/internal/scheduling/task.h index a589db0..3801d6f 100644 --- a/lib/pls/include/pls/internal/scheduling/task.h +++ b/lib/pls/include/pls/internal/scheduling/task.h @@ -16,6 +16,9 @@ namespace scheduling { class task { friend class scheduler; + // Memory-Management (allow to allocate memory blocks in constructor) + bool finished_construction_; + // Coordinate finishing of sub_tasks std::atomic ref_count_; task *parent_; @@ -24,9 +27,22 @@ class task { data_structures::deque::state deque_state_; protected: + /* + * Must call the parent constructor. + */ explicit task(); /** + * Allow to allocate extra memory during run-time for this task. + * Memory will be pushed onto the stack (in aligned memory, thus avoid many small chunks). + * MUST be called in constructor, never afterwards. + * + * @param size Number of bytes to be allocated + * @return The allocated memory region + */ + void *allocate_memory(long size); + + /** * Overwrite this with the actual behaviour of concrete tasks. */ virtual void execute_internal() = 0; @@ -53,23 +69,17 @@ void task::spawn_child(ARGS &&... args) { thread_state::get()->deque_.push_tail_cb([this](T *item) { // Assign forced values (for stack and parent management) item->parent_ = this; + item->finished_construction_ = true; item->deque_state_ = thread_state::get()->deque_.save_state(); }, std::forward(args)...); } template void task::spawn_child_and_wait(ARGS &&... args) { - PROFILE_FORK_JOIN_STEALING("spawn_child_wait") static_assert(std::is_base_of::type>::value, "Only pass task subclasses!"); - // Assign forced values (for stack and parent management) - // TODO: Move this after construction - T sub_task{std::forward(args)...}; - sub_task.parent_ = nullptr; - sub_task.deque_state_ = thread_state::get()->deque_.save_state(); - PROFILE_END_BLOCK - sub_task.execute(); - + // TODO: See if we can inline this (avoid counters/deque) while maintaining memory management + spawn_child(std::forward(args)...); wait_for_all(); } diff --git a/lib/pls/src/internal/scheduling/scheduler.cpp b/lib/pls/src/internal/scheduling/scheduler.cpp index 8adf8ea..9c6d2ce 100644 --- a/lib/pls/src/internal/scheduling/scheduler.cpp +++ b/lib/pls/src/internal/scheduling/scheduler.cpp @@ -71,7 +71,7 @@ void scheduler::worker_routine() { } } -void scheduler::terminate(bool wait_for_workers) { +void scheduler::terminate() { if (terminated_) { return; } @@ -79,13 +79,12 @@ void scheduler::terminate(bool wait_for_workers) { terminated_ = true; sync_barrier_.wait(); - if (wait_for_workers) { - for (unsigned int i = 0; i < num_threads_; i++) { - if (reuse_thread_ && i == 0) { - continue; - } - memory_->thread_for(i)->join(); + + for (unsigned int i = 0; i < num_threads_; i++) { + if (reuse_thread_ && i == 0) { + continue; } + memory_->thread_for(i)->join(); } } diff --git a/lib/pls/src/internal/scheduling/task.cpp b/lib/pls/src/internal/scheduling/task.cpp index 8b20667..b682203 100644 --- a/lib/pls/src/internal/scheduling/task.cpp +++ b/lib/pls/src/internal/scheduling/task.cpp @@ -9,10 +9,18 @@ namespace internal { namespace scheduling { task::task() : + finished_construction_{false}, ref_count_{0}, parent_{nullptr}, deque_state_{0} {} +void *task::allocate_memory(long size) { + if (finished_construction_) { + PLS_ERROR("Must not allocate dynamic task memory after it's construction.") + } + return thread_state::get()->task_stack_->push_bytes(size); +} + void task::execute() { PROFILE_WORK_BLOCK("execute task") auto last_executing = thread_state::get()->current_task_; diff --git a/test/algorithm_test.cpp b/test/algorithm_test.cpp index 498e169..2909a37 100644 --- a/test/algorithm_test.cpp +++ b/test/algorithm_test.cpp @@ -7,7 +7,7 @@ using namespace pls; TEST_CASE("for_each functions correctly", "[algorithms/for_each.h]") { malloc_scheduler_memory my_scheduler_memory{8, 2 << 12}; - scheduler my_scheduler{&my_scheduler_memory, 2}; + scheduler my_scheduler{&my_scheduler_memory, 8}; my_scheduler.perform_work([]() { constexpr int SIZE = 1000; std::array result_array{}; @@ -46,9 +46,9 @@ TEST_CASE("for_each functions correctly", "[algorithms/for_each.h]") { TEST_CASE("scan functions correctly", "[algorithms/scan.h]") { malloc_scheduler_memory my_scheduler_memory{8, 2 << 12}; - scheduler my_scheduler{&my_scheduler_memory, 2}; + scheduler my_scheduler{&my_scheduler_memory, 8}; my_scheduler.perform_work([]() { - constexpr int SIZE = 1000; + constexpr int SIZE = 10000; std::array input_array{}, result_array{}; input_array.fill(1); @@ -78,11 +78,11 @@ long fib(long n) { } TEST_CASE("invoke functions correctly", "[algorithms/invoke.h]") { - constexpr long fib_40 = 102334155; + constexpr long fib_30 = 832040; - malloc_scheduler_memory my_scheduler_memory{8, 2 << 12}; - scheduler my_scheduler{&my_scheduler_memory, 2}; + malloc_scheduler_memory my_scheduler_memory{8, 2u << 14}; + scheduler my_scheduler{&my_scheduler_memory, 8}; my_scheduler.perform_work([=]() { - REQUIRE(fib(40) == fib_40); + REQUIRE(fib(30) == fib_30); }); } diff --git a/test/scheduling_tests.cpp b/test/scheduling_tests.cpp index 481a75e..1334d6e 100644 --- a/test/scheduling_tests.cpp +++ b/test/scheduling_tests.cpp @@ -60,7 +60,6 @@ TEST_CASE("tbb task are scheduled correctly", "[internal/scheduling/fork_join_ta }); REQUIRE(counter.load() == total_tasks); - my_scheduler.terminate(true); } SECTION("tasks can be stolen") { @@ -72,6 +71,5 @@ TEST_CASE("tbb task are scheduled correctly", "[internal/scheduling/fork_join_ta // Required, as child operates on our stack's memory!!! scheduler::wait_for_all(); }); - my_scheduler.terminate(true); } } -- libgit2 0.26.0