From 7f7e33d261fee95ec45eb91fa8d7a97aab4471ac Mon Sep 17 00:00:00 2001 From: Marcus Winter Date: Wed, 11 Mar 2015 17:18:25 +0100 Subject: [PATCH] mtapi_c: added multi-instance task support and test --- mtapi_c/src/embb_mtapi_scheduler_t.c | 30 ++++++++++++++++++------------ mtapi_c/src/embb_mtapi_scheduler_t.h | 3 ++- mtapi_c/src/embb_mtapi_task_context_t.c | 4 ++-- mtapi_c/src/embb_mtapi_task_t.c | 62 ++++++++++++++++++++++++++++++++++++++++++-------------------- mtapi_c/src/embb_mtapi_task_t.h | 3 ++- mtapi_c/test/embb_mtapi_test_task.cc | 99 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- mtapi_c/test/main.cc | 4 +++- 7 files changed, 166 insertions(+), 39 deletions(-) diff --git a/mtapi_c/src/embb_mtapi_scheduler_t.c b/mtapi_c/src/embb_mtapi_scheduler_t.c index 3e95e95..5e9bf4c 100644 --- a/mtapi_c/src/embb_mtapi_scheduler_t.c +++ b/mtapi_c/src/embb_mtapi_scheduler_t.c @@ -304,13 +304,16 @@ int embb_mtapi_scheduler_worker(void * arg) { switch (task->state) { case MTAPI_TASK_SCHEDULED: + /* multi-instance task, another instance might be running */ + case MTAPI_TASK_RUNNING: /* there was work, execute it */ embb_mtapi_task_context_initialize_with_thread_context_and_task( &task_context, thread_context, task); - embb_mtapi_task_execute(task, &task_context); - /* tell queue that a task is done */ - if (MTAPI_NULL != local_queue) { - embb_mtapi_queue_task_finished(local_queue); + if (embb_mtapi_task_execute(task, &task_context)) { + /* tell queue that a task is done */ + if (MTAPI_NULL != local_queue) { + embb_mtapi_queue_task_finished(local_queue); + } } counter = 0; break; @@ -318,25 +321,27 @@ int embb_mtapi_scheduler_worker(void * arg) { case MTAPI_TASK_RETAINED: /* put task into queue again for later execution */ embb_mtapi_scheduler_schedule_task( - node->scheduler, task); + node->scheduler, task, 0); /* yield, as there may be only retained tasks in the queue */ embb_thread_yield(); /* task is not done, so do not notify queue */ break; case MTAPI_TASK_CANCELLED: - /* set return value to cancelled */ + /* set return value to canceled */ task->error_code = MTAPI_ERR_ACTION_CANCELLED; - /* tell queue that a task is done */ - if (MTAPI_NULL != local_queue) { - embb_mtapi_queue_task_finished(local_queue); + if (embb_atomic_fetch_and_add_unsigned_int( + &task->instances_todo, (unsigned int)-1) == 0) { + /* tell queue that a task is done */ + if (MTAPI_NULL != local_queue) { + embb_mtapi_queue_task_finished(local_queue); + } } break; case MTAPI_TASK_COMPLETED: case MTAPI_TASK_DELETED: case MTAPI_TASK_WAITING: - case MTAPI_TASK_RUNNING: case MTAPI_TASK_CREATED: case MTAPI_TASK_PRENATAL: case MTAPI_TASK_ERROR: @@ -526,10 +531,11 @@ mtapi_boolean_t embb_mtapi_scheduler_process_tasks( mtapi_boolean_t embb_mtapi_scheduler_schedule_task( embb_mtapi_scheduler_t * that, - embb_mtapi_task_t * task) { + embb_mtapi_task_t * task, + mtapi_uint_t instance) { embb_mtapi_scheduler_t * scheduler = that; /* distribute round robin */ - mtapi_uint_t ii = task->handle.id % scheduler->worker_count; + mtapi_uint_t ii = (task->handle.id + instance) % scheduler->worker_count; mtapi_boolean_t pushed = MTAPI_FALSE; embb_mtapi_node_t* node = embb_mtapi_node_get_instance(); diff --git a/mtapi_c/src/embb_mtapi_scheduler_t.h b/mtapi_c/src/embb_mtapi_scheduler_t.h index 07ff116..9b340d2 100644 --- a/mtapi_c/src/embb_mtapi_scheduler_t.h +++ b/mtapi_c/src/embb_mtapi_scheduler_t.h @@ -198,7 +198,8 @@ mtapi_boolean_t embb_mtapi_scheduler_process_tasks( */ mtapi_boolean_t embb_mtapi_scheduler_schedule_task( embb_mtapi_scheduler_t * that, - embb_mtapi_task_t * task); + embb_mtapi_task_t * task, + mtapi_uint_t instance); #ifdef __cplusplus diff --git a/mtapi_c/src/embb_mtapi_task_context_t.c b/mtapi_c/src/embb_mtapi_task_context_t.c index 92be087..a74088a 100644 --- a/mtapi_c/src/embb_mtapi_task_context_t.c +++ b/mtapi_c/src/embb_mtapi_task_context_t.c @@ -51,8 +51,8 @@ void embb_mtapi_task_context_initialize_with_thread_context_and_task( that->task = task; that->thread_context = thread_context; that->num_instances = task->attributes.num_instances; - that->instance_num = embb_atomic_fetch_and_add_unsigned_int( - &task->current_instance, 1); + that->instance_num = + embb_atomic_fetch_and_add_unsigned_int(&task->current_instance, 1); } void embb_mtapi_task_context_finalize(embb_mtapi_task_context_t* that) { diff --git a/mtapi_c/src/embb_mtapi_task_t.c b/mtapi_c/src/embb_mtapi_task_t.c index b26bcd1..7f0c27f 100644 --- a/mtapi_c/src/embb_mtapi_task_t.c +++ b/mtapi_c/src/embb_mtapi_task_t.c @@ -95,9 +95,11 @@ void embb_mtapi_task_finalize(embb_mtapi_task_t* that) { embb_mtapi_spinlock_finalize(&that->state_lock); } -void embb_mtapi_task_execute( +mtapi_boolean_t embb_mtapi_task_execute( embb_mtapi_task_t* that, embb_mtapi_task_context_t * context) { + unsigned int todo = that->attributes.num_instances; + assert(MTAPI_NULL != that); assert(MTAPI_NULL != context); @@ -110,17 +112,25 @@ void embb_mtapi_task_execute( embb_mtapi_action_t* local_action = embb_mtapi_action_pool_get_storage_for_handle( context->thread_context->node->action_pool, that->action); - local_action->action_function( - that->arguments, - that->arguments_size, - that->result_buffer, - that->result_size, - local_action->node_local_data, - local_action->node_local_data_size, - context); + /* only continue if there was no error so far */ + if (context->task->error_code == MTAPI_SUCCESS) { + local_action->action_function( + that->arguments, + that->arguments_size, + that->result_buffer, + that->result_size, + local_action->node_local_data, + local_action->node_local_data_size, + context); + } embb_atomic_memory_barrier(); - /* task has completed successfully */ - embb_mtapi_task_set_state(that, MTAPI_TASK_COMPLETED); + todo = embb_atomic_fetch_and_add_unsigned_int( + &that->instances_todo, (unsigned int)-1); + + if (todo == 1) { + /* task has completed successfully */ + embb_mtapi_task_set_state(that, MTAPI_TASK_COMPLETED); + } embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); } else { /* action was deleted, task did not complete */ @@ -128,13 +138,18 @@ void embb_mtapi_task_execute( embb_mtapi_task_set_state(that, MTAPI_TASK_ERROR); } - /* is task associated with a group? */ - if (embb_mtapi_group_pool_is_handle_valid( - context->thread_context->node->group_pool, that->group)) { - embb_mtapi_group_t* local_group = - embb_mtapi_group_pool_get_storage_for_handle( - context->thread_context->node->group_pool, that->group); - embb_mtapi_task_queue_push(&local_group->queue, that); + if (todo == 1) { + /* is task associated with a group? */ + if (embb_mtapi_group_pool_is_handle_valid( + context->thread_context->node->group_pool, that->group)) { + embb_mtapi_group_t* local_group = + embb_mtapi_group_pool_get_storage_for_handle( + context->thread_context->node->group_pool, that->group); + embb_mtapi_task_queue_push(&local_group->queue, that); + } + return MTAPI_TRUE; + } else { + return MTAPI_FALSE; } } @@ -189,6 +204,9 @@ static mtapi_task_hndl_t embb_mtapi_task_start( mtapi_taskattr_init(&task->attributes, &local_status); } + embb_atomic_store_unsigned_int( + &task->instances_todo, task->attributes.num_instances); + if (embb_mtapi_group_pool_is_handle_valid(node->group_pool, group)) { embb_mtapi_group_t* local_group = embb_mtapi_group_pool_get_storage_for_handle( @@ -233,8 +251,12 @@ static mtapi_task_hndl_t embb_mtapi_task_start( embb_mtapi_task_set_state(task, MTAPI_TASK_SCHEDULED); - was_scheduled = - embb_mtapi_scheduler_schedule_task(scheduler, task); + was_scheduled = MTAPI_TRUE; + + for (mtapi_uint_t kk = 0; kk < task->attributes.num_instances; kk++) { + was_scheduled = was_scheduled & + embb_mtapi_scheduler_schedule_task(scheduler, task, kk); + } if (was_scheduled) { /* if task is detached, do not return a handle, it will be deleted diff --git a/mtapi_c/src/embb_mtapi_task_t.h b/mtapi_c/src/embb_mtapi_task_t.h index 77617b3..f5dadc5 100644 --- a/mtapi_c/src/embb_mtapi_task_t.h +++ b/mtapi_c/src/embb_mtapi_task_t.h @@ -68,6 +68,7 @@ struct embb_mtapi_task_struct { embb_mtapi_spinlock_t state_lock; volatile mtapi_task_state_t state; embb_atomic_unsigned_int current_instance; + embb_atomic_unsigned_int instances_todo; mtapi_status_t error_code; }; @@ -106,7 +107,7 @@ void embb_mtapi_task_finalize(embb_mtapi_task_t* that); * detached. * \memberof embb_mtapi_task_struct */ -void embb_mtapi_task_execute( +mtapi_boolean_t embb_mtapi_task_execute( embb_mtapi_task_t* that, embb_mtapi_task_context_t * context); diff --git a/mtapi_c/test/embb_mtapi_test_task.cc b/mtapi_c/test/embb_mtapi_test_task.cc index 325f646..8d1d4b4 100644 --- a/mtapi_c/test/embb_mtapi_test_task.cc +++ b/mtapi_c/test/embb_mtapi_test_task.cc @@ -33,6 +33,7 @@ #include #define JOB_TEST_TASK 42 +#define JOB_TEST_MULTIINSTANCE_TASK 43 #define TASK_TEST_ID 23 static void testTaskAction( @@ -44,7 +45,9 @@ static void testTaskAction( mtapi_size_t /*node_local_data_size*/, mtapi_task_context_t* task_context) { int ii; - mtapi_uint_t core_num = mtapi_context_corenum_get(task_context, MTAPI_NULL); + mtapi_status_t status; + mtapi_uint_t core_num = mtapi_context_corenum_get(task_context, &status); + MTAPI_CHECK_STATUS(status); srand(core_num); for (ii = 1000; ii < rand()%1000000; ii ++) { } @@ -53,6 +56,42 @@ static void testTaskAction( EMBB_UNUSED(args); } +void testMultiInstanceTaskAction( + const void* args, + mtapi_size_t arg_size, + void* result_buffer, + mtapi_size_t result_buffer_size, + const void* node_local_data, + mtapi_size_t node_local_data_size, + mtapi_task_context_t* task_context) +{ + EMBB_UNUSED(args); + EMBB_UNUSED(arg_size); + + EMBB_UNUSED(node_local_data); + EMBB_UNUSED(node_local_data_size); + + mtapi_status_t status; + mtapi_uint_t this_instance, num_instances; + mtapi_uint_t* result; + num_instances = mtapi_context_numinst_get(task_context, &status); + this_instance = mtapi_context_instnum_get(task_context, &status); + + /* check result buffer size... */ + if (result_buffer_size == sizeof(int) * num_instances) { + /* ... and cast the result buffer */ + result = (mtapi_uint_t*)result_buffer; + } else { + mtapi_context_status_set(task_context, MTAPI_ERR_RESULT_SIZE, &status); + MTAPI_CHECK_STATUS(status); + return; + } + + /* dummy for calculating result */ + result[this_instance] = this_instance; +} + + static void testDoSomethingElse() { } @@ -160,10 +199,66 @@ void TaskTest::TestBasic() { MTAPI_CHECK_STATUS(status); status = MTAPI_ERR_UNKNOWN; + mtapi_action_hndl_t multiinstance_action = mtapi_action_create( + JOB_TEST_MULTIINSTANCE_TASK, + testMultiInstanceTaskAction, + MTAPI_NULL, + 0, + &action_attr, + &status); + MTAPI_CHECK_STATUS(status); + + status = MTAPI_ERR_UNKNOWN; + mtapi_job_hndl_t multiinstance_job = mtapi_job_get( + JOB_TEST_MULTIINSTANCE_TASK, THIS_DOMAIN_ID, &status); + MTAPI_CHECK_STATUS(status); + + mtapi_task_attributes_t task_attr; + + status = MTAPI_ERR_UNKNOWN; + mtapi_taskattr_init(&task_attr, &status); + MTAPI_CHECK_STATUS(status); + + const int task_instances = 5; + + status = MTAPI_ERR_UNKNOWN; + mtapi_taskattr_set(&task_attr, MTAPI_TASK_INSTANCES, + MTAPI_ATTRIBUTE_VALUE(task_instances), MTAPI_ATTRIBUTE_POINTER_AS_VALUE, + &status); + MTAPI_CHECK_STATUS(status); + + mtapi_uint_t result[task_instances]; + for (mtapi_uint_t ii = 0; ii < task_instances; ii++) { + result[ii] = task_instances + 1; + } + + status = MTAPI_ERR_UNKNOWN; + mtapi_task_hndl_t multiinstance_task = + mtapi_task_start(MTAPI_TASK_ID_NONE, multiinstance_job, + MTAPI_NULL, 0, + &result[0], sizeof(mtapi_uint_t) * task_instances, + &task_attr, + MTAPI_GROUP_NONE, + &status); + MTAPI_CHECK_STATUS(status); + + status = MTAPI_ERR_UNKNOWN; + mtapi_task_wait(multiinstance_task, MTAPI_INFINITE, &status); + MTAPI_CHECK_STATUS(status); + + for (mtapi_uint_t ii = 0; ii < task_instances; ii++) { + PT_EXPECT_EQ(result[ii], ii); + } + + status = MTAPI_ERR_UNKNOWN; + mtapi_action_delete(multiinstance_action, 10, &status); + MTAPI_CHECK_STATUS(status); + + status = MTAPI_ERR_UNKNOWN; mtapi_finalize(&status); MTAPI_CHECK_STATUS(status); - PT_EXPECT(embb_get_bytes_allocated() == 0); + PT_EXPECT_EQ(embb_get_bytes_allocated(), 0u); embb_mtapi_log_info("...done\n\n"); } diff --git a/mtapi_c/test/main.cc b/mtapi_c/test/main.cc index 6d8b6c2..2c91b7e 100644 --- a/mtapi_c/test/main.cc +++ b/mtapi_c/test/main.cc @@ -25,6 +25,7 @@ */ #include +#include #include @@ -38,10 +39,11 @@ PT_MAIN("MTAPI C") { embb_log_set_log_level(EMBB_LOG_LEVEL_NONE); + embb_thread_set_max_count(1024); + PT_RUN(TaskTest); PT_RUN(ErrorTest); PT_RUN(InitFinalizeTest); - PT_RUN(TaskTest); PT_RUN(GroupTest); PT_RUN(QueueTest); } -- libgit2 0.26.0