Commit 32029a06 by Marcus Winter

Merge branch 'embb620_detached_tasks' into development

parents 2fa83029 40180ee8
......@@ -213,6 +213,29 @@ embb_mtapi_thread_context_t * embb_mtapi_scheduler_get_current_thread_context(
return context;
}
void embb_mtapi_scheduler_finalize_task(
embb_mtapi_task_t * task,
embb_mtapi_node_t * node,
embb_mtapi_queue_t * queue,
embb_mtapi_group_t * group) {
/* tell queue that a task is done */
if (MTAPI_NULL != queue) {
embb_mtapi_queue_task_finished(queue);
}
/* move task to group queue */
if (MTAPI_NULL != group) {
embb_mtapi_task_queue_push(&group->queue, task);
}
/* issue task complete callback if set */
if (MTAPI_NULL != task->attributes.complete_func) {
task->attributes.complete_func(task->handle, MTAPI_NULL);
}
/* delete task if detached */
if (MTAPI_TRUE == task->attributes.is_detached) {
embb_mtapi_task_delete(task, node->task_pool);
}
}
mtapi_boolean_t embb_mtapi_scheduler_execute_task(
embb_mtapi_task_t * task,
embb_mtapi_node_t * node,
......@@ -254,10 +277,7 @@ mtapi_boolean_t embb_mtapi_scheduler_execute_task(
embb_mtapi_task_context_initialize_with_thread_context_and_task(
&task_context, thread_context, task);
if (embb_mtapi_task_execute(task, &task_context)) {
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
embb_mtapi_scheduler_finalize_task(task, node, local_queue, local_group);
}
result = MTAPI_TRUE;
break;
......@@ -276,13 +296,7 @@ mtapi_boolean_t embb_mtapi_scheduler_execute_task(
task->error_code = MTAPI_ERR_ACTION_CANCELLED;
if (embb_atomic_fetch_and_add_unsigned_int(
&task->instances_todo, (unsigned int)-1) == 0) {
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
if (MTAPI_NULL != local_group) {
embb_mtapi_task_queue_push(&local_group->queue, task);
}
embb_mtapi_scheduler_finalize_task(task, node, local_queue, local_group);
}
if (MTAPI_NULL != local_action) {
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
......@@ -301,11 +315,6 @@ mtapi_boolean_t embb_mtapi_scheduler_execute_task(
break;
}
/* issue task complete callback if set */
if (MTAPI_NULL != task->attributes.complete_func) {
task->attributes.complete_func(task->handle, MTAPI_NULL);
}
return result;
}
......
......@@ -40,6 +40,7 @@ extern "C" {
/* ---- FORWARD DECLARATIONS ----------------------------------------------- */
#include <embb_mtapi_queue_t_fwd.h>
#include <embb_mtapi_group_t_fwd.h>
#include <embb_mtapi_thread_context_t_fwd.h>
#include <embb_mtapi_task_t_fwd.h>
#include <embb_mtapi_node_t_fwd.h>
......@@ -138,6 +139,16 @@ embb_mtapi_thread_context_t * embb_mtapi_scheduler_get_current_thread_context(
embb_mtapi_scheduler_t * that);
/**
* Processes finished task.
* Notifies associated group and queue and deletes task if it is detached.
*/
void embb_mtapi_scheduler_finalize_task(
embb_mtapi_task_t * task,
embb_mtapi_node_t * node,
embb_mtapi_queue_t * queue,
embb_mtapi_group_t * group);
/**
* Executes the given task if the thread context is valid.
* \memberof embb_mtapi_scheduler_struct
*/
......
......@@ -139,14 +139,6 @@ mtapi_boolean_t embb_mtapi_task_execute(
}
if (todo == 1) {
/* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid(
context->thread_context->node->group_pool, that->group)) {
embb_mtapi_group_t* local_group =
embb_mtapi_group_pool_get_storage_for_handle(
context->thread_context->node->group_pool, that->group);
embb_mtapi_task_queue_push(&local_group->queue, that);
}
return MTAPI_TRUE;
} else {
return MTAPI_FALSE;
......
......@@ -34,6 +34,7 @@
#define JOB_TEST_TASK 42
#define JOB_TEST_MULTIINSTANCE_TASK 43
#define JOB_TEST_DETACHED_TASK 44
#define TASK_TEST_ID 23
static void testTaskAction(
......@@ -56,6 +57,16 @@ static void testTaskAction(
EMBB_UNUSED(args);
}
static void testDetachedTaskAction(
const void* /*args*/,
mtapi_size_t /*arg_size*/,
void* /*result_buffer*/,
mtapi_size_t /*result_buffer_size*/,
const void* /*node_local_data*/,
mtapi_size_t /*node_local_data_size*/,
mtapi_task_context_t* /*task_context*/) {
}
void testMultiInstanceTaskAction(
const void* args,
mtapi_size_t arg_size,
......@@ -98,45 +109,86 @@ TaskTest::TaskTest() {
CreateUnit("mtapi task test").Add(&TaskTest::TestBasic, this);
}
void TaskTest::TestBasic() {
mtapi_node_attributes_t node_attr;
mtapi_action_attributes_t action_attr;
mtapi_affinity_t affinity;
mtapi_info_t info;
void TaskTest::TrySimple() {
mtapi_status_t status;
mtapi_affinity_t affinity;
mtapi_action_hndl_t action;
mtapi_action_attributes_t action_attr;
mtapi_job_hndl_t job;
mtapi_task_hndl_t task[100];
mtapi_uint_t ii;
static const mtapi_uint_t kTaskCount = 100u;
mtapi_task_hndl_t task[kTaskCount];
embb_mtapi_log_info("running testTask...\n");
status = MTAPI_ERR_UNKNOWN;
mtapi_affinity_init(&affinity, MTAPI_TRUE, &status);
MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN;
mtapi_nodeattr_init(&node_attr, &status);
mtapi_actionattr_init(&action_attr, &status);
MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN;
mtapi_nodeattr_set(
&node_attr,
MTAPI_NODE_TYPE,
MTAPI_ATTRIBUTE_VALUE(MTAPI_NODE_TYPE_SMP),
MTAPI_ATTRIBUTE_POINTER_AS_VALUE,
mtapi_actionattr_set(
&action_attr,
MTAPI_ACTION_AFFINITY,
&affinity,
MTAPI_ACTION_AFFINITY_SIZE,
&status);
MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN;
mtapi_initialize(
THIS_DOMAIN_ID,
THIS_NODE_ID,
&node_attr,
&info,
action = mtapi_action_create(
JOB_TEST_TASK,
testTaskAction,
MTAPI_NULL,
0,
&action_attr,
&status);
MTAPI_CHECK_STATUS(status);
embb_mtapi_log_trace("mtapi successfully initialized...\n");
embb_mtapi_log_trace(
"hardware concurrency : %d\n", info.hardware_concurrency);
embb_mtapi_log_trace("used memory : %d\n", info.used_memory);
status = MTAPI_ERR_UNKNOWN;
job = mtapi_job_get(JOB_TEST_TASK, THIS_DOMAIN_ID, &status);
MTAPI_CHECK_STATUS(status);
for (ii = 0; ii < kTaskCount; ii++) {
status = MTAPI_ERR_UNKNOWN;
mtapi_uint_t arg = ii;
task[ii] = mtapi_task_start(
TASK_TEST_ID,
job,
reinterpret_cast<const void*>(&arg),
0,
MTAPI_NULL,
0,
MTAPI_DEFAULT_TASK_ATTRIBUTES,
MTAPI_GROUP_NONE,
&status);
MTAPI_CHECK_STATUS(status);
}
testDoSomethingElse();
for (ii = 0; ii < kTaskCount; ii++) {
status = MTAPI_ERR_UNKNOWN;
mtapi_task_wait(task[ii], 100000, &status);
MTAPI_CHECK_STATUS(status);
}
status = MTAPI_ERR_UNKNOWN;
mtapi_action_delete(action, 10, &status);
MTAPI_CHECK_STATUS(status);
}
void TaskTest::TryDetached() {
mtapi_status_t status;
mtapi_affinity_t affinity;
mtapi_action_hndl_t action;
mtapi_action_attributes_t action_attr;
mtapi_job_hndl_t job;
mtapi_uint_t ii;
static const mtapi_uint_t kTaskCount = MTAPI_NODE_MAX_TASKS_DEFAULT + 100u;
mtapi_task_attributes_t taskattr;
mtapi_boolean_t detached = MTAPI_TRUE;
status = MTAPI_ERR_UNKNOWN;
mtapi_affinity_init(&affinity, MTAPI_TRUE, &status);
......@@ -157,8 +209,8 @@ void TaskTest::TestBasic() {
status = MTAPI_ERR_UNKNOWN;
action = mtapi_action_create(
JOB_TEST_TASK,
testTaskAction,
JOB_TEST_DETACHED_TASK,
testDetachedTaskAction,
MTAPI_NULL,
0,
&action_attr,
......@@ -166,20 +218,25 @@ void TaskTest::TestBasic() {
MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN;
job = mtapi_job_get(JOB_TEST_TASK, THIS_DOMAIN_ID, &status);
job = mtapi_job_get(JOB_TEST_DETACHED_TASK, THIS_DOMAIN_ID, &status);
MTAPI_CHECK_STATUS(status);
for (ii = 0; ii < 100u; ii++) {
mtapi_taskattr_init(&taskattr, &status);
MTAPI_CHECK_STATUS(status);
mtapi_taskattr_set(&taskattr, MTAPI_TASK_DETACHED, &detached, sizeof(detached), &status);
MTAPI_CHECK_STATUS(status);
for (ii = 0; ii < kTaskCount; ii++) {
status = MTAPI_ERR_UNKNOWN;
mtapi_uint_t arg = ii;
task[ii] = mtapi_task_start(
mtapi_task_start(
TASK_TEST_ID,
job,
reinterpret_cast<const void*>(&arg),
0,
MTAPI_NULL,
0,
MTAPI_DEFAULT_TASK_ATTRIBUTES,
&taskattr,
MTAPI_GROUP_NONE,
&status);
MTAPI_CHECK_STATUS(status);
......@@ -187,15 +244,13 @@ void TaskTest::TestBasic() {
testDoSomethingElse();
for (ii = 0; ii < 100u; ii++) {
status = MTAPI_ERR_UNKNOWN;
mtapi_task_wait(task[ii], 100000, &status);
mtapi_action_delete(action, 1000, &status);
MTAPI_CHECK_STATUS(status);
}
}
status = MTAPI_ERR_UNKNOWN;
mtapi_action_delete(action, 10, &status);
MTAPI_CHECK_STATUS(status);
void TaskTest::TryMultiInstance() {
mtapi_status_t status;
status = MTAPI_ERR_UNKNOWN;
mtapi_action_hndl_t multiinstance_action = mtapi_action_create(
......@@ -203,7 +258,7 @@ void TaskTest::TestBasic() {
testMultiInstanceTaskAction,
MTAPI_NULL,
0,
&action_attr,
MTAPI_DEFAULT_ACTION_ATTRIBUTES,
&status);
MTAPI_CHECK_STATUS(status);
......@@ -227,6 +282,7 @@ void TaskTest::TestBasic() {
MTAPI_CHECK_STATUS(status);
mtapi_uint_t result[kTaskInstances];
mtapi_uint_t ii;
for (ii = 0; ii < kTaskInstances; ii++) {
result[ii] = kTaskInstances + 1;
}
......@@ -252,6 +308,45 @@ void TaskTest::TestBasic() {
status = MTAPI_ERR_UNKNOWN;
mtapi_action_delete(multiinstance_action, 10, &status);
MTAPI_CHECK_STATUS(status);
}
void TaskTest::TestBasic() {
mtapi_node_attributes_t node_attr;
mtapi_info_t info;
mtapi_status_t status;
embb_mtapi_log_info("running testTask...\n");
status = MTAPI_ERR_UNKNOWN;
mtapi_nodeattr_init(&node_attr, &status);
MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN;
mtapi_nodeattr_set(
&node_attr,
MTAPI_NODE_TYPE,
MTAPI_ATTRIBUTE_VALUE(MTAPI_NODE_TYPE_SMP),
MTAPI_ATTRIBUTE_POINTER_AS_VALUE,
&status);
MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN;
mtapi_initialize(
THIS_DOMAIN_ID,
THIS_NODE_ID,
&node_attr,
&info,
&status);
MTAPI_CHECK_STATUS(status);
embb_mtapi_log_trace("mtapi successfully initialized...\n");
embb_mtapi_log_trace(
"hardware concurrency : %d\n", info.hardware_concurrency);
embb_mtapi_log_trace("used memory : %d\n", info.used_memory);
TrySimple();
TryDetached();
TryMultiInstance();
status = MTAPI_ERR_UNKNOWN;
mtapi_finalize(&status);
......
......@@ -35,6 +35,10 @@ class TaskTest : public partest::TestCase {
private:
void TestBasic();
void TrySimple();
void TryDetached();
void TryMultiInstance();
};
#endif // MTAPI_C_TEST_EMBB_MTAPI_TEST_TASK_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment