Commit 5dcde678 by Marcus Winter

mtapi_network_c: network action plugin and corresponding test

parent f03238cc
...@@ -29,6 +29,7 @@ include_directories(${EMBB_MTAPI_NETWORK_INCLUDE_DIRS} ...@@ -29,6 +29,7 @@ include_directories(${EMBB_MTAPI_NETWORK_INCLUDE_DIRS}
${CMAKE_CURRENT_SOURCE_DIR}/../base_c/include ${CMAKE_CURRENT_SOURCE_DIR}/../base_c/include
${CMAKE_CURRENT_BINARY_DIR}/../base_c/include ${CMAKE_CURRENT_BINARY_DIR}/../base_c/include
${CMAKE_CURRENT_SOURCE_DIR}/../mtapi_c/include ${CMAKE_CURRENT_SOURCE_DIR}/../mtapi_c/include
${CMAKE_CURRENT_SOURCE_DIR}/../mtapi_c/src
) )
add_library(embb_mtapi_network_c ${EMBB_MTAPI_NETWORK_C_SOURCES} ${EMBB_MTAPI_NETWORK_C_HEADERS}) add_library(embb_mtapi_network_c ${EMBB_MTAPI_NETWORK_C_SOURCES} ${EMBB_MTAPI_NETWORK_C_HEADERS})
......
/*
* Copyright (c) 2014, Siemens AG. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef EMBB_MTAPI_C_MTAPI_NETWORK_H_
#define EMBB_MTAPI_C_MTAPI_NETWORK_H_
#include <embb/mtapi/c/mtapi_ext.h>
#ifdef __cplusplus
extern "C" {
#endif
void mtapi_network_plugin_initialize(
MTAPI_IN char * host,
MTAPI_IN mtapi_uint16_t port,
MTAPI_IN mtapi_uint16_t max_connections,
MTAPI_IN mtapi_size_t buffer_size,
MTAPI_OUT mtapi_status_t* status
);
void mtapi_network_plugin_finalize(
MTAPI_OUT mtapi_status_t* status
);
mtapi_action_hndl_t mtapi_network_action_create(
MTAPI_IN mtapi_domain_t domain_id,
MTAPI_IN mtapi_job_id_t local_job_id,
MTAPI_IN mtapi_job_id_t remote_job_id,
MTAPI_IN char * host,
MTAPI_IN mtapi_uint16_t port,
MTAPI_OUT mtapi_status_t* status
);
#ifdef __cplusplus
}
#endif
#endif // EMBB_MTAPI_C_MTAPI_NETWORK_H_
...@@ -24,12 +24,24 @@ ...@@ -24,12 +24,24 @@
* POSSIBILITY OF SUCH DAMAGE. * POSSIBILITY OF SUCH DAMAGE.
*/ */
#include <embb_mtapi_network.h>
#ifdef _WIN32 #ifdef _WIN32
#include <WinSock2.h> #include <WinSock2.h>
#endif #endif
#include <embb/mtapi/c/mtapi_network.h>
#include <embb/base/c/memory_allocation.h>
#include <embb/base/c/thread.h>
#include <embb/base/c/atomic.h>
#include <embb/base/c/mutex.h>
#include <embb/base/c/internal/unused.h>
#include <embb_mtapi_network_socket.h>
#include <embb_mtapi_network.h>
#include <embb_mtapi_task_t.h>
#include <embb_mtapi_action_t.h>
#include <embb_mtapi_node_t.h>
#include <mtapi_status_t.h>
int embb_mtapi_network_initialize() { int embb_mtapi_network_initialize() {
#ifdef _WIN32 #ifdef _WIN32
WORD ver_request; WORD ver_request;
...@@ -54,3 +66,468 @@ void embb_mtapi_network_finalize() { ...@@ -54,3 +66,468 @@ void embb_mtapi_network_finalize() {
WSACleanup(); WSACleanup();
#endif #endif
} }
enum embb_mtapi_network_operation_enum {
EMBB_MTAPI_NETWORK_START_TASK,
EMBB_MTAPI_NETWORK_RETURN_RESULT
};
struct embb_mtapi_network_plugin_struct {
embb_thread_t thread;
embb_mtapi_network_socket_t *sockets; // sockets[0] is the listening socket
int socket_count;
embb_atomic_int run;
mtapi_size_t buffer_size;
};
typedef struct embb_mtapi_network_plugin_struct embb_mtapi_network_plugin_t;
static embb_mtapi_network_plugin_t embb_mtapi_network_plugin;
struct embb_mtapi_network_action_struct {
mtapi_domain_t domain_id;
mtapi_job_id_t job_id;
char const * host;
mtapi_uint16_t port;
embb_mtapi_network_socket_t socket;
embb_mutex_t send_mutex;
embb_mtapi_network_buffer_t send_buffer;
};
typedef struct embb_mtapi_network_action_struct embb_mtapi_network_action_t;
struct embb_mtapi_network_task_struct {
embb_mtapi_network_socket_t socket;
int32_t remote_task_id;
int32_t remote_task_tag;
};
typedef struct embb_mtapi_network_task_struct embb_mtapi_network_task_t;
static void embb_mtapi_network_task_complete(
MTAPI_IN mtapi_task_hndl_t task,
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
int err;
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
embb_mtapi_task_t * local_task =
embb_mtapi_task_pool_get_storage_for_handle(node->task_pool, task);
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, local_task->action)) {
/* not needed right now
embb_mtapi_action_t * local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, local_task->action);*/
embb_mtapi_network_plugin_t * plugin = &embb_mtapi_network_plugin;
embb_mtapi_network_task_t * network_task =
(embb_mtapi_network_task_t*)local_task->attributes.user_data;
embb_mtapi_network_buffer_t send_buf;
// init buffer for sending
embb_mtapi_network_buffer_initialize(&send_buf, plugin->buffer_size);
// operation is "return result"
err = embb_mtapi_network_buffer_push_back_int8(
&send_buf, EMBB_MTAPI_NETWORK_RETURN_RESULT);
// remote task id
err = embb_mtapi_network_buffer_push_back_int32(
&send_buf, network_task->remote_task_id);
err = embb_mtapi_network_buffer_push_back_int32(
&send_buf, network_task->remote_task_tag);
// status
err = embb_mtapi_network_buffer_push_back_int32(
&send_buf, local_task->error_code);
// result size
err = embb_mtapi_network_buffer_push_back_int32(
&send_buf, local_task->result_size);
err = embb_mtapi_network_buffer_push_back_rawdata(
&send_buf, local_task->result_size, local_task->result_buffer);
err = embb_mtapi_network_socket_sendbuffer(
&network_task->socket, &send_buf);
// sending done, free the buffer
embb_mtapi_network_buffer_finalize(&send_buf);
// we allocated arguments and results on receive, so free them here
embb_free((void*)local_task->arguments);
embb_free(local_task->result_buffer);
local_status = MTAPI_SUCCESS;
}
}
}
mtapi_status_set(status, local_status);
}
static int embb_mtapi_network_thread(void * args) {
embb_mtapi_network_plugin_t * plugin = &embb_mtapi_network_plugin;
embb_mtapi_network_buffer_t buffer;
int err;
EMBB_UNUSED(args);
embb_mtapi_network_buffer_initialize(&buffer, plugin->buffer_size);
while (embb_atomic_load_int(&plugin->run)) {
err = embb_mtapi_network_socket_select(
plugin->sockets, plugin->socket_count, 100);
if (0 == err) {
// listening socket, accept connection
embb_mtapi_network_socket_t accept_socket;
err = embb_mtapi_network_socket_accept(
&plugin->sockets[0], &accept_socket);
if (0 < err) {
// add socket to socket list
plugin->sockets[plugin->socket_count] = accept_socket;
plugin->socket_count++;
}
} else if (0 < err) {
int32_t domain_id;
int32_t job_id;
int32_t results_size;
void * results;
int8_t operation;
embb_mtapi_network_socket_t * socket = &plugin->sockets[err];
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, 1);
embb_mtapi_network_buffer_pop_front_int8(
&buffer, &operation);
embb_mtapi_network_buffer_clear(&buffer);
if (operation == EMBB_MTAPI_NETWORK_START_TASK) {
int32_t arguments_size;
mtapi_job_hndl_t job_hndl;
mtapi_task_hndl_t task_hndl;
mtapi_task_attributes_t task_attr;
void * arguments;
mtapi_task_complete_function_t func = embb_mtapi_network_task_complete;
void * func_void;
embb_mtapi_network_task_t * network_task =
(embb_mtapi_network_task_t*)embb_alloc(
sizeof(embb_mtapi_network_task_t));
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, 24);
// domain id
embb_mtapi_network_buffer_pop_front_int32(&buffer, &domain_id);
// job id
embb_mtapi_network_buffer_pop_front_int32(&buffer, &job_id);
// remote task handle
embb_mtapi_network_buffer_pop_front_int32(
&buffer, &network_task->remote_task_id);
embb_mtapi_network_buffer_pop_front_int32(
&buffer, &network_task->remote_task_tag);
// result size
embb_mtapi_network_buffer_pop_front_int32(&buffer, &results_size);
results = embb_alloc(results_size);
// arguments size
embb_mtapi_network_buffer_pop_front_int32(&buffer, &arguments_size);
arguments = embb_alloc(arguments_size);
embb_mtapi_network_buffer_clear(&buffer);
// arguments
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, arguments_size);
embb_mtapi_network_buffer_pop_front_rawdata(
&buffer, arguments_size, arguments);
embb_mtapi_network_buffer_clear(&buffer);
network_task->socket = *socket;
mtapi_taskattr_init(&task_attr, &local_status);
mtapi_taskattr_set(&task_attr, MTAPI_TASK_USER_DATA,
(void*)network_task, 0, &local_status);
memcpy(&func_void, &func, sizeof(void*));
mtapi_taskattr_set(&task_attr, MTAPI_TASK_COMPLETE_FUNCTION,
func_void, 0, &local_status);
job_hndl = mtapi_job_get(job_id, domain_id, &local_status);
task_hndl = mtapi_task_start(
MTAPI_TASK_ID_NONE, job_hndl,
arguments, arguments_size,
results, results_size,
&task_attr, MTAPI_GROUP_NONE,
&local_status);
// send back result of task creation
//embb_mtapi_network_buffer_push_back_int32(
// &buffer, local_status);
//embb_mtapi_network_socket_sendbuffer(
// socket, &buffer);
embb_mtapi_network_buffer_clear(&buffer);
}
else if (operation == EMBB_MTAPI_NETWORK_RETURN_RESULT) {
int task_status;
int task_id;
int task_tag;
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
mtapi_task_hndl_t task;
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, 16);
// local task id
err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &task_id);
err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &task_tag);
// task status
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &task_status);
// result size
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &results_size);
embb_mtapi_network_buffer_clear(&buffer);
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, results_size);
task.id = task_id;
task.tag = task_tag;
if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
embb_mtapi_task_t * local_task =
embb_mtapi_task_pool_get_storage_for_handle(
node->task_pool, task);
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, local_task->action)) {
embb_mtapi_action_t * local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, local_task->action);
/* not needed right now
embb_mtapi_network_action_t * network_action =
(embb_mtapi_network_action_t*)local_action->plugin_data;*/
err = embb_mtapi_network_buffer_pop_front_rawdata(
&buffer, results_size, local_task->result_buffer);
local_task->error_code = task_status;
local_task->state = MTAPI_TASK_COMPLETED;
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
}
}
}
}
}
}
embb_mtapi_network_buffer_finalize(&buffer);
return EMBB_SUCCESS;
}
void mtapi_network_plugin_initialize(
MTAPI_IN char * host,
MTAPI_IN mtapi_uint16_t port,
MTAPI_IN mtapi_uint16_t max_connections,
MTAPI_IN mtapi_size_t buffer_size,
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
embb_mtapi_network_plugin_t * plugin = &embb_mtapi_network_plugin;
int err;
err = embb_mtapi_network_initialize();
embb_atomic_store_int(&plugin->run, 1);
plugin->socket_count = 1;
// 1 listening socket and max_connections connections
// (2 sockets each if local)
plugin->sockets = (embb_mtapi_network_socket_t*)embb_alloc(
sizeof(embb_mtapi_network_socket_t) * (1 + max_connections * 2));
err = embb_mtapi_network_socket_initialize(&plugin->sockets[0]);
err = embb_mtapi_network_socket_bind_and_listen(
&plugin->sockets[0], host, port, max_connections);
plugin->buffer_size = buffer_size;
err = embb_thread_create(&plugin->thread, NULL, embb_mtapi_network_thread, NULL);
if (EMBB_SUCCESS == err) {
local_status = MTAPI_SUCCESS;
}
mtapi_status_set(status, local_status);
}
void mtapi_network_plugin_finalize(
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_SUCCESS;
embb_mtapi_network_plugin_t * plugin = &embb_mtapi_network_plugin;
int err;
embb_atomic_store_int(&plugin->run, 0);
embb_thread_join(&plugin->thread, &err);
embb_mtapi_network_socket_finalize(&plugin->sockets[0]);
embb_free(plugin->sockets);
embb_mtapi_network_finalize();
mtapi_status_set(status, local_status);
}
static void network_task_start(
MTAPI_IN mtapi_task_hndl_t task,
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
int err;
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
embb_mtapi_task_t * local_task =
embb_mtapi_task_pool_get_storage_for_handle(node->task_pool, task);
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, local_task->action)) {
embb_mtapi_action_t * local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, local_task->action);
embb_mtapi_network_action_t * network_action =
(embb_mtapi_network_action_t*)local_action->plugin_data;
embb_mtapi_network_buffer_t * send_buf = &network_action->send_buffer;
// serialize sending
embb_mutex_lock(&network_action->send_mutex);
// operation is "start task"
err = embb_mtapi_network_buffer_push_back_int8(
send_buf, EMBB_MTAPI_NETWORK_START_TASK);
err = embb_mtapi_network_buffer_push_back_int32(
send_buf, network_action->domain_id);
err = embb_mtapi_network_buffer_push_back_int32(
send_buf, network_action->job_id);
err = embb_mtapi_network_buffer_push_back_int32(
send_buf, local_task->handle.id);
err = embb_mtapi_network_buffer_push_back_int32(
send_buf, local_task->handle.tag);
err = embb_mtapi_network_buffer_push_back_int32(
send_buf, local_task->result_size);
err = embb_mtapi_network_buffer_push_back_int32(
send_buf, local_task->arguments_size);
err = embb_mtapi_network_buffer_push_back_rawdata(
send_buf, local_task->arguments_size, local_task->arguments);
err = embb_mtapi_network_socket_sendbuffer(
&network_action->socket, send_buf);
embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1);
local_task->state = MTAPI_TASK_RUNNING;
embb_mtapi_network_buffer_clear(send_buf);
embb_mutex_unlock(&network_action->send_mutex);
local_status = MTAPI_SUCCESS;
}
}
}
mtapi_status_set(status, local_status);
}
static void network_task_cancel(
MTAPI_IN mtapi_task_hndl_t task,
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
EMBB_UNUSED(task);
mtapi_status_set(status, local_status);
}
static void network_action_finalize(
MTAPI_IN mtapi_action_hndl_t action,
MTAPI_OUT mtapi_status_t* status
) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
if (embb_mtapi_action_pool_is_handle_valid(node->action_pool, action)) {
embb_mtapi_action_t * local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, action);
embb_mtapi_network_action_t * network_action =
(embb_mtapi_network_action_t *)local_action->plugin_data;
embb_mutex_destroy(&network_action->send_mutex);
embb_mtapi_network_buffer_finalize(&network_action->send_buffer);
embb_mtapi_network_socket_finalize(&network_action->socket);
embb_free(network_action);
local_status = MTAPI_SUCCESS;
}
}
mtapi_status_set(status, local_status);
}
mtapi_action_hndl_t mtapi_network_action_create(
MTAPI_IN mtapi_domain_t domain_id,
MTAPI_IN mtapi_job_id_t local_job_id,
MTAPI_IN mtapi_job_id_t remote_job_id,
MTAPI_IN char * host,
MTAPI_IN mtapi_uint16_t port,
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
embb_mtapi_network_plugin_t * plugin = &embb_mtapi_network_plugin;
embb_mtapi_network_action_t * action =
(embb_mtapi_network_action_t*)embb_alloc(
sizeof(embb_mtapi_network_action_t));
mtapi_action_hndl_t action_hndl = { 0, 0 };
int err;
action->domain_id = domain_id;
action->job_id = remote_job_id;
embb_mtapi_network_buffer_initialize(&action->send_buffer, plugin->buffer_size);
embb_mutex_init(&action->send_mutex, 0);
action->host = host;
action->port = port;
embb_mtapi_network_socket_initialize(&action->socket);
err = embb_mtapi_network_socket_connect(&action->socket, host, port);
if (0 != err) {
// store socket for select
plugin->sockets[plugin->socket_count] = action->socket;
plugin->socket_count++;
action_hndl = mtapi_ext_plugin_action_create(
local_job_id,
network_task_start,
network_task_cancel,
network_action_finalize,
action,
NULL, 0, // no node local data obviously
MTAPI_NULL,
&local_status);
mtapi_status_set(status, local_status);
}
return action_hndl;
}
/*
* Copyright (c) 2014, Siemens AG. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <embb_mtapi_network_test_task.h>
#include <embb/mtapi/c/mtapi_ext.h>
#include <embb/mtapi/c/mtapi_network.h>
#include <embb/base/c/internal/unused.h>
#define MTAPI_CHECK_STATUS(status) \
PT_ASSERT(MTAPI_SUCCESS == status)
#define NETWORK_DOMAIN 1
#define NETWORK_LOCAL_NODE 3
#define NETWORK_LOCAL_JOB 3
#define NETWORK_REMOTE_NODE 3
#define NETWORK_REMOTE_JOB 4
static void test(
void const * arguments,
mtapi_size_t arguments_size,
void * result_buffer,
mtapi_size_t result_buffer_size,
void const * node_local_data,
mtapi_size_t node_local_data_size,
mtapi_task_context_t * context) {
EMBB_UNUSED(context);
EMBB_UNUSED(result_buffer_size);
EMBB_UNUSED(node_local_data_size);
int elements = (int)(arguments_size / sizeof(float) / 2);
float* a = (float*)arguments;
float* b = ((float*)arguments) + elements;
float* c = (float*)result_buffer;
float* d = (float*)node_local_data;
for (int ii = 0; ii < elements; ii++) {
c[ii] = a[ii] + b[ii] + d[0];
}
}
NetworkTaskTest::NetworkTaskTest() {
CreateUnit("mtapi network task test").Add(&NetworkTaskTest::TestBasic, this);
}
void NetworkTaskTest::TestBasic() {
mtapi_status_t status;
mtapi_job_hndl_t job;
mtapi_task_hndl_t task;
mtapi_action_hndl_t network_action, local_action;
const int elements = 64;
float arguments[elements * 2];
float results[elements];
for (int ii = 0; ii < elements; ii++) {
arguments[ii] = (float)ii;
arguments[ii + elements] = (float)ii;
}
mtapi_initialize(
NETWORK_DOMAIN,
NETWORK_LOCAL_NODE,
MTAPI_NULL,
MTAPI_NULL,
&status);
MTAPI_CHECK_STATUS(status);
mtapi_network_plugin_initialize("127.0.0.1", 12345, 5,
elements * 4 * 3 + 32, &status);
MTAPI_CHECK_STATUS(status);
float node_remote = 1.0f;
local_action = mtapi_action_create(
NETWORK_REMOTE_JOB,
test,
&node_remote, sizeof(float),
MTAPI_DEFAULT_ACTION_ATTRIBUTES,
&status);
MTAPI_CHECK_STATUS(status);
network_action = mtapi_network_action_create(
NETWORK_DOMAIN,
NETWORK_LOCAL_JOB,
NETWORK_REMOTE_JOB,
"127.0.0.1", 12345,
&status);
MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN;
job = mtapi_job_get(NETWORK_LOCAL_JOB, NETWORK_DOMAIN, &status);
MTAPI_CHECK_STATUS(status);
task = mtapi_task_start(
MTAPI_TASK_ID_NONE,
job,
arguments, elements * 2 * sizeof(float),
results, elements*sizeof(float),
MTAPI_DEFAULT_TASK_ATTRIBUTES,
MTAPI_GROUP_NONE,
&status);
MTAPI_CHECK_STATUS(status);
mtapi_task_wait(task, MTAPI_INFINITE, &status);
MTAPI_CHECK_STATUS(status);
for (int ii = 0; ii < elements; ii++) {
PT_EXPECT_EQ(results[ii], ii * 2 + 1);
}
mtapi_action_delete(network_action, MTAPI_INFINITE, &status);
MTAPI_CHECK_STATUS(status);
mtapi_action_delete(local_action, MTAPI_INFINITE, &status);
MTAPI_CHECK_STATUS(status);
mtapi_network_plugin_finalize(&status);
MTAPI_CHECK_STATUS(status);
mtapi_finalize(&status);
MTAPI_CHECK_STATUS(status);
}
/*
* Copyright (c) 2014, Siemens AG. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef MTAPI_C_TEST_EMBB_MTAPI_NETWORK_TEST_TASK_H_
#define MTAPI_C_TEST_EMBB_MTAPI_NETWORK_TEST_TASK_H_
#include <partest/partest.h>
class NetworkTaskTest : public partest::TestCase {
public:
NetworkTaskTest();
private:
void TestBasic();
};
#endif // MTAPI_C_TEST_EMBB_MTAPI_NETWORK_TEST_TASK_H_
...@@ -28,8 +28,10 @@ ...@@ -28,8 +28,10 @@
#include <embb_mtapi_network_test_buffer.h> #include <embb_mtapi_network_test_buffer.h>
#include <embb_mtapi_network_test_socket.h> #include <embb_mtapi_network_test_socket.h>
#include <embb_mtapi_network_test_task.h>
PT_MAIN("MTAPI NETWORK") { PT_MAIN("MTAPI NETWORK") {
PT_RUN(NetworkBufferTest); PT_RUN(NetworkBufferTest);
PT_RUN(NetworkSocketTest); PT_RUN(NetworkSocketTest);
PT_RUN(NetworkTaskTest);
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment