Commit a5182aa5 by Marcus Winter

dataflow_cpp: support for default token count

parent 64b15b2a
......@@ -74,12 +74,21 @@ class In {
void SetConnected() { connected_ = true; }
void SetSlices(int slices) {
if (0 < slices_) {
for (int ii = 0; ii < slices_; ii++) {
values_[ii].~SignalType();
}
embb::base::Allocation::Free(values_);
values_ = NULL;
}
slices_ = slices;
values_ = reinterpret_cast<SignalType*>(
embb::base::Allocation::Allocate(
sizeof(SignalType)*slices_));
for (int ii = 0; ii < slices_; ii++) {
new (&values_[ii]) SignalType();
if (0 < slices_) {
values_ = reinterpret_cast<SignalType*>(
embb::base::Allocation::Allocate(
sizeof(SignalType)*slices_));
for (int ii = 0; ii < slices_; ii++) {
new (&values_[ii]) SignalType();
}
}
}
......
......@@ -59,6 +59,7 @@ class Inputs<embb::base::internal::Nil, embb::base::internal::Nil,
bool IsFullyConnected() {
return true;
}
void SetSlices(int /*slices*/) {}
};
template <typename T1>
......@@ -69,12 +70,22 @@ class Inputs<T1, embb::base::internal::Nil, embb::base::internal::Nil,
embb::base::internal::Nil>
, public ClockListener {
public:
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 1;
explicit Inputs() : count_(NULL), slices_(0) {
// empty
}
void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(count_);
count_ = NULL;
}
slices_ = slices;
if (0 < slices_) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 1;
}
}
this->template Get<0>().SetSlices(slices_);
}
......@@ -122,12 +133,22 @@ class Inputs<T1, T2, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil>
, public ClockListener {
public:
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 2;
explicit Inputs() : count_(NULL), slices_(0) {
// empty
}
void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(count_);
count_ = NULL;
}
slices_ = slices;
if (0 < slices_) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 2;
}
}
this->template Get<0>().SetSlices(slices_);
this->template Get<1>().SetSlices(slices_);
......@@ -181,12 +202,22 @@ class Inputs<T1, T2, T3, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil>
, public ClockListener {
public:
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 3;
explicit Inputs() : count_(NULL), slices_(0) {
// empty
}
void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(count_);
count_ = NULL;
}
slices_ = slices;
if (0 < slices_) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 3;
}
}
this->template Get<0>().SetSlices(slices_);
this->template Get<1>().SetSlices(slices_);
......@@ -245,12 +276,22 @@ class Inputs<T1, T2, T3, T4, embb::base::internal::Nil>
In<T4>, embb::base::internal::Nil>
, public ClockListener {
public:
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 4;
explicit Inputs() : count_(NULL), slices_(0) {
// empty
}
void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(count_);
count_ = NULL;
}
slices_ = slices;
if (0 < slices_) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 4;
}
}
this->template Get<0>().SetSlices(slices_);
this->template Get<1>().SetSlices(slices_);
......@@ -316,12 +357,22 @@ class Inputs
In<T4>, In<T5> >
, public ClockListener {
public:
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 5;
explicit Inputs() : count_(NULL), slices_(0) {
// empty
}
void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(count_);
count_ = NULL;
}
slices_ = slices;
if (0 < slices_) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 5;
}
}
this->template Get<0>().SetSlices(slices_);
this->template Get<1>().SetSlices(slices_);
......
......@@ -49,13 +49,21 @@ class Node {
EMBB_THROW(embb::base::ErrorException,
"Nodes are started implicitly.");
}
void SetScheduler(Scheduler * sched) {
sched_ = sched;
if (NULL != sched_) {
SetSlices(sched_->GetSlices());
} else {
SetSlices(0);
}
}
protected:
Scheduler * sched_;
static int next_process_id_;
void SetScheduler(Scheduler * sched) { sched_ = sched; }
static int GetNextProcessID() { return next_process_id_++; }
virtual void SetSlices(int /*slices*/) {};
};
} // namespace internal
......
......@@ -53,11 +53,11 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
typedef ProcessExecutor< InputsType, OutputsType > ExecutorType;
typedef typename ExecutorType::FunctionType FunctionType;
Process(int slices, Scheduler * sched, FunctionType function)
: inputs_(slices)
Process(Scheduler * sched, FunctionType function)
: inputs_()
, executor_(function)
, action_(NULL)
, slices_(slices) {
, slices_(0) {
next_clock_ = 0;
queued_clock_ = 0;
bool ordered = Serial;
......@@ -67,12 +67,6 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
queue_id_ = 0;
}
inputs_.SetListener(this);
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
SetScheduler(sched);
}
......@@ -173,6 +167,23 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
embb::base::Atomic<int> queued_clock_;
int queue_id_;
int slices_;
virtual void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(action_);
action_ = NULL;
}
slices_ = slices;
inputs_.SetSlices(slices);
if (0 < slices_) {
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
}
}
};
} // namespace internal
......
......@@ -44,7 +44,7 @@ class Select
typedef Inputs<bool, Type, Type> InputsType;
typedef Outputs<Type> OutputsType;
Select(int slices, Scheduler * sched) : inputs_(slices), slices_(slices) {
Select(Scheduler * sched) : inputs_(), slices_(0) {
inputs_.SetListener(this);
SetScheduler(sched);
}
......@@ -119,6 +119,11 @@ class Select
InputsType inputs_;
OutputsType outputs_;
int slices_;
virtual void SetSlices(int slices) {
slices_ = slices;
inputs_.SetSlices(slices);
}
};
} // namespace internal
......
......@@ -48,23 +48,17 @@ class Sink< Inputs<I1, I2, I3, I4, I5> >
typedef SinkExecutor< InputsType > ExecutorType;
typedef typename ExecutorType::FunctionType FunctionType;
Sink(int slices, Scheduler * sched, ClockListener * listener,
Sink(Scheduler * sched, ClockListener * listener,
FunctionType function)
: inputs_(slices)
: inputs_()
, executor_(function)
, action_(NULL)
, slices_(slices) {
, slices_(0) {
next_clock_ = 0;
queued_clock_ = 0;
queue_id_ = GetNextProcessID();
inputs_.SetListener(this);
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
SetListener(listener);
listener_ = listener;
SetScheduler(sched);
}
......@@ -74,10 +68,6 @@ class Sink< Inputs<I1, I2, I3, I4, I5> >
}
}
void SetListener(ClockListener * listener) {
listener_ = listener;
}
virtual bool HasInputs() const {
return inputs_.Size() > 0;
}
......@@ -143,6 +133,23 @@ class Sink< Inputs<I1, I2, I3, I4, I5> >
embb::base::Atomic<int> queued_clock_;
int queue_id_;
int slices_;
virtual void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(action_);
action_ = NULL;
}
slices_ = slices;
inputs_.SetSlices(slices);
if (0 < slices_) {
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
}
}
};
} // namespace internal
......
......@@ -44,7 +44,7 @@ class Switch
typedef Inputs<bool, Type> InputsType;
typedef Outputs<Type, Type> OutputsType;
Switch(int slices, Scheduler * sched) : inputs_(slices) {
Switch(Scheduler * sched) : inputs_() {
inputs_.SetListener(this);
SetScheduler(sched);
}
......@@ -115,6 +115,10 @@ class Switch
private:
InputsType inputs_;
OutputsType outputs_;
virtual void SetSlices(int slices) {
inputs_.SetSlices(slices);
}
};
} // namespace internal
......
......@@ -58,6 +58,14 @@ class Network {
public:
/**
* Constructs an empty network.
* \note The number of concurrent tokens will be derived from the structure
* of the network automatically on the first call to operator () and the
* corresponding resources will be allocated then.
*/
Network() {}
/**
* Constructs an empty network.
* \param slices Number of concurrent tokens allowed in the network.
*/
explicit Network(int slices) {}
......@@ -673,6 +681,10 @@ class Network {
/**
* Executes the network until one of the the sources returns \c false.
* \note If the network was default constructed the number of concurrent
* tokens will be derived from the structure of the network automatically
* on the first call of the operator and the corresponding resources will
* be allocated then.
*/
void operator () ();
};
......@@ -681,22 +693,14 @@ class Network {
class Network : public internal::ClockListener {
public:
explicit Network(int slices = 0)
: sink_counter_(NULL), slices_(slices), sched_(NULL) {
if (0 >= slices) {
slices_ = int(embb_core_count_available())*4;
}
sched_ = embb::base::Allocation::New<internal::SchedulerMTAPI>(slices_);
if (sched_->GetSlices() != slices_) {
slices_ = sched_->GetSlices();
}
sink_counter_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
sink_counter_[ii] = 0;
}
sink_count_ = 0;
Network()
: sink_counter_(NULL), sink_count_(0), slices_(0), sched_(NULL) {
// empty
}
explicit Network(int slices)
: sink_counter_(NULL), sink_count_(0), slices_(slices), sched_(NULL) {
PrepareSlices();
}
~Network() {
......@@ -710,24 +714,22 @@ class Network : public internal::ClockListener {
}
}
template <typename T1, typename T2 = embb::base::internal::Nil,
template <typename T1,
typename T2 = embb::base::internal::Nil,
typename T3 = embb::base::internal::Nil,
typename T4 = embb::base::internal::Nil,
typename T5 = embb::base::internal::Nil>
class Inputs : public internal::Inputs<T1, T2, T3, T4, T5> {
public:
explicit Inputs(int slices)
: internal::Inputs<T1, T2, T3, T4, T5>(slices) {}
class Inputs {
// empty
};
template <typename T1, typename T2 = embb::base::internal::Nil,
template <typename T1,
typename T2 = embb::base::internal::Nil,
typename T3 = embb::base::internal::Nil,
typename T4 = embb::base::internal::Nil,
typename T5 = embb::base::internal::Nil>
class Outputs : public internal::Outputs<T1, T2, T3, T4, T5> {
public:
Outputs()
: internal::Outputs<T1, T2, T3, T4, T5>() {}
class Outputs {
// empty
};
template <class Inputs, class Outputs> class SerialProcess;
......@@ -749,7 +751,7 @@ class Network : public internal::ClockListener {
: internal::Process< true,
internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >(
network.slices_, network.sched_, function) {
network.sched_, function) {
network.processes_.push_back(this);
}
};
......@@ -773,7 +775,7 @@ class Network : public internal::ClockListener {
: internal::Process< false,
internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >(
network.slices_, network.sched_, function) {
network.sched_, function) {
network.processes_.push_back(this);
}
};
......@@ -782,7 +784,7 @@ class Network : public internal::ClockListener {
class Switch : public internal::Switch<Type> {
public:
explicit Switch(Network & network)
: internal::Switch<Type>(network.slices_, network.sched_) {
: internal::Switch<Type>(network.sched_) {
network.processes_.push_back(this);
}
};
......@@ -791,7 +793,7 @@ class Network : public internal::ClockListener {
class Select : public internal::Select<Type> {
public:
explicit Select(Network & network)
: internal::Select<Type>(network.slices_, network.sched_) {
: internal::Select<Type>(network.sched_) {
network.processes_.push_back(this);
}
};
......@@ -809,7 +811,7 @@ class Network : public internal::ClockListener {
explicit Sink(Network & network, FunctionType function)
: internal::Sink<
internal::Inputs<I1, I2, I3, I4, I5> >(
network.slices_, network.sched_, &network, function) {
network.sched_, &network, function) {
network.sinks_.push_back(this);
network.sink_count_++;
}
......@@ -857,6 +859,27 @@ class Network : public internal::ClockListener {
}
void operator () () {
if (0 >= slices_) {
slices_ = static_cast<int>(
sources_.size() +
sinks_.size());
for (size_t ii = 0; ii < processes_.size(); ii++) {
int tt = processes_[ii]->IsSequential() ? 1 :
static_cast<int>(embb_core_count_available());
slices_ += tt;
}
PrepareSlices();
for (size_t ii = 0; ii < sources_.size(); ii++) {
sources_[ii]->SetScheduler(sched_);
}
for (size_t ii = 0; ii < processes_.size(); ii++) {
processes_[ii]->SetScheduler(sched_);
}
for (size_t ii = 0; ii < sinks_.size(); ii++) {
sinks_[ii]->SetScheduler(sched_);
}
}
int clock = 0;
while (clock >= 0) {
const int idx = clock % slices_;
......@@ -913,6 +936,19 @@ class Network : public internal::ClockListener {
}
return result;
}
void PrepareSlices() {
sched_ = embb::base::Allocation::New<internal::SchedulerMTAPI>(slices_);
if (sched_->GetSlices() != slices_) {
slices_ = sched_->GetSlices();
}
sink_counter_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
sink_counter_[ii] = 0;
}
}
};
#endif // DOXYGEN
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment