Commit 60cb6593 by Marcus Winter

Merge branch 'embb531_codesonar_fixes' into development

# Conflicts:
#	base_c/src/thread.c
#	mtapi_c/src/embb_mtapi_node_t.c
parents f459e5ef 5f59e812
......@@ -51,10 +51,14 @@ int embb_duration_set_nanoseconds(embb_duration_t* duration,
}
if (nanoseconds > 0) {
if (embb_duration_min()->nanoseconds > nanoseconds) {
duration->seconds = 0;
duration->nanoseconds = 0;
return EMBB_UNDERFLOW;
}
const embb_duration_t* max = embb_duration_max();
if (max->seconds * 1000000000 + max->nanoseconds < nanoseconds) {
duration->seconds = max->seconds;
duration->nanoseconds = max->nanoseconds;
return EMBB_OVERFLOW;
}
}
......@@ -70,10 +74,14 @@ int embb_duration_set_microseconds(embb_duration_t* duration,
}
if (microseconds > 0) {
if (embb_duration_min()->nanoseconds > microseconds*1000) {
duration->seconds = 0;
duration->nanoseconds = 0;
return EMBB_UNDERFLOW;
}
const embb_duration_t* max = embb_duration_max();
if (max->seconds * 1000000 + max->nanoseconds / 1000 < microseconds) {
duration->seconds = max->seconds;
duration->nanoseconds = max->nanoseconds;
return EMBB_OVERFLOW;
}
}
......@@ -89,10 +97,14 @@ int embb_duration_set_milliseconds(embb_duration_t* duration,
}
if (milliseconds > 0) {
if (embb_duration_min()->nanoseconds > milliseconds*1000000) {
duration->seconds = 0;
duration->nanoseconds = 0;
return EMBB_UNDERFLOW;
}
const embb_duration_t* max = embb_duration_max();
if (max->seconds * 1000 + max->nanoseconds / 1000000 < milliseconds) {
duration->seconds = max->seconds;
duration->nanoseconds = max->nanoseconds;
return EMBB_OVERFLOW;
}
}
......@@ -108,10 +120,14 @@ int embb_duration_set_seconds(embb_duration_t* duration,
}
if (seconds > 0) {
if (embb_duration_min()->nanoseconds > seconds*1000000000) {
duration->seconds = 0;
duration->nanoseconds = 0;
return EMBB_UNDERFLOW;
}
const embb_duration_t* max = embb_duration_max();
if (max->seconds + max->nanoseconds / 1000000000 < seconds) {
duration->seconds = max->seconds;
duration->nanoseconds = max->nanoseconds;
return EMBB_OVERFLOW;
}
}
......@@ -126,6 +142,8 @@ int embb_duration_add(embb_duration_t* lhs, const embb_duration_t* rhs) {
}
int carry = (int)((lhs->nanoseconds + rhs->nanoseconds) / 1000000000);
if (lhs->seconds + rhs->seconds + carry > EMBB_DURATION_MAX_SECONDS) {
lhs->seconds = 0;
lhs->nanoseconds = 0;
return EMBB_OVERFLOW;
}
lhs->nanoseconds = (lhs->nanoseconds + rhs->nanoseconds) % 1000000000;
......
......@@ -90,7 +90,6 @@ void embb_log_write_internal(
case EMBB_LOG_LEVEL_NONE:
default:
log_level_str = " ";
break;
}
#if defined(EMBB_PLATFORM_COMPILER_MSVC)
......
......@@ -85,7 +85,10 @@ int embb_thread_create(embb_thread_t* thread, const embb_core_set_t* core_set,
}
thread->embb_internal_arg = (embb_internal_thread_arg_t*)
embb_alloc(sizeof(embb_internal_thread_arg_t));
if (thread->embb_internal_arg == NULL) return EMBB_NOMEM;
if (thread->embb_internal_arg == NULL) {
thread->embb_internal_handle = NULL;
return EMBB_NOMEM;
}
thread->embb_internal_arg->func = func;
thread->embb_internal_arg->arg = arg;
......@@ -97,6 +100,8 @@ int embb_thread_create(embb_thread_t* thread, const embb_core_set_t* core_set,
0, /* no creation arguments */
0); /* no system thread ID */
if (thread->embb_internal_handle == NULL) {
embb_free(thread->embb_internal_arg);
thread->embb_internal_arg = NULL;
return EMBB_ERROR;
}
......@@ -234,7 +239,11 @@ int embb_thread_create(embb_thread_t* thread, const embb_core_set_t* core_set,
}
}
status = pthread_attr_setaffinity_np(&attr, sizeof(cpuset), &cpuset);
if (status != 0) return EMBB_ERROR;
if (status != 0) {
thread->embb_internal_arg = NULL;
thread->embb_internal_handle = NULL;
return EMBB_ERROR;
}
#else
embb_log_write("base_c", EMBB_LOG_LEVEL_WARNING, "Could not set thread "
"affinity, since no implementation available!\n");
......@@ -244,6 +253,11 @@ int embb_thread_create(embb_thread_t* thread, const embb_core_set_t* core_set,
/* Dynamic allocation of thread arguments. Freed on call of join. */
thread->embb_internal_arg = (embb_internal_thread_arg_t*)
embb_alloc(sizeof(embb_internal_thread_arg_t));
if (thread->embb_internal_arg == NULL) {
thread->embb_internal_handle = NULL;
pthread_attr_destroy(&attr);
return EMBB_NOMEM;
}
thread->embb_internal_arg->func = func;
thread->embb_internal_arg->arg = arg;
......@@ -265,11 +279,14 @@ int embb_thread_join(embb_thread_t* thread, int *result_code) {
return EMBB_ERROR;
}
int status = 0;
if (thread == NULL) return EMBB_ERROR;
status = pthread_join(thread->embb_internal_handle, NULL);
if (thread->embb_internal_arg != NULL) {
if (result_code != NULL) {
*result_code = thread->embb_internal_arg->result;
}
embb_free(thread->embb_internal_arg);
}
if (status != 0) {
return EMBB_ERROR;
}
......
......@@ -76,5 +76,7 @@ void* embb_tss_get(const embb_tss_t* tss) {
void embb_tss_delete(embb_tss_t* tss) {
assert(tss != NULL);
if (tss->values != NULL) {
embb_free_aligned(tss->values);
}
}
......@@ -152,6 +152,9 @@ void AllocTest::TestMixedAllocs() {
void* plain = NULL;
plain = embb_alloc(2);
PT_EXPECT_NE(plain, static_cast<void*>(NULL));
if (NULL == plain) {
return;
}
allocated = embb_get_bytes_allocated();
#ifdef EMBB_DEBUG
expected += 2 + 2*sizeof(size_t);
......@@ -162,6 +165,10 @@ void AllocTest::TestMixedAllocs() {
void* aligned = NULL;
aligned = embb_alloc_aligned(2*sizeof(void*), 2);
PT_EXPECT_NE(aligned, static_cast<void*>(NULL));
if (NULL == aligned) {
embb_free(plain);
return;
}
allocated = embb_get_bytes_allocated();
#ifdef EMBB_DEBUG
expected += (1 + 1) * 2 * sizeof(void*) + 3 * sizeof(size_t) - 1;
......@@ -172,6 +179,11 @@ void AllocTest::TestMixedAllocs() {
void* cache_aligned = NULL;
cache_aligned = embb_alloc_cache_aligned(2);
PT_EXPECT_NE(cache_aligned, static_cast<void*>(NULL));
if (NULL == cache_aligned) {
embb_free(plain);
embb_free_aligned(aligned);
return;
}
allocated = embb_get_bytes_allocated();
#ifdef EMBB_DEBUG
expected += (1 + 1) * EMBB_PLATFORM_CACHE_LINE_SIZE + 3 * sizeof(size_t) - 1;
......
......@@ -50,7 +50,11 @@ void ThreadSpecificStorageTest::Test() {
size_t rank = partest::TestSuite::GetCurrentThreadID();
void* value = embb_tss_get(&tss_);
if (value == NULL) {
int status = embb_tss_set(&tss_, new size_t(rank));
size_t * prank = new size_t(rank);
int status = embb_tss_set(&tss_, prank);
if (EMBB_SUCCESS != status) {
delete prank;
}
PT_EXPECT_EQ(status, EMBB_SUCCESS);
} else {
size_t stored_rank = *static_cast<size_t*>(value);
......
......@@ -173,10 +173,6 @@ AtomicTest::TestStressSwap::TestStressSwap(
size_t number_threads, size_t number_iterations)
: TestUnit("Swap Stress test for Atomics"), swap1_counter(1)
, swap2_counter(2) {
bitsets[0] = std::bitset<ATOMIC_TESTS_ITERATIONS * 2 + 1>();
bitsets[1] = std::bitset<ATOMIC_TESTS_ITERATIONS * 2 + 1>();
bitsets[2] = std::bitset<ATOMIC_TESTS_ITERATIONS * 2 + 1>();
PT_ASSERT(number_threads == 1);
Pre(&TestStressSwap::Init, this);
......
......@@ -56,6 +56,7 @@ void LogTest::Test() {
logged_message = null;
Log::Trace("chn", test_msg);
#ifdef EMBB_DEBUG
PT_ASSERT_NE(logged_message, null);
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [TRACE] hello"));
#else
PT_EXPECT_EQ(null, logged_message);
......@@ -63,15 +64,18 @@ void LogTest::Test() {
logged_message = null;
Log::Info("chn", test_msg);
#ifdef EMBB_DEBUG
PT_ASSERT_NE(logged_message, null);
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [INFO ] hello"));
#else
PT_EXPECT_EQ(null, logged_message);
#endif
logged_message = null;
Log::Warning("chn", test_msg);
PT_ASSERT_NE(logged_message, null);
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [WARN ] hello"));
logged_message = null;
Log::Error("chn", test_msg);
PT_ASSERT_NE(logged_message, null);
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [ERROR] hello"));
Log::SetLogLevel(EMBB_LOG_LEVEL_INFO);
......@@ -81,15 +85,18 @@ void LogTest::Test() {
logged_message = null;
Log::Info("chn", test_msg);
#ifdef EMBB_DEBUG
PT_ASSERT_NE(logged_message, null);
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [INFO ] hello"));
#else
PT_EXPECT_EQ(null, logged_message);
#endif
logged_message = null;
Log::Warning("chn", test_msg);
PT_ASSERT_NE(logged_message, null);
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [WARN ] hello"));
logged_message = null;
Log::Error("chn", test_msg);
PT_ASSERT_NE(logged_message, null);
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [ERROR] hello"));
Log::SetLogLevel(EMBB_LOG_LEVEL_WARNING);
......@@ -101,9 +108,11 @@ void LogTest::Test() {
PT_EXPECT_EQ(null, logged_message);
logged_message = null;
Log::Warning("chn", test_msg);
PT_ASSERT_NE(logged_message, null);
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [WARN ] hello"));
logged_message = null;
Log::Error("chn", test_msg);
PT_ASSERT_NE(logged_message, null);
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [ERROR] hello"));
Log::SetLogLevel(EMBB_LOG_LEVEL_ERROR);
......@@ -118,6 +127,7 @@ void LogTest::Test() {
PT_EXPECT_EQ(null, logged_message);
logged_message = null;
Log::Error("chn", test_msg);
PT_ASSERT_NE(logged_message, null);
PT_EXPECT(0 == strcmp(logged_message, "[chn] - [ERROR] hello"));
Log::SetLogLevel(EMBB_LOG_LEVEL_NONE);
......
......@@ -123,8 +123,10 @@ bool LockFreeStack< Type, ValuePool >::TryPop(Type & element) {
top_cached = top;
// Stack empty, cannot pop
if (top_cached == NULL)
if (top_cached == NULL) {
element = Type();
return false;
}
// Guard top_cached
hazardPointer.Guard(0, top_cached);
......
......@@ -113,14 +113,17 @@ allocate_rec(int node, Type& element) {
int pool_index = NodeIndexToPoolIndex(node);
Type expected = pool_[pool_index];
if (expected == Undefined)
if (expected == Undefined) {
element = Type();
return -1;
}
if (pool_[pool_index].CompareAndSwap(expected, Undefined)) {
element = expected;
return pool_index;
}
element = Type();
return -1;
}
......@@ -133,8 +136,10 @@ allocate_rec(int node, Type& element) {
do {
current = tree_[node];
desired = current - 1;
if (desired < 0)
if (desired < 0) {
element = Type();
return -1;
}
} while (!tree_[node].CompareAndSwap(current, desired));
int leftResult = allocate_rec(GetLeftChildIndex(node), element);
......
......@@ -55,6 +55,7 @@ Allocate(Type & element) {
return i;
}
}
element = Type();
return -1;
}
......
......@@ -144,6 +144,7 @@ void HazardPointerTest::HazardPointerTest1ThreadMethod() {
for (int i = 0; i != n_elements_per_thread_; ++i) {
embb::base::Atomic<int>* allocated_object = object_pool_->Allocate(0);
PT_ASSERT(NULL != allocated_object);
hazard_pointer_->Guard(0, allocated_object);
......@@ -210,8 +211,9 @@ void HazardPointerTest2::DeletePointerCallback(int* to_delete) {
}
bool HazardPointerTest2::SetRelativeGuards() {
unsigned int thread_index;
embb_internal_thread_index(&thread_index);
unsigned int thread_index = 0;
int result = embb_internal_thread_index(&thread_index);
PT_ASSERT(EMBB_SUCCESS == result);
unsigned int my_begin = guards_per_phread_count_*thread_index;
int guard_number = 0;
......@@ -247,6 +249,7 @@ void HazardPointerTest2::HazardPointerTest2Master() {
// while the hazard pointer guard array is not full
int** allocatedLocal = static_cast<int**>(
embb::base::Allocation::Allocate(sizeof(int*)*guaranteed_capacity_pool_));
PT_ASSERT(NULL != allocatedLocal);
bool full = false;
while (!full) {
......@@ -294,16 +297,19 @@ void HazardPointerTest2::HazardPointerTest2Pre() {
// first the test pool has to be created
test_pool_ = embb::base::Allocation::New<IntObjectTestPool>
(pool_size_using_hazard_pointer_);
PT_ASSERT(NULL != test_pool_);
// after the pool has been created, we create the hp class
hazard_pointer_ = embb::base::Allocation::New <
embb::containers::internal::HazardPointer<int*> >
(delete_pointer_callback_, static_cast<int*>(NULL),
static_cast<int>(guards_per_phread_count_), n_threads);
PT_ASSERT(NULL != hazard_pointer_);
shared_guarded_ = static_cast<embb::base::Atomic<int*>*>(
embb::base::Allocation::Allocate(sizeof(embb::base::Atomic<int*>)*
guaranteed_capacity_pool_));
PT_ASSERT(NULL != shared_guarded_);
for (unsigned int i = 0; i != guaranteed_capacity_pool_; ++i) {
// in-place new for each array cell
......@@ -450,8 +456,9 @@ void HazardPointerTest2::HazardPointerTest2Post() {
void HazardPointerTest2::HazardPointerTest2ThreadMethod() {
for (;;) {
unsigned int thread_index;
embb_internal_thread_index(&thread_index);
unsigned int thread_index = 0;
int result = embb_internal_thread_index(&thread_index);
PT_ASSERT(EMBB_SUCCESS == result);
if (thread_index == current_master_) {
HazardPointerTest2Master();
......
......@@ -279,6 +279,7 @@ void mtapi_group_wait_any(
MTAPI_IN mtapi_timeout_t timeout,
MTAPI_OUT mtapi_status_t* status) {
mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
void* local_result = MTAPI_NULL;
embb_mtapi_log_trace("mtapi_group_wait_any() called\n");
......@@ -335,10 +336,7 @@ void mtapi_group_wait_any(
}
/* was there a timeout, or is there a result? */
if (MTAPI_NULL != local_task) {
/* store result */
if (MTAPI_NULL != result) {
*result = local_task->result_buffer;
}
local_result = local_task->result_buffer;
/* return error code set by the task */
local_status = local_task->error_code;
......@@ -356,6 +354,11 @@ void mtapi_group_wait_any(
local_status = MTAPI_ERR_NODE_NOTINIT;
}
/* store result */
if (MTAPI_NULL != result) {
*result = local_result;
}
mtapi_status_set(status, local_status);
embb_mtapi_log_trace("mtapi_group_wait_any() returns\n");
}
......
......@@ -36,14 +36,19 @@ void embb_mtapi_id_pool_initialize(
mtapi_uint_t capacity) {
mtapi_uint_t ii;
that->capacity = capacity;
that->id_buffer = (mtapi_uint_t*)
embb_mtapi_alloc_allocate(sizeof(mtapi_uint_t)*(capacity + 1));
if (NULL != that->id_buffer) {
that->capacity = capacity;
that->id_buffer[0] = EMBB_MTAPI_IDPOOL_INVALID_ID;
for (ii = 1; ii <= capacity; ii++) {
that->id_buffer[ii] = ii;
}
that->ids_available = capacity;
} else {
that->capacity = 0;
that->ids_available = 0;
}
that->put_id_position = 0;
that->get_id_position = 1;
embb_spin_init(&that->lock);
......
......@@ -41,6 +41,9 @@
mtapi_boolean_t embb_mtapi_job_initialize_list(embb_mtapi_node_t * node) {
node->job_list = (embb_mtapi_job_t*)embb_mtapi_alloc_allocate(
sizeof(embb_mtapi_job_t)*(node->attributes.max_jobs + 1));
if (NULL == node->job_list) {
return MTAPI_FALSE;
}
mtapi_uint_t ii;
for (ii = 0; ii <= node->attributes.max_jobs; ii++) {
embb_mtapi_job_initialize(
......@@ -112,12 +115,16 @@ void embb_mtapi_job_initialize(
that->domain_id = 0;
that->node_id = 0;
that->num_actions = 0;
that->max_actions = max_actions;
that->actions = (mtapi_action_hndl_t*)
embb_mtapi_alloc_allocate(sizeof(mtapi_action_hndl_t)*max_actions);
if (NULL != that->actions) {
that->max_actions = max_actions;
for (ii = 0; ii < max_actions; ii++) {
that->actions[ii].id = EMBB_MTAPI_IDPOOL_INVALID_ID;
}
} else {
that->max_actions = 0;
}
}
void embb_mtapi_job_finalize(embb_mtapi_job_t * that) {
......@@ -159,7 +166,7 @@ void embb_mtapi_job_remove_action(
embb_mtapi_action_t * action) {
assert(MTAPI_NULL != that);
assert(MTAPI_NULL != action);
mtapi_uint_t ii = 0;
mtapi_uint_t ii;
for (ii = 0; ii + 1 < that->num_actions; ii++) {
if (that->actions[ii].id == action->handle.id &&
......
......@@ -117,7 +117,16 @@ void mtapi_initialize(
node->attributes.max_tasks);
node->queue_pool = embb_mtapi_queue_pool_new(
node->attributes.max_queues);
if (MTAPI_NULL == node->job_list ||
MTAPI_NULL == node->action_pool ||
MTAPI_NULL == node->group_pool ||
MTAPI_NULL == node->task_pool ||
MTAPI_NULL == node->queue_pool) {
mtapi_finalize(NULL);
local_status = MTAPI_ERR_NODE_INITFAILED;
}
if (local_status == MTAPI_SUCCESS) {
/* initialize scheduler for local node */
node->scheduler = embb_mtapi_scheduler_new();
if (MTAPI_NULL != node->scheduler) {
......@@ -136,16 +145,11 @@ void mtapi_initialize(
/* initialization succeeded, tell workers to start working */
embb_atomic_store_int(&node->is_scheduler_running, MTAPI_TRUE);
if (MTAPI_SUCCESS != local_status) {
mtapi_finalize(MTAPI_NULL);
local_status = MTAPI_ERR_NODE_INITFAILED;
}
} else {
mtapi_finalize(MTAPI_NULL);
local_status = MTAPI_ERR_NODE_INITFAILED;
}
}
} else {
embb_mtapi_alloc_deallocate(node);
local_status = MTAPI_ERR_PARAMETER;
......@@ -171,19 +175,29 @@ void mtapi_finalize(MTAPI_OUT mtapi_status_t* status) {
}
/* finalize storage in reverse order */
if (MTAPI_NULL != node->queue_pool) {
embb_mtapi_queue_pool_delete(node->queue_pool);
node->queue_pool = MTAPI_NULL;
}
if (MTAPI_NULL != node->task_pool) {
embb_mtapi_task_pool_delete(node->task_pool);
node->task_pool = MTAPI_NULL;
}
if (MTAPI_NULL != node->group_pool) {
embb_mtapi_group_pool_delete(node->group_pool);
node->group_pool = MTAPI_NULL;
}
if (MTAPI_NULL != node->action_pool) {
embb_mtapi_action_pool_delete(node->action_pool);
node->action_pool = MTAPI_NULL;
}
if (MTAPI_NULL != node->job_list) {
embb_mtapi_job_finalize_list(node);
}
/* free system instance */
embb_mtapi_alloc_deallocate(node);
......
......@@ -61,6 +61,7 @@ mtapi_boolean_t embb_mtapi_##TYPE##_pool_initialize( \
embb_mtapi_id_pool_initialize(&that->id_pool, capacity); \
that->storage = (embb_mtapi_##TYPE##_t*)embb_mtapi_alloc_allocate( \
sizeof(embb_mtapi_##TYPE##_t)*(capacity + 1)); \
if (NULL != that->storage) { \
for (ii = 0; ii <= capacity; ii++) { \
that->storage[ii].handle.id = EMBB_MTAPI_IDPOOL_INVALID_ID; \
that->storage[ii].handle.tag = 0; \
......@@ -68,6 +69,10 @@ mtapi_boolean_t embb_mtapi_##TYPE##_pool_initialize( \
/* use entry 0 as invalid */ \
embb_mtapi_##TYPE##_initialize(that->storage); \
return MTAPI_TRUE; \
} else { \
that->id_pool.ids_available = 0; \
return MTAPI_FALSE; \
} \
} \
\
void embb_mtapi_##TYPE##_pool_finalize(embb_mtapi_##TYPE##_pool_t * that) { \
......
......@@ -325,7 +325,7 @@ mtapi_queue_hndl_t mtapi_queue_get(
if (embb_mtapi_node_is_initialized()) {
embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
mtapi_uint_t ii = 0;
mtapi_uint_t ii;
local_status = MTAPI_ERR_QUEUE_INVALID;
for (ii = 0; ii < node->attributes.max_queues; ii++) {
......
......@@ -78,8 +78,7 @@ embb_mtapi_task_t * embb_mtapi_scheduler_get_next_task_vhpf(
embb_mtapi_node_t * node,
embb_mtapi_thread_context_t * thread_context) {
embb_mtapi_task_t * task = MTAPI_NULL;
mtapi_uint_t ii = 0;
mtapi_uint_t kk = 0;
mtapi_uint_t ii;
assert(MTAPI_NULL != that);
assert(MTAPI_NULL != node);
......@@ -102,6 +101,7 @@ embb_mtapi_task_t * embb_mtapi_scheduler_get_next_task_vhpf(
*/
mtapi_uint_t context_index =
(thread_context->worker_index + 1) % that->worker_count;
mtapi_uint_t kk;
for (kk = 0;
kk < that->worker_count - 1 && MTAPI_NULL == task;
kk++) {
......@@ -121,8 +121,7 @@ embb_mtapi_task_t * embb_mtapi_scheduler_get_next_task_lf(
embb_mtapi_node_t * node,
embb_mtapi_thread_context_t * thread_context) {
embb_mtapi_task_t * task = MTAPI_NULL;
mtapi_uint_t prio = 0;
mtapi_uint_t kk = 0;
mtapi_uint_t prio;
assert(MTAPI_NULL != that);
assert(MTAPI_NULL != node);
......@@ -153,6 +152,7 @@ embb_mtapi_task_t * embb_mtapi_scheduler_get_next_task_lf(
prio++) {
mtapi_uint_t context_index =
(thread_context->worker_index + 1) % that->worker_count;
mtapi_uint_t kk;
for (kk = 0;
kk < that->worker_count - 1 && MTAPI_NULL == task;
kk++) {
......@@ -195,7 +195,7 @@ embb_mtapi_task_t * embb_mtapi_scheduler_get_next_task(
embb_mtapi_thread_context_t * embb_mtapi_scheduler_get_current_thread_context(
embb_mtapi_scheduler_t * that) {
mtapi_uint_t ii = 0;
mtapi_uint_t ii;
embb_mtapi_thread_context_t * context = NULL;
assert(MTAPI_NULL != that);
......@@ -434,7 +434,7 @@ mtapi_boolean_t embb_mtapi_scheduler_initialize_with_mode(
embb_mtapi_scheduler_t * that,
embb_mtapi_scheduler_mode_t mode) {
embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
mtapi_uint_t ii = 0;
mtapi_uint_t ii;
embb_mtapi_log_trace("embb_mtapi_scheduler_initialize() called\n");
......@@ -456,6 +456,10 @@ mtapi_boolean_t embb_mtapi_scheduler_initialize_with_mode(
that->worker_contexts = (embb_mtapi_thread_context_t*)
embb_mtapi_alloc_allocate(
sizeof(embb_mtapi_thread_context_t)*that->worker_count);
if (NULL == that->worker_contexts) {
return MTAPI_FALSE;
}
mtapi_boolean_t isinit = MTAPI_TRUE;
for (ii = 0; ii < that->worker_count; ii++) {
unsigned int core_num = 0;
mtapi_uint_t ll = 0;
......@@ -467,9 +471,12 @@ mtapi_boolean_t embb_mtapi_scheduler_initialize_with_mode(
}
core_num++;
}
embb_mtapi_thread_context_initialize_with_node_worker_and_core(
isinit &= embb_mtapi_thread_context_initialize_with_node_worker_and_core(
&that->worker_contexts[ii], node, ii, core_num);
}
if (!isinit) {
return MTAPI_FALSE;
}
for (ii = 0; ii < that->worker_count; ii++) {
if (MTAPI_FALSE == embb_mtapi_thread_context_start(
&that->worker_contexts[ii], that)) {
......@@ -481,11 +488,12 @@ mtapi_boolean_t embb_mtapi_scheduler_initialize_with_mode(
}
void embb_mtapi_scheduler_finalize(embb_mtapi_scheduler_t * that) {
mtapi_uint_t ii = 0;
mtapi_uint_t ii;
embb_mtapi_log_trace("embb_mtapi_scheduler_finalize() called\n");
assert(MTAPI_NULL != that);
if (MTAPI_NULL != that->worker_contexts) {
/* finalize all workers */
for (ii = 0; ii < that->worker_count; ii++) {
embb_mtapi_thread_context_stop(&that->worker_contexts[ii]);
......@@ -497,6 +505,7 @@ void embb_mtapi_scheduler_finalize(embb_mtapi_scheduler_t * that) {
that->worker_count = 0;
embb_mtapi_alloc_deallocate(that->worker_contexts);
that->worker_contexts = MTAPI_NULL;
}
}
embb_mtapi_scheduler_t * embb_mtapi_scheduler_new() {
......@@ -506,6 +515,7 @@ embb_mtapi_scheduler_t * embb_mtapi_scheduler_new() {
if (MTAPI_NULL != that) {
if (MTAPI_FALSE == embb_mtapi_scheduler_initialize(that)) {
/* on error delete and return MTAPI_NULL */
embb_mtapi_scheduler_finalize(that);
embb_mtapi_scheduler_delete(that);
return MTAPI_NULL;
}
......
......@@ -38,12 +38,13 @@
/* ---- CLASS MEMBERS ------------------------------------------------------ */
void embb_mtapi_thread_context_initialize_with_node_worker_and_core(
mtapi_boolean_t embb_mtapi_thread_context_initialize_with_node_worker_and_core(
embb_mtapi_thread_context_t* that,
embb_mtapi_node_t* node,
mtapi_uint_t worker_index,
mtapi_uint_t core_num) {
mtapi_uint_t ii;
mtapi_boolean_t result = MTAPI_TRUE;
assert(MTAPI_NULL != that);
assert(MTAPI_NULL != node);
......@@ -52,25 +53,55 @@ void embb_mtapi_thread_context_initialize_with_node_worker_and_core(
that->worker_index = worker_index;
that->core_num = core_num;
that->priorities = node->attributes.max_priorities;
that->is_initialized = MTAPI_FALSE;
embb_atomic_store_int(&that->run, 0);
that->queue = (embb_mtapi_task_queue_t**)embb_mtapi_alloc_allocate(
sizeof(embb_mtapi_task_queue_t)*that->priorities);
that->private_queue = (embb_mtapi_task_queue_t**)embb_mtapi_alloc_allocate(
sizeof(embb_mtapi_task_queue_t)*that->priorities);
if (that->queue == NULL) {
that->private_queue = NULL;
return MTAPI_FALSE;
}
for (ii = 0; ii < that->priorities; ii++) {
that->queue[ii] = (embb_mtapi_task_queue_t*)
embb_mtapi_alloc_allocate(sizeof(embb_mtapi_task_queue_t));
if (that->queue[ii] != NULL) {
embb_mtapi_task_queue_initialize_with_capacity(
that->queue[ii], node->attributes.queue_limit);
} else {
result = MTAPI_FALSE;
}
}
if (!result) {
return MTAPI_FALSE;
}
that->private_queue = (embb_mtapi_task_queue_t**)embb_mtapi_alloc_allocate(
sizeof(embb_mtapi_task_queue_t)*that->priorities);
if (that->private_queue == NULL) {
return MTAPI_FALSE;
}
for (ii = 0; ii < that->priorities; ii++) {
that->private_queue[ii] = (embb_mtapi_task_queue_t*)
embb_mtapi_alloc_allocate(sizeof(embb_mtapi_task_queue_t));
if (that->private_queue[ii] != NULL) {
embb_mtapi_task_queue_initialize_with_capacity(
that->private_queue[ii], node->attributes.queue_limit);
} else {
result = MTAPI_FALSE;
}
}
if (!result) {
return MTAPI_FALSE;
}
embb_mutex_init(&that->work_available_mutex, EMBB_MUTEX_PLAIN);
embb_condition_init(&that->work_available);
embb_atomic_store_int(&that->is_sleeping, 0);
that->is_initialized = MTAPI_TRUE;
return MTAPI_TRUE;
}
mtapi_boolean_t embb_mtapi_thread_context_start(
......@@ -126,22 +157,37 @@ void embb_mtapi_thread_context_finalize(embb_mtapi_thread_context_t* that) {
embb_mtapi_log_trace("embb_mtapi_thread_context_finalize() called\n");
if (that->is_initialized) {
embb_condition_destroy(&that->work_available);
embb_mutex_destroy(&that->work_available_mutex);
}
if (that->queue != NULL) {
for (ii = 0; ii < that->priorities; ii++) {
if (that->queue[ii] != NULL) {
embb_mtapi_task_queue_finalize(that->queue[ii]);
embb_mtapi_alloc_deallocate(that->queue[ii]);
that->queue[ii] = MTAPI_NULL;
}
}
embb_mtapi_alloc_deallocate(that->queue);
that->queue = MTAPI_NULL;
}
if (that->private_queue != NULL) {
for (ii = 0; ii < that->priorities; ii++) {
if (that->private_queue[ii] != NULL) {
embb_mtapi_task_queue_finalize(that->private_queue[ii]);
embb_mtapi_alloc_deallocate(that->private_queue[ii]);
that->private_queue[ii] = MTAPI_NULL;
}
embb_mtapi_alloc_deallocate(that->queue);
that->queue = MTAPI_NULL;
}
embb_mtapi_alloc_deallocate(that->private_queue);
that->private_queue = MTAPI_NULL;
}
that->priorities = 0;
that->is_initialized = MTAPI_FALSE;
that->node = MTAPI_NULL;
}
......
......@@ -67,6 +67,7 @@ struct embb_mtapi_thread_context_struct {
mtapi_uint_t core_num;
embb_atomic_int run;
mtapi_status_t status;
mtapi_boolean_t is_initialized;
};
#include <embb_mtapi_thread_context_t_fwd.h>
......@@ -74,8 +75,9 @@ struct embb_mtapi_thread_context_struct {
/**
* Constructor using attributes from node and a given core number.
* \memberof embb_mtapi_thread_context_struct
* \returns MTAPI_TRUE if successful, MTAPI_FALSE on error
*/
void embb_mtapi_thread_context_initialize_with_node_worker_and_core(
mtapi_boolean_t embb_mtapi_thread_context_initialize_with_node_worker_and_core(
embb_mtapi_thread_context_t* that,
embb_mtapi_node_t* node,
mtapi_uint_t worker_index,
......
......@@ -107,6 +107,7 @@ int embb_mtapi_network_buffer_pop_front_int8(
embb_mtapi_network_buffer_t * that,
int8_t * value) {
if (that->position + 1 > that->size) {
*value = 0;
return 0;
}
memcpy(value, that->data + that->position, 1);
......@@ -118,6 +119,7 @@ int embb_mtapi_network_buffer_pop_front_int16(
embb_mtapi_network_buffer_t * that,
int16_t * value) {
if (that->position + 2 > that->size) {
*value = 0;
return 0;
}
memcpy(value, that->data + that->position, 2);
......@@ -129,6 +131,7 @@ int embb_mtapi_network_buffer_pop_front_int32(
embb_mtapi_network_buffer_t * that,
int32_t * value) {
if (that->position + 4 > that->size) {
*value = 0;
return 0;
}
memcpy(value, that->data + that->position, 4);
......@@ -141,6 +144,7 @@ int embb_mtapi_network_buffer_pop_front_rawdata(
int32_t size,
void * rawdata) {
if (that->position + size > that->size) {
memset(rawdata, 0, (size_t)size);
return 0;
}
memcpy(rawdata, that->data + that->position, (size_t)size);
......
......@@ -71,13 +71,6 @@ int embb_mtapi_network_socket_bind_and_listen(
uint16_t port,
uint16_t max_connections) {
struct sockaddr_in in_addr;
int reuseaddr_on = 1;
// addr reuse
if (SOCKET_ERROR == setsockopt(that->handle, SOL_SOCKET, SO_REUSEADDR,
(const char *)&reuseaddr_on, sizeof(reuseaddr_on))) {
return 0;
}
// bind & listen
memset(&in_addr, 0, sizeof(in_addr));
......
......@@ -126,7 +126,7 @@ void Node::Initialize(
mtapi_nodeattr_set(&attr, MTAPI_NODE_MAX_ACTIONS,
&tmp, sizeof(tmp), &status);
assert(MTAPI_SUCCESS == status);
tmp = 4;
// tmp = 4;
mtapi_nodeattr_set(&attr, MTAPI_NODE_MAX_JOBS,
&tmp, sizeof(tmp), &status);
assert(MTAPI_SUCCESS == status);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment