Commit 0e87c034 by Marcus Winter

dataflow_cpp: fixed memory leaks

parent 5563ddbd
...@@ -51,6 +51,12 @@ class In { ...@@ -51,6 +51,12 @@ class In {
In() : values_(NULL), connected_(false), slices_(0) {} In() : values_(NULL), connected_(false), slices_(0) {}
~In() {
if (NULL != values_) {
embb::base::Allocation::Free(values_);
}
}
SignalType const & GetSignal(int clock) const { SignalType const & GetSignal(int clock) const {
return values_[clock % slices_]; return values_[clock % slices_];
} }
......
...@@ -67,9 +67,14 @@ class Inputs<T1, embb::base::internal::Nil, embb::base::internal::Nil, ...@@ -67,9 +67,14 @@ class Inputs<T1, embb::base::internal::Nil, embb::base::internal::Nil,
embb::base::internal::Nil> embb::base::internal::Nil>
, public ClockListener { , public ClockListener {
public: public:
Inputs() { Inputs() : count_(NULL) {
test_count_ = 1; test_count_ = 1;
} }
~Inputs() {
if (NULL != count_) {
embb::base::Allocation::Free(count_);
}
}
void SetListener(ClockListener * listener) { void SetListener(ClockListener * listener) {
listener_ = listener; listener_ = listener;
this->template Get<0>().SetListener(this); this->template Get<0>().SetListener(this);
...@@ -122,9 +127,14 @@ class Inputs<T1, T2, embb::base::internal::Nil, ...@@ -122,9 +127,14 @@ class Inputs<T1, T2, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil> embb::base::internal::Nil, embb::base::internal::Nil>
, public ClockListener { , public ClockListener {
public: public:
Inputs() { Inputs() : count_(NULL) {
test_count_ = 2; test_count_ = 2;
} }
~Inputs() {
if (NULL != count_) {
embb::base::Allocation::Free(count_);
}
}
void SetListener(ClockListener * listener) { void SetListener(ClockListener * listener) {
listener_ = listener; listener_ = listener;
this->template Get<0>().SetListener(this); this->template Get<0>().SetListener(this);
...@@ -181,9 +191,14 @@ class Inputs<T1, T2, T3, embb::base::internal::Nil, ...@@ -181,9 +191,14 @@ class Inputs<T1, T2, T3, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil> embb::base::internal::Nil, embb::base::internal::Nil>
, public ClockListener { , public ClockListener {
public: public:
Inputs() { Inputs() : count_(NULL) {
test_count_ = 3; test_count_ = 3;
} }
~Inputs() {
if (NULL != count_) {
embb::base::Allocation::Free(count_);
}
}
void SetListener(ClockListener * listener) { void SetListener(ClockListener * listener) {
listener_ = listener; listener_ = listener;
this->template Get<0>().SetListener(this); this->template Get<0>().SetListener(this);
...@@ -243,9 +258,14 @@ class Inputs<T1, T2, T3, T4, embb::base::internal::Nil> ...@@ -243,9 +258,14 @@ class Inputs<T1, T2, T3, T4, embb::base::internal::Nil>
In<T4>, embb::base::internal::Nil> In<T4>, embb::base::internal::Nil>
, public ClockListener { , public ClockListener {
public: public:
Inputs() { Inputs() : count_(NULL) {
test_count_ = 4; test_count_ = 4;
} }
~Inputs() {
if (NULL != count_) {
embb::base::Allocation::Free(count_);
}
}
void SetListener(ClockListener * listener) { void SetListener(ClockListener * listener) {
listener_ = listener; listener_ = listener;
this->template Get<0>().SetListener(this); this->template Get<0>().SetListener(this);
...@@ -310,9 +330,14 @@ class Inputs ...@@ -310,9 +330,14 @@ class Inputs
In<T4>, In<T5> > In<T4>, In<T5> >
, public ClockListener { , public ClockListener {
public: public:
Inputs() { Inputs() : count_(NULL) {
test_count_ = 5; test_count_ = 5;
} }
~Inputs() {
if (NULL != count_) {
embb::base::Allocation::Free(count_);
}
}
void SetListener(ClockListener * listener) { void SetListener(ClockListener * listener) {
listener_ = listener; listener_ = listener;
this->template Get<0>().SetListener(this); this->template Get<0>().SetListener(this);
......
...@@ -55,6 +55,7 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>, ...@@ -55,6 +55,7 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
explicit Process(FunctionType function) explicit Process(FunctionType function)
: executor_(function) : executor_(function)
, action_(NULL)
, slices_(0) { , slices_(0) {
next_clock_ = 0; next_clock_ = 0;
queued_clock_ = 0; queued_clock_ = 0;
...@@ -67,6 +68,12 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>, ...@@ -67,6 +68,12 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
inputs_.SetListener(this); inputs_.SetListener(this);
} }
~Process() {
if (NULL != action_) {
embb::base::Allocation::Free(action_);
}
}
virtual bool HasInputs() const { virtual bool HasInputs() const {
return inputs_.Size() > 0; return inputs_.Size() > 0;
} }
......
...@@ -49,13 +49,20 @@ class Sink< Inputs<I1, I2, I3, I4, I5> > ...@@ -49,13 +49,20 @@ class Sink< Inputs<I1, I2, I3, I4, I5> >
typedef typename ExecutorType::FunctionType FunctionType; typedef typename ExecutorType::FunctionType FunctionType;
explicit Sink(FunctionType function) explicit Sink(FunctionType function)
: executor_(function) { : executor_(function)
, action_(NULL) {
next_clock_ = 0; next_clock_ = 0;
queued_clock_ = 0; queued_clock_ = 0;
queue_id_ = GetNextProcessID(); queue_id_ = GetNextProcessID();
inputs_.SetListener(this); inputs_.SetListener(this);
} }
~Sink() {
if (NULL != action_) {
embb::base::Allocation::Free(action_);
}
}
void SetListener(ClockListener * listener) { void SetListener(ClockListener * listener) {
listener_ = listener; listener_ = listener;
} }
......
...@@ -52,10 +52,8 @@ namespace dataflow { ...@@ -52,10 +52,8 @@ namespace dataflow {
/** /**
* Represents a set of processes, that are connected by communication channels. * Represents a set of processes, that are connected by communication channels.
* *
* \tparam Slices Number of concurrently processed tokens.
* \ingroup CPP_DATAFLOW * \ingroup CPP_DATAFLOW
*/ */
template <int Slices>
class Network { class Network {
public: public:
/** /**
...@@ -662,6 +660,14 @@ class Network { ...@@ -662,6 +660,14 @@ class Network {
void AddSource(ConstantSource<Type> & source); void AddSource(ConstantSource<Type> & source);
/** /**
* Builds the network for usage with \c slices concurrent tokens. This
* function needs to be called after adding all sources and before
* executing the network.
* \param slices Number of concurrent tokens allowed in the network.
*/
void Make(int slices);
/**
* Executes the network until one of the the sources returns \c false. * Executes the network until one of the the sources returns \c false.
*/ */
void operator () (); void operator () ();
...@@ -671,7 +677,14 @@ class Network { ...@@ -671,7 +677,14 @@ class Network {
class Network : public internal::ClockListener { class Network : public internal::ClockListener {
public: public:
Network() {} Network() : sched_(NULL) {}
~Network() {
if (NULL != sched_) {
embb::base::Allocation::Delete<internal::Scheduler>(sched_);
embb::base::Allocation::Free(sink_counter_);
}
}
template <typename T1, typename T2 = embb::base::internal::Nil, template <typename T1, typename T2 = embb::base::internal::Nil,
typename T3 = embb::base::internal::Nil, typename T3 = embb::base::internal::Nil,
...@@ -799,21 +812,18 @@ class Network : public internal::ClockListener { ...@@ -799,21 +812,18 @@ class Network : public internal::ClockListener {
sources_.push_back(&source); sources_.push_back(&source);
} }
void operator () (int slices) { void Make(int slices) {
slices_ = slices; slices_ = slices;
sched_ = embb::base::Allocation::New<internal::SchedulerMTAPI>(slices_);
internal::SchedulerSequential sched_seq;
internal::SchedulerMTAPI sched_mtapi(slices_);
internal::Scheduler * sched = &sched_mtapi;
internal::InitData init_data; internal::InitData init_data;
init_data.slices = slices_; init_data.slices = slices_;
init_data.sched = sched; init_data.sched = sched_;
init_data.sink_listener = this; init_data.sink_listener = this;
sink_counter_ = reinterpret_cast<embb::base::Atomic<int>*>( sink_counter_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate( embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_)); sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) { for (int ii = 0; ii < slices_; ii++) {
sink_counter_[ii] = 0; sink_counter_[ii] = 0;
} }
...@@ -825,12 +835,18 @@ class Network : public internal::ClockListener { ...@@ -825,12 +835,18 @@ class Network : public internal::ClockListener {
for (int ii = 0; ii < slices_; ii++) { for (int ii = 0; ii < slices_; ii++) {
sink_counter_[ii] = 0; sink_counter_[ii] = 0;
} }
}
void operator () () {
if (NULL == sched_) {
throw embb::base::ErrorException("Network was not properly prepared");
}
int clock = 0; int clock = 0;
while (clock >= 0) { while (clock >= 0) {
const int idx = clock % slices_; const int idx = clock % slices_;
while (sink_counter_[idx] > 0) embb::base::Thread::CurrentYield(); while (sink_counter_[idx] > 0) embb::base::Thread::CurrentYield();
sched->WaitForSlice(idx); sched_->WaitForSlice(idx);
if (!SpawnClock(clock)) if (!SpawnClock(clock))
break; break;
clock++; clock++;
...@@ -841,7 +857,7 @@ class Network : public internal::ClockListener { ...@@ -841,7 +857,7 @@ class Network : public internal::ClockListener {
for (; ii < clock; ii++) { for (; ii < clock; ii++) {
const int idx = ii % slices_; const int idx = ii % slices_;
while (sink_counter_[idx] > 0) embb::base::Thread::CurrentYield(); while (sink_counter_[idx] > 0) embb::base::Thread::CurrentYield();
sched->WaitForSlice(idx); sched_->WaitForSlice(idx);
} }
} }
...@@ -875,6 +891,7 @@ class Network : public internal::ClockListener { ...@@ -875,6 +891,7 @@ class Network : public internal::ClockListener {
embb::base::Atomic<int> * sink_counter_; embb::base::Atomic<int> * sink_counter_;
int sink_count_; int sink_count_;
int slices_; int slices_;
internal::Scheduler * sched_;
#if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY
std::vector<int> spawn_history_[Slices]; std::vector<int> spawn_history_[Slices];
......
...@@ -211,8 +211,10 @@ void SimpleTest::TestBasic() { ...@@ -211,8 +211,10 @@ void SimpleTest::TestBasic() {
network.AddSource(constant); network.AddSource(constant);
network.AddSource(source); network.AddSource(source);
network.Make(NUM_SLICES);
try { try {
network(NUM_SLICES); network();
} catch (embb::base::ErrorException & e) { } catch (embb::base::ErrorException & e) {
PT_ASSERT_MSG(false, e.What()); PT_ASSERT_MSG(false, e.What());
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment