diff --git a/dataflow_cpp/include/embb/dataflow/internal/in.h b/dataflow_cpp/include/embb/dataflow/internal/in.h index 8324026..da1cada 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/in.h +++ b/dataflow_cpp/include/embb/dataflow/internal/in.h @@ -51,6 +51,12 @@ class In { In() : values_(NULL), connected_(false), slices_(0) {} + ~In() { + if (NULL != values_) { + embb::base::Allocation::Free(values_); + } + } + SignalType const & GetSignal(int clock) const { return values_[clock % slices_]; } diff --git a/dataflow_cpp/include/embb/dataflow/internal/inputs.h b/dataflow_cpp/include/embb/dataflow/internal/inputs.h index 00c3190..12ec06d 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/inputs.h +++ b/dataflow_cpp/include/embb/dataflow/internal/inputs.h @@ -67,9 +67,14 @@ class Inputs , public ClockListener { public: - Inputs() { + Inputs() : count_(NULL) { test_count_ = 1; } + ~Inputs() { + if (NULL != count_) { + embb::base::Allocation::Free(count_); + } + } void SetListener(ClockListener * listener) { listener_ = listener; this->template Get<0>().SetListener(this); @@ -122,9 +127,14 @@ class Inputs , public ClockListener { public: - Inputs() { + Inputs() : count_(NULL) { test_count_ = 2; } + ~Inputs() { + if (NULL != count_) { + embb::base::Allocation::Free(count_); + } + } void SetListener(ClockListener * listener) { listener_ = listener; this->template Get<0>().SetListener(this); @@ -181,9 +191,14 @@ class Inputs , public ClockListener { public: - Inputs() { + Inputs() : count_(NULL) { test_count_ = 3; } + ~Inputs() { + if (NULL != count_) { + embb::base::Allocation::Free(count_); + } + } void SetListener(ClockListener * listener) { listener_ = listener; this->template Get<0>().SetListener(this); @@ -243,9 +258,14 @@ class Inputs In, embb::base::internal::Nil> , public ClockListener { public: - Inputs() { + Inputs() : count_(NULL) { test_count_ = 4; } + ~Inputs() { + if (NULL != count_) { + embb::base::Allocation::Free(count_); + } + } void SetListener(ClockListener * listener) { listener_ = listener; this->template Get<0>().SetListener(this); @@ -310,9 +330,14 @@ class Inputs In, In > , public ClockListener { public: - Inputs() { + Inputs() : count_(NULL) { test_count_ = 5; } + ~Inputs() { + if (NULL != count_) { + embb::base::Allocation::Free(count_); + } + } void SetListener(ClockListener * listener) { listener_ = listener; this->template Get<0>().SetListener(this); diff --git a/dataflow_cpp/include/embb/dataflow/internal/process.h b/dataflow_cpp/include/embb/dataflow/internal/process.h index 372d36d..fda3971 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/process.h +++ b/dataflow_cpp/include/embb/dataflow/internal/process.h @@ -55,6 +55,7 @@ class Process< Serial, Inputs, explicit Process(FunctionType function) : executor_(function) + , action_(NULL) , slices_(0) { next_clock_ = 0; queued_clock_ = 0; @@ -67,6 +68,12 @@ class Process< Serial, Inputs, inputs_.SetListener(this); } + ~Process() { + if (NULL != action_) { + embb::base::Allocation::Free(action_); + } + } + virtual bool HasInputs() const { return inputs_.Size() > 0; } diff --git a/dataflow_cpp/include/embb/dataflow/internal/sink.h b/dataflow_cpp/include/embb/dataflow/internal/sink.h index e23544c..152c602 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/sink.h +++ b/dataflow_cpp/include/embb/dataflow/internal/sink.h @@ -49,13 +49,20 @@ class Sink< Inputs > typedef typename ExecutorType::FunctionType FunctionType; explicit Sink(FunctionType function) - : executor_(function) { + : executor_(function) + , action_(NULL) { next_clock_ = 0; queued_clock_ = 0; queue_id_ = GetNextProcessID(); inputs_.SetListener(this); } + ~Sink() { + if (NULL != action_) { + embb::base::Allocation::Free(action_); + } + } + void SetListener(ClockListener * listener) { listener_ = listener; } diff --git a/dataflow_cpp/include/embb/dataflow/network.h b/dataflow_cpp/include/embb/dataflow/network.h index 2a517db..8f6405a 100644 --- a/dataflow_cpp/include/embb/dataflow/network.h +++ b/dataflow_cpp/include/embb/dataflow/network.h @@ -52,10 +52,8 @@ namespace dataflow { /** * Represents a set of processes, that are connected by communication channels. * - * \tparam Slices Number of concurrently processed tokens. * \ingroup CPP_DATAFLOW */ -template class Network { public: /** @@ -662,6 +660,14 @@ class Network { void AddSource(ConstantSource & source); /** + * Builds the network for usage with \c slices concurrent tokens. This + * function needs to be called after adding all sources and before + * executing the network. + * \param slices Number of concurrent tokens allowed in the network. + */ + void Make(int slices); + + /** * Executes the network until one of the the sources returns \c false. */ void operator () (); @@ -671,7 +677,14 @@ class Network { class Network : public internal::ClockListener { public: - Network() {} + Network() : sched_(NULL) {} + + ~Network() { + if (NULL != sched_) { + embb::base::Allocation::Delete(sched_); + embb::base::Allocation::Free(sink_counter_); + } + } template (slices_); internal::InitData init_data; init_data.slices = slices_; - init_data.sched = sched; + init_data.sched = sched_; init_data.sink_listener = this; sink_counter_ = reinterpret_cast*>( embb::base::Allocation::Allocate( - sizeof(embb::base::Atomic)*slices_)); + sizeof(embb::base::Atomic)*slices_)); for (int ii = 0; ii < slices_; ii++) { sink_counter_[ii] = 0; } @@ -825,12 +835,18 @@ class Network : public internal::ClockListener { for (int ii = 0; ii < slices_; ii++) { sink_counter_[ii] = 0; } + } + + void operator () () { + if (NULL == sched_) { + throw embb::base::ErrorException("Network was not properly prepared"); + } int clock = 0; while (clock >= 0) { const int idx = clock % slices_; while (sink_counter_[idx] > 0) embb::base::Thread::CurrentYield(); - sched->WaitForSlice(idx); + sched_->WaitForSlice(idx); if (!SpawnClock(clock)) break; clock++; @@ -841,7 +857,7 @@ class Network : public internal::ClockListener { for (; ii < clock; ii++) { const int idx = ii % slices_; while (sink_counter_[idx] > 0) embb::base::Thread::CurrentYield(); - sched->WaitForSlice(idx); + sched_->WaitForSlice(idx); } } @@ -875,6 +891,7 @@ class Network : public internal::ClockListener { embb::base::Atomic * sink_counter_; int sink_count_; int slices_; + internal::Scheduler * sched_; #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY std::vector spawn_history_[Slices]; diff --git a/dataflow_cpp/test/dataflow_cpp_test_simple.cc b/dataflow_cpp/test/dataflow_cpp_test_simple.cc index 01a364e..ec54562 100644 --- a/dataflow_cpp/test/dataflow_cpp_test_simple.cc +++ b/dataflow_cpp/test/dataflow_cpp_test_simple.cc @@ -211,8 +211,10 @@ void SimpleTest::TestBasic() { network.AddSource(constant); network.AddSource(source); + network.Make(NUM_SLICES); + try { - network(NUM_SLICES); + network(); } catch (embb::base::ErrorException & e) { PT_ASSERT_MSG(false, e.What()); }