Commit 55983af6 by Marcus Winter

dataflow_cpp: moved processing of sources to worker threads

parent 754854ab
...@@ -46,6 +46,9 @@ class Scheduler { ...@@ -46,6 +46,9 @@ class Scheduler {
int process_id, int process_id,
Action & action, Action & action,
embb::mtapi::ExecutionPolicy const & policy) = 0; embb::mtapi::ExecutionPolicy const & policy) = 0;
virtual void Run(
Action & action,
embb::mtapi::ExecutionPolicy const & policy) = 0;
virtual void WaitForSlice(int slice) = 0; virtual void WaitForSlice(int slice) = 0;
virtual int GetSlices() = 0; virtual int GetSlices() = 0;
}; };
......
...@@ -105,6 +105,16 @@ class SchedulerMTAPI : public Scheduler { ...@@ -105,6 +105,16 @@ class SchedulerMTAPI : public Scheduler {
group_[idx].Start(job_, &action, static_cast<void*>(NULL), group_[idx].Start(job_, &action, static_cast<void*>(NULL),
task_attr); task_attr);
} }
virtual void Run(
Action & action,
embb::mtapi::ExecutionPolicy const & policy) {
embb::mtapi::Node & node = embb::mtapi::Node::GetInstance();
embb::mtapi::TaskAttributes task_attr;
task_attr.SetPolicy(policy);
embb::mtapi::Task task = node.Start(job_, &action, static_cast<void*>(NULL),
task_attr);
task.Wait();
}
virtual void Enqueue( virtual void Enqueue(
int process_id, int process_id,
Action & action, Action & action,
......
...@@ -43,6 +43,11 @@ class SchedulerSequential : public Scheduler { ...@@ -43,6 +43,11 @@ class SchedulerSequential : public Scheduler {
embb::mtapi::ExecutionPolicy const &) { embb::mtapi::ExecutionPolicy const &) {
action.RunSequential(); action.RunSequential();
} }
virtual void Run(
Action & action,
embb::mtapi::ExecutionPolicy const &) {
action.RunSequential();
}
virtual void Enqueue( virtual void Enqueue(
int, int,
Action & action, Action & action,
......
...@@ -67,7 +67,8 @@ class Source< Outputs<O1, O2, O3, O4, O5> > ...@@ -67,7 +67,8 @@ class Source< Outputs<O1, O2, O3, O4, O5> >
virtual bool Start(int clock) { virtual bool Start(int clock) {
if (not_done_) { if (not_done_) {
Run(clock); Action act(this, clock);
sched_->Run(act, embb::mtapi::ExecutionPolicy());
} }
return not_done_; return not_done_;
} }
......
...@@ -155,6 +155,7 @@ void SimpleTest::TestBasic() { ...@@ -155,6 +155,7 @@ void SimpleTest::TestBasic() {
embb::base::CoreSet core_set(true); embb::base::CoreSet core_set(true);
embb::mtapi::NodeAttributes node_attr; embb::mtapi::NodeAttributes node_attr;
node_attr node_attr
.SetReuseMainThread(MTAPI_TRUE)
.SetCoreAffinity(core_set) .SetCoreAffinity(core_set)
.SetMaxQueues(2); .SetMaxQueues(2);
embb::mtapi::Node::Initialize( embb::mtapi::Node::Initialize(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment