From 76afcbb3b452e766a1ba738b324b7676b6669ab2 Mon Sep 17 00:00:00 2001 From: Marcus Winter Date: Mon, 15 Feb 2016 18:03:13 +0100 Subject: [PATCH] dataflow_cpp: number of slices can now be set at runtime --- dataflow_cpp/include/embb/dataflow/internal/clock_listener.h | 1 + dataflow_cpp/include/embb/dataflow/internal/constant_source.h | 4 ++-- dataflow_cpp/include/embb/dataflow/internal/in.h | 28 ++++++++++++++++++++-------- dataflow_cpp/include/embb/dataflow/internal/inputs.h | 107 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------------------- dataflow_cpp/include/embb/dataflow/internal/out.h | 4 ++-- dataflow_cpp/include/embb/dataflow/internal/outputs.h | 37 ++++++++++++++++++------------------- dataflow_cpp/include/embb/dataflow/internal/process.h | 32 +++++++++++++++++++++----------- dataflow_cpp/include/embb/dataflow/internal/process_executor.h | 100 ++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------------------------- dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h | 21 ++++++++++++++------- dataflow_cpp/include/embb/dataflow/internal/select.h | 11 ++++++----- dataflow_cpp/include/embb/dataflow/internal/sink.h | 21 ++++++++++++++------- dataflow_cpp/include/embb/dataflow/internal/sink_executor.h | 30 +++++++++++++++--------------- dataflow_cpp/include/embb/dataflow/internal/source.h | 7 +++---- dataflow_cpp/include/embb/dataflow/internal/source_executor.h | 40 ++++++++++++++++++++-------------------- dataflow_cpp/include/embb/dataflow/internal/switch.h | 9 ++++----- dataflow_cpp/include/embb/dataflow/network.h | 112 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------------------------- dataflow_cpp/test/dataflow_cpp_test_simple.cc | 4 ++-- 17 files changed, 322 insertions(+), 246 deletions(-) diff --git a/dataflow_cpp/include/embb/dataflow/internal/clock_listener.h b/dataflow_cpp/include/embb/dataflow/internal/clock_listener.h index 5234740..e01e8ee 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/clock_listener.h +++ b/dataflow_cpp/include/embb/dataflow/internal/clock_listener.h @@ -35,6 +35,7 @@ class Scheduler; class ClockListener; struct InitData { + int slices; Scheduler * sched; ClockListener * sink_listener; }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/constant_source.h b/dataflow_cpp/include/embb/dataflow/internal/constant_source.h index 4ec14af..4838960 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/constant_source.h +++ b/dataflow_cpp/include/embb/dataflow/internal/constant_source.h @@ -35,11 +35,11 @@ namespace embb { namespace dataflow { namespace internal { -template +template class ConstantSource : public Node { public: - typedef Outputs OutputsType; + typedef Outputs OutputsType; private: OutputsType outputs_; diff --git a/dataflow_cpp/include/embb/dataflow/internal/in.h b/dataflow_cpp/include/embb/dataflow/internal/in.h index 60b4024..8324026 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/in.h +++ b/dataflow_cpp/include/embb/dataflow/internal/in.h @@ -41,18 +41,18 @@ namespace internal { class Scheduler; -template +template class Out; -template +template class In { public: typedef Signal SignalType; - In() : connected_(false) {} + In() : values_(NULL), connected_(false), slices_(0) {} SignalType const & GetSignal(int clock) const { - return values_[clock % Slices]; + return values_[clock % slices_]; } Type GetValue(int clock) const { @@ -66,26 +66,37 @@ class In { bool IsConnected() const { return connected_; } void SetConnected() { connected_ = true; } + void SetSlices(int slices) { + slices_ = slices; + values_ = reinterpret_cast( + embb::base::Allocation::Allocate( + sizeof(SignalType)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + values_[ii] = SignalType(); + } + } + void SetListener(ClockListener * listener) { listener_ = listener; } void Clear(int clock) { - const int idx = clock % Slices; + const int idx = clock % slices_; values_[idx].Clear(); } - friend class Out; + friend class Out; private: - SignalType values_[Slices]; + SignalType * values_; ClockListener * listener_; bool connected_; + int slices_; #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY embb::base::Spinlock lock_; std::vector history_; #endif void Receive(SignalType const & value) { - const int idx = value.GetClock() % Slices; + const int idx = value.GetClock() % slices_; if (values_[idx].GetClock() >= value.GetClock()) EMBB_THROW(embb::base::ErrorException, "Received signal does not increase clock."); @@ -99,6 +110,7 @@ class In { } void ReceiveInit(InitData * init_data) { + SetSlices(init_data->slices); listener_->OnInit(init_data); } }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/inputs.h b/dataflow_cpp/include/embb/dataflow/internal/inputs.h index 7fc87b6..00c3190 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/inputs.h +++ b/dataflow_cpp/include/embb/dataflow/internal/inputs.h @@ -36,7 +36,6 @@ namespace dataflow { namespace internal { template < - int, typename = embb::base::internal::Nil, typename = embb::base::internal::Nil, typename = embb::base::internal::Nil, @@ -44,8 +43,8 @@ template < typename = embb::base::internal::Nil> class Inputs; -template -class Inputs +class Inputs : public Tuple -class Inputs +class Inputs - : public Tuple, embb::base::internal::Nil, + : public Tuple, embb::base::internal::Nil, embb::base::internal::Nil, embb::base::internal::Nil, embb::base::internal::Nil> , public ClockListener { public: Inputs() { - for (int ii = 0; ii < Slices; ii++) - count_[ii] = 1; test_count_ = 1; } void SetListener(ClockListener * listener) { @@ -89,7 +86,7 @@ class Inputstemplate Get<0>().Clear(clock); } virtual void OnClock(int clock) { - const int idx = clock % Slices; + const int idx = clock % slices_; if (count_[idx] == 0) { EMBB_THROW(embb::base::ErrorException, "All inputs already fired for this clock."); @@ -101,25 +98,31 @@ class Inputsslices; + count_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + count_[ii] = 1; + } listener_->OnInit(init_data); } } private: - embb::base::Atomic count_[Slices]; + embb::base::Atomic * count_; int test_count_; ClockListener * listener_; + int slices_; }; -template -class Inputs +class Inputs - : public Tuple, In, embb::base::internal::Nil, + : public Tuple, In, embb::base::internal::Nil, embb::base::internal::Nil, embb::base::internal::Nil> , public ClockListener { public: Inputs() { - for (int ii = 0; ii < Slices; ii++) - count_[ii] = 2; test_count_ = 2; } void SetListener(ClockListener * listener) { @@ -142,7 +145,7 @@ class Inputstemplate Get<1>().Clear(clock); } virtual void OnClock(int clock) { - const int idx = clock % Slices; + const int idx = clock % slices_; if (count_[idx] == 0) { EMBB_THROW(embb::base::ErrorException, "All inputs already fired for this clock."); @@ -154,25 +157,31 @@ class Inputsslices; + count_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + count_[ii] = 2; + } listener_->OnInit(init_data); } } private: - embb::base::Atomic count_[Slices]; + embb::base::Atomic * count_; int test_count_; ClockListener * listener_; + int slices_; }; -template -class Inputs +class Inputs - : public Tuple, In, In, + : public Tuple, In, In, embb::base::internal::Nil, embb::base::internal::Nil> , public ClockListener { public: Inputs() { - for (int ii = 0; ii < Slices; ii++) - count_[ii] = 3; test_count_ = 3; } void SetListener(ClockListener * listener) { @@ -199,7 +208,7 @@ class Inputstemplate Get<2>().Clear(clock); } virtual void OnClock(int clock) { - const int idx = clock % Slices; + const int idx = clock % slices_; if (count_[idx] == 0) { EMBB_THROW(embb::base::ErrorException, "All inputs already fired for this clock."); @@ -211,24 +220,30 @@ class Inputsslices; + count_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + count_[ii] = 3; + } listener_->OnInit(init_data); } } private: - embb::base::Atomic count_[Slices]; + embb::base::Atomic * count_; int test_count_; ClockListener * listener_; + int slices_; }; -template -class Inputs - : public Tuple, In, In, - In, embb::base::internal::Nil> +template +class Inputs + : public Tuple, In, In, + In, embb::base::internal::Nil> , public ClockListener { public: Inputs() { - for (int ii = 0; ii < Slices; ii++) - count_[ii] = 4; test_count_ = 4; } void SetListener(ClockListener * listener) { @@ -259,7 +274,7 @@ class Inputs this->template Get<3>().Clear(clock); } virtual void OnClock(int clock) { - const int idx = clock % Slices; + const int idx = clock % slices_; if (count_[idx] == 0) { EMBB_THROW(embb::base::ErrorException, "All inputs already fired for this clock."); @@ -271,25 +286,31 @@ class Inputs } virtual void OnInit(InitData * init_data) { if (--test_count_ == 0) { + slices_ = init_data->slices; + count_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + count_[ii] = 4; + } listener_->OnInit(init_data); } } private: - embb::base::Atomic count_[Slices]; + embb::base::Atomic * count_; int test_count_; ClockListener * listener_; + int slices_; }; -template class Inputs - : public Tuple, In, In, - In, In > + : public Tuple, In, In, + In, In > , public ClockListener { public: Inputs() { - for (int ii = 0; ii < Slices; ii++) - count_[ii] = 5; test_count_ = 5; } void SetListener(ClockListener * listener) { @@ -324,7 +345,7 @@ class Inputs this->template Get<4>().Clear(clock); } virtual void OnClock(int clock) { - const int idx = clock % Slices; + const int idx = clock % slices_; if (count_[idx] == 0) { EMBB_THROW(embb::base::ErrorException, "All inputs already fired for this clock."); @@ -336,13 +357,21 @@ class Inputs } virtual void OnInit(InitData * init_data) { if (--test_count_ == 0) { + slices_ = init_data->slices; + count_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + count_[ii] = 5; + } listener_->OnInit(init_data); } } private: - embb::base::Atomic count_[Slices]; + embb::base::Atomic * count_; int test_count_; ClockListener * listener_; + int slices_; }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/out.h b/dataflow_cpp/include/embb/dataflow/internal/out.h index 9c2f1a1..7227737 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/out.h +++ b/dataflow_cpp/include/embb/dataflow/internal/out.h @@ -37,11 +37,11 @@ namespace internal { class Scheduler; -template +template class Out { public: typedef Signal SignalType; - typedef In InType; + typedef In InType; Out() { } diff --git a/dataflow_cpp/include/embb/dataflow/internal/outputs.h b/dataflow_cpp/include/embb/dataflow/internal/outputs.h index cf926b0..725a8c9 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/outputs.h +++ b/dataflow_cpp/include/embb/dataflow/internal/outputs.h @@ -35,7 +35,6 @@ namespace dataflow { namespace internal { template < - int, typename = embb::base::internal::Nil, typename = embb::base::internal::Nil, typename = embb::base::internal::Nil, @@ -43,8 +42,8 @@ template < typename = embb::base::internal::Nil > class Outputs; -template -class Outputs +class Outputs : public Tuple -class Outputs +class Outputs - : public Tuple, embb::base::internal::Nil, + : public Tuple, embb::base::internal::Nil, embb::base::internal::Nil, embb::base::internal::Nil, embb::base::internal::Nil> { public: }; -template -class Outputs +class Outputs - : public Tuple, Out, embb::base::internal::Nil, + : public Tuple, Out, embb::base::internal::Nil, embb::base::internal::Nil, embb::base::internal::Nil> { public: }; -template -class Outputs +class Outputs - : public Tuple, Out, Out, + : public Tuple, Out, Out, embb::base::internal::Nil, embb::base::internal::Nil> { public: }; -template -class Outputs - : public Tuple, Out, Out, - Out, embb::base::internal::Nil>{ +template +class Outputs + : public Tuple, Out, Out, + Out, embb::base::internal::Nil>{ public: }; -template class Outputs - : public Tuple, Out, Out, - Out, Out > { + : public Tuple, Out, Out, + Out, Out > { public: }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/process.h b/dataflow_cpp/include/embb/dataflow/internal/process.h index 4f5bc3f..0d3c578 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/process.h +++ b/dataflow_cpp/include/embb/dataflow/internal/process.h @@ -37,24 +37,25 @@ namespace embb { namespace dataflow { namespace internal { -template class Process; +template class Process; template < - int Slices, bool Serial, + bool Serial, typename I1, typename I2, typename I3, typename I4, typename I5, typename O1, typename O2, typename O3, typename O4, typename O5> -class Process< Slices, Serial, Inputs, - Outputs > +class Process< Serial, Inputs, + Outputs > : public Node , public ClockListener { public: - typedef Inputs InputsType; - typedef Outputs OutputsType; + typedef Inputs InputsType; + typedef Outputs OutputsType; typedef ProcessExecutor< InputsType, OutputsType > ExecutorType; typedef typename ExecutorType::FunctionType FunctionType; explicit Process(FunctionType function) - : executor_(function) { + : executor_(function) + , slices_(0) { next_clock_ = 0; queued_clock_ = 0; bool ordered = Serial; @@ -79,6 +80,14 @@ class Process< Slices, Serial, Inputs, } virtual void Init(InitData * init_data) { + slices_ = init_data->slices; + //inputs_.SetSlices(init_data->slices); + action_ = reinterpret_cast( + embb::base::Allocation::Allocate( + sizeof(Action)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + action_[ii] = Action(); + } SetScheduler(init_data->sched); executor_.Init(init_data, outputs_); } @@ -117,7 +126,7 @@ class Process< Slices, Serial, Inputs, bool retry = true; while (retry) { int clk = next_clock_; - int clk_end = clk + Slices; + int clk_end = clk + slices_; int clk_res = clk; for (int ii = clk; ii < clk_end; ii++) { if (!inputs_.AreAtClock(ii)) { @@ -129,7 +138,7 @@ class Process< Slices, Serial, Inputs, if (next_clock_.CompareAndSwap(clk, clk_res)) { while (queued_clock_.Load() < clk) continue; for (int ii = clk; ii < clk_res; ii++) { - const int idx = ii % Slices; + const int idx = ii % slices_; action_[idx] = Action(this, ii); sched_->Enqueue(queue_id_, action_[idx]); } @@ -141,7 +150,7 @@ class Process< Slices, Serial, Inputs, } } } else { - const int idx = clock % Slices; + const int idx = clock % slices_; action_[idx] = Action(this, clock); sched_->Spawn(action_[idx]); } @@ -155,10 +164,11 @@ class Process< Slices, Serial, Inputs, InputsType inputs_; OutputsType outputs_; ExecutorType executor_; - Action action_[Slices]; + Action * action_; embb::base::Atomic next_clock_; embb::base::Atomic queued_clock_; int queue_id_; + int slices_; }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/process_executor.h b/dataflow_cpp/include/embb/dataflow/internal/process_executor.h index 4219cea..9027508 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/process_executor.h +++ b/dataflow_cpp/include/embb/dataflow/internal/process_executor.h @@ -40,8 +40,8 @@ namespace internal { template class ProcessExecutor; -template -class ProcessExecutor< Inputs, Outputs > { +template +class ProcessExecutor< Inputs, Outputs > { public: typedef embb::base::Function FunctionType; @@ -49,8 +49,8 @@ class ProcessExecutor< Inputs, Outputs > { void Execute( int clock, - Inputs & inputs, - Outputs & outputs) { + Inputs & inputs, + Outputs & outputs) { if (inputs.AreNoneBlank(clock)) { O1 o1; function_( @@ -62,7 +62,7 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { + void Init(InitData * init_data, Outputs & outputs) { outputs.template Get<0>().SendInit(init_data); } @@ -70,8 +70,8 @@ class ProcessExecutor< Inputs, Outputs > { FunctionType function_; }; -template -class ProcessExecutor< Inputs, Outputs > { +template +class ProcessExecutor< Inputs, Outputs > { public: typedef embb::base::Function FunctionType; @@ -79,8 +79,8 @@ class ProcessExecutor< Inputs, Outputs > { void Execute( int clock, - Inputs & inputs, - Outputs & outputs) { + Inputs & inputs, + Outputs & outputs) { if (inputs.AreNoneBlank(clock)) { O1 o1; O2 o2; @@ -95,7 +95,7 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { + void Init(InitData * init_data, Outputs & outputs) { outputs.template Get<0>().SendInit(init_data); outputs.template Get<1>().SendInit(init_data); } @@ -104,8 +104,8 @@ class ProcessExecutor< Inputs, Outputs > { FunctionType function_; }; -template -class ProcessExecutor< Inputs, Outputs > { +template +class ProcessExecutor< Inputs, Outputs > { public: typedef embb::base::Function FunctionType; @@ -114,8 +114,8 @@ class ProcessExecutor< Inputs, Outputs > { void Execute( int clock, - Inputs & inputs, - Outputs & outputs) { + Inputs & inputs, + Outputs & outputs) { if (inputs.AreNoneBlank(clock)) { O1 o1; O2 o2; @@ -133,7 +133,7 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { + void Init(InitData * init_data, Outputs & outputs) { outputs.template Get<0>().SendInit(init_data); outputs.template Get<1>().SendInit(init_data); outputs.template Get<2>().SendInit(init_data); @@ -143,9 +143,9 @@ class ProcessExecutor< Inputs, Outputs > { FunctionType function_; }; -template -class ProcessExecutor< Inputs, Outputs > { +class ProcessExecutor< Inputs, Outputs > { public: typedef embb::base::Function FunctionType; @@ -154,8 +154,8 @@ class ProcessExecutor< Inputs, Outputs > { void Execute( int clock, - Inputs & inputs, - Outputs & outputs) { + Inputs & inputs, + Outputs & outputs) { if (inputs.AreNoneBlank(clock)) { O1 o1; O2 o2; @@ -176,7 +176,7 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { + void Init(InitData * init_data, Outputs & outputs) { outputs.template Get<0>().SendInit(init_data); outputs.template Get<1>().SendInit(init_data); outputs.template Get<2>().SendInit(init_data); @@ -187,8 +187,8 @@ class ProcessExecutor< Inputs, Outputs > { FunctionType function_; }; -template -class ProcessExecutor< Inputs, Outputs > { +template +class ProcessExecutor< Inputs, Outputs > { public: typedef embb::base::Function FunctionType; @@ -197,8 +197,8 @@ class ProcessExecutor< Inputs, Outputs > { void Execute( int clock, - Inputs & inputs, - Outputs & outputs) { + Inputs & inputs, + Outputs & outputs) { if (inputs.AreNoneBlank(clock)) { O1 o1; function_( @@ -211,7 +211,7 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { + void Init(InitData * init_data, Outputs & outputs) { outputs.template Get<0>().SendInit(init_data); } @@ -219,8 +219,8 @@ class ProcessExecutor< Inputs, Outputs > { FunctionType function_; }; -template -class ProcessExecutor< Inputs, Outputs > { +template +class ProcessExecutor< Inputs, Outputs > { public: typedef embb::base::Function FunctionType; @@ -229,8 +229,8 @@ class ProcessExecutor< Inputs, Outputs > { void Execute( int clock, - Inputs & inputs, - Outputs & outputs) { + Inputs & inputs, + Outputs & outputs) { if (inputs.AreNoneBlank(clock)) { O1 o1; O2 o2; @@ -246,7 +246,7 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { + void Init(InitData * init_data, Outputs & outputs) { outputs.template Get<0>().SendInit(init_data); outputs.template Get<1>().SendInit(init_data); } @@ -255,9 +255,9 @@ class ProcessExecutor< Inputs, Outputs > { FunctionType function_; }; -template -class ProcessExecutor< Inputs, Outputs > { +class ProcessExecutor< Inputs, Outputs > { public: typedef embb::base::Function FunctionType; @@ -266,8 +266,8 @@ class ProcessExecutor< Inputs, Outputs > { void Execute( int clock, - Inputs & inputs, - Outputs & outputs) { + Inputs & inputs, + Outputs & outputs) { if (inputs.AreNoneBlank(clock)) { O1 o1; O2 o2; @@ -286,7 +286,7 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { + void Init(InitData * init_data, Outputs & outputs) { outputs.template Get<0>().SendInit(init_data); outputs.template Get<1>().SendInit(init_data); outputs.template Get<2>().SendInit(init_data); @@ -296,8 +296,8 @@ class ProcessExecutor< Inputs, Outputs > { FunctionType function_; }; -template -class ProcessExecutor< Inputs, Outputs > { +template +class ProcessExecutor< Inputs, Outputs > { public: typedef embb::base::Function FunctionType; @@ -306,8 +306,8 @@ class ProcessExecutor< Inputs, Outputs > { void Execute( int clock, - Inputs & inputs, - Outputs & outputs) { + Inputs & inputs, + Outputs & outputs) { if (inputs.AreNoneBlank(clock)) { O1 o1; function_( @@ -321,7 +321,7 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { + void Init(InitData * init_data, Outputs & outputs) { outputs.template Get<0>().SendInit(init_data); } @@ -329,9 +329,9 @@ class ProcessExecutor< Inputs, Outputs > { FunctionType function_; }; -template -class ProcessExecutor< Inputs, Outputs > { +class ProcessExecutor< Inputs, Outputs > { public: typedef embb::base::Function FunctionType; @@ -340,8 +340,8 @@ class ProcessExecutor< Inputs, Outputs > { void Execute( int clock, - Inputs & inputs, - Outputs & outputs) { + Inputs & inputs, + Outputs & outputs) { if (inputs.AreNoneBlank(clock)) { O1 o1; O2 o2; @@ -358,7 +358,7 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { + void Init(InitData * init_data, Outputs & outputs) { outputs.template Get<0>().SendInit(init_data); outputs.template Get<1>().SendInit(init_data); } @@ -367,9 +367,9 @@ class ProcessExecutor< Inputs, Outputs > { FunctionType function_; }; -template -class ProcessExecutor< Inputs, Outputs > { +class ProcessExecutor< Inputs, Outputs > { public: typedef embb::base::Function FunctionType; @@ -378,8 +378,8 @@ class ProcessExecutor< Inputs, Outputs > { void Execute( int clock, - Inputs & inputs, - Outputs & outputs) { + Inputs & inputs, + Outputs & outputs) { if (inputs.AreNoneBlank(clock)) { O1 o1; function_( @@ -394,7 +394,7 @@ class ProcessExecutor< Inputs, Outputs > { } } - void Init(InitData * init_data, Outputs & outputs) { + void Init(InitData * init_data, Outputs & outputs) { outputs.template Get<0>().SendInit(init_data); } diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h index 8f1271e..c2ca76d 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h @@ -36,12 +36,17 @@ namespace embb { namespace dataflow { namespace internal { -template class SchedulerMTAPI : public Scheduler { public: - SchedulerMTAPI() { + SchedulerMTAPI(int slices) + : slices_(slices) { embb::tasks::Node & node = embb::tasks::Node::GetInstance(); - for (int ii = 0; ii < Slices; ii++) { + + group_ = reinterpret_cast( + embb::base::Allocation::Allocate( + sizeof(embb::tasks::Group*)*slices_)); + + for (int ii = 0; ii < slices_; ii++) { embb::tasks::Group & group = node.CreateGroup(); group_[ii] = &group; } @@ -58,21 +63,22 @@ class SchedulerMTAPI : public Scheduler { } virtual ~SchedulerMTAPI() { embb::tasks::Node & node = embb::tasks::Node::GetInstance(); - for (int ii = 0; ii < Slices; ii++) { + for (int ii = 0; ii < slices_; ii++) { group_[ii]->WaitAll(MTAPI_INFINITE); node.DestroyGroup(*group_[ii]); } + embb::base::Allocation::Free(group_); for (int ii = 0; ii < queue_count_; ii++) { node.DestroyQueue(*queue_[ii]); } embb::base::Allocation::Free(queue_); } virtual void Spawn(Action & action) { - const int idx = action.GetClock() % Slices; + const int idx = action.GetClock() % slices_; group_[idx]->Spawn(embb::base::MakeFunction(action, &Action::RunMTAPI)); } virtual void Enqueue(int process_id, Action & action) { - const int idx = action.GetClock() % Slices; + const int idx = action.GetClock() % slices_; const int queue_id = process_id % queue_count_; queue_[queue_id]->Spawn(group_[idx], embb::base::MakeFunction(action, &Action::RunMTAPI)); @@ -82,9 +88,10 @@ class SchedulerMTAPI : public Scheduler { } private: - embb::tasks::Group * group_[Slices]; + embb::tasks::Group ** group_; embb::tasks::Queue ** queue_; int queue_count_; + int slices_; }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/select.h b/dataflow_cpp/include/embb/dataflow/internal/select.h index f3a499f..da2384b 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/select.h +++ b/dataflow_cpp/include/embb/dataflow/internal/select.h @@ -27,7 +27,6 @@ #ifndef EMBB_DATAFLOW_INTERNAL_SELECT_H_ #define EMBB_DATAFLOW_INTERNAL_SELECT_H_ -#include #include #include #include @@ -37,13 +36,13 @@ namespace embb { namespace dataflow { namespace internal { -template +template class Select : public Node , public ClockListener { public: - typedef Inputs InputsType; - typedef Outputs OutputsType; + typedef Inputs InputsType; + typedef Outputs OutputsType; Select() { inputs_.SetListener(this); @@ -82,6 +81,8 @@ class Select } virtual void Init(InitData * init_data) { + slices_ = init_data->slices; + //inputs_.SetSlices(slices_); SetScheduler(init_data->sched); GetOutput<0>().SendInit(init_data); } @@ -124,7 +125,7 @@ class Select private: InputsType inputs_; OutputsType outputs_; - Action action_[Slices]; + int slices_; }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/sink.h b/dataflow_cpp/include/embb/dataflow/internal/sink.h index 2c835ed..e23544c 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/sink.h +++ b/dataflow_cpp/include/embb/dataflow/internal/sink.h @@ -36,16 +36,15 @@ namespace embb { namespace dataflow { namespace internal { -template class Sink; +template class Sink; template < - int Slices, typename I1, typename I2, typename I3, typename I4, typename I5> -class Sink< Slices, Inputs > +class Sink< Inputs > : public Node , public ClockListener { public: - typedef Inputs InputsType; + typedef Inputs InputsType; typedef SinkExecutor< InputsType > ExecutorType; typedef typename ExecutorType::FunctionType FunctionType; @@ -73,6 +72,13 @@ class Sink< Slices, Inputs > } virtual void Init(InitData * init_data) { + slices_ = init_data->slices; + action_ = reinterpret_cast( + embb::base::Allocation::Allocate( + sizeof(Action)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + action_[ii] = Action(); + } SetListener(init_data->sink_listener); SetScheduler(init_data->sched); listener_->OnInit(init_data); @@ -96,7 +102,7 @@ class Sink< Slices, Inputs > bool retry = true; while (retry) { int clk = next_clock_; - int clk_end = clk + Slices; + int clk_end = clk + slices_; int clk_res = clk; for (int ii = clk; ii < clk_end; ii++) { if (!inputs_.AreAtClock(ii)) { @@ -108,7 +114,7 @@ class Sink< Slices, Inputs > if (next_clock_.CompareAndSwap(clk, clk_res)) { while (queued_clock_.Load() < clk) continue; for (int ii = clk; ii < clk_res; ii++) { - const int idx = ii % Slices; + const int idx = ii % slices_; action_[idx] = Action(this, ii); sched_->Enqueue(queue_id_, action_[idx]); } @@ -128,11 +134,12 @@ class Sink< Slices, Inputs > private: InputsType inputs_; ExecutorType executor_; - Action action_[Slices]; + Action * action_; ClockListener * listener_; embb::base::Atomic next_clock_; embb::base::Atomic queued_clock_; int queue_id_; + int slices_; }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/sink_executor.h b/dataflow_cpp/include/embb/dataflow/internal/sink_executor.h index 447602d..b7623d8 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/sink_executor.h +++ b/dataflow_cpp/include/embb/dataflow/internal/sink_executor.h @@ -38,8 +38,8 @@ namespace internal { template class SinkExecutor; -template -class SinkExecutor< Inputs > { +template +class SinkExecutor< Inputs > { public: typedef embb::base::Function FunctionType; @@ -47,7 +47,7 @@ class SinkExecutor< Inputs > { void Execute( int clock, - Inputs & inputs) { + Inputs & inputs) { function_( inputs.template Get<0>().GetValue(clock)); } @@ -56,8 +56,8 @@ class SinkExecutor< Inputs > { FunctionType function_; }; -template -class SinkExecutor< Inputs > { +template +class SinkExecutor< Inputs > { public: typedef embb::base::Function FunctionType; @@ -65,7 +65,7 @@ class SinkExecutor< Inputs > { void Execute( int clock, - Inputs & inputs) { + Inputs & inputs) { function_( inputs.template Get<0>().GetValue(clock), inputs.template Get<1>().GetValue(clock)); @@ -75,8 +75,8 @@ class SinkExecutor< Inputs > { FunctionType function_; }; -template -class SinkExecutor< Inputs > { +template +class SinkExecutor< Inputs > { public: typedef embb::base::Function FunctionType; @@ -85,7 +85,7 @@ class SinkExecutor< Inputs > { void Execute( int clock, - Inputs & inputs) { + Inputs & inputs) { function_( inputs.template Get<0>().GetValue(clock), inputs.template Get<1>().GetValue(clock), @@ -96,8 +96,8 @@ class SinkExecutor< Inputs > { FunctionType function_; }; -template -class SinkExecutor< Inputs > { +template +class SinkExecutor< Inputs > { public: typedef embb::base::Function FunctionType; @@ -106,7 +106,7 @@ class SinkExecutor< Inputs > { void Execute( int clock, - Inputs & inputs) { + Inputs & inputs) { function_( inputs.template Get<0>().GetValue(clock), inputs.template Get<1>().GetValue(clock), @@ -118,9 +118,9 @@ class SinkExecutor< Inputs > { FunctionType function_; }; -template -class SinkExecutor< Inputs > { +class SinkExecutor< Inputs > { public: typedef embb::base::Function FunctionType; @@ -129,7 +129,7 @@ class SinkExecutor< Inputs > { void Execute( int clock, - Inputs & inputs) { + Inputs & inputs) { function_( inputs.template Get<0>().GetValue(clock), inputs.template Get<1>().GetValue(clock), diff --git a/dataflow_cpp/include/embb/dataflow/internal/source.h b/dataflow_cpp/include/embb/dataflow/internal/source.h index 72f198b..3d7c0d4 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/source.h +++ b/dataflow_cpp/include/embb/dataflow/internal/source.h @@ -35,15 +35,14 @@ namespace embb { namespace dataflow { namespace internal { -template class Source; +template class Source; template < - int Slices, typename O1, typename O2, typename O3, typename O4, typename O5> -class Source< Slices, Outputs > +class Source< Outputs > : public Node { public: - typedef Outputs OutputsType; + typedef Outputs OutputsType; typedef SourceExecutor< OutputsType > ExecutorType; typedef typename ExecutorType::FunctionType FunctionType; diff --git a/dataflow_cpp/include/embb/dataflow/internal/source_executor.h b/dataflow_cpp/include/embb/dataflow/internal/source_executor.h index 1461285..01cb471 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/source_executor.h +++ b/dataflow_cpp/include/embb/dataflow/internal/source_executor.h @@ -41,8 +41,8 @@ class Scheduler; template class SourceExecutor; -template -class SourceExecutor< Outputs > { +template +class SourceExecutor< Outputs > { public: typedef embb::base::Function FunctionType; @@ -50,14 +50,14 @@ class SourceExecutor< Outputs > { bool Execute( int clock, - Outputs & outputs) { + Outputs & outputs) { O1 o1; bool result = function_(o1); outputs.template Get<0>().Send(Signal(clock, o1)); return result; } - void Init(InitData * init_data, Outputs & outputs) { + void Init(InitData * init_data, Outputs & outputs) { outputs.template Get<0>().SendInit(init_data); } @@ -65,8 +65,8 @@ class SourceExecutor< Outputs > { FunctionType function_; }; -template -class SourceExecutor< Outputs > { +template +class SourceExecutor< Outputs > { public: typedef embb::base::Function FunctionType; @@ -74,7 +74,7 @@ class SourceExecutor< Outputs > { bool Execute( int clock, - Outputs & outputs) { + Outputs & outputs) { O1 o1; O2 o2; bool result = function_(o1, o2); @@ -83,7 +83,7 @@ class SourceExecutor< Outputs > { return result; } - void Init(InitData * init_data, Outputs & outputs) { + void Init(InitData * init_data, Outputs & outputs) { outputs.template Get<0>().SendInit(init_data); outputs.template Get<1>().SendInit(init_data); } @@ -92,8 +92,8 @@ class SourceExecutor< Outputs > { FunctionType function_; }; -template -class SourceExecutor< Outputs > { +template +class SourceExecutor< Outputs > { public: typedef embb::base::Function FunctionType; @@ -101,7 +101,7 @@ class SourceExecutor< Outputs > { bool Execute( int clock, - Outputs & outputs) { + Outputs & outputs) { O1 o1; O2 o2; O3 o3; @@ -112,7 +112,7 @@ class SourceExecutor< Outputs > { return result; } - void Init(InitData * init_data, Outputs & outputs) { + void Init(InitData * init_data, Outputs & outputs) { outputs.template Get<0>().SendInit(init_data); outputs.template Get<1>().SendInit(init_data); outputs.template Get<2>().SendInit(init_data); @@ -122,8 +122,8 @@ class SourceExecutor< Outputs > { FunctionType function_; }; -template -class SourceExecutor< Outputs > { +template +class SourceExecutor< Outputs > { public: typedef embb::base::Function FunctionType; @@ -131,7 +131,7 @@ class SourceExecutor< Outputs > { bool Execute( int clock, - Outputs & outputs) { + Outputs & outputs) { O1 o1; O2 o2; O3 o3; @@ -144,7 +144,7 @@ class SourceExecutor< Outputs > { return result; } - void Init(InitData * init_data, Outputs & outputs) { + void Init(InitData * init_data, Outputs & outputs) { outputs.template Get<0>().SendInit(init_data); outputs.template Get<1>().SendInit(init_data); outputs.template Get<2>().SendInit(init_data); @@ -155,9 +155,9 @@ class SourceExecutor< Outputs > { FunctionType function_; }; -template -class SourceExecutor< Outputs > { +class SourceExecutor< Outputs > { public: typedef embb::base::Function FunctionType; @@ -165,7 +165,7 @@ class SourceExecutor< Outputs > { bool Execute( int clock, - Outputs & outputs) { + Outputs & outputs) { O1 o1; O2 o2; O3 o3; @@ -181,7 +181,7 @@ class SourceExecutor< Outputs > { } void Init( - InitData * init_data, Outputs & outputs) { + InitData * init_data, Outputs & outputs) { outputs.template Get<0>().SendInit(init_data); outputs.template Get<1>().SendInit(init_data); outputs.template Get<2>().SendInit(init_data); diff --git a/dataflow_cpp/include/embb/dataflow/internal/switch.h b/dataflow_cpp/include/embb/dataflow/internal/switch.h index 3bf36ee..2ae3124 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/switch.h +++ b/dataflow_cpp/include/embb/dataflow/internal/switch.h @@ -27,7 +27,6 @@ #ifndef EMBB_DATAFLOW_INTERNAL_SWITCH_H_ #define EMBB_DATAFLOW_INTERNAL_SWITCH_H_ -#include #include #include #include @@ -37,13 +36,13 @@ namespace embb { namespace dataflow { namespace internal { -template +template class Switch : public Node , public ClockListener { public: - typedef Inputs InputsType; - typedef Outputs OutputsType; + typedef Inputs InputsType; + typedef Outputs OutputsType; Switch() { inputs_.SetListener(this); @@ -79,6 +78,7 @@ class Switch } virtual void Init(InitData * init_data) { + //inputs_.SetSlices(init_data->slices); SetScheduler(init_data->sched); GetOutput<0>().SendInit(init_data); GetOutput<1>().SendInit(init_data); @@ -122,7 +122,6 @@ class Switch private: InputsType inputs_; OutputsType outputs_; - Action action_[Slices]; }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/network.h b/dataflow_cpp/include/embb/dataflow/network.h index 33dbaff..2a517db 100644 --- a/dataflow_cpp/include/embb/dataflow/network.h +++ b/dataflow_cpp/include/embb/dataflow/network.h @@ -669,7 +669,6 @@ class Network { #else -template class Network : public internal::ClockListener { public: Network() {} @@ -679,7 +678,7 @@ class Network : public internal::ClockListener { typename T4 = embb::base::internal::Nil, typename T5 = embb::base::internal::Nil> struct Inputs { - typedef internal::Inputs Type; + typedef internal::Inputs Type; }; template struct Outputs { - typedef internal::Outputs Type; + typedef internal::Outputs Type; }; template class SerialProcess; @@ -695,20 +694,20 @@ class Network : public internal::ClockListener { template < typename I1, typename I2, typename I3, typename I4, typename I5, typename O1, typename O2, typename O3, typename O4, typename O5> - class SerialProcess< internal::Inputs, - internal::Outputs > - : public internal::Process< Slices, true, - internal::Inputs, - internal::Outputs > { + class SerialProcess< internal::Inputs, + internal::Outputs > + : public internal::Process< true, + internal::Inputs, + internal::Outputs > { public: - typedef typename internal::Process< Slices, true, - internal::Inputs, - internal::Outputs >::FunctionType + typedef typename internal::Process< true, + internal::Inputs, + internal::Outputs >::FunctionType FunctionType; explicit SerialProcess(FunctionType function) - : internal::Process< Slices, true, - internal::Inputs, - internal::Outputs >(function) { + : internal::Process< true, + internal::Inputs, + internal::Outputs >(function) { //empty } }; @@ -718,31 +717,31 @@ class Network : public internal::ClockListener { template < typename I1, typename I2, typename I3, typename I4, typename I5, typename O1, typename O2, typename O3, typename O4, typename O5> - class ParallelProcess< internal::Inputs, - internal::Outputs > - : public internal::Process< Slices, false, - internal::Inputs, - internal::Outputs >{ + class ParallelProcess< internal::Inputs, + internal::Outputs > + : public internal::Process< false, + internal::Inputs, + internal::Outputs >{ public: - typedef typename internal::Process< Slices, false, - internal::Inputs, - internal::Outputs >::FunctionType + typedef typename internal::Process< false, + internal::Inputs, + internal::Outputs >::FunctionType FunctionType; explicit ParallelProcess(FunctionType function) - : internal::Process< Slices, false, - internal::Inputs, - internal::Outputs >(function) { + : internal::Process< false, + internal::Inputs, + internal::Outputs >(function) { //empty } }; template - class Switch : public internal::Switch { + class Switch : public internal::Switch { public: }; template - class Select : public internal::Select { + class Select : public internal::Select { public: }; @@ -750,15 +749,15 @@ class Network : public internal::ClockListener { typename I3 = embb::base::internal::Nil, typename I4 = embb::base::internal::Nil, typename I5 = embb::base::internal::Nil> - class Sink : public internal::Sink > { + class Sink : public internal::Sink< + internal::Inputs > { public: - typedef typename internal::Sink >::FunctionType FunctionType; + typedef typename internal::Sink< + internal::Inputs >::FunctionType FunctionType; explicit Sink(FunctionType function) - : internal::Sink >(function) { + : internal::Sink< + internal::Inputs >(function) { //empty } }; @@ -767,16 +766,16 @@ class Network : public internal::ClockListener { typename O3 = embb::base::internal::Nil, typename O4 = embb::base::internal::Nil, typename O5 = embb::base::internal::Nil> - class Source : public internal::Source > { + class Source : public internal::Source< + internal::Outputs > { public: - typedef typename internal::Source >::FunctionType + typedef typename internal::Source< + internal::Outputs >::FunctionType FunctionType; explicit Source(FunctionType function) - : internal::Source >(function) { + : internal::Source< + internal::Outputs >(function) { //empty } }; @@ -787,10 +786,10 @@ class Network : public internal::ClockListener { } template - class ConstantSource : public internal::ConstantSource { + class ConstantSource : public internal::ConstantSource { public: explicit ConstantSource(Type value) - : internal::ConstantSource(value) { + : internal::ConstantSource(value) { //empty } }; @@ -800,24 +799,36 @@ class Network : public internal::ClockListener { sources_.push_back(&source); } - void operator () () { + void operator () (int slices) { + slices_ = slices; + internal::SchedulerSequential sched_seq; - internal::SchedulerMTAPI sched_mtapi; + internal::SchedulerMTAPI sched_mtapi(slices_); internal::Scheduler * sched = &sched_mtapi; internal::InitData init_data; + init_data.slices = slices_; init_data.sched = sched; init_data.sink_listener = this; + sink_counter_ = reinterpret_cast*>( + embb::base::Allocation::Allocate( + sizeof(embb::base::Atomic)*slices_)); + for (int ii = 0; ii < slices_; ii++) { + sink_counter_[ii] = 0; + } + sink_count_ = 0; for (size_t it = 0; it < sources_.size(); it++) sources_[it]->Init(&init_data); - for (int ii = 0; ii < Slices; ii++) sink_counter_[ii] = 0; + for (int ii = 0; ii < slices_; ii++) { + sink_counter_[ii] = 0; + } int clock = 0; while (clock >= 0) { - const int idx = clock % Slices; + const int idx = clock % slices_; while (sink_counter_[idx] > 0) embb::base::Thread::CurrentYield(); sched->WaitForSlice(idx); if (!SpawnClock(clock)) @@ -825,10 +836,10 @@ class Network : public internal::ClockListener { clock++; } - int ii = clock - Slices + 1; + int ii = clock - slices_ + 1; if (ii < 0) ii = 0; for (; ii < clock; ii++) { - const int idx = ii % Slices; + const int idx = ii % slices_; while (sink_counter_[idx] > 0) embb::base::Thread::CurrentYield(); sched->WaitForSlice(idx); } @@ -841,7 +852,7 @@ class Network : public internal::ClockListener { * corresponding slot, thus allowing a new token to be emitted. */ virtual void OnClock(int clock) { - const int idx = clock % Slices; + const int idx = clock % slices_; const int cnt = --sink_counter_[idx]; if (cnt < 0) EMBB_THROW(embb::base::ErrorException, @@ -861,15 +872,16 @@ class Network : public internal::ClockListener { std::vector processes_; std::vector sources_; std::vector sinks_; - embb::base::Atomic sink_counter_[Slices]; + embb::base::Atomic * sink_counter_; int sink_count_; + int slices_; #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY std::vector spawn_history_[Slices]; #endif bool SpawnClock(int clock) { - const int idx = clock % Slices; + const int idx = clock % slices_; bool result = true; #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY spawn_history_[idx].push_back(clock); diff --git a/dataflow_cpp/test/dataflow_cpp_test_simple.cc b/dataflow_cpp/test/dataflow_cpp_test_simple.cc index b434625..810ad01 100644 --- a/dataflow_cpp/test/dataflow_cpp_test_simple.cc +++ b/dataflow_cpp/test/dataflow_cpp_test_simple.cc @@ -39,7 +39,7 @@ #define NUM_SLICES 8 #define TEST_COUNT 12 -typedef embb::dataflow::Network MyNetwork; +typedef embb::dataflow::Network MyNetwork; typedef MyNetwork::ConstantSource< int > MyConstantSource; typedef MyNetwork::Source< int > MySource; typedef MyNetwork::SerialProcess< MyNetwork::Inputs::Type, @@ -206,7 +206,7 @@ void SimpleTest::TestBasic() { network.AddSource(source); try { - network(); + network(NUM_SLICES); } catch (embb::base::ErrorException & e) { PT_ASSERT_MSG(false, e.What()); } -- libgit2 0.26.0