Commit 6b97196b by Marcus Winter

mtapi_c: fixed handling of tasks executed during a wait operation

parent 36b43e9f
......@@ -213,6 +213,102 @@ embb_mtapi_thread_context_t * embb_mtapi_scheduler_get_current_thread_context(
return context;
}
mtapi_boolean_t embb_mtapi_scheduler_execute_task(
embb_mtapi_task_t * task,
embb_mtapi_node_t * node,
embb_mtapi_thread_context_t * thread_context) {
embb_mtapi_task_context_t task_context;
mtapi_boolean_t result = MTAPI_FALSE;
embb_mtapi_queue_t * local_queue = MTAPI_NULL;
embb_mtapi_group_t * local_group = MTAPI_NULL;
embb_mtapi_action_t * local_action = MTAPI_NULL;
/* is task associated with a queue? */
if (embb_mtapi_queue_pool_is_handle_valid(
node->queue_pool, task->queue)) {
local_queue =
embb_mtapi_queue_pool_get_storage_for_handle(
node->queue_pool, task->queue);
}
/* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid(
node->group_pool, task->group)) {
local_group =
embb_mtapi_group_pool_get_storage_for_handle(
node->group_pool, task->group);
}
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, task->action)) {
local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, task->action);
}
switch (embb_atomic_load_int(&task->state)) {
case MTAPI_TASK_SCHEDULED:
/* multi-instance task, another instance might be running */
case MTAPI_TASK_RUNNING:
/* there was work, execute it */
embb_mtapi_task_context_initialize_with_thread_context_and_task(
&task_context, thread_context, task);
if (embb_mtapi_task_execute(task, &task_context)) {
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
}
result = MTAPI_TRUE;
break;
case MTAPI_TASK_RETAINED:
/* put task into queue again for later execution */
embb_mtapi_scheduler_schedule_task(
node->scheduler, task, 0);
/* yield, as there may be only retained tasks in the queue */
embb_thread_yield();
/* task is not done, so do not notify queue */
break;
case MTAPI_TASK_CANCELLED:
/* set return value to cancelled */
task->error_code = MTAPI_ERR_ACTION_CANCELLED;
if (embb_atomic_fetch_and_add_unsigned_int(
&task->instances_todo, (unsigned int)-1) == 0) {
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
if (MTAPI_NULL != local_group) {
embb_mtapi_task_queue_push(&local_group->queue, task);
}
}
if (MTAPI_NULL != local_action) {
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
}
break;
case MTAPI_TASK_COMPLETED:
case MTAPI_TASK_DELETED:
case MTAPI_TASK_WAITING:
case MTAPI_TASK_CREATED:
case MTAPI_TASK_PRENATAL:
case MTAPI_TASK_ERROR:
case MTAPI_TASK_INTENTIONALLY_UNUSED:
default:
/* do nothing, although this is an error */
break;
}
/* issue task complete callback if set */
if (MTAPI_NULL != task->attributes.complete_func) {
task->attributes.complete_func(task->handle, MTAPI_NULL);
}
return result;
}
void embb_mtapi_scheduler_execute_task_or_yield(
embb_mtapi_scheduler_t * that,
embb_mtapi_node_t * node,
......@@ -225,10 +321,7 @@ void embb_mtapi_scheduler_execute_task_or_yield(
that, node, thread_context);
/* if there was work, execute it */
if (MTAPI_NULL != new_task) {
embb_mtapi_task_context_t task_context;
embb_mtapi_task_context_initialize_with_thread_context_and_task(
&task_context, thread_context, new_task);
embb_mtapi_task_execute(new_task, &task_context);
embb_mtapi_scheduler_execute_task(new_task, node, thread_context);
} else {
embb_thread_yield();
}
......@@ -253,7 +346,6 @@ embb_mtapi_scheduler_worker_func(embb_mtapi_scheduler_t * that) {
int embb_mtapi_scheduler_worker(void * arg) {
embb_mtapi_thread_context_t * thread_context =
(embb_mtapi_thread_context_t*)arg;
embb_mtapi_task_context_t task_context;
embb_mtapi_node_t * node;
embb_duration_t sleep_duration;
int err;
......@@ -293,91 +385,8 @@ int embb_mtapi_scheduler_worker(void * arg) {
node->scheduler, node, thread_context);
/* check if there was work */
if (MTAPI_NULL != task) {
embb_mtapi_queue_t * local_queue = MTAPI_NULL;
embb_mtapi_group_t * local_group = MTAPI_NULL;
embb_mtapi_action_t * local_action = MTAPI_NULL;
/* is task associated with a queue? */
if (embb_mtapi_queue_pool_is_handle_valid(
node->queue_pool, task->queue)) {
local_queue =
embb_mtapi_queue_pool_get_storage_for_handle(
node->queue_pool, task->queue);
}
/* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid(
node->group_pool, task->group)) {
local_group =
embb_mtapi_group_pool_get_storage_for_handle(
node->group_pool, task->group);
}
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, task->action)) {
local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, task->action);
}
switch (embb_atomic_load_int(&task->state)) {
case MTAPI_TASK_SCHEDULED:
/* multi-instance task, another instance might be running */
case MTAPI_TASK_RUNNING:
/* there was work, execute it */
embb_mtapi_task_context_initialize_with_thread_context_and_task(
&task_context, thread_context, task);
if (embb_mtapi_task_execute(task, &task_context)) {
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
}
if (embb_mtapi_scheduler_execute_task(task, node, thread_context)) {
counter = 0;
break;
case MTAPI_TASK_RETAINED:
/* put task into queue again for later execution */
embb_mtapi_scheduler_schedule_task(
node->scheduler, task, 0);
/* yield, as there may be only retained tasks in the queue */
embb_thread_yield();
/* task is not done, so do not notify queue */
break;
case MTAPI_TASK_CANCELLED:
/* set return value to cancelled */
task->error_code = MTAPI_ERR_ACTION_CANCELLED;
if (embb_atomic_fetch_and_add_unsigned_int(
&task->instances_todo, (unsigned int)-1) == 0) {
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
if (MTAPI_NULL != local_group) {
embb_mtapi_task_queue_push(&local_group->queue, task);
}
}
if (MTAPI_NULL != local_action) {
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
}
break;
case MTAPI_TASK_COMPLETED:
case MTAPI_TASK_DELETED:
case MTAPI_TASK_WAITING:
case MTAPI_TASK_CREATED:
case MTAPI_TASK_PRENATAL:
case MTAPI_TASK_ERROR:
case MTAPI_TASK_INTENTIONALLY_UNUSED:
default:
/* do nothing, although this is an error */
break;
}
/* issue task complete callback if set */
if (MTAPI_NULL != task->attributes.complete_func) {
task->attributes.complete_func(task->handle, MTAPI_NULL);
}
} else if (counter < 1024) {
/* spin and yield for a while before going to sleep */
......
......@@ -138,6 +138,15 @@ embb_mtapi_thread_context_t * embb_mtapi_scheduler_get_current_thread_context(
embb_mtapi_scheduler_t * that);
/**
* Executes the given task if the thread context is valid.
* \memberof embb_mtapi_scheduler_struct
*/
mtapi_boolean_t embb_mtapi_scheduler_execute_task(
embb_mtapi_task_t * task,
embb_mtapi_node_t * node,
embb_mtapi_thread_context_t * thread_context);
/**
* Fetches and executes a single task if the thread context is valid,
* yields otherwise.
* \memberof embb_mtapi_scheduler_struct
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment