diff --git a/dataflow_cpp/include/embb/dataflow/internal/node.h b/dataflow_cpp/include/embb/dataflow/internal/node.h index 5dfebd8..57a8a94 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/node.h +++ b/dataflow_cpp/include/embb/dataflow/internal/node.h @@ -44,6 +44,7 @@ class Node { virtual bool HasOutputs() const { return false; } virtual void Run(int clock) = 0; virtual bool IsFullyConnected() = 0; + virtual bool IsSequential() { return true; } virtual bool Start(int /*clock*/) { EMBB_THROW(embb::base::ErrorException, "Nodes are started implicitly."); diff --git a/dataflow_cpp/include/embb/dataflow/internal/process.h b/dataflow_cpp/include/embb/dataflow/internal/process.h index 1dfade2..584c948 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/process.h +++ b/dataflow_cpp/include/embb/dataflow/internal/process.h @@ -98,6 +98,10 @@ class Process< Serial, Inputs, return inputs_.IsFullyConnected() && outputs_.IsFullyConnected(); } + virtual bool IsSequential() { + return Serial; + } + InputsType & GetInputs() { return inputs_; } diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler.h index 9f86e96..dbf5556 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler.h @@ -40,6 +40,7 @@ class Scheduler { virtual void Spawn(Action & action) = 0; virtual void Enqueue(int process_id, Action & action) = 0; virtual void WaitForSlice(int slice) = 0; + virtual int GetSlices() = 0; }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h index 856b4a2..0f7319d 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h @@ -44,6 +44,13 @@ class SchedulerMTAPI : public Scheduler { : slices_(slices) { embb::tasks::Node & node = embb::tasks::Node::GetInstance(); + int tl = std::min( + static_cast(node.GetTaskLimit()), + static_cast(node.GetGroupCount())); + if (tl < slices_) { + slices_ = tl; + } + group_ = reinterpret_cast( embb::base::Allocation::Allocate( sizeof(embb::tasks::Group*)*slices_)); @@ -93,7 +100,7 @@ class SchedulerMTAPI : public Scheduler { virtual void WaitForSlice(int slice) { group_[slice]->WaitAll(MTAPI_INFINITE); } - + virtual int GetSlices() { return slices_; } private: embb::tasks::Group ** group_; embb::tasks::Queue ** queue_; diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h index 575fea3..c9c4b9d 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h @@ -45,6 +45,7 @@ class SchedulerSequential : public Scheduler { action.RunSequential(); } virtual void WaitForSlice(int /*slice*/) {} + virtual int GetSlices() { return 1; } }; } // namespace internal diff --git a/dataflow_cpp/include/embb/dataflow/network.h b/dataflow_cpp/include/embb/dataflow/network.h index 7dccb0b..53b17e0 100644 --- a/dataflow_cpp/include/embb/dataflow/network.h +++ b/dataflow_cpp/include/embb/dataflow/network.h @@ -687,6 +687,9 @@ class Network : public internal::ClockListener { slices_ = int(embb_core_count_available())*4; } sched_ = embb::base::Allocation::New(slices_); + if (sched_->GetSlices() != slices_) { + slices_ = sched_->GetSlices(); + } sink_counter_ = reinterpret_cast*>( embb::base::Allocation::Allocate( sizeof(embb::base::Atomic)*slices_)); diff --git a/tasks_cpp/include/embb/tasks/node.h b/tasks_cpp/include/embb/tasks/node.h index 3561a32..1695f54 100644 --- a/tasks_cpp/include/embb/tasks/node.h +++ b/tasks_cpp/include/embb/tasks/node.h @@ -130,6 +130,15 @@ class Node { } /** + * Returns the number of available groups. + * \return The number of available groups + * \waitfree + */ + mtapi_uint_t GetGroupCount() const { + return group_count_; + } + + /** * Returns the number of available tasks. * \return The number of available tasks * \waitfree @@ -238,6 +247,7 @@ class Node { mtapi_task_context_t * context); mtapi_uint_t queue_count_; + mtapi_uint_t group_count_; mtapi_uint_t task_limit_; mtapi_uint_t core_count_; mtapi_uint_t worker_thread_count_; diff --git a/tasks_cpp/src/node.cc b/tasks_cpp/src/node.cc index 031338a..6d92e9c 100644 --- a/tasks_cpp/src/node.cc +++ b/tasks_cpp/src/node.cc @@ -78,6 +78,9 @@ Node::Node( mtapi_node_get_attribute(node_id, MTAPI_NODE_MAX_QUEUES, &queue_count_, sizeof(queue_count_), &status); assert(MTAPI_SUCCESS == status); + mtapi_node_get_attribute(node_id, MTAPI_NODE_MAX_GROUPS, &group_count_, + sizeof(group_count_), &status); + assert(MTAPI_SUCCESS == status); mtapi_node_get_attribute(node_id, MTAPI_NODE_MAX_TASKS, &task_limit_, sizeof(queue_count_), &status); assert(MTAPI_SUCCESS == status);