Commit 7ee71b96 by Marcus Winter

Merge remote-tracking branch 'origin/development' into mtapi_distributed

Conflicts:
	mtapi_c/src/embb_mtapi_task_t.c
	mtapi_c/test/main.cc
parents 48c8ebc0 a517b341
Embedded Multicore Building Blocks (EMB²)
=========================================
Version 0.2.3
-------------
### Bug fixes:
- Fixed freeing of temporary buffer in MergeSortAllocate
- Fixed minor bugs in mtapi_c
- Fixed paths in Doxyfile.in template
### Changes and improvements:
- Changed use of partitioners in ForEach, Reduce, Scan, Count, and MergeSort
- Added guard definition to QuickSort and MergeSort requiring random access iterators as inputs
- Unified behavior of algorithms on empty input
- Cleaned up MergeSort and Scan
- Extended computation of number of cores to take into account affinities
- Changed MTAPI_CHECK_STATUS in examples to use exit() instead of abort()
- Added overload for std::exception::what() in embb::base::Exception
- Added missing include in execution_policy.cc
- Added tests for Thread::ID (base_cpp), ExecutionPolicy (mtapi_cpp), and error cases in mtapi_c
- Added tests on empty and negative input ranges in algorithms
### Features:
- None
### Build system:
- Added option to CMake to toggle automatic initialization of MTAPI C++ interface
- Changed run_tests_cygwin script to work with /bin/sh
- Modified create_tarball.sh script for completely automatic tarball creation
- Removed cppcheck warnings
- Removed cpplint warnings
- Updated partest
### Documentation:
- Added paragraphs in tutorial and README regarding performance impact of automatic initialization of MTAPI C++ interface
- Removed automatic collapsing of trees in Doxygen documentation due to incompatibility with latest versions of Doxygen
- Modified reference manual to consistently use function object concept
- Added description of default node attributes in mtapi_c and fixed typo in mtapi_cpp documentation
- Modified paragraph on documentation in README and fixed typo
Version 0.2.2
-------------
......
......@@ -40,11 +40,6 @@ function (CreateDoxygenDocumentationTarget)
if (TARGET doxygen)
# Do nothing, since the repeated adding causes an error
else()
set(DOXYGEN_TEMPLATE_FILES
"doc/reference/header.html")
file(COPY ${DOXYGEN_TEMPLATE_FILES} DESTINATION ${PROJECT_BINARY_DIR})
add_custom_target (
doxygen
#ALL
......
......@@ -28,7 +28,7 @@ cmake_minimum_required (VERSION 2.8.9)
# Version number
set (EMBB_BASE_VERSION_MAJOR 0)
set (EMBB_BASE_VERSION_MINOR 2)
set (EMBB_BASE_VERSION_PATCH 2)
set (EMBB_BASE_VERSION_PATCH 3)
if(NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE "Release" CACHE STRING
......
......@@ -21,10 +21,10 @@ processor cores. It builds on MTAPI, a standardized programming interface for
leveraging task parallelism in embedded systems containing symmetric or
asymmetric multicore processors. A core feature of MTAPI is low-overhead
scheduling of fine-grained tasks among the available cores during runtime.
Unlike existing libraries, EMB² supports task priorities, which allows the
creation of soft real-time systems. Additionally, the scheduling strategy can
be optimized for non-functional requirements such as minimal latency and
fairness.
Unlike existing libraries, EMB² supports task priorities and affinities, which
allows the creation of soft real-time systems. Additionally, the scheduling
strategy can be optimized for non-functional requirements such as minimal
latency and fairness.
Besides the task scheduler, EMB² provides basic parallel algorithms, concurrent
data structures, and skeletons for implementing stream processing applications
......
......@@ -107,6 +107,8 @@ void ForEachRecursive(RAI first, RAI last, Function unary,
difference_type distance = std::distance(first, last);
if (distance == 0) {
return;
} else if (distance < 0) {
EMBB_THROW(embb::base::ErrorException, "Negative range for ForEach");
}
unsigned int num_cores = policy.GetCoreCount();
if (num_cores == 0) {
......
......@@ -213,23 +213,24 @@ class MergeSortFunctor {
}
};
} // namespace internal
template<typename RAI, typename RAITemp, typename ComparisonFunction>
void MergeSort(
void MergeSortIteratorCheck(
RAI first,
RAI last,
RAITemp temporary_first,
ComparisonFunction comparison,
const embb::mtapi::ExecutionPolicy& policy,
size_t block_size
size_t block_size,
std::random_access_iterator_tag
) {
typedef typename std::iterator_traits<RAI>::difference_type difference_type;
typedef internal::MergeSortFunctor<RAI, RAITemp, ComparisonFunction>
typedef MergeSortFunctor<RAI, RAITemp, ComparisonFunction>
functor_t;
difference_type distance = std::distance(first, last);
if (distance == 0) {
EMBB_THROW(embb::base::ErrorException, "Distance for ForEach is 0");
return;
} else if (distance < 0) {
EMBB_THROW(embb::base::ErrorException, "Negative range for MergeSort");
}
unsigned int num_cores = policy.GetCoreCount();
if (num_cores == 0) {
......@@ -247,7 +248,7 @@ void MergeSort(
"Not enough MTAPI tasks available to perform merge sort");
}
internal::BlockSizePartitioner<RAI> partitioner(first, last, block_size);
BlockSizePartitioner<RAI> partitioner(first, last, block_size);
functor_t functor(0,
partitioner.Size() - 1,
temporary_first,
......@@ -264,7 +265,16 @@ void MergeSort(
task.Wait(MTAPI_INFINITE);
}
// @NOTE: Why is there no type guard for RAI?
} // namespace internal
template<typename RAI, typename RAITemp, typename ComparisonFunction>
void MergeSort(RAI first, RAI last, RAITemp temporary_first,
ComparisonFunction comparison, const embb::mtapi::ExecutionPolicy& policy,
size_t block_size) {
typedef typename std::iterator_traits<RAI>::iterator_category category;
internal::MergeSortIteratorCheck(first, last, temporary_first, comparison,
policy, block_size, category());
}
} // namespace algorithms
} // namespace embb
......
......@@ -186,16 +186,19 @@ class QuickSortFunctor {
QuickSortFunctor(const QuickSortFunctor&);
};
} // namespace internal
template <typename RAI, typename ComparisonFunction>
void QuickSort(RAI first, RAI last, ComparisonFunction comparison,
const embb::mtapi::ExecutionPolicy& policy, size_t block_size) {
void QuickSortIteratorCheck(RAI first, RAI last,
ComparisonFunction comparison,
const embb::mtapi::ExecutionPolicy& policy,
size_t block_size,
std::random_access_iterator_tag) {
embb::mtapi::Node& node = embb::mtapi::Node::GetInstance();
typedef typename std::iterator_traits<RAI>::difference_type difference_type;
difference_type distance = std::distance(first, last);
if (distance <= 0) {
if (distance == 0) {
return;
} else if (distance < 0) {
EMBB_THROW(embb::base::ErrorException, "Negative range for QuickSort");
}
unsigned int num_cores = policy.GetCoreCount();
if (num_cores == 0) {
......@@ -210,13 +213,23 @@ void QuickSort(RAI first, RAI last, ComparisonFunction comparison,
EMBB_THROW(embb::base::ErrorException,
"Not enough MTAPI tasks available for performing quick sort");
}
internal::QuickSortFunctor<RAI, ComparisonFunction> functor(
QuickSortFunctor<RAI, ComparisonFunction> functor(
first, last, comparison, policy, block_size);
mtapi::Task task = node.Spawn(mtapi::Action(base::MakeFunction(
functor, &internal::QuickSortFunctor<RAI, ComparisonFunction>::Action)));
functor, &QuickSortFunctor<RAI, ComparisonFunction>::Action)));
task.Wait(MTAPI_INFINITE);
}
} // namespace internal
template <typename RAI, typename ComparisonFunction>
void QuickSort(RAI first, RAI last, ComparisonFunction comparison,
const embb::mtapi::ExecutionPolicy& policy, size_t block_size) {
typedef typename std::iterator_traits<RAI>::iterator_category category;
internal::QuickSortIteratorCheck(first, last, comparison,
policy, block_size, category());
}
} // namespace algorithms
} // namespace embb
......
......@@ -129,7 +129,9 @@ ReturnType ReduceRecursive(RAI first, RAI last, ReturnType neutral,
typedef typename std::iterator_traits<RAI>::difference_type difference_type;
difference_type distance = std::distance(first, last);
if (distance == 0) {
EMBB_THROW(embb::base::ErrorException, "Distance for Reduce is 0");
return neutral;
} else if (distance < 0) {
EMBB_THROW(embb::base::ErrorException, "Negative range for Reduce");
}
unsigned int num_cores = policy.GetCoreCount();
if (num_cores == 0) {
......
......@@ -173,8 +173,10 @@ void ScanIteratorCheck(RAIIn first, RAIIn last, RAIOut output_iterator,
std::random_access_iterator_tag) {
typedef typename std::iterator_traits<RAIIn>::difference_type difference_type;
difference_type distance = std::distance(first, last);
if (distance <= 0) {
if (distance == 0) {
return;
} else if (distance < 0) {
EMBB_THROW(embb::base::ErrorException, "Negative range for Scan");
}
unsigned int num_cores = policy.GetCoreCount();
if (num_cores == 0) {
......
......@@ -167,9 +167,26 @@ void MergeSortAllocate(
typedef base::Allocation Alloc;
typename std::iterator_traits<RAI>::difference_type distance = last - first;
typedef typename std::iterator_traits<RAI>::value_type value_type;
if (distance == 0) {
return;
} else if (distance < 0) {
EMBB_THROW(embb::base::ErrorException, "Negative range for MergeSort");
}
value_type* temporary = static_cast<value_type*>(
Alloc::Allocate(distance * sizeof(value_type)));
MergeSort(first, last, temporary, comparison, policy, block_size);
EMBB_TRY {
MergeSort(first, last, temporary, comparison, policy, block_size);
} EMBB_CATCH (embb::base::ErrorException & e) {
// embb exception handling does not support catch(...) and rethrow yet.
Alloc::Free(temporary);
// Rethrow only, if exceptions are enabled... Otherwise, the parameter
// e cannot be used, as it is not defined.
#ifdef EMBB_USE_EXCEPTIONS
EMBB_THROW(embb::base::ErrorException, e.what());
#endif
}
Alloc::Free(temporary);
}
......
......@@ -211,6 +211,31 @@ void ForEachTest::TestPolicy() {
for (size_t i = 0; i < count; i++) {
PT_EXPECT_EQ(vector[i], init[i]*init[i]);
}
// ForEach on empty list should not throw:
ForEach(vector.begin(), vector.begin(), Square());
#ifdef EMBB_USE_EXCEPTIONS
bool empty_core_set_thrown = false;
try {
ForEach(vector.begin(), vector.end(), Square(), ExecutionPolicy(false));
}
catch (embb::base::ErrorException &) {
empty_core_set_thrown = true;
}
PT_EXPECT_MSG(empty_core_set_thrown,
"Empty core set should throw ErrorException");
bool negative_range_thrown = false;
try {
std::vector<int>::iterator second = vector.begin() + 1;
ForEach(second, vector.begin(), Square());
}
catch (embb::base::ErrorException &) {
negative_range_thrown = true;
}
PT_EXPECT_MSG(negative_range_thrown,
"Negative range should throw ErrorException");
#endif
}
void ForEachTest::StressTest() {
......
......@@ -156,28 +156,28 @@ void MergeSortTest::TestRanges() {
}
}
//void MergeSortTest::TestBlockSizes() {
// using embb::algorithms::MergeSortAllocate;
// using embb::algorithms::ExecutionPolicy;
// size_t count = 4;
// std::vector<int> init(count);
// std::vector<int> vector(count);
// std::vector<int> vector_copy(count);
// for (size_t i = count - 1; i > 0; i--) {
// init[i] = static_cast<int>(i+2);
// }
// vector_copy = init;
// std::sort(vector_copy.begin(), vector_copy.end());
//
// for (size_t block_size = 1; block_size < count + 2; block_size++) {
// vector = init;
// MergeSortAllocate(vector.begin(), vector.end(), std::less<int>(),
// ExecutionPolicy(), block_size);
// for (size_t i = 0; i < count; i++) {
// PT_EXPECT_EQ(vector[i], vector_copy[i]);
// }
// }
//}
void MergeSortTest::TestBlockSizes() {
using embb::algorithms::MergeSortAllocate;
using embb::mtapi::ExecutionPolicy;
size_t count = 4;
std::vector<int> init(count);
std::vector<int> vector(count);
std::vector<int> vector_copy(count);
for (size_t i = count - 1; i > 0; i--) {
init[i] = static_cast<int>(i+2);
}
vector_copy = init;
std::sort(vector_copy.begin(), vector_copy.end());
for (size_t block_size = 1; block_size < count + 2; block_size++) {
vector = init;
MergeSortAllocate(vector.begin(), vector.end(), std::less<int>(),
ExecutionPolicy(), block_size);
for (size_t i = 0; i < count; i++) {
PT_EXPECT_EQ(vector[i], vector_copy[i]);
}
}
}
void MergeSortTest::TestPolicy() {
using embb::algorithms::MergeSortAllocate;
......@@ -201,17 +201,43 @@ void MergeSortTest::TestPolicy() {
vector = init;
MergeSortAllocate(vector.begin(), vector.end(), std::less<int>(),
ExecutionPolicy(true));
ExecutionPolicy(true));
for (size_t i = 0; i < count; i++) {
PT_EXPECT_EQ(vector_copy[i], vector[i]);
}
vector = init;
MergeSortAllocate(vector.begin(), vector.end(), std::less<int>(),
ExecutionPolicy(true, 1));
ExecutionPolicy(true, 1));
for (size_t i = 0; i < count; i++) {
PT_EXPECT_EQ(vector_copy[i], vector[i]);
}
// MergeSort on empty list should not throw:
MergeSortAllocate(vector.begin(), vector.begin(), std::less<int>());
#ifdef EMBB_USE_EXCEPTIONS
bool empty_core_set_thrown = false;
try {
MergeSortAllocate(vector.begin(), vector.end(), std::less<int>(),
ExecutionPolicy(false));
}
catch (embb::base::ErrorException &) {
empty_core_set_thrown = true;
}
PT_EXPECT_MSG(empty_core_set_thrown,
"Empty core set should throw ErrorException");
bool negative_range_thrown = false;
try {
std::vector<int>::iterator second = vector.begin() + 1;
MergeSortAllocate(second, vector.begin(), std::less<int>());
}
catch (embb::base::ErrorException &) {
negative_range_thrown = true;
}
PT_EXPECT_MSG(negative_range_thrown,
"Negative range should throw ErrorException");
#endif
}
void MergeSortTest::StressTest() {
......
......@@ -58,7 +58,7 @@ class MergeSortTest : public partest::TestCase {
/**
* Tests various block sizes for the workers.
*/
//void TestBlockSizes();
void TestBlockSizes();
/**
* Tests setting policies (without checking their actual execution).
......
......@@ -218,6 +218,32 @@ void QuickSortTest::TestPolicy() {
for (size_t i = 0; i < count; i++) {
PT_EXPECT_EQ(vector_copy[i], vector[i]);
}
// MergeSort on empty list should not throw:
QuickSort(vector.begin(), vector.begin(), std::less<int>());
#ifdef EMBB_USE_EXCEPTIONS
bool empty_core_set_thrown = false;
try {
QuickSort(vector.begin(), vector.end(), std::less<int>(),
ExecutionPolicy(false));
}
catch (embb::base::ErrorException &) {
empty_core_set_thrown = true;
}
PT_EXPECT_MSG(empty_core_set_thrown,
"Empty core set should throw ErrorException");
bool negative_range_thrown = false;
try {
std::vector<int>::iterator second = vector.begin() + 1;
QuickSort(second, vector.begin(), std::less<int>());
}
catch (embb::base::ErrorException &) {
negative_range_thrown = true;
}
PT_EXPECT_MSG(negative_range_thrown,
"Negative range should throw ErrorException");
#endif
}
void QuickSortTest::StressTest() {
......
......@@ -181,6 +181,31 @@ void ReduceTest::TestPolicy() {
Identity(), ExecutionPolicy(true)), sum);
PT_EXPECT_EQ(Reduce(vector.begin(), vector.end(), 0, std::plus<int>(),
Identity(), ExecutionPolicy(true, 1)), sum);
// Empty list should return neutral element:
PT_EXPECT_EQ(Reduce(vector.begin(), vector.begin(), 41, std::plus<int>(),
Identity(), ExecutionPolicy(true, 1)), 41);
#ifdef EMBB_USE_EXCEPTIONS
bool empty_core_set_thrown = false;
try {
Reduce(vector.begin(), vector.end(), 0,
std::plus<int>(), Identity(),
ExecutionPolicy(false));
} catch (embb::base::ErrorException &) {
empty_core_set_thrown = true;
}
PT_EXPECT_MSG(empty_core_set_thrown,
"Empty core set should throw ErrorException");
bool negative_range_thrown = false;
try {
std::vector<int>::iterator second = vector.begin() + 1;
Reduce(second, vector.begin(), 0, std::plus<int>());
}
catch (embb::base::ErrorException &) {
negative_range_thrown = true;
}
PT_EXPECT_MSG(negative_range_thrown,
"Negative range should throw ErrorException");
#endif
}
void ReduceTest::StressTest() {
......
......@@ -290,6 +290,35 @@ void ScanTest::TestPolicy() {
expected += vector[i];
PT_EXPECT_EQ(expected, outputVector[i]);
}
// Empty list should not throw and not change output:
outputVector = init;
std::vector<int>::iterator out_it = outputVector.begin();
Scan(vector.begin(), vector.begin(), out_it, 0, std::plus<int>());
PT_EXPECT(out_it == outputVector.begin());
#ifdef EMBB_USE_EXCEPTIONS
bool empty_core_set_thrown = false;
try {
Scan(vector.begin(), vector.end(), outputVector.begin(),
0, std::plus<int>(), Identity(),
ExecutionPolicy(false));
}
catch (embb::base::ErrorException &) {
empty_core_set_thrown = true;
}
PT_EXPECT_MSG(empty_core_set_thrown,
"Empty core set should throw ErrorException");
bool negative_range_thrown = false;
try {
std::vector<int>::iterator second = vector.begin() + 1;
Scan(second, vector.begin(), outputVector.begin(), 0, std::plus<int>());
}
catch (embb::base::ErrorException &) {
negative_range_thrown = true;
}
PT_EXPECT_MSG(negative_range_thrown,
"Negative range should throw ErrorException");
#endif
}
void ScanTest::StressTest() {
......
......@@ -46,6 +46,9 @@ namespace base {
* Represents a relative time duration for a given tick type.
*
* \notthreadsafe
* \note The typedefs DurationSeconds, DurationMilliseconds,
* DurationMicroseconds, and DurationNanoseconds provide directly usable
* duration types.
* \tparam Tick Possible tick types are Seconds, Milliseconds, Microseconds,
* Nanoseconds
* \ingroup CPP_BASE_TIMEDURATION
......@@ -271,6 +274,8 @@ Duration<Tick> operator+(
return Duration<Tick>(lhs.Count() + rhs.Count());
}
namespace internal {
/**
* Base class for ticks.
*/
......@@ -517,6 +522,33 @@ class Nanoseconds : public Tick {
static unsigned long long Max();
};
} // namespace internal
/**
* Duration with seconds tick.
*
* \ingroup CPP_BASE_TIMEDURATION
*/
typedef Duration<internal::Seconds> DurationSeconds;
/**
* Duration with milliseconds tick.
*
* \ingroup CPP_BASE_TIMEDURATION
*/
typedef Duration<internal::Milliseconds> DurationMilliseconds;
/**
* Duration with microseconds tick.
*
* \ingroup CPP_BASE_TIMEDURATION
*/
typedef Duration<internal::Microseconds> DurationMicroseconds;
/**
* Duration with nanoseconds tick.
*
* \ingroup CPP_BASE_TIMEDURATION
*/
typedef Duration<internal::Nanoseconds> DurationNanoseconds;
} // namespace base
} // namespace embb
......
......@@ -35,7 +35,7 @@
namespace embb {
namespace base {
void Tick::CheckExceptions(int status, const char* msg) {
void internal::Tick::CheckExceptions(int status, const char* msg) {
switch (status) {
case EMBB_SUCCESS: return;
case EMBB_OVERFLOW: EMBB_THROW(OverflowException, msg);
......@@ -44,16 +44,18 @@ void Tick::CheckExceptions(int status, const char* msg) {
}
}
int Seconds::Set(embb_duration_t& duration, unsigned long long ticks) {
int internal::Seconds::Set(embb_duration_t& duration,
unsigned long long ticks) {
return embb_duration_set_seconds(&duration, ticks);
}
void Seconds::SetAndCheck(embb_duration_t& duration, unsigned long long ticks) {
void internal::Seconds::SetAndCheck(embb_duration_t& duration,
unsigned long long ticks) {
int status = Set(duration, ticks);
CheckExceptions(status, "Setting duration from seconds");
}
unsigned long long Seconds::Get(const embb_duration_t& duration) {
unsigned long long internal::Seconds::Get(const embb_duration_t& duration) {
unsigned long long ticks = 0;
int status = embb_duration_as_seconds(&duration, &ticks);
assert(status == EMBB_SUCCESS);
......@@ -61,25 +63,27 @@ unsigned long long Seconds::Get(const embb_duration_t& duration) {
return ticks;
}
unsigned long long Seconds::Min() {
unsigned long long internal::Seconds::Min() {
return 1;
}
unsigned long long Seconds::Max() {
unsigned long long internal::Seconds::Max() {
return EMBB_DURATION_MAX_SECONDS;
}
int Milliseconds::Set(embb_duration_t& duration, unsigned long long ticks) {
int internal::Milliseconds::Set(embb_duration_t& duration,
unsigned long long ticks) {
return embb_duration_set_milliseconds(&duration, ticks);
}
void Milliseconds::SetAndCheck(
void internal::Milliseconds::SetAndCheck(
embb_duration_t& duration, unsigned long long ticks) {
int status = Set(duration, ticks);
CheckExceptions(status, "Setting duration from milliseconds");
}
unsigned long long Milliseconds::Get(const embb_duration_t& duration) {
unsigned long long internal::Milliseconds::Get(
const embb_duration_t& duration) {
unsigned long long ticks = 0;
int status = embb_duration_as_milliseconds(&duration, &ticks);
assert(status == EMBB_SUCCESS);
......@@ -87,7 +91,7 @@ unsigned long long Milliseconds::Get(const embb_duration_t& duration) {
return ticks;
}
unsigned long long Milliseconds::Min() {
unsigned long long internal::Milliseconds::Min() {
#if EMBB_DURATION_MIN_NANOSECONDS > 1000000
assert(EMBB_DURATION_MIN_NANOSECONDS % 1000000 == 0);
return EMBB_DURATION_MIN_NANOSECONDS / 1000000;
......@@ -95,7 +99,7 @@ unsigned long long Milliseconds::Min() {
return 1;
}
unsigned long long Milliseconds::Max() {
unsigned long long internal::Milliseconds::Max() {
#if EMBB_DURATION_MAX_SECONDS < ULLONG_MAX / 1000
return ULLONG_MAX;
#else
......@@ -103,17 +107,19 @@ unsigned long long Milliseconds::Max() {
#endif
}
int Microseconds::Set(embb_duration_t& duration, unsigned long long ticks) {
int internal::Microseconds::Set(embb_duration_t& duration,
unsigned long long ticks) {
return embb_duration_set_microseconds(&duration, ticks);
}
void Microseconds::SetAndCheck(
void internal::Microseconds::SetAndCheck(
embb_duration_t& duration, unsigned long long ticks) {
int status = Set(duration, ticks);
CheckExceptions(status, "Setting duration from microseconds");
}
unsigned long long Microseconds::Get(const embb_duration_t& duration) {
unsigned long long internal::Microseconds::Get(
const embb_duration_t& duration) {
unsigned long long ticks = 0;
int status = embb_duration_as_microseconds(&duration, &ticks);
......@@ -123,7 +129,7 @@ unsigned long long Microseconds::Get(const embb_duration_t& duration) {
return ticks;
}
unsigned long long Microseconds::Min() {
unsigned long long internal::Microseconds::Min() {
#if EMBB_DURATION_MIN_NANOSECONDS > 1000
assert(EMBB_DURATION_MIN_NANOSECONDS % 1000 == 0);
return EMBB_DURATION_MIN_NANOSECONDS / 1000;
......@@ -131,7 +137,7 @@ unsigned long long Microseconds::Min() {
return 1;
}
unsigned long long Microseconds::Max() {
unsigned long long internal::Microseconds::Max() {
#if EMBB_DURATION_MAX_SECONDS < ULLONG_MAX / 1000000
return ULLONG_MAX;
#else
......@@ -139,17 +145,18 @@ unsigned long long Microseconds::Max() {
#endif
}
int Nanoseconds::Set(embb_duration_t& duration, unsigned long long ticks) {
int internal::Nanoseconds::Set(embb_duration_t& duration,
unsigned long long ticks) {
return embb_duration_set_nanoseconds(&duration, ticks);
}
void Nanoseconds::SetAndCheck(
void internal::Nanoseconds::SetAndCheck(
embb_duration_t& duration, unsigned long long ticks) {
int status = Set(duration, ticks);
CheckExceptions(status, "Setting duration from microseconds");
}
unsigned long long Nanoseconds::Get(const embb_duration_t& duration) {
unsigned long long internal::Nanoseconds::Get(const embb_duration_t& duration) {
unsigned long long ticks = 0;
int status = embb_duration_as_nanoseconds(&duration, &ticks);
assert(status == EMBB_SUCCESS);
......@@ -157,11 +164,11 @@ unsigned long long Nanoseconds::Get(const embb_duration_t& duration) {
return ticks;
}
unsigned long long Nanoseconds::Min() {
unsigned long long internal::Nanoseconds::Min() {
return EMBB_DURATION_MIN_NANOSECONDS;
}
unsigned long long Nanoseconds::Max() {
unsigned long long internal::Nanoseconds::Max() {
#if EMBB_DURATION_MAX_SECONDS < ULLONG_MAX / 1000000000
return ULLONG_MAX;
#else
......
......@@ -63,15 +63,15 @@ void ConditionVarTest::TestTimedWaitTimeouts() {
PT_EXPECT_EQ(success, false);
// Wait for a future timepoint
success = cond.WaitUntil(lock, Time(Duration<Milliseconds>(1)));
success = cond.WaitUntil(lock, Time(DurationMilliseconds(1)));
PT_EXPECT_EQ(success, false);
// Wait for a zero duration
success = cond.WaitFor(lock, Duration<Milliseconds>());
success = cond.WaitFor(lock, DurationMilliseconds());
PT_EXPECT_EQ(success, false);
// Wait for some duration
success = cond.WaitFor(lock, Duration<Milliseconds>(1));
success = cond.WaitFor(lock, DurationMilliseconds(1));
PT_EXPECT_EQ(success, false);
}
......@@ -96,14 +96,14 @@ void ConditionVarTest::TestNotify() {
cond_notify_.NotifyOne();
cond_wait_.WaitUntil(lock_wait, Time(Duration<Milliseconds>(1)));
cond_wait_.WaitUntil(lock_wait, Time(DurationMilliseconds(1)));
while (embb_counter_get(&counter_) == 0)
{} // If hangs here signal has not succeeded
PT_ASSERT_EQ_MSG(embb_counter_get(&counter_),
static_cast<unsigned int>(1), "Only 1 thread notified");
cond_notify_.NotifyAll();
cond_wait_.WaitUntil(lock_wait, Time(Duration<Milliseconds>(2)));
cond_wait_.WaitUntil(lock_wait, Time(DurationMilliseconds(2)));
while (embb_counter_get(&counter_) !=
static_cast<unsigned int>(num_threads_-1))
......
......@@ -31,10 +31,13 @@ namespace base {
namespace test {
DurationTest::DurationTest() {
CreateUnit("Seconds").Add(&DurationTest::Test<Seconds>, this);
CreateUnit("Milliseconds").Add(&DurationTest::Test<Milliseconds>, this);
CreateUnit("Microseconds").Add(&DurationTest::Test<Microseconds>, this);
CreateUnit("Nanoseconds").Add(&DurationTest::Test<Nanoseconds>, this);
CreateUnit("Seconds").Add(&DurationTest::Test<internal::Seconds>, this);
CreateUnit("Milliseconds").Add(&DurationTest::Test<internal::Milliseconds>,
this);
CreateUnit("Microseconds").Add(&DurationTest::Test<internal::Microseconds>,
this);
CreateUnit("Nanoseconds").Add(&DurationTest::Test<internal::Nanoseconds>,
this);
}
} // namespace test
......
......@@ -26,7 +26,7 @@ target_link_libraries(embb_dataflow_cpp embb_mtapi_cpp embb_base_cpp embb_mtapi_
if (BUILD_TESTS STREQUAL ON)
include_directories(${CMAKE_CURRENT_BINARY_DIR}/../partest/include)
add_executable (embb_dataflow_cpp_test ${EMBB_DATAFLOW_CPP_TEST_SOURCES})
target_link_libraries(embb_dataflow_cpp_test embb_mtapi_cpp embb_mtapi_c partest
target_link_libraries(embb_dataflow_cpp_test embb_dataflow_cpp embb_mtapi_cpp embb_mtapi_c partest
embb_base_cpp embb_base_c ${compiler_libs})
CopyBin(BIN embb_dataflow_cpp_test DEST ${local_install_dir})
endif()
......
......@@ -27,6 +27,7 @@
#ifndef EMBB_DATAFLOW_INTERNAL_INPUTS_H_
#define EMBB_DATAFLOW_INTERNAL_INPUTS_H_
#include <embb/base/atomic.h>
#include <embb/dataflow/internal/tuple.h>
#include <embb/dataflow/internal/in.h>
......
......@@ -28,7 +28,7 @@
#define EMBB_DATAFLOW_INTERNAL_NODE_H_
#include <cstddef>
#include <embb/base/exceptions.h>
#include <embb/dataflow/internal/scheduler.h>
namespace embb {
......@@ -50,6 +50,9 @@ class Node {
protected:
Scheduler * sched_;
static int next_process_id_;
static int GetNextProcessID() { return next_process_id_++; }
};
} // namespace internal
......
......@@ -56,6 +56,13 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>,
explicit Process(FunctionType function)
: executor_(function) {
next_clock_ = 0;
queued_clock_ = 0;
bool ordered = Serial;
if (ordered) {
queue_id_ = GetNextProcessID();
} else {
queue_id_ = 0;
}
inputs_.SetListener(this);
}
......@@ -95,21 +102,39 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>,
}
virtual void OnClock(int clock) {
if (!inputs_.AreAtClock(clock))
if (!inputs_.AreAtClock(clock)) {
EMBB_THROW(embb::base::ErrorException,
"Some inputs are not at expected clock.")
}
bool ordered = Serial;
if (ordered) {
lock_.Lock();
for (int ii = next_clock_; ii < next_clock_ + Slices; ii++) {
if (!inputs_.AreAtClock(ii)) {
break;
bool retry = true;
while (retry) {
int clk = next_clock_;
int clk_end = clk + Slices;
int clk_res = clk;
for (int ii = clk; ii < clk_end; ii++) {
if (!inputs_.AreAtClock(ii)) {
break;
}
clk_res++;
}
if (clk_res > clk) {
if (next_clock_.CompareAndSwap(clk, clk_res)) {
while (queued_clock_.Load() < clk) continue;
for (int ii = clk; ii < clk_res; ii++) {
const int idx = ii % Slices;
action_[idx] = Action(this, ii);
sched_->Enqueue(queue_id_, action_[idx]);
}
queued_clock_.Store(clk_res);
retry = false;
}
} else {
retry = false;
}
next_clock_ = ii + 1;
Run(ii);
}
lock_.Unlock();
} else {
const int idx = clock % Slices;
action_[idx] = Action(this, clock);
......@@ -121,9 +146,10 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>,
InputsType inputs_;
OutputsType outputs_;
ExecutorType executor_;
int next_clock_;
Action action_[Slices];
SpinLock lock_;
embb::base::Atomic<int> next_clock_;
embb::base::Atomic<int> queued_clock_;
int queue_id_;
};
} // namespace internal
......
......@@ -38,6 +38,7 @@ class Scheduler {
Scheduler() {}
virtual ~Scheduler() {}
virtual void Spawn(Action & action) = 0;
virtual void Enqueue(int process_id, Action & action) = 0;
virtual void WaitForSlice(int slice) = 0;
};
......
......@@ -45,6 +45,16 @@ class SchedulerMTAPI : public Scheduler {
embb::mtapi::Group & group = node.CreateGroup();
group_[ii] = &group;
}
queue_count_ = static_cast<int>(node.GetWorkerThreadCount());
queue_ = reinterpret_cast<embb::mtapi::Queue**>(
embb::base::Allocation::Allocate(
sizeof(embb::mtapi::Queue*)*queue_count_));
for (int ii = 0; ii < queue_count_; ii++) {
embb::mtapi::Queue & queue = node.CreateQueue(0, true);
queue_[ii] = &queue;
}
}
virtual ~SchedulerMTAPI() {
embb::mtapi::Node & node = embb::mtapi::Node::GetInstance();
......@@ -52,17 +62,29 @@ class SchedulerMTAPI : public Scheduler {
group_[ii]->WaitAll(MTAPI_INFINITE);
node.DestroyGroup(*group_[ii]);
}
for (int ii = 0; ii < queue_count_; ii++) {
node.DestroyQueue(*queue_[ii]);
}
embb::base::Allocation::Free(queue_);
}
virtual void Spawn(Action & action) {
const int idx = action.GetClock() % Slices;
group_[idx]->Spawn(embb::base::MakeFunction(action, &Action::RunMTAPI));
}
virtual void Enqueue(int process_id, Action & action) {
const int idx = action.GetClock() % Slices;
const int queue_id = process_id % queue_count_;
queue_[queue_id]->Spawn(group_[idx],
embb::base::MakeFunction(action, &Action::RunMTAPI));
}
virtual void WaitForSlice(int slice) {
group_[slice]->WaitAll(MTAPI_INFINITE);
}
private:
embb::mtapi::Group * group_[Slices];
embb::mtapi::Queue ** queue_;
int queue_count_;
};
} // namespace internal
......
......@@ -41,6 +41,9 @@ class SchedulerSequential : public Scheduler {
virtual void Spawn(Action & action) {
action.RunSequential();
}
virtual void Enqueue(int, Action & action) {
action.RunSequential();
}
virtual void WaitForSlice(int /*slice*/) {}
};
......
......@@ -27,7 +27,7 @@
#ifndef EMBB_DATAFLOW_INTERNAL_SIGNAL_H_
#define EMBB_DATAFLOW_INTERNAL_SIGNAL_H_
#include <embb/dataflow/internal/spinlock.h>
#include <embb/base/c/atomic.h>
namespace embb {
namespace dataflow {
......@@ -42,27 +42,23 @@ class Signal {
Signal(Signal const & other)
: blank_(other.blank_), value_(other.value_), clock_(other.clock_) {}
void operator = (Signal const & rhs) {
lock_.Lock();
blank_ = rhs.blank_;
value_ = rhs.value_;
clock_ = rhs.clock_;
lock_.Unlock();
embb_atomic_memory_barrier();
}
int GetClock() const { return clock_; }
bool IsBlank() const { return blank_; }
Type const & GetValue() const { return value_; }
void Clear() {
lock_.Lock();
blank_ = true;
clock_ = -1;
lock_.Unlock();
}
private:
bool blank_;
Type value_;
int clock_;
SpinLock lock_;
};
} // namespace internal
......
......@@ -52,6 +52,8 @@ class Sink< Slices, Inputs<Slices, I1, I2, I3, I4, I5> >
explicit Sink(FunctionType function)
: executor_(function) {
next_clock_ = 0;
queued_clock_ = 0;
queue_id_ = GetNextProcessID();
inputs_.SetListener(this);
}
......@@ -80,32 +82,47 @@ class Sink< Slices, Inputs<Slices, I1, I2, I3, I4, I5> >
}
virtual void OnClock(int clock) {
TrySpawn(clock);
if (!inputs_.AreAtClock(clock)) {
EMBB_THROW(embb::base::ErrorException,
"Some inputs are not at expected clock.")
}
bool retry = true;
while (retry) {
int clk = next_clock_;
int clk_end = clk + Slices;
int clk_res = clk;
for (int ii = clk; ii < clk_end; ii++) {
if (!inputs_.AreAtClock(ii)) {
break;
}
clk_res++;
}
if (clk_res > clk) {
if (next_clock_.CompareAndSwap(clk, clk_res)) {
while (queued_clock_.Load() < clk) continue;
for (int ii = clk; ii < clk_res; ii++) {
const int idx = ii % Slices;
action_[idx] = Action(this, ii);
sched_->Enqueue(queue_id_, action_[idx]);
}
queued_clock_.Store(clk_res);
retry = false;
}
} else {
retry = false;
}
}
}
private:
InputsType inputs_;
ExecutorType executor_;
int next_clock_;
Action action_[Slices];
ClockListener * listener_;
SpinLock lock_;
void TrySpawn(int clock) {
if (!inputs_.AreAtClock(clock))
EMBB_THROW(embb::base::ErrorException,
"Some inputs are not at expected clock.")
lock_.Lock();
for (int ii = next_clock_; ii < next_clock_ + Slices; ii++) {
if (!inputs_.AreAtClock(ii)) {
break;
}
next_clock_ = ii + 1;
Run(ii);
}
lock_.Unlock();
}
embb::base::Atomic<int> next_clock_;
embb::base::Atomic<int> queued_clock_;
int queue_id_;
};
} // namespace internal
......
......@@ -27,13 +27,9 @@
#ifndef EMBB_DATAFLOW_INTERNAL_SOURCE_H_
#define EMBB_DATAFLOW_INTERNAL_SOURCE_H_
#include <embb/base/atomic.h>
#include <embb/base/thread.h>
#include <embb/dataflow/internal/node.h>
#include <embb/dataflow/internal/outputs.h>
#include <embb/dataflow/internal/source_executor.h>
#include <embb/dataflow/internal/action.h>
namespace embb {
namespace dataflow {
......@@ -53,7 +49,6 @@ class Source< Slices, Outputs<Slices, O1, O2, O3, O4, O5> >
explicit Source(FunctionType function)
: executor_(function), not_done_(true) {
next_clock_ = 0;
}
virtual bool HasOutputs() const {
......@@ -62,7 +57,6 @@ class Source< Slices, Outputs<Slices, O1, O2, O3, O4, O5> >
virtual void Run(int clock) {
not_done_ = executor_.Execute(clock, outputs_);
next_clock_++;
}
virtual bool Start(int clock) {
......@@ -89,9 +83,7 @@ class Source< Slices, Outputs<Slices, O1, O2, O3, O4, O5> >
private:
OutputsType outputs_;
ExecutorType executor_;
Action action_[Slices];
volatile bool not_done_;
embb::base::Atomic<int> next_clock_;
};
} // namespace internal
......
......@@ -24,3 +24,6 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <embb/dataflow/internal/node.h>
int embb::dataflow::internal::Node::next_process_id_ = 0;
......@@ -60,7 +60,7 @@ bool sourceFunc(int & out) {
source_array[source_counter] = out;
source_counter++;
return source_counter < 12;
return source_counter < TEST_COUNT;
}
embb::base::Atomic<int> pred_counter;
......
......@@ -125,7 +125,7 @@ SHOW_USED_FILES = NO
SHOW_FILES = YES
SHOW_NAMESPACES = YES
FILE_VERSION_FILTER =
LAYOUT_FILE = @CMAKE_SOURCE_DIR@/doc/reference/DoxygenLayout.xml
LAYOUT_FILE = "@CMAKE_SOURCE_DIR@/doc/reference/DoxygenLayout.xml"
CITE_BIB_FILES =
# ==============================================================================
......@@ -170,7 +170,7 @@ EXCLUDE_SYMBOLS = *test* \
EXAMPLE_PATH =
EXAMPLE_PATTERNS = *
EXAMPLE_RECURSIVE = NO
IMAGE_PATH = @CMAKE_SOURCE_DIR@/doc
IMAGE_PATH = "@CMAKE_SOURCE_DIR@/doc"
INPUT_FILTER =
FILTER_PATTERNS =
FILTER_SOURCE_FILES = NO
......@@ -208,10 +208,9 @@ IGNORE_PREFIX = cm
GENERATE_HTML = YES
HTML_OUTPUT = html
HTML_FILE_EXTENSION = .html
HTML_HEADER = header.html
HTML_FOOTER =
HTML_STYLESHEET =
HTML_EXTRA_STYLESHEET = @CMAKE_SOURCE_DIR@/doc/reference/DoxygenHTMLStyle.css
HTML_EXTRA_STYLESHEET = "@CMAKE_SOURCE_DIR@/doc/reference/DoxygenHTMLStyle.css"
HTML_EXTRA_FILES =
HTML_COLORSTYLE_HUE = 220
HTML_COLORSTYLE_SAT = 100
......
......@@ -20,10 +20,10 @@ programming interface for leveraging task parallelism in embedded
systems containing symmetric or asymmetric multicore processors. A core
feature of MTAPI is low-overhead scheduling of fine-grained tasks among
the available cores during runtime. Unlike existing libraries,
EMB<sup>2</sup> supports task priorities, which allows the creation of
soft real-time systems. Additionally, the scheduling strategy can be
optimized for non-functional requirements such as minimal latency and
fairness.
EMB<sup>2</sup> supports task priorities and affinities, which allows
the creation of soft real-time systems. Additionally, the scheduling
strategy can be optimized for non-functional requirements such as minimal
latency and fairness.
Besides the task scheduler, EMB<sup>2</sup> provides basic parallel
algorithms, concurrent data structures, and skeletons for implementing
......
<!-- HTML header for doxygen 1.8.6-->
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/xhtml;charset=UTF-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=9"/>
<meta name="generator" content="Doxygen $doxygenversion"/>
<!--BEGIN PROJECT_NAME--><title>$projectname: $title</title><!--END PROJECT_NAME-->
<!--BEGIN !PROJECT_NAME--><title>$title</title><!--END !PROJECT_NAME-->
<link href="$relpath^tabs.css" rel="stylesheet" type="text/css"/>
<script type="text/javascript" src="$relpath^jquery.js"></script>
<script type="text/javascript" src="$relpath^dynsections.js"></script>
<script type="text/javascript">
var is_modules_page = location.pathname.match(".*modules.html");
$(document).ready(function() { if (toggleLevel && is_modules_page) toggleLevel(2); });
</script>
$treeview
$search
$mathjax
<link href="$relpath^$stylesheet" rel="stylesheet" type="text/css" />
$extrastylesheet
</head>
<body>
<div id="top"><!-- do not remove this div, it is closed by doxygen! -->
<!--BEGIN TITLEAREA-->
<div id="titlearea">
<table cellspacing="0" cellpadding="0">
<tbody>
<tr style="height: 56px;">
<!--BEGIN PROJECT_LOGO-->
<td id="projectlogo"><img alt="Logo" src="$relpath^$projectlogo"/></td>
<!--END PROJECT_LOGO-->
<!--BEGIN PROJECT_NAME-->
<td style="padding-left: 0.5em;">
<div id="projectname">$projectname
<!--BEGIN PROJECT_NUMBER-->&#160;<span id="projectnumber">$projectnumber</span><!--END PROJECT_NUMBER-->
</div>
<!--BEGIN PROJECT_BRIEF--><div id="projectbrief">$projectbrief</div><!--END PROJECT_BRIEF-->
</td>
<!--END PROJECT_NAME-->
<!--BEGIN !PROJECT_NAME-->
<!--BEGIN PROJECT_BRIEF-->
<td style="padding-left: 0.5em;">
<div id="projectbrief">$projectbrief</div>
</td>
<!--END PROJECT_BRIEF-->
<!--END !PROJECT_NAME-->
<!--BEGIN DISABLE_INDEX-->
<!--BEGIN SEARCHENGINE-->
<td>$searchbox</td>
<!--END SEARCHENGINE-->
<!--END DISABLE_INDEX-->
</tr>
</tbody>
</table>
</div>
<!--END TITLEAREA-->
<!-- end header part -->
......@@ -304,13 +304,16 @@ int embb_mtapi_scheduler_worker(void * arg) {
switch (task->state) {
case MTAPI_TASK_SCHEDULED:
/* multi-instance task, another instance might be running */
case MTAPI_TASK_RUNNING:
/* there was work, execute it */
embb_mtapi_task_context_initialize_with_thread_context_and_task(
&task_context, thread_context, task);
embb_mtapi_task_execute(task, &task_context);
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
if (embb_mtapi_task_execute(task, &task_context)) {
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
}
counter = 0;
break;
......@@ -318,25 +321,27 @@ int embb_mtapi_scheduler_worker(void * arg) {
case MTAPI_TASK_RETAINED:
/* put task into queue again for later execution */
embb_mtapi_scheduler_schedule_task(
node->scheduler, task);
node->scheduler, task, 0);
/* yield, as there may be only retained tasks in the queue */
embb_thread_yield();
/* task is not done, so do not notify queue */
break;
case MTAPI_TASK_CANCELLED:
/* set return value to cancelled */
/* set return value to canceled */
task->error_code = MTAPI_ERR_ACTION_CANCELLED;
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
if (embb_atomic_fetch_and_add_unsigned_int(
&task->instances_todo, (unsigned int)-1) == 0) {
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
}
break;
case MTAPI_TASK_COMPLETED:
case MTAPI_TASK_DELETED:
case MTAPI_TASK_WAITING:
case MTAPI_TASK_RUNNING:
case MTAPI_TASK_CREATED:
case MTAPI_TASK_PRENATAL:
case MTAPI_TASK_ERROR:
......@@ -356,12 +361,14 @@ int embb_mtapi_scheduler_worker(void * arg) {
counter++;
} else {
/* no work, go to sleep */
embb_atomic_store_int(&thread_context->is_sleeping, 1);
embb_mutex_lock(&thread_context->work_available_mutex);
embb_condition_wait_for(
&thread_context->work_available,
&thread_context->work_available_mutex,
&sleep_duration);
embb_mutex_unlock(&thread_context->work_available_mutex);
embb_atomic_store_int(&thread_context->is_sleeping, 0);
}
}
......@@ -531,10 +538,11 @@ mtapi_boolean_t embb_mtapi_scheduler_process_tasks(
mtapi_boolean_t embb_mtapi_scheduler_schedule_task(
embb_mtapi_scheduler_t * that,
embb_mtapi_task_t * task) {
embb_mtapi_task_t * task,
mtapi_uint_t instance) {
embb_mtapi_scheduler_t * scheduler = that;
/* distribute round robin */
mtapi_uint_t ii = task->handle.id % scheduler->worker_count;
mtapi_uint_t ii = (task->handle.id + instance) % scheduler->worker_count;
mtapi_boolean_t pushed = MTAPI_FALSE;
embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
......@@ -591,8 +599,8 @@ mtapi_boolean_t embb_mtapi_scheduler_schedule_task(
}
if (pushed) {
/* signal all threads */
for (ii = 0; ii < scheduler->worker_count; ii++) {
/* signal the worker thread a task was pushed to */
if (embb_atomic_load_int(&scheduler->worker_contexts[ii].is_sleeping)) {
embb_condition_notify_one(
&scheduler->worker_contexts[ii].work_available);
}
......
......@@ -198,7 +198,8 @@ mtapi_boolean_t embb_mtapi_scheduler_process_tasks(
*/
mtapi_boolean_t embb_mtapi_scheduler_schedule_task(
embb_mtapi_scheduler_t * that,
embb_mtapi_task_t * task);
embb_mtapi_task_t * task,
mtapi_uint_t instance);
#ifdef __cplusplus
......
......@@ -51,8 +51,8 @@ void embb_mtapi_task_context_initialize_with_thread_context_and_task(
that->task = task;
that->thread_context = thread_context;
that->num_instances = task->attributes.num_instances;
that->instance_num = embb_atomic_fetch_and_add_unsigned_int(
&task->current_instance, 1);
that->instance_num =
embb_atomic_fetch_and_add_unsigned_int(&task->current_instance, 1);
}
void embb_mtapi_task_context_finalize(embb_mtapi_task_context_t* that) {
......
......@@ -95,9 +95,11 @@ void embb_mtapi_task_finalize(embb_mtapi_task_t* that) {
embb_mtapi_spinlock_finalize(&that->state_lock);
}
void embb_mtapi_task_execute(
mtapi_boolean_t embb_mtapi_task_execute(
embb_mtapi_task_t* that,
embb_mtapi_task_context_t * context) {
unsigned int todo = that->attributes.num_instances;
assert(MTAPI_NULL != that);
assert(MTAPI_NULL != context);
......@@ -110,17 +112,25 @@ void embb_mtapi_task_execute(
embb_mtapi_action_t* local_action =
embb_mtapi_action_pool_get_storage_for_handle(
context->thread_context->node->action_pool, that->action);
local_action->action_function(
that->arguments,
that->arguments_size,
that->result_buffer,
that->result_size,
local_action->node_local_data,
local_action->node_local_data_size,
context);
/* only continue if there was no error so far */
if (context->task->error_code == MTAPI_SUCCESS) {
local_action->action_function(
that->arguments,
that->arguments_size,
that->result_buffer,
that->result_size,
local_action->node_local_data,
local_action->node_local_data_size,
context);
}
embb_atomic_memory_barrier();
/* task has completed successfully */
embb_mtapi_task_set_state(that, MTAPI_TASK_COMPLETED);
todo = embb_atomic_fetch_and_add_unsigned_int(
&that->instances_todo, (unsigned int)-1);
if (todo == 1) {
/* task has completed successfully */
embb_mtapi_task_set_state(that, MTAPI_TASK_COMPLETED);
}
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
} else {
/* action was deleted, task did not complete */
......@@ -128,13 +138,18 @@ void embb_mtapi_task_execute(
embb_mtapi_task_set_state(that, MTAPI_TASK_ERROR);
}
/* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid(
context->thread_context->node->group_pool, that->group)) {
embb_mtapi_group_t* local_group =
embb_mtapi_group_pool_get_storage_for_handle(
context->thread_context->node->group_pool, that->group);
embb_mtapi_task_queue_push(&local_group->queue, that);
if (todo == 1) {
/* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid(
context->thread_context->node->group_pool, that->group)) {
embb_mtapi_group_t* local_group =
embb_mtapi_group_pool_get_storage_for_handle(
context->thread_context->node->group_pool, that->group);
embb_mtapi_task_queue_push(&local_group->queue, that);
}
return MTAPI_TRUE;
} else {
return MTAPI_FALSE;
}
}
......@@ -189,6 +204,9 @@ static mtapi_task_hndl_t embb_mtapi_task_start(
mtapi_taskattr_init(&task->attributes, &local_status);
}
embb_atomic_store_unsigned_int(
&task->instances_todo, task->attributes.num_instances);
if (embb_mtapi_group_pool_is_handle_valid(node->group_pool, group)) {
embb_mtapi_group_t* local_group =
embb_mtapi_group_pool_get_storage_for_handle(
......@@ -259,8 +277,12 @@ static mtapi_task_hndl_t embb_mtapi_task_start(
MTAPI_TRUE : MTAPI_FALSE;
} else {
/* schedule local task */
was_scheduled =
embb_mtapi_scheduler_schedule_task(scheduler, task);
was_scheduled = MTAPI_TRUE;
for (mtapi_uint_t kk = 0; kk < task->attributes.num_instances; kk++) {
was_scheduled = was_scheduled &
embb_mtapi_scheduler_schedule_task(scheduler, task, kk);
}
}
if (was_scheduled) {
......
......@@ -68,6 +68,7 @@ struct embb_mtapi_task_struct {
embb_mtapi_spinlock_t state_lock;
volatile mtapi_task_state_t state;
embb_atomic_unsigned_int current_instance;
embb_atomic_unsigned_int instances_todo;
mtapi_status_t error_code;
};
......@@ -106,7 +107,7 @@ void embb_mtapi_task_finalize(embb_mtapi_task_t* that);
* detached.
* \memberof embb_mtapi_task_struct
*/
void embb_mtapi_task_execute(
mtapi_boolean_t embb_mtapi_task_execute(
embb_mtapi_task_t* that,
embb_mtapi_task_context_t * context);
......
......@@ -70,6 +70,7 @@ void embb_mtapi_thread_context_initialize_with_node_worker_and_core(
embb_mutex_init(&that->work_available_mutex, EMBB_MUTEX_PLAIN);
embb_condition_init(&that->work_available);
embb_atomic_store_int(&that->is_sleeping, 0);
}
mtapi_boolean_t embb_mtapi_thread_context_start(
......
......@@ -56,6 +56,7 @@ struct embb_mtapi_thread_context_struct {
embb_condition_t work_available;
embb_thread_t thread;
embb_tss_t tss_id;
embb_atomic_int is_sleeping;
embb_mtapi_node_t* node;
embb_mtapi_task_queue_t** queue;
......
......@@ -33,6 +33,7 @@
#include <embb/base/c/internal/unused.h>
#define JOB_TEST_TASK 42
#define JOB_TEST_MULTIINSTANCE_TASK 43
#define TASK_TEST_ID 23
static void testTaskAction(
......@@ -44,7 +45,9 @@ static void testTaskAction(
mtapi_size_t /*node_local_data_size*/,
mtapi_task_context_t* task_context) {
int ii;
mtapi_uint_t core_num = mtapi_context_corenum_get(task_context, MTAPI_NULL);
mtapi_status_t status;
mtapi_uint_t core_num = mtapi_context_corenum_get(task_context, &status);
MTAPI_CHECK_STATUS(status);
srand(core_num);
for (ii = 1000; ii < rand()%1000000; ii ++) {
}
......@@ -53,6 +56,41 @@ static void testTaskAction(
EMBB_UNUSED(args);
}
void testMultiInstanceTaskAction(
const void* args,
mtapi_size_t arg_size,
void* result_buffer,
mtapi_size_t result_buffer_size,
const void* node_local_data,
mtapi_size_t node_local_data_size,
mtapi_task_context_t* task_context) {
EMBB_UNUSED(args);
EMBB_UNUSED(arg_size);
EMBB_UNUSED(node_local_data);
EMBB_UNUSED(node_local_data_size);
mtapi_status_t status;
mtapi_uint_t this_instance, num_instances;
mtapi_uint_t* result;
num_instances = mtapi_context_numinst_get(task_context, &status);
this_instance = mtapi_context_instnum_get(task_context, &status);
/* check result buffer size... */
if (result_buffer_size == sizeof(int) * num_instances) {
/* ... and cast the result buffer */
result = reinterpret_cast<mtapi_uint_t*>(result_buffer);
} else {
mtapi_context_status_set(task_context, MTAPI_ERR_RESULT_SIZE, &status);
MTAPI_CHECK_STATUS(status);
return;
}
/* dummy for calculating result */
result[this_instance] = this_instance;
}
static void testDoSomethingElse() {
}
......@@ -151,7 +189,7 @@ void TaskTest::TestBasic() {
for (ii = 0; ii < 100; ii++) {
status = MTAPI_ERR_UNKNOWN;
mtapi_task_wait(task[ii], 100, &status);
mtapi_task_wait(task[ii], 100000, &status);
MTAPI_CHECK_STATUS(status);
}
......@@ -160,10 +198,66 @@ void TaskTest::TestBasic() {
MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN;
mtapi_action_hndl_t multiinstance_action = mtapi_action_create(
JOB_TEST_MULTIINSTANCE_TASK,
testMultiInstanceTaskAction,
MTAPI_NULL,
0,
&action_attr,
&status);
MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN;
mtapi_job_hndl_t multiinstance_job = mtapi_job_get(
JOB_TEST_MULTIINSTANCE_TASK, THIS_DOMAIN_ID, &status);
MTAPI_CHECK_STATUS(status);
mtapi_task_attributes_t task_attr;
status = MTAPI_ERR_UNKNOWN;
mtapi_taskattr_init(&task_attr, &status);
MTAPI_CHECK_STATUS(status);
const int kTaskInstances = 5;
status = MTAPI_ERR_UNKNOWN;
mtapi_taskattr_set(&task_attr, MTAPI_TASK_INSTANCES,
MTAPI_ATTRIBUTE_VALUE(kTaskInstances), MTAPI_ATTRIBUTE_POINTER_AS_VALUE,
&status);
MTAPI_CHECK_STATUS(status);
mtapi_uint_t result[kTaskInstances];
for (mtapi_uint_t ii = 0; ii < kTaskInstances; ii++) {
result[ii] = kTaskInstances + 1;
}
status = MTAPI_ERR_UNKNOWN;
mtapi_task_hndl_t multiinstance_task =
mtapi_task_start(MTAPI_TASK_ID_NONE, multiinstance_job,
MTAPI_NULL, 0,
&result[0], sizeof(mtapi_uint_t) * kTaskInstances,
&task_attr,
MTAPI_GROUP_NONE,
&status);
MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN;
mtapi_task_wait(multiinstance_task, MTAPI_INFINITE, &status);
MTAPI_CHECK_STATUS(status);
for (mtapi_uint_t ii = 0; ii < kTaskInstances; ii++) {
PT_EXPECT_EQ(result[ii], ii);
}
status = MTAPI_ERR_UNKNOWN;
mtapi_action_delete(multiinstance_action, 10, &status);
MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN;
mtapi_finalize(&status);
MTAPI_CHECK_STATUS(status);
PT_EXPECT(embb_get_bytes_allocated() == 0);
PT_EXPECT_EQ(embb_get_bytes_allocated(), 0u);
embb_mtapi_log_info("...done\n\n");
}
......@@ -25,6 +25,7 @@
*/
#include <partest/partest.h>
#include <embb/base/c/thread.h>
#include <stdio.h>
......@@ -39,11 +40,12 @@
PT_MAIN("MTAPI C") {
embb_log_set_log_level(EMBB_LOG_LEVEL_NONE);
embb_thread_set_max_count(1024);
PT_RUN(TaskTest);
PT_RUN(PluginTest);
PT_RUN(ErrorTest);
PT_RUN(InitFinalizeTest);
PT_RUN(TaskTest);
PT_RUN(GroupTest);
PT_RUN(QueueTest);
}
create_tarball.sh diff
#!/usr/bin/env bash
#!/bin/sh
# Copyright (c) 2014-2015, Siemens AG. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
......@@ -26,17 +26,20 @@
# Needs to be located in the folder containing the tests!!
# Is copied automatically there when generating build files with cmake.
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
"$DIR/embb_base_c_test.exe"
echo
"$DIR/embb_base_cpp_test.exe"
echo
"$DIR/embb_mtapi_c_test.exe"
echo
"$DIR/embb_mtapi_cpp_test.exe"
echo
"$DIR/embb_algorithms_cpp_test.exe"
echo
"$DIR/embb_containers_cpp_test.exe"
echo
"$DIR/embb_dataflow_cpp_test.exe"
SCRIPT_LOCATION="$0"
# case we have symlinks...
while [ -h "$SCRIPT_LOCATION" ] ; do
SCRIPT_LOCATION=`readlink "$SCRIPT_LOCATION"`
done
DIR=`dirname "$SCRIPT_LOCATION"`
TESTS="embb_base_c_test embb_base_cpp_test embb_mtapi_c_test \
embb_mtapi_cpp_test embb_algorithms_cpp_test \
embb_containers_cpp_test embb_dataflow_cpp_test"
for TEST in $TESTS; do
"$DIR/$TEST".exe;
done
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment