diff --git a/dataflow_cpp/include/embb/dataflow/internal/node.h b/dataflow_cpp/include/embb/dataflow/internal/node.h index 5c7d732..0c3fd2b 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/node.h +++ b/dataflow_cpp/include/embb/dataflow/internal/node.h @@ -29,6 +29,7 @@ #include #include +#include #include #include @@ -58,10 +59,14 @@ class Node { SetSlices(0); } } + void SetPolicy(embb::mtapi::ExecutionPolicy const & policy) { + policy_ = policy; + } protected: Scheduler * sched_; static int next_process_id_; + embb::mtapi::ExecutionPolicy policy_; static int GetNextProcessID() { return next_process_id_++; } virtual void SetSlices(int /*slices*/) {} diff --git a/dataflow_cpp/include/embb/dataflow/internal/process.h b/dataflow_cpp/include/embb/dataflow/internal/process.h index ad373b4..5a063dd 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/process.h +++ b/dataflow_cpp/include/embb/dataflow/internal/process.h @@ -146,7 +146,7 @@ class Process< Serial, Inputs, for (int ii = clk; ii < clk_res; ii++) { const int idx = ii % slices_; action_[idx] = Action(this, ii); - sched_->Enqueue(queue_id_, action_[idx]); + sched_->Enqueue(queue_id_, action_[idx], policy_); } queued_clock_.Store(clk_res); retry = false; @@ -158,7 +158,7 @@ class Process< Serial, Inputs, } else { const int idx = clock % slices_; action_[idx] = Action(this, clock); - sched_->Start(action_[idx]); + sched_->Start(action_[idx], policy_); } } diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler.h index 61aaa6a..3a82187 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler.h @@ -27,6 +27,8 @@ #ifndef EMBB_DATAFLOW_INTERNAL_SCHEDULER_H_ #define EMBB_DATAFLOW_INTERNAL_SCHEDULER_H_ +#include + namespace embb { namespace dataflow { namespace internal { @@ -37,8 +39,13 @@ class Scheduler { public: Scheduler() {} virtual ~Scheduler() {} - virtual void Start(Action & action) = 0; - virtual void Enqueue(int process_id, Action & action) = 0; + virtual void Start( + Action & action, + embb::mtapi::ExecutionPolicy const & policy) = 0; + virtual void Enqueue( + int process_id, + Action & action, + embb::mtapi::ExecutionPolicy const & policy) = 0; virtual void WaitForSlice(int slice) = 0; virtual int GetSlices() = 0; }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h index a7eee70..05ea5d9 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h @@ -96,14 +96,25 @@ class SchedulerMTAPI : public Scheduler { embb::base::Allocation::Free(group_); embb::base::Allocation::Free(queue_); } - virtual void Start(Action & action) { + virtual void Start( + Action & action, + embb::mtapi::ExecutionPolicy const & policy) { const int idx = action.GetClock() % slices_; - group_[idx].Start(job_, &action, static_cast(NULL)); + embb::mtapi::TaskAttributes task_attr; + task_attr.SetPolicy(policy); + group_[idx].Start(job_, &action, static_cast(NULL), + task_attr); } - virtual void Enqueue(int process_id, Action & action) { + virtual void Enqueue( + int process_id, + Action & action, + embb::mtapi::ExecutionPolicy const & policy) { const int idx = action.GetClock() % slices_; const int queue_id = process_id % queue_count_; - queue_[queue_id].Enqueue(&action, static_cast(NULL), group_[idx]); + embb::mtapi::TaskAttributes task_attr; + task_attr.SetPolicy(policy); + queue_[queue_id].Enqueue(&action, static_cast(NULL), + task_attr, group_[idx]); } virtual void WaitForSlice(int slice) { group_[slice].WaitAll(MTAPI_INFINITE); diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h index 500d41a..04a6c0e 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h @@ -38,10 +38,15 @@ class SchedulerSequential : public Scheduler { public: SchedulerSequential() {} virtual ~SchedulerSequential() {} - virtual void Start(Action & action) { + virtual void Start( + Action & action, + embb::mtapi::ExecutionPolicy const &) { action.RunSequential(); } - virtual void Enqueue(int, Action & action) { + virtual void Enqueue( + int, + Action & action, + embb::mtapi::ExecutionPolicy const &) { action.RunSequential(); } virtual void WaitForSlice(int /*slice*/) {} diff --git a/dataflow_cpp/include/embb/dataflow/internal/sink.h b/dataflow_cpp/include/embb/dataflow/internal/sink.h index e690a99..73bb542 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/sink.h +++ b/dataflow_cpp/include/embb/dataflow/internal/sink.h @@ -113,7 +113,7 @@ class Sink< Inputs > for (int ii = clk; ii < clk_res; ii++) { const int idx = ii % slices_; action_[idx] = Action(this, ii); - sched_->Enqueue(queue_id_, action_[idx]); + sched_->Enqueue(queue_id_, action_[idx], policy_); } queued_clock_.Store(clk_res); retry = false; diff --git a/dataflow_cpp/include/embb/dataflow/network.h b/dataflow_cpp/include/embb/dataflow/network.h index 4658bbb..1e00a77 100644 --- a/dataflow_cpp/include/embb/dataflow/network.h +++ b/dataflow_cpp/include/embb/dataflow/network.h @@ -71,6 +71,19 @@ class Network { explicit Network(int slices) {} /** + * Constructs an empty network. + * \param policy Default execution policy of the processes in the network. + */ + explicit Network(embb::mtapi::ExecutionPolicy const & policy) {} + + /** + * Constructs an empty network. + * \param slices Number of concurrent tokens allowed in the network. + * \param policy Default execution policy of the processes in the network. + */ + Network(int slices, embb::mtapi::ExecutionPolicy const & policy) {} + + /** * Input port class. */ template @@ -208,7 +221,16 @@ class Network { * \param network The network this node is going to be part of. * \param function The Function to call to process a token. */ - explicit SerialProcess(Network & network, FunctionType function); + SerialProcess(Network & network, FunctionType function); + + /** + * Constructs a SerialProcess with a user specified processing function. + * \param network The network this node is going to be part of. + * \param function The Function to call to process a token. + * \param policy The execution policy of the process. + */ + SerialProcess(Network & network, FunctionType function, + embb::mtapi::ExecutionPolicy const & policy); /** * \returns \c true if the SerialProcess has any inputs, \c false @@ -254,75 +276,84 @@ class Network { }; /** - * Generic parallel process template. - * - * Implements a generic parallel process in the network that may have one to - * four input ports and one to four output ports but no more that five total - * ports. - * Tokens are processed as soon as all inputs for that token are complete. - * - * \see Source, SerialProcess, Sink, Switch, Select - * - * \tparam Inputs Inputs of the process. - * \tparam Outputs Outputs of the process. - */ + * Generic parallel process template. + * + * Implements a generic parallel process in the network that may have one to + * four input ports and one to four output ports but no more that five total + * ports. + * Tokens are processed as soon as all inputs for that token are complete. + * + * \see Source, SerialProcess, Sink, Switch, Select + * + * \tparam Inputs Inputs of the process. + * \tparam Outputs Outputs of the process. + */ template class ParallelProcess { public: /** - * Function type to use when processing tokens. - */ + * Function type to use when processing tokens. + */ typedef embb::base::Function FunctionType; /** - * Input port type list. - */ + * Input port type list. + */ typedef Inputs InputsType; /** - * Output port type list. - */ + * Output port type list. + */ typedef Outputs OutputsType; /** - * Constructs a ParallelProcess with a user specified processing function. - * \param network The network this node is going to be part of. - * \param function The Function to call to process a token. - */ - explicit ParallelProcess(Network & network, FunctionType function); + * Constructs a ParallelProcess with a user specified processing function. + * \param network The network this node is going to be part of. + * \param function The Function to call to process a token. + */ + ParallelProcess(Network & network, FunctionType function); /** - * \returns \c true if the ParallelProcess has any inputs, \c false - * otherwise. - */ + * Constructs a ParallelProcess with a user specified processing function. + * \param network The network this node is going to be part of. + * \param function The Function to call to process a token. + * \param policy The execution policy of the process. + */ + ParallelProcess(Network & network, FunctionType function, + embb::mtapi::ExecutionPolicy const & policy); + + /** + * \returns \c true if the ParallelProcess has any inputs, \c false + * otherwise. + */ virtual bool HasInputs() const; /** - * \returns Reference to a list of all input ports. - */ + * \returns Reference to a list of all input ports. + */ InputsType & GetInputs(); /** - * \returns Input port at Index. - */ + * \returns Input port at Index. + */ template typename InputsType::Types::Result & GetInput(); /** - * \returns \c true if the ParallelProcess has any outputs, \c false - * otherwise. - */ + * \returns \c true if the ParallelProcess has any outputs, \c false + * otherwise. + */ virtual bool HasOutputs() const; /** - * \returns Reference to a list of all output ports. - */ + * \returns Reference to a list of all output ports. + */ OutputsType & GetOutputs(); /** - * \returns Output port at Index. - */ + * \returns Output port at Index. + */ template typename OutputsType::Types::Result & GetOutput(); @@ -373,6 +404,13 @@ class Network { explicit Select(Network & network); /** + * Constructs a Switch process. + * \param network The network this node is going to be part of. + * \param policy The execution policy of the process. + */ + Select(Network & network, embb::mtapi::ExecutionPolicy const & policy); + + /** * \returns Always \c true. */ virtual bool HasInputs() const; @@ -451,6 +489,13 @@ class Network { explicit Select(Network & network); /** + * Constructs a Select process. + * \param network The network this node is going to be part of. + * \param policy The execution policy of the process. + */ + Select(Network & network, embb::mtapi::ExecutionPolicy const & policy); + + /** * \returns Always \c true. */ virtual bool HasInputs() const; @@ -528,7 +573,16 @@ class Network { * \param network The network this node is going to be part of. * \param function The Function to call to process a token. */ - explicit Sink(Network & network, FunctionType function); + Sink(Network & network, FunctionType function); + + /** + * Constructs a Sink with a user specified processing function. + * \param network The network this node is going to be part of. + * \param function The Function to call to process a token. + * \param policy The execution policy of the process. + */ + Sink(Network & network, FunctionType function, + embb::mtapi::ExecutionPolicy const & policy); /** * \returns Always \c true. @@ -588,7 +642,16 @@ class Network { * \param network The network this node is going to be part of. * \param function The Function to call to emit a token. */ - explicit Source(Network & network, FunctionType function); + Source(Network & network, FunctionType function); + + /** + * Constructs a Source with a user specified processing function. + * \param network The network this node is going to be part of. + * \param function The Function to call to emit a token. + * \param policy The execution policy of the process. + */ + Source(Network & network, FunctionType function, + embb::mtapi::ExecutionPolicy const & policy); /** * \returns Always \c false. @@ -641,7 +704,16 @@ class Network { * \param network The network this node is going to be part of. * \param value The value to emit. */ - explicit ConstantSource(Network & network, Type value); + ConstantSource(Network & network, Type value); + + /** + * Constructs a ConstantSource with a value to emit on each token. + * \param network The network this node is going to be part of. + * \param value The value to emit. + * \param policy The execution policy of the process. + */ + ConstantSource(Network & network, Type value, + embb::mtapi::ExecutionPolicy const & policy); /** * \returns Always \c false. @@ -698,12 +770,29 @@ class Network { class Network : public internal::ClockListener { public: Network() - : sink_counter_(NULL), sink_count_(0), slices_(0), sched_(NULL) { + : sink_counter_(NULL), sink_count_(0) + , slices_(0), sched_(NULL) + , policy_() { // empty } explicit Network(int slices) - : sink_counter_(NULL), sink_count_(0), slices_(slices), sched_(NULL) { + : sink_counter_(NULL), sink_count_(0), + slices_(slices), sched_(NULL) + , policy_() { + PrepareSlices(); + } + + explicit Network(embb::mtapi::ExecutionPolicy const & policy) + : sink_counter_(NULL), sink_count_(0) + , slices_(0), sched_(NULL) + , policy_(policy) { + } + + Network(int slices, embb::mtapi::ExecutionPolicy const & policy) + : sink_counter_(NULL), sink_count_(0) + , slices_(slices), sched_(NULL) + , policy_(policy) { PrepareSlices(); } @@ -751,11 +840,23 @@ class Network : public internal::ClockListener { internal::Inputs, internal::Outputs >::FunctionType FunctionType; - explicit SerialProcess(Network & network, FunctionType function) + + SerialProcess(Network & network, FunctionType function) : internal::Process< true, internal::Inputs, internal::Outputs >( network.sched_, function) { + SetPolicy(network.policy_); + network.processes_.push_back(this); + } + + SerialProcess(Network & network, FunctionType function, + embb::mtapi::ExecutionPolicy const & policy) + : internal::Process< true, + internal::Inputs, + internal::Outputs >( + network.sched_, function) { + SetPolicy(policy); network.processes_.push_back(this); } }; @@ -775,11 +876,23 @@ class Network : public internal::ClockListener { internal::Inputs, internal::Outputs >::FunctionType FunctionType; - explicit ParallelProcess(Network & network, FunctionType function) + + ParallelProcess(Network & network, FunctionType function) : internal::Process< false, internal::Inputs, internal::Outputs >( network.sched_, function) { + SetPolicy(network.policy_); + network.processes_.push_back(this); + } + + ParallelProcess(Network & network, FunctionType function, + embb::mtapi::ExecutionPolicy const & policy) + : internal::Process< false, + internal::Inputs, + internal::Outputs >( + network.sched_, function) { + SetPolicy(policy); network.processes_.push_back(this); } }; @@ -789,6 +902,13 @@ class Network : public internal::ClockListener { public: explicit Switch(Network & network) : internal::Switch(network.sched_) { + SetPolicy(network.policy_); + network.processes_.push_back(this); + } + + Switch(Network & network, embb::mtapi::ExecutionPolicy const & policy) + : internal::Switch(network.sched_) { + SetPolicy(policy); network.processes_.push_back(this); } }; @@ -798,6 +918,13 @@ class Network : public internal::ClockListener { public: explicit Select(Network & network) : internal::Select(network.sched_) { + SetPolicy(network.policy_); + network.processes_.push_back(this); + } + + Select(Network & network, embb::mtapi::ExecutionPolicy const & policy) + : internal::Select(network.sched_) { + SetPolicy(policy); network.processes_.push_back(this); } }; @@ -812,10 +939,21 @@ class Network : public internal::ClockListener { typedef typename internal::Sink< internal::Inputs >::FunctionType FunctionType; - explicit Sink(Network & network, FunctionType function) + Sink(Network & network, FunctionType function) : internal::Sink< internal::Inputs >( network.sched_, &network, function) { + SetPolicy(network.policy_); + network.sinks_.push_back(this); + network.sink_count_++; + } + + Sink(Network & network, FunctionType function, + embb::mtapi::ExecutionPolicy const & policy) + : internal::Sink< + internal::Inputs >( + network.sched_, &network, function) { + SetPolicy(policy); network.sinks_.push_back(this); network.sink_count_++; } @@ -832,9 +970,18 @@ class Network : public internal::ClockListener { internal::Outputs >::FunctionType FunctionType; - explicit Source(Network & network, FunctionType function) + Source(Network & network, FunctionType function) : internal::Source< internal::Outputs >(network.sched_, function) { + SetPolicy(network.policy_); + network.sources_.push_back(this); + } + + Source(Network & network, FunctionType function, + embb::mtapi::ExecutionPolicy const & policy) + : internal::Source< + internal::Outputs >(network.sched_, function) { + SetPolicy(policy); network.sources_.push_back(this); } }; @@ -842,8 +989,16 @@ class Network : public internal::ClockListener { template class ConstantSource : public internal::ConstantSource { public: - explicit ConstantSource(Network & network, Type value) + ConstantSource(Network & network, Type value) + : internal::ConstantSource(network.sched_, value) { + SetPolicy(network.policy_); + network.sources_.push_back(this); + } + + ConstantSource(Network & network, Type value, + embb::mtapi::ExecutionPolicy const & policy) : internal::ConstantSource(network.sched_, value) { + SetPolicy(policy); network.sources_.push_back(this); } }; @@ -928,6 +1083,7 @@ class Network : public internal::ClockListener { int sink_count_; int slices_; internal::Scheduler * sched_; + embb::mtapi::ExecutionPolicy policy_; #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY std::vector spawn_history_[Slices];