Commit 049b3b00 by Marcus Winter

dataflow_cpp: sources need to return false if processing should be stopped

parent b7744ae6
...@@ -56,8 +56,9 @@ class ConstantSource ...@@ -56,8 +56,9 @@ class ConstantSource
GetOutput<0>().Send(Signal<Type>(clock, value_)); GetOutput<0>().Send(Signal<Type>(clock, value_));
} }
virtual void Start(int clock) { virtual bool Start(int clock) {
Run(clock); Run(clock);
return true;
} }
OutputsType & GetOutputs() { OutputsType & GetOutputs() {
......
...@@ -42,7 +42,7 @@ class Node { ...@@ -42,7 +42,7 @@ class Node {
virtual bool HasInputs() const { return false; } virtual bool HasInputs() const { return false; }
virtual bool HasOutputs() const { return false; } virtual bool HasOutputs() const { return false; }
virtual void Run(int clock) = 0; virtual void Run(int clock) = 0;
virtual void Start(int /*clock*/) { virtual bool Start(int /*clock*/) {
EMBB_THROW(embb::base::ErrorException, EMBB_THROW(embb::base::ErrorException,
"Nodes are started implicitly."); "Nodes are started implicitly.");
} }
......
...@@ -52,7 +52,7 @@ class Source< Slices, Outputs<Slices, O1, O2, O3, O4, O5> > ...@@ -52,7 +52,7 @@ class Source< Slices, Outputs<Slices, O1, O2, O3, O4, O5> >
typedef typename ExecutorType::FunctionType FunctionType; typedef typename ExecutorType::FunctionType FunctionType;
explicit Source(FunctionType function) explicit Source(FunctionType function)
: executor_(function) { : executor_(function), not_done_(true) {
next_clock_ = 0; next_clock_ = 0;
} }
...@@ -61,15 +61,18 @@ class Source< Slices, Outputs<Slices, O1, O2, O3, O4, O5> > ...@@ -61,15 +61,18 @@ class Source< Slices, Outputs<Slices, O1, O2, O3, O4, O5> >
} }
virtual void Run(int clock) { virtual void Run(int clock) {
executor_.Execute(clock, outputs_); not_done_ = executor_.Execute(clock, outputs_);
next_clock_++; next_clock_++;
} }
virtual void Start(int clock) { virtual bool Start(int clock) {
while (clock != next_clock_) embb::base::Thread::CurrentYield(); while (clock != next_clock_) embb::base::Thread::CurrentYield();
const int idx = clock % Slices; if (not_done_) {
action_[idx] = Action(this, clock); const int idx = clock % Slices;
sched_->Spawn(action_[idx]); action_[idx] = Action(this, clock);
sched_->Spawn(action_[idx]);
}
return not_done_;
} }
OutputsType & GetOutputs() { OutputsType & GetOutputs() {
...@@ -90,6 +93,7 @@ class Source< Slices, Outputs<Slices, O1, O2, O3, O4, O5> > ...@@ -90,6 +93,7 @@ class Source< Slices, Outputs<Slices, O1, O2, O3, O4, O5> >
OutputsType outputs_; OutputsType outputs_;
ExecutorType executor_; ExecutorType executor_;
Action action_[Slices]; Action action_[Slices];
volatile bool not_done_;
embb::base::Atomic<int> next_clock_; embb::base::Atomic<int> next_clock_;
}; };
......
...@@ -42,16 +42,17 @@ class SourceExecutor; ...@@ -42,16 +42,17 @@ class SourceExecutor;
template <int Slices, typename O1> template <int Slices, typename O1>
class SourceExecutor< Outputs<Slices, O1> > { class SourceExecutor< Outputs<Slices, O1> > {
public: public:
typedef embb::base::Function<void, O1 &> FunctionType; typedef embb::base::Function<bool, O1 &> FunctionType;
explicit SourceExecutor(FunctionType func) : function_(func) {} explicit SourceExecutor(FunctionType func) : function_(func) {}
void Execute( bool Execute(
int clock, int clock,
Outputs<Slices, O1> & outputs) { Outputs<Slices, O1> & outputs) {
O1 o1; O1 o1;
function_(o1); bool result = function_(o1);
outputs.template Get<0>().Send(Signal<O1>(clock, o1)); outputs.template Get<0>().Send(Signal<O1>(clock, o1));
return result;
} }
private: private:
...@@ -61,18 +62,19 @@ class SourceExecutor< Outputs<Slices, O1> > { ...@@ -61,18 +62,19 @@ class SourceExecutor< Outputs<Slices, O1> > {
template <int Slices, typename O1, typename O2> template <int Slices, typename O1, typename O2>
class SourceExecutor< Outputs<Slices, O1, O2> > { class SourceExecutor< Outputs<Slices, O1, O2> > {
public: public:
typedef embb::base::Function<void, O1 &, O2 &> FunctionType; typedef embb::base::Function<bool, O1 &, O2 &> FunctionType;
explicit SourceExecutor(FunctionType func) : function_(func) {} explicit SourceExecutor(FunctionType func) : function_(func) {}
void Execute( bool Execute(
int clock, int clock,
Outputs<Slices, O1, O2> & outputs) { Outputs<Slices, O1, O2> & outputs) {
O1 o1; O1 o1;
O2 o2; O2 o2;
function_(o1, o2); bool result = function_(o1, o2);
outputs.template Get<0>().Send(Signal<O1>(clock, o1)); outputs.template Get<0>().Send(Signal<O1>(clock, o1));
outputs.template Get<1>().Send(Signal<O2>(clock, o2)); outputs.template Get<1>().Send(Signal<O2>(clock, o2));
return result;
} }
private: private:
...@@ -82,20 +84,21 @@ class SourceExecutor< Outputs<Slices, O1, O2> > { ...@@ -82,20 +84,21 @@ class SourceExecutor< Outputs<Slices, O1, O2> > {
template <int Slices, typename O1, typename O2, typename O3> template <int Slices, typename O1, typename O2, typename O3>
class SourceExecutor< Outputs<Slices, O1, O2, O3> > { class SourceExecutor< Outputs<Slices, O1, O2, O3> > {
public: public:
typedef embb::base::Function<void, O1 &, O2 &, O3 &> FunctionType; typedef embb::base::Function<bool, O1 &, O2 &, O3 &> FunctionType;
explicit SourceExecutor(FunctionType func) : function_(func) {} explicit SourceExecutor(FunctionType func) : function_(func) {}
void Execute( bool Execute(
int clock, int clock,
Outputs<Slices, O1, O2, O3> & outputs) { Outputs<Slices, O1, O2, O3> & outputs) {
O1 o1; O1 o1;
O2 o2; O2 o2;
O3 o3; O3 o3;
function_(o1, o2, o3); bool result = function_(o1, o2, o3);
outputs.template Get<0>().Send(Signal<O1>(clock, o1)); outputs.template Get<0>().Send(Signal<O1>(clock, o1));
outputs.template Get<1>().Send(Signal<O2>(clock, o2)); outputs.template Get<1>().Send(Signal<O2>(clock, o2));
outputs.template Get<2>().Send(Signal<O3>(clock, o3)); outputs.template Get<2>().Send(Signal<O3>(clock, o3));
return result;
} }
private: private:
...@@ -105,22 +108,23 @@ class SourceExecutor< Outputs<Slices, O1, O2, O3> > { ...@@ -105,22 +108,23 @@ class SourceExecutor< Outputs<Slices, O1, O2, O3> > {
template <int Slices, typename O1, typename O2, typename O3, typename O4> template <int Slices, typename O1, typename O2, typename O3, typename O4>
class SourceExecutor< Outputs<Slices, O1, O2, O3, O4> > { class SourceExecutor< Outputs<Slices, O1, O2, O3, O4> > {
public: public:
typedef embb::base::Function<void, O1 &, O2 &, O3 &, O4 &> FunctionType; typedef embb::base::Function<bool, O1 &, O2 &, O3 &, O4 &> FunctionType;
explicit SourceExecutor(FunctionType func) : function_(func) {} explicit SourceExecutor(FunctionType func) : function_(func) {}
void Execute( bool Execute(
int clock, int clock,
Outputs<Slices, O1, O2, O3, O4> & outputs) { Outputs<Slices, O1, O2, O3, O4> & outputs) {
O1 o1; O1 o1;
O2 o2; O2 o2;
O3 o3; O3 o3;
O4 o4; O4 o4;
function_(o1, o2, o3, o4); bool result = function_(o1, o2, o3, o4);
outputs.template Get<0>().Send(Signal<O1>(clock, o1)); outputs.template Get<0>().Send(Signal<O1>(clock, o1));
outputs.template Get<1>().Send(Signal<O2>(clock, o2)); outputs.template Get<1>().Send(Signal<O2>(clock, o2));
outputs.template Get<2>().Send(Signal<O3>(clock, o3)); outputs.template Get<2>().Send(Signal<O3>(clock, o3));
outputs.template Get<3>().Send(Signal<O4>(clock, o4)); outputs.template Get<3>().Send(Signal<O4>(clock, o4));
return result;
} }
private: private:
...@@ -131,11 +135,11 @@ template <int Slices, typename O1, typename O2, typename O3, typename O4, ...@@ -131,11 +135,11 @@ template <int Slices, typename O1, typename O2, typename O3, typename O4,
typename O5> typename O5>
class SourceExecutor< Outputs<Slices, O1, O2, O3, O4, O5> > { class SourceExecutor< Outputs<Slices, O1, O2, O3, O4, O5> > {
public: public:
typedef embb::base::Function<void, O1 &, O2 &, O3 &, O4 &, O5 &> FunctionType; typedef embb::base::Function<bool, O1 &, O2 &, O3 &, O4 &, O5 &> FunctionType;
explicit SourceExecutor(FunctionType func) : function_(func) {} explicit SourceExecutor(FunctionType func) : function_(func) {}
void Execute( bool Execute(
int clock, int clock,
Outputs<Slices, O1, O2, O3, O4, O5> & outputs) { Outputs<Slices, O1, O2, O3, O4, O5> & outputs) {
O1 o1; O1 o1;
...@@ -143,12 +147,13 @@ class SourceExecutor< Outputs<Slices, O1, O2, O3, O4, O5> > { ...@@ -143,12 +147,13 @@ class SourceExecutor< Outputs<Slices, O1, O2, O3, O4, O5> > {
O3 o3; O3 o3;
O4 o4; O4 o4;
O5 o5; O5 o5;
function_(o1, o2, o3, o4, o5); bool result = function_(o1, o2, o3, o4, o5);
outputs.template Get<0>().Send(Signal<O1>(clock, o1)); outputs.template Get<0>().Send(Signal<O1>(clock, o1));
outputs.template Get<1>().Send(Signal<O2>(clock, o2)); outputs.template Get<1>().Send(Signal<O2>(clock, o2));
outputs.template Get<2>().Send(Signal<O3>(clock, o3)); outputs.template Get<2>().Send(Signal<O3>(clock, o3));
outputs.template Get<3>().Send(Signal<O4>(clock, o4)); outputs.template Get<3>().Send(Signal<O4>(clock, o4));
outputs.template Get<4>().Send(Signal<O5>(clock, o5)); outputs.template Get<4>().Send(Signal<O5>(clock, o5));
return result;
} }
private: private:
......
...@@ -697,10 +697,9 @@ class Network { ...@@ -697,10 +697,9 @@ class Network {
void Add(ConstantSource<Type> & source); void Add(ConstantSource<Type> & source);
/** /**
* Executes the network for at most \c elements tokens. * Executes the network until one of the the sources returns \c false.
* \param elements Maximum number of tokens to process.
*/ */
void operator () (int elements); void operator () ();
}; };
#else #else
...@@ -862,7 +861,7 @@ class Network : public internal::ClockListener { ...@@ -862,7 +861,7 @@ class Network : public internal::ClockListener {
sources_.push_back(&source); sources_.push_back(&source);
} }
void operator () (int elements) { void operator () () {
internal::SchedulerSequential sched_seq; internal::SchedulerSequential sched_seq;
internal::SchedulerMTAPI<Slices> sched_mtapi; internal::SchedulerMTAPI<Slices> sched_mtapi;
internal::Scheduler * sched = &sched_mtapi; internal::Scheduler * sched = &sched_mtapi;
...@@ -876,16 +875,22 @@ class Network : public internal::ClockListener { ...@@ -876,16 +875,22 @@ class Network : public internal::ClockListener {
for (int ii = 0; ii < Slices; ii++) sink_count_[ii] = 0; for (int ii = 0; ii < Slices; ii++) sink_count_[ii] = 0;
for (int clock = 0; clock < elements; clock++) { int clock = 0;
while (clock >= 0) {
const int idx = clock % Slices; const int idx = clock % Slices;
while (sink_count_[idx] > 0) embb::base::Thread::CurrentYield(); while (sink_count_[idx] > 0) embb::base::Thread::CurrentYield();
sched->WaitForSlice(idx); sched->WaitForSlice(idx);
SpawnClock(clock); if (!SpawnClock(clock))
break;
clock++;
} }
for (int ii = 0; ii < Slices; ii++) { int ii = clock - Slices + 1;
while (sink_count_[ii] > 0) embb::base::Thread::CurrentYield(); if (ii < 0) ii = 0;
sched->WaitForSlice(ii); for (; ii < clock; ii++) {
const int idx = ii % Slices;
while (sink_count_[idx] > 0) embb::base::Thread::CurrentYield();
sched->WaitForSlice(idx);
} }
} }
...@@ -912,15 +917,17 @@ class Network : public internal::ClockListener { ...@@ -912,15 +917,17 @@ class Network : public internal::ClockListener {
std::vector<int> spawn_history_[Slices]; std::vector<int> spawn_history_[Slices];
#endif #endif
void SpawnClock(int clock) { bool SpawnClock(int clock) {
const int idx = clock % Slices; const int idx = clock % Slices;
bool result = true;
#if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY
spawn_history_[idx].push_back(clock); spawn_history_[idx].push_back(clock);
#endif #endif
sink_count_[idx] = static_cast<int>(sinks_.size()); sink_count_[idx] = static_cast<int>(sinks_.size());
for (size_t kk = 0; kk < sources_.size(); kk++) { for (size_t kk = 0; kk < sources_.size(); kk++) {
sources_[kk]->Start(clock); result &= sources_[kk]->Start(clock);
} }
return result;
} }
}; };
......
...@@ -53,11 +53,13 @@ typedef MyNetwork::Select< int > MySelect; ...@@ -53,11 +53,13 @@ typedef MyNetwork::Select< int > MySelect;
embb::base::Atomic<int> source_counter; embb::base::Atomic<int> source_counter;
int source_array[TEST_COUNT]; int source_array[TEST_COUNT];
void sourceFunc(int & out) { bool sourceFunc(int & out) {
out = source_counter; out = source_counter;
source_array[source_counter] = out; source_array[source_counter] = out;
source_counter++; source_counter++;
return source_counter < 12;
} }
embb::base::Atomic<int> pred_counter; embb::base::Atomic<int> pred_counter;
...@@ -195,7 +197,7 @@ void SimpleTest::TestBasic() { ...@@ -195,7 +197,7 @@ void SimpleTest::TestBasic() {
network.Add(sink); network.Add(sink);
network(TEST_COUNT); network();
PT_EXPECT(asink.Check()); PT_EXPECT(asink.Check());
} }
......
...@@ -65,5 +65,5 @@ void RunDataflowNonLinear() { ...@@ -65,5 +65,5 @@ void RunDataflowNonLinear() {
process5.GetOutput<1>() >> sink1.GetInput<2>(); process5.GetOutput<1>() >> sink1.GetInput<2>();
process4.GetOutput<1>() >> sink1.GetInput<3>(); process4.GetOutput<1>() >> sink1.GetInput<3>();
nw(10); nw();
} }
template <typename T> template <typename T>
class Producer { class Producer {
public: public:
explicit Producer(int seed) : seed_(seed) {} explicit Producer(int seed) : seed_(seed), count_(4) {}
void Run(T& x) { bool Run(T& x) {
// produce a new value x // produce a new value x
x = SimpleRand(seed_); x = SimpleRand(seed_);
count_--;
return count_ >= 0;
} }
private: private:
int seed_; int seed_;
int count_;
}; };
void SourceFunction(std::string & str) { bool SourceFunction(std::string & str) {
std::getline(file, str); std::getline(file, str);
return !file.eof();
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment