Commit 7f7e33d2 by Marcus Winter

mtapi_c: added multi-instance task support and test

parent eeb14df9
...@@ -304,39 +304,44 @@ int embb_mtapi_scheduler_worker(void * arg) { ...@@ -304,39 +304,44 @@ int embb_mtapi_scheduler_worker(void * arg) {
switch (task->state) { switch (task->state) {
case MTAPI_TASK_SCHEDULED: case MTAPI_TASK_SCHEDULED:
/* multi-instance task, another instance might be running */
case MTAPI_TASK_RUNNING:
/* there was work, execute it */ /* there was work, execute it */
embb_mtapi_task_context_initialize_with_thread_context_and_task( embb_mtapi_task_context_initialize_with_thread_context_and_task(
&task_context, thread_context, task); &task_context, thread_context, task);
embb_mtapi_task_execute(task, &task_context); if (embb_mtapi_task_execute(task, &task_context)) {
/* tell queue that a task is done */ /* tell queue that a task is done */
if (MTAPI_NULL != local_queue) { if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue); embb_mtapi_queue_task_finished(local_queue);
} }
}
counter = 0; counter = 0;
break; break;
case MTAPI_TASK_RETAINED: case MTAPI_TASK_RETAINED:
/* put task into queue again for later execution */ /* put task into queue again for later execution */
embb_mtapi_scheduler_schedule_task( embb_mtapi_scheduler_schedule_task(
node->scheduler, task); node->scheduler, task, 0);
/* yield, as there may be only retained tasks in the queue */ /* yield, as there may be only retained tasks in the queue */
embb_thread_yield(); embb_thread_yield();
/* task is not done, so do not notify queue */ /* task is not done, so do not notify queue */
break; break;
case MTAPI_TASK_CANCELLED: case MTAPI_TASK_CANCELLED:
/* set return value to cancelled */ /* set return value to canceled */
task->error_code = MTAPI_ERR_ACTION_CANCELLED; task->error_code = MTAPI_ERR_ACTION_CANCELLED;
if (embb_atomic_fetch_and_add_unsigned_int(
&task->instances_todo, (unsigned int)-1) == 0) {
/* tell queue that a task is done */ /* tell queue that a task is done */
if (MTAPI_NULL != local_queue) { if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue); embb_mtapi_queue_task_finished(local_queue);
} }
}
break; break;
case MTAPI_TASK_COMPLETED: case MTAPI_TASK_COMPLETED:
case MTAPI_TASK_DELETED: case MTAPI_TASK_DELETED:
case MTAPI_TASK_WAITING: case MTAPI_TASK_WAITING:
case MTAPI_TASK_RUNNING:
case MTAPI_TASK_CREATED: case MTAPI_TASK_CREATED:
case MTAPI_TASK_PRENATAL: case MTAPI_TASK_PRENATAL:
case MTAPI_TASK_ERROR: case MTAPI_TASK_ERROR:
...@@ -526,10 +531,11 @@ mtapi_boolean_t embb_mtapi_scheduler_process_tasks( ...@@ -526,10 +531,11 @@ mtapi_boolean_t embb_mtapi_scheduler_process_tasks(
mtapi_boolean_t embb_mtapi_scheduler_schedule_task( mtapi_boolean_t embb_mtapi_scheduler_schedule_task(
embb_mtapi_scheduler_t * that, embb_mtapi_scheduler_t * that,
embb_mtapi_task_t * task) { embb_mtapi_task_t * task,
mtapi_uint_t instance) {
embb_mtapi_scheduler_t * scheduler = that; embb_mtapi_scheduler_t * scheduler = that;
/* distribute round robin */ /* distribute round robin */
mtapi_uint_t ii = task->handle.id % scheduler->worker_count; mtapi_uint_t ii = (task->handle.id + instance) % scheduler->worker_count;
mtapi_boolean_t pushed = MTAPI_FALSE; mtapi_boolean_t pushed = MTAPI_FALSE;
embb_mtapi_node_t* node = embb_mtapi_node_get_instance(); embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
......
...@@ -198,7 +198,8 @@ mtapi_boolean_t embb_mtapi_scheduler_process_tasks( ...@@ -198,7 +198,8 @@ mtapi_boolean_t embb_mtapi_scheduler_process_tasks(
*/ */
mtapi_boolean_t embb_mtapi_scheduler_schedule_task( mtapi_boolean_t embb_mtapi_scheduler_schedule_task(
embb_mtapi_scheduler_t * that, embb_mtapi_scheduler_t * that,
embb_mtapi_task_t * task); embb_mtapi_task_t * task,
mtapi_uint_t instance);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -51,8 +51,8 @@ void embb_mtapi_task_context_initialize_with_thread_context_and_task( ...@@ -51,8 +51,8 @@ void embb_mtapi_task_context_initialize_with_thread_context_and_task(
that->task = task; that->task = task;
that->thread_context = thread_context; that->thread_context = thread_context;
that->num_instances = task->attributes.num_instances; that->num_instances = task->attributes.num_instances;
that->instance_num = embb_atomic_fetch_and_add_unsigned_int( that->instance_num =
&task->current_instance, 1); embb_atomic_fetch_and_add_unsigned_int(&task->current_instance, 1);
} }
void embb_mtapi_task_context_finalize(embb_mtapi_task_context_t* that) { void embb_mtapi_task_context_finalize(embb_mtapi_task_context_t* that) {
......
...@@ -95,9 +95,11 @@ void embb_mtapi_task_finalize(embb_mtapi_task_t* that) { ...@@ -95,9 +95,11 @@ void embb_mtapi_task_finalize(embb_mtapi_task_t* that) {
embb_mtapi_spinlock_finalize(&that->state_lock); embb_mtapi_spinlock_finalize(&that->state_lock);
} }
void embb_mtapi_task_execute( mtapi_boolean_t embb_mtapi_task_execute(
embb_mtapi_task_t* that, embb_mtapi_task_t* that,
embb_mtapi_task_context_t * context) { embb_mtapi_task_context_t * context) {
unsigned int todo = that->attributes.num_instances;
assert(MTAPI_NULL != that); assert(MTAPI_NULL != that);
assert(MTAPI_NULL != context); assert(MTAPI_NULL != context);
...@@ -110,6 +112,8 @@ void embb_mtapi_task_execute( ...@@ -110,6 +112,8 @@ void embb_mtapi_task_execute(
embb_mtapi_action_t* local_action = embb_mtapi_action_t* local_action =
embb_mtapi_action_pool_get_storage_for_handle( embb_mtapi_action_pool_get_storage_for_handle(
context->thread_context->node->action_pool, that->action); context->thread_context->node->action_pool, that->action);
/* only continue if there was no error so far */
if (context->task->error_code == MTAPI_SUCCESS) {
local_action->action_function( local_action->action_function(
that->arguments, that->arguments,
that->arguments_size, that->arguments_size,
...@@ -118,9 +122,15 @@ void embb_mtapi_task_execute( ...@@ -118,9 +122,15 @@ void embb_mtapi_task_execute(
local_action->node_local_data, local_action->node_local_data,
local_action->node_local_data_size, local_action->node_local_data_size,
context); context);
}
embb_atomic_memory_barrier(); embb_atomic_memory_barrier();
todo = embb_atomic_fetch_and_add_unsigned_int(
&that->instances_todo, (unsigned int)-1);
if (todo == 1) {
/* task has completed successfully */ /* task has completed successfully */
embb_mtapi_task_set_state(that, MTAPI_TASK_COMPLETED); embb_mtapi_task_set_state(that, MTAPI_TASK_COMPLETED);
}
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
} else { } else {
/* action was deleted, task did not complete */ /* action was deleted, task did not complete */
...@@ -128,6 +138,7 @@ void embb_mtapi_task_execute( ...@@ -128,6 +138,7 @@ void embb_mtapi_task_execute(
embb_mtapi_task_set_state(that, MTAPI_TASK_ERROR); embb_mtapi_task_set_state(that, MTAPI_TASK_ERROR);
} }
if (todo == 1) {
/* is task associated with a group? */ /* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid( if (embb_mtapi_group_pool_is_handle_valid(
context->thread_context->node->group_pool, that->group)) { context->thread_context->node->group_pool, that->group)) {
...@@ -136,6 +147,10 @@ void embb_mtapi_task_execute( ...@@ -136,6 +147,10 @@ void embb_mtapi_task_execute(
context->thread_context->node->group_pool, that->group); context->thread_context->node->group_pool, that->group);
embb_mtapi_task_queue_push(&local_group->queue, that); embb_mtapi_task_queue_push(&local_group->queue, that);
} }
return MTAPI_TRUE;
} else {
return MTAPI_FALSE;
}
} }
void embb_mtapi_task_set_state( void embb_mtapi_task_set_state(
...@@ -189,6 +204,9 @@ static mtapi_task_hndl_t embb_mtapi_task_start( ...@@ -189,6 +204,9 @@ static mtapi_task_hndl_t embb_mtapi_task_start(
mtapi_taskattr_init(&task->attributes, &local_status); mtapi_taskattr_init(&task->attributes, &local_status);
} }
embb_atomic_store_unsigned_int(
&task->instances_todo, task->attributes.num_instances);
if (embb_mtapi_group_pool_is_handle_valid(node->group_pool, group)) { if (embb_mtapi_group_pool_is_handle_valid(node->group_pool, group)) {
embb_mtapi_group_t* local_group = embb_mtapi_group_t* local_group =
embb_mtapi_group_pool_get_storage_for_handle( embb_mtapi_group_pool_get_storage_for_handle(
...@@ -233,8 +251,12 @@ static mtapi_task_hndl_t embb_mtapi_task_start( ...@@ -233,8 +251,12 @@ static mtapi_task_hndl_t embb_mtapi_task_start(
embb_mtapi_task_set_state(task, MTAPI_TASK_SCHEDULED); embb_mtapi_task_set_state(task, MTAPI_TASK_SCHEDULED);
was_scheduled = was_scheduled = MTAPI_TRUE;
embb_mtapi_scheduler_schedule_task(scheduler, task);
for (mtapi_uint_t kk = 0; kk < task->attributes.num_instances; kk++) {
was_scheduled = was_scheduled &
embb_mtapi_scheduler_schedule_task(scheduler, task, kk);
}
if (was_scheduled) { if (was_scheduled) {
/* if task is detached, do not return a handle, it will be deleted /* if task is detached, do not return a handle, it will be deleted
......
...@@ -68,6 +68,7 @@ struct embb_mtapi_task_struct { ...@@ -68,6 +68,7 @@ struct embb_mtapi_task_struct {
embb_mtapi_spinlock_t state_lock; embb_mtapi_spinlock_t state_lock;
volatile mtapi_task_state_t state; volatile mtapi_task_state_t state;
embb_atomic_unsigned_int current_instance; embb_atomic_unsigned_int current_instance;
embb_atomic_unsigned_int instances_todo;
mtapi_status_t error_code; mtapi_status_t error_code;
}; };
...@@ -106,7 +107,7 @@ void embb_mtapi_task_finalize(embb_mtapi_task_t* that); ...@@ -106,7 +107,7 @@ void embb_mtapi_task_finalize(embb_mtapi_task_t* that);
* detached. * detached.
* \memberof embb_mtapi_task_struct * \memberof embb_mtapi_task_struct
*/ */
void embb_mtapi_task_execute( mtapi_boolean_t embb_mtapi_task_execute(
embb_mtapi_task_t* that, embb_mtapi_task_t* that,
embb_mtapi_task_context_t * context); embb_mtapi_task_context_t * context);
......
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
#include <embb/base/c/internal/unused.h> #include <embb/base/c/internal/unused.h>
#define JOB_TEST_TASK 42 #define JOB_TEST_TASK 42
#define JOB_TEST_MULTIINSTANCE_TASK 43
#define TASK_TEST_ID 23 #define TASK_TEST_ID 23
static void testTaskAction( static void testTaskAction(
...@@ -44,7 +45,9 @@ static void testTaskAction( ...@@ -44,7 +45,9 @@ static void testTaskAction(
mtapi_size_t /*node_local_data_size*/, mtapi_size_t /*node_local_data_size*/,
mtapi_task_context_t* task_context) { mtapi_task_context_t* task_context) {
int ii; int ii;
mtapi_uint_t core_num = mtapi_context_corenum_get(task_context, MTAPI_NULL); mtapi_status_t status;
mtapi_uint_t core_num = mtapi_context_corenum_get(task_context, &status);
MTAPI_CHECK_STATUS(status);
srand(core_num); srand(core_num);
for (ii = 1000; ii < rand()%1000000; ii ++) { for (ii = 1000; ii < rand()%1000000; ii ++) {
} }
...@@ -53,6 +56,42 @@ static void testTaskAction( ...@@ -53,6 +56,42 @@ static void testTaskAction(
EMBB_UNUSED(args); EMBB_UNUSED(args);
} }
void testMultiInstanceTaskAction(
const void* args,
mtapi_size_t arg_size,
void* result_buffer,
mtapi_size_t result_buffer_size,
const void* node_local_data,
mtapi_size_t node_local_data_size,
mtapi_task_context_t* task_context)
{
EMBB_UNUSED(args);
EMBB_UNUSED(arg_size);
EMBB_UNUSED(node_local_data);
EMBB_UNUSED(node_local_data_size);
mtapi_status_t status;
mtapi_uint_t this_instance, num_instances;
mtapi_uint_t* result;
num_instances = mtapi_context_numinst_get(task_context, &status);
this_instance = mtapi_context_instnum_get(task_context, &status);
/* check result buffer size... */
if (result_buffer_size == sizeof(int) * num_instances) {
/* ... and cast the result buffer */
result = (mtapi_uint_t*)result_buffer;
} else {
mtapi_context_status_set(task_context, MTAPI_ERR_RESULT_SIZE, &status);
MTAPI_CHECK_STATUS(status);
return;
}
/* dummy for calculating result */
result[this_instance] = this_instance;
}
static void testDoSomethingElse() { static void testDoSomethingElse() {
} }
...@@ -160,10 +199,66 @@ void TaskTest::TestBasic() { ...@@ -160,10 +199,66 @@ void TaskTest::TestBasic() {
MTAPI_CHECK_STATUS(status); MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN; status = MTAPI_ERR_UNKNOWN;
mtapi_action_hndl_t multiinstance_action = mtapi_action_create(
JOB_TEST_MULTIINSTANCE_TASK,
testMultiInstanceTaskAction,
MTAPI_NULL,
0,
&action_attr,
&status);
MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN;
mtapi_job_hndl_t multiinstance_job = mtapi_job_get(
JOB_TEST_MULTIINSTANCE_TASK, THIS_DOMAIN_ID, &status);
MTAPI_CHECK_STATUS(status);
mtapi_task_attributes_t task_attr;
status = MTAPI_ERR_UNKNOWN;
mtapi_taskattr_init(&task_attr, &status);
MTAPI_CHECK_STATUS(status);
const int task_instances = 5;
status = MTAPI_ERR_UNKNOWN;
mtapi_taskattr_set(&task_attr, MTAPI_TASK_INSTANCES,
MTAPI_ATTRIBUTE_VALUE(task_instances), MTAPI_ATTRIBUTE_POINTER_AS_VALUE,
&status);
MTAPI_CHECK_STATUS(status);
mtapi_uint_t result[task_instances];
for (mtapi_uint_t ii = 0; ii < task_instances; ii++) {
result[ii] = task_instances + 1;
}
status = MTAPI_ERR_UNKNOWN;
mtapi_task_hndl_t multiinstance_task =
mtapi_task_start(MTAPI_TASK_ID_NONE, multiinstance_job,
MTAPI_NULL, 0,
&result[0], sizeof(mtapi_uint_t) * task_instances,
&task_attr,
MTAPI_GROUP_NONE,
&status);
MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN;
mtapi_task_wait(multiinstance_task, MTAPI_INFINITE, &status);
MTAPI_CHECK_STATUS(status);
for (mtapi_uint_t ii = 0; ii < task_instances; ii++) {
PT_EXPECT_EQ(result[ii], ii);
}
status = MTAPI_ERR_UNKNOWN;
mtapi_action_delete(multiinstance_action, 10, &status);
MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN;
mtapi_finalize(&status); mtapi_finalize(&status);
MTAPI_CHECK_STATUS(status); MTAPI_CHECK_STATUS(status);
PT_EXPECT(embb_get_bytes_allocated() == 0); PT_EXPECT_EQ(embb_get_bytes_allocated(), 0u);
embb_mtapi_log_info("...done\n\n"); embb_mtapi_log_info("...done\n\n");
} }
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
*/ */
#include <partest/partest.h> #include <partest/partest.h>
#include <embb/base/c/thread.h>
#include <stdio.h> #include <stdio.h>
...@@ -38,10 +39,11 @@ ...@@ -38,10 +39,11 @@
PT_MAIN("MTAPI C") { PT_MAIN("MTAPI C") {
embb_log_set_log_level(EMBB_LOG_LEVEL_NONE); embb_log_set_log_level(EMBB_LOG_LEVEL_NONE);
embb_thread_set_max_count(1024);
PT_RUN(TaskTest);
PT_RUN(ErrorTest); PT_RUN(ErrorTest);
PT_RUN(InitFinalizeTest); PT_RUN(InitFinalizeTest);
PT_RUN(TaskTest);
PT_RUN(GroupTest); PT_RUN(GroupTest);
PT_RUN(QueueTest); PT_RUN(QueueTest);
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment