Commit 46ac2bd4 by Bernhard Gatzhammer

Merge branch 'development' of https://github.com/siemens/embb into embb349_terms_callable

parents 0682909e e9ad6754
......@@ -37,42 +37,45 @@ namespace internal {
template<typename ValueType>
class ValueComparisonFunction{
public:
explicit ValueComparisonFunction(const ValueType &value)
:value_(value) {}
ValueComparisonFunction(const ValueComparisonFunction &other)
:value_(other.value_) {}
explicit ValueComparisonFunction(const ValueType& value)
: value_(value) {}
ValueComparisonFunction(const ValueComparisonFunction& other)
: value_(other.value_) {}
template<typename ElementType>
int operator()(ElementType element) {
if(element == value_)
if (element == value_) {
return 1;
else
} else {
return 0;
}
}
private:
const ValueType &value_;
ValueComparisonFunction &operator=(const ValueComparisonFunction &other);
ValueComparisonFunction &operator=(
const ValueComparisonFunction& other);
};
template<typename Function>
class FunctionComparisonFunction{
public:
explicit FunctionComparisonFunction(Function function)
:function_(function) {}
: function_(function) {}
FunctionComparisonFunction(const FunctionComparisonFunction &other)
:function_(other.function_) {}
: function_(other.function_) {}
template<typename ElementType>
int operator()(ElementType element) {
if(function_(element))
if (function_(element)) {
return 1;
else
} else {
return 0;
}
}
private:
Function function_;
FunctionComparisonFunction &operator=(const FunctionComparisonFunction &
other);
FunctionComparisonFunction &operator=(
const FunctionComparisonFunction& other);
};
} // namespace internal
......
......@@ -45,44 +45,54 @@ class ForEachFunctor {
/**
* Constructs a for-each functor with arguments.
*/
ForEachFunctor(RAI first, RAI last, Function unary,
const embb::mtapi::ExecutionPolicy& policy, size_t block_size)
: first_(first), last_(last), unary_(unary), policy_(policy),
block_size_(block_size) {
ForEachFunctor(size_t chunk_first, size_t chunk_last, Function unary,
const embb::mtapi::ExecutionPolicy& policy,
const BlockSizePartitioner<RAI>& partitioner)
: chunk_first_(chunk_first), chunk_last_(chunk_last),
unary_(unary), policy_(policy), partitioner_(partitioner) {
}
void Action(mtapi::TaskContext&) {
size_t distance = static_cast<size_t>(std::distance(first_, last_));
if (distance == 0) return;
if (distance <= block_size_) { // leaf case -> do work
for (RAI curIter(first_); curIter != last_; ++curIter) {
unary_(*curIter);
if (chunk_first_ == chunk_last_) {
// Leaf case, recursed to single chunk. Do work on chunk:
ChunkDescriptor<RAI> chunk = partitioner_[chunk_first_];
RAI first = chunk.GetFirst();
RAI last = chunk.GetLast();
for (RAI it = first; it != last; ++it) {
unary_(*it);
}
} else { // recurse further
ChunkPartitioner<RAI> partitioner(first_, last_, 2);
ForEachFunctor<RAI, Function> functorL(partitioner[0].GetFirst(),
partitioner[0].GetLast(), unary_, policy_, block_size_);
ForEachFunctor<RAI, Function> functorR(partitioner[1].GetFirst(),
partitioner[1].GetLast(), unary_, policy_, block_size_);
mtapi::Node& node = mtapi::Node::GetInstance();
mtapi::Task taskL = node.Spawn(mtapi::Action(base::MakeFunction(
functorL, &ForEachFunctor<RAI, Function>::Action),
} else {
// Recurse further:
size_t chunk_split_index = (chunk_first_ + chunk_last_) / 2;
// Split chunks into left / right branches:
self_t functor_l(chunk_first_,
chunk_split_index,
unary_, policy_, partitioner_);
self_t functor_r(chunk_split_index + 1,
chunk_last_,
unary_, policy_, partitioner_);
mtapi::Task task_l = mtapi::Node::GetInstance().Spawn(
mtapi::Action(
base::MakeFunction(functor_l, &self_t::Action),
policy_));
mtapi::Task taskR = node.Spawn(mtapi::Action(base::MakeFunction(
functorR, &ForEachFunctor<RAI, Function>::Action),
mtapi::Task task_r = mtapi::Node::GetInstance().Spawn(
mtapi::Action(
base::MakeFunction(functor_r, &self_t::Action),
policy_));
taskL.Wait(MTAPI_INFINITE);
taskR.Wait(MTAPI_INFINITE);
task_l.Wait(MTAPI_INFINITE);
task_r.Wait(MTAPI_INFINITE);
}
}
private:
RAI first_;
RAI last_;
typedef ForEachFunctor<RAI, Function> self_t;
private:
size_t chunk_first_;
size_t chunk_last_;
Function unary_;
const embb::mtapi::ExecutionPolicy& policy_;
size_t block_size_;
const BlockSizePartitioner<RAI>& partitioner_;
/**
* Disables assignment.
......@@ -95,21 +105,31 @@ void ForEachRecursive(RAI first, RAI last, Function unary,
const embb::mtapi::ExecutionPolicy& policy, size_t block_size) {
typedef typename std::iterator_traits<RAI>::difference_type difference_type;
difference_type distance = std::distance(first, last);
assert(distance > 0);
if (distance == 0) {
return;
}
unsigned int num_cores = policy.GetCoreCount();
if (num_cores == 0) {
EMBB_THROW(embb::base::ErrorException, "No cores in execution policy");
}
mtapi::Node& node = mtapi::Node::GetInstance();
// Determine actually used block size
if (block_size == 0) {
block_size = (static_cast<size_t>(distance) / node.GetCoreCount());
block_size = (static_cast<size_t>(distance) / num_cores);
if (block_size == 0) {
block_size = 1;
}
}
// Perform check of task number sufficiency
// Check task number sufficiency
if (((distance / block_size) * 2) + 1 > MTAPI_NODE_MAX_TASKS_DEFAULT) {
EMBB_THROW(embb::base::ErrorException, "Not enough MTAPI tasks available "
"to perform the parallel foreach loop");
EMBB_THROW(embb::base::ErrorException,
"Not enough MTAPI tasks available for parallel foreach");
}
ForEachFunctor<RAI, Function> functor(first, last, unary, policy, block_size);
BlockSizePartitioner<RAI> partitioner(first, last, block_size);
ForEachFunctor<RAI, Function> functor(0,
partitioner.Size() - 1,
unary, policy, partitioner);
mtapi::Task task = node.Spawn(mtapi::Action(
base::MakeFunction(functor,
&ForEachFunctor<RAI, Function>::Action),
......@@ -127,7 +147,7 @@ void ForEachIteratorCheck(RAI first, RAI last, Function unary,
} // namespace internal
template<typename RAI, typename Function>
void ForEach(RAI first, RAI last, Function unary,
void ForEach(RAI first, const RAI last, Function unary,
const embb::mtapi::ExecutionPolicy& policy, size_t block_size) {
typename std::iterator_traits<RAI>::iterator_category category;
internal::ForEachIteratorCheck(first, last, unary, policy, block_size,
......
......@@ -48,58 +48,112 @@ class MergeSortFunctor {
public:
typedef typename std::iterator_traits<RAI>::value_type value_type;
MergeSortFunctor(RAI first, RAI last, RAITemp temporary_first,
ComparisonFunction comparison, const embb::mtapi::ExecutionPolicy& policy,
size_t block_size, const RAI& global_first, int depth)
: first_(first), last_(last), temp_first_(temporary_first),
comparison_(comparison), policy_(policy), block_size_(block_size),
global_first_(global_first), depth_(depth) {
MergeSortFunctor(size_t chunk_first, size_t chunk_last,
RAITemp temporary_first, ComparisonFunction comparison,
const embb::mtapi::ExecutionPolicy& policy,
const BlockSizePartitioner<RAI>& partitioner,
const RAI& global_first, int depth)
: chunk_first_(chunk_first), chunk_last_(chunk_last),
temp_first_(temporary_first),
comparison_(comparison), policy_(policy), partitioner_(partitioner),
global_first_(global_first), depth_(depth) {
}
void Action(mtapi::TaskContext& context) {
typedef typename std::iterator_traits<RAI>::difference_type difference_type;
size_t distance = static_cast<size_t>(std::distance(first_, last_));
if (distance <= 1) {
if(!CloneBackToInput() && distance != 0) {
RAITemp temp_first = temp_first_;
temp_first += std::distance(global_first_, first_);
*temp_first = *first_;
}
return;
}
internal::ChunkPartitioner<RAI> partitioner(first_, last_, 2);
MergeSortFunctor<RAI, RAITemp, ComparisonFunction> functorL(
partitioner[0].GetFirst(), partitioner[0].GetLast(), temp_first_,
comparison_, policy_, block_size_, global_first_, depth_ + 1);
MergeSortFunctor<RAI, RAITemp, ComparisonFunction> functorR(
partitioner[1].GetFirst(), partitioner[1].GetLast(), temp_first_,
comparison_, policy_, block_size_, global_first_, depth_ + 1);
if (distance <= block_size_) {
functorL.Action(context);
functorR.Action(context);
void Action(mtapi::TaskContext&) {
size_t chunk_split_index = (chunk_first_ + chunk_last_) / 2;
if (chunk_first_ == chunk_last_) {
// Leaf case: recurse into a single chunk's elements:
ChunkDescriptor<RAI> chunk = partitioner_[chunk_first_];
MergeSortChunk(chunk.GetFirst(), chunk.GetLast(), depth_);
} else {
// Recurse further, split chunks:
self_t functor_l(chunk_first_,
chunk_split_index,
temp_first_,
comparison_, policy_, partitioner_,
global_first_, depth_ + 1);
self_t functor_r(chunk_split_index + 1,
chunk_last_,
temp_first_,
comparison_, policy_, partitioner_,
global_first_, depth_ + 1);
mtapi::Node& node = mtapi::Node::GetInstance();
mtapi::Task taskL = node.Spawn(mtapi::Action(base::MakeFunction(functorL,
&MergeSortFunctor<RAI, RAITemp, ComparisonFunction>::Action),
mtapi::Task task_l = node.Spawn(
mtapi::Action(
base::MakeFunction(functor_l, &self_t::Action),
policy_));
mtapi::Task taskR = node.Spawn(mtapi::Action(base::MakeFunction(functorR,
&MergeSortFunctor<RAI, RAITemp, ComparisonFunction>::Action),
mtapi::Task task_r = node.Spawn(
mtapi::Action(
base::MakeFunction(functor_r, &self_t::Action),
policy_));
taskL.Wait(MTAPI_INFINITE);
taskR.Wait(MTAPI_INFINITE);
task_l.Wait(MTAPI_INFINITE);
task_r.Wait(MTAPI_INFINITE);
ChunkDescriptor<RAI> ck_f = partitioner_[chunk_first_];
ChunkDescriptor<RAI> ck_m = partitioner_[chunk_split_index + 1];
ChunkDescriptor<RAI> ck_l = partitioner_[chunk_last_];
if(CloneBackToInput(depth_)) {
// Merge from temp into input:
difference_type first = std::distance(global_first_, ck_f.GetFirst());
difference_type mid = std::distance(global_first_, ck_m.GetFirst());
difference_type last = std::distance(global_first_, ck_l.GetLast());
SerialMerge(temp_first_ + first, temp_first_ + mid, temp_first_ + last,
ck_f.GetFirst(),
comparison_);
} else {
// Merge from input into temp:
SerialMerge(ck_f.GetFirst(), ck_m.GetFirst(), ck_l.GetLast(),
temp_first_ + std::distance(global_first_, ck_f.GetFirst()),
comparison_);
}
}
}
if(CloneBackToInput()) {
difference_type first = std::distance(global_first_, functorL.first_);
difference_type mid = std::distance(global_first_, functorR.first_);
difference_type last = std::distance(global_first_, functorR.last_);
SerialMerge(temp_first_ + first, temp_first_ + mid,
temp_first_ + last, functorL.first_, comparison_);
/**
* Serial merge sort of elements within a single chunk.
*/
void MergeSortChunk(RAI first,
RAI last,
int depth) {
size_t distance = static_cast<size_t>(
std::distance(first, last));
if (distance <= 1) {
// Leaf case:
if (!CloneBackToInput(depth) && distance != 0) {
RAITemp temp_first = temp_first_;
std::advance(temp_first, std::distance(global_first_, first));
*temp_first = *first;
}
return;
}
// Recurse further. Use binary split, ignoring chunk size as this
// recursion is serial and has leaf size 1:
ChunkPartitioner<RAI> partitioner(first, last, 2);
ChunkDescriptor<RAI> ck_l = partitioner[0];
ChunkDescriptor<RAI> ck_r = partitioner[1];
MergeSortChunk(
ck_l.GetFirst(),
ck_l.GetLast(),
depth + 1);
MergeSortChunk(
ck_r.GetFirst(),
ck_r.GetLast(),
depth + 1);
if (CloneBackToInput(depth)) {
// Merge from temp into input:
difference_type d_first = std::distance(global_first_, ck_l.GetFirst());
difference_type d_mid = std::distance(global_first_, ck_r.GetFirst());
difference_type d_last = std::distance(global_first_, ck_r.GetLast());
SerialMerge(
temp_first_ + d_first, temp_first_ + d_mid, temp_first_ + d_last,
ck_l.GetFirst(),
comparison_);
} else {
SerialMerge(functorL.first_, functorR.first_, functorR.last_,
temp_first_ + std::distance(global_first_, functorL.first_),
comparison_);
// Merge from input into temp:
SerialMerge(
ck_l.GetFirst(), ck_r.GetFirst(), ck_r.GetLast(),
temp_first_ + std::distance(global_first_, ck_l.GetFirst()),
comparison_);
}
}
......@@ -109,17 +163,22 @@ class MergeSortFunctor {
* \return \c true if the temporary data range is input and the array to be
* sorted is output. \c false, if the other way around.
*/
bool CloneBackToInput() {
return depth_ % 2 == 0 ? true : false;
bool CloneBackToInput(int depth) {
return depth % 2 == 0 ? true : false;
}
private:
RAI first_;
RAI last_;
typedef MergeSortFunctor<RAI, RAITemp, ComparisonFunction> self_t;
typedef typename std::iterator_traits<RAI>::difference_type
difference_type;
private:
size_t chunk_first_;
size_t chunk_last_;
RAITemp temp_first_;
ComparisonFunction comparison_;
const embb::mtapi::ExecutionPolicy& policy_;
size_t block_size_;
const BlockSizePartitioner<RAI>& partitioner_;
const RAI& global_first_;
int depth_;
......@@ -166,29 +225,47 @@ void MergeSort(
size_t block_size
) {
typedef typename std::iterator_traits<RAI>::difference_type difference_type;
embb::mtapi::Node &node = embb::mtapi::Node::GetInstance();
difference_type distance = last - first;
assert(distance >= 0);
typedef internal::MergeSortFunctor<RAI, RAITemp, ComparisonFunction>
functor_t;
difference_type distance = std::distance(first, last);
if (distance == 0) {
EMBB_THROW(embb::base::ErrorException, "Distance for ForEach is 0");
}
unsigned int num_cores = policy.GetCoreCount();
if (num_cores == 0) {
EMBB_THROW(embb::base::ErrorException, "No cores in execution policy");
}
// Determine actually used block size
if (block_size == 0) {
block_size = (static_cast<size_t>(distance) / node.GetCoreCount());
block_size = (static_cast<size_t>(distance) / num_cores);
if (block_size == 0)
block_size = 1;
}
if (((distance/block_size) * 2) + 1 > MTAPI_NODE_MAX_TASKS_DEFAULT) {
// Check task number sufficiency
if (((distance / block_size) * 2) + 1 > MTAPI_NODE_MAX_TASKS_DEFAULT) {
EMBB_THROW(embb::base::ErrorException,
"Not enough MTAPI tasks available to perform the merge sort");
"Not enough MTAPI tasks available to perform merge sort");
}
internal::MergeSortFunctor<RAI, RAITemp, ComparisonFunction> functor(
first, last, temporary_first, comparison, policy, block_size, first, 0);
mtapi::Task task = node.Spawn(mtapi::Action(base::MakeFunction(functor,
&internal::MergeSortFunctor<RAI, RAITemp, ComparisonFunction>::Action),
policy));
internal::BlockSizePartitioner<RAI> partitioner(first, last, block_size);
functor_t functor(0,
partitioner.Size() - 1,
temporary_first,
comparison,
policy,
partitioner,
first,
0);
mtapi::Task task = embb::mtapi::Node::GetInstance().Spawn(
mtapi::Action(
base::MakeFunction(functor, &functor_t::Action),
policy));
task.Wait(MTAPI_INFINITE);
}
// @NOTE: Why is there no type guard for RAI?
} // namespace algorithms
} // namespace embb
......
......@@ -71,8 +71,8 @@ const ChunkDescriptor<ForwardIterator>
ForwardIterator last_new = first_new;
if (index == elements_count / chunkSize) {
std::advance(last_new, elements_count % chunkSize);
if (index >= chunks - 1) {
last_new = last;
} else {
std::advance(last_new, chunkSize);
}
......@@ -94,7 +94,7 @@ ChunkPartitioner<ForwardIterator>::ChunkPartitioner(ForwardIterator first,
} else {
// if no concrete chunk size was given, use number of cores...
mtapi::Node& node = mtapi::Node::GetInstance();
size = node.GetCoreCount();
size = node.GetWorkerThreadCount();
}
elements_count = static_cast<size_t>(std::distance(first, last));
......
......@@ -192,10 +192,17 @@ template <typename RAI, typename ComparisonFunction>
void QuickSort(RAI first, RAI last, ComparisonFunction comparison,
const embb::mtapi::ExecutionPolicy& policy, size_t block_size) {
embb::mtapi::Node& node = embb::mtapi::Node::GetInstance();
typename std::iterator_traits<RAI>::difference_type distance = last - first;
assert(distance > 0);
typedef typename std::iterator_traits<RAI>::difference_type difference_type;
difference_type distance = std::distance(first, last);
if (distance <= 0) {
return;
}
unsigned int num_cores = policy.GetCoreCount();
if (num_cores == 0) {
EMBB_THROW(embb::base::ErrorException, "No cores in execution policy");
}
if (block_size == 0) {
block_size = (static_cast<size_t>(distance) / node.GetCoreCount());
block_size = (static_cast<size_t>(distance) / num_cores);
if (block_size == 0)
block_size = 1;
}
......
......@@ -42,45 +42,55 @@ template<typename RAI, typename ReturnType, typename ReductionFunction,
typename TransformationFunction>
class ReduceFunctor {
public:
ReduceFunctor(RAI first, RAI last, ReturnType neutral,
ReduceFunctor(size_t chunk_first, size_t chunk_last,
ReturnType neutral,
ReductionFunction reduction,
TransformationFunction transformation,
const embb::mtapi::ExecutionPolicy &policy, size_t block_size,
const embb::mtapi::ExecutionPolicy& policy,
const BlockSizePartitioner<RAI>& partitioner,
ReturnType& result)
:
first_(first), last_(last), neutral_(neutral), reduction_(reduction),
transformation_(transformation), policy_(policy),
block_size_(block_size), result_(result) {
: chunk_first_(chunk_first), chunk_last_(chunk_last), neutral_(neutral),
reduction_(reduction), transformation_(transformation), policy_(policy),
partitioner_(partitioner), result_(result) {
}
void Action(mtapi::TaskContext&) {
if (first_ == last_) {
return;
}
size_t distance = static_cast<size_t>(std::distance(first_, last_));
if (distance <= block_size_) { // leaf case -> do work
if (chunk_first_ == chunk_last_) {
// Leaf case, recursed to single chunk. Do work on chunk:
ChunkDescriptor<RAI> chunk = partitioner_[chunk_first_];
RAI first = chunk.GetFirst();
RAI last = chunk.GetLast();
ReturnType result(neutral_);
for (RAI iter = first_; iter != last_; ++iter) {
result = reduction_(result, transformation_(*iter));
for (RAI it = first; it != last; ++it) {
result = reduction_(result, transformation_(*it));
}
result_ = result;
} else { // recurse further
internal::ChunkPartitioner<RAI> partitioner(first_, last_, 2);
} else {
// Recurse further:
size_t chunk_split_index = (chunk_first_ + chunk_last_) / 2;
// Split chunks into left / right branches:
ReturnType result_l(neutral_);
ReturnType result_r(neutral_);
ReduceFunctor functor_l(partitioner[0].GetFirst(),
partitioner[0].GetLast(),
neutral_, reduction_, transformation_, policy_,
block_size_, result_l);
ReduceFunctor functor_r(partitioner[1].GetFirst(),
partitioner[1].GetLast(),
neutral_, reduction_, transformation_, policy_,
block_size_, result_r);
mtapi::Node& node = mtapi::Node::GetInstance();
mtapi::Task task_l = node.Spawn(mtapi::Action(base::MakeFunction(
functor_l, &ReduceFunctor::Action), policy_));
mtapi::Task task_r = node.Spawn(mtapi::Action(base::MakeFunction(
functor_r, &ReduceFunctor::Action), policy_));
self_t functor_l(chunk_first_,
chunk_split_index,
neutral_, reduction_, transformation_, policy_,
partitioner_,
result_l);
self_t functor_r(chunk_split_index + 1,
chunk_last_,
neutral_, reduction_, transformation_, policy_,
partitioner_,
result_r);
mtapi::Task task_l = mtapi::Node::GetInstance().Spawn(
mtapi::Action(
base::MakeFunction(
functor_l, &self_t::Action),
policy_));
mtapi::Task task_r = mtapi::Node::GetInstance().Spawn(
mtapi::Action(
base::MakeFunction(
functor_r, &self_t::Action),
policy_));
task_l.Wait(MTAPI_INFINITE);
task_r.Wait(MTAPI_INFINITE);
result_ = reduction_(result_l, result_r);
......@@ -88,15 +98,23 @@ class ReduceFunctor {
}
private:
RAI first_;
RAI last_;
typedef ReduceFunctor<RAI, ReturnType,
ReductionFunction,
TransformationFunction> self_t;
private:
size_t chunk_first_;
size_t chunk_last_;
ReturnType neutral_;
ReductionFunction reduction_;
TransformationFunction transformation_;
const embb::mtapi::ExecutionPolicy& policy_;
size_t block_size_;
const BlockSizePartitioner<RAI>& partitioner_;
ReturnType& result_;
/**
* Disables assignment and copy-construction.
*/
ReduceFunctor& operator=(const ReduceFunctor&);
ReduceFunctor(const ReduceFunctor&);
};
......@@ -110,27 +128,40 @@ ReturnType ReduceRecursive(RAI first, RAI last, ReturnType neutral,
size_t block_size) {
typedef typename std::iterator_traits<RAI>::difference_type difference_type;
difference_type distance = std::distance(first, last);
assert(distance > 0);
if (distance == 0) {
EMBB_THROW(embb::base::ErrorException, "Distance for Reduce is 0");
}
unsigned int num_cores = policy.GetCoreCount();
if (num_cores == 0) {
EMBB_THROW(embb::base::ErrorException, "No cores in execution policy");
}
mtapi::Node& node = mtapi::Node::GetInstance();
size_t used_block_size = block_size;
if (used_block_size == 0) {
used_block_size = static_cast<size_t>(distance) / node.GetCoreCount();
if (used_block_size == 0) used_block_size = 1;
// Determine actually used block size
if (block_size == 0) {
block_size = (static_cast<size_t>(distance) / num_cores);
if (block_size == 0) {
block_size = 1;
}
}
if (((distance / used_block_size) * 2) + 1 > MTAPI_NODE_MAX_TASKS_DEFAULT) {
// Perform check of task number sufficiency
if (((distance / block_size) * 2) + 1 > MTAPI_NODE_MAX_TASKS_DEFAULT) {
EMBB_THROW(embb::base::ErrorException,
"Number of computation tasks required in reduction would "
"exceed MTAPI maximum number of tasks");
}
ReturnType result = neutral;
typedef ReduceFunctor<RAI, ReturnType, ReductionFunction,
TransformationFunction> Functor;
Functor functor(first, last, neutral, reduction, transformation, policy,
used_block_size, result);
mtapi::Task task = node.Spawn(mtapi::Action(base::MakeFunction(
BlockSizePartitioner<RAI> partitioner(first, last, block_size);
ReturnType result = neutral;
Functor functor(0,
partitioner.Size() - 1,
neutral,
reduction, transformation,
policy,
partitioner,
result);
mtapi::Task task = node.Spawn(
mtapi::Action(base::MakeFunction(
functor, &Functor::Action), policy));
task.Wait(MTAPI_INFINITE);
return result;
......
......@@ -41,73 +41,79 @@ template<typename RAIIn, typename RAIOut, typename ReturnType,
typename ScanFunction, typename TransformationFunction>
class ScanFunctor {
public:
ScanFunctor(RAIIn first, RAIIn last, RAIOut output_iterator,
ScanFunctor(size_t chunk_first, size_t chunk_last, RAIOut output_iterator,
ReturnType neutral, ScanFunction scan,
TransformationFunction transformation,
const embb::mtapi::ExecutionPolicy& policy,
size_t block_size, ReturnType* tree_values, size_t node_id,
const BlockSizePartitioner<RAIIn>& partitioner,
ReturnType* tree_values, size_t node_id,
bool going_down)
: policy_(policy), first_(first), last_(last),
: policy_(policy), chunk_first_(chunk_first), chunk_last_(chunk_last),
output_iterator_(output_iterator), scan_(scan),
transformation_(transformation),
neutral_(neutral), block_size_(block_size), tree_values_(tree_values),
neutral_(neutral), partitioner_(partitioner), tree_values_(tree_values),
node_id_(node_id), parent_value_(neutral), is_first_pass_(going_down) {
}
void Action(mtapi::TaskContext&) {
if (first_ == last_) {
return;
}
size_t distance = static_cast<size_t>(std::distance(first_, last_));
if (distance <= block_size_) { // leaf case -> do work
if (chunk_first_ == chunk_last_) {
ChunkDescriptor<RAIIn> chunk = partitioner_[chunk_first_];
RAIIn iter_in = chunk.GetFirst();
RAIIn last_in = chunk.GetLast();
RAIOut iter_out = output_iterator_;
// leaf case -> do work
if (is_first_pass_) {
RAIIn iter_in = first_;
RAIOut iter_out = output_iterator_;
ReturnType result = transformation_(*first_);
ReturnType result = transformation_(*iter_in);
*iter_out = result;
++iter_in;
++iter_out;
while (iter_in != last_) {
for (; iter_in != last_in; ++iter_in, ++iter_out) {
result = scan_(result, transformation_(*iter_in));
*iter_out = result;
++iter_in;
++iter_out;
}
SetTreeValue(result);
} else { // Second pass
RAIIn iter_in = first_;
RAIOut iter_out = output_iterator_;
while (iter_in != last_) {
} else {
// Second pass
for (; iter_in != last_in; ++iter_in, ++iter_out) {
*iter_out = scan_(parent_value_, *iter_out);
++iter_in;
++iter_out;
}
}
} else {
internal::ChunkPartitioner<RAIIn> partitioner(first_, last_, 2);
ScanFunctor functor_l(partitioner[0].GetFirst(), partitioner[0].GetLast(),
output_iterator_, neutral_, scan_, transformation_,
policy_, block_size_, tree_values_, node_id_,
is_first_pass_);
ScanFunctor functor_r(partitioner[1].GetFirst(), partitioner[1].GetLast(),
output_iterator_, neutral_, scan_, transformation_,
policy_, block_size_, tree_values_, node_id_,
is_first_pass_);
functor_l.SetID(1);
functor_r.SetID(2);
// recurse further
size_t chunk_split_index = (chunk_first_ + chunk_last_) / 2;
// Split chunks into left / right branches:
ScanFunctor functor_l(
chunk_first_, chunk_split_index,
output_iterator_, neutral_, scan_, transformation_,
policy_, partitioner_, tree_values_, node_id_,
is_first_pass_);
ScanFunctor functor_r(
chunk_split_index + 1, chunk_last_,
output_iterator_, neutral_, scan_, transformation_,
policy_, partitioner_, tree_values_, node_id_,
is_first_pass_);
functor_l.SetID(LEFT);
functor_r.SetID(RIGHT);
// Advance output iterator of right branch:
ChunkDescriptor<RAIIn> chunk_left = partitioner_[chunk_first_];
ChunkDescriptor<RAIIn> chunk_right = partitioner_[chunk_split_index + 1];
std::advance(functor_r.output_iterator_,
std::distance(functor_l.first_, functor_r.first_));
std::distance(chunk_left.GetFirst(), chunk_right.GetFirst()));
if (!is_first_pass_) {
functor_l.parent_value_ = parent_value_;
functor_r.parent_value_ = functor_l.GetTreeValue() + parent_value_;
}
// Spawn tasks to recurse:
mtapi::Node& node = mtapi::Node::GetInstance();
mtapi::Task task_l = node.Spawn(mtapi::Action(base::MakeFunction(
functor_l, &ScanFunctor::Action),
policy_));
mtapi::Task task_r = node.Spawn(mtapi::Action(base::MakeFunction(
functor_r, &ScanFunctor::Action),
policy_));
mtapi::Task task_l = node.Spawn(
mtapi::Action(
base::MakeFunction(functor_l, &ScanFunctor::Action),
policy_));
mtapi::Task task_r = node.Spawn(
mtapi::Action(
base::MakeFunction(functor_r, &ScanFunctor::Action),
policy_));
// Wait for tasks to complete:
task_l.Wait(MTAPI_INFINITE);
task_r.Wait(MTAPI_INFINITE);
SetTreeValue(scan_(functor_l.GetTreeValue(), functor_r.GetTreeValue()));
......@@ -123,23 +129,25 @@ class ScanFunctor {
}
private:
static const int LEFT = 1;
static const int RIGHT = 2;
const embb::mtapi::ExecutionPolicy& policy_;
RAIIn first_;
RAIIn last_;
size_t chunk_first_;
size_t chunk_last_;
RAIOut output_iterator_;
ScanFunction scan_;
TransformationFunction transformation_;
ReturnType neutral_;
size_t block_size_;
const BlockSizePartitioner<RAIIn>& partitioner_;
ReturnType* tree_values_;
size_t node_id_;
ReturnType parent_value_;
bool is_first_pass_;
void SetID(int is_left) {
if (is_left == 1) {
void SetID(int branch) {
if (branch == LEFT) {
node_id_ = 2 * node_id_ + 1;
} else if (is_left == 2) {
} else if (branch == RIGHT) {
node_id_ = 2 * node_id_ + 2;
}
}
......@@ -168,34 +176,43 @@ void ScanIteratorCheck(RAIIn first, RAIIn last, RAIOut output_iterator,
if (distance <= 0) {
return;
}
mtapi::Node& node = mtapi::Node::GetInstance();
unsigned int num_cores = policy.GetCoreCount();
if (num_cores == 0) {
EMBB_THROW(embb::base::ErrorException, "No cores in execution policy");
}
ReturnType values[MTAPI_NODE_MAX_TASKS_DEFAULT];
size_t used_block_size = block_size;
if (block_size == 0) {
used_block_size = static_cast<size_t>(distance) / node.GetCoreCount();
if (used_block_size == 0) used_block_size = 1;
block_size = static_cast<size_t>(distance) / num_cores;
if (block_size == 0) {
block_size = 1;
}
}
if (((distance / used_block_size) * 2) + 1 > MTAPI_NODE_MAX_TASKS_DEFAULT) {
if (((distance / block_size) * 2) + 1 > MTAPI_NODE_MAX_TASKS_DEFAULT) {
EMBB_THROW(embb::base::ErrorException,
"Number of computation tasks required in scan "
"exceeds MTAPI maximum number of tasks");
"Not enough MTAPI tasks available for parallel scan");
}
// first pass. Calculates prefix sums for leaves and when recursion returns
// it creates the tree.
typedef ScanFunctor<RAIIn, RAIOut, ReturnType, ScanFunction,
TransformationFunction> Functor;
Functor functor_down(first, last, output_iterator, neutral, scan,
transformation, policy, used_block_size, values, 0,
true);
mtapi::Node& node = mtapi::Node::GetInstance();
BlockSizePartitioner<RAIIn> partitioner_down(first, last, block_size);
Functor functor_down(0, partitioner_down.Size() - 1, output_iterator,
neutral, scan, transformation, policy, partitioner_down,
values, 0, true);
mtapi::Task task_down = node.Spawn(mtapi::Action(base::MakeFunction(
functor_down, &Functor::Action),
policy));
task_down.Wait(MTAPI_INFINITE);
// Second pass. Gives to each leaf the part of the prefix missing
Functor functor_up(first, last, output_iterator, neutral, scan,
transformation, policy, used_block_size, values, 0, false);
BlockSizePartitioner<RAIIn> partitioner_up(first, last, block_size);
Functor functor_up(0, partitioner_up.Size() - 1, output_iterator,
neutral, scan, transformation, policy, partitioner_up,
values, 0, false);
mtapi::Task task_up = node.Spawn(mtapi::Action(base::MakeFunction(
functor_up, &Functor::Action),
policy));
......
......@@ -168,7 +168,7 @@ void MergeSortAllocate(
typename std::iterator_traits<RAI>::difference_type distance = last - first;
typedef typename std::iterator_traits<RAI>::value_type value_type;
value_type* temporary = static_cast<value_type*>(
Alloc::Allocate(distance * sizeof(value_type)));
Alloc::Allocate(distance * sizeof(value_type)));
MergeSort(first, last, temporary, comparison, policy, block_size);
Alloc::Free(temporary);
}
......
......@@ -60,7 +60,7 @@ CountTest::CountTest() {
void CountTest::TestDataStructures() {
using embb::algorithms::Count;
const int size = 10;
int array[] = {10, 20, 30, 30, 20, 10, 10, 20, 20, 20};
int array[] = { 10, 20, 30, 30, 20, 10, 10, 20, 20, 20 };
std::vector<int> vector(array, array + size);
std::deque<int> deque(array, array + size);
const std::vector<int> const_vector(array, array + size);
......@@ -74,7 +74,7 @@ void CountTest::TestDataStructures() {
void CountTest::TestCountIf() {
using embb::algorithms::CountIf;
const int size = 10;
int array[] = {10, 21, 30, 31, 20, 11, 10, 21, 20, 20};
int array[] = { 10, 21, 30, 31, 20, 11, 10, 21, 20, 20 };
PT_EXPECT_EQ(CountIf(array, array + size, IsEven()), 6);
PT_EXPECT_EQ(CountIf(array, array + size, &IsEvenFunction), 6);
}
......@@ -128,8 +128,6 @@ void CountTest::TestPolicy() {
PT_EXPECT_EQ(Count(vector.begin(), vector.end(), 10, ExecutionPolicy()), 3);
PT_EXPECT_EQ(Count(vector.begin(), vector.end(), 10, ExecutionPolicy(true)),
3);
PT_EXPECT_EQ(Count(vector.begin(), vector.end(), 10, ExecutionPolicy(false)),
3);
PT_EXPECT_EQ(Count(vector.begin(), vector.end(), 10,
ExecutionPolicy(true, 1)), 3);
}
......
......@@ -205,13 +205,7 @@ void ForEachTest::TestPolicy() {
for (size_t i = 0; i < count; i++) {
PT_EXPECT_EQ(vector[i], init[i]*init[i]);
}
vector = init;
ForEach(vector.begin(), vector.end(), Square(), ExecutionPolicy(false));
for (size_t i = 0; i < count; i++) {
PT_EXPECT_EQ(vector[i], init[i]*init[i]);
}
vector = init;
ForEach(vector.begin(), vector.end(), Square(), ExecutionPolicy(true, 1));
for (size_t i = 0; i < count; i++) {
......
......@@ -208,13 +208,6 @@ void MergeSortTest::TestPolicy() {
vector = init;
MergeSortAllocate(vector.begin(), vector.end(), std::less<int>(),
ExecutionPolicy(false));
for (size_t i = 0; i < count; i++) {
PT_EXPECT_EQ(vector_copy[i], vector[i]);
}
vector = init;
MergeSortAllocate(vector.begin(), vector.end(), std::less<int>(),
ExecutionPolicy(true, 1));
for (size_t i = 0; i < count; i++) {
PT_EXPECT_EQ(vector_copy[i], vector[i]);
......
......@@ -214,13 +214,6 @@ void QuickSortTest::TestPolicy() {
vector = init;
QuickSort(vector.begin(), vector.end(), std::greater<int>(),
ExecutionPolicy(false));
for (size_t i = 0; i < count; i++) {
PT_EXPECT_EQ(vector_copy[i], vector[i]);
}
vector = init;
QuickSort(vector.begin(), vector.end(), std::greater<int>(),
ExecutionPolicy(true, 1));
for (size_t i = 0; i < count; i++) {
PT_EXPECT_EQ(vector_copy[i], vector[i]);
......
......@@ -179,8 +179,6 @@ void ReduceTest::TestPolicy() {
Identity(), ExecutionPolicy()), sum);
PT_EXPECT_EQ(Reduce(vector.begin(), vector.end(), 0, std::plus<int>(),
Identity(), ExecutionPolicy(true)), sum);
PT_EXPECT_EQ(Reduce(vector.begin(), vector.end(), 0,
std::plus<int>(), Identity(), ExecutionPolicy(false)), sum);
PT_EXPECT_EQ(Reduce(vector.begin(), vector.end(), 0, std::plus<int>(),
Identity(), ExecutionPolicy(true, 1)), sum);
}
......
......@@ -284,15 +284,6 @@ void ScanTest::TestPolicy() {
outputVector = init;
Scan(vector.begin(), vector.end(), outputVector.begin(), 0, std::plus<int>(),
Identity(), ExecutionPolicy(false));
expected = 0;
for (size_t i = 0; i < count; i++) {
expected += vector[i];
PT_EXPECT_EQ(expected, outputVector[i]);
}
outputVector = init;
Scan(vector.begin(), vector.end(), outputVector.begin(), 0, std::plus<int>(),
Identity(), ExecutionPolicy(true, 1));
expected = 0;
for (size_t i = 0; i < count; i++) {
......
......@@ -105,4 +105,4 @@ typedef pthread_cond_t embb_condition_t;
} /* Close extern "C" { */
#endif
#endif // EMBB_BASE_C_INTERNAL_PLATFORM_H_
#endif // EMBB_BASE_C_INTERNAL_PLATFORM_H_
......@@ -81,4 +81,4 @@ class AllocTest : public partest::TestCase {
} // namespace base
} // namespace embb
#endif // BASE_C_TEST_ALLOC_TEST_H_
#endif // BASE_C_TEST_ALLOC_TEST_H_
......@@ -65,4 +65,4 @@ class ConditionVarTest : public partest::TestCase {
} // namespace base
} // namespace embb
#endif // BASE_C_TEST_CONDITION_VAR_TEST_H_
#endif // BASE_C_TEST_CONDITION_VAR_TEST_H_
......@@ -51,4 +51,4 @@ class CoreSetTest : public partest::TestCase {
} // namespace base
} // namespace embb
#endif // BASE_C_TEST_CORE_SET_TEST_H_
#endif // BASE_C_TEST_CORE_SET_TEST_H_
......@@ -97,4 +97,4 @@ class CounterTest : public partest::TestCase {
} // namespace base
} // namespace embb
#endif // BASE_C_TEST_COUNTER_TEST_H_
#endif // BASE_C_TEST_COUNTER_TEST_H_
......@@ -73,4 +73,4 @@ class DurationTest : public partest::TestCase {
} // namespace base
} // namespace embb
#endif // BASE_C_TEST_DURATION_TEST_H_
#endif // BASE_C_TEST_DURATION_TEST_H_
......@@ -78,4 +78,4 @@ int ThreadStart(void* arg);
#endif // BASE_C_TEST_THREAD_INDEX_TEST_H_
#endif // BASE_C_TEST_THREAD_INDEX_TEST_H_
......@@ -61,4 +61,4 @@ class ThreadSpecificStorageTest : public partest::TestCase {
#endif // BASE_C_TEST_THREAD_SPECIFIC_STORAGE_TEST_H_
#endif // BASE_C_TEST_THREAD_SPECIFIC_STORAGE_TEST_H_
......@@ -66,4 +66,4 @@ int ThreadStartFunction(void* arg);
} // namespace base
} // namespace embb
#endif // BASE_C_TEST_THREAD_TEST_H_
#endif // BASE_C_TEST_THREAD_TEST_H_
......@@ -53,4 +53,4 @@ class TimeTest : public partest::TestCase {
#endif // BASE_C_TEST_TIME_TEST_H_
#endif // BASE_C_TEST_TIME_TEST_H_
......@@ -201,4 +201,4 @@ class CoreSet {
#endif // EMBB_BASE_CORE_SET_H_
#endif // EMBB_BASE_CORE_SET_H_
......@@ -522,4 +522,4 @@ class Nanoseconds : public Tick {
#include <embb/base/internal/duration-inl.h>
#endif // EMBB_BASE_DURATION_H_
#endif // EMBB_BASE_DURATION_H_
......@@ -175,16 +175,6 @@ class Exception : public std::exception {
}
/**
* Returns the error message.
* This is here for compatibility with std::exception.
*
* \return Pointer to error message
*/
virtual const char* what() const throw() {
return What();
}
/**
* Returns an integer code representing the exception.
*
* \return %Exception code
......@@ -286,4 +276,4 @@ class ErrorException : public Exception {
} // namespace base
} // namespace embb
#endif // EMBB_BASE_EXCEPTIONS_H_
#endif // EMBB_BASE_EXCEPTIONS_H_
......@@ -112,4 +112,4 @@ Duration<Tick>::Duration(const embb_duration_t& duration) : rep_() {
} // namespace base
} // namespace embb
#endif // EMBB_BASE_INTERNAL_DURATION_INL_H_
#endif // EMBB_BASE_INTERNAL_DURATION_INL_H_
......@@ -116,4 +116,4 @@ bool UniqueLock<Mutex>::OwnsLock() const {
} // namespace base
} // namespace embb
#endif // EMBB_BASE_INTERNAL_MUTEX_INL_H_
#endif // EMBB_BASE_INTERNAL_MUTEX_INL_H_
......@@ -107,4 +107,4 @@ std::basic_ostream<CharT, Traits>&
} // namespace base
} // namespace embb
#endif // EMBB_BASE_INTERNAL_THREAD_INL_H_
#endif // EMBB_BASE_INTERNAL_THREAD_INL_H_
......@@ -103,4 +103,4 @@ struct ThreadClosureArg2 {
} // namespace base
} // namespace embb
#endif // EMBB_BASE_INTERNAL_THREAD_CLOSURES_H_
#endif // EMBB_BASE_INTERNAL_THREAD_CLOSURES_H_
......@@ -149,4 +149,4 @@ void ThreadSpecificStorage<Type>::Prepare() {
} // namespace base
} // namespace embb
#endif // EMBB_BASE_INTERNAL_THREAD_SPECIFIC_STORAGE_INL_H_
#endif // EMBB_BASE_INTERNAL_THREAD_SPECIFIC_STORAGE_INL_H_
......@@ -35,12 +35,10 @@ namespace embb {
namespace base {
namespace test {
/**
* Forward declaration for friending.
*/
* Forward declaration for friending.
*/
class ThreadSpecificStorageTest;
}
/**
......@@ -178,4 +176,4 @@ class ThreadSpecificStorage {
#include <embb/base/internal/thread_specific_storage-inl.h>
#endif // EMBB_BASE_THREAD_SPECIFIC_STORAGE_H_
#endif // EMBB_BASE_THREAD_SPECIFIC_STORAGE_H_
......@@ -78,4 +78,4 @@ class Time {
} // namespace base
} // namespace embb
#endif // EMBB_BASE_TIME_H_
#endif // EMBB_BASE_TIME_H_
......@@ -43,8 +43,7 @@ class ThreadTest : public partest::TestCase {
/**
* Adds test methods.
*/
ThreadTest(); /**<
number of threads concurrently running test methods */
ThreadTest();
private:
/**
......
......@@ -89,7 +89,7 @@ class Inputs<Slices, T1, embb::base::internal::Nil, embb::base::internal::Nil,
const int idx = clock % Slices;
if (count_[idx] == 0) {
EMBB_THROW(embb::base::ErrorException,
"All inputs already fired for this clock.")
"All inputs already fired for this clock.");
}
if (--count_[idx] == 0) {
count_[idx] = 1;
......@@ -135,7 +135,7 @@ class Inputs<Slices, T1, T2, embb::base::internal::Nil,
const int idx = clock % Slices;
if (count_[idx] == 0) {
EMBB_THROW(embb::base::ErrorException,
"All inputs already fired for this clock.")
"All inputs already fired for this clock.");
}
if (--count_[idx] == 0) {
count_[idx] = 2;
......@@ -185,7 +185,7 @@ class Inputs<Slices, T1, T2, T3, embb::base::internal::Nil,
const int idx = clock % Slices;
if (count_[idx] == 0) {
EMBB_THROW(embb::base::ErrorException,
"All inputs already fired for this clock.")
"All inputs already fired for this clock.");
}
if (--count_[idx] == 0) {
count_[idx] = 3;
......@@ -238,7 +238,7 @@ class Inputs<Slices, T1, T2, T3, T4, embb::base::internal::Nil>
const int idx = clock % Slices;
if (count_[idx] == 0) {
EMBB_THROW(embb::base::ErrorException,
"All inputs already fired for this clock.")
"All inputs already fired for this clock.");
}
if (--count_[idx] == 0) {
count_[idx] = 4;
......@@ -296,7 +296,7 @@ class Inputs
const int idx = clock % Slices;
if (count_[idx] == 0) {
EMBB_THROW(embb::base::ErrorException,
"All inputs already fired for this clock.")
"All inputs already fired for this clock.");
}
if (--count_[idx] == 0) {
count_[idx] = 5;
......
......@@ -106,6 +106,13 @@ class ExecutionPolicy{
);
/**
* Returns the number of cores the policy is affine to.
*
* \return the number of cores
*/
unsigned int GetCoreCount() const;
/**
* Returns the affinity
*
* \return the affinity
......
......@@ -129,6 +129,15 @@ class Node {
}
/**
* Returns the number of worker threads.
* \return The number of worker threads.
* \waitfree
*/
mtapi_uint_t GetWorkerThreadCount() const {
return worker_thread_count_;
}
/**
* Creates a Group to launch \link Task Tasks \endlink in.
* \return A reference to the created Group
* \throws ErrorException if the Group object could not be constructed.
......@@ -210,6 +219,7 @@ class Node {
mtapi_task_context_t * context);
mtapi_uint_t core_count_;
mtapi_uint_t worker_thread_count_;
mtapi_action_hndl_t action_handle_;
std::list<Queue*> queues_;
std::list<Group*> groups_;
......
......@@ -27,6 +27,7 @@
#include <embb/mtapi/execution_policy.h>
#include <embb/mtapi/mtapi.h>
#include <embb/base/exceptions.h>
#include <embb/base/c/internal/bitset.h>
#include <cassert>
namespace embb {
......@@ -105,6 +106,10 @@ bool ExecutionPolicy::IsSetWorker(mtapi_uint_t worker) {
return MTAPI_TRUE == aff;
}
unsigned int ExecutionPolicy::GetCoreCount() const {
return embb_bitset_count(&affinity_);
}
const mtapi_affinity_t &ExecutionPolicy::GetAffinity() const {
return affinity_;
}
......
......@@ -75,6 +75,7 @@ Node::Node(
"mtapi::Node could not initialize mtapi");
}
core_count_ = info.hardware_concurrency;
worker_thread_count_ = embb_core_set_count(&attr->core_affinity);
action_handle_ = mtapi_action_create(MTAPI_CPP_TASK_JOB, action_func,
MTAPI_NULL, 0, MTAPI_NULL, &status);
if (MTAPI_SUCCESS != status) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment