Commit 64b15b2a by Marcus Winter

dataflow_cpp: preparations for deferred slice initialization

parent f61ec036
...@@ -44,6 +44,7 @@ class Node { ...@@ -44,6 +44,7 @@ class Node {
virtual bool HasOutputs() const { return false; } virtual bool HasOutputs() const { return false; }
virtual void Run(int clock) = 0; virtual void Run(int clock) = 0;
virtual bool IsFullyConnected() = 0; virtual bool IsFullyConnected() = 0;
virtual bool IsSequential() { return true; }
virtual bool Start(int /*clock*/) { virtual bool Start(int /*clock*/) {
EMBB_THROW(embb::base::ErrorException, EMBB_THROW(embb::base::ErrorException,
"Nodes are started implicitly."); "Nodes are started implicitly.");
......
...@@ -98,6 +98,10 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>, ...@@ -98,6 +98,10 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
return inputs_.IsFullyConnected() && outputs_.IsFullyConnected(); return inputs_.IsFullyConnected() && outputs_.IsFullyConnected();
} }
virtual bool IsSequential() {
return Serial;
}
InputsType & GetInputs() { InputsType & GetInputs() {
return inputs_; return inputs_;
} }
......
...@@ -40,6 +40,7 @@ class Scheduler { ...@@ -40,6 +40,7 @@ class Scheduler {
virtual void Spawn(Action & action) = 0; virtual void Spawn(Action & action) = 0;
virtual void Enqueue(int process_id, Action & action) = 0; virtual void Enqueue(int process_id, Action & action) = 0;
virtual void WaitForSlice(int slice) = 0; virtual void WaitForSlice(int slice) = 0;
virtual int GetSlices() = 0;
}; };
} // namespace internal } // namespace internal
......
...@@ -44,6 +44,13 @@ class SchedulerMTAPI : public Scheduler { ...@@ -44,6 +44,13 @@ class SchedulerMTAPI : public Scheduler {
: slices_(slices) { : slices_(slices) {
embb::tasks::Node & node = embb::tasks::Node::GetInstance(); embb::tasks::Node & node = embb::tasks::Node::GetInstance();
int tl = std::min(
static_cast<int>(node.GetTaskLimit()),
static_cast<int>(node.GetGroupCount()));
if (tl < slices_) {
slices_ = tl;
}
group_ = reinterpret_cast<embb::tasks::Group**>( group_ = reinterpret_cast<embb::tasks::Group**>(
embb::base::Allocation::Allocate( embb::base::Allocation::Allocate(
sizeof(embb::tasks::Group*)*slices_)); sizeof(embb::tasks::Group*)*slices_));
...@@ -93,7 +100,7 @@ class SchedulerMTAPI : public Scheduler { ...@@ -93,7 +100,7 @@ class SchedulerMTAPI : public Scheduler {
virtual void WaitForSlice(int slice) { virtual void WaitForSlice(int slice) {
group_[slice]->WaitAll(MTAPI_INFINITE); group_[slice]->WaitAll(MTAPI_INFINITE);
} }
virtual int GetSlices() { return slices_; }
private: private:
embb::tasks::Group ** group_; embb::tasks::Group ** group_;
embb::tasks::Queue ** queue_; embb::tasks::Queue ** queue_;
......
...@@ -45,6 +45,7 @@ class SchedulerSequential : public Scheduler { ...@@ -45,6 +45,7 @@ class SchedulerSequential : public Scheduler {
action.RunSequential(); action.RunSequential();
} }
virtual void WaitForSlice(int /*slice*/) {} virtual void WaitForSlice(int /*slice*/) {}
virtual int GetSlices() { return 1; }
}; };
} // namespace internal } // namespace internal
......
...@@ -687,6 +687,9 @@ class Network : public internal::ClockListener { ...@@ -687,6 +687,9 @@ class Network : public internal::ClockListener {
slices_ = int(embb_core_count_available())*4; slices_ = int(embb_core_count_available())*4;
} }
sched_ = embb::base::Allocation::New<internal::SchedulerMTAPI>(slices_); sched_ = embb::base::Allocation::New<internal::SchedulerMTAPI>(slices_);
if (sched_->GetSlices() != slices_) {
slices_ = sched_->GetSlices();
}
sink_counter_ = reinterpret_cast<embb::base::Atomic<int>*>( sink_counter_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate( embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_)); sizeof(embb::base::Atomic<int>)*slices_));
......
...@@ -130,6 +130,15 @@ class Node { ...@@ -130,6 +130,15 @@ class Node {
} }
/** /**
* Returns the number of available groups.
* \return The number of available groups
* \waitfree
*/
mtapi_uint_t GetGroupCount() const {
return group_count_;
}
/**
* Returns the number of available tasks. * Returns the number of available tasks.
* \return The number of available tasks * \return The number of available tasks
* \waitfree * \waitfree
...@@ -238,6 +247,7 @@ class Node { ...@@ -238,6 +247,7 @@ class Node {
mtapi_task_context_t * context); mtapi_task_context_t * context);
mtapi_uint_t queue_count_; mtapi_uint_t queue_count_;
mtapi_uint_t group_count_;
mtapi_uint_t task_limit_; mtapi_uint_t task_limit_;
mtapi_uint_t core_count_; mtapi_uint_t core_count_;
mtapi_uint_t worker_thread_count_; mtapi_uint_t worker_thread_count_;
......
...@@ -78,6 +78,9 @@ Node::Node( ...@@ -78,6 +78,9 @@ Node::Node(
mtapi_node_get_attribute(node_id, MTAPI_NODE_MAX_QUEUES, &queue_count_, mtapi_node_get_attribute(node_id, MTAPI_NODE_MAX_QUEUES, &queue_count_,
sizeof(queue_count_), &status); sizeof(queue_count_), &status);
assert(MTAPI_SUCCESS == status); assert(MTAPI_SUCCESS == status);
mtapi_node_get_attribute(node_id, MTAPI_NODE_MAX_GROUPS, &group_count_,
sizeof(group_count_), &status);
assert(MTAPI_SUCCESS == status);
mtapi_node_get_attribute(node_id, MTAPI_NODE_MAX_TASKS, &task_limit_, mtapi_node_get_attribute(node_id, MTAPI_NODE_MAX_TASKS, &task_limit_,
sizeof(queue_count_), &status); sizeof(queue_count_), &status);
assert(MTAPI_SUCCESS == status); assert(MTAPI_SUCCESS == status);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment