Commit 88dd5384 by FritzFlorian

Clean-Up parallel_scan algorithm.

parent fa7c5120
......@@ -6,7 +6,7 @@ namespace pls {
namespace algorithm {
template<typename InIter, typename OutIter, typename BinaryOp, typename Type>
void parallel_scan(InIter input_start, const InIter input_end, OutIter output, BinaryOp op, Type neutral_element);
void parallel_scan(InIter in_start, const InIter in_end, OutIter out, BinaryOp op, Type neutral_elem);
}
}
......
......@@ -24,55 +24,53 @@ void serial_scan(InIter input_start, const InIter input_end, OutIter output, Bin
current_output++;
}
}
}
template<typename InIter, typename OutIter, typename BinaryOp, typename Type>
void parallel_scan(InIter input_start, const InIter input_end, OutIter output, BinaryOp op, Type neutral_element) {
void parallel_scan(InIter in_start, const InIter in_end, OutIter out, BinaryOp op, Type neutral_elem) {
constexpr auto chunks_per_thread = 4;
using namespace pls::internal::scheduling;
auto size = std::distance(input_start, input_end);
auto my_state = thread_state::get();
// TODO: This must be dynamic to make sense, as it has a far bigger influence than any other cutoff.
// The current strategy is static partitioning, which is suboptimal for imbalanced workloads.
auto size = std::distance(in_start, in_end);
auto num_threads = thread_state::get()->scheduler_->num_threads();
auto chunks = num_threads * chunks_per_thread;
auto items_per_chunk = std::max(1l, size / chunks);
// First Pass -> each chunk isolated
// TODO: Put this onto the stack, not into heap allocated memory
Type *tmp = new Type[chunks]; //reinterpret_cast<Type *>(my_state->task_stack_->push_bytes(sizeof(Type) * (chunks)));
parallel_for(0, chunks,
[tmp, input_start, input_end, output, items_per_chunk, &op, &neutral_element](int i) {
auto local_start = input_start + items_per_chunk * i;
auto local_end = local_start + items_per_chunk;
if (local_end > input_end) {
local_end = input_end;
}
// TODO: This MUST be dynamic to make sense, as it has a far bigger influence than any other cutoff
//
internal::serial_scan(local_start, local_end, output + items_per_chunk * i, op, neutral_element);
tmp[i] = *(output + std::distance(local_start, local_end) - 1);
});
internal::serial_scan(tmp, tmp + chunks, tmp, std::plus<int>(), 0);
// Sum up pivot elements...
auto output_start = output;
auto output_end = output + size;
parallel_for(1, chunks,
[tmp, output_start, output_end, items_per_chunk, &op, &neutral_element](int i) {
auto local_start = output_start + items_per_chunk * i;
auto local_end = local_start + items_per_chunk;
if (local_end > output_end) {
local_end = output_end;
}
for (; local_start != local_end; local_start++) {
*local_start = op(*local_start, tmp[i - 1]);
}
});
// scheduler::wait_for_all();
delete[] tmp;
scheduler::allocate_on_stack(sizeof(Type) * (chunks), [&](void *memory) {
Type *chunk_sums = reinterpret_cast<Type *>(memory);
// First Pass = calculate each chunk's individual prefix sum
parallel_for(0, chunks, [&](int i) {
auto chunk_start = in_start + items_per_chunk * i;
auto chunk_end = std::min(in_end, chunk_start + items_per_chunk);
auto chunk_output = out + items_per_chunk * i;
internal::serial_scan(chunk_start, chunk_end, chunk_output, op, neutral_elem);
chunk_sums[i] = *(chunk_output + std::distance(chunk_start, chunk_end) - 1); // remember the chunk's total (its last prefix value) for the second pass
});
// Calculate the prefix sum over the chunk sums
// (effectively the running total at the end of each chunk, later used to correct the following chunks).
internal::serial_scan(chunk_sums, chunk_sums + chunks, chunk_sums, op, neutral_elem);
// Second Pass = use the results of the first pass to correct each chunk's elements
auto output_start = out;
auto output_end = out + size;
parallel_for(1, chunks, [&](int i) {
auto chunk_start = output_start + items_per_chunk * i;
auto chunk_end = std::min(output_end, chunk_start + items_per_chunk);
for (; chunk_start != chunk_end; chunk_start++) {
*chunk_start = op(*chunk_start, chunk_sums[i - 1]);
}
});
});
// End this work section by cleaning up stack and tasks
scheduler::wait_for_all();
}
}
......
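The cleaned-up implementation keeps the classic two-pass chunked scan: pass one scans every chunk independently and records each chunk's total in chunk_sums, a serial scan over chunk_sums then yields the offset each chunk is missing, and pass two applies that offset to all chunks after the first. A minimal usage sketch under assumptions (the include path and the surrounding scheduler setup are not shown in this diff and may differ):

#include <functional>
#include <vector>
#include "pls/algorithms/parallel_scan.h"  // assumed header location

void scan_example() {
  std::vector<int> in = {1, 2, 3, 4, 5, 6, 7, 8};
  std::vector<int> out(in.size());
  // Inclusive prefix sum; out becomes {1, 3, 6, 10, 15, 21, 28, 36}.
  // Must run inside a pls work section so that thread_state::get() is valid.
  pls::algorithm::parallel_scan(in.begin(), in.end(), out.begin(), std::plus<int>(), 0);
}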
......@@ -23,8 +23,8 @@ using base::system_details::pointer_t;
*
* Usage:
* aligned_stack stack{pointer_to_memory, size_of_memory};
* T* pointer = stack.push(some_object); // Copy-Construct the object on top of stack
* stack.pop<T>(); // Deconstruct the top object of type T
* T* pointer = stack.push<T>(constructor_arguments); // Perfect-Forward-Construct the object on top of stack
* stack.pop<T>(); // Remove the top object of type T
*/
class aligned_stack {
// Keep bounds of our memory block
......
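The hunk only updates the documentation of push; the new perfect-forwarding overload itself is not part of this diff. A minimal sketch of what such a push typically looks like, assuming push_bytes(size_t) reserves suitably aligned storage on the stack and returns a pointer to it (as used by the scheduler further below); requires <new> and <utility>:

template<typename T, typename ...ARGS>
T *aligned_stack::push(ARGS &&...args) {
  // Reserve storage for T on top of the stack, then construct T in place,
  // forwarding the constructor arguments.
  void *memory = push_bytes(sizeof(T));
  return new (memory) T(std::forward<ARGS>(args)...);
}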
......@@ -96,6 +96,16 @@ class scheduler {
static void spawn_child_and_wait(ARGS &&... args);
/**
* Allocates some memory on the task-stack.
* Its usage is restricted to the scope of the given function, as this enforces correct memory management.
*
* @param bytes Number of bytes to allocate
* @param function The function in which you can access the allocated memory
*/
template<typename Function>
static void allocate_on_stack(size_t bytes, Function function);
/**
* Helper to wait for all children of the currently executing task.
*/
static void wait_for_all();
......
......@@ -50,6 +50,19 @@ void scheduler::spawn_child_and_wait(ARGS &&... args) {
thread_state::get()->current_task_->spawn_child_and_wait<T>(std::forward<ARGS>(args)...);
}
// TODO: Make this prettier, e.g. with more type-safety.
template<typename Function>
void scheduler::allocate_on_stack(size_t bytes, Function function) {
auto my_state = thread_state::get();
// Reserve the requested number of bytes on this thread's task stack.
void *allocated_memory = my_state->task_stack_->push_bytes(bytes);
// Remember the task's current deque state and replace it with the stack's state after the allocation.
auto old_deque_state = my_state->current_task_->deque_state_;
my_state->current_task_->deque_state_ = my_state->task_stack_->save_state();
// The raw memory is only valid while the passed function executes.
function(allocated_memory);
// Restore the task's previous deque state once the function has returned.
my_state->current_task_->deque_state_ = old_deque_state;
}
}
}
}
......
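A condensed usage sketch of the new helper, mirroring its use in parallel_scan above (the buffer size n is an assumption for illustration; the call must happen inside a running pls task so that thread_state::get() and current_task_ are valid):

size_t n = 16;  // assumed buffer size for illustration
scheduler::allocate_on_stack(sizeof(int) * n, [&](void *memory) {
  int *buffer = reinterpret_cast<int *>(memory);
  // buffer is only valid until this lambda returns; this is the scope
  // restriction mentioned in the doc comment above.
  for (size_t i = 0; i < n; i++) {
    buffer[i] = static_cast<int>(i);
  }
});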