From 4dadd0cdcedc0cc97d9281d249520bf489cad687 Mon Sep 17 00:00:00 2001 From: Marcus Winter Date: Thu, 10 Mar 2016 17:00:18 +0100 Subject: [PATCH] dataflow_cpp: removed Network::Make for simpler usage, added Network:IsValid to check network for errors tutorial: adapted to reflect changes to dataflow_cpp examples: adapted to reflect changes to dataflow_cpp --- dataflow_cpp/include/embb/dataflow/internal/clock_listener.h | 10 ---------- dataflow_cpp/include/embb/dataflow/internal/constant_source.h | 10 ++++++---- dataflow_cpp/include/embb/dataflow/internal/in.h | 5 ----- dataflow_cpp/include/embb/dataflow/internal/inputs.h | 148 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------------------------------- dataflow_cpp/include/embb/dataflow/internal/node.h | 2 +- dataflow_cpp/include/embb/dataflow/internal/out.h | 10 ++++------ dataflow_cpp/include/embb/dataflow/internal/outputs.h | 10 ++++++++++ dataflow_cpp/include/embb/dataflow/internal/process.h | 31 +++++++++++++------------------ dataflow_cpp/include/embb/dataflow/internal/process_executor.h | 50 -------------------------------------------------- dataflow_cpp/include/embb/dataflow/internal/select.h | 18 ++++++++---------- dataflow_cpp/include/embb/dataflow/internal/sink.h | 33 +++++++++++++++------------------ dataflow_cpp/include/embb/dataflow/internal/source.h | 8 ++++---- dataflow_cpp/include/embb/dataflow/internal/source_executor.h | 33 --------------------------------- dataflow_cpp/include/embb/dataflow/internal/switch.h | 17 +++++++---------- dataflow_cpp/include/embb/dataflow/network.h | 184 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------------------------------------------------- dataflow_cpp/test/dataflow_cpp_test_simple.cc | 39 +++++++++++++++++++++------------------ doc/examples/dataflow/dataflow_add-snippet.h | 1 - doc/examples/dataflow/dataflow_declare_add_sources-snippet.h | 9 ++++----- doc/examples/dataflow/dataflow_declare_replace-snippet.h | 6 +++--- doc/examples/dataflow/dataflow_declare_sink-snippet.h | 2 +- doc/examples/dataflow/dataflow_declare_source-snippet.h | 2 +- doc/examples/dataflow/dataflow_linear-fragmented.cc | 3 +-- doc/examples/dataflow/dataflow_make-snippet.h | 2 +- doc/examples/dataflow/dataflow_network-snippet.h | 1 - doc/examples/dataflow/dataflow_nonlinear-fragmented.cc | 23 ++++++++++++++--------- doc/examples/dataflow/dataflow_run-snippet.h | 2 +- doc/tutorial/content/dataflow.tex | 13 ++++--------- tasks_cpp/src/node.cc | 6 +++--- 28 files changed, 290 insertions(+), 388 deletions(-) delete mode 100644 doc/examples/dataflow/dataflow_add-snippet.h diff --git a/dataflow_cpp/include/embb/dataflow/internal/clock_listener.h b/dataflow_cpp/include/embb/dataflow/internal/clock_listener.h index 35b27e2..f814d5b 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/clock_listener.h +++ b/dataflow_cpp/include/embb/dataflow/internal/clock_listener.h @@ -31,20 +31,10 @@ namespace embb { namespace dataflow { namespace internal { -class Scheduler; -class ClockListener; - -struct InitData { - int slices; - Scheduler * sched; - ClockListener * sink_listener; -}; - class ClockListener { public: virtual ~ClockListener() {} virtual void OnClock(int /*clock*/) = 0; - virtual void OnInit(InitData * /*sched*/) = 0; }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/constant_source.h b/dataflow_cpp/include/embb/dataflow/internal/constant_source.h index 4830b5d..6b11039 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/constant_source.h +++ b/dataflow_cpp/include/embb/dataflow/internal/constant_source.h @@ -46,7 +46,10 @@ class ConstantSource Type value_; public: - explicit ConstantSource(Type value) : value_(value) {} + explicit ConstantSource(Network & network, Type value) + : value_(value) { + SetScheduler(network.GetScheduler()); + } virtual bool HasOutputs() const { return outputs_.Size() > 0; @@ -56,9 +59,8 @@ class ConstantSource GetOutput<0>().Send(Signal(clock, value_)); } - virtual void Init(InitData * init_data) { - SetScheduler(init_data->sched); - GetOutput<0>().SendInit(init_data); + virtual bool IsFullyConnected() { + return outputs_.IsFullyConnected(); } virtual bool Start(int clock) { diff --git a/dataflow_cpp/include/embb/dataflow/internal/in.h b/dataflow_cpp/include/embb/dataflow/internal/in.h index efc5f8a..ec3d476 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/in.h +++ b/dataflow_cpp/include/embb/dataflow/internal/in.h @@ -113,11 +113,6 @@ class In { lock_.Unlock(); #endif } - - void ReceiveInit(InitData * init_data) { - SetSlices(init_data->slices); - listener_->OnInit(init_data); - } }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/inputs.h b/dataflow_cpp/include/embb/dataflow/internal/inputs.h index c8262e3..aef0df4 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/inputs.h +++ b/dataflow_cpp/include/embb/dataflow/internal/inputs.h @@ -56,7 +56,9 @@ class Inputs @@ -67,8 +69,14 @@ class Inputs , public ClockListener { public: - Inputs() : count_(NULL) { - test_count_ = 1; + explicit Inputs(int slices) : count_(NULL), slices_(slices) { + count_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + count_[ii] = 1; + } + this->template Get<0>().SetSlices(slices_); } ~Inputs() { if (NULL != count_) { @@ -98,21 +106,11 @@ class InputsOnClock(clock); } } - virtual void OnInit(InitData * init_data) { - if (--test_count_ == 0) { - slices_ = init_data->slices; - count_ = reinterpret_cast*>( - embb::base::Allocation::Allocate( - sizeof(embb::base::Atomic)*slices_)); - for (int ii = 0; ii < slices_; ii++) { - count_[ii] = 1; - } - listener_->OnInit(init_data); - } + bool IsFullyConnected() { + return this->template Get<0>().IsConnected(); } private: embb::base::Atomic * count_; - int test_count_; ClockListener * listener_; int slices_; }; @@ -124,8 +122,15 @@ class Inputs , public ClockListener { public: - Inputs() : count_(NULL) { - test_count_ = 2; + explicit Inputs(int slices) : count_(NULL), slices_(slices) { + count_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + count_[ii] = 2; + } + this->template Get<0>().SetSlices(slices_); + this->template Get<1>().SetSlices(slices_); } ~Inputs() { if (NULL != count_) { @@ -159,21 +164,12 @@ class InputsOnClock(clock); } } - virtual void OnInit(InitData * init_data) { - if (--test_count_ == 0) { - slices_ = init_data->slices; - count_ = reinterpret_cast*>( - embb::base::Allocation::Allocate( - sizeof(embb::base::Atomic)*slices_)); - for (int ii = 0; ii < slices_; ii++) { - count_[ii] = 2; - } - listener_->OnInit(init_data); - } + bool IsFullyConnected() { + return this->template Get<0>().IsConnected() & + this->template Get<1>().IsConnected(); } - private: +private: embb::base::Atomic * count_; - int test_count_; ClockListener * listener_; int slices_; }; @@ -185,8 +181,16 @@ class Inputs , public ClockListener { public: - Inputs() : count_(NULL) { - test_count_ = 3; + explicit Inputs(int slices) : count_(NULL), slices_(slices) { + count_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + count_[ii] = 3; + } + this->template Get<0>().SetSlices(slices_); + this->template Get<1>().SetSlices(slices_); + this->template Get<2>().SetSlices(slices_); } ~Inputs() { if (NULL != count_) { @@ -224,21 +228,13 @@ class InputsOnClock(clock); } } - virtual void OnInit(InitData * init_data) { - if (--test_count_ == 0) { - slices_ = init_data->slices; - count_ = reinterpret_cast*>( - embb::base::Allocation::Allocate( - sizeof(embb::base::Atomic)*slices_)); - for (int ii = 0; ii < slices_; ii++) { - count_[ii] = 3; - } - listener_->OnInit(init_data); - } + bool IsFullyConnected() { + return this->template Get<0>().IsConnected() & + this->template Get<1>().IsConnected() & + this->template Get<2>().IsConnected(); } - private: +private: embb::base::Atomic * count_; - int test_count_; ClockListener * listener_; int slices_; }; @@ -249,8 +245,17 @@ class Inputs In, embb::base::internal::Nil> , public ClockListener { public: - Inputs() : count_(NULL) { - test_count_ = 4; + explicit Inputs(int slices) : count_(NULL), slices_(slices) { + count_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + count_[ii] = 4; + } + this->template Get<0>().SetSlices(slices_); + this->template Get<1>().SetSlices(slices_); + this->template Get<2>().SetSlices(slices_); + this->template Get<3>().SetSlices(slices_); } ~Inputs() { if (NULL != count_) { @@ -292,21 +297,14 @@ class Inputs listener_->OnClock(clock); } } - virtual void OnInit(InitData * init_data) { - if (--test_count_ == 0) { - slices_ = init_data->slices; - count_ = reinterpret_cast*>( - embb::base::Allocation::Allocate( - sizeof(embb::base::Atomic)*slices_)); - for (int ii = 0; ii < slices_; ii++) { - count_[ii] = 4; - } - listener_->OnInit(init_data); - } + bool IsFullyConnected() { + return this->template Get<0>().IsConnected() & + this->template Get<1>().IsConnected() & + this->template Get<2>().IsConnected() & + this->template Get<3>().IsConnected(); } private: embb::base::Atomic * count_; - int test_count_; ClockListener * listener_; int slices_; }; @@ -318,8 +316,18 @@ class Inputs In, In > , public ClockListener { public: - Inputs() : count_(NULL) { - test_count_ = 5; + explicit Inputs(int slices) : count_(NULL), slices_(slices) { + count_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + count_[ii] = 5; + } + this->template Get<0>().SetSlices(slices_); + this->template Get<1>().SetSlices(slices_); + this->template Get<2>().SetSlices(slices_); + this->template Get<3>().SetSlices(slices_); + this->template Get<4>().SetSlices(slices_); } ~Inputs() { if (NULL != count_) { @@ -365,21 +373,15 @@ class Inputs listener_->OnClock(clock); } } - virtual void OnInit(InitData * init_data) { - if (--test_count_ == 0) { - slices_ = init_data->slices; - count_ = reinterpret_cast*>( - embb::base::Allocation::Allocate( - sizeof(embb::base::Atomic)*slices_)); - for (int ii = 0; ii < slices_; ii++) { - count_[ii] = 5; - } - listener_->OnInit(init_data); - } + bool IsFullyConnected() { + return this->template Get<0>().IsConnected() && + this->template Get<1>().IsConnected() & + this->template Get<2>().IsConnected() & + this->template Get<3>().IsConnected() & + this->template Get<4>().IsConnected(); } private: embb::base::Atomic * count_; - int test_count_; ClockListener * listener_; int slices_; }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/node.h b/dataflow_cpp/include/embb/dataflow/internal/node.h index 55c5526..5dfebd8 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/node.h +++ b/dataflow_cpp/include/embb/dataflow/internal/node.h @@ -43,11 +43,11 @@ class Node { virtual bool HasInputs() const { return false; } virtual bool HasOutputs() const { return false; } virtual void Run(int clock) = 0; + virtual bool IsFullyConnected() = 0; virtual bool Start(int /*clock*/) { EMBB_THROW(embb::base::ErrorException, "Nodes are started implicitly."); } - virtual void Init(InitData * init_data) = 0; protected: Scheduler * sched_; diff --git a/dataflow_cpp/include/embb/dataflow/internal/out.h b/dataflow_cpp/include/embb/dataflow/internal/out.h index 9268780..3f5aea4 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/out.h +++ b/dataflow_cpp/include/embb/dataflow/internal/out.h @@ -52,12 +52,6 @@ class Out { } } - void SendInit(InitData * init_data) { - for (size_t ii = 0; ii < targets_.size(); ii++) { - targets_[ii]->ReceiveInit(init_data); - } - } - void Connect(InType & input) { if (input.IsConnected()) { EMBB_THROW(embb::base::ErrorException, @@ -72,6 +66,10 @@ class Out { Connect(input); } + bool IsConnected() const { + return targets_.size() > 0; + } + private: std::vector< InType * > targets_; }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/outputs.h b/dataflow_cpp/include/embb/dataflow/internal/outputs.h index a4a45cc..2fce315 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/outputs.h +++ b/dataflow_cpp/include/embb/dataflow/internal/outputs.h @@ -50,6 +50,9 @@ class Outputs { public: + bool IsFullyConnected() { + return true; + } }; template @@ -59,6 +62,9 @@ class Outputs { public: + bool IsFullyConnected() { + return this->template Get<0>().IsConnected(); + } }; template @@ -67,6 +73,10 @@ class Outputs, Out, embb::base::internal::Nil, embb::base::internal::Nil, embb::base::internal::Nil> { public: + bool IsFullyConnected() { + return this->template Get<0>().IsConnected() && + this->template Get<1>().IsConnected(); + } }; template diff --git a/dataflow_cpp/include/embb/dataflow/internal/process.h b/dataflow_cpp/include/embb/dataflow/internal/process.h index 800cfe4..9d05881 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/process.h +++ b/dataflow_cpp/include/embb/dataflow/internal/process.h @@ -53,10 +53,11 @@ class Process< Serial, Inputs, typedef ProcessExecutor< InputsType, OutputsType > ExecutorType; typedef typename ExecutorType::FunctionType FunctionType; - explicit Process(FunctionType function) - : executor_(function) + explicit Process(Network & network, FunctionType function) + : inputs_(network.GetSlices()) + , executor_(function) , action_(NULL) - , slices_(0) { + , slices_(network.GetSlices()) { next_clock_ = 0; queued_clock_ = 0; bool ordered = Serial; @@ -66,6 +67,13 @@ class Process< Serial, Inputs, queue_id_ = 0; } inputs_.SetListener(this); + action_ = reinterpret_cast( + embb::base::Allocation::Allocate( + sizeof(Action)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + action_[ii] = Action(); + } + SetScheduler(network.GetScheduler()); } ~Process() { @@ -86,17 +94,8 @@ class Process< Serial, Inputs, executor_.Execute(clock, inputs_, outputs_); } - virtual void Init(InitData * init_data) { - slices_ = init_data->slices; - //inputs_.SetSlices(init_data->slices); - action_ = reinterpret_cast( - embb::base::Allocation::Allocate( - sizeof(Action)*slices_)); - for (int ii = 0; ii < slices_; ii++) { - action_[ii] = Action(); - } - SetScheduler(init_data->sched); - executor_.Init(init_data, outputs_); + virtual bool IsFullyConnected() { + return inputs_.IsFullyConnected() && outputs_.IsFullyConnected(); } InputsType & GetInputs() { @@ -161,10 +160,6 @@ class Process< Serial, Inputs, } } - virtual void OnInit(InitData * init_data) { - Init(init_data); - } - private: InputsType inputs_; OutputsType outputs_; diff --git a/dataflow_cpp/include/embb/dataflow/internal/process_executor.h b/dataflow_cpp/include/embb/dataflow/internal/process_executor.h index bb25a3e..5b82634 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/process_executor.h +++ b/dataflow_cpp/include/embb/dataflow/internal/process_executor.h @@ -62,10 +62,6 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { - outputs.template Get<0>().SendInit(init_data); - } - private: FunctionType function_; }; @@ -95,11 +91,6 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { - outputs.template Get<0>().SendInit(init_data); - outputs.template Get<1>().SendInit(init_data); - } - private: FunctionType function_; }; @@ -133,12 +124,6 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { - outputs.template Get<0>().SendInit(init_data); - outputs.template Get<1>().SendInit(init_data); - outputs.template Get<2>().SendInit(init_data); - } - private: FunctionType function_; }; @@ -176,13 +161,6 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { - outputs.template Get<0>().SendInit(init_data); - outputs.template Get<1>().SendInit(init_data); - outputs.template Get<2>().SendInit(init_data); - outputs.template Get<3>().SendInit(init_data); - } - private: FunctionType function_; }; @@ -211,10 +189,6 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { - outputs.template Get<0>().SendInit(init_data); - } - private: FunctionType function_; }; @@ -246,11 +220,6 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { - outputs.template Get<0>().SendInit(init_data); - outputs.template Get<1>().SendInit(init_data); - } - private: FunctionType function_; }; @@ -286,12 +255,6 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { - outputs.template Get<0>().SendInit(init_data); - outputs.template Get<1>().SendInit(init_data); - outputs.template Get<2>().SendInit(init_data); - } - private: FunctionType function_; }; @@ -321,10 +284,6 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { - outputs.template Get<0>().SendInit(init_data); - } - private: FunctionType function_; }; @@ -358,11 +317,6 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { - outputs.template Get<0>().SendInit(init_data); - outputs.template Get<1>().SendInit(init_data); - } - private: FunctionType function_; }; @@ -394,10 +348,6 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { - outputs.template Get<0>().SendInit(init_data); - } - private: FunctionType function_; }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/select.h b/dataflow_cpp/include/embb/dataflow/internal/select.h index f0cff63..8cc28a0 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/select.h +++ b/dataflow_cpp/include/embb/dataflow/internal/select.h @@ -34,6 +34,9 @@ namespace embb { namespace dataflow { + +class Network; + namespace internal { template @@ -44,8 +47,10 @@ class Select typedef Inputs InputsType; typedef Outputs OutputsType; - Select() { + Select(Network & network) : inputs_(network.GetSlices()) { inputs_.SetListener(this); + slices_ = network.GetSlices(); + SetScheduler(network.GetScheduler()); } virtual bool HasInputs() const { @@ -80,11 +85,8 @@ class Select } } - virtual void Init(InitData * init_data) { - slices_ = init_data->slices; - //inputs_.SetSlices(slices_); - SetScheduler(init_data->sched); - GetOutput<0>().SendInit(init_data); + virtual bool IsFullyConnected() { + return inputs_.IsFullyConnected() && outputs_.IsFullyConnected(); } InputsType & GetInputs() { @@ -117,10 +119,6 @@ class Select Run(clock); } - virtual void OnInit(InitData * init_data) { - Init(init_data); - } - private: InputsType inputs_; OutputsType outputs_; diff --git a/dataflow_cpp/include/embb/dataflow/internal/sink.h b/dataflow_cpp/include/embb/dataflow/internal/sink.h index bfbb5bb..6ff367e 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/sink.h +++ b/dataflow_cpp/include/embb/dataflow/internal/sink.h @@ -48,13 +48,23 @@ class Sink< Inputs > typedef SinkExecutor< InputsType > ExecutorType; typedef typename ExecutorType::FunctionType FunctionType; - explicit Sink(FunctionType function) - : executor_(function) - , action_(NULL) { + explicit Sink(Network & network, FunctionType function) + : inputs_(network.GetSlices()) + , executor_(function) + , action_(NULL) + , slices_(network.GetSlices()) { next_clock_ = 0; queued_clock_ = 0; queue_id_ = GetNextProcessID(); inputs_.SetListener(this); + action_ = reinterpret_cast( + embb::base::Allocation::Allocate( + sizeof(Action)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + action_[ii] = Action(); + } + SetListener(&network); + SetScheduler(network.GetScheduler()); } ~Sink() { @@ -78,17 +88,8 @@ class Sink< Inputs > listener_->OnClock(clock); } - virtual void Init(InitData * init_data) { - slices_ = init_data->slices; - action_ = reinterpret_cast( - embb::base::Allocation::Allocate( - sizeof(Action)*slices_)); - for (int ii = 0; ii < slices_; ii++) { - action_[ii] = Action(); - } - SetListener(init_data->sink_listener); - SetScheduler(init_data->sched); - listener_->OnInit(init_data); + virtual bool IsFullyConnected() { + return inputs_.IsFullyConnected(); } InputsType & GetInputs() { @@ -132,10 +133,6 @@ class Sink< Inputs > } } - virtual void OnInit(InitData * init_data) { - Init(init_data); - } - private: InputsType inputs_; ExecutorType executor_; diff --git a/dataflow_cpp/include/embb/dataflow/internal/source.h b/dataflow_cpp/include/embb/dataflow/internal/source.h index d8fb85b..067a85e 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/source.h +++ b/dataflow_cpp/include/embb/dataflow/internal/source.h @@ -48,8 +48,9 @@ class Source< Outputs > typedef SourceExecutor< OutputsType > ExecutorType; typedef typename ExecutorType::FunctionType FunctionType; - explicit Source(FunctionType function) + explicit Source(Network & network, FunctionType function) : executor_(function), not_done_(true) { + SetScheduler(network.GetScheduler()); } virtual bool HasOutputs() const { @@ -60,9 +61,8 @@ class Source< Outputs > not_done_ = executor_.Execute(clock, outputs_); } - virtual void Init(InitData * init_data) { - SetScheduler(init_data->sched); - executor_.Init(init_data, outputs_); + virtual bool IsFullyConnected() { + return outputs_.IsFullyConnected(); } virtual bool Start(int clock) { diff --git a/dataflow_cpp/include/embb/dataflow/internal/source_executor.h b/dataflow_cpp/include/embb/dataflow/internal/source_executor.h index 876ea5b..309b9c9 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/source_executor.h +++ b/dataflow_cpp/include/embb/dataflow/internal/source_executor.h @@ -36,8 +36,6 @@ namespace embb { namespace dataflow { namespace internal { -class Scheduler; - template class SourceExecutor; @@ -59,10 +57,6 @@ class SourceExecutor< Outputs > { return result; } - void Init(InitData * init_data, Outputs & outputs) { - outputs.template Get<0>().SendInit(init_data); - } - private: FunctionType function_; }; @@ -87,11 +81,6 @@ class SourceExecutor< Outputs > { return result; } - void Init(InitData * init_data, Outputs & outputs) { - outputs.template Get<0>().SendInit(init_data); - outputs.template Get<1>().SendInit(init_data); - } - private: FunctionType function_; }; @@ -118,12 +107,6 @@ class SourceExecutor< Outputs > { return result; } - void Init(InitData * init_data, Outputs & outputs) { - outputs.template Get<0>().SendInit(init_data); - outputs.template Get<1>().SendInit(init_data); - outputs.template Get<2>().SendInit(init_data); - } - private: FunctionType function_; }; @@ -152,13 +135,6 @@ class SourceExecutor< Outputs > { return result; } - void Init(InitData * init_data, Outputs & outputs) { - outputs.template Get<0>().SendInit(init_data); - outputs.template Get<1>().SendInit(init_data); - outputs.template Get<2>().SendInit(init_data); - outputs.template Get<3>().SendInit(init_data); - } - private: FunctionType function_; }; @@ -190,15 +166,6 @@ class SourceExecutor< Outputs > { return result; } - void Init( - InitData * init_data, Outputs & outputs) { - outputs.template Get<0>().SendInit(init_data); - outputs.template Get<1>().SendInit(init_data); - outputs.template Get<2>().SendInit(init_data); - outputs.template Get<3>().SendInit(init_data); - outputs.template Get<4>().SendInit(init_data); - } - private: FunctionType function_; }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/switch.h b/dataflow_cpp/include/embb/dataflow/internal/switch.h index 14108d6..55d4110 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/switch.h +++ b/dataflow_cpp/include/embb/dataflow/internal/switch.h @@ -34,6 +34,9 @@ namespace embb { namespace dataflow { + +class Network; + namespace internal { template @@ -44,8 +47,9 @@ class Switch typedef Inputs InputsType; typedef Outputs OutputsType; - Switch() { + Switch(Network & network) : inputs_(network.GetSlices()) { inputs_.SetListener(this); + SetScheduler(network.GetScheduler()); } virtual bool HasInputs() const { @@ -77,11 +81,8 @@ class Switch } } - virtual void Init(InitData * init_data) { - //inputs_.SetSlices(init_data->slices); - SetScheduler(init_data->sched); - GetOutput<0>().SendInit(init_data); - GetOutput<1>().SendInit(init_data); + virtual bool IsFullyConnected() { + return inputs_.IsFullyConnected() && outputs_.IsFullyConnected(); } InputsType & GetInputs() { @@ -114,10 +115,6 @@ class Switch Run(clock); } - virtual void OnInit(InitData * init_data) { - Init(init_data); - } - private: InputsType inputs_; OutputsType outputs_; diff --git a/dataflow_cpp/include/embb/dataflow/network.h b/dataflow_cpp/include/embb/dataflow/network.h index 296db6d..0719e3b 100644 --- a/dataflow_cpp/include/embb/dataflow/network.h +++ b/dataflow_cpp/include/embb/dataflow/network.h @@ -58,8 +58,9 @@ class Network { public: /** * Constructs an empty network. + * \param slices Number of concurrent tokens allowed in the network. */ - Network() {} + Network(int slices) {} /** * Input port class. @@ -196,9 +197,10 @@ class Network { /** * Constructs a SerialProcess with a user specified processing function. + * \param network The network this node is going to be part of. * \param function The Function to call to process a token. */ - explicit SerialProcess(FunctionType function); + explicit SerialProcess(Network & network, FunctionType function); /** * \returns \c true if the SerialProcess has any inputs, \c false @@ -277,9 +279,10 @@ class Network { /** * Constructs a ParallelProcess with a user specified processing function. + * \param network The network this node is going to be part of. * \param function The Function to call to process a token. */ - explicit ParallelProcess(FunctionType function); + explicit ParallelProcess(Network & network, FunctionType function); /** * \returns \c true if the ParallelProcess has any inputs, \c false @@ -339,6 +342,7 @@ class Network { */ template class Switch { + public: /** * Function type to use when processing tokens. */ @@ -355,6 +359,12 @@ class Network { typedef Outputs OutputsType; /** + * Constructs a Switch process. + * \param network The network this node is going to be part of. + */ + explicit Select(Network & network); + + /** * \returns Always \c true. */ virtual bool HasInputs() const; @@ -410,6 +420,7 @@ class Network { */ template class Select { + public: /** * Function type to use when processing tokens. */ @@ -426,6 +437,12 @@ class Network { typedef Outputs OutputsType; /** + * Constructs a Select process. + * \param network The network this node is going to be part of. + */ + explicit Select(Network & network); + + /** * \returns Always \c true. */ virtual bool HasInputs() const; @@ -500,9 +517,10 @@ class Network { /** * Constructs a Sink with a user specified processing function. + * \param network The network this node is going to be part of. * \param function The Function to call to process a token. */ - explicit Sink(FunctionType function); + explicit Sink(Network & network, FunctionType function); /** * \returns Always \c true. @@ -559,9 +577,10 @@ class Network { /** * Constructs a Source with a user specified processing function. + * \param network The network this node is going to be part of. * \param function The Function to call to emit a token. */ - explicit Source(FunctionType function); + explicit Source(Network & network, FunctionType function); /** * \returns Always \c false. @@ -594,13 +613,6 @@ class Network { }; /** - * Adds a new source process to the network. - * \param source The source process to add. - */ - template - void AddSource(Source & source); - - /** * Constant source process template. * * A constant source has one output port and emits a constant value given @@ -618,9 +630,10 @@ class Network { /** * Constructs a ConstantSource with a value to emit on each token. + * \param network The network this node is going to be part of. * \param value The value to emit. */ - explicit ConstantSource(Type value); + explicit ConstantSource(Network & network, Type value); /** * \returns Always \c false. @@ -653,19 +666,10 @@ class Network { }; /** - * Adds a new constant source process to the network. - * \param source The constant source process to add. + * Checks whether the network is completely connected and free of cycles. + * \returns \c true if everything is in order, \c false if not. */ - template - void AddSource(ConstantSource & source); - - /** - * Builds the network for usage with \c slices concurrent tokens. This - * function needs to be called after adding all sources and before - * executing the network. - * \param slices Number of concurrent tokens allowed in the network. - */ - void Make(int slices); + bool IsValid(); /** * Executes the network until one of the the sources returns \c false. @@ -677,12 +681,25 @@ class Network { class Network : public internal::ClockListener { public: - Network() : sched_(NULL) {} + Network(int slices) : sink_counter_(NULL), slices_(slices), sched_(NULL) { + sched_ = embb::base::Allocation::New(slices_); + sink_counter_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + sink_counter_[ii] = 0; + } + sink_count_ = 0; + } ~Network() { if (NULL != sched_) { - embb::base::Allocation::Delete(sched_); + embb::base::Allocation::Delete(sched_); + sched_ = NULL; + } + if (NULL != sink_counter_) { embb::base::Allocation::Free(sink_counter_); + sink_counter_ = NULL; } } @@ -690,16 +707,18 @@ class Network : public internal::ClockListener { typename T3 = embb::base::internal::Nil, typename T4 = embb::base::internal::Nil, typename T5 = embb::base::internal::Nil> - struct Inputs { - typedef internal::Inputs Type; + class Inputs : public internal::Inputs { + public: + explicit Inputs(int slices) : internal::Inputs(slices) {} }; template - struct Outputs { - typedef internal::Outputs Type; + class Outputs : public internal::Outputs { + public: + explicit Outputs() : internal::Outputs() {} }; template class SerialProcess; @@ -707,8 +726,8 @@ class Network : public internal::ClockListener { template < typename I1, typename I2, typename I3, typename I4, typename I5, typename O1, typename O2, typename O3, typename O4, typename O5> - class SerialProcess< internal::Inputs, - internal::Outputs > + class SerialProcess< Inputs, + Outputs > : public internal::Process< true, internal::Inputs, internal::Outputs > { @@ -717,11 +736,11 @@ class Network : public internal::ClockListener { internal::Inputs, internal::Outputs >::FunctionType FunctionType; - explicit SerialProcess(FunctionType function) + explicit SerialProcess(Network & network, FunctionType function) : internal::Process< true, internal::Inputs, - internal::Outputs >(function) { - //empty + internal::Outputs >(network, function) { + network.processes_.push_back(this); } }; @@ -730,8 +749,8 @@ class Network : public internal::ClockListener { template < typename I1, typename I2, typename I3, typename I4, typename I5, typename O1, typename O2, typename O3, typename O4, typename O5> - class ParallelProcess< internal::Inputs, - internal::Outputs > + class ParallelProcess< Inputs, + Outputs > : public internal::Process< false, internal::Inputs, internal::Outputs >{ @@ -740,22 +759,30 @@ class Network : public internal::ClockListener { internal::Inputs, internal::Outputs >::FunctionType FunctionType; - explicit ParallelProcess(FunctionType function) + explicit ParallelProcess(Network & network, FunctionType function) : internal::Process< false, internal::Inputs, - internal::Outputs >(function) { - //empty + internal::Outputs >(network, function) { + network.processes_.push_back(this); } }; template class Switch : public internal::Switch { public: + explicit Switch(Network & network) + : internal::Switch(network) { + network.processes_.push_back(this); + } }; template class Select : public internal::Select { public: + explicit Select(Network & network) + : internal::Select(network) { + network.processes_.push_back(this); + } }; template >::FunctionType FunctionType; - explicit Sink(FunctionType function) + explicit Sink(Network & network, FunctionType function) : internal::Sink< - internal::Inputs >(function) { - //empty + internal::Inputs >(network, function) { + network.sinks_.push_back(this); + network.sink_count_++; } }; @@ -786,62 +814,45 @@ class Network : public internal::ClockListener { internal::Outputs >::FunctionType FunctionType; - explicit Source(FunctionType function) + explicit Source(Network & network, FunctionType function) : internal::Source< - internal::Outputs >(function) { - //empty + internal::Outputs >(network, function) { + network.sources_.push_back(this); } }; - template - void AddSource(Source & source) { - sources_.push_back(&source); - } - template class ConstantSource : public internal::ConstantSource { public: - explicit ConstantSource(Type value) - : internal::ConstantSource(value) { - //empty + explicit ConstantSource(Network & network, Type value) + : internal::ConstantSource(network, value) { + network.sources_.push_back(this); } }; - template - void AddSource(ConstantSource & source) { - sources_.push_back(&source); + int GetSlices() const { + return slices_; } - void Make(int slices) { - slices_ = slices; - sched_ = embb::base::Allocation::New(slices_); - - internal::InitData init_data; - init_data.slices = slices_; - init_data.sched = sched_; - init_data.sink_listener = this; + internal::Scheduler * GetScheduler() const { + return sched_; + } - sink_counter_ = reinterpret_cast*>( - embb::base::Allocation::Allocate( - sizeof(embb::base::Atomic)*slices_)); - for (int ii = 0; ii < slices_; ii++) { - sink_counter_[ii] = 0; + bool IsValid() { + bool valid = true; + for (size_t ii = 0; ii < sources_.size(); ii++) { + valid = valid & sources_[ii]->IsFullyConnected(); } - - sink_count_ = 0; - for (size_t it = 0; it < sources_.size(); it++) - sources_[it]->Init(&init_data); - - for (int ii = 0; ii < slices_; ii++) { - sink_counter_[ii] = 0; + for (size_t ii = 0; ii < processes_.size(); ii++) { + valid = valid & processes_[ii]->IsFullyConnected(); + } + for (size_t ii = 0; ii < sinks_.size(); ii++) { + valid = valid & sinks_[ii]->IsFullyConnected(); } + return valid; } void operator () () { - if (NULL == sched_) { - throw embb::base::ErrorException("Network was not properly prepared"); - } - int clock = 0; while (clock >= 0) { const int idx = clock % slices_; @@ -873,15 +884,6 @@ class Network : public internal::ClockListener { --sink_counter_[idx]; } - /** - * Internal. - * \internal - * Gets called when an init token has reached all sinks. - */ - virtual void OnInit(internal::InitData * /*sched*/) { - sink_count_++; - } - private: std::vector processes_; std::vector sources_; diff --git a/dataflow_cpp/test/dataflow_cpp_test_simple.cc b/dataflow_cpp/test/dataflow_cpp_test_simple.cc index 5d30d35..da96399 100644 --- a/dataflow_cpp/test/dataflow_cpp_test_simple.cc +++ b/dataflow_cpp/test/dataflow_cpp_test_simple.cc @@ -42,12 +42,12 @@ typedef embb::dataflow::Network MyNetwork; typedef MyNetwork::ConstantSource< int > MyConstantSource; typedef MyNetwork::Source< int > MySource; -typedef MyNetwork::SerialProcess< MyNetwork::Inputs::Type, - MyNetwork::Outputs::Type > MyPred; -typedef MyNetwork::ParallelProcess< MyNetwork::Inputs::Type, - MyNetwork::Outputs::Type > MyFilter; -typedef MyNetwork::ParallelProcess< MyNetwork::Inputs::Type, - MyNetwork::Outputs::Type > MyMult; +typedef MyNetwork::SerialProcess< MyNetwork::Inputs, + MyNetwork::Outputs > MyPred; +typedef MyNetwork::ParallelProcess< MyNetwork::Inputs, + MyNetwork::Outputs > MyFilter; +typedef MyNetwork::ParallelProcess< MyNetwork::Inputs, + MyNetwork::Outputs > MyMult; typedef MyNetwork::Sink< int > MySink; typedef MyNetwork::Switch< int > MySwitch; typedef MyNetwork::Select< int > MySelect; @@ -165,15 +165,15 @@ void SimpleTest::TestBasic() { for (int ii = 0; ii < 10000; ii++) { ArraySink asink; - MyNetwork network; - MyConstantSource constant(4); - MySource source(embb::base::MakeFunction(sourceFunc)); - MyFilter filter(embb::base::MakeFunction(filterFunc)); - MyMult mult(embb::base::MakeFunction(multFunc)); - MySink sink(embb::base::MakeFunction(asink, &ArraySink::Run)); - MyPred pred(embb::base::MakeFunction(predFunc)); - MySwitch sw; - MySelect sel; + MyNetwork network(NUM_SLICES); + MyConstantSource constant(network, 4); + MySource source(network, embb::base::MakeFunction(sourceFunc)); + MyFilter filter(network, embb::base::MakeFunction(filterFunc)); + MyMult mult(network, embb::base::MakeFunction(multFunc)); + MySink sink(network, embb::base::MakeFunction(asink, &ArraySink::Run)); + MyPred pred(network, embb::base::MakeFunction(predFunc)); + MySwitch sw(network); + MySelect sel(network); for (int kk = 0; kk < TEST_COUNT; kk++) { source_array[kk] = -1; @@ -208,12 +208,15 @@ void SimpleTest::TestBasic() { sel.GetOutput<0>() >> sink.GetInput<0>(); - network.AddSource(constant); - network.AddSource(source); +// network.AddSource(constant); +// network.AddSource(source); - network.Make(NUM_SLICES); +// network.Initialize(NUM_SLICES); try { + if (!network.IsValid()) { + EMBB_THROW(embb::base::ErrorException, "network is invalid"); + } network(); } catch (embb::base::ErrorException & e) { PT_ASSERT_MSG(false, e.What()); diff --git a/doc/examples/dataflow/dataflow_add-snippet.h b/doc/examples/dataflow/dataflow_add-snippet.h deleted file mode 100644 index ce51ef8..0000000 --- a/doc/examples/dataflow/dataflow_add-snippet.h +++ /dev/null @@ -1 +0,0 @@ - nw.AddSource(read); diff --git a/doc/examples/dataflow/dataflow_declare_add_sources-snippet.h b/doc/examples/dataflow/dataflow_declare_add_sources-snippet.h index f1cfcaa..dd94cc0 100644 --- a/doc/examples/dataflow/dataflow_declare_add_sources-snippet.h +++ b/doc/examples/dataflow/dataflow_declare_add_sources-snippet.h @@ -6,15 +6,14 @@ Network::Source source1( + network, embb::base::MakeFunction(producer1, &Producer::Run) ), source2( + network, embb::base::MakeFunction(producer2, &Producer::Run) ), source3( + network, embb::base::MakeFunction(producer3, &Producer::Run) ), source4( + network, embb::base::MakeFunction(producer4, &Producer::Run) ); - - nw.AddSource(source1); - nw.AddSource(source2); - nw.AddSource(source3); - nw.AddSource(source4); diff --git a/doc/examples/dataflow/dataflow_declare_replace-snippet.h b/doc/examples/dataflow/dataflow_declare_replace-snippet.h index e230807..4276ecb 100644 --- a/doc/examples/dataflow/dataflow_declare_replace-snippet.h +++ b/doc/examples/dataflow/dataflow_declare_replace-snippet.h @@ -1,5 +1,5 @@ Network::ParallelProcess< - Network::Inputs::Type, - Network::Outputs::Type> replace( - embb::base::MakeFunction(ReplaceFunction) + Network::Inputs, + Network::Outputs > replace( + network, embb::base::MakeFunction(ReplaceFunction) ); diff --git a/doc/examples/dataflow/dataflow_declare_sink-snippet.h b/doc/examples/dataflow/dataflow_declare_sink-snippet.h index dfdd6b2..3bf8a2d 100644 --- a/doc/examples/dataflow/dataflow_declare_sink-snippet.h +++ b/doc/examples/dataflow/dataflow_declare_sink-snippet.h @@ -1,3 +1,3 @@ Network::Sink write( - embb::base::MakeFunction(SinkFunction) + network, embb::base::MakeFunction(SinkFunction) ); diff --git a/doc/examples/dataflow/dataflow_declare_source-snippet.h b/doc/examples/dataflow/dataflow_declare_source-snippet.h index 470db74..4d028a8 100644 --- a/doc/examples/dataflow/dataflow_declare_source-snippet.h +++ b/doc/examples/dataflow/dataflow_declare_source-snippet.h @@ -1,3 +1,3 @@ Network::Source read( - embb::base::MakeFunction(SourceFunction) + network, embb::base::MakeFunction(SourceFunction) ); diff --git a/doc/examples/dataflow/dataflow_linear-fragmented.cc b/doc/examples/dataflow/dataflow_linear-fragmented.cc index 8de4c0b..a3d7ea4 100644 --- a/doc/examples/dataflow/dataflow_linear-fragmented.cc +++ b/doc/examples/dataflow/dataflow_linear-fragmented.cc @@ -52,11 +52,10 @@ std::string with("hello"); #include "dataflow/dataflow_sink_function-snippet.h" void RunDataflowLinear() { +#include "dataflow/dataflow_make-snippet.h" #include "dataflow/dataflow_declare_source-snippet.h" #include "dataflow/dataflow_declare_replace-snippet.h" #include "dataflow/dataflow_declare_sink-snippet.h" #include "dataflow/dataflow_connect-snippet.h" -#include "dataflow/dataflow_add-snippet.h" -#include "dataflow/dataflow_make-snippet.h" #include "dataflow/dataflow_run-snippet.h" } diff --git a/doc/examples/dataflow/dataflow_make-snippet.h b/doc/examples/dataflow/dataflow_make-snippet.h index 19f9aab..8e3ce7f 100644 --- a/doc/examples/dataflow/dataflow_make-snippet.h +++ b/doc/examples/dataflow/dataflow_make-snippet.h @@ -1 +1 @@ - nw.Make(4); + Network network(4); diff --git a/doc/examples/dataflow/dataflow_network-snippet.h b/doc/examples/dataflow/dataflow_network-snippet.h index 3e36693..de219d9 100644 --- a/doc/examples/dataflow/dataflow_network-snippet.h +++ b/doc/examples/dataflow/dataflow_network-snippet.h @@ -1,2 +1 @@ typedef embb::dataflow::Network Network; -static Network nw; diff --git a/doc/examples/dataflow/dataflow_nonlinear-fragmented.cc b/doc/examples/dataflow/dataflow_nonlinear-fragmented.cc index b2590ee..ed30ed0 100644 --- a/doc/examples/dataflow/dataflow_nonlinear-fragmented.cc +++ b/doc/examples/dataflow/dataflow_nonlinear-fragmented.cc @@ -48,22 +48,28 @@ static int SimpleRand(int & seed) { #include "dataflow/dataflow_network-snippet.h" void RunDataflowNonLinear() { +#include "dataflow/dataflow_make-snippet.h" #include "dataflow/dataflow_declare_add_sources-snippet.h" Comparator comparator; Network::ParallelProcess< - Network::Inputs::Type, Network::Outputs::Type> - process1( embb::base::MakeFunction(comparator, &Comparator::Run) ), - process2( embb::base::MakeFunction(comparator, &Comparator::Run) ), - process3( embb::base::MakeFunction(comparator, &Comparator::Run) ), - process4( embb::base::MakeFunction(comparator, &Comparator::Run) ), - process5( embb::base::MakeFunction(comparator, &Comparator::Run) ); + Network::Inputs, Network::Outputs > + process1(network, + embb::base::MakeFunction(comparator, &Comparator::Run)), + process2(network, + embb::base::MakeFunction(comparator, &Comparator::Run)), + process3(network, + embb::base::MakeFunction(comparator, &Comparator::Run)), + process4(network, + embb::base::MakeFunction(comparator, &Comparator::Run)), + process5(network, + embb::base::MakeFunction(comparator, &Comparator::Run)); Consumer consumer; Network::Sink - sink1(embb::base::MakeFunction(consumer, &Consumer::Run)); + sink1(network, embb::base::MakeFunction(consumer, &Consumer::Run)); source1.GetOutput<0>() >> process1.GetInput<0>(); source2.GetOutput<0>() >> process2.GetInput<0>(); @@ -83,6 +89,5 @@ void RunDataflowNonLinear() { process5.GetOutput<1>() >> sink1.GetInput<2>(); process4.GetOutput<1>() >> sink1.GetInput<3>(); - nw.Make(4); - nw(); +#include "dataflow/dataflow_run-snippet.h" } diff --git a/doc/examples/dataflow/dataflow_run-snippet.h b/doc/examples/dataflow/dataflow_run-snippet.h index ff4f083..a0c3bfe 100644 --- a/doc/examples/dataflow/dataflow_run-snippet.h +++ b/doc/examples/dataflow/dataflow_run-snippet.h @@ -1 +1 @@ - nw(); + network(); diff --git a/doc/tutorial/content/dataflow.tex b/doc/tutorial/content/dataflow.tex index 6aba46d..16b9842 100644 --- a/doc/tutorial/content/dataflow.tex +++ b/doc/tutorial/content/dataflow.tex @@ -108,6 +108,10 @@ Then, we have to construct a \emph{network}. A network consists of a set of proc % \\\inputlisting{../examples/dataflow/dataflow_network-snippet.h} % +We need to prepare the network for the desired maximum number of elements that can be in the network at a time. The number of elements is limited to avoid that the network is flooded with new elements before the previous elements have been processed. In a linear pipeline, for example, this may happen if the source is faster than the sink. In our example, at most four elements may be processed simultaneously: one in the source, one in the sink, and two in the middle stage (see below). Finding an optimal value depends on the application and usually requires some experimentation. In general, large values boost the throughput but also increase the latency. Conversely, small values reduce the latency but may lead to a drop of performance in terms of throughput: +% +\\\inputlisting{../examples/dataflow/dataflow_make-snippet.h} +% As the next step, we have to construct the processes shown in Figure~\ref{fig:replace_par}. The easiest way to construct a process is to wrap the user-defined code in a lambda function and to pass it to the network. The network constructs an object for that process and executes the lambda function whenever new data is available. There are several methods for constructing processes depending on their type. The process \textbf{read} is a \emph{source} process, since it produces data (by reading it from the specified file) but does not consume any data. Source processes are constructed from a function object % \\\inputlisting{../examples/dataflow/dataflow_source_function-snippet.h} @@ -142,19 +146,10 @@ is used to construct the sink: \emph{\textbf{Note:} If you parallelize your own application using \embb and your compiler emits a lengthy error message containing lots of templates, it is very likely that for at least one process, the ports and their directions do not match the signature of the given function.} -The network needs to know about the source declared above, so we add it to our network: -% -\\\inputlisting{../examples/dataflow/dataflow_add-snippet.h} -% - As the last step, we have to connect the processes (ports). This is straightforward using the C++ stream operator: % \\\inputlisting{../examples/dataflow/dataflow_connect-snippet.h} % -Once all connections have been established, we need to prepare the network for the desired maximum number of elements that can be in the network at a time. The number of elements is limited to avoid that the network is flooded with new elements before the previous elements have been processed. In a linear pipeline, for example, this may happen if the source is faster than the sink. In our example, at most four elements may be processed simultaneously: one in the source, one in the sink, and two in the middle stage (see above). Finding an optimal value depends on the application and usually requires some experimentation. In general, large values boost the throughput but also increase the latency. Conversely, small values reduce the latency but may lead to a drop of performance in terms of throughput: -% -\\\inputlisting{../examples/dataflow/dataflow_make-snippet.h} -% Then we can start the network: % \\\inputlisting{../examples/dataflow/dataflow_run-snippet.h} diff --git a/tasks_cpp/src/node.cc b/tasks_cpp/src/node.cc index 7aa10b2..409510f 100644 --- a/tasks_cpp/src/node.cc +++ b/tasks_cpp/src/node.cc @@ -41,7 +41,7 @@ namespace { static embb::tasks::Node * node_instance = NULL; #if TASKS_CPP_AUTOMATIC_INITIALIZE -static embb::base::Mutex init_mutex; +static embb_spinlock_t init_mutex = { 0 }; #endif } @@ -207,13 +207,13 @@ bool Node::IsInitialized() { Node & Node::GetInstance() { #if TASKS_CPP_AUTOMATIC_INITIALIZE if (!IsInitialized()) { - init_mutex.Lock(); + embb_spin_lock(&init_mutex); if (!IsInitialized()) { Node::Initialize( TASKS_CPP_AUTOMATIC_DOMAIN_ID, TASKS_CPP_AUTOMATIC_NODE_ID); atexit(Node::Finalize); } - init_mutex.Unlock(); + embb_spin_unlock(&init_mutex); } return *node_instance; #else -- libgit2 0.26.0