Commit 172d4ed3 by Marcus Winter

dataflow_cpp: fixed bug causing the network to hang

parent e6fe323c
......@@ -55,7 +55,7 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>,
explicit Process(FunctionType function)
: executor_(function) {
input_clock_expected_ = 0;
next_clock_ = 0;
inputs_.SetListener(this);
}
......@@ -68,16 +68,7 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>,
}
virtual void Run(int clock) {
bool ordered = Serial;
if (ordered) {
// force ordering
while (input_clock_expected_ != clock) embb::base::Thread::CurrentYield();
}
executor_.Execute(clock, inputs_, outputs_);
//inputs_.Clear(clock);
input_clock_expected_ = clock + 1;
}
InputsType & GetInputs() {
......@@ -104,20 +95,35 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>,
}
virtual void OnClock(int clock) {
const int idx = clock % Slices;
if (!inputs_.AreAtClock(clock))
EMBB_THROW(embb::base::ErrorException,
"Some inputs are not at expected clock.")
action_[idx] = Action(this, clock);
sched_->Spawn(action_[idx]);
bool ordered = Serial;
if (ordered) {
lock_.Lock();
for (int ii = next_clock_; ii < next_clock_ + Slices; ii++) {
if (!inputs_.AreAtClock(ii)) {
break;
}
next_clock_ = ii + 1;
Run(ii);
}
lock_.Unlock();
} else {
const int idx = clock % Slices;
action_[idx] = Action(this, clock);
sched_->Spawn(action_[idx]);
}
}
private:
InputsType inputs_;
OutputsType outputs_;
ExecutorType executor_;
embb::base::Atomic<int> input_clock_expected_;
int next_clock_;
Action action_[Slices];
SpinLock lock_;
};
} // namespace internal
......
......@@ -51,7 +51,7 @@ class Sink< Slices, Inputs<Slices, I1, I2, I3, I4, I5> >
explicit Sink(FunctionType function)
: executor_(function) {
input_clock_expected_ = 0;
next_clock_ = 0;
inputs_.SetListener(this);
}
......@@ -64,17 +64,10 @@ class Sink< Slices, Inputs<Slices, I1, I2, I3, I4, I5> >
}
virtual void Run(int clock) {
//const int idx = clock % Slices;
// force ordering
while (input_clock_expected_ != clock) embb::base::Thread::CurrentYield();
if (inputs_.AreNoneBlank(clock)) {
executor_.Execute(clock, inputs_);
}
listener_->OnClock(clock);
input_clock_expected_ = clock + 1;
}
InputsType & GetInputs() {
......@@ -87,26 +80,31 @@ class Sink< Slices, Inputs<Slices, I1, I2, I3, I4, I5> >
}
virtual void OnClock(int clock) {
lock_.Lock();
TrySpawn(clock);
lock_.Unlock();
}
private:
InputsType inputs_;
ExecutorType executor_;
embb::base::Atomic<int> input_clock_expected_;
int next_clock_;
Action action_[Slices];
ClockListener * listener_;
SpinLock lock_;
void TrySpawn(int clock) {
const int idx = clock % Slices;
if (!inputs_.AreAtClock(clock))
EMBB_THROW(embb::base::ErrorException,
"Some inputs are not at expected clock.")
action_[idx] = Action(this, clock);
sched_->Spawn(action_[idx]);
lock_.Lock();
for (int ii = next_clock_; ii < next_clock_ + Slices; ii++) {
if (!inputs_.AreAtClock(ii)) {
break;
}
next_clock_ = ii + 1;
Run(ii);
}
lock_.Unlock();
}
};
......
......@@ -66,11 +66,8 @@ class Source< Slices, Outputs<Slices, O1, O2, O3, O4, O5> >
}
virtual bool Start(int clock) {
while (clock != next_clock_) embb::base::Thread::CurrentYield();
if (not_done_) {
const int idx = clock % Slices;
action_[idx] = Action(this, clock);
sched_->Spawn(action_[idx]);
Run(clock);
}
return not_done_;
}
......
......@@ -36,7 +36,7 @@
#include <embb/dataflow/dataflow.h>
typedef embb::dataflow::Network<4> MyNetwork;
typedef embb::dataflow::Network<8> MyNetwork;
typedef MyNetwork::ConstantSource< int > MyConstantSource;
typedef MyNetwork::Source< int > MySource;
typedef MyNetwork::SerialProcess< MyNetwork::Inputs<int>::Type,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment