diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler.h index 3a82187..9f0c23c 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler.h @@ -46,6 +46,9 @@ class Scheduler { int process_id, Action & action, embb::mtapi::ExecutionPolicy const & policy) = 0; + virtual void Run( + Action & action, + embb::mtapi::ExecutionPolicy const & policy) = 0; virtual void WaitForSlice(int slice) = 0; virtual int GetSlices() = 0; }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h index 05ea5d9..61e80ae 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h @@ -105,6 +105,16 @@ class SchedulerMTAPI : public Scheduler { group_[idx].Start(job_, &action, static_cast(NULL), task_attr); } + virtual void Run( + Action & action, + embb::mtapi::ExecutionPolicy const & policy) { + embb::mtapi::Node & node = embb::mtapi::Node::GetInstance(); + embb::mtapi::TaskAttributes task_attr; + task_attr.SetPolicy(policy); + embb::mtapi::Task task = node.Start(job_, &action, static_cast(NULL), + task_attr); + task.Wait(); + } virtual void Enqueue( int process_id, Action & action, diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h index 04a6c0e..1e150cc 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h @@ -43,6 +43,11 @@ class SchedulerSequential : public Scheduler { embb::mtapi::ExecutionPolicy const &) { action.RunSequential(); } + virtual void Run( + Action & action, + embb::mtapi::ExecutionPolicy const &) { + action.RunSequential(); + } virtual void Enqueue( int, Action & action, diff --git a/dataflow_cpp/include/embb/dataflow/internal/source.h b/dataflow_cpp/include/embb/dataflow/internal/source.h index 26d3734..b2c883a 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/source.h +++ b/dataflow_cpp/include/embb/dataflow/internal/source.h @@ -32,6 +32,7 @@ #include #include #include +#include namespace embb { namespace dataflow { @@ -67,7 +68,8 @@ class Source< Outputs > virtual bool Start(int clock) { if (not_done_) { - Run(clock); + Action act(this, clock); + sched_->Run(act, embb::mtapi::ExecutionPolicy()); } return not_done_; } diff --git a/dataflow_cpp/test/dataflow_cpp_test_simple.cc b/dataflow_cpp/test/dataflow_cpp_test_simple.cc index c0b56db..f4eea87 100644 --- a/dataflow_cpp/test/dataflow_cpp_test_simple.cc +++ b/dataflow_cpp/test/dataflow_cpp_test_simple.cc @@ -150,11 +150,12 @@ SimpleTest::SimpleTest() { #define MTAPI_DOMAIN_ID 1 #define MTAPI_NODE_ID 1 -void SimpleTest::TestBasic() { +void SimpleTest::TrySimple(bool reuse_main_thread) { // All available cores embb::base::CoreSet core_set(true); embb::mtapi::NodeAttributes node_attr; node_attr + .SetReuseMainThread(reuse_main_thread ? MTAPI_TRUE : MTAPI_FALSE) .SetCoreAffinity(core_set) .SetMaxQueues(2); embb::mtapi::Node::Initialize( @@ -162,7 +163,7 @@ void SimpleTest::TestBasic() { MTAPI_NODE_ID, node_attr); - for (int ii = 0; ii < 10000; ii++) { + for (int ii = 0; ii < 1000; ii++) { ArraySink asink; MyNetwork network(NUM_SLICES); MyConstantSource constant(network, 4); @@ -225,3 +226,7 @@ void SimpleTest::TestBasic() { PT_EXPECT(embb_get_bytes_allocated() == 0); } +void SimpleTest::TestBasic() { + TrySimple(false); + TrySimple(true); +} diff --git a/dataflow_cpp/test/dataflow_cpp_test_simple.h b/dataflow_cpp/test/dataflow_cpp_test_simple.h index d4fc9c1..dce1b99 100644 --- a/dataflow_cpp/test/dataflow_cpp_test_simple.h +++ b/dataflow_cpp/test/dataflow_cpp_test_simple.h @@ -35,6 +35,8 @@ class SimpleTest : public partest::TestCase { private: void TestBasic(); + + void TrySimple(bool reuse_main_thread); }; #endif // DATAFLOW_CPP_TEST_DATAFLOW_CPP_TEST_SIMPLE_H_ diff --git a/mtapi_c/src/embb_mtapi_scheduler_t.c b/mtapi_c/src/embb_mtapi_scheduler_t.c index fb5bb30..1d3321a 100644 --- a/mtapi_c/src/embb_mtapi_scheduler_t.c +++ b/mtapi_c/src/embb_mtapi_scheduler_t.c @@ -213,6 +213,102 @@ embb_mtapi_thread_context_t * embb_mtapi_scheduler_get_current_thread_context( return context; } +mtapi_boolean_t embb_mtapi_scheduler_execute_task( + embb_mtapi_task_t * task, + embb_mtapi_node_t * node, + embb_mtapi_thread_context_t * thread_context) { + embb_mtapi_task_context_t task_context; + mtapi_boolean_t result = MTAPI_FALSE; + embb_mtapi_queue_t * local_queue = MTAPI_NULL; + embb_mtapi_group_t * local_group = MTAPI_NULL; + embb_mtapi_action_t * local_action = MTAPI_NULL; + + /* is task associated with a queue? */ + if (embb_mtapi_queue_pool_is_handle_valid( + node->queue_pool, task->queue)) { + local_queue = + embb_mtapi_queue_pool_get_storage_for_handle( + node->queue_pool, task->queue); + } + + /* is task associated with a group? */ + if (embb_mtapi_group_pool_is_handle_valid( + node->group_pool, task->group)) { + local_group = + embb_mtapi_group_pool_get_storage_for_handle( + node->group_pool, task->group); + } + + if (embb_mtapi_action_pool_is_handle_valid( + node->action_pool, task->action)) { + local_action = + embb_mtapi_action_pool_get_storage_for_handle( + node->action_pool, task->action); + } + + switch (embb_atomic_load_int(&task->state)) { + case MTAPI_TASK_SCHEDULED: + /* multi-instance task, another instance might be running */ + case MTAPI_TASK_RUNNING: + /* there was work, execute it */ + embb_mtapi_task_context_initialize_with_thread_context_and_task( + &task_context, thread_context, task); + if (embb_mtapi_task_execute(task, &task_context)) { + /* tell queue that a task is done */ + if (MTAPI_NULL != local_queue) { + embb_mtapi_queue_task_finished(local_queue); + } + } + result = MTAPI_TRUE; + break; + + case MTAPI_TASK_RETAINED: + /* put task into queue again for later execution */ + embb_mtapi_scheduler_schedule_task( + node->scheduler, task, 0); + /* yield, as there may be only retained tasks in the queue */ + embb_thread_yield(); + /* task is not done, so do not notify queue */ + break; + + case MTAPI_TASK_CANCELLED: + /* set return value to cancelled */ + task->error_code = MTAPI_ERR_ACTION_CANCELLED; + if (embb_atomic_fetch_and_add_unsigned_int( + &task->instances_todo, (unsigned int)-1) == 0) { + /* tell queue that a task is done */ + if (MTAPI_NULL != local_queue) { + embb_mtapi_queue_task_finished(local_queue); + } + if (MTAPI_NULL != local_group) { + embb_mtapi_task_queue_push(&local_group->queue, task); + } + } + if (MTAPI_NULL != local_action) { + embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); + } + break; + + case MTAPI_TASK_COMPLETED: + case MTAPI_TASK_DELETED: + case MTAPI_TASK_WAITING: + case MTAPI_TASK_CREATED: + case MTAPI_TASK_PRENATAL: + case MTAPI_TASK_ERROR: + case MTAPI_TASK_INTENTIONALLY_UNUSED: + default: + /* do nothing, although this is an error */ + break; + } + + /* issue task complete callback if set */ + if (MTAPI_NULL != task->attributes.complete_func) { + task->attributes.complete_func(task->handle, MTAPI_NULL); + } + + return result; +} + void embb_mtapi_scheduler_execute_task_or_yield( embb_mtapi_scheduler_t * that, embb_mtapi_node_t * node, @@ -225,10 +321,7 @@ void embb_mtapi_scheduler_execute_task_or_yield( that, node, thread_context); /* if there was work, execute it */ if (MTAPI_NULL != new_task) { - embb_mtapi_task_context_t task_context; - embb_mtapi_task_context_initialize_with_thread_context_and_task( - &task_context, thread_context, new_task); - embb_mtapi_task_execute(new_task, &task_context); + embb_mtapi_scheduler_execute_task(new_task, node, thread_context); } else { embb_thread_yield(); } @@ -253,7 +346,6 @@ embb_mtapi_scheduler_worker_func(embb_mtapi_scheduler_t * that) { int embb_mtapi_scheduler_worker(void * arg) { embb_mtapi_thread_context_t * thread_context = (embb_mtapi_thread_context_t*)arg; - embb_mtapi_task_context_t task_context; embb_mtapi_node_t * node; embb_duration_t sleep_duration; int err; @@ -293,91 +385,8 @@ int embb_mtapi_scheduler_worker(void * arg) { node->scheduler, node, thread_context); /* check if there was work */ if (MTAPI_NULL != task) { - embb_mtapi_queue_t * local_queue = MTAPI_NULL; - embb_mtapi_group_t * local_group = MTAPI_NULL; - embb_mtapi_action_t * local_action = MTAPI_NULL; - - /* is task associated with a queue? */ - if (embb_mtapi_queue_pool_is_handle_valid( - node->queue_pool, task->queue)) { - local_queue = - embb_mtapi_queue_pool_get_storage_for_handle( - node->queue_pool, task->queue); - } - - /* is task associated with a group? */ - if (embb_mtapi_group_pool_is_handle_valid( - node->group_pool, task->group)) { - local_group = - embb_mtapi_group_pool_get_storage_for_handle( - node->group_pool, task->group); - } - - if (embb_mtapi_action_pool_is_handle_valid( - node->action_pool, task->action)) { - local_action = - embb_mtapi_action_pool_get_storage_for_handle( - node->action_pool, task->action); - } - - switch (embb_atomic_load_int(&task->state)) { - case MTAPI_TASK_SCHEDULED: - /* multi-instance task, another instance might be running */ - case MTAPI_TASK_RUNNING: - /* there was work, execute it */ - embb_mtapi_task_context_initialize_with_thread_context_and_task( - &task_context, thread_context, task); - if (embb_mtapi_task_execute(task, &task_context)) { - /* tell queue that a task is done */ - if (MTAPI_NULL != local_queue) { - embb_mtapi_queue_task_finished(local_queue); - } - } + if (embb_mtapi_scheduler_execute_task(task, node, thread_context)) { counter = 0; - break; - - case MTAPI_TASK_RETAINED: - /* put task into queue again for later execution */ - embb_mtapi_scheduler_schedule_task( - node->scheduler, task, 0); - /* yield, as there may be only retained tasks in the queue */ - embb_thread_yield(); - /* task is not done, so do not notify queue */ - break; - - case MTAPI_TASK_CANCELLED: - /* set return value to cancelled */ - task->error_code = MTAPI_ERR_ACTION_CANCELLED; - if (embb_atomic_fetch_and_add_unsigned_int( - &task->instances_todo, (unsigned int)-1) == 0) { - /* tell queue that a task is done */ - if (MTAPI_NULL != local_queue) { - embb_mtapi_queue_task_finished(local_queue); - } - if (MTAPI_NULL != local_group) { - embb_mtapi_task_queue_push(&local_group->queue, task); - } - } - if (MTAPI_NULL != local_action) { - embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1); - } - break; - - case MTAPI_TASK_COMPLETED: - case MTAPI_TASK_DELETED: - case MTAPI_TASK_WAITING: - case MTAPI_TASK_CREATED: - case MTAPI_TASK_PRENATAL: - case MTAPI_TASK_ERROR: - case MTAPI_TASK_INTENTIONALLY_UNUSED: - default: - /* do nothing, although this is an error */ - break; - } - - /* issue task complete callback if set */ - if (MTAPI_NULL != task->attributes.complete_func) { - task->attributes.complete_func(task->handle, MTAPI_NULL); } if (MTAPI_TRUE == task->attributes.is_detached) { embb_mtapi_task_delete(task, node->task_pool); diff --git a/mtapi_c/src/embb_mtapi_scheduler_t.h b/mtapi_c/src/embb_mtapi_scheduler_t.h index ac94a0d..c56688b 100644 --- a/mtapi_c/src/embb_mtapi_scheduler_t.h +++ b/mtapi_c/src/embb_mtapi_scheduler_t.h @@ -138,6 +138,15 @@ embb_mtapi_thread_context_t * embb_mtapi_scheduler_get_current_thread_context( embb_mtapi_scheduler_t * that); /** + * Executes the given task if the thread context is valid. + * \memberof embb_mtapi_scheduler_struct + */ +mtapi_boolean_t embb_mtapi_scheduler_execute_task( + embb_mtapi_task_t * task, + embb_mtapi_node_t * node, + embb_mtapi_thread_context_t * thread_context); + +/** * Fetches and executes a single task if the thread context is valid, * yields otherwise. * \memberof embb_mtapi_scheduler_struct diff --git a/mtapi_cpp/include/embb/mtapi/node_attributes.h b/mtapi_cpp/include/embb/mtapi/node_attributes.h index 9c6c0ab..bb96c12 100644 --- a/mtapi_cpp/include/embb/mtapi/node_attributes.h +++ b/mtapi_cpp/include/embb/mtapi/node_attributes.h @@ -43,6 +43,8 @@ class NodeAttributes { public: /** * Constructs a NodeAttributes object. + * + * \waitfree */ NodeAttributes() { mtapi_status_t status; @@ -52,6 +54,8 @@ class NodeAttributes { /** * Copies a NodeAttributes object. + * + * \waitfree */ NodeAttributes( NodeAttributes const & other /**< The NodeAttributes to copy. */ @@ -62,6 +66,8 @@ class NodeAttributes { /** * Copies a NodeAttributes object. + * + * \waitfree */ void operator=( NodeAttributes const & other /**< The NodeAttributes to copy. */ @@ -233,6 +239,22 @@ class NodeAttributes { } /** + * Enables or disables the reuse of the main thread as a worker. + * + * \returns Reference to this object. + * \notthreadsafe + */ + NodeAttributes & SetReuseMainThread( + mtapi_boolean_t reuse + ) { + mtapi_status_t status; + mtapi_nodeattr_set(&attributes_, MTAPI_NODE_REUSE_MAIN_THREAD, + &reuse, sizeof(reuse), &status); + internal::CheckStatus(status); + return *this; + } + + /** * Returns the internal representation of this object. * Allows for interoperability with the C interface. *