diff --git a/dataflow_cpp/include/embb/dataflow/internal/clock_listener.h b/dataflow_cpp/include/embb/dataflow/internal/clock_listener.h index c8e7fd5..5234740 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/clock_listener.h +++ b/dataflow_cpp/include/embb/dataflow/internal/clock_listener.h @@ -31,10 +31,19 @@ namespace embb { namespace dataflow { namespace internal { +class Scheduler; +class ClockListener; + +struct InitData { + Scheduler * sched; + ClockListener * sink_listener; +}; + class ClockListener { public: virtual ~ClockListener() {} virtual void OnClock(int /*clock*/) = 0; + virtual void OnInit(InitData * /*sched*/) = 0; }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/constant_source.h b/dataflow_cpp/include/embb/dataflow/internal/constant_source.h index 58da428..4ec14af 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/constant_source.h +++ b/dataflow_cpp/include/embb/dataflow/internal/constant_source.h @@ -56,6 +56,11 @@ class ConstantSource GetOutput<0>().Send(Signal(clock, value_)); } + virtual void Init(InitData * init_data) { + SetScheduler(init_data->sched); + GetOutput<0>().SendInit(init_data); + } + virtual bool Start(int clock) { Run(clock); return true; diff --git a/dataflow_cpp/include/embb/dataflow/internal/in.h b/dataflow_cpp/include/embb/dataflow/internal/in.h index 84ad9a3..71034cb 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/in.h +++ b/dataflow_cpp/include/embb/dataflow/internal/in.h @@ -39,6 +39,8 @@ namespace embb { namespace dataflow { namespace internal { +class Scheduler; + template class Out; @@ -95,6 +97,10 @@ class In { lock_.Unlock(); #endif } + + void ReceiveInit(InitData * init_data) { + listener_->OnInit(init_data); + } }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/inputs.h b/dataflow_cpp/include/embb/dataflow/internal/inputs.h index 4f4430a..7fc87b6 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/inputs.h +++ b/dataflow_cpp/include/embb/dataflow/internal/inputs.h @@ -57,6 +57,7 @@ class Inputs @@ -70,6 +71,7 @@ class InputsOnClock(clock); } } + virtual void OnInit(InitData * init_data) { + if (--test_count_ == 0) { + listener_->OnInit(init_data); + } + } private: embb::base::Atomic count_[Slices]; + int test_count_; ClockListener * listener_; }; @@ -112,6 +120,7 @@ class InputsOnClock(clock); } } + virtual void OnInit(InitData * init_data) { + if (--test_count_ == 0) { + listener_->OnInit(init_data); + } + } private: embb::base::Atomic count_[Slices]; + int test_count_; ClockListener * listener_; }; @@ -158,6 +173,7 @@ class InputsOnClock(clock); } } + virtual void OnInit(InitData * init_data) { + if (--test_count_ == 0) { + listener_->OnInit(init_data); + } + } private: embb::base::Atomic count_[Slices]; + int test_count_; ClockListener * listener_; }; @@ -207,6 +229,7 @@ class Inputs Inputs() { for (int ii = 0; ii < Slices; ii++) count_[ii] = 4; + test_count_ = 4; } void SetListener(ClockListener * listener) { listener_ = listener; @@ -246,8 +269,14 @@ class Inputs listener_->OnClock(clock); } } + virtual void OnInit(InitData * init_data) { + if (--test_count_ == 0) { + listener_->OnInit(init_data); + } + } private: embb::base::Atomic count_[Slices]; + int test_count_; ClockListener * listener_; }; @@ -261,6 +290,7 @@ class Inputs Inputs() { for (int ii = 0; ii < Slices; ii++) count_[ii] = 5; + test_count_ = 5; } void SetListener(ClockListener * listener) { listener_ = listener; @@ -304,8 +334,14 @@ class Inputs listener_->OnClock(clock); } } + virtual void OnInit(InitData * init_data) { + if (--test_count_ == 0) { + listener_->OnInit(init_data); + } + } private: embb::base::Atomic count_[Slices]; + int test_count_; ClockListener * listener_; }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/node.h b/dataflow_cpp/include/embb/dataflow/internal/node.h index 122750e..ab47fee 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/node.h +++ b/dataflow_cpp/include/embb/dataflow/internal/node.h @@ -30,6 +30,7 @@ #include #include #include +#include namespace embb { namespace dataflow { @@ -46,12 +47,13 @@ class Node { EMBB_THROW(embb::base::ErrorException, "Nodes are started implicitly."); } - void SetScheduler(Scheduler * sched) { sched_ = sched; } + virtual void Init(InitData * init_data) = 0; protected: Scheduler * sched_; static int next_process_id_; + void SetScheduler(Scheduler * sched) { sched_ = sched; } static int GetNextProcessID() { return next_process_id_++; } }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/out.h b/dataflow_cpp/include/embb/dataflow/internal/out.h index 382f6ad..9c2f1a1 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/out.h +++ b/dataflow_cpp/include/embb/dataflow/internal/out.h @@ -35,6 +35,8 @@ namespace embb { namespace dataflow { namespace internal { +class Scheduler; + template class Out { public: @@ -50,6 +52,12 @@ class Out { } } + void SendInit(InitData * init_data) { + for (size_t ii = 0; ii < targets_.size(); ii++) { + targets_[ii]->ReceiveInit(init_data); + } + } + void Connect(InType & input) { if (input.IsConnected()) { EMBB_THROW(embb::base::ErrorException, diff --git a/dataflow_cpp/include/embb/dataflow/internal/process.h b/dataflow_cpp/include/embb/dataflow/internal/process.h index 581f162..4f5bc3f 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/process.h +++ b/dataflow_cpp/include/embb/dataflow/internal/process.h @@ -78,6 +78,11 @@ class Process< Slices, Serial, Inputs, executor_.Execute(clock, inputs_, outputs_); } + virtual void Init(InitData * init_data) { + SetScheduler(init_data->sched); + executor_.Init(init_data, outputs_); + } + InputsType & GetInputs() { return inputs_; } @@ -142,6 +147,10 @@ class Process< Slices, Serial, Inputs, } } + virtual void OnInit(InitData * init_data) { + Init(init_data); + } + private: InputsType inputs_; OutputsType outputs_; diff --git a/dataflow_cpp/include/embb/dataflow/internal/process_executor.h b/dataflow_cpp/include/embb/dataflow/internal/process_executor.h index 7756722..4219cea 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/process_executor.h +++ b/dataflow_cpp/include/embb/dataflow/internal/process_executor.h @@ -62,6 +62,10 @@ class ProcessExecutor< Inputs, Outputs > { } } + void Init(InitData * init_data, Outputs & outputs) { + outputs.template Get<0>().SendInit(init_data); + } + private: FunctionType function_; }; @@ -91,6 +95,11 @@ class ProcessExecutor< Inputs, Outputs > { } } + void Init(InitData * init_data, Outputs & outputs) { + outputs.template Get<0>().SendInit(init_data); + outputs.template Get<1>().SendInit(init_data); + } + private: FunctionType function_; }; @@ -124,6 +133,12 @@ class ProcessExecutor< Inputs, Outputs > { } } + void Init(InitData * init_data, Outputs & outputs) { + outputs.template Get<0>().SendInit(init_data); + outputs.template Get<1>().SendInit(init_data); + outputs.template Get<2>().SendInit(init_data); + } + private: FunctionType function_; }; @@ -161,6 +176,13 @@ class ProcessExecutor< Inputs, Outputs > { } } + void Init(InitData * init_data, Outputs & outputs) { + outputs.template Get<0>().SendInit(init_data); + outputs.template Get<1>().SendInit(init_data); + outputs.template Get<2>().SendInit(init_data); + outputs.template Get<3>().SendInit(init_data); + } + private: FunctionType function_; }; @@ -189,6 +211,10 @@ class ProcessExecutor< Inputs, Outputs > { } } + void Init(InitData * init_data, Outputs & outputs) { + outputs.template Get<0>().SendInit(init_data); + } + private: FunctionType function_; }; @@ -220,6 +246,11 @@ class ProcessExecutor< Inputs, Outputs > { } } + void Init(InitData * init_data, Outputs & outputs) { + outputs.template Get<0>().SendInit(init_data); + outputs.template Get<1>().SendInit(init_data); + } + private: FunctionType function_; }; @@ -255,6 +286,12 @@ class ProcessExecutor< Inputs, Outputs > { } } + void Init(InitData * init_data, Outputs & outputs) { + outputs.template Get<0>().SendInit(init_data); + outputs.template Get<1>().SendInit(init_data); + outputs.template Get<2>().SendInit(init_data); + } + private: FunctionType function_; }; @@ -284,6 +321,10 @@ class ProcessExecutor< Inputs, Outputs > { } } + void Init(InitData * init_data, Outputs & outputs) { + outputs.template Get<0>().SendInit(init_data); + } + private: FunctionType function_; }; @@ -317,6 +358,11 @@ class ProcessExecutor< Inputs, Outputs > { } } + void Init(InitData * init_data, Outputs & outputs) { + outputs.template Get<0>().SendInit(init_data); + outputs.template Get<1>().SendInit(init_data); + } + private: FunctionType function_; }; @@ -348,6 +394,10 @@ class ProcessExecutor< Inputs, Outputs > { } } + void Init(InitData * init_data, Outputs & outputs) { + outputs.template Get<0>().SendInit(init_data); + } + private: FunctionType function_; }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/select.h b/dataflow_cpp/include/embb/dataflow/internal/select.h index 423e2a5..f3a499f 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/select.h +++ b/dataflow_cpp/include/embb/dataflow/internal/select.h @@ -81,6 +81,11 @@ class Select } } + virtual void Init(InitData * init_data) { + SetScheduler(init_data->sched); + GetOutput<0>().SendInit(init_data); + } + InputsType & GetInputs() { return inputs_; } @@ -112,6 +117,10 @@ class Select Run(clock); } + virtual void OnInit(InitData * init_data) { + Init(init_data); + } + private: InputsType inputs_; OutputsType outputs_; diff --git a/dataflow_cpp/include/embb/dataflow/internal/sink.h b/dataflow_cpp/include/embb/dataflow/internal/sink.h index b0b4063..2c835ed 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/sink.h +++ b/dataflow_cpp/include/embb/dataflow/internal/sink.h @@ -72,6 +72,12 @@ class Sink< Slices, Inputs > listener_->OnClock(clock); } + virtual void Init(InitData * init_data) { + SetListener(init_data->sink_listener); + SetScheduler(init_data->sched); + listener_->OnInit(init_data); + } + InputsType & GetInputs() { return inputs_; } @@ -115,6 +121,10 @@ class Sink< Slices, Inputs > } } + virtual void OnInit(InitData * init_data) { + Init(init_data); + } + private: InputsType inputs_; ExecutorType executor_; diff --git a/dataflow_cpp/include/embb/dataflow/internal/source.h b/dataflow_cpp/include/embb/dataflow/internal/source.h index 232a50e..72f198b 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/source.h +++ b/dataflow_cpp/include/embb/dataflow/internal/source.h @@ -59,6 +59,11 @@ class Source< Slices, Outputs > not_done_ = executor_.Execute(clock, outputs_); } + virtual void Init(InitData * init_data) { + SetScheduler(init_data->sched); + executor_.Init(init_data, outputs_); + } + virtual bool Start(int clock) { if (not_done_) { Run(clock); diff --git a/dataflow_cpp/include/embb/dataflow/internal/source_executor.h b/dataflow_cpp/include/embb/dataflow/internal/source_executor.h index 2e37faf..1461285 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/source_executor.h +++ b/dataflow_cpp/include/embb/dataflow/internal/source_executor.h @@ -36,6 +36,8 @@ namespace embb { namespace dataflow { namespace internal { +class Scheduler; + template class SourceExecutor; @@ -55,6 +57,10 @@ class SourceExecutor< Outputs > { return result; } + void Init(InitData * init_data, Outputs & outputs) { + outputs.template Get<0>().SendInit(init_data); + } + private: FunctionType function_; }; @@ -77,6 +83,11 @@ class SourceExecutor< Outputs > { return result; } + void Init(InitData * init_data, Outputs & outputs) { + outputs.template Get<0>().SendInit(init_data); + outputs.template Get<1>().SendInit(init_data); + } + private: FunctionType function_; }; @@ -101,6 +112,12 @@ class SourceExecutor< Outputs > { return result; } + void Init(InitData * init_data, Outputs & outputs) { + outputs.template Get<0>().SendInit(init_data); + outputs.template Get<1>().SendInit(init_data); + outputs.template Get<2>().SendInit(init_data); + } + private: FunctionType function_; }; @@ -127,6 +144,13 @@ class SourceExecutor< Outputs > { return result; } + void Init(InitData * init_data, Outputs & outputs) { + outputs.template Get<0>().SendInit(init_data); + outputs.template Get<1>().SendInit(init_data); + outputs.template Get<2>().SendInit(init_data); + outputs.template Get<3>().SendInit(init_data); + } + private: FunctionType function_; }; @@ -156,6 +180,15 @@ class SourceExecutor< Outputs > { return result; } + void Init( + InitData * init_data, Outputs & outputs) { + outputs.template Get<0>().SendInit(init_data); + outputs.template Get<1>().SendInit(init_data); + outputs.template Get<2>().SendInit(init_data); + outputs.template Get<3>().SendInit(init_data); + outputs.template Get<4>().SendInit(init_data); + } + private: FunctionType function_; }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/switch.h b/dataflow_cpp/include/embb/dataflow/internal/switch.h index 7a1eb13..3bf36ee 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/switch.h +++ b/dataflow_cpp/include/embb/dataflow/internal/switch.h @@ -78,6 +78,12 @@ class Switch } } + virtual void Init(InitData * init_data) { + SetScheduler(init_data->sched); + GetOutput<0>().SendInit(init_data); + GetOutput<1>().SendInit(init_data); + } + InputsType & GetInputs() { return inputs_; } @@ -109,6 +115,10 @@ class Switch Run(clock); } + virtual void OnInit(InitData * init_data) { + Init(init_data); + } + private: InputsType inputs_; OutputsType outputs_; diff --git a/dataflow_cpp/include/embb/dataflow/network.h b/dataflow_cpp/include/embb/dataflow/network.h index ffe5633..33dbaff 100644 --- a/dataflow_cpp/include/embb/dataflow/network.h +++ b/dataflow_cpp/include/embb/dataflow/network.h @@ -246,13 +246,6 @@ class Network { }; /** - * Adds a new serial process to the network. - * \param proc The process to add. - */ - template - void Add(SerialProcess & proc); - - /** * Generic parallel process template. * * Implements a generic parallel process in the network that may have one to @@ -334,13 +327,6 @@ class Network { }; /** - * Adds a new parallel process to the network. - * \param proc The process to add. - */ - template - void Add(ParallelProcess & proc); - - /** * Switch process template. * * A switch has 2 inputs and 2 outputs. Input port 0 is of type boolean and @@ -412,13 +398,6 @@ class Network { }; /** - * Adds a new switch process to the network. - * \param sw The switch process to add. - */ - template - void Add(Switch & sw); - - /** * Select process template. * * A select has 3 inputs and 1 output. Input port 0 is of type boolean and @@ -490,13 +469,6 @@ class Network { }; /** - * Adds a new select process to the network. - * \param sel The select process to add. - */ - template - void Add(Select & sel); - - /** * Sink process template. * * A sink marks the end of a particular processing chain. It can have one to @@ -557,13 +529,6 @@ class Network { }; /** - * Adds a new sink process to the network. - * \param sink The sink process to add. - */ - template - void Add(Sink & sink); - - /** * Source process template. * * A source marks the start of a processing chain. It can have one to five @@ -635,7 +600,7 @@ class Network { * \param source The source process to add. */ template - void Add(Source & source); + void AddSource(Source & source); /** * Constant source process template. @@ -694,7 +659,7 @@ class Network { * \param source The constant source process to add. */ template - void Add(ConstantSource & source); + void AddSource(ConstantSource & source); /** * Executes the network until one of the the sources returns \c false. @@ -748,11 +713,6 @@ class Network : public internal::ClockListener { } }; - template - void Add(SerialProcess & proc) { - processes_.push_back(&proc); - } - template class ParallelProcess; template < @@ -776,31 +736,16 @@ class Network : public internal::ClockListener { } }; - template - void Add(ParallelProcess & proc) { - processes_.push_back(&proc); - } - template class Switch : public internal::Switch { public: }; - template - void Add(Switch & sw) { - processes_.push_back(&sw); - } - template class Select : public internal::Select { public: }; - template - void Add(Select & sel) { - processes_.push_back(&sel); - } - template - void Add(Sink & sink) { - sink.SetListener(this); - sinks_.push_back(&sink); - } - template - void Add(Source & source) { + void AddSource(Source & source) { sources_.push_back(&source); } @@ -857,7 +796,7 @@ class Network : public internal::ClockListener { }; template - void Add(ConstantSource & source) { + void AddSource(ConstantSource & source) { sources_.push_back(&source); } @@ -866,19 +805,20 @@ class Network : public internal::ClockListener { internal::SchedulerMTAPI sched_mtapi; internal::Scheduler * sched = &sched_mtapi; + internal::InitData init_data; + init_data.sched = sched; + init_data.sink_listener = this; + + sink_count_ = 0; for (size_t it = 0; it < sources_.size(); it++) - sources_[it]->SetScheduler(sched); - for (size_t it = 0; it < processes_.size(); it++) - processes_[it]->SetScheduler(sched); - for (size_t it = 0; it < sinks_.size(); it++) - sinks_[it]->SetScheduler(sched); + sources_[it]->Init(&init_data); - for (int ii = 0; ii < Slices; ii++) sink_count_[ii] = 0; + for (int ii = 0; ii < Slices; ii++) sink_counter_[ii] = 0; int clock = 0; while (clock >= 0) { const int idx = clock % Slices; - while (sink_count_[idx] > 0) embb::base::Thread::CurrentYield(); + while (sink_counter_[idx] > 0) embb::base::Thread::CurrentYield(); sched->WaitForSlice(idx); if (!SpawnClock(clock)) break; @@ -889,7 +829,7 @@ class Network : public internal::ClockListener { if (ii < 0) ii = 0; for (; ii < clock; ii++) { const int idx = ii % Slices; - while (sink_count_[idx] > 0) embb::base::Thread::CurrentYield(); + while (sink_counter_[idx] > 0) embb::base::Thread::CurrentYield(); sched->WaitForSlice(idx); } } @@ -902,17 +842,28 @@ class Network : public internal::ClockListener { */ virtual void OnClock(int clock) { const int idx = clock % Slices; - const int cnt = --sink_count_[idx]; + const int cnt = --sink_counter_[idx]; if (cnt < 0) EMBB_THROW(embb::base::ErrorException, "More sinks than expected signaled reception of given clock.") } + /** + * Internal. + * \internal + * Gets called when an init token has reached all sinks. + */ + virtual void OnInit(internal::InitData * /*sched*/) { + sink_count_++; + } + private: std::vector processes_; std::vector sources_; std::vector sinks_; - embb::base::Atomic sink_count_[Slices]; + embb::base::Atomic sink_counter_[Slices]; + int sink_count_; + #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY std::vector spawn_history_[Slices]; #endif @@ -923,7 +874,7 @@ class Network : public internal::ClockListener { #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY spawn_history_[idx].push_back(clock); #endif - sink_count_[idx] = static_cast(sinks_.size()); + sink_counter_[idx] = sink_count_; for (size_t kk = 0; kk < sources_.size(); kk++) { result &= sources_[kk]->Start(clock); } diff --git a/dataflow_cpp/test/dataflow_cpp_test_simple.cc b/dataflow_cpp/test/dataflow_cpp_test_simple.cc index c63a57e..bf5b386 100644 --- a/dataflow_cpp/test/dataflow_cpp_test_simple.cc +++ b/dataflow_cpp/test/dataflow_cpp_test_simple.cc @@ -186,17 +186,8 @@ void SimpleTest::TestBasic() { sel.GetOutput<0>() >> sink.GetInput<0>(); - network.Add(constant); - network.Add(source); - - network.Add(filter); - network.Add(mult); - - network.Add(pred); - network.Add(sw); - network.Add(sel); - - network.Add(sink); + network.AddSource(constant); + network.AddSource(source); network(); diff --git a/doc/examples/dataflow/dataflow_add-snippet.h b/doc/examples/dataflow/dataflow_add-snippet.h index 70a1157..ce51ef8 100644 --- a/doc/examples/dataflow/dataflow_add-snippet.h +++ b/doc/examples/dataflow/dataflow_add-snippet.h @@ -1,3 +1 @@ - nw.Add(read); - nw.Add(replace); - nw.Add(write); + nw.AddSource(read); diff --git a/doc/examples/dataflow/dataflow_declare_add_sources-snippet.h b/doc/examples/dataflow/dataflow_declare_add_sources-snippet.h index 630dedd..f1cfcaa 100644 --- a/doc/examples/dataflow/dataflow_declare_add_sources-snippet.h +++ b/doc/examples/dataflow/dataflow_declare_add_sources-snippet.h @@ -14,7 +14,7 @@ source4( embb::base::MakeFunction(producer4, &Producer::Run) ); - nw.Add(source1); - nw.Add(source2); - nw.Add(source3); - nw.Add(source4); + nw.AddSource(source1); + nw.AddSource(source2); + nw.AddSource(source3); + nw.AddSource(source4); diff --git a/doc/examples/dataflow/dataflow_nonlinear-fragmented.cc b/doc/examples/dataflow/dataflow_nonlinear-fragmented.cc index 5708773..e7f419d 100644 --- a/doc/examples/dataflow/dataflow_nonlinear-fragmented.cc +++ b/doc/examples/dataflow/dataflow_nonlinear-fragmented.cc @@ -60,19 +60,11 @@ void RunDataflowNonLinear() { process4( embb::base::MakeFunction(comparator, &Comparator::Run) ), process5( embb::base::MakeFunction(comparator, &Comparator::Run) ); - nw.Add(process1); - nw.Add(process2); - nw.Add(process3); - nw.Add(process4); - nw.Add(process5); - Consumer consumer; Network::Sink sink1(embb::base::MakeFunction(consumer, &Consumer::Run)); - nw.Add(sink1); - source1.GetOutput<0>() >> process1.GetInput<0>(); source2.GetOutput<0>() >> process2.GetInput<0>(); source3.GetOutput<0>() >> process1.GetInput<1>(); diff --git a/doc/tutorial/content/dataflow.tex b/doc/tutorial/content/dataflow.tex index 7d09e6b..e7a07a1 100644 --- a/doc/tutorial/content/dataflow.tex +++ b/doc/tutorial/content/dataflow.tex @@ -142,7 +142,7 @@ is used to construct the sink: \emph{\textbf{Note:} If you parallelize your own application using \embb and your compiler emits a lengthy error message containing lots of templates, it is very likely that for at least one process, the ports and their directions do not match the signature of the given function.} -The network needs to know about the processes declared above, so we add them to our network: +The network needs to know about the source declared above, so we add it to our network: % \\\inputlisting{../examples/dataflow/dataflow_add-snippet.h} % @@ -303,11 +303,11 @@ The class-based approach has several advantages besides the use of templates: Fi Each instance of the class \lstinline|Network| maintains a list of source processes that belong to the network. % When you create a source process using \lstinline|MakeSource|, it is automatically added to this list. Otherwise, you must explicitly add it by a call to \lstinline|Add|. For example, if we want to feed our sorting network \lstinline|nw| with streams of integer values, we may write: -You must explicitly add all processes to the network by a call to \lstinline|Add|. For example, if we want to feed our sorting network \lstinline|nw| with four streams of integer values, we may write: +You must explicitly add all sources to the network by a call to \lstinline|AddSource|. For example, if we want to feed our sorting network \lstinline|nw| with four streams of integer values, we may write: % \\\inputlisting{../examples/dataflow/dataflow_declare_add_sources-snippet.h} % -%This is only necessary for source processes. All other processes are automatically found via a depth-first search starting from the source processes. +This is only necessary for source processes. All other processes are automatically found via a depth-first search starting from the source processes. The code for the comparators looks like this: %