diff --git a/mtapi_c/src/embb_mtapi_scheduler_t.c b/mtapi_c/src/embb_mtapi_scheduler_t.c index 0495030..1bdeddf 100644 --- a/mtapi_c/src/embb_mtapi_scheduler_t.c +++ b/mtapi_c/src/embb_mtapi_scheduler_t.c @@ -658,76 +658,85 @@ mtapi_boolean_t embb_mtapi_scheduler_schedule_task( assert(MTAPI_NULL != node); + if (embb_mtapi_action_pool_is_handle_valid( + node->action_pool, task->action)) { #ifdef EMBB_HARD_REALTIME - if (scheduler->mode == GLOBAL_EDF) { - embb_mtapi_task_queue_push(&node->global_task_queue, task); - pushed = MTAPI_TRUE; - } else { -#endif /*EMBB_HARD_REALTIME*/ - if (embb_mtapi_action_pool_is_handle_valid( - node->action_pool, task->action)) { - embb_mtapi_queue_t* local_queue = MTAPI_NULL; + if (scheduler->mode == GLOBAL_EDF) { /* fetch action and schedule */ embb_mtapi_action_t* local_action = embb_mtapi_action_pool_get_storage_for_handle( node->action_pool, task->action); + /* one more task in flight for this action */ + embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1); - mtapi_affinity_t affinity = - local_action->attributes.affinity & task->attributes.affinity; - - /* check if task is running from an ordered queue */ - if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, task->queue)) { - local_queue = embb_mtapi_queue_pool_get_storage_for_handle( - node->queue_pool, task->queue); - if (local_queue->attributes.ordered) { - /* yes, modify affinity accordingly */ - affinity = local_queue->ordered_affinity; - } + pushed = embb_mtapi_task_queue_push(&node->global_task_queue, task); + if (!pushed) { + embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); } + } else { +#endif /*EMBB_HARD_REALTIME*/ + embb_mtapi_queue_t* local_queue = MTAPI_NULL; + /* fetch action and schedule */ + embb_mtapi_action_t* local_action = + embb_mtapi_action_pool_get_storage_for_handle( + node->action_pool, task->action); - /* check affinity */ - if (affinity == 0) { - affinity = node->affinity_all; + mtapi_affinity_t affinity = + local_action->attributes.affinity & task->attributes.affinity; + + /* check if task is running from an ordered queue */ + if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, task->queue)) { + local_queue = embb_mtapi_queue_pool_get_storage_for_handle( + node->queue_pool, task->queue); + if (local_queue->attributes.ordered) { + /* yes, modify affinity accordingly */ + affinity = local_queue->ordered_affinity; } + } - /* one more task in flight for this action */ - embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1); + /* check affinity */ + if (affinity == 0) { + affinity = node->affinity_all; + } - if (affinity == node->affinity_all) { - /* no affinity restrictions, schedule for stealing */ - pushed = embb_mtapi_task_queue_push( - scheduler->worker_contexts[ii].queue[task->attributes.priority], - task); - } else { - mtapi_status_t affinity_status; - - /* affinity is restricted, check and adapt scheduling target */ - ii = (mtapi_uint_t)embb_atomic_fetch_and_add_int( - &scheduler->affine_task_counter, 1); - while (MTAPI_FALSE == mtapi_affinity_get( - &affinity, ii, &affinity_status)) { - ii = (ii + 1) % scheduler->worker_count; - } - /* schedule into private queue to disable stealing */ - pushed = embb_mtapi_task_queue_push( - scheduler->worker_contexts[ii].private_queue[task->attributes.priority], - task); + /* one more task in flight for this action */ + embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1); + + if (affinity == node->affinity_all) { + /* no affinity restrictions, schedule for stealing */ + pushed = embb_mtapi_task_queue_push( + scheduler->worker_contexts[ii].queue[task->attributes.priority], + task); + } else { + mtapi_status_t affinity_status; + + /* affinity is restricted, check and adapt scheduling target */ + ii = (mtapi_uint_t)embb_atomic_fetch_and_add_int( + &scheduler->affine_task_counter, 1); + while (MTAPI_FALSE == mtapi_affinity_get( + &affinity, ii, &affinity_status)) { + ii = (ii + 1) % scheduler->worker_count; } + /* schedule into private queue to disable stealing */ + pushed = embb_mtapi_task_queue_push( + scheduler->worker_contexts[ii].private_queue[task->attributes.priority], + task); + } - if (pushed) { - /* signal the worker thread a task was pushed to */ - if (embb_atomic_load_int(&scheduler->worker_contexts[ii].is_sleeping)) { - embb_condition_notify_one( - &scheduler->worker_contexts[ii].work_available); - } - } else { - /* task could not be launched */ - embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); + if (pushed) { + /* signal the worker thread a task was pushed to */ + if (embb_atomic_load_int(&scheduler->worker_contexts[ii].is_sleeping)) { + embb_condition_notify_one( + &scheduler->worker_contexts[ii].work_available); } + } else { + /* task could not be launched */ + embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); } #ifdef EMBB_HARD_REALTIME - } + } #endif /*EMBB_HARD_REALTIME*/ + } return pushed; }