diff --git a/dataflow_cpp/include/embb/dataflow/internal/process.h b/dataflow_cpp/include/embb/dataflow/internal/process.h index b71cbc7..ca201a7 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/process.h +++ b/dataflow_cpp/include/embb/dataflow/internal/process.h @@ -55,7 +55,7 @@ class Process< Slices, Serial, Inputs, explicit Process(FunctionType function) : executor_(function) { - input_clock_expected_ = 0; + next_clock_ = 0; inputs_.SetListener(this); } @@ -68,16 +68,7 @@ class Process< Slices, Serial, Inputs, } virtual void Run(int clock) { - bool ordered = Serial; - if (ordered) { - // force ordering - while (input_clock_expected_ != clock) embb::base::Thread::CurrentYield(); - } - executor_.Execute(clock, inputs_, outputs_); - //inputs_.Clear(clock); - - input_clock_expected_ = clock + 1; } InputsType & GetInputs() { @@ -104,20 +95,35 @@ class Process< Slices, Serial, Inputs, } virtual void OnClock(int clock) { - const int idx = clock % Slices; if (!inputs_.AreAtClock(clock)) EMBB_THROW(embb::base::ErrorException, "Some inputs are not at expected clock.") - action_[idx] = Action(this, clock); - sched_->Spawn(action_[idx]); + + bool ordered = Serial; + if (ordered) { + lock_.Lock(); + for (int ii = next_clock_; ii < next_clock_ + Slices; ii++) { + if (!inputs_.AreAtClock(ii)) { + break; + } + next_clock_ = ii + 1; + Run(ii); + } + lock_.Unlock(); + } else { + const int idx = clock % Slices; + action_[idx] = Action(this, clock); + sched_->Spawn(action_[idx]); + } } private: InputsType inputs_; OutputsType outputs_; ExecutorType executor_; - embb::base::Atomic input_clock_expected_; + int next_clock_; Action action_[Slices]; + SpinLock lock_; }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/sink.h b/dataflow_cpp/include/embb/dataflow/internal/sink.h index f711e11..1dd8278 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/sink.h +++ b/dataflow_cpp/include/embb/dataflow/internal/sink.h @@ -51,7 +51,7 @@ class Sink< Slices, Inputs > explicit Sink(FunctionType function) : executor_(function) { - input_clock_expected_ = 0; + next_clock_ = 0; inputs_.SetListener(this); } @@ -64,17 +64,10 @@ class Sink< Slices, Inputs > } virtual void Run(int clock) { - //const int idx = clock % Slices; - - // force ordering - while (input_clock_expected_ != clock) embb::base::Thread::CurrentYield(); - if (inputs_.AreNoneBlank(clock)) { executor_.Execute(clock, inputs_); } listener_->OnClock(clock); - - input_clock_expected_ = clock + 1; } InputsType & GetInputs() { @@ -87,26 +80,31 @@ class Sink< Slices, Inputs > } virtual void OnClock(int clock) { - lock_.Lock(); TrySpawn(clock); - lock_.Unlock(); } private: InputsType inputs_; ExecutorType executor_; - embb::base::Atomic input_clock_expected_; + int next_clock_; Action action_[Slices]; ClockListener * listener_; SpinLock lock_; void TrySpawn(int clock) { - const int idx = clock % Slices; if (!inputs_.AreAtClock(clock)) EMBB_THROW(embb::base::ErrorException, "Some inputs are not at expected clock.") - action_[idx] = Action(this, clock); - sched_->Spawn(action_[idx]); + + lock_.Lock(); + for (int ii = next_clock_; ii < next_clock_ + Slices; ii++) { + if (!inputs_.AreAtClock(ii)) { + break; + } + next_clock_ = ii + 1; + Run(ii); + } + lock_.Unlock(); } }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/source.h b/dataflow_cpp/include/embb/dataflow/internal/source.h index 155769a..0ad4784 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/source.h +++ b/dataflow_cpp/include/embb/dataflow/internal/source.h @@ -66,11 +66,8 @@ class Source< Slices, Outputs > } virtual bool Start(int clock) { - while (clock != next_clock_) embb::base::Thread::CurrentYield(); if (not_done_) { - const int idx = clock % Slices; - action_[idx] = Action(this, clock); - sched_->Spawn(action_[idx]); + Run(clock); } return not_done_; } diff --git a/dataflow_cpp/test/dataflow_cpp_test_simple.cc b/dataflow_cpp/test/dataflow_cpp_test_simple.cc index db7c241..952ada9 100644 --- a/dataflow_cpp/test/dataflow_cpp_test_simple.cc +++ b/dataflow_cpp/test/dataflow_cpp_test_simple.cc @@ -36,7 +36,7 @@ #include -typedef embb::dataflow::Network<4> MyNetwork; +typedef embb::dataflow::Network<8> MyNetwork; typedef MyNetwork::ConstantSource< int > MyConstantSource; typedef MyNetwork::Source< int > MySource; typedef MyNetwork::SerialProcess< MyNetwork::Inputs::Type,