Commit 7bc10024 by Marcus Winter

Merge branch 'embb564_dataflow_default_token_count' into development

parents ab653a94 ebdd3c18
......@@ -74,12 +74,21 @@ class In {
void SetConnected() { connected_ = true; }
void SetSlices(int slices) {
if (0 < slices_) {
for (int ii = 0; ii < slices_; ii++) {
values_[ii].~SignalType();
}
embb::base::Allocation::Free(values_);
values_ = NULL;
}
slices_ = slices;
values_ = reinterpret_cast<SignalType*>(
embb::base::Allocation::Allocate(
sizeof(SignalType)*slices_));
for (int ii = 0; ii < slices_; ii++) {
new (&values_[ii]) SignalType();
if (0 < slices_) {
values_ = reinterpret_cast<SignalType*>(
embb::base::Allocation::Allocate(
sizeof(SignalType)*slices_));
for (int ii = 0; ii < slices_; ii++) {
new (&values_[ii]) SignalType();
}
}
}
......
......@@ -59,6 +59,7 @@ class Inputs<embb::base::internal::Nil, embb::base::internal::Nil,
bool IsFullyConnected() {
return true;
}
void SetSlices(int /*slices*/) {}
};
template <typename T1>
......@@ -69,12 +70,22 @@ class Inputs<T1, embb::base::internal::Nil, embb::base::internal::Nil,
embb::base::internal::Nil>
, public ClockListener {
public:
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 1;
explicit Inputs() : count_(NULL), slices_(0) {
// empty
}
void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(count_);
count_ = NULL;
}
slices_ = slices;
if (0 < slices_) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 1;
}
}
this->template Get<0>().SetSlices(slices_);
}
......@@ -122,12 +133,22 @@ class Inputs<T1, T2, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil>
, public ClockListener {
public:
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 2;
explicit Inputs() : count_(NULL), slices_(0) {
// empty
}
void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(count_);
count_ = NULL;
}
slices_ = slices;
if (0 < slices_) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 2;
}
}
this->template Get<0>().SetSlices(slices_);
this->template Get<1>().SetSlices(slices_);
......@@ -181,12 +202,22 @@ class Inputs<T1, T2, T3, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil>
, public ClockListener {
public:
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 3;
explicit Inputs() : count_(NULL), slices_(0) {
// empty
}
void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(count_);
count_ = NULL;
}
slices_ = slices;
if (0 < slices_) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 3;
}
}
this->template Get<0>().SetSlices(slices_);
this->template Get<1>().SetSlices(slices_);
......@@ -245,12 +276,22 @@ class Inputs<T1, T2, T3, T4, embb::base::internal::Nil>
In<T4>, embb::base::internal::Nil>
, public ClockListener {
public:
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 4;
explicit Inputs() : count_(NULL), slices_(0) {
// empty
}
void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(count_);
count_ = NULL;
}
slices_ = slices;
if (0 < slices_) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 4;
}
}
this->template Get<0>().SetSlices(slices_);
this->template Get<1>().SetSlices(slices_);
......@@ -316,12 +357,22 @@ class Inputs
In<T4>, In<T5> >
, public ClockListener {
public:
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 5;
explicit Inputs() : count_(NULL), slices_(0) {
// empty
}
void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(count_);
count_ = NULL;
}
slices_ = slices;
if (0 < slices_) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 5;
}
}
this->template Get<0>().SetSlices(slices_);
this->template Get<1>().SetSlices(slices_);
......
......@@ -44,17 +44,26 @@ class Node {
virtual bool HasOutputs() const { return false; }
virtual void Run(int clock) = 0;
virtual bool IsFullyConnected() = 0;
virtual bool IsSequential() { return true; }
virtual bool Start(int /*clock*/) {
EMBB_THROW(embb::base::ErrorException,
"Nodes are started implicitly.");
}
void SetScheduler(Scheduler * sched) {
sched_ = sched;
if (NULL != sched_) {
SetSlices(sched_->GetSlices());
} else {
SetSlices(0);
}
}
protected:
Scheduler * sched_;
static int next_process_id_;
void SetScheduler(Scheduler * sched) { sched_ = sched; }
static int GetNextProcessID() { return next_process_id_++; }
virtual void SetSlices(int /*slices*/) {};
};
} // namespace internal
......
......@@ -53,11 +53,11 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
typedef ProcessExecutor< InputsType, OutputsType > ExecutorType;
typedef typename ExecutorType::FunctionType FunctionType;
Process(int slices, Scheduler * sched, FunctionType function)
: inputs_(slices)
Process(Scheduler * sched, FunctionType function)
: inputs_()
, executor_(function)
, action_(NULL)
, slices_(slices) {
, slices_(0) {
next_clock_ = 0;
queued_clock_ = 0;
bool ordered = Serial;
......@@ -67,12 +67,6 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
queue_id_ = 0;
}
inputs_.SetListener(this);
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
SetScheduler(sched);
}
......@@ -98,6 +92,10 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
return inputs_.IsFullyConnected() && outputs_.IsFullyConnected();
}
virtual bool IsSequential() {
return Serial;
}
InputsType & GetInputs() {
return inputs_;
}
......@@ -169,6 +167,23 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
embb::base::Atomic<int> queued_clock_;
int queue_id_;
int slices_;
virtual void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(action_);
action_ = NULL;
}
slices_ = slices;
inputs_.SetSlices(slices);
if (0 < slices_) {
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
}
}
};
} // namespace internal
......
......@@ -40,6 +40,7 @@ class Scheduler {
virtual void Spawn(Action & action) = 0;
virtual void Enqueue(int process_id, Action & action) = 0;
virtual void WaitForSlice(int slice) = 0;
virtual int GetSlices() = 0;
};
} // namespace internal
......
......@@ -44,6 +44,13 @@ class SchedulerMTAPI : public Scheduler {
: slices_(slices) {
embb::tasks::Node & node = embb::tasks::Node::GetInstance();
int tl = std::min(
static_cast<int>(node.GetTaskLimit()),
static_cast<int>(node.GetGroupCount()));
if (tl < slices_) {
slices_ = tl;
}
group_ = reinterpret_cast<embb::tasks::Group**>(
embb::base::Allocation::Allocate(
sizeof(embb::tasks::Group*)*slices_));
......@@ -93,7 +100,7 @@ class SchedulerMTAPI : public Scheduler {
virtual void WaitForSlice(int slice) {
group_[slice]->WaitAll(MTAPI_INFINITE);
}
virtual int GetSlices() { return slices_; }
private:
embb::tasks::Group ** group_;
embb::tasks::Queue ** queue_;
......
......@@ -45,6 +45,7 @@ class SchedulerSequential : public Scheduler {
action.RunSequential();
}
virtual void WaitForSlice(int /*slice*/) {}
virtual int GetSlices() { return 1; }
};
} // namespace internal
......
......@@ -44,7 +44,7 @@ class Select
typedef Inputs<bool, Type, Type> InputsType;
typedef Outputs<Type> OutputsType;
Select(int slices, Scheduler * sched) : inputs_(slices), slices_(slices) {
Select(Scheduler * sched) : inputs_(), slices_(0) {
inputs_.SetListener(this);
SetScheduler(sched);
}
......@@ -119,6 +119,11 @@ class Select
InputsType inputs_;
OutputsType outputs_;
int slices_;
virtual void SetSlices(int slices) {
slices_ = slices;
inputs_.SetSlices(slices);
}
};
} // namespace internal
......
......@@ -48,23 +48,17 @@ class Sink< Inputs<I1, I2, I3, I4, I5> >
typedef SinkExecutor< InputsType > ExecutorType;
typedef typename ExecutorType::FunctionType FunctionType;
Sink(int slices, Scheduler * sched, ClockListener * listener,
Sink(Scheduler * sched, ClockListener * listener,
FunctionType function)
: inputs_(slices)
: inputs_()
, executor_(function)
, action_(NULL)
, slices_(slices) {
, slices_(0) {
next_clock_ = 0;
queued_clock_ = 0;
queue_id_ = GetNextProcessID();
inputs_.SetListener(this);
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
SetListener(listener);
listener_ = listener;
SetScheduler(sched);
}
......@@ -74,10 +68,6 @@ class Sink< Inputs<I1, I2, I3, I4, I5> >
}
}
void SetListener(ClockListener * listener) {
listener_ = listener;
}
virtual bool HasInputs() const {
return inputs_.Size() > 0;
}
......@@ -143,6 +133,23 @@ class Sink< Inputs<I1, I2, I3, I4, I5> >
embb::base::Atomic<int> queued_clock_;
int queue_id_;
int slices_;
virtual void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(action_);
action_ = NULL;
}
slices_ = slices;
inputs_.SetSlices(slices);
if (0 < slices_) {
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
}
}
};
} // namespace internal
......
......@@ -44,7 +44,7 @@ class Switch
typedef Inputs<bool, Type> InputsType;
typedef Outputs<Type, Type> OutputsType;
Switch(int slices, Scheduler * sched) : inputs_(slices) {
Switch(Scheduler * sched) : inputs_() {
inputs_.SetListener(this);
SetScheduler(sched);
}
......@@ -115,6 +115,10 @@ class Switch
private:
InputsType inputs_;
OutputsType outputs_;
virtual void SetSlices(int slices) {
inputs_.SetSlices(slices);
}
};
} // namespace internal
......
......@@ -58,6 +58,14 @@ class Network {
public:
/**
* Constructs an empty network.
* \note The number of concurrent tokens will automatically be derived from
* the structure of the network on the first call to operator(), and the
* corresponding resources will be allocated then.
*/
Network() {}
/**
* Constructs an empty network.
* \param slices Number of concurrent tokens allowed in the network.
*/
explicit Network(int slices) {}
......@@ -673,6 +681,10 @@ class Network {
/**
* Executes the network until one of the the sources returns \c false.
* \note If the network was default constructed, the number of concurrent
* tokens will automatically be derived from the structure of the network
* on the first call of the operator, and the corresponding resources will
* be allocated then.
*/
void operator () ();
};
......@@ -681,16 +693,14 @@ class Network {
class Network : public internal::ClockListener {
public:
Network()
: sink_counter_(NULL), sink_count_(0), slices_(0), sched_(NULL) {
// empty
}
explicit Network(int slices)
: sink_counter_(NULL), slices_(slices), sched_(NULL) {
sched_ = embb::base::Allocation::New<internal::SchedulerMTAPI>(slices_);
sink_counter_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
sink_counter_[ii] = 0;
}
sink_count_ = 0;
: sink_counter_(NULL), sink_count_(0), slices_(slices), sched_(NULL) {
PrepareSlices();
}
~Network() {
......@@ -704,24 +714,22 @@ class Network : public internal::ClockListener {
}
}
template <typename T1, typename T2 = embb::base::internal::Nil,
template <typename T1,
typename T2 = embb::base::internal::Nil,
typename T3 = embb::base::internal::Nil,
typename T4 = embb::base::internal::Nil,
typename T5 = embb::base::internal::Nil>
class Inputs : public internal::Inputs<T1, T2, T3, T4, T5> {
public:
explicit Inputs(int slices)
: internal::Inputs<T1, T2, T3, T4, T5>(slices) {}
class Inputs {
// empty
};
template <typename T1, typename T2 = embb::base::internal::Nil,
template <typename T1,
typename T2 = embb::base::internal::Nil,
typename T3 = embb::base::internal::Nil,
typename T4 = embb::base::internal::Nil,
typename T5 = embb::base::internal::Nil>
class Outputs : public internal::Outputs<T1, T2, T3, T4, T5> {
public:
Outputs()
: internal::Outputs<T1, T2, T3, T4, T5>() {}
class Outputs {
// empty
};
template <class Inputs, class Outputs> class SerialProcess;
......@@ -743,7 +751,7 @@ class Network : public internal::ClockListener {
: internal::Process< true,
internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >(
network.slices_, network.sched_, function) {
network.sched_, function) {
network.processes_.push_back(this);
}
};
......@@ -767,7 +775,7 @@ class Network : public internal::ClockListener {
: internal::Process< false,
internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >(
network.slices_, network.sched_, function) {
network.sched_, function) {
network.processes_.push_back(this);
}
};
......@@ -776,7 +784,7 @@ class Network : public internal::ClockListener {
class Switch : public internal::Switch<Type> {
public:
explicit Switch(Network & network)
: internal::Switch<Type>(network.slices_, network.sched_) {
: internal::Switch<Type>(network.sched_) {
network.processes_.push_back(this);
}
};
......@@ -785,7 +793,7 @@ class Network : public internal::ClockListener {
class Select : public internal::Select<Type> {
public:
explicit Select(Network & network)
: internal::Select<Type>(network.slices_, network.sched_) {
: internal::Select<Type>(network.sched_) {
network.processes_.push_back(this);
}
};
......@@ -803,7 +811,7 @@ class Network : public internal::ClockListener {
explicit Sink(Network & network, FunctionType function)
: internal::Sink<
internal::Inputs<I1, I2, I3, I4, I5> >(
network.slices_, network.sched_, &network, function) {
network.sched_, &network, function) {
network.sinks_.push_back(this);
network.sink_count_++;
}
......@@ -851,6 +859,27 @@ class Network : public internal::ClockListener {
}
void operator () () {
if (0 >= slices_) {
slices_ = static_cast<int>(
sources_.size() +
sinks_.size());
for (size_t ii = 0; ii < processes_.size(); ii++) {
int tt = processes_[ii]->IsSequential() ? 1 :
static_cast<int>(embb_core_count_available());
slices_ += tt;
}
PrepareSlices();
for (size_t ii = 0; ii < sources_.size(); ii++) {
sources_[ii]->SetScheduler(sched_);
}
for (size_t ii = 0; ii < processes_.size(); ii++) {
processes_[ii]->SetScheduler(sched_);
}
for (size_t ii = 0; ii < sinks_.size(); ii++) {
sinks_[ii]->SetScheduler(sched_);
}
}
int clock = 0;
while (clock >= 0) {
const int idx = clock % slices_;
......@@ -907,6 +936,19 @@ class Network : public internal::ClockListener {
}
return result;
}
void PrepareSlices() {
sched_ = embb::base::Allocation::New<internal::SchedulerMTAPI>(slices_);
if (sched_->GetSlices() != slices_) {
slices_ = sched_->GetSlices();
}
sink_counter_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
sink_counter_[ii] = 0;
}
}
};
#endif // DOXYGEN
......
......@@ -130,6 +130,24 @@ class Node {
}
/**
* Returns the number of available groups.
* \return The number of available groups
* \waitfree
*/
mtapi_uint_t GetGroupCount() const {
return group_count_;
}
/**
* Returns the number of available tasks.
* \return The number of available tasks
* \waitfree
*/
mtapi_uint_t GetTaskLimit() const {
return task_limit_;
}
/**
* Returns the number of available cores.
* \return The number of available cores
* \waitfree
......@@ -229,6 +247,8 @@ class Node {
mtapi_task_context_t * context);
mtapi_uint_t queue_count_;
mtapi_uint_t group_count_;
mtapi_uint_t task_limit_;
mtapi_uint_t core_count_;
mtapi_uint_t worker_thread_count_;
mtapi_action_hndl_t action_handle_;
......
......@@ -78,6 +78,12 @@ Node::Node(
mtapi_node_get_attribute(node_id, MTAPI_NODE_MAX_QUEUES, &queue_count_,
sizeof(queue_count_), &status);
assert(MTAPI_SUCCESS == status);
mtapi_node_get_attribute(node_id, MTAPI_NODE_MAX_GROUPS, &group_count_,
sizeof(group_count_), &status);
assert(MTAPI_SUCCESS == status);
mtapi_node_get_attribute(node_id, MTAPI_NODE_MAX_TASKS, &task_limit_,
sizeof(queue_count_), &status);
assert(MTAPI_SUCCESS == status);
core_count_ = info.hardware_concurrency;
worker_thread_count_ = embb_core_set_count(&attr->core_affinity);
action_handle_ = mtapi_action_create(TASKS_CPP_JOB, action_func,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment