Commit 646ed8e0 by Marcus Winter

Merge branch 'development' into embb458_mtapi_initialization

# Conflicts:
#	dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h
parents f304e139 8e973926
Embedded Multicore Building Blocks (EMB²)
Embedded Multicore Building Blocks (EMB²)
=========================================
......
......@@ -35,6 +35,7 @@ class ClockListener {
public:
virtual ~ClockListener() {}
virtual void OnClock(int /*clock*/) = 0;
virtual bool OnHasCycle(ClockListener * /*node*/) { return false; }
};
} // namespace internal
......
......@@ -73,8 +73,20 @@ class In {
bool IsConnected() const { return connected_; }
void SetConnected() { connected_ = true; }
bool HasCycle(ClockListener * node) {
return listener_->OnHasCycle(node);
}
void SetSlices(int slices) {
if (0 < slices_) {
for (int ii = 0; ii < slices_; ii++) {
values_[ii].~SignalType();
}
embb::base::Allocation::Free(values_);
values_ = NULL;
}
slices_ = slices;
if (0 < slices_) {
values_ = reinterpret_cast<SignalType*>(
embb::base::Allocation::Allocate(
sizeof(SignalType)*slices_));
......@@ -82,6 +94,7 @@ class In {
new (&values_[ii]) SignalType();
}
}
}
void SetListener(ClockListener * listener) { listener_ = listener; }
......
......@@ -59,6 +59,7 @@ class Inputs<embb::base::internal::Nil, embb::base::internal::Nil,
bool IsFullyConnected() {
return true;
}
void SetSlices(int /*slices*/) {}
};
template <typename T1>
......@@ -69,13 +70,23 @@ class Inputs<T1, embb::base::internal::Nil, embb::base::internal::Nil,
embb::base::internal::Nil>
, public ClockListener {
public:
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
Inputs() : count_(NULL), slices_(0) {
// empty
}
void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(count_);
count_ = NULL;
}
slices_ = slices;
if (0 < slices_) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 1;
}
}
this->template Get<0>().SetSlices(slices_);
}
~Inputs() {
......@@ -106,6 +117,9 @@ class Inputs<T1, embb::base::internal::Nil, embb::base::internal::Nil,
listener_->OnClock(clock);
}
}
virtual bool OnHasCycle(ClockListener * node) {
return listener_->OnHasCycle(node);
}
bool IsFullyConnected() {
return this->template Get<0>().IsConnected();
}
......@@ -122,13 +136,23 @@ class Inputs<T1, T2, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil>
, public ClockListener {
public:
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
Inputs() : count_(NULL), slices_(0) {
// empty
}
void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(count_);
count_ = NULL;
}
slices_ = slices;
if (0 < slices_) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 2;
}
}
this->template Get<0>().SetSlices(slices_);
this->template Get<1>().SetSlices(slices_);
}
......@@ -164,6 +188,9 @@ class Inputs<T1, T2, embb::base::internal::Nil,
listener_->OnClock(clock);
}
}
virtual bool OnHasCycle(ClockListener * node) {
return listener_->OnHasCycle(node);
}
bool IsFullyConnected() {
return this->template Get<0>().IsConnected() &
this->template Get<1>().IsConnected();
......@@ -181,13 +208,23 @@ class Inputs<T1, T2, T3, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil>
, public ClockListener {
public:
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
Inputs() : count_(NULL), slices_(0) {
// empty
}
void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(count_);
count_ = NULL;
}
slices_ = slices;
if (0 < slices_) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 3;
}
}
this->template Get<0>().SetSlices(slices_);
this->template Get<1>().SetSlices(slices_);
this->template Get<2>().SetSlices(slices_);
......@@ -228,6 +265,9 @@ class Inputs<T1, T2, T3, embb::base::internal::Nil,
listener_->OnClock(clock);
}
}
virtual bool OnHasCycle(ClockListener * node) {
return listener_->OnHasCycle(node);
}
bool IsFullyConnected() {
return this->template Get<0>().IsConnected() &
this->template Get<1>().IsConnected() &
......@@ -245,13 +285,23 @@ class Inputs<T1, T2, T3, T4, embb::base::internal::Nil>
In<T4>, embb::base::internal::Nil>
, public ClockListener {
public:
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
Inputs() : count_(NULL), slices_(0) {
// empty
}
void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(count_);
count_ = NULL;
}
slices_ = slices;
if (0 < slices_) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 4;
}
}
this->template Get<0>().SetSlices(slices_);
this->template Get<1>().SetSlices(slices_);
this->template Get<2>().SetSlices(slices_);
......@@ -297,6 +347,9 @@ class Inputs<T1, T2, T3, T4, embb::base::internal::Nil>
listener_->OnClock(clock);
}
}
virtual bool OnHasCycle(ClockListener * node) {
return listener_->OnHasCycle(node);
}
bool IsFullyConnected() {
return this->template Get<0>().IsConnected() &
this->template Get<1>().IsConnected() &
......@@ -316,13 +369,23 @@ class Inputs
In<T4>, In<T5> >
, public ClockListener {
public:
explicit Inputs(int slices) : count_(NULL), slices_(slices) {
Inputs() : count_(NULL), slices_(0) {
// empty
}
void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(count_);
count_ = NULL;
}
slices_ = slices;
if (0 < slices_) {
count_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
count_[ii] = 5;
}
}
this->template Get<0>().SetSlices(slices_);
this->template Get<1>().SetSlices(slices_);
this->template Get<2>().SetSlices(slices_);
......@@ -373,6 +436,9 @@ class Inputs
listener_->OnClock(clock);
}
}
virtual bool OnHasCycle(ClockListener * node) {
return listener_->OnHasCycle(node);
}
bool IsFullyConnected() {
return this->template Get<0>().IsConnected() &&
this->template Get<1>().IsConnected() &
......
......@@ -44,17 +44,27 @@ class Node {
virtual bool HasOutputs() const { return false; }
virtual void Run(int clock) = 0;
virtual bool IsFullyConnected() = 0;
virtual bool IsSequential() { return true; }
virtual bool HasCycle() { return false; }
virtual bool Start(int /*clock*/) {
EMBB_THROW(embb::base::ErrorException,
"Nodes are started implicitly.");
}
void SetScheduler(Scheduler * sched) {
sched_ = sched;
if (NULL != sched_) {
SetSlices(sched_->GetSlices());
} else {
SetSlices(0);
}
}
protected:
Scheduler * sched_;
static int next_process_id_;
void SetScheduler(Scheduler * sched) { sched_ = sched; }
static int GetNextProcessID() { return next_process_id_++; }
virtual void SetSlices(int /*slices*/) {}
};
} // namespace internal
......
......@@ -36,6 +36,7 @@ namespace dataflow {
namespace internal {
class Scheduler;
class ClockListener;
template <typename Type>
class Out {
......@@ -70,6 +71,14 @@ class Out {
return targets_.size() > 0;
}
bool HasCycle(ClockListener * node) {
bool result = false;
for (size_t ii = 0; ii < targets_.size() && !result; ii++) {
result = result || targets_[ii]->HasCycle(node);
}
return result;
}
private:
std::vector< InType * > targets_;
};
......
......@@ -42,6 +42,8 @@ template <
typename = embb::base::internal::Nil >
class Outputs;
class ClockListener;
template <>
class Outputs<embb::base::internal::Nil, embb::base::internal::Nil,
embb::base::internal::Nil, embb::base::internal::Nil,
......@@ -53,6 +55,9 @@ class Outputs<embb::base::internal::Nil, embb::base::internal::Nil,
bool IsFullyConnected() {
return true;
}
bool HasCycle(ClockListener * /*node*/) {
return false;
}
};
template <typename T1>
......@@ -65,6 +70,9 @@ class Outputs<T1, embb::base::internal::Nil, embb::base::internal::Nil,
bool IsFullyConnected() {
return this->template Get<0>().IsConnected();
}
bool HasCycle(ClockListener * node) {
return this->template Get<0>().HasCycle(node);
}
};
template <typename T1, typename T2>
......@@ -77,6 +85,10 @@ class Outputs<T1, T2, embb::base::internal::Nil,
return this->template Get<0>().IsConnected() &&
this->template Get<1>().IsConnected();
}
bool HasCycle(ClockListener * node) {
return this->template Get<0>().HasCycle(node) ||
this->template Get<1>().HasCycle(node);
}
};
template <typename T1, typename T2, typename T3>
......@@ -85,6 +97,16 @@ class Outputs<T1, T2, T3, embb::base::internal::Nil,
: public Tuple<Out<T1>, Out<T2>, Out<T3>,
embb::base::internal::Nil, embb::base::internal::Nil> {
public:
bool IsFullyConnected() {
return this->template Get<0>().IsConnected() &&
this->template Get<1>().IsConnected() &&
this->template Get<2>().IsConnected();
}
bool HasCycle(ClockListener * node) {
return this->template Get<0>().HasCycle(node) ||
this->template Get<1>().HasCycle(node) ||
this->template Get<2>().HasCycle(node);
}
};
template <typename T1, typename T2, typename T3, typename T4>
......@@ -92,6 +114,18 @@ class Outputs<T1, T2, T3, T4, embb::base::internal::Nil>
: public Tuple<Out<T1>, Out<T2>, Out<T3>,
Out<T4>, embb::base::internal::Nil>{
public:
bool IsFullyConnected() {
return this->template Get<0>().IsConnected() &&
this->template Get<1>().IsConnected() &&
this->template Get<2>().IsConnected() &&
this->template Get<3>().IsConnected();
}
bool HasCycle(ClockListener * node) {
return this->template Get<0>().HasCycle(node) ||
this->template Get<1>().HasCycle(node) ||
this->template Get<2>().HasCycle(node) ||
this->template Get<3>().HasCycle(node);
}
};
template <typename T1, typename T2, typename T3, typename T4,
......@@ -100,6 +134,20 @@ class Outputs
: public Tuple<Out<T1>, Out<T2>, Out<T3>,
Out<T4>, Out<T5> > {
public:
bool IsFullyConnected() {
return this->template Get<0>().IsConnected() &&
this->template Get<1>().IsConnected() &&
this->template Get<2>().IsConnected() &&
this->template Get<3>().IsConnected() &&
this->template Get<4>().IsConnected();
}
bool HasCycle(ClockListener * node) {
return this->template Get<0>().HasCycle(node) ||
this->template Get<1>().HasCycle(node) ||
this->template Get<2>().HasCycle(node) ||
this->template Get<3>().HasCycle(node) ||
this->template Get<4>().HasCycle(node);
}
};
} // namespace internal
......
......@@ -53,11 +53,11 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
typedef ProcessExecutor< InputsType, OutputsType > ExecutorType;
typedef typename ExecutorType::FunctionType FunctionType;
Process(int slices, Scheduler * sched, FunctionType function)
: inputs_(slices)
Process(Scheduler * sched, FunctionType function)
: inputs_()
, executor_(function)
, action_(NULL)
, slices_(slices) {
, slices_(0) {
next_clock_ = 0;
queued_clock_ = 0;
bool ordered = Serial;
......@@ -67,12 +67,6 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
queue_id_ = 0;
}
inputs_.SetListener(this);
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
SetScheduler(sched);
}
......@@ -98,6 +92,14 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
return inputs_.IsFullyConnected() && outputs_.IsFullyConnected();
}
virtual bool IsSequential() {
return Serial;
}
virtual bool HasCycle() {
return outputs_.HasCycle(this);
}
InputsType & GetInputs() {
return inputs_;
}
......@@ -160,6 +162,15 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
}
}
virtual bool OnHasCycle(ClockListener * node) {
ClockListener * this_node = this;
if (this_node == node) {
return true;
} else {
return outputs_.HasCycle(node);
}
}
private:
InputsType inputs_;
OutputsType outputs_;
......@@ -169,6 +180,23 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
embb::base::Atomic<int> queued_clock_;
int queue_id_;
int slices_;
virtual void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(action_);
action_ = NULL;
}
slices_ = slices;
inputs_.SetSlices(slices);
if (0 < slices_) {
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
}
}
};
} // namespace internal
......
......@@ -40,6 +40,7 @@ class Scheduler {
virtual void Start(Action & action) = 0;
virtual void Enqueue(int process_id, Action & action) = 0;
virtual void WaitForSlice(int slice) = 0;
virtual int GetSlices() = 0;
};
} // namespace internal
......
......@@ -46,6 +46,13 @@ class SchedulerMTAPI : public Scheduler {
: slices_(slices) {
embb::mtapi::Node & node = embb::mtapi::Node::GetInstance();
int tl = std::min(
static_cast<int>(node.GetTaskLimit()),
static_cast<int>(node.GetGroupCount()));
if (tl < slices_) {
slices_ = tl;
}
job_ = node.GetJob(EMBB_DATAFLOW_JOB_ID);
action_ = node.CreateAction(EMBB_DATAFLOW_JOB_ID, SchedulerMTAPI::action_func);
......@@ -103,6 +110,7 @@ class SchedulerMTAPI : public Scheduler {
embb::mtapi::Node & node = embb::mtapi::Node::GetInstance();
group_[slice] = node.CreateGroup();
}
virtual int GetSlices() { return slices_; }
private:
static void action_func(
......
......@@ -45,6 +45,7 @@ class SchedulerSequential : public Scheduler {
action.RunSequential();
}
virtual void WaitForSlice(int /*slice*/) {}
virtual int GetSlices() { return 1; }
};
} // namespace internal
......
......@@ -44,7 +44,7 @@ class Select
typedef Inputs<bool, Type, Type> InputsType;
typedef Outputs<Type> OutputsType;
Select(int slices, Scheduler * sched) : inputs_(slices), slices_(slices) {
explicit Select(Scheduler * sched) : inputs_(), slices_(0) {
inputs_.SetListener(this);
SetScheduler(sched);
}
......@@ -85,6 +85,15 @@ class Select
return inputs_.IsFullyConnected() && outputs_.IsFullyConnected();
}
virtual bool OnHasCycle(ClockListener * node) {
ClockListener * this_node = this;
if (this_node == node) {
return true;
} else {
return outputs_.HasCycle(node);
}
}
InputsType & GetInputs() {
return inputs_;
}
......@@ -119,6 +128,11 @@ class Select
InputsType inputs_;
OutputsType outputs_;
int slices_;
virtual void SetSlices(int slices) {
slices_ = slices;
inputs_.SetSlices(slices);
}
};
} // namespace internal
......
......@@ -48,23 +48,17 @@ class Sink< Inputs<I1, I2, I3, I4, I5> >
typedef SinkExecutor< InputsType > ExecutorType;
typedef typename ExecutorType::FunctionType FunctionType;
Sink(int slices, Scheduler * sched, ClockListener * listener,
Sink(Scheduler * sched, ClockListener * listener,
FunctionType function)
: inputs_(slices)
: inputs_()
, executor_(function)
, action_(NULL)
, slices_(slices) {
, slices_(0) {
next_clock_ = 0;
queued_clock_ = 0;
queue_id_ = GetNextProcessID();
inputs_.SetListener(this);
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
SetListener(listener);
listener_ = listener;
SetScheduler(sched);
}
......@@ -74,10 +68,6 @@ class Sink< Inputs<I1, I2, I3, I4, I5> >
}
}
void SetListener(ClockListener * listener) {
listener_ = listener;
}
virtual bool HasInputs() const {
return inputs_.Size() > 0;
}
......@@ -143,6 +133,23 @@ class Sink< Inputs<I1, I2, I3, I4, I5> >
embb::base::Atomic<int> queued_clock_;
int queue_id_;
int slices_;
virtual void SetSlices(int slices) {
if (0 < slices_) {
embb::base::Allocation::Free(action_);
action_ = NULL;
}
slices_ = slices;
inputs_.SetSlices(slices);
if (0 < slices_) {
action_ = reinterpret_cast<Action*>(
embb::base::Allocation::Allocate(
sizeof(Action)*slices_));
for (int ii = 0; ii < slices_; ii++) {
action_[ii] = Action();
}
}
}
};
} // namespace internal
......
......@@ -44,7 +44,7 @@ class Switch
typedef Inputs<bool, Type> InputsType;
typedef Outputs<Type, Type> OutputsType;
Switch(int slices, Scheduler * sched) : inputs_(slices) {
explicit Switch(Scheduler * sched) : inputs_() {
inputs_.SetListener(this);
SetScheduler(sched);
}
......@@ -82,6 +82,15 @@ class Switch
return inputs_.IsFullyConnected() && outputs_.IsFullyConnected();
}
virtual bool OnHasCycle(ClockListener * node) {
ClockListener * this_node = this;
if (this_node == node) {
return true;
} else {
return outputs_.HasCycle(node);
}
}
InputsType & GetInputs() {
return inputs_;
}
......@@ -115,6 +124,10 @@ class Switch
private:
InputsType inputs_;
OutputsType outputs_;
virtual void SetSlices(int slices) {
inputs_.SetSlices(slices);
}
};
} // namespace internal
......
......@@ -58,6 +58,14 @@ class Network {
public:
/**
* Constructs an empty network.
* \note The number of concurrent tokens will automatically be derived from
* the structure of the network on the first call to operator(), and the
* corresponding resources will be allocated then.
*/
Network() {}
/**
* Constructs an empty network.
* \param slices Number of concurrent tokens allowed in the network.
*/
explicit Network(int slices) {}
......@@ -668,11 +676,19 @@ class Network {
/**
* Checks whether the network is completely connected and free of cycles.
* \returns \c true if everything is in order, \c false if not.
* \note Executing an invalid network results in an exception. For this
* reason, it is recommended to first check the network using IsValid().
*/
bool IsValid();
/**
* Executes the network until one of the the sources returns \c false.
* \note If the network was default constructed, the number of concurrent
* tokens will automatically be derived from the structure of the network
* on the first call of the operator, and the corresponding resources will
* be allocated then.
* \note Executing an invalid network results in an exception. For this
* reason, it is recommended to first check the network using IsValid().
*/
void operator () ();
};
......@@ -681,16 +697,14 @@ class Network {
class Network : public internal::ClockListener {
public:
explicit Network(int slices)
: sink_counter_(NULL), slices_(slices), sched_(NULL) {
sched_ = embb::base::Allocation::New<internal::SchedulerMTAPI>(slices_);
sink_counter_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
sink_counter_[ii] = 0;
Network()
: sink_counter_(NULL), sink_count_(0), slices_(0), sched_(NULL) {
// empty
}
sink_count_ = 0;
explicit Network(int slices)
: sink_counter_(NULL), sink_count_(0), slices_(slices), sched_(NULL) {
PrepareSlices();
}
~Network() {
......@@ -704,24 +718,22 @@ class Network : public internal::ClockListener {
}
}
template <typename T1, typename T2 = embb::base::internal::Nil,
template <typename T1,
typename T2 = embb::base::internal::Nil,
typename T3 = embb::base::internal::Nil,
typename T4 = embb::base::internal::Nil,
typename T5 = embb::base::internal::Nil>
class Inputs : public internal::Inputs<T1, T2, T3, T4, T5> {
public:
explicit Inputs(int slices)
: internal::Inputs<T1, T2, T3, T4, T5>(slices) {}
class Inputs {
// empty
};
template <typename T1, typename T2 = embb::base::internal::Nil,
template <typename T1,
typename T2 = embb::base::internal::Nil,
typename T3 = embb::base::internal::Nil,
typename T4 = embb::base::internal::Nil,
typename T5 = embb::base::internal::Nil>
class Outputs : public internal::Outputs<T1, T2, T3, T4, T5> {
public:
Outputs()
: internal::Outputs<T1, T2, T3, T4, T5>() {}
class Outputs {
// empty
};
template <class Inputs, class Outputs> class SerialProcess;
......@@ -743,7 +755,7 @@ class Network : public internal::ClockListener {
: internal::Process< true,
internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >(
network.slices_, network.sched_, function) {
network.sched_, function) {
network.processes_.push_back(this);
}
};
......@@ -767,7 +779,7 @@ class Network : public internal::ClockListener {
: internal::Process< false,
internal::Inputs<I1, I2, I3, I4, I5>,
internal::Outputs<O1, O2, O3, O4, O5> >(
network.slices_, network.sched_, function) {
network.sched_, function) {
network.processes_.push_back(this);
}
};
......@@ -776,7 +788,7 @@ class Network : public internal::ClockListener {
class Switch : public internal::Switch<Type> {
public:
explicit Switch(Network & network)
: internal::Switch<Type>(network.slices_, network.sched_) {
: internal::Switch<Type>(network.sched_) {
network.processes_.push_back(this);
}
};
......@@ -785,7 +797,7 @@ class Network : public internal::ClockListener {
class Select : public internal::Select<Type> {
public:
explicit Select(Network & network)
: internal::Select<Type>(network.slices_, network.sched_) {
: internal::Select<Type>(network.sched_) {
network.processes_.push_back(this);
}
};
......@@ -803,7 +815,7 @@ class Network : public internal::ClockListener {
explicit Sink(Network & network, FunctionType function)
: internal::Sink<
internal::Inputs<I1, I2, I3, I4, I5> >(
network.slices_, network.sched_, &network, function) {
network.sched_, &network, function) {
network.sinks_.push_back(this);
network.sink_count_++;
}
......@@ -838,19 +850,45 @@ class Network : public internal::ClockListener {
bool IsValid() {
bool valid = true;
// check connectivity
for (size_t ii = 0; ii < sources_.size() && valid; ii++) {
valid = valid && sources_[ii]->IsFullyConnected();
}
for (size_t ii = 0; ii < processes_.size() && valid; ii++) {
valid = valid && processes_[ii]->IsFullyConnected();
}
for (size_t ii = 0; ii < sinks_.size() && valid; ii++) {
valid = valid && sinks_[ii]->IsFullyConnected();
}
// check for cycles
for (size_t ii = 0; ii < processes_.size() && valid; ii++) {
valid = valid && !processes_[ii]->HasCycle();
}
return valid;
}
void operator () () {
if (0 >= slices_) {
slices_ = static_cast<int>(
sources_.size() +
sinks_.size());
for (size_t ii = 0; ii < processes_.size(); ii++) {
int tt = processes_[ii]->IsSequential() ? 1 :
static_cast<int>(embb_core_count_available());
slices_ += tt;
}
PrepareSlices();
for (size_t ii = 0; ii < sources_.size(); ii++) {
valid = valid & sources_[ii]->IsFullyConnected();
sources_[ii]->SetScheduler(sched_);
}
for (size_t ii = 0; ii < processes_.size(); ii++) {
valid = valid & processes_[ii]->IsFullyConnected();
processes_[ii]->SetScheduler(sched_);
}
for (size_t ii = 0; ii < sinks_.size(); ii++) {
valid = valid & sinks_[ii]->IsFullyConnected();
sinks_[ii]->SetScheduler(sched_);
}
return valid;
}
void operator () () {
int clock = 0;
while (clock >= 0) {
const int idx = clock % slices_;
......@@ -907,6 +945,19 @@ class Network : public internal::ClockListener {
}
return result;
}
void PrepareSlices() {
sched_ = embb::base::Allocation::New<internal::SchedulerMTAPI>(slices_);
if (sched_->GetSlices() != slices_) {
slices_ = sched_->GetSlices();
}
sink_counter_ = reinterpret_cast<embb::base::Atomic<int>*>(
embb::base::Allocation::Allocate(
sizeof(embb::base::Atomic<int>)*slices_));
for (int ii = 0; ii < slices_; ii++) {
sink_counter_[ii] = 0;
}
}
};
#endif // DOXYGEN
......
......@@ -40,6 +40,7 @@
#include <embb_mtapi_action_t.h>
#include <embb_mtapi_alloc.h>
#include <embb_mtapi_queue_t.h>
#include <embb_mtapi_group_t.h>
/* ---- CLASS MEMBERS ------------------------------------------------------ */
......@@ -293,6 +294,8 @@ int embb_mtapi_scheduler_worker(void * arg) {
/* check if there was work */
if (MTAPI_NULL != task) {
embb_mtapi_queue_t * local_queue = MTAPI_NULL;
embb_mtapi_group_t * local_group = MTAPI_NULL;
embb_mtapi_action_t * local_action = MTAPI_NULL;
/* is task associated with a queue? */
if (embb_mtapi_queue_pool_is_handle_valid(
......@@ -302,6 +305,21 @@ int embb_mtapi_scheduler_worker(void * arg) {
node->queue_pool, task->queue);
}
/* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid(
node->group_pool, task->group)) {
local_group =
embb_mtapi_group_pool_get_storage_for_handle(
node->group_pool, task->group);
}
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, task->action)) {
local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, task->action);
}
switch (embb_atomic_load_int(&task->state)) {
case MTAPI_TASK_SCHEDULED:
/* multi-instance task, another instance might be running */
......@@ -328,7 +346,7 @@ int embb_mtapi_scheduler_worker(void * arg) {
break;
case MTAPI_TASK_CANCELLED:
/* set return value to canceled */
/* set return value to cancelled */
task->error_code = MTAPI_ERR_ACTION_CANCELLED;
if (embb_atomic_fetch_and_add_unsigned_int(
&task->instances_todo, (unsigned int)-1) == 0) {
......@@ -336,6 +354,12 @@ int embb_mtapi_scheduler_worker(void * arg) {
if (MTAPI_NULL != local_queue) {
embb_mtapi_queue_task_finished(local_queue);
}
if (MTAPI_NULL != local_group) {
embb_mtapi_task_queue_push(&local_group->queue, task);
}
}
if (MTAPI_NULL != local_action) {
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
}
break;
......
......@@ -501,7 +501,6 @@ void mtapi_task_cancel(
if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
embb_mtapi_task_t* local_task =
embb_mtapi_task_pool_get_storage_for_handle(node->task_pool, task);
embb_mtapi_task_set_state(local_task, MTAPI_TASK_CANCELLED);
/* call plugin action cancel function */
if (embb_mtapi_action_pool_is_handle_valid(
......@@ -511,8 +510,14 @@ void mtapi_task_cancel(
node->action_pool, local_task->action);
if (local_action->is_plugin_action) {
local_action->plugin_task_cancel_function(task, &local_status);
} else {
embb_mtapi_task_set_state(local_task, MTAPI_TASK_CANCELLED);
local_task->error_code = MTAPI_ERR_ACTION_CANCELLED;
local_status = MTAPI_SUCCESS;
}
} else {
embb_mtapi_task_set_state(local_task, MTAPI_TASK_CANCELLED);
local_task->error_code = MTAPI_ERR_ACTION_CANCELLED;
local_status = MTAPI_SUCCESS;
}
} else {
......
......@@ -139,6 +139,24 @@ class Node {
}
/**
* Returns the number of available groups.
* \return The number of available groups
* \waitfree
*/
mtapi_uint_t GetGroupCount() const {
return group_count_;
}
/**
* Returns the number of available tasks.
* \return The number of available tasks
* \waitfree
*/
mtapi_uint_t GetTaskLimit() const {
return task_limit_;
}
/**
* Starts a new Task.
*
* \returns The handle to the started Task.
......@@ -349,6 +367,8 @@ class Node {
mtapi_status_t status;
mtapi_info_t info;
queue_count_ = attr.GetInternal().max_queues;
group_count_ = attr.GetInternal().max_groups;
task_limit_ = attr.GetInternal().max_tasks;
mtapi_initialize(domain_id, node_id, &attr.GetInternal(), &info, &status);
internal::CheckStatus(status);
......@@ -383,6 +403,8 @@ class Node {
mtapi_uint_t core_count_;
mtapi_uint_t worker_thread_count_;
mtapi_uint_t queue_count_;
mtapi_uint_t group_count_;
mtapi_uint_t task_limit_;
};
} // namespace mtapi
......
......@@ -71,8 +71,10 @@ void embb_mtapi_network_finalize() {
}
enum embb_mtapi_network_operation_enum {
EMBB_MTAPI_NETWORK_START_TASK,
EMBB_MTAPI_NETWORK_RETURN_RESULT
EMBB_MTAPI_NETWORK_START_TASK = 0x01AFFE01,
EMBB_MTAPI_NETWORK_RETURN_RESULT = 0x02AFFE02,
EMBB_MTAPI_NETWORK_RETURN_FAILURE = 0x03AFFE03,
EMBB_MTAPI_NETWORK_CANCEL_TASK = 0x04AFFE04
};
struct embb_mtapi_network_plugin_struct {
......@@ -84,6 +86,8 @@ struct embb_mtapi_network_plugin_struct {
embb_mutex_t send_mutex;
embb_mtapi_network_buffer_t send_buffer;
embb_mtapi_network_buffer_t recv_buffer;
};
typedef struct embb_mtapi_network_plugin_struct embb_mtapi_network_plugin_t;
......@@ -112,12 +116,40 @@ struct embb_mtapi_network_task_struct {
typedef struct embb_mtapi_network_task_struct embb_mtapi_network_task_t;
static void embb_mtapi_network_return_failure(
int32_t remote_task_id,
int32_t remote_task_tag,
mtapi_status_t status,
embb_mtapi_network_socket_t * socket,
embb_mtapi_network_buffer_t * buffer) {
embb_mtapi_network_buffer_clear(buffer);
// packet size
embb_mtapi_network_buffer_push_back_int32(
buffer, 16);
// operation
embb_mtapi_network_buffer_push_back_int32(
buffer, EMBB_MTAPI_NETWORK_RETURN_FAILURE);
// task handle
embb_mtapi_network_buffer_push_back_int32(
buffer, remote_task_id);
embb_mtapi_network_buffer_push_back_int32(
buffer, remote_task_tag);
// status
embb_mtapi_network_buffer_push_back_int32(
buffer, (int32_t)status);
embb_mtapi_network_socket_sendbuffer(
socket, buffer);
}
static void embb_mtapi_network_task_complete(
MTAPI_IN mtapi_task_hndl_t task,
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
int err;
EMBB_UNUSED_IN_RELEASE(err);
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
......@@ -138,37 +170,69 @@ static void embb_mtapi_network_task_complete(
(embb_mtapi_network_task_t*)local_task->attributes.user_data;
embb_mtapi_network_buffer_t * send_buf = &plugin->send_buffer;
embb_atomic_memory_barrier();
local_task->attributes.complete_func = NULL;
embb_atomic_memory_barrier();
// serialize sending of results
embb_mutex_lock(&plugin->send_mutex);
embb_mtapi_network_buffer_clear(send_buf);
if (local_task->error_code == MTAPI_SUCCESS) {
// actual counts bytes actually put into the buffer
int actual = 0;
// expected counts bytes we intended to put into the buffer
int expected =
4 + // operation
4 + 4 + // remote task handle
4 + // status
4 + (int)local_task->result_size; // result buffer
// packet size
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, expected);
expected += 4;
// operation is "return result"
err = embb_mtapi_network_buffer_push_back_int8(
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, EMBB_MTAPI_NETWORK_RETURN_RESULT);
assert(err == 1);
// remote task id
err = embb_mtapi_network_buffer_push_back_int32(
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, network_task->remote_task_id);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32(
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, network_task->remote_task_tag);
assert(err == 4);
// status
err = embb_mtapi_network_buffer_push_back_int32(
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, local_task->error_code);
assert(err == 4);
// result size
err = embb_mtapi_network_buffer_push_back_int32(
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->result_size);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_rawdata(
actual += embb_mtapi_network_buffer_push_back_rawdata(
send_buf, (int32_t)local_task->result_size,
local_task->result_buffer);
assert(err == (int)local_task->result_size);
err = embb_mtapi_network_socket_sendbuffer(
if (expected == actual) {
int sent = embb_mtapi_network_socket_sendbuffer(
&network_task->socket, send_buf);
assert(sent == send_buf->size);
EMBB_UNUSED_IN_RELEASE(sent);
} else {
embb_mtapi_network_return_failure(
network_task->remote_task_id,
network_task->remote_task_tag,
MTAPI_ERR_UNKNOWN,
&network_task->socket, send_buf);
}
} else {
embb_mtapi_network_return_failure(
network_task->remote_task_id,
network_task->remote_task_tag,
local_task->error_code,
&network_task->socket, send_buf);
assert(err == send_buf->size);
}
// sending done
embb_mutex_unlock(&plugin->send_mutex);
......@@ -177,6 +241,14 @@ static void embb_mtapi_network_task_complete(
embb_free((void*)local_task->arguments);
embb_free(local_task->result_buffer);
void * data = local_task->attributes.user_data;
embb_atomic_memory_barrier();
local_task->attributes.user_data = NULL;
embb_atomic_memory_barrier();
embb_free(data);
local_status = MTAPI_SUCCESS;
}
}
......@@ -185,108 +257,92 @@ static void embb_mtapi_network_task_complete(
mtapi_status_set(status, local_status);
}
static int embb_mtapi_network_thread(void * args) {
embb_mtapi_network_plugin_t * plugin = &embb_mtapi_network_plugin;
embb_mtapi_network_buffer_t buffer;
int err;
EMBB_UNUSED(args);
embb_mtapi_network_buffer_initialize(&buffer, (int)plugin->buffer_size);
while (embb_atomic_load_int(&plugin->run)) {
err = embb_mtapi_network_socket_select(
plugin->sockets, plugin->socket_count, 100);
if (0 == err) {
// listening socket, accept connection
embb_mtapi_network_socket_t accept_socket;
err = embb_mtapi_network_socket_accept(
&plugin->sockets[0], &accept_socket);
if (0 < err) {
// add socket to socket list
plugin->sockets[plugin->socket_count] = accept_socket;
plugin->socket_count++;
}
} else if (0 < err) {
static mtapi_status_t embb_mtapi_network_handle_start_task(
embb_mtapi_network_socket_t * socket,
embb_mtapi_network_buffer_t * buffer,
int packet_size) {
int32_t domain_id;
int32_t job_id;
int32_t results_size;
void * results;
int8_t operation;
embb_mtapi_network_socket_t * socket = &plugin->sockets[err];
embb_mtapi_network_buffer_clear(&buffer);
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, 1);
if (err == 0) {
// there was some socket error, ignore
continue;
}
assert(err == 1);
err = embb_mtapi_network_buffer_pop_front_int8(
&buffer, &operation);
assert(err == 1);
embb_mtapi_network_buffer_clear(&buffer);
int err;
EMBB_UNUSED_IN_RELEASE(err);
if (operation == EMBB_MTAPI_NETWORK_START_TASK) {
int32_t arguments_size;
int32_t remote_task_id;
int32_t remote_task_tag;
mtapi_uint_t priority = 0;
mtapi_job_hndl_t job_hndl;
mtapi_task_attributes_t task_attr;
void * arguments;
mtapi_task_complete_function_t func = embb_mtapi_network_task_complete;
void * func_void;
embb_mtapi_network_task_t * network_task =
(embb_mtapi_network_task_t*)embb_alloc(
sizeof(embb_mtapi_network_task_t));
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, 28);
assert(err == 28);
// check if we have at least 28 bytes
if (packet_size >= 28) {
// domain id
err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &domain_id);
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &domain_id);
assert(err == 4);
// job id
err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &job_id);
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &job_id);
assert(err == 4);
// priority
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, (int32_t*)&priority);
buffer, (int32_t*)&priority);
assert(err == 4);
// remote task handle
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &network_task->remote_task_id);
buffer, &remote_task_id);
assert(err == 4);
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &network_task->remote_task_tag);
buffer, &remote_task_tag);
assert(err == 4);
// result size
err = embb_mtapi_network_buffer_pop_front_int32(&buffer,
err = embb_mtapi_network_buffer_pop_front_int32(buffer,
&results_size);
assert(err == 4);
results = embb_alloc((size_t)results_size);
assert(results != NULL);
// arguments size
embb_mtapi_network_buffer_pop_front_int32(&buffer, &arguments_size);
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &arguments_size);
assert(err == 4);
arguments = embb_alloc((size_t)arguments_size);
assert(arguments != NULL);
embb_mtapi_network_buffer_clear(&buffer);
embb_mtapi_network_task_t * network_task =
(embb_mtapi_network_task_t*)embb_alloc(
sizeof(embb_mtapi_network_task_t));
if (network_task == NULL) {
embb_mtapi_network_return_failure(
remote_task_id, remote_task_tag, MTAPI_ERR_UNKNOWN,
socket, buffer);
return MTAPI_ERR_UNKNOWN;
}
network_task->remote_task_id = remote_task_id;
network_task->remote_task_tag = remote_task_tag;
// check packet_size again
if (packet_size == 28 + arguments_size) {
// allocate buffers
results = embb_alloc((size_t)results_size);
if (results == NULL) {
embb_mtapi_network_return_failure(
remote_task_id, remote_task_tag, MTAPI_ERR_UNKNOWN,
socket, buffer);
return MTAPI_ERR_UNKNOWN;
}
arguments = embb_alloc((size_t)arguments_size);
if (arguments == NULL) {
embb_free(results);
embb_mtapi_network_return_failure(
remote_task_id, remote_task_tag, MTAPI_ERR_UNKNOWN,
socket, buffer);
return MTAPI_ERR_UNKNOWN;
}
// arguments
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, arguments_size);
assert(err == arguments_size);
err = embb_mtapi_network_buffer_pop_front_rawdata(
&buffer, arguments_size, arguments);
buffer, arguments_size, arguments);
assert(err == arguments_size);
embb_mtapi_network_buffer_clear(&buffer);
network_task->socket = *socket;
mtapi_taskattr_init(&task_attr, &local_status);
assert(local_status == MTAPI_SUCCESS);
......@@ -306,56 +362,60 @@ static int embb_mtapi_network_thread(void * args) {
assert(local_status == MTAPI_SUCCESS);
job_hndl = mtapi_job_get((mtapi_job_id_t)job_id,
(mtapi_domain_t)domain_id, &local_status);
assert(local_status == MTAPI_SUCCESS);
if (local_status == MTAPI_SUCCESS) {
mtapi_task_start(
MTAPI_TASK_ID_NONE, job_hndl,
arguments, (mtapi_size_t)arguments_size,
results, (mtapi_size_t)results_size,
&task_attr, MTAPI_GROUP_NONE,
&local_status);
assert(local_status == MTAPI_SUCCESS);
}
if (local_status != MTAPI_SUCCESS) {
embb_free(arguments);
embb_free(results);
embb_mtapi_network_return_failure(
remote_task_id, remote_task_tag, local_status, socket, buffer);
}
}
}
// send back result of task creation
//embb_mtapi_network_buffer_push_back_int32(
// &buffer, local_status);
//embb_mtapi_network_socket_sendbuffer(
// socket, &buffer);
return local_status;
}
embb_mtapi_network_buffer_clear(&buffer);
} else if (operation == EMBB_MTAPI_NETWORK_RETURN_RESULT) {
int task_status;
int task_id;
int task_tag;
static mtapi_status_t embb_mtapi_network_handle_return_result(
embb_mtapi_network_buffer_t * buffer,
int packet_size) {
int32_t task_status;
int32_t task_id;
int32_t task_tag;
embb_mtapi_network_buffer_clear(&buffer);
int32_t results_size;
int err;
EMBB_UNUSED_IN_RELEASE(err);
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
mtapi_task_hndl_t task;
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, 16);
assert(err == 16);
// do we have at least 16 bytes?
if (packet_size >= 16) {
// local task id
err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &task_id);
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_id);
assert(err == 4);
err = embb_mtapi_network_buffer_pop_front_int32(&buffer, &task_tag);
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_tag);
assert(err == 4);
// task status
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &task_status);
buffer, &task_status);
assert(err == 4);
// result size
err = embb_mtapi_network_buffer_pop_front_int32(
&buffer, &results_size);
buffer, &results_size);
assert(err == 4);
embb_mtapi_network_buffer_clear(&buffer);
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, &buffer, results_size);
assert(err == results_size);
// check packet_size again
if (packet_size == 16 + results_size) {
task.id = (mtapi_task_id_t)task_id;
task.tag = (mtapi_uint_t)task_tag;
......@@ -375,7 +435,7 @@ static int embb_mtapi_network_thread(void * args) {
(embb_mtapi_network_action_t*)local_action->plugin_data;*/
err = embb_mtapi_network_buffer_pop_front_rawdata(
&buffer, results_size, local_task->result_buffer);
buffer, results_size, local_task->result_buffer);
assert(err == results_size);
local_task->error_code = (mtapi_status_t)task_status;
......@@ -390,14 +450,192 @@ static int embb_mtapi_network_thread(void * args) {
node->group_pool, local_task->group);
embb_mtapi_task_queue_push(&local_group->queue, local_task);
}
local_status = MTAPI_SUCCESS;
}
}
}
}
}
return local_status;
}
static mtapi_status_t embb_mtapi_network_handle_return_failure(
embb_mtapi_network_buffer_t * buffer,
int packet_size) {
int32_t task_status;
int32_t task_id;
int32_t task_tag;
int err;
EMBB_UNUSED_IN_RELEASE(err);
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
mtapi_task_hndl_t task;
// do we have 12 bytes?
if (packet_size == 12) {
// local task id
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_id);
assert(err == 4);
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &task_tag);
assert(err == 4);
// task status
err = embb_mtapi_network_buffer_pop_front_int32(
buffer, &task_status);
assert(err == 4);
task.id = (mtapi_task_id_t)task_id;
task.tag = (mtapi_uint_t)task_tag;
if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
embb_mtapi_task_t * local_task =
embb_mtapi_task_pool_get_storage_for_handle(
node->task_pool, task);
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, local_task->action)) {
embb_mtapi_action_t * local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, local_task->action);
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
local_task->error_code = (mtapi_status_t)task_status;
if (MTAPI_ERR_ACTION_CANCELLED == task_status) {
embb_atomic_store_int(&local_task->state, MTAPI_TASK_CANCELLED);
} else {
embb_atomic_store_int(&local_task->state, MTAPI_TASK_ERROR);
}
/* is task associated with a group? */
if (embb_mtapi_group_pool_is_handle_valid(
node->group_pool, local_task->group)) {
embb_mtapi_group_t* local_group =
embb_mtapi_group_pool_get_storage_for_handle(
node->group_pool, local_task->group);
embb_mtapi_task_queue_push(&local_group->queue, local_task);
}
local_status = MTAPI_SUCCESS;
}
}
}
}
return local_status;
}
static mtapi_status_t embb_mtapi_network_handle_cancel_task(
embb_mtapi_network_buffer_t * buffer,
int packet_size) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
int32_t remote_task_id;
int32_t remote_task_tag;
int err;
EMBB_UNUSED_IN_RELEASE(err);
// do we have 8 bytes?
if (packet_size == 8) {
// get task handle
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &remote_task_id);
assert(err == 4);
err = embb_mtapi_network_buffer_pop_front_int32(buffer, &remote_task_tag);
assert(err == 4);
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
// search for task to cancel
for (mtapi_uint_t ii = 0; ii < node->attributes.max_tasks; ii++) {
embb_mtapi_task_t * task = &node->task_pool->storage[ii];
// is this our task?
if (embb_mtapi_network_task_complete ==
task->attributes.complete_func) {
embb_mtapi_network_task_t * network_task =
(embb_mtapi_network_task_t*)task->attributes.user_data;
// is this task the one matching the given remote task?
if (remote_task_id == network_task->remote_task_id &&
remote_task_tag == network_task->remote_task_tag) {
mtapi_task_cancel(task->handle, &local_status);
break;
}
}
}
}
}
return local_status;
}
static int embb_mtapi_network_thread(void * args) {
embb_mtapi_network_plugin_t * plugin = &embb_mtapi_network_plugin;
embb_mtapi_network_buffer_t * buffer = &plugin->recv_buffer;
int err;
EMBB_UNUSED(args);
while (embb_atomic_load_int(&plugin->run)) {
err = embb_mtapi_network_socket_select(
plugin->sockets, plugin->socket_count, 100);
if (0 == err) {
// listening socket, accept connection
embb_mtapi_network_socket_t accept_socket;
err = embb_mtapi_network_socket_accept(
&plugin->sockets[0], &accept_socket);
if (0 < err) {
// add socket to socket list
plugin->sockets[plugin->socket_count] = accept_socket;
plugin->socket_count++;
}
} else if (0 < err) {
int32_t operation;
int32_t packet_size;
embb_mtapi_network_socket_t * socket = &plugin->sockets[err];
embb_mtapi_network_buffer_clear(buffer);
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, buffer, 4);
if (err == 4) {
err = embb_mtapi_network_buffer_pop_front_int32(
buffer, &packet_size);
assert(err == 4);
embb_mtapi_network_buffer_clear(buffer);
err = embb_mtapi_network_socket_recvbuffer_sized(
socket, buffer, packet_size);
if (err == packet_size) {
err = embb_mtapi_network_buffer_pop_front_int32(
buffer, &operation);
assert(err == 4);
packet_size -= 4;
switch (operation) {
case EMBB_MTAPI_NETWORK_START_TASK:
embb_mtapi_network_handle_start_task(socket, buffer, packet_size);
break;
case EMBB_MTAPI_NETWORK_RETURN_RESULT:
embb_mtapi_network_handle_return_result(buffer, packet_size);
break;
case EMBB_MTAPI_NETWORK_RETURN_FAILURE:
embb_mtapi_network_handle_return_failure(buffer, packet_size);
break;
case EMBB_MTAPI_NETWORK_CANCEL_TASK:
embb_mtapi_network_handle_cancel_task(buffer, packet_size);
break;
default:
// invalid, ignore
break;
}
}
}
embb_mtapi_network_buffer_finalize(&buffer);
embb_mtapi_network_buffer_clear(buffer);
}
}
return EMBB_SUCCESS;
}
......@@ -408,42 +646,106 @@ void mtapi_network_plugin_initialize(
MTAPI_IN mtapi_uint16_t max_connections,
MTAPI_IN mtapi_size_t buffer_size,
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
embb_mtapi_network_plugin_t * plugin = &embb_mtapi_network_plugin;
int err;
mtapi_status_set(status, MTAPI_ERR_UNKNOWN);
plugin->socket_count = 0;
plugin->buffer_size = 0;
plugin->sockets = NULL;
embb_atomic_store_int(&plugin->run, 0);
err = embb_mtapi_network_initialize();
if (err) {
embb_atomic_store_int(&plugin->run, 1);
if (0 == err) return;
err = embb_mtapi_network_buffer_initialize(
&plugin->recv_buffer, (int)buffer_size);
if (0 == err) {
embb_mtapi_network_finalize();
return;
}
err = embb_mtapi_network_buffer_initialize(
&plugin->send_buffer, (int)buffer_size);
if (0 == err) {
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
embb_mtapi_network_finalize();
return;
}
plugin->buffer_size = buffer_size;
plugin->socket_count = 1;
// 1 listening socket and max_connections connections
// (2 sockets each if local)
plugin->sockets = (embb_mtapi_network_socket_t*)embb_alloc(
sizeof(embb_mtapi_network_socket_t) * (1 + max_connections * 2));
if (NULL == plugin->sockets) {
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
plugin->buffer_size = 0;
embb_mtapi_network_finalize();
return;
}
embb_mtapi_network_buffer_initialize(
&plugin->send_buffer, (int)plugin->buffer_size);
embb_mutex_init(&plugin->send_mutex, 0);
err = embb_mutex_init(&plugin->send_mutex, 0);
if (EMBB_SUCCESS != err) {
embb_free(plugin->sockets);
plugin->sockets = NULL;
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
plugin->buffer_size = 0;
embb_mtapi_network_finalize();
return;
}
if (NULL != plugin->sockets) {
err = embb_mtapi_network_socket_initialize(&plugin->sockets[0]);
if (err) {
if (0 == err) {
embb_mutex_destroy(&plugin->send_mutex);
embb_free(plugin->sockets);
plugin->sockets = NULL;
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
plugin->buffer_size = 0;
embb_mtapi_network_finalize();
return;
}
plugin->socket_count = 1;
err = embb_mtapi_network_socket_bind_and_listen(
&plugin->sockets[0], host, port, max_connections);
if (err) {
if (0 == err) {
embb_mtapi_network_socket_finalize(&plugin->sockets[0]);
plugin->socket_count = 0;
embb_mutex_destroy(&plugin->send_mutex);
embb_free(plugin->sockets);
plugin->sockets = NULL;
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
plugin->buffer_size = 0;
embb_mtapi_network_finalize();
return;
}
embb_atomic_store_int(&plugin->run, 1);
err = embb_thread_create(
&plugin->thread, NULL, embb_mtapi_network_thread, NULL);
if (EMBB_SUCCESS == err) {
local_status = MTAPI_SUCCESS;
}
}
}
}
if (EMBB_SUCCESS != err) {
embb_atomic_store_int(&plugin->run, 0);
embb_mtapi_network_socket_finalize(&plugin->sockets[0]);
plugin->socket_count = 0;
embb_mutex_destroy(&plugin->send_mutex);
embb_free(plugin->sockets);
plugin->sockets = NULL;
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
plugin->buffer_size = 0;
embb_mtapi_network_finalize();
return;
}
mtapi_status_set(status, local_status);
mtapi_status_set(status, MTAPI_SUCCESS);
}
void mtapi_network_plugin_finalize(
......@@ -458,6 +760,8 @@ void mtapi_network_plugin_finalize(
embb_mutex_destroy(&plugin->send_mutex);
embb_mtapi_network_buffer_finalize(&plugin->send_buffer);
embb_mtapi_network_buffer_finalize(&plugin->recv_buffer);
embb_mtapi_network_socket_finalize(&plugin->sockets[0]);
embb_free(plugin->sockets);
embb_mtapi_network_finalize();
......@@ -468,9 +772,8 @@ void mtapi_network_plugin_finalize(
static void network_task_start(
MTAPI_IN mtapi_task_hndl_t task,
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
int err;
EMBB_UNUSED_IN_RELEASE(err);
// assume failure
mtapi_status_set(status, MTAPI_ERR_UNKNOWN);
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
......@@ -491,69 +794,151 @@ static void network_task_start(
// serialize sending
embb_mutex_lock(&network_action->send_mutex);
embb_mtapi_network_buffer_clear(send_buf);
// actual counts bytes actually put into the buffer
int actual = 0;
// expected counts bytes we intended to put into the buffer
int expected =
4 + // operation
4 + // domain_id
4 + // job_id
4 + // priority
4 + 4 + // task handle
4 + // result_size
4 + local_task->arguments_size; // arguments buffer
// packet size
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)expected);
expected += 4;
// operation is "start task"
err = embb_mtapi_network_buffer_push_back_int8(
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, EMBB_MTAPI_NETWORK_START_TASK);
assert(err == 1);
err = embb_mtapi_network_buffer_push_back_int32(
// domain_id
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)network_action->domain_id);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32(
// job_id
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)network_action->job_id);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32(
// priority
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->attributes.priority);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32(
// task handle
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->handle.id);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32(
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->handle.tag);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32(
// result size
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->result_size);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_int32(
// arguments buffer
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->arguments_size);
assert(err == 4);
err = embb_mtapi_network_buffer_push_back_rawdata(
actual += embb_mtapi_network_buffer_push_back_rawdata(
send_buf, (int32_t)local_task->arguments_size, local_task->arguments);
assert(err == (int)local_task->arguments_size);
err = embb_mtapi_network_socket_sendbuffer(
&network_action->socket, send_buf);
assert(err == send_buf->size);
// check if everything fit into the buffer
if (actual == expected) {
embb_atomic_fetch_and_add_int(&local_action->num_tasks, 1);
embb_atomic_store_int(&local_task->state, MTAPI_TASK_RUNNING);
int sent = embb_mtapi_network_socket_sendbuffer(
&network_action->socket, send_buf);
// was everything sent?
if (sent == send_buf->size) {
// we've done it, success!
mtapi_status_set(status, MTAPI_SUCCESS);
} else {
// could not send the whole task, this will fail on the remote side,
// so we can safely assume that the task is in error
embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
embb_atomic_store_int(&local_task->state, MTAPI_TASK_ERROR);
}
}
embb_mtapi_network_buffer_clear(send_buf);
embb_mutex_unlock(&network_action->send_mutex);
local_status = MTAPI_SUCCESS;
}
}
}
mtapi_status_set(status, local_status);
}
static void network_task_cancel(
MTAPI_IN mtapi_task_hndl_t task,
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
// assume failure
mtapi_status_set(status, MTAPI_ERR_UNKNOWN);
EMBB_UNUSED(task);
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
mtapi_status_set(status, local_status);
if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
embb_mtapi_task_t * local_task =
embb_mtapi_task_pool_get_storage_for_handle(node->task_pool, task);
if (embb_mtapi_action_pool_is_handle_valid(
node->action_pool, local_task->action)) {
embb_mtapi_action_t * local_action =
embb_mtapi_action_pool_get_storage_for_handle(
node->action_pool, local_task->action);
embb_mtapi_network_action_t * network_action =
(embb_mtapi_network_action_t*)local_action->plugin_data;
embb_mtapi_network_buffer_t * send_buf = &network_action->send_buffer;
// serialize sending
embb_mutex_lock(&network_action->send_mutex);
embb_mtapi_network_buffer_clear(send_buf);
// actual counts bytes actually put into the buffer
int actual = 0;
// expected counts bytes we intended to put into the buffer
int expected =
4 + // operation
4 + 4; // task handle
// packet size
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)expected);
expected += 4;
// operation is "cancel task"
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, EMBB_MTAPI_NETWORK_CANCEL_TASK);
// task handle
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->handle.id);
actual += embb_mtapi_network_buffer_push_back_int32(
send_buf, (int32_t)local_task->handle.tag);
// check if everything fit into the buffer
if (actual == expected) {
int sent = embb_mtapi_network_socket_sendbuffer(
&network_action->socket, send_buf);
// was everything sent?
if (sent == send_buf->size) {
// we've done it, success!
mtapi_status_set(status, MTAPI_SUCCESS);
} else {
embb_atomic_store_int(&local_task->state, MTAPI_TASK_ERROR);
}
} else {
embb_atomic_store_int(&local_task->state, MTAPI_TASK_ERROR);
}
embb_mtapi_network_buffer_clear(send_buf);
embb_mutex_unlock(&network_action->send_mutex);
}
}
}
}
static void network_action_finalize(
......@@ -602,15 +987,15 @@ mtapi_action_hndl_t mtapi_network_action_create(
action->domain_id = domain_id;
action->job_id = remote_job_id;
embb_mtapi_network_buffer_initialize(
err = embb_mtapi_network_buffer_initialize(
&action->send_buffer, (int)plugin->buffer_size);
embb_mutex_init(&action->send_mutex, 0);
if (0 != err) {
err = embb_mutex_init(&action->send_mutex, 0);
if (EMBB_SUCCESS == err) {
action->host = host;
action->port = port;
embb_mtapi_network_socket_initialize(&action->socket);
err = embb_mtapi_network_socket_connect(&action->socket, host, port);
if (0 != err) {
// store socket for select
plugin->sockets[plugin->socket_count] = action->socket;
......@@ -632,6 +1017,8 @@ mtapi_action_hndl_t mtapi_network_action_create(
embb_free(action);
}
}
}
}
mtapi_status_set(status, local_status);
return action_hndl;
......
......@@ -28,9 +28,10 @@
#include <embb/base/c/memory_allocation.h>
#include <string.h>
void embb_mtapi_network_buffer_initialize(
int embb_mtapi_network_buffer_initialize(
embb_mtapi_network_buffer_t * that,
int capacity) {
int result = 1;
that->position = 0;
that->size = 0;
that->data = (char*)embb_alloc((size_t)capacity);
......@@ -38,7 +39,9 @@ void embb_mtapi_network_buffer_initialize(
that->capacity = capacity;
} else {
that->capacity = 0;
result = 0;
}
return result;
}
void embb_mtapi_network_buffer_finalize(
......
......@@ -43,7 +43,7 @@ struct embb_mtapi_network_buffer_struct {
typedef struct embb_mtapi_network_buffer_struct embb_mtapi_network_buffer_t;
void embb_mtapi_network_buffer_initialize(
int embb_mtapi_network_buffer_initialize(
embb_mtapi_network_buffer_t * that,
int capacity
);
......
......@@ -116,7 +116,8 @@ int embb_mtapi_network_socket_connect(
if (SOCKET_ERROR == connect(that->handle, (struct sockaddr *)&addr,
sizeof(addr))) {
#ifdef _WIN32
if (WSAEWOULDBLOCK != WSAGetLastError())
int err = WSAGetLastError();
if (WSAEWOULDBLOCK != err)
#else
if (EAGAIN != errno)
#endif
......
......@@ -61,13 +61,52 @@ static void test(
}
}
static void cancel_test(
void const * /*arguments*/,
mtapi_size_t /*arguments_size*/,
void * /*result_buffer*/,
mtapi_size_t /*result_buffer_size*/,
void const * /*node_local_data*/,
mtapi_size_t /*node_local_data_size*/,
mtapi_task_context_t * context) {
mtapi_status_t status;
while (true) {
mtapi_task_state_t state = mtapi_context_taskstate_get(context, &status);
if (status != MTAPI_SUCCESS) {
break;
} else {
if (state == MTAPI_TASK_CANCELLED) {
break;
}
}
}
}
NetworkTaskTest::NetworkTaskTest() {
CreateUnit("mtapi network task test").Add(&NetworkTaskTest::TestBasic, this);
CreateUnit("mtapi network task test")
.Add(&NetworkTaskTest::TestBasic, this);
}
void NetworkTaskTest::TestBasic() {
mtapi_status_t status;
mtapi_initialize(
NETWORK_DOMAIN,
NETWORK_LOCAL_NODE,
MTAPI_NULL,
MTAPI_NULL,
&status);
MTAPI_CHECK_STATUS(status);
TestSimple();
TestCancel();
mtapi_finalize(&status);
MTAPI_CHECK_STATUS(status);
}
void NetworkTaskTest::TestSimple() {
mtapi_status_t status;
mtapi_job_hndl_t job;
mtapi_task_hndl_t task;
mtapi_action_hndl_t network_action, local_action;
......@@ -81,14 +120,6 @@ void NetworkTaskTest::TestBasic() {
arguments[ii + kElements] = static_cast<float>(ii);
}
mtapi_initialize(
NETWORK_DOMAIN,
NETWORK_LOCAL_NODE,
MTAPI_NULL,
MTAPI_NULL,
&status);
MTAPI_CHECK_STATUS(status);
mtapi_network_plugin_initialize("127.0.0.1", 12345, 5,
kElements * 4 * 3 + 32, &status);
MTAPI_CHECK_STATUS(status);
......@@ -139,7 +170,68 @@ void NetworkTaskTest::TestBasic() {
mtapi_network_plugin_finalize(&status);
MTAPI_CHECK_STATUS(status);
}
void NetworkTaskTest::TestCancel() {
mtapi_status_t status;
mtapi_job_hndl_t job;
mtapi_task_hndl_t task;
mtapi_action_hndl_t network_action, local_action;
float argument = 1.0f;
float result;
mtapi_network_plugin_initialize("127.0.0.1", 12345, 5,
4 * 3 + 32, &status);
MTAPI_CHECK_STATUS(status);
mtapi_finalize(&status);
float node_remote = 1.0f;
local_action = mtapi_action_create(
NETWORK_REMOTE_JOB,
cancel_test,
&node_remote, sizeof(float),
MTAPI_DEFAULT_ACTION_ATTRIBUTES,
&status);
MTAPI_CHECK_STATUS(status);
network_action = mtapi_network_action_create(
NETWORK_DOMAIN,
NETWORK_LOCAL_JOB,
NETWORK_REMOTE_JOB,
"127.0.0.1", 12345,
&status);
MTAPI_CHECK_STATUS(status);
status = MTAPI_ERR_UNKNOWN;
job = mtapi_job_get(NETWORK_LOCAL_JOB, NETWORK_DOMAIN, &status);
MTAPI_CHECK_STATUS(status);
task = mtapi_task_start(
MTAPI_TASK_ID_NONE,
job,
&argument, sizeof(float),
&result, sizeof(float),
MTAPI_DEFAULT_TASK_ATTRIBUTES,
MTAPI_GROUP_NONE,
&status);
MTAPI_CHECK_STATUS(status);
mtapi_task_wait(task, 1, &status);
PT_ASSERT_EQ(status, MTAPI_TIMEOUT);
mtapi_task_cancel(task, &status);
MTAPI_CHECK_STATUS(status);
mtapi_task_wait(task, MTAPI_INFINITE, &status);
PT_ASSERT_NE(status, MTAPI_TIMEOUT);
PT_ASSERT_EQ(status, MTAPI_ERR_ACTION_CANCELLED);
mtapi_action_delete(network_action, MTAPI_INFINITE, &status);
MTAPI_CHECK_STATUS(status);
mtapi_action_delete(local_action, MTAPI_INFINITE, &status);
MTAPI_CHECK_STATUS(status);
mtapi_network_plugin_finalize(&status);
MTAPI_CHECK_STATUS(status);
}
......@@ -35,6 +35,9 @@ class NetworkTaskTest : public partest::TestCase {
private:
void TestBasic();
void TestSimple();
void TestCancel();
};
#endif // MTAPI_PLUGINS_C_MTAPI_NETWORK_C_TEST_EMBB_MTAPI_NETWORK_TEST_TASK_H_
......@@ -130,6 +130,24 @@ class Node {
}
/**
* Returns the number of available groups.
* \return The number of available groups
* \waitfree
*/
mtapi_uint_t GetGroupCount() const {
return group_count_;
}
/**
* Returns the number of available tasks.
* \return The number of available tasks
* \waitfree
*/
mtapi_uint_t GetTaskLimit() const {
return task_limit_;
}
/**
* Returns the number of available cores.
* \return The number of available cores
* \waitfree
......@@ -229,6 +247,8 @@ class Node {
mtapi_task_context_t * context);
mtapi_uint_t queue_count_;
mtapi_uint_t group_count_;
mtapi_uint_t task_limit_;
mtapi_uint_t core_count_;
mtapi_uint_t worker_thread_count_;
mtapi_action_hndl_t action_handle_;
......
......@@ -78,6 +78,12 @@ Node::Node(
mtapi_node_get_attribute(node_id, MTAPI_NODE_MAX_QUEUES, &queue_count_,
sizeof(queue_count_), &status);
assert(MTAPI_SUCCESS == status);
mtapi_node_get_attribute(node_id, MTAPI_NODE_MAX_GROUPS, &group_count_,
sizeof(group_count_), &status);
assert(MTAPI_SUCCESS == status);
mtapi_node_get_attribute(node_id, MTAPI_NODE_MAX_TASKS, &task_limit_,
sizeof(queue_count_), &status);
assert(MTAPI_SUCCESS == status);
core_count_ = info.hardware_concurrency;
worker_thread_count_ = embb_core_set_count(&attr->core_affinity);
action_handle_ = mtapi_action_create(TASKS_CPP_JOB, action_func,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment