diff --git a/lib/pls/include/pls/algorithms/parallel_scan.h b/lib/pls/include/pls/algorithms/parallel_scan.h
index fd1a621..9149371 100644
--- a/lib/pls/include/pls/algorithms/parallel_scan.h
+++ b/lib/pls/include/pls/algorithms/parallel_scan.h
@@ -6,7 +6,7 @@ namespace pls {
 namespace algorithm {
 
 template<typename InIter, typename OutIter, typename BinaryOp, typename Type>
-void parallel_scan(InIter input_start, const InIter input_end, OutIter output, BinaryOp op, Type neutral_element);
+void parallel_scan(InIter in_start, const InIter in_end, OutIter out, BinaryOp op, Type neutral_elem);
 
 }
 }
diff --git a/lib/pls/include/pls/algorithms/parallel_scan_impl.h b/lib/pls/include/pls/algorithms/parallel_scan_impl.h
index 1de1840..3afd8bf 100644
--- a/lib/pls/include/pls/algorithms/parallel_scan_impl.h
+++ b/lib/pls/include/pls/algorithms/parallel_scan_impl.h
@@ -24,55 +24,53 @@ void serial_scan(InIter input_start, const InIter input_end, OutIter output, Bin
     current_output++;
   }
 }
+
 }
 
 template<typename InIter, typename OutIter, typename BinaryOp, typename Type>
-void parallel_scan(InIter input_start, const InIter input_end, OutIter output, BinaryOp op, Type neutral_element) {
+void parallel_scan(InIter in_start, const InIter in_end, OutIter out, BinaryOp op, Type neutral_elem) {
   constexpr auto chunks_per_thread = 4;
   using namespace pls::internal::scheduling;
 
-  auto size = std::distance(input_start, input_end);
-  auto my_state = thread_state::get();
+  // TODO: This must be dynamic to make sense, as it has a far bigger influence than any other cutoff.
+  //       The current strategy is static partitioning and is suboptimal for imbalanced workloads.
+  auto size = std::distance(in_start, in_end);
   auto num_threads = thread_state::get()->scheduler_->num_threads();
   auto chunks = num_threads * chunks_per_thread;
   auto items_per_chunk = std::max(1l, size / chunks);
 
-  // First Pass -> each chunk isolated
-  // TODO: Put this onto the stack, not into heap allocated memory
-  Type *tmp = new Type[chunks]; //reinterpret_cast<Type *>(my_state->task_stack_->push_bytes(sizeof(Type) * (chunks)));
-  parallel_for(0, chunks,
-               [tmp, input_start, input_end, output, items_per_chunk, &op, &neutral_element](int i) {
-                 auto local_start = input_start + items_per_chunk * i;
-                 auto local_end = local_start + items_per_chunk;
-                 if (local_end > input_end) {
-                   local_end = input_end;
-                 }
-
-                 // TODO: This MUST be dynamic to make sense, as it has a far bigger influence than any other cutoff
-
-                 internal::serial_scan(local_start, local_end, output + items_per_chunk * i, op, neutral_element);
-                 tmp[i] = *(output + std::distance(local_start, local_end) - 1);
-               });
-  internal::serial_scan(tmp, tmp + chunks, tmp, std::plus<Type>(), 0);
-
-  // Sum up pivot elements...
-  auto output_start = output;
-  auto output_end = output + size;
-  parallel_for(1, chunks,
-               [tmp, output_start, output_end, items_per_chunk, &op, &neutral_element](int i) {
-                 auto local_start = output_start + items_per_chunk * i;
-                 auto local_end = local_start + items_per_chunk;
-                 if (local_end > output_end) {
-                   local_end = output_end;
-                 }
-
-                 for (; local_start != local_end; local_start++) {
-                   *local_start = op(*local_start, tmp[i - 1]);
-                 }
-               });
-
-//  scheduler::wait_for_all();
-  delete[] tmp;
+  scheduler::allocate_on_stack(sizeof(Type) * (chunks), [&](void *memory) {
+    Type *chunk_sums = reinterpret_cast<Type *>(memory);
+
+    // First Pass = calculate each chunk's individual prefix sum
+    parallel_for(0, chunks, [&](int i) {
+      auto chunk_start = in_start + items_per_chunk * i;
+      auto chunk_end = std::min(in_end, chunk_start + items_per_chunk);
+      auto chunk_output = out + items_per_chunk * i;
+
+      internal::serial_scan(chunk_start, chunk_end, chunk_output, op, neutral_elem);
+      chunk_sums[i] = *(chunk_output + std::distance(chunk_start, chunk_end) - 1);
+    });
+
+    // Calculate the prefix sums of the chunk sums
+    // (effectively the prefix sum at the end of each chunk, later used to correct the following chunk).
+    internal::serial_scan(chunk_sums, chunk_sums + chunks, chunk_sums, std::plus<Type>(), 0);
+
+    // Second Pass = use the results from the first pass to correct each chunk's elements
+    auto output_start = out;
+    auto output_end = out + size;
+    parallel_for(1, chunks, [&](int i) {
+      auto chunk_start = output_start + items_per_chunk * i;
+      auto chunk_end = std::min(output_end, chunk_start + items_per_chunk);
+
+      for (; chunk_start != chunk_end; chunk_start++) {
+        *chunk_start = op(*chunk_start, chunk_sums[i - 1]);
+      }
+    });
+  });
+
+  // End this work section by cleaning up the stack and tasks
+  scheduler::wait_for_all();
 }
 
 }
diff --git a/lib/pls/include/pls/internal/data_structures/aligned_stack.h b/lib/pls/include/pls/internal/data_structures/aligned_stack.h
index bd49830..133d02d 100644
--- a/lib/pls/include/pls/internal/data_structures/aligned_stack.h
+++ b/lib/pls/include/pls/internal/data_structures/aligned_stack.h
@@ -23,8 +23,8 @@ using base::system_details::pointer_t;
  *
  * Usage:
  * aligned_stack stack{pointer_to_memory, size_of_memory};
- * T* pointer = stack.push(some_object); // Copy-Constrict the object on top of stack
- * stack.pop(); // Deconstruct the top object of type T
+ * T* pointer = stack.push(constructor_arguments); // Perfect-Forward-Construct the object on top of stack
+ * stack.pop(); // Remove the top object of type T
  */
 class aligned_stack {
   // Keep bounds of our memory block
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler.h b/lib/pls/include/pls/internal/scheduling/scheduler.h
index 242b912..4b144c3 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler.h
@@ -96,6 +96,16 @@ class scheduler {
   static void spawn_child_and_wait(ARGS &&... args);
 
   /**
+   * Allocates some memory on the task-stack.
+   * Its usage is restricted to the function scope, as this enforces correct memory management.
+   *
+   * @param bytes Number of bytes to allocate
+   * @param function The function in which you can access the allocated memory
+   */
+  template<typename Function>
+  static void allocate_on_stack(size_t bytes, Function function);
+
+  /**
    * Helper to wait for all children of the currently executing task.
    */
   static void wait_for_all();
diff --git a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
index 1eb8c95..2bf15b9 100644
--- a/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
+++ b/lib/pls/include/pls/internal/scheduling/scheduler_impl.h
@@ -50,6 +50,19 @@ void scheduler::spawn_child_and_wait(ARGS &&... args) {
   thread_state::get()->current_task_->spawn_child_and_wait(std::forward<ARGS>(args)...);
 }
 
+// TODO: Make this 'more pretty' with type-safety etc.
+template<typename Function>
+void scheduler::allocate_on_stack(size_t bytes, Function function) {
+  auto my_state = thread_state::get();
+
+  void *allocated_memory = my_state->task_stack_->push_bytes(bytes);
+
+  auto old_deque_state = my_state->current_task_->deque_state_;
+  my_state->current_task_->deque_state_ = my_state->task_stack_->save_state();
+  function(allocated_memory);
+  my_state->current_task_->deque_state_ = old_deque_state;
+}
+
 }
 }
 }
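
Note on the two-pass scheme used by the patched parallel_scan: every chunk is first scanned in isolation, the per-chunk totals are then scanned themselves, and finally each later chunk is corrected by the combined total of the chunks before it. The standalone sketch below illustrates the same idea sequentially in plain C++; it does not use the pls scheduler or its parallel_for, and the names chunked_scan and serial_scan are illustrative only. It uses ceiling division for the chunk size so that no tail elements are dropped, which is one of the edge cases a static partitioning like max(1l, size / chunks) has to be careful about.

#include <algorithm>
#include <cstddef>
#include <functional>
#include <iostream>
#include <vector>

// Sequential helper: inclusive prefix scan of [in_start, in_end) into out.
template <typename InIter, typename OutIter, typename BinaryOp, typename Type>
void serial_scan(InIter in_start, InIter in_end, OutIter out, BinaryOp op, Type neutral_elem) {
  Type current = neutral_elem;
  for (; in_start != in_end; ++in_start, ++out) {
    current = op(current, *in_start);
    *out = current;
  }
}

// Two-pass chunked scan: scan chunks independently, scan the chunk totals,
// then add the preceding total to every element of the later chunks.
template <typename Type, typename BinaryOp>
void chunked_scan(const std::vector<Type> &in, std::vector<Type> &out,
                  BinaryOp op, Type neutral_elem, std::size_t chunks) {
  const std::size_t size = in.size();
  const std::size_t items_per_chunk = (size + chunks - 1) / chunks;  // ceiling division

  // Pass 1: isolated scan per chunk (a parallel_for in the patched implementation).
  std::vector<Type> chunk_sums(chunks, neutral_elem);
  for (std::size_t i = 0; i < chunks; i++) {
    const std::size_t begin = std::min(size, i * items_per_chunk);
    const std::size_t end = std::min(size, begin + items_per_chunk);
    if (begin == end) continue;  // empty tail chunk
    serial_scan(in.begin() + begin, in.begin() + end, out.begin() + begin, op, neutral_elem);
    chunk_sums[i] = out[end - 1];  // total of chunk i
  }

  // Scan of the totals: chunk_sums[i] becomes the prefix up to the end of chunk i.
  serial_scan(chunk_sums.begin(), chunk_sums.end(), chunk_sums.begin(), op, neutral_elem);

  // Pass 2: correct every chunk after the first (again a parallel_for in the patch).
  for (std::size_t i = 1; i < chunks; i++) {
    const std::size_t begin = std::min(size, i * items_per_chunk);
    const std::size_t end = std::min(size, begin + items_per_chunk);
    for (std::size_t j = begin; j < end; j++) {
      out[j] = op(out[j], chunk_sums[i - 1]);
    }
  }
}

int main() {
  std::vector<int> in{1, 2, 3, 4, 5, 6, 7, 8};
  std::vector<int> out(in.size());
  chunked_scan(in, out, std::plus<int>(), 0, 3);
  for (int value : out) std::cout << value << ' ';  // prints: 1 3 6 10 15 21 28 36
  std::cout << '\n';
}

Replacing the two outer loops with parallel_for calls gives the structure of the patched implementation: the work in both passes is independent per chunk, which is what makes the parallelisation legal.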
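
The new scheduler::allocate_on_stack follows a scoped-allocation pattern: bump the task stack, hand the memory to a callable, and undo the allocation when the callable returns, so the memory cannot outlive the scope (the real implementation does this indirectly by saving and restoring the task's deque_state_). The following minimal standalone sketch shows just that pattern; it is independent of the pls aligned_stack and task_stack_ types, and bump_stack / allocate_scoped are made-up names for illustration. A real implementation also has to handle alignment and bounds, as the aligned_stack does.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

// A bump allocator over a fixed buffer. allocate_scoped() hands out memory for
// exactly the lifetime of the passed callable and then rolls the stack back.
class bump_stack {
  std::vector<std::uint8_t> memory_;
  std::size_t top_ = 0;

 public:
  explicit bump_stack(std::size_t bytes) : memory_(bytes) {}

  template <typename Function>
  void allocate_scoped(std::size_t bytes, Function function) {
    const std::size_t saved_top = top_;        // remember the current stack state
    void *allocated = memory_.data() + top_;   // next free region (alignment/bounds omitted)
    top_ += bytes;                             // bump the top past the allocation
    function(allocated);                       // memory is only valid inside this call
    top_ = saved_top;                          // roll back on return, releasing the memory
  }
};

int main() {
  bump_stack stack{1024};
  stack.allocate_scoped(4 * sizeof(int), [](void *memory) {
    int *values = static_cast<int *>(memory);
    for (int i = 0; i < 4; i++) values[i] = i * i;
    std::cout << values[3] << '\n';  // prints: 9
  });
}

Restricting access to the lambda's scope is what makes the cleanup automatic: the allocation is released on return even if the caller forgets about it, which is the memory-management guarantee the doc comment in scheduler.h describes.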