Commit abcc8801 by Tobias Langer

Endless loop bug fix.

On releasing all resources, the node waits for the completion of all
tasks which are associated to a specific action. Since we never
incremented the task counter for actions, this loop spun.
parent 1ba533bb
......@@ -658,76 +658,85 @@ mtapi_boolean_t embb_mtapi_scheduler_schedule_task(
assert(MTAPI_NULL != node);
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, task->action)) {
#ifdef EMBB_HARD_REALTIME
if (scheduler->mode == GLOBAL_EDF) {
embb_mtapi_task_queue_push(&node->global_task_queue, task);
pushed = MTAPI_TRUE;
} else {
#endif /*EMBB_HARD_REALTIME*/
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, task->action)) {
embb_mtapi_queue_t* local_queue = MTAPI_NULL;
if (scheduler->mode == GLOBAL_EDF) {
/* fetch action and schedule */
embb_mtapi_action_t* local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, task->action);
/* one more task in flight for this action */
embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1);
mtapi_affinity_t affinity =
local_action->attributes.affinity & task->attributes.affinity;
/* check if task is running from an ordered queue */
if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, task->queue)) {
local_queue = embb_mtapi_queue_pool_get_storage_for_handle(
node->queue_pool, task->queue);
if (local_queue->attributes.ordered) {
/* yes, modify affinity accordingly */
affinity = local_queue->ordered_affinity;
}
pushed = embb_mtapi_task_queue_push(&node->global_task_queue, task);
if (!pushed) {
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
}
} else {
#endif /*EMBB_HARD_REALTIME*/
embb_mtapi_queue_t* local_queue = MTAPI_NULL;
/* fetch action and schedule */
embb_mtapi_action_t* local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, task->action);
/* check affinity */
if (affinity == 0) {
affinity = node->affinity_all;
mtapi_affinity_t affinity =
local_action->attributes.affinity & task->attributes.affinity;
/* check if task is running from an ordered queue */
if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, task->queue)) {
local_queue = embb_mtapi_queue_pool_get_storage_for_handle(
node->queue_pool, task->queue);
if (local_queue->attributes.ordered) {
/* yes, modify affinity accordingly */
affinity = local_queue->ordered_affinity;
}
}
/* one more task in flight for this action */
embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1);
/* check affinity */
if (affinity == 0) {
affinity = node->affinity_all;
}
if (affinity == node->affinity_all) {
/* no affinity restrictions, schedule for stealing */
pushed = embb_mtapi_task_queue_push(
scheduler->worker_contexts[ii].queue[task->attributes.priority],
task);
} else {
mtapi_status_t affinity_status;
/* affinity is restricted, check and adapt scheduling target */
ii = (mtapi_uint_t)embb_atomic_fetch_and_add_int(
&scheduler->affine_task_counter, 1);
while (MTAPI_FALSE == mtapi_affinity_get(
&affinity, ii, &affinity_status)) {
ii = (ii + 1) % scheduler->worker_count;
}
/* schedule into private queue to disable stealing */
pushed = embb_mtapi_task_queue_push(
scheduler->worker_contexts[ii].private_queue[task->attributes.priority],
task);
/* one more task in flight for this action */
embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1);
if (affinity == node->affinity_all) {
/* no affinity restrictions, schedule for stealing */
pushed = embb_mtapi_task_queue_push(
scheduler->worker_contexts[ii].queue[task->attributes.priority],
task);
} else {
mtapi_status_t affinity_status;
/* affinity is restricted, check and adapt scheduling target */
ii = (mtapi_uint_t)embb_atomic_fetch_and_add_int(
&scheduler->affine_task_counter, 1);
while (MTAPI_FALSE == mtapi_affinity_get(
&affinity, ii, &affinity_status)) {
ii = (ii + 1) % scheduler->worker_count;
}
/* schedule into private queue to disable stealing */
pushed = embb_mtapi_task_queue_push(
scheduler->worker_contexts[ii].private_queue[task->attributes.priority],
task);
}
if (pushed) {
/* signal the worker thread a task was pushed to */
if (embb_atomic_load_int(&scheduler->worker_contexts[ii].is_sleeping)) {
embb_condition_notify_one(
&scheduler->worker_contexts[ii].work_available);
}
} else {
/* task could not be launched */
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
if (pushed) {
/* signal the worker thread a task was pushed to */
if (embb_atomic_load_int(&scheduler->worker_contexts[ii].is_sleeping)) {
embb_condition_notify_one(
&scheduler->worker_contexts[ii].work_available);
}
} else {
/* task could not be launched */
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
}
#ifdef EMBB_HARD_REALTIME
}
}
#endif /*EMBB_HARD_REALTIME*/
}
return pushed;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment