diff --git a/mtapi_c/src/embb_mtapi_scheduler_t.c b/mtapi_c/src/embb_mtapi_scheduler_t.c index 844321d..d7549c6 100644 --- a/mtapi_c/src/embb_mtapi_scheduler_t.c +++ b/mtapi_c/src/embb_mtapi_scheduler_t.c @@ -40,6 +40,7 @@ #include #include #include +#include /* ---- CLASS MEMBERS ------------------------------------------------------ */ @@ -293,6 +294,8 @@ int embb_mtapi_scheduler_worker(void * arg) { /* check if there was work */ if (MTAPI_NULL != task) { embb_mtapi_queue_t * local_queue = MTAPI_NULL; + embb_mtapi_group_t * local_group = MTAPI_NULL; + embb_mtapi_action_t * local_action = MTAPI_NULL; /* is task associated with a queue? */ if (embb_mtapi_queue_pool_is_handle_valid( @@ -302,6 +305,21 @@ int embb_mtapi_scheduler_worker(void * arg) { node->queue_pool, task->queue); } + /* is task associated with a group? */ + if (embb_mtapi_group_pool_is_handle_valid( + node->group_pool, task->group)) { + local_group = + embb_mtapi_group_pool_get_storage_for_handle( + node->group_pool, task->group); + } + + if (embb_mtapi_action_pool_is_handle_valid( + node->action_pool, task->action)) { + local_action = + embb_mtapi_action_pool_get_storage_for_handle( + node->action_pool, task->action); + } + switch (embb_atomic_load_int(&task->state)) { case MTAPI_TASK_SCHEDULED: /* multi-instance task, another instance might be running */ @@ -328,7 +346,7 @@ int embb_mtapi_scheduler_worker(void * arg) { break; case MTAPI_TASK_CANCELLED: - /* set return value to canceled */ + /* set return value to cancelled */ task->error_code = MTAPI_ERR_ACTION_CANCELLED; if (embb_atomic_fetch_and_add_unsigned_int( &task->instances_todo, (unsigned int)-1) == 0) { @@ -336,6 +354,12 @@ int embb_mtapi_scheduler_worker(void * arg) { if (MTAPI_NULL != local_queue) { embb_mtapi_queue_task_finished(local_queue); } + if (MTAPI_NULL != local_group) { + embb_mtapi_task_queue_push(&local_group->queue, task); + } + } + if (MTAPI_NULL != local_action) { + embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); } break; diff --git a/mtapi_c/src/embb_mtapi_task_t.c b/mtapi_c/src/embb_mtapi_task_t.c index 4323a13..9abc3ca 100644 --- a/mtapi_c/src/embb_mtapi_task_t.c +++ b/mtapi_c/src/embb_mtapi_task_t.c @@ -501,7 +501,6 @@ void mtapi_task_cancel( if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) { embb_mtapi_task_t* local_task = embb_mtapi_task_pool_get_storage_for_handle(node->task_pool, task); - embb_mtapi_task_set_state(local_task, MTAPI_TASK_CANCELLED); /* call plugin action cancel function */ if (embb_mtapi_action_pool_is_handle_valid( @@ -511,8 +510,14 @@ void mtapi_task_cancel( node->action_pool, local_task->action); if (local_action->is_plugin_action) { local_action->plugin_task_cancel_function(task, &local_status); + } else { + embb_mtapi_task_set_state(local_task, MTAPI_TASK_CANCELLED); + local_task->error_code = MTAPI_ERR_ACTION_CANCELLED; + local_status = MTAPI_SUCCESS; } } else { + embb_mtapi_task_set_state(local_task, MTAPI_TASK_CANCELLED); + local_task->error_code = MTAPI_ERR_ACTION_CANCELLED; local_status = MTAPI_SUCCESS; } } else { diff --git a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network.c b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network.c index 73a5585..83638a9 100644 --- a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network.c +++ b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network.c @@ -71,8 +71,10 @@ void embb_mtapi_network_finalize() { } enum embb_mtapi_network_operation_enum { - EMBB_MTAPI_NETWORK_START_TASK, - EMBB_MTAPI_NETWORK_RETURN_RESULT + EMBB_MTAPI_NETWORK_START_TASK = 0x01AFFE01, + EMBB_MTAPI_NETWORK_RETURN_RESULT = 0x02AFFE02, + EMBB_MTAPI_NETWORK_RETURN_FAILURE = 0x03AFFE03, + EMBB_MTAPI_NETWORK_CANCEL_TASK = 0x04AFFE04 }; struct embb_mtapi_network_plugin_struct { @@ -84,6 +86,8 @@ struct embb_mtapi_network_plugin_struct { embb_mutex_t send_mutex; embb_mtapi_network_buffer_t send_buffer; + + embb_mtapi_network_buffer_t recv_buffer; }; typedef struct embb_mtapi_network_plugin_struct embb_mtapi_network_plugin_t; @@ -112,12 +116,41 @@ struct embb_mtapi_network_task_struct { typedef struct embb_mtapi_network_task_struct embb_mtapi_network_task_t; +static void embb_mtapi_network_return_failure( + int32_t remote_task_id, + int32_t remote_task_tag, + mtapi_status_t status, + embb_mtapi_network_socket_t * socket, + embb_mtapi_network_buffer_t * buffer) +{ + embb_mtapi_network_buffer_clear(buffer); + + // packet size + embb_mtapi_network_buffer_push_back_int32( + buffer, 16); + + // operation + embb_mtapi_network_buffer_push_back_int32( + buffer, EMBB_MTAPI_NETWORK_RETURN_FAILURE); + + // task handle + embb_mtapi_network_buffer_push_back_int32( + buffer, remote_task_id); + embb_mtapi_network_buffer_push_back_int32( + buffer, remote_task_tag); + + // status + embb_mtapi_network_buffer_push_back_int32( + buffer, (int32_t)status); + + embb_mtapi_network_socket_sendbuffer( + socket, buffer); +} + static void embb_mtapi_network_task_complete( MTAPI_IN mtapi_task_hndl_t task, MTAPI_OUT mtapi_status_t* status) { mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; - int err; - EMBB_UNUSED_IN_RELEASE(err); if (embb_mtapi_node_is_initialized()) { embb_mtapi_node_t * node = embb_mtapi_node_get_instance(); @@ -138,37 +171,69 @@ static void embb_mtapi_network_task_complete( (embb_mtapi_network_task_t*)local_task->attributes.user_data; embb_mtapi_network_buffer_t * send_buf = &plugin->send_buffer; + embb_atomic_memory_barrier(); + local_task->attributes.complete_func = NULL; + embb_atomic_memory_barrier(); + // serialize sending of results embb_mutex_lock(&plugin->send_mutex); embb_mtapi_network_buffer_clear(send_buf); - // operation is "return result" - err = embb_mtapi_network_buffer_push_back_int8( - send_buf, EMBB_MTAPI_NETWORK_RETURN_RESULT); - assert(err == 1); - // remote task id - err = embb_mtapi_network_buffer_push_back_int32( - send_buf, network_task->remote_task_id); - assert(err == 4); - err = embb_mtapi_network_buffer_push_back_int32( - send_buf, network_task->remote_task_tag); - assert(err == 4); - // status - err = embb_mtapi_network_buffer_push_back_int32( - send_buf, local_task->error_code); - assert(err == 4); - // result size - err = embb_mtapi_network_buffer_push_back_int32( - send_buf, (int32_t)local_task->result_size); - assert(err == 4); - err = embb_mtapi_network_buffer_push_back_rawdata( - send_buf, (int32_t)local_task->result_size, - local_task->result_buffer); - assert(err == (int)local_task->result_size); + if (local_task->error_code == MTAPI_SUCCESS) { + // actual counts bytes actually put into the buffer + int actual = 0; + // expected counts bytes we intended to put into the buffer + int expected = + 4 + // operation + 4 + 4 + // remote task handle + 4 + // status + 4 + (int)local_task->result_size; // result buffer + + // packet size + actual += embb_mtapi_network_buffer_push_back_int32( + send_buf, expected); + expected += 4; + + // operation is "return result" + actual += embb_mtapi_network_buffer_push_back_int32( + send_buf, EMBB_MTAPI_NETWORK_RETURN_RESULT); + + // remote task id + actual += embb_mtapi_network_buffer_push_back_int32( + send_buf, network_task->remote_task_id); + actual += embb_mtapi_network_buffer_push_back_int32( + send_buf, network_task->remote_task_tag); + + // status + actual += embb_mtapi_network_buffer_push_back_int32( + send_buf, local_task->error_code); - err = embb_mtapi_network_socket_sendbuffer( - &network_task->socket, send_buf); - assert(err == send_buf->size); + // result size + actual += embb_mtapi_network_buffer_push_back_int32( + send_buf, (int32_t)local_task->result_size); + actual += embb_mtapi_network_buffer_push_back_rawdata( + send_buf, (int32_t)local_task->result_size, + local_task->result_buffer); + + if (expected == actual) { + int sent = embb_mtapi_network_socket_sendbuffer( + &network_task->socket, send_buf); + assert(sent == send_buf->size); + } + else { + embb_mtapi_network_return_failure( + network_task->remote_task_id, + network_task->remote_task_tag, + MTAPI_ERR_UNKNOWN, + &network_task->socket, send_buf); + } + } else { + embb_mtapi_network_return_failure( + network_task->remote_task_id, + network_task->remote_task_tag, + local_task->error_code, + &network_task->socket, send_buf); + } // sending done embb_mutex_unlock(&plugin->send_mutex); @@ -177,6 +242,14 @@ static void embb_mtapi_network_task_complete( embb_free((void*)local_task->arguments); embb_free(local_task->result_buffer); + void * data = local_task->attributes.user_data; + + embb_atomic_memory_barrier(); + local_task->attributes.user_data = NULL; + embb_atomic_memory_barrier(); + + embb_free(data); + local_status = MTAPI_SUCCESS; } } @@ -185,15 +258,333 @@ static void embb_mtapi_network_task_complete( mtapi_status_set(status, local_status); } +static mtapi_status_t embb_mtapi_network_handle_start_task( + embb_mtapi_network_socket_t * socket, + embb_mtapi_network_buffer_t * buffer, + int packet_size) { + + int32_t domain_id; + int32_t job_id; + int32_t results_size; + void * results; + int err; + + int32_t arguments_size; + int32_t remote_task_id; + int32_t remote_task_tag; + mtapi_uint_t priority = 0; + mtapi_job_hndl_t job_hndl; + mtapi_task_attributes_t task_attr; + void * arguments; + mtapi_task_complete_function_t func = embb_mtapi_network_task_complete; + void * func_void; + mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; + + // check if we have at least 28 bytes + if (packet_size >= 28) { + + // domain id + err = embb_mtapi_network_buffer_pop_front_int32(buffer, &domain_id); + assert(err == 4); + // job id + err = embb_mtapi_network_buffer_pop_front_int32(buffer, &job_id); + assert(err == 4); + // priority + err = embb_mtapi_network_buffer_pop_front_int32( + buffer, (int32_t*)&priority); + assert(err == 4); + // remote task handle + err = embb_mtapi_network_buffer_pop_front_int32( + buffer, &remote_task_id); + assert(err == 4); + err = embb_mtapi_network_buffer_pop_front_int32( + buffer, &remote_task_tag); + assert(err == 4); + // result size + err = embb_mtapi_network_buffer_pop_front_int32(buffer, + &results_size); + assert(err == 4); + // arguments size + err = embb_mtapi_network_buffer_pop_front_int32(buffer, &arguments_size); + assert(err == 4); + + embb_mtapi_network_task_t * network_task = + (embb_mtapi_network_task_t*)embb_alloc( + sizeof(embb_mtapi_network_task_t)); + + if (network_task == NULL) { + embb_mtapi_network_return_failure( + remote_task_id, remote_task_tag, MTAPI_ERR_UNKNOWN, + socket, buffer); + return MTAPI_ERR_UNKNOWN; + } + network_task->remote_task_id = remote_task_id; + network_task->remote_task_tag = remote_task_tag; + + // check packet_size again + if (packet_size == 28 + arguments_size) { + // allocate buffers + results = embb_alloc((size_t)results_size); + if (results == NULL) { + embb_mtapi_network_return_failure( + remote_task_id, remote_task_tag, MTAPI_ERR_UNKNOWN, + socket, buffer); + return MTAPI_ERR_UNKNOWN; + } + arguments = embb_alloc((size_t)arguments_size); + if (arguments == NULL) { + embb_free(results); + embb_mtapi_network_return_failure( + remote_task_id, remote_task_tag, MTAPI_ERR_UNKNOWN, + socket, buffer); + return MTAPI_ERR_UNKNOWN; + } + + // arguments + err = embb_mtapi_network_buffer_pop_front_rawdata( + buffer, arguments_size, arguments); + assert(err == arguments_size); + + network_task->socket = *socket; + mtapi_taskattr_init(&task_attr, &local_status); + assert(local_status == MTAPI_SUCCESS); + mtapi_taskattr_set(&task_attr, MTAPI_TASK_USER_DATA, + (void*)network_task, 0, &local_status); + assert(local_status == MTAPI_SUCCESS); + mtapi_boolean_t task_detached = MTAPI_TRUE; + mtapi_taskattr_set(&task_attr, MTAPI_TASK_DETACHED, + (void*)&task_detached, sizeof(mtapi_boolean_t), &local_status); + assert(local_status == MTAPI_SUCCESS); + mtapi_taskattr_set(&task_attr, MTAPI_TASK_PRIORITY, + (void*)&priority, sizeof(mtapi_uint_t), &local_status); + assert(local_status == MTAPI_SUCCESS); + memcpy(&func_void, &func, sizeof(void*)); + mtapi_taskattr_set(&task_attr, MTAPI_TASK_COMPLETE_FUNCTION, + func_void, 0, &local_status); + assert(local_status == MTAPI_SUCCESS); + job_hndl = mtapi_job_get((mtapi_job_id_t)job_id, + (mtapi_domain_t)domain_id, &local_status); + if (local_status == MTAPI_SUCCESS) { + mtapi_task_start( + MTAPI_TASK_ID_NONE, job_hndl, + arguments, (mtapi_size_t)arguments_size, + results, (mtapi_size_t)results_size, + &task_attr, MTAPI_GROUP_NONE, + &local_status); + } + if (local_status != MTAPI_SUCCESS) { + embb_free(arguments); + embb_free(results); + embb_mtapi_network_return_failure( + remote_task_id, remote_task_tag, local_status, socket, buffer); + } + } + } + + return local_status; +} + +static mtapi_status_t embb_mtapi_network_handle_return_result( + embb_mtapi_network_buffer_t * buffer, + int packet_size) { + + int32_t task_status; + int32_t task_id; + int32_t task_tag; + + int32_t results_size; + int err; + mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; + + if (embb_mtapi_node_is_initialized()) { + embb_mtapi_node_t * node = embb_mtapi_node_get_instance(); + mtapi_task_hndl_t task; + + // do we have at least 16 bytes? + if (packet_size >= 16) { + + // local task id + err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_id); + assert(err == 4); + err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_tag); + assert(err == 4); + // task status + err = embb_mtapi_network_buffer_pop_front_int32( + buffer, &task_status); + assert(err == 4); + // result size + err = embb_mtapi_network_buffer_pop_front_int32( + buffer, &results_size); + assert(err == 4); + + // check packet_size again + if (packet_size == 16 + results_size) { + task.id = (mtapi_task_id_t)task_id; + task.tag = (mtapi_uint_t)task_tag; + + if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) { + embb_mtapi_task_t * local_task = + embb_mtapi_task_pool_get_storage_for_handle( + node->task_pool, task); + + if (embb_mtapi_action_pool_is_handle_valid( + node->action_pool, local_task->action)) { + embb_mtapi_action_t * local_action = + embb_mtapi_action_pool_get_storage_for_handle( + node->action_pool, local_task->action); + + /* not needed right now + embb_mtapi_network_action_t * network_action = + (embb_mtapi_network_action_t*)local_action->plugin_data;*/ + + err = embb_mtapi_network_buffer_pop_front_rawdata( + buffer, results_size, local_task->result_buffer); + assert(err == results_size); + + local_task->error_code = (mtapi_status_t)task_status; + embb_atomic_store_int(&local_task->state, MTAPI_TASK_COMPLETED); + embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); + + /* is task associated with a group? */ + if (embb_mtapi_group_pool_is_handle_valid( + node->group_pool, local_task->group)) { + embb_mtapi_group_t* local_group = + embb_mtapi_group_pool_get_storage_for_handle( + node->group_pool, local_task->group); + embb_mtapi_task_queue_push(&local_group->queue, local_task); + } + + local_status = MTAPI_SUCCESS; + } + } + + } + + } + } + + return local_status; +} + +static mtapi_status_t embb_mtapi_network_handle_return_failure( + embb_mtapi_network_buffer_t * buffer, + int packet_size) { + + int32_t task_status; + int32_t task_id; + int32_t task_tag; + + int err; + mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; + + if (embb_mtapi_node_is_initialized()) { + embb_mtapi_node_t * node = embb_mtapi_node_get_instance(); + mtapi_task_hndl_t task; + + // do we have 12 bytes? + if (packet_size == 12) { + + // local task id + err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_id); + assert(err == 4); + err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_tag); + assert(err == 4); + // task status + err = embb_mtapi_network_buffer_pop_front_int32( + buffer, &task_status); + assert(err == 4); + + task.id = (mtapi_task_id_t)task_id; + task.tag = (mtapi_uint_t)task_tag; + + if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) { + embb_mtapi_task_t * local_task = + embb_mtapi_task_pool_get_storage_for_handle( + node->task_pool, task); + + if (embb_mtapi_action_pool_is_handle_valid( + node->action_pool, local_task->action)) { + embb_mtapi_action_t * local_action = + embb_mtapi_action_pool_get_storage_for_handle( + node->action_pool, local_task->action); + + embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); + local_task->error_code = (mtapi_status_t)task_status; + if (MTAPI_ERR_ACTION_CANCELLED == task_status) { + embb_atomic_store_int(&local_task->state, MTAPI_TASK_CANCELLED); + } else { + embb_atomic_store_int(&local_task->state, MTAPI_TASK_ERROR); + } + + /* is task associated with a group? */ + if (embb_mtapi_group_pool_is_handle_valid( + node->group_pool, local_task->group)) { + embb_mtapi_group_t* local_group = + embb_mtapi_group_pool_get_storage_for_handle( + node->group_pool, local_task->group); + embb_mtapi_task_queue_push(&local_group->queue, local_task); + } + + local_status = MTAPI_SUCCESS; + } + + } + + } + } + + return local_status; +} + +static mtapi_status_t embb_mtapi_network_handle_cancel_task( + embb_mtapi_network_buffer_t * buffer, + int packet_size) { + + mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; + int32_t remote_task_id; + int32_t remote_task_tag; + int err; + EMBB_UNUSED_IN_RELEASE(err); + + // do we have 8 bytes? + if (packet_size == 8) { + // get task handle + err = embb_mtapi_network_buffer_pop_front_int32(buffer, &remote_task_id); + assert(err == 4); + err = embb_mtapi_network_buffer_pop_front_int32(buffer, &remote_task_tag); + assert(err == 4); + + if (embb_mtapi_node_is_initialized()) { + embb_mtapi_node_t * node = embb_mtapi_node_get_instance(); + + // search for task to cancel + for (mtapi_uint_t ii = 0; ii < node->attributes.max_tasks; ii++) { + embb_mtapi_task_t * task = &node->task_pool->storage[ii]; + // is this our task? + if (embb_mtapi_network_task_complete == task->attributes.complete_func) { + embb_mtapi_network_task_t * network_task = + (embb_mtapi_network_task_t*)task->attributes.user_data; + // is this task the one matching the given remote task? + if (remote_task_id == network_task->remote_task_id && + remote_task_tag == network_task->remote_task_tag) { + mtapi_task_cancel(task->handle, &local_status); + break; + } + } + } + } + } + + return local_status; +} + static int embb_mtapi_network_thread(void * args) { embb_mtapi_network_plugin_t * plugin = &embb_mtapi_network_plugin; - embb_mtapi_network_buffer_t buffer; + embb_mtapi_network_buffer_t * buffer = &plugin->recv_buffer; int err; EMBB_UNUSED(args); - embb_mtapi_network_buffer_initialize(&buffer, (int)plugin->buffer_size); - while (embb_atomic_load_int(&plugin->run)) { err = embb_mtapi_network_socket_select( plugin->sockets, plugin->socket_count, 100); @@ -208,197 +599,53 @@ static int embb_mtapi_network_thread(void * args) { plugin->socket_count++; } } else if (0 < err) { - int32_t domain_id; - int32_t job_id; - int32_t results_size; - void * results; - int8_t operation; + int32_t operation; + int32_t packet_size; embb_mtapi_network_socket_t * socket = &plugin->sockets[err]; - embb_mtapi_network_buffer_clear(&buffer); + embb_mtapi_network_buffer_clear(buffer); err = embb_mtapi_network_socket_recvbuffer_sized( - socket, &buffer, 1); - if (err == 0) { - // there was some socket error, ignore - continue; - } - assert(err == 1); - err = embb_mtapi_network_buffer_pop_front_int8( - &buffer, &operation); - assert(err == 1); - - embb_mtapi_network_buffer_clear(&buffer); - - if (operation == EMBB_MTAPI_NETWORK_START_TASK) { - int32_t arguments_size; - mtapi_uint_t priority = 0; - mtapi_job_hndl_t job_hndl; - mtapi_task_attributes_t task_attr; - void * arguments; - mtapi_task_complete_function_t func = embb_mtapi_network_task_complete; - void * func_void; - embb_mtapi_network_task_t * network_task = - (embb_mtapi_network_task_t*)embb_alloc( - sizeof(embb_mtapi_network_task_t)); - mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; - - err = embb_mtapi_network_socket_recvbuffer_sized( - socket, &buffer, 28); - assert(err == 28); - // domain id - err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &domain_id); - assert(err == 4); - // job id - err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &job_id); - assert(err == 4); - // priority - err = embb_mtapi_network_buffer_pop_front_int32( - &buffer, (int32_t*)&priority); - assert(err == 4); - // remote task handle - err = embb_mtapi_network_buffer_pop_front_int32( - &buffer, &network_task->remote_task_id); - assert(err == 4); + socket, buffer, 4); + if (err == 4) { err = embb_mtapi_network_buffer_pop_front_int32( - &buffer, &network_task->remote_task_tag); + buffer, &packet_size); assert(err == 4); - // result size - err = embb_mtapi_network_buffer_pop_front_int32(&buffer, - &results_size); - assert(err == 4); - results = embb_alloc((size_t)results_size); - assert(results != NULL); - // arguments size - embb_mtapi_network_buffer_pop_front_int32(&buffer, &arguments_size); - assert(err == 4); - arguments = embb_alloc((size_t)arguments_size); - assert(arguments != NULL); - - embb_mtapi_network_buffer_clear(&buffer); - // arguments + embb_mtapi_network_buffer_clear(buffer); err = embb_mtapi_network_socket_recvbuffer_sized( - socket, &buffer, arguments_size); - assert(err == arguments_size); - err = embb_mtapi_network_buffer_pop_front_rawdata( - &buffer, arguments_size, arguments); - assert(err == arguments_size); - - embb_mtapi_network_buffer_clear(&buffer); - - network_task->socket = *socket; - mtapi_taskattr_init(&task_attr, &local_status); - assert(local_status == MTAPI_SUCCESS); - mtapi_taskattr_set(&task_attr, MTAPI_TASK_USER_DATA, - (void*)network_task, 0, &local_status); - assert(local_status == MTAPI_SUCCESS); - mtapi_boolean_t task_detached = MTAPI_TRUE; - mtapi_taskattr_set(&task_attr, MTAPI_TASK_DETACHED, - (void*)&task_detached, sizeof(mtapi_boolean_t), &local_status); - assert(local_status == MTAPI_SUCCESS); - mtapi_taskattr_set(&task_attr, MTAPI_TASK_PRIORITY, - (void*)&priority, sizeof(mtapi_uint_t), &local_status); - assert(local_status == MTAPI_SUCCESS); - memcpy(&func_void, &func, sizeof(void*)); - mtapi_taskattr_set(&task_attr, MTAPI_TASK_COMPLETE_FUNCTION, - func_void, 0, &local_status); - assert(local_status == MTAPI_SUCCESS); - job_hndl = mtapi_job_get((mtapi_job_id_t)job_id, - (mtapi_domain_t)domain_id, &local_status); - assert(local_status == MTAPI_SUCCESS); - mtapi_task_start( - MTAPI_TASK_ID_NONE, job_hndl, - arguments, (mtapi_size_t)arguments_size, - results, (mtapi_size_t)results_size, - &task_attr, MTAPI_GROUP_NONE, - &local_status); - assert(local_status == MTAPI_SUCCESS); - - // send back result of task creation - //embb_mtapi_network_buffer_push_back_int32( - // &buffer, local_status); - //embb_mtapi_network_socket_sendbuffer( - // socket, &buffer); - - embb_mtapi_network_buffer_clear(&buffer); - } else if (operation == EMBB_MTAPI_NETWORK_RETURN_RESULT) { - int task_status; - int task_id; - int task_tag; - - embb_mtapi_network_buffer_clear(&buffer); - - if (embb_mtapi_node_is_initialized()) { - embb_mtapi_node_t * node = embb_mtapi_node_get_instance(); - mtapi_task_hndl_t task; - - err = embb_mtapi_network_socket_recvbuffer_sized( - socket, &buffer, 16); - assert(err == 16); - // local task id - err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &task_id); - assert(err == 4); - err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &task_tag); - assert(err == 4); - // task status - err = embb_mtapi_network_buffer_pop_front_int32( - &buffer, &task_status); - assert(err == 4); - // result size + socket, buffer, packet_size); + if (err == packet_size) { err = embb_mtapi_network_buffer_pop_front_int32( - &buffer, &results_size); + buffer, &operation); assert(err == 4); - - embb_mtapi_network_buffer_clear(&buffer); - - err = embb_mtapi_network_socket_recvbuffer_sized( - socket, &buffer, results_size); - assert(err == results_size); - - task.id = (mtapi_task_id_t)task_id; - task.tag = (mtapi_uint_t)task_tag; - - if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) { - embb_mtapi_task_t * local_task = - embb_mtapi_task_pool_get_storage_for_handle( - node->task_pool, task); - - if (embb_mtapi_action_pool_is_handle_valid( - node->action_pool, local_task->action)) { - embb_mtapi_action_t * local_action = - embb_mtapi_action_pool_get_storage_for_handle( - node->action_pool, local_task->action); - - /* not needed right now - embb_mtapi_network_action_t * network_action = - (embb_mtapi_network_action_t*)local_action->plugin_data;*/ - - err = embb_mtapi_network_buffer_pop_front_rawdata( - &buffer, results_size, local_task->result_buffer); - assert(err == results_size); - - local_task->error_code = (mtapi_status_t)task_status; - embb_atomic_store_int(&local_task->state, MTAPI_TASK_COMPLETED); - embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); - - /* is task associated with a group? */ - if (embb_mtapi_group_pool_is_handle_valid( - node->group_pool, local_task->group)) { - embb_mtapi_group_t* local_group = - embb_mtapi_group_pool_get_storage_for_handle( - node->group_pool, local_task->group); - embb_mtapi_task_queue_push(&local_group->queue, local_task); - } - } + packet_size -= 4; + + switch (operation) { + case EMBB_MTAPI_NETWORK_START_TASK: + embb_mtapi_network_handle_start_task(socket, buffer, packet_size); + break; + case EMBB_MTAPI_NETWORK_RETURN_RESULT: + embb_mtapi_network_handle_return_result(buffer, packet_size); + break; + case EMBB_MTAPI_NETWORK_RETURN_FAILURE: + embb_mtapi_network_handle_return_failure(buffer, packet_size); + break; + case EMBB_MTAPI_NETWORK_CANCEL_TASK: + embb_mtapi_network_handle_cancel_task(buffer, packet_size); + break; + default: + // invalid, ignore + break; } } } + + embb_mtapi_network_buffer_clear(buffer); + } } - embb_mtapi_network_buffer_finalize(&buffer); - return EMBB_SUCCESS; } @@ -408,42 +655,106 @@ void mtapi_network_plugin_initialize( MTAPI_IN mtapi_uint16_t max_connections, MTAPI_IN mtapi_size_t buffer_size, MTAPI_OUT mtapi_status_t* status) { - mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; embb_mtapi_network_plugin_t * plugin = &embb_mtapi_network_plugin; int err; + mtapi_status_set(status, MTAPI_ERR_UNKNOWN); + + plugin->socket_count = 0; + plugin->buffer_size = 0; + plugin->sockets = NULL; + embb_atomic_store_int(&plugin->run, 0); + err = embb_mtapi_network_initialize(); - if (err) { - embb_atomic_store_int(&plugin->run, 1); - plugin->buffer_size = buffer_size; - - plugin->socket_count = 1; - // 1 listening socket and max_connections connections - // (2 sockets each if local) - plugin->sockets = (embb_mtapi_network_socket_t*)embb_alloc( - sizeof(embb_mtapi_network_socket_t) * (1 + max_connections * 2)); - - embb_mtapi_network_buffer_initialize( - &plugin->send_buffer, (int)plugin->buffer_size); - embb_mutex_init(&plugin->send_mutex, 0); - - if (NULL != plugin->sockets) { - err = embb_mtapi_network_socket_initialize(&plugin->sockets[0]); - if (err) { - err = embb_mtapi_network_socket_bind_and_listen( - &plugin->sockets[0], host, port, max_connections); - if (err) { - err = embb_thread_create( - &plugin->thread, NULL, embb_mtapi_network_thread, NULL); - if (EMBB_SUCCESS == err) { - local_status = MTAPI_SUCCESS; - } - } - } - } + if (0 == err) return; + + err = embb_mtapi_network_buffer_initialize( + &plugin->recv_buffer, (int)buffer_size); + if (0 == err) { + embb_mtapi_network_finalize(); + return; } - mtapi_status_set(status, local_status); + err = embb_mtapi_network_buffer_initialize( + &plugin->send_buffer, (int)buffer_size); + if (0 == err) { + embb_mtapi_network_buffer_finalize(&plugin->recv_buffer); + embb_mtapi_network_finalize(); + return; + } + + plugin->buffer_size = buffer_size; + + // 1 listening socket and max_connections connections + // (2 sockets each if local) + plugin->sockets = (embb_mtapi_network_socket_t*)embb_alloc( + sizeof(embb_mtapi_network_socket_t) * (1 + max_connections * 2)); + if (NULL == plugin->sockets) { + embb_mtapi_network_buffer_finalize(&plugin->send_buffer); + embb_mtapi_network_buffer_finalize(&plugin->recv_buffer); + plugin->buffer_size = 0; + embb_mtapi_network_finalize(); + return; + } + + err = embb_mutex_init(&plugin->send_mutex, 0); + if (EMBB_SUCCESS != err) { + embb_free(plugin->sockets); + plugin->sockets = NULL; + embb_mtapi_network_buffer_finalize(&plugin->send_buffer); + embb_mtapi_network_buffer_finalize(&plugin->recv_buffer); + plugin->buffer_size = 0; + embb_mtapi_network_finalize(); + return; + } + + err = embb_mtapi_network_socket_initialize(&plugin->sockets[0]); + if (0 == err) { + embb_mutex_destroy(&plugin->send_mutex); + embb_free(plugin->sockets); + plugin->sockets = NULL; + embb_mtapi_network_buffer_finalize(&plugin->send_buffer); + embb_mtapi_network_buffer_finalize(&plugin->recv_buffer); + plugin->buffer_size = 0; + embb_mtapi_network_finalize(); + return; + } + plugin->socket_count = 1; + + err = embb_mtapi_network_socket_bind_and_listen( + &plugin->sockets[0], host, port, max_connections); + if (0 == err) { + embb_mtapi_network_socket_finalize(&plugin->sockets[0]); + plugin->socket_count = 0; + embb_mutex_destroy(&plugin->send_mutex); + embb_free(plugin->sockets); + plugin->sockets = NULL; + embb_mtapi_network_buffer_finalize(&plugin->send_buffer); + embb_mtapi_network_buffer_finalize(&plugin->recv_buffer); + plugin->buffer_size = 0; + embb_mtapi_network_finalize(); + return; + } + + embb_atomic_store_int(&plugin->run, 1); + + err = embb_thread_create( + &plugin->thread, NULL, embb_mtapi_network_thread, NULL); + if (EMBB_SUCCESS != err) { + embb_atomic_store_int(&plugin->run, 0); + embb_mtapi_network_socket_finalize(&plugin->sockets[0]); + plugin->socket_count = 0; + embb_mutex_destroy(&plugin->send_mutex); + embb_free(plugin->sockets); + plugin->sockets = NULL; + embb_mtapi_network_buffer_finalize(&plugin->send_buffer); + embb_mtapi_network_buffer_finalize(&plugin->recv_buffer); + plugin->buffer_size = 0; + embb_mtapi_network_finalize(); + return; + } + + mtapi_status_set(status, MTAPI_SUCCESS); } void mtapi_network_plugin_finalize( @@ -458,6 +769,8 @@ void mtapi_network_plugin_finalize( embb_mutex_destroy(&plugin->send_mutex); embb_mtapi_network_buffer_finalize(&plugin->send_buffer); + embb_mtapi_network_buffer_finalize(&plugin->recv_buffer); + embb_mtapi_network_socket_finalize(&plugin->sockets[0]); embb_free(plugin->sockets); embb_mtapi_network_finalize(); @@ -468,9 +781,9 @@ void mtapi_network_plugin_finalize( static void network_task_start( MTAPI_IN mtapi_task_hndl_t task, MTAPI_OUT mtapi_status_t* status) { - mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; - int err; - EMBB_UNUSED_IN_RELEASE(err); + + // assume failure + mtapi_status_set(status, MTAPI_ERR_UNKNOWN); if (embb_mtapi_node_is_initialized()) { embb_mtapi_node_t * node = embb_mtapi_node_get_instance(); @@ -491,69 +804,152 @@ static void network_task_start( // serialize sending embb_mutex_lock(&network_action->send_mutex); + embb_mtapi_network_buffer_clear(send_buf); + + // actual counts bytes actually put into the buffer + int actual = 0; + // expected counts bytes we intended to put into the buffer + int expected = + 4 + // operation + 4 + // domain_id + 4 + // job_id + 4 + // priority + 4 + 4 + // task handle + 4 + // result_size + 4 + local_task->arguments_size; // arguments buffer + + // packet size + actual += embb_mtapi_network_buffer_push_back_int32( + send_buf, (int32_t)expected); + expected += 4; // operation is "start task" - err = embb_mtapi_network_buffer_push_back_int8( + actual += embb_mtapi_network_buffer_push_back_int32( send_buf, EMBB_MTAPI_NETWORK_START_TASK); - assert(err == 1); - err = embb_mtapi_network_buffer_push_back_int32( + // domain_id + actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)network_action->domain_id); - assert(err == 4); - err = embb_mtapi_network_buffer_push_back_int32( + // job_id + actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)network_action->job_id); - assert(err == 4); - err = embb_mtapi_network_buffer_push_back_int32( + // priority + actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)local_task->attributes.priority); - assert(err == 4); - err = embb_mtapi_network_buffer_push_back_int32( + // task handle + actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)local_task->handle.id); - assert(err == 4); - err = embb_mtapi_network_buffer_push_back_int32( + actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)local_task->handle.tag); - assert(err == 4); - err = embb_mtapi_network_buffer_push_back_int32( + // result size + actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)local_task->result_size); - assert(err == 4); - err = embb_mtapi_network_buffer_push_back_int32( + // arguments buffer + actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)local_task->arguments_size); - assert(err == 4); - err = embb_mtapi_network_buffer_push_back_rawdata( + actual += embb_mtapi_network_buffer_push_back_rawdata( send_buf, (int32_t)local_task->arguments_size, local_task->arguments); - assert(err == (int)local_task->arguments_size); - err = embb_mtapi_network_socket_sendbuffer( - &network_action->socket, send_buf); - assert(err == send_buf->size); - - embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1); - embb_atomic_store_int(&local_task->state, MTAPI_TASK_RUNNING); + // check if everything fit into the buffer + if (actual == expected) { + embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1); + embb_atomic_store_int(&local_task->state, MTAPI_TASK_RUNNING); + int sent = embb_mtapi_network_socket_sendbuffer( + &network_action->socket, send_buf); + // was everything sent? + if (sent == send_buf->size) { + // we've done it, success! + mtapi_status_set(status, MTAPI_SUCCESS); + } else { + // could not send the whole task, this will fail on the remote side, + // so we can safely assume that the task is in error + embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); + embb_atomic_store_int(&local_task->state, MTAPI_TASK_ERROR); + } + } embb_mtapi_network_buffer_clear(send_buf); - embb_mutex_unlock(&network_action->send_mutex); - - local_status = MTAPI_SUCCESS; } } } - - mtapi_status_set(status, local_status); } static void network_task_cancel( MTAPI_IN mtapi_task_hndl_t task, MTAPI_OUT mtapi_status_t* status) { - mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; - EMBB_UNUSED(task); + // assume failure + mtapi_status_set(status, MTAPI_ERR_UNKNOWN); - mtapi_status_set(status, local_status); + if (embb_mtapi_node_is_initialized()) { + embb_mtapi_node_t * node = embb_mtapi_node_get_instance(); + + if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) { + embb_mtapi_task_t * local_task = + embb_mtapi_task_pool_get_storage_for_handle(node->task_pool, task); + + if (embb_mtapi_action_pool_is_handle_valid( + node->action_pool, local_task->action)) { + embb_mtapi_action_t * local_action = + embb_mtapi_action_pool_get_storage_for_handle( + node->action_pool, local_task->action); + + embb_mtapi_network_action_t * network_action = + (embb_mtapi_network_action_t*)local_action->plugin_data; + embb_mtapi_network_buffer_t * send_buf = &network_action->send_buffer; + + // serialize sending + embb_mutex_lock(&network_action->send_mutex); + embb_mtapi_network_buffer_clear(send_buf); + + // actual counts bytes actually put into the buffer + int actual = 0; + // expected counts bytes we intended to put into the buffer + int expected = + 4 + // operation + 4 + 4; // task handle + + // packet size + actual += embb_mtapi_network_buffer_push_back_int32( + send_buf, (int32_t)expected); + expected += 4; + + // operation is "cancel task" + actual += embb_mtapi_network_buffer_push_back_int32( + send_buf, EMBB_MTAPI_NETWORK_CANCEL_TASK); + + // task handle + actual += embb_mtapi_network_buffer_push_back_int32( + send_buf, (int32_t)local_task->handle.id); + actual += embb_mtapi_network_buffer_push_back_int32( + send_buf, (int32_t)local_task->handle.tag); + + // check if everything fit into the buffer + if (actual == expected) { + int sent = embb_mtapi_network_socket_sendbuffer( + &network_action->socket, send_buf); + // was everything sent? + if (sent == send_buf->size) { + // we've done it, success! + mtapi_status_set(status, MTAPI_SUCCESS); + } else { + embb_atomic_store_int(&local_task->state, MTAPI_TASK_ERROR); + } + } else { + embb_atomic_store_int(&local_task->state, MTAPI_TASK_ERROR); + } + + embb_mtapi_network_buffer_clear(send_buf); + embb_mutex_unlock(&network_action->send_mutex); + } + } + } } static void network_action_finalize( @@ -602,34 +998,36 @@ mtapi_action_hndl_t mtapi_network_action_create( action->domain_id = domain_id; action->job_id = remote_job_id; - embb_mtapi_network_buffer_initialize( + err = embb_mtapi_network_buffer_initialize( &action->send_buffer, (int)plugin->buffer_size); - embb_mutex_init(&action->send_mutex, 0); - - action->host = host; - action->port = port; - embb_mtapi_network_socket_initialize(&action->socket); - err = embb_mtapi_network_socket_connect(&action->socket, host, port); - if (0 != err) { - // store socket for select - plugin->sockets[plugin->socket_count] = action->socket; - plugin->socket_count++; - - action_hndl = mtapi_ext_plugin_action_create( - local_job_id, - network_task_start, - network_task_cancel, - network_action_finalize, - action, - NULL, 0, // no node local data obviously - MTAPI_NULL, - &local_status); - } else { - embb_mutex_destroy(&action->send_mutex); - embb_mtapi_network_buffer_finalize(&action->send_buffer); - embb_mtapi_network_socket_finalize(&action->socket); - embb_free(action); + err = embb_mutex_init(&action->send_mutex, 0); + if (EMBB_SUCCESS == err) { + action->host = host; + action->port = port; + embb_mtapi_network_socket_initialize(&action->socket); + err = embb_mtapi_network_socket_connect(&action->socket, host, port); + if (0 != err) { + // store socket for select + plugin->sockets[plugin->socket_count] = action->socket; + plugin->socket_count++; + + action_hndl = mtapi_ext_plugin_action_create( + local_job_id, + network_task_start, + network_task_cancel, + network_action_finalize, + action, + NULL, 0, // no node local data obviously + MTAPI_NULL, + &local_status); + } else { + embb_mutex_destroy(&action->send_mutex); + embb_mtapi_network_buffer_finalize(&action->send_buffer); + embb_mtapi_network_socket_finalize(&action->socket); + embb_free(action); + } + } } } diff --git a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.c b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.c index 76ef1b0..61b0482 100644 --- a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.c +++ b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.c @@ -28,9 +28,10 @@ #include #include -void embb_mtapi_network_buffer_initialize( +int embb_mtapi_network_buffer_initialize( embb_mtapi_network_buffer_t * that, int capacity) { + int result = 1; that->position = 0; that->size = 0; that->data = (char*)embb_alloc((size_t)capacity); @@ -38,7 +39,9 @@ void embb_mtapi_network_buffer_initialize( that->capacity = capacity; } else { that->capacity = 0; + result = 0; } + return result; } void embb_mtapi_network_buffer_finalize( diff --git a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.h b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.h index 2d5c306..f4a6c11 100644 --- a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.h +++ b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.h @@ -43,7 +43,7 @@ struct embb_mtapi_network_buffer_struct { typedef struct embb_mtapi_network_buffer_struct embb_mtapi_network_buffer_t; -void embb_mtapi_network_buffer_initialize( +int embb_mtapi_network_buffer_initialize( embb_mtapi_network_buffer_t * that, int capacity ); diff --git a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_socket.c b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_socket.c index 4d203cf..bcb46ea 100644 --- a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_socket.c +++ b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_socket.c @@ -116,7 +116,8 @@ int embb_mtapi_network_socket_connect( if (SOCKET_ERROR == connect(that->handle, (struct sockaddr *)&addr, sizeof(addr))) { #ifdef _WIN32 - if (WSAEWOULDBLOCK != WSAGetLastError()) + int err = WSAGetLastError(); + if (WSAEWOULDBLOCK != err) #else if (EAGAIN != errno) #endif diff --git a/mtapi_plugins_c/mtapi_network_c/test/embb_mtapi_network_test_task.cc b/mtapi_plugins_c/mtapi_network_c/test/embb_mtapi_network_test_task.cc index cec09a2..bfc27d0 100644 --- a/mtapi_plugins_c/mtapi_network_c/test/embb_mtapi_network_test_task.cc +++ b/mtapi_plugins_c/mtapi_network_c/test/embb_mtapi_network_test_task.cc @@ -61,13 +61,52 @@ static void test( } } +static void cancel_test( + void const * /*arguments*/, + mtapi_size_t /*arguments_size*/, + void * /*result_buffer*/, + mtapi_size_t /*result_buffer_size*/, + void const * /*node_local_data*/, + mtapi_size_t /*node_local_data_size*/, + mtapi_task_context_t * context) { + mtapi_status_t status; + while (true) { + mtapi_task_state_t state = mtapi_context_taskstate_get(context, &status); + if (status != MTAPI_SUCCESS) { + break; + } else { + if (state == MTAPI_TASK_CANCELLED) { + break; + } + } + } +} NetworkTaskTest::NetworkTaskTest() { - CreateUnit("mtapi network task test").Add(&NetworkTaskTest::TestBasic, this); + CreateUnit("mtapi network task test") + .Add(&NetworkTaskTest::TestBasic, this); } void NetworkTaskTest::TestBasic() { mtapi_status_t status; + + mtapi_initialize( + NETWORK_DOMAIN, + NETWORK_LOCAL_NODE, + MTAPI_NULL, + MTAPI_NULL, + &status); + MTAPI_CHECK_STATUS(status); + + TestSimple(); + TestCancel(); + + mtapi_finalize(&status); + MTAPI_CHECK_STATUS(status); +} + +void NetworkTaskTest::TestSimple() { + mtapi_status_t status; mtapi_job_hndl_t job; mtapi_task_hndl_t task; mtapi_action_hndl_t network_action, local_action; @@ -81,14 +120,6 @@ void NetworkTaskTest::TestBasic() { arguments[ii + kElements] = static_cast(ii); } - mtapi_initialize( - NETWORK_DOMAIN, - NETWORK_LOCAL_NODE, - MTAPI_NULL, - MTAPI_NULL, - &status); - MTAPI_CHECK_STATUS(status); - mtapi_network_plugin_initialize("127.0.0.1", 12345, 5, kElements * 4 * 3 + 32, &status); MTAPI_CHECK_STATUS(status); @@ -139,7 +170,68 @@ void NetworkTaskTest::TestBasic() { mtapi_network_plugin_finalize(&status); MTAPI_CHECK_STATUS(status); +} - mtapi_finalize(&status); +void NetworkTaskTest::TestCancel() { + mtapi_status_t status; + mtapi_job_hndl_t job; + mtapi_task_hndl_t task; + mtapi_action_hndl_t network_action, local_action; + + float argument = 1.0f; + float result; + + mtapi_network_plugin_initialize("127.0.0.1", 12345, 5, + 4 * 3 + 32, &status); + MTAPI_CHECK_STATUS(status); + + float node_remote = 1.0f; + local_action = mtapi_action_create( + NETWORK_REMOTE_JOB, + cancel_test, + &node_remote, sizeof(float), + MTAPI_DEFAULT_ACTION_ATTRIBUTES, + &status); + MTAPI_CHECK_STATUS(status); + + network_action = mtapi_network_action_create( + NETWORK_DOMAIN, + NETWORK_LOCAL_JOB, + NETWORK_REMOTE_JOB, + "127.0.0.1", 12345, + &status); + MTAPI_CHECK_STATUS(status); + + status = MTAPI_ERR_UNKNOWN; + job = mtapi_job_get(NETWORK_LOCAL_JOB, NETWORK_DOMAIN, &status); + MTAPI_CHECK_STATUS(status); + + task = mtapi_task_start( + MTAPI_TASK_ID_NONE, + job, + &argument, sizeof(float), + &result, sizeof(float), + MTAPI_DEFAULT_TASK_ATTRIBUTES, + MTAPI_GROUP_NONE, + &status); + MTAPI_CHECK_STATUS(status); + + mtapi_task_wait(task, 1, &status); + PT_ASSERT_EQ(status, MTAPI_TIMEOUT); + + mtapi_task_cancel(task, &status); + MTAPI_CHECK_STATUS(status); + + mtapi_task_wait(task, MTAPI_INFINITE, &status); + PT_ASSERT_NE(status, MTAPI_TIMEOUT); + PT_ASSERT_EQ(status, MTAPI_ERR_ACTION_CANCELLED); + + mtapi_action_delete(network_action, MTAPI_INFINITE, &status); + MTAPI_CHECK_STATUS(status); + + mtapi_action_delete(local_action, MTAPI_INFINITE, &status); + MTAPI_CHECK_STATUS(status); + + mtapi_network_plugin_finalize(&status); MTAPI_CHECK_STATUS(status); } diff --git a/mtapi_plugins_c/mtapi_network_c/test/embb_mtapi_network_test_task.h b/mtapi_plugins_c/mtapi_network_c/test/embb_mtapi_network_test_task.h index 8162679..aa2573d 100644 --- a/mtapi_plugins_c/mtapi_network_c/test/embb_mtapi_network_test_task.h +++ b/mtapi_plugins_c/mtapi_network_c/test/embb_mtapi_network_test_task.h @@ -35,6 +35,9 @@ class NetworkTaskTest : public partest::TestCase { private: void TestBasic(); + + void TestSimple(); + void TestCancel(); }; #endif // MTAPI_PLUGINS_C_MTAPI_NETWORK_C_TEST_EMBB_MTAPI_NETWORK_TEST_TASK_H_