Commit d83a8571 by Marcus Winter

Merge branch 'embb413_network_plugin_improvements' into development

parents a7e94398 a5832fc1
......@@ -40,6 +40,7 @@
#include <embb_mtapi_action_t.h>
#include <embb_mtapi_alloc.h>
#include <embb_mtapi_queue_t.h>
#include <embb_mtapi_group_t.h>
/* ---- CLASS MEMBERS ------------------------------------------------------ */
......@@ -293,6 +294,8 @@ int embb_mtapi_scheduler_worker(void * arg) {
/* check if there was work */
if (MTAPI_NULL != task) {
embb_mtapi_queue_t * local_queue = MTAPI_NULL;
embb_mtapi_group_t * local_group = MTAPI_NULL;
embb_mtapi_action_t * local_action = MTAPI_NULL;
/* is task associated with a queue? */
if (embb_mtapi_queue_pool_is_handle_valid(
......@@ -302,6 +305,21 @@ int embb_mtapi_scheduler_worker(void * arg) {
node->queue_pool, task->queue);
}
/* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid(
node->group_pool, task->group)) {
local_group =
embb_mtapi_group_pool_get_storage_for_handle(
node->group_pool, task->group);
}
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, task->action)) {
local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, task->action);
}
switch (embb_atomic_load_int(&task->state)) {
case MTAPI_TASK_SCHEDULED:
/* multi-instance task, another instance might be running */
......@@ -328,7 +346,7 @@ int embb_mtapi_scheduler_worker(void * arg) {
break;
case MTAPI_TASK_CANCELLED:
/* set return value to canceled */
/* set return value to cancelled */
task->error_code = MTAPI_ERR_ACTION_CANCELLED;
if (embb_atomic_fetch_and_add_unsigned_int(
&task->instances_todo, (unsigned int)-1) == 0) {
......@@ -336,6 +354,12 @@ int embb_mtapi_scheduler_worker(void * arg) {
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
if (MTAPI_NULL != local_group) {
embb_mtapi_task_queue_push(&local_group->queue, task);
}
}
if (MTAPI_NULL != local_action) {
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
}
break;
......
......@@ -501,7 +501,6 @@ void mtapi_task_cancel(
if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
embb_mtapi_task_t* local_task =
embb_mtapi_task_pool_get_storage_for_handle(node->task_pool, task);
embb_mtapi_task_set_state(local_task, MTAPI_TASK_CANCELLED);
/* call plugin action cancel function */
if (embb_mtapi_action_pool_is_handle_valid(
......@@ -511,8 +510,14 @@ void mtapi_task_cancel(
node->action_pool, local_task->action);
if (local_action->is_plugin_action) {
local_action->plugin_task_cancel_function(task, &local_status);
} else {
embb_mtapi_task_set_state(local_task, MTAPI_TASK_CANCELLED);
local_task->error_code = MTAPI_ERR_ACTION_CANCELLED;
local_status = MTAPI_SUCCESS;
}
} else {
embb_mtapi_task_set_state(local_task, MTAPI_TASK_CANCELLED);
local_task->error_code = MTAPI_ERR_ACTION_CANCELLED;
local_status = MTAPI_SUCCESS;
}
} else {
......
......@@ -71,8 +71,10 @@ void embb_mtapi_network_finalize() {
}
enum embb_mtapi_network_operation_enum {
EMBB_MTAPI_NETWORK_START_TASK,
EMBB_MTAPI_NETWORK_RETURN_RESULT
EMBB_MTAPI_NETWORK_START_TASK = 0x01AFFE01,
EMBB_MTAPI_NETWORK_RETURN_RESULT = 0x02AFFE02,
EMBB_MTAPI_NETWORK_RETURN_FAILURE = 0x03AFFE03,
EMBB_MTAPI_NETWORK_CANCEL_TASK = 0x04AFFE04
};
struct embb_mtapi_network_plugin_struct {
......@@ -84,6 +86,8 @@ struct embb_mtapi_network_plugin_struct {
embb_mutex_t send_mutex;
embb_mtapi_network_buffer_t send_buffer;
embb_mtapi_network_buffer_t recv_buffer;
};
typedef struct embb_mtapi_network_plugin_struct embb_mtapi_network_plugin_t;
......@@ -112,12 +116,41 @@ struct embb_mtapi_network_task_struct {
typedef struct embb_mtapi_network_task_struct embb_mtapi_network_task_t;
static void embb_mtapi_network_return_failure(
int32_t remote_task_id,
int32_t remote_task_tag,
mtapi_status_t status,
embb_mtapi_network_socket_t * socket,
embb_mtapi_network_buffer_t * buffer)
{
embb_mtapi_network_buffer_clear(buffer);
// packet size
embb_mtapi_network_buffer_push_back_int32(
buffer, 16);
// operation
embb_mtapi_network_buffer_push_back_int32(
buffer, EMBB_MTAPI_NETWORK_RETURN_FAILURE);
// task handle
embb_mtapi_network_buffer_push_back_int32(
buffer, remote_task_id);
embb_mtapi_network_buffer_push_back_int32(
buffer, remote_task_tag);
// status
embb_mtapi_network_buffer_push_back_int32(
buffer, (int32_t)status);
embb_mtapi_network_socket_sendbuffer(
socket, buffer);
}
static void embb_mtapi_network_task_complete(
MTAPI_IN mtapi_task_hndl_t task,
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
int err;
EMBB_UNUSED_IN_RELEASE(err);
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
......@@ -138,37 +171,69 @@ static void embb_mtapi_network_task_complete(
(embb_mtapi_network_task_t*)local_task->attributes.user_data;
embb_mtapi_network_buffer_t * send_buf = &plugin->send_buffer;
embb_atomic_memory_barrier();
local_task->attributes.complete_func = NULL;
embb_atomic_memory_barrier();
// serialize sending of results
embb_mutex_lock(&plugin->send_mutex);
embb_mtapi_network_buffer_clear(send_buf);
// operation is "return result"
err = embb_mtapi_network_buffer_push_back_int8(
send_buf, EMBB_MTAPI_NETWORK_RETURN_RESULT);
assert(err == 1);
// remote task id
err = embb_mtapi_network_buffer_push_back_int32(
send_buf, network_task->remote_task_id);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32(
send_buf, network_task->remote_task_tag);
assert(err == 4);
// status
err = embb_mtapi_network_buffer_push_back_int32(
send_buf, local_task->error_code);
assert(err == 4);
// result size
err = embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->result_size);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_rawdata(
send_buf, (int32_t)local_task->result_size,
local_task->result_buffer);
assert(err == (int)local_task->result_size);
if (local_task->error_code == MTAPI_SUCCESS) {
// actual counts bytes actually put into the buffer
int actual = 0;
// expected counts bytes we intended to put into the buffer
int expected =
4 + // operation
4 + 4 + // remote task handle
4 + // status
4 + (int)local_task->result_size; // result buffer
// packet size
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, expected);
expected += 4;
// operation is "return result"
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, EMBB_MTAPI_NETWORK_RETURN_RESULT);
// remote task id
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, network_task->remote_task_id);
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, network_task->remote_task_tag);
// status
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, local_task->error_code);
err = embb_mtapi_network_socket_sendbuffer(
&network_task->socket, send_buf);
assert(err == send_buf->size);
// result size
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->result_size);
actual += embb_mtapi_network_buffer_push_back_rawdata(
send_buf, (int32_t)local_task->result_size,
local_task->result_buffer);
if (expected == actual) {
int sent = embb_mtapi_network_socket_sendbuffer(
&network_task->socket, send_buf);
assert(sent == send_buf->size);
}
else {
embb_mtapi_network_return_failure(
network_task->remote_task_id,
network_task->remote_task_tag,
MTAPI_ERR_UNKNOWN,
&network_task->socket, send_buf);
}
} else {
embb_mtapi_network_return_failure(
network_task->remote_task_id,
network_task->remote_task_tag,
local_task->error_code,
&network_task->socket, send_buf);
}
// sending done
embb_mutex_unlock(&plugin->send_mutex);
......@@ -177,6 +242,14 @@ static void embb_mtapi_network_task_complete(
embb_free((void*)local_task->arguments);
embb_free(local_task->result_buffer);
void * data = local_task->attributes.user_data;
embb_atomic_memory_barrier();
local_task->attributes.user_data = NULL;
embb_atomic_memory_barrier();
embb_free(data);
local_status = MTAPI_SUCCESS;
}
}
......@@ -185,15 +258,333 @@ static void embb_mtapi_network_task_complete(
mtapi_status_set(status, local_status);
}
static mtapi_status_t embb_mtapi_network_handle_start_task(
embb_mtapi_network_socket_t * socket,
embb_mtapi_network_buffer_t * buffer,
int packet_size) {
int32_t domain_id;
int32_t job_id;
int32_t results_size;
void * results;
int err;
int32_t arguments_size;
int32_t remote_task_id;
int32_t remote_task_tag;
mtapi_uint_t priority = 0;
mtapi_job_hndl_t job_hndl;
mtapi_task_attributes_t task_attr;
void * arguments;
mtapi_task_complete_function_t func = embb_mtapi_network_task_complete;
void * func_void;
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
// check if we have at least 28 bytes
if (packet_size >= 28) {
// domain id
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &domain_id);
assert(err == 4);
// job id
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &job_id);
assert(err == 4);
// priority
err = embb_mtapi_network_buffer_pop_front_int32(
buffer, (int32_t*)&priority);
assert(err == 4);
// remote task handle
err = embb_mtapi_network_buffer_pop_front_int32(
buffer, &remote_task_id);
assert(err == 4);
err = embb_mtapi_network_buffer_pop_front_int32(
buffer, &remote_task_tag);
assert(err == 4);
// result size
err = embb_mtapi_network_buffer_pop_front_int32(buffer,
&results_size);
assert(err == 4);
// arguments size
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &arguments_size);
assert(err == 4);
embb_mtapi_network_task_t * network_task =
(embb_mtapi_network_task_t*)embb_alloc(
sizeof(embb_mtapi_network_task_t));
if (network_task == NULL) {
embb_mtapi_network_return_failure(
remote_task_id, remote_task_tag, MTAPI_ERR_UNKNOWN,
socket, buffer);
return MTAPI_ERR_UNKNOWN;
}
network_task->remote_task_id = remote_task_id;
network_task->remote_task_tag = remote_task_tag;
// check packet_size again
if (packet_size == 28 + arguments_size) {
// allocate buffers
results = embb_alloc((size_t)results_size);
if (results == NULL) {
embb_mtapi_network_return_failure(
remote_task_id, remote_task_tag, MTAPI_ERR_UNKNOWN,
socket, buffer);
return MTAPI_ERR_UNKNOWN;
}
arguments = embb_alloc((size_t)arguments_size);
if (arguments == NULL) {
embb_free(results);
embb_mtapi_network_return_failure(
remote_task_id, remote_task_tag, MTAPI_ERR_UNKNOWN,
socket, buffer);
return MTAPI_ERR_UNKNOWN;
}
// arguments
err = embb_mtapi_network_buffer_pop_front_rawdata(
buffer, arguments_size, arguments);
assert(err == arguments_size);
network_task->socket = *socket;
mtapi_taskattr_init(&task_attr, &local_status);
assert(local_status == MTAPI_SUCCESS);
mtapi_taskattr_set(&task_attr, MTAPI_TASK_USER_DATA,
(void*)network_task, 0, &local_status);
assert(local_status == MTAPI_SUCCESS);
mtapi_boolean_t task_detached = MTAPI_TRUE;
mtapi_taskattr_set(&task_attr, MTAPI_TASK_DETACHED,
(void*)&task_detached, sizeof(mtapi_boolean_t), &local_status);
assert(local_status == MTAPI_SUCCESS);
mtapi_taskattr_set(&task_attr, MTAPI_TASK_PRIORITY,
(void*)&priority, sizeof(mtapi_uint_t), &local_status);
assert(local_status == MTAPI_SUCCESS);
memcpy(&func_void, &func, sizeof(void*));
mtapi_taskattr_set(&task_attr, MTAPI_TASK_COMPLETE_FUNCTION,
func_void, 0, &local_status);
assert(local_status == MTAPI_SUCCESS);
job_hndl = mtapi_job_get((mtapi_job_id_t)job_id,
(mtapi_domain_t)domain_id, &local_status);
if (local_status == MTAPI_SUCCESS) {
mtapi_task_start(
MTAPI_TASK_ID_NONE, job_hndl,
arguments, (mtapi_size_t)arguments_size,
results, (mtapi_size_t)results_size,
&task_attr, MTAPI_GROUP_NONE,
&local_status);
}
if (local_status != MTAPI_SUCCESS) {
embb_free(arguments);
embb_free(results);
embb_mtapi_network_return_failure(
remote_task_id, remote_task_tag, local_status, socket, buffer);
}
}
}
return local_status;
}
static mtapi_status_t embb_mtapi_network_handle_return_result(
embb_mtapi_network_buffer_t * buffer,
int packet_size) {
int32_t task_status;
int32_t task_id;
int32_t task_tag;
int32_t results_size;
int err;
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
mtapi_task_hndl_t task;
// do we have at least 16 bytes?
if (packet_size >= 16) {
// local task id
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_id);
assert(err == 4);
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_tag);
assert(err == 4);
// task status
err = embb_mtapi_network_buffer_pop_front_int32(
buffer, &task_status);
assert(err == 4);
// result size
err = embb_mtapi_network_buffer_pop_front_int32(
buffer, &results_size);
assert(err == 4);
// check packet_size again
if (packet_size == 16 + results_size) {
task.id = (mtapi_task_id_t)task_id;
task.tag = (mtapi_uint_t)task_tag;
if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
embb_mtapi_task_t * local_task =
embb_mtapi_task_pool_get_storage_for_handle(
node->task_pool, task);
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, local_task->action)) {
embb_mtapi_action_t * local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, local_task->action);
/* not needed right now
embb_mtapi_network_action_t * network_action =
(embb_mtapi_network_action_t*)local_action->plugin_data;*/
err = embb_mtapi_network_buffer_pop_front_rawdata(
buffer, results_size, local_task->result_buffer);
assert(err == results_size);
local_task->error_code = (mtapi_status_t)task_status;
embb_atomic_store_int(&local_task->state, MTAPI_TASK_COMPLETED);
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
/* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid(
node->group_pool, local_task->group)) {
embb_mtapi_group_t* local_group =
embb_mtapi_group_pool_get_storage_for_handle(
node->group_pool, local_task->group);
embb_mtapi_task_queue_push(&local_group->queue, local_task);
}
local_status = MTAPI_SUCCESS;
}
}
}
}
}
return local_status;
}
static mtapi_status_t embb_mtapi_network_handle_return_failure(
embb_mtapi_network_buffer_t * buffer,
int packet_size) {
int32_t task_status;
int32_t task_id;
int32_t task_tag;
int err;
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
mtapi_task_hndl_t task;
// do we have 12 bytes?
if (packet_size == 12) {
// local task id
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_id);
assert(err == 4);
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_tag);
assert(err == 4);
// task status
err = embb_mtapi_network_buffer_pop_front_int32(
buffer, &task_status);
assert(err == 4);
task.id = (mtapi_task_id_t)task_id;
task.tag = (mtapi_uint_t)task_tag;
if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
embb_mtapi_task_t * local_task =
embb_mtapi_task_pool_get_storage_for_handle(
node->task_pool, task);
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, local_task->action)) {
embb_mtapi_action_t * local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, local_task->action);
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
local_task->error_code = (mtapi_status_t)task_status;
if (MTAPI_ERR_ACTION_CANCELLED == task_status) {
embb_atomic_store_int(&local_task->state, MTAPI_TASK_CANCELLED);
} else {
embb_atomic_store_int(&local_task->state, MTAPI_TASK_ERROR);
}
/* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid(
node->group_pool, local_task->group)) {
embb_mtapi_group_t* local_group =
embb_mtapi_group_pool_get_storage_for_handle(
node->group_pool, local_task->group);
embb_mtapi_task_queue_push(&local_group->queue, local_task);
}
local_status = MTAPI_SUCCESS;
}
}
}
}
return local_status;
}
static mtapi_status_t embb_mtapi_network_handle_cancel_task(
embb_mtapi_network_buffer_t * buffer,
int packet_size) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
int32_t remote_task_id;
int32_t remote_task_tag;
int err;
EMBB_UNUSED_IN_RELEASE(err);
// do we have 8 bytes?
if (packet_size == 8) {
// get task handle
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &remote_task_id);
assert(err == 4);
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &remote_task_tag);
assert(err == 4);
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
// search for task to cancel
for (mtapi_uint_t ii = 0; ii < node->attributes.max_tasks; ii++) {
embb_mtapi_task_t * task = &node->task_pool->storage[ii];
// is this our task?
if (embb_mtapi_network_task_complete == task->attributes.complete_func) {
embb_mtapi_network_task_t * network_task =
(embb_mtapi_network_task_t*)task->attributes.user_data;
// is this task the one matching the given remote task?
if (remote_task_id == network_task->remote_task_id &&
remote_task_tag == network_task->remote_task_tag) {
mtapi_task_cancel(task->handle, &local_status);
break;
}
}
}
}
}
return local_status;
}
static int embb_mtapi_network_thread(void * args) {
embb_mtapi_network_plugin_t * plugin = &embb_mtapi_network_plugin;
embb_mtapi_network_buffer_t buffer;
embb_mtapi_network_buffer_t * buffer = &plugin->recv_buffer;
int err;
EMBB_UNUSED(args);
embb_mtapi_network_buffer_initialize(&buffer, (int)plugin->buffer_size);
while (embb_atomic_load_int(&plugin->run)) {
err = embb_mtapi_network_socket_select(
plugin->sockets, plugin->socket_count, 100);
......@@ -208,197 +599,53 @@ static int embb_mtapi_network_thread(void * args) {
plugin->socket_count++;
}
} else if (0 < err) {
int32_t domain_id;
int32_t job_id;
int32_t results_size;
void * results;
int8_t operation;
int32_t operation;
int32_t packet_size;
embb_mtapi_network_socket_t * socket = &plugin->sockets[err];
embb_mtapi_network_buffer_clear(&buffer);
embb_mtapi_network_buffer_clear(buffer);
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, 1);
if (err == 0) {
// there was some socket error, ignore
continue;
}
assert(err == 1);
err = embb_mtapi_network_buffer_pop_front_int8(
&buffer, &operation);
assert(err == 1);
embb_mtapi_network_buffer_clear(&buffer);
if (operation == EMBB_MTAPI_NETWORK_START_TASK) {
int32_t arguments_size;
mtapi_uint_t priority = 0;
mtapi_job_hndl_t job_hndl;
mtapi_task_attributes_t task_attr;
void * arguments;
mtapi_task_complete_function_t func = embb_mtapi_network_task_complete;
void * func_void;
embb_mtapi_network_task_t * network_task =
(embb_mtapi_network_task_t*)embb_alloc(
sizeof(embb_mtapi_network_task_t));
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, 28);
assert(err == 28);
// domain id
err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &domain_id);
assert(err == 4);
// job id
err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &job_id);
assert(err == 4);
// priority
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, (int32_t*)&priority);
assert(err == 4);
// remote task handle
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &network_task->remote_task_id);
assert(err == 4);
socket, buffer, 4);
if (err == 4) {
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &network_task->remote_task_tag);
buffer, &packet_size);
assert(err == 4);
// result size
err = embb_mtapi_network_buffer_pop_front_int32(&buffer,
&results_size);
assert(err == 4);
results = embb_alloc((size_t)results_size);
assert(results != NULL);
// arguments size
embb_mtapi_network_buffer_pop_front_int32(&buffer, &arguments_size);
assert(err == 4);
arguments = embb_alloc((size_t)arguments_size);
assert(arguments != NULL);
embb_mtapi_network_buffer_clear(&buffer);
// arguments
embb_mtapi_network_buffer_clear(buffer);
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, arguments_size);
assert(err == arguments_size);
err = embb_mtapi_network_buffer_pop_front_rawdata(
&buffer, arguments_size, arguments);
assert(err == arguments_size);
embb_mtapi_network_buffer_clear(&buffer);
network_task->socket = *socket;
mtapi_taskattr_init(&task_attr, &local_status);
assert(local_status == MTAPI_SUCCESS);
mtapi_taskattr_set(&task_attr, MTAPI_TASK_USER_DATA,
(void*)network_task, 0, &local_status);
assert(local_status == MTAPI_SUCCESS);
mtapi_boolean_t task_detached = MTAPI_TRUE;
mtapi_taskattr_set(&task_attr, MTAPI_TASK_DETACHED,
(void*)&task_detached, sizeof(mtapi_boolean_t), &local_status);
assert(local_status == MTAPI_SUCCESS);
mtapi_taskattr_set(&task_attr, MTAPI_TASK_PRIORITY,
(void*)&priority, sizeof(mtapi_uint_t), &local_status);
assert(local_status == MTAPI_SUCCESS);
memcpy(&func_void, &func, sizeof(void*));
mtapi_taskattr_set(&task_attr, MTAPI_TASK_COMPLETE_FUNCTION,
func_void, 0, &local_status);
assert(local_status == MTAPI_SUCCESS);
job_hndl = mtapi_job_get((mtapi_job_id_t)job_id,
(mtapi_domain_t)domain_id, &local_status);
assert(local_status == MTAPI_SUCCESS);
mtapi_task_start(
MTAPI_TASK_ID_NONE, job_hndl,
arguments, (mtapi_size_t)arguments_size,
results, (mtapi_size_t)results_size,
&task_attr, MTAPI_GROUP_NONE,
&local_status);
assert(local_status == MTAPI_SUCCESS);
// send back result of task creation
//embb_mtapi_network_buffer_push_back_int32(
// &buffer, local_status);
//embb_mtapi_network_socket_sendbuffer(
// socket, &buffer);
embb_mtapi_network_buffer_clear(&buffer);
} else if (operation == EMBB_MTAPI_NETWORK_RETURN_RESULT) {
int task_status;
int task_id;
int task_tag;
embb_mtapi_network_buffer_clear(&buffer);
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
mtapi_task_hndl_t task;
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, 16);
assert(err == 16);
// local task id
err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &task_id);
assert(err == 4);
err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &task_tag);
assert(err == 4);
// task status
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &task_status);
assert(err == 4);
// result size
socket, buffer, packet_size);
if (err == packet_size) {
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &results_size);
buffer, &operation);
assert(err == 4);
embb_mtapi_network_buffer_clear(&buffer);
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, results_size);
assert(err == results_size);
task.id = (mtapi_task_id_t)task_id;
task.tag = (mtapi_uint_t)task_tag;
if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
embb_mtapi_task_t * local_task =
embb_mtapi_task_pool_get_storage_for_handle(
node->task_pool, task);
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, local_task->action)) {
embb_mtapi_action_t * local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, local_task->action);
/* not needed right now
embb_mtapi_network_action_t * network_action =
(embb_mtapi_network_action_t*)local_action->plugin_data;*/
err = embb_mtapi_network_buffer_pop_front_rawdata(
&buffer, results_size, local_task->result_buffer);
assert(err == results_size);
local_task->error_code = (mtapi_status_t)task_status;
embb_atomic_store_int(&local_task->state, MTAPI_TASK_COMPLETED);
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
/* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid(
node->group_pool, local_task->group)) {
embb_mtapi_group_t* local_group =
embb_mtapi_group_pool_get_storage_for_handle(
node->group_pool, local_task->group);
embb_mtapi_task_queue_push(&local_group->queue, local_task);
}
}
packet_size -= 4;
switch (operation) {
case EMBB_MTAPI_NETWORK_START_TASK:
embb_mtapi_network_handle_start_task(socket, buffer, packet_size);
break;
case EMBB_MTAPI_NETWORK_RETURN_RESULT:
embb_mtapi_network_handle_return_result(buffer, packet_size);
break;
case EMBB_MTAPI_NETWORK_RETURN_FAILURE:
embb_mtapi_network_handle_return_failure(buffer, packet_size);
break;
case EMBB_MTAPI_NETWORK_CANCEL_TASK:
embb_mtapi_network_handle_cancel_task(buffer, packet_size);
break;
default:
// invalid, ignore
break;
}
}
}
embb_mtapi_network_buffer_clear(buffer);
}
}
embb_mtapi_network_buffer_finalize(&buffer);
return EMBB_SUCCESS;
}
......@@ -408,42 +655,106 @@ void mtapi_network_plugin_initialize(
MTAPI_IN mtapi_uint16_t max_connections,
MTAPI_IN mtapi_size_t buffer_size,
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
embb_mtapi_network_plugin_t * plugin = &embb_mtapi_network_plugin;
int err;
mtapi_status_set(status, MTAPI_ERR_UNKNOWN);
plugin->socket_count = 0;
plugin->buffer_size = 0;
plugin->sockets = NULL;
embb_atomic_store_int(&plugin->run, 0);
err = embb_mtapi_network_initialize();
if (err) {
embb_atomic_store_int(&plugin->run, 1);
plugin->buffer_size = buffer_size;
plugin->socket_count = 1;
// 1 listening socket and max_connections connections
// (2 sockets each if local)
plugin->sockets = (embb_mtapi_network_socket_t*)embb_alloc(
sizeof(embb_mtapi_network_socket_t) * (1 + max_connections * 2));
embb_mtapi_network_buffer_initialize(
&plugin->send_buffer, (int)plugin->buffer_size);
embb_mutex_init(&plugin->send_mutex, 0);
if (NULL != plugin->sockets) {
err = embb_mtapi_network_socket_initialize(&plugin->sockets[0]);
if (err) {
err = embb_mtapi_network_socket_bind_and_listen(
&plugin->sockets[0], host, port, max_connections);
if (err) {
err = embb_thread_create(
&plugin->thread, NULL, embb_mtapi_network_thread, NULL);
if (EMBB_SUCCESS == err) {
local_status = MTAPI_SUCCESS;
}
}
}
}
if (0 == err) return;
err = embb_mtapi_network_buffer_initialize(
&plugin->recv_buffer, (int)buffer_size);
if (0 == err) {
embb_mtapi_network_finalize();
return;
}
mtapi_status_set(status, local_status);
err = embb_mtapi_network_buffer_initialize(
&plugin->send_buffer, (int)buffer_size);
if (0 == err) {
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
embb_mtapi_network_finalize();
return;
}
plugin->buffer_size = buffer_size;
// 1 listening socket and max_connections connections
// (2 sockets each if local)
plugin->sockets = (embb_mtapi_network_socket_t*)embb_alloc(
sizeof(embb_mtapi_network_socket_t) * (1 + max_connections * 2));
if (NULL == plugin->sockets) {
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
plugin->buffer_size = 0;
embb_mtapi_network_finalize();
return;
}
err = embb_mutex_init(&plugin->send_mutex, 0);
if (EMBB_SUCCESS != err) {
embb_free(plugin->sockets);
plugin->sockets = NULL;
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
plugin->buffer_size = 0;
embb_mtapi_network_finalize();
return;
}
err = embb_mtapi_network_socket_initialize(&plugin->sockets[0]);
if (0 == err) {
embb_mutex_destroy(&plugin->send_mutex);
embb_free(plugin->sockets);
plugin->sockets = NULL;
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
plugin->buffer_size = 0;
embb_mtapi_network_finalize();
return;
}
plugin->socket_count = 1;
err = embb_mtapi_network_socket_bind_and_listen(
&plugin->sockets[0], host, port, max_connections);
if (0 == err) {
embb_mtapi_network_socket_finalize(&plugin->sockets[0]);
plugin->socket_count = 0;
embb_mutex_destroy(&plugin->send_mutex);
embb_free(plugin->sockets);
plugin->sockets = NULL;
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
plugin->buffer_size = 0;
embb_mtapi_network_finalize();
return;
}
embb_atomic_store_int(&plugin->run, 1);
err = embb_thread_create(
&plugin->thread, NULL, embb_mtapi_network_thread, NULL);
if (EMBB_SUCCESS != err) {
embb_atomic_store_int(&plugin->run, 0);
embb_mtapi_network_socket_finalize(&plugin->sockets[0]);
plugin->socket_count = 0;
embb_mutex_destroy(&plugin->send_mutex);
embb_free(plugin->sockets);
plugin->sockets = NULL;
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
plugin->buffer_size = 0;
embb_mtapi_network_finalize();
return;
}
mtapi_status_set(status, MTAPI_SUCCESS);
}
void mtapi_network_plugin_finalize(
......@@ -458,6 +769,8 @@ void mtapi_network_plugin_finalize(
embb_mutex_destroy(&plugin->send_mutex);
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
embb_mtapi_network_socket_finalize(&plugin->sockets[0]);
embb_free(plugin->sockets);
embb_mtapi_network_finalize();
......@@ -468,9 +781,9 @@ void mtapi_network_plugin_finalize(
static void network_task_start(
MTAPI_IN mtapi_task_hndl_t task,
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
int err;
EMBB_UNUSED_IN_RELEASE(err);
// assume failure
mtapi_status_set(status, MTAPI_ERR_UNKNOWN);
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
......@@ -491,69 +804,152 @@ static void network_task_start(
// serialize sending
embb_mutex_lock(&network_action->send_mutex);
embb_mtapi_network_buffer_clear(send_buf);
// actual counts bytes actually put into the buffer
int actual = 0;
// expected counts bytes we intended to put into the buffer
int expected =
4 + // operation
4 + // domain_id
4 + // job_id
4 + // priority
4 + 4 + // task handle
4 + // result_size
4 + local_task->arguments_size; // arguments buffer
// packet size
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)expected);
expected += 4;
// operation is "start task"
err = embb_mtapi_network_buffer_push_back_int8(
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, EMBB_MTAPI_NETWORK_START_TASK);
assert(err == 1);
err = embb_mtapi_network_buffer_push_back_int32(
// domain_id
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)network_action->domain_id);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32(
// job_id
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)network_action->job_id);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32(
// priority
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->attributes.priority);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32(
// task handle
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->handle.id);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32(
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->handle.tag);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32(
// result size
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->result_size);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32(
// arguments buffer
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->arguments_size);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_rawdata(
actual += embb_mtapi_network_buffer_push_back_rawdata(
send_buf, (int32_t)local_task->arguments_size, local_task->arguments);
assert(err == (int)local_task->arguments_size);
err = embb_mtapi_network_socket_sendbuffer(
&network_action->socket, send_buf);
assert(err == send_buf->size);
embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1);
embb_atomic_store_int(&local_task->state, MTAPI_TASK_RUNNING);
// check if everything fit into the buffer
if (actual == expected) {
embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1);
embb_atomic_store_int(&local_task->state, MTAPI_TASK_RUNNING);
int sent = embb_mtapi_network_socket_sendbuffer(
&network_action->socket, send_buf);
// was everything sent?
if (sent == send_buf->size) {
// we've done it, success!
mtapi_status_set(status, MTAPI_SUCCESS);
} else {
// could not send the whole task, this will fail on the remote side,
// so we can safely assume that the task is in error
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
embb_atomic_store_int(&local_task->state, MTAPI_TASK_ERROR);
}
}
embb_mtapi_network_buffer_clear(send_buf);
embb_mutex_unlock(&network_action->send_mutex);
local_status = MTAPI_SUCCESS;
}
}
}
mtapi_status_set(status, local_status);
}
static void network_task_cancel(
MTAPI_IN mtapi_task_hndl_t task,
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
EMBB_UNUSED(task);
// assume failure
mtapi_status_set(status, MTAPI_ERR_UNKNOWN);
mtapi_status_set(status, local_status);
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
embb_mtapi_task_t * local_task =
embb_mtapi_task_pool_get_storage_for_handle(node->task_pool, task);
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, local_task->action)) {
embb_mtapi_action_t * local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, local_task->action);
embb_mtapi_network_action_t * network_action =
(embb_mtapi_network_action_t*)local_action->plugin_data;
embb_mtapi_network_buffer_t * send_buf = &network_action->send_buffer;
// serialize sending
embb_mutex_lock(&network_action->send_mutex);
embb_mtapi_network_buffer_clear(send_buf);
// actual counts bytes actually put into the buffer
int actual = 0;
// expected counts bytes we intended to put into the buffer
int expected =
4 + // operation
4 + 4; // task handle
// packet size
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)expected);
expected += 4;
// operation is "cancel task"
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, EMBB_MTAPI_NETWORK_CANCEL_TASK);
// task handle
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->handle.id);
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->handle.tag);
// check if everything fit into the buffer
if (actual == expected) {
int sent = embb_mtapi_network_socket_sendbuffer(
&network_action->socket, send_buf);
// was everything sent?
if (sent == send_buf->size) {
// we've done it, success!
mtapi_status_set(status, MTAPI_SUCCESS);
} else {
embb_atomic_store_int(&local_task->state, MTAPI_TASK_ERROR);
}
} else {
embb_atomic_store_int(&local_task->state, MTAPI_TASK_ERROR);
}
embb_mtapi_network_buffer_clear(send_buf);
embb_mutex_unlock(&network_action->send_mutex);
}
}
}
}
static void network_action_finalize(
......@@ -602,34 +998,36 @@ mtapi_action_hndl_t mtapi_network_action_create(
action->domain_id = domain_id;
action->job_id = remote_job_id;
embb_mtapi_network_buffer_initialize(
err = embb_mtapi_network_buffer_initialize(
&action->send_buffer, (int)plugin->buffer_size);
embb_mutex_init(&action->send_mutex, 0);
action->host = host;
action->port = port;
embb_mtapi_network_socket_initialize(&action->socket);
err = embb_mtapi_network_socket_connect(&action->socket, host, port);
if (0 != err) {
// store socket for select
plugin->sockets[plugin->socket_count] = action->socket;
plugin->socket_count++;
action_hndl = mtapi_ext_plugin_action_create(
local_job_id,
network_task_start,
network_task_cancel,
network_action_finalize,
action,
NULL, 0, // no node local data obviously
MTAPI_NULL,
&local_status);
} else {
embb_mutex_destroy(&action->send_mutex);
embb_mtapi_network_buffer_finalize(&action->send_buffer);
embb_mtapi_network_socket_finalize(&action->socket);
embb_free(action);
err = embb_mutex_init(&action->send_mutex, 0);
if (EMBB_SUCCESS == err) {
action->host = host;
action->port = port;
embb_mtapi_network_socket_initialize(&action->socket);
err = embb_mtapi_network_socket_connect(&action->socket, host, port);
if (0 != err) {
// store socket for select
plugin->sockets[plugin->socket_count] = action->socket;
plugin->socket_count++;
action_hndl = mtapi_ext_plugin_action_create(
local_job_id,
network_task_start,
network_task_cancel,
network_action_finalize,
action,
NULL, 0, // no node local data obviously
MTAPI_NULL,
&local_status);
} else {
embb_mutex_destroy(&action->send_mutex);
embb_mtapi_network_buffer_finalize(&action->send_buffer);
embb_mtapi_network_socket_finalize(&action->socket);
embb_free(action);
}
}
}
}
......
......@@ -28,9 +28,10 @@
#include <embb/base/c/memory_allocation.h>
#include <string.h>
void embb_mtapi_network_buffer_initialize(
int embb_mtapi_network_buffer_initialize(
embb_mtapi_network_buffer_t * that,
int capacity) {
int result = 1;
that->position = 0;
that->size = 0;
that->data = (char*)embb_alloc((size_t)capacity);
......@@ -38,7 +39,9 @@ void embb_mtapi_network_buffer_initialize(
that->capacity = capacity;
} else {
that->capacity = 0;
result = 0;
}
return result;
}
void embb_mtapi_network_buffer_finalize(
......
......@@ -43,7 +43,7 @@ struct embb_mtapi_network_buffer_struct {
typedef struct embb_mtapi_network_buffer_struct embb_mtapi_network_buffer_t;
void embb_mtapi_network_buffer_initialize(
int embb_mtapi_network_buffer_initialize(
embb_mtapi_network_buffer_t * that,
int capacity
);
......
......@@ -116,7 +116,8 @@ int embb_mtapi_network_socket_connect(
if (SOCKET_ERROR == connect(that->handle, (struct sockaddr *)&addr,
sizeof(addr))) {
#ifdef _WIN32
if (WSAEWOULDBLOCK != WSAGetLastError())
int err = WSAGetLastError();
if (WSAEWOULDBLOCK != err)
#else
if (EAGAIN != errno)
#endif
......
......@@ -61,13 +61,52 @@ static void test(
}
}
static void cancel_test(
void const * /*arguments*/,
mtapi_size_t /*arguments_size*/,
void * /*result_buffer*/,
mtapi_size_t /*result_buffer_size*/,
void const * /*node_local_data*/,
mtapi_size_t /*node_local_data_size*/,
mtapi_task_context_t * context) {
mtapi_status_t status;
while (true) {
mtapi_task_state_t state = mtapi_context_taskstate_get(context, &status);
if (status != MTAPI_SUCCESS) {
break;
} else {
if (state == MTAPI_TASK_CANCELLED) {
break;
}
}
}
}
NetworkTaskTest::NetworkTaskTest() {
CreateUnit("mtapi network task test").Add(&NetworkTaskTest::TestBasic, this);
CreateUnit("mtapi network task test")
.Add(&NetworkTaskTest::TestBasic, this);
}
void NetworkTaskTest::TestBasic() {
mtapi_status_t status;
mtapi_initialize(
NETWORK_DOMAIN,
NETWORK_LOCAL_NODE,
MTAPI_NULL,
MTAPI_NULL,
&status);
MTAPI_CHECK_STATUS(status);
TestSimple();
TestCancel();
mtapi_finalize(&status);
MTAPI_CHECK_STATUS(status);
}
void NetworkTaskTest::TestSimple() {
mtapi_status_t status;
mtapi_job_hndl_t job;
mtapi_task_hndl_t task;
mtapi_action_hndl_t network_action, local_action;
......@@ -81,14 +120,6 @@ void NetworkTaskTest::TestBasic() {
arguments[ii + kElements] = static_cast<float>(ii);
}
mtapi_initialize(
NETWORK_DOMAIN,
NETWORK_LOCAL_NODE,
MTAPI_NULL,
MTAPI_NULL,
&status);
MTAPI_CHECK_STATUS(status);
mtapi_network_plugin_initialize("127.0.0.1", 12345, 5,
kElements * 4 * 3 + 32, &status);
MTAPI_CHECK_STATUS(status);
......@@ -139,7 +170,68 @@ void NetworkTaskTest::TestBasic() {
mtapi_network_plugin_finalize(&status);
MTAPI_CHECK_STATUS(status);
}
mtapi_finalize(&status);
void NetworkTaskTest::TestCancel() {
mtapi_status_t status;
mtapi_job_hndl_t job;
mtapi_task_hndl_t task;
mtapi_action_hndl_t network_action, local_action;
float argument = 1.0f;
float result;
mtapi_network_plugin_initialize("127.0.0.1", 12345, 5,
4 * 3 + 32, &status);
MTAPI_CHECK_STATUS(status);
float node_remote = 1.0f;
local_action = mtapi_action_create(
NETWORK_REMOTE_JOB,
cancel_test,
&node_remote, sizeof(float),
MTAPI_DEFAULT_ACTION_ATTRIBUTES,
&status);
MTAPI_CHECK_STATUS(status);
network_action = mtapi_network_action_create(
NETWORK_DOMAIN,
NETWORK_LOCAL_JOB,
NETWORK_REMOTE_JOB,
"127.0.0.1", 12345,
&status);
MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN;
job = mtapi_job_get(NETWORK_LOCAL_JOB, NETWORK_DOMAIN, &status);
MTAPI_CHECK_STATUS(status);
task = mtapi_task_start(
MTAPI_TASK_ID_NONE,
job,
&argument, sizeof(float),
&result, sizeof(float),
MTAPI_DEFAULT_TASK_ATTRIBUTES,
MTAPI_GROUP_NONE,
&status);
MTAPI_CHECK_STATUS(status);
mtapi_task_wait(task, 1, &status);
PT_ASSERT_EQ(status, MTAPI_TIMEOUT);
mtapi_task_cancel(task, &status);
MTAPI_CHECK_STATUS(status);
mtapi_task_wait(task, MTAPI_INFINITE, &status);
PT_ASSERT_NE(status, MTAPI_TIMEOUT);
PT_ASSERT_EQ(status, MTAPI_ERR_ACTION_CANCELLED);
mtapi_action_delete(network_action, MTAPI_INFINITE, &status);
MTAPI_CHECK_STATUS(status);
mtapi_action_delete(local_action, MTAPI_INFINITE, &status);
MTAPI_CHECK_STATUS(status);
mtapi_network_plugin_finalize(&status);
MTAPI_CHECK_STATUS(status);
}
......@@ -35,6 +35,9 @@ class NetworkTaskTest : public partest::TestCase {
private:
void TestBasic();
void TestSimple();
void TestCancel();
};
#endif // MTAPI_PLUGINS_C_MTAPI_NETWORK_C_TEST_EMBB_MTAPI_NETWORK_TEST_TASK_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment