/* * Copyright (c) 2014, Siemens AG. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #ifndef EMBB_DATAFLOW_NETWORK_H_ #define EMBB_DATAFLOW_NETWORK_H_ #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY #include #endif #include #include #include #include #include #include #include #include #include #include namespace embb { namespace dataflow { #ifdef DOXYGEN /** * Represents a set of processes that are connected by communication channels. * * \tparam Slices Number of concurrently processed tokens. * \ingroup CPP_DATAFLOW */ template class Network { public: /** * Constructs an empty network. */ Network() {} /** * Input port class. */ template class In { }; /** * Output port class. 
*/ template class Out { public: /** * Input port class that can be connected to this output port. */ typedef In InType; /** * Connects this output port to the input port \c input. * If the input port was already connected to a different * output, an ErrorException is thrown. * \param input The input port to connect to. */ void Connect(InType & input); /** * Connects this output port to the input port \c input. * If the input port was already connected to a different * output, an ErrorException is thrown. * \param input The input port to connect to. */ void operator >> (InType & input); }; /** * Provides the input port types for a process. * \tparam T1 Type of first port. * \tparam T2 Optional type of second port. * \tparam T3 Optional type of third port. * \tparam T4 Optional type of fourth port. * \tparam T5 Optional type of fifth port. */ template struct Inputs { /** * Type list used to derive input port types from Index. * \tparam Index The index of the input port type to query. */ template struct Types { /** * Result of an input port type query. * T_Index is T1 if Index is 0, T2 if Index is 1 and so on. */ typedef In Result; }; /** * \returns Reference to input port at Index. */ template typename Types::Result & Get(); }; /** * Provides the output port types for a process. * \tparam T1 Type of first port. * \tparam T2 Optional type of second port. * \tparam T3 Optional type of third port. * \tparam T4 Optional type of fourth port. * \tparam T5 Optional type of fifth port. */ template struct Outputs { /** * Type list used to derive output port types from Index. * \tparam Index The index of the output port type to query. */ template struct Types { /** * Result of an output port type query. * T_Index is T1 if Index is 0, T2 if Index is 1 and so on. */ typedef Out Result; }; /** * \returns Reference to output port at Index. */ template typename Types::Result & Get(); }; /** * Generic serial process template. 
* * Implements a generic serial process in the network that may have one to * four input ports and one to four output ports but no more than five total * ports. * Tokens are processed in order. * * \see Source, ParallelProcess, Sink, Switch, Select * * \tparam Inputs Inputs of the process. * \tparam Outputs Outputs of the process. */ template class SerialProcess { public: /** * Function type to use when processing tokens. */ typedef embb::base::Function FunctionType; /** * Input port type list. */ typedef Inputs InputsType; /** * Output port type list. */ typedef Outputs OutputsType; /** * Constructs a SerialProcess with a user specified processing function. * \param function The Function to call to process a token. */ explicit SerialProcess(FunctionType function); /** * \returns \c true if the SerialProcess has any inputs, \c false * otherwise. */ virtual bool HasInputs() const; /** * \returns Reference to a list of all input ports. */ InputsType & GetInputs(); /** * \returns Input port at Index. */ template typename InputsType::Types::Result & GetInput(); /** * \returns \c true if the SerialProcess has any outputs, \c false * otherwise. */ virtual bool HasOutputs() const; /** * \returns Reference to a list of all output ports. */ OutputsType & GetOutputs(); /** * \returns Output port at Index. */ template typename OutputsType::Types::Result & GetOutput(); }; /** * Adds a new serial process to the network. * \param proc The process to add. */ template void Add(SerialProcess & proc); /** * Generic parallel process template. * * Implements a generic parallel process in the network that may have one to * four input ports and one to four output ports but no more than five total * ports. * Tokens are processed as soon as all inputs for that token are complete. * * \see Source, SerialProcess, Sink, Switch, Select * * \tparam Inputs Inputs of the process. * \tparam Outputs Outputs of the process. 
*/ template class ParallelProcess { public: /** * Function type to use when processing tokens. */ typedef embb::base::Function FunctionType; /** * Input port type list. */ typedef Inputs InputsType; /** * Output port type list. */ typedef Outputs OutputsType; /** * Constructs a ParallelProcess with a user specified processing function. * \param function The Function to call to process a token. */ explicit ParallelProcess(FunctionType function); /** * \returns \c true if the ParallelProcess has any inputs, \c false * otherwise. */ virtual bool HasInputs() const; /** * \returns Reference to a list of all input ports. */ InputsType & GetInputs(); /** * \returns Input port at Index. */ template typename InputsType::Types::Result & GetInput(); /** * \returns \c true if the ParallelProcess has any outputs, \c false * otherwise. */ virtual bool HasOutputs() const; /** * \returns Reference to a list of all output ports. */ OutputsType & GetOutputs(); /** * \returns Output port at Index. */ template typename OutputsType::Types::Result & GetOutput(); }; /** * Adds a new parallel process to the network. * \param proc The process to add. */ template void Add(ParallelProcess & proc); /** * Switch process template. * * A switch has 2 inputs and 2 outputs. Input port 0 is of type boolean and * selects to which output port the value of input port 1 of type \c Type * is sent. If input port 0 is set to true the value goes to output port 0 * and to output port 1 otherwise. * Tokens are processed as soon as all inputs for that token are complete. * * \see Select * * \tparam Type The type of input port 1 and output port 0 and 1. */ template class Switch { /** * Function type to use when processing tokens. */ typedef embb::base::Function FunctionType; /** * Input port type list. */ typedef Inputs InputsType; /** * Output port type list. */ typedef Outputs OutputsType; /** * \returns Always \c true. */ virtual bool HasInputs() const; /** * \returns Reference to a list of all input ports. 
*/ InputsType & GetInputs(); /** * \returns Input port at Index. */ template typename InputsType::Types::Result & GetInput(); /** * \returns Always \c true. */ virtual bool HasOutputs() const; /** * \returns Reference to a list of all output ports. */ OutputsType & GetOutputs(); /** * \returns Output port at Index. */ template typename OutputsType::Types::Result & GetOutput(); }; /** * Adds a new switch process to the network. * \param sw The switch process to add. */ template void Add(Switch & sw); /** * Select process template. * * A select has 3 inputs and 1 output. Input port 0 is of type boolean and * selects which of input port 1 or 2 (of type \c Type) is sent to output * port 0 (of type \c Type). If input port 0 is set to true the value of * input port 1 is selected, otherwise the value of input port 2 is taken. * Tokens are processed as soon as all inputs for that token are complete. * * \see Switch * * \tparam Type The type of input port 1 and 2 and output port 0. */ template class Select { /** * Function type to use when processing tokens. */ typedef embb::base::Function FunctionType; /** * Input port type list. */ typedef Inputs InputsType; /** * Output port type list. */ typedef Outputs OutputsType; /** * \returns Always \c true. */ virtual bool HasInputs() const; /** * \returns Reference to a list of all input ports. */ InputsType & GetInputs(); /** * \returns Input port at Index. */ template typename InputsType::Types::Result & GetInput(); /** * \returns Always \c true. */ virtual bool HasOutputs() const; /** * \returns Reference to a list of all output ports. */ OutputsType & GetOutputs(); /** * \returns Output port at Index. */ template typename OutputsType::Types::Result & GetOutput(); }; /** * Adds a new select process to the network. * \param sel The select process to add. */ template void Add(Select & sel); /** * Sink process template. * * A sink marks the end of a particular processing chain. 
It can have one to * five input ports and no output ports. * Tokens are processed in order by the sink, regardless of the order in which * they arrive at the input ports. * * \see Source, SerialProcess, ParallelProcess * * \tparam I1 Type of first input port. * \tparam I2 Optional type of second input port. * \tparam I3 Optional type of third input port. * \tparam I4 Optional type of fourth input port. * \tparam I5 Optional type of fifth input port. */ template class Sink { public: /** * Function type to use when processing tokens. */ typedef embb::base::Function FunctionType; /** * Input port type list. */ typedef Inputs InputsType; /** * Constructs a Sink with a user specified processing function. * \param function The Function to call to process a token. */ explicit Sink(FunctionType function); /** * \returns Always \c true. */ virtual bool HasInputs() const; /** * \returns Reference to a list of all input ports. */ InputsType & GetInputs(); /** * \returns Input port at Index. */ template typename InputsType::Types::Result & GetInput(); /** * \returns Always \c false. */ virtual bool HasOutputs() const; }; /** * Adds a new sink process to the network. * \param sink The sink process to add. */ template void Add(Sink & sink); /** * Source process template. * * A source marks the start of a processing chain. It can have one to five * output ports and no input ports. * Tokens are emitted in order by the source. * * \see SerialProcess, ParallelProcess, Sink * * \tparam O1 Type of first output port. * \tparam O2 Optional type of second output port. * \tparam O3 Optional type of third output port. * \tparam O4 Optional type of fourth output port. * \tparam O5 Optional type of fifth output port. */ template class Source { public: /** * Function type to use when processing tokens. */ typedef embb::base::Function FunctionType; /** * Output port type list. */ typedef Outputs OutputsType; /** * Constructs a Source with a user specified processing function. 
* \param function The Function to call to emit a token. */ explicit Source(FunctionType function); /** * \returns Always \c false. */ virtual bool HasInputs() const; /** * \returns Always \c true. */ virtual bool HasOutputs() const; /** * \returns Reference to a list of all output ports. */ OutputsType & GetOutputs(); /** * \returns Output port at Index. */ template typename OutputsType::Types::Result & GetOutput(); }; /** * Adds a new source process to the network. * \param source The source process to add. */ template void Add(Source & source); /** * Constant source process template. * * A constant source has one output port and emits a constant value given * at construction time for each token. * * \tparam Type The type of output port 0. */ template class ConstantSource { public: /** * Output port type list. */ typedef Outputs OutputsType; /** * Constructs a ConstantSource with a value to emit on each token. * \param value The value to emit. */ explicit ConstantSource(Type value); /** * \returns Always \c false. */ virtual bool HasInputs() const; /** * \returns Always \c true. */ virtual bool HasOutputs() const; /** * \returns Reference to a list of all output ports. */ OutputsType & GetOutputs(); /** * \returns Output port at Index. */ template typename OutputsType::Types::Result & GetOutput(); }; /** * Adds a new constant source process to the network. * \param source The constant source process to add. * \param elements Maximum number of tokens to process. */ void operator () (int elements); }; #else /** * Implementation of Network; the public interface is documented in the * DOXYGEN branch of this header. The Add() overloads store only the * address of the node they are given, so added nodes presumably have to * outlive every call to operator() (TODO confirm against callers). */ template class Network : public internal::ClockListener { public: Network() {} /** * Exposes internal::Inputs through the nested typedef \c Type. 
*/ template struct Inputs { typedef internal::Inputs Type; }; /** * Exposes internal::Outputs through the nested typedef \c Type. */ template struct Outputs { typedef internal::Outputs Type; }; /** * SerialProcess is only declared here; it is defined by the partial * specialization below, which derives from internal::Process with \c true * as its second argument (ParallelProcess passes \c false) and forwards * the processing function to that base class. */ template class SerialProcess; template < typename I1, typename I2, typename I3, typename I4, typename I5, typename O1, typename O2, typename O3, typename O4, typename O5> class SerialProcess< internal::Inputs, internal::Outputs > : public internal::Process< Slices, true, internal::Inputs, internal::Outputs > { public: typedef typename internal::Process< Slices, true, internal::Inputs, internal::Outputs >::FunctionType FunctionType; explicit SerialProcess(FunctionType function) : internal::Process< Slices, true, internal::Inputs, internal::Outputs >(function) { //empty } }; /** * Appends the address of \c proc to \c processes_. */ template void Add(SerialProcess & proc) { processes_.push_back(&proc); } /** * Counterpart of SerialProcess: the partial specialization below derives * from internal::Process with \c false as its second argument. */ template class ParallelProcess; template < typename I1, typename I2, typename I3, typename I4, typename I5, typename O1, typename O2, typename O3, typename O4, typename O5> class ParallelProcess< internal::Inputs, internal::Outputs > : public internal::Process< Slices, false, internal::Inputs, internal::Outputs >{ public: typedef typename internal::Process< Slices, false, internal::Inputs, internal::Outputs >::FunctionType FunctionType; explicit ParallelProcess(FunctionType function) : internal::Process< Slices, false, internal::Inputs, internal::Outputs >(function) { //empty } }; /** * Appends the address of \c proc to \c processes_. 
*/ template void Add(ParallelProcess & proc) { processes_.push_back(&proc); } /** * Wrapper that adds no members of its own to internal::Switch. */ template class Switch : public internal::Switch { public: }; /** * Appends the address of \c sw to \c processes_. */ template void Add(Switch & sw) { processes_.push_back(&sw); } /** * Wrapper that adds no members of its own to internal::Select. */ template class Select : public internal::Select { public: }; /** * Appends the address of \c sel to \c processes_. */ template void Add(Select & sel) { processes_.push_back(&sel); } /** * Sink wrapper that forwards the processing function to its internal::Sink * base class. */ template class Sink : public internal::Sink > { public: typedef typename internal::Sink >::FunctionType FunctionType; explicit Sink(FunctionType function) : internal::Sink >(function) { 
//empty } }; /** * Registers this network as the listener of \c sink, then appends the * address of \c sink to \c sinks_. */ template void Add(Sink & sink) { sink.SetListener(this); sinks_.push_back(&sink); } /** * Source wrapper that forwards the processing function to its * internal::Source base class. */ template class Source : public internal::Source > { public: typedef typename internal::Source >::FunctionType FunctionType; explicit Source(FunctionType function) : internal::Source >(function) { //empty } }; /** * Appends the address of \c source to \c sources_. */ template void Add(Source & source) { sources_.push_back(&source); } /** * Constant source wrapper that forwards \c value to its * internal::ConstantSource base class. */ template class ConstantSource : public internal::ConstantSource { public: explicit ConstantSource(Type value) : internal::ConstantSource(value) { //empty } }; /** * Appends the address of \c source to \c sources_, the same container * that holds regular sources. */ template void Add(ConstantSource & source) { sources_.push_back(&source); } /** * Runs the network for \c elements clocks. * * Every registered source, process and sink is handed the MTAPI scheduler * (\c sched_seq is constructed but never selected) and all slot counters * are reset to zero. For each clock, the loop yields the current thread * until the counter of slot clock % Slices has dropped to zero, calls * WaitForSlice() on the scheduler for that slot and then spawns the clock. * After the last clock, every slot is waited for in the same way before * the call returns. */ void operator () (int elements) { internal::SchedulerSequential sched_seq; internal::SchedulerMTAPI sched_mtapi; internal::Scheduler * sched = &sched_mtapi; for (size_t it = 0; it < sources_.size(); it++) sources_[it]->SetScheduler(sched); for (size_t it = 0; it < processes_.size(); it++) processes_[it]->SetScheduler(sched); for (size_t it = 0; it < sinks_.size(); it++) sinks_[it]->SetScheduler(sched); for (int ii = 0; ii < Slices; ii++) sink_count_[ii] = 0; for (int clock = 0; clock < elements; clock++) { const int idx = clock % Slices; while (sink_count_[idx] > 0) embb::base::Thread::CurrentYield(); sched->WaitForSlice(idx); SpawnClock(clock); } for (int ii = 0; ii < Slices; ii++) { while (sink_count_[ii] > 0) embb::base::Thread::CurrentYield(); sched->WaitForSlice(ii); } } /** * Internal. * \internal * Gets called when a token has reached all sinks and frees up the * corresponding slot, thus allowing a new token to be emitted. * Each call decrements the counter of slot clock % Slices; a negative * result raises an ErrorException. * NOTE(review): SpawnClock() sets that counter to the number of sinks, so * this is presumably invoked once per sink and clock -- confirm. 
*/ virtual void OnClock(int clock) { const int idx = clock % Slices; const int cnt = --sink_count_[idx]; if (cnt < 0) EMBB_THROW(embb::base::ErrorException, "More sinks than expected signaled reception of given clock.") } private: /** * Nodes registered through the Add() overloads, stored by address. */ std::vector processes_; std::vector sources_; std::vector sinks_; /** * Per-slot counter: set to the number of sinks in SpawnClock() and * decremented in OnClock(). */ embb::base::Atomic sink_count_[Slices]; /** * Clocks spawned per slot; only present when signal history tracing is * enabled. */ #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY std::vector spawn_history_[Slices]; #endif /** * Sets the counter of slot clock % Slices to the number of registered * sinks and then calls Start(clock) on every registered source. With * signal history tracing enabled, the clock is first appended to the * spawn history of that slot. */ void SpawnClock(int clock) { const int idx = clock % Slices; #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY spawn_history_[idx].push_back(clock); #endif sink_count_[idx] = static_cast(sinks_.size()); for (size_t kk = 0; kk < sources_.size(); kk++) { sources_[kk]->Start(clock); } } }; #endif // DOXYGEN } // namespace dataflow } // namespace embb #endif // EMBB_DATAFLOW_NETWORK_H_