Commit 429e58d6 by Tobias Langer

Added implementation of priority queue

A blocking binary heap based priority queue for tasks with deadlines has been
added. The priority queue sorts incoming tasks according to their
increasing deadline attribute.
parent 47bb3fd9
/*
* Copyright (c) 2014-2016, Siemens AG. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
//#ifdef EMBB_HARD_REALTIME
#include <stdio.h>
#include <assert.h>
#include <embb/mtapi/c/mtapi.h>
#include <embb/base/c/atomic.h>
#include <embb_mtapi_log.h>
#include <embb_mtapi_priority_queue_t.h>
#include <embb_mtapi_task_t.h>
#include <embb_mtapi_alloc.h>
#include <embb_mtapi_priority_queue_t_fwd.h>
/* ---- LOCAL HELPERS ------------------------------------------------------ */
static mtapi_uint64_t get_deadline(mtapi_task_hndl_t node) {
mtapi_status_t status;
mtapi_uint64_t deadline;
mtapi_task_get_attribute(
node,
MTAPI_TASK_DEADLINE,
&deadline,
sizeof(deadline),
&status);
if(status != MTAPI_SUCCESS) {
// TODO error handling
return -1;
}
return deadline;
}
/* ---- CLASS MEMBERS ------------------------------------------------------ */
void embb_mtapi_priority_queue_initialize(embb_mtapi_priority_queue_t* that) {
assert(MTAPI_NULL != that);
that->task_buffer = MTAPI_NULL;
that->tasks_available = 0;
mtapi_queueattr_init(&that->attributes, MTAPI_NULL);
embb_spin_init(&that->lock);
}
void embb_mtapi_priority_queue_initialize_with_capacity(
embb_mtapi_priority_queue_t* that,
mtapi_uint_t capacity) {
mtapi_uint_t ii;
assert(MTAPI_NULL != that);
that->task_buffer = (embb_mtapi_task_t **)
embb_mtapi_alloc_allocate(sizeof(embb_mtapi_task_t *)*capacity);
/* Nullify the array, just in case */
for(ii = 0; ii < capacity; ii++) {
that->task_buffer[ii] = MTAPI_NULL;
}
that->tasks_available = 0;
mtapi_queueattr_init(&that->attributes, MTAPI_NULL);
that->attributes.limit = capacity;
embb_spin_init(&that->lock);
}
void embb_mtapi_priority_queue_finalize(embb_mtapi_priority_queue_t* that) {
embb_mtapi_alloc_deallocate(that->task_buffer);
that->task_buffer = MTAPI_NULL;
embb_mtapi_priority_queue_initialize(that);
embb_spin_destroy(&that->lock);
}
embb_mtapi_task_t * embb_mtapi_priority_queue_pop(embb_mtapi_priority_queue_t* that) {
embb_mtapi_task_t * task = MTAPI_NULL;
embb_mtapi_task_t * tmp_task = MTAPI_NULL;
mtapi_uint_t ii;
mtapi_uint64_t deadline = 0;
mtapi_uint64_t left_deadline = 0;
mtapi_uint64_t right_deadline = 0;
mtapi_uint64_t swap = 0;
mtapi_uint64_t swap_deadline = 0;
assert(MTAPI_NULL != that);
if (embb_spin_try_lock(&that->lock, 128) == EMBB_SUCCESS) {
if (0 < that->tasks_available) {
ii = 0;
/* take away one task */
task = that->task_buffer[0];
that->task_buffer[0] = MTAPI_NULL;
that->tasks_available--;
/* move last task to head */
that->task_buffer[0] = that->task_buffer[that->tasks_available];
that->task_buffer[that->tasks_available] = MTAPI_NULL;
/* recreate heap property */
while(that->tasks_available > 0) {
deadline = get_deadline(that->task_buffer[ii]->handle);
if(that->task_buffer[ii + 1] != MTAPI_NULL) {
left_deadline = get_deadline(that->task_buffer[ii + 1]->handle);
} else {
left_deadline = ULLONG_MAX;
}
if(that->task_buffer[ii + 2] != MTAPI_NULL) {
right_deadline = get_deadline(that->task_buffer[ii + 2]->handle);
} else {
right_deadline = ULLONG_MAX;
}
/* min Heap, swap with the smaller of both children. */
if(left_deadline <= right_deadline) {
swap = ii + 1;
swap_deadline = left_deadline;
} else if(right_deadline < left_deadline) {
swap = ii + 2;
swap_deadline = right_deadline;
}
/* abort if heap property is restored. */
if(deadline <= swap_deadline) {
break;
}
tmp_task = that->task_buffer[swap];
that->task_buffer[swap] = that->task_buffer[ii];
that->task_buffer[ii] = tmp_task;
ii = swap;
}
}
embb_spin_unlock(&that->lock);
}
return task;
}
mtapi_boolean_t embb_mtapi_priority_queue_push(
embb_mtapi_priority_queue_t* that,
embb_mtapi_task_t * task) {
embb_mtapi_task_t * tmp_task = MTAPI_NULL;
mtapi_uint_t ii;
mtapi_uint_t kk;
mtapi_uint64_t deadline = 0;
mtapi_uint64_t parent_deadline = 0;
mtapi_boolean_t result = MTAPI_FALSE;
assert(MTAPI_NULL != that);
if (embb_spin_lock(&that->lock) == EMBB_SUCCESS) {
if (that->attributes.limit > that->tasks_available) {
ii = that->tasks_available;
kk = that->tasks_available;
deadline = get_deadline(task->handle);
/* add task to heap */
that->task_buffer[that->tasks_available] = task;
/* restore heap property */
while(ii > 0) {
/* get parent task */
ii /= 2;
parent_deadline = get_deadline(that->task_buffer[ii]->handle);
/* abort if heap property is restored. */
if(deadline >= parent_deadline) {
break;
}
/* swap with parent task */
tmp_task = that->task_buffer[ii];
that->task_buffer[ii] = that->task_buffer[kk];
that->task_buffer[kk] = tmp_task;
kk = ii;
}
/* make task available */
that->tasks_available++;
result = MTAPI_TRUE;
}
embb_spin_unlock(&that->lock);
}
return result;
}
mtapi_boolean_t embb_mtapi_priority_queue_process(
embb_mtapi_priority_queue_t * that,
embb_mtapi_task_visitor_function_t process,
void * user_data) {
mtapi_boolean_t result = MTAPI_TRUE;
mtapi_uint_t ii;
assert(MTAPI_NULL != that);
assert(MTAPI_NULL != process);
if (embb_spin_lock(&that->lock) == EMBB_SUCCESS) {
// Run from first to last
for (ii = 0; ii < that->tasks_available; ii++) {
result = process(that->task_buffer[ii], user_data);
if (MTAPI_FALSE == result) {
break;
}
}
embb_spin_unlock(&that->lock);
}
return result;
}
//#endif // EMBB_HARD_REALTIME
/*
* Copyright (c) 2014-2016, Siemens AG. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
//#ifdef EMBB_HARD_REALTIME
#ifndef MTAPI_C_SRC_EMBB_MTAPI_PRIORITY_QUEUE_T_H_
#define MTAPI_C_SRC_EMBB_MTAPI_PRIORITY_QUEUE_T_H_
#include <embb/mtapi/c/mtapi.h>
#include <embb/base/c/atomic.h>
#include <embb/base/c/mutex.h>
#include <embb_mtapi_task_visitor_function_t.h>
#ifdef __cplusplus
extern "C" {
#endif
/* ---- FORWARD DECLARATIONS ----------------------------------------------- */
#include <embb_mtapi_task_t_fwd.h>
/* ---- CLASS DECLARATION -------------------------------------------------- */
/**
* \internal
* Priority queue class.
*
* \ingroup INTERNAL
*/
struct embb_mtapi_priority_queue_struct {
embb_mtapi_task_t ** task_buffer;
mtapi_uint_t tasks_available;
mtapi_queue_attributes_t attributes;
embb_spinlock_t lock;
};
#include <embb_mtapi_priority_queue_t_fwd.h>
/**
* Default constructor.
* \memberof embb_mtapi_priority_queue_struct
*/
void embb_mtapi_priority_queue_initialize(embb_mtapi_priority_queue_t* that);
/**
* Constructor with configurable capacity.
* \memberof embb_mtapi_priority_queue_struct
*/
void embb_mtapi_priority_queue_initialize_with_capacity(
embb_mtapi_priority_queue_t* that,
mtapi_uint_t capacity);
/**
* Destructor.
* \memberof embb_mtapi_priority_queue_struct
*/
void embb_mtapi_priority_queue_finalize(embb_mtapi_priority_queue_t* that);
/**
* Pop a task from the queue. Returns MTAPI_NULL if the queue is empty.
* \memberof embb_mtapi_priority_queue_struct
*/
embb_mtapi_task_t * embb_mtapi_priority_queue_pop(
embb_mtapi_priority_queue_t* that);
/**
* Push a task into the queue. Returns MTAPI_TRUE if successfull and
* MTAPI_FALSE if the queue is full or cannot be locked in time.
* \memberof embb_mtapi_priority_queue_struct
*/
mtapi_boolean_t embb_mtapi_priority_queue_push(
embb_mtapi_priority_queue_t* that,
embb_mtapi_task_t * task);
/**
* Process all elements of the priority queue using the given functor.
* \memberof embb_mtapi_priority_queue_struct
*/
mtapi_boolean_t embb_mtapi_priority_queue_process(
embb_mtapi_priority_queue_t * that,
embb_mtapi_task_visitor_function_t process,
void * user_data);
#ifdef __cplusplus
}
#endif
#endif // MTAPI_C_SRC_EMBB_MTAPI_PRIORITY_QUEUE_T_H_
//#endif // EMBB_HARD_REALTIME
/*
* Copyright (c) 2014-2016, Siemens AG. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
//#ifdef EMBB_HARD_REALTIME
#ifndef MTAPI_C_SRC_EMBB_MTAPI_PRIORITY_QUEUE_T_FWD_H_
#define MTAPI_C_SRC_EMBB_MTAPI_PRIORITY_QUEUE_T_FWD_H_
#ifdef __cplusplus
extern "C" {
#endif
/**
* Priority queue type.
* \memberof embb_mtapi_priority_queue_struct
*/
typedef struct embb_mtapi_priority_queue_struct embb_mtapi_priority_queue_t;
#ifdef __cplusplus
}
#endif
#endif // MTAPI_C_SRC_EMBB_MTAPI_PRIORITY_QUEUE_T_FWD_H_
//#endif // EMBB_HARD_REALTIME
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment