Commit deaee276 by Marcus Winter

Merge branch 'embb533_worker_thread_os_priorities' into development

parents 33355366 ad762084
...@@ -57,6 +57,19 @@ typedef opaque_type embb_thread_t; ...@@ -57,6 +57,19 @@ typedef opaque_type embb_thread_t;
#endif /* DOXYGEN */ #endif /* DOXYGEN */
/** /**
* Thread priority type.
*/
typedef enum {
EMBB_THREAD_PRIORITY_IDLE,
EMBB_THREAD_PRIORITY_LOWEST,
EMBB_THREAD_PRIORITY_BELOW_NORMAL,
EMBB_THREAD_PRIORITY_NORMAL,
EMBB_THREAD_PRIORITY_ABOVE_NORMAL,
EMBB_THREAD_PRIORITY_HIGHEST,
EMBB_THREAD_PRIORITY_TIME_CRITICAL
} embb_thread_priority_t;
/**
* Thread start function pointer type. * Thread start function pointer type.
* *
* The return value can be used to return a user-defined exit code when the * The return value can be used to return a user-defined exit code when the
...@@ -123,7 +136,7 @@ void embb_thread_yield(); ...@@ -123,7 +136,7 @@ void embb_thread_yield();
* Creates and runs a thread. * Creates and runs a thread.
* *
* \pre The given thread is not running and has not yet been successfully * \pre The given thread is not running and has not yet been successfully
joined. * joined.
* \post On success, the given thread has started to run. * \post On success, the given thread has started to run.
* \return EMBB_SUCCESS if the thread could be created. \n * \return EMBB_SUCCESS if the thread could be created. \n
* EMBB_NOMEM if there was insufficient amount of memory \n * EMBB_NOMEM if there was insufficient amount of memory \n
...@@ -148,6 +161,36 @@ int embb_thread_create( ...@@ -148,6 +161,36 @@ int embb_thread_create(
); );
/** /**
* Creates and runs a thread.
*
* \pre The given thread is not running and has not yet been successfully
* joined.
* \post On success, the given thread has started to run.
* \return EMBB_SUCCESS if the thread could be created. \n
* EMBB_NOMEM if there was insufficient amount of memory \n
* EMBB_ERROR otherwise.
* \memory Dynamically allocates a small constant amount of memory to store the
* function and argument pointers. This memory is freed when the thread
* is joined.
* \notthreadsafe
* \see embb_thread_join()
*/
int embb_thread_create_with_priority(
embb_thread_t* thread,
/**< [OUT] Thread to be run */
const embb_core_set_t* core_set,
/**< [IN] Set of cores on which the thread shall be executed. Can be NULL to
indicate automatic thread scheduling by the OS. */
embb_thread_priority_t priority,
/**< [IN] Priority to run the thread at. */
embb_thread_start_t function,
/**< [IN] Function which is executed by the thread when started. Has to be of
type embb_thread_start_t. */
void* arg
/**< [IN/OUT] Argument to thread start function. Can be NULL. */
);
/**
* Waits until the given thread has finished execution. * Waits until the given thread has finished execution.
* *
* \pre The given thread has been successfully created using * \pre The given thread has been successfully created using
......
...@@ -40,6 +40,15 @@ void embb_thread_set_max_count(unsigned int max) { ...@@ -40,6 +40,15 @@ void embb_thread_set_max_count(unsigned int max) {
embb_internal_thread_index_set_max(max); embb_internal_thread_index_set_max(max);
} }
int embb_thread_create(
embb_thread_t* thread,
const embb_core_set_t* core_set,
embb_thread_start_t func,
void *arg) {
return embb_thread_create_with_priority(thread, core_set,
EMBB_THREAD_PRIORITY_NORMAL, func, arg);
}
#ifdef EMBB_PLATFORM_THREADING_WINTHREADS #ifdef EMBB_PLATFORM_THREADING_WINTHREADS
/** /**
...@@ -78,8 +87,12 @@ void embb_thread_yield() { ...@@ -78,8 +87,12 @@ void embb_thread_yield() {
SwitchToThread(); SwitchToThread();
} }
int embb_thread_create(embb_thread_t* thread, const embb_core_set_t* core_set, int embb_thread_create_with_priority(
embb_thread_start_t func, void *arg) { embb_thread_t* thread,
const embb_core_set_t* core_set,
embb_thread_priority_t priority,
embb_thread_start_t func,
void *arg) {
if (thread == NULL) { if (thread == NULL) {
return EMBB_ERROR; return EMBB_ERROR;
} }
...@@ -121,6 +134,36 @@ int embb_thread_create(embb_thread_t* thread, const embb_core_set_t* core_set, ...@@ -121,6 +134,36 @@ int embb_thread_create(embb_thread_t* thread, const embb_core_set_t* core_set,
} }
} }
int internal_priority;
switch (priority) {
case EMBB_THREAD_PRIORITY_IDLE:
internal_priority = THREAD_PRIORITY_IDLE;
break;
case EMBB_THREAD_PRIORITY_LOWEST:
internal_priority = THREAD_PRIORITY_LOWEST;
break;
case EMBB_THREAD_PRIORITY_BELOW_NORMAL:
internal_priority = THREAD_PRIORITY_BELOW_NORMAL;
break;
case EMBB_THREAD_PRIORITY_ABOVE_NORMAL:
internal_priority = THREAD_PRIORITY_ABOVE_NORMAL;
break;
case EMBB_THREAD_PRIORITY_HIGHEST:
internal_priority = THREAD_PRIORITY_HIGHEST;
break;
case EMBB_THREAD_PRIORITY_TIME_CRITICAL:
internal_priority = THREAD_PRIORITY_TIME_CRITICAL;
break;
case EMBB_THREAD_PRIORITY_NORMAL:
default:
internal_priority = THREAD_PRIORITY_NORMAL;
break;
}
BOOL result = SetThreadPriority(thread->embb_internal_handle, internal_priority);
if (result == 0) {
return EMBB_ERROR;
}
return EMBB_SUCCESS; return EMBB_SUCCESS;
} }
...@@ -180,12 +223,18 @@ int embb_thread_equal(const embb_thread_t* lhs, const embb_thread_t* rhs) { ...@@ -180,12 +223,18 @@ int embb_thread_equal(const embb_thread_t* lhs, const embb_thread_t* rhs) {
#include <sys/sysinfo.h> /* Used to get number of processors */ #include <sys/sysinfo.h> /* Used to get number of processors */
#endif /* EMBB_PLATFORM_HAS_HEADER_SYSINFO */ #endif /* EMBB_PLATFORM_HAS_HEADER_SYSINFO */
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <sys/resource.h>
/** /**
* Used to wrap client thread start function and argument when calling internal * Used to wrap client thread start function and argument when calling internal
* thread start function embb_internal_thread_start. * thread start function embb_internal_thread_start.
*/ */
typedef struct embb_internal_thread_arg_t { typedef struct embb_internal_thread_arg_t {
embb_thread_start_t func; embb_thread_start_t func;
int priority;
void* arg; void* arg;
int result; int result;
} embb_internal_thread_arg_t; } embb_internal_thread_arg_t;
...@@ -197,6 +246,12 @@ typedef struct embb_internal_thread_arg_t { ...@@ -197,6 +246,12 @@ typedef struct embb_internal_thread_arg_t {
* argument. * argument.
*/ */
void* embb_internal_thread_start(void* internalArg) { void* embb_internal_thread_start(void* internalArg) {
#ifdef EMBB_PLATFORM_HAS_GLIB_CPU
pid_t tid;
tid = syscall(SYS_gettid);
setpriority(PRIO_PROCESS, tid,
((embb_internal_thread_arg_t*)internalArg)->priority);
#endif
((embb_internal_thread_arg_t*)internalArg)->result = ((embb_internal_thread_arg_t*)internalArg)->result =
((embb_internal_thread_arg_t*)internalArg)->func( ((embb_internal_thread_arg_t*)internalArg)->func(
((struct embb_internal_thread_arg_t*)internalArg)->arg); ((struct embb_internal_thread_arg_t*)internalArg)->arg);
...@@ -214,8 +269,12 @@ void embb_thread_yield() { ...@@ -214,8 +269,12 @@ void embb_thread_yield() {
pthread_yield(); pthread_yield();
} }
int embb_thread_create(embb_thread_t* thread, const embb_core_set_t* core_set, int embb_thread_create_with_priority(
embb_thread_start_t func, void* arg) { embb_thread_t* thread,
const embb_core_set_t* core_set,
embb_thread_priority_t priority,
embb_thread_start_t func,
void* arg) {
if (thread == NULL) { if (thread == NULL) {
return EMBB_ERROR; return EMBB_ERROR;
} }
...@@ -261,6 +320,31 @@ int embb_thread_create(embb_thread_t* thread, const embb_core_set_t* core_set, ...@@ -261,6 +320,31 @@ int embb_thread_create(embb_thread_t* thread, const embb_core_set_t* core_set,
thread->embb_internal_arg->func = func; thread->embb_internal_arg->func = func;
thread->embb_internal_arg->arg = arg; thread->embb_internal_arg->arg = arg;
switch (priority) {
case EMBB_THREAD_PRIORITY_IDLE:
thread->embb_internal_arg->priority = 19;
break;
case EMBB_THREAD_PRIORITY_LOWEST:
thread->embb_internal_arg->priority = 2;
break;
case EMBB_THREAD_PRIORITY_BELOW_NORMAL:
thread->embb_internal_arg->priority = 1;
break;
case EMBB_THREAD_PRIORITY_ABOVE_NORMAL:
thread->embb_internal_arg->priority = -1;
break;
case EMBB_THREAD_PRIORITY_HIGHEST:
thread->embb_internal_arg->priority = -2;
break;
case EMBB_THREAD_PRIORITY_TIME_CRITICAL:
thread->embb_internal_arg->priority = -19;
break;
case EMBB_THREAD_PRIORITY_NORMAL:
default:
thread->embb_internal_arg->priority = 0;
break;
}
status = pthread_create( status = pthread_create(
&(thread->embb_internal_handle), /* pthread handle */ &(thread->embb_internal_handle), /* pthread handle */
&attr, /* additional attributes, &attr, /* additional attributes,
......
...@@ -28,7 +28,6 @@ ...@@ -28,7 +28,6 @@
#define EMBB_BASE_INTERNAL_THREAD_INL_H_ #define EMBB_BASE_INTERNAL_THREAD_INL_H_
#include <embb/base/exceptions.h> #include <embb/base/exceptions.h>
#include <embb/base/c/thread.h>
#include <embb/base/internal/thread_closures.h> #include <embb/base/internal/thread_closures.h>
#include <embb/base/memory_allocation.h> #include <embb/base/memory_allocation.h>
#include <iostream> #include <iostream>
...@@ -60,6 +59,22 @@ Thread::Thread(CoreSet& core_set, Function function) : rep_() { ...@@ -60,6 +59,22 @@ Thread::Thread(CoreSet& core_set, Function function) : rep_() {
CheckThreadCreationErrors(result, closure); CheckThreadCreationErrors(result, closure);
} }
template<typename Function>
Thread::Thread(
CoreSet& core_set,
embb_thread_priority_t priority,
Function function) : rep_() {
typedef internal::ThreadClosure<Function> Closure;
Closure* closure = Allocation::New<Closure>(function);
int result = embb_thread_create_with_priority(
&rep_,
&core_set.rep_,
priority,
internal::ThreadClosure<Function>::ThreadStart,
static_cast<void*>(closure));
CheckThreadCreationErrors(result, closure);
}
template<typename Function, typename Arg1> template<typename Function, typename Arg1>
Thread::Thread(Function function, Arg1 arg1) : rep_() { Thread::Thread(Function function, Arg1 arg1) : rep_() {
typedef internal::ThreadClosureArg1<Function, Arg1> Closure; typedef internal::ThreadClosureArg1<Function, Arg1> Closure;
......
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include <embb/base/internal/thread_closures.h> #include <embb/base/internal/thread_closures.h>
#include <embb/base/mutex.h> #include <embb/base/mutex.h>
#include <embb/base/core_set.h> #include <embb/base/core_set.h>
#include <embb/base/c/thread.h>
#include <ostream> #include <ostream>
namespace embb { namespace embb {
...@@ -177,9 +178,33 @@ class Thread { ...@@ -177,9 +178,33 @@ class Thread {
* \tparam Function Function object type * \tparam Function Function object type
*/ */
template<typename Function> template<typename Function>
explicit Thread( Thread(
CoreSet& core_set,
/**< [IN] Set of cores on which the thread shall be executed. */
Function function
/**< [IN] Copyable function object, callable without arguments */
);
/**
* Creates and runs a thread with zero-argument start function.
*
* \note If the function is passed as a temporary object when creating a
* thread, this might be interpreted as a function declaration ("most vexing
* parse"). C++11 resolves this by using curly braces for initialization.
*
* \throws NoMemoryException if not enough memory is available
* \throws ErrorException in case of another error
* \memory A small constant amount of memory to store the function. This
* memory is freed the thread is joined.
* \notthreadsafe
* \tparam Function Function object type
*/
template<typename Function>
Thread(
CoreSet& core_set, CoreSet& core_set,
/**< [IN] Set of cores on which the thread shall be executed. */ /**< [IN] Set of cores on which the thread shall be executed. */
embb_thread_priority_t priority,
/**< [IN] Priority of the new thread. */
Function function Function function
/**< [IN] Copyable function object, callable without arguments */ /**< [IN] Copyable function object, callable without arguments */
); );
......
...@@ -244,6 +244,7 @@ ...@@ -244,6 +244,7 @@
#include <stdint.h> #include <stdint.h>
#include <embb/base/c/core_set.h> #include <embb/base/c/core_set.h>
#include <embb/base/c/thread.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -529,6 +530,96 @@ enum mtapi_notification_enum { ...@@ -529,6 +530,96 @@ enum mtapi_notification_enum {
typedef enum mtapi_notification_enum mtapi_notification_t; typedef enum mtapi_notification_enum mtapi_notification_t;
/**< runtime notification */ /**< runtime notification */
/**
* Enum to select default or specific worker for priority setter
*/
enum mtapi_worker_priority_type_enum {
MTAPI_WORKER_PRIORITY_END = 0,
MTAPI_WORKER_PRIORITY_DEFAULT = 1,
MTAPI_WORKER_PRIORITY_WORKER = 100,
MTAPI_WORKER_PRIORITY_WORKER_0 = MTAPI_WORKER_PRIORITY_WORKER + 0,
MTAPI_WORKER_PRIORITY_WORKER_1 = MTAPI_WORKER_PRIORITY_WORKER + 1,
MTAPI_WORKER_PRIORITY_WORKER_2 = MTAPI_WORKER_PRIORITY_WORKER + 2,
MTAPI_WORKER_PRIORITY_WORKER_3 = MTAPI_WORKER_PRIORITY_WORKER + 3,
MTAPI_WORKER_PRIORITY_WORKER_4 = MTAPI_WORKER_PRIORITY_WORKER + 4,
MTAPI_WORKER_PRIORITY_WORKER_5 = MTAPI_WORKER_PRIORITY_WORKER + 5,
MTAPI_WORKER_PRIORITY_WORKER_6 = MTAPI_WORKER_PRIORITY_WORKER + 6,
MTAPI_WORKER_PRIORITY_WORKER_7 = MTAPI_WORKER_PRIORITY_WORKER + 7,
MTAPI_WORKER_PRIORITY_WORKER_8 = MTAPI_WORKER_PRIORITY_WORKER + 8,
MTAPI_WORKER_PRIORITY_WORKER_9 = MTAPI_WORKER_PRIORITY_WORKER + 9,
MTAPI_WORKER_PRIORITY_WORKER_10 = MTAPI_WORKER_PRIORITY_WORKER + 10,
MTAPI_WORKER_PRIORITY_WORKER_11 = MTAPI_WORKER_PRIORITY_WORKER + 11,
MTAPI_WORKER_PRIORITY_WORKER_12 = MTAPI_WORKER_PRIORITY_WORKER + 12,
MTAPI_WORKER_PRIORITY_WORKER_13 = MTAPI_WORKER_PRIORITY_WORKER + 13,
MTAPI_WORKER_PRIORITY_WORKER_14 = MTAPI_WORKER_PRIORITY_WORKER + 14,
MTAPI_WORKER_PRIORITY_WORKER_15 = MTAPI_WORKER_PRIORITY_WORKER + 15,
MTAPI_WORKER_PRIORITY_WORKER_16 = MTAPI_WORKER_PRIORITY_WORKER + 16,
MTAPI_WORKER_PRIORITY_WORKER_17 = MTAPI_WORKER_PRIORITY_WORKER + 17,
MTAPI_WORKER_PRIORITY_WORKER_18 = MTAPI_WORKER_PRIORITY_WORKER + 18,
MTAPI_WORKER_PRIORITY_WORKER_19 = MTAPI_WORKER_PRIORITY_WORKER + 19,
MTAPI_WORKER_PRIORITY_WORKER_20 = MTAPI_WORKER_PRIORITY_WORKER + 20,
MTAPI_WORKER_PRIORITY_WORKER_21 = MTAPI_WORKER_PRIORITY_WORKER + 21,
MTAPI_WORKER_PRIORITY_WORKER_22 = MTAPI_WORKER_PRIORITY_WORKER + 22,
MTAPI_WORKER_PRIORITY_WORKER_23 = MTAPI_WORKER_PRIORITY_WORKER + 23,
MTAPI_WORKER_PRIORITY_WORKER_24 = MTAPI_WORKER_PRIORITY_WORKER + 24,
MTAPI_WORKER_PRIORITY_WORKER_25 = MTAPI_WORKER_PRIORITY_WORKER + 25,
MTAPI_WORKER_PRIORITY_WORKER_26 = MTAPI_WORKER_PRIORITY_WORKER + 26,
MTAPI_WORKER_PRIORITY_WORKER_27 = MTAPI_WORKER_PRIORITY_WORKER + 27,
MTAPI_WORKER_PRIORITY_WORKER_28 = MTAPI_WORKER_PRIORITY_WORKER + 28,
MTAPI_WORKER_PRIORITY_WORKER_29 = MTAPI_WORKER_PRIORITY_WORKER + 29,
MTAPI_WORKER_PRIORITY_WORKER_30 = MTAPI_WORKER_PRIORITY_WORKER + 30,
MTAPI_WORKER_PRIORITY_WORKER_31 = MTAPI_WORKER_PRIORITY_WORKER + 31,
MTAPI_WORKER_PRIORITY_WORKER_32 = MTAPI_WORKER_PRIORITY_WORKER + 32,
MTAPI_WORKER_PRIORITY_WORKER_33 = MTAPI_WORKER_PRIORITY_WORKER + 33,
MTAPI_WORKER_PRIORITY_WORKER_34 = MTAPI_WORKER_PRIORITY_WORKER + 34,
MTAPI_WORKER_PRIORITY_WORKER_35 = MTAPI_WORKER_PRIORITY_WORKER + 35,
MTAPI_WORKER_PRIORITY_WORKER_36 = MTAPI_WORKER_PRIORITY_WORKER + 36,
MTAPI_WORKER_PRIORITY_WORKER_37 = MTAPI_WORKER_PRIORITY_WORKER + 37,
MTAPI_WORKER_PRIORITY_WORKER_38 = MTAPI_WORKER_PRIORITY_WORKER + 38,
MTAPI_WORKER_PRIORITY_WORKER_39 = MTAPI_WORKER_PRIORITY_WORKER + 39,
MTAPI_WORKER_PRIORITY_WORKER_40 = MTAPI_WORKER_PRIORITY_WORKER + 40,
MTAPI_WORKER_PRIORITY_WORKER_41 = MTAPI_WORKER_PRIORITY_WORKER + 41,
MTAPI_WORKER_PRIORITY_WORKER_42 = MTAPI_WORKER_PRIORITY_WORKER + 42,
MTAPI_WORKER_PRIORITY_WORKER_43 = MTAPI_WORKER_PRIORITY_WORKER + 43,
MTAPI_WORKER_PRIORITY_WORKER_44 = MTAPI_WORKER_PRIORITY_WORKER + 44,
MTAPI_WORKER_PRIORITY_WORKER_45 = MTAPI_WORKER_PRIORITY_WORKER + 45,
MTAPI_WORKER_PRIORITY_WORKER_46 = MTAPI_WORKER_PRIORITY_WORKER + 46,
MTAPI_WORKER_PRIORITY_WORKER_47 = MTAPI_WORKER_PRIORITY_WORKER + 47,
MTAPI_WORKER_PRIORITY_WORKER_48 = MTAPI_WORKER_PRIORITY_WORKER + 48,
MTAPI_WORKER_PRIORITY_WORKER_49 = MTAPI_WORKER_PRIORITY_WORKER + 49,
MTAPI_WORKER_PRIORITY_WORKER_50 = MTAPI_WORKER_PRIORITY_WORKER + 50,
MTAPI_WORKER_PRIORITY_WORKER_51 = MTAPI_WORKER_PRIORITY_WORKER + 51,
MTAPI_WORKER_PRIORITY_WORKER_52 = MTAPI_WORKER_PRIORITY_WORKER + 52,
MTAPI_WORKER_PRIORITY_WORKER_53 = MTAPI_WORKER_PRIORITY_WORKER + 53,
MTAPI_WORKER_PRIORITY_WORKER_54 = MTAPI_WORKER_PRIORITY_WORKER + 54,
MTAPI_WORKER_PRIORITY_WORKER_55 = MTAPI_WORKER_PRIORITY_WORKER + 55,
MTAPI_WORKER_PRIORITY_WORKER_56 = MTAPI_WORKER_PRIORITY_WORKER + 56,
MTAPI_WORKER_PRIORITY_WORKER_57 = MTAPI_WORKER_PRIORITY_WORKER + 57,
MTAPI_WORKER_PRIORITY_WORKER_58 = MTAPI_WORKER_PRIORITY_WORKER + 58,
MTAPI_WORKER_PRIORITY_WORKER_59 = MTAPI_WORKER_PRIORITY_WORKER + 59,
MTAPI_WORKER_PRIORITY_WORKER_60 = MTAPI_WORKER_PRIORITY_WORKER + 60,
MTAPI_WORKER_PRIORITY_WORKER_61 = MTAPI_WORKER_PRIORITY_WORKER + 61,
MTAPI_WORKER_PRIORITY_WORKER_62 = MTAPI_WORKER_PRIORITY_WORKER + 62,
MTAPI_WORKER_PRIORITY_WORKER_63 = MTAPI_WORKER_PRIORITY_WORKER + 63
};
/**
* Enum to select default or specific worker for priority setter
*/
typedef enum mtapi_worker_priority_type_enum mtapi_worker_priority_type_t;
/**
* Describes the default priority of all workers or the priority of a
* specific worker.
*/
struct mtapi_worker_priority_entry_struct {
mtapi_worker_priority_type_t type; /**< default or specific worker */
embb_thread_priority_t priority; /**< priority to set */
};
/**
* Describes the default priority of all workers or the priority of a
* specific worker.
*/
typedef struct mtapi_worker_priority_entry_struct mtapi_worker_priority_entry_t;
/** /**
* Node attributes, to be extended for implementation specific attributes * Node attributes, to be extended for implementation specific attributes
...@@ -554,7 +645,8 @@ enum mtapi_node_attributes_enum { ...@@ -554,7 +645,8 @@ enum mtapi_node_attributes_enum {
allowed by the node */ allowed by the node */
MTAPI_NODE_MAX_PRIORITIES, /**< maximum number of priorities MTAPI_NODE_MAX_PRIORITIES, /**< maximum number of priorities
allowed by the node */ allowed by the node */
MTAPI_NODE_REUSE_MAIN_THREAD /**< reuse main thread as worker */ MTAPI_NODE_REUSE_MAIN_THREAD, /**< reuse main thread as worker */
MTAPI_NODE_WORKER_PRIORITIES /**< set worker priorites */
}; };
/** size of the \a MTAPI_NODE_CORE_AFFINITY attribute */ /** size of the \a MTAPI_NODE_CORE_AFFINITY attribute */
#define MTAPI_NODE_CORE_AFFINITY_SIZE sizeof(embb_core_set_t) #define MTAPI_NODE_CORE_AFFINITY_SIZE sizeof(embb_core_set_t)
...@@ -580,6 +672,8 @@ enum mtapi_node_attributes_enum { ...@@ -580,6 +672,8 @@ enum mtapi_node_attributes_enum {
#define MTAPI_NODE_MAX_PRIORITIES_SIZE sizeof(mtapi_uint_t) #define MTAPI_NODE_MAX_PRIORITIES_SIZE sizeof(mtapi_uint_t)
/** size of the \a MTAPI_NODE_REUSE_MAIN_THREAD attribute */ /** size of the \a MTAPI_NODE_REUSE_MAIN_THREAD attribute */
#define MTAPI_NODE_REUSE_MAIN_THREAD_SIZE sizeof(mtapi_boolean_t) #define MTAPI_NODE_REUSE_MAIN_THREAD_SIZE sizeof(mtapi_boolean_t)
/** size of the \a MTAPI_NODE_WORKER_PRIORITIES attribute */
#define MTAPI_NODE_WORKER_PRIORITIES_SIZE 0
/* example attribute value */ /* example attribute value */
#define MTAPI_NODE_TYPE_SMP 1 #define MTAPI_NODE_TYPE_SMP 1
...@@ -693,6 +787,9 @@ struct mtapi_node_attributes_struct { ...@@ -693,6 +787,9 @@ struct mtapi_node_attributes_struct {
mtapi_uint_t max_priorities; /**< stores MTAPI_NODE_MAX_PRIORITIES */ mtapi_uint_t max_priorities; /**< stores MTAPI_NODE_MAX_PRIORITIES */
mtapi_boolean_t reuse_main_thread; /**< stores mtapi_boolean_t reuse_main_thread; /**< stores
MTAPI_NODE_REUSE_MAIN_THREAD */ MTAPI_NODE_REUSE_MAIN_THREAD */
mtapi_worker_priority_entry_t * worker_priorities;
/**< stores
MTAPI_NODE_WORKER_PRIORITIES */
}; };
/** /**
......
...@@ -503,8 +503,25 @@ mtapi_boolean_t embb_mtapi_scheduler_initialize_with_mode( ...@@ -503,8 +503,25 @@ mtapi_boolean_t embb_mtapi_scheduler_initialize_with_mode(
} }
core_num++; core_num++;
} }
isinit &= embb_mtapi_thread_context_initialize_with_node_worker_and_core( embb_thread_priority_t priority = EMBB_THREAD_PRIORITY_NORMAL;
&that->worker_contexts[ii], node, ii, core_num); if (NULL != node->attributes.worker_priorities) {
mtapi_worker_priority_entry_t * entry =
node->attributes.worker_priorities;
mtapi_worker_priority_type_t type = entry->type;
while (type != MTAPI_WORKER_PRIORITY_END) {
if (type == MTAPI_WORKER_PRIORITY_DEFAULT) {
priority = entry->priority;
} else if (type ==
(mtapi_worker_priority_type_t)(MTAPI_WORKER_PRIORITY_WORKER + ii)) {
priority = entry->priority;
break;
}
entry++;
type = entry->type;
}
}
isinit &= embb_mtapi_thread_context_initialize(
&that->worker_contexts[ii], node, ii, core_num, priority);
} }
if (!isinit) { if (!isinit) {
return MTAPI_FALSE; return MTAPI_FALSE;
......
...@@ -38,11 +38,12 @@ ...@@ -38,11 +38,12 @@
/* ---- CLASS MEMBERS ------------------------------------------------------ */ /* ---- CLASS MEMBERS ------------------------------------------------------ */
mtapi_boolean_t embb_mtapi_thread_context_initialize_with_node_worker_and_core( mtapi_boolean_t embb_mtapi_thread_context_initialize(
embb_mtapi_thread_context_t* that, embb_mtapi_thread_context_t* that,
embb_mtapi_node_t* node, embb_mtapi_node_t* node,
mtapi_uint_t worker_index, mtapi_uint_t worker_index,
mtapi_uint_t core_num) { mtapi_uint_t core_num,
embb_thread_priority_t priority) {
mtapi_uint_t ii; mtapi_uint_t ii;
mtapi_boolean_t result = MTAPI_TRUE; mtapi_boolean_t result = MTAPI_TRUE;
...@@ -54,6 +55,7 @@ mtapi_boolean_t embb_mtapi_thread_context_initialize_with_node_worker_and_core( ...@@ -54,6 +55,7 @@ mtapi_boolean_t embb_mtapi_thread_context_initialize_with_node_worker_and_core(
that->core_num = core_num; that->core_num = core_num;
that->priorities = node->attributes.max_priorities; that->priorities = node->attributes.max_priorities;
that->is_initialized = MTAPI_FALSE; that->is_initialized = MTAPI_FALSE;
that->thread_priority = priority;
that->is_main_thread = (worker_index == 0) ? that->is_main_thread = (worker_index == 0) ?
node->attributes.reuse_main_thread : MTAPI_FALSE; node->attributes.reuse_main_thread : MTAPI_FALSE;
embb_atomic_store_int(&that->run, 0); embb_atomic_store_int(&that->run, 0);
...@@ -135,7 +137,8 @@ mtapi_boolean_t embb_mtapi_thread_context_start( ...@@ -135,7 +137,8 @@ mtapi_boolean_t embb_mtapi_thread_context_start(
embb_tss_set(&(that->tss_id), that); embb_tss_set(&(that->tss_id), that);
embb_atomic_store_int(&that->run, 1); embb_atomic_store_int(&that->run, 1);
} else { } else {
err = embb_thread_create(&that->thread, &core_set, worker_func, that); err = embb_thread_create_with_priority(
&that->thread, &core_set, that->thread_priority, worker_func, that);
if (EMBB_SUCCESS != err) { if (EMBB_SUCCESS != err) {
embb_mtapi_log_error( embb_mtapi_log_error(
"embb_mtapi_ThreadContext_initializeWithNodeAndCoreNumber() could not " "embb_mtapi_ThreadContext_initializeWithNodeAndCoreNumber() could not "
......
...@@ -69,6 +69,8 @@ struct embb_mtapi_thread_context_struct { ...@@ -69,6 +69,8 @@ struct embb_mtapi_thread_context_struct {
mtapi_status_t status; mtapi_status_t status;
mtapi_boolean_t is_initialized; mtapi_boolean_t is_initialized;
mtapi_boolean_t is_main_thread; mtapi_boolean_t is_main_thread;
embb_thread_priority_t thread_priority;
}; };
#include <embb_mtapi_thread_context_t_fwd.h> #include <embb_mtapi_thread_context_t_fwd.h>
...@@ -78,11 +80,12 @@ struct embb_mtapi_thread_context_struct { ...@@ -78,11 +80,12 @@ struct embb_mtapi_thread_context_struct {
* \memberof embb_mtapi_thread_context_struct * \memberof embb_mtapi_thread_context_struct
* \returns MTAPI_TRUE if successful, MTAPI_FALSE on error * \returns MTAPI_TRUE if successful, MTAPI_FALSE on error
*/ */
mtapi_boolean_t embb_mtapi_thread_context_initialize_with_node_worker_and_core( mtapi_boolean_t embb_mtapi_thread_context_initialize(
embb_mtapi_thread_context_t* that, embb_mtapi_thread_context_t* that,
embb_mtapi_node_t* node, embb_mtapi_node_t* node,
mtapi_uint_t worker_index, mtapi_uint_t worker_index,
mtapi_uint_t core_num); mtapi_uint_t core_num,
embb_thread_priority_t priority);
/** /**
* Destructor. * Destructor.
......
...@@ -53,6 +53,7 @@ void mtapi_nodeattr_init( ...@@ -53,6 +53,7 @@ void mtapi_nodeattr_init(
attributes->max_actions_per_job = MTAPI_NODE_MAX_ACTIONS_PER_JOB_DEFAULT; attributes->max_actions_per_job = MTAPI_NODE_MAX_ACTIONS_PER_JOB_DEFAULT;
attributes->max_priorities = MTAPI_NODE_MAX_PRIORITIES_DEFAULT; attributes->max_priorities = MTAPI_NODE_MAX_PRIORITIES_DEFAULT;
attributes->reuse_main_thread = MTAPI_FALSE; attributes->reuse_main_thread = MTAPI_FALSE;
attributes->worker_priorities = NULL;
embb_core_set_init(&attributes->core_affinity, 1); embb_core_set_init(&attributes->core_affinity, 1);
attributes->num_cores = embb_core_set_count(&attributes->core_affinity); attributes->num_cores = embb_core_set_count(&attributes->core_affinity);
...@@ -149,6 +150,11 @@ void mtapi_nodeattr_set( ...@@ -149,6 +150,11 @@ void mtapi_nodeattr_set(
&attributes->reuse_main_thread, attribute, attribute_size); &attributes->reuse_main_thread, attribute, attribute_size);
break; break;
case MTAPI_NODE_WORKER_PRIORITIES:
local_status = MTAPI_SUCCESS;
attributes->worker_priorities = (mtapi_worker_priority_entry_t*)attribute;
break;
default: default:
/* attribute unknown */ /* attribute unknown */
local_status = MTAPI_ERR_ATTR_NUM; local_status = MTAPI_ERR_ATTR_NUM;
......
...@@ -87,6 +87,23 @@ class NodeAttributes { ...@@ -87,6 +87,23 @@ class NodeAttributes {
} }
/** /**
* Sets the priority of the specified worker threads.
*
* \returns Reference to this object.
* \notthreadsafe
*/
NodeAttributes & SetWorkerPriority(
mtapi_worker_priority_entry_t * worker_priorities
/**< Array of priorities */
) {
mtapi_status_t status;
mtapi_nodeattr_set(&attributes_, MTAPI_NODE_WORKER_PRIORITIES,
worker_priorities, MTAPI_NODE_WORKER_PRIORITIES_SIZE, &status);
internal::CheckStatus(status);
return *this;
}
/**
* Sets the maximum number of concurrently active tasks. * Sets the maximum number of concurrently active tasks.
* *
* \returns Reference to this object. * \returns Reference to this object.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment