\chapter{Dataflow}
\label{cha:dataflow}

With the \emph{Dataflow} building block, \embb provides generic skeletons for the development of parallel stream-based applications. These skeletons are based on dataflow process networks (DPNs), a model of computation which is, due to its simplicity and flexibility, widely employed in different domains like digital signal processing and imaging. As a major advantage, DPNs are deterministic, which significantly simplifies testing and debugging. This is particularly important in safety-critical systems, where high demands are put on correctness and reliability. Moreover, DPNs are inherently parallel and hence lend themselves well to execution on a multicore processor. In fact, they can be viewed as a generalization of pipelining, a frequently encountered parallel pattern.

%\begin{itemize}\setlength{\itemsep}{0pt}
% \item generic programming in the style of STL
% \item efficienct implementation using templates
% \item flexibility through pluggable components
% \item lock-free algorithms and data structures
% \item type safety and const correctness
% \item support for lambda functions and streaming operators
% \item abstraction from underlying hardware
% \item non-linear pipelines with multiple sources/sinks
% \item automatic memory management and load balancing
% \item inplace and non-inplace operations
% \item serial (in-order) and parallel (out-of-order) stages
% \item dynamic control of data streams (conditional execution)
%\end{itemize}

\section{Linear Pipelines}

Before we go into detail, we demonstrate the basic concepts of this building block by means of a simple application which finds and replaces strings in a file. Let us start with the sequential implementation. The program shown in Listing~\ref{lst:replace_seq} reads a file line by line and replaces each occurrence of a given string with a new string. The main part consists of the \lstinline|while| loop which performs three steps:

\begin{enumerate}\setlength{\itemsep}{0pt}
 \item read a line from \lstinline|file| and store it in the string \lstinline|str|
 \item replace each occurrence of \lstinline|what| in \lstinline|str| with \lstinline|with|
 \item write the resulting string to \lstinline|cout|
\end{enumerate}

\begin{lstlisting}[float,caption={Sequential program for replacing strings in a file},label={lst:replace_seq}]
#include <iostream>
#include <fstream>
#include <string>
#include <cstdlib>

using namespace std;

// replace all occurrences of 'what' in 'str' with 'with'
void repl(string& str, const string &what,
          const string& with) {
  string::size_type pos = 0;
  while((pos = str.find(what, pos)) != string::npos) {
    str.replace(pos, what.length(), with);
    pos += with.length();
  }
}

int main(int argc, char *argv[]) {
  // check and read command line arguments
  if(argc != 4) {
    cerr << "Usage: replace <what> <with> <file>" << endl;
    exit(EXIT_FAILURE);
  }
  const string what(argv[1]), with(argv[2]);

  // open input file
  ifstream file(argv[3]);
  if(!file) {
    cerr << "Cannot open file " << argv[3] << endl;
    exit(EXIT_FAILURE);
  }

  // read input file line by line and replace strings
  string str;
  while(getline(file, str)) {
    repl(str, what, with);
    cout << str << endl;
  }

  // close file and exit
  file.close();
  exit(EXIT_SUCCESS);
}
\end{lstlisting}
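
Two details of \lstinline|repl| are easy to overlook. First, advancing \lstinline|pos| by \lstinline|with.length()| skips the text that was just inserted, so the loop terminates even if \lstinline|with| contains \lstinline|what|. Second, an empty search string matches at every position, so the loop would never terminate for such an input. The following variant is a minimal sketch that adds a guard for this case; the name \lstinline|repl_checked| is hypothetical and not part of the example program.

\begin{lstlisting}
#include <string>

// Variant of repl with a guard for an empty search string.
// Assumption: an empty 'what' should leave 'str' unchanged.
void repl_checked(std::string& str, const std::string& what,
                  const std::string& with) {
  if (what.empty()) {
    return;  // find("") succeeds at every position
  }
  std::string::size_type pos = 0;
  while ((pos = str.find(what, pos)) != std::string::npos) {
    str.replace(pos, what.length(), with);
    pos += with.length();  // continue behind the inserted text
  }
}
\end{lstlisting}

For instance, replacing \lstinline|a| with \lstinline|aa| in the string \lstinline|aaa| doubles each character exactly once instead of matching the inserted text again.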

To run this program on a multicore processor, we may execute the above steps in a pipelined fashion. In this way, a new line can be read from the hard disk while the previous one is still being processed. Likewise, processing a string and writing the result to standard output can be performed in parallel. Thus, the pipeline may consist of three stages as depicted in Figure~\ref{fig:replace_par}.

\begin{figure}[!htb]
\centering%
\begin{tikzpicture}[box/.style={rectangle, draw, minimum width=2cm, minimum height=0.75cm, anchor=base}, arrow/.style={thick, ->, >=stealth'}]
\node (READ) [box] at (0,0) {};
\node (REPLACE) [box] at (3,0) {};
\node (WRITE) [box] at (6,0) {};
\node (FILE) [draw, cylinder, shape border rotate=90, minimum width=1.5cm, minimum height=1cm, anchor=base] at (0,-1.85) {};
\node (IO) [draw, trapezium, trapezium left angle=70, trapezium right angle=110, minimum width=1cm, minimum height=1cm, anchor=base] at (6,-1.75) {};
\node [anchor=mid] at (0,0) {\textbf{read}};
\node [anchor=mid] at (3,0) {\textbf{replace}};
\node [anchor=mid] at (6,0) {\textbf{write}};
\draw [arrow] (FILE.north) -- (READ.south);
\draw [arrow] (READ.east) -- (REPLACE.west);
\draw [arrow] (REPLACE.east) -- (WRITE.west);
\draw [arrow] (WRITE.south) -- (IO.north);
\end{tikzpicture}
\caption{Pipeline for replacing strings in a file}
\label{fig:replace_par}
\end{figure}

This pipeline can be easily implemented using the Dataflow building block. As the first step, we have to include the \lstinline|dataflow.h| header file:
%
\\\inputlisting{../examples/dataflow/dataflow_include-snippet.h}
%
Then, we have to construct a \emph{network}. A network consists of a set of processes that are connected by communication channels.
%\footnote{Pipelines belong to the most simple networks, where the processes are connected in string-like (linear) fashion.}
%\embb provides a class template \lstinline|Network| that can be customized to your needs. For the moment, we are satisfied with the default configuration and omit the template arguments:
%
\\\inputlisting{../examples/dataflow/dataflow_network-snippet.h}
%
As the next step, we have to construct the processes shown in Figure~\ref{fig:replace_par}. The easiest way to construct a process is to wrap the user-defined code in a lambda function and to pass it to the network. The network constructs an object for that process and executes the lambda function whenever new data is available. There are several methods for constructing processes depending on their type. The process \textbf{read} is a \emph{source} process, since it produces data (by reading it from the specified file) but does not consume any data. Source processes are constructed from a function object
%
\\\inputlisting{../examples/dataflow/dataflow_source_function-snippet.h}
%
like this:
%
\\\inputlisting{../examples/dataflow/dataflow_declare_source-snippet.h}
%
%There are a couple of things to mention here. Firstly,
Note the template argument \lstinline|string| to \lstinline|Source|. This tells \embb that the process has exactly one \emph{port} of type \lstinline|string| and that this port is used to transmit data to other processes. The user-defined code can access the ports via the parameters of the function. Thus, each parameter corresponds to exactly one port. In our example, the result of the process is stored in a variable \lstinline|str|, which is passed by reference.
%Secondly, note that sources have to return a Boolean value. If the returned value is \lstinline|true|, the network will continue to execute the corresponding process. Otherwise, it will stop execution of the process and terminate when all previously produced data is processed.

The replacement of the strings can be done by a \emph{parallel} process, which means that multiple invocations of the process may be executed simultaneously. In general, processes that neither have any side effects nor maintain a state can safely be executed in parallel. This helps to avoid bottlenecks that arise when some processes are faster than others. Suppose, for example, that \textbf{replace} requires up to 50 ms to execute, whereas \textbf{read} and \textbf{write} each require 10 ms to execute. If only one invocation of \textbf{replace} could be executed at a time, the throughput would be at most 20 elements per second. Since \textbf{replace} is a parallel process, however, the network may start a new invocation every 10 ms. Hence, up to five invocations may be executed in parallel, yielding a throughput of 100 elements per second. To compensate for variations in the runtime of parallel stages, \embb may execute them \emph{out-of-order}. As a result, the order in which the elements of a stream enter and leave parallel stages is not necessarily preserved. In our example, the runtime of \textbf{replace} may vary significantly because not all lines have the same length and the number of replacements depends on the content.

Returning to our example, the \textbf{replace} process is constructed from the function
%
\\\inputlisting{../examples/dataflow/dataflow_replace_function-snippet.h}
%
like this:
%
\\\inputlisting{../examples/dataflow/dataflow_declare_replace-snippet.h}
%
The template parameter \lstinline|Network::Inputs<string>| specifies that the process has one port serving as input. Analogously, \lstinline|Network::Outputs<string>| specifies that there is one port serving as output.
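
The throughput figures quoted earlier for the \textbf{replace} stage follow from a simple bound. As a sketch, let $t_{\mathrm{rep}} = 50\,\mathrm{ms}$ denote the time per invocation of \textbf{replace}, $t_{\mathrm{io}} = 10\,\mathrm{ms}$ the time per invocation of \textbf{read} and \textbf{write}, and $k$ the number of simultaneous invocations of \textbf{replace}. These symbols are introduced here for illustration only, and any scheduling overhead is ignored:
\[
  \mathrm{throughput}(k) \le \min\left(\frac{1}{t_{\mathrm{io}}},\ \frac{k}{t_{\mathrm{rep}}}\right)
\]
\[
  \mathrm{throughput}(1) \le \frac{1}{50\,\mathrm{ms}} = 20\,\mathrm{s}^{-1},
  \qquad
  \mathrm{throughput}(5) \le \min\left(\frac{1}{10\,\mathrm{ms}},\ \frac{5}{50\,\mathrm{ms}}\right) = 100\,\mathrm{s}^{-1}
\]
For $k > 5$, the bound stays at $100\,\mathrm{s}^{-1}$, because \textbf{read} and \textbf{write} then become the limiting stages.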

Since the last process (\textbf{write}) does not have any outputs, we make it a \emph{Sink}. Unlike parallel processes, sinks are always executed \emph{in-order}. \embb ensures that the elements are automatically reordered according to their original order in the stream. In this way, the externally visible behavior is preserved even if some parallel stages are executed out-of-order. The function
%
\\\inputlisting{../examples/dataflow/dataflow_sink_function-snippet.h}
%
is used to construct the sink:
%
\\\inputlisting{../examples/dataflow/dataflow_declare_sink-snippet.h}
%
%In order to avoid that the received string is overwritten accidentally, the parameter \lstinline|str| corresponding to the input port of \lstinline|write| must be constant.\\
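
To illustrate how in-order delivery can be restored after an out-of-order stage, the following sketch tags each result with its position in the stream and releases results only once all predecessors have arrived. It is a conceptual model in plain C++, not the mechanism used inside \embb; the class \lstinline|ReorderBuffer| and its member \lstinline|Push| are hypothetical names.

\begin{lstlisting}
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: collects results tagged with their position
// in the stream and releases them strictly in stream order.
class ReorderBuffer {
 public:
  ReorderBuffer() : next_(0) {}

  // Stores one result and returns all results that can now be
  // released in order (possibly none).
  std::vector<std::string> Push(std::size_t seq, std::string value) {
    pending_.insert(std::make_pair(seq, std::move(value)));
    std::vector<std::string> ready;
    std::map<std::size_t, std::string>::iterator it;
    while ((it = pending_.find(next_)) != pending_.end()) {
      ready.push_back(std::move(it->second));
      pending_.erase(it);
      ++next_;
    }
    return ready;
  }

 private:
  std::size_t next_;  // position of the next result to release
  std::map<std::size_t, std::string> pending_;
};
\end{lstlisting}

If the result for position 1 arrives before the result for position 0, the first call to \lstinline|Push| returns nothing, and the second call returns both results in their original order.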

\emph{Note: If you parallelize your own application using \embb and your compiler emits a lengthy error message containing lots of templates, it is very likely that for at least one process, the ports and their directions do not match the signature of the given function.}

The network needs to know about the processes declared above, so we add them to our network:
%
\\\inputlisting{../examples/dataflow/dataflow_add-snippet.h}
%

As the last step, we have to connect the processes (ports). This is straightforward using the C++ stream operator:
%
\\\inputlisting{../examples/dataflow/dataflow_connect-snippet.h}
%
Once all connections have been established, we can start the network:
%
\\\inputlisting{../examples/dataflow/dataflow_run-snippet.h}
%
The integer passed to the network's function operator specifies the maximum number of elements that can be in the network at a time. The number of elements is limited to prevent the network from being flooded with new elements before the previous elements have been processed. In a linear pipeline, for example, this may happen if the source is faster than the sink. In our example, at most four elements may be processed simultaneously: one in the source, one in the sink, and two in the middle stage (see above). Finding an optimal value depends on the application and usually requires some experimentation. In general, large values boost the throughput but also increase the latency. Conversely, small values reduce the latency but may lead to a drop in performance in terms of throughput.
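
The effect of such a limit can be modeled with a simple counter: a new element is admitted only while fewer than the maximum number of elements are in flight, and a slot is freed whenever an element leaves the network. The following single-threaded sketch illustrates the idea only; \lstinline|ElementLimiter|, \lstinline|TryAdmit|, and \lstinline|Retire| are hypothetical names and do not belong to the \embb API.

\begin{lstlisting}
#include <cstddef>

// Hypothetical, single-threaded model of an admission limit.
class ElementLimiter {
 public:
  explicit ElementLimiter(std::size_t max_in_flight)
      : max_(max_in_flight), in_flight_(0) {}

  // Called before a new element enters the network.
  // Returns false if the limit has been reached.
  bool TryAdmit() {
    if (in_flight_ == max_) {
      return false;
    }
    ++in_flight_;
    return true;
  }

  // Called after an element has left the network.
  void Retire() {
    if (in_flight_ > 0) {
      --in_flight_;
    }
  }

  std::size_t InFlight() const { return in_flight_; }

 private:
  std::size_t max_;
  std::size_t in_flight_;
};
\end{lstlisting}

With a limit of two, a third element is rejected until one of the first two has been retired. A larger limit keeps more stages busy, whereas a smaller one bounds the time an element may spend waiting inside the network.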

Note that you will probably not observe a speedup when you run this program on a multicore processor. One reason for this is that input$/$output operations like reading a file from the hard disk and writing the output to the screen are typically a bottleneck. Moreover, the amount of work done in the middle stage of the pipeline (\textbf{replace}) is rather low. To outweigh the overhead for parallel execution, the amount of work must be much higher. In image processing, for example, a single pipeline stage may process a complete image. To sum up, we have chosen this example for its simplicity, not for its efficiency.

% ----------------------------------------------------------------------
\section{Nonlinear Pipelines}
% ----------------------------------------------------------------------

Some applications exhibit a more complex structure than the linear pipeline presented in the previous section. Typical examples are applications where the result of a pipeline stage is used by more than one successor stage. Such pipelines are said to be nonlinear. In principle, every nonlinear pipeline can be transformed to a linear one as depicted in Figure~\ref{fig:linearization}. However, this increases the latency and complicates the implementation due to data that must be passed through intermediate stages. In Figure~\ref{fig:linearization}, for example, the data transferred from stage A to stage C must be passed through stage B in the linearized implementation.

\begin{figure}[!htb]
\centering%
\begin{tikzpicture}
\matrix[column sep=1cm, row sep=0cm] {
  \dpnser{A}{A}; & & \dpnser{D}{D}; \\
  & \dpnser{C}{C}; & \\
  \dpnser{B}{B}; & & \dpnser{E}{E}; \\
};
\draw [edgestyle] (A.east) -- (C);
\draw [edgestyle] (B.east) -- (C);
\draw [edgestyle] (C) -- (D.west);
\draw [edgestyle] (C) -- (E.west);
\end{tikzpicture}
\hspace{1cm}
\begin{tikzpicture}
\matrix[column sep=1cm, row sep=0cm] {
  \dpnser{A}{A}; & & \dpnser{D}{D}; \\
  & \dpnser{C}{C}; & \\
  \dpnser{B}{B}; & & \dpnser{E}{E}; \\
};
\draw [edgestyle] (A) -- (B);
\draw [edgestyle] (B.east) -- (C);
\draw [edgestyle] (C) -- (D.west);
\draw [edgestyle] (D) -- (E);
\end{tikzpicture}
\caption{Nonlinear pipeline and linearized variant}
\label{fig:linearization}
\end{figure}

Nonlinear pipelines can be implemented as they are using \embb, i.e., there is no need to linearize them. As an example, let us consider the implementation of a sorting network. Sorting networks consist of a set of interconnected comparators and are used to sort sequences of data items. As depicted in Figure~\ref{fig:comparator}, each comparator sorts a pair of values by routing the smaller value to one output and the larger one to the other output. Thus, a comparator can be viewed as a switch that transfers the values at the inputs to the outputs, either directly or by swapping them (cf.~Figure~\ref{fig:example_comparator}).

\begin{figure}[!htb]
\centering%
\begin{tikzpicture}[comparator/.style={rectangle, draw, minimum width=1cm, minimum height=2cm, anchor=south west}, arrow/.style={thick, ->, >=stealth'}]
\node [comparator] at (1,0) {};
\node [left] at (0.25,1.5) {$a$};
\node [left] at (0.25,0.5) {$b$};
\node [right] at (2.75,1.5) {$\min(a,b)$};
\node [right] at (2.75,0.5) {$\max(a,b)$};
\draw [arrow] (0.25,0.5) -- (1,0.5);
\draw [arrow] (0.25,1.5) -- (1,1.5);
\draw [arrow] (2,0.5) -- (2.75,0.5);
\draw [arrow] (2,1.5) -- (2.75,1.5);
\end{tikzpicture}
\caption{Block diagram of a comparator}
\label{fig:comparator}
\end{figure}

\begin{figure}[!htb]
\centering%
\begin{tikzpicture}[comparator/.style={rectangle, draw, minimum width=1cm, minimum height=2cm, anchor=south west}, line/.style={thick, dashed}, arrow/.style={thick, ->, >=stealth'}]
\node [comparator] at (1,0) {};
\node [left] at (0.25,1.5) {$1$};
\node [left] at (0.25,0.5) {$3$};
\node [right] at (2.75,1.5) {$1$};
\node [right] at (2.75,0.5) {$3$};
\draw [arrow] (0.25,0.5) -- (1,0.5);
\draw [arrow] (0.25,1.5) -- (1,1.5);
\draw [arrow] (2,0.5) -- (2.75,0.5);
\draw [arrow] (2,1.5) -- (2.75,1.5);
\draw [line] (1,1.5) -- (2,1.5);
\draw [line] (1,0.5) -- (2,0.5);
\end{tikzpicture}
\hspace{2cm}
\begin{tikzpicture}[comparator/.style={rectangle, draw, minimum width=1cm, minimum height=2cm, anchor=south west}, line/.style={thick, dashed}, arrow/.style={thick, ->, >=stealth'}]
\node [comparator] at (1,0) {};
\node [left] at (0.25,1.5) {$5$};
\node [left] at (0.25,0.5) {$2$};
\node [right] at (2.75,1.5) {$2$};
\node [right] at (2.75,0.5) {$5$};
\draw [arrow] (0.25,0.5) -- (1,0.5);
\draw [arrow] (0.25,1.5) -- (1,1.5);
\draw [arrow] (2,0.5) -- (2.75,0.5);
\draw [arrow] (2,1.5) -- (2.75,1.5);
\draw [line] (1,1.5) -- (2,0.5);
\draw [line] (1,0.5) -- (2,1.5);
\end{tikzpicture}
\caption{Example for the operating principle of a comparator}
\label{fig:example_comparator}
\end{figure}

Figure~\ref{fig:sorting_network} shows a sorting network with four inputs$/$outputs and five comparators. The numbers at the interconnections exemplify a ``run'' of the network. As can be seen from Figure~\ref{fig:sorting_network}, the comparators $C_1$--$C_4$ ``sink'' the largest value to the bottom and ``float'' the smallest value to the top. The final comparator $C_5$ simply sorts the two middle values. In this way, it is guaranteed that the values at the outputs occur in ascending order. %Note that the structure of the network makes it well-suited for sorting continuous streams of data in a pipelined fashion.

\begin{figure}[!htb]
\centering%
\begin{tikzpicture}[comparator/.style={rectangle, draw, minimum width=1cm, minimum height=2.5cm, anchor=south west}, line/.style={thick}, arrow/.style={thick, ->, >=stealth'}]
\node [comparator, minimum height=4cm] at (1,1.5) {$C_1$};
\node [comparator, minimum height=4cm] at (3,0) {$C_2$};
\node [comparator] at (5,3) {$C_3$};
\node [comparator] at (5,0) {$C_4$};
\node [comparator] at (7,1.5) {$C_5$};
\draw [arrow] (0,5) -- (1,5);
\draw [arrow] (2,5) -- (5,5);
\draw [arrow] (6,5) -- (9,5);
\draw [line] (0,3.5) -- (1,3.5);
\draw [arrow] (2,3.5) -- (3,3.5);
\draw [arrow] (4,3.5) -- (5,3.5);
\draw [arrow] (6,3.5) -- (7,3.5);
\draw [arrow] (8,3.5) -- (9,3.5);
\draw [arrow] (0,2) -- (1,2);
\draw [line] (2,2) -- (3,2);
\draw [arrow] (4,2) -- (5,2);
\draw [arrow] (6,2) -- (7,2);
\draw [arrow] (8,2) -- (9,2);
\draw [arrow] (0,0.5) -- (3,0.5);
\draw [arrow] (4,0.5) -- (5,0.5);
\draw [arrow] (6,0.5) -- (9,0.5);
\node [left] at (0,5.0) {$3$};
\node [left] at (0,3.5) {$2$};
\node [left] at (0,2.0) {$4$};
\node [left] at (0,0.5) {$1$};
\node [above] at (2.5,5.0) {$3$};
\node [above] at (4.5,3.5) {$1$};
\node [above] at (2.5,2.0) {$4$};
\node [above] at (4.5,0.5) {$2$};
\node [above] at (6.5,3.5) {$3$};
\node [above] at (6.5,2.0) {$2$};
\node [right] at (9,5.0) {$1$};
\node [right] at (9,3.5) {$2$};
\node [right] at (9,2.0) {$3$};
\node [right] at (9,0.5) {$4$};
\end{tikzpicture}
\caption{Sorting network with four inputs$/$outputs and five comparators}
\label{fig:sorting_network}
\end{figure}
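
Before turning to the dataflow implementation, the wiring of Figure~\ref{fig:sorting_network} can be checked with a purely sequential sketch. The assignment of comparators to wires below is our reading of the figure, with index 0 denoting the top wire; the function names \lstinline|CompareExchange| and \lstinline|SortFour| are hypothetical.

\begin{lstlisting}
#include <array>
#include <utility>

// Comparator: the smaller value goes to the upper wire,
// the larger value to the lower wire.
template <typename T>
void CompareExchange(T& upper, T& lower) {
  if (lower < upper) {
    std::swap(upper, lower);
  }
}

// Sequential model of the five-comparator network for four values.
// w[0] is the top wire and w[3] the bottom wire.
template <typename T>
void SortFour(std::array<T, 4>& w) {
  CompareExchange(w[0], w[2]);  // C1
  CompareExchange(w[1], w[3]);  // C2
  CompareExchange(w[0], w[1]);  // C3
  CompareExchange(w[2], w[3]);  // C4
  CompareExchange(w[1], w[2]);  // C5
}
\end{lstlisting}

Applied to the input $3, 2, 4, 1$ from the figure, the intermediate values after each comparator match the numbers at the interconnections, and the result is $1, 2, 3, 4$. Note that $C_1$ and $C_2$ operate on disjoint wires, as do $C_3$ and $C_4$, which is what makes the network attractive for parallel execution.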

Let us now consider the implementation of the sorting network using \embb. As in the previous example, we need three types of processes: one or more sources that produce a stream of data items, five processes that implement the comparators, and one or more sinks that consume the sorted sequences. The processes should be generic so that they can be used with different types. For example, in one application we might use the network to sort integers, and in another application to sort floating-point values.

The following listing shows the implementation of the source processes using classes instead of functions.\footnote{For the sake of brevity, we omit the functionality. A complete implementation can be found in the examples directory.}
%
\\\inputlisting{../examples/dataflow/dataflow_producer-snippet.h}
%
%In order to use an instance of a class as a process, it must be derived from one of the predfined base classes. 
The class-based approach has several advantages besides the use of templates: Firstly, the creation of multiple processes is straightforward. Secondly, one can derive other processes from a given base class such as \lstinline|Producer|. Thirdly, it eases migration of existing code. For example, if you want to use an object of an existing class \lstinline|foo| as a process, you might derive a class \lstinline|bar| from \lstinline|foo|. The function operator of \lstinline|bar| then has access to the members provided by \lstinline|foo|.
%Thirdly, it eases migration of existing code by means of multiple inheritance. For example, if you want to use an object of an existing class \lstinline|foo| as a process, you might derive a class \lstinline|bar| from \lstinline|foo| and \lstinline|Network::Source<...>| (or any other process class). The function operator of \lstinline|bar| then has access to the members provided by \lstinline|foo|.
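
The migration scenario can be sketched in plain C++ as follows. The class \lstinline|foo| is a hypothetical stand-in for existing code, and \lstinline|bar| adds a function operator on top of it. The shape of that operator (output passed by reference, Boolean result indicating whether an element was produced) is an assumption of this sketch, and the code needed to register the object with a network is deliberately omitted.

\begin{lstlisting}
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Stand-in for an existing class that is to be reused (hypothetical).
class foo {
 public:
  explicit foo(std::vector<std::string> lines)
      : lines_(std::move(lines)), pos_(0) {}

 protected:
  bool HasNext() const { return pos_ < lines_.size(); }
  const std::string& Next() { return lines_[pos_++]; }

 private:
  std::vector<std::string> lines_;
  std::size_t pos_;
};

// Wrapper whose function operator uses the members inherited from foo.
// Assumption: the result is written to 'str', and the return value
// tells the caller whether an element was produced.
class bar : public foo {
 public:
  explicit bar(std::vector<std::string> lines)
      : foo(std::move(lines)) {}

  bool operator()(std::string& str) {
    if (!HasNext()) {
      return false;
    }
    str = Next();
    return true;
  }
};
\end{lstlisting}

The existing class remains untouched; only the thin wrapper needs to know about the calling convention expected for processes.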

Each instance of the class \lstinline|Network| maintains a list of source processes that belong to the network.
% When you create a source process using \lstinline|MakeSource|, it is automatically added to this list. Otherwise, you must explicitly add it by a call to \lstinline|Add|. For example, if we want to feed our sorting network \lstinline|nw| with streams of integer values, we may write:
You must explicitly add all processes to the network by a call to \lstinline|Add|. For example, if we want to feed our sorting network \lstinline|nw| with four streams of integer values, we may write:
%
\\\inputlisting{../examples/dataflow/dataflow_declare_add_sources-snippet.h}
%
%This is only necessary for source processes. All other processes are automatically found via a depth-first search starting from the source processes.

The code for the comparators looks like this:
%
\\\inputlisting{../examples/dataflow/dataflow_comparator-snippet.h}
%
Since the comparators neither have any side effects nor maintain a state, we allow multiple invocations to be executed in parallel.
% by deriving the class \lstinline|comparator| from the base class \lstinline|network<>::parallel|.

%In the above implementation of \lstinline|Comparator|, the inputs \lstinline|a| and \lstinline|b| are passed by value, which works fine if \lstinline|T| is a scalar type such as an integer or a float. However, since objects are passed by reference for efficiency reasons (unless otherwise specified), we cannot use classes as template argument for \lstinline|comparator|. To solve this problem, the classes \lstinline|in|, \lstinline|out|, and \lstinline|inout| define the parameter type that matches the given data type. As an example, \lstinline|in<int>::parameter_type| is simply \lstinline|int|, while \lstinline|in<foo>::parameter_type| yields \lstinline|const foo&| for a class \lstinline|foo|. A fully generic implementation of \lstinline|comparator| is shwon in Listing~\ref{lst:comparator_gen} (the objects of type \lstinline|T| must be comparable in order to compute the minimum and the maximum).
%
%\begin{lstlisting}[caption={Fully generic comparator process},label={lst:comparator_gen}]
%template <typename T>
%class Comparator {
%public:
%  void Run(const T& a, const T& b, T& x, T& y) const {
%    x = min(a,b);
%    y = max(a,b);
%  }
%};
%\end{lstlisting}

To check whether the resulting values are sorted, we use a single sink with four inputs:
%
\\\inputlisting{../examples/dataflow/dataflow_consumer-snippet.h}
%
In general, however, we could also have a sink for each output of the sorting network. There is no restriction on the number of sources and sinks a network may have.