Commit 2fa83029 by Marcus Winter

Merge branch 'embb613_reuse_main_thread' into development

parents 5ada53ed 6b97196b
...@@ -46,6 +46,9 @@ class Scheduler { ...@@ -46,6 +46,9 @@ class Scheduler {
int process_id, int process_id,
Action & action, Action & action,
embb::mtapi::ExecutionPolicy const & policy) = 0; embb::mtapi::ExecutionPolicy const & policy) = 0;
virtual void Run(
Action & action,
embb::mtapi::ExecutionPolicy const & policy) = 0;
virtual void WaitForSlice(int slice) = 0; virtual void WaitForSlice(int slice) = 0;
virtual int GetSlices() = 0; virtual int GetSlices() = 0;
}; };
......
...@@ -105,6 +105,16 @@ class SchedulerMTAPI : public Scheduler { ...@@ -105,6 +105,16 @@ class SchedulerMTAPI : public Scheduler {
group_[idx].Start(job_, &action, static_cast<void*>(NULL), group_[idx].Start(job_, &action, static_cast<void*>(NULL),
task_attr); task_attr);
} }
virtual void Run(
Action & action,
embb::mtapi::ExecutionPolicy const & policy) {
embb::mtapi::Node & node = embb::mtapi::Node::GetInstance();
embb::mtapi::TaskAttributes task_attr;
task_attr.SetPolicy(policy);
embb::mtapi::Task task = node.Start(job_, &action, static_cast<void*>(NULL),
task_attr);
task.Wait();
}
virtual void Enqueue( virtual void Enqueue(
int process_id, int process_id,
Action & action, Action & action,
......
...@@ -43,6 +43,11 @@ class SchedulerSequential : public Scheduler { ...@@ -43,6 +43,11 @@ class SchedulerSequential : public Scheduler {
embb::mtapi::ExecutionPolicy const &) { embb::mtapi::ExecutionPolicy const &) {
action.RunSequential(); action.RunSequential();
} }
virtual void Run(
Action & action,
embb::mtapi::ExecutionPolicy const &) {
action.RunSequential();
}
virtual void Enqueue( virtual void Enqueue(
int, int,
Action & action, Action & action,
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include <embb/dataflow/internal/node.h> #include <embb/dataflow/internal/node.h>
#include <embb/dataflow/internal/outputs.h> #include <embb/dataflow/internal/outputs.h>
#include <embb/dataflow/internal/source_executor.h> #include <embb/dataflow/internal/source_executor.h>
#include <embb/dataflow/internal/action.h>
namespace embb { namespace embb {
namespace dataflow { namespace dataflow {
...@@ -67,7 +68,8 @@ class Source< Outputs<O1, O2, O3, O4, O5> > ...@@ -67,7 +68,8 @@ class Source< Outputs<O1, O2, O3, O4, O5> >
virtual bool Start(int clock) { virtual bool Start(int clock) {
if (not_done_) { if (not_done_) {
Run(clock); Action act(this, clock);
sched_->Run(act, embb::mtapi::ExecutionPolicy());
} }
return not_done_; return not_done_;
} }
......
...@@ -150,11 +150,12 @@ SimpleTest::SimpleTest() { ...@@ -150,11 +150,12 @@ SimpleTest::SimpleTest() {
#define MTAPI_DOMAIN_ID 1 #define MTAPI_DOMAIN_ID 1
#define MTAPI_NODE_ID 1 #define MTAPI_NODE_ID 1
void SimpleTest::TestBasic() { void SimpleTest::TrySimple(bool reuse_main_thread) {
// All available cores // All available cores
embb::base::CoreSet core_set(true); embb::base::CoreSet core_set(true);
embb::mtapi::NodeAttributes node_attr; embb::mtapi::NodeAttributes node_attr;
node_attr node_attr
.SetReuseMainThread(reuse_main_thread ? MTAPI_TRUE : MTAPI_FALSE)
.SetCoreAffinity(core_set) .SetCoreAffinity(core_set)
.SetMaxQueues(2); .SetMaxQueues(2);
embb::mtapi::Node::Initialize( embb::mtapi::Node::Initialize(
...@@ -162,7 +163,7 @@ void SimpleTest::TestBasic() { ...@@ -162,7 +163,7 @@ void SimpleTest::TestBasic() {
MTAPI_NODE_ID, MTAPI_NODE_ID,
node_attr); node_attr);
for (int ii = 0; ii < 10000; ii++) { for (int ii = 0; ii < 1000; ii++) {
ArraySink<TEST_COUNT> asink; ArraySink<TEST_COUNT> asink;
MyNetwork network(NUM_SLICES); MyNetwork network(NUM_SLICES);
MyConstantSource constant(network, 4); MyConstantSource constant(network, 4);
...@@ -225,3 +226,7 @@ void SimpleTest::TestBasic() { ...@@ -225,3 +226,7 @@ void SimpleTest::TestBasic() {
PT_EXPECT(embb_get_bytes_allocated() == 0); PT_EXPECT(embb_get_bytes_allocated() == 0);
} }
void SimpleTest::TestBasic() {
TrySimple(false);
TrySimple(true);
}
...@@ -35,6 +35,8 @@ class SimpleTest : public partest::TestCase { ...@@ -35,6 +35,8 @@ class SimpleTest : public partest::TestCase {
private: private:
void TestBasic(); void TestBasic();
void TrySimple(bool reuse_main_thread);
}; };
#endif // DATAFLOW_CPP_TEST_DATAFLOW_CPP_TEST_SIMPLE_H_ #endif // DATAFLOW_CPP_TEST_DATAFLOW_CPP_TEST_SIMPLE_H_
...@@ -213,6 +213,102 @@ embb_mtapi_thread_context_t * embb_mtapi_scheduler_get_current_thread_context( ...@@ -213,6 +213,102 @@ embb_mtapi_thread_context_t * embb_mtapi_scheduler_get_current_thread_context(
return context; return context;
} }
mtapi_boolean_t embb_mtapi_scheduler_execute_task(
embb_mtapi_task_t * task,
embb_mtapi_node_t * node,
embb_mtapi_thread_context_t * thread_context) {
embb_mtapi_task_context_t task_context;
mtapi_boolean_t result = MTAPI_FALSE;
embb_mtapi_queue_t * local_queue = MTAPI_NULL;
embb_mtapi_group_t * local_group = MTAPI_NULL;
embb_mtapi_action_t * local_action = MTAPI_NULL;
/* is task associated with a queue? */
if (embb_mtapi_queue_pool_is_handle_valid(
node->queue_pool, task->queue)) {
local_queue =
embb_mtapi_queue_pool_get_storage_for_handle(
node->queue_pool, task->queue);
}
/* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid(
node->group_pool, task->group)) {
local_group =
embb_mtapi_group_pool_get_storage_for_handle(
node->group_pool, task->group);
}
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, task->action)) {
local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, task->action);
}
switch (embb_atomic_load_int(&task->state)) {
case MTAPI_TASK_SCHEDULED:
/* multi-instance task, another instance might be running */
case MTAPI_TASK_RUNNING:
/* there was work, execute it */
embb_mtapi_task_context_initialize_with_thread_context_and_task(
&task_context, thread_context, task);
if (embb_mtapi_task_execute(task, &task_context)) {
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
}
result = MTAPI_TRUE;
break;
case MTAPI_TASK_RETAINED:
/* put task into queue again for later execution */
embb_mtapi_scheduler_schedule_task(
node->scheduler, task, 0);
/* yield, as there may be only retained tasks in the queue */
embb_thread_yield();
/* task is not done, so do not notify queue */
break;
case MTAPI_TASK_CANCELLED:
/* set return value to cancelled */
task->error_code = MTAPI_ERR_ACTION_CANCELLED;
if (embb_atomic_fetch_and_add_unsigned_int(
&task->instances_todo, (unsigned int)-1) == 0) {
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
if (MTAPI_NULL != local_group) {
embb_mtapi_task_queue_push(&local_group->queue, task);
}
}
if (MTAPI_NULL != local_action) {
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
}
break;
case MTAPI_TASK_COMPLETED:
case MTAPI_TASK_DELETED:
case MTAPI_TASK_WAITING:
case MTAPI_TASK_CREATED:
case MTAPI_TASK_PRENATAL:
case MTAPI_TASK_ERROR:
case MTAPI_TASK_INTENTIONALLY_UNUSED:
default:
/* do nothing, although this is an error */
break;
}
/* issue task complete callback if set */
if (MTAPI_NULL != task->attributes.complete_func) {
task->attributes.complete_func(task->handle, MTAPI_NULL);
}
return result;
}
void embb_mtapi_scheduler_execute_task_or_yield( void embb_mtapi_scheduler_execute_task_or_yield(
embb_mtapi_scheduler_t * that, embb_mtapi_scheduler_t * that,
embb_mtapi_node_t * node, embb_mtapi_node_t * node,
...@@ -225,10 +321,7 @@ void embb_mtapi_scheduler_execute_task_or_yield( ...@@ -225,10 +321,7 @@ void embb_mtapi_scheduler_execute_task_or_yield(
that, node, thread_context); that, node, thread_context);
/* if there was work, execute it */ /* if there was work, execute it */
if (MTAPI_NULL != new_task) { if (MTAPI_NULL != new_task) {
embb_mtapi_task_context_t task_context; embb_mtapi_scheduler_execute_task(new_task, node, thread_context);
embb_mtapi_task_context_initialize_with_thread_context_and_task(
&task_context, thread_context, new_task);
embb_mtapi_task_execute(new_task, &task_context);
} else { } else {
embb_thread_yield(); embb_thread_yield();
} }
...@@ -253,7 +346,6 @@ embb_mtapi_scheduler_worker_func(embb_mtapi_scheduler_t * that) { ...@@ -253,7 +346,6 @@ embb_mtapi_scheduler_worker_func(embb_mtapi_scheduler_t * that) {
int embb_mtapi_scheduler_worker(void * arg) { int embb_mtapi_scheduler_worker(void * arg) {
embb_mtapi_thread_context_t * thread_context = embb_mtapi_thread_context_t * thread_context =
(embb_mtapi_thread_context_t*)arg; (embb_mtapi_thread_context_t*)arg;
embb_mtapi_task_context_t task_context;
embb_mtapi_node_t * node; embb_mtapi_node_t * node;
embb_duration_t sleep_duration; embb_duration_t sleep_duration;
int err; int err;
...@@ -293,91 +385,8 @@ int embb_mtapi_scheduler_worker(void * arg) { ...@@ -293,91 +385,8 @@ int embb_mtapi_scheduler_worker(void * arg) {
node->scheduler, node, thread_context); node->scheduler, node, thread_context);
/* check if there was work */ /* check if there was work */
if (MTAPI_NULL != task) { if (MTAPI_NULL != task) {
embb_mtapi_queue_t * local_queue = MTAPI_NULL; if (embb_mtapi_scheduler_execute_task(task, node, thread_context)) {
embb_mtapi_group_t * local_group = MTAPI_NULL;
embb_mtapi_action_t * local_action = MTAPI_NULL;
/* is task associated with a queue? */
if (embb_mtapi_queue_pool_is_handle_valid(
node->queue_pool, task->queue)) {
local_queue =
embb_mtapi_queue_pool_get_storage_for_handle(
node->queue_pool, task->queue);
}
/* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid(
node->group_pool, task->group)) {
local_group =
embb_mtapi_group_pool_get_storage_for_handle(
node->group_pool, task->group);
}
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, task->action)) {
local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, task->action);
}
switch (embb_atomic_load_int(&task->state)) {
case MTAPI_TASK_SCHEDULED:
/* multi-instance task, another instance might be running */
case MTAPI_TASK_RUNNING:
/* there was work, execute it */
embb_mtapi_task_context_initialize_with_thread_context_and_task(
&task_context, thread_context, task);
if (embb_mtapi_task_execute(task, &task_context)) {
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
}
counter = 0; counter = 0;
break;
case MTAPI_TASK_RETAINED:
/* put task into queue again for later execution */
embb_mtapi_scheduler_schedule_task(
node->scheduler, task, 0);
/* yield, as there may be only retained tasks in the queue */
embb_thread_yield();
/* task is not done, so do not notify queue */
break;
case MTAPI_TASK_CANCELLED:
/* set return value to cancelled */
task->error_code = MTAPI_ERR_ACTION_CANCELLED;
if (embb_atomic_fetch_and_add_unsigned_int(
&task->instances_todo, (unsigned int)-1) == 0) {
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
if (MTAPI_NULL != local_group) {
embb_mtapi_task_queue_push(&local_group->queue, task);
}
}
if (MTAPI_NULL != local_action) {
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
}
break;
case MTAPI_TASK_COMPLETED:
case MTAPI_TASK_DELETED:
case MTAPI_TASK_WAITING:
case MTAPI_TASK_CREATED:
case MTAPI_TASK_PRENATAL:
case MTAPI_TASK_ERROR:
case MTAPI_TASK_INTENTIONALLY_UNUSED:
default:
/* do nothing, although this is an error */
break;
}
/* issue task complete callback if set */
if (MTAPI_NULL != task->attributes.complete_func) {
task->attributes.complete_func(task->handle, MTAPI_NULL);
} }
} else if (counter < 1024) { } else if (counter < 1024) {
/* spin and yield for a while before going to sleep */ /* spin and yield for a while before going to sleep */
......
...@@ -138,6 +138,15 @@ embb_mtapi_thread_context_t * embb_mtapi_scheduler_get_current_thread_context( ...@@ -138,6 +138,15 @@ embb_mtapi_thread_context_t * embb_mtapi_scheduler_get_current_thread_context(
embb_mtapi_scheduler_t * that); embb_mtapi_scheduler_t * that);
/** /**
* Executes the given task if the thread context is valid.
* \memberof embb_mtapi_scheduler_struct
*/
mtapi_boolean_t embb_mtapi_scheduler_execute_task(
embb_mtapi_task_t * task,
embb_mtapi_node_t * node,
embb_mtapi_thread_context_t * thread_context);
/**
* Fetches and executes a single task if the thread context is valid, * Fetches and executes a single task if the thread context is valid,
* yields otherwise. * yields otherwise.
* \memberof embb_mtapi_scheduler_struct * \memberof embb_mtapi_scheduler_struct
......
...@@ -43,6 +43,8 @@ class NodeAttributes { ...@@ -43,6 +43,8 @@ class NodeAttributes {
public: public:
/** /**
* Constructs a NodeAttributes object. * Constructs a NodeAttributes object.
*
* \waitfree
*/ */
NodeAttributes() { NodeAttributes() {
mtapi_status_t status; mtapi_status_t status;
...@@ -52,6 +54,8 @@ class NodeAttributes { ...@@ -52,6 +54,8 @@ class NodeAttributes {
/** /**
* Copies a NodeAttributes object. * Copies a NodeAttributes object.
*
* \waitfree
*/ */
NodeAttributes( NodeAttributes(
NodeAttributes const & other /**< The NodeAttributes to copy. */ NodeAttributes const & other /**< The NodeAttributes to copy. */
...@@ -62,6 +66,8 @@ class NodeAttributes { ...@@ -62,6 +66,8 @@ class NodeAttributes {
/** /**
* Copies a NodeAttributes object. * Copies a NodeAttributes object.
*
* \waitfree
*/ */
void operator=( void operator=(
NodeAttributes const & other /**< The NodeAttributes to copy. */ NodeAttributes const & other /**< The NodeAttributes to copy. */
...@@ -233,6 +239,22 @@ class NodeAttributes { ...@@ -233,6 +239,22 @@ class NodeAttributes {
} }
/** /**
* Enables or disables the reuse of the main thread as a worker.
*
* \returns Reference to this object.
* \notthreadsafe
*/
NodeAttributes & SetReuseMainThread(
mtapi_boolean_t reuse
) {
mtapi_status_t status;
mtapi_nodeattr_set(&attributes_, MTAPI_NODE_REUSE_MAIN_THREAD,
&reuse, sizeof(reuse), &status);
internal::CheckStatus(status);
return *this;
}
/**
* Returns the internal representation of this object. * Returns the internal representation of this object.
* Allows for interoperability with the C interface. * Allows for interoperability with the C interface.
* *
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment