Commit 454e7619 by Marcus Winter

mtapi_network_c: improved protocol for better error handling

parent 14b8ff89
...@@ -72,7 +72,8 @@ void embb_mtapi_network_finalize() { ...@@ -72,7 +72,8 @@ void embb_mtapi_network_finalize() {
enum embb_mtapi_network_operation_enum { enum embb_mtapi_network_operation_enum {
EMBB_MTAPI_NETWORK_START_TASK, EMBB_MTAPI_NETWORK_START_TASK,
EMBB_MTAPI_NETWORK_RETURN_RESULT EMBB_MTAPI_NETWORK_RETURN_RESULT,
EMBB_MTAPI_NETWORK_RETURN_FAILURE
}; };
struct embb_mtapi_network_plugin_struct { struct embb_mtapi_network_plugin_struct {
...@@ -114,12 +115,15 @@ struct embb_mtapi_network_task_struct { ...@@ -114,12 +115,15 @@ struct embb_mtapi_network_task_struct {
typedef struct embb_mtapi_network_task_struct embb_mtapi_network_task_t; typedef struct embb_mtapi_network_task_struct embb_mtapi_network_task_t;
static void embb_mtapi_network_task_failure(
) {
}
static void embb_mtapi_network_task_complete( static void embb_mtapi_network_task_complete(
MTAPI_IN mtapi_task_hndl_t task, MTAPI_IN mtapi_task_hndl_t task,
MTAPI_OUT mtapi_status_t* status) { MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
int err;
EMBB_UNUSED_IN_RELEASE(err);
if (embb_mtapi_node_is_initialized()) { if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance(); embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
...@@ -147,31 +151,37 @@ static void embb_mtapi_network_task_complete( ...@@ -147,31 +151,37 @@ static void embb_mtapi_network_task_complete(
// actual counts bytes actually put into the buffer // actual counts bytes actually put into the buffer
int actual = 0; int actual = 0;
// expected counts bytes we intended to put into the buffer // expected counts bytes we intended to put into the buffer
int expected = 0; int expected =
1 + // operation
4 + 4 + // remote task handle
4 + // status
4 + (int)local_task->result_size; // result buffer
// packet size
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, expected);
expected += 4;
// operation is "return result" // operation is "return result"
actual += embb_mtapi_network_buffer_push_back_int8( actual += embb_mtapi_network_buffer_push_back_int8(
send_buf, EMBB_MTAPI_NETWORK_RETURN_RESULT); send_buf, EMBB_MTAPI_NETWORK_RETURN_RESULT);
expected += 1;
// remote task id // remote task id
actual += embb_mtapi_network_buffer_push_back_int32( actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, network_task->remote_task_id); send_buf, network_task->remote_task_id);
expected += 4;
actual += embb_mtapi_network_buffer_push_back_int32( actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, network_task->remote_task_tag); send_buf, network_task->remote_task_tag);
expected += 4;
// status // status
actual += embb_mtapi_network_buffer_push_back_int32( actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, local_task->error_code); send_buf, local_task->error_code);
expected += 4;
// result size // result size
actual += embb_mtapi_network_buffer_push_back_int32( actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->result_size); send_buf, (int32_t)local_task->result_size);
expected += 4;
actual += embb_mtapi_network_buffer_push_back_rawdata( actual += embb_mtapi_network_buffer_push_back_rawdata(
send_buf, (int32_t)local_task->result_size, send_buf, (int32_t)local_task->result_size,
local_task->result_buffer); local_task->result_buffer);
expected += (int)local_task->result_size;
if (expected == actual) { if (expected == actual) {
int sent = embb_mtapi_network_socket_sendbuffer( int sent = embb_mtapi_network_socket_sendbuffer(
...@@ -194,9 +204,31 @@ static void embb_mtapi_network_task_complete( ...@@ -194,9 +204,31 @@ static void embb_mtapi_network_task_complete(
mtapi_status_set(status, local_status); mtapi_status_set(status, local_status);
} }
static void embb_mtapi_network_return_failure(
int32_t remote_task_id,
int32_t remote_task_tag,
mtapi_status_t status,
embb_mtapi_network_socket_t * socket,
embb_mtapi_network_buffer_t * buffer)
{
embb_mtapi_network_buffer_clear(buffer);
embb_mtapi_network_buffer_push_back_int32(
buffer, 12);
embb_mtapi_network_buffer_push_back_int32(
buffer, remote_task_id);
embb_mtapi_network_buffer_push_back_int32(
buffer, remote_task_tag);
embb_mtapi_network_buffer_push_back_int32(
buffer, (int32_t)status);
embb_mtapi_network_socket_sendbuffer(
socket, buffer);
}
static mtapi_status_t embb_mtapi_network_handle_start_task( static mtapi_status_t embb_mtapi_network_handle_start_task(
embb_mtapi_network_socket_t * socket, embb_mtapi_network_socket_t * socket,
embb_mtapi_network_buffer_t * buffer) { embb_mtapi_network_buffer_t * buffer,
int packet_size) {
int32_t domain_id; int32_t domain_id;
int32_t job_id; int32_t job_id;
...@@ -205,22 +237,18 @@ static mtapi_status_t embb_mtapi_network_handle_start_task( ...@@ -205,22 +237,18 @@ static mtapi_status_t embb_mtapi_network_handle_start_task(
int err; int err;
int32_t arguments_size; int32_t arguments_size;
int32_t remote_task_id;
int32_t remote_task_tag;
mtapi_uint_t priority = 0; mtapi_uint_t priority = 0;
mtapi_job_hndl_t job_hndl; mtapi_job_hndl_t job_hndl;
mtapi_task_attributes_t task_attr; mtapi_task_attributes_t task_attr;
void * arguments; void * arguments;
mtapi_task_complete_function_t func = embb_mtapi_network_task_complete; mtapi_task_complete_function_t func = embb_mtapi_network_task_complete;
void * func_void; void * func_void;
embb_mtapi_network_task_t * network_task =
(embb_mtapi_network_task_t*)embb_alloc(
sizeof(embb_mtapi_network_task_t));
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
err = embb_mtapi_network_socket_recvbuffer_sized( // check if we have at least 28 bytes
socket, buffer, 28); if (packet_size >= 28) {
// check if we really got 28 bytes
if (err == 28) {
// domain id // domain id
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &domain_id); err = embb_mtapi_network_buffer_pop_front_int32(buffer, &domain_id);
...@@ -234,78 +262,98 @@ static mtapi_status_t embb_mtapi_network_handle_start_task( ...@@ -234,78 +262,98 @@ static mtapi_status_t embb_mtapi_network_handle_start_task(
assert(err == 4); assert(err == 4);
// remote task handle // remote task handle
err = embb_mtapi_network_buffer_pop_front_int32( err = embb_mtapi_network_buffer_pop_front_int32(
buffer, &network_task->remote_task_id); buffer, &remote_task_id);
assert(err == 4); assert(err == 4);
err = embb_mtapi_network_buffer_pop_front_int32( err = embb_mtapi_network_buffer_pop_front_int32(
buffer, &network_task->remote_task_tag); buffer, &remote_task_tag);
assert(err == 4); assert(err == 4);
// result size // result size
err = embb_mtapi_network_buffer_pop_front_int32(buffer, err = embb_mtapi_network_buffer_pop_front_int32(buffer,
&results_size); &results_size);
assert(err == 4); assert(err == 4);
results = embb_alloc((size_t)results_size);
assert(results != NULL);
// arguments size // arguments size
embb_mtapi_network_buffer_pop_front_int32(buffer, &arguments_size); err = embb_mtapi_network_buffer_pop_front_int32(buffer, &arguments_size);
assert(err == 4); assert(err == 4);
arguments = embb_alloc((size_t)arguments_size);
assert(arguments != NULL);
embb_mtapi_network_buffer_clear(buffer);
// arguments
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, buffer, arguments_size);
assert(err == arguments_size);
err = embb_mtapi_network_buffer_pop_front_rawdata(
buffer, arguments_size, arguments);
assert(err == arguments_size);
embb_mtapi_network_buffer_clear(buffer);
network_task->socket = *socket;
mtapi_taskattr_init(&task_attr, &local_status);
assert(local_status == MTAPI_SUCCESS);
mtapi_taskattr_set(&task_attr, MTAPI_TASK_USER_DATA,
(void*)network_task, 0, &local_status);
assert(local_status == MTAPI_SUCCESS);
mtapi_boolean_t task_detached = MTAPI_TRUE;
mtapi_taskattr_set(&task_attr, MTAPI_TASK_DETACHED,
(void*)&task_detached, sizeof(mtapi_boolean_t), &local_status);
assert(local_status == MTAPI_SUCCESS);
mtapi_taskattr_set(&task_attr, MTAPI_TASK_PRIORITY,
(void*)&priority, sizeof(mtapi_uint_t), &local_status);
assert(local_status == MTAPI_SUCCESS);
memcpy(&func_void, &func, sizeof(void*));
mtapi_taskattr_set(&task_attr, MTAPI_TASK_COMPLETE_FUNCTION,
func_void, 0, &local_status);
assert(local_status == MTAPI_SUCCESS);
job_hndl = mtapi_job_get((mtapi_job_id_t)job_id,
(mtapi_domain_t)domain_id, &local_status);
assert(local_status == MTAPI_SUCCESS);
mtapi_task_start(
MTAPI_TASK_ID_NONE, job_hndl,
arguments, (mtapi_size_t)arguments_size,
results, (mtapi_size_t)results_size,
&task_attr, MTAPI_GROUP_NONE,
&local_status);
assert(local_status == MTAPI_SUCCESS);
// send back result of task creation
//embb_mtapi_network_buffer_push_back_int32(
// &buffer, local_status);
//embb_mtapi_network_socket_sendbuffer(
// socket, &buffer);
}
embb_mtapi_network_buffer_clear(buffer); embb_mtapi_network_task_t * network_task =
(embb_mtapi_network_task_t*)embb_alloc(
sizeof(embb_mtapi_network_task_t));
if (network_task == NULL) {
embb_mtapi_network_return_failure(
remote_task_id, remote_task_tag, MTAPI_ERR_UNKNOWN,
socket, buffer);
return MTAPI_ERR_UNKNOWN;
}
network_task->remote_task_id = remote_task_id;
network_task->remote_task_tag = remote_task_tag;
// check packet_size again
if (packet_size == 28 + arguments_size) {
// allocate buffers
results = embb_alloc((size_t)results_size);
if (results == NULL) {
embb_mtapi_network_return_failure(
remote_task_id, remote_task_tag, MTAPI_ERR_UNKNOWN,
socket, buffer);
return MTAPI_ERR_UNKNOWN;
}
arguments = embb_alloc((size_t)arguments_size);
if (arguments == NULL) {
embb_free(results);
embb_mtapi_network_return_failure(
remote_task_id, remote_task_tag, MTAPI_ERR_UNKNOWN,
socket, buffer);
return MTAPI_ERR_UNKNOWN;
}
// arguments
err = embb_mtapi_network_buffer_pop_front_rawdata(
buffer, arguments_size, arguments);
assert(err == arguments_size);
network_task->socket = *socket;
mtapi_taskattr_init(&task_attr, &local_status);
assert(local_status == MTAPI_SUCCESS);
mtapi_taskattr_set(&task_attr, MTAPI_TASK_USER_DATA,
(void*)network_task, 0, &local_status);
assert(local_status == MTAPI_SUCCESS);
mtapi_boolean_t task_detached = MTAPI_TRUE;
mtapi_taskattr_set(&task_attr, MTAPI_TASK_DETACHED,
(void*)&task_detached, sizeof(mtapi_boolean_t), &local_status);
assert(local_status == MTAPI_SUCCESS);
mtapi_taskattr_set(&task_attr, MTAPI_TASK_PRIORITY,
(void*)&priority, sizeof(mtapi_uint_t), &local_status);
assert(local_status == MTAPI_SUCCESS);
memcpy(&func_void, &func, sizeof(void*));
mtapi_taskattr_set(&task_attr, MTAPI_TASK_COMPLETE_FUNCTION,
func_void, 0, &local_status);
assert(local_status == MTAPI_SUCCESS);
job_hndl = mtapi_job_get((mtapi_job_id_t)job_id,
(mtapi_domain_t)domain_id, &local_status);
if (local_status == MTAPI_SUCCESS) {
mtapi_task_start(
MTAPI_TASK_ID_NONE, job_hndl,
arguments, (mtapi_size_t)arguments_size,
results, (mtapi_size_t)results_size,
&task_attr, MTAPI_GROUP_NONE,
&local_status);
}
if (local_status != MTAPI_SUCCESS) {
embb_free(arguments);
embb_free(results);
embb_mtapi_network_return_failure(
remote_task_id, remote_task_tag, local_status, socket, buffer);
}
}
}
return local_status; return local_status;
} }
static mtapi_status_t embb_mtapi_network_handle_return_result( static mtapi_status_t embb_mtapi_network_handle_return_result(
embb_mtapi_network_socket_t * socket, embb_mtapi_network_buffer_t * buffer,
embb_mtapi_network_buffer_t * buffer) { int packet_size) {
int task_status; int task_status;
int task_id; int task_id;
...@@ -319,11 +367,8 @@ static mtapi_status_t embb_mtapi_network_handle_return_result( ...@@ -319,11 +367,8 @@ static mtapi_status_t embb_mtapi_network_handle_return_result(
embb_mtapi_node_t * node = embb_mtapi_node_get_instance(); embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
mtapi_task_hndl_t task; mtapi_task_hndl_t task;
err = embb_mtapi_network_socket_recvbuffer_sized( // do we have at least 16 bytes?
socket, buffer, 16); if (packet_size >= 16) {
// did we really receive 16 bytes?
if (err == 16) {
// local task id // local task id
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_id); err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_id);
...@@ -339,14 +384,8 @@ static mtapi_status_t embb_mtapi_network_handle_return_result( ...@@ -339,14 +384,8 @@ static mtapi_status_t embb_mtapi_network_handle_return_result(
buffer, &results_size); buffer, &results_size);
assert(err == 4); assert(err == 4);
embb_mtapi_network_buffer_clear(buffer); // check packet_size again
if (packet_size == 16 + results_size) {
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, buffer, results_size);
// did we get the whole resultbuffer?
if (err == results_size) {
task.id = (mtapi_task_id_t)task_id; task.id = (mtapi_task_id_t)task_id;
task.tag = (mtapi_uint_t)task_tag; task.tag = (mtapi_uint_t)task_tag;
...@@ -391,7 +430,71 @@ static mtapi_status_t embb_mtapi_network_handle_return_result( ...@@ -391,7 +430,71 @@ static mtapi_status_t embb_mtapi_network_handle_return_result(
} }
} }
embb_mtapi_network_buffer_clear(buffer); return local_status;
}
static mtapi_status_t embb_mtapi_network_handle_return_failure(
embb_mtapi_network_buffer_t * buffer,
int packet_size) {
int task_status;
int task_id;
int task_tag;
int err;
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
mtapi_task_hndl_t task;
// do we have 12 bytes?
if (packet_size == 12) {
// local task id
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_id);
assert(err == 4);
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_tag);
assert(err == 4);
// task status
err = embb_mtapi_network_buffer_pop_front_int32(
buffer, &task_status);
assert(err == 4);
task.id = (mtapi_task_id_t)task_id;
task.tag = (mtapi_uint_t)task_tag;
if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
embb_mtapi_task_t * local_task =
embb_mtapi_task_pool_get_storage_for_handle(
node->task_pool, task);
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, local_task->action)) {
embb_mtapi_action_t * local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, local_task->action);
local_task->error_code = (mtapi_status_t)task_status;
embb_atomic_store_int(&local_task->state, MTAPI_TASK_ERROR);
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
/* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid(
node->group_pool, local_task->group)) {
embb_mtapi_group_t* local_group =
embb_mtapi_group_pool_get_storage_for_handle(
node->group_pool, local_task->group);
embb_mtapi_task_queue_push(&local_group->queue, local_task);
}
local_status = MTAPI_SUCCESS;
}
}
}
}
return local_status; return local_status;
} }
...@@ -418,32 +521,46 @@ static int embb_mtapi_network_thread(void * args) { ...@@ -418,32 +521,46 @@ static int embb_mtapi_network_thread(void * args) {
} }
} else if (0 < err) { } else if (0 < err) {
int8_t operation; int8_t operation;
int32_t packet_size;
embb_mtapi_network_socket_t * socket = &plugin->sockets[err]; embb_mtapi_network_socket_t * socket = &plugin->sockets[err];
embb_mtapi_network_buffer_clear(buffer); embb_mtapi_network_buffer_clear(buffer);
err = embb_mtapi_network_socket_recvbuffer_sized( err = embb_mtapi_network_socket_recvbuffer_sized(
socket, buffer, 1); socket, buffer, 4);
// did we receive one byte? if (err == 4) {
if (err == 1) { err = embb_mtapi_network_buffer_pop_front_int32(
err = embb_mtapi_network_buffer_pop_front_int8( buffer, &packet_size);
buffer, &operation); assert(err == 4);
assert(err == 1);
embb_mtapi_network_buffer_clear(buffer); embb_mtapi_network_buffer_clear(buffer);
err = embb_mtapi_network_socket_recvbuffer_sized(
switch (operation) { socket, buffer, packet_size);
if (err == packet_size) {
err = embb_mtapi_network_buffer_pop_front_int8(
buffer, &operation);
assert(err == 1);
packet_size--;
switch (operation) {
case EMBB_MTAPI_NETWORK_START_TASK: case EMBB_MTAPI_NETWORK_START_TASK:
embb_mtapi_network_handle_start_task(socket, buffer); embb_mtapi_network_handle_start_task(socket, buffer, packet_size);
break; break;
case EMBB_MTAPI_NETWORK_RETURN_RESULT: case EMBB_MTAPI_NETWORK_RETURN_RESULT:
embb_mtapi_network_handle_return_result(socket, buffer); embb_mtapi_network_handle_return_result(buffer, packet_size);
break;
case EMBB_MTAPI_NETWORK_RETURN_FAILURE:
embb_mtapi_network_handle_return_failure(buffer, packet_size);
break; break;
default: default:
// eh? // eh?
break; break;
}
} }
} }
embb_mtapi_network_buffer_clear(buffer);
} }
} }
...@@ -610,42 +727,51 @@ static void network_task_start( ...@@ -610,42 +727,51 @@ static void network_task_start(
// actual counts bytes actually put into the buffer // actual counts bytes actually put into the buffer
int actual = 0; int actual = 0;
// expected counts bytes we intended to put into the buffer // expected counts bytes we intended to put into the buffer
int expected = 0; int expected =
1 + // operation
4 + // domain_id
4 + // job_id
4 + // priority
4 + 4 + // task handle
4 + // result_size
4 + local_task->arguments_size; // arguments buffer
// packet size
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)expected);
expected += 4;
// operation is "start task" // operation is "start task"
actual += embb_mtapi_network_buffer_push_back_int8( actual += embb_mtapi_network_buffer_push_back_int8(
send_buf, EMBB_MTAPI_NETWORK_START_TASK); send_buf, EMBB_MTAPI_NETWORK_START_TASK);
expected += 1;
// domain_id
actual += embb_mtapi_network_buffer_push_back_int32( actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)network_action->domain_id); send_buf, (int32_t)network_action->domain_id);
expected += 4;
// job_id
actual += embb_mtapi_network_buffer_push_back_int32( actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)network_action->job_id); send_buf, (int32_t)network_action->job_id);
expected += 4;
// priority
actual += embb_mtapi_network_buffer_push_back_int32( actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->attributes.priority); send_buf, (int32_t)local_task->attributes.priority);
expected += 4;
// task handle
actual += embb_mtapi_network_buffer_push_back_int32( actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->handle.id); send_buf, (int32_t)local_task->handle.id);
expected += 4;
actual += embb_mtapi_network_buffer_push_back_int32( actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->handle.tag); send_buf, (int32_t)local_task->handle.tag);
expected += 4;
// result size
actual += embb_mtapi_network_buffer_push_back_int32( actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->result_size); send_buf, (int32_t)local_task->result_size);
expected += 4;
// arguments buffer
actual += embb_mtapi_network_buffer_push_back_int32( actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->arguments_size); send_buf, (int32_t)local_task->arguments_size);
expected += 4;
actual += embb_mtapi_network_buffer_push_back_rawdata( actual += embb_mtapi_network_buffer_push_back_rawdata(
send_buf, (int32_t)local_task->arguments_size, local_task->arguments); send_buf, (int32_t)local_task->arguments_size, local_task->arguments);
expected += (int)local_task->arguments_size;
// check if everything fit into the buffer // check if everything fit into the buffer
if (actual == expected) { if (actual == expected) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment