Commit b1ecc318 by Marcus Winter

dataflow_cpp: added support for priorities and affinities

parent ae4813ec
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
#include <cstddef> #include <cstddef>
#include <embb/base/exceptions.h> #include <embb/base/exceptions.h>
#include <embb/mtapi/execution_policy.h>
#include <embb/dataflow/internal/scheduler.h> #include <embb/dataflow/internal/scheduler.h>
#include <embb/dataflow/internal/clock_listener.h> #include <embb/dataflow/internal/clock_listener.h>
...@@ -58,10 +59,14 @@ class Node { ...@@ -58,10 +59,14 @@ class Node {
SetSlices(0); SetSlices(0);
} }
} }
void SetPolicy(embb::mtapi::ExecutionPolicy const & policy) {
policy_ = policy;
}
protected: protected:
Scheduler * sched_; Scheduler * sched_;
static int next_process_id_; static int next_process_id_;
embb::mtapi::ExecutionPolicy policy_;
static int GetNextProcessID() { return next_process_id_++; } static int GetNextProcessID() { return next_process_id_++; }
virtual void SetSlices(int /*slices*/) {} virtual void SetSlices(int /*slices*/) {}
......
...@@ -146,7 +146,7 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>, ...@@ -146,7 +146,7 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
for (int ii = clk; ii < clk_res; ii++) { for (int ii = clk; ii < clk_res; ii++) {
const int idx = ii % slices_; const int idx = ii % slices_;
action_[idx] = Action(this, ii); action_[idx] = Action(this, ii);
sched_->Enqueue(queue_id_, action_[idx]); sched_->Enqueue(queue_id_, action_[idx], policy_);
} }
queued_clock_.Store(clk_res); queued_clock_.Store(clk_res);
retry = false; retry = false;
...@@ -158,7 +158,7 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>, ...@@ -158,7 +158,7 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
} else { } else {
const int idx = clock % slices_; const int idx = clock % slices_;
action_[idx] = Action(this, clock); action_[idx] = Action(this, clock);
sched_->Start(action_[idx]); sched_->Start(action_[idx], policy_);
} }
} }
......
...@@ -27,6 +27,8 @@ ...@@ -27,6 +27,8 @@
#ifndef EMBB_DATAFLOW_INTERNAL_SCHEDULER_H_ #ifndef EMBB_DATAFLOW_INTERNAL_SCHEDULER_H_
#define EMBB_DATAFLOW_INTERNAL_SCHEDULER_H_ #define EMBB_DATAFLOW_INTERNAL_SCHEDULER_H_
#include <embb/mtapi/execution_policy.h>
namespace embb { namespace embb {
namespace dataflow { namespace dataflow {
namespace internal { namespace internal {
...@@ -37,8 +39,13 @@ class Scheduler { ...@@ -37,8 +39,13 @@ class Scheduler {
public: public:
Scheduler() {} Scheduler() {}
virtual ~Scheduler() {} virtual ~Scheduler() {}
virtual void Start(Action & action) = 0; virtual void Start(
virtual void Enqueue(int process_id, Action & action) = 0; Action & action,
embb::mtapi::ExecutionPolicy const & policy) = 0;
virtual void Enqueue(
int process_id,
Action & action,
embb::mtapi::ExecutionPolicy const & policy) = 0;
virtual void WaitForSlice(int slice) = 0; virtual void WaitForSlice(int slice) = 0;
virtual int GetSlices() = 0; virtual int GetSlices() = 0;
}; };
......
...@@ -96,14 +96,25 @@ class SchedulerMTAPI : public Scheduler { ...@@ -96,14 +96,25 @@ class SchedulerMTAPI : public Scheduler {
embb::base::Allocation::Free(group_); embb::base::Allocation::Free(group_);
embb::base::Allocation::Free(queue_); embb::base::Allocation::Free(queue_);
} }
virtual void Start(Action & action) { virtual void Start(
Action & action,
embb::mtapi::ExecutionPolicy const & policy) {
const int idx = action.GetClock() % slices_; const int idx = action.GetClock() % slices_;
group_[idx].Start(job_, &action, static_cast<void*>(NULL)); embb::mtapi::TaskAttributes task_attr;
task_attr.SetPolicy(policy);
group_[idx].Start(job_, &action, static_cast<void*>(NULL),
task_attr);
} }
virtual void Enqueue(int process_id, Action & action) { virtual void Enqueue(
int process_id,
Action & action,
embb::mtapi::ExecutionPolicy const & policy) {
const int idx = action.GetClock() % slices_; const int idx = action.GetClock() % slices_;
const int queue_id = process_id % queue_count_; const int queue_id = process_id % queue_count_;
queue_[queue_id].Enqueue(&action, static_cast<void*>(NULL), group_[idx]); embb::mtapi::TaskAttributes task_attr;
task_attr.SetPolicy(policy);
queue_[queue_id].Enqueue(&action, static_cast<void*>(NULL),
task_attr, group_[idx]);
} }
virtual void WaitForSlice(int slice) { virtual void WaitForSlice(int slice) {
group_[slice].WaitAll(MTAPI_INFINITE); group_[slice].WaitAll(MTAPI_INFINITE);
......
...@@ -38,10 +38,15 @@ class SchedulerSequential : public Scheduler { ...@@ -38,10 +38,15 @@ class SchedulerSequential : public Scheduler {
public: public:
SchedulerSequential() {} SchedulerSequential() {}
virtual ~SchedulerSequential() {} virtual ~SchedulerSequential() {}
virtual void Start(Action & action) { virtual void Start(
Action & action,
embb::mtapi::ExecutionPolicy const &) {
action.RunSequential(); action.RunSequential();
} }
virtual void Enqueue(int, Action & action) { virtual void Enqueue(
int,
Action & action,
embb::mtapi::ExecutionPolicy const &) {
action.RunSequential(); action.RunSequential();
} }
virtual void WaitForSlice(int /*slice*/) {} virtual void WaitForSlice(int /*slice*/) {}
......
...@@ -113,7 +113,7 @@ class Sink< Inputs<I1, I2, I3, I4, I5> > ...@@ -113,7 +113,7 @@ class Sink< Inputs<I1, I2, I3, I4, I5> >
for (int ii = clk; ii < clk_res; ii++) { for (int ii = clk; ii < clk_res; ii++) {
const int idx = ii % slices_; const int idx = ii % slices_;
action_[idx] = Action(this, ii); action_[idx] = Action(this, ii);
sched_->Enqueue(queue_id_, action_[idx]); sched_->Enqueue(queue_id_, action_[idx], policy_);
} }
queued_clock_.Store(clk_res); queued_clock_.Store(clk_res);
retry = false; retry = false;
......
...@@ -71,6 +71,19 @@ class Network { ...@@ -71,6 +71,19 @@ class Network {
explicit Network(int slices) {} explicit Network(int slices) {}
/** /**
* Constructs an empty network.
* \param policy Default execution policy of the processes in the network.
*/
explicit Network(embb::mtapi::ExecutionPolicy const & policy) {}
/**
* Constructs an empty network.
* \param slices Number of concurrent tokens allowed in the network.
* \param policy Default execution policy of the processes in the network.
*/
Network(int slices, embb::mtapi::ExecutionPolicy const & policy) {}
/**
* Input port class. * Input port class.
*/ */
template <typename Type> template <typename Type>
...@@ -208,7 +221,16 @@ class Network { ...@@ -208,7 +221,16 @@ class Network {
* \param network The network this node is going to be part of. * \param network The network this node is going to be part of.
* \param function The Function to call to process a token. * \param function The Function to call to process a token.
*/ */
explicit SerialProcess(Network & network, FunctionType function); SerialProcess(Network & network, FunctionType function);
/**
* Constructs a SerialProcess with a user specified processing function.
* \param network The network this node is going to be part of.
* \param function The Function to call to process a token.
* \param policy The execution policy of the process.
*/
SerialProcess(Network & network, FunctionType function,
embb::mtapi::ExecutionPolicy const & policy);
/** /**
* \returns \c true if the SerialProcess has any inputs, \c false * \returns \c true if the SerialProcess has any inputs, \c false
...@@ -254,75 +276,84 @@ class Network { ...@@ -254,75 +276,84 @@ class Network {
}; };
/** /**
* Generic parallel process template. * Generic parallel process template.
* *
* Implements a generic parallel process in the network that may have one to * Implements a generic parallel process in the network that may have one to
* four input ports and one to four output ports but no more that five total * four input ports and one to four output ports but no more that five total
* ports. * ports.
* Tokens are processed as soon as all inputs for that token are complete. * Tokens are processed as soon as all inputs for that token are complete.
* *
* \see Source, SerialProcess, Sink, Switch, Select * \see Source, SerialProcess, Sink, Switch, Select
* *
* \tparam Inputs Inputs of the process. * \tparam Inputs Inputs of the process.
* \tparam Outputs Outputs of the process. * \tparam Outputs Outputs of the process.
*/ */
template <class Inputs, class Outputs> template <class Inputs, class Outputs>
class ParallelProcess { class ParallelProcess {
public: public:
/** /**
* Function type to use when processing tokens. * Function type to use when processing tokens.
*/ */
typedef embb::base::Function<void, INPUT_TYPE_LIST, OUTPUT_TYPE_LIST> typedef embb::base::Function<void, INPUT_TYPE_LIST, OUTPUT_TYPE_LIST>
FunctionType; FunctionType;
/** /**
* Input port type list. * Input port type list.
*/ */
typedef Inputs<INPUT_TYPE_LIST> InputsType; typedef Inputs<INPUT_TYPE_LIST> InputsType;
/** /**
* Output port type list. * Output port type list.
*/ */
typedef Outputs<OUTPUT_TYPE_LIST> OutputsType; typedef Outputs<OUTPUT_TYPE_LIST> OutputsType;
/** /**
* Constructs a ParallelProcess with a user specified processing function. * Constructs a ParallelProcess with a user specified processing function.
* \param network The network this node is going to be part of. * \param network The network this node is going to be part of.
* \param function The Function to call to process a token. * \param function The Function to call to process a token.
*/ */
explicit ParallelProcess(Network & network, FunctionType function); ParallelProcess(Network & network, FunctionType function);
/** /**
* \returns \c true if the ParallelProcess has any inputs, \c false * Constructs a ParallelProcess with a user specified processing function.
* otherwise. * \param network The network this node is going to be part of.
*/ * \param function The Function to call to process a token.
* \param policy The execution policy of the process.
*/
ParallelProcess(Network & network, FunctionType function,
embb::mtapi::ExecutionPolicy const & policy);
/**
* \returns \c true if the ParallelProcess has any inputs, \c false
* otherwise.
*/
virtual bool HasInputs() const; virtual bool HasInputs() const;
/** /**
* \returns Reference to a list of all input ports. * \returns Reference to a list of all input ports.
*/ */
InputsType & GetInputs(); InputsType & GetInputs();
/** /**
* \returns Input port at Index. * \returns Input port at Index.
*/ */
template <int Index> template <int Index>
typename InputsType::Types<Index>::Result & GetInput(); typename InputsType::Types<Index>::Result & GetInput();
/** /**
* \returns \c true if the ParallelProcess has any outputs, \c false * \returns \c true if the ParallelProcess has any outputs, \c false
* otherwise. * otherwise.
*/ */
virtual bool HasOutputs() const; virtual bool HasOutputs() const;
/** /**
* \returns Reference to a list of all output ports. * \returns Reference to a list of all output ports.
*/ */
OutputsType & GetOutputs(); OutputsType & GetOutputs();
/** /**
* \returns Output port at Index. * \returns Output port at Index.
*/ */
template <int Index> template <int Index>
typename OutputsType::Types<Index>::Result & GetOutput(); typename OutputsType::Types<Index>::Result & GetOutput();
...@@ -373,6 +404,13 @@ class Network { ...@@ -373,6 +404,13 @@ class Network {
explicit Select(Network & network); explicit Select(Network & network);
/** /**
* Constructs a Switch process.
* \param network The network this node is going to be part of.
* \param policy The execution policy of the process.
*/
Select(Network & network, embb::mtapi::ExecutionPolicy const & policy);
/**
* \returns Always \c true. * \returns Always \c true.
*/ */
virtual bool HasInputs() const; virtual bool HasInputs() const;
...@@ -451,6 +489,13 @@ class Network { ...@@ -451,6 +489,13 @@ class Network {
explicit Select(Network & network); explicit Select(Network & network);
/** /**
* Constructs a Select process.
* \param network The network this node is going to be part of.
* \param policy The execution policy of the process.
*/
Select(Network & network, embb::mtapi::ExecutionPolicy const & policy);
/**
* \returns Always \c true. * \returns Always \c true.
*/ */
virtual bool HasInputs() const; virtual bool HasInputs() const;
...@@ -528,7 +573,16 @@ class Network { ...@@ -528,7 +573,16 @@ class Network {
* \param network The network this node is going to be part of. * \param network The network this node is going to be part of.
* \param function The Function to call to process a token. * \param function The Function to call to process a token.
*/ */
explicit Sink(Network & network, FunctionType function); Sink(Network & network, FunctionType function);
/**
* Constructs a Sink with a user specified processing function.
* \param network The network this node is going to be part of.
* \param function The Function to call to process a token.
* \param policy The execution policy of the process.
*/
Sink(Network & network, FunctionType function,
embb::mtapi::ExecutionPolicy const & policy);
/** /**
* \returns Always \c true. * \returns Always \c true.
...@@ -588,7 +642,16 @@ class Network { ...@@ -588,7 +642,16 @@ class Network {
* \param network The network this node is going to be part of. * \param network The network this node is going to be part of.
* \param function The Function to call to emit a token. * \param function The Function to call to emit a token.
*/ */
explicit Source(Network & network, FunctionType function); Source(Network & network, FunctionType function);
/**
* Constructs a Source with a user specified processing function.
* \param network The network this node is going to be part of.
* \param function The Function to call to emit a token.
* \param policy The execution policy of the process.
*/
Source(Network & network, FunctionType function,
embb::mtapi::ExecutionPolicy const & policy);
/** /**
* \returns Always \c false. * \returns Always \c false.
...@@ -641,7 +704,16 @@ class Network { ...@@ -641,7 +704,16 @@ class Network {
* \param network The network this node is going to be part of. * \param network The network this node is going to be part of.
* \param value The value to emit. * \param value The value to emit.
*/ */
explicit ConstantSource(Network & network, Type value); ConstantSource(Network & network, Type value);
/**
* Constructs a ConstantSource with a value to emit on each token.
* \param network The network this node is going to be part of.
* \param value The value to emit.
* \param policy The execution policy of the process.
*/
ConstantSource(Network & network, Type value,
embb::mtapi::ExecutionPolicy const & policy);
/** /**
* \returns Always \c false. * \returns Always \c false.
...@@ -698,12 +770,29 @@ class Network { ...@@ -698,12 +770,29 @@ class Network {
class Network : public internal::ClockListener { class Network : public internal::ClockListener {
public: public:
Network() Network()
: sink_counter_(NULL), sink_count_(0), slices_(0), sched_(NULL) { : sink_counter_(NULL), sink_count_(0)
, slices_(0), sched_(NULL)
, policy_() {
// empty // empty
} }
explicit Network(int slices) explicit Network(int slices)
: sink_counter_(NULL), sink_count_(0), slices_(slices), sched_(NULL) { : sink_counter_(NULL), sink_count_(0),
slices_(slices), sched_(NULL)
, policy_() {
PrepareSlices();
}
explicit Network(embb::mtapi::ExecutionPolicy const & policy)
: sink_counter_(NULL), sink_count_(0)
, slices_(0), sched_(NULL)
, policy_(policy) {
}
Network(int slices, embb::mtapi::ExecutionPolicy const & policy)
: sink_counter_(NULL), sink_count_(0)
, slices_(slices), sched_(NULL)
, policy_(policy) {
PrepareSlices(); PrepareSlices();
} }
...@@ -751,11 +840,23 @@ class Network : public internal::ClockListener { ...@@ -751,11 +840,23 @@ class Network : public internal::ClockListener {
internal::Inputs<I1, I2, I3, I4, I5>, internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >::FunctionType internal::Outputs<O1, O2, O3, O4, O5> >::FunctionType
FunctionType; FunctionType;
explicit SerialProcess(Network & network, FunctionType function)
SerialProcess(Network & network, FunctionType function)
: internal::Process< true, : internal::Process< true,
internal::Inputs<I1, I2, I3, I4, I5>, internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >( internal::Outputs<O1, O2, O3, O4, O5> >(
network.sched_, function) { network.sched_, function) {
SetPolicy(network.policy_);
network.processes_.push_back(this);
}
SerialProcess(Network & network, FunctionType function,
embb::mtapi::ExecutionPolicy const & policy)
: internal::Process< true,
internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >(
network.sched_, function) {
SetPolicy(policy);
network.processes_.push_back(this); network.processes_.push_back(this);
} }
}; };
...@@ -775,11 +876,23 @@ class Network : public internal::ClockListener { ...@@ -775,11 +876,23 @@ class Network : public internal::ClockListener {
internal::Inputs<I1, I2, I3, I4, I5>, internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >::FunctionType internal::Outputs<O1, O2, O3, O4, O5> >::FunctionType
FunctionType; FunctionType;
explicit ParallelProcess(Network & network, FunctionType function)
ParallelProcess(Network & network, FunctionType function)
: internal::Process< false, : internal::Process< false,
internal::Inputs<I1, I2, I3, I4, I5>, internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >( internal::Outputs<O1, O2, O3, O4, O5> >(
network.sched_, function) { network.sched_, function) {
SetPolicy(network.policy_);
network.processes_.push_back(this);
}
ParallelProcess(Network & network, FunctionType function,
embb::mtapi::ExecutionPolicy const & policy)
: internal::Process< false,
internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >(
network.sched_, function) {
SetPolicy(policy);
network.processes_.push_back(this); network.processes_.push_back(this);
} }
}; };
...@@ -789,6 +902,13 @@ class Network : public internal::ClockListener { ...@@ -789,6 +902,13 @@ class Network : public internal::ClockListener {
public: public:
explicit Switch(Network & network) explicit Switch(Network & network)
: internal::Switch<Type>(network.sched_) { : internal::Switch<Type>(network.sched_) {
SetPolicy(network.policy_);
network.processes_.push_back(this);
}
Switch(Network & network, embb::mtapi::ExecutionPolicy const & policy)
: internal::Switch<Type>(network.sched_) {
SetPolicy(policy);
network.processes_.push_back(this); network.processes_.push_back(this);
} }
}; };
...@@ -798,6 +918,13 @@ class Network : public internal::ClockListener { ...@@ -798,6 +918,13 @@ class Network : public internal::ClockListener {
public: public:
explicit Select(Network & network) explicit Select(Network & network)
: internal::Select<Type>(network.sched_) { : internal::Select<Type>(network.sched_) {
SetPolicy(network.policy_);
network.processes_.push_back(this);
}
Select(Network & network, embb::mtapi::ExecutionPolicy const & policy)
: internal::Select<Type>(network.sched_) {
SetPolicy(policy);
network.processes_.push_back(this); network.processes_.push_back(this);
} }
}; };
...@@ -812,10 +939,21 @@ class Network : public internal::ClockListener { ...@@ -812,10 +939,21 @@ class Network : public internal::ClockListener {
typedef typename internal::Sink< typedef typename internal::Sink<
internal::Inputs<I1, I2, I3, I4, I5> >::FunctionType FunctionType; internal::Inputs<I1, I2, I3, I4, I5> >::FunctionType FunctionType;
explicit Sink(Network & network, FunctionType function) Sink(Network & network, FunctionType function)
: internal::Sink< : internal::Sink<
internal::Inputs<I1, I2, I3, I4, I5> >( internal::Inputs<I1, I2, I3, I4, I5> >(
network.sched_, &network, function) { network.sched_, &network, function) {
SetPolicy(network.policy_);
network.sinks_.push_back(this);
network.sink_count_++;
}
Sink(Network & network, FunctionType function,
embb::mtapi::ExecutionPolicy const & policy)
: internal::Sink<
internal::Inputs<I1, I2, I3, I4, I5> >(
network.sched_, &network, function) {
SetPolicy(policy);
network.sinks_.push_back(this); network.sinks_.push_back(this);
network.sink_count_++; network.sink_count_++;
} }
...@@ -832,9 +970,18 @@ class Network : public internal::ClockListener { ...@@ -832,9 +970,18 @@ class Network : public internal::ClockListener {
internal::Outputs<O1, O2, O3, O4, O5> >::FunctionType internal::Outputs<O1, O2, O3, O4, O5> >::FunctionType
FunctionType; FunctionType;
explicit Source(Network & network, FunctionType function) Source(Network & network, FunctionType function)
: internal::Source< : internal::Source<
internal::Outputs<O1, O2, O3, O4, O5> >(network.sched_, function) { internal::Outputs<O1, O2, O3, O4, O5> >(network.sched_, function) {
SetPolicy(network.policy_);
network.sources_.push_back(this);
}
Source(Network & network, FunctionType function,
embb::mtapi::ExecutionPolicy const & policy)
: internal::Source<
internal::Outputs<O1, O2, O3, O4, O5> >(network.sched_, function) {
SetPolicy(policy);
network.sources_.push_back(this); network.sources_.push_back(this);
} }
}; };
...@@ -842,8 +989,16 @@ class Network : public internal::ClockListener { ...@@ -842,8 +989,16 @@ class Network : public internal::ClockListener {
template<typename Type> template<typename Type>
class ConstantSource : public internal::ConstantSource<Type> { class ConstantSource : public internal::ConstantSource<Type> {
public: public:
explicit ConstantSource(Network & network, Type value) ConstantSource(Network & network, Type value)
: internal::ConstantSource<Type>(network.sched_, value) {
SetPolicy(network.policy_);
network.sources_.push_back(this);
}
ConstantSource(Network & network, Type value,
embb::mtapi::ExecutionPolicy const & policy)
: internal::ConstantSource<Type>(network.sched_, value) { : internal::ConstantSource<Type>(network.sched_, value) {
SetPolicy(policy);
network.sources_.push_back(this); network.sources_.push_back(this);
} }
}; };
...@@ -928,6 +1083,7 @@ class Network : public internal::ClockListener { ...@@ -928,6 +1083,7 @@ class Network : public internal::ClockListener {
int sink_count_; int sink_count_;
int slices_; int slices_;
internal::Scheduler * sched_; internal::Scheduler * sched_;
embb::mtapi::ExecutionPolicy policy_;
#if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY
std::vector<int> spawn_history_[Slices]; std::vector<int> spawn_history_[Slices];
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment