Commit 4dadd0cd by Marcus Winter

dataflow_cpp: removed Network::Make for simpler usage, added Network:IsValid to…

dataflow_cpp: removed Network::Make for simpler usage, added Network:IsValid to check network for errors
tutorial: adapted to reflect changes to dataflow_cpp
examples: adapted to reflect changes to dataflow_cpp
parent ed0ecbf4
......@@ -31,20 +31,10 @@ namespace embb {
namespace dataflow {
namespace internal {
class Scheduler;
class ClockListener;
struct InitData {
int slices;
Scheduler * sched;
ClockListener * sink_listener;
};
class ClockListener {
public:
virtual ~ClockListener() {}
virtual void OnClock(int /*clock*/) = 0;
virtual void OnInit(InitData * /*sched*/) = 0;
};
} // namespace internal
......
......@@ -46,7 +46,10 @@ class ConstantSource
Type value_;
public:
explicit ConstantSource(Type value) : value_(value) {}
explicit ConstantSource(Network & network, Type value)
: value_(value) {
SetScheduler(network.GetScheduler());
}
virtual bool HasOutputs() const {
return outputs_.Size() > 0;
......@@ -56,9 +59,8 @@ class ConstantSource
GetOutput<0>().Send(Signal<Type>(clock, value_));
}
virtual void Init(InitData * init_data) {
SetScheduler(init_data->sched);
GetOutput<0>().SendInit(init_data);
virtual bool IsFullyConnected() {
return outputs_.IsFullyConnected();
}
virtual bool Start(int clock) {
......
......@@ -113,11 +113,6 @@ class In {
lock_.Unlock();
#endif
}
void ReceiveInit(InitData * init_data) {
SetSlices(init_data->slices);
listener_->OnInit(init_data);
}
};
} // namespace internal
......
......@@ -56,7 +56,9 @@ class Inputs<embb::base::internal::Nil, embb::base::internal::Nil,
bool AreNoneBlank(int /*clock*/) { return false; }
bool AreAtClock(int /*clock*/) { return true; }
virtual void OnClock(int /*clock*/) {}
virtual void OnInit(InitData * /*init_data*/) {}
bool IsFullyConnected() {
return true;
}
};
template <typename T1>
......@@ -67,8 +69,14 @@ class Inputs<T1, embb::base::internal::Nil, embb::base::internal::Nil,
embb::base::internal::Nil>
, public ClockListener {
public:
Inputs() : count_(NULL) {
test_count_ = 1;
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 1;
}
this->template Get<0>().SetSlices(slices_);
}
~Inputs() {
if (NULL != count_) {
......@@ -98,21 +106,11 @@ class Inputs<T1, embb::base::internal::Nil, embb::base::internal::Nil,
listener_->OnClock(clock);
}
}
virtual void OnInit(InitData * init_data) {
if (--test_count_ == 0) {
slices_ = init_data->slices;
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 1;
}
listener_->OnInit(init_data);
}
bool IsFullyConnected() {
return this->template Get<0>().IsConnected();
}
private:
embb::base::Atomic<int> * count_;
int test_count_;
ClockListener * listener_;
int slices_;
};
......@@ -124,8 +122,15 @@ class Inputs<T1, T2, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil>
, public ClockListener {
public:
Inputs() : count_(NULL) {
test_count_ = 2;
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 2;
}
this->template Get<0>().SetSlices(slices_);
this->template Get<1>().SetSlices(slices_);
}
~Inputs() {
if (NULL != count_) {
......@@ -159,21 +164,12 @@ class Inputs<T1, T2, embb::base::internal::Nil,
listener_->OnClock(clock);
}
}
virtual void OnInit(InitData * init_data) {
if (--test_count_ == 0) {
slices_ = init_data->slices;
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 2;
}
listener_->OnInit(init_data);
bool IsFullyConnected() {
return this->template Get<0>().IsConnected() &
this->template Get<1>().IsConnected();
}
}
private:
private:
embb::base::Atomic<int> * count_;
int test_count_;
ClockListener * listener_;
int slices_;
};
......@@ -185,8 +181,16 @@ class Inputs<T1, T2, T3, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil>
, public ClockListener {
public:
Inputs() : count_(NULL) {
test_count_ = 3;
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 3;
}
this->template Get<0>().SetSlices(slices_);
this->template Get<1>().SetSlices(slices_);
this->template Get<2>().SetSlices(slices_);
}
~Inputs() {
if (NULL != count_) {
......@@ -224,21 +228,13 @@ class Inputs<T1, T2, T3, embb::base::internal::Nil,
listener_->OnClock(clock);
}
}
virtual void OnInit(InitData * init_data) {
if (--test_count_ == 0) {
slices_ = init_data->slices;
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 3;
bool IsFullyConnected() {
return this->template Get<0>().IsConnected() &
this->template Get<1>().IsConnected() &
this->template Get<2>().IsConnected();
}
listener_->OnInit(init_data);
}
}
private:
private:
embb::base::Atomic<int> * count_;
int test_count_;
ClockListener * listener_;
int slices_;
};
......@@ -249,8 +245,17 @@ class Inputs<T1, T2, T3, T4, embb::base::internal::Nil>
In<T4>, embb::base::internal::Nil>
, public ClockListener {
public:
Inputs() : count_(NULL) {
test_count_ = 4;
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 4;
}
this->template Get<0>().SetSlices(slices_);
this->template Get<1>().SetSlices(slices_);
this->template Get<2>().SetSlices(slices_);
this->template Get<3>().SetSlices(slices_);
}
~Inputs() {
if (NULL != count_) {
......@@ -292,21 +297,14 @@ class Inputs<T1, T2, T3, T4, embb::base::internal::Nil>
listener_->OnClock(clock);
}
}
virtual void OnInit(InitData * init_data) {
if (--test_count_ == 0) {
slices_ = init_data->slices;
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 4;
}
listener_->OnInit(init_data);
}
bool IsFullyConnected() {
return this->template Get<0>().IsConnected() &
this->template Get<1>().IsConnected() &
this->template Get<2>().IsConnected() &
this->template Get<3>().IsConnected();
}
private:
embb::base::Atomic<int> * count_;
int test_count_;
ClockListener * listener_;
int slices_;
};
......@@ -318,8 +316,18 @@ class Inputs
In<T4>, In<T5> >
, public ClockListener {
public:
Inputs() : count_(NULL) {
test_count_ = 5;
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 5;
}
this->template Get<0>().SetSlices(slices_);
this->template Get<1>().SetSlices(slices_);
this->template Get<2>().SetSlices(slices_);
this->template Get<3>().SetSlices(slices_);
this->template Get<4>().SetSlices(slices_);
}
~Inputs() {
if (NULL != count_) {
......@@ -365,21 +373,15 @@ class Inputs
listener_->OnClock(clock);
}
}
virtual void OnInit(InitData * init_data) {
if (--test_count_ == 0) {
slices_ = init_data->slices;
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 5;
}
listener_->OnInit(init_data);
}
bool IsFullyConnected() {
return this->template Get<0>().IsConnected() &&
this->template Get<1>().IsConnected() &
this->template Get<2>().IsConnected() &
this->template Get<3>().IsConnected() &
this->template Get<4>().IsConnected();
}
private:
embb::base::Atomic<int> * count_;
int test_count_;
ClockListener * listener_;
int slices_;
};
......
......@@ -43,11 +43,11 @@ class Node {
virtual bool HasInputs() const { return false; }
virtual bool HasOutputs() const { return false; }
virtual void Run(int clock) = 0;
virtual bool IsFullyConnected() = 0;
virtual bool Start(int /*clock*/) {
EMBB_THROW(embb::base::ErrorException,
"Nodes are started implicitly.");
}
virtual void Init(InitData * init_data) = 0;
protected:
Scheduler * sched_;
......
......@@ -52,12 +52,6 @@ class Out {
}
}
void SendInit(InitData * init_data) {
for (size_t ii = 0; ii < targets_.size(); ii++) {
targets_[ii]->ReceiveInit(init_data);
}
}
void Connect(InType & input) {
if (input.IsConnected()) {
EMBB_THROW(embb::base::ErrorException,
......@@ -72,6 +66,10 @@ class Out {
Connect(input);
}
bool IsConnected() const {
return targets_.size() > 0;
}
private:
std::vector< InType * > targets_;
};
......
......@@ -50,6 +50,9 @@ class Outputs<embb::base::internal::Nil, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil,
embb::base::internal::Nil> {
public:
bool IsFullyConnected() {
return true;
}
};
template <typename T1>
......@@ -59,6 +62,9 @@ class Outputs<T1, embb::base::internal::Nil, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil,
embb::base::internal::Nil> {
public:
bool IsFullyConnected() {
return this->template Get<0>().IsConnected();
}
};
template <typename T1, typename T2>
......@@ -67,6 +73,10 @@ class Outputs<T1, T2, embb::base::internal::Nil,
: public Tuple<Out<T1>, Out<T2>, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil> {
public:
bool IsFullyConnected() {
return this->template Get<0>().IsConnected() &&
this->template Get<1>().IsConnected();
}
};
template <typename T1, typename T2, typename T3>
......
......@@ -53,10 +53,11 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
typedef ProcessExecutor< InputsType, OutputsType > ExecutorType;
typedef typename ExecutorType::FunctionType FunctionType;
explicit Process(FunctionType function)
: executor_(function)
explicit Process(Network & network, FunctionType function)
: inputs_(network.GetSlices())
, executor_(function)
, action_(NULL)
, slices_(0) {
, slices_(network.GetSlices()) {
next_clock_ = 0;
queued_clock_ = 0;
bool ordered = Serial;
......@@ -66,6 +67,13 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
queue_id_ = 0;
}
inputs_.SetListener(this);
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
SetScheduler(network.GetScheduler());
}
~Process() {
......@@ -86,17 +94,8 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
executor_.Execute(clock, inputs_, outputs_);
}
virtual void Init(InitData * init_data) {
slices_ = init_data->slices;
//inputs_.SetSlices(init_data->slices);
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
SetScheduler(init_data->sched);
executor_.Init(init_data, outputs_);
virtual bool IsFullyConnected() {
return inputs_.IsFullyConnected() && outputs_.IsFullyConnected();
}
InputsType & GetInputs() {
......@@ -161,10 +160,6 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
}
}
virtual void OnInit(InitData * init_data) {
Init(init_data);
}
private:
InputsType inputs_;
OutputsType outputs_;
......
......@@ -62,10 +62,6 @@ class ProcessExecutor< Inputs<I1>, Outputs<O1> > {
}
}
void Init(InitData * init_data, Outputs<O1> & outputs) {
outputs.template Get<0>().SendInit(init_data);
}
private:
FunctionType function_;
};
......@@ -95,11 +91,6 @@ class ProcessExecutor< Inputs<I1>, Outputs<O1, O2> > {
}
}
void Init(InitData * init_data, Outputs<O1, O2> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
}
private:
FunctionType function_;
};
......@@ -133,12 +124,6 @@ class ProcessExecutor< Inputs<I1>, Outputs<O1, O2, O3> > {
}
}
void Init(InitData * init_data, Outputs<O1, O2, O3> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
outputs.template Get<2>().SendInit(init_data);
}
private:
FunctionType function_;
};
......@@ -176,13 +161,6 @@ class ProcessExecutor< Inputs<I1>, Outputs<O1, O2, O3, O4> > {
}
}
void Init(InitData * init_data, Outputs<O1, O2, O3, O4> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
outputs.template Get<2>().SendInit(init_data);
outputs.template Get<3>().SendInit(init_data);
}
private:
FunctionType function_;
};
......@@ -211,10 +189,6 @@ class ProcessExecutor< Inputs<I1, I2>, Outputs<O1> > {
}
}
void Init(InitData * init_data, Outputs<O1> & outputs) {
outputs.template Get<0>().SendInit(init_data);
}
private:
FunctionType function_;
};
......@@ -246,11 +220,6 @@ class ProcessExecutor< Inputs<I1, I2>, Outputs<O1, O2> > {
}
}
void Init(InitData * init_data, Outputs<O1, O2> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
}
private:
FunctionType function_;
};
......@@ -286,12 +255,6 @@ class ProcessExecutor< Inputs<I1, I2>, Outputs<O1, O2, O3> > {
}
}
void Init(InitData * init_data, Outputs<O1, O2, O3> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
outputs.template Get<2>().SendInit(init_data);
}
private:
FunctionType function_;
};
......@@ -321,10 +284,6 @@ class ProcessExecutor< Inputs<I1, I2, I3>, Outputs<O1> > {
}
}
void Init(InitData * init_data, Outputs<O1> & outputs) {
outputs.template Get<0>().SendInit(init_data);
}
private:
FunctionType function_;
};
......@@ -358,11 +317,6 @@ class ProcessExecutor< Inputs<I1, I2, I3>, Outputs<O1, O2> > {
}
}
void Init(InitData * init_data, Outputs<O1, O2> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
}
private:
FunctionType function_;
};
......@@ -394,10 +348,6 @@ class ProcessExecutor< Inputs<I1, I2, I3, I4>, Outputs<O1> > {
}
}
void Init(InitData * init_data, Outputs<O1> & outputs) {
outputs.template Get<0>().SendInit(init_data);
}
private:
FunctionType function_;
};
......
......@@ -34,6 +34,9 @@
namespace embb {
namespace dataflow {
class Network;
namespace internal {
template <typename Type>
......@@ -44,8 +47,10 @@ class Select
typedef Inputs<bool, Type, Type> InputsType;
typedef Outputs<Type> OutputsType;
Select() {
Select(Network & network) : inputs_(network.GetSlices()) {
inputs_.SetListener(this);
slices_ = network.GetSlices();
SetScheduler(network.GetScheduler());
}
virtual bool HasInputs() const {
......@@ -80,11 +85,8 @@ class Select
}
}
virtual void Init(InitData * init_data) {
slices_ = init_data->slices;
//inputs_.SetSlices(slices_);
SetScheduler(init_data->sched);
GetOutput<0>().SendInit(init_data);
virtual bool IsFullyConnected() {
return inputs_.IsFullyConnected() && outputs_.IsFullyConnected();
}
InputsType & GetInputs() {
......@@ -117,10 +119,6 @@ class Select
Run(clock);
}
virtual void OnInit(InitData * init_data) {
Init(init_data);
}
private:
InputsType inputs_;
OutputsType outputs_;
......
......@@ -48,13 +48,23 @@ class Sink< Inputs<I1, I2, I3, I4, I5> >
typedef SinkExecutor< InputsType > ExecutorType;
typedef typename ExecutorType::FunctionType FunctionType;
explicit Sink(FunctionType function)
: executor_(function)
, action_(NULL) {
explicit Sink(Network & network, FunctionType function)
: inputs_(network.GetSlices())
, executor_(function)
, action_(NULL)
, slices_(network.GetSlices()) {
next_clock_ = 0;
queued_clock_ = 0;
queue_id_ = GetNextProcessID();
inputs_.SetListener(this);
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
SetListener(&network);
SetScheduler(network.GetScheduler());
}
~Sink() {
......@@ -78,17 +88,8 @@ class Sink< Inputs<I1, I2, I3, I4, I5> >
listener_->OnClock(clock);
}
virtual void Init(InitData * init_data) {
slices_ = init_data->slices;
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
SetListener(init_data->sink_listener);
SetScheduler(init_data->sched);
listener_->OnInit(init_data);
virtual bool IsFullyConnected() {
return inputs_.IsFullyConnected();
}
InputsType & GetInputs() {
......@@ -132,10 +133,6 @@ class Sink< Inputs<I1, I2, I3, I4, I5> >
}
}
virtual void OnInit(InitData * init_data) {
Init(init_data);
}
private:
InputsType inputs_;
ExecutorType executor_;
......
......@@ -48,8 +48,9 @@ class Source< Outputs<O1, O2, O3, O4, O5> >
typedef SourceExecutor< OutputsType > ExecutorType;
typedef typename ExecutorType::FunctionType FunctionType;
explicit Source(FunctionType function)
explicit Source(Network & network, FunctionType function)
: executor_(function), not_done_(true) {
SetScheduler(network.GetScheduler());
}
virtual bool HasOutputs() const {
......@@ -60,9 +61,8 @@ class Source< Outputs<O1, O2, O3, O4, O5> >
not_done_ = executor_.Execute(clock, outputs_);
}
virtual void Init(InitData * init_data) {
SetScheduler(init_data->sched);
executor_.Init(init_data, outputs_);
virtual bool IsFullyConnected() {
return outputs_.IsFullyConnected();
}
virtual bool Start(int clock) {
......
......@@ -36,8 +36,6 @@ namespace embb {
namespace dataflow {
namespace internal {
class Scheduler;
template <class OUTPUTS>
class SourceExecutor;
......@@ -59,10 +57,6 @@ class SourceExecutor< Outputs<O1> > {
return result;
}
void Init(InitData * init_data, Outputs<O1> & outputs) {
outputs.template Get<0>().SendInit(init_data);
}
private:
FunctionType function_;
};
......@@ -87,11 +81,6 @@ class SourceExecutor< Outputs<O1, O2> > {
return result;
}
void Init(InitData * init_data, Outputs<O1, O2> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
}
private:
FunctionType function_;
};
......@@ -118,12 +107,6 @@ class SourceExecutor< Outputs<O1, O2, O3> > {
return result;
}
void Init(InitData * init_data, Outputs<O1, O2, O3> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
outputs.template Get<2>().SendInit(init_data);
}
private:
FunctionType function_;
};
......@@ -152,13 +135,6 @@ class SourceExecutor< Outputs<O1, O2, O3, O4> > {
return result;
}
void Init(InitData * init_data, Outputs<O1, O2, O3, O4> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
outputs.template Get<2>().SendInit(init_data);
outputs.template Get<3>().SendInit(init_data);
}
private:
FunctionType function_;
};
......@@ -190,15 +166,6 @@ class SourceExecutor< Outputs<O1, O2, O3, O4, O5> > {
return result;
}
void Init(
InitData * init_data, Outputs<O1, O2, O3, O4, O5> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
outputs.template Get<2>().SendInit(init_data);
outputs.template Get<3>().SendInit(init_data);
outputs.template Get<4>().SendInit(init_data);
}
private:
FunctionType function_;
};
......
......@@ -34,6 +34,9 @@
namespace embb {
namespace dataflow {
class Network;
namespace internal {
template <typename Type>
......@@ -44,8 +47,9 @@ class Switch
typedef Inputs<bool, Type> InputsType;
typedef Outputs<Type, Type> OutputsType;
Switch() {
Switch(Network & network) : inputs_(network.GetSlices()) {
inputs_.SetListener(this);
SetScheduler(network.GetScheduler());
}
virtual bool HasInputs() const {
......@@ -77,11 +81,8 @@ class Switch
}
}
virtual void Init(InitData * init_data) {
//inputs_.SetSlices(init_data->slices);
SetScheduler(init_data->sched);
GetOutput<0>().SendInit(init_data);
GetOutput<1>().SendInit(init_data);
virtual bool IsFullyConnected() {
return inputs_.IsFullyConnected() && outputs_.IsFullyConnected();
}
InputsType & GetInputs() {
......@@ -114,10 +115,6 @@ class Switch
Run(clock);
}
virtual void OnInit(InitData * init_data) {
Init(init_data);
}
private:
InputsType inputs_;
OutputsType outputs_;
......
......@@ -58,8 +58,9 @@ class Network {
public:
/**
* Constructs an empty network.
* \param slices Number of concurrent tokens allowed in the network.
*/
Network() {}
Network(int slices) {}
/**
* Input port class.
......@@ -196,9 +197,10 @@ class Network {
/**
* Constructs a SerialProcess with a user specified processing function.
* \param network The network this node is going to be part of.
* \param function The Function to call to process a token.
*/
explicit SerialProcess(FunctionType function);
explicit SerialProcess(Network & network, FunctionType function);
/**
* \returns \c true if the SerialProcess has any inputs, \c false
......@@ -277,9 +279,10 @@ class Network {
/**
* Constructs a ParallelProcess with a user specified processing function.
* \param network The network this node is going to be part of.
* \param function The Function to call to process a token.
*/
explicit ParallelProcess(FunctionType function);
explicit ParallelProcess(Network & network, FunctionType function);
/**
* \returns \c true if the ParallelProcess has any inputs, \c false
......@@ -339,6 +342,7 @@ class Network {
*/
template<typename Type>
class Switch {
public:
/**
* Function type to use when processing tokens.
*/
......@@ -355,6 +359,12 @@ class Network {
typedef Outputs<Type> OutputsType;
/**
* Constructs a Switch process.
* \param network The network this node is going to be part of.
*/
explicit Select(Network & network);
/**
* \returns Always \c true.
*/
virtual bool HasInputs() const;
......@@ -410,6 +420,7 @@ class Network {
*/
template<typename Type>
class Select {
public:
/**
* Function type to use when processing tokens.
*/
......@@ -426,6 +437,12 @@ class Network {
typedef Outputs<Type> OutputsType;
/**
* Constructs a Select process.
* \param network The network this node is going to be part of.
*/
explicit Select(Network & network);
/**
* \returns Always \c true.
*/
virtual bool HasInputs() const;
......@@ -500,9 +517,10 @@ class Network {
/**
* Constructs a Sink with a user specified processing function.
* \param network The network this node is going to be part of.
* \param function The Function to call to process a token.
*/
explicit Sink(FunctionType function);
explicit Sink(Network & network, FunctionType function);
/**
* \returns Always \c true.
......@@ -559,9 +577,10 @@ class Network {
/**
* Constructs a Source with a user specified processing function.
* \param network The network this node is going to be part of.
* \param function The Function to call to emit a token.
*/
explicit Source(FunctionType function);
explicit Source(Network & network, FunctionType function);
/**
* \returns Always \c false.
......@@ -594,13 +613,6 @@ class Network {
};
/**
* Adds a new source process to the network.
* \param source The source process to add.
*/
template<typename O1, typename O2, typename O3, typename O4, typename O5>
void AddSource(Source<O1, O2, O3, O4, O5> & source);
/**
* Constant source process template.
*
* A constant source has one output port and emits a constant value given
......@@ -618,9 +630,10 @@ class Network {
/**
* Constructs a ConstantSource with a value to emit on each token.
* \param network The network this node is going to be part of.
* \param value The value to emit.
*/
explicit ConstantSource(Type value);
explicit ConstantSource(Network & network, Type value);
/**
* \returns Always \c false.
......@@ -653,19 +666,10 @@ class Network {
};
/**
* Adds a new constant source process to the network.
* \param source The constant source process to add.
* Checks whether the network is completely connected and free of cycles.
* \returns \c true if everything is in order, \c false if not.
*/
template<typename Type>
void AddSource(ConstantSource<Type> & source);
/**
* Builds the network for usage with \c slices concurrent tokens. This
* function needs to be called after adding all sources and before
* executing the network.
* \param slices Number of concurrent tokens allowed in the network.
*/
void Make(int slices);
bool IsValid();
/**
* Executes the network until one of the the sources returns \c false.
......@@ -677,12 +681,25 @@ class Network {
class Network : public internal::ClockListener {
public:
Network() : sched_(NULL) {}
Network(int slices) : sink_counter_(NULL), slices_(slices), sched_(NULL) {
sched_ = embb::base::Allocation::New<internal::SchedulerMTAPI>(slices_);
sink_counter_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
sink_counter_[ii] = 0;
}
sink_count_ = 0;
}
~Network() {
if (NULL != sched_) {
embb::base::Allocation::Delete<internal::Scheduler>(sched_);
embb::base::Allocation::Delete(sched_);
sched_ = NULL;
}
if (NULL != sink_counter_) {
embb::base::Allocation::Free(sink_counter_);
sink_counter_ = NULL;
}
}
......@@ -690,16 +707,18 @@ class Network : public internal::ClockListener {
typename T3 = embb::base::internal::Nil,
typename T4 = embb::base::internal::Nil,
typename T5 = embb::base::internal::Nil>
struct Inputs {
typedef internal::Inputs<T1, T2, T3, T4, T5> Type;
class Inputs : public internal::Inputs<T1, T2, T3, T4, T5> {
public:
explicit Inputs(int slices) : internal::Inputs(slices) {}
};
template <typename T1, typename T2 = embb::base::internal::Nil,
typename T3 = embb::base::internal::Nil,
typename T4 = embb::base::internal::Nil,
typename T5 = embb::base::internal::Nil>
struct Outputs {
typedef internal::Outputs<T1, T2, T3, T4, T5> Type;
class Outputs : public internal::Outputs<T1, T2, T3, T4, T5> {
public:
explicit Outputs() : internal::Outputs() {}
};
template <class Inputs, class Outputs> class SerialProcess;
......@@ -707,8 +726,8 @@ class Network : public internal::ClockListener {
template <
typename I1, typename I2, typename I3, typename I4, typename I5,
typename O1, typename O2, typename O3, typename O4, typename O5>
class SerialProcess< internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >
class SerialProcess< Inputs<I1, I2, I3, I4, I5>,
Outputs<O1, O2, O3, O4, O5> >
: public internal::Process< true,
internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> > {
......@@ -717,11 +736,11 @@ class Network : public internal::ClockListener {
internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >::FunctionType
FunctionType;
explicit SerialProcess(FunctionType function)
explicit SerialProcess(Network & network, FunctionType function)
: internal::Process< true,
internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >(function) {
//empty
internal::Outputs<O1, O2, O3, O4, O5> >(network, function) {
network.processes_.push_back(this);
}
};
......@@ -730,8 +749,8 @@ class Network : public internal::ClockListener {
template <
typename I1, typename I2, typename I3, typename I4, typename I5,
typename O1, typename O2, typename O3, typename O4, typename O5>
class ParallelProcess< internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >
class ParallelProcess< Inputs<I1, I2, I3, I4, I5>,
Outputs<O1, O2, O3, O4, O5> >
: public internal::Process< false,
internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >{
......@@ -740,22 +759,30 @@ class Network : public internal::ClockListener {
internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >::FunctionType
FunctionType;
explicit ParallelProcess(FunctionType function)
explicit ParallelProcess(Network & network, FunctionType function)
: internal::Process< false,
internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >(function) {
//empty
internal::Outputs<O1, O2, O3, O4, O5> >(network, function) {
network.processes_.push_back(this);
}
};
template<typename Type>
class Switch : public internal::Switch<Type> {
public:
explicit Switch(Network & network)
: internal::Switch<Type>(network) {
network.processes_.push_back(this);
}
};
template<typename Type>
class Select : public internal::Select<Type> {
public:
explicit Select(Network & network)
: internal::Select<Type>(network) {
network.processes_.push_back(this);
}
};
template<typename I1, typename I2 = embb::base::internal::Nil,
......@@ -768,10 +795,11 @@ class Network : public internal::ClockListener {
typedef typename internal::Sink<
internal::Inputs<I1, I2, I3, I4, I5> >::FunctionType FunctionType;
explicit Sink(FunctionType function)
explicit Sink(Network & network, FunctionType function)
: internal::Sink<
internal::Inputs<I1, I2, I3, I4, I5> >(function) {
//empty
internal::Inputs<I1, I2, I3, I4, I5> >(network, function) {
network.sinks_.push_back(this);
network.sink_count_++;
}
};
......@@ -786,62 +814,45 @@ class Network : public internal::ClockListener {
internal::Outputs<O1, O2, O3, O4, O5> >::FunctionType
FunctionType;
explicit Source(FunctionType function)
explicit Source(Network & network, FunctionType function)
: internal::Source<
internal::Outputs<O1, O2, O3, O4, O5> >(function) {
//empty
internal::Outputs<O1, O2, O3, O4, O5> >(network, function) {
network.sources_.push_back(this);
}
};
template<typename O1, typename O2, typename O3, typename O4, typename O5>
void AddSource(Source<O1, O2, O3, O4, O5> & source) {
sources_.push_back(&source);
}
template<typename Type>
class ConstantSource : public internal::ConstantSource<Type> {
public:
explicit ConstantSource(Type value)
: internal::ConstantSource<Type>(value) {
//empty
explicit ConstantSource(Network & network, Type value)
: internal::ConstantSource<Type>(network, value) {
network.sources_.push_back(this);
}
};
template<typename Type>
void AddSource(ConstantSource<Type> & source) {
sources_.push_back(&source);
int GetSlices() const {
return slices_;
}
void Make(int slices) {
slices_ = slices;
sched_ = embb::base::Allocation::New<internal::SchedulerMTAPI>(slices_);
internal::InitData init_data;
init_data.slices = slices_;
init_data.sched = sched_;
init_data.sink_listener = this;
sink_counter_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
sink_counter_[ii] = 0;
internal::Scheduler * GetScheduler() const {
return sched_;
}
sink_count_ = 0;
for (size_t it = 0; it < sources_.size(); it++)
sources_[it]->Init(&init_data);
for (int ii = 0; ii < slices_; ii++) {
sink_counter_[ii] = 0;
bool IsValid() {
bool valid = true;
for (size_t ii = 0; ii < sources_.size(); ii++) {
valid = valid & sources_[ii]->IsFullyConnected();
}
for (size_t ii = 0; ii < processes_.size(); ii++) {
valid = valid & processes_[ii]->IsFullyConnected();
}
void operator () () {
if (NULL == sched_) {
throw embb::base::ErrorException("Network was not properly prepared");
for (size_t ii = 0; ii < sinks_.size(); ii++) {
valid = valid & sinks_[ii]->IsFullyConnected();
}
return valid;
}
void operator () () {
int clock = 0;
while (clock >= 0) {
const int idx = clock % slices_;
......@@ -873,15 +884,6 @@ class Network : public internal::ClockListener {
--sink_counter_[idx];
}
/**
* Internal.
* \internal
* Gets called when an init token has reached all sinks.
*/
virtual void OnInit(internal::InitData * /*sched*/) {
sink_count_++;
}
private:
std::vector<internal::Node*> processes_;
std::vector<internal::Node*> sources_;
......
......@@ -42,12 +42,12 @@
typedef embb::dataflow::Network MyNetwork;
typedef MyNetwork::ConstantSource< int > MyConstantSource;
typedef MyNetwork::Source< int > MySource;
typedef MyNetwork::SerialProcess< MyNetwork::Inputs<int>::Type,
MyNetwork::Outputs<bool>::Type > MyPred;
typedef MyNetwork::ParallelProcess< MyNetwork::Inputs<int>::Type,
MyNetwork::Outputs<int>::Type > MyFilter;
typedef MyNetwork::ParallelProcess< MyNetwork::Inputs<int, int>::Type,
MyNetwork::Outputs<int>::Type > MyMult;
typedef MyNetwork::SerialProcess< MyNetwork::Inputs<int>,
MyNetwork::Outputs<bool> > MyPred;
typedef MyNetwork::ParallelProcess< MyNetwork::Inputs<int>,
MyNetwork::Outputs<int> > MyFilter;
typedef MyNetwork::ParallelProcess< MyNetwork::Inputs<int, int>,
MyNetwork::Outputs<int> > MyMult;
typedef MyNetwork::Sink< int > MySink;
typedef MyNetwork::Switch< int > MySwitch;
typedef MyNetwork::Select< int > MySelect;
......@@ -165,15 +165,15 @@ void SimpleTest::TestBasic() {
for (int ii = 0; ii < 10000; ii++) {
ArraySink<TEST_COUNT> asink;
MyNetwork network;
MyConstantSource constant(4);
MySource source(embb::base::MakeFunction(sourceFunc));
MyFilter filter(embb::base::MakeFunction(filterFunc));
MyMult mult(embb::base::MakeFunction(multFunc));
MySink sink(embb::base::MakeFunction(asink, &ArraySink<TEST_COUNT>::Run));
MyPred pred(embb::base::MakeFunction(predFunc));
MySwitch sw;
MySelect sel;
MyNetwork network(NUM_SLICES);
MyConstantSource constant(network, 4);
MySource source(network, embb::base::MakeFunction(sourceFunc));
MyFilter filter(network, embb::base::MakeFunction(filterFunc));
MyMult mult(network, embb::base::MakeFunction(multFunc));
MySink sink(network, embb::base::MakeFunction(asink, &ArraySink<TEST_COUNT>::Run));
MyPred pred(network, embb::base::MakeFunction(predFunc));
MySwitch sw(network);
MySelect sel(network);
for (int kk = 0; kk < TEST_COUNT; kk++) {
source_array[kk] = -1;
......@@ -208,12 +208,15 @@ void SimpleTest::TestBasic() {
sel.GetOutput<0>() >> sink.GetInput<0>();
network.AddSource(constant);
network.AddSource(source);
// network.AddSource(constant);
// network.AddSource(source);
network.Make(NUM_SLICES);
// network.Initialize(NUM_SLICES);
try {
if (!network.IsValid()) {
EMBB_THROW(embb::base::ErrorException, "network is invalid");
}
network();
} catch (embb::base::ErrorException & e) {
PT_ASSERT_MSG(false, e.What());
......
......@@ -6,15 +6,14 @@
Network::Source<int>
source1(
network,
embb::base::MakeFunction(producer1, &Producer<int>::Run) ),
source2(
network,
embb::base::MakeFunction(producer2, &Producer<int>::Run) ),
source3(
network,
embb::base::MakeFunction(producer3, &Producer<int>::Run) ),
source4(
network,
embb::base::MakeFunction(producer4, &Producer<int>::Run) );
nw.AddSource(source1);
nw.AddSource(source2);
nw.AddSource(source3);
nw.AddSource(source4);
Network::ParallelProcess<
Network::Inputs<std::string>::Type,
Network::Outputs<std::string>::Type> replace(
embb::base::MakeFunction(ReplaceFunction)
Network::Inputs<std::string>,
Network::Outputs<std::string> > replace(
network, embb::base::MakeFunction(ReplaceFunction)
);
Network::Sink<std::string> write(
embb::base::MakeFunction(SinkFunction)
network, embb::base::MakeFunction(SinkFunction)
);
Network::Source<std::string> read(
embb::base::MakeFunction(SourceFunction)
network, embb::base::MakeFunction(SourceFunction)
);
......@@ -52,11 +52,10 @@ std::string with("hello");
#include "dataflow/dataflow_sink_function-snippet.h"
void RunDataflowLinear() {
#include "dataflow/dataflow_make-snippet.h"
#include "dataflow/dataflow_declare_source-snippet.h"
#include "dataflow/dataflow_declare_replace-snippet.h"
#include "dataflow/dataflow_declare_sink-snippet.h"
#include "dataflow/dataflow_connect-snippet.h"
#include "dataflow/dataflow_add-snippet.h"
#include "dataflow/dataflow_make-snippet.h"
#include "dataflow/dataflow_run-snippet.h"
}
typedef embb::dataflow::Network Network;
static Network nw;
......@@ -48,22 +48,28 @@ static int SimpleRand(int & seed) {
#include "dataflow/dataflow_network-snippet.h"
void RunDataflowNonLinear() {
#include "dataflow/dataflow_make-snippet.h"
#include "dataflow/dataflow_declare_add_sources-snippet.h"
Comparator<int> comparator;
Network::ParallelProcess<
Network::Inputs<int, int>::Type, Network::Outputs<int, int>::Type>
process1( embb::base::MakeFunction(comparator, &Comparator<int>::Run) ),
process2( embb::base::MakeFunction(comparator, &Comparator<int>::Run) ),
process3( embb::base::MakeFunction(comparator, &Comparator<int>::Run) ),
process4( embb::base::MakeFunction(comparator, &Comparator<int>::Run) ),
process5( embb::base::MakeFunction(comparator, &Comparator<int>::Run) );
Network::Inputs<int, int>, Network::Outputs<int, int> >
process1(network,
embb::base::MakeFunction(comparator, &Comparator<int>::Run)),
process2(network,
embb::base::MakeFunction(comparator, &Comparator<int>::Run)),
process3(network,
embb::base::MakeFunction(comparator, &Comparator<int>::Run)),
process4(network,
embb::base::MakeFunction(comparator, &Comparator<int>::Run)),
process5(network,
embb::base::MakeFunction(comparator, &Comparator<int>::Run));
Consumer<int> consumer;
Network::Sink<int, int, int, int>
sink1(embb::base::MakeFunction(consumer, &Consumer<int>::Run));
sink1(network, embb::base::MakeFunction(consumer, &Consumer<int>::Run));
source1.GetOutput<0>() >> process1.GetInput<0>();
source2.GetOutput<0>() >> process2.GetInput<0>();
......@@ -83,6 +89,5 @@ void RunDataflowNonLinear() {
process5.GetOutput<1>() >> sink1.GetInput<2>();
process4.GetOutput<1>() >> sink1.GetInput<3>();
nw.Make(4);
nw();
#include "dataflow/dataflow_run-snippet.h"
}
......@@ -108,6 +108,10 @@ Then, we have to construct a \emph{network}. A network consists of a set of proc
%
\\\inputlisting{../examples/dataflow/dataflow_network-snippet.h}
%
We need to prepare the network for the desired maximum number of elements that can be in the network at a time. The number of elements is limited to avoid that the network is flooded with new elements before the previous elements have been processed. In a linear pipeline, for example, this may happen if the source is faster than the sink. In our example, at most four elements may be processed simultaneously: one in the source, one in the sink, and two in the middle stage (see below). Finding an optimal value depends on the application and usually requires some experimentation. In general, large values boost the throughput but also increase the latency. Conversely, small values reduce the latency but may lead to a drop of performance in terms of throughput:
%
\\\inputlisting{../examples/dataflow/dataflow_make-snippet.h}
%
As the next step, we have to construct the processes shown in Figure~\ref{fig:replace_par}. The easiest way to construct a process is to wrap the user-defined code in a lambda function and to pass it to the network. The network constructs an object for that process and executes the lambda function whenever new data is available. There are several methods for constructing processes depending on their type. The process \textbf{read} is a \emph{source} process, since it produces data (by reading it from the specified file) but does not consume any data. Source processes are constructed from a function object
%
\\\inputlisting{../examples/dataflow/dataflow_source_function-snippet.h}
......@@ -142,19 +146,10 @@ is used to construct the sink:
\emph{\textbf{Note:} If you parallelize your own application using \embb and your compiler emits a lengthy error message containing lots of templates, it is very likely that for at least one process, the ports and their directions do not match the signature of the given function.}
The network needs to know about the source declared above, so we add it to our network:
%
\\\inputlisting{../examples/dataflow/dataflow_add-snippet.h}
%
As the last step, we have to connect the processes (ports). This is straightforward using the C++ stream operator:
%
\\\inputlisting{../examples/dataflow/dataflow_connect-snippet.h}
%
Once all connections have been established, we need to prepare the network for the desired maximum number of elements that can be in the network at a time. The number of elements is limited to avoid that the network is flooded with new elements before the previous elements have been processed. In a linear pipeline, for example, this may happen if the source is faster than the sink. In our example, at most four elements may be processed simultaneously: one in the source, one in the sink, and two in the middle stage (see above). Finding an optimal value depends on the application and usually requires some experimentation. In general, large values boost the throughput but also increase the latency. Conversely, small values reduce the latency but may lead to a drop of performance in terms of throughput:
%
\\\inputlisting{../examples/dataflow/dataflow_make-snippet.h}
%
Then we can start the network:
%
\\\inputlisting{../examples/dataflow/dataflow_run-snippet.h}
......
......@@ -41,7 +41,7 @@ namespace {
static embb::tasks::Node * node_instance = NULL;
#if TASKS_CPP_AUTOMATIC_INITIALIZE
static embb::base::Mutex init_mutex;
static embb_spinlock_t init_mutex = { 0 };
#endif
}
......@@ -207,13 +207,13 @@ bool Node::IsInitialized() {
Node & Node::GetInstance() {
#if TASKS_CPP_AUTOMATIC_INITIALIZE
if (!IsInitialized()) {
init_mutex.Lock();
embb_spin_lock(&init_mutex);
if (!IsInitialized()) {
Node::Initialize(
TASKS_CPP_AUTOMATIC_DOMAIN_ID, TASKS_CPP_AUTOMATIC_NODE_ID);
atexit(Node::Finalize);
}
init_mutex.Unlock();
embb_spin_unlock(&init_mutex);
}
return *node_instance;
#else
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment