diff --git a/dataflow_cpp/CMakeLists.txt b/dataflow_cpp/CMakeLists.txt index 0ad76a0..133d066 100644 --- a/dataflow_cpp/CMakeLists.txt +++ b/dataflow_cpp/CMakeLists.txt @@ -26,7 +26,7 @@ target_link_libraries(embb_dataflow_cpp embb_mtapi_cpp embb_base_cpp embb_mtapi_ if (BUILD_TESTS STREQUAL ON) include_directories(${CMAKE_CURRENT_BINARY_DIR}/../partest/include) add_executable (embb_dataflow_cpp_test ${EMBB_DATAFLOW_CPP_TEST_SOURCES}) - target_link_libraries(embb_dataflow_cpp_test embb_mtapi_cpp embb_mtapi_c partest + target_link_libraries(embb_dataflow_cpp_test embb_dataflow_cpp embb_mtapi_cpp embb_mtapi_c partest embb_base_cpp embb_base_c ${compiler_libs}) CopyBin(BIN embb_dataflow_cpp_test DEST ${local_install_dir}) endif() diff --git a/dataflow_cpp/include/embb/dataflow/internal/inputs.h b/dataflow_cpp/include/embb/dataflow/internal/inputs.h index afcb011..4f4430a 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/inputs.h +++ b/dataflow_cpp/include/embb/dataflow/internal/inputs.h @@ -27,6 +27,7 @@ #ifndef EMBB_DATAFLOW_INTERNAL_INPUTS_H_ #define EMBB_DATAFLOW_INTERNAL_INPUTS_H_ +#include #include #include diff --git a/dataflow_cpp/include/embb/dataflow/internal/node.h b/dataflow_cpp/include/embb/dataflow/internal/node.h index daf80b3..122750e 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/node.h +++ b/dataflow_cpp/include/embb/dataflow/internal/node.h @@ -28,7 +28,7 @@ #define EMBB_DATAFLOW_INTERNAL_NODE_H_ #include - +#include #include namespace embb { @@ -50,6 +50,9 @@ class Node { protected: Scheduler * sched_; + static int next_process_id_; + + static int GetNextProcessID() { return next_process_id_++; } }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/process.h b/dataflow_cpp/include/embb/dataflow/internal/process.h index 1765db0..caa81db 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/process.h +++ b/dataflow_cpp/include/embb/dataflow/internal/process.h @@ -56,6 +56,13 @@ class Process< Slices, Serial, Inputs, explicit Process(FunctionType function) : executor_(function) { next_clock_ = 0; + queued_clock_ = 0; + bool ordered = Serial; + if (ordered) { + queue_id_ = GetNextProcessID(); + } else { + queue_id_ = 0; + } inputs_.SetListener(this); } @@ -95,21 +102,39 @@ class Process< Slices, Serial, Inputs, } virtual void OnClock(int clock) { - if (!inputs_.AreAtClock(clock)) + if (!inputs_.AreAtClock(clock)) { EMBB_THROW(embb::base::ErrorException, "Some inputs are not at expected clock.") + } bool ordered = Serial; if (ordered) { - lock_.Lock(); - for (int ii = next_clock_; ii < next_clock_ + Slices; ii++) { - if (!inputs_.AreAtClock(ii)) { - break; + bool retry = true; + while (retry) { + int clk = next_clock_; + int clk_end = clk + Slices; + int clk_res = clk; + for (int ii = clk; ii < clk_end; ii++) { + if (!inputs_.AreAtClock(ii)) { + break; + } + clk_res++; + } + if (clk_res > clk) { + if (next_clock_.CompareAndSwap(clk, clk_res)) { + while (queued_clock_.Load() < clk); + for (int ii = clk; ii < clk_res; ii++) { + const int idx = ii % Slices; + action_[idx] = Action(this, ii); + sched_->Enqueue(queue_id_, action_[idx]); + } + queued_clock_.Store(clk_res); + retry = false; + } + } else { + retry = false; } - next_clock_ = ii + 1; - Run(ii); } - lock_.Unlock(); } else { const int idx = clock % Slices; action_[idx] = Action(this, clock); @@ -121,9 +146,10 @@ class Process< Slices, Serial, Inputs, InputsType inputs_; OutputsType outputs_; ExecutorType executor_; - int next_clock_; Action action_[Slices]; - SpinLock lock_; + embb::base::Atomic next_clock_; + embb::base::Atomic queued_clock_; + int queue_id_; }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler.h index 811c7dd..5770217 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler.h @@ -38,6 +38,7 @@ class Scheduler { Scheduler() {} virtual ~Scheduler() {} virtual void Spawn(Action & action) = 0; + virtual void Enqueue(int process_id, Action & action) = 0; virtual void WaitForSlice(int slice) = 0; }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h index 7e72812..692b4b8 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h @@ -45,6 +45,16 @@ class SchedulerMTAPI : public Scheduler { embb::mtapi::Group & group = node.CreateGroup(); group_[ii] = &group; } + + queue_count_ = static_cast(node.GetWorkerThreadCount()); + queue_ = reinterpret_cast( + embb::base::Allocation::Allocate( + sizeof(embb::mtapi::Queue*)*queue_count_)); + + for (int ii = 0; ii < queue_count_; ii++) { + embb::mtapi::Queue & queue = node.CreateQueue(0, true); + queue_[ii] = &queue; + } } virtual ~SchedulerMTAPI() { embb::mtapi::Node & node = embb::mtapi::Node::GetInstance(); @@ -52,17 +62,29 @@ class SchedulerMTAPI : public Scheduler { group_[ii]->WaitAll(MTAPI_INFINITE); node.DestroyGroup(*group_[ii]); } + for (int ii = 0; ii < queue_count_; ii++) { + node.DestroyQueue(*queue_[ii]); + } + embb::base::Allocation::Free(queue_); } virtual void Spawn(Action & action) { const int idx = action.GetClock() % Slices; group_[idx]->Spawn(embb::base::MakeFunction(action, &Action::RunMTAPI)); } + virtual void Enqueue(int process_id, Action & action) { + const int idx = action.GetClock() % Slices; + const int queue_id = process_id % queue_count_; + queue_[queue_id]->Spawn(group_[idx], + embb::base::MakeFunction(action, &Action::RunMTAPI)); + } virtual void WaitForSlice(int slice) { group_[slice]->WaitAll(MTAPI_INFINITE); } private: embb::mtapi::Group * group_[Slices]; + embb::mtapi::Queue ** queue_; + int queue_count_; }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h index a553548..919a3a3 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h @@ -41,6 +41,9 @@ class SchedulerSequential : public Scheduler { virtual void Spawn(Action & action) { action.RunSequential(); } + virtual void Enqueue(int, Action & action) { + action.RunSequential(); + } virtual void WaitForSlice(int /*slice*/) {} }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/signal.h b/dataflow_cpp/include/embb/dataflow/internal/signal.h index 951e75a..dde39cc 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/signal.h +++ b/dataflow_cpp/include/embb/dataflow/internal/signal.h @@ -27,7 +27,7 @@ #ifndef EMBB_DATAFLOW_INTERNAL_SIGNAL_H_ #define EMBB_DATAFLOW_INTERNAL_SIGNAL_H_ -#include +#include namespace embb { namespace dataflow { @@ -42,27 +42,23 @@ class Signal { Signal(Signal const & other) : blank_(other.blank_), value_(other.value_), clock_(other.clock_) {} void operator = (Signal const & rhs) { - lock_.Lock(); blank_ = rhs.blank_; value_ = rhs.value_; clock_ = rhs.clock_; - lock_.Unlock(); + embb_atomic_memory_barrier(); } int GetClock() const { return clock_; } bool IsBlank() const { return blank_; } Type const & GetValue() const { return value_; } void Clear() { - lock_.Lock(); blank_ = true; clock_ = -1; - lock_.Unlock(); } private: bool blank_; Type value_; int clock_; - SpinLock lock_; }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/sink.h b/dataflow_cpp/include/embb/dataflow/internal/sink.h index 7066995..0487834 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/sink.h +++ b/dataflow_cpp/include/embb/dataflow/internal/sink.h @@ -52,6 +52,8 @@ class Sink< Slices, Inputs > explicit Sink(FunctionType function) : executor_(function) { next_clock_ = 0; + queued_clock_ = 0; + queue_id_ = GetNextProcessID(); inputs_.SetListener(this); } @@ -80,32 +82,47 @@ class Sink< Slices, Inputs > } virtual void OnClock(int clock) { - TrySpawn(clock); + if (!inputs_.AreAtClock(clock)) { + EMBB_THROW(embb::base::ErrorException, + "Some inputs are not at expected clock.") + } + + bool retry = true; + while (retry) { + int clk = next_clock_; + int clk_end = clk + Slices; + int clk_res = clk; + for (int ii = clk; ii < clk_end; ii++) { + if (!inputs_.AreAtClock(ii)) { + break; + } + clk_res++; + } + if (clk_res > clk) { + if (next_clock_.CompareAndSwap(clk, clk_res)) { + while (queued_clock_.Load() < clk); + for (int ii = clk; ii < clk_res; ii++) { + const int idx = ii % Slices; + action_[idx] = Action(this, ii); + sched_->Enqueue(queue_id_, action_[idx]); + } + queued_clock_.Store(clk_res); + retry = false; + } + } else { + retry = false; + } + } } private: InputsType inputs_; ExecutorType executor_; - int next_clock_; Action action_[Slices]; ClockListener * listener_; - SpinLock lock_; - - void TrySpawn(int clock) { - if (!inputs_.AreAtClock(clock)) - EMBB_THROW(embb::base::ErrorException, - "Some inputs are not at expected clock.") - - lock_.Lock(); - for (int ii = next_clock_; ii < next_clock_ + Slices; ii++) { - if (!inputs_.AreAtClock(ii)) { - break; - } - next_clock_ = ii + 1; - Run(ii); - } - lock_.Unlock(); - } + embb::base::Atomic next_clock_; + embb::base::Atomic queued_clock_; + int queue_id_; }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/source.h b/dataflow_cpp/include/embb/dataflow/internal/source.h index 48f5772..232a50e 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/source.h +++ b/dataflow_cpp/include/embb/dataflow/internal/source.h @@ -27,13 +27,9 @@ #ifndef EMBB_DATAFLOW_INTERNAL_SOURCE_H_ #define EMBB_DATAFLOW_INTERNAL_SOURCE_H_ -#include -#include - #include #include #include -#include namespace embb { namespace dataflow { @@ -53,7 +49,6 @@ class Source< Slices, Outputs > explicit Source(FunctionType function) : executor_(function), not_done_(true) { - next_clock_ = 0; } virtual bool HasOutputs() const { @@ -62,7 +57,6 @@ class Source< Slices, Outputs > virtual void Run(int clock) { not_done_ = executor_.Execute(clock, outputs_); - next_clock_++; } virtual bool Start(int clock) { @@ -89,9 +83,7 @@ class Source< Slices, Outputs > private: OutputsType outputs_; ExecutorType executor_; - Action action_[Slices]; volatile bool not_done_; - embb::base::Atomic next_clock_; }; } // namespace internal diff --git a/dataflow_cpp/src/dummy.cc b/dataflow_cpp/src/node.cc similarity index 96% rename from dataflow_cpp/src/dummy.cc rename to dataflow_cpp/src/node.cc index e95756a..55b91c7 100644 --- a/dataflow_cpp/src/dummy.cc +++ b/dataflow_cpp/src/node.cc @@ -24,3 +24,6 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#include + +int embb::dataflow::internal::Node::next_process_id_ = 0; diff --git a/dataflow_cpp/test/dataflow_cpp_test_simple.cc b/dataflow_cpp/test/dataflow_cpp_test_simple.cc index 3fe9272..52d1212 100644 --- a/dataflow_cpp/test/dataflow_cpp_test_simple.cc +++ b/dataflow_cpp/test/dataflow_cpp_test_simple.cc @@ -60,7 +60,7 @@ bool sourceFunc(int & out) { source_array[source_counter] = out; source_counter++; - return source_counter < 12; + return source_counter < TEST_COUNT; } embb::base::Atomic pred_counter;