diff --git a/mtapi_c/src/embb_mtapi_scheduler_t.c b/mtapi_c/src/embb_mtapi_scheduler_t.c index a5ad6d3..8eb0f85 100644 --- a/mtapi_c/src/embb_mtapi_scheduler_t.c +++ b/mtapi_c/src/embb_mtapi_scheduler_t.c @@ -213,6 +213,102 @@ embb_mtapi_thread_context_t * embb_mtapi_scheduler_get_current_thread_context( return context; } +mtapi_boolean_t embb_mtapi_scheduler_execute_task( + embb_mtapi_task_t * task, + embb_mtapi_node_t * node, + embb_mtapi_thread_context_t * thread_context) { + embb_mtapi_task_context_t task_context; + mtapi_boolean_t result = MTAPI_FALSE; + embb_mtapi_queue_t * local_queue = MTAPI_NULL; + embb_mtapi_group_t * local_group = MTAPI_NULL; + embb_mtapi_action_t * local_action = MTAPI_NULL; + + /* is task associated with a queue? */ + if (embb_mtapi_queue_pool_is_handle_valid( + node->queue_pool, task->queue)) { + local_queue = + embb_mtapi_queue_pool_get_storage_for_handle( + node->queue_pool, task->queue); + } + + /* is task associated with a group? */ + if (embb_mtapi_group_pool_is_handle_valid( + node->group_pool, task->group)) { + local_group = + embb_mtapi_group_pool_get_storage_for_handle( + node->group_pool, task->group); + } + + if (embb_mtapi_action_pool_is_handle_valid( + node->action_pool, task->action)) { + local_action = + embb_mtapi_action_pool_get_storage_for_handle( + node->action_pool, task->action); + } + + switch (embb_atomic_load_int(&task->state)) { + case MTAPI_TASK_SCHEDULED: + /* multi-instance task, another instance might be running */ + case MTAPI_TASK_RUNNING: + /* there was work, execute it */ + embb_mtapi_task_context_initialize_with_thread_context_and_task( + &task_context, thread_context, task); + if (embb_mtapi_task_execute(task, &task_context)) { + /* tell queue that a task is done */ + if (MTAPI_NULL != local_queue) { + embb_mtapi_queue_task_finished(local_queue); + } + } + result = MTAPI_TRUE; + break; + + case MTAPI_TASK_RETAINED: + /* put task into queue again for later execution */ + embb_mtapi_scheduler_schedule_task( + node->scheduler, task, 0); + /* yield, as there may be only retained tasks in the queue */ + embb_thread_yield(); + /* task is not done, so do not notify queue */ + break; + + case MTAPI_TASK_CANCELLED: + /* set return value to cancelled */ + task->error_code = MTAPI_ERR_ACTION_CANCELLED; + if (embb_atomic_fetch_and_add_unsigned_int( + &task->instances_todo, (unsigned int)-1) == 0) { + /* tell queue that a task is done */ + if (MTAPI_NULL != local_queue) { + embb_mtapi_queue_task_finished(local_queue); + } + if (MTAPI_NULL != local_group) { + embb_mtapi_task_queue_push(&local_group->queue, task); + } + } + if (MTAPI_NULL != local_action) { + embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); + } + break; + + case MTAPI_TASK_COMPLETED: + case MTAPI_TASK_DELETED: + case MTAPI_TASK_WAITING: + case MTAPI_TASK_CREATED: + case MTAPI_TASK_PRENATAL: + case MTAPI_TASK_ERROR: + case MTAPI_TASK_INTENTIONALLY_UNUSED: + default: + /* do nothing, although this is an error */ + break; + } + + /* issue task complete callback if set */ + if (MTAPI_NULL != task->attributes.complete_func) { + task->attributes.complete_func(task->handle, MTAPI_NULL); + } + + return result; +} + void embb_mtapi_scheduler_execute_task_or_yield( embb_mtapi_scheduler_t * that, embb_mtapi_node_t * node, @@ -225,10 +321,7 @@ void embb_mtapi_scheduler_execute_task_or_yield( that, node, thread_context); /* if there was work, execute it */ if (MTAPI_NULL != new_task) { - embb_mtapi_task_context_t task_context; - embb_mtapi_task_context_initialize_with_thread_context_and_task( - &task_context, thread_context, new_task); - embb_mtapi_task_execute(new_task, &task_context); + embb_mtapi_scheduler_execute_task(new_task, node, thread_context); } else { embb_thread_yield(); } @@ -253,7 +346,6 @@ embb_mtapi_scheduler_worker_func(embb_mtapi_scheduler_t * that) { int embb_mtapi_scheduler_worker(void * arg) { embb_mtapi_thread_context_t * thread_context = (embb_mtapi_thread_context_t*)arg; - embb_mtapi_task_context_t task_context; embb_mtapi_node_t * node; embb_duration_t sleep_duration; int err; @@ -293,91 +385,8 @@ int embb_mtapi_scheduler_worker(void * arg) { node->scheduler, node, thread_context); /* check if there was work */ if (MTAPI_NULL != task) { - embb_mtapi_queue_t * local_queue = MTAPI_NULL; - embb_mtapi_group_t * local_group = MTAPI_NULL; - embb_mtapi_action_t * local_action = MTAPI_NULL; - - /* is task associated with a queue? */ - if (embb_mtapi_queue_pool_is_handle_valid( - node->queue_pool, task->queue)) { - local_queue = - embb_mtapi_queue_pool_get_storage_for_handle( - node->queue_pool, task->queue); - } - - /* is task associated with a group? */ - if (embb_mtapi_group_pool_is_handle_valid( - node->group_pool, task->group)) { - local_group = - embb_mtapi_group_pool_get_storage_for_handle( - node->group_pool, task->group); - } - - if (embb_mtapi_action_pool_is_handle_valid( - node->action_pool, task->action)) { - local_action = - embb_mtapi_action_pool_get_storage_for_handle( - node->action_pool, task->action); - } - - switch (embb_atomic_load_int(&task->state)) { - case MTAPI_TASK_SCHEDULED: - /* multi-instance task, another instance might be running */ - case MTAPI_TASK_RUNNING: - /* there was work, execute it */ - embb_mtapi_task_context_initialize_with_thread_context_and_task( - &task_context, thread_context, task); - if (embb_mtapi_task_execute(task, &task_context)) { - /* tell queue that a task is done */ - if (MTAPI_NULL != local_queue) { - embb_mtapi_queue_task_finished(local_queue); - } - } + if (embb_mtapi_scheduler_execute_task(task, node, thread_context)) { counter = 0; - break; - - case MTAPI_TASK_RETAINED: - /* put task into queue again for later execution */ - embb_mtapi_scheduler_schedule_task( - node->scheduler, task, 0); - /* yield, as there may be only retained tasks in the queue */ - embb_thread_yield(); - /* task is not done, so do not notify queue */ - break; - - case MTAPI_TASK_CANCELLED: - /* set return value to cancelled */ - task->error_code = MTAPI_ERR_ACTION_CANCELLED; - if (embb_atomic_fetch_and_add_unsigned_int( - &task->instances_todo, (unsigned int)-1) == 0) { - /* tell queue that a task is done */ - if (MTAPI_NULL != local_queue) { - embb_mtapi_queue_task_finished(local_queue); - } - if (MTAPI_NULL != local_group) { - embb_mtapi_task_queue_push(&local_group->queue, task); - } - } - if (MTAPI_NULL != local_action) { - embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); - } - break; - - case MTAPI_TASK_COMPLETED: - case MTAPI_TASK_DELETED: - case MTAPI_TASK_WAITING: - case MTAPI_TASK_CREATED: - case MTAPI_TASK_PRENATAL: - case MTAPI_TASK_ERROR: - case MTAPI_TASK_INTENTIONALLY_UNUSED: - default: - /* do nothing, although this is an error */ - break; - } - - /* issue task complete callback if set */ - if (MTAPI_NULL != task->attributes.complete_func) { - task->attributes.complete_func(task->handle, MTAPI_NULL); } } else if (counter < 1024) { /* spin and yield for a while before going to sleep */ diff --git a/mtapi_c/src/embb_mtapi_scheduler_t.h b/mtapi_c/src/embb_mtapi_scheduler_t.h index ac94a0d..c56688b 100644 --- a/mtapi_c/src/embb_mtapi_scheduler_t.h +++ b/mtapi_c/src/embb_mtapi_scheduler_t.h @@ -138,6 +138,15 @@ embb_mtapi_thread_context_t * embb_mtapi_scheduler_get_current_thread_context( embb_mtapi_scheduler_t * that); /** + * Executes the given task if the thread context is valid. + * \memberof embb_mtapi_scheduler_struct + */ +mtapi_boolean_t embb_mtapi_scheduler_execute_task( + embb_mtapi_task_t * task, + embb_mtapi_node_t * node, + embb_mtapi_thread_context_t * thread_context); + +/** * Fetches and executes a single task if the thread context is valid, * yields otherwise. * \memberof embb_mtapi_scheduler_struct