diff --git a/dataflow_cpp/include/embb/dataflow/internal/in.h b/dataflow_cpp/include/embb/dataflow/internal/in.h index ec3d476..8e39a79 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/in.h +++ b/dataflow_cpp/include/embb/dataflow/internal/in.h @@ -74,12 +74,21 @@ class In { void SetConnected() { connected_ = true; } void SetSlices(int slices) { + if (0 < slices_) { + for (int ii = 0; ii < slices_; ii++) { + values_[ii].~SignalType(); + } + embb::base::Allocation::Free(values_); + values_ = NULL; + } slices_ = slices; - values_ = reinterpret_cast( - embb::base::Allocation::Allocate( - sizeof(SignalType)*slices_)); - for (int ii = 0; ii < slices_; ii++) { - new (&values_[ii]) SignalType(); + if (0 < slices_) { + values_ = reinterpret_cast( + embb::base::Allocation::Allocate( + sizeof(SignalType)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + new (&values_[ii]) SignalType(); + } } } diff --git a/dataflow_cpp/include/embb/dataflow/internal/inputs.h b/dataflow_cpp/include/embb/dataflow/internal/inputs.h index 81282dd..207e2a3 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/inputs.h +++ b/dataflow_cpp/include/embb/dataflow/internal/inputs.h @@ -59,6 +59,7 @@ class Inputs @@ -69,12 +70,22 @@ class Inputs , public ClockListener { public: - explicit Inputs(int slices) : count_(NULL), slices_(slices) { - count_ = reinterpret_cast*>( - embb::base::Allocation::Allocate( - sizeof(embb::base::Atomic)*slices_)); - for (int ii = 0; ii < slices_; ii++) { - count_[ii] = 1; + explicit Inputs() : count_(NULL), slices_(0) { + // empty + } + void SetSlices(int slices) { + if (0 < slices_) { + embb::base::Allocation::Free(count_); + count_ = NULL; + } + slices_ = slices; + if (0 < slices_) { + count_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + count_[ii] = 1; + } } this->template Get<0>().SetSlices(slices_); } @@ -122,12 +133,22 @@ class Inputs , public ClockListener { public: - explicit Inputs(int slices) : count_(NULL), slices_(slices) { - count_ = reinterpret_cast*>( - embb::base::Allocation::Allocate( - sizeof(embb::base::Atomic)*slices_)); - for (int ii = 0; ii < slices_; ii++) { - count_[ii] = 2; + explicit Inputs() : count_(NULL), slices_(0) { + // empty + } + void SetSlices(int slices) { + if (0 < slices_) { + embb::base::Allocation::Free(count_); + count_ = NULL; + } + slices_ = slices; + if (0 < slices_) { + count_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + count_[ii] = 2; + } } this->template Get<0>().SetSlices(slices_); this->template Get<1>().SetSlices(slices_); @@ -181,12 +202,22 @@ class Inputs , public ClockListener { public: - explicit Inputs(int slices) : count_(NULL), slices_(slices) { - count_ = reinterpret_cast*>( - embb::base::Allocation::Allocate( - sizeof(embb::base::Atomic)*slices_)); - for (int ii = 0; ii < slices_; ii++) { - count_[ii] = 3; + explicit Inputs() : count_(NULL), slices_(0) { + // empty + } + void SetSlices(int slices) { + if (0 < slices_) { + embb::base::Allocation::Free(count_); + count_ = NULL; + } + slices_ = slices; + if (0 < slices_) { + count_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + count_[ii] = 3; + } } this->template Get<0>().SetSlices(slices_); this->template Get<1>().SetSlices(slices_); @@ -245,12 +276,22 @@ class Inputs In, embb::base::internal::Nil> , public ClockListener { public: - explicit Inputs(int slices) : count_(NULL), slices_(slices) { - count_ = reinterpret_cast*>( - embb::base::Allocation::Allocate( - sizeof(embb::base::Atomic)*slices_)); - for (int ii = 0; ii < slices_; ii++) { - count_[ii] = 4; + explicit Inputs() : count_(NULL), slices_(0) { + // empty + } + void SetSlices(int slices) { + if (0 < slices_) { + embb::base::Allocation::Free(count_); + count_ = NULL; + } + slices_ = slices; + if (0 < slices_) { + count_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + count_[ii] = 4; + } } this->template Get<0>().SetSlices(slices_); this->template Get<1>().SetSlices(slices_); @@ -316,12 +357,22 @@ class Inputs In, In > , public ClockListener { public: - explicit Inputs(int slices) : count_(NULL), slices_(slices) { - count_ = reinterpret_cast*>( - embb::base::Allocation::Allocate( - sizeof(embb::base::Atomic)*slices_)); - for (int ii = 0; ii < slices_; ii++) { - count_[ii] = 5; + explicit Inputs() : count_(NULL), slices_(0) { + // empty + } + void SetSlices(int slices) { + if (0 < slices_) { + embb::base::Allocation::Free(count_); + count_ = NULL; + } + slices_ = slices; + if (0 < slices_) { + count_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + count_[ii] = 5; + } } this->template Get<0>().SetSlices(slices_); this->template Get<1>().SetSlices(slices_); diff --git a/dataflow_cpp/include/embb/dataflow/internal/node.h b/dataflow_cpp/include/embb/dataflow/internal/node.h index 57a8a94..8aa2805 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/node.h +++ b/dataflow_cpp/include/embb/dataflow/internal/node.h @@ -49,13 +49,21 @@ class Node { EMBB_THROW(embb::base::ErrorException, "Nodes are started implicitly."); } + void SetScheduler(Scheduler * sched) { + sched_ = sched; + if (NULL != sched_) { + SetSlices(sched_->GetSlices()); + } else { + SetSlices(0); + } + } protected: Scheduler * sched_; static int next_process_id_; - void SetScheduler(Scheduler * sched) { sched_ = sched; } static int GetNextProcessID() { return next_process_id_++; } + virtual void SetSlices(int /*slices*/) {}; }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/process.h b/dataflow_cpp/include/embb/dataflow/internal/process.h index 584c948..b728d81 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/process.h +++ b/dataflow_cpp/include/embb/dataflow/internal/process.h @@ -53,11 +53,11 @@ class Process< Serial, Inputs, typedef ProcessExecutor< InputsType, OutputsType > ExecutorType; typedef typename ExecutorType::FunctionType FunctionType; - Process(int slices, Scheduler * sched, FunctionType function) - : inputs_(slices) + Process(Scheduler * sched, FunctionType function) + : inputs_() , executor_(function) , action_(NULL) - , slices_(slices) { + , slices_(0) { next_clock_ = 0; queued_clock_ = 0; bool ordered = Serial; @@ -67,12 +67,6 @@ class Process< Serial, Inputs, queue_id_ = 0; } inputs_.SetListener(this); - action_ = reinterpret_cast( - embb::base::Allocation::Allocate( - sizeof(Action)*slices_)); - for (int ii = 0; ii < slices_; ii++) { - action_[ii] = Action(); - } SetScheduler(sched); } @@ -173,6 +167,23 @@ class Process< Serial, Inputs, embb::base::Atomic queued_clock_; int queue_id_; int slices_; + + virtual void SetSlices(int slices) { + if (0 < slices_) { + embb::base::Allocation::Free(action_); + action_ = NULL; + } + slices_ = slices; + inputs_.SetSlices(slices); + if (0 < slices_) { + action_ = reinterpret_cast( + embb::base::Allocation::Allocate( + sizeof(Action)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + action_[ii] = Action(); + } + } + } }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/select.h b/dataflow_cpp/include/embb/dataflow/internal/select.h index c90758f..40d6b6f 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/select.h +++ b/dataflow_cpp/include/embb/dataflow/internal/select.h @@ -44,7 +44,7 @@ class Select typedef Inputs InputsType; typedef Outputs OutputsType; - Select(int slices, Scheduler * sched) : inputs_(slices), slices_(slices) { + Select(Scheduler * sched) : inputs_(), slices_(0) { inputs_.SetListener(this); SetScheduler(sched); } @@ -119,6 +119,11 @@ class Select InputsType inputs_; OutputsType outputs_; int slices_; + + virtual void SetSlices(int slices) { + slices_ = slices; + inputs_.SetSlices(slices); + } }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/sink.h b/dataflow_cpp/include/embb/dataflow/internal/sink.h index 1f59464..e690a99 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/sink.h +++ b/dataflow_cpp/include/embb/dataflow/internal/sink.h @@ -48,23 +48,17 @@ class Sink< Inputs > typedef SinkExecutor< InputsType > ExecutorType; typedef typename ExecutorType::FunctionType FunctionType; - Sink(int slices, Scheduler * sched, ClockListener * listener, + Sink(Scheduler * sched, ClockListener * listener, FunctionType function) - : inputs_(slices) + : inputs_() , executor_(function) , action_(NULL) - , slices_(slices) { + , slices_(0) { next_clock_ = 0; queued_clock_ = 0; queue_id_ = GetNextProcessID(); inputs_.SetListener(this); - action_ = reinterpret_cast( - embb::base::Allocation::Allocate( - sizeof(Action)*slices_)); - for (int ii = 0; ii < slices_; ii++) { - action_[ii] = Action(); - } - SetListener(listener); + listener_ = listener; SetScheduler(sched); } @@ -74,10 +68,6 @@ class Sink< Inputs > } } - void SetListener(ClockListener * listener) { - listener_ = listener; - } - virtual bool HasInputs() const { return inputs_.Size() > 0; } @@ -143,6 +133,23 @@ class Sink< Inputs > embb::base::Atomic queued_clock_; int queue_id_; int slices_; + + virtual void SetSlices(int slices) { + if (0 < slices_) { + embb::base::Allocation::Free(action_); + action_ = NULL; + } + slices_ = slices; + inputs_.SetSlices(slices); + if (0 < slices_) { + action_ = reinterpret_cast( + embb::base::Allocation::Allocate( + sizeof(Action)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + action_[ii] = Action(); + } + } + } }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/switch.h b/dataflow_cpp/include/embb/dataflow/internal/switch.h index db02058..788685b 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/switch.h +++ b/dataflow_cpp/include/embb/dataflow/internal/switch.h @@ -44,7 +44,7 @@ class Switch typedef Inputs InputsType; typedef Outputs OutputsType; - Switch(int slices, Scheduler * sched) : inputs_(slices) { + Switch(Scheduler * sched) : inputs_() { inputs_.SetListener(this); SetScheduler(sched); } @@ -115,6 +115,10 @@ class Switch private: InputsType inputs_; OutputsType outputs_; + + virtual void SetSlices(int slices) { + inputs_.SetSlices(slices); + } }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/network.h b/dataflow_cpp/include/embb/dataflow/network.h index 53b17e0..4e77782 100644 --- a/dataflow_cpp/include/embb/dataflow/network.h +++ b/dataflow_cpp/include/embb/dataflow/network.h @@ -58,6 +58,14 @@ class Network { public: /** * Constructs an empty network. + * \note The number of concurrent tokens will be derived from the structure + * of the network automatically on the first call to operator () and the + * corresponding resources will be allocated then. + */ + Network() {} + + /** + * Constructs an empty network. * \param slices Number of concurrent tokens allowed in the network. */ explicit Network(int slices) {} @@ -673,6 +681,10 @@ class Network { /** * Executes the network until one of the the sources returns \c false. + * \note If the network was default constructed the number of concurrent + * tokens will be derived from the structure of the network automatically + * on the first call of the operator and the corresponding resources will + * be allocated then. */ void operator () (); }; @@ -681,22 +693,14 @@ class Network { class Network : public internal::ClockListener { public: - explicit Network(int slices = 0) - : sink_counter_(NULL), slices_(slices), sched_(NULL) { - if (0 >= slices) { - slices_ = int(embb_core_count_available())*4; - } - sched_ = embb::base::Allocation::New(slices_); - if (sched_->GetSlices() != slices_) { - slices_ = sched_->GetSlices(); - } - sink_counter_ = reinterpret_cast*>( - embb::base::Allocation::Allocate( - sizeof(embb::base::Atomic)*slices_)); - for (int ii = 0; ii < slices_; ii++) { - sink_counter_[ii] = 0; - } - sink_count_ = 0; + Network() + : sink_counter_(NULL), sink_count_(0), slices_(0), sched_(NULL) { + // empty + } + + explicit Network(int slices) + : sink_counter_(NULL), sink_count_(0), slices_(slices), sched_(NULL) { + PrepareSlices(); } ~Network() { @@ -710,24 +714,22 @@ class Network : public internal::ClockListener { } } - template - class Inputs : public internal::Inputs { - public: - explicit Inputs(int slices) - : internal::Inputs(slices) {} + class Inputs { + // empty }; - template - class Outputs : public internal::Outputs { - public: - Outputs() - : internal::Outputs() {} + class Outputs { + // empty }; template class SerialProcess; @@ -749,7 +751,7 @@ class Network : public internal::ClockListener { : internal::Process< true, internal::Inputs, internal::Outputs >( - network.slices_, network.sched_, function) { + network.sched_, function) { network.processes_.push_back(this); } }; @@ -773,7 +775,7 @@ class Network : public internal::ClockListener { : internal::Process< false, internal::Inputs, internal::Outputs >( - network.slices_, network.sched_, function) { + network.sched_, function) { network.processes_.push_back(this); } }; @@ -782,7 +784,7 @@ class Network : public internal::ClockListener { class Switch : public internal::Switch { public: explicit Switch(Network & network) - : internal::Switch(network.slices_, network.sched_) { + : internal::Switch(network.sched_) { network.processes_.push_back(this); } }; @@ -791,7 +793,7 @@ class Network : public internal::ClockListener { class Select : public internal::Select { public: explicit Select(Network & network) - : internal::Select(network.slices_, network.sched_) { + : internal::Select(network.sched_) { network.processes_.push_back(this); } }; @@ -809,7 +811,7 @@ class Network : public internal::ClockListener { explicit Sink(Network & network, FunctionType function) : internal::Sink< internal::Inputs >( - network.slices_, network.sched_, &network, function) { + network.sched_, &network, function) { network.sinks_.push_back(this); network.sink_count_++; } @@ -857,6 +859,27 @@ class Network : public internal::ClockListener { } void operator () () { + if (0 >= slices_) { + slices_ = static_cast( + sources_.size() + + sinks_.size()); + for (size_t ii = 0; ii < processes_.size(); ii++) { + int tt = processes_[ii]->IsSequential() ? 1 : + static_cast(embb_core_count_available()); + slices_ += tt; + } + PrepareSlices(); + for (size_t ii = 0; ii < sources_.size(); ii++) { + sources_[ii]->SetScheduler(sched_); + } + for (size_t ii = 0; ii < processes_.size(); ii++) { + processes_[ii]->SetScheduler(sched_); + } + for (size_t ii = 0; ii < sinks_.size(); ii++) { + sinks_[ii]->SetScheduler(sched_); + } + } + int clock = 0; while (clock >= 0) { const int idx = clock % slices_; @@ -913,6 +936,19 @@ class Network : public internal::ClockListener { } return result; } + + void PrepareSlices() { + sched_ = embb::base::Allocation::New(slices_); + if (sched_->GetSlices() != slices_) { + slices_ = sched_->GetSlices(); + } + sink_counter_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + sink_counter_[ii] = 0; + } + } }; #endif // DOXYGEN