Commit 779135d5 by Marcus Winter

Merge remote-tracking branch 'origin/development' into mtapi_distributed

parents 7d3357c9 53859644
...@@ -31,10 +31,19 @@ namespace embb { ...@@ -31,10 +31,19 @@ namespace embb {
namespace dataflow { namespace dataflow {
namespace internal { namespace internal {
class Scheduler;
class ClockListener;
struct InitData {
Scheduler * sched;
ClockListener * sink_listener;
};
class ClockListener { class ClockListener {
public: public:
virtual ~ClockListener() {} virtual ~ClockListener() {}
virtual void OnClock(int /*clock*/) = 0; virtual void OnClock(int /*clock*/) = 0;
virtual void OnInit(InitData * /*sched*/) = 0;
}; };
} // namespace internal } // namespace internal
......
...@@ -56,6 +56,11 @@ class ConstantSource ...@@ -56,6 +56,11 @@ class ConstantSource
GetOutput<0>().Send(Signal<Type>(clock, value_)); GetOutput<0>().Send(Signal<Type>(clock, value_));
} }
virtual void Init(InitData * init_data) {
SetScheduler(init_data->sched);
GetOutput<0>().SendInit(init_data);
}
virtual bool Start(int clock) { virtual bool Start(int clock) {
Run(clock); Run(clock);
return true; return true;
......
...@@ -39,6 +39,8 @@ namespace embb { ...@@ -39,6 +39,8 @@ namespace embb {
namespace dataflow { namespace dataflow {
namespace internal { namespace internal {
class Scheduler;
template <typename, int> template <typename, int>
class Out; class Out;
...@@ -95,6 +97,10 @@ class In { ...@@ -95,6 +97,10 @@ class In {
lock_.Unlock(); lock_.Unlock();
#endif #endif
} }
void ReceiveInit(InitData * init_data) {
listener_->OnInit(init_data);
}
}; };
} // namespace internal } // namespace internal
......
...@@ -57,6 +57,7 @@ class Inputs<Slices, embb::base::internal::Nil, embb::base::internal::Nil, ...@@ -57,6 +57,7 @@ class Inputs<Slices, embb::base::internal::Nil, embb::base::internal::Nil,
bool AreNoneBlank(int /*clock*/) { return false; } bool AreNoneBlank(int /*clock*/) { return false; }
bool AreAtClock(int /*clock*/) { return true; } bool AreAtClock(int /*clock*/) { return true; }
virtual void OnClock(int /*clock*/) {} virtual void OnClock(int /*clock*/) {}
virtual void OnInit(InitData * /*init_data*/) {}
}; };
template <int Slices, typename T1> template <int Slices, typename T1>
...@@ -70,6 +71,7 @@ class Inputs<Slices, T1, embb::base::internal::Nil, embb::base::internal::Nil, ...@@ -70,6 +71,7 @@ class Inputs<Slices, T1, embb::base::internal::Nil, embb::base::internal::Nil,
Inputs() { Inputs() {
for (int ii = 0; ii < Slices; ii++) for (int ii = 0; ii < Slices; ii++)
count_[ii] = 1; count_[ii] = 1;
test_count_ = 1;
} }
void SetListener(ClockListener * listener) { void SetListener(ClockListener * listener) {
listener_ = listener; listener_ = listener;
...@@ -97,8 +99,14 @@ class Inputs<Slices, T1, embb::base::internal::Nil, embb::base::internal::Nil, ...@@ -97,8 +99,14 @@ class Inputs<Slices, T1, embb::base::internal::Nil, embb::base::internal::Nil,
listener_->OnClock(clock); listener_->OnClock(clock);
} }
} }
virtual void OnInit(InitData * init_data) {
if (--test_count_ == 0) {
listener_->OnInit(init_data);
}
}
private: private:
embb::base::Atomic<int> count_[Slices]; embb::base::Atomic<int> count_[Slices];
int test_count_;
ClockListener * listener_; ClockListener * listener_;
}; };
...@@ -112,6 +120,7 @@ class Inputs<Slices, T1, T2, embb::base::internal::Nil, ...@@ -112,6 +120,7 @@ class Inputs<Slices, T1, T2, embb::base::internal::Nil,
Inputs() { Inputs() {
for (int ii = 0; ii < Slices; ii++) for (int ii = 0; ii < Slices; ii++)
count_[ii] = 2; count_[ii] = 2;
test_count_ = 2;
} }
void SetListener(ClockListener * listener) { void SetListener(ClockListener * listener) {
listener_ = listener; listener_ = listener;
...@@ -143,8 +152,14 @@ class Inputs<Slices, T1, T2, embb::base::internal::Nil, ...@@ -143,8 +152,14 @@ class Inputs<Slices, T1, T2, embb::base::internal::Nil,
listener_->OnClock(clock); listener_->OnClock(clock);
} }
} }
virtual void OnInit(InitData * init_data) {
if (--test_count_ == 0) {
listener_->OnInit(init_data);
}
}
private: private:
embb::base::Atomic<int> count_[Slices]; embb::base::Atomic<int> count_[Slices];
int test_count_;
ClockListener * listener_; ClockListener * listener_;
}; };
...@@ -158,6 +173,7 @@ class Inputs<Slices, T1, T2, T3, embb::base::internal::Nil, ...@@ -158,6 +173,7 @@ class Inputs<Slices, T1, T2, T3, embb::base::internal::Nil,
Inputs() { Inputs() {
for (int ii = 0; ii < Slices; ii++) for (int ii = 0; ii < Slices; ii++)
count_[ii] = 3; count_[ii] = 3;
test_count_ = 3;
} }
void SetListener(ClockListener * listener) { void SetListener(ClockListener * listener) {
listener_ = listener; listener_ = listener;
...@@ -193,8 +209,14 @@ class Inputs<Slices, T1, T2, T3, embb::base::internal::Nil, ...@@ -193,8 +209,14 @@ class Inputs<Slices, T1, T2, T3, embb::base::internal::Nil,
listener_->OnClock(clock); listener_->OnClock(clock);
} }
} }
virtual void OnInit(InitData * init_data) {
if (--test_count_ == 0) {
listener_->OnInit(init_data);
}
}
private: private:
embb::base::Atomic<int> count_[Slices]; embb::base::Atomic<int> count_[Slices];
int test_count_;
ClockListener * listener_; ClockListener * listener_;
}; };
...@@ -207,6 +229,7 @@ class Inputs<Slices, T1, T2, T3, T4, embb::base::internal::Nil> ...@@ -207,6 +229,7 @@ class Inputs<Slices, T1, T2, T3, T4, embb::base::internal::Nil>
Inputs() { Inputs() {
for (int ii = 0; ii < Slices; ii++) for (int ii = 0; ii < Slices; ii++)
count_[ii] = 4; count_[ii] = 4;
test_count_ = 4;
} }
void SetListener(ClockListener * listener) { void SetListener(ClockListener * listener) {
listener_ = listener; listener_ = listener;
...@@ -246,8 +269,14 @@ class Inputs<Slices, T1, T2, T3, T4, embb::base::internal::Nil> ...@@ -246,8 +269,14 @@ class Inputs<Slices, T1, T2, T3, T4, embb::base::internal::Nil>
listener_->OnClock(clock); listener_->OnClock(clock);
} }
} }
virtual void OnInit(InitData * init_data) {
if (--test_count_ == 0) {
listener_->OnInit(init_data);
}
}
private: private:
embb::base::Atomic<int> count_[Slices]; embb::base::Atomic<int> count_[Slices];
int test_count_;
ClockListener * listener_; ClockListener * listener_;
}; };
...@@ -261,6 +290,7 @@ class Inputs ...@@ -261,6 +290,7 @@ class Inputs
Inputs() { Inputs() {
for (int ii = 0; ii < Slices; ii++) for (int ii = 0; ii < Slices; ii++)
count_[ii] = 5; count_[ii] = 5;
test_count_ = 5;
} }
void SetListener(ClockListener * listener) { void SetListener(ClockListener * listener) {
listener_ = listener; listener_ = listener;
...@@ -304,8 +334,14 @@ class Inputs ...@@ -304,8 +334,14 @@ class Inputs
listener_->OnClock(clock); listener_->OnClock(clock);
} }
} }
virtual void OnInit(InitData * init_data) {
if (--test_count_ == 0) {
listener_->OnInit(init_data);
}
}
private: private:
embb::base::Atomic<int> count_[Slices]; embb::base::Atomic<int> count_[Slices];
int test_count_;
ClockListener * listener_; ClockListener * listener_;
}; };
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include <cstddef> #include <cstddef>
#include <embb/base/exceptions.h> #include <embb/base/exceptions.h>
#include <embb/dataflow/internal/scheduler.h> #include <embb/dataflow/internal/scheduler.h>
#include <embb/dataflow/internal/clock_listener.h>
namespace embb { namespace embb {
namespace dataflow { namespace dataflow {
...@@ -46,12 +47,13 @@ class Node { ...@@ -46,12 +47,13 @@ class Node {
EMBB_THROW(embb::base::ErrorException, EMBB_THROW(embb::base::ErrorException,
"Nodes are started implicitly."); "Nodes are started implicitly.");
} }
void SetScheduler(Scheduler * sched) { sched_ = sched; } virtual void Init(InitData * init_data) = 0;
protected: protected:
Scheduler * sched_; Scheduler * sched_;
static int next_process_id_; static int next_process_id_;
void SetScheduler(Scheduler * sched) { sched_ = sched; }
static int GetNextProcessID() { return next_process_id_++; } static int GetNextProcessID() { return next_process_id_++; }
}; };
......
...@@ -35,6 +35,8 @@ namespace embb { ...@@ -35,6 +35,8 @@ namespace embb {
namespace dataflow { namespace dataflow {
namespace internal { namespace internal {
class Scheduler;
template <typename Type, int Slices> template <typename Type, int Slices>
class Out { class Out {
public: public:
...@@ -50,6 +52,12 @@ class Out { ...@@ -50,6 +52,12 @@ class Out {
} }
} }
void SendInit(InitData * init_data) {
for (size_t ii = 0; ii < targets_.size(); ii++) {
targets_[ii]->ReceiveInit(init_data);
}
}
void Connect(InType & input) { void Connect(InType & input) {
if (input.IsConnected()) { if (input.IsConnected()) {
EMBB_THROW(embb::base::ErrorException, EMBB_THROW(embb::base::ErrorException,
......
...@@ -78,6 +78,11 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>, ...@@ -78,6 +78,11 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>,
executor_.Execute(clock, inputs_, outputs_); executor_.Execute(clock, inputs_, outputs_);
} }
virtual void Init(InitData * init_data) {
SetScheduler(init_data->sched);
executor_.Init(init_data, outputs_);
}
InputsType & GetInputs() { InputsType & GetInputs() {
return inputs_; return inputs_;
} }
...@@ -142,6 +147,10 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>, ...@@ -142,6 +147,10 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>,
} }
} }
virtual void OnInit(InitData * init_data) {
Init(init_data);
}
private: private:
InputsType inputs_; InputsType inputs_;
OutputsType outputs_; OutputsType outputs_;
......
...@@ -62,6 +62,10 @@ class ProcessExecutor< Inputs<Slices, I1>, Outputs<Slices, O1> > { ...@@ -62,6 +62,10 @@ class ProcessExecutor< Inputs<Slices, I1>, Outputs<Slices, O1> > {
} }
} }
void Init(InitData * init_data, Outputs<Slices, O1> & outputs) {
outputs.template Get<0>().SendInit(init_data);
}
private: private:
FunctionType function_; FunctionType function_;
}; };
...@@ -91,6 +95,11 @@ class ProcessExecutor< Inputs<Slices, I1>, Outputs<Slices, O1, O2> > { ...@@ -91,6 +95,11 @@ class ProcessExecutor< Inputs<Slices, I1>, Outputs<Slices, O1, O2> > {
} }
} }
void Init(InitData * init_data, Outputs<Slices, O1, O2> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
}
private: private:
FunctionType function_; FunctionType function_;
}; };
...@@ -124,6 +133,12 @@ class ProcessExecutor< Inputs<Slices, I1>, Outputs<Slices, O1, O2, O3> > { ...@@ -124,6 +133,12 @@ class ProcessExecutor< Inputs<Slices, I1>, Outputs<Slices, O1, O2, O3> > {
} }
} }
void Init(InitData * init_data, Outputs<Slices, O1, O2, O3> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
outputs.template Get<2>().SendInit(init_data);
}
private: private:
FunctionType function_; FunctionType function_;
}; };
...@@ -161,6 +176,13 @@ class ProcessExecutor< Inputs<Slices, I1>, Outputs<Slices, O1, O2, O3, O4> > { ...@@ -161,6 +176,13 @@ class ProcessExecutor< Inputs<Slices, I1>, Outputs<Slices, O1, O2, O3, O4> > {
} }
} }
void Init(InitData * init_data, Outputs<Slices, O1, O2, O3, O4> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
outputs.template Get<2>().SendInit(init_data);
outputs.template Get<3>().SendInit(init_data);
}
private: private:
FunctionType function_; FunctionType function_;
}; };
...@@ -189,6 +211,10 @@ class ProcessExecutor< Inputs<Slices, I1, I2>, Outputs<Slices, O1> > { ...@@ -189,6 +211,10 @@ class ProcessExecutor< Inputs<Slices, I1, I2>, Outputs<Slices, O1> > {
} }
} }
void Init(InitData * init_data, Outputs<Slices, O1> & outputs) {
outputs.template Get<0>().SendInit(init_data);
}
private: private:
FunctionType function_; FunctionType function_;
}; };
...@@ -220,6 +246,11 @@ class ProcessExecutor< Inputs<Slices, I1, I2>, Outputs<Slices, O1, O2> > { ...@@ -220,6 +246,11 @@ class ProcessExecutor< Inputs<Slices, I1, I2>, Outputs<Slices, O1, O2> > {
} }
} }
void Init(InitData * init_data, Outputs<Slices, O1, O2> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
}
private: private:
FunctionType function_; FunctionType function_;
}; };
...@@ -255,6 +286,12 @@ class ProcessExecutor< Inputs<Slices, I1, I2>, Outputs<Slices, O1, O2, O3> > { ...@@ -255,6 +286,12 @@ class ProcessExecutor< Inputs<Slices, I1, I2>, Outputs<Slices, O1, O2, O3> > {
} }
} }
void Init(InitData * init_data, Outputs<Slices, O1, O2, O3> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
outputs.template Get<2>().SendInit(init_data);
}
private: private:
FunctionType function_; FunctionType function_;
}; };
...@@ -284,6 +321,10 @@ class ProcessExecutor< Inputs<Slices, I1, I2, I3>, Outputs<Slices, O1> > { ...@@ -284,6 +321,10 @@ class ProcessExecutor< Inputs<Slices, I1, I2, I3>, Outputs<Slices, O1> > {
} }
} }
void Init(InitData * init_data, Outputs<Slices, O1> & outputs) {
outputs.template Get<0>().SendInit(init_data);
}
private: private:
FunctionType function_; FunctionType function_;
}; };
...@@ -317,6 +358,11 @@ class ProcessExecutor< Inputs<Slices, I1, I2, I3>, Outputs<Slices, O1, O2> > { ...@@ -317,6 +358,11 @@ class ProcessExecutor< Inputs<Slices, I1, I2, I3>, Outputs<Slices, O1, O2> > {
} }
} }
void Init(InitData * init_data, Outputs<Slices, O1, O2> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
}
private: private:
FunctionType function_; FunctionType function_;
}; };
...@@ -348,6 +394,10 @@ class ProcessExecutor< Inputs<Slices, I1, I2, I3, I4>, Outputs<Slices, O1> > { ...@@ -348,6 +394,10 @@ class ProcessExecutor< Inputs<Slices, I1, I2, I3, I4>, Outputs<Slices, O1> > {
} }
} }
void Init(InitData * init_data, Outputs<Slices, O1> & outputs) {
outputs.template Get<0>().SendInit(init_data);
}
private: private:
FunctionType function_; FunctionType function_;
}; };
......
...@@ -81,6 +81,11 @@ class Select ...@@ -81,6 +81,11 @@ class Select
} }
} }
virtual void Init(InitData * init_data) {
SetScheduler(init_data->sched);
GetOutput<0>().SendInit(init_data);
}
InputsType & GetInputs() { InputsType & GetInputs() {
return inputs_; return inputs_;
} }
...@@ -112,6 +117,10 @@ class Select ...@@ -112,6 +117,10 @@ class Select
Run(clock); Run(clock);
} }
virtual void OnInit(InitData * init_data) {
Init(init_data);
}
private: private:
InputsType inputs_; InputsType inputs_;
OutputsType outputs_; OutputsType outputs_;
......
...@@ -72,6 +72,12 @@ class Sink< Slices, Inputs<Slices, I1, I2, I3, I4, I5> > ...@@ -72,6 +72,12 @@ class Sink< Slices, Inputs<Slices, I1, I2, I3, I4, I5> >
listener_->OnClock(clock); listener_->OnClock(clock);
} }
virtual void Init(InitData * init_data) {
SetListener(init_data->sink_listener);
SetScheduler(init_data->sched);
listener_->OnInit(init_data);
}
InputsType & GetInputs() { InputsType & GetInputs() {
return inputs_; return inputs_;
} }
...@@ -115,6 +121,10 @@ class Sink< Slices, Inputs<Slices, I1, I2, I3, I4, I5> > ...@@ -115,6 +121,10 @@ class Sink< Slices, Inputs<Slices, I1, I2, I3, I4, I5> >
} }
} }
virtual void OnInit(InitData * init_data) {
Init(init_data);
}
private: private:
InputsType inputs_; InputsType inputs_;
ExecutorType executor_; ExecutorType executor_;
......
...@@ -59,6 +59,11 @@ class Source< Slices, Outputs<Slices, O1, O2, O3, O4, O5> > ...@@ -59,6 +59,11 @@ class Source< Slices, Outputs<Slices, O1, O2, O3, O4, O5> >
not_done_ = executor_.Execute(clock, outputs_); not_done_ = executor_.Execute(clock, outputs_);
} }
virtual void Init(InitData * init_data) {
SetScheduler(init_data->sched);
executor_.Init(init_data, outputs_);
}
virtual bool Start(int clock) { virtual bool Start(int clock) {
if (not_done_) { if (not_done_) {
Run(clock); Run(clock);
......
...@@ -36,6 +36,8 @@ namespace embb { ...@@ -36,6 +36,8 @@ namespace embb {
namespace dataflow { namespace dataflow {
namespace internal { namespace internal {
class Scheduler;
template <class OUTPUTS> template <class OUTPUTS>
class SourceExecutor; class SourceExecutor;
...@@ -55,6 +57,10 @@ class SourceExecutor< Outputs<Slices, O1> > { ...@@ -55,6 +57,10 @@ class SourceExecutor< Outputs<Slices, O1> > {
return result; return result;
} }
void Init(InitData * init_data, Outputs<Slices, O1> & outputs) {
outputs.template Get<0>().SendInit(init_data);
}
private: private:
FunctionType function_; FunctionType function_;
}; };
...@@ -77,6 +83,11 @@ class SourceExecutor< Outputs<Slices, O1, O2> > { ...@@ -77,6 +83,11 @@ class SourceExecutor< Outputs<Slices, O1, O2> > {
return result; return result;
} }
void Init(InitData * init_data, Outputs<Slices, O1, O2> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
}
private: private:
FunctionType function_; FunctionType function_;
}; };
...@@ -101,6 +112,12 @@ class SourceExecutor< Outputs<Slices, O1, O2, O3> > { ...@@ -101,6 +112,12 @@ class SourceExecutor< Outputs<Slices, O1, O2, O3> > {
return result; return result;
} }
void Init(InitData * init_data, Outputs<Slices, O1, O2, O3> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
outputs.template Get<2>().SendInit(init_data);
}
private: private:
FunctionType function_; FunctionType function_;
}; };
...@@ -127,6 +144,13 @@ class SourceExecutor< Outputs<Slices, O1, O2, O3, O4> > { ...@@ -127,6 +144,13 @@ class SourceExecutor< Outputs<Slices, O1, O2, O3, O4> > {
return result; return result;
} }
void Init(InitData * init_data, Outputs<Slices, O1, O2, O3, O4> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
outputs.template Get<2>().SendInit(init_data);
outputs.template Get<3>().SendInit(init_data);
}
private: private:
FunctionType function_; FunctionType function_;
}; };
...@@ -156,6 +180,15 @@ class SourceExecutor< Outputs<Slices, O1, O2, O3, O4, O5> > { ...@@ -156,6 +180,15 @@ class SourceExecutor< Outputs<Slices, O1, O2, O3, O4, O5> > {
return result; return result;
} }
void Init(
InitData * init_data, Outputs<Slices, O1, O2, O3, O4, O5> & outputs) {
outputs.template Get<0>().SendInit(init_data);
outputs.template Get<1>().SendInit(init_data);
outputs.template Get<2>().SendInit(init_data);
outputs.template Get<3>().SendInit(init_data);
outputs.template Get<4>().SendInit(init_data);
}
private: private:
FunctionType function_; FunctionType function_;
}; };
......
...@@ -78,6 +78,12 @@ class Switch ...@@ -78,6 +78,12 @@ class Switch
} }
} }
virtual void Init(InitData * init_data) {
SetScheduler(init_data->sched);
GetOutput<0>().SendInit(init_data);
GetOutput<1>().SendInit(init_data);
}
InputsType & GetInputs() { InputsType & GetInputs() {
return inputs_; return inputs_;
} }
...@@ -109,6 +115,10 @@ class Switch ...@@ -109,6 +115,10 @@ class Switch
Run(clock); Run(clock);
} }
virtual void OnInit(InitData * init_data) {
Init(init_data);
}
private: private:
InputsType inputs_; InputsType inputs_;
OutputsType outputs_; OutputsType outputs_;
......
...@@ -246,13 +246,6 @@ class Network { ...@@ -246,13 +246,6 @@ class Network {
}; };
/** /**
* Adds a new serial process to the network.
* \param proc The process to add.
*/
template <class Inputs, class Outputs>
void Add(SerialProcess<Inputs, Outputs> & proc);
/**
* Generic parallel process template. * Generic parallel process template.
* *
* Implements a generic parallel process in the network that may have one to * Implements a generic parallel process in the network that may have one to
...@@ -334,13 +327,6 @@ class Network { ...@@ -334,13 +327,6 @@ class Network {
}; };
/** /**
* Adds a new parallel process to the network.
* \param proc The process to add.
*/
template <class Inputs, class Outputs>
void Add(ParallelProcess<Inputs, Outputs> & proc);
/**
* Switch process template. * Switch process template.
* *
* A switch has 2 inputs and 2 outputs. Input port 0 is of type boolean and * A switch has 2 inputs and 2 outputs. Input port 0 is of type boolean and
...@@ -412,13 +398,6 @@ class Network { ...@@ -412,13 +398,6 @@ class Network {
}; };
/** /**
* Adds a new switch process to the network.
* \param sw The switch process to add.
*/
template <typename Type>
void Add(Switch<Type> & sw);
/**
* Select process template. * Select process template.
* *
* A select has 3 inputs and 1 output. Input port 0 is of type boolean and * A select has 3 inputs and 1 output. Input port 0 is of type boolean and
...@@ -490,13 +469,6 @@ class Network { ...@@ -490,13 +469,6 @@ class Network {
}; };
/** /**
* Adds a new select process to the network.
* \param sel The select process to add.
*/
template <typename Type>
void Add(Select<Type> & sel);
/**
* Sink process template. * Sink process template.
* *
* A sink marks the end of a particular processing chain. It can have one to * A sink marks the end of a particular processing chain. It can have one to
...@@ -557,13 +529,6 @@ class Network { ...@@ -557,13 +529,6 @@ class Network {
}; };
/** /**
* Adds a new sink process to the network.
* \param sink The sink process to add.
*/
template<typename I1, typename I2, typename I3, typename I4, typename I5>
void Add(Sink<I1, I2, I3, I4, I5> & sink);
/**
* Source process template. * Source process template.
* *
* A source marks the start of a processing chain. It can have one to five * A source marks the start of a processing chain. It can have one to five
...@@ -635,7 +600,7 @@ class Network { ...@@ -635,7 +600,7 @@ class Network {
* \param source The source process to add. * \param source The source process to add.
*/ */
template<typename O1, typename O2, typename O3, typename O4, typename O5> template<typename O1, typename O2, typename O3, typename O4, typename O5>
void Add(Source<O1, O2, O3, O4, O5> & source); void AddSource(Source<O1, O2, O3, O4, O5> & source);
/** /**
* Constant source process template. * Constant source process template.
...@@ -694,7 +659,7 @@ class Network { ...@@ -694,7 +659,7 @@ class Network {
* \param source The constant source process to add. * \param source The constant source process to add.
*/ */
template<typename Type> template<typename Type>
void Add(ConstantSource<Type> & source); void AddSource(ConstantSource<Type> & source);
/** /**
* Executes the network until one of the the sources returns \c false. * Executes the network until one of the the sources returns \c false.
...@@ -748,11 +713,6 @@ class Network : public internal::ClockListener { ...@@ -748,11 +713,6 @@ class Network : public internal::ClockListener {
} }
}; };
template <class Inputs, class Outputs>
void Add(SerialProcess<Inputs, Outputs> & proc) {
processes_.push_back(&proc);
}
template <class Inputs, class Outputs> class ParallelProcess; template <class Inputs, class Outputs> class ParallelProcess;
template < template <
...@@ -776,31 +736,16 @@ class Network : public internal::ClockListener { ...@@ -776,31 +736,16 @@ class Network : public internal::ClockListener {
} }
}; };
template <class Inputs, class Outputs>
void Add(ParallelProcess<Inputs, Outputs> & proc) {
processes_.push_back(&proc);
}
template<typename Type> template<typename Type>
class Switch : public internal::Switch<Slices, Type> { class Switch : public internal::Switch<Slices, Type> {
public: public:
}; };
template <typename Type>
void Add(Switch<Type> & sw) {
processes_.push_back(&sw);
}
template<typename Type> template<typename Type>
class Select : public internal::Select<Slices, Type> { class Select : public internal::Select<Slices, Type> {
public: public:
}; };
template <typename Type>
void Add(Select<Type> & sel) {
processes_.push_back(&sel);
}
template<typename I1, typename I2 = embb::base::internal::Nil, template<typename I1, typename I2 = embb::base::internal::Nil,
typename I3 = embb::base::internal::Nil, typename I3 = embb::base::internal::Nil,
typename I4 = embb::base::internal::Nil, typename I4 = embb::base::internal::Nil,
...@@ -818,12 +763,6 @@ class Network : public internal::ClockListener { ...@@ -818,12 +763,6 @@ class Network : public internal::ClockListener {
} }
}; };
template<typename I1, typename I2, typename I3, typename I4, typename I5>
void Add(Sink<I1, I2, I3, I4, I5> & sink) {
sink.SetListener(this);
sinks_.push_back(&sink);
}
template<typename O1, typename O2 = embb::base::internal::Nil, template<typename O1, typename O2 = embb::base::internal::Nil,
typename O3 = embb::base::internal::Nil, typename O3 = embb::base::internal::Nil,
typename O4 = embb::base::internal::Nil, typename O4 = embb::base::internal::Nil,
...@@ -843,7 +782,7 @@ class Network : public internal::ClockListener { ...@@ -843,7 +782,7 @@ class Network : public internal::ClockListener {
}; };
template<typename O1, typename O2, typename O3, typename O4, typename O5> template<typename O1, typename O2, typename O3, typename O4, typename O5>
void Add(Source<O1, O2, O3, O4, O5> & source) { void AddSource(Source<O1, O2, O3, O4, O5> & source) {
sources_.push_back(&source); sources_.push_back(&source);
} }
...@@ -857,7 +796,7 @@ class Network : public internal::ClockListener { ...@@ -857,7 +796,7 @@ class Network : public internal::ClockListener {
}; };
template<typename Type> template<typename Type>
void Add(ConstantSource<Type> & source) { void AddSource(ConstantSource<Type> & source) {
sources_.push_back(&source); sources_.push_back(&source);
} }
...@@ -866,19 +805,20 @@ class Network : public internal::ClockListener { ...@@ -866,19 +805,20 @@ class Network : public internal::ClockListener {
internal::SchedulerMTAPI<Slices> sched_mtapi; internal::SchedulerMTAPI<Slices> sched_mtapi;
internal::Scheduler * sched = &sched_mtapi; internal::Scheduler * sched = &sched_mtapi;
internal::InitData init_data;
init_data.sched = sched;
init_data.sink_listener = this;
sink_count_ = 0;
for (size_t it = 0; it < sources_.size(); it++) for (size_t it = 0; it < sources_.size(); it++)
sources_[it]->SetScheduler(sched); sources_[it]->Init(&init_data);
for (size_t it = 0; it < processes_.size(); it++)
processes_[it]->SetScheduler(sched);
for (size_t it = 0; it < sinks_.size(); it++)
sinks_[it]->SetScheduler(sched);
for (int ii = 0; ii < Slices; ii++) sink_count_[ii] = 0; for (int ii = 0; ii < Slices; ii++) sink_counter_[ii] = 0;
int clock = 0; int clock = 0;
while (clock >= 0) { while (clock >= 0) {
const int idx = clock % Slices; const int idx = clock % Slices;
while (sink_count_[idx] > 0) embb::base::Thread::CurrentYield(); while (sink_counter_[idx] > 0) embb::base::Thread::CurrentYield();
sched->WaitForSlice(idx); sched->WaitForSlice(idx);
if (!SpawnClock(clock)) if (!SpawnClock(clock))
break; break;
...@@ -889,7 +829,7 @@ class Network : public internal::ClockListener { ...@@ -889,7 +829,7 @@ class Network : public internal::ClockListener {
if (ii < 0) ii = 0; if (ii < 0) ii = 0;
for (; ii < clock; ii++) { for (; ii < clock; ii++) {
const int idx = ii % Slices; const int idx = ii % Slices;
while (sink_count_[idx] > 0) embb::base::Thread::CurrentYield(); while (sink_counter_[idx] > 0) embb::base::Thread::CurrentYield();
sched->WaitForSlice(idx); sched->WaitForSlice(idx);
} }
} }
...@@ -902,17 +842,28 @@ class Network : public internal::ClockListener { ...@@ -902,17 +842,28 @@ class Network : public internal::ClockListener {
*/ */
virtual void OnClock(int clock) { virtual void OnClock(int clock) {
const int idx = clock % Slices; const int idx = clock % Slices;
const int cnt = --sink_count_[idx]; const int cnt = --sink_counter_[idx];
if (cnt < 0) if (cnt < 0)
EMBB_THROW(embb::base::ErrorException, EMBB_THROW(embb::base::ErrorException,
"More sinks than expected signaled reception of given clock.") "More sinks than expected signaled reception of given clock.")
} }
/**
* Internal.
* \internal
* Gets called when an init token has reached all sinks.
*/
virtual void OnInit(internal::InitData * /*sched*/) {
sink_count_++;
}
private: private:
std::vector<internal::Node*> processes_; std::vector<internal::Node*> processes_;
std::vector<internal::Node*> sources_; std::vector<internal::Node*> sources_;
std::vector<internal::Node*> sinks_; std::vector<internal::Node*> sinks_;
embb::base::Atomic<int> sink_count_[Slices]; embb::base::Atomic<int> sink_counter_[Slices];
int sink_count_;
#if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY
std::vector<int> spawn_history_[Slices]; std::vector<int> spawn_history_[Slices];
#endif #endif
...@@ -923,7 +874,7 @@ class Network : public internal::ClockListener { ...@@ -923,7 +874,7 @@ class Network : public internal::ClockListener {
#if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY
spawn_history_[idx].push_back(clock); spawn_history_[idx].push_back(clock);
#endif #endif
sink_count_[idx] = static_cast<int>(sinks_.size()); sink_counter_[idx] = sink_count_;
for (size_t kk = 0; kk < sources_.size(); kk++) { for (size_t kk = 0; kk < sources_.size(); kk++) {
result &= sources_[kk]->Start(clock); result &= sources_[kk]->Start(clock);
} }
......
...@@ -186,17 +186,8 @@ void SimpleTest::TestBasic() { ...@@ -186,17 +186,8 @@ void SimpleTest::TestBasic() {
sel.GetOutput<0>() >> sink.GetInput<0>(); sel.GetOutput<0>() >> sink.GetInput<0>();
network.Add(constant); network.AddSource(constant);
network.Add(source); network.AddSource(source);
network.Add(filter);
network.Add(mult);
network.Add(pred);
network.Add(sw);
network.Add(sel);
network.Add(sink);
network(); network();
......
nw.Add(read); nw.AddSource(read);
nw.Add(replace);
nw.Add(write);
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
source4( source4(
embb::base::MakeFunction(producer4, &Producer<int>::Run) ); embb::base::MakeFunction(producer4, &Producer<int>::Run) );
nw.Add(source1); nw.AddSource(source1);
nw.Add(source2); nw.AddSource(source2);
nw.Add(source3); nw.AddSource(source3);
nw.Add(source4); nw.AddSource(source4);
...@@ -60,19 +60,11 @@ void RunDataflowNonLinear() { ...@@ -60,19 +60,11 @@ void RunDataflowNonLinear() {
process4( embb::base::MakeFunction(comparator, &Comparator<int>::Run) ), process4( embb::base::MakeFunction(comparator, &Comparator<int>::Run) ),
process5( embb::base::MakeFunction(comparator, &Comparator<int>::Run) ); process5( embb::base::MakeFunction(comparator, &Comparator<int>::Run) );
nw.Add(process1);
nw.Add(process2);
nw.Add(process3);
nw.Add(process4);
nw.Add(process5);
Consumer<int> consumer; Consumer<int> consumer;
Network::Sink<int, int, int, int> Network::Sink<int, int, int, int>
sink1(embb::base::MakeFunction(consumer, &Consumer<int>::Run)); sink1(embb::base::MakeFunction(consumer, &Consumer<int>::Run));
nw.Add(sink1);
source1.GetOutput<0>() >> process1.GetInput<0>(); source1.GetOutput<0>() >> process1.GetInput<0>();
source2.GetOutput<0>() >> process2.GetInput<0>(); source2.GetOutput<0>() >> process2.GetInput<0>();
source3.GetOutput<0>() >> process1.GetInput<1>(); source3.GetOutput<0>() >> process1.GetInput<1>();
......
...@@ -142,7 +142,7 @@ is used to construct the sink: ...@@ -142,7 +142,7 @@ is used to construct the sink:
\emph{\textbf{Note:} If you parallelize your own application using \embb and your compiler emits a lengthy error message containing lots of templates, it is very likely that for at least one process, the ports and their directions do not match the signature of the given function.} \emph{\textbf{Note:} If you parallelize your own application using \embb and your compiler emits a lengthy error message containing lots of templates, it is very likely that for at least one process, the ports and their directions do not match the signature of the given function.}
The network needs to know about the processes declared above, so we add them to our network: The network needs to know about the source declared above, so we add it to our network:
% %
\\\inputlisting{../examples/dataflow/dataflow_add-snippet.h} \\\inputlisting{../examples/dataflow/dataflow_add-snippet.h}
% %
...@@ -303,11 +303,11 @@ The class-based approach has several advantages besides the use of templates: Fi ...@@ -303,11 +303,11 @@ The class-based approach has several advantages besides the use of templates: Fi
Each instance of the class \lstinline|Network| maintains a list of source processes that belong to the network. Each instance of the class \lstinline|Network| maintains a list of source processes that belong to the network.
% When you create a source process using \lstinline|MakeSource|, it is automatically added to this list. Otherwise, you must explicitly add it by a call to \lstinline|Add|. For example, if we want to feed our sorting network \lstinline|nw| with streams of integer values, we may write: % When you create a source process using \lstinline|MakeSource|, it is automatically added to this list. Otherwise, you must explicitly add it by a call to \lstinline|Add|. For example, if we want to feed our sorting network \lstinline|nw| with streams of integer values, we may write:
You must explicitly add all processes to the network by a call to \lstinline|Add|. For example, if we want to feed our sorting network \lstinline|nw| with four streams of integer values, we may write: You must explicitly add all sources to the network by a call to \lstinline|AddSource|. For example, if we want to feed our sorting network \lstinline|nw| with four streams of integer values, we may write:
% %
\\\inputlisting{../examples/dataflow/dataflow_declare_add_sources-snippet.h} \\\inputlisting{../examples/dataflow/dataflow_declare_add_sources-snippet.h}
% %
%This is only necessary for source processes. All other processes are automatically found via a depth-first search starting from the source processes. This is only necessary for source processes. All other processes are automatically found via a depth-first search starting from the source processes.
The code for the comparators looks like this: The code for the comparators looks like this:
% %
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment