Commit 253edaf2 by Marcus Winter

mtapi_network_c: improved error handling

parent 5bbc63a1
......@@ -84,6 +84,8 @@ struct embb_mtapi_network_plugin_struct {
embb_mutex_t send_mutex;
embb_mtapi_network_buffer_t send_buffer;
embb_mtapi_network_buffer_t recv_buffer;
};
typedef struct embb_mtapi_network_plugin_struct embb_mtapi_network_plugin_t;
......@@ -185,52 +187,16 @@ static void embb_mtapi_network_task_complete(
mtapi_status_set(status, local_status);
}
static int embb_mtapi_network_thread(void * args) {
embb_mtapi_network_plugin_t * plugin = &embb_mtapi_network_plugin;
embb_mtapi_network_buffer_t buffer;
int err;
EMBB_UNUSED(args);
embb_mtapi_network_buffer_initialize(&buffer, (int)plugin->buffer_size);
static mtapi_status_t embb_mtapi_network_handle_start_task(
embb_mtapi_network_socket_t * socket,
embb_mtapi_network_buffer_t * buffer) {
while (embb_atomic_load_int(&plugin->run)) {
err = embb_mtapi_network_socket_select(
plugin->sockets, plugin->socket_count, 100);
if (0 == err) {
// listening socket, accept connection
embb_mtapi_network_socket_t accept_socket;
err = embb_mtapi_network_socket_accept(
&plugin->sockets[0], &accept_socket);
if (0 < err) {
// add socket to socket list
plugin->sockets[plugin->socket_count] = accept_socket;
plugin->socket_count++;
}
} else if (0 < err) {
int32_t domain_id;
int32_t job_id;
int32_t results_size;
void * results;
int8_t operation;
embb_mtapi_network_socket_t * socket = &plugin->sockets[err];
embb_mtapi_network_buffer_clear(&buffer);
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, 1);
if (err == 0) {
// there was some socket error, ignore
continue;
}
assert(err == 1);
err = embb_mtapi_network_buffer_pop_front_int8(
&buffer, &operation);
assert(err == 1);
embb_mtapi_network_buffer_clear(&buffer);
int err;
if (operation == EMBB_MTAPI_NETWORK_START_TASK) {
int32_t arguments_size;
mtapi_uint_t priority = 0;
mtapi_job_hndl_t job_hndl;
......@@ -244,48 +210,51 @@ static int embb_mtapi_network_thread(void * args) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, 28);
assert(err == 28);
socket, buffer, 28);
// check if we really got 28 bytes
if (err == 28) {
// domain id
err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &domain_id);
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &domain_id);
assert(err == 4);
// job id
err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &job_id);
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &job_id);
assert(err == 4);
// priority
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, (int32_t*)&priority);
buffer, (int32_t*)&priority);
assert(err == 4);
// remote task handle
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &network_task->remote_task_id);
buffer, &network_task->remote_task_id);
assert(err == 4);
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &network_task->remote_task_tag);
buffer, &network_task->remote_task_tag);
assert(err == 4);
// result size
err = embb_mtapi_network_buffer_pop_front_int32(&buffer,
err = embb_mtapi_network_buffer_pop_front_int32(buffer,
&results_size);
assert(err == 4);
results = embb_alloc((size_t)results_size);
assert(results != NULL);
// arguments size
embb_mtapi_network_buffer_pop_front_int32(&buffer, &arguments_size);
embb_mtapi_network_buffer_pop_front_int32(buffer, &arguments_size);
assert(err == 4);
arguments = embb_alloc((size_t)arguments_size);
assert(arguments != NULL);
embb_mtapi_network_buffer_clear(&buffer);
embb_mtapi_network_buffer_clear(buffer);
// arguments
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, arguments_size);
socket, buffer, arguments_size);
assert(err == arguments_size);
err = embb_mtapi_network_buffer_pop_front_rawdata(
&buffer, arguments_size, arguments);
buffer, arguments_size, arguments);
assert(err == arguments_size);
embb_mtapi_network_buffer_clear(&buffer);
embb_mtapi_network_buffer_clear(buffer);
network_task->socket = *socket;
mtapi_taskattr_init(&task_attr, &local_status);
......@@ -320,41 +289,56 @@ static int embb_mtapi_network_thread(void * args) {
// &buffer, local_status);
//embb_mtapi_network_socket_sendbuffer(
// socket, &buffer);
}
embb_mtapi_network_buffer_clear(buffer);
return local_status;
}
static mtapi_status_t embb_mtapi_network_handle_return_result(
embb_mtapi_network_socket_t * socket,
embb_mtapi_network_buffer_t * buffer) {
embb_mtapi_network_buffer_clear(&buffer);
} else if (operation == EMBB_MTAPI_NETWORK_RETURN_RESULT) {
int task_status;
int task_id;
int task_tag;
embb_mtapi_network_buffer_clear(&buffer);
int32_t results_size;
int err;
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
mtapi_task_hndl_t task;
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, 16);
assert(err == 16);
socket, buffer, 16);
// did we really receive 16 bytes?
if (err == 16) {
// local task id
err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &task_id);
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_id);
assert(err == 4);
err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &task_tag);
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_tag);
assert(err == 4);
// task status
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &task_status);
buffer, &task_status);
assert(err == 4);
// result size
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &results_size);
buffer, &results_size);
assert(err == 4);
embb_mtapi_network_buffer_clear(&buffer);
embb_mtapi_network_buffer_clear(buffer);
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, results_size);
assert(err == results_size);
socket, buffer, results_size);
// did we get the whole resultbuffer?
if (err == results_size) {
task.id = (mtapi_task_id_t)task_id;
task.tag = (mtapi_uint_t)task_tag;
......@@ -375,7 +359,7 @@ static int embb_mtapi_network_thread(void * args) {
(embb_mtapi_network_action_t*)local_action->plugin_data;*/
err = embb_mtapi_network_buffer_pop_front_rawdata(
&buffer, results_size, local_task->result_buffer);
buffer, results_size, local_task->result_buffer);
assert(err == results_size);
local_task->error_code = (mtapi_status_t)task_status;
......@@ -390,14 +374,71 @@ static int embb_mtapi_network_thread(void * args) {
node->group_pool, local_task->group);
embb_mtapi_task_queue_push(&local_group->queue, local_task);
}
local_status = MTAPI_SUCCESS;
}
}
}
}
}
embb_mtapi_network_buffer_clear(buffer);
return local_status;
}
static int embb_mtapi_network_thread(void * args) {
embb_mtapi_network_plugin_t * plugin = &embb_mtapi_network_plugin;
embb_mtapi_network_buffer_t * buffer = &plugin->recv_buffer;
int err;
EMBB_UNUSED(args);
while (embb_atomic_load_int(&plugin->run)) {
err = embb_mtapi_network_socket_select(
plugin->sockets, plugin->socket_count, 100);
if (0 == err) {
// listening socket, accept connection
embb_mtapi_network_socket_t accept_socket;
err = embb_mtapi_network_socket_accept(
&plugin->sockets[0], &accept_socket);
if (0 < err) {
// add socket to socket list
plugin->sockets[plugin->socket_count] = accept_socket;
plugin->socket_count++;
}
} else if (0 < err) {
int8_t operation;
embb_mtapi_network_socket_t * socket = &plugin->sockets[err];
embb_mtapi_network_buffer_clear(buffer);
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, buffer, 1);
// did we receive one byte?
if (err == 1) {
err = embb_mtapi_network_buffer_pop_front_int8(
buffer, &operation);
assert(err == 1);
embb_mtapi_network_buffer_finalize(&buffer);
embb_mtapi_network_buffer_clear(buffer);
switch (operation) {
case EMBB_MTAPI_NETWORK_START_TASK:
embb_mtapi_network_handle_start_task(socket, buffer);
break;
case EMBB_MTAPI_NETWORK_RETURN_RESULT:
embb_mtapi_network_handle_return_result(socket, buffer);
break;
default:
// eh?
break;
}
}
}
}
return EMBB_SUCCESS;
}
......@@ -408,42 +449,106 @@ void mtapi_network_plugin_initialize(
MTAPI_IN mtapi_uint16_t max_connections,
MTAPI_IN mtapi_size_t buffer_size,
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
embb_mtapi_network_plugin_t * plugin = &embb_mtapi_network_plugin;
int err;
mtapi_status_set(status, MTAPI_ERR_UNKNOWN);
plugin->socket_count = 0;
plugin->buffer_size = 0;
plugin->sockets = NULL;
embb_atomic_store_int(&plugin->run, 0);
err = embb_mtapi_network_initialize();
if (err) {
embb_atomic_store_int(&plugin->run, 1);
if (0 == err) return;
err = embb_mtapi_network_buffer_initialize(
&plugin->recv_buffer, (int)buffer_size);
if (0 == err) {
embb_mtapi_network_finalize();
return;
}
err = embb_mtapi_network_buffer_initialize(
&plugin->send_buffer, (int)buffer_size);
if (0 == err) {
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
embb_mtapi_network_finalize();
return;
}
plugin->buffer_size = buffer_size;
plugin->socket_count = 1;
// 1 listening socket and max_connections connections
// (2 sockets each if local)
plugin->sockets = (embb_mtapi_network_socket_t*)embb_alloc(
sizeof(embb_mtapi_network_socket_t) * (1 + max_connections * 2));
if (NULL == plugin->sockets) {
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
plugin->buffer_size = 0;
embb_mtapi_network_finalize();
return;
}
embb_mtapi_network_buffer_initialize(
&plugin->send_buffer, (int)plugin->buffer_size);
embb_mutex_init(&plugin->send_mutex, 0);
err = embb_mutex_init(&plugin->send_mutex, 0);
if (EMBB_SUCCESS != err) {
embb_free(plugin->sockets);
plugin->sockets = NULL;
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
plugin->buffer_size = 0;
embb_mtapi_network_finalize();
return;
}
if (NULL != plugin->sockets) {
err = embb_mtapi_network_socket_initialize(&plugin->sockets[0]);
if (err) {
if (0 == err) {
embb_mutex_destroy(&plugin->send_mutex);
embb_free(plugin->sockets);
plugin->sockets = NULL;
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
plugin->buffer_size = 0;
embb_mtapi_network_finalize();
return;
}
plugin->socket_count = 1;
err = embb_mtapi_network_socket_bind_and_listen(
&plugin->sockets[0], host, port, max_connections);
if (err) {
if (0 == err) {
embb_mtapi_network_socket_finalize(&plugin->sockets[0]);
plugin->socket_count = 0;
embb_mutex_destroy(&plugin->send_mutex);
embb_free(plugin->sockets);
plugin->sockets = NULL;
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
plugin->buffer_size = 0;
embb_mtapi_network_finalize();
return;
}
embb_atomic_store_int(&plugin->run, 1);
err = embb_thread_create(
&plugin->thread, NULL, embb_mtapi_network_thread, NULL);
if (EMBB_SUCCESS == err) {
local_status = MTAPI_SUCCESS;
}
}
}
}
if (EMBB_SUCCESS != err) {
embb_atomic_store_int(&plugin->run, 0);
embb_mtapi_network_socket_finalize(&plugin->sockets[0]);
plugin->socket_count = 0;
embb_mutex_destroy(&plugin->send_mutex);
embb_free(plugin->sockets);
plugin->sockets = NULL;
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
plugin->buffer_size = 0;
embb_mtapi_network_finalize();
return;
}
mtapi_status_set(status, local_status);
mtapi_status_set(status, MTAPI_SUCCESS);
}
void mtapi_network_plugin_finalize(
......@@ -458,6 +563,8 @@ void mtapi_network_plugin_finalize(
embb_mutex_destroy(&plugin->send_mutex);
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
embb_mtapi_network_socket_finalize(&plugin->sockets[0]);
embb_free(plugin->sockets);
embb_mtapi_network_finalize();
......@@ -468,9 +575,9 @@ void mtapi_network_plugin_finalize(
static void network_task_start(
MTAPI_IN mtapi_task_hndl_t task,
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
int err;
EMBB_UNUSED_IN_RELEASE(err);
// assume failure
mtapi_status_set(status, MTAPI_ERR_UNKNOWN);
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
......@@ -491,59 +598,70 @@ static void network_task_start(
// serialize sending
embb_mutex_lock(&network_action->send_mutex);
embb_mtapi_network_buffer_clear(send_buf);
// actual counts bytes actually put into the buffer
int actual = 0;
// expected counts bytes we intended to put into the buffer
int expected = 0;
// operation is "start task"
err = embb_mtapi_network_buffer_push_back_int8(
actual += embb_mtapi_network_buffer_push_back_int8(
send_buf, EMBB_MTAPI_NETWORK_START_TASK);
assert(err == 1);
expected += 1;
err = embb_mtapi_network_buffer_push_back_int32(
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)network_action->domain_id);
assert(err == 4);
expected += 4;
err = embb_mtapi_network_buffer_push_back_int32(
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)network_action->job_id);
assert(err == 4);
expected += 4;
err = embb_mtapi_network_buffer_push_back_int32(
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->attributes.priority);
assert(err == 4);
expected += 4;
err = embb_mtapi_network_buffer_push_back_int32(
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->handle.id);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32(
expected += 4;
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->handle.tag);
assert(err == 4);
expected += 4;
err = embb_mtapi_network_buffer_push_back_int32(
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->result_size);
assert(err == 4);
expected += 4;
err = embb_mtapi_network_buffer_push_back_int32(
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->arguments_size);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_rawdata(
expected += 4;
actual += embb_mtapi_network_buffer_push_back_rawdata(
send_buf, (int32_t)local_task->arguments_size, local_task->arguments);
assert(err == (int)local_task->arguments_size);
expected += (int)local_task->arguments_size;
err = embb_mtapi_network_socket_sendbuffer(
// check if everything fit into the buffer
if (actual == expected) {
int sent = embb_mtapi_network_socket_sendbuffer(
&network_action->socket, send_buf);
assert(err == send_buf->size);
// was everything sent?
if (sent == send_buf->size) {
embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1);
embb_atomic_store_int(&local_task->state, MTAPI_TASK_RUNNING);
// we've done it, success!
mtapi_status_set(status, MTAPI_SUCCESS);
} else {
// could not send the whole task, this will fail on the remote side,
// so we can safely assume that the task is in error
embb_atomic_store_int(&local_task->state, MTAPI_TASK_ERROR);
}
}
embb_mtapi_network_buffer_clear(send_buf);
embb_mutex_unlock(&network_action->send_mutex);
local_status = MTAPI_SUCCESS;
}
}
}
mtapi_status_set(status, local_status);
}
static void network_task_cancel(
......@@ -602,15 +720,15 @@ mtapi_action_hndl_t mtapi_network_action_create(
action->domain_id = domain_id;
action->job_id = remote_job_id;
embb_mtapi_network_buffer_initialize(
err = embb_mtapi_network_buffer_initialize(
&action->send_buffer, (int)plugin->buffer_size);
embb_mutex_init(&action->send_mutex, 0);
if (0 != err) {
err = embb_mutex_init(&action->send_mutex, 0);
if (EMBB_SUCCESS == err) {
action->host = host;
action->port = port;
embb_mtapi_network_socket_initialize(&action->socket);
err = embb_mtapi_network_socket_connect(&action->socket, host, port);
if (0 != err) {
// store socket for select
plugin->sockets[plugin->socket_count] = action->socket;
......@@ -632,6 +750,8 @@ mtapi_action_hndl_t mtapi_network_action_create(
embb_free(action);
}
}
}
}
mtapi_status_set(status, local_status);
return action_hndl;
......
......@@ -28,9 +28,10 @@
#include <embb/base/c/memory_allocation.h>
#include <string.h>
void embb_mtapi_network_buffer_initialize(
int embb_mtapi_network_buffer_initialize(
embb_mtapi_network_buffer_t * that,
int capacity) {
int result = 1;
that->position = 0;
that->size = 0;
that->data = (char*)embb_alloc((size_t)capacity);
......@@ -38,7 +39,9 @@ void embb_mtapi_network_buffer_initialize(
that->capacity = capacity;
} else {
that->capacity = 0;
result = 0;
}
return result;
}
void embb_mtapi_network_buffer_finalize(
......
......@@ -43,7 +43,7 @@ struct embb_mtapi_network_buffer_struct {
typedef struct embb_mtapi_network_buffer_struct embb_mtapi_network_buffer_t;
void embb_mtapi_network_buffer_initialize(
int embb_mtapi_network_buffer_initialize(
embb_mtapi_network_buffer_t * that,
int capacity
);
......
......@@ -116,7 +116,8 @@ int embb_mtapi_network_socket_connect(
if (SOCKET_ERROR == connect(that->handle, (struct sockaddr *)&addr,
sizeof(addr))) {
#ifdef _WIN32
if (WSAEWOULDBLOCK != WSAGetLastError())
int err = WSAGetLastError();
if (WSAEWOULDBLOCK != err)
#else
if (EAGAIN != errno)
#endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment