From 049b3b001376c1c38b740933e53e27a80924c316 Mon Sep 17 00:00:00 2001 From: Marcus Winter Date: Tue, 14 Oct 2014 13:02:36 +0200 Subject: [PATCH] dataflow_cpp: sources need to return false if processing should be stopped --- dataflow_cpp/include/embb/dataflow/internal/constant_source.h | 3 ++- dataflow_cpp/include/embb/dataflow/internal/node.h | 2 +- dataflow_cpp/include/embb/dataflow/internal/source.h | 16 ++++++++++------ dataflow_cpp/include/embb/dataflow/internal/source_executor.h | 35 ++++++++++++++++++++--------------- dataflow_cpp/include/embb/dataflow/network.h | 29 ++++++++++++++++++----------- dataflow_cpp/test/dataflow_cpp_test_simple.cc | 6 ++++-- doc/examples/dataflow/dataflow_nonlinear-fragmented.cc | 2 +- doc/examples/dataflow/dataflow_producer-snippet.h | 7 +++++-- doc/examples/dataflow/dataflow_run-snippet.h | 2 +- doc/examples/dataflow/dataflow_source_function-snippet.h | 3 ++- 10 files changed, 64 insertions(+), 41 deletions(-) diff --git a/dataflow_cpp/include/embb/dataflow/internal/constant_source.h b/dataflow_cpp/include/embb/dataflow/internal/constant_source.h index 68127d6..28d014a 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/constant_source.h +++ b/dataflow_cpp/include/embb/dataflow/internal/constant_source.h @@ -56,8 +56,9 @@ class ConstantSource GetOutput<0>().Send(Signal(clock, value_)); } - virtual void Start(int clock) { + virtual bool Start(int clock) { Run(clock); + return true; } OutputsType & GetOutputs() { diff --git a/dataflow_cpp/include/embb/dataflow/internal/node.h b/dataflow_cpp/include/embb/dataflow/internal/node.h index fca2700..c446f10 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/node.h +++ b/dataflow_cpp/include/embb/dataflow/internal/node.h @@ -42,7 +42,7 @@ class Node { virtual bool HasInputs() const { return false; } virtual bool HasOutputs() const { return false; } virtual void Run(int clock) = 0; - virtual void Start(int /*clock*/) { + virtual bool Start(int /*clock*/) { EMBB_THROW(embb::base::ErrorException, "Nodes are started implicitly."); } diff --git a/dataflow_cpp/include/embb/dataflow/internal/source.h b/dataflow_cpp/include/embb/dataflow/internal/source.h index d0e022c..155769a 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/source.h +++ b/dataflow_cpp/include/embb/dataflow/internal/source.h @@ -52,7 +52,7 @@ class Source< Slices, Outputs > typedef typename ExecutorType::FunctionType FunctionType; explicit Source(FunctionType function) - : executor_(function) { + : executor_(function), not_done_(true) { next_clock_ = 0; } @@ -61,15 +61,18 @@ class Source< Slices, Outputs > } virtual void Run(int clock) { - executor_.Execute(clock, outputs_); + not_done_ = executor_.Execute(clock, outputs_); next_clock_++; } - virtual void Start(int clock) { + virtual bool Start(int clock) { while (clock != next_clock_) embb::base::Thread::CurrentYield(); - const int idx = clock % Slices; - action_[idx] = Action(this, clock); - sched_->Spawn(action_[idx]); + if (not_done_) { + const int idx = clock % Slices; + action_[idx] = Action(this, clock); + sched_->Spawn(action_[idx]); + } + return not_done_; } OutputsType & GetOutputs() { @@ -90,6 +93,7 @@ class Source< Slices, Outputs > OutputsType outputs_; ExecutorType executor_; Action action_[Slices]; + volatile bool not_done_; embb::base::Atomic next_clock_; }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/source_executor.h b/dataflow_cpp/include/embb/dataflow/internal/source_executor.h index 6f7b109..7d316e9 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/source_executor.h +++ b/dataflow_cpp/include/embb/dataflow/internal/source_executor.h @@ -42,16 +42,17 @@ class SourceExecutor; template class SourceExecutor< Outputs > { public: - typedef embb::base::Function FunctionType; + typedef embb::base::Function FunctionType; explicit SourceExecutor(FunctionType func) : function_(func) {} - void Execute( + bool Execute( int clock, Outputs & outputs) { O1 o1; - function_(o1); + bool result = function_(o1); outputs.template Get<0>().Send(Signal(clock, o1)); + return result; } private: @@ -61,18 +62,19 @@ class SourceExecutor< Outputs > { template class SourceExecutor< Outputs > { public: - typedef embb::base::Function FunctionType; + typedef embb::base::Function FunctionType; explicit SourceExecutor(FunctionType func) : function_(func) {} - void Execute( + bool Execute( int clock, Outputs & outputs) { O1 o1; O2 o2; - function_(o1, o2); + bool result = function_(o1, o2); outputs.template Get<0>().Send(Signal(clock, o1)); outputs.template Get<1>().Send(Signal(clock, o2)); + return result; } private: @@ -82,20 +84,21 @@ class SourceExecutor< Outputs > { template class SourceExecutor< Outputs > { public: - typedef embb::base::Function FunctionType; + typedef embb::base::Function FunctionType; explicit SourceExecutor(FunctionType func) : function_(func) {} - void Execute( + bool Execute( int clock, Outputs & outputs) { O1 o1; O2 o2; O3 o3; - function_(o1, o2, o3); + bool result = function_(o1, o2, o3); outputs.template Get<0>().Send(Signal(clock, o1)); outputs.template Get<1>().Send(Signal(clock, o2)); outputs.template Get<2>().Send(Signal(clock, o3)); + return result; } private: @@ -105,22 +108,23 @@ class SourceExecutor< Outputs > { template class SourceExecutor< Outputs > { public: - typedef embb::base::Function FunctionType; + typedef embb::base::Function FunctionType; explicit SourceExecutor(FunctionType func) : function_(func) {} - void Execute( + bool Execute( int clock, Outputs & outputs) { O1 o1; O2 o2; O3 o3; O4 o4; - function_(o1, o2, o3, o4); + bool result = function_(o1, o2, o3, o4); outputs.template Get<0>().Send(Signal(clock, o1)); outputs.template Get<1>().Send(Signal(clock, o2)); outputs.template Get<2>().Send(Signal(clock, o3)); outputs.template Get<3>().Send(Signal(clock, o4)); + return result; } private: @@ -131,11 +135,11 @@ template class SourceExecutor< Outputs > { public: - typedef embb::base::Function FunctionType; + typedef embb::base::Function FunctionType; explicit SourceExecutor(FunctionType func) : function_(func) {} - void Execute( + bool Execute( int clock, Outputs & outputs) { O1 o1; @@ -143,12 +147,13 @@ class SourceExecutor< Outputs > { O3 o3; O4 o4; O5 o5; - function_(o1, o2, o3, o4, o5); + bool result = function_(o1, o2, o3, o4, o5); outputs.template Get<0>().Send(Signal(clock, o1)); outputs.template Get<1>().Send(Signal(clock, o2)); outputs.template Get<2>().Send(Signal(clock, o3)); outputs.template Get<3>().Send(Signal(clock, o4)); outputs.template Get<4>().Send(Signal(clock, o5)); + return result; } private: diff --git a/dataflow_cpp/include/embb/dataflow/network.h b/dataflow_cpp/include/embb/dataflow/network.h index bdd9bdd..625d269 100644 --- a/dataflow_cpp/include/embb/dataflow/network.h +++ b/dataflow_cpp/include/embb/dataflow/network.h @@ -697,10 +697,9 @@ class Network { void Add(ConstantSource & source); /** - * Executes the network for at most \c elements tokens. - * \param elements Maximum number of tokens to process. + * Executes the network until one of the the sources returns \c false. */ - void operator () (int elements); + void operator () (); }; #else @@ -862,7 +861,7 @@ class Network : public internal::ClockListener { sources_.push_back(&source); } - void operator () (int elements) { + void operator () () { internal::SchedulerSequential sched_seq; internal::SchedulerMTAPI sched_mtapi; internal::Scheduler * sched = &sched_mtapi; @@ -876,16 +875,22 @@ class Network : public internal::ClockListener { for (int ii = 0; ii < Slices; ii++) sink_count_[ii] = 0; - for (int clock = 0; clock < elements; clock++) { + int clock = 0; + while (clock >= 0) { const int idx = clock % Slices; while (sink_count_[idx] > 0) embb::base::Thread::CurrentYield(); sched->WaitForSlice(idx); - SpawnClock(clock); + if (!SpawnClock(clock)) + break; + clock++; } - for (int ii = 0; ii < Slices; ii++) { - while (sink_count_[ii] > 0) embb::base::Thread::CurrentYield(); - sched->WaitForSlice(ii); + int ii = clock - Slices + 1; + if (ii < 0) ii = 0; + for (; ii < clock; ii++) { + const int idx = ii % Slices; + while (sink_count_[idx] > 0) embb::base::Thread::CurrentYield(); + sched->WaitForSlice(idx); } } @@ -912,15 +917,17 @@ class Network : public internal::ClockListener { std::vector spawn_history_[Slices]; #endif - void SpawnClock(int clock) { + bool SpawnClock(int clock) { const int idx = clock % Slices; + bool result = true; #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY spawn_history_[idx].push_back(clock); #endif sink_count_[idx] = static_cast(sinks_.size()); for (size_t kk = 0; kk < sources_.size(); kk++) { - sources_[kk]->Start(clock); + result &= sources_[kk]->Start(clock); } + return result; } }; diff --git a/dataflow_cpp/test/dataflow_cpp_test_simple.cc b/dataflow_cpp/test/dataflow_cpp_test_simple.cc index 13c1466..cb7d7c9 100644 --- a/dataflow_cpp/test/dataflow_cpp_test_simple.cc +++ b/dataflow_cpp/test/dataflow_cpp_test_simple.cc @@ -53,11 +53,13 @@ typedef MyNetwork::Select< int > MySelect; embb::base::Atomic source_counter; int source_array[TEST_COUNT]; -void sourceFunc(int & out) { +bool sourceFunc(int & out) { out = source_counter; source_array[source_counter] = out; source_counter++; + + return source_counter < 12; } embb::base::Atomic pred_counter; @@ -195,7 +197,7 @@ void SimpleTest::TestBasic() { network.Add(sink); - network(TEST_COUNT); + network(); PT_EXPECT(asink.Check()); } diff --git a/doc/examples/dataflow/dataflow_nonlinear-fragmented.cc b/doc/examples/dataflow/dataflow_nonlinear-fragmented.cc index 511d378..47b589e 100644 --- a/doc/examples/dataflow/dataflow_nonlinear-fragmented.cc +++ b/doc/examples/dataflow/dataflow_nonlinear-fragmented.cc @@ -65,5 +65,5 @@ void RunDataflowNonLinear() { process5.GetOutput<1>() >> sink1.GetInput<2>(); process4.GetOutput<1>() >> sink1.GetInput<3>(); - nw(10); + nw(); } diff --git a/doc/examples/dataflow/dataflow_producer-snippet.h b/doc/examples/dataflow/dataflow_producer-snippet.h index 92c61c2..ffaf79e 100644 --- a/doc/examples/dataflow/dataflow_producer-snippet.h +++ b/doc/examples/dataflow/dataflow_producer-snippet.h @@ -1,12 +1,15 @@ template class Producer { public: - explicit Producer(int seed) : seed_(seed) {} - void Run(T& x) { + explicit Producer(int seed) : seed_(seed), count_(4) {} + bool Run(T& x) { // produce a new value x x = SimpleRand(seed_); + count_--; + return count_ >= 0; } private: int seed_; + int count_; }; diff --git a/doc/examples/dataflow/dataflow_run-snippet.h b/doc/examples/dataflow/dataflow_run-snippet.h index a5d17a5..ff4f083 100644 --- a/doc/examples/dataflow/dataflow_run-snippet.h +++ b/doc/examples/dataflow/dataflow_run-snippet.h @@ -1 +1 @@ - nw(4); + nw(); diff --git a/doc/examples/dataflow/dataflow_source_function-snippet.h b/doc/examples/dataflow/dataflow_source_function-snippet.h index 6df637b..6c1cdea 100644 --- a/doc/examples/dataflow/dataflow_source_function-snippet.h +++ b/doc/examples/dataflow/dataflow_source_function-snippet.h @@ -1,3 +1,4 @@ -void SourceFunction(std::string & str) { +bool SourceFunction(std::string & str) { std::getline(file, str); + return !file.eof(); } -- libgit2 0.26.0