Commit c47fcd25 by Tobias Fuchs

mtapi_cpp, algorithms_cpp: the number of cores is now resolved from the execution policy's affinity

parent 0f7719f9
...@@ -106,12 +106,16 @@ void ForEachRecursive(RAI first, RAI last, Function unary, ...@@ -106,12 +106,16 @@ void ForEachRecursive(RAI first, RAI last, Function unary,
typedef typename std::iterator_traits<RAI>::difference_type difference_type; typedef typename std::iterator_traits<RAI>::difference_type difference_type;
difference_type distance = std::distance(first, last); difference_type distance = std::distance(first, last);
if (distance == 0) { if (distance == 0) {
EMBB_THROW(embb::base::ErrorException, "Distance for ForEach is 0"); return;
}
unsigned int num_cores = policy.GetCoreCount();
if (num_cores == 0) {
EMBB_THROW(embb::base::ErrorException, "No cores in execution policy");
} }
mtapi::Node& node = mtapi::Node::GetInstance(); mtapi::Node& node = mtapi::Node::GetInstance();
// Determine actually used block size // Determine actually used block size
if (block_size == 0) { if (block_size == 0) {
block_size = (static_cast<size_t>(distance) / node.GetCoreCount()); block_size = (static_cast<size_t>(distance) / num_cores);
if (block_size == 0) { if (block_size == 0) {
block_size = 1; block_size = 1;
} }
......
...@@ -89,21 +89,21 @@ class MergeSortFunctor { ...@@ -89,21 +89,21 @@ class MergeSortFunctor {
task_l.Wait(MTAPI_INFINITE); task_l.Wait(MTAPI_INFINITE);
task_r.Wait(MTAPI_INFINITE); task_r.Wait(MTAPI_INFINITE);
ChunkDescriptor<RAI> chunk_f = partitioner_[chunk_first_]; ChunkDescriptor<RAI> ck_f = partitioner_[chunk_first_];
ChunkDescriptor<RAI> chunk_m = partitioner_[chunk_split_index + 1]; ChunkDescriptor<RAI> ck_m = partitioner_[chunk_split_index + 1];
ChunkDescriptor<RAI> chunk_l = partitioner_[chunk_last_]; ChunkDescriptor<RAI> ck_l = partitioner_[chunk_last_];
if(CloneBackToInput(depth_)) { if(CloneBackToInput(depth_)) {
// Merge from temp into input: // Merge from temp into input:
difference_type first = std::distance(global_first_, chunk_f.GetFirst()); difference_type first = std::distance(global_first_, ck_f.GetFirst());
difference_type mid = std::distance(global_first_, chunk_m.GetFirst()); difference_type mid = std::distance(global_first_, ck_m.GetFirst());
difference_type last = std::distance(global_first_, chunk_l.GetLast()); difference_type last = std::distance(global_first_, ck_l.GetLast());
SerialMerge(temp_first_ + first, temp_first_ + mid, temp_first_ + last, SerialMerge(temp_first_ + first, temp_first_ + mid, temp_first_ + last,
chunk_f.GetFirst(), ck_f.GetFirst(),
comparison_); comparison_);
} else { } else {
// Merge from input into temp: // Merge from input into temp:
SerialMerge(chunk_f.GetFirst(), chunk_m.GetFirst(), chunk_l.GetLast(), SerialMerge(ck_f.GetFirst(), ck_m.GetFirst(), ck_l.GetLast(),
temp_first_ + std::distance(global_first_, chunk_f.GetFirst()), temp_first_ + std::distance(global_first_, ck_f.GetFirst()),
comparison_); comparison_);
} }
} }
...@@ -129,31 +129,30 @@ class MergeSortFunctor { ...@@ -129,31 +129,30 @@ class MergeSortFunctor {
// Recurse further. Use binary split, ignoring chunk size as this // Recurse further. Use binary split, ignoring chunk size as this
// recursion is serial and has leaf size 1: // recursion is serial and has leaf size 1:
ChunkPartitioner<RAI> partitioner(first, last, 2); ChunkPartitioner<RAI> partitioner(first, last, 2);
ChunkDescriptor<RAI> chunk_l = partitioner[0]; ChunkDescriptor<RAI> ck_l = partitioner[0];
ChunkDescriptor<RAI> chunk_r = partitioner[1]; ChunkDescriptor<RAI> ck_r = partitioner[1];
MergeSortChunk( MergeSortChunk(
chunk_l.GetFirst(), ck_l.GetFirst(),
chunk_l.GetLast(), ck_l.GetLast(),
depth + 1); depth + 1);
MergeSortChunk( MergeSortChunk(
chunk_r.GetFirst(), ck_r.GetFirst(),
chunk_r.GetLast(), ck_r.GetLast(),
depth + 1); depth + 1);
if (CloneBackToInput(depth)) { if (CloneBackToInput(depth)) {
// Merge from temp into input: // Merge from temp into input:
difference_type d_first = std::distance(global_first_, chunk_l.GetFirst()); difference_type d_first = std::distance(global_first_, ck_l.GetFirst());
difference_type d_mid = std::distance(global_first_, chunk_r.GetFirst()); difference_type d_mid = std::distance(global_first_, ck_r.GetFirst());
difference_type d_last = std::distance(global_first_, chunk_r.GetLast()); difference_type d_last = std::distance(global_first_, ck_r.GetLast());
SerialMerge( SerialMerge(
temp_first_ + d_first, temp_first_ + d_mid, temp_first_ + d_last, temp_first_ + d_first, temp_first_ + d_mid, temp_first_ + d_last,
chunk_l.GetFirst(), ck_l.GetFirst(),
comparison_); comparison_);
} } else {
else {
// Merge from input into temp: // Merge from input into temp:
SerialMerge( SerialMerge(
chunk_l.GetFirst(), chunk_r.GetFirst(), chunk_r.GetLast(), ck_l.GetFirst(), ck_r.GetFirst(), ck_r.GetLast(),
temp_first_ + std::distance(global_first_, chunk_l.GetFirst()), temp_first_ + std::distance(global_first_, ck_l.GetFirst()),
comparison_); comparison_);
} }
} }
...@@ -226,15 +225,19 @@ void MergeSort( ...@@ -226,15 +225,19 @@ void MergeSort(
size_t block_size size_t block_size
) { ) {
typedef typename std::iterator_traits<RAI>::difference_type difference_type; typedef typename std::iterator_traits<RAI>::difference_type difference_type;
typedef internal::MergeSortFunctor<RAI, RAITemp, ComparisonFunction> functor_t; typedef internal::MergeSortFunctor<RAI, RAITemp, ComparisonFunction>
functor_t;
difference_type distance = std::distance(first, last); difference_type distance = std::distance(first, last);
if (distance == 0) { if (distance == 0) {
EMBB_THROW(embb::base::ErrorException, "Distance for ForEach is 0"); EMBB_THROW(embb::base::ErrorException, "Distance for ForEach is 0");
} }
embb::mtapi::Node &node = embb::mtapi::Node::GetInstance(); unsigned int num_cores = policy.GetCoreCount();
if (num_cores == 0) {
EMBB_THROW(embb::base::ErrorException, "No cores in execution policy");
}
// Determine actually used block size // Determine actually used block size
if (block_size == 0) { if (block_size == 0) {
block_size = (static_cast<size_t>(distance) / node.GetCoreCount()); block_size = (static_cast<size_t>(distance) / num_cores);
if (block_size == 0) if (block_size == 0)
block_size = 1; block_size = 1;
} }
...@@ -253,9 +256,10 @@ void MergeSort( ...@@ -253,9 +256,10 @@ void MergeSort(
partitioner, partitioner,
first, first,
0); 0);
mtapi::Task task = node.Spawn(mtapi::Action(base::MakeFunction(functor, mtapi::Task task = embb::mtapi::Node::GetInstance().Spawn(
&functor_t::Action), mtapi::Action(
policy)); base::MakeFunction(functor, &functor_t::Action),
policy));
task.Wait(MTAPI_INFINITE); task.Wait(MTAPI_INFINITE);
} }
......
...@@ -94,7 +94,7 @@ ChunkPartitioner<ForwardIterator>::ChunkPartitioner(ForwardIterator first, ...@@ -94,7 +94,7 @@ ChunkPartitioner<ForwardIterator>::ChunkPartitioner(ForwardIterator first,
} else { } else {
// if no concrete chunk size was given, use number of cores... // if no concrete chunk size was given, use number of cores...
mtapi::Node& node = mtapi::Node::GetInstance(); mtapi::Node& node = mtapi::Node::GetInstance();
size = node.GetCoreCount(); size = node.GetWorkerThreadCount();
} }
elements_count = static_cast<size_t>(std::distance(first, last)); elements_count = static_cast<size_t>(std::distance(first, last));
......
...@@ -192,10 +192,17 @@ template <typename RAI, typename ComparisonFunction> ...@@ -192,10 +192,17 @@ template <typename RAI, typename ComparisonFunction>
void QuickSort(RAI first, RAI last, ComparisonFunction comparison, void QuickSort(RAI first, RAI last, ComparisonFunction comparison,
const embb::mtapi::ExecutionPolicy& policy, size_t block_size) { const embb::mtapi::ExecutionPolicy& policy, size_t block_size) {
embb::mtapi::Node& node = embb::mtapi::Node::GetInstance(); embb::mtapi::Node& node = embb::mtapi::Node::GetInstance();
typename std::iterator_traits<RAI>::difference_type distance = last - first; typedef typename std::iterator_traits<RAI>::difference_type difference_type;
assert(distance > 0); difference_type distance = std::distance(first, last);
if (distance <= 0) {
return;
}
unsigned int num_cores = policy.GetCoreCount();
if (num_cores == 0) {
EMBB_THROW(embb::base::ErrorException, "No cores in execution policy");
}
if (block_size == 0) { if (block_size == 0) {
block_size = (static_cast<size_t>(distance) / node.GetCoreCount()); block_size = (static_cast<size_t>(distance) / num_cores);
if (block_size == 0) if (block_size == 0)
block_size = 1; block_size = 1;
} }
......
...@@ -131,10 +131,14 @@ ReturnType ReduceRecursive(RAI first, RAI last, ReturnType neutral, ...@@ -131,10 +131,14 @@ ReturnType ReduceRecursive(RAI first, RAI last, ReturnType neutral,
if (distance == 0) { if (distance == 0) {
EMBB_THROW(embb::base::ErrorException, "Distance for Reduce is 0"); EMBB_THROW(embb::base::ErrorException, "Distance for Reduce is 0");
} }
unsigned int num_cores = policy.GetCoreCount();
if (num_cores == 0) {
EMBB_THROW(embb::base::ErrorException, "No cores in execution policy");
}
mtapi::Node& node = mtapi::Node::GetInstance(); mtapi::Node& node = mtapi::Node::GetInstance();
// Determine actually used block size // Determine actually used block size
if (block_size == 0) { if (block_size == 0) {
block_size = (static_cast<size_t>(distance) / node.GetCoreCount()); block_size = (static_cast<size_t>(distance) / num_cores);
if (block_size == 0) { if (block_size == 0) {
block_size = 1; block_size = 1;
} }
......
...@@ -176,10 +176,14 @@ void ScanIteratorCheck(RAIIn first, RAIIn last, RAIOut output_iterator, ...@@ -176,10 +176,14 @@ void ScanIteratorCheck(RAIIn first, RAIIn last, RAIOut output_iterator,
if (distance <= 0) { if (distance <= 0) {
return; return;
} }
mtapi::Node& node = mtapi::Node::GetInstance(); unsigned int num_cores = policy.GetCoreCount();
if (num_cores == 0) {
EMBB_THROW(embb::base::ErrorException, "No cores in execution policy");
}
ReturnType values[MTAPI_NODE_MAX_TASKS_DEFAULT]; ReturnType values[MTAPI_NODE_MAX_TASKS_DEFAULT];
if (block_size == 0) { if (block_size == 0) {
block_size = static_cast<size_t>(distance) / node.GetCoreCount(); block_size = static_cast<size_t>(distance) / num_cores;
if (block_size == 0) { if (block_size == 0) {
block_size = 1; block_size = 1;
} }
...@@ -193,6 +197,7 @@ void ScanIteratorCheck(RAIIn first, RAIIn last, RAIOut output_iterator, ...@@ -193,6 +197,7 @@ void ScanIteratorCheck(RAIIn first, RAIIn last, RAIOut output_iterator,
// it creates the tree. // it creates the tree.
typedef ScanFunctor<RAIIn, RAIOut, ReturnType, ScanFunction, typedef ScanFunctor<RAIIn, RAIOut, ReturnType, ScanFunction,
TransformationFunction> Functor; TransformationFunction> Functor;
mtapi::Node& node = mtapi::Node::GetInstance();
BlockSizePartitioner<RAIIn> partitioner_down(first, last, block_size); BlockSizePartitioner<RAIIn> partitioner_down(first, last, block_size);
Functor functor_down(0, partitioner_down.Size() - 1, output_iterator, Functor functor_down(0, partitioner_down.Size() - 1, output_iterator,
......
...@@ -128,8 +128,6 @@ void CountTest::TestPolicy() { ...@@ -128,8 +128,6 @@ void CountTest::TestPolicy() {
PT_EXPECT_EQ(Count(vector.begin(), vector.end(), 10, ExecutionPolicy()), 3); PT_EXPECT_EQ(Count(vector.begin(), vector.end(), 10, ExecutionPolicy()), 3);
PT_EXPECT_EQ(Count(vector.begin(), vector.end(), 10, ExecutionPolicy(true)), PT_EXPECT_EQ(Count(vector.begin(), vector.end(), 10, ExecutionPolicy(true)),
3); 3);
PT_EXPECT_EQ(Count(vector.begin(), vector.end(), 10, ExecutionPolicy(false)),
3);
PT_EXPECT_EQ(Count(vector.begin(), vector.end(), 10, PT_EXPECT_EQ(Count(vector.begin(), vector.end(), 10,
ExecutionPolicy(true, 1)), 3); ExecutionPolicy(true, 1)), 3);
} }
......
...@@ -205,13 +205,7 @@ void ForEachTest::TestPolicy() { ...@@ -205,13 +205,7 @@ void ForEachTest::TestPolicy() {
for (size_t i = 0; i < count; i++) { for (size_t i = 0; i < count; i++) {
PT_EXPECT_EQ(vector[i], init[i]*init[i]); PT_EXPECT_EQ(vector[i], init[i]*init[i]);
} }
vector = init;
ForEach(vector.begin(), vector.end(), Square(), ExecutionPolicy(false));
for (size_t i = 0; i < count; i++) {
PT_EXPECT_EQ(vector[i], init[i]*init[i]);
}
vector = init; vector = init;
ForEach(vector.begin(), vector.end(), Square(), ExecutionPolicy(true, 1)); ForEach(vector.begin(), vector.end(), Square(), ExecutionPolicy(true, 1));
for (size_t i = 0; i < count; i++) { for (size_t i = 0; i < count; i++) {
......
...@@ -208,13 +208,6 @@ void MergeSortTest::TestPolicy() { ...@@ -208,13 +208,6 @@ void MergeSortTest::TestPolicy() {
vector = init; vector = init;
MergeSortAllocate(vector.begin(), vector.end(), std::less<int>(), MergeSortAllocate(vector.begin(), vector.end(), std::less<int>(),
ExecutionPolicy(false));
for (size_t i = 0; i < count; i++) {
PT_EXPECT_EQ(vector_copy[i], vector[i]);
}
vector = init;
MergeSortAllocate(vector.begin(), vector.end(), std::less<int>(),
ExecutionPolicy(true, 1)); ExecutionPolicy(true, 1));
for (size_t i = 0; i < count; i++) { for (size_t i = 0; i < count; i++) {
PT_EXPECT_EQ(vector_copy[i], vector[i]); PT_EXPECT_EQ(vector_copy[i], vector[i]);
......
...@@ -214,13 +214,6 @@ void QuickSortTest::TestPolicy() { ...@@ -214,13 +214,6 @@ void QuickSortTest::TestPolicy() {
vector = init; vector = init;
QuickSort(vector.begin(), vector.end(), std::greater<int>(), QuickSort(vector.begin(), vector.end(), std::greater<int>(),
ExecutionPolicy(false));
for (size_t i = 0; i < count; i++) {
PT_EXPECT_EQ(vector_copy[i], vector[i]);
}
vector = init;
QuickSort(vector.begin(), vector.end(), std::greater<int>(),
ExecutionPolicy(true, 1)); ExecutionPolicy(true, 1));
for (size_t i = 0; i < count; i++) { for (size_t i = 0; i < count; i++) {
PT_EXPECT_EQ(vector_copy[i], vector[i]); PT_EXPECT_EQ(vector_copy[i], vector[i]);
......
...@@ -179,8 +179,6 @@ void ReduceTest::TestPolicy() { ...@@ -179,8 +179,6 @@ void ReduceTest::TestPolicy() {
Identity(), ExecutionPolicy()), sum); Identity(), ExecutionPolicy()), sum);
PT_EXPECT_EQ(Reduce(vector.begin(), vector.end(), 0, std::plus<int>(), PT_EXPECT_EQ(Reduce(vector.begin(), vector.end(), 0, std::plus<int>(),
Identity(), ExecutionPolicy(true)), sum); Identity(), ExecutionPolicy(true)), sum);
PT_EXPECT_EQ(Reduce(vector.begin(), vector.end(), 0,
std::plus<int>(), Identity(), ExecutionPolicy(false)), sum);
PT_EXPECT_EQ(Reduce(vector.begin(), vector.end(), 0, std::plus<int>(), PT_EXPECT_EQ(Reduce(vector.begin(), vector.end(), 0, std::plus<int>(),
Identity(), ExecutionPolicy(true, 1)), sum); Identity(), ExecutionPolicy(true, 1)), sum);
} }
......
...@@ -284,15 +284,6 @@ void ScanTest::TestPolicy() { ...@@ -284,15 +284,6 @@ void ScanTest::TestPolicy() {
outputVector = init; outputVector = init;
Scan(vector.begin(), vector.end(), outputVector.begin(), 0, std::plus<int>(), Scan(vector.begin(), vector.end(), outputVector.begin(), 0, std::plus<int>(),
Identity(), ExecutionPolicy(false));
expected = 0;
for (size_t i = 0; i < count; i++) {
expected += vector[i];
PT_EXPECT_EQ(expected, outputVector[i]);
}
outputVector = init;
Scan(vector.begin(), vector.end(), outputVector.begin(), 0, std::plus<int>(),
Identity(), ExecutionPolicy(true, 1)); Identity(), ExecutionPolicy(true, 1));
expected = 0; expected = 0;
for (size_t i = 0; i < count; i++) { for (size_t i = 0; i < count; i++) {
......
...@@ -106,6 +106,13 @@ class ExecutionPolicy{ ...@@ -106,6 +106,13 @@ class ExecutionPolicy{
); );
/** /**
* Returns the number of cores the policy is affine to.
*
* \return the number of cores
*/
unsigned int GetCoreCount() const;
/**
* Returns the affinity * Returns the affinity
* *
* \return the affinity * \return the affinity
......
...@@ -129,6 +129,15 @@ class Node { ...@@ -129,6 +129,15 @@ class Node {
} }
/** /**
* Returns the number of worker threads.
* \return The number of worker threads.
* \waitfree
*/
mtapi_uint_t GetWorkerThreadCount() const {
// Plain read of the worker_thread_count_ member; nothing is computed here.
return worker_thread_count_;
}
/**
* Creates a Group to launch \link Task Tasks \endlink in. * Creates a Group to launch \link Task Tasks \endlink in.
* \return A reference to the created Group * \return A reference to the created Group
* \throws ErrorException if the Group object could not be constructed. * \throws ErrorException if the Group object could not be constructed.
...@@ -210,6 +219,7 @@ class Node { ...@@ -210,6 +219,7 @@ class Node {
mtapi_task_context_t * context); mtapi_task_context_t * context);
mtapi_uint_t core_count_; mtapi_uint_t core_count_;
mtapi_uint_t worker_thread_count_;
mtapi_action_hndl_t action_handle_; mtapi_action_hndl_t action_handle_;
std::list<Queue*> queues_; std::list<Queue*> queues_;
std::list<Group*> groups_; std::list<Group*> groups_;
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include <embb/mtapi/execution_policy.h> #include <embb/mtapi/execution_policy.h>
#include <embb/mtapi/mtapi.h> #include <embb/mtapi/mtapi.h>
#include <embb/base/exceptions.h> #include <embb/base/exceptions.h>
#include <embb/base/c/internal/bitset.h>
#include <cassert> #include <cassert>
namespace embb { namespace embb {
...@@ -105,6 +106,10 @@ bool ExecutionPolicy::IsSetWorker(mtapi_uint_t worker) { ...@@ -105,6 +106,10 @@ bool ExecutionPolicy::IsSetWorker(mtapi_uint_t worker) {
return MTAPI_TRUE == aff; return MTAPI_TRUE == aff;
} }
// Reports how many cores this policy is affine to by passing the stored
// affinity_ member to embb_bitset_count and returning its result.
// NOTE(review): presumably embb_bitset_count returns the number of set bits
// in the affinity mask -- confirm against embb/base/c/internal/bitset.h.
unsigned int ExecutionPolicy::GetCoreCount() const {
return embb_bitset_count(&affinity_);
}
const mtapi_affinity_t &ExecutionPolicy::GetAffinity() const { const mtapi_affinity_t &ExecutionPolicy::GetAffinity() const {
return affinity_; return affinity_;
} }
......
...@@ -75,6 +75,7 @@ Node::Node( ...@@ -75,6 +75,7 @@ Node::Node(
"mtapi::Node could not initialize mtapi"); "mtapi::Node could not initialize mtapi");
} }
core_count_ = info.hardware_concurrency; core_count_ = info.hardware_concurrency;
worker_thread_count_ = embb_core_set_count(&attr->core_affinity);
action_handle_ = mtapi_action_create(MTAPI_CPP_TASK_JOB, action_func, action_handle_ = mtapi_action_create(MTAPI_CPP_TASK_JOB, action_func,
MTAPI_NULL, 0, MTAPI_NULL, &status); MTAPI_NULL, 0, MTAPI_NULL, &status);
if (MTAPI_SUCCESS != status) { if (MTAPI_SUCCESS != status) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment