Commit 5d6e1751 by Tobias Langer

Added global task queue.

parent 60d5af97
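
In summary, a condensed sketch of the change (identifiers taken from the hunks below; this is a paraphrase, not a verbatim excerpt): when EMBB is built with EMBB_HARD_REALTIME, the node owns a single embb_mtapi_task_queue_t sized to the node's max_tasks attribute, the queue is finalized together with the node, and embb_mtapi_scheduler_schedule_task() pushes tasks straight into it whenever the scheduler runs in the new GLOBAL_EDF mode, bypassing the per-worker (work-stealing) queues:

    #ifdef EMBB_HARD_REALTIME
      /* node setup: one global queue, sized for all tasks of the node */
      embb_mtapi_task_queue_initialize_with_capacity(
        &node->global_task_queue, node->attributes.max_tasks);

      /* scheduling: in GLOBAL_EDF mode every task goes into the shared
         queue, so per-worker queues and work stealing are not used */
      if (scheduler->mode == GLOBAL_EDF) {
        embb_mtapi_task_queue_push(&node->global_task_queue, task);
        pushed = MTAPI_TRUE;
      }
    #endif /*EMBB_HARD_REALTIME*/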
@@ -126,6 +126,12 @@ void mtapi_initialize(
     local_status = MTAPI_ERR_NODE_INITFAILED;
   }
+#ifdef EMBB_HARD_REALTIME
+  embb_mtapi_task_queue_initialize_with_capacity(
+    &node->global_task_queue,
+    node->attributes.max_tasks);
+#endif
   if (local_status == MTAPI_SUCCESS) {
     /* initialize scheduler for local node */
     node->scheduler = embb_mtapi_scheduler_new();
@@ -199,6 +205,10 @@ void mtapi_finalize(MTAPI_OUT mtapi_status_t* status) {
     embb_mtapi_job_finalize_list(node);
   }
+#ifdef EMBB_HARD_REALTIME
+  embb_mtapi_task_queue_finalize(&node->global_task_queue);
+#endif
   /* free system instance */
   embb_mtapi_alloc_deallocate(node);
   embb_mtapi_node_instance = MTAPI_NULL;
......
@@ -32,6 +32,10 @@
 #include <embb_mtapi_log.h>
+#ifdef EMBB_HARD_REALTIME
+#include <embb_mtapi_task_queue_t.h>
+#endif /*EMBB_HARD_REALTIME*/
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -71,6 +75,9 @@ struct embb_mtapi_node_struct {
   embb_mtapi_queue_pool_t * queue_pool;
   embb_atomic_int is_scheduler_running;
   mtapi_affinity_t affinity_all;
+#ifdef EMBB_HARD_REALTIME
+  embb_mtapi_task_queue_t global_task_queue;
+#endif /*EMBB_HARD_REALTIME*/
 };

 #include <embb_mtapi_node_t_fwd.h>
......
@@ -37,6 +37,9 @@
 #include <embb_mtapi_thread_context_t.h>
 #include <embb_mtapi_task_context_t.h>
 #include <embb_mtapi_task_t.h>
+#ifdef EMBB_HARD_REALTIME
+#include <embb_mtapi_task_queue_t.h>
+#endif /*EMBB_HARD_REALTIME*/
 #include <embb_mtapi_action_t.h>
 #include <embb_mtapi_alloc.h>
 #include <embb_mtapi_queue_t.h>
@@ -611,67 +614,76 @@ mtapi_boolean_t embb_mtapi_scheduler_schedule_task(
   assert(MTAPI_NULL != node);

+#ifdef EMBB_HARD_REALTIME
+  if (scheduler->mode == GLOBAL_EDF) {
+    embb_mtapi_task_queue_push(&node->global_task_queue, task);
+    pushed = MTAPI_TRUE;
+  } else {
+#endif /*EMBB_HARD_REALTIME*/
   if (embb_mtapi_action_pool_is_handle_valid(
     node->action_pool, task->action)) {
     embb_mtapi_queue_t* local_queue = MTAPI_NULL;
     /* fetch action and schedule */
     embb_mtapi_action_t* local_action =
       embb_mtapi_action_pool_get_storage_for_handle(
         node->action_pool, task->action);

     mtapi_affinity_t affinity =
       local_action->attributes.affinity & task->attributes.affinity;

     /* check if task is running from an ordered queue */
     if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, task->queue)) {
       local_queue = embb_mtapi_queue_pool_get_storage_for_handle(
         node->queue_pool, task->queue);
       if (local_queue->attributes.ordered) {
         /* yes, modify affinity accordingly */
         affinity = local_queue->ordered_affinity;
       }
     }

     /* check affinity */
     if (affinity == 0) {
       affinity = node->affinity_all;
     }

     /* one more task in flight for this action */
     embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1);

     if (affinity == node->affinity_all) {
       /* no affinity restrictions, schedule for stealing */
       pushed = embb_mtapi_task_queue_push(
         scheduler->worker_contexts[ii].queue[task->attributes.priority],
         task);
     } else {
       mtapi_status_t affinity_status;

       /* affinity is restricted, check and adapt scheduling target */
       ii = (mtapi_uint_t)embb_atomic_fetch_and_add_int(
         &scheduler->affine_task_counter, 1);
       while (MTAPI_FALSE == mtapi_affinity_get(
         &affinity, ii, &affinity_status)) {
         ii = (ii + 1) % scheduler->worker_count;
       }
       /* schedule into private queue to disable stealing */
       pushed = embb_mtapi_task_queue_push(
         scheduler->worker_contexts[ii].private_queue[task->attributes.priority],
         task);
     }

     if (pushed) {
       /* signal the worker thread a task was pushed to */
       if (embb_atomic_load_int(&scheduler->worker_contexts[ii].is_sleeping)) {
         embb_condition_notify_one(
           &scheduler->worker_contexts[ii].work_available);
       }
     } else {
       /* task could not be launched */
       embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
     }
   }
+#ifdef EMBB_HARD_REALTIME
+  }
+#endif /*EMBB_HARD_REALTIME*/

   return pushed;
 }
@@ -58,6 +58,10 @@ enum embb_mtapi_scheduler_mode_enum {
   WORK_STEAL_VHPF = 0,
   // Local First. Steal if all local queues are empty.
   WORK_STEAL_LF = 1,
+#ifdef EMBB_HARD_REALTIME
+  // Global EDF. No work stealing.
+  GLOBAL_EDF = 2,
+#endif /*EMBB_HARD_REALTIME*/
   NUM_SCHEDULER_MODES
 };
......