Commit 29acd08a by Marcus Winter

dataflow_cpp: removed spinlocks

parent 36054ae2
......@@ -26,7 +26,7 @@ target_link_libraries(embb_dataflow_cpp embb_mtapi_cpp embb_base_cpp embb_mtapi_
if (BUILD_TESTS STREQUAL ON)
include_directories(${CMAKE_CURRENT_BINARY_DIR}/../partest/include)
add_executable (embb_dataflow_cpp_test ${EMBB_DATAFLOW_CPP_TEST_SOURCES})
target_link_libraries(embb_dataflow_cpp_test embb_mtapi_cpp embb_mtapi_c partest
target_link_libraries(embb_dataflow_cpp_test embb_dataflow_cpp embb_mtapi_cpp embb_mtapi_c partest
embb_base_cpp embb_base_c ${compiler_libs})
CopyBin(BIN embb_dataflow_cpp_test DEST ${local_install_dir})
endif()
......
......@@ -27,6 +27,7 @@
#ifndef EMBB_DATAFLOW_INTERNAL_INPUTS_H_
#define EMBB_DATAFLOW_INTERNAL_INPUTS_H_
#include <embb/base/atomic.h>
#include <embb/dataflow/internal/tuple.h>
#include <embb/dataflow/internal/in.h>
......
......@@ -28,7 +28,7 @@
#define EMBB_DATAFLOW_INTERNAL_NODE_H_
#include <cstddef>
#include <embb/base/exceptions.h>
#include <embb/dataflow/internal/scheduler.h>
namespace embb {
......@@ -50,6 +50,9 @@ class Node {
protected:
Scheduler * sched_;
static int next_process_id_;
static int GetNextProcessID() { return next_process_id_++; }
};
} // namespace internal
......
......@@ -56,6 +56,13 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>,
explicit Process(FunctionType function)
: executor_(function) {
next_clock_ = 0;
queued_clock_ = 0;
bool ordered = Serial;
if (ordered) {
queue_id_ = GetNextProcessID();
} else {
queue_id_ = 0;
}
inputs_.SetListener(this);
}
......@@ -95,21 +102,39 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>,
}
virtual void OnClock(int clock) {
if (!inputs_.AreAtClock(clock))
if (!inputs_.AreAtClock(clock)) {
EMBB_THROW(embb::base::ErrorException,
"Some inputs are not at expected clock.")
}
bool ordered = Serial;
if (ordered) {
lock_.Lock();
for (int ii = next_clock_; ii < next_clock_ + Slices; ii++) {
bool retry = true;
while (retry) {
int clk = next_clock_;
int clk_end = clk + Slices;
int clk_res = clk;
for (int ii = clk; ii < clk_end; ii++) {
if (!inputs_.AreAtClock(ii)) {
break;
}
next_clock_ = ii + 1;
Run(ii);
clk_res++;
}
if (clk_res > clk) {
if (next_clock_.CompareAndSwap(clk, clk_res)) {
while (queued_clock_.Load() < clk);
for (int ii = clk; ii < clk_res; ii++) {
const int idx = ii % Slices;
action_[idx] = Action(this, ii);
sched_->Enqueue(queue_id_, action_[idx]);
}
queued_clock_.Store(clk_res);
retry = false;
}
} else {
retry = false;
}
}
lock_.Unlock();
} else {
const int idx = clock % Slices;
action_[idx] = Action(this, clock);
......@@ -121,9 +146,10 @@ class Process< Slices, Serial, Inputs<Slices, I1, I2, I3, I4, I5>,
InputsType inputs_;
OutputsType outputs_;
ExecutorType executor_;
int next_clock_;
Action action_[Slices];
SpinLock lock_;
embb::base::Atomic<int> next_clock_;
embb::base::Atomic<int> queued_clock_;
int queue_id_;
};
} // namespace internal
......
......@@ -38,6 +38,7 @@ class Scheduler {
Scheduler() {}
virtual ~Scheduler() {}
virtual void Spawn(Action & action) = 0;
virtual void Enqueue(int process_id, Action & action) = 0;
virtual void WaitForSlice(int slice) = 0;
};
......
......@@ -45,6 +45,16 @@ class SchedulerMTAPI : public Scheduler {
embb::mtapi::Group & group = node.CreateGroup();
group_[ii] = &group;
}
queue_count_ = static_cast<int>(node.GetWorkerThreadCount());
queue_ = reinterpret_cast<embb::mtapi::Queue**>(
embb::base::Allocation::Allocate(
sizeof(embb::mtapi::Queue*)*queue_count_));
for (int ii = 0; ii < queue_count_; ii++) {
embb::mtapi::Queue & queue = node.CreateQueue(0, true);
queue_[ii] = &queue;
}
}
virtual ~SchedulerMTAPI() {
embb::mtapi::Node & node = embb::mtapi::Node::GetInstance();
......@@ -52,17 +62,29 @@ class SchedulerMTAPI : public Scheduler {
group_[ii]->WaitAll(MTAPI_INFINITE);
node.DestroyGroup(*group_[ii]);
}
for (int ii = 0; ii < queue_count_; ii++) {
node.DestroyQueue(*queue_[ii]);
}
embb::base::Allocation::Free(queue_);
}
virtual void Spawn(Action & action) {
const int idx = action.GetClock() % Slices;
group_[idx]->Spawn(embb::base::MakeFunction(action, &Action::RunMTAPI));
}
virtual void Enqueue(int process_id, Action & action) {
const int idx = action.GetClock() % Slices;
const int queue_id = process_id % queue_count_;
queue_[queue_id]->Spawn(group_[idx],
embb::base::MakeFunction(action, &Action::RunMTAPI));
}
virtual void WaitForSlice(int slice) {
group_[slice]->WaitAll(MTAPI_INFINITE);
}
private:
embb::mtapi::Group * group_[Slices];
embb::mtapi::Queue ** queue_;
int queue_count_;
};
} // namespace internal
......
......@@ -41,6 +41,9 @@ class SchedulerSequential : public Scheduler {
virtual void Spawn(Action & action) {
action.RunSequential();
}
virtual void Enqueue(int, Action & action) {
action.RunSequential();
}
virtual void WaitForSlice(int /*slice*/) {}
};
......
......@@ -27,7 +27,7 @@
#ifndef EMBB_DATAFLOW_INTERNAL_SIGNAL_H_
#define EMBB_DATAFLOW_INTERNAL_SIGNAL_H_
#include <embb/dataflow/internal/spinlock.h>
#include <embb/base/c/atomic.h>
namespace embb {
namespace dataflow {
......@@ -42,27 +42,23 @@ class Signal {
Signal(Signal const & other)
: blank_(other.blank_), value_(other.value_), clock_(other.clock_) {}
void operator = (Signal const & rhs) {
lock_.Lock();
blank_ = rhs.blank_;
value_ = rhs.value_;
clock_ = rhs.clock_;
lock_.Unlock();
embb_atomic_memory_barrier();
}
int GetClock() const { return clock_; }
bool IsBlank() const { return blank_; }
Type const & GetValue() const { return value_; }
void Clear() {
lock_.Lock();
blank_ = true;
clock_ = -1;
lock_.Unlock();
}
private:
bool blank_;
Type value_;
int clock_;
SpinLock lock_;
};
} // namespace internal
......
......@@ -52,6 +52,8 @@ class Sink< Slices, Inputs<Slices, I1, I2, I3, I4, I5> >
explicit Sink(FunctionType function)
: executor_(function) {
next_clock_ = 0;
queued_clock_ = 0;
queue_id_ = GetNextProcessID();
inputs_.SetListener(this);
}
......@@ -80,32 +82,47 @@ class Sink< Slices, Inputs<Slices, I1, I2, I3, I4, I5> >
}
virtual void OnClock(int clock) {
TrySpawn(clock);
}
private:
InputsType inputs_;
ExecutorType executor_;
int next_clock_;
Action action_[Slices];
ClockListener * listener_;
SpinLock lock_;
void TrySpawn(int clock) {
if (!inputs_.AreAtClock(clock))
if (!inputs_.AreAtClock(clock)) {
EMBB_THROW(embb::base::ErrorException,
"Some inputs are not at expected clock.")
}
lock_.Lock();
for (int ii = next_clock_; ii < next_clock_ + Slices; ii++) {
bool retry = true;
while (retry) {
int clk = next_clock_;
int clk_end = clk + Slices;
int clk_res = clk;
for (int ii = clk; ii < clk_end; ii++) {
if (!inputs_.AreAtClock(ii)) {
break;
}
next_clock_ = ii + 1;
Run(ii);
clk_res++;
}
if (clk_res > clk) {
if (next_clock_.CompareAndSwap(clk, clk_res)) {
while (queued_clock_.Load() < clk);
for (int ii = clk; ii < clk_res; ii++) {
const int idx = ii % Slices;
action_[idx] = Action(this, ii);
sched_->Enqueue(queue_id_, action_[idx]);
}
lock_.Unlock();
queued_clock_.Store(clk_res);
retry = false;
}
} else {
retry = false;
}
}
}
private:
InputsType inputs_;
ExecutorType executor_;
Action action_[Slices];
ClockListener * listener_;
embb::base::Atomic<int> next_clock_;
embb::base::Atomic<int> queued_clock_;
int queue_id_;
};
} // namespace internal
......
......@@ -27,13 +27,9 @@
#ifndef EMBB_DATAFLOW_INTERNAL_SOURCE_H_
#define EMBB_DATAFLOW_INTERNAL_SOURCE_H_
#include <embb/base/atomic.h>
#include <embb/base/thread.h>
#include <embb/dataflow/internal/node.h>
#include <embb/dataflow/internal/outputs.h>
#include <embb/dataflow/internal/source_executor.h>
#include <embb/dataflow/internal/action.h>
namespace embb {
namespace dataflow {
......@@ -53,7 +49,6 @@ class Source< Slices, Outputs<Slices, O1, O2, O3, O4, O5> >
explicit Source(FunctionType function)
: executor_(function), not_done_(true) {
next_clock_ = 0;
}
virtual bool HasOutputs() const {
......@@ -62,7 +57,6 @@ class Source< Slices, Outputs<Slices, O1, O2, O3, O4, O5> >
virtual void Run(int clock) {
not_done_ = executor_.Execute(clock, outputs_);
next_clock_++;
}
virtual bool Start(int clock) {
......@@ -89,9 +83,7 @@ class Source< Slices, Outputs<Slices, O1, O2, O3, O4, O5> >
private:
OutputsType outputs_;
ExecutorType executor_;
Action action_[Slices];
volatile bool not_done_;
embb::base::Atomic<int> next_clock_;
};
} // namespace internal
......
......@@ -24,3 +24,6 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <embb/dataflow/internal/node.h>
int embb::dataflow::internal::Node::next_process_id_ = 0;
......@@ -60,7 +60,7 @@ bool sourceFunc(int & out) {
source_array[source_counter] = out;
source_counter++;
return source_counter < 12;
return source_counter < TEST_COUNT;
}
embb::base::Atomic<int> pred_counter;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment