Commit 2a8b92ad by Marcus Winter

mtapi_network_c: assert state after every operation for debugging

parent 3f458c34
...@@ -43,6 +43,8 @@ ...@@ -43,6 +43,8 @@
#include <embb_mtapi_group_t.h> #include <embb_mtapi_group_t.h>
#include <mtapi_status_t.h> #include <mtapi_status_t.h>
#include <assert.h>
int embb_mtapi_network_initialize() { int embb_mtapi_network_initialize() {
#ifdef _WIN32 #ifdef _WIN32
WORD ver_request; WORD ver_request;
...@@ -138,22 +140,29 @@ static void embb_mtapi_network_task_complete( ...@@ -138,22 +140,29 @@ static void embb_mtapi_network_task_complete(
// operation is "return result" // operation is "return result"
err = embb_mtapi_network_buffer_push_back_int8( err = embb_mtapi_network_buffer_push_back_int8(
&send_buf, EMBB_MTAPI_NETWORK_RETURN_RESULT); &send_buf, EMBB_MTAPI_NETWORK_RETURN_RESULT);
assert(err == 1);
// remote task id // remote task id
err = embb_mtapi_network_buffer_push_back_int32( err = embb_mtapi_network_buffer_push_back_int32(
&send_buf, network_task->remote_task_id); &send_buf, network_task->remote_task_id);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32( err = embb_mtapi_network_buffer_push_back_int32(
&send_buf, network_task->remote_task_tag); &send_buf, network_task->remote_task_tag);
assert(err == 4);
// status // status
err = embb_mtapi_network_buffer_push_back_int32( err = embb_mtapi_network_buffer_push_back_int32(
&send_buf, local_task->error_code); &send_buf, local_task->error_code);
assert(err == 4);
// result size // result size
err = embb_mtapi_network_buffer_push_back_int32( err = embb_mtapi_network_buffer_push_back_int32(
&send_buf, local_task->result_size); &send_buf, local_task->result_size);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_rawdata( err = embb_mtapi_network_buffer_push_back_rawdata(
&send_buf, local_task->result_size, local_task->result_buffer); &send_buf, local_task->result_size, local_task->result_buffer);
assert(err == local_task->result_size);
err = embb_mtapi_network_socket_sendbuffer( err = embb_mtapi_network_socket_sendbuffer(
&network_task->socket, &send_buf); &network_task->socket, &send_buf);
assert(err == send_buf.size);
// sending done, free the buffer // sending done, free the buffer
embb_mtapi_network_buffer_finalize(&send_buf); embb_mtapi_network_buffer_finalize(&send_buf);
...@@ -200,10 +209,14 @@ static int embb_mtapi_network_thread(void * args) { ...@@ -200,10 +209,14 @@ static int embb_mtapi_network_thread(void * args) {
int8_t operation; int8_t operation;
embb_mtapi_network_socket_t * socket = &plugin->sockets[err]; embb_mtapi_network_socket_t * socket = &plugin->sockets[err];
embb_mtapi_network_buffer_clear(&buffer);
err = embb_mtapi_network_socket_recvbuffer_sized( err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, 1); socket, &buffer, 1);
embb_mtapi_network_buffer_pop_front_int8( assert(err == 1);
err = embb_mtapi_network_buffer_pop_front_int8(
&buffer, &operation); &buffer, &operation);
assert(err == 1);
embb_mtapi_network_buffer_clear(&buffer); embb_mtapi_network_buffer_clear(&buffer);
...@@ -222,29 +235,41 @@ static int embb_mtapi_network_thread(void * args) { ...@@ -222,29 +235,41 @@ static int embb_mtapi_network_thread(void * args) {
err = embb_mtapi_network_socket_recvbuffer_sized( err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, 24); socket, &buffer, 24);
assert(err == 24);
// domain id // domain id
embb_mtapi_network_buffer_pop_front_int32(&buffer, &domain_id); err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &domain_id);
assert(err == 4);
// job id // job id
embb_mtapi_network_buffer_pop_front_int32(&buffer, &job_id); err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &job_id);
assert(err == 4);
// remote task handle // remote task handle
embb_mtapi_network_buffer_pop_front_int32( err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &network_task->remote_task_id); &buffer, &network_task->remote_task_id);
embb_mtapi_network_buffer_pop_front_int32( assert(err == 4);
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &network_task->remote_task_tag); &buffer, &network_task->remote_task_tag);
assert(err == 4);
// result size // result size
embb_mtapi_network_buffer_pop_front_int32(&buffer, &results_size); err = embb_mtapi_network_buffer_pop_front_int32(&buffer,
&results_size);
assert(err == 4);
results = embb_alloc(results_size); results = embb_alloc(results_size);
assert(results != NULL);
// arguments size // arguments size
embb_mtapi_network_buffer_pop_front_int32(&buffer, &arguments_size); embb_mtapi_network_buffer_pop_front_int32(&buffer, &arguments_size);
assert(err == 4);
arguments = embb_alloc(arguments_size); arguments = embb_alloc(arguments_size);
assert(arguments != NULL);
embb_mtapi_network_buffer_clear(&buffer); embb_mtapi_network_buffer_clear(&buffer);
// arguments // arguments
err = embb_mtapi_network_socket_recvbuffer_sized( err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, arguments_size); socket, &buffer, arguments_size);
embb_mtapi_network_buffer_pop_front_rawdata( assert(err == arguments_size);
err = embb_mtapi_network_buffer_pop_front_rawdata(
&buffer, arguments_size, arguments); &buffer, arguments_size, arguments);
assert(err == arguments_size);
embb_mtapi_network_buffer_clear(&buffer); embb_mtapi_network_buffer_clear(&buffer);
...@@ -252,16 +277,20 @@ static int embb_mtapi_network_thread(void * args) { ...@@ -252,16 +277,20 @@ static int embb_mtapi_network_thread(void * args) {
mtapi_taskattr_init(&task_attr, &local_status); mtapi_taskattr_init(&task_attr, &local_status);
mtapi_taskattr_set(&task_attr, MTAPI_TASK_USER_DATA, mtapi_taskattr_set(&task_attr, MTAPI_TASK_USER_DATA,
(void*)network_task, 0, &local_status); (void*)network_task, 0, &local_status);
assert(local_status == MTAPI_SUCCESS);
memcpy(&func_void, &func, sizeof(void*)); memcpy(&func_void, &func, sizeof(void*));
mtapi_taskattr_set(&task_attr, MTAPI_TASK_COMPLETE_FUNCTION, mtapi_taskattr_set(&task_attr, MTAPI_TASK_COMPLETE_FUNCTION,
func_void, 0, &local_status); func_void, 0, &local_status);
assert(local_status == MTAPI_SUCCESS);
job_hndl = mtapi_job_get(job_id, domain_id, &local_status); job_hndl = mtapi_job_get(job_id, domain_id, &local_status);
assert(local_status == MTAPI_SUCCESS);
task_hndl = mtapi_task_start( task_hndl = mtapi_task_start(
MTAPI_TASK_ID_NONE, job_hndl, MTAPI_TASK_ID_NONE, job_hndl,
arguments, arguments_size, arguments, arguments_size,
results, results_size, results, results_size,
&task_attr, MTAPI_GROUP_NONE, &task_attr, MTAPI_GROUP_NONE,
&local_status); &local_status);
assert(local_status == MTAPI_SUCCESS);
// send back result of task creation // send back result of task creation
//embb_mtapi_network_buffer_push_back_int32( //embb_mtapi_network_buffer_push_back_int32(
...@@ -275,26 +304,34 @@ static int embb_mtapi_network_thread(void * args) { ...@@ -275,26 +304,34 @@ static int embb_mtapi_network_thread(void * args) {
int task_id; int task_id;
int task_tag; int task_tag;
embb_mtapi_network_buffer_clear(&buffer);
if (embb_mtapi_node_is_initialized()) { if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance(); embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
mtapi_task_hndl_t task; mtapi_task_hndl_t task;
err = embb_mtapi_network_socket_recvbuffer_sized( err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, 16); socket, &buffer, 16);
assert(err == 16);
// local task id // local task id
err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &task_id); err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &task_id);
assert(err == 4);
err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &task_tag); err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &task_tag);
assert(err == 4);
// task status // task status
err = embb_mtapi_network_buffer_pop_front_int32( err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &task_status); &buffer, &task_status);
assert(err == 4);
// result size // result size
err = embb_mtapi_network_buffer_pop_front_int32( err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &results_size); &buffer, &results_size);
assert(err == 4);
embb_mtapi_network_buffer_clear(&buffer); embb_mtapi_network_buffer_clear(&buffer);
err = embb_mtapi_network_socket_recvbuffer_sized( err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, results_size); socket, &buffer, results_size);
assert(err == results_size);
task.id = task_id; task.id = task_id;
task.tag = task_tag; task.tag = task_tag;
...@@ -316,6 +353,7 @@ static int embb_mtapi_network_thread(void * args) { ...@@ -316,6 +353,7 @@ static int embb_mtapi_network_thread(void * args) {
err = embb_mtapi_network_buffer_pop_front_rawdata( err = embb_mtapi_network_buffer_pop_front_rawdata(
&buffer, results_size, local_task->result_buffer); &buffer, results_size, local_task->result_buffer);
assert(err == results_size);
local_task->error_code = task_status; local_task->error_code = task_status;
local_task->state = MTAPI_TASK_COMPLETED; local_task->state = MTAPI_TASK_COMPLETED;
...@@ -352,23 +390,30 @@ void mtapi_network_plugin_initialize( ...@@ -352,23 +390,30 @@ void mtapi_network_plugin_initialize(
int err; int err;
err = embb_mtapi_network_initialize(); err = embb_mtapi_network_initialize();
embb_atomic_store_int(&plugin->run, 1); if (err) {
embb_atomic_store_int(&plugin->run, 1);
plugin->socket_count = 1; plugin->buffer_size = buffer_size;
// 1 listening socket and max_connections connections
// (2 sockets each if local) plugin->socket_count = 1;
plugin->sockets = (embb_mtapi_network_socket_t*)embb_alloc( // 1 listening socket and max_connections connections
sizeof(embb_mtapi_network_socket_t) * (1 + max_connections * 2)); // (2 sockets each if local)
plugin->sockets = (embb_mtapi_network_socket_t*)embb_alloc(
err = embb_mtapi_network_socket_initialize(&plugin->sockets[0]); sizeof(embb_mtapi_network_socket_t) * (1 + max_connections * 2));
err = embb_mtapi_network_socket_bind_and_listen(
&plugin->sockets[0], host, port, max_connections); if (NULL != plugin->sockets) {
plugin->buffer_size = buffer_size; err = embb_mtapi_network_socket_initialize(&plugin->sockets[0]);
if (err) {
err = embb_thread_create( err = embb_mtapi_network_socket_bind_and_listen(
&plugin->thread, NULL, embb_mtapi_network_thread, NULL); &plugin->sockets[0], host, port, max_connections);
if (EMBB_SUCCESS == err) { if (err) {
local_status = MTAPI_SUCCESS; err = embb_thread_create(
&plugin->thread, NULL, embb_mtapi_network_thread, NULL);
if (EMBB_SUCCESS == err) {
local_status = MTAPI_SUCCESS;
}
}
}
}
} }
mtapi_status_set(status, local_status); mtapi_status_set(status, local_status);
...@@ -419,28 +464,37 @@ static void network_task_start( ...@@ -419,28 +464,37 @@ static void network_task_start(
// operation is "start task" // operation is "start task"
err = embb_mtapi_network_buffer_push_back_int8( err = embb_mtapi_network_buffer_push_back_int8(
send_buf, EMBB_MTAPI_NETWORK_START_TASK); send_buf, EMBB_MTAPI_NETWORK_START_TASK);
assert(err == 1);
err = embb_mtapi_network_buffer_push_back_int32( err = embb_mtapi_network_buffer_push_back_int32(
send_buf, network_action->domain_id); send_buf, network_action->domain_id);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32( err = embb_mtapi_network_buffer_push_back_int32(
send_buf, network_action->job_id); send_buf, network_action->job_id);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32( err = embb_mtapi_network_buffer_push_back_int32(
send_buf, local_task->handle.id); send_buf, local_task->handle.id);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32( err = embb_mtapi_network_buffer_push_back_int32(
send_buf, local_task->handle.tag); send_buf, local_task->handle.tag);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32( err = embb_mtapi_network_buffer_push_back_int32(
send_buf, local_task->result_size); send_buf, local_task->result_size);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32( err = embb_mtapi_network_buffer_push_back_int32(
send_buf, local_task->arguments_size); send_buf, local_task->arguments_size);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_rawdata( err = embb_mtapi_network_buffer_push_back_rawdata(
send_buf, local_task->arguments_size, local_task->arguments); send_buf, local_task->arguments_size, local_task->arguments);
assert(err == local_task->arguments_size);
err = embb_mtapi_network_socket_sendbuffer( err = embb_mtapi_network_socket_sendbuffer(
&network_action->socket, send_buf); &network_action->socket, send_buf);
assert(err == send_buf->size);
embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1); embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1);
local_task->state = MTAPI_TASK_RUNNING; local_task->state = MTAPI_TASK_RUNNING;
...@@ -509,6 +563,7 @@ mtapi_action_hndl_t mtapi_network_action_create( ...@@ -509,6 +563,7 @@ mtapi_action_hndl_t mtapi_network_action_create(
mtapi_action_hndl_t action_hndl = { 0, 0 }; mtapi_action_hndl_t action_hndl = { 0, 0 };
int err; int err;
// TODO: check action for allocation failure
action->domain_id = domain_id; action->domain_id = domain_id;
action->job_id = remote_job_id; action->job_id = remote_job_id;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment