From 55983af6ef294534d38ffda863d37f4a634e2a5f Mon Sep 17 00:00:00 2001 From: Marcus Winter Date: Thu, 27 Oct 2016 11:46:16 +0200 Subject: [PATCH] dataflow_cpp: moved processing of sources to worker threads --- dataflow_cpp/include/embb/dataflow/internal/scheduler.h | 3 +++ dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h | 10 ++++++++++ dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h | 5 +++++ dataflow_cpp/include/embb/dataflow/internal/source.h | 3 ++- dataflow_cpp/test/dataflow_cpp_test_simple.cc | 1 + 5 files changed, 21 insertions(+), 1 deletion(-) diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler.h index 3a82187..9f0c23c 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler.h @@ -46,6 +46,9 @@ class Scheduler { int process_id, Action & action, embb::mtapi::ExecutionPolicy const & policy) = 0; + virtual void Run( + Action & action, + embb::mtapi::ExecutionPolicy const & policy) = 0; virtual void WaitForSlice(int slice) = 0; virtual int GetSlices() = 0; }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h index 05ea5d9..61e80ae 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h @@ -105,6 +105,16 @@ class SchedulerMTAPI : public Scheduler { group_[idx].Start(job_, &action, static_cast(NULL), task_attr); } + virtual void Run( + Action & action, + embb::mtapi::ExecutionPolicy const & policy) { + embb::mtapi::Node & node = embb::mtapi::Node::GetInstance(); + embb::mtapi::TaskAttributes task_attr; + task_attr.SetPolicy(policy); + embb::mtapi::Task task = node.Start(job_, &action, static_cast(NULL), + task_attr); + task.Wait(); + } virtual void Enqueue( int process_id, Action & action, diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h index 04a6c0e..1e150cc 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h @@ -43,6 +43,11 @@ class SchedulerSequential : public Scheduler { embb::mtapi::ExecutionPolicy const &) { action.RunSequential(); } + virtual void Run( + Action & action, + embb::mtapi::ExecutionPolicy const &) { + action.RunSequential(); + } virtual void Enqueue( int, Action & action, diff --git a/dataflow_cpp/include/embb/dataflow/internal/source.h b/dataflow_cpp/include/embb/dataflow/internal/source.h index 26d3734..4586123 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/source.h +++ b/dataflow_cpp/include/embb/dataflow/internal/source.h @@ -67,7 +67,8 @@ class Source< Outputs > virtual bool Start(int clock) { if (not_done_) { - Run(clock); + Action act(this, clock); + sched_->Run(act, embb::mtapi::ExecutionPolicy()); } return not_done_; } diff --git a/dataflow_cpp/test/dataflow_cpp_test_simple.cc b/dataflow_cpp/test/dataflow_cpp_test_simple.cc index c0b56db..e25186a 100644 --- a/dataflow_cpp/test/dataflow_cpp_test_simple.cc +++ b/dataflow_cpp/test/dataflow_cpp_test_simple.cc @@ -155,6 +155,7 @@ void SimpleTest::TestBasic() { embb::base::CoreSet core_set(true); embb::mtapi::NodeAttributes node_attr; node_attr + .SetReuseMainThread(MTAPI_TRUE) .SetCoreAffinity(core_set) .SetMaxQueues(2); embb::mtapi::Node::Initialize( -- libgit2 0.26.0