From 454e7619f40bba4ee0012ba1a45832e5a183618b Mon Sep 17 00:00:00 2001 From: Marcus Winter Date: Thu, 7 Apr 2016 12:59:04 +0200 Subject: [PATCH] mtapi_network_c: improved protocol for better error handling --- mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network.c | 346 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------------------------------------------------------------------------------------- 1 file changed, 236 insertions(+), 110 deletions(-) diff --git a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network.c b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network.c index 7a9dac1..2140897 100644 --- a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network.c +++ b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network.c @@ -72,7 +72,8 @@ void embb_mtapi_network_finalize() { enum embb_mtapi_network_operation_enum { EMBB_MTAPI_NETWORK_START_TASK, - EMBB_MTAPI_NETWORK_RETURN_RESULT + EMBB_MTAPI_NETWORK_RETURN_RESULT, + EMBB_MTAPI_NETWORK_RETURN_FAILURE }; struct embb_mtapi_network_plugin_struct { @@ -114,12 +115,15 @@ struct embb_mtapi_network_task_struct { typedef struct embb_mtapi_network_task_struct embb_mtapi_network_task_t; +static void embb_mtapi_network_task_failure( + ) { + +} + static void embb_mtapi_network_task_complete( MTAPI_IN mtapi_task_hndl_t task, MTAPI_OUT mtapi_status_t* status) { mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; - int err; - EMBB_UNUSED_IN_RELEASE(err); if (embb_mtapi_node_is_initialized()) { embb_mtapi_node_t * node = embb_mtapi_node_get_instance(); @@ -147,31 +151,37 @@ static void embb_mtapi_network_task_complete( // actual counts bytes actually put into the buffer int actual = 0; // expected counts bytes we intended to put into the buffer - int expected = 0; + int expected = + 1 + // operation + 4 + 4 + // remote task handle + 4 + // status + 4 + (int)local_task->result_size; // result buffer + + // packet size + actual += embb_mtapi_network_buffer_push_back_int32( + send_buf, expected); + expected += 4; // operation is "return result" actual += embb_mtapi_network_buffer_push_back_int8( send_buf, EMBB_MTAPI_NETWORK_RETURN_RESULT); - expected += 1; + // remote task id actual += embb_mtapi_network_buffer_push_back_int32( send_buf, network_task->remote_task_id); - expected += 4; actual += embb_mtapi_network_buffer_push_back_int32( send_buf, network_task->remote_task_tag); - expected += 4; + // status actual += embb_mtapi_network_buffer_push_back_int32( send_buf, local_task->error_code); - expected += 4; + // result size actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)local_task->result_size); - expected += 4; actual += embb_mtapi_network_buffer_push_back_rawdata( send_buf, (int32_t)local_task->result_size, local_task->result_buffer); - expected += (int)local_task->result_size; if (expected == actual) { int sent = embb_mtapi_network_socket_sendbuffer( @@ -194,9 +204,31 @@ static void embb_mtapi_network_task_complete( mtapi_status_set(status, local_status); } +static void embb_mtapi_network_return_failure( + int32_t remote_task_id, + int32_t remote_task_tag, + mtapi_status_t status, + embb_mtapi_network_socket_t * socket, + embb_mtapi_network_buffer_t * buffer) +{ + embb_mtapi_network_buffer_clear(buffer); + embb_mtapi_network_buffer_push_back_int32( + buffer, 12); + embb_mtapi_network_buffer_push_back_int32( + buffer, remote_task_id); + embb_mtapi_network_buffer_push_back_int32( + buffer, remote_task_tag); + embb_mtapi_network_buffer_push_back_int32( + buffer, (int32_t)status); + + embb_mtapi_network_socket_sendbuffer( + socket, buffer); +} + static mtapi_status_t embb_mtapi_network_handle_start_task( embb_mtapi_network_socket_t * socket, - embb_mtapi_network_buffer_t * buffer) { + embb_mtapi_network_buffer_t * buffer, + int packet_size) { int32_t domain_id; int32_t job_id; @@ -205,22 +237,18 @@ static mtapi_status_t embb_mtapi_network_handle_start_task( int err; int32_t arguments_size; + int32_t remote_task_id; + int32_t remote_task_tag; mtapi_uint_t priority = 0; mtapi_job_hndl_t job_hndl; mtapi_task_attributes_t task_attr; void * arguments; mtapi_task_complete_function_t func = embb_mtapi_network_task_complete; void * func_void; - embb_mtapi_network_task_t * network_task = - (embb_mtapi_network_task_t*)embb_alloc( - sizeof(embb_mtapi_network_task_t)); mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; - err = embb_mtapi_network_socket_recvbuffer_sized( - socket, buffer, 28); - - // check if we really got 28 bytes - if (err == 28) { + // check if we have at least 28 bytes + if (packet_size >= 28) { // domain id err = embb_mtapi_network_buffer_pop_front_int32(buffer, &domain_id); @@ -234,78 +262,98 @@ static mtapi_status_t embb_mtapi_network_handle_start_task( assert(err == 4); // remote task handle err = embb_mtapi_network_buffer_pop_front_int32( - buffer, &network_task->remote_task_id); + buffer, &remote_task_id); assert(err == 4); err = embb_mtapi_network_buffer_pop_front_int32( - buffer, &network_task->remote_task_tag); + buffer, &remote_task_tag); assert(err == 4); // result size err = embb_mtapi_network_buffer_pop_front_int32(buffer, &results_size); assert(err == 4); - results = embb_alloc((size_t)results_size); - assert(results != NULL); // arguments size - embb_mtapi_network_buffer_pop_front_int32(buffer, &arguments_size); + err = embb_mtapi_network_buffer_pop_front_int32(buffer, &arguments_size); assert(err == 4); - arguments = embb_alloc((size_t)arguments_size); - assert(arguments != NULL); - - embb_mtapi_network_buffer_clear(buffer); - - // arguments - err = embb_mtapi_network_socket_recvbuffer_sized( - socket, buffer, arguments_size); - assert(err == arguments_size); - err = embb_mtapi_network_buffer_pop_front_rawdata( - buffer, arguments_size, arguments); - assert(err == arguments_size); - - embb_mtapi_network_buffer_clear(buffer); - - network_task->socket = *socket; - mtapi_taskattr_init(&task_attr, &local_status); - assert(local_status == MTAPI_SUCCESS); - mtapi_taskattr_set(&task_attr, MTAPI_TASK_USER_DATA, - (void*)network_task, 0, &local_status); - assert(local_status == MTAPI_SUCCESS); - mtapi_boolean_t task_detached = MTAPI_TRUE; - mtapi_taskattr_set(&task_attr, MTAPI_TASK_DETACHED, - (void*)&task_detached, sizeof(mtapi_boolean_t), &local_status); - assert(local_status == MTAPI_SUCCESS); - mtapi_taskattr_set(&task_attr, MTAPI_TASK_PRIORITY, - (void*)&priority, sizeof(mtapi_uint_t), &local_status); - assert(local_status == MTAPI_SUCCESS); - memcpy(&func_void, &func, sizeof(void*)); - mtapi_taskattr_set(&task_attr, MTAPI_TASK_COMPLETE_FUNCTION, - func_void, 0, &local_status); - assert(local_status == MTAPI_SUCCESS); - job_hndl = mtapi_job_get((mtapi_job_id_t)job_id, - (mtapi_domain_t)domain_id, &local_status); - assert(local_status == MTAPI_SUCCESS); - mtapi_task_start( - MTAPI_TASK_ID_NONE, job_hndl, - arguments, (mtapi_size_t)arguments_size, - results, (mtapi_size_t)results_size, - &task_attr, MTAPI_GROUP_NONE, - &local_status); - assert(local_status == MTAPI_SUCCESS); - - // send back result of task creation - //embb_mtapi_network_buffer_push_back_int32( - // &buffer, local_status); - //embb_mtapi_network_socket_sendbuffer( - // socket, &buffer); - } - embb_mtapi_network_buffer_clear(buffer); + embb_mtapi_network_task_t * network_task = + (embb_mtapi_network_task_t*)embb_alloc( + sizeof(embb_mtapi_network_task_t)); + + if (network_task == NULL) { + embb_mtapi_network_return_failure( + remote_task_id, remote_task_tag, MTAPI_ERR_UNKNOWN, + socket, buffer); + return MTAPI_ERR_UNKNOWN; + } + network_task->remote_task_id = remote_task_id; + network_task->remote_task_tag = remote_task_tag; + + // check packet_size again + if (packet_size == 28 + arguments_size) { + // allocate buffers + results = embb_alloc((size_t)results_size); + if (results == NULL) { + embb_mtapi_network_return_failure( + remote_task_id, remote_task_tag, MTAPI_ERR_UNKNOWN, + socket, buffer); + return MTAPI_ERR_UNKNOWN; + } + arguments = embb_alloc((size_t)arguments_size); + if (arguments == NULL) { + embb_free(results); + embb_mtapi_network_return_failure( + remote_task_id, remote_task_tag, MTAPI_ERR_UNKNOWN, + socket, buffer); + return MTAPI_ERR_UNKNOWN; + } + + // arguments + err = embb_mtapi_network_buffer_pop_front_rawdata( + buffer, arguments_size, arguments); + assert(err == arguments_size); + + network_task->socket = *socket; + mtapi_taskattr_init(&task_attr, &local_status); + assert(local_status == MTAPI_SUCCESS); + mtapi_taskattr_set(&task_attr, MTAPI_TASK_USER_DATA, + (void*)network_task, 0, &local_status); + assert(local_status == MTAPI_SUCCESS); + mtapi_boolean_t task_detached = MTAPI_TRUE; + mtapi_taskattr_set(&task_attr, MTAPI_TASK_DETACHED, + (void*)&task_detached, sizeof(mtapi_boolean_t), &local_status); + assert(local_status == MTAPI_SUCCESS); + mtapi_taskattr_set(&task_attr, MTAPI_TASK_PRIORITY, + (void*)&priority, sizeof(mtapi_uint_t), &local_status); + assert(local_status == MTAPI_SUCCESS); + memcpy(&func_void, &func, sizeof(void*)); + mtapi_taskattr_set(&task_attr, MTAPI_TASK_COMPLETE_FUNCTION, + func_void, 0, &local_status); + assert(local_status == MTAPI_SUCCESS); + job_hndl = mtapi_job_get((mtapi_job_id_t)job_id, + (mtapi_domain_t)domain_id, &local_status); + if (local_status == MTAPI_SUCCESS) { + mtapi_task_start( + MTAPI_TASK_ID_NONE, job_hndl, + arguments, (mtapi_size_t)arguments_size, + results, (mtapi_size_t)results_size, + &task_attr, MTAPI_GROUP_NONE, + &local_status); + } + if (local_status != MTAPI_SUCCESS) { + embb_free(arguments); + embb_free(results); + embb_mtapi_network_return_failure( + remote_task_id, remote_task_tag, local_status, socket, buffer); + } + } + } return local_status; } static mtapi_status_t embb_mtapi_network_handle_return_result( - embb_mtapi_network_socket_t * socket, - embb_mtapi_network_buffer_t * buffer) { + embb_mtapi_network_buffer_t * buffer, + int packet_size) { int task_status; int task_id; @@ -319,11 +367,8 @@ static mtapi_status_t embb_mtapi_network_handle_return_result( embb_mtapi_node_t * node = embb_mtapi_node_get_instance(); mtapi_task_hndl_t task; - err = embb_mtapi_network_socket_recvbuffer_sized( - socket, buffer, 16); - - // did we really receive 16 bytes? - if (err == 16) { + // do we have at least 16 bytes? + if (packet_size >= 16) { // local task id err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_id); @@ -339,14 +384,8 @@ static mtapi_status_t embb_mtapi_network_handle_return_result( buffer, &results_size); assert(err == 4); - embb_mtapi_network_buffer_clear(buffer); - - err = embb_mtapi_network_socket_recvbuffer_sized( - socket, buffer, results_size); - - // did we get the whole resultbuffer? - if (err == results_size) { - + // check packet_size again + if (packet_size == 16 + results_size) { task.id = (mtapi_task_id_t)task_id; task.tag = (mtapi_uint_t)task_tag; @@ -391,7 +430,71 @@ static mtapi_status_t embb_mtapi_network_handle_return_result( } } - embb_mtapi_network_buffer_clear(buffer); + return local_status; +} + +static mtapi_status_t embb_mtapi_network_handle_return_failure( + embb_mtapi_network_buffer_t * buffer, + int packet_size) { + + int task_status; + int task_id; + int task_tag; + + int err; + mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; + + if (embb_mtapi_node_is_initialized()) { + embb_mtapi_node_t * node = embb_mtapi_node_get_instance(); + mtapi_task_hndl_t task; + + // do we have 12 bytes? + if (packet_size == 12) { + + // local task id + err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_id); + assert(err == 4); + err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_tag); + assert(err == 4); + // task status + err = embb_mtapi_network_buffer_pop_front_int32( + buffer, &task_status); + assert(err == 4); + + task.id = (mtapi_task_id_t)task_id; + task.tag = (mtapi_uint_t)task_tag; + + if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) { + embb_mtapi_task_t * local_task = + embb_mtapi_task_pool_get_storage_for_handle( + node->task_pool, task); + + if (embb_mtapi_action_pool_is_handle_valid( + node->action_pool, local_task->action)) { + embb_mtapi_action_t * local_action = + embb_mtapi_action_pool_get_storage_for_handle( + node->action_pool, local_task->action); + + local_task->error_code = (mtapi_status_t)task_status; + embb_atomic_store_int(&local_task->state, MTAPI_TASK_ERROR); + embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); + + /* is task associated with a group? */ + if (embb_mtapi_group_pool_is_handle_valid( + node->group_pool, local_task->group)) { + embb_mtapi_group_t* local_group = + embb_mtapi_group_pool_get_storage_for_handle( + node->group_pool, local_task->group); + embb_mtapi_task_queue_push(&local_group->queue, local_task); + } + + local_status = MTAPI_SUCCESS; + } + + } + + } + } return local_status; } @@ -418,32 +521,46 @@ static int embb_mtapi_network_thread(void * args) { } } else if (0 < err) { int8_t operation; + int32_t packet_size; embb_mtapi_network_socket_t * socket = &plugin->sockets[err]; embb_mtapi_network_buffer_clear(buffer); err = embb_mtapi_network_socket_recvbuffer_sized( - socket, buffer, 1); - // did we receive one byte? - if (err == 1) { - err = embb_mtapi_network_buffer_pop_front_int8( - buffer, &operation); - assert(err == 1); + socket, buffer, 4); + if (err == 4) { + err = embb_mtapi_network_buffer_pop_front_int32( + buffer, &packet_size); + assert(err == 4); embb_mtapi_network_buffer_clear(buffer); - - switch (operation) { + err = embb_mtapi_network_socket_recvbuffer_sized( + socket, buffer, packet_size); + if (err == packet_size) { + err = embb_mtapi_network_buffer_pop_front_int8( + buffer, &operation); + assert(err == 1); + packet_size--; + + switch (operation) { case EMBB_MTAPI_NETWORK_START_TASK: - embb_mtapi_network_handle_start_task(socket, buffer); + embb_mtapi_network_handle_start_task(socket, buffer, packet_size); break; case EMBB_MTAPI_NETWORK_RETURN_RESULT: - embb_mtapi_network_handle_return_result(socket, buffer); + embb_mtapi_network_handle_return_result(buffer, packet_size); + break; + case EMBB_MTAPI_NETWORK_RETURN_FAILURE: + embb_mtapi_network_handle_return_failure(buffer, packet_size); break; default: // eh? break; + } } } + + embb_mtapi_network_buffer_clear(buffer); + } } @@ -610,42 +727,51 @@ static void network_task_start( // actual counts bytes actually put into the buffer int actual = 0; // expected counts bytes we intended to put into the buffer - int expected = 0; + int expected = + 1 + // operation + 4 + // domain_id + 4 + // job_id + 4 + // priority + 4 + 4 + // task handle + 4 + // result_size + 4 + local_task->arguments_size; // arguments buffer + + // packet size + actual += embb_mtapi_network_buffer_push_back_int32( + send_buf, (int32_t)expected); + expected += 4; // operation is "start task" actual += embb_mtapi_network_buffer_push_back_int8( send_buf, EMBB_MTAPI_NETWORK_START_TASK); - expected += 1; + // domain_id actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)network_action->domain_id); - expected += 4; + // job_id actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)network_action->job_id); - expected += 4; + // priority actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)local_task->attributes.priority); - expected += 4; + // task handle actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)local_task->handle.id); - expected += 4; actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)local_task->handle.tag); - expected += 4; + // result size actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)local_task->result_size); - expected += 4; + // arguments buffer actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)local_task->arguments_size); - expected += 4; actual += embb_mtapi_network_buffer_push_back_rawdata( send_buf, (int32_t)local_task->arguments_size, local_task->arguments); - expected += (int)local_task->arguments_size; // check if everything fit into the buffer if (actual == expected) { -- libgit2 0.26.0