From a90d311adfc7770952d1099eed8a0db78aa76fec Mon Sep 17 00:00:00 2001 From: Marcus Winter Date: Tue, 5 Apr 2016 16:41:53 +0200 Subject: [PATCH] dataflow_cpp: ported from tasks_cpp to mtapi_cpp --- dataflow_cpp/CMakeLists.txt | 8 ++++---- dataflow_cpp/include/embb/dataflow/internal/action.h | 4 ++-- dataflow_cpp/include/embb/dataflow/internal/process.h | 2 +- dataflow_cpp/include/embb/dataflow/internal/scheduler.h | 2 +- dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h | 65 +++++++++++++++++++++++++++++++++++++++++++---------------------- dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h | 2 +- dataflow_cpp/test/dataflow_cpp_test_simple.cc | 17 ++++++++--------- 7 files changed, 60 insertions(+), 40 deletions(-) diff --git a/dataflow_cpp/CMakeLists.txt b/dataflow_cpp/CMakeLists.txt index 657d4bd..133d066 100644 --- a/dataflow_cpp/CMakeLists.txt +++ b/dataflow_cpp/CMakeLists.txt @@ -17,16 +17,16 @@ include_directories(${EMBB_DATAFLOW_CPP_INCLUDE_DIRS} ${CMAKE_CURRENT_SOURCE_DIR}/../base_cpp/include ${CMAKE_CURRENT_BINARY_DIR}/../base_cpp/include ${CMAKE_CURRENT_SOURCE_DIR}/../mtapi_c/include - ${CMAKE_CURRENT_SOURCE_DIR}/../tasks_cpp/include - ${CMAKE_CURRENT_BINARY_DIR}/../tasks_cpp/include) + ${CMAKE_CURRENT_SOURCE_DIR}/../mtapi_cpp/include + ${CMAKE_CURRENT_BINARY_DIR}/../mtapi_cpp/include) add_library (embb_dataflow_cpp ${EMBB_DATAFLOW_CPP_SOURCES} ${EMBB_DATAFLOW_CPP_HEADERS}) -target_link_libraries(embb_dataflow_cpp embb_tasks_cpp embb_base_cpp embb_mtapi_c embb_base_c) +target_link_libraries(embb_dataflow_cpp embb_mtapi_cpp embb_base_cpp embb_mtapi_c embb_base_c) if (BUILD_TESTS STREQUAL ON) include_directories(${CMAKE_CURRENT_BINARY_DIR}/../partest/include) add_executable (embb_dataflow_cpp_test ${EMBB_DATAFLOW_CPP_TEST_SOURCES}) - target_link_libraries(embb_dataflow_cpp_test embb_dataflow_cpp embb_tasks_cpp embb_mtapi_c partest + target_link_libraries(embb_dataflow_cpp_test embb_dataflow_cpp embb_mtapi_cpp embb_mtapi_c partest embb_base_cpp embb_base_c ${compiler_libs}) CopyBin(BIN embb_dataflow_cpp_test DEST ${local_install_dir}) endif() diff --git a/dataflow_cpp/include/embb/dataflow/internal/action.h b/dataflow_cpp/include/embb/dataflow/internal/action.h index 61f4cfa..4b6d0b7 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/action.h +++ b/dataflow_cpp/include/embb/dataflow/internal/action.h @@ -29,7 +29,7 @@ #include -#include +#include #include @@ -46,7 +46,7 @@ class Action { node_->Run(clock_); } - void RunMTAPI(embb::tasks::TaskContext & /*context*/) { + void RunMTAPI(embb::mtapi::TaskContext & /*context*/) { node_->Run(clock_); } diff --git a/dataflow_cpp/include/embb/dataflow/internal/process.h b/dataflow_cpp/include/embb/dataflow/internal/process.h index 1dfade2..25fd664 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/process.h +++ b/dataflow_cpp/include/embb/dataflow/internal/process.h @@ -156,7 +156,7 @@ class Process< Serial, Inputs, } else { const int idx = clock % slices_; action_[idx] = Action(this, clock); - sched_->Spawn(action_[idx]); + sched_->Start(action_[idx]); } } diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler.h index 9f86e96..7aaba7c 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler.h @@ -37,7 +37,7 @@ class Scheduler { public: Scheduler() {} virtual ~Scheduler() {} - virtual void Spawn(Action & action) = 0; + virtual void Start(Action & action) = 0; virtual void Enqueue(int process_id, Action & action) = 0; virtual void WaitForSlice(int slice) = 0; }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h index 856b4a2..f901841 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler_mtapi.h @@ -29,7 +29,7 @@ #include #include -#include +#include #include #include @@ -38,65 +38,86 @@ namespace embb { namespace dataflow { namespace internal { +#define EMBB_DATAFLOW_JOB_ID 1 + class SchedulerMTAPI : public Scheduler { public: explicit SchedulerMTAPI(int slices) : slices_(slices) { - embb::tasks::Node & node = embb::tasks::Node::GetInstance(); + embb::mtapi::Node & node = embb::mtapi::Node::GetInstance(); + + job_ = node.GetJob(EMBB_DATAFLOW_JOB_ID); + action_ = node.CreateAction(EMBB_DATAFLOW_JOB_ID, SchedulerMTAPI::action_func); - group_ = reinterpret_cast( + group_ = reinterpret_cast( embb::base::Allocation::Allocate( - sizeof(embb::tasks::Group*)*slices_)); + sizeof(embb::mtapi::Group)*slices_)); for (int ii = 0; ii < slices_; ii++) { - embb::tasks::Group & group = node.CreateGroup(); - group_[ii] = &group; + group_[ii] = node.CreateGroup(); } queue_count_ = std::min( static_cast(node.GetQueueCount()), static_cast(node.GetWorkerThreadCount()) ); - queue_ = reinterpret_cast( + queue_ = reinterpret_cast( embb::base::Allocation::Allocate( - sizeof(embb::tasks::Queue*)*queue_count_)); + sizeof(embb::mtapi::Queue)*queue_count_)); + embb::mtapi::QueueAttributes queue_attr; + queue_attr + .SetPriority(0) + .SetOrdered(true); for (int ii = 0; ii < queue_count_; ii++) { - embb::tasks::Queue & queue = node.CreateQueue(0, true); - queue_[ii] = &queue; + queue_[ii] = node.CreateQueue(job_, queue_attr); } } virtual ~SchedulerMTAPI() { - if (embb::tasks::Node::IsInitialized()) { + if (embb::mtapi::Node::IsInitialized()) { // only destroy groups and queues if there still is an instance - embb::tasks::Node & node = embb::tasks::Node::GetInstance(); for (int ii = 0; ii < slices_; ii++) { - group_[ii]->WaitAll(MTAPI_INFINITE); - node.DestroyGroup(*group_[ii]); + group_[ii].WaitAll(MTAPI_INFINITE); + group_[ii].Delete(); } for (int ii = 0; ii < queue_count_; ii++) { - node.DestroyQueue(*queue_[ii]); + queue_[ii].Delete(); } } embb::base::Allocation::Free(group_); embb::base::Allocation::Free(queue_); } - virtual void Spawn(Action & action) { + virtual void Start(Action & action) { const int idx = action.GetClock() % slices_; - group_[idx]->Spawn(embb::base::MakeFunction(action, &Action::RunMTAPI)); + group_[idx].Start(job_, &action, static_cast(NULL)); } virtual void Enqueue(int process_id, Action & action) { const int idx = action.GetClock() % slices_; const int queue_id = process_id % queue_count_; - queue_[queue_id]->Spawn(group_[idx], - embb::base::MakeFunction(action, &Action::RunMTAPI)); + queue_[queue_id].Enqueue(&action, static_cast(NULL), group_[idx]); } virtual void WaitForSlice(int slice) { - group_[slice]->WaitAll(MTAPI_INFINITE); + group_[slice].WaitAll(MTAPI_INFINITE); } private: - embb::tasks::Group ** group_; - embb::tasks::Queue ** queue_; + static void action_func( + const void* args, + mtapi_size_t /*args_size*/, + void* /*result_buffer*/, + mtapi_size_t /*result_buffer_size*/, + const void* /*node_local_data*/, + mtapi_size_t /*node_local_data_size*/, + mtapi_task_context_t * context) { + Action * action = + reinterpret_cast(const_cast(args)); + embb::mtapi::TaskContext task_context(context); + action->RunMTAPI(task_context); + } + + embb::mtapi::Action action_; + embb::mtapi::Job job_; + embb::mtapi::Group * group_; + embb::mtapi::Queue * queue_; int queue_count_; int slices_; }; diff --git a/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h b/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h index 575fea3..841e1dd 100644 --- a/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h +++ b/dataflow_cpp/include/embb/dataflow/internal/scheduler_sequential.h @@ -38,7 +38,7 @@ class SchedulerSequential : public Scheduler { public: SchedulerSequential() {} virtual ~SchedulerSequential() {} - virtual void Spawn(Action & action) { + virtual void Start(Action & action) { action.RunSequential(); } virtual void Enqueue(int, Action & action) { diff --git a/dataflow_cpp/test/dataflow_cpp_test_simple.cc b/dataflow_cpp/test/dataflow_cpp_test_simple.cc index c73a0f1..71f58eb 100644 --- a/dataflow_cpp/test/dataflow_cpp_test_simple.cc +++ b/dataflow_cpp/test/dataflow_cpp_test_simple.cc @@ -29,7 +29,7 @@ #include #include -#include +#include #include #include @@ -153,15 +153,14 @@ SimpleTest::SimpleTest() { void SimpleTest::TestBasic() { // All available cores embb::base::CoreSet core_set(true); - embb::tasks::Node::Initialize( + embb::mtapi::NodeAttributes node_attr; + node_attr + .SetCoreAffinity(core_set) + .SetQueueLimit(2); + embb::mtapi::Node::Initialize( MTAPI_DOMAIN_ID, MTAPI_NODE_ID, - core_set, - 1024, // max tasks (default: 1024) - 128, // max groups (default: 128) - 2, // max queues (default: 16) - 1024, // queue capacity (default: 1024) - 4); // num priorities (default: 4) + node_attr); for (int ii = 0; ii < 10000; ii++) { ArraySink asink; @@ -221,7 +220,7 @@ void SimpleTest::TestBasic() { PT_EXPECT(asink.Check()); } - embb::tasks::Node::Finalize(); + embb::mtapi::Node::Finalize(); PT_EXPECT(embb_get_bytes_allocated() == 0); } -- libgit2 0.26.0