Commit e9783817 by Marcus Winter

Merge branch 'embb613_reuse_main_thread' into embb620_detached_tasks

parents bcd0ad73 6b97196b
@@ -46,6 +46,9 @@ class Scheduler {
int process_id,
Action & action,
embb::mtapi::ExecutionPolicy const & policy) = 0;
virtual void Run(
Action & action,
embb::mtapi::ExecutionPolicy const & policy) = 0;
virtual void WaitForSlice(int slice) = 0;
virtual int GetSlices() = 0;
};
......
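The new pure-virtual Run complements the existing asynchronous entry points (whose results are collected via WaitForSlice) by executing a single action and returning only once it has finished. A minimal caller-side sketch; the names `sched` and `act` are illustrative, not from the patch:

    // `sched` is any Scheduler implementation, `act` an internal dataflow Action
    embb::mtapi::ExecutionPolicy policy;  // default affinity and priority
    sched->Run(act, policy);              // blocks until act has been executed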
@@ -105,6 +105,16 @@ class SchedulerMTAPI : public Scheduler {
group_[idx].Start(job_, &action, static_cast<void*>(NULL),
task_attr);
}
virtual void Run(
Action & action,
embb::mtapi::ExecutionPolicy const & policy) {
embb::mtapi::Node & node = embb::mtapi::Node::GetInstance();
embb::mtapi::TaskAttributes task_attr;
task_attr.SetPolicy(policy);
embb::mtapi::Task task = node.Start(job_, &action, static_cast<void*>(NULL),
task_attr);
task.Wait();
}
virtual void Enqueue(
int process_id,
Action & action,
......
@@ -43,6 +43,11 @@ class SchedulerSequential : public Scheduler {
embb::mtapi::ExecutionPolicy const &) {
action.RunSequential();
}
virtual void Run(
Action & action,
embb::mtapi::ExecutionPolicy const &) {
action.RunSequential();
}
virtual void Enqueue(
int,
Action & action,
......
@@ -32,6 +32,7 @@
#include <embb/dataflow/internal/node.h>
#include <embb/dataflow/internal/outputs.h>
#include <embb/dataflow/internal/source_executor.h>
#include <embb/dataflow/internal/action.h>
namespace embb {
namespace dataflow {
@@ -67,7 +68,8 @@ class Source< Outputs<O1, O2, O3, O4, O5> >
virtual bool Start(int clock) {
if (not_done_) {
Run(clock);
Action act(this, clock);
sched_->Run(act, embb::mtapi::ExecutionPolicy());
}
return not_done_;
}
......
@@ -150,11 +150,12 @@ SimpleTest::SimpleTest() {
#define MTAPI_DOMAIN_ID 1
#define MTAPI_NODE_ID 1
void SimpleTest::TestBasic() {
void SimpleTest::TrySimple(bool reuse_main_thread) {
// All available cores
embb::base::CoreSet core_set(true);
embb::mtapi::NodeAttributes node_attr;
node_attr
.SetReuseMainThread(reuse_main_thread ? MTAPI_TRUE : MTAPI_FALSE)
.SetCoreAffinity(core_set)
.SetMaxQueues(2);
embb::mtapi::Node::Initialize(
@@ -162,7 +163,7 @@ void SimpleTest::TestBasic() {
MTAPI_NODE_ID,
node_attr);
for (int ii = 0; ii < 10000; ii++) {
for (int ii = 0; ii < 1000; ii++) {
ArraySink<TEST_COUNT> asink;
MyNetwork network(NUM_SLICES);
MyConstantSource constant(network, 4);
@@ -225,3 +226,7 @@ void SimpleTest::TestBasic() {
PT_EXPECT(embb_get_bytes_allocated() == 0);
}
void SimpleTest::TestBasic() {
TrySimple(false);
TrySimple(true);
}
@@ -35,6 +35,8 @@ class SimpleTest : public partest::TestCase {
private:
void TestBasic();
void TrySimple(bool reuse_main_thread);
};
#endif // DATAFLOW_CPP_TEST_DATAFLOW_CPP_TEST_SIMPLE_H_
@@ -213,6 +213,102 @@ embb_mtapi_thread_context_t * embb_mtapi_scheduler_get_current_thread_context(
return context;
}
mtapi_boolean_t embb_mtapi_scheduler_execute_task(
embb_mtapi_task_t * task,
embb_mtapi_node_t * node,
embb_mtapi_thread_context_t * thread_context) {
embb_mtapi_task_context_t task_context;
mtapi_boolean_t result = MTAPI_FALSE;
embb_mtapi_queue_t * local_queue = MTAPI_NULL;
embb_mtapi_group_t * local_group = MTAPI_NULL;
embb_mtapi_action_t * local_action = MTAPI_NULL;
/* is task associated with a queue? */
if (embb_mtapi_queue_pool_is_handle_valid(
node->queue_pool, task->queue)) {
local_queue =
embb_mtapi_queue_pool_get_storage_for_handle(
node->queue_pool, task->queue);
}
/* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid(
node->group_pool, task->group)) {
local_group =
embb_mtapi_group_pool_get_storage_for_handle(
node->group_pool, task->group);
}
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, task->action)) {
local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, task->action);
}
switch (embb_atomic_load_int(&task->state)) {
case MTAPI_TASK_SCHEDULED:
/* multi-instance task, another instance might be running */
case MTAPI_TASK_RUNNING:
/* there was work, execute it */
embb_mtapi_task_context_initialize_with_thread_context_and_task(
&task_context, thread_context, task);
if (embb_mtapi_task_execute(task, &task_context)) {
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
}
result = MTAPI_TRUE;
break;
case MTAPI_TASK_RETAINED:
/* put task into queue again for later execution */
embb_mtapi_scheduler_schedule_task(
node->scheduler, task, 0);
/* yield, as there may be only retained tasks in the queue */
embb_thread_yield();
/* task is not done, so do not notify queue */
break;
case MTAPI_TASK_CANCELLED:
/* set return value to cancelled */
task->error_code = MTAPI_ERR_ACTION_CANCELLED;
if (embb_atomic_fetch_and_add_unsigned_int(
&task->instances_todo, (unsigned int)-1) == 0) {
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
if (MTAPI_NULL != local_group) {
embb_mtapi_task_queue_push(&local_group->queue, task);
}
}
if (MTAPI_NULL != local_action) {
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
}
break;
case MTAPI_TASK_COMPLETED:
case MTAPI_TASK_DELETED:
case MTAPI_TASK_WAITING:
case MTAPI_TASK_CREATED:
case MTAPI_TASK_PRENATAL:
case MTAPI_TASK_ERROR:
case MTAPI_TASK_INTENTIONALLY_UNUSED:
default:
/* do nothing, although this is an error */
break;
}
/* issue task complete callback if set */
if (MTAPI_NULL != task->attributes.complete_func) {
task->attributes.complete_func(task->handle, MTAPI_NULL);
}
return result;
}
void embb_mtapi_scheduler_execute_task_or_yield(
embb_mtapi_scheduler_t * that,
embb_mtapi_node_t * node,
@@ -225,10 +321,7 @@ void embb_mtapi_scheduler_execute_task_or_yield(
that, node, thread_context);
/* if there was work, execute it */
if (MTAPI_NULL != new_task) {
embb_mtapi_task_context_t task_context;
embb_mtapi_task_context_initialize_with_thread_context_and_task(
&task_context, thread_context, new_task);
embb_mtapi_task_execute(new_task, &task_context);
embb_mtapi_scheduler_execute_task(new_task, node, thread_context);
} else {
embb_thread_yield();
}
@@ -253,7 +346,6 @@ embb_mtapi_scheduler_worker_func(embb_mtapi_scheduler_t * that) {
int embb_mtapi_scheduler_worker(void * arg) {
embb_mtapi_thread_context_t * thread_context =
(embb_mtapi_thread_context_t*)arg;
embb_mtapi_task_context_t task_context;
embb_mtapi_node_t * node;
embb_duration_t sleep_duration;
int err;
@@ -293,91 +385,8 @@ int embb_mtapi_scheduler_worker(void * arg) {
node->scheduler, node, thread_context);
/* check if there was work */
if (MTAPI_NULL != task) {
embb_mtapi_queue_t * local_queue = MTAPI_NULL;
embb_mtapi_group_t * local_group = MTAPI_NULL;
embb_mtapi_action_t * local_action = MTAPI_NULL;
/* is task associated with a queue? */
if (embb_mtapi_queue_pool_is_handle_valid(
node->queue_pool, task->queue)) {
local_queue =
embb_mtapi_queue_pool_get_storage_for_handle(
node->queue_pool, task->queue);
}
/* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid(
node->group_pool, task->group)) {
local_group =
embb_mtapi_group_pool_get_storage_for_handle(
node->group_pool, task->group);
}
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, task->action)) {
local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, task->action);
}
switch (embb_atomic_load_int(&task->state)) {
case MTAPI_TASK_SCHEDULED:
/* multi-instance task, another instance might be running */
case MTAPI_TASK_RUNNING:
/* there was work, execute it */
embb_mtapi_task_context_initialize_with_thread_context_and_task(
&task_context, thread_context, task);
if (embb_mtapi_task_execute(task, &task_context)) {
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
}
if (embb_mtapi_scheduler_execute_task(task, node, thread_context)) {
counter = 0;
break;
case MTAPI_TASK_RETAINED:
/* put task into queue again for later execution */
embb_mtapi_scheduler_schedule_task(
node->scheduler, task, 0);
/* yield, as there may be only retained tasks in the queue */
embb_thread_yield();
/* task is not done, so do not notify queue */
break;
case MTAPI_TASK_CANCELLED:
/* set return value to cancelled */
task->error_code = MTAPI_ERR_ACTION_CANCELLED;
if (embb_atomic_fetch_and_add_unsigned_int(
&task->instances_todo, (unsigned int)-1) == 0) {
/* tell queue that a task is done */
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
if (MTAPI_NULL != local_group) {
embb_mtapi_task_queue_push(&local_group->queue, task);
}
}
if (MTAPI_NULL != local_action) {
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
}
break;
case MTAPI_TASK_COMPLETED:
case MTAPI_TASK_DELETED:
case MTAPI_TASK_WAITING:
case MTAPI_TASK_CREATED:
case MTAPI_TASK_PRENATAL:
case MTAPI_TASK_ERROR:
case MTAPI_TASK_INTENTIONALLY_UNUSED:
default:
/* do nothing, although this is an error */
break;
}
/* issue task complete callback if set */
if (MTAPI_NULL != task->attributes.complete_func) {
task->attributes.complete_func(task->handle, MTAPI_NULL);
}
if (MTAPI_TRUE == task->attributes.is_detached) {
embb_mtapi_task_delete(task, node->task_pool);
......
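Since the worker hunk above interleaves removed and added lines without markers, the net effect is easier to see condensed: the per-state switch moves into embb_mtapi_scheduler_execute_task, and the body of embb_mtapi_scheduler_worker reduces to roughly the following sketch (reconstructed from the hunk; `task`, `counter`, `node` and `thread_context` are the worker's existing locals):

    /* `task` was just fetched from the scheduler (the call is shown truncated above) */
    if (MTAPI_NULL != task) {
      /* run the task via the new helper; MTAPI_TRUE means work was done */
      if (embb_mtapi_scheduler_execute_task(task, node, thread_context)) {
        counter = 0;  /* reset the idle counter */
      }
      /* detached tasks are never waited on, so the worker frees them here */
      if (MTAPI_TRUE == task->attributes.is_detached) {
        embb_mtapi_task_delete(task, node->task_pool);
      }
    }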
@@ -138,6 +138,15 @@ embb_mtapi_thread_context_t * embb_mtapi_scheduler_get_current_thread_context(
embb_mtapi_scheduler_t * that);
/**
* Executes the given task if the thread context is valid.
* \memberof embb_mtapi_scheduler_struct
*/
mtapi_boolean_t embb_mtapi_scheduler_execute_task(
embb_mtapi_task_t * task,
embb_mtapi_node_t * node,
embb_mtapi_thread_context_t * thread_context);
/**
* Fetches and executes a single task if the thread context is valid,
* yields otherwise.
* \memberof embb_mtapi_scheduler_struct
......
@@ -43,6 +43,8 @@ class NodeAttributes {
public:
/**
* Constructs a NodeAttributes object.
*
* \waitfree
*/
NodeAttributes() {
mtapi_status_t status;
@@ -52,6 +54,8 @@ class NodeAttributes {
/**
* Copies a NodeAttributes object.
*
* \waitfree
*/
NodeAttributes(
NodeAttributes const & other /**< The NodeAttributes to copy. */
@@ -62,6 +66,8 @@ class NodeAttributes {
/**
* Copies a NodeAttributes object.
*
* \waitfree
*/
void operator=(
NodeAttributes const & other /**< The NodeAttributes to copy. */
@@ -233,6 +239,22 @@ class NodeAttributes {
}
/**
* Enables or disables the reuse of the main thread as a worker.
*
* \returns Reference to this object.
* \notthreadsafe
*/
NodeAttributes & SetReuseMainThread(
mtapi_boolean_t reuse
) {
mtapi_status_t status;
mtapi_nodeattr_set(&attributes_, MTAPI_NODE_REUSE_MAIN_THREAD,
&reuse, sizeof(reuse), &status);
internal::CheckStatus(status);
return *this;
}
/**
* Returns the internal representation of this object.
* Allows for interoperability with the C interface.
*
......
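For reference, the intended use of the new attribute mirrors the test above; condensed into a standalone sketch (MTAPI_DOMAIN_ID and MTAPI_NODE_ID as defined in the test, error handling omitted):

    embb::base::CoreSet core_set(true);        // use all available cores
    embb::mtapi::NodeAttributes node_attr;
    node_attr
      .SetReuseMainThread(MTAPI_TRUE)          // calling thread also acts as a worker
      .SetCoreAffinity(core_set);
    embb::mtapi::Node::Initialize(
      MTAPI_DOMAIN_ID, MTAPI_NODE_ID, node_attr);
    // ... start tasks or run dataflow networks ...
    embb::mtapi::Node::Finalize();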