From 253edaf245a3d66259d5820347d0871e9d5c84ce Mon Sep 17 00:00:00 2001 From: Marcus Winter Date: Mon, 21 Mar 2016 14:32:10 +0100 Subject: [PATCH] mtapi_network_c: improved error handling --- mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network.c | 664 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.c | 5 ++++- mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.h | 2 +- mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_socket.c | 3 ++- 4 files changed, 399 insertions(+), 275 deletions(-) diff --git a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network.c b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network.c index 73a5585..ee908c3 100644 --- a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network.c +++ b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network.c @@ -84,6 +84,8 @@ struct embb_mtapi_network_plugin_struct { embb_mutex_t send_mutex; embb_mtapi_network_buffer_t send_buffer; + + embb_mtapi_network_buffer_t recv_buffer; }; typedef struct embb_mtapi_network_plugin_struct embb_mtapi_network_plugin_t; @@ -185,15 +187,215 @@ static void embb_mtapi_network_task_complete( mtapi_status_set(status, local_status); } +static mtapi_status_t embb_mtapi_network_handle_start_task( + embb_mtapi_network_socket_t * socket, + embb_mtapi_network_buffer_t * buffer) { + + int32_t domain_id; + int32_t job_id; + int32_t results_size; + void * results; + int err; + + int32_t arguments_size; + mtapi_uint_t priority = 0; + mtapi_job_hndl_t job_hndl; + mtapi_task_attributes_t task_attr; + void * arguments; + mtapi_task_complete_function_t func = embb_mtapi_network_task_complete; + void * func_void; + embb_mtapi_network_task_t * network_task = + (embb_mtapi_network_task_t*)embb_alloc( + sizeof(embb_mtapi_network_task_t)); + mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; + + err = embb_mtapi_network_socket_recvbuffer_sized( + socket, buffer, 28); + + // check if we really got 28 bytes + if (err == 28) { + + // domain id + err = embb_mtapi_network_buffer_pop_front_int32(buffer, &domain_id); + assert(err == 4); + // job id + err = embb_mtapi_network_buffer_pop_front_int32(buffer, &job_id); + assert(err == 4); + // priority + err = embb_mtapi_network_buffer_pop_front_int32( + buffer, (int32_t*)&priority); + assert(err == 4); + // remote task handle + err = embb_mtapi_network_buffer_pop_front_int32( + buffer, &network_task->remote_task_id); + assert(err == 4); + err = embb_mtapi_network_buffer_pop_front_int32( + buffer, &network_task->remote_task_tag); + assert(err == 4); + // result size + err = embb_mtapi_network_buffer_pop_front_int32(buffer, + &results_size); + assert(err == 4); + results = embb_alloc((size_t)results_size); + assert(results != NULL); + // arguments size + embb_mtapi_network_buffer_pop_front_int32(buffer, &arguments_size); + assert(err == 4); + arguments = embb_alloc((size_t)arguments_size); + assert(arguments != NULL); + + embb_mtapi_network_buffer_clear(buffer); + + // arguments + err = embb_mtapi_network_socket_recvbuffer_sized( + socket, buffer, arguments_size); + assert(err == arguments_size); + err = embb_mtapi_network_buffer_pop_front_rawdata( + buffer, arguments_size, arguments); + assert(err == arguments_size); + + embb_mtapi_network_buffer_clear(buffer); + + network_task->socket = *socket; + mtapi_taskattr_init(&task_attr, &local_status); + assert(local_status == MTAPI_SUCCESS); + mtapi_taskattr_set(&task_attr, MTAPI_TASK_USER_DATA, + (void*)network_task, 0, &local_status); + assert(local_status == MTAPI_SUCCESS); + mtapi_boolean_t task_detached = MTAPI_TRUE; + mtapi_taskattr_set(&task_attr, MTAPI_TASK_DETACHED, + (void*)&task_detached, sizeof(mtapi_boolean_t), &local_status); + assert(local_status == MTAPI_SUCCESS); + mtapi_taskattr_set(&task_attr, MTAPI_TASK_PRIORITY, + (void*)&priority, sizeof(mtapi_uint_t), &local_status); + assert(local_status == MTAPI_SUCCESS); + memcpy(&func_void, &func, sizeof(void*)); + mtapi_taskattr_set(&task_attr, MTAPI_TASK_COMPLETE_FUNCTION, + func_void, 0, &local_status); + assert(local_status == MTAPI_SUCCESS); + job_hndl = mtapi_job_get((mtapi_job_id_t)job_id, + (mtapi_domain_t)domain_id, &local_status); + assert(local_status == MTAPI_SUCCESS); + mtapi_task_start( + MTAPI_TASK_ID_NONE, job_hndl, + arguments, (mtapi_size_t)arguments_size, + results, (mtapi_size_t)results_size, + &task_attr, MTAPI_GROUP_NONE, + &local_status); + assert(local_status == MTAPI_SUCCESS); + + // send back result of task creation + //embb_mtapi_network_buffer_push_back_int32( + // &buffer, local_status); + //embb_mtapi_network_socket_sendbuffer( + // socket, &buffer); + } + + embb_mtapi_network_buffer_clear(buffer); + + return local_status; +} + +static mtapi_status_t embb_mtapi_network_handle_return_result( + embb_mtapi_network_socket_t * socket, + embb_mtapi_network_buffer_t * buffer) { + + int task_status; + int task_id; + int task_tag; + + int32_t results_size; + int err; + mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; + + if (embb_mtapi_node_is_initialized()) { + embb_mtapi_node_t * node = embb_mtapi_node_get_instance(); + mtapi_task_hndl_t task; + + err = embb_mtapi_network_socket_recvbuffer_sized( + socket, buffer, 16); + + // did we really receive 16 bytes? + if (err == 16) { + + // local task id + err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_id); + assert(err == 4); + err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_tag); + assert(err == 4); + // task status + err = embb_mtapi_network_buffer_pop_front_int32( + buffer, &task_status); + assert(err == 4); + // result size + err = embb_mtapi_network_buffer_pop_front_int32( + buffer, &results_size); + assert(err == 4); + + embb_mtapi_network_buffer_clear(buffer); + + err = embb_mtapi_network_socket_recvbuffer_sized( + socket, buffer, results_size); + + // did we get the whole resultbuffer? + if (err == results_size) { + + task.id = (mtapi_task_id_t)task_id; + task.tag = (mtapi_uint_t)task_tag; + + if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) { + embb_mtapi_task_t * local_task = + embb_mtapi_task_pool_get_storage_for_handle( + node->task_pool, task); + + if (embb_mtapi_action_pool_is_handle_valid( + node->action_pool, local_task->action)) { + embb_mtapi_action_t * local_action = + embb_mtapi_action_pool_get_storage_for_handle( + node->action_pool, local_task->action); + + /* not needed right now + embb_mtapi_network_action_t * network_action = + (embb_mtapi_network_action_t*)local_action->plugin_data;*/ + + err = embb_mtapi_network_buffer_pop_front_rawdata( + buffer, results_size, local_task->result_buffer); + assert(err == results_size); + + local_task->error_code = (mtapi_status_t)task_status; + embb_atomic_store_int(&local_task->state, MTAPI_TASK_COMPLETED); + embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); + + /* is task associated with a group? */ + if (embb_mtapi_group_pool_is_handle_valid( + node->group_pool, local_task->group)) { + embb_mtapi_group_t* local_group = + embb_mtapi_group_pool_get_storage_for_handle( + node->group_pool, local_task->group); + embb_mtapi_task_queue_push(&local_group->queue, local_task); + } + + local_status = MTAPI_SUCCESS; + } + } + + } + + } + } + + embb_mtapi_network_buffer_clear(buffer); + + return local_status; +} + static int embb_mtapi_network_thread(void * args) { embb_mtapi_network_plugin_t * plugin = &embb_mtapi_network_plugin; - embb_mtapi_network_buffer_t buffer; + embb_mtapi_network_buffer_t * buffer = &plugin->recv_buffer; int err; EMBB_UNUSED(args); - embb_mtapi_network_buffer_initialize(&buffer, (int)plugin->buffer_size); - while (embb_atomic_load_int(&plugin->run)) { err = embb_mtapi_network_socket_select( plugin->sockets, plugin->socket_count, 100); @@ -208,197 +410,36 @@ static int embb_mtapi_network_thread(void * args) { plugin->socket_count++; } } else if (0 < err) { - int32_t domain_id; - int32_t job_id; - int32_t results_size; - void * results; int8_t operation; embb_mtapi_network_socket_t * socket = &plugin->sockets[err]; - embb_mtapi_network_buffer_clear(&buffer); + embb_mtapi_network_buffer_clear(buffer); err = embb_mtapi_network_socket_recvbuffer_sized( - socket, &buffer, 1); - if (err == 0) { - // there was some socket error, ignore - continue; - } - assert(err == 1); - err = embb_mtapi_network_buffer_pop_front_int8( - &buffer, &operation); - assert(err == 1); - - embb_mtapi_network_buffer_clear(&buffer); - - if (operation == EMBB_MTAPI_NETWORK_START_TASK) { - int32_t arguments_size; - mtapi_uint_t priority = 0; - mtapi_job_hndl_t job_hndl; - mtapi_task_attributes_t task_attr; - void * arguments; - mtapi_task_complete_function_t func = embb_mtapi_network_task_complete; - void * func_void; - embb_mtapi_network_task_t * network_task = - (embb_mtapi_network_task_t*)embb_alloc( - sizeof(embb_mtapi_network_task_t)); - mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; - - err = embb_mtapi_network_socket_recvbuffer_sized( - socket, &buffer, 28); - assert(err == 28); - // domain id - err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &domain_id); - assert(err == 4); - // job id - err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &job_id); - assert(err == 4); - // priority - err = embb_mtapi_network_buffer_pop_front_int32( - &buffer, (int32_t*)&priority); - assert(err == 4); - // remote task handle - err = embb_mtapi_network_buffer_pop_front_int32( - &buffer, &network_task->remote_task_id); - assert(err == 4); - err = embb_mtapi_network_buffer_pop_front_int32( - &buffer, &network_task->remote_task_tag); - assert(err == 4); - // result size - err = embb_mtapi_network_buffer_pop_front_int32(&buffer, - &results_size); - assert(err == 4); - results = embb_alloc((size_t)results_size); - assert(results != NULL); - // arguments size - embb_mtapi_network_buffer_pop_front_int32(&buffer, &arguments_size); - assert(err == 4); - arguments = embb_alloc((size_t)arguments_size); - assert(arguments != NULL); - - embb_mtapi_network_buffer_clear(&buffer); - - // arguments - err = embb_mtapi_network_socket_recvbuffer_sized( - socket, &buffer, arguments_size); - assert(err == arguments_size); - err = embb_mtapi_network_buffer_pop_front_rawdata( - &buffer, arguments_size, arguments); - assert(err == arguments_size); - - embb_mtapi_network_buffer_clear(&buffer); - - network_task->socket = *socket; - mtapi_taskattr_init(&task_attr, &local_status); - assert(local_status == MTAPI_SUCCESS); - mtapi_taskattr_set(&task_attr, MTAPI_TASK_USER_DATA, - (void*)network_task, 0, &local_status); - assert(local_status == MTAPI_SUCCESS); - mtapi_boolean_t task_detached = MTAPI_TRUE; - mtapi_taskattr_set(&task_attr, MTAPI_TASK_DETACHED, - (void*)&task_detached, sizeof(mtapi_boolean_t), &local_status); - assert(local_status == MTAPI_SUCCESS); - mtapi_taskattr_set(&task_attr, MTAPI_TASK_PRIORITY, - (void*)&priority, sizeof(mtapi_uint_t), &local_status); - assert(local_status == MTAPI_SUCCESS); - memcpy(&func_void, &func, sizeof(void*)); - mtapi_taskattr_set(&task_attr, MTAPI_TASK_COMPLETE_FUNCTION, - func_void, 0, &local_status); - assert(local_status == MTAPI_SUCCESS); - job_hndl = mtapi_job_get((mtapi_job_id_t)job_id, - (mtapi_domain_t)domain_id, &local_status); - assert(local_status == MTAPI_SUCCESS); - mtapi_task_start( - MTAPI_TASK_ID_NONE, job_hndl, - arguments, (mtapi_size_t)arguments_size, - results, (mtapi_size_t)results_size, - &task_attr, MTAPI_GROUP_NONE, - &local_status); - assert(local_status == MTAPI_SUCCESS); - - // send back result of task creation - //embb_mtapi_network_buffer_push_back_int32( - // &buffer, local_status); - //embb_mtapi_network_socket_sendbuffer( - // socket, &buffer); - - embb_mtapi_network_buffer_clear(&buffer); - } else if (operation == EMBB_MTAPI_NETWORK_RETURN_RESULT) { - int task_status; - int task_id; - int task_tag; - - embb_mtapi_network_buffer_clear(&buffer); - - if (embb_mtapi_node_is_initialized()) { - embb_mtapi_node_t * node = embb_mtapi_node_get_instance(); - mtapi_task_hndl_t task; - - err = embb_mtapi_network_socket_recvbuffer_sized( - socket, &buffer, 16); - assert(err == 16); - // local task id - err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &task_id); - assert(err == 4); - err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &task_tag); - assert(err == 4); - // task status - err = embb_mtapi_network_buffer_pop_front_int32( - &buffer, &task_status); - assert(err == 4); - // result size - err = embb_mtapi_network_buffer_pop_front_int32( - &buffer, &results_size); - assert(err == 4); - - embb_mtapi_network_buffer_clear(&buffer); - - err = embb_mtapi_network_socket_recvbuffer_sized( - socket, &buffer, results_size); - assert(err == results_size); - - task.id = (mtapi_task_id_t)task_id; - task.tag = (mtapi_uint_t)task_tag; - - if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) { - embb_mtapi_task_t * local_task = - embb_mtapi_task_pool_get_storage_for_handle( - node->task_pool, task); - - if (embb_mtapi_action_pool_is_handle_valid( - node->action_pool, local_task->action)) { - embb_mtapi_action_t * local_action = - embb_mtapi_action_pool_get_storage_for_handle( - node->action_pool, local_task->action); - - /* not needed right now - embb_mtapi_network_action_t * network_action = - (embb_mtapi_network_action_t*)local_action->plugin_data;*/ - - err = embb_mtapi_network_buffer_pop_front_rawdata( - &buffer, results_size, local_task->result_buffer); - assert(err == results_size); - - local_task->error_code = (mtapi_status_t)task_status; - embb_atomic_store_int(&local_task->state, MTAPI_TASK_COMPLETED); - embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); + socket, buffer, 1); + // did we receive one byte? + if (err == 1) { + err = embb_mtapi_network_buffer_pop_front_int8( + buffer, &operation); + assert(err == 1); - /* is task associated with a group? */ - if (embb_mtapi_group_pool_is_handle_valid( - node->group_pool, local_task->group)) { - embb_mtapi_group_t* local_group = - embb_mtapi_group_pool_get_storage_for_handle( - node->group_pool, local_task->group); - embb_mtapi_task_queue_push(&local_group->queue, local_task); - } - } - } + embb_mtapi_network_buffer_clear(buffer); + + switch (operation) { + case EMBB_MTAPI_NETWORK_START_TASK: + embb_mtapi_network_handle_start_task(socket, buffer); + break; + case EMBB_MTAPI_NETWORK_RETURN_RESULT: + embb_mtapi_network_handle_return_result(socket, buffer); + break; + default: + // eh? + break; } } } } - embb_mtapi_network_buffer_finalize(&buffer); - return EMBB_SUCCESS; } @@ -408,42 +449,106 @@ void mtapi_network_plugin_initialize( MTAPI_IN mtapi_uint16_t max_connections, MTAPI_IN mtapi_size_t buffer_size, MTAPI_OUT mtapi_status_t* status) { - mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; embb_mtapi_network_plugin_t * plugin = &embb_mtapi_network_plugin; int err; + mtapi_status_set(status, MTAPI_ERR_UNKNOWN); + + plugin->socket_count = 0; + plugin->buffer_size = 0; + plugin->sockets = NULL; + embb_atomic_store_int(&plugin->run, 0); + err = embb_mtapi_network_initialize(); - if (err) { - embb_atomic_store_int(&plugin->run, 1); - plugin->buffer_size = buffer_size; - - plugin->socket_count = 1; - // 1 listening socket and max_connections connections - // (2 sockets each if local) - plugin->sockets = (embb_mtapi_network_socket_t*)embb_alloc( - sizeof(embb_mtapi_network_socket_t) * (1 + max_connections * 2)); - - embb_mtapi_network_buffer_initialize( - &plugin->send_buffer, (int)plugin->buffer_size); - embb_mutex_init(&plugin->send_mutex, 0); - - if (NULL != plugin->sockets) { - err = embb_mtapi_network_socket_initialize(&plugin->sockets[0]); - if (err) { - err = embb_mtapi_network_socket_bind_and_listen( - &plugin->sockets[0], host, port, max_connections); - if (err) { - err = embb_thread_create( - &plugin->thread, NULL, embb_mtapi_network_thread, NULL); - if (EMBB_SUCCESS == err) { - local_status = MTAPI_SUCCESS; - } - } - } - } + if (0 == err) return; + + err = embb_mtapi_network_buffer_initialize( + &plugin->recv_buffer, (int)buffer_size); + if (0 == err) { + embb_mtapi_network_finalize(); + return; } - mtapi_status_set(status, local_status); + err = embb_mtapi_network_buffer_initialize( + &plugin->send_buffer, (int)buffer_size); + if (0 == err) { + embb_mtapi_network_buffer_finalize(&plugin->recv_buffer); + embb_mtapi_network_finalize(); + return; + } + + plugin->buffer_size = buffer_size; + + // 1 listening socket and max_connections connections + // (2 sockets each if local) + plugin->sockets = (embb_mtapi_network_socket_t*)embb_alloc( + sizeof(embb_mtapi_network_socket_t) * (1 + max_connections * 2)); + if (NULL == plugin->sockets) { + embb_mtapi_network_buffer_finalize(&plugin->send_buffer); + embb_mtapi_network_buffer_finalize(&plugin->recv_buffer); + plugin->buffer_size = 0; + embb_mtapi_network_finalize(); + return; + } + + err = embb_mutex_init(&plugin->send_mutex, 0); + if (EMBB_SUCCESS != err) { + embb_free(plugin->sockets); + plugin->sockets = NULL; + embb_mtapi_network_buffer_finalize(&plugin->send_buffer); + embb_mtapi_network_buffer_finalize(&plugin->recv_buffer); + plugin->buffer_size = 0; + embb_mtapi_network_finalize(); + return; + } + + err = embb_mtapi_network_socket_initialize(&plugin->sockets[0]); + if (0 == err) { + embb_mutex_destroy(&plugin->send_mutex); + embb_free(plugin->sockets); + plugin->sockets = NULL; + embb_mtapi_network_buffer_finalize(&plugin->send_buffer); + embb_mtapi_network_buffer_finalize(&plugin->recv_buffer); + plugin->buffer_size = 0; + embb_mtapi_network_finalize(); + return; + } + plugin->socket_count = 1; + + err = embb_mtapi_network_socket_bind_and_listen( + &plugin->sockets[0], host, port, max_connections); + if (0 == err) { + embb_mtapi_network_socket_finalize(&plugin->sockets[0]); + plugin->socket_count = 0; + embb_mutex_destroy(&plugin->send_mutex); + embb_free(plugin->sockets); + plugin->sockets = NULL; + embb_mtapi_network_buffer_finalize(&plugin->send_buffer); + embb_mtapi_network_buffer_finalize(&plugin->recv_buffer); + plugin->buffer_size = 0; + embb_mtapi_network_finalize(); + return; + } + + embb_atomic_store_int(&plugin->run, 1); + + err = embb_thread_create( + &plugin->thread, NULL, embb_mtapi_network_thread, NULL); + if (EMBB_SUCCESS != err) { + embb_atomic_store_int(&plugin->run, 0); + embb_mtapi_network_socket_finalize(&plugin->sockets[0]); + plugin->socket_count = 0; + embb_mutex_destroy(&plugin->send_mutex); + embb_free(plugin->sockets); + plugin->sockets = NULL; + embb_mtapi_network_buffer_finalize(&plugin->send_buffer); + embb_mtapi_network_buffer_finalize(&plugin->recv_buffer); + plugin->buffer_size = 0; + embb_mtapi_network_finalize(); + return; + } + + mtapi_status_set(status, MTAPI_SUCCESS); } void mtapi_network_plugin_finalize( @@ -458,6 +563,8 @@ void mtapi_network_plugin_finalize( embb_mutex_destroy(&plugin->send_mutex); embb_mtapi_network_buffer_finalize(&plugin->send_buffer); + embb_mtapi_network_buffer_finalize(&plugin->recv_buffer); + embb_mtapi_network_socket_finalize(&plugin->sockets[0]); embb_free(plugin->sockets); embb_mtapi_network_finalize(); @@ -468,9 +575,9 @@ void mtapi_network_plugin_finalize( static void network_task_start( MTAPI_IN mtapi_task_hndl_t task, MTAPI_OUT mtapi_status_t* status) { - mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; - int err; - EMBB_UNUSED_IN_RELEASE(err); + + // assume failure + mtapi_status_set(status, MTAPI_ERR_UNKNOWN); if (embb_mtapi_node_is_initialized()) { embb_mtapi_node_t * node = embb_mtapi_node_get_instance(); @@ -491,59 +598,70 @@ static void network_task_start( // serialize sending embb_mutex_lock(&network_action->send_mutex); + embb_mtapi_network_buffer_clear(send_buf); + + // actual counts bytes actually put into the buffer + int actual = 0; + // expected counts bytes we intended to put into the buffer + int expected = 0; // operation is "start task" - err = embb_mtapi_network_buffer_push_back_int8( + actual += embb_mtapi_network_buffer_push_back_int8( send_buf, EMBB_MTAPI_NETWORK_START_TASK); - assert(err == 1); + expected += 1; - err = embb_mtapi_network_buffer_push_back_int32( + actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)network_action->domain_id); - assert(err == 4); + expected += 4; - err = embb_mtapi_network_buffer_push_back_int32( + actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)network_action->job_id); - assert(err == 4); + expected += 4; - err = embb_mtapi_network_buffer_push_back_int32( + actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)local_task->attributes.priority); - assert(err == 4); + expected += 4; - err = embb_mtapi_network_buffer_push_back_int32( + actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)local_task->handle.id); - assert(err == 4); - err = embb_mtapi_network_buffer_push_back_int32( + expected += 4; + actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)local_task->handle.tag); - assert(err == 4); + expected += 4; - err = embb_mtapi_network_buffer_push_back_int32( + actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)local_task->result_size); - assert(err == 4); + expected += 4; - err = embb_mtapi_network_buffer_push_back_int32( + actual += embb_mtapi_network_buffer_push_back_int32( send_buf, (int32_t)local_task->arguments_size); - assert(err == 4); - err = embb_mtapi_network_buffer_push_back_rawdata( + expected += 4; + actual += embb_mtapi_network_buffer_push_back_rawdata( send_buf, (int32_t)local_task->arguments_size, local_task->arguments); - assert(err == (int)local_task->arguments_size); - - err = embb_mtapi_network_socket_sendbuffer( - &network_action->socket, send_buf); - assert(err == send_buf->size); - - embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1); - embb_atomic_store_int(&local_task->state, MTAPI_TASK_RUNNING); + expected += (int)local_task->arguments_size; + + // check if everything fit into the buffer + if (actual == expected) { + int sent = embb_mtapi_network_socket_sendbuffer( + &network_action->socket, send_buf); + // was everything sent? + if (sent == send_buf->size) { + embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1); + embb_atomic_store_int(&local_task->state, MTAPI_TASK_RUNNING); + // we've done it, success! + mtapi_status_set(status, MTAPI_SUCCESS); + } else { + // could not send the whole task, this will fail on the remote side, + // so we can safely assume that the task is in error + embb_atomic_store_int(&local_task->state, MTAPI_TASK_ERROR); + } + } embb_mtapi_network_buffer_clear(send_buf); - embb_mutex_unlock(&network_action->send_mutex); - - local_status = MTAPI_SUCCESS; } } } - - mtapi_status_set(status, local_status); } static void network_task_cancel( @@ -602,34 +720,36 @@ mtapi_action_hndl_t mtapi_network_action_create( action->domain_id = domain_id; action->job_id = remote_job_id; - embb_mtapi_network_buffer_initialize( + err = embb_mtapi_network_buffer_initialize( &action->send_buffer, (int)plugin->buffer_size); - embb_mutex_init(&action->send_mutex, 0); - - action->host = host; - action->port = port; - embb_mtapi_network_socket_initialize(&action->socket); - err = embb_mtapi_network_socket_connect(&action->socket, host, port); - if (0 != err) { - // store socket for select - plugin->sockets[plugin->socket_count] = action->socket; - plugin->socket_count++; - - action_hndl = mtapi_ext_plugin_action_create( - local_job_id, - network_task_start, - network_task_cancel, - network_action_finalize, - action, - NULL, 0, // no node local data obviously - MTAPI_NULL, - &local_status); - } else { - embb_mutex_destroy(&action->send_mutex); - embb_mtapi_network_buffer_finalize(&action->send_buffer); - embb_mtapi_network_socket_finalize(&action->socket); - embb_free(action); + err = embb_mutex_init(&action->send_mutex, 0); + if (EMBB_SUCCESS == err) { + action->host = host; + action->port = port; + embb_mtapi_network_socket_initialize(&action->socket); + err = embb_mtapi_network_socket_connect(&action->socket, host, port); + if (0 != err) { + // store socket for select + plugin->sockets[plugin->socket_count] = action->socket; + plugin->socket_count++; + + action_hndl = mtapi_ext_plugin_action_create( + local_job_id, + network_task_start, + network_task_cancel, + network_action_finalize, + action, + NULL, 0, // no node local data obviously + MTAPI_NULL, + &local_status); + } else { + embb_mutex_destroy(&action->send_mutex); + embb_mtapi_network_buffer_finalize(&action->send_buffer); + embb_mtapi_network_socket_finalize(&action->socket); + embb_free(action); + } + } } } diff --git a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.c b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.c index 76ef1b0..61b0482 100644 --- a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.c +++ b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.c @@ -28,9 +28,10 @@ #include #include -void embb_mtapi_network_buffer_initialize( +int embb_mtapi_network_buffer_initialize( embb_mtapi_network_buffer_t * that, int capacity) { + int result = 1; that->position = 0; that->size = 0; that->data = (char*)embb_alloc((size_t)capacity); @@ -38,7 +39,9 @@ void embb_mtapi_network_buffer_initialize( that->capacity = capacity; } else { that->capacity = 0; + result = 0; } + return result; } void embb_mtapi_network_buffer_finalize( diff --git a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.h b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.h index 2d5c306..f4a6c11 100644 --- a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.h +++ b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_buffer.h @@ -43,7 +43,7 @@ struct embb_mtapi_network_buffer_struct { typedef struct embb_mtapi_network_buffer_struct embb_mtapi_network_buffer_t; -void embb_mtapi_network_buffer_initialize( +int embb_mtapi_network_buffer_initialize( embb_mtapi_network_buffer_t * that, int capacity ); diff --git a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_socket.c b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_socket.c index 4d203cf..bcb46ea 100644 --- a/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_socket.c +++ b/mtapi_plugins_c/mtapi_network_c/src/embb_mtapi_network_socket.c @@ -116,7 +116,8 @@ int embb_mtapi_network_socket_connect( if (SOCKET_ERROR == connect(that->handle, (struct sockaddr *)&addr, sizeof(addr))) { #ifdef _WIN32 - if (WSAEWOULDBLOCK != WSAGetLastError()) + int err = WSAGetLastError(); + if (WSAEWOULDBLOCK != err) #else if (EAGAIN != errno) #endif -- libgit2 0.26.0