Commit a90d311a by Marcus Winter

dataflow_cpp: ported from tasks_cpp to mtapi_cpp

parent bdfa7dc4
......@@ -17,16 +17,16 @@ include_directories(${EMBB_DATAFLOW_CPP_INCLUDE_DIRS}
${CMAKE_CURRENT_SOURCE_DIR}/../base_cpp/include
${CMAKE_CURRENT_BINARY_DIR}/../base_cpp/include
${CMAKE_CURRENT_SOURCE_DIR}/../mtapi_c/include
${CMAKE_CURRENT_SOURCE_DIR}/../tasks_cpp/include
${CMAKE_CURRENT_BINARY_DIR}/../tasks_cpp/include)
${CMAKE_CURRENT_SOURCE_DIR}/../mtapi_cpp/include
${CMAKE_CURRENT_BINARY_DIR}/../mtapi_cpp/include)
add_library (embb_dataflow_cpp ${EMBB_DATAFLOW_CPP_SOURCES} ${EMBB_DATAFLOW_CPP_HEADERS})
target_link_libraries(embb_dataflow_cpp embb_tasks_cpp embb_base_cpp embb_mtapi_c embb_base_c)
target_link_libraries(embb_dataflow_cpp embb_mtapi_cpp embb_base_cpp embb_mtapi_c embb_base_c)
if (BUILD_TESTS STREQUAL ON)
include_directories(${CMAKE_CURRENT_BINARY_DIR}/../partest/include)
add_executable (embb_dataflow_cpp_test ${EMBB_DATAFLOW_CPP_TEST_SOURCES})
target_link_libraries(embb_dataflow_cpp_test embb_dataflow_cpp embb_tasks_cpp embb_mtapi_c partest
target_link_libraries(embb_dataflow_cpp_test embb_dataflow_cpp embb_mtapi_cpp embb_mtapi_c partest
embb_base_cpp embb_base_c ${compiler_libs})
CopyBin(BIN embb_dataflow_cpp_test DEST ${local_install_dir})
endif()
......
......@@ -29,7 +29,7 @@
#include <cstddef>
#include <embb/tasks/task_context.h>
#include <embb/mtapi/task_context.h>
#include <embb/dataflow/internal/node.h>
......@@ -46,7 +46,7 @@ class Action {
node_->Run(clock_);
}
void RunMTAPI(embb::tasks::TaskContext & /*context*/) {
void RunMTAPI(embb::mtapi::TaskContext & /*context*/) {
node_->Run(clock_);
}
......
......@@ -156,7 +156,7 @@ class Process< Serial, Inputs<I1, I2, I3, I4, I5>,
} else {
const int idx = clock % slices_;
action_[idx] = Action(this, clock);
sched_->Spawn(action_[idx]);
sched_->Start(action_[idx]);
}
}
......
......@@ -37,7 +37,7 @@ class Scheduler {
public:
Scheduler() {}
virtual ~Scheduler() {}
virtual void Spawn(Action & action) = 0;
virtual void Start(Action & action) = 0;
virtual void Enqueue(int process_id, Action & action) = 0;
virtual void WaitForSlice(int slice) = 0;
};
......
......@@ -29,7 +29,7 @@
#include <embb/dataflow/internal/action.h>
#include <embb/dataflow/internal/scheduler.h>
#include <embb/tasks/node.h>
#include <embb/mtapi/mtapi.h>
#include <embb/base/function.h>
#include <algorithm>
......@@ -38,65 +38,86 @@ namespace embb {
namespace dataflow {
namespace internal {
#define EMBB_DATAFLOW_JOB_ID 1
class SchedulerMTAPI : public Scheduler {
public:
explicit SchedulerMTAPI(int slices)
: slices_(slices) {
embb::tasks::Node & node = embb::tasks::Node::GetInstance();
embb::mtapi::Node & node = embb::mtapi::Node::GetInstance();
job_ = node.GetJob(EMBB_DATAFLOW_JOB_ID);
action_ = node.CreateAction(EMBB_DATAFLOW_JOB_ID, SchedulerMTAPI::action_func);
group_ = reinterpret_cast<embb::tasks::Group**>(
group_ = reinterpret_cast<embb::mtapi::Group*>(
embb::base::Allocation::Allocate(
sizeof(embb::tasks::Group*)*slices_));
sizeof(embb::mtapi::Group)*slices_));
for (int ii = 0; ii < slices_; ii++) {
embb::tasks::Group & group = node.CreateGroup();
group_[ii] = &group;
group_[ii] = node.CreateGroup();
}
queue_count_ = std::min(
static_cast<int>(node.GetQueueCount()),
static_cast<int>(node.GetWorkerThreadCount()) );
queue_ = reinterpret_cast<embb::tasks::Queue**>(
queue_ = reinterpret_cast<embb::mtapi::Queue*>(
embb::base::Allocation::Allocate(
sizeof(embb::tasks::Queue*)*queue_count_));
sizeof(embb::mtapi::Queue)*queue_count_));
embb::mtapi::QueueAttributes queue_attr;
queue_attr
.SetPriority(0)
.SetOrdered(true);
for (int ii = 0; ii < queue_count_; ii++) {
embb::tasks::Queue & queue = node.CreateQueue(0, true);
queue_[ii] = &queue;
queue_[ii] = node.CreateQueue(job_, queue_attr);
}
}
virtual ~SchedulerMTAPI() {
if (embb::tasks::Node::IsInitialized()) {
if (embb::mtapi::Node::IsInitialized()) {
// only destroy groups and queues if there still is an instance
embb::tasks::Node & node = embb::tasks::Node::GetInstance();
for (int ii = 0; ii < slices_; ii++) {
group_[ii]->WaitAll(MTAPI_INFINITE);
node.DestroyGroup(*group_[ii]);
group_[ii].WaitAll(MTAPI_INFINITE);
group_[ii].Delete();
}
for (int ii = 0; ii < queue_count_; ii++) {
node.DestroyQueue(*queue_[ii]);
queue_[ii].Delete();
}
}
embb::base::Allocation::Free(group_);
embb::base::Allocation::Free(queue_);
}
virtual void Spawn(Action & action) {
virtual void Start(Action & action) {
const int idx = action.GetClock() % slices_;
group_[idx]->Spawn(embb::base::MakeFunction(action, &Action::RunMTAPI));
group_[idx].Start(job_, &action, static_cast<void*>(NULL));
}
virtual void Enqueue(int process_id, Action & action) {
const int idx = action.GetClock() % slices_;
const int queue_id = process_id % queue_count_;
queue_[queue_id]->Spawn(group_[idx],
embb::base::MakeFunction(action, &Action::RunMTAPI));
queue_[queue_id].Enqueue(&action, static_cast<void*>(NULL), group_[idx]);
}
virtual void WaitForSlice(int slice) {
group_[slice]->WaitAll(MTAPI_INFINITE);
group_[slice].WaitAll(MTAPI_INFINITE);
}
private:
embb::tasks::Group ** group_;
embb::tasks::Queue ** queue_;
static void action_func(
const void* args,
mtapi_size_t /*args_size*/,
void* /*result_buffer*/,
mtapi_size_t /*result_buffer_size*/,
const void* /*node_local_data*/,
mtapi_size_t /*node_local_data_size*/,
mtapi_task_context_t * context) {
Action * action =
reinterpret_cast<Action*>(const_cast<void*>(args));
embb::mtapi::TaskContext task_context(context);
action->RunMTAPI(task_context);
}
embb::mtapi::Action action_;
embb::mtapi::Job job_;
embb::mtapi::Group * group_;
embb::mtapi::Queue * queue_;
int queue_count_;
int slices_;
};
......
......@@ -38,7 +38,7 @@ class SchedulerSequential : public Scheduler {
public:
SchedulerSequential() {}
virtual ~SchedulerSequential() {}
virtual void Spawn(Action & action) {
virtual void Start(Action & action) {
action.RunSequential();
}
virtual void Enqueue(int, Action & action) {
......
......@@ -29,7 +29,7 @@
#include <iostream>
#include <sstream>
#include <embb/tasks/tasks.h>
#include <embb/mtapi/mtapi.h>
#include <embb/base/function.h>
#include <embb/base/c/memory_allocation.h>
......@@ -153,15 +153,14 @@ SimpleTest::SimpleTest() {
void SimpleTest::TestBasic() {
// All available cores
embb::base::CoreSet core_set(true);
embb::tasks::Node::Initialize(
embb::mtapi::NodeAttributes node_attr;
node_attr
.SetCoreAffinity(core_set)
.SetQueueLimit(2);
embb::mtapi::Node::Initialize(
MTAPI_DOMAIN_ID,
MTAPI_NODE_ID,
core_set,
1024, // max tasks (default: 1024)
128, // max groups (default: 128)
2, // max queues (default: 16)
1024, // queue capacity (default: 1024)
4); // num priorities (default: 4)
node_attr);
for (int ii = 0; ii < 10000; ii++) {
ArraySink<TEST_COUNT> asink;
......@@ -221,7 +220,7 @@ void SimpleTest::TestBasic() {
PT_EXPECT(asink.Check());
}
embb::tasks::Node::Finalize();
embb::mtapi::Node::Finalize();
PT_EXPECT(embb_get_bytes_allocated() == 0);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment