From 132011c6b86c9e42942299bbe8c7c3cba72db589 Mon Sep 17 00:00:00 2001 From: Tobias Langer Date: Fri, 19 Aug 2016 11:47:14 +0200 Subject: [PATCH] Added global task queue. --- mtapi_c/src/embb_mtapi_node_t.c | 9 +++++++++ mtapi_c/src/embb_mtapi_node_t.h | 7 +++++++ mtapi_c/src/embb_mtapi_scheduler_t.c | 114 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------------------------------- mtapi_c/src/embb_mtapi_scheduler_t.h | 4 ++++ 4 files changed, 83 insertions(+), 51 deletions(-) diff --git a/mtapi_c/src/embb_mtapi_node_t.c b/mtapi_c/src/embb_mtapi_node_t.c index 32126ce..c9d28ef 100644 --- a/mtapi_c/src/embb_mtapi_node_t.c +++ b/mtapi_c/src/embb_mtapi_node_t.c @@ -126,6 +126,12 @@ void mtapi_initialize( local_status = MTAPI_ERR_NODE_INITFAILED; } +#ifdef EMBB_HARD_REALTIME + embb_mtapi_task_queue_initialize_with_capacity( + &node->global_task_queue, + node->attributes.max_tasks); +#endif + if (local_status == MTAPI_SUCCESS) { /* initialize scheduler for local node */ node->scheduler = embb_mtapi_scheduler_new(); @@ -200,6 +206,9 @@ void mtapi_finalize(MTAPI_OUT mtapi_status_t* status) { } embb_atomic_destroy_int(&node->is_scheduler_running); +#ifdef EMBB_HARD_REALTIME + embb_mtapi_task_queue_finalize(&node->global_task_queue); +#endif /* free system instance */ embb_mtapi_alloc_deallocate(node); diff --git a/mtapi_c/src/embb_mtapi_node_t.h b/mtapi_c/src/embb_mtapi_node_t.h index be16101..ba2f3e8 100644 --- a/mtapi_c/src/embb_mtapi_node_t.h +++ b/mtapi_c/src/embb_mtapi_node_t.h @@ -32,6 +32,10 @@ #include +#ifdef EMBB_HARD_REALTIME +#include +#endif /*EMBB_HARD_REALTIME*/ + #ifdef __cplusplus extern "C" { #endif @@ -71,6 +75,9 @@ struct embb_mtapi_node_struct { embb_mtapi_queue_pool_t * queue_pool; embb_atomic_int is_scheduler_running; mtapi_affinity_t affinity_all; +#ifdef EMBB_HARD_REALTIME + embb_mtapi_task_queue_t global_task_queue; +#endif /*EMBB_HARD_REALTIME*/ }; #include diff --git a/mtapi_c/src/embb_mtapi_scheduler_t.c b/mtapi_c/src/embb_mtapi_scheduler_t.c index 51d5c6e..2964ace 100644 --- a/mtapi_c/src/embb_mtapi_scheduler_t.c +++ b/mtapi_c/src/embb_mtapi_scheduler_t.c @@ -37,6 +37,9 @@ #include #include #include +#ifdef EMBB_HARD_REALTIME +#include +#endif /*EMBB_HARD_REALTIME*/ #include #include #include @@ -631,67 +634,76 @@ mtapi_boolean_t embb_mtapi_scheduler_schedule_task( assert(MTAPI_NULL != node); - if (embb_mtapi_action_pool_is_handle_valid( - node->action_pool, task->action)) { - embb_mtapi_queue_t* local_queue = MTAPI_NULL; - /* fetch action and schedule */ - embb_mtapi_action_t* local_action = - embb_mtapi_action_pool_get_storage_for_handle( - node->action_pool, task->action); +#ifdef EMBB_HARD_REALTIME + if (scheduler->mode == GLOBAL_EDF) { + embb_mtapi_task_queue_push(&node->global_task_queue, task); + pushed = MTAPI_TRUE; + } else { +#endif /*EMBB_HARD_REALTIME*/ + if (embb_mtapi_action_pool_is_handle_valid( + node->action_pool, task->action)) { + embb_mtapi_queue_t* local_queue = MTAPI_NULL; + /* fetch action and schedule */ + embb_mtapi_action_t* local_action = + embb_mtapi_action_pool_get_storage_for_handle( + node->action_pool, task->action); - mtapi_affinity_t affinity = - local_action->attributes.affinity & task->attributes.affinity; + mtapi_affinity_t affinity = + local_action->attributes.affinity & task->attributes.affinity; - /* check if task is running from an ordered queue */ - if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, task->queue)) { - local_queue = embb_mtapi_queue_pool_get_storage_for_handle( - node->queue_pool, task->queue); - if (local_queue->attributes.ordered) { - /* yes, modify affinity accordingly */ - affinity = local_queue->ordered_affinity; + /* check if task is running from an ordered queue */ + if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, task->queue)) { + local_queue = embb_mtapi_queue_pool_get_storage_for_handle( + node->queue_pool, task->queue); + if (local_queue->attributes.ordered) { + /* yes, modify affinity accordingly */ + affinity = local_queue->ordered_affinity; + } } - } - /* check affinity */ - if (affinity == 0) { - affinity = node->affinity_all; - } - - /* one more task in flight for this action */ - embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1); + /* check affinity */ + if (affinity == 0) { + affinity = node->affinity_all; + } - if (affinity == node->affinity_all) { - /* no affinity restrictions, schedule for stealing */ - pushed = embb_mtapi_task_queue_push( - scheduler->worker_contexts[ii].queue[task->attributes.priority], - task); - } else { - mtapi_status_t affinity_status; - - /* affinity is restricted, check and adapt scheduling target */ - ii = (mtapi_uint_t)embb_atomic_fetch_and_add_int( - &scheduler->affine_task_counter, 1); - while (MTAPI_FALSE == mtapi_affinity_get( - &affinity, ii, &affinity_status)) { - ii = (ii + 1) % scheduler->worker_count; + /* one more task in flight for this action */ + embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1); + + if (affinity == node->affinity_all) { + /* no affinity restrictions, schedule for stealing */ + pushed = embb_mtapi_task_queue_push( + scheduler->worker_contexts[ii].queue[task->attributes.priority], + task); + } else { + mtapi_status_t affinity_status; + + /* affinity is restricted, check and adapt scheduling target */ + ii = (mtapi_uint_t)embb_atomic_fetch_and_add_int( + &scheduler->affine_task_counter, 1); + while (MTAPI_FALSE == mtapi_affinity_get( + &affinity, ii, &affinity_status)) { + ii = (ii + 1) % scheduler->worker_count; + } + /* schedule into private queue to disable stealing */ + pushed = embb_mtapi_task_queue_push( + scheduler->worker_contexts[ii].private_queue[task->attributes.priority], + task); } - /* schedule into private queue to disable stealing */ - pushed = embb_mtapi_task_queue_push( - scheduler->worker_contexts[ii].private_queue[task->attributes.priority], - task); - } - if (pushed) { - /* signal the worker thread a task was pushed to */ - if (embb_atomic_load_int(&scheduler->worker_contexts[ii].is_sleeping)) { - embb_condition_notify_one( - &scheduler->worker_contexts[ii].work_available); + if (pushed) { + /* signal the worker thread a task was pushed to */ + if (embb_atomic_load_int(&scheduler->worker_contexts[ii].is_sleeping)) { + embb_condition_notify_one( + &scheduler->worker_contexts[ii].work_available); + } + } else { + /* task could not be launched */ + embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); } - } else { - /* task could not be launched */ - embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); } +#ifdef EMBB_HARD_REALTIME } +#endif /*EMBB_HARD_REALTIME*/ return pushed; } diff --git a/mtapi_c/src/embb_mtapi_scheduler_t.h b/mtapi_c/src/embb_mtapi_scheduler_t.h index 654bb90..f6cc062 100644 --- a/mtapi_c/src/embb_mtapi_scheduler_t.h +++ b/mtapi_c/src/embb_mtapi_scheduler_t.h @@ -59,6 +59,10 @@ enum embb_mtapi_scheduler_mode_enum { WORK_STEAL_VHPF = 0, // Local First. Steal if all local queues are empty. WORK_STEAL_LF = 1, +#ifdef EMBB_HARD_REALTIME + // Global EDF. No work stealing + GLOBAL_EDF = 2, +#endif /*EMBB_HARD_REALTIME*/ NUM_SCHEDULER_MODES }; -- libgit2 0.26.0