/* * Copyright (c) 2014, Siemens AG. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /* ---- POOL STORAGE FUNCTIONS --------------------------------------------- */ embb_mtapi_pool_implementation(queue) /* ---- CLASS MEMBERS ------------------------------------------------------ */ void embb_mtapi_queue_initialize(embb_mtapi_queue_t* that) { assert(MTAPI_NULL != that); mtapi_queueattr_init(&that->attributes, MTAPI_NULL); that->queue_id = MTAPI_QUEUE_ID_NONE; embb_atomic_store_char(&that->enabled, MTAPI_FALSE); embb_atomic_store_int(&that->num_tasks, 0); that->job_handle.id = 0; that->job_handle.tag = 0; } void embb_mtapi_queue_initialize_with_attributes_and_job( embb_mtapi_queue_t* that, mtapi_queue_attributes_t* attributes, mtapi_job_hndl_t job) { assert(MTAPI_NULL != that); assert(MTAPI_NULL != attributes); that->attributes = *attributes; that->queue_id = MTAPI_QUEUE_ID_NONE; embb_atomic_store_char(&that->enabled, MTAPI_TRUE); embb_atomic_store_int(&that->num_tasks, 0); that->job_handle = job; } void embb_mtapi_queue_finalize(embb_mtapi_queue_t* that) { assert(MTAPI_NULL != that); that->job_handle.id = 0; that->job_handle.tag = 0; embb_mtapi_queue_initialize(that); } void embb_mtapi_queue_task_started(embb_mtapi_queue_t* that) { assert(MTAPI_NULL != that); embb_atomic_fetch_and_add_int(&that->num_tasks, 1); } void embb_mtapi_queue_task_finished(embb_mtapi_queue_t* that) { assert(MTAPI_NULL != that); embb_atomic_fetch_and_add_int(&that->num_tasks, -1); } static mtapi_boolean_t embb_mtapi_queue_delete_visitor( embb_mtapi_task_t * task, void * user_data) { embb_mtapi_queue_t * queue = (embb_mtapi_queue_t*)user_data; mtapi_boolean_t result = MTAPI_FALSE; assert(MTAPI_NULL != queue); assert(MTAPI_NULL != task); if (task->queue.id == queue->handle.id && task->queue.tag == queue->handle.tag) { /* task is scheduled and needs to be cancelled */ embb_mtapi_task_set_state(task, MTAPI_TASK_CANCELLED); task->error_code = MTAPI_ERR_QUEUE_DELETED; result = MTAPI_TRUE; } return result; } static mtapi_boolean_t embb_mtapi_queue_disable_visitor( embb_mtapi_task_t * task, void * user_data) { embb_mtapi_queue_t * queue = (embb_mtapi_queue_t*)user_data; mtapi_boolean_t result = MTAPI_FALSE; assert(MTAPI_NULL != queue); assert(MTAPI_NULL != task); if (task->queue.id == queue->handle.id && task->queue.tag == queue->handle.tag) { if (queue->attributes.retain) { /* task is scheduled and needs to be retained */ embb_mtapi_task_set_state(task, MTAPI_TASK_RETAINED); } else { /* task is scheduled and needs to be cancelled */ embb_mtapi_task_set_state(task, MTAPI_TASK_CANCELLED); task->error_code = MTAPI_ERR_QUEUE_DISABLED; } result = MTAPI_TRUE; } return result; } static mtapi_boolean_t embb_mtapi_queue_enable_visitor( embb_mtapi_task_t * task, void * user_data) { embb_mtapi_queue_t * queue = (embb_mtapi_queue_t*)user_data; mtapi_boolean_t result = MTAPI_FALSE; assert(MTAPI_NULL != queue); assert(MTAPI_NULL != task); if (task->queue.id == queue->handle.id && task->queue.tag == queue->handle.tag) { /* task is retained and should be scheduled */ embb_mtapi_task_set_state(task, MTAPI_TASK_SCHEDULED); result = MTAPI_TRUE; } return result; } /* ---- INTERFACE FUNCTIONS ------------------------------------------------ */ mtapi_queue_hndl_t mtapi_queue_create( MTAPI_IN mtapi_queue_id_t queue_id, MTAPI_IN mtapi_job_hndl_t job, MTAPI_IN mtapi_queue_attributes_t* attributes, MTAPI_OUT mtapi_status_t* status) { mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; embb_mtapi_node_t* node = embb_mtapi_node_get_instance(); embb_mtapi_queue_t* queue = MTAPI_NULL; mtapi_queue_hndl_t queue_hndl = { 0, EMBB_MTAPI_IDPOOL_INVALID_ID }; mtapi_queue_attributes_t attr; embb_mtapi_log_trace("mtapi_queue_create() called\n"); if (embb_mtapi_node_is_initialized()) { queue = embb_mtapi_queue_pool_allocate(node->queue_pool); if (MTAPI_NULL != queue) { if (MTAPI_NULL != attributes) { attr = *attributes; local_status = MTAPI_SUCCESS; } else { mtapi_queueattr_init(&attr, &local_status); } if (MTAPI_SUCCESS == local_status) { if (embb_mtapi_job_is_handle_valid(node, job)) { embb_mtapi_queue_initialize_with_attributes_and_job( queue, &attr, job); /* for an ordered queue, initialize affinity */ if (queue->attributes.ordered) { mtapi_affinity_init( &queue->ordered_affinity, MTAPI_FALSE, MTAPI_NULL); mtapi_affinity_set( &queue->ordered_affinity, queue->handle.id % node->scheduler->worker_count, MTAPI_TRUE, MTAPI_NULL); } queue->queue_id = queue_id; queue_hndl = queue->handle; } else { local_status = MTAPI_ERR_JOB_INVALID; } } else { embb_mtapi_queue_pool_deallocate(node->queue_pool, queue); } } else { local_status = MTAPI_ERR_QUEUE_LIMIT; } } else { local_status = MTAPI_ERR_NODE_NOTINIT; } mtapi_status_set(status, local_status); return queue_hndl; } void mtapi_queue_set_attribute( MTAPI_IN mtapi_queue_hndl_t queue, MTAPI_IN mtapi_uint_t attribute_num, MTAPI_IN void* attribute, MTAPI_IN mtapi_size_t attribute_size, MTAPI_OUT mtapi_status_t* status) { mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; embb_mtapi_node_t* node = embb_mtapi_node_get_instance(); embb_mtapi_queue_t* local_queue; embb_mtapi_log_trace("mtapi_queue_set_attribute() called\n"); if (embb_mtapi_node_is_initialized()) { if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, queue)) { local_queue = embb_mtapi_queue_pool_get_storage_for_handle( node->queue_pool, queue); mtapi_queueattr_set(&local_queue->attributes, attribute_num, attribute, attribute_size, &local_status); } else { local_status = MTAPI_ERR_QUEUE_INVALID; } } else { local_status = MTAPI_ERR_NODE_NOTINIT; } mtapi_status_set(status, local_status); } void mtapi_queue_get_attribute( MTAPI_IN mtapi_queue_hndl_t queue, MTAPI_IN mtapi_uint_t attribute_num, MTAPI_OUT void* attribute, MTAPI_IN mtapi_size_t attribute_size, MTAPI_OUT mtapi_status_t* status ) { mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; embb_mtapi_node_t* node = embb_mtapi_node_get_instance(); embb_mtapi_queue_t* local_queue; embb_mtapi_log_trace("mtapi_queue_get_attribute() called\n"); if (embb_mtapi_node_is_initialized()) { if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, queue)) { local_queue = embb_mtapi_queue_pool_get_storage_for_handle( node->queue_pool, queue); if (MTAPI_NULL == attribute) { local_status = MTAPI_ERR_PARAMETER; } else { switch (attribute_num) { case MTAPI_QUEUE_GLOBAL: local_status = embb_mtapi_attr_get_mtapi_boolean_t( &local_queue->attributes.global, attribute, attribute_size); break; case MTAPI_QUEUE_PRIORITY: local_status = embb_mtapi_attr_get_mtapi_uint_t( &local_queue->attributes.priority, attribute, attribute_size); break; case MTAPI_QUEUE_LIMIT: local_status = embb_mtapi_attr_get_mtapi_uint_t( &local_queue->attributes.limit, attribute, attribute_size); break; case MTAPI_QUEUE_ORDERED: local_status = embb_mtapi_attr_get_mtapi_boolean_t( &local_queue->attributes.ordered, attribute, attribute_size); break; case MTAPI_QUEUE_RETAIN: local_status = embb_mtapi_attr_get_mtapi_boolean_t( &local_queue->attributes.retain, attribute, attribute_size); break; case MTAPI_QUEUE_DOMAIN_SHARED: local_status = embb_mtapi_attr_get_mtapi_boolean_t( &local_queue->attributes.domain_shared, attribute, attribute_size); break; default: local_status = MTAPI_ERR_ATTR_NUM; break; } } } else { local_status = MTAPI_ERR_QUEUE_INVALID; } } else { local_status = MTAPI_ERR_NODE_NOTINIT; } mtapi_status_set(status, local_status); } mtapi_queue_hndl_t mtapi_queue_get( MTAPI_IN mtapi_queue_id_t queue_id, MTAPI_IN mtapi_domain_t domain_id, MTAPI_OUT mtapi_status_t* status) { mtapi_queue_hndl_t queue_hndl = { 0, EMBB_MTAPI_IDPOOL_INVALID_ID }; mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; EMBB_UNUSED(domain_id); embb_mtapi_log_trace("mtapi_queue_get() called\n"); if (embb_mtapi_node_is_initialized()) { embb_mtapi_node_t* node = embb_mtapi_node_get_instance(); mtapi_uint_t ii = 0; local_status = MTAPI_ERR_QUEUE_INVALID; for (ii = 0; ii < node->attributes.max_queues; ii++) { if (queue_id == node->queue_pool->storage[ii].queue_id) { queue_hndl = node->queue_pool->storage[ii].handle; local_status = MTAPI_SUCCESS; break; } } } else { local_status = MTAPI_ERR_NODE_NOTINIT; } mtapi_status_set(status, local_status); return queue_hndl; } void mtapi_queue_delete( MTAPI_IN mtapi_queue_hndl_t queue, MTAPI_IN mtapi_timeout_t timeout, MTAPI_OUT mtapi_status_t* status) { mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; embb_mtapi_log_trace("mtapi_queue_delete() called\n"); if (embb_mtapi_node_is_initialized()) { embb_mtapi_node_t* node = embb_mtapi_node_get_instance(); if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, queue)) { embb_mtapi_queue_t* local_queue = embb_mtapi_queue_pool_get_storage_for_handle( node->queue_pool, queue); embb_mtapi_thread_context_t * context = NULL; embb_duration_t wait_duration; embb_time_t end_time; if (MTAPI_INFINITE < timeout) { embb_duration_set_milliseconds( &wait_duration, (unsigned long long)timeout); embb_time_in(&end_time, &wait_duration); } /* find out on which thread we are */ context = embb_mtapi_scheduler_get_current_thread_context( node->scheduler); /* cancel all tasks */ embb_mtapi_scheduler_process_tasks( node->scheduler, embb_mtapi_queue_delete_visitor, local_queue); /* wait for tasks in queue to finish */ local_status = MTAPI_SUCCESS; while (0 != embb_atomic_load_int(&local_queue->num_tasks)) { if (MTAPI_INFINITE < timeout) { embb_time_t current_time; embb_time_now(¤t_time); if (embb_time_compare(¤t_time, &end_time) > 0) { /* timeout! */ local_status = MTAPI_TIMEOUT; break; } } /* do other work if applicable */ embb_mtapi_scheduler_execute_task_or_yield( node->scheduler, node, context); } /* delete queue */ embb_mtapi_queue_finalize(local_queue); embb_mtapi_queue_pool_deallocate(node->queue_pool, local_queue); } else { local_status = MTAPI_ERR_QUEUE_INVALID; } } else { local_status = MTAPI_ERR_NODE_NOTINIT; } mtapi_status_set(status, local_status); } void mtapi_queue_disable( MTAPI_IN mtapi_queue_hndl_t queue, MTAPI_IN mtapi_timeout_t timeout, MTAPI_OUT mtapi_status_t* status) { mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; embb_mtapi_log_trace("mtapi_queue_disable() called\n"); if (embb_mtapi_node_is_initialized()) { embb_mtapi_node_t* node = embb_mtapi_node_get_instance(); if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, queue)) { embb_mtapi_queue_t* local_queue = embb_mtapi_queue_pool_get_storage_for_handle( node->queue_pool, queue); embb_atomic_store_char(&local_queue->enabled, MTAPI_FALSE); /* cancel or retain all tasks scheduled via queue */ embb_mtapi_scheduler_process_tasks( node->scheduler, embb_mtapi_queue_disable_visitor, local_queue); /* if queue is not retaining, wait for all tasks to finish */ if (MTAPI_FALSE == local_queue->attributes.retain) { /* find out on which thread we are */ embb_mtapi_thread_context_t * context = embb_mtapi_scheduler_get_current_thread_context(node->scheduler); embb_duration_t wait_duration; embb_time_t end_time; if (MTAPI_INFINITE < timeout) { embb_duration_set_milliseconds( &wait_duration, (unsigned long long)timeout); embb_time_in(&end_time, &wait_duration); } /* wait for tasks in queue to finish */ local_status = MTAPI_SUCCESS; while (0 != embb_atomic_load_int(&local_queue->num_tasks)) { if (MTAPI_INFINITE < timeout) { embb_time_t current_time; embb_time_now(¤t_time); if (embb_time_compare(¤t_time, &end_time) > 0) { /* timeout! */ local_status = MTAPI_TIMEOUT; break; } } /* do other work if applicable */ embb_mtapi_scheduler_execute_task_or_yield( node->scheduler, node, context); } } else { local_status = MTAPI_SUCCESS; } } else { local_status = MTAPI_ERR_QUEUE_INVALID; } } else { local_status = MTAPI_ERR_NODE_NOTINIT; } mtapi_status_set(status, local_status); } void mtapi_queue_enable( MTAPI_IN mtapi_queue_hndl_t queue, MTAPI_OUT mtapi_status_t* status) { mtapi_status_t local_status = MTAPI_ERR_UNKNOWN; embb_mtapi_log_trace("mtapi_queue_enable() called\n"); if (embb_mtapi_node_is_initialized()) { embb_mtapi_node_t* node = embb_mtapi_node_get_instance(); if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, queue)) { embb_mtapi_queue_t* local_queue = embb_mtapi_queue_pool_get_storage_for_handle( node->queue_pool, queue); embb_atomic_store_char(&local_queue->enabled, MTAPI_TRUE); local_status = MTAPI_SUCCESS; if (local_queue->attributes.retain) { /* reschedule retained tasks */ embb_mtapi_scheduler_process_tasks( node->scheduler, embb_mtapi_queue_enable_visitor, local_queue); } } else { local_status = MTAPI_ERR_QUEUE_INVALID; } } else { local_status = MTAPI_ERR_NODE_NOTINIT; } mtapi_status_set(status, local_status); }