Commit f0088aa7 by Winter

mtapi_c: changed task state attribute to atomic

mtapi_network_c: signaling of task completion is now atomic
parent 35ddd0d0
...@@ -302,7 +302,7 @@ int embb_mtapi_scheduler_worker(void * arg) { ...@@ -302,7 +302,7 @@ int embb_mtapi_scheduler_worker(void * arg) {
node->queue_pool, task->queue); node->queue_pool, task->queue);
} }
switch (task->state) { switch (embb_atomic_load_int(&task->state)) {
case MTAPI_TASK_SCHEDULED: case MTAPI_TASK_SCHEDULED:
/* multi-instance task, another instance might be running */ /* multi-instance task, another instance might be running */
case MTAPI_TASK_RUNNING: case MTAPI_TASK_RUNNING:
...@@ -398,10 +398,11 @@ mtapi_boolean_t embb_mtapi_scheduler_wait_for_task( ...@@ -398,10 +398,11 @@ mtapi_boolean_t embb_mtapi_scheduler_wait_for_task(
node->scheduler); node->scheduler);
/* now wait and schedule new tasks if we are on a worker */ /* now wait and schedule new tasks if we are on a worker */
mtapi_task_state_t task_state = embb_atomic_load_int(&task->state);
while ( while (
(MTAPI_TASK_SCHEDULED == task->state) || (MTAPI_TASK_SCHEDULED == task_state) ||
(MTAPI_TASK_RUNNING == task->state) || (MTAPI_TASK_RUNNING == task_state) ||
(MTAPI_TASK_RETAINED == task->state) ) { (MTAPI_TASK_RETAINED == task_state) ) {
if (MTAPI_INFINITE < timeout) { if (MTAPI_INFINITE < timeout) {
embb_time_t current_time; embb_time_t current_time;
embb_time_now(&current_time); embb_time_now(&current_time);
...@@ -416,6 +417,8 @@ mtapi_boolean_t embb_mtapi_scheduler_wait_for_task( ...@@ -416,6 +417,8 @@ mtapi_boolean_t embb_mtapi_scheduler_wait_for_task(
node->scheduler, node->scheduler,
node, node,
context); context);
task_state = embb_atomic_load_int(&task->state);
} }
return MTAPI_TRUE; return MTAPI_TRUE;
......
...@@ -187,7 +187,7 @@ mtapi_task_state_t mtapi_context_taskstate_get( ...@@ -187,7 +187,7 @@ mtapi_task_state_t mtapi_context_taskstate_get(
&(task_context->thread_context->tss_id)); &(task_context->thread_context->tss_id));
if (local_context == task_context->thread_context) { if (local_context == task_context->thread_context) {
task_state = task_context->task->state; task_state = embb_atomic_load_int(&task_context->task->state);
local_status = MTAPI_SUCCESS; local_status = MTAPI_SUCCESS;
} else { } else {
local_status = MTAPI_ERR_CONTEXT_OUTOFCONTEXT; local_status = MTAPI_ERR_CONTEXT_OUTOFCONTEXT;
......
...@@ -79,7 +79,7 @@ void embb_mtapi_task_initialize(embb_mtapi_task_t* that) { ...@@ -79,7 +79,7 @@ void embb_mtapi_task_initialize(embb_mtapi_task_t* that) {
that->action.id = EMBB_MTAPI_IDPOOL_INVALID_ID; that->action.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
that->job.id = EMBB_MTAPI_IDPOOL_INVALID_ID; that->job.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
that->state = MTAPI_TASK_ERROR; embb_atomic_store_int(&that->state, MTAPI_TASK_ERROR);
that->task_id = MTAPI_TASK_ID_NONE; that->task_id = MTAPI_TASK_ID_NONE;
that->group.id = EMBB_MTAPI_IDPOOL_INVALID_ID; that->group.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
that->queue.id = EMBB_MTAPI_IDPOOL_INVALID_ID; that->queue.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
...@@ -159,7 +159,7 @@ void embb_mtapi_task_set_state( ...@@ -159,7 +159,7 @@ void embb_mtapi_task_set_state(
assert(MTAPI_NULL != that); assert(MTAPI_NULL != that);
embb_spin_lock(&that->state_lock); embb_spin_lock(&that->state_lock);
that->state = state; embb_atomic_store_int(&that->state, state);
embb_atomic_memory_barrier(); embb_atomic_memory_barrier();
embb_spin_unlock(&that->state_lock); embb_spin_unlock(&that->state_lock);
} }
......
...@@ -66,7 +66,7 @@ struct embb_mtapi_task_struct { ...@@ -66,7 +66,7 @@ struct embb_mtapi_task_struct {
mtapi_action_hndl_t action; mtapi_action_hndl_t action;
embb_spinlock_t state_lock; embb_spinlock_t state_lock;
volatile mtapi_task_state_t state; embb_atomic_int state;
embb_atomic_unsigned_int current_instance; embb_atomic_unsigned_int current_instance;
embb_atomic_unsigned_int instances_todo; embb_atomic_unsigned_int instances_todo;
......
...@@ -377,7 +377,7 @@ static int embb_mtapi_network_thread(void * args) { ...@@ -377,7 +377,7 @@ static int embb_mtapi_network_thread(void * args) {
assert(err == results_size); assert(err == results_size);
local_task->error_code = (mtapi_status_t)task_status; local_task->error_code = (mtapi_status_t)task_status;
local_task->state = MTAPI_TASK_COMPLETED; embb_atomic_store_int(&local_task->state, MTAPI_TASK_COMPLETED);
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
/* is task associated with a group? */ /* is task associated with a group? */
...@@ -530,7 +530,7 @@ static void network_task_start( ...@@ -530,7 +530,7 @@ static void network_task_start(
assert(err == send_buf->size); assert(err == send_buf->size);
embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1); embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1);
local_task->state = MTAPI_TASK_RUNNING; embb_atomic_store_int(&local_task->state, MTAPI_TASK_RUNNING);
embb_mtapi_network_buffer_clear(send_buf); embb_mtapi_network_buffer_clear(send_buf);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment