Commit c015aa4c by Marcus Winter

Merge branch 'development' into embb517_mutex_based_atomics

# Conflicts:
#	base_c/include/embb/base/c/internal/cmake_config.h.in
parents 64f0d2c0 1fe6f342
Embedded Multicore Building Blocks (EMB²)
Embedded Multicore Building Blocks (EMB²)
=========================================
......
......@@ -81,6 +81,8 @@ unsigned int embb_core_count_available();
* The second parameter specifies whether the set is initially empty or contains
* all cores.
*
* \pre \c core_set is not NULL.
*
* \notthreadsafe
*/
void embb_core_set_init(
......@@ -96,6 +98,9 @@ void embb_core_set_init(
*
* If the core is already contained in the set, the operation has no effect.
*
* \pre \c core_set is not NULL and \c core_number is smaller than
* embb_core_count_available().
*
* \notthreadsafe
* \see embb_core_set_remove()
*/
......@@ -107,13 +112,16 @@ void embb_core_set_add(
);
/**
* Removes a core from the specified set.
*
* If the core is not in the set, the operation has no effect.
*
* \notthreadsafe
* \see embb_core_set_add()
*/
* Removes a core from the specified set.
*
* If the core is not in the set, the operation has no effect.
*
* \pre \c core_set is not NULL and \c core_number is smaller than
* embb_core_count_available().
*
* \notthreadsafe
* \see embb_core_set_add()
*/
void embb_core_set_remove(
embb_core_set_t* core_set,
/**< [IN/OUT] Core set to be manipulated */
......@@ -124,8 +132,11 @@ void embb_core_set_remove(
/**
* Determines whether a core is contained in the specified set.
*
* \return 0 if the core is not contained in the set, otherwise a number greater
* than zero.
* \pre \c core_set is not NULL and \c core_number is smaller than
* embb_core_count_available().
*
* \return 0 if the core is not contained in the set, otherwise a number
* greater than zero.
* \notthreadsafe
*/
int embb_core_set_contains(
......@@ -140,6 +151,8 @@ int embb_core_set_contains(
*
* The result is stored in \c set1.
*
* \pre \c set1 and \c set2 are not NULL.
*
* \notthreadsafe
* \see embb_core_set_union()
*/
......@@ -155,6 +168,8 @@ void embb_core_set_intersection(
*
* The result is stored in \c set1.
*
* \pre \c set1 and \c set2 are not NULL.
*
* \notthreadsafe
* \see embb_core_set_intersection()
*/
......@@ -168,6 +183,8 @@ void embb_core_set_union(
/**
* Returns the number of cores contained in the specified set.
*
* \pre \c core_set is not NULL.
*
* \notthreadsafe
* \return Number of cores in \c core_set
*/
......
......@@ -67,6 +67,8 @@ int embb_counter_init(
/**
* Returns the current value of \c counter.
*
* \pre \c counter is not NULL.
*
* \return Current value
*
* \waitfree
......@@ -79,6 +81,8 @@ unsigned int embb_counter_get(
/**
* Increments \c counter and returns the old value.
*
* \pre \c counter is not NULL.
*
* \return Old, non-incremented value
* \waitfree
*/
......@@ -90,6 +94,8 @@ unsigned int embb_counter_increment(
/**
* Decrements \c counter and returns the old value.
*
* \pre \c counter is not NULL.
*
* \return Old, non-decremented value
* \waitfree
*/
......@@ -101,8 +107,8 @@ unsigned int embb_counter_decrement(
/**
* Destroys an initialized counter.
*
* \pre Counter is initialized
* \post Counter is invalid and cannot be used anymore
* \pre \c counter is initialized and not NULL.
* \post \c counter is invalid and cannot be used anymore
* \waitfree
*/
void embb_counter_destroy(
......
......@@ -249,6 +249,8 @@ int embb_duration_as_seconds(
/**
* Compares two durations.
*
* \pre \c lhs and \c rhs are not NULL and properly initialized.
*
* \return -1 if \c lhs < \c rhs \n
* 0 if \c lhs == \c rhs \n
* 1 if \c lhs > \c rhs
......
......@@ -58,4 +58,12 @@
*/
#cmakedefine EMBB_THREADING_ANALYSIS_MODE
/**
* Version defines.
*/
#define EMBB_BASE_VERSION_MAJOR ${EMBB_BASE_VERSION_MAJOR}
#define EMBB_BASE_VERSION_MINOR ${EMBB_BASE_VERSION_MINOR}
#define EMBB_BASE_VERSION_PATCH ${EMBB_BASE_VERSION_PATCH}
#endif /* EMBB_BASE_INTERNAL_CMAKE_CONFIG_H_ */
......@@ -67,6 +67,7 @@ typedef void(*embb_log_function_t)(void * context, char const * message);
/**
* Default logging function.
* Writes to the given file (context needs to be a FILE*).
* \pre \c context is not NULL.
* \ingroup C_LOG
* \threadsafe
*/
......
......@@ -73,6 +73,8 @@ extern "C" {
*
* Keeps track of freed memory in debug mode.
*
* \pre \c ptr is not NULL.
*
* \threadsafe
*
* \see embb_get_bytes_allocated()
......@@ -161,6 +163,8 @@ extern "C" {
*
* Keeps track of freed memory in debug mode.
*
* \pre \c ptr is not NULL and was allocated by an aligned method.
*
* \threadsafe
*
* \see embb_alloc_aligned(), embb_alloc_cache_aligned(),
......
......@@ -150,7 +150,7 @@ int embb_mutex_unlock(
/**
* Destroys a mutex and frees its resources.
*
* \pre \c mutex has been initialized
* \pre \c mutex is initialized and is not NULL.
* \post \c mutex is uninitialized
* \notthreadsafe
* \see embb_mutex_init()
......@@ -233,7 +233,7 @@ int embb_spin_unlock(
/**
* Destroys a spinlock and frees its resources.
*
* \pre \c spinlock has been initialized
* \pre \c spinlock is initialized and is not NULL.
* \post \c spinlock is uninitialized
* \notthreadsafe
* \see embb_spin_init()
......
......@@ -112,7 +112,7 @@ void* embb_tss_get(
*
* Does not delete the values pointed to.
*
* \pre TSS has been created successfully
* \pre \c tss has been created successfully and is not NULL.
* \post All slots are deleted
* \notthreadsafe
* \see embb_tss_create()
......
......@@ -89,6 +89,8 @@ int embb_time_in(
/**
* Compares two time points.
*
* \pre \c lhs and \c rhs are not NULL and properly initialized.
*
* \return -1 if \c lhs < \c rhs \n
* 0 if \c lhs == \c rhs \n
* 1 if \c lhs > \c rhs
......
......@@ -114,7 +114,7 @@ define_or_assign @embb_internal__atomic_or_assign_1_asm@8, byte, dl
define_store macro name, size, value
public name
name proc
mov size ptr [ecx], value
xchg size ptr [ecx], value
ret
name endp
endm
......
......@@ -107,7 +107,7 @@ define_or_assign embb_internal__atomic_or_assign_1_asm, byte, dl
define_store macro name, size, value
public name
name proc
mov size ptr [rcx], value
xchg size ptr [rcx], value
ret
name endp
endm
......
......@@ -33,8 +33,9 @@
int embb_condition_wait_for(embb_condition_t* condition_var,
embb_mutex_t* mutex,
const embb_duration_t* duration) {
assert(condition_var != NULL);
assert(mutex != NULL);
if (condition_var == NULL || mutex == NULL) {
return EMBB_ERROR;
}
embb_time_t time;
int status = embb_time_in(&time, duration);
if (status != EMBB_SUCCESS) {
......@@ -46,27 +47,34 @@ int embb_condition_wait_for(embb_condition_t* condition_var,
#ifdef EMBB_PLATFORM_THREADING_WINTHREADS
int embb_condition_init(embb_condition_t* condition_var) {
assert(condition_var != NULL);
if (condition_var == NULL) {
return EMBB_ERROR;
}
InitializeConditionVariable(condition_var);
return EMBB_SUCCESS;
}
int embb_condition_notify_one(embb_condition_t* condition_var) {
assert(condition_var != NULL);
if (condition_var == NULL) {
return EMBB_ERROR;
}
WakeConditionVariable(condition_var);
return EMBB_SUCCESS;
}
int embb_condition_notify_all(embb_condition_t* condition_var) {
assert(condition_var != NULL);
if (condition_var == NULL) {
return EMBB_ERROR;
}
WakeAllConditionVariable(condition_var);
return EMBB_SUCCESS;
}
int embb_condition_wait(embb_condition_t* condition_var,
embb_mutex_t* mutex) {
assert(condition_var != NULL);
assert(mutex != NULL);
if (condition_var == NULL || mutex == NULL) {
return EMBB_ERROR;
}
if (SleepConditionVariableCS(condition_var, mutex, INFINITE)) {
return EMBB_SUCCESS;
}
......@@ -75,9 +83,9 @@ int embb_condition_wait(embb_condition_t* condition_var,
int embb_condition_wait_until(embb_condition_t* condition_var,
embb_mutex_t* mutex, const embb_time_t* time) {
assert(condition_var != NULL);
assert(mutex != NULL);
assert(time != NULL);
if (condition_var == NULL || mutex == NULL || time == NULL) {
return EMBB_ERROR;
}
/* The Windows API needs a time duration, so we need to convert the given time
by using the time now. */
embb_time_t now;
......@@ -103,7 +111,9 @@ int embb_condition_wait_until(embb_condition_t* condition_var,
}
int embb_condition_destroy(embb_condition_t* condition_var) {
assert(condition_var != NULL);
if (condition_var == NULL) {
return EMBB_ERROR;
}
EMBB_UNUSED_IN_RELEASE(condition_var);
return EMBB_SUCCESS;
}
......@@ -113,35 +123,42 @@ int embb_condition_destroy(embb_condition_t* condition_var) {
#ifdef EMBB_PLATFORM_THREADING_POSIXTHREADS
int embb_condition_init(embb_condition_t* condition_var) {
assert(condition_var != NULL);
if (condition_var == NULL) {
return EMBB_ERROR;
}
int result = pthread_cond_init(condition_var, NULL);
return result == 0 ? EMBB_SUCCESS : EMBB_ERROR;
}
int embb_condition_notify_one(embb_condition_t* condition_var) {
assert(condition_var != NULL);
if (condition_var == NULL) {
return EMBB_ERROR;
}
int result = pthread_cond_signal(condition_var);
return result == 0 ? EMBB_SUCCESS : EMBB_ERROR;
}
int embb_condition_notify_all(embb_condition_t* condition_var) {
assert(condition_var != NULL);
if (condition_var == NULL) {
return EMBB_ERROR;
}
int result = pthread_cond_broadcast(condition_var);
return result == 0 ? EMBB_SUCCESS : EMBB_ERROR;
}
int embb_condition_wait(embb_condition_t* condition_var, embb_mutex_t* mutex) {
assert(condition_var != NULL);
assert(mutex != NULL);
if (condition_var == NULL || mutex == NULL) {
return EMBB_ERROR;
}
int result = pthread_cond_wait(condition_var, mutex);
return result == 0 ? EMBB_SUCCESS : EMBB_ERROR;
}
int embb_condition_wait_until(embb_condition_t* condition_var,
embb_mutex_t* mutex, const embb_time_t* time) {
assert(condition_var != NULL);
assert(mutex != NULL);
assert(time != NULL);
if (condition_var == NULL || mutex == NULL || time == NULL) {
return EMBB_ERROR;
}
/* Convert EMBB time to Unix time format */
struct timespec unix_time;
unix_time.tv_sec = time->seconds;
......@@ -157,7 +174,9 @@ int embb_condition_wait_until(embb_condition_t* condition_var,
}
int embb_condition_destroy(embb_condition_t* condition_var) {
assert(condition_var != NULL);
if (condition_var == NULL) {
return EMBB_ERROR;
}
int status = pthread_cond_destroy(condition_var);
if (status != 0) {
return EMBB_ERROR;
......
......@@ -146,13 +146,18 @@ int embb_core_set_contains(const embb_core_set_t* core_set,
void embb_core_set_intersection(embb_core_set_t* set1,
const embb_core_set_t* set2) {
assert(set1 != NULL);
assert(set2 != NULL);
embb_bitset_intersect(&set1->rep, set2->rep);
}
void embb_core_set_union(embb_core_set_t* set1, const embb_core_set_t* set2) {
assert(set1 != NULL);
assert(set2 != NULL);
embb_bitset_union(&set1->rep, set2->rep);
}
unsigned int embb_core_set_count(const embb_core_set_t* core_set) {
assert(core_set != NULL);
return embb_bitset_count(&core_set->rep);
}
......@@ -30,7 +30,9 @@
#include <assert.h>
int embb_counter_init(embb_counter_t* counter) {
assert(counter != NULL);
if (counter == NULL) {
return EMBB_ERROR;
}
embb_atomic_store_unsigned_int(&(counter->value), 0);
return EMBB_SUCCESS;
}
......
......@@ -46,13 +46,19 @@ const embb_duration_t* embb_duration_zero() {
int embb_duration_set_nanoseconds(embb_duration_t* duration,
unsigned long long nanoseconds) {
assert(duration != NULL);
if (duration == NULL) {
return EMBB_ERROR;
}
if (nanoseconds > 0) {
if (embb_duration_min()->nanoseconds > nanoseconds) {
duration->seconds = 0;
duration->nanoseconds = 0;
return EMBB_UNDERFLOW;
}
const embb_duration_t* max = embb_duration_max();
if (max->seconds * 1000000000 + max->nanoseconds < nanoseconds) {
duration->seconds = max->seconds;
duration->nanoseconds = max->nanoseconds;
return EMBB_OVERFLOW;
}
}
......@@ -63,13 +69,19 @@ int embb_duration_set_nanoseconds(embb_duration_t* duration,
int embb_duration_set_microseconds(embb_duration_t* duration,
unsigned long long microseconds) {
assert(duration != NULL);
if (duration == NULL) {
return EMBB_ERROR;
}
if (microseconds > 0) {
if (embb_duration_min()->nanoseconds > microseconds*1000) {
duration->seconds = 0;
duration->nanoseconds = 0;
return EMBB_UNDERFLOW;
}
const embb_duration_t* max = embb_duration_max();
if (max->seconds * 1000000 + max->nanoseconds / 1000 < microseconds) {
duration->seconds = max->seconds;
duration->nanoseconds = max->nanoseconds;
return EMBB_OVERFLOW;
}
}
......@@ -80,13 +92,19 @@ int embb_duration_set_microseconds(embb_duration_t* duration,
int embb_duration_set_milliseconds(embb_duration_t* duration,
unsigned long long milliseconds) {
assert(duration != NULL);
if (duration == NULL) {
return EMBB_ERROR;
}
if (milliseconds > 0) {
if (embb_duration_min()->nanoseconds > milliseconds*1000000) {
duration->seconds = 0;
duration->nanoseconds = 0;
return EMBB_UNDERFLOW;
}
const embb_duration_t* max = embb_duration_max();
if (max->seconds * 1000 + max->nanoseconds / 1000000 < milliseconds) {
duration->seconds = max->seconds;
duration->nanoseconds = max->nanoseconds;
return EMBB_OVERFLOW;
}
}
......@@ -97,13 +115,19 @@ int embb_duration_set_milliseconds(embb_duration_t* duration,
int embb_duration_set_seconds(embb_duration_t* duration,
unsigned long long seconds) {
assert(duration != NULL);
if (duration == NULL) {
return EMBB_ERROR;
}
if (seconds > 0) {
if (embb_duration_min()->nanoseconds > seconds*1000000000) {
duration->seconds = 0;
duration->nanoseconds = 0;
return EMBB_UNDERFLOW;
}
const embb_duration_t* max = embb_duration_max();
if (max->seconds + max->nanoseconds / 1000000000 < seconds) {
duration->seconds = max->seconds;
duration->nanoseconds = max->nanoseconds;
return EMBB_OVERFLOW;
}
}
......@@ -113,10 +137,13 @@ int embb_duration_set_seconds(embb_duration_t* duration,
}
int embb_duration_add(embb_duration_t* lhs, const embb_duration_t* rhs) {
assert(lhs != NULL);
assert(rhs != NULL);
if (lhs == NULL || rhs == NULL) {
return EMBB_ERROR;
}
int carry = (int)((lhs->nanoseconds + rhs->nanoseconds) / 1000000000);
if (lhs->seconds + rhs->seconds + carry > EMBB_DURATION_MAX_SECONDS) {
lhs->seconds = 0;
lhs->nanoseconds = 0;
return EMBB_OVERFLOW;
}
lhs->nanoseconds = (lhs->nanoseconds + rhs->nanoseconds) % 1000000000;
......@@ -126,8 +153,9 @@ int embb_duration_add(embb_duration_t* lhs, const embb_duration_t* rhs) {
int embb_duration_as_nanoseconds(const embb_duration_t* duration,
unsigned long long* nanoseconds) {
assert(duration != NULL);
assert(nanoseconds != NULL);
if (duration == NULL || nanoseconds == NULL) {
return EMBB_ERROR;
}
if (duration->seconds*1000000000 + duration->nanoseconds > ULLONG_MAX) {
return EMBB_OVERFLOW;
}
......@@ -137,8 +165,9 @@ int embb_duration_as_nanoseconds(const embb_duration_t* duration,
int embb_duration_as_microseconds(const embb_duration_t* duration,
unsigned long long* microseconds) {
assert(duration != NULL);
assert(microseconds != NULL);
if (duration == NULL || microseconds == NULL) {
return EMBB_ERROR;
}
if (duration->nanoseconds % 1000 > 0) {
return EMBB_UNDERFLOW;
}
......@@ -151,8 +180,9 @@ int embb_duration_as_microseconds(const embb_duration_t* duration,
int embb_duration_as_milliseconds(const embb_duration_t* duration,
unsigned long long* milliseconds) {
assert(duration != NULL);
assert(milliseconds != NULL);
if (duration == NULL || milliseconds == NULL) {
return EMBB_ERROR;
}
if (duration->nanoseconds % 1000000 > 0) {
return EMBB_UNDERFLOW;
}
......@@ -165,12 +195,12 @@ int embb_duration_as_milliseconds(const embb_duration_t* duration,
int embb_duration_as_seconds(const embb_duration_t* duration,
unsigned long long* seconds) {
assert(duration != NULL);
assert(seconds != NULL);
if (duration == NULL || seconds == NULL) {
return EMBB_ERROR;
}
if (duration->nanoseconds % 1000000000 > 0) {
return EMBB_UNDERFLOW;
}
assert(duration->nanoseconds % 1000000000 == 0);
if (duration->seconds > ULLONG_MAX) {
return EMBB_OVERFLOW;
}
......@@ -180,11 +210,8 @@ int embb_duration_as_seconds(const embb_duration_t* duration,
int embb_duration_compare(const embb_duration_t* lhs,
const embb_duration_t* rhs) {
assert(lhs != NULL);
assert(rhs != NULL);
assert(lhs->nanoseconds < 1000000000);
assert(rhs->nanoseconds < 1000000000);
assert(lhs != NULL && rhs != NULL);
assert(lhs->nanoseconds < 1000000000 && rhs->nanoseconds < 1000000000);
if (lhs->seconds > rhs->seconds) {
return 1;
} else if (lhs->seconds < rhs->seconds) {
......
......@@ -27,6 +27,7 @@
#include <stdio.h>
#include <stdarg.h>
#include <string.h>
#include <assert.h>
#include <embb/base/c/log.h>
......@@ -35,6 +36,7 @@
void embb_log_write_file(
void * context,
char const * message) {
assert(context != NULL);
FILE * ff = (FILE*)context;
fprintf(ff, "%s", message);
fflush(ff);
......@@ -90,7 +92,6 @@ void embb_log_write_internal(
case EMBB_LOG_LEVEL_NONE:
default:
log_level_str = " ";
break;
}
#if defined(EMBB_PLATFORM_COMPILER_MSVC)
......
......@@ -135,6 +135,7 @@ void* embb_alloc_aligned(size_t alignment, size_t size) {
void embb_free_aligned(void* ptr) {
assert(ptr != NULL);
size_t* ptr_conv = (size_t*)ptr;
// If embb_free_aligned is called, the memory block should have been allocated
......@@ -193,6 +194,8 @@ void *embb_alloc_aligned(size_t alignment, size_t size) {
}
void embb_free_aligned(void* ptr) {
assert(ptr != NULL);
#ifdef EMBB_PLATFORM_COMPILER_MSVC
_aligned_free(ptr);
#else
......
......@@ -33,6 +33,9 @@
#ifdef EMBB_PLATFORM_THREADING_WINTHREADS
int embb_mutex_init(embb_mutex_t* mutex, int type) {
if (NULL == mutex) {
return EMBB_ERROR;
}
/* Critical sections in Windows are always recursive */
InitializeCriticalSection(mutex);
EMBB_UNUSED(type);
......@@ -40,11 +43,17 @@ int embb_mutex_init(embb_mutex_t* mutex, int type) {
}
int embb_mutex_lock(embb_mutex_t* mutex) {
if (NULL == mutex) {
return EMBB_ERROR;
}
EnterCriticalSection(mutex);
return EMBB_SUCCESS;
}
int embb_mutex_try_lock(embb_mutex_t* mutex) {
if (NULL == mutex) {
return EMBB_ERROR;
}
BOOL success;
success = TryEnterCriticalSection(mutex);
if (success == FALSE) return EMBB_ERROR;
......@@ -52,11 +61,15 @@ int embb_mutex_try_lock(embb_mutex_t* mutex) {
}
int embb_mutex_unlock(embb_mutex_t* mutex) {
if (NULL == mutex) {
return EMBB_ERROR;
}
LeaveCriticalSection(mutex);
return EMBB_SUCCESS;
}
void embb_mutex_destroy(embb_mutex_t* mutex) {
assert(NULL != mutex);
DeleteCriticalSection(mutex);
}
......@@ -65,6 +78,9 @@ void embb_mutex_destroy(embb_mutex_t* mutex) {
#ifdef EMBB_PLATFORM_THREADING_POSIXTHREADS
int embb_mutex_init(embb_mutex_t* mutex, int type) {
if (NULL == mutex) {
return EMBB_ERROR;
}
if (type == EMBB_MUTEX_PLAIN) {
if (pthread_mutex_init(mutex, NULL) != 0) return EMBB_ERROR;
} else {
......@@ -85,6 +101,9 @@ int embb_mutex_init(embb_mutex_t* mutex, int type) {
}
int embb_mutex_lock(embb_mutex_t* mutex) {
if (NULL == mutex) {
return EMBB_ERROR;
}
int result = pthread_mutex_lock(mutex);
if (result != 0) {
return EMBB_ERROR;
......@@ -93,6 +112,9 @@ int embb_mutex_lock(embb_mutex_t* mutex) {
}
int embb_mutex_try_lock(embb_mutex_t* mutex) {
if (NULL == mutex) {
return EMBB_ERROR;
}
int result = pthread_mutex_trylock(mutex);
if (result == 0) {
return EMBB_SUCCESS;
......@@ -104,6 +126,9 @@ int embb_mutex_try_lock(embb_mutex_t* mutex) {
}
int embb_mutex_unlock(embb_mutex_t* mutex) {
if (NULL == mutex) {
return EMBB_ERROR;
}
int result = pthread_mutex_unlock(mutex);
if (result != 0) {
return EMBB_ERROR;
......@@ -112,12 +137,16 @@ int embb_mutex_unlock(embb_mutex_t* mutex) {
}
void embb_mutex_destroy(embb_mutex_t* mutex) {
assert(NULL != mutex);
pthread_mutex_destroy(mutex);
}
#endif /* EMBB_PLATFORM_THREADING_POSIXTHREADS */
int embb_spin_init(embb_spinlock_t* spinlock) {
if (NULL == spinlock) {
return EMBB_ERROR;
}
// For now, store the initial value. In the future will use atomic init
// function (as soon as available).
embb_atomic_store_int(&spinlock->atomic_spin_variable_, 0);
......@@ -125,6 +154,9 @@ int embb_spin_init(embb_spinlock_t* spinlock) {
}
int embb_spin_lock(embb_spinlock_t* spinlock) {
if (NULL == spinlock) {
return EMBB_ERROR;
}
int expected = 0;
int spins = 1;
......@@ -143,6 +175,9 @@ int embb_spin_lock(embb_spinlock_t* spinlock) {
int embb_spin_try_lock(embb_spinlock_t* spinlock,
unsigned int max_number_spins) {
if (NULL == spinlock) {
return EMBB_ERROR;
}
if (max_number_spins == 0)
return EMBB_BUSY;
......@@ -161,6 +196,9 @@ int embb_spin_try_lock(embb_spinlock_t* spinlock,
}
int embb_spin_unlock(embb_spinlock_t* spinlock) {
if (NULL == spinlock) {
return EMBB_ERROR;
}
int expected = 1;
return embb_atomic_compare_and_swap_int(&spinlock->atomic_spin_variable_,
&expected, 0) ?
......@@ -168,6 +206,7 @@ int embb_spin_unlock(embb_spinlock_t* spinlock) {
}
void embb_spin_destroy(embb_spinlock_t* spinlock) {
assert(NULL != spinlock);
// for now, doing nothing here... in future, will call the respective
// destroy function for atomics...
EMBB_UNUSED(spinlock);
......
......@@ -80,10 +80,15 @@ void embb_thread_yield() {
int embb_thread_create(embb_thread_t* thread, const embb_core_set_t* core_set,
embb_thread_start_t func, void *arg) {
assert(thread != NULL);
if (thread == NULL) {
return EMBB_ERROR;
}
thread->embb_internal_arg = (embb_internal_thread_arg_t*)
embb_alloc(sizeof(embb_internal_thread_arg_t));
if (thread->embb_internal_arg == NULL) return EMBB_NOMEM;
if (thread->embb_internal_arg == NULL) {
thread->embb_internal_handle = NULL;
return EMBB_NOMEM;
}
thread->embb_internal_arg->func = func;
thread->embb_internal_arg->arg = arg;
......@@ -95,6 +100,8 @@ int embb_thread_create(embb_thread_t* thread, const embb_core_set_t* core_set,
0, /* no creation arguments */
0); /* no system thread ID */
if (thread->embb_internal_handle == NULL) {
embb_free(thread->embb_internal_arg);
thread->embb_internal_arg = NULL;
return EMBB_ERROR;
}
......@@ -118,6 +125,9 @@ int embb_thread_create(embb_thread_t* thread, const embb_core_set_t* core_set,
}
int embb_thread_join(embb_thread_t* thread, int* result_code) {
if (thread == NULL) {
return EMBB_ERROR;
}
BOOL success;
DWORD result;
result = WaitForSingleObject(thread->embb_internal_handle, INFINITE);
......@@ -143,6 +153,9 @@ int embb_thread_join(embb_thread_t* thread, int* result_code) {
}
int embb_thread_equal(const embb_thread_t* lhs, const embb_thread_t* rhs) {
if (lhs == NULL || rhs == NULL) {
return 0;
}
embb_thread_id_t idLhs = GetThreadId(lhs->embb_internal_handle);
embb_thread_id_t idRhs = GetThreadId(rhs->embb_internal_handle);
if (idLhs == idRhs) {
......@@ -203,6 +216,9 @@ void embb_thread_yield() {
int embb_thread_create(embb_thread_t* thread, const embb_core_set_t* core_set,
embb_thread_start_t func, void* arg) {
if (thread == NULL) {
return EMBB_ERROR;
}
pthread_attr_t attr; /* Used to set thread affinities */
int status = pthread_attr_init(&attr);
if (status != 0) return EMBB_ERROR;
......@@ -223,7 +239,11 @@ int embb_thread_create(embb_thread_t* thread, const embb_core_set_t* core_set,
}
}
status = pthread_attr_setaffinity_np(&attr, sizeof(cpuset), &cpuset);
if (status != 0) return EMBB_ERROR;
if (status != 0) {
thread->embb_internal_arg = NULL;
thread->embb_internal_handle = 0;
return EMBB_ERROR;
}
#else
embb_log_write("base_c", EMBB_LOG_LEVEL_WARNING, "Could not set thread "
"affinity, since no implementation available!\n");
......@@ -233,6 +253,11 @@ int embb_thread_create(embb_thread_t* thread, const embb_core_set_t* core_set,
/* Dynamic allocation of thread arguments. Freed on call of join. */
thread->embb_internal_arg = (embb_internal_thread_arg_t*)
embb_alloc(sizeof(embb_internal_thread_arg_t));
if (thread->embb_internal_arg == NULL) {
thread->embb_internal_handle = 0;
pthread_attr_destroy(&attr);
return EMBB_NOMEM;
}
thread->embb_internal_arg->func = func;
thread->embb_internal_arg->arg = arg;
......@@ -250,12 +275,17 @@ int embb_thread_create(embb_thread_t* thread, const embb_core_set_t* core_set,
}
int embb_thread_join(embb_thread_t* thread, int *result_code) {
if (thread == NULL) {
return EMBB_ERROR;
}
int status = 0;
status = pthread_join(thread->embb_internal_handle, NULL);
if (result_code != NULL) {
*result_code = thread->embb_internal_arg->result;
if (thread->embb_internal_arg != NULL) {
if (result_code != NULL) {
*result_code = thread->embb_internal_arg->result;
}
embb_free(thread->embb_internal_arg);
}
embb_free(thread->embb_internal_arg);
if (status != 0) {
return EMBB_ERROR;
}
......@@ -263,6 +293,9 @@ int embb_thread_join(embb_thread_t* thread, int *result_code) {
}
int embb_thread_equal(const embb_thread_t* lhs, const embb_thread_t* rhs) {
if (lhs == NULL || rhs == NULL) {
return 0;
}
return pthread_equal(lhs->embb_internal_handle, rhs->embb_internal_handle);
}
......
......@@ -32,7 +32,9 @@
#include <assert.h>
int embb_tss_create(embb_tss_t* tss) {
assert(tss != NULL);
if (tss == NULL) {
return EMBB_ERROR;
}
tss->size = embb_thread_get_max_count();
tss->values = (void**) embb_alloc_cache_aligned(tss->size * sizeof(void*));
if (tss->values == NULL) {
......@@ -45,7 +47,9 @@ int embb_tss_create(embb_tss_t* tss) {
}
int embb_tss_set(embb_tss_t* tss, void* value) {
assert(tss != NULL);
if (tss == NULL) {
return EMBB_ERROR;
}
unsigned int index = 0;
int status = embb_internal_thread_index(&index);
if ((status != EMBB_SUCCESS) || (index >= tss->size)) {
......@@ -56,8 +60,12 @@ int embb_tss_set(embb_tss_t* tss, void* value) {
}
void* embb_tss_get(const embb_tss_t* tss) {
assert(tss != NULL);
assert(tss->values != NULL);
if (tss == NULL) {
return NULL;
}
if (tss->values == NULL) {
return NULL;
}
unsigned int index = 0;
int status = embb_internal_thread_index(&index);
if ((status != EMBB_SUCCESS) || (index >= tss->size)) {
......@@ -68,5 +76,7 @@ void* embb_tss_get(const embb_tss_t* tss) {
void embb_tss_delete(embb_tss_t* tss) {
assert(tss != NULL);
embb_free_aligned(tss->values);
if (tss->values != NULL) {
embb_free_aligned(tss->values);
}
}
......@@ -33,11 +33,8 @@ void embb_time_now(embb_time_t* time) {
}
int embb_time_compare(const embb_time_t* lhs, const embb_time_t* rhs) {
assert(lhs != NULL);
assert(rhs != NULL);
assert(lhs->nanoseconds < 1000000000);
assert(rhs->nanoseconds < 1000000000);
assert(lhs != NULL && rhs != NULL);
assert(lhs->nanoseconds < 1000000000 && rhs->nanoseconds < 1000000000);
if (lhs->seconds > rhs->seconds) {
return 1;
} else if (lhs->seconds < rhs->seconds) {
......@@ -56,8 +53,9 @@ int embb_time_compare(const embb_time_t* lhs, const embb_time_t* rhs) {
#ifdef EMBB_PLATFORM_THREADING_WINTHREADS
int embb_time_in(embb_time_t* time, const embb_duration_t* duration) {
assert(time != NULL);
assert(duration != NULL);
if (time == NULL || duration == NULL) {
return EMBB_ERROR;
}
/* Get system time */
SYSTEMTIME system_time;
GetLocalTime(&system_time);
......@@ -87,8 +85,9 @@ int embb_time_in(embb_time_t* time, const embb_duration_t* duration) {
#ifdef EMBB_PLATFORM_THREADING_POSIXTHREADS
int embb_time_in(embb_time_t* time, const embb_duration_t* duration) {
assert(time != NULL);
assert(duration != NULL);
if (time == NULL || duration == NULL) {
return EMBB_ERROR;
}
struct timespec unix_time;
clock_gettime(CLOCK_REALTIME, &unix_time);
time->seconds = unix_time.tv_sec;
......
......@@ -152,6 +152,9 @@ void AllocTest::TestMixedAllocs() {
void* plain = NULL;
plain = embb_alloc(2);
PT_EXPECT_NE(plain, static_cast<void*>(NULL));
if (NULL == plain) {
return;
}
allocated = embb_get_bytes_allocated();
#ifdef EMBB_DEBUG
expected += 2 + 2*sizeof(size_t);
......@@ -162,6 +165,10 @@ void AllocTest::TestMixedAllocs() {
void* aligned = NULL;
aligned = embb_alloc_aligned(2*sizeof(void*), 2);
PT_EXPECT_NE(aligned, static_cast<void*>(NULL));
if (NULL == aligned) {
embb_free(plain);
return;
}
allocated = embb_get_bytes_allocated();
#ifdef EMBB_DEBUG
expected += (1 + 1) * 2 * sizeof(void*) + 3 * sizeof(size_t) - 1;
......@@ -172,6 +179,11 @@ void AllocTest::TestMixedAllocs() {
void* cache_aligned = NULL;
cache_aligned = embb_alloc_cache_aligned(2);
PT_EXPECT_NE(cache_aligned, static_cast<void*>(NULL));
if (NULL == cache_aligned) {
embb_free(plain);
embb_free_aligned(aligned);
return;
}
allocated = embb_get_bytes_allocated();
#ifdef EMBB_DEBUG
expected += (1 + 1) * EMBB_PLATFORM_CACHE_LINE_SIZE + 3 * sizeof(size_t) - 1;
......
......@@ -50,7 +50,11 @@ void ThreadSpecificStorageTest::Test() {
size_t rank = partest::TestSuite::GetCurrentThreadID();
void* value = embb_tss_get(&tss_);
if (value == NULL) {
int status = embb_tss_set(&tss_, new size_t(rank));
size_t * prank = new size_t(rank);
int status = embb_tss_set(&tss_, prank);
if (EMBB_SUCCESS != status) {
delete prank;
}
PT_EXPECT_EQ(status, EMBB_SUCCESS);
} else {
size_t stored_rank = *static_cast<size_t*>(value);
......
......@@ -32,7 +32,10 @@ namespace base {
namespace internal {
MutexBase::MutexBase(int mutex_type) : mutex_() {
embb_mutex_init(&mutex_, mutex_type);
int result = embb_mutex_init(&mutex_, mutex_type);
if (EMBB_SUCCESS != result) {
EMBB_THROW(ErrorException, "Could not initialize mutex.");
}
}
MutexBase::~MutexBase() {
......@@ -40,7 +43,10 @@ MutexBase::~MutexBase() {
}
void MutexBase::Lock() {
embb_mutex_lock(&mutex_);
int result = embb_mutex_lock(&mutex_);
if (EMBB_SUCCESS != result) {
EMBB_THROW(ErrorException, "Could not lock mutex.");
}
}
bool MutexBase::TryLock() {
......@@ -49,7 +55,10 @@ bool MutexBase::TryLock() {
}
void MutexBase::Unlock() {
embb_mutex_unlock(&mutex_);
int result = embb_mutex_unlock(&mutex_);
if (EMBB_SUCCESS != result) {
EMBB_THROW(ErrorException, "Could not unlock mutex.");
}
}
} // namespace internal
......
......@@ -173,10 +173,6 @@ AtomicTest::TestStressSwap::TestStressSwap(
size_t number_threads, size_t number_iterations)
: TestUnit("Swap Stress test for Atomics"), swap1_counter(1)
, swap2_counter(2) {
bitsets[0] = std::bitset<ATOMIC_TESTS_ITERATIONS * 2 + 1>();
bitsets[1] = std::bitset<ATOMIC_TESTS_ITERATIONS * 2 + 1>();
bitsets[2] = std::bitset<ATOMIC_TESTS_ITERATIONS * 2 + 1>();
PT_ASSERT(number_threads == 1);
Pre(&TestStressSwap::Init, this);
......
......@@ -28,7 +28,7 @@
#include <embb/base/c/internal/unused.h>
#include <embb/base/log.h>
#include <cstring>
#include <string>
namespace embb {
namespace base {
......@@ -38,7 +38,7 @@ LogTest::LogTest() {
CreateUnit("Test all").Add(&LogTest::Test, this);
}
static char const * logged_message;
static std::string logged_message;
static void test_log_function(void * context, char const * msg) {
EMBB_UNUSED(context);
......@@ -48,91 +48,90 @@ static void test_log_function(void * context, char const * msg) {
void LogTest::Test() {
using embb::base::Log;
char const * test_msg = "hello";
char const * null = 0;
Log::SetLogFunction(0, test_log_function);
Log::SetLogLevel(EMBB_LOG_LEVEL_TRACE);
logged_message = null;
logged_message = "none";
Log::Trace("chn", test_msg);
#ifdef EMBB_DEBUG
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [TRACE] hello"));
PT_EXPECT(logged_message == "[chn] - [TRACE] hello");
#else
PT_EXPECT_EQ(null, logged_message);
PT_EXPECT(logged_message == "none");
#endif
logged_message = null;
logged_message = "none";
Log::Info("chn", test_msg);
#ifdef EMBB_DEBUG
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [INFO ] hello"));
PT_EXPECT(logged_message == "[chn] - [INFO ] hello");
#else
PT_EXPECT_EQ(null, logged_message);
PT_EXPECT(logged_message == "none");
#endif
logged_message = null;
logged_message = "none";
Log::Warning("chn", test_msg);
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [WARN ] hello"));
logged_message = null;
PT_EXPECT(logged_message == "[chn] - [WARN ] hello");
logged_message = "none";
Log::Error("chn", test_msg);
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [ERROR] hello"));
PT_EXPECT(logged_message == "[chn] - [ERROR] hello");
Log::SetLogLevel(EMBB_LOG_LEVEL_INFO);
logged_message = null;
logged_message = "none";
Log::Trace("chn", test_msg);
PT_EXPECT_EQ(null, logged_message);
logged_message = null;
PT_EXPECT(logged_message == "none");
logged_message = "none";
Log::Info("chn", test_msg);
#ifdef EMBB_DEBUG
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [INFO ] hello"));
PT_EXPECT(logged_message == "[chn] - [INFO ] hello");
#else
PT_EXPECT_EQ(null, logged_message);
PT_EXPECT(logged_message == "none");
#endif
logged_message = null;
logged_message = "none";
Log::Warning("chn", test_msg);
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [WARN ] hello"));
logged_message = null;
PT_EXPECT(logged_message == "[chn] - [WARN ] hello");
logged_message = "none";
Log::Error("chn", test_msg);
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [ERROR] hello"));
PT_EXPECT(logged_message == "[chn] - [ERROR] hello");
Log::SetLogLevel(EMBB_LOG_LEVEL_WARNING);
logged_message = null;
logged_message = "none";
Log::Trace("chn", test_msg);
PT_EXPECT_EQ(null, logged_message);
logged_message = null;
PT_EXPECT(logged_message == "none");
logged_message = "none";
Log::Info("chn", test_msg);
PT_EXPECT_EQ(null, logged_message);
logged_message = null;
PT_EXPECT(logged_message == "none");
logged_message = "none";
Log::Warning("chn", test_msg);
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [WARN ] hello"));
logged_message = null;
PT_EXPECT(logged_message == "[chn] - [WARN ] hello");
logged_message = "none";
Log::Error("chn", test_msg);
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [ERROR] hello"));
PT_EXPECT(logged_message == "[chn] - [ERROR] hello");
Log::SetLogLevel(EMBB_LOG_LEVEL_ERROR);
logged_message = null;
logged_message = "none";
Log::Trace("chn", test_msg);
PT_EXPECT_EQ(null, logged_message);
logged_message = null;
PT_EXPECT(logged_message == "none");
logged_message = "none";
Log::Info("chn", test_msg);
PT_EXPECT_EQ(null, logged_message);
logged_message = null;
PT_EXPECT(logged_message == "none");
logged_message = "none";
Log::Warning("chn", test_msg);
PT_EXPECT_EQ(null, logged_message);
logged_message = null;
PT_EXPECT(logged_message == "none");
logged_message = "none";
Log::Error("chn", test_msg);
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [ERROR] hello"));
PT_EXPECT(logged_message == "[chn] - [ERROR] hello");
Log::SetLogLevel(EMBB_LOG_LEVEL_NONE);
logged_message = null;
logged_message = "none";
Log::Trace("chn", test_msg);
PT_EXPECT_EQ(null, logged_message);
logged_message = null;
PT_EXPECT(logged_message == "none");
logged_message = "none";
Log::Info("chn", test_msg);
PT_EXPECT_EQ(null, logged_message);
logged_message = null;
PT_EXPECT(logged_message == "none");
logged_message = "none";
Log::Warning("chn", test_msg);
PT_EXPECT_EQ(null, logged_message);
logged_message = null;
PT_EXPECT(logged_message == "none");
logged_message = "none";
Log::Error("chn", test_msg);
PT_EXPECT_EQ(null, logged_message);
PT_EXPECT(logged_message == "none");
}
} // namespace test
......
......@@ -123,8 +123,10 @@ bool LockFreeStack< Type, ValuePool >::TryPop(Type & element) {
top_cached = top;
// Stack empty, cannot pop
if (top_cached == NULL)
if (top_cached == NULL) {
element = Type();
return false;
}
// Guard top_cached
hazardPointer.Guard(0, top_cached);
......
......@@ -113,14 +113,17 @@ allocate_rec(int node, Type& element) {
int pool_index = NodeIndexToPoolIndex(node);
Type expected = pool_[pool_index];
if (expected == Undefined)
if (expected == Undefined) {
element = Type();
return -1;
}
if (pool_[pool_index].CompareAndSwap(expected, Undefined)) {
element = expected;
return pool_index;
}
element = Type();
return -1;
}
......@@ -133,8 +136,10 @@ allocate_rec(int node, Type& element) {
do {
current = tree_[node];
desired = current - 1;
if (desired < 0)
if (desired < 0) {
element = Type();
return -1;
}
} while (!tree_[node].CompareAndSwap(current, desired));
int leftResult = allocate_rec(GetLeftChildIndex(node), element);
......
......@@ -55,6 +55,7 @@ Allocate(Type & element) {
return i;
}
}
element = Type();
return -1;
}
......
......@@ -144,6 +144,7 @@ void HazardPointerTest::HazardPointerTest1ThreadMethod() {
for (int i = 0; i != n_elements_per_thread_; ++i) {
embb::base::Atomic<int>* allocated_object = object_pool_->Allocate(0);
PT_ASSERT(NULL != allocated_object);
hazard_pointer_->Guard(0, allocated_object);
......@@ -210,8 +211,9 @@ void HazardPointerTest2::DeletePointerCallback(int* to_delete) {
}
bool HazardPointerTest2::SetRelativeGuards() {
unsigned int thread_index;
embb_internal_thread_index(&thread_index);
unsigned int thread_index = 0;
int result = embb_internal_thread_index(&thread_index);
PT_ASSERT(EMBB_SUCCESS == result);
unsigned int my_begin = guards_per_phread_count_*thread_index;
int guard_number = 0;
......@@ -247,6 +249,7 @@ void HazardPointerTest2::HazardPointerTest2Master() {
// while the hazard pointer guard array is not full
int** allocatedLocal = static_cast<int**>(
embb::base::Allocation::Allocate(sizeof(int*)*guaranteed_capacity_pool_));
PT_ASSERT(NULL != allocatedLocal);
bool full = false;
while (!full) {
......@@ -294,16 +297,19 @@ void HazardPointerTest2::HazardPointerTest2Pre() {
// first the test pool has to be created
test_pool_ = embb::base::Allocation::New<IntObjectTestPool>
(pool_size_using_hazard_pointer_);
PT_ASSERT(NULL != test_pool_);
// after the pool has been created, we create the hp class
hazard_pointer_ = embb::base::Allocation::New <
embb::containers::internal::HazardPointer<int*> >
(delete_pointer_callback_, static_cast<int*>(NULL),
static_cast<int>(guards_per_phread_count_), n_threads);
PT_ASSERT(NULL != hazard_pointer_);
shared_guarded_ = static_cast<embb::base::Atomic<int*>*>(
embb::base::Allocation::Allocate(sizeof(embb::base::Atomic<int*>)*
guaranteed_capacity_pool_));
PT_ASSERT(NULL != shared_guarded_);
for (unsigned int i = 0; i != guaranteed_capacity_pool_; ++i) {
// in-place new for each array cell
......@@ -450,8 +456,9 @@ void HazardPointerTest2::HazardPointerTest2Post() {
void HazardPointerTest2::HazardPointerTest2ThreadMethod() {
for (;;) {
unsigned int thread_index;
embb_internal_thread_index(&thread_index);
unsigned int thread_index = 0;
int result = embb_internal_thread_index(&thread_index);
PT_ASSERT(EMBB_SUCCESS == result);
if (thread_index == current_master_) {
HazardPointerTest2Master();
......
......@@ -31,19 +31,11 @@ namespace embb {
namespace dataflow {
namespace internal {
class Scheduler;
class ClockListener;
struct InitData {
Scheduler * sched;
ClockListener * sink_listener;
};
class ClockListener {
public:
virtual ~ClockListener() {}
virtual void OnClock(int /*clock*/) = 0;
virtual void OnInit(InitData * /*sched*/) = 0;
virtual bool OnHasCycle(ClockListener * /*node*/) { return false; }
};
} // namespace internal
......
......@@ -35,18 +35,21 @@ namespace embb {
namespace dataflow {
namespace internal {
template <int Slices, typename Type>
template <typename Type>
class ConstantSource
: public Node {
public:
typedef Outputs<Slices, Type> OutputsType;
typedef Outputs<Type> OutputsType;
private:
OutputsType outputs_;
Type value_;
public:
explicit ConstantSource(Type value) : value_(value) {}
ConstantSource(Scheduler * sched, Type value)
: value_(value) {
SetScheduler(sched);
}
virtual bool HasOutputs() const {
return outputs_.Size() > 0;
......@@ -56,9 +59,8 @@ class ConstantSource
GetOutput<0>().Send(Signal<Type>(clock, value_));
}
virtual void Init(InitData * init_data) {
SetScheduler(init_data->sched);
GetOutput<0>().SendInit(init_data);
virtual bool IsFullyConnected() {
return outputs_.IsFullyConnected();
}
virtual bool Start(int clock) {
......
......@@ -41,54 +41,83 @@ namespace internal {
class Scheduler;
template <typename, int>
template <typename>
class Out;
template <typename Type, int Slices>
template <typename Type>
class In {
public:
typedef Signal<Type> SignalType;
In() : connected_(false) {}
In() : values_(NULL), connected_(false), slices_(0) {}
~In() {
if (NULL != values_) {
for (int ii = 0; ii < slices_; ii++) {
values_[ii].~SignalType();
}
embb::base::Allocation::Free(values_);
}
}
SignalType const & GetSignal(int clock) const {
return values_[clock % Slices];
return values_[clock % slices_];
}
Type GetValue(int clock) const {
SignalType const & signal = GetSignal(clock);
if (signal.IsBlank())
EMBB_THROW(embb::base::ErrorException,
"Signal is blank, cannot get a value.")
assert(!signal.IsBlank());
return signal.GetValue();
}
bool IsConnected() const { return connected_; }
void SetConnected() { connected_ = true; }
bool HasCycle(ClockListener * node) {
return listener_->OnHasCycle(node);
}
void SetSlices(int slices) {
if (0 < slices_) {
for (int ii = 0; ii < slices_; ii++) {
values_[ii].~SignalType();
}
embb::base::Allocation::Free(values_);
values_ = NULL;
}
slices_ = slices;
if (0 < slices_) {
values_ = reinterpret_cast<SignalType*>(
embb::base::Allocation::Allocate(
sizeof(SignalType)*slices_));
for (int ii = 0; ii < slices_; ii++) {
new (&values_[ii]) SignalType();
}
}
}
void SetListener(ClockListener * listener) { listener_ = listener; }
void Clear(int clock) {
const int idx = clock % Slices;
const int idx = clock % slices_;
values_[idx].Clear();
}
friend class Out<Type, Slices>;
friend class Out<Type>;
private:
SignalType values_[Slices];
SignalType * values_;
ClockListener * listener_;
bool connected_;
int slices_;
#if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY
embb::base::Spinlock lock_;
std::vector<SignalType> history_;
#endif
void Receive(SignalType const & value) {
const int idx = value.GetClock() % Slices;
if (values_[idx].GetClock() >= value.GetClock())
EMBB_THROW(embb::base::ErrorException,
"Received signal does not increase clock.");
const int idx = value.GetClock() % slices_;
assert(values_[idx].GetClock() < value.GetClock());
values_[idx] = value;
listener_->OnClock(value.GetClock());
#if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY
......@@ -97,10 +126,6 @@ class In {
lock_.Unlock();
#endif
}
void ReceiveInit(InitData * init_data) {
listener_->OnInit(init_data);
}
};
} // namespace internal
......
......@@ -43,18 +43,28 @@ class Node {
virtual bool HasInputs() const { return false; }
virtual bool HasOutputs() const { return false; }
virtual void Run(int clock) = 0;
virtual bool IsFullyConnected() = 0;
virtual bool IsSequential() { return true; }
virtual bool HasCycle() { return false; }
virtual bool Start(int /*clock*/) {
EMBB_THROW(embb::base::ErrorException,
"Nodes are started implicitly.");
}
virtual void Init(InitData * init_data) = 0;
void SetScheduler(Scheduler * sched) {
sched_ = sched;
if (NULL != sched_) {
SetSlices(sched_->GetSlices());
} else {
SetSlices(0);
}
}
protected:
Scheduler * sched_;
static int next_process_id_;
void SetScheduler(Scheduler * sched) { sched_ = sched; }
static int GetNextProcessID() { return next_process_id_++; }
virtual void SetSlices(int /*slices*/) {}
};
} // namespace internal
......
......@@ -36,12 +36,13 @@ namespace dataflow {
namespace internal {
class Scheduler;
class ClockListener;
template <typename Type, int Slices>
template <typename Type>
class Out {
public:
typedef Signal<Type> SignalType;
typedef In<Type, Slices> InType;
typedef In<Type> InType;
Out() {
}
......@@ -52,12 +53,6 @@ class Out {
}
}
void SendInit(InitData * init_data) {
for (size_t ii = 0; ii < targets_.size(); ii++) {
targets_[ii]->ReceiveInit(init_data);
}
}
void Connect(InType & input) {
if (input.IsConnected()) {
EMBB_THROW(embb::base::ErrorException,
......@@ -72,6 +67,18 @@ class Out {
Connect(input);
}
bool IsConnected() const {
return targets_.size() > 0;
}
bool HasCycle(ClockListener * node) {
bool result = false;
for (size_t ii = 0; ii < targets_.size() && !result; ii++) {
result = result || targets_[ii]->HasCycle(node);
}
return result;
}
private:
std::vector< InType * > targets_;
};
......
......@@ -35,7 +35,6 @@ namespace dataflow {
namespace internal {
template <
int,
typename = embb::base::internal::Nil,
typename = embb::base::internal::Nil,
typename = embb::base::internal::Nil,
......@@ -43,54 +42,112 @@ template <
typename = embb::base::internal::Nil >
class Outputs;
template <int Slices>
class Outputs<Slices, embb::base::internal::Nil, embb::base::internal::Nil,
class ClockListener;
template <>
class Outputs<embb::base::internal::Nil, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil,
embb::base::internal::Nil>
: public Tuple<embb::base::internal::Nil, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil,
embb::base::internal::Nil> {
public:
bool IsFullyConnected() {
return true;
}
bool HasCycle(ClockListener * /*node*/) {
return false;
}
};
template <int Slices, typename T1>
class Outputs<Slices, T1, embb::base::internal::Nil, embb::base::internal::Nil,
template <typename T1>
class Outputs<T1, embb::base::internal::Nil, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil>
: public Tuple<Out<T1, Slices>, embb::base::internal::Nil,
: public Tuple<Out<T1>, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil,
embb::base::internal::Nil> {
public:
bool IsFullyConnected() {
return this->template Get<0>().IsConnected();
}
bool HasCycle(ClockListener * node) {
return this->template Get<0>().HasCycle(node);
}
};
template <int Slices, typename T1, typename T2>
class Outputs<Slices, T1, T2, embb::base::internal::Nil,
template <typename T1, typename T2>
class Outputs<T1, T2, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil>
: public Tuple<Out<T1, Slices>, Out<T2, Slices>, embb::base::internal::Nil,
: public Tuple<Out<T1>, Out<T2>, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil> {
public:
bool IsFullyConnected() {
return this->template Get<0>().IsConnected() &&
this->template Get<1>().IsConnected();
}
bool HasCycle(ClockListener * node) {
return this->template Get<0>().HasCycle(node) ||
this->template Get<1>().HasCycle(node);
}
};
template <int Slices, typename T1, typename T2, typename T3>
class Outputs<Slices, T1, T2, T3, embb::base::internal::Nil,
template <typename T1, typename T2, typename T3>
class Outputs<T1, T2, T3, embb::base::internal::Nil,
embb::base::internal::Nil>
: public Tuple<Out<T1, Slices>, Out<T2, Slices>, Out<T3, Slices>,
: public Tuple<Out<T1>, Out<T2>, Out<T3>,
embb::base::internal::Nil, embb::base::internal::Nil> {
public:
bool IsFullyConnected() {
return this->template Get<0>().IsConnected() &&
this->template Get<1>().IsConnected() &&
this->template Get<2>().IsConnected();
}
bool HasCycle(ClockListener * node) {
return this->template Get<0>().HasCycle(node) ||
this->template Get<1>().HasCycle(node) ||
this->template Get<2>().HasCycle(node);
}
};
template <int Slices, typename T1, typename T2, typename T3, typename T4>
class Outputs<Slices, T1, T2, T3, T4, embb::base::internal::Nil>
: public Tuple<Out<T1, Slices>, Out<T2, Slices>, Out<T3, Slices>,
Out<T4, Slices>, embb::base::internal::Nil>{
template <typename T1, typename T2, typename T3, typename T4>
class Outputs<T1, T2, T3, T4, embb::base::internal::Nil>
: public Tuple<Out<T1>, Out<T2>, Out<T3>,
Out<T4>, embb::base::internal::Nil>{
public:
bool IsFullyConnected() {
return this->template Get<0>().IsConnected() &&
this->template Get<1>().IsConnected() &&
this->template Get<2>().IsConnected() &&
this->template Get<3>().IsConnected();
}
bool HasCycle(ClockListener * node) {
return this->template Get<0>().HasCycle(node) ||
this->template Get<1>().HasCycle(node) ||
this->template Get<2>().HasCycle(node) ||
this->template Get<3>().HasCycle(node);
}
};
template <int Slices, typename T1, typename T2, typename T3, typename T4,
template <typename T1, typename T2, typename T3, typename T4,
typename T5>
class Outputs
: public Tuple<Out<T1, Slices>, Out<T2, Slices>, Out<T3, Slices>,
Out<T4, Slices>, Out<T5, Slices> > {
: public Tuple<Out<T1>, Out<T2>, Out<T3>,
Out<T4>, Out<T5> > {
public:
bool IsFullyConnected() {
return this->template Get<0>().IsConnected() &&
this->template Get<1>().IsConnected() &&
this->template Get<2>().IsConnected() &&
this->template Get<3>().IsConnected() &&
this->template Get<4>().IsConnected();
}
bool HasCycle(ClockListener * node) {
return this->template Get<0>().HasCycle(node) ||
this->template Get<1>().HasCycle(node) ||
this->template Get<2>().HasCycle(node) ||
this->template Get<3>().HasCycle(node) ||
this->template Get<4>().HasCycle(node);
}
};
} // namespace internal
......
......@@ -37,24 +37,27 @@ namespace embb {
namespace dataflow {
namespace internal {
template <int Slices, bool Serial, class INPUTS, class OUTPUTS> class Process;
template <bool Serial, class INPUTS, class OUTPUTS> class Process;
template <
int Slices, bool Serial,
bool Serial,
typename I1, typename I2, typename I3, typename I4, typename I5,
typename O1, typename O2, typename O3, typename O4, typename O5>
class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>,
Outputs<Slices, O1, O2, O3, O4, O5> >
class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
Outputs<O1, O2, O3, O4, O5> >
: public Node
, public ClockListener {
public:
typedef Inputs<Slices, I1, I2, I3, I4, I5> InputsType;
typedef Outputs<Slices, O1, O2, O3, O4, O5> OutputsType;
typedef Inputs<I1, I2, I3, I4, I5> InputsType;
typedef Outputs<O1, O2, O3, O4, O5> OutputsType;
typedef ProcessExecutor< InputsType, OutputsType > ExecutorType;
typedef typename ExecutorType::FunctionType FunctionType;
explicit Process(FunctionType function)
: executor_(function) {
Process(Scheduler * sched, FunctionType function)
: inputs_()
, executor_(function)
, action_(NULL)
, slices_(0) {
next_clock_ = 0;
queued_clock_ = 0;
bool ordered = Serial;
......@@ -64,6 +67,13 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>,
queue_id_ = 0;
}
inputs_.SetListener(this);
SetScheduler(sched);
}
~Process() {
if (NULL != action_) {
embb::base::Allocation::Free(action_);
}
}
virtual bool HasInputs() const {
......@@ -78,9 +88,16 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>,
executor_.Execute(clock, inputs_, outputs_);
}
virtual void Init(InitData * init_data) {
SetScheduler(init_data->sched);
executor_.Init(init_data, outputs_);
virtual bool IsFullyConnected() {
return inputs_.IsFullyConnected() && outputs_.IsFullyConnected();
}
virtual bool IsSequential() {
return Serial;
}
virtual bool HasCycle() {
return outputs_.HasCycle(this);
}
InputsType & GetInputs() {
......@@ -108,17 +125,14 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>,
}
virtual void OnClock(int clock) {
if (!inputs_.AreAtClock(clock)) {
EMBB_THROW(embb::base::ErrorException,
"Some inputs are not at expected clock.")
}
assert(inputs_.AreAtClock(clock));
bool ordered = Serial;
if (ordered) {
bool retry = true;
while (retry) {
int clk = next_clock_;
int clk_end = clk + Slices;
int clk_end = clk + slices_;
int clk_res = clk;
for (int ii = clk; ii < clk_end; ii++) {
if (!inputs_.AreAtClock(ii)) {
......@@ -130,7 +144,7 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>,
if (next_clock_.CompareAndSwap(clk, clk_res)) {
while (queued_clock_.Load() < clk) continue;
for (int ii = clk; ii < clk_res; ii++) {
const int idx = ii % Slices;
const int idx = ii % slices_;
action_[idx] = Action(this, ii);
sched_->Enqueue(queue_id_, action_[idx]);
}
......@@ -142,24 +156,47 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>,
}
}
} else {
const int idx = clock % Slices;
const int idx = clock % slices_;
action_[idx] = Action(this, clock);
sched_->Spawn(action_[idx]);
}
}
virtual void OnInit(InitData * init_data) {
Init(init_data);
virtual bool OnHasCycle(ClockListener * node) {
ClockListener * this_node = this;
if (this_node == node) {
return true;
} else {
return outputs_.HasCycle(node);
}
}
private:
InputsType inputs_;
OutputsType outputs_;
ExecutorType executor_;
Action action_[Slices];
Action * action_;
embb::base::Atomic<int> next_clock_;
embb::base::Atomic<int> queued_clock_;
int queue_id_;
int slices_;
virtual void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(action_);
action_ = NULL;
}
slices_ = slices;
inputs_.SetSlices(slices);
if (0 < slices_) {
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
}
}
};
} // namespace internal
......
......@@ -40,6 +40,7 @@ class Scheduler {
virtual void Spawn(Action & action) = 0;
virtual void Enqueue(int process_id, Action & action) = 0;
virtual void WaitForSlice(int slice) = 0;
virtual int GetSlices() = 0;
};
} // namespace internal
......
......@@ -38,12 +38,24 @@ namespace embb {
namespace dataflow {
namespace internal {
template <int Slices>
class SchedulerMTAPI : public Scheduler {
public:
SchedulerMTAPI() {
explicit SchedulerMTAPI(int slices)
: slices_(slices) {
embb::tasks::Node & node = embb::tasks::Node::GetInstance();
for (int ii = 0; ii < Slices; ii++) {
int tl = std::min(
static_cast<int>(node.GetTaskLimit()),
static_cast<int>(node.GetGroupCount()));
if (tl < slices_) {
slices_ = tl;
}
group_ = reinterpret_cast<embb::tasks::Group**>(
embb::base::Allocation::Allocate(
sizeof(embb::tasks::Group*)*slices_));
for (int ii = 0; ii < slices_; ii++) {
embb::tasks::Group & group = node.CreateGroup();
group_[ii] = &group;
}
......@@ -61,22 +73,26 @@ class SchedulerMTAPI : public Scheduler {
}
}
virtual ~SchedulerMTAPI() {
embb::tasks::Node & node = embb::tasks::Node::GetInstance();
for (int ii = 0; ii < Slices; ii++) {
group_[ii]->WaitAll(MTAPI_INFINITE);
node.DestroyGroup(*group_[ii]);
}
for (int ii = 0; ii < queue_count_; ii++) {
node.DestroyQueue(*queue_[ii]);
if (embb::tasks::Node::IsInitialized()) {
// only destroy groups and queues if there still is an instance
embb::tasks::Node & node = embb::tasks::Node::GetInstance();
for (int ii = 0; ii < slices_; ii++) {
group_[ii]->WaitAll(MTAPI_INFINITE);
node.DestroyGroup(*group_[ii]);
}
for (int ii = 0; ii < queue_count_; ii++) {
node.DestroyQueue(*queue_[ii]);
}
}
embb::base::Allocation::Free(group_);
embb::base::Allocation::Free(queue_);
}
virtual void Spawn(Action & action) {
const int idx = action.GetClock() % Slices;
const int idx = action.GetClock() % slices_;
group_[idx]->Spawn(embb::base::MakeFunction(action, &Action::RunMTAPI));
}
virtual void Enqueue(int process_id, Action & action) {
const int idx = action.GetClock() % Slices;
const int idx = action.GetClock() % slices_;
const int queue_id = process_id % queue_count_;
queue_[queue_id]->Spawn(group_[idx],
embb::base::MakeFunction(action, &Action::RunMTAPI));
......@@ -84,11 +100,13 @@ class SchedulerMTAPI : public Scheduler {
virtual void WaitForSlice(int slice) {
group_[slice]->WaitAll(MTAPI_INFINITE);
}
virtual int GetSlices() { return slices_; }
private:
embb::tasks::Group * group_[Slices];
embb::tasks::Group ** group_;
embb::tasks::Queue ** queue_;
int queue_count_;
int slices_;
};
} // namespace internal
......
......@@ -45,6 +45,7 @@ class SchedulerSequential : public Scheduler {
action.RunSequential();
}
virtual void WaitForSlice(int /*slice*/) {}
virtual int GetSlices() { return 1; }
};
} // namespace internal
......
......@@ -27,7 +27,6 @@
#ifndef EMBB_DATAFLOW_INTERNAL_SELECT_H_
#define EMBB_DATAFLOW_INTERNAL_SELECT_H_
#include <embb/dataflow/internal/action.h>
#include <embb/dataflow/internal/signal.h>
#include <embb/dataflow/internal/node.h>
#include <embb/dataflow/internal/inputs.h>
......@@ -37,16 +36,17 @@ namespace embb {
namespace dataflow {
namespace internal {
template <int Slices, typename Type>
template <typename Type>
class Select
: public Node
, public ClockListener {
public:
typedef Inputs<Slices, bool, Type, Type> InputsType;
typedef Outputs<Slices, Type> OutputsType;
typedef Inputs<bool, Type, Type> InputsType;
typedef Outputs<Type> OutputsType;
Select() {
explicit Select(Scheduler * sched) : inputs_(), slices_(0) {
inputs_.SetListener(this);
SetScheduler(sched);
}
virtual bool HasInputs() const {
......@@ -81,9 +81,17 @@ class Select
}
}
virtual void Init(InitData * init_data) {
SetScheduler(init_data->sched);
GetOutput<0>().SendInit(init_data);
virtual bool IsFullyConnected() {
return inputs_.IsFullyConnected() && outputs_.IsFullyConnected();
}
virtual bool OnHasCycle(ClockListener * node) {
ClockListener * this_node = this;
if (this_node == node) {
return true;
} else {
return outputs_.HasCycle(node);
}
}
InputsType & GetInputs() {
......@@ -112,20 +120,19 @@ class Select
virtual void OnClock(int clock) {
//const int idx = clock % Slices;
if (!inputs_.AreAtClock(clock))
EMBB_THROW(embb::base::ErrorException,
"Some inputs are not at expected clock.")
assert(inputs_.AreAtClock(clock));
Run(clock);
}
virtual void OnInit(InitData * init_data) {
Init(init_data);
}
private:
InputsType inputs_;
OutputsType outputs_;
Action action_[Slices];
int slices_;
virtual void SetSlices(int slices) {
slices_ = slices;
inputs_.SetSlices(slices);
}
};
} // namespace internal
......
......@@ -36,29 +36,36 @@ namespace embb {
namespace dataflow {
namespace internal {
template <int Slices, class Inputs> class Sink;
template <class Inputs> class Sink;
template <
int Slices,
typename I1, typename I2, typename I3, typename I4, typename I5>
class Sink< Slices, Inputs<Slices, I1, I2, I3, I4, I5> >
class Sink< Inputs<I1, I2, I3, I4, I5> >
: public Node
, public ClockListener {
public:
typedef Inputs<Slices, I1, I2, I3, I4, I5> InputsType;
typedef Inputs<I1, I2, I3, I4, I5> InputsType;
typedef SinkExecutor< InputsType > ExecutorType;
typedef typename ExecutorType::FunctionType FunctionType;
explicit Sink(FunctionType function)
: executor_(function) {
Sink(Scheduler * sched, ClockListener * listener,
FunctionType function)
: inputs_()
, executor_(function)
, action_(NULL)
, slices_(0) {
next_clock_ = 0;
queued_clock_ = 0;
queue_id_ = GetNextProcessID();
inputs_.SetListener(this);
listener_ = listener;
SetScheduler(sched);
}
void SetListener(ClockListener * listener) {
listener_ = listener;
~Sink() {
if (NULL != action_) {
embb::base::Allocation::Free(action_);
}
}
virtual bool HasInputs() const {
......@@ -72,10 +79,8 @@ class Sink< Slices, Inputs<Slices, I1, I2, I3, I4, I5> >
listener_->OnClock(clock);
}
virtual void Init(InitData * init_data) {
SetListener(init_data->sink_listener);
SetScheduler(init_data->sched);
listener_->OnInit(init_data);
virtual bool IsFullyConnected() {
return inputs_.IsFullyConnected();
}
InputsType & GetInputs() {
......@@ -88,15 +93,13 @@ class Sink< Slices, Inputs<Slices, I1, I2, I3, I4, I5> >
}
virtual void OnClock(int clock) {
if (!inputs_.AreAtClock(clock)) {
EMBB_THROW(embb::base::ErrorException,
"Some inputs are not at expected clock.")
}
EMBB_UNUSED_IN_RELEASE(clock);
assert(inputs_.AreAtClock(clock));
bool retry = true;
while (retry) {
int clk = next_clock_;
int clk_end = clk + Slices;
int clk_end = clk + slices_;
int clk_res = clk;
for (int ii = clk; ii < clk_end; ii++) {
if (!inputs_.AreAtClock(ii)) {
......@@ -108,7 +111,7 @@ class Sink< Slices, Inputs<Slices, I1, I2, I3, I4, I5> >
if (next_clock_.CompareAndSwap(clk, clk_res)) {
while (queued_clock_.Load() < clk) continue;
for (int ii = clk; ii < clk_res; ii++) {
const int idx = ii % Slices;
const int idx = ii % slices_;
action_[idx] = Action(this, ii);
sched_->Enqueue(queue_id_, action_[idx]);
}
......@@ -121,18 +124,32 @@ class Sink< Slices, Inputs<Slices, I1, I2, I3, I4, I5> >
}
}
virtual void OnInit(InitData * init_data) {
Init(init_data);
}
private:
InputsType inputs_;
ExecutorType executor_;
Action action_[Slices];
Action * action_;
ClockListener * listener_;
embb::base::Atomic<int> next_clock_;
embb::base::Atomic<int> queued_clock_;
int queue_id_;
int slices_;
virtual void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(action_);
action_ = NULL;
}
slices_ = slices;
inputs_.SetSlices(slices);
if (0 < slices_) {
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
}
}
};
} // namespace internal
......
......@@ -38,8 +38,8 @@ namespace internal {
template <class Inputs>
class SinkExecutor;
template <int Slices, typename I1>
class SinkExecutor< Inputs<Slices, I1> > {
template <typename I1>
class SinkExecutor< Inputs<I1> > {
public:
typedef embb::base::Function<void, I1 const &> FunctionType;
......@@ -47,7 +47,7 @@ class SinkExecutor< Inputs<Slices, I1> > {
void Execute(
int clock,
Inputs<Slices, I1> & inputs) {
Inputs<I1> & inputs) {
function_(
inputs.template Get<0>().GetValue(clock));
}
......@@ -56,8 +56,8 @@ class SinkExecutor< Inputs<Slices, I1> > {
FunctionType function_;
};
template <int Slices, typename I1, typename I2>
class SinkExecutor< Inputs<Slices, I1, I2> > {
template <typename I1, typename I2>
class SinkExecutor< Inputs<I1, I2> > {
public:
typedef embb::base::Function<void, I1 const &, I2 const &> FunctionType;
......@@ -65,7 +65,7 @@ class SinkExecutor< Inputs<Slices, I1, I2> > {
void Execute(
int clock,
Inputs<Slices, I1, I2> & inputs) {
Inputs<I1, I2> & inputs) {
function_(
inputs.template Get<0>().GetValue(clock),
inputs.template Get<1>().GetValue(clock));
......@@ -75,8 +75,8 @@ class SinkExecutor< Inputs<Slices, I1, I2> > {
FunctionType function_;
};
template <int Slices, typename I1, typename I2, typename I3>
class SinkExecutor< Inputs<Slices, I1, I2, I3> > {
template <typename I1, typename I2, typename I3>
class SinkExecutor< Inputs<I1, I2, I3> > {
public:
typedef embb::base::Function<void, I1 const &, I2 const &, I3 const &>
FunctionType;
......@@ -85,7 +85,7 @@ class SinkExecutor< Inputs<Slices, I1, I2, I3> > {
void Execute(
int clock,
Inputs<Slices, I1, I2, I3> & inputs) {
Inputs<I1, I2, I3> & inputs) {
function_(
inputs.template Get<0>().GetValue(clock),
inputs.template Get<1>().GetValue(clock),
......@@ -96,8 +96,8 @@ class SinkExecutor< Inputs<Slices, I1, I2, I3> > {
FunctionType function_;
};
template <int Slices, typename I1, typename I2, typename I3, typename I4>
class SinkExecutor< Inputs<Slices, I1, I2, I3, I4> > {
template <typename I1, typename I2, typename I3, typename I4>
class SinkExecutor< Inputs<I1, I2, I3, I4> > {
public:
typedef embb::base::Function<void, I1 const &, I2 const &, I3 const &,
I4 const &> FunctionType;
......@@ -106,7 +106,7 @@ class SinkExecutor< Inputs<Slices, I1, I2, I3, I4> > {
void Execute(
int clock,
Inputs<Slices, I1, I2, I3, I4> & inputs) {
Inputs<I1, I2, I3, I4> & inputs) {
function_(
inputs.template Get<0>().GetValue(clock),
inputs.template Get<1>().GetValue(clock),
......@@ -118,9 +118,9 @@ class SinkExecutor< Inputs<Slices, I1, I2, I3, I4> > {
FunctionType function_;
};
template <int Slices, typename I1, typename I2, typename I3, typename I4,
template <typename I1, typename I2, typename I3, typename I4,
typename I5>
class SinkExecutor< Inputs<Slices, I1, I2, I3, I4, I5> > {
class SinkExecutor< Inputs<I1, I2, I3, I4, I5> > {
public:
typedef embb::base::Function<void, I1 const &, I2 const &, I3 const &,
I4 const &, I5 const &> FunctionType;
......@@ -129,7 +129,7 @@ class SinkExecutor< Inputs<Slices, I1, I2, I3, I4, I5> > {
void Execute(
int clock,
Inputs<Slices, I1, I2, I3, I4, I5> & inputs) {
Inputs<I1, I2, I3, I4, I5> & inputs) {
function_(
inputs.template Get<0>().GetValue(clock),
inputs.template Get<1>().GetValue(clock),
......
......@@ -37,20 +37,20 @@ namespace embb {
namespace dataflow {
namespace internal {
template <int Slices, class Outputs> class Source;
template <class Outputs> class Source;
template <
int Slices,
typename O1, typename O2, typename O3, typename O4, typename O5>
class Source< Slices, Outputs<Slices, O1, O2, O3, O4, O5> >
class Source< Outputs<O1, O2, O3, O4, O5> >
: public Node {
public:
typedef Outputs<Slices, O1, O2, O3, O4, O5> OutputsType;
typedef Outputs<O1, O2, O3, O4, O5> OutputsType;
typedef SourceExecutor< OutputsType > ExecutorType;
typedef typename ExecutorType::FunctionType FunctionType;
explicit Source(FunctionType function)
Source(Scheduler * sched, FunctionType function)
: executor_(function), not_done_(true) {
SetScheduler(sched);
}
virtual bool HasOutputs() const {
......@@ -61,9 +61,8 @@ class Source< Slices, Outputs<Slices, O1, O2, O3, O4, O5> >
not_done_ = executor_.Execute(clock, outputs_);
}
virtual void Init(InitData * init_data) {
SetScheduler(init_data->sched);
executor_.Init(init_data, outputs_);
virtual bool IsFullyConnected() {
return outputs_.IsFullyConnected();
}
virtual bool Start(int clock) {
......
......@@ -36,13 +36,11 @@ namespace embb {
namespace dataflow {
namespace internal {
class Scheduler;
template <class OUTPUTS>
class SourceExecutor;
template <int Slices, typename O1>
class SourceExecutor< Outputs<Slices, O1> > {
template <typename O1>
class SourceExecutor< Outputs<O1> > {
public:
typedef embb::base::Function<bool, O1 &> FunctionType;
......@@ -50,7 +48,7 @@ class SourceExecutor< Outputs<Slices, O1> > {
bool Execute(
int clock,
Outputs<Slices, O1> & outputs) {
Outputs<O1> & outputs) {
O1 o1;
bool result = function_(o1);
if (result) {
......@@ -59,16 +57,12 @@ class SourceExecutor< Outputs<Slices, O1> > {
return result;
}
void Init(InitData * init_data, Outputs<Slices, O1> & outputs) {
outputs.template Get<0>().SendInit(init_data);
}
private:
FunctionType function_;
};
template <int Slices, typename O1, typename O2>
class SourceExecutor< Outputs<Slices, O1, O2> > {
template <typename O1, typename O2>
class SourceExecutor< Outputs<O1, O2> > {
public:
typedef embb::base::Function<bool, O1 &, O2 &> FunctionType;
......@@ -76,7 +70,7 @@ class SourceExecutor< Outputs<Slices, O1, O2> > {
bool Execute(
int clock,
Outputs<Slices, O1, O2> & outputs) {
Outputs<O1, O2> & outputs) {
O1 o1;
O2 o2;
bool result = function_(o1, o2);
......@@ -87,17 +81,12 @@ class SourceExecutor< Outputs<Slices, O1, O2> > {
return result;
}
void Init(InitData * init_data, Outputs<Slices, O1, O2> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
}
private:
FunctionType function_;
};
template <int Slices, typename O1, typename O2, typename O3>
class SourceExecutor< Outputs<Slices, O1, O2, O3> > {
template <typename O1, typename O2, typename O3>
class SourceExecutor< Outputs<O1, O2, O3> > {
public:
typedef embb::base::Function<bool, O1 &, O2 &, O3 &> FunctionType;
......@@ -105,7 +94,7 @@ class SourceExecutor< Outputs<Slices, O1, O2, O3> > {
bool Execute(
int clock,
Outputs<Slices, O1, O2, O3> & outputs) {
Outputs<O1, O2, O3> & outputs) {
O1 o1;
O2 o2;
O3 o3;
......@@ -118,18 +107,12 @@ class SourceExecutor< Outputs<Slices, O1, O2, O3> > {
return result;
}
void Init(InitData * init_data, Outputs<Slices, O1, O2, O3> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
outputs.template Get<2>().SendInit(init_data);
}
private:
FunctionType function_;
};
template <int Slices, typename O1, typename O2, typename O3, typename O4>
class SourceExecutor< Outputs<Slices, O1, O2, O3, O4> > {
template <typename O1, typename O2, typename O3, typename O4>
class SourceExecutor< Outputs<O1, O2, O3, O4> > {
public:
typedef embb::base::Function<bool, O1 &, O2 &, O3 &, O4 &> FunctionType;
......@@ -137,7 +120,7 @@ class SourceExecutor< Outputs<Slices, O1, O2, O3, O4> > {
bool Execute(
int clock,
Outputs<Slices, O1, O2, O3, O4> & outputs) {
Outputs<O1, O2, O3, O4> & outputs) {
O1 o1;
O2 o2;
O3 o3;
......@@ -152,20 +135,13 @@ class SourceExecutor< Outputs<Slices, O1, O2, O3, O4> > {
return result;
}
void Init(InitData * init_data, Outputs<Slices, O1, O2, O3, O4> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
outputs.template Get<2>().SendInit(init_data);
outputs.template Get<3>().SendInit(init_data);
}
private:
FunctionType function_;
};
template <int Slices, typename O1, typename O2, typename O3, typename O4,
template <typename O1, typename O2, typename O3, typename O4,
typename O5>
class SourceExecutor< Outputs<Slices, O1, O2, O3, O4, O5> > {
class SourceExecutor< Outputs<O1, O2, O3, O4, O5> > {
public:
typedef embb::base::Function<bool, O1 &, O2 &, O3 &, O4 &, O5 &> FunctionType;
......@@ -173,7 +149,7 @@ class SourceExecutor< Outputs<Slices, O1, O2, O3, O4, O5> > {
bool Execute(
int clock,
Outputs<Slices, O1, O2, O3, O4, O5> & outputs) {
Outputs<O1, O2, O3, O4, O5> & outputs) {
O1 o1;
O2 o2;
O3 o3;
......@@ -190,15 +166,6 @@ class SourceExecutor< Outputs<Slices, O1, O2, O3, O4, O5> > {
return result;
}
void Init(
InitData * init_data, Outputs<Slices, O1, O2, O3, O4, O5> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
outputs.template Get<2>().SendInit(init_data);
outputs.template Get<3>().SendInit(init_data);
outputs.template Get<4>().SendInit(init_data);
}
private:
FunctionType function_;
};
......
......@@ -27,7 +27,6 @@
#ifndef EMBB_DATAFLOW_INTERNAL_SWITCH_H_
#define EMBB_DATAFLOW_INTERNAL_SWITCH_H_
#include <embb/dataflow/internal/action.h>
#include <embb/dataflow/internal/signal.h>
#include <embb/dataflow/internal/node.h>
#include <embb/dataflow/internal/inputs.h>
......@@ -37,16 +36,17 @@ namespace embb {
namespace dataflow {
namespace internal {
template <int Slices, typename Type>
template <typename Type>
class Switch
: public Node
, public ClockListener {
public:
typedef Inputs<Slices, bool, Type> InputsType;
typedef Outputs<Slices, Type, Type> OutputsType;
typedef Inputs<bool, Type> InputsType;
typedef Outputs<Type, Type> OutputsType;
Switch() {
explicit Switch(Scheduler * sched) : inputs_() {
inputs_.SetListener(this);
SetScheduler(sched);
}
virtual bool HasInputs() const {
......@@ -78,10 +78,17 @@ class Switch
}
}
virtual void Init(InitData * init_data) {
SetScheduler(init_data->sched);
GetOutput<0>().SendInit(init_data);
GetOutput<1>().SendInit(init_data);
virtual bool IsFullyConnected() {
return inputs_.IsFullyConnected() && outputs_.IsFullyConnected();
}
virtual bool OnHasCycle(ClockListener * node) {
ClockListener * this_node = this;
if (this_node == node) {
return true;
} else {
return outputs_.HasCycle(node);
}
}
InputsType & GetInputs() {
......@@ -110,20 +117,17 @@ class Switch
virtual void OnClock(int clock) {
//const int idx = clock % Slices;
if (!inputs_.AreAtClock(clock))
EMBB_THROW(embb::base::ErrorException,
"Some inputs are not at expected clock.")
assert(inputs_.AreAtClock(clock));
Run(clock);
}
virtual void OnInit(InitData * init_data) {
Init(init_data);
}
private:
InputsType inputs_;
OutputsType outputs_;
Action action_[Slices];
virtual void SetSlices(int slices) {
inputs_.SetSlices(slices);
}
};
} // namespace internal
......
......@@ -39,15 +39,15 @@
#define NUM_SLICES 8
#define TEST_COUNT 12
typedef embb::dataflow::Network<NUM_SLICES> MyNetwork;
typedef embb::dataflow::Network MyNetwork;
typedef MyNetwork::ConstantSource< int > MyConstantSource;
typedef MyNetwork::Source< int > MySource;
typedef MyNetwork::SerialProcess< MyNetwork::Inputs<int>::Type,
MyNetwork::Outputs<bool>::Type > MyPred;
typedef MyNetwork::ParallelProcess< MyNetwork::Inputs<int>::Type,
MyNetwork::Outputs<int>::Type > MyFilter;
typedef MyNetwork::ParallelProcess< MyNetwork::Inputs<int, int>::Type,
MyNetwork::Outputs<int>::Type > MyMult;
typedef MyNetwork::SerialProcess< MyNetwork::Inputs<int>,
MyNetwork::Outputs<bool> > MyPred;
typedef MyNetwork::ParallelProcess< MyNetwork::Inputs<int>,
MyNetwork::Outputs<int> > MyFilter;
typedef MyNetwork::ParallelProcess< MyNetwork::Inputs<int, int>,
MyNetwork::Outputs<int> > MyMult;
typedef MyNetwork::Sink< int > MySink;
typedef MyNetwork::Switch< int > MySwitch;
typedef MyNetwork::Select< int > MySelect;
......@@ -165,15 +165,16 @@ void SimpleTest::TestBasic() {
for (int ii = 0; ii < 10000; ii++) {
ArraySink<TEST_COUNT> asink;
MyNetwork network;
MyConstantSource constant(4);
MySource source(embb::base::MakeFunction(sourceFunc));
MyFilter filter(embb::base::MakeFunction(filterFunc));
MyMult mult(embb::base::MakeFunction(multFunc));
MySink sink(embb::base::MakeFunction(asink, &ArraySink<TEST_COUNT>::Run));
MyPred pred(embb::base::MakeFunction(predFunc));
MySwitch sw;
MySelect sel;
MyNetwork network(NUM_SLICES);
MyConstantSource constant(network, 4);
MySource source(network, embb::base::MakeFunction(sourceFunc));
MyFilter filter(network, embb::base::MakeFunction(filterFunc));
MyMult mult(network, embb::base::MakeFunction(multFunc));
MySink sink(network,
embb::base::MakeFunction(asink, &ArraySink<TEST_COUNT>::Run));
MyPred pred(network, embb::base::MakeFunction(predFunc));
MySwitch sw(network);
MySelect sel(network);
for (int kk = 0; kk < TEST_COUNT; kk++) {
source_array[kk] = -1;
......@@ -208,10 +209,10 @@ void SimpleTest::TestBasic() {
sel.GetOutput<0>() >> sink.GetInput<0>();
network.AddSource(constant);
network.AddSource(source);
try {
if (!network.IsValid()) {
EMBB_THROW(embb::base::ErrorException, "network is invalid");
}
network();
} catch (embb::base::ErrorException & e) {
PT_ASSERT_MSG(false, e.What());
......
......@@ -6,15 +6,14 @@
Network::Source<int>
source1(
network,
embb::base::MakeFunction(producer1, &Producer<int>::Run) ),
source2(
network,
embb::base::MakeFunction(producer2, &Producer<int>::Run) ),
source3(
network,
embb::base::MakeFunction(producer3, &Producer<int>::Run) ),
source4(
network,
embb::base::MakeFunction(producer4, &Producer<int>::Run) );
nw.AddSource(source1);
nw.AddSource(source2);
nw.AddSource(source3);
nw.AddSource(source4);
Network::ParallelProcess<
Network::Inputs<std::string>::Type,
Network::Outputs<std::string>::Type> replace(
embb::base::MakeFunction(ReplaceFunction)
Network::Inputs<std::string>,
Network::Outputs<std::string> > replace(
network, embb::base::MakeFunction(ReplaceFunction)
);
Network::Sink<std::string> write(
embb::base::MakeFunction(SinkFunction)
network, embb::base::MakeFunction(SinkFunction)
);
Network::Source<std::string> read(
embb::base::MakeFunction(SourceFunction)
network, embb::base::MakeFunction(SourceFunction)
);
......@@ -52,10 +52,10 @@ std::string with("hello");
#include "dataflow/dataflow_sink_function-snippet.h"
void RunDataflowLinear() {
#include "dataflow/dataflow_make-snippet.h"
#include "dataflow/dataflow_declare_source-snippet.h"
#include "dataflow/dataflow_declare_replace-snippet.h"
#include "dataflow/dataflow_declare_sink-snippet.h"
#include "dataflow/dataflow_connect-snippet.h"
#include "dataflow/dataflow_add-snippet.h"
#include "dataflow/dataflow_run-snippet.h"
}
typedef embb::dataflow::Network<2> Network;
static Network nw;
typedef embb::dataflow::Network Network;
......@@ -48,22 +48,28 @@ static int SimpleRand(int & seed) {
#include "dataflow/dataflow_network-snippet.h"
void RunDataflowNonLinear() {
#include "dataflow/dataflow_make-snippet.h"
#include "dataflow/dataflow_declare_add_sources-snippet.h"
Comparator<int> comparator;
Network::ParallelProcess<
Network::Inputs<int, int>::Type, Network::Outputs<int, int>::Type>
process1( embb::base::MakeFunction(comparator, &Comparator<int>::Run) ),
process2( embb::base::MakeFunction(comparator, &Comparator<int>::Run) ),
process3( embb::base::MakeFunction(comparator, &Comparator<int>::Run) ),
process4( embb::base::MakeFunction(comparator, &Comparator<int>::Run) ),
process5( embb::base::MakeFunction(comparator, &Comparator<int>::Run) );
Network::Inputs<int, int>, Network::Outputs<int, int> >
process1(network,
embb::base::MakeFunction(comparator, &Comparator<int>::Run)),
process2(network,
embb::base::MakeFunction(comparator, &Comparator<int>::Run)),
process3(network,
embb::base::MakeFunction(comparator, &Comparator<int>::Run)),
process4(network,
embb::base::MakeFunction(comparator, &Comparator<int>::Run)),
process5(network,
embb::base::MakeFunction(comparator, &Comparator<int>::Run));
Consumer<int> consumer;
Network::Sink<int, int, int, int>
sink1(embb::base::MakeFunction(consumer, &Consumer<int>::Run));
sink1(network, embb::base::MakeFunction(consumer, &Consumer<int>::Run));
source1.GetOutput<0>() >> process1.GetInput<0>();
source2.GetOutput<0>() >> process2.GetInput<0>();
......@@ -83,5 +89,5 @@ void RunDataflowNonLinear() {
process5.GetOutput<1>() >> sink1.GetInput<2>();
process4.GetOutput<1>() >> sink1.GetInput<3>();
nw();
#include "dataflow/dataflow_run-snippet.h"
}
......@@ -104,10 +104,14 @@ This pipeline can be easily implemented using the Dataflow building block. As th
%
Then, we have to construct a \emph{network}. A network consists of a set of processes that are connected by communication channels.
%\footnote{Pipelines belong to the most simple networks, where the processes are connected in string-like (linear) fashion.}
\embb provides a class template \lstinline|Network| that can be customized to your needs. For the moment, we are using 2 as a template argument:
\embb provides a class \lstinline|Network| that handles data routing and scheduling of your processes:
%
\\\inputlisting{../examples/dataflow/dataflow_network-snippet.h}
%
We need to prepare the network for the desired maximum number of elements that can be in the network at a time. The number of elements is limited to avoid that the network is flooded with new elements before the previous elements have been processed. In a linear pipeline, for example, this may happen if the source is faster than the sink. In our example, at most four elements may be processed simultaneously: one in the source, one in the sink, and two in the middle stage (see below). Finding an optimal value depends on the application and usually requires some experimentation. In general, large values boost the throughput but also increase the latency. Conversely, small values reduce the latency but may lead to a drop of performance in terms of throughput:
%
\\\inputlisting{../examples/dataflow/dataflow_make-snippet.h}
%
As the next step, we have to construct the processes shown in Figure~\ref{fig:replace_par}. The easiest way to construct a process is to wrap the user-defined code in a lambda function and to pass it to the network. The network constructs an object for that process and executes the lambda function whenever new data is available. There are several methods for constructing processes depending on their type. The process \textbf{read} is a \emph{source} process, since it produces data (by reading it from the specified file) but does not consume any data. Source processes are constructed from a function object
%
\\\inputlisting{../examples/dataflow/dataflow_source_function-snippet.h}
......@@ -142,20 +146,14 @@ is used to construct the sink:
\emph{\textbf{Note:} If you parallelize your own application using \embb and your compiler emits a lengthy error message containing lots of templates, it is very likely that for at least one process, the ports and their directions do not match the signature of the given function.}
The network needs to know about the source declared above, so we add it to our network:
%
\\\inputlisting{../examples/dataflow/dataflow_add-snippet.h}
%
As the last step, we have to connect the processes (ports). This is straightforward using the C++ stream operator:
%
\\\inputlisting{../examples/dataflow/dataflow_connect-snippet.h}
%
Once all connections have been established, we can start the network:
Then we can start the network:
%
\\\inputlisting{../examples/dataflow/dataflow_run-snippet.h}
%
The integer given as a template parameter to the network specifies the maximum number of elements that can be in the network at a time. The number of elements is limited to avoid that the network is flooded with new elements before the previous elements have been processed. In a linear pipeline, for example, this may happen if the source is faster than the sink. In our example, at most four elements may be processed simultaneously: one in the source, one in the sink, and two in the middle stage (see above). Finding an optimal value depends on the application and usually requires some experimentation. In general, large values boost the throughput but also increase the latency. Conversely, small values reduce the latency but may lead to a drop of performance in terms of throughput.
Note that you will probably not observe a speedup when you run this program on a multicore processor. One reason for this is that input$/$output operations like reading a file from the hard disk and writing the output to the screen are typically a bottleneck. Moreover, the amount of work done in the middle stage of the pipeline (\textbf{replace}) is rather low. To outweigh the overhead for parallel execution, the amount of work must be much higher. In image processing, for example, a single pipeline stage may process a complete image. To sum up, we haven chosen this example for its simplicity, not for its efficiency.
......
......@@ -250,6 +250,17 @@ extern "C" {
#endif
/* ---- MCA ORGANIZATION IDS ----------------------------------------------- */
#define MCA_ORG_ID_PSI 0 /* PolyCore Software, Inc. */
#define MCA_ORG_ID_FSL 1 /* Freescale, Inc. */
#define MCA_ORG_ID_MGC 2 /* Mentor Graphics, Corp. */
#define MCA_ORG_ID_ADI 3 /* Analog Devices */
#define MCA_ORG_ID_SIE 4 /* Siemens */
#define MCA_ORG_ID_EMB 5 /* EMB2 project */
#define MCA_ORG_ID_TBD 6 /* TBD */
/* ---- BASIC DEFINITIONS -------------------------------------------------- */
/** marks input parameters */
......@@ -307,8 +318,22 @@ typedef mtapi_uint_t mtapi_size_t;
* \ingroup RUNTIME_INIT_SHUTDOWN
*/
struct mtapi_info_struct {
unsigned int hardware_concurrency; /**< number of CPU cores */
unsigned int used_memory; /**< bytes of memory used by MTAPI */
mtapi_uint_t mtapi_version; /**< The three last (rightmost) hex
digits are the minor number, and
those left of the minor number are
the major number. */
mtapi_uint_t organization_id; /**< Implementation vendor or
organization ID. */
mtapi_uint_t implementation_version; /**< The three last (rightmost) hex
digits are the minor number, and
those left of the minor number are
the major number.*/
mtapi_uint_t number_of_domains; /**< Number of domains allowed by the
implementation.*/
mtapi_uint_t number_of_nodes; /**< Number of nodes allowed by the
implementation.*/
mtapi_uint_t hardware_concurrency; /**< Number of CPU cores available. */
mtapi_uint_t used_memory; /**< Bytes of memory used by MTAPI. */
};
/**
......
......@@ -279,6 +279,7 @@ void mtapi_group_wait_any(
MTAPI_IN mtapi_timeout_t timeout,
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
void* local_result = MTAPI_NULL;
embb_mtapi_log_trace("mtapi_group_wait_any() called\n");
......@@ -335,10 +336,7 @@ void mtapi_group_wait_any(
}
/* was there a timeout, or is there a result? */
if (MTAPI_NULL != local_task) {
/* store result */
if (MTAPI_NULL != result) {
*result = local_task->result_buffer;
}
local_result = local_task->result_buffer;
/* return error code set by the task */
local_status = local_task->error_code;
......@@ -356,6 +354,11 @@ void mtapi_group_wait_any(
local_status = MTAPI_ERR_NODE_NOTINIT;
}
/* store result */
if (MTAPI_NULL != result) {
*result = local_result;
}
mtapi_status_set(status, local_status);
embb_mtapi_log_trace("mtapi_group_wait_any() returns\n");
}
......
......@@ -36,14 +36,19 @@ void embb_mtapi_id_pool_initialize(
mtapi_uint_t capacity) {
mtapi_uint_t ii;
that->capacity = capacity;
that->id_buffer = (mtapi_uint_t*)
embb_mtapi_alloc_allocate(sizeof(mtapi_uint_t)*(capacity + 1));
that->id_buffer[0] = EMBB_MTAPI_IDPOOL_INVALID_ID;
for (ii = 1; ii <= capacity; ii++) {
that->id_buffer[ii] = ii;
if (NULL != that->id_buffer) {
that->capacity = capacity;
that->id_buffer[0] = EMBB_MTAPI_IDPOOL_INVALID_ID;
for (ii = 1; ii <= capacity; ii++) {
that->id_buffer[ii] = ii;
}
that->ids_available = capacity;
} else {
that->capacity = 0;
that->ids_available = 0;
}
that->ids_available = capacity;
that->put_id_position = 0;
that->get_id_position = 1;
embb_spin_init(&that->lock);
......
......@@ -41,6 +41,9 @@
mtapi_boolean_t embb_mtapi_job_initialize_list(embb_mtapi_node_t * node) {
node->job_list = (embb_mtapi_job_t*)embb_mtapi_alloc_allocate(
sizeof(embb_mtapi_job_t)*(node->attributes.max_jobs + 1));
if (NULL == node->job_list) {
return MTAPI_FALSE;
}
mtapi_uint_t ii;
for (ii = 0; ii <= node->attributes.max_jobs; ii++) {
embb_mtapi_job_initialize(
......@@ -112,11 +115,15 @@ void embb_mtapi_job_initialize(
that->domain_id = 0;
that->node_id = 0;
that->num_actions = 0;
that->max_actions = max_actions;
that->actions = (mtapi_action_hndl_t*)
embb_mtapi_alloc_allocate(sizeof(mtapi_action_hndl_t)*max_actions);
for (ii = 0; ii < max_actions; ii++) {
that->actions[ii].id = EMBB_MTAPI_IDPOOL_INVALID_ID;
if (NULL != that->actions) {
that->max_actions = max_actions;
for (ii = 0; ii < max_actions; ii++) {
that->actions[ii].id = EMBB_MTAPI_IDPOOL_INVALID_ID;
}
} else {
that->max_actions = 0;
}
}
......@@ -159,7 +166,7 @@ void embb_mtapi_job_remove_action(
embb_mtapi_action_t * action) {
assert(MTAPI_NULL != that);
assert(MTAPI_NULL != action);
mtapi_uint_t ii = 0;
mtapi_uint_t ii;
for (ii = 0; ii + 1 < that->num_actions; ii++) {
if (that->actions[ii].id == action->handle.id &&
......
......@@ -39,6 +39,8 @@
#include <embb_mtapi_scheduler_t.h>
#include <embb_mtapi_attr.h>
#include <embb/base/c/internal/cmake_config.h>
static embb_mtapi_node_t* embb_mtapi_node_instance = NULL;
......@@ -115,29 +117,39 @@ void mtapi_initialize(
node->attributes.max_tasks);
node->queue_pool = embb_mtapi_queue_pool_new(
node->attributes.max_queues);
if (MTAPI_NULL == node->job_list ||
MTAPI_NULL == node->action_pool ||
MTAPI_NULL == node->group_pool ||
MTAPI_NULL == node->task_pool ||
MTAPI_NULL == node->queue_pool) {
mtapi_finalize(NULL);
local_status = MTAPI_ERR_NODE_INITFAILED;
}
/* initialize scheduler for local node */
node->scheduler = embb_mtapi_scheduler_new();
if (MTAPI_NULL != node->scheduler) {
/* fill information structure */
node->info.hardware_concurrency = embb_core_count_available();
node->info.used_memory = embb_mtapi_alloc_get_bytes_allocated();
if (MTAPI_NULL != mtapi_info) {
*mtapi_info = node->info;
}
/* initialization succeeded, tell workers to start working */
embb_atomic_store_int(&node->is_scheduler_running, MTAPI_TRUE);
if (MTAPI_SUCCESS != local_status) {
if (local_status == MTAPI_SUCCESS) {
/* initialize scheduler for local node */
node->scheduler = embb_mtapi_scheduler_new();
if (MTAPI_NULL != node->scheduler) {
/* fill information structure */
node->info.mtapi_version = 0x1000; // mtapi version 1.0
node->info.organization_id = MCA_ORG_ID_EMB;
node->info.implementation_version =
EMBB_BASE_VERSION_MAJOR * 0x1000 + EMBB_BASE_VERSION_MINOR;
node->info.number_of_domains = ~0u;
node->info.number_of_nodes = ~0u;
node->info.hardware_concurrency = embb_core_count_available();
node->info.used_memory = embb_mtapi_alloc_get_bytes_allocated();
if (MTAPI_NULL != mtapi_info) {
*mtapi_info = node->info;
}
/* initialization succeeded, tell workers to start working */
embb_atomic_store_int(&node->is_scheduler_running, MTAPI_TRUE);
} else {
mtapi_finalize(MTAPI_NULL);
local_status = MTAPI_ERR_NODE_INITFAILED;
}
} else {
mtapi_finalize(MTAPI_NULL);
local_status = MTAPI_ERR_NODE_INITFAILED;
}
} else {
embb_mtapi_alloc_deallocate(node);
local_status = MTAPI_ERR_PARAMETER;
......@@ -163,19 +175,29 @@ void mtapi_finalize(MTAPI_OUT mtapi_status_t* status) {
}
/* finalize storage in reverse order */
embb_mtapi_queue_pool_delete(node->queue_pool);
node->queue_pool = MTAPI_NULL;
if (MTAPI_NULL != node->queue_pool) {
embb_mtapi_queue_pool_delete(node->queue_pool);
node->queue_pool = MTAPI_NULL;
}
embb_mtapi_task_pool_delete(node->task_pool);
node->task_pool = MTAPI_NULL;
if (MTAPI_NULL != node->task_pool) {
embb_mtapi_task_pool_delete(node->task_pool);
node->task_pool = MTAPI_NULL;
}
embb_mtapi_group_pool_delete(node->group_pool);
node->group_pool = MTAPI_NULL;
if (MTAPI_NULL != node->group_pool) {
embb_mtapi_group_pool_delete(node->group_pool);
node->group_pool = MTAPI_NULL;
}
embb_mtapi_action_pool_delete(node->action_pool);
node->action_pool = MTAPI_NULL;
if (MTAPI_NULL != node->action_pool) {
embb_mtapi_action_pool_delete(node->action_pool);
node->action_pool = MTAPI_NULL;
}
embb_mtapi_job_finalize_list(node);
if (MTAPI_NULL != node->job_list) {
embb_mtapi_job_finalize_list(node);
}
/* free system instance */
embb_mtapi_alloc_deallocate(node);
......
......@@ -61,13 +61,18 @@ mtapi_boolean_t embb_mtapi_##TYPE##_pool_initialize( \
embb_mtapi_id_pool_initialize(&that->id_pool, capacity); \
that->storage = (embb_mtapi_##TYPE##_t*)embb_mtapi_alloc_allocate( \
sizeof(embb_mtapi_##TYPE##_t)*(capacity + 1)); \
for (ii = 0; ii <= capacity; ii++) { \
that->storage[ii].handle.id = EMBB_MTAPI_IDPOOL_INVALID_ID; \
that->storage[ii].handle.tag = 0; \
if (NULL != that->storage) { \
for (ii = 0; ii <= capacity; ii++) { \
that->storage[ii].handle.id = EMBB_MTAPI_IDPOOL_INVALID_ID; \
that->storage[ii].handle.tag = 0; \
} \
/* use entry 0 as invalid */ \
embb_mtapi_##TYPE##_initialize(that->storage); \
return MTAPI_TRUE; \
} else { \
that->id_pool.ids_available = 0; \
return MTAPI_FALSE; \
} \
/* use entry 0 as invalid */ \
embb_mtapi_##TYPE##_initialize(that->storage); \
return MTAPI_TRUE; \
} \
\
void embb_mtapi_##TYPE##_pool_finalize(embb_mtapi_##TYPE##_pool_t * that) { \
......
......@@ -325,7 +325,7 @@ mtapi_queue_hndl_t mtapi_queue_get(
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
mtapi_uint_t ii = 0;
mtapi_uint_t ii;
local_status = MTAPI_ERR_QUEUE_INVALID;
for (ii = 0; ii < node->attributes.max_queues; ii++) {
......
......@@ -40,6 +40,7 @@
#include <embb_mtapi_action_t.h>
#include <embb_mtapi_alloc.h>
#include <embb_mtapi_queue_t.h>
#include <embb_mtapi_group_t.h>
/* ---- CLASS MEMBERS ------------------------------------------------------ */
......@@ -78,8 +79,7 @@ embb_mtapi_task_t * embb_mtapi_scheduler_get_next_task_vhpf(
embb_mtapi_node_t * node,
embb_mtapi_thread_context_t * thread_context) {
embb_mtapi_task_t * task = MTAPI_NULL;
mtapi_uint_t ii = 0;
mtapi_uint_t kk = 0;
mtapi_uint_t ii;
assert(MTAPI_NULL != that);
assert(MTAPI_NULL != node);
......@@ -102,6 +102,7 @@ embb_mtapi_task_t * embb_mtapi_scheduler_get_next_task_vhpf(
*/
mtapi_uint_t context_index =
(thread_context->worker_index + 1) % that->worker_count;
mtapi_uint_t kk;
for (kk = 0;
kk < that->worker_count - 1 && MTAPI_NULL == task;
kk++) {
......@@ -121,8 +122,7 @@ embb_mtapi_task_t * embb_mtapi_scheduler_get_next_task_lf(
embb_mtapi_node_t * node,
embb_mtapi_thread_context_t * thread_context) {
embb_mtapi_task_t * task = MTAPI_NULL;
mtapi_uint_t prio = 0;
mtapi_uint_t kk = 0;
mtapi_uint_t prio;
assert(MTAPI_NULL != that);
assert(MTAPI_NULL != node);
......@@ -153,6 +153,7 @@ embb_mtapi_task_t * embb_mtapi_scheduler_get_next_task_lf(
prio++) {
mtapi_uint_t context_index =
(thread_context->worker_index + 1) % that->worker_count;
mtapi_uint_t kk;
for (kk = 0;
kk < that->worker_count - 1 && MTAPI_NULL == task;
kk++) {
......@@ -195,7 +196,7 @@ embb_mtapi_task_t * embb_mtapi_scheduler_get_next_task(
embb_mtapi_thread_context_t * embb_mtapi_scheduler_get_current_thread_context(
embb_mtapi_scheduler_t * that) {
mtapi_uint_t ii = 0;
mtapi_uint_t ii;
embb_mtapi_thread_context_t * context = NULL;
assert(MTAPI_NULL != that);
......@@ -293,6 +294,8 @@ int embb_mtapi_scheduler_worker(void * arg) {
/* check if there was work */
if (MTAPI_NULL != task) {
embb_mtapi_queue_t * local_queue = MTAPI_NULL;
embb_mtapi_group_t * local_group = MTAPI_NULL;
embb_mtapi_action_t * local_action = MTAPI_NULL;
/* is task associated with a queue? */
if (embb_mtapi_queue_pool_is_handle_valid(
......@@ -302,6 +305,21 @@ int embb_mtapi_scheduler_worker(void * arg) {
node->queue_pool, task->queue);
}
/* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid(
node->group_pool, task->group)) {
local_group =
embb_mtapi_group_pool_get_storage_for_handle(
node->group_pool, task->group);
}
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, task->action)) {
local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, task->action);
}
switch (embb_atomic_load_int(&task->state)) {
case MTAPI_TASK_SCHEDULED:
/* multi-instance task, another instance might be running */
......@@ -328,7 +346,7 @@ int embb_mtapi_scheduler_worker(void * arg) {
break;
case MTAPI_TASK_CANCELLED:
/* set return value to canceled */
/* set return value to cancelled */
task->error_code = MTAPI_ERR_ACTION_CANCELLED;
if (embb_atomic_fetch_and_add_unsigned_int(
&task->instances_todo, (unsigned int)-1) == 0) {
......@@ -336,6 +354,12 @@ int embb_mtapi_scheduler_worker(void * arg) {
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
if (MTAPI_NULL != local_group) {
embb_mtapi_task_queue_push(&local_group->queue, task);
}
}
if (MTAPI_NULL != local_action) {
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
}
break;
......@@ -434,7 +458,7 @@ mtapi_boolean_t embb_mtapi_scheduler_initialize_with_mode(
embb_mtapi_scheduler_t * that,
embb_mtapi_scheduler_mode_t mode) {
embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
mtapi_uint_t ii = 0;
mtapi_uint_t ii;
embb_mtapi_log_trace("embb_mtapi_scheduler_initialize() called\n");
......@@ -456,6 +480,10 @@ mtapi_boolean_t embb_mtapi_scheduler_initialize_with_mode(
that->worker_contexts = (embb_mtapi_thread_context_t*)
embb_mtapi_alloc_allocate(
sizeof(embb_mtapi_thread_context_t)*that->worker_count);
if (NULL == that->worker_contexts) {
return MTAPI_FALSE;
}
mtapi_boolean_t isinit = MTAPI_TRUE;
for (ii = 0; ii < that->worker_count; ii++) {
unsigned int core_num = 0;
mtapi_uint_t ll = 0;
......@@ -467,9 +495,12 @@ mtapi_boolean_t embb_mtapi_scheduler_initialize_with_mode(
}
core_num++;
}
embb_mtapi_thread_context_initialize_with_node_worker_and_core(
isinit &= embb_mtapi_thread_context_initialize_with_node_worker_and_core(
&that->worker_contexts[ii], node, ii, core_num);
}
if (!isinit) {
return MTAPI_FALSE;
}
for (ii = 0; ii < that->worker_count; ii++) {
if (MTAPI_FALSE == embb_mtapi_thread_context_start(
&that->worker_contexts[ii], that)) {
......@@ -481,22 +512,24 @@ mtapi_boolean_t embb_mtapi_scheduler_initialize_with_mode(
}
void embb_mtapi_scheduler_finalize(embb_mtapi_scheduler_t * that) {
mtapi_uint_t ii = 0;
mtapi_uint_t ii;
embb_mtapi_log_trace("embb_mtapi_scheduler_finalize() called\n");
assert(MTAPI_NULL != that);
/* finalize all workers */
for (ii = 0; ii < that->worker_count; ii++) {
embb_mtapi_thread_context_stop(&that->worker_contexts[ii]);
}
for (ii = 0; ii < that->worker_count; ii++) {
embb_mtapi_thread_context_finalize(&that->worker_contexts[ii]);
}
if (MTAPI_NULL != that->worker_contexts) {
/* finalize all workers */
for (ii = 0; ii < that->worker_count; ii++) {
embb_mtapi_thread_context_stop(&that->worker_contexts[ii]);
}
for (ii = 0; ii < that->worker_count; ii++) {
embb_mtapi_thread_context_finalize(&that->worker_contexts[ii]);
}
that->worker_count = 0;
embb_mtapi_alloc_deallocate(that->worker_contexts);
that->worker_contexts = MTAPI_NULL;
that->worker_count = 0;
embb_mtapi_alloc_deallocate(that->worker_contexts);
that->worker_contexts = MTAPI_NULL;
}
}
embb_mtapi_scheduler_t * embb_mtapi_scheduler_new() {
......@@ -506,6 +539,7 @@ embb_mtapi_scheduler_t * embb_mtapi_scheduler_new() {
if (MTAPI_NULL != that) {
if (MTAPI_FALSE == embb_mtapi_scheduler_initialize(that)) {
/* on error delete and return MTAPI_NULL */
embb_mtapi_scheduler_finalize(that);
embb_mtapi_scheduler_delete(that);
return MTAPI_NULL;
}
......
......@@ -501,7 +501,6 @@ void mtapi_task_cancel(
if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
embb_mtapi_task_t* local_task =
embb_mtapi_task_pool_get_storage_for_handle(node->task_pool, task);
embb_mtapi_task_set_state(local_task, MTAPI_TASK_CANCELLED);
/* call plugin action cancel function */
if (embb_mtapi_action_pool_is_handle_valid(
......@@ -511,8 +510,14 @@ void mtapi_task_cancel(
node->action_pool, local_task->action);
if (local_action->is_plugin_action) {
local_action->plugin_task_cancel_function(task, &local_status);
} else {
embb_mtapi_task_set_state(local_task, MTAPI_TASK_CANCELLED);
local_task->error_code = MTAPI_ERR_ACTION_CANCELLED;
local_status = MTAPI_SUCCESS;
}
} else {
embb_mtapi_task_set_state(local_task, MTAPI_TASK_CANCELLED);
local_task->error_code = MTAPI_ERR_ACTION_CANCELLED;
local_status = MTAPI_SUCCESS;
}
} else {
......
......@@ -38,12 +38,13 @@
/* ---- CLASS MEMBERS ------------------------------------------------------ */
void embb_mtapi_thread_context_initialize_with_node_worker_and_core(
mtapi_boolean_t embb_mtapi_thread_context_initialize_with_node_worker_and_core(
embb_mtapi_thread_context_t* that,
embb_mtapi_node_t* node,
mtapi_uint_t worker_index,
mtapi_uint_t core_num) {
mtapi_uint_t ii;
mtapi_boolean_t result = MTAPI_TRUE;
assert(MTAPI_NULL != that);
assert(MTAPI_NULL != node);
......@@ -52,25 +53,55 @@ void embb_mtapi_thread_context_initialize_with_node_worker_and_core(
that->worker_index = worker_index;
that->core_num = core_num;
that->priorities = node->attributes.max_priorities;
that->is_initialized = MTAPI_FALSE;
embb_atomic_store_int(&that->run, 0);
that->queue = (embb_mtapi_task_queue_t**)embb_mtapi_alloc_allocate(
sizeof(embb_mtapi_task_queue_t)*that->priorities);
that->private_queue = (embb_mtapi_task_queue_t**)embb_mtapi_alloc_allocate(
sizeof(embb_mtapi_task_queue_t)*that->priorities);
if (that->queue == NULL) {
that->private_queue = NULL;
return MTAPI_FALSE;
}
for (ii = 0; ii < that->priorities; ii++) {
that->queue[ii] = (embb_mtapi_task_queue_t*)
embb_mtapi_alloc_allocate(sizeof(embb_mtapi_task_queue_t));
embb_mtapi_task_queue_initialize_with_capacity(
that->queue[ii], node->attributes.queue_limit);
if (that->queue[ii] != NULL) {
embb_mtapi_task_queue_initialize_with_capacity(
that->queue[ii], node->attributes.queue_limit);
} else {
result = MTAPI_FALSE;
}
}
if (!result) {
return MTAPI_FALSE;
}
that->private_queue = (embb_mtapi_task_queue_t**)embb_mtapi_alloc_allocate(
sizeof(embb_mtapi_task_queue_t)*that->priorities);
if (that->private_queue == NULL) {
return MTAPI_FALSE;
}
for (ii = 0; ii < that->priorities; ii++) {
that->private_queue[ii] = (embb_mtapi_task_queue_t*)
embb_mtapi_alloc_allocate(sizeof(embb_mtapi_task_queue_t));
embb_mtapi_task_queue_initialize_with_capacity(
that->private_queue[ii], node->attributes.queue_limit);
if (that->private_queue[ii] != NULL) {
embb_mtapi_task_queue_initialize_with_capacity(
that->private_queue[ii], node->attributes.queue_limit);
} else {
result = MTAPI_FALSE;
}
}
if (!result) {
return MTAPI_FALSE;
}
embb_mutex_init(&that->work_available_mutex, EMBB_MUTEX_PLAIN);
embb_condition_init(&that->work_available);
embb_atomic_store_int(&that->is_sleeping, 0);
that->is_initialized = MTAPI_TRUE;
return MTAPI_TRUE;
}
mtapi_boolean_t embb_mtapi_thread_context_start(
......@@ -126,22 +157,37 @@ void embb_mtapi_thread_context_finalize(embb_mtapi_thread_context_t* that) {
embb_mtapi_log_trace("embb_mtapi_thread_context_finalize() called\n");
embb_condition_destroy(&that->work_available);
embb_mutex_destroy(&that->work_available_mutex);
if (that->is_initialized) {
embb_condition_destroy(&that->work_available);
embb_mutex_destroy(&that->work_available_mutex);
}
for (ii = 0; ii < that->priorities; ii++) {
embb_mtapi_task_queue_finalize(that->queue[ii]);
embb_mtapi_alloc_deallocate(that->queue[ii]);
that->queue[ii] = MTAPI_NULL;
embb_mtapi_task_queue_finalize(that->private_queue[ii]);
embb_mtapi_alloc_deallocate(that->private_queue[ii]);
that->private_queue[ii] = MTAPI_NULL;
if (that->queue != NULL) {
for (ii = 0; ii < that->priorities; ii++) {
if (that->queue[ii] != NULL) {
embb_mtapi_task_queue_finalize(that->queue[ii]);
embb_mtapi_alloc_deallocate(that->queue[ii]);
that->queue[ii] = MTAPI_NULL;
}
}
embb_mtapi_alloc_deallocate(that->queue);
that->queue = MTAPI_NULL;
}
embb_mtapi_alloc_deallocate(that->queue);
that->queue = MTAPI_NULL;
embb_mtapi_alloc_deallocate(that->private_queue);
that->private_queue = MTAPI_NULL;
if (that->private_queue != NULL) {
for (ii = 0; ii < that->priorities; ii++) {
if (that->private_queue[ii] != NULL) {
embb_mtapi_task_queue_finalize(that->private_queue[ii]);
embb_mtapi_alloc_deallocate(that->private_queue[ii]);
that->private_queue[ii] = MTAPI_NULL;
}
}
embb_mtapi_alloc_deallocate(that->private_queue);
that->private_queue = MTAPI_NULL;
}
that->priorities = 0;
that->is_initialized = MTAPI_FALSE;
that->node = MTAPI_NULL;
}
......
......@@ -67,6 +67,7 @@ struct embb_mtapi_thread_context_struct {
mtapi_uint_t core_num;
embb_atomic_int run;
mtapi_status_t status;
mtapi_boolean_t is_initialized;
};
#include <embb_mtapi_thread_context_t_fwd.h>
......@@ -74,8 +75,9 @@ struct embb_mtapi_thread_context_struct {
/**
* Constructor using attributes from node and a given core number.
* \memberof embb_mtapi_thread_context_struct
* \returns MTAPI_TRUE if successful, MTAPI_FALSE on error
*/
void embb_mtapi_thread_context_initialize_with_node_worker_and_core(
mtapi_boolean_t embb_mtapi_thread_context_initialize_with_node_worker_and_core(
embb_mtapi_thread_context_t* that,
embb_mtapi_node_t* node,
mtapi_uint_t worker_index,
......
......@@ -28,9 +28,10 @@
#include <embb/base/c/memory_allocation.h>
#include <string.h>
void embb_mtapi_network_buffer_initialize(
int embb_mtapi_network_buffer_initialize(
embb_mtapi_network_buffer_t * that,
int capacity) {
int result = 1;
that->position = 0;
that->size = 0;
that->data = (char*)embb_alloc((size_t)capacity);
......@@ -38,7 +39,9 @@ void embb_mtapi_network_buffer_initialize(
that->capacity = capacity;
} else {
that->capacity = 0;
result = 0;
}
return result;
}
void embb_mtapi_network_buffer_finalize(
......@@ -107,6 +110,7 @@ int embb_mtapi_network_buffer_pop_front_int8(
embb_mtapi_network_buffer_t * that,
int8_t * value) {
if (that->position + 1 > that->size) {
*value = 0;
return 0;
}
memcpy(value, that->data + that->position, 1);
......@@ -118,6 +122,7 @@ int embb_mtapi_network_buffer_pop_front_int16(
embb_mtapi_network_buffer_t * that,
int16_t * value) {
if (that->position + 2 > that->size) {
*value = 0;
return 0;
}
memcpy(value, that->data + that->position, 2);
......@@ -129,6 +134,7 @@ int embb_mtapi_network_buffer_pop_front_int32(
embb_mtapi_network_buffer_t * that,
int32_t * value) {
if (that->position + 4 > that->size) {
*value = 0;
return 0;
}
memcpy(value, that->data + that->position, 4);
......@@ -141,6 +147,7 @@ int embb_mtapi_network_buffer_pop_front_rawdata(
int32_t size,
void * rawdata) {
if (that->position + size > that->size) {
memset(rawdata, 0, (size_t)size);
return 0;
}
memcpy(rawdata, that->data + that->position, (size_t)size);
......
......@@ -43,7 +43,7 @@ struct embb_mtapi_network_buffer_struct {
typedef struct embb_mtapi_network_buffer_struct embb_mtapi_network_buffer_t;
void embb_mtapi_network_buffer_initialize(
int embb_mtapi_network_buffer_initialize(
embb_mtapi_network_buffer_t * that,
int capacity
);
......
......@@ -71,13 +71,6 @@ int embb_mtapi_network_socket_bind_and_listen(
uint16_t port,
uint16_t max_connections) {
struct sockaddr_in in_addr;
int reuseaddr_on = 1;
// addr reuse
if (SOCKET_ERROR == setsockopt(that->handle, SOL_SOCKET, SO_REUSEADDR,
(const char *)&reuseaddr_on, sizeof(reuseaddr_on))) {
return 0;
}
// bind & listen
memset(&in_addr, 0, sizeof(in_addr));
......@@ -123,7 +116,8 @@ int embb_mtapi_network_socket_connect(
if (SOCKET_ERROR == connect(that->handle, (struct sockaddr *)&addr,
sizeof(addr))) {
#ifdef _WIN32
if (WSAEWOULDBLOCK != WSAGetLastError())
int err = WSAGetLastError();
if (WSAEWOULDBLOCK != err)
#else
if (EAGAIN != errno)
#endif
......
......@@ -61,13 +61,52 @@ static void test(
}
}
static void cancel_test(
void const * /*arguments*/,
mtapi_size_t /*arguments_size*/,
void * /*result_buffer*/,
mtapi_size_t /*result_buffer_size*/,
void const * /*node_local_data*/,
mtapi_size_t /*node_local_data_size*/,
mtapi_task_context_t * context) {
mtapi_status_t status;
while (true) {
mtapi_task_state_t state = mtapi_context_taskstate_get(context, &status);
if (status != MTAPI_SUCCESS) {
break;
} else {
if (state == MTAPI_TASK_CANCELLED) {
break;
}
}
}
}
NetworkTaskTest::NetworkTaskTest() {
CreateUnit("mtapi network task test").Add(&NetworkTaskTest::TestBasic, this);
CreateUnit("mtapi network task test")
.Add(&NetworkTaskTest::TestBasic, this);
}
void NetworkTaskTest::TestBasic() {
mtapi_status_t status;
mtapi_initialize(
NETWORK_DOMAIN,
NETWORK_LOCAL_NODE,
MTAPI_NULL,
MTAPI_NULL,
&status);
MTAPI_CHECK_STATUS(status);
TestSimple();
TestCancel();
mtapi_finalize(&status);
MTAPI_CHECK_STATUS(status);
}
void NetworkTaskTest::TestSimple() {
mtapi_status_t status;
mtapi_job_hndl_t job;
mtapi_task_hndl_t task;
mtapi_action_hndl_t network_action, local_action;
......@@ -81,14 +120,6 @@ void NetworkTaskTest::TestBasic() {
arguments[ii + kElements] = static_cast<float>(ii);
}
mtapi_initialize(
NETWORK_DOMAIN,
NETWORK_LOCAL_NODE,
MTAPI_NULL,
MTAPI_NULL,
&status);
MTAPI_CHECK_STATUS(status);
mtapi_network_plugin_initialize("127.0.0.1", 12345, 5,
kElements * 4 * 3 + 32, &status);
MTAPI_CHECK_STATUS(status);
......@@ -139,7 +170,68 @@ void NetworkTaskTest::TestBasic() {
mtapi_network_plugin_finalize(&status);
MTAPI_CHECK_STATUS(status);
}
mtapi_finalize(&status);
void NetworkTaskTest::TestCancel() {
mtapi_status_t status;
mtapi_job_hndl_t job;
mtapi_task_hndl_t task;
mtapi_action_hndl_t network_action, local_action;
float argument = 1.0f;
float result;
mtapi_network_plugin_initialize("127.0.0.1", 12346, 5,
4 * 3 + 32, &status);
MTAPI_CHECK_STATUS(status);
float node_remote = 1.0f;
local_action = mtapi_action_create(
NETWORK_REMOTE_JOB,
cancel_test,
&node_remote, sizeof(float),
MTAPI_DEFAULT_ACTION_ATTRIBUTES,
&status);
MTAPI_CHECK_STATUS(status);
network_action = mtapi_network_action_create(
NETWORK_DOMAIN,
NETWORK_LOCAL_JOB,
NETWORK_REMOTE_JOB,
"127.0.0.1", 12346,
&status);
MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN;
job = mtapi_job_get(NETWORK_LOCAL_JOB, NETWORK_DOMAIN, &status);
MTAPI_CHECK_STATUS(status);
task = mtapi_task_start(
MTAPI_TASK_ID_NONE,
job,
&argument, sizeof(float),
&result, sizeof(float),
MTAPI_DEFAULT_TASK_ATTRIBUTES,
MTAPI_GROUP_NONE,
&status);
MTAPI_CHECK_STATUS(status);
mtapi_task_wait(task, 1, &status);
PT_ASSERT_EQ(status, MTAPI_TIMEOUT);
mtapi_task_cancel(task, &status);
MTAPI_CHECK_STATUS(status);
mtapi_task_wait(task, MTAPI_INFINITE, &status);
PT_ASSERT_NE(status, MTAPI_TIMEOUT);
PT_ASSERT_EQ(status, MTAPI_ERR_ACTION_CANCELLED);
mtapi_action_delete(network_action, MTAPI_INFINITE, &status);
MTAPI_CHECK_STATUS(status);
mtapi_action_delete(local_action, MTAPI_INFINITE, &status);
MTAPI_CHECK_STATUS(status);
mtapi_network_plugin_finalize(&status);
MTAPI_CHECK_STATUS(status);
}
......@@ -35,6 +35,9 @@ class NetworkTaskTest : public partest::TestCase {
private:
void TestBasic();
void TestSimple();
void TestCancel();
};
#endif // MTAPI_PLUGINS_C_MTAPI_NETWORK_C_TEST_EMBB_MTAPI_NETWORK_TEST_TASK_H_
......@@ -130,6 +130,24 @@ class Node {
}
/**
* Returns the number of available groups.
* \return The number of available groups
* \waitfree
*/
mtapi_uint_t GetGroupCount() const {
return group_count_;
}
/**
* Returns the number of available tasks.
* \return The number of available tasks
* \waitfree
*/
mtapi_uint_t GetTaskLimit() const {
return task_limit_;
}
/**
* Returns the number of available cores.
* \return The number of available cores
* \waitfree
......@@ -229,6 +247,8 @@ class Node {
mtapi_task_context_t * context);
mtapi_uint_t queue_count_;
mtapi_uint_t group_count_;
mtapi_uint_t task_limit_;
mtapi_uint_t core_count_;
mtapi_uint_t worker_thread_count_;
mtapi_action_hndl_t action_handle_;
......
......@@ -41,7 +41,7 @@ namespace {
static embb::tasks::Node * node_instance = NULL;
#if TASKS_CPP_AUTOMATIC_INITIALIZE
static embb::base::Mutex init_mutex;
static embb_spinlock_t init_mutex = { { 0 } };
#endif
}
......@@ -78,6 +78,12 @@ Node::Node(
mtapi_node_get_attribute(node_id, MTAPI_NODE_MAX_QUEUES, &queue_count_,
sizeof(queue_count_), &status);
assert(MTAPI_SUCCESS == status);
mtapi_node_get_attribute(node_id, MTAPI_NODE_MAX_GROUPS, &group_count_,
sizeof(group_count_), &status);
assert(MTAPI_SUCCESS == status);
mtapi_node_get_attribute(node_id, MTAPI_NODE_MAX_TASKS, &task_limit_,
sizeof(queue_count_), &status);
assert(MTAPI_SUCCESS == status);
core_count_ = info.hardware_concurrency;
worker_thread_count_ = embb_core_set_count(&attr->core_affinity);
action_handle_ = mtapi_action_create(TASKS_CPP_JOB, action_func,
......@@ -126,7 +132,7 @@ void Node::Initialize(
mtapi_nodeattr_set(&attr, MTAPI_NODE_MAX_ACTIONS,
&tmp, sizeof(tmp), &status);
assert(MTAPI_SUCCESS == status);
tmp = 4;
// tmp = 4;
mtapi_nodeattr_set(&attr, MTAPI_NODE_MAX_JOBS,
&tmp, sizeof(tmp), &status);
assert(MTAPI_SUCCESS == status);
......@@ -207,13 +213,13 @@ bool Node::IsInitialized() {
Node & Node::GetInstance() {
#if TASKS_CPP_AUTOMATIC_INITIALIZE
if (!IsInitialized()) {
init_mutex.Lock();
embb_spin_lock(&init_mutex);
if (!IsInitialized()) {
Node::Initialize(
TASKS_CPP_AUTOMATIC_DOMAIN_ID, TASKS_CPP_AUTOMATIC_NODE_ID);
atexit(Node::Finalize);
}
init_mutex.Unlock();
embb_spin_unlock(&init_mutex);
}
return *node_instance;
#else
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment